From 5a51ee8c30be233b92ba8f0ad36662150f61ac97 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 1 Feb 2026 21:54:34 -0600 Subject: [PATCH] feat(#176): Integrate M4.2 infrastructure with M4.1 coordinator Add CoordinatorIntegrationModule providing REST API endpoints for the Python coordinator to communicate with the NestJS API infrastructure: - POST /coordinator/jobs - Create job from coordinator webhook events - PATCH /coordinator/jobs/:id/status - Update job status (PENDING -> RUNNING) - PATCH /coordinator/jobs/:id/progress - Update job progress percentage - POST /coordinator/jobs/:id/complete - Mark job complete with results - POST /coordinator/jobs/:id/fail - Mark job failed with gate results - GET /coordinator/jobs/:id - Get job details with events and steps - GET /coordinator/health - Integration health check Integration features: - Job creation dispatches to BullMQ queues - Status updates emit JobEvents for audit logging - Completion/failure events broadcast via Herald to Discord - Status transition validation (PENDING -> QUEUED -> RUNNING -> COMPLETED/FAILED) - Health check includes BullMQ connection status and queue counts Also adds JOB_PROGRESS event type to event-types.ts for progress tracking. Fixes #176 Co-Authored-By: Claude Opus 4.5 --- apps/api/src/app.module.ts | 2 + ...coordinator-integration.controller.spec.ts | 184 +++++++++ .../coordinator-integration.controller.ts | 97 +++++ .../coordinator-integration.module.ts | 27 ++ .../coordinator-integration.service.spec.ts | 310 +++++++++++++++ .../coordinator-integration.service.ts | 372 ++++++++++++++++++ .../dto/complete-job.dto.ts | 20 + .../dto/create-coordinator-job.dto.ts | 28 ++ .../dto/fail-job.dto.ts | 22 ++ .../src/coordinator-integration/dto/index.ts | 5 + .../dto/update-job-progress.dto.ts | 19 + .../dto/update-job-status.dto.ts | 25 ++ apps/api/src/coordinator-integration/index.ts | 5 + .../interfaces/coordinator-job.interface.ts | 41 ++ .../interfaces/index.ts | 1 + apps/api/src/job-events/event-types.ts | 2 + .../176-coordinator-integration.md | 102 +++++ 17 files changed, 1262 insertions(+) create mode 100644 apps/api/src/coordinator-integration/coordinator-integration.controller.spec.ts create mode 100644 apps/api/src/coordinator-integration/coordinator-integration.controller.ts create mode 100644 apps/api/src/coordinator-integration/coordinator-integration.module.ts create mode 100644 apps/api/src/coordinator-integration/coordinator-integration.service.spec.ts create mode 100644 apps/api/src/coordinator-integration/coordinator-integration.service.ts create mode 100644 apps/api/src/coordinator-integration/dto/complete-job.dto.ts create mode 100644 apps/api/src/coordinator-integration/dto/create-coordinator-job.dto.ts create mode 100644 apps/api/src/coordinator-integration/dto/fail-job.dto.ts create mode 100644 apps/api/src/coordinator-integration/dto/index.ts create mode 100644 apps/api/src/coordinator-integration/dto/update-job-progress.dto.ts create mode 100644 apps/api/src/coordinator-integration/dto/update-job-status.dto.ts create mode 100644 apps/api/src/coordinator-integration/index.ts create mode 100644 apps/api/src/coordinator-integration/interfaces/coordinator-job.interface.ts create mode 100644 apps/api/src/coordinator-integration/interfaces/index.ts create mode 100644 docs/scratchpads/176-coordinator-integration.md diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index 9ac57c5..f5fdc50 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -27,6 +27,7 @@ import { TelemetryModule, TelemetryInterceptor } from "./telemetry"; import { RunnerJobsModule } from "./runner-jobs/runner-jobs.module"; import { JobEventsModule } from "./job-events/job-events.module"; import { JobStepsModule } from "./job-steps/job-steps.module"; +import { CoordinatorIntegrationModule } from "./coordinator-integration/coordinator-integration.module"; @Module({ imports: [ @@ -55,6 +56,7 @@ import { JobStepsModule } from "./job-steps/job-steps.module"; RunnerJobsModule, JobEventsModule, JobStepsModule, + CoordinatorIntegrationModule, ], controllers: [AppController], providers: [ diff --git a/apps/api/src/coordinator-integration/coordinator-integration.controller.spec.ts b/apps/api/src/coordinator-integration/coordinator-integration.controller.spec.ts new file mode 100644 index 0000000..12cd87c --- /dev/null +++ b/apps/api/src/coordinator-integration/coordinator-integration.controller.spec.ts @@ -0,0 +1,184 @@ +import { describe, it, expect, beforeEach, vi } from "vitest"; +import { Test, TestingModule } from "@nestjs/testing"; +import { RunnerJobStatus } from "@prisma/client"; +import { CoordinatorIntegrationController } from "./coordinator-integration.controller"; +import { CoordinatorIntegrationService } from "./coordinator-integration.service"; +import type { CoordinatorJobResult, CoordinatorHealthStatus } from "./interfaces"; +import { CoordinatorJobStatus } from "./dto"; + +describe("CoordinatorIntegrationController", () => { + let controller: CoordinatorIntegrationController; + + const mockJobResult: CoordinatorJobResult = { + jobId: "job-123", + status: "PENDING", + queueName: "mosaic:main", + }; + + const mockJob = { + id: "job-123", + workspaceId: "workspace-123", + type: "code-task", + status: RunnerJobStatus.PENDING, + priority: 10, + progressPercent: 0, + agentTaskId: null, + result: null, + error: null, + startedAt: null, + completedAt: null, + createdAt: new Date(), + updatedAt: new Date(), + }; + + const mockHealthStatus: CoordinatorHealthStatus = { + api: true, + bullmq: { + connected: true, + queues: { main: 5, runner: 2 }, + }, + timestamp: new Date(), + }; + + const mockService = { + createJob: vi.fn(), + updateJobStatus: vi.fn(), + updateJobProgress: vi.fn(), + completeJob: vi.fn(), + failJob: vi.fn(), + getJobDetails: vi.fn(), + getIntegrationHealth: vi.fn(), + }; + + beforeEach(async () => { + vi.clearAllMocks(); + + const module: TestingModule = await Test.createTestingModule({ + controllers: [CoordinatorIntegrationController], + providers: [{ provide: CoordinatorIntegrationService, useValue: mockService }], + }).compile(); + + controller = module.get(CoordinatorIntegrationController); + }); + + describe("POST /coordinator/jobs", () => { + it("should create a job and return job result", async () => { + const dto = { + workspaceId: "workspace-123", + type: "code-task", + issueNumber: 42, + repository: "mosaic/stack", + }; + + mockService.createJob.mockResolvedValue(mockJobResult); + + const result = await controller.createJob(dto); + + expect(result).toEqual(mockJobResult); + expect(mockService.createJob).toHaveBeenCalledWith(dto); + }); + }); + + describe("PATCH /coordinator/jobs/:id/status", () => { + it("should update job status", async () => { + const updatedJob = { ...mockJob, status: RunnerJobStatus.RUNNING }; + mockService.updateJobStatus.mockResolvedValue(updatedJob); + + const result = await controller.updateJobStatus("job-123", { + status: CoordinatorJobStatus.RUNNING, + agentId: "agent-42", + }); + + expect(result.status).toBe(RunnerJobStatus.RUNNING); + expect(mockService.updateJobStatus).toHaveBeenCalledWith("job-123", { + status: CoordinatorJobStatus.RUNNING, + agentId: "agent-42", + }); + }); + }); + + describe("PATCH /coordinator/jobs/:id/progress", () => { + it("should update job progress", async () => { + const updatedJob = { ...mockJob, progressPercent: 50 }; + mockService.updateJobProgress.mockResolvedValue(updatedJob); + + const result = await controller.updateJobProgress("job-123", { + progressPercent: 50, + currentStep: "Running tests", + }); + + expect(result.progressPercent).toBe(50); + expect(mockService.updateJobProgress).toHaveBeenCalledWith("job-123", { + progressPercent: 50, + currentStep: "Running tests", + }); + }); + }); + + describe("POST /coordinator/jobs/:id/complete", () => { + it("should complete a job", async () => { + const completedJob = { + ...mockJob, + status: RunnerJobStatus.COMPLETED, + progressPercent: 100, + }; + mockService.completeJob.mockResolvedValue(completedJob); + + const result = await controller.completeJob("job-123", { + result: { commitSha: "abc123" }, + }); + + expect(result.status).toBe(RunnerJobStatus.COMPLETED); + expect(mockService.completeJob).toHaveBeenCalledWith("job-123", { + result: { commitSha: "abc123" }, + }); + }); + }); + + describe("POST /coordinator/jobs/:id/fail", () => { + it("should fail a job", async () => { + const failedJob = { + ...mockJob, + status: RunnerJobStatus.FAILED, + error: "Test failed", + }; + mockService.failJob.mockResolvedValue(failedJob); + + const result = await controller.failJob("job-123", { + error: "Test failed", + gateResults: { lint: true, test: false }, + }); + + expect(result.status).toBe(RunnerJobStatus.FAILED); + expect(result.error).toBe("Test failed"); + expect(mockService.failJob).toHaveBeenCalledWith("job-123", { + error: "Test failed", + gateResults: { lint: true, test: false }, + }); + }); + }); + + describe("GET /coordinator/jobs/:id", () => { + it("should return job details", async () => { + const jobWithDetails = { ...mockJob, steps: [], events: [] }; + mockService.getJobDetails.mockResolvedValue(jobWithDetails); + + const result = await controller.getJobDetails("job-123"); + + expect(result).toEqual(jobWithDetails); + expect(mockService.getJobDetails).toHaveBeenCalledWith("job-123"); + }); + }); + + describe("GET /coordinator/health", () => { + it("should return integration health status", async () => { + mockService.getIntegrationHealth.mockResolvedValue(mockHealthStatus); + + const result = await controller.getHealth(); + + expect(result.api).toBe(true); + expect(result.bullmq.connected).toBe(true); + expect(mockService.getIntegrationHealth).toHaveBeenCalled(); + }); + }); +}); diff --git a/apps/api/src/coordinator-integration/coordinator-integration.controller.ts b/apps/api/src/coordinator-integration/coordinator-integration.controller.ts new file mode 100644 index 0000000..393fa3e --- /dev/null +++ b/apps/api/src/coordinator-integration/coordinator-integration.controller.ts @@ -0,0 +1,97 @@ +import { Controller, Post, Patch, Get, Body, Param } from "@nestjs/common"; +import { CoordinatorIntegrationService } from "./coordinator-integration.service"; +import { + CreateCoordinatorJobDto, + UpdateJobStatusDto, + UpdateJobProgressDto, + CompleteJobDto, + FailJobDto, +} from "./dto"; +import type { CoordinatorJobResult, CoordinatorHealthStatus } from "./interfaces"; + +/** + * CoordinatorIntegrationController - REST API for Python coordinator communication + * + * Endpoints: + * - POST /coordinator/jobs - Create a job from coordinator + * - PATCH /coordinator/jobs/:id/status - Update job status + * - PATCH /coordinator/jobs/:id/progress - Update job progress + * - POST /coordinator/jobs/:id/complete - Mark job as complete + * - POST /coordinator/jobs/:id/fail - Mark job as failed + * - GET /coordinator/jobs/:id - Get job details + * - GET /coordinator/health - Integration health check + */ +@Controller("coordinator") +export class CoordinatorIntegrationController { + constructor(private readonly service: CoordinatorIntegrationService) {} + + /** + * Create a job from the coordinator + */ + @Post("jobs") + async createJob(@Body() dto: CreateCoordinatorJobDto): Promise { + return this.service.createJob(dto); + } + + /** + * Update job status from the coordinator + */ + @Patch("jobs/:id/status") + async updateJobStatus( + @Param("id") id: string, + @Body() dto: UpdateJobStatusDto + ): Promise>> { + return this.service.updateJobStatus(id, dto); + } + + /** + * Update job progress from the coordinator + */ + @Patch("jobs/:id/progress") + async updateJobProgress( + @Param("id") id: string, + @Body() dto: UpdateJobProgressDto + ): Promise>> { + return this.service.updateJobProgress(id, dto); + } + + /** + * Mark job as complete from the coordinator + */ + @Post("jobs/:id/complete") + async completeJob( + @Param("id") id: string, + @Body() dto: CompleteJobDto + ): Promise>> { + return this.service.completeJob(id, dto); + } + + /** + * Mark job as failed from the coordinator + */ + @Post("jobs/:id/fail") + async failJob( + @Param("id") id: string, + @Body() dto: FailJobDto + ): Promise>> { + return this.service.failJob(id, dto); + } + + /** + * Get job details with events and steps + */ + @Get("jobs/:id") + async getJobDetails( + @Param("id") id: string + ): Promise>> { + return this.service.getJobDetails(id); + } + + /** + * Integration health check + */ + @Get("health") + async getHealth(): Promise { + return this.service.getIntegrationHealth(); + } +} diff --git a/apps/api/src/coordinator-integration/coordinator-integration.module.ts b/apps/api/src/coordinator-integration/coordinator-integration.module.ts new file mode 100644 index 0000000..e2615c6 --- /dev/null +++ b/apps/api/src/coordinator-integration/coordinator-integration.module.ts @@ -0,0 +1,27 @@ +import { Module } from "@nestjs/common"; +import { CoordinatorIntegrationController } from "./coordinator-integration.controller"; +import { CoordinatorIntegrationService } from "./coordinator-integration.service"; +import { PrismaModule } from "../prisma/prisma.module"; +import { BullMqModule } from "../bullmq/bullmq.module"; +import { JobEventsModule } from "../job-events/job-events.module"; +import { HeraldModule } from "../herald/herald.module"; + +/** + * CoordinatorIntegrationModule - Bridge between Python coordinator and NestJS API + * + * Provides REST endpoints for the M4.1 coordinator (Python FastAPI) to + * communicate with the M4.2 infrastructure (NestJS). + * + * Key integration points: + * - Job creation from coordinator webhook events + * - Job status updates during processing + * - Job completion and failure handling + * - Event bridging to Herald for Discord notifications + */ +@Module({ + imports: [PrismaModule, BullMqModule, JobEventsModule, HeraldModule], + controllers: [CoordinatorIntegrationController], + providers: [CoordinatorIntegrationService], + exports: [CoordinatorIntegrationService], +}) +export class CoordinatorIntegrationModule {} diff --git a/apps/api/src/coordinator-integration/coordinator-integration.service.spec.ts b/apps/api/src/coordinator-integration/coordinator-integration.service.spec.ts new file mode 100644 index 0000000..8b206bd --- /dev/null +++ b/apps/api/src/coordinator-integration/coordinator-integration.service.spec.ts @@ -0,0 +1,310 @@ +import { describe, it, expect, beforeEach, vi } from "vitest"; +import { Test, TestingModule } from "@nestjs/testing"; +import { NotFoundException, BadRequestException } from "@nestjs/common"; +import { RunnerJobStatus } from "@prisma/client"; +import { CoordinatorIntegrationService } from "./coordinator-integration.service"; +import { PrismaService } from "../prisma/prisma.service"; +import { JobEventsService } from "../job-events/job-events.service"; +import { HeraldService } from "../herald/herald.service"; +import { BullMqService } from "../bullmq/bullmq.service"; + +describe("CoordinatorIntegrationService", () => { + let service: CoordinatorIntegrationService; + let prismaService: PrismaService; + let jobEventsService: JobEventsService; + let heraldService: HeraldService; + let bullMqService: BullMqService; + + const mockWorkspace = { + id: "workspace-123", + name: "Test Workspace", + slug: "test-workspace", + settings: {}, + createdAt: new Date(), + updatedAt: new Date(), + }; + + const mockJob = { + id: "job-123", + workspaceId: "workspace-123", + type: "code-task", + status: RunnerJobStatus.PENDING, + priority: 10, + progressPercent: 0, + agentTaskId: null, + result: null, + error: null, + startedAt: null, + completedAt: null, + createdAt: new Date(), + updatedAt: new Date(), + }; + + const mockEvent = { + id: "event-123", + jobId: "job-123", + stepId: null, + type: "job.created", + timestamp: new Date(), + actor: "coordinator", + payload: {}, + }; + + const mockPrismaService = { + workspace: { + findUnique: vi.fn(), + }, + runnerJob: { + create: vi.fn(), + findUnique: vi.fn(), + update: vi.fn(), + }, + }; + + const mockJobEventsService = { + emitEvent: vi.fn(), + emitJobCreated: vi.fn(), + emitJobStarted: vi.fn(), + emitJobCompleted: vi.fn(), + emitJobFailed: vi.fn(), + }; + + const mockHeraldService = { + broadcastJobEvent: vi.fn(), + }; + + const mockBullMqService = { + addJob: vi.fn(), + healthCheck: vi.fn(), + getHealthStatus: vi.fn(), + }; + + beforeEach(async () => { + vi.clearAllMocks(); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + CoordinatorIntegrationService, + { provide: PrismaService, useValue: mockPrismaService }, + { provide: JobEventsService, useValue: mockJobEventsService }, + { provide: HeraldService, useValue: mockHeraldService }, + { provide: BullMqService, useValue: mockBullMqService }, + ], + }).compile(); + + service = module.get(CoordinatorIntegrationService); + prismaService = module.get(PrismaService); + jobEventsService = module.get(JobEventsService); + heraldService = module.get(HeraldService); + bullMqService = module.get(BullMqService); + }); + + describe("createJob", () => { + it("should create a job and add it to the queue", async () => { + const dto = { + workspaceId: "workspace-123", + type: "code-task", + issueNumber: 42, + repository: "mosaic/stack", + priority: 10, + metadata: { assignedAgent: "sonnet" }, + }; + + mockPrismaService.workspace.findUnique.mockResolvedValue(mockWorkspace); + mockPrismaService.runnerJob.create.mockResolvedValue(mockJob); + mockJobEventsService.emitJobCreated.mockResolvedValue(mockEvent); + mockBullMqService.addJob.mockResolvedValue({ id: "bullmq-job-123" }); + + const result = await service.createJob(dto); + + expect(result).toHaveProperty("jobId", mockJob.id); + expect(result).toHaveProperty("status", "PENDING"); + expect(mockPrismaService.runnerJob.create).toHaveBeenCalled(); + expect(mockJobEventsService.emitJobCreated).toHaveBeenCalledWith( + mockJob.id, + expect.any(Object) + ); + expect(mockBullMqService.addJob).toHaveBeenCalled(); + }); + + it("should throw NotFoundException if workspace does not exist", async () => { + const dto = { + workspaceId: "non-existent", + type: "code-task", + issueNumber: 42, + repository: "mosaic/stack", + }; + + mockPrismaService.workspace.findUnique.mockResolvedValue(null); + + await expect(service.createJob(dto)).rejects.toThrow(NotFoundException); + }); + }); + + describe("updateJobStatus", () => { + it("should update job status to RUNNING", async () => { + const updatedJob = { ...mockJob, status: RunnerJobStatus.RUNNING, startedAt: new Date() }; + + mockPrismaService.runnerJob.findUnique.mockResolvedValue(mockJob); + mockPrismaService.runnerJob.update.mockResolvedValue(updatedJob); + mockJobEventsService.emitJobStarted.mockResolvedValue(mockEvent); + mockHeraldService.broadcastJobEvent.mockResolvedValue(undefined); + + const result = await service.updateJobStatus("job-123", { + status: "RUNNING" as const, + agentId: "agent-42", + }); + + expect(result.status).toBe(RunnerJobStatus.RUNNING); + expect(mockJobEventsService.emitJobStarted).toHaveBeenCalled(); + }); + + it("should throw NotFoundException if job does not exist", async () => { + mockPrismaService.runnerJob.findUnique.mockResolvedValue(null); + + await expect( + service.updateJobStatus("non-existent", { status: "RUNNING" as const }) + ).rejects.toThrow(NotFoundException); + }); + + it("should throw BadRequestException for invalid status transition", async () => { + const completedJob = { ...mockJob, status: RunnerJobStatus.COMPLETED }; + mockPrismaService.runnerJob.findUnique.mockResolvedValue(completedJob); + + await expect( + service.updateJobStatus("job-123", { status: "RUNNING" as const }) + ).rejects.toThrow(BadRequestException); + }); + }); + + describe("updateJobProgress", () => { + it("should update job progress percentage", async () => { + const runningJob = { ...mockJob, status: RunnerJobStatus.RUNNING }; + const updatedJob = { ...runningJob, progressPercent: 50 }; + + mockPrismaService.runnerJob.findUnique.mockResolvedValue(runningJob); + mockPrismaService.runnerJob.update.mockResolvedValue(updatedJob); + mockJobEventsService.emitEvent.mockResolvedValue(mockEvent); + + const result = await service.updateJobProgress("job-123", { + progressPercent: 50, + currentStep: "Running tests", + }); + + expect(result.progressPercent).toBe(50); + expect(mockJobEventsService.emitEvent).toHaveBeenCalledWith( + "job-123", + expect.objectContaining({ type: "job.progress" }) + ); + }); + + it("should throw BadRequestException if job is not running", async () => { + mockPrismaService.runnerJob.findUnique.mockResolvedValue(mockJob); + + await expect(service.updateJobProgress("job-123", { progressPercent: 50 })).rejects.toThrow( + BadRequestException + ); + }); + }); + + describe("completeJob", () => { + it("should mark job as completed and broadcast", async () => { + const runningJob = { ...mockJob, status: RunnerJobStatus.RUNNING, startedAt: new Date() }; + const completedJob = { + ...runningJob, + status: RunnerJobStatus.COMPLETED, + progressPercent: 100, + completedAt: new Date(), + }; + + mockPrismaService.runnerJob.findUnique.mockResolvedValue(runningJob); + mockPrismaService.runnerJob.update.mockResolvedValue(completedJob); + mockJobEventsService.emitJobCompleted.mockResolvedValue(mockEvent); + mockHeraldService.broadcastJobEvent.mockResolvedValue(undefined); + + const result = await service.completeJob("job-123", { + result: { commitSha: "abc123" }, + }); + + expect(result.status).toBe(RunnerJobStatus.COMPLETED); + expect(result.progressPercent).toBe(100); + expect(mockJobEventsService.emitJobCompleted).toHaveBeenCalled(); + expect(mockHeraldService.broadcastJobEvent).toHaveBeenCalled(); + }); + }); + + describe("failJob", () => { + it("should mark job as failed and broadcast", async () => { + const runningJob = { ...mockJob, status: RunnerJobStatus.RUNNING }; + const failedJob = { + ...runningJob, + status: RunnerJobStatus.FAILED, + error: "Test failed", + completedAt: new Date(), + }; + + mockPrismaService.runnerJob.findUnique.mockResolvedValue(runningJob); + mockPrismaService.runnerJob.update.mockResolvedValue(failedJob); + mockJobEventsService.emitJobFailed.mockResolvedValue(mockEvent); + mockHeraldService.broadcastJobEvent.mockResolvedValue(undefined); + + const result = await service.failJob("job-123", { + error: "Test failed", + gateResults: { lint: false, test: false }, + }); + + expect(result.status).toBe(RunnerJobStatus.FAILED); + expect(result.error).toBe("Test failed"); + expect(mockJobEventsService.emitJobFailed).toHaveBeenCalled(); + expect(mockHeraldService.broadcastJobEvent).toHaveBeenCalled(); + }); + }); + + describe("getIntegrationHealth", () => { + it("should return health status with all components", async () => { + mockBullMqService.getHealthStatus.mockResolvedValue({ + connected: true, + queues: { main: 5, runner: 2 }, + }); + + const result = await service.getIntegrationHealth(); + + expect(result).toHaveProperty("api", true); + expect(result).toHaveProperty("bullmq"); + expect(result.bullmq.connected).toBe(true); + }); + + it("should handle BullMQ health check failure gracefully", async () => { + mockBullMqService.getHealthStatus.mockRejectedValue(new Error("Connection failed")); + + const result = await service.getIntegrationHealth(); + + expect(result.api).toBe(true); + expect(result.bullmq.connected).toBe(false); + }); + }); + + describe("getJobDetails", () => { + it("should return job with events and steps", async () => { + const jobWithDetails = { + ...mockJob, + steps: [], + events: [mockEvent], + }; + + mockPrismaService.runnerJob.findUnique.mockResolvedValue(jobWithDetails); + + const result = await service.getJobDetails("job-123"); + + expect(result).toHaveProperty("id", "job-123"); + expect(result).toHaveProperty("events"); + expect(result).toHaveProperty("steps"); + }); + + it("should throw NotFoundException if job does not exist", async () => { + mockPrismaService.runnerJob.findUnique.mockResolvedValue(null); + + await expect(service.getJobDetails("non-existent")).rejects.toThrow(NotFoundException); + }); + }); +}); diff --git a/apps/api/src/coordinator-integration/coordinator-integration.service.ts b/apps/api/src/coordinator-integration/coordinator-integration.service.ts new file mode 100644 index 0000000..8bf69e4 --- /dev/null +++ b/apps/api/src/coordinator-integration/coordinator-integration.service.ts @@ -0,0 +1,372 @@ +import { Injectable, Logger, NotFoundException, BadRequestException } from "@nestjs/common"; +import { Prisma, RunnerJobStatus } from "@prisma/client"; +import { PrismaService } from "../prisma/prisma.service"; +import { JobEventsService } from "../job-events/job-events.service"; +import { HeraldService } from "../herald/herald.service"; +import { BullMqService } from "../bullmq/bullmq.service"; +import { QUEUE_NAMES } from "../bullmq/queues"; +import { JOB_PROGRESS } from "../job-events/event-types"; +import { + CoordinatorJobStatus, + type CreateCoordinatorJobDto, + type UpdateJobStatusDto, + type UpdateJobProgressDto, + type CompleteJobDto, + type FailJobDto, +} from "./dto"; +import type { CoordinatorJobResult, CoordinatorHealthStatus } from "./interfaces"; + +/** + * CoordinatorIntegrationService - Bridge between Python coordinator and NestJS API + * + * Responsibilities: + * - Create jobs from coordinator webhook events + * - Update job status as coordinator processes + * - Handle job completion and failure + * - Broadcast events via Herald + * - Provide integration health status + */ +@Injectable() +export class CoordinatorIntegrationService { + private readonly logger = new Logger(CoordinatorIntegrationService.name); + + constructor( + private readonly prisma: PrismaService, + private readonly jobEvents: JobEventsService, + private readonly herald: HeraldService, + private readonly bullMq: BullMqService + ) {} + + /** + * Create a job from the coordinator + */ + async createJob(dto: CreateCoordinatorJobDto): Promise { + this.logger.log(`Creating job for issue #${String(dto.issueNumber)} from ${dto.repository}`); + + // Verify workspace exists + const workspace = await this.prisma.workspace.findUnique({ + where: { id: dto.workspaceId }, + select: { id: true }, + }); + + if (!workspace) { + throw new NotFoundException(`Workspace with ID ${dto.workspaceId} not found`); + } + + // Create RunnerJob in database + const job = await this.prisma.runnerJob.create({ + data: { + workspaceId: dto.workspaceId, + type: dto.type, + priority: dto.priority ?? 10, + status: RunnerJobStatus.PENDING, + progressPercent: 0, + }, + }); + + // Emit job.created event + await this.jobEvents.emitJobCreated(job.id, { + issueNumber: dto.issueNumber, + repository: dto.repository, + type: dto.type, + priority: dto.priority ?? 10, + metadata: dto.metadata, + source: "coordinator", + }); + + // Add job to BullMQ queue + await this.bullMq.addJob( + QUEUE_NAMES.MAIN, + dto.type, + { + jobId: job.id, + workspaceId: dto.workspaceId, + issueNumber: dto.issueNumber, + repository: dto.repository, + metadata: dto.metadata, + }, + { priority: dto.priority ?? 10 } + ); + + this.logger.log(`Job ${job.id} created and queued for issue #${String(dto.issueNumber)}`); + + return { + jobId: job.id, + status: job.status, + queueName: QUEUE_NAMES.MAIN, + }; + } + + /** + * Update job status from the coordinator + */ + async updateJobStatus( + jobId: string, + dto: UpdateJobStatusDto + ): Promise>> { + this.logger.log(`Updating job ${jobId} status to ${dto.status}`); + + // Verify job exists + const job = await this.prisma.runnerJob.findUnique({ + where: { id: jobId }, + select: { id: true, status: true, workspaceId: true }, + }); + + if (!job) { + throw new NotFoundException(`RunnerJob with ID ${jobId} not found`); + } + + // Validate status transition + if (!this.isValidStatusTransition(job.status, dto.status as RunnerJobStatus)) { + throw new BadRequestException( + `Invalid status transition from ${job.status} to ${dto.status}` + ); + } + + const updateData: Prisma.RunnerJobUpdateInput = { + status: dto.status as RunnerJobStatus, + }; + + // Set startedAt when transitioning to RUNNING + if (dto.status === CoordinatorJobStatus.RUNNING) { + updateData.startedAt = new Date(); + } + + const updatedJob = await this.prisma.runnerJob.update({ + where: { id: jobId }, + data: updateData, + }); + + // Emit appropriate event + if (dto.status === CoordinatorJobStatus.RUNNING) { + const event = await this.jobEvents.emitJobStarted(jobId, { + agentId: dto.agentId, + agentType: dto.agentType, + }); + + // Broadcast via Herald + await this.herald.broadcastJobEvent(jobId, event); + } + + return updatedJob; + } + + /** + * Update job progress from the coordinator + */ + async updateJobProgress( + jobId: string, + dto: UpdateJobProgressDto + ): Promise>> { + this.logger.log(`Updating job ${jobId} progress to ${String(dto.progressPercent)}%`); + + // Verify job exists and is running + const job = await this.prisma.runnerJob.findUnique({ + where: { id: jobId }, + select: { id: true, status: true }, + }); + + if (!job) { + throw new NotFoundException(`RunnerJob with ID ${jobId} not found`); + } + + if (job.status !== RunnerJobStatus.RUNNING) { + throw new BadRequestException(`Cannot update progress for job with status ${job.status}`); + } + + const updatedJob = await this.prisma.runnerJob.update({ + where: { id: jobId }, + data: { progressPercent: dto.progressPercent }, + }); + + // Emit progress event + await this.jobEvents.emitEvent(jobId, { + type: JOB_PROGRESS, + actor: "coordinator", + payload: { + progressPercent: dto.progressPercent, + currentStep: dto.currentStep, + tokensUsed: dto.tokensUsed, + }, + }); + + return updatedJob; + } + + /** + * Mark job as completed from the coordinator + */ + async completeJob( + jobId: string, + dto: CompleteJobDto + ): Promise>> { + this.logger.log(`Completing job ${jobId}`); + + // Verify job exists + const job = await this.prisma.runnerJob.findUnique({ + where: { id: jobId }, + select: { id: true, status: true, startedAt: true }, + }); + + if (!job) { + throw new NotFoundException(`RunnerJob with ID ${jobId} not found`); + } + + // Validate status transition + if (!this.isValidStatusTransition(job.status, RunnerJobStatus.COMPLETED)) { + throw new BadRequestException(`Cannot complete job with status ${job.status}`); + } + + // Calculate duration if not provided + let durationSeconds = dto.durationSeconds; + if (durationSeconds === undefined && job.startedAt) { + durationSeconds = Math.round((new Date().getTime() - job.startedAt.getTime()) / 1000); + } + + const updateData: Prisma.RunnerJobUpdateInput = { + status: RunnerJobStatus.COMPLETED, + progressPercent: 100, + completedAt: new Date(), + }; + + if (dto.result) { + updateData.result = dto.result as Prisma.InputJsonValue; + } + + const updatedJob = await this.prisma.runnerJob.update({ + where: { id: jobId }, + data: updateData, + }); + + // Emit completion event + const event = await this.jobEvents.emitJobCompleted(jobId, { + result: dto.result, + tokensUsed: dto.tokensUsed, + durationSeconds, + }); + + // Broadcast via Herald + await this.herald.broadcastJobEvent(jobId, event); + + return updatedJob; + } + + /** + * Mark job as failed from the coordinator + */ + async failJob( + jobId: string, + dto: FailJobDto + ): Promise>> { + this.logger.log(`Failing job ${jobId}: ${dto.error}`); + + // Verify job exists + const job = await this.prisma.runnerJob.findUnique({ + where: { id: jobId }, + select: { id: true, status: true }, + }); + + if (!job) { + throw new NotFoundException(`RunnerJob with ID ${jobId} not found`); + } + + // Validate status transition + if (!this.isValidStatusTransition(job.status, RunnerJobStatus.FAILED)) { + throw new BadRequestException(`Cannot fail job with status ${job.status}`); + } + + const updatedJob = await this.prisma.runnerJob.update({ + where: { id: jobId }, + data: { + status: RunnerJobStatus.FAILED, + error: dto.error, + completedAt: new Date(), + }, + }); + + // Emit failure event + const event = await this.jobEvents.emitJobFailed(jobId, { + error: dto.error, + gateResults: dto.gateResults, + failedStep: dto.failedStep, + continuationPrompt: dto.continuationPrompt, + }); + + // Broadcast via Herald + await this.herald.broadcastJobEvent(jobId, event); + + return updatedJob; + } + + /** + * Get job details with events and steps + */ + async getJobDetails( + jobId: string + ): Promise>> { + const job = await this.prisma.runnerJob.findUnique({ + where: { id: jobId }, + include: { + steps: { + orderBy: { ordinal: "asc" }, + }, + events: { + orderBy: { timestamp: "asc" }, + }, + }, + }); + + if (!job) { + throw new NotFoundException(`RunnerJob with ID ${jobId} not found`); + } + + return job; + } + + /** + * Get integration health status + */ + async getIntegrationHealth(): Promise { + let bullmqStatus = { connected: false, queues: {} as Record }; + + try { + bullmqStatus = await this.bullMq.getHealthStatus(); + } catch (error) { + this.logger.error("Failed to get BullMQ health status", error); + } + + return { + api: true, + bullmq: bullmqStatus, + timestamp: new Date(), + }; + } + + /** + * Validate status transitions + */ + private isValidStatusTransition( + currentStatus: RunnerJobStatus, + newStatus: RunnerJobStatus + ): boolean { + // Define valid transitions + const validTransitions: Record = { + [RunnerJobStatus.PENDING]: [ + RunnerJobStatus.QUEUED, + RunnerJobStatus.RUNNING, + RunnerJobStatus.CANCELLED, + ], + [RunnerJobStatus.QUEUED]: [RunnerJobStatus.RUNNING, RunnerJobStatus.CANCELLED], + [RunnerJobStatus.RUNNING]: [ + RunnerJobStatus.COMPLETED, + RunnerJobStatus.FAILED, + RunnerJobStatus.CANCELLED, + ], + [RunnerJobStatus.COMPLETED]: [], + [RunnerJobStatus.FAILED]: [], + [RunnerJobStatus.CANCELLED]: [], + }; + + return validTransitions[currentStatus].includes(newStatus); + } +} diff --git a/apps/api/src/coordinator-integration/dto/complete-job.dto.ts b/apps/api/src/coordinator-integration/dto/complete-job.dto.ts new file mode 100644 index 0000000..470c2e2 --- /dev/null +++ b/apps/api/src/coordinator-integration/dto/complete-job.dto.ts @@ -0,0 +1,20 @@ +import { IsOptional, IsObject, IsNumber, Min } from "class-validator"; + +/** + * DTO for completing a job from the coordinator + */ +export class CompleteJobDto { + @IsOptional() + @IsObject() + result?: Record; + + @IsOptional() + @IsNumber() + @Min(0) + tokensUsed?: number; + + @IsOptional() + @IsNumber() + @Min(0) + durationSeconds?: number; +} diff --git a/apps/api/src/coordinator-integration/dto/create-coordinator-job.dto.ts b/apps/api/src/coordinator-integration/dto/create-coordinator-job.dto.ts new file mode 100644 index 0000000..3ab5dcd --- /dev/null +++ b/apps/api/src/coordinator-integration/dto/create-coordinator-job.dto.ts @@ -0,0 +1,28 @@ +import { IsString, IsOptional, IsNumber, IsObject, Min, Max, IsUUID } from "class-validator"; + +/** + * DTO for creating a job from the coordinator + */ +export class CreateCoordinatorJobDto { + @IsUUID("4") + workspaceId!: string; + + @IsString() + type!: string; // 'code-task', 'git-status', 'priority-calc' + + @IsNumber() + issueNumber!: number; + + @IsString() + repository!: string; + + @IsOptional() + @IsNumber() + @Min(1) + @Max(100) + priority?: number; + + @IsOptional() + @IsObject() + metadata?: Record; +} diff --git a/apps/api/src/coordinator-integration/dto/fail-job.dto.ts b/apps/api/src/coordinator-integration/dto/fail-job.dto.ts new file mode 100644 index 0000000..64250c6 --- /dev/null +++ b/apps/api/src/coordinator-integration/dto/fail-job.dto.ts @@ -0,0 +1,22 @@ +import { IsString, IsOptional, IsObject } from "class-validator"; +import type { QualityGateResult } from "../interfaces"; + +/** + * DTO for failing a job from the coordinator + */ +export class FailJobDto { + @IsString() + error!: string; + + @IsOptional() + @IsObject() + gateResults?: QualityGateResult; + + @IsOptional() + @IsString() + failedStep?: string; + + @IsOptional() + @IsString() + continuationPrompt?: string; +} diff --git a/apps/api/src/coordinator-integration/dto/index.ts b/apps/api/src/coordinator-integration/dto/index.ts new file mode 100644 index 0000000..87302a4 --- /dev/null +++ b/apps/api/src/coordinator-integration/dto/index.ts @@ -0,0 +1,5 @@ +export * from "./create-coordinator-job.dto"; +export * from "./update-job-status.dto"; +export * from "./update-job-progress.dto"; +export * from "./complete-job.dto"; +export * from "./fail-job.dto"; diff --git a/apps/api/src/coordinator-integration/dto/update-job-progress.dto.ts b/apps/api/src/coordinator-integration/dto/update-job-progress.dto.ts new file mode 100644 index 0000000..b6194a3 --- /dev/null +++ b/apps/api/src/coordinator-integration/dto/update-job-progress.dto.ts @@ -0,0 +1,19 @@ +import { IsNumber, IsOptional, IsString, Min, Max } from "class-validator"; + +/** + * DTO for updating job progress from the coordinator + */ +export class UpdateJobProgressDto { + @IsNumber() + @Min(0) + @Max(100) + progressPercent!: number; + + @IsOptional() + @IsString() + currentStep?: string; + + @IsOptional() + @IsNumber() + tokensUsed?: number; +} diff --git a/apps/api/src/coordinator-integration/dto/update-job-status.dto.ts b/apps/api/src/coordinator-integration/dto/update-job-status.dto.ts new file mode 100644 index 0000000..b89e71f --- /dev/null +++ b/apps/api/src/coordinator-integration/dto/update-job-status.dto.ts @@ -0,0 +1,25 @@ +import { IsString, IsOptional, IsEnum } from "class-validator"; + +/** + * Valid status values for coordinator status updates + */ +export enum CoordinatorJobStatus { + RUNNING = "RUNNING", + PENDING = "PENDING", +} + +/** + * DTO for updating job status from the coordinator + */ +export class UpdateJobStatusDto { + @IsEnum(CoordinatorJobStatus) + status!: CoordinatorJobStatus; + + @IsOptional() + @IsString() + agentId?: string; + + @IsOptional() + @IsString() + agentType?: string; +} diff --git a/apps/api/src/coordinator-integration/index.ts b/apps/api/src/coordinator-integration/index.ts new file mode 100644 index 0000000..e4c02e6 --- /dev/null +++ b/apps/api/src/coordinator-integration/index.ts @@ -0,0 +1,5 @@ +export * from "./coordinator-integration.module"; +export * from "./coordinator-integration.service"; +export * from "./coordinator-integration.controller"; +export * from "./dto"; +export * from "./interfaces"; diff --git a/apps/api/src/coordinator-integration/interfaces/coordinator-job.interface.ts b/apps/api/src/coordinator-integration/interfaces/coordinator-job.interface.ts new file mode 100644 index 0000000..2f5fe09 --- /dev/null +++ b/apps/api/src/coordinator-integration/interfaces/coordinator-job.interface.ts @@ -0,0 +1,41 @@ +/** + * Result of job creation from coordinator + */ +export interface CoordinatorJobResult { + jobId: string; + status: string; + queueName: string; + estimatedStartTime?: Date; +} + +/** + * Health status for coordinator integration + */ +export interface CoordinatorHealthStatus { + api: boolean; + bullmq: { + connected: boolean; + queues: Record; + }; + timestamp: Date; +} + +/** + * Quality gate result from coordinator + */ +export interface QualityGateResult { + lint?: boolean; + typecheck?: boolean; + test?: boolean; + coverage?: boolean; + build?: boolean; +} + +/** + * Agent assignment info from coordinator + */ +export interface AgentAssignment { + agentType: string; // 'sonnet', 'opus', 'haiku', 'glm' + agentId: string; + estimatedContext: number; +} diff --git a/apps/api/src/coordinator-integration/interfaces/index.ts b/apps/api/src/coordinator-integration/interfaces/index.ts new file mode 100644 index 0000000..e756fd3 --- /dev/null +++ b/apps/api/src/coordinator-integration/interfaces/index.ts @@ -0,0 +1 @@ +export * from "./coordinator-job.interface"; diff --git a/apps/api/src/job-events/event-types.ts b/apps/api/src/job-events/event-types.ts index f4a44f4..0905000 100644 --- a/apps/api/src/job-events/event-types.ts +++ b/apps/api/src/job-events/event-types.ts @@ -7,6 +7,7 @@ export const JOB_CREATED = "job.created"; export const JOB_QUEUED = "job.queued"; export const JOB_STARTED = "job.started"; +export const JOB_PROGRESS = "job.progress"; export const JOB_COMPLETED = "job.completed"; export const JOB_FAILED = "job.failed"; export const JOB_CANCELLED = "job.cancelled"; @@ -36,6 +37,7 @@ export const ALL_EVENT_TYPES = [ JOB_CREATED, JOB_QUEUED, JOB_STARTED, + JOB_PROGRESS, JOB_COMPLETED, JOB_FAILED, JOB_CANCELLED, diff --git a/docs/scratchpads/176-coordinator-integration.md b/docs/scratchpads/176-coordinator-integration.md new file mode 100644 index 0000000..136f42d --- /dev/null +++ b/docs/scratchpads/176-coordinator-integration.md @@ -0,0 +1,102 @@ +# Issue #176: Coordinator Integration + +## Objective + +Integrate M4.2 infrastructure (NestJS API) with M4.1 coordinator (Python FastAPI) to enable seamless job orchestration between the two systems. + +## Architecture Analysis + +### M4.1 Coordinator (Python) + +- FastAPI application at `apps/coordinator` +- Handles Gitea webhooks, queue management, agent orchestration +- Uses file-based JSON queue for persistence +- Has QueueManager, Coordinator, and OrchestrationLoop classes +- Exposes `/webhook/gitea` and `/health` endpoints + +### M4.2 Infrastructure (NestJS) + +- StitcherModule: Workflow orchestration, webhook handling, job dispatch +- RunnerJobsModule: CRUD for RunnerJob entities, BullMQ integration +- JobEventsModule: Event tracking and audit logging +- JobStepsModule: Step tracking for jobs +- HeraldModule: Status broadcasting to Discord +- BullMqModule: Queue infrastructure with Valkey backend +- BridgeModule: Discord integration + +## Integration Design + +### Flow 1: Webhook -> Job Creation + +``` +Gitea -> Coordinator (Python) -> NestJS API -> RunnerJob + BullMQ + ^ + | HTTP POST /api/coordinator/jobs +``` + +### Flow 2: Job Status Updates + +``` +Coordinator (Python) -> NestJS API -> JobEvent -> Herald -> Discord + ^ + | HTTP PATCH /api/coordinator/jobs/:id/status +``` + +### Flow 3: Job Completion + +``` +Coordinator (Python) -> NestJS API -> Complete RunnerJob -> Herald broadcast + ^ + | HTTP POST /api/coordinator/jobs/:id/complete +``` + +## Implementation Plan + +### 1. Create Coordinator Integration Module + +- `apps/api/src/coordinator-integration/` + - `coordinator-integration.module.ts` - NestJS module + - `coordinator-integration.controller.ts` - REST endpoints for Python coordinator + - `coordinator-integration.service.ts` - Business logic + - `dto/` - DTOs for coordinator communication + - `interfaces/` - Type definitions + +### 2. Endpoints for Python Coordinator + +- `POST /api/coordinator/jobs` - Create job from coordinator +- `PATCH /api/coordinator/jobs/:id/status` - Update job status +- `POST /api/coordinator/jobs/:id/complete` - Mark job complete +- `POST /api/coordinator/jobs/:id/fail` - Mark job failed +- `GET /api/coordinator/health` - Integration health check + +### 3. Event Bridging + +- When coordinator reports progress -> emit JobEvent +- When coordinator completes -> update RunnerJob + emit completion event +- Herald subscribes and broadcasts to Discord + +## TDD Approach + +1. Write tests for CoordinatorIntegrationService +2. Write tests for CoordinatorIntegrationController +3. Implement minimal code to pass tests +4. Refactor + +## Progress + +- [x] Analyze coordinator structure +- [x] Analyze M4.2 infrastructure +- [x] Design integration layer +- [x] Write failing tests for service +- [x] Implement service +- [x] Write failing tests for controller +- [x] Implement controller +- [x] Add DTOs and interfaces +- [x] Run quality gates +- [x] Commit + +## Notes + +- The Python coordinator uses httpx.AsyncClient for HTTP calls +- API auth can be handled via shared secret (API key) +- Events follow established patterns from job-events module