feat(#176): Integrate M4.2 infrastructure with M4.1 coordinator

Add CoordinatorIntegrationModule providing REST API endpoints for the Python
coordinator to communicate with the NestJS API infrastructure:

- POST /coordinator/jobs - Create job from coordinator webhook events
- PATCH /coordinator/jobs/:id/status - Update job status (PENDING -> RUNNING)
- PATCH /coordinator/jobs/:id/progress - Update job progress percentage
- POST /coordinator/jobs/:id/complete - Mark job complete with results
- POST /coordinator/jobs/:id/fail - Mark job failed with gate results
- GET /coordinator/jobs/:id - Get job details with events and steps
- GET /coordinator/health - Integration health check

Integration features:
- Job creation dispatches to BullMQ queues
- Status updates emit JobEvents for audit logging
- Completion/failure events broadcast via Herald to Discord
- Status transition validation (PENDING -> QUEUED -> RUNNING -> COMPLETED/FAILED)
- Health check includes BullMQ connection status and queue counts

Also adds JOB_PROGRESS event type to event-types.ts for progress tracking.

Fixes #176

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-01 21:54:34 -06:00
parent 3cdcbf6774
commit 5a51ee8c30
17 changed files with 1262 additions and 0 deletions

View File

@@ -27,6 +27,7 @@ import { TelemetryModule, TelemetryInterceptor } from "./telemetry";
import { RunnerJobsModule } from "./runner-jobs/runner-jobs.module";
import { JobEventsModule } from "./job-events/job-events.module";
import { JobStepsModule } from "./job-steps/job-steps.module";
import { CoordinatorIntegrationModule } from "./coordinator-integration/coordinator-integration.module";
@Module({
imports: [
@@ -55,6 +56,7 @@ import { JobStepsModule } from "./job-steps/job-steps.module";
RunnerJobsModule,
JobEventsModule,
JobStepsModule,
CoordinatorIntegrationModule,
],
controllers: [AppController],
providers: [

View File

@@ -0,0 +1,184 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { RunnerJobStatus } from "@prisma/client";
import { CoordinatorIntegrationController } from "./coordinator-integration.controller";
import { CoordinatorIntegrationService } from "./coordinator-integration.service";
import type { CoordinatorJobResult, CoordinatorHealthStatus } from "./interfaces";
import { CoordinatorJobStatus } from "./dto";
describe("CoordinatorIntegrationController", () => {
let controller: CoordinatorIntegrationController;
const mockJobResult: CoordinatorJobResult = {
jobId: "job-123",
status: "PENDING",
queueName: "mosaic:main",
};
const mockJob = {
id: "job-123",
workspaceId: "workspace-123",
type: "code-task",
status: RunnerJobStatus.PENDING,
priority: 10,
progressPercent: 0,
agentTaskId: null,
result: null,
error: null,
startedAt: null,
completedAt: null,
createdAt: new Date(),
updatedAt: new Date(),
};
const mockHealthStatus: CoordinatorHealthStatus = {
api: true,
bullmq: {
connected: true,
queues: { main: 5, runner: 2 },
},
timestamp: new Date(),
};
const mockService = {
createJob: vi.fn(),
updateJobStatus: vi.fn(),
updateJobProgress: vi.fn(),
completeJob: vi.fn(),
failJob: vi.fn(),
getJobDetails: vi.fn(),
getIntegrationHealth: vi.fn(),
};
beforeEach(async () => {
vi.clearAllMocks();
const module: TestingModule = await Test.createTestingModule({
controllers: [CoordinatorIntegrationController],
providers: [{ provide: CoordinatorIntegrationService, useValue: mockService }],
}).compile();
controller = module.get<CoordinatorIntegrationController>(CoordinatorIntegrationController);
});
describe("POST /coordinator/jobs", () => {
it("should create a job and return job result", async () => {
const dto = {
workspaceId: "workspace-123",
type: "code-task",
issueNumber: 42,
repository: "mosaic/stack",
};
mockService.createJob.mockResolvedValue(mockJobResult);
const result = await controller.createJob(dto);
expect(result).toEqual(mockJobResult);
expect(mockService.createJob).toHaveBeenCalledWith(dto);
});
});
describe("PATCH /coordinator/jobs/:id/status", () => {
it("should update job status", async () => {
const updatedJob = { ...mockJob, status: RunnerJobStatus.RUNNING };
mockService.updateJobStatus.mockResolvedValue(updatedJob);
const result = await controller.updateJobStatus("job-123", {
status: CoordinatorJobStatus.RUNNING,
agentId: "agent-42",
});
expect(result.status).toBe(RunnerJobStatus.RUNNING);
expect(mockService.updateJobStatus).toHaveBeenCalledWith("job-123", {
status: CoordinatorJobStatus.RUNNING,
agentId: "agent-42",
});
});
});
describe("PATCH /coordinator/jobs/:id/progress", () => {
it("should update job progress", async () => {
const updatedJob = { ...mockJob, progressPercent: 50 };
mockService.updateJobProgress.mockResolvedValue(updatedJob);
const result = await controller.updateJobProgress("job-123", {
progressPercent: 50,
currentStep: "Running tests",
});
expect(result.progressPercent).toBe(50);
expect(mockService.updateJobProgress).toHaveBeenCalledWith("job-123", {
progressPercent: 50,
currentStep: "Running tests",
});
});
});
describe("POST /coordinator/jobs/:id/complete", () => {
it("should complete a job", async () => {
const completedJob = {
...mockJob,
status: RunnerJobStatus.COMPLETED,
progressPercent: 100,
};
mockService.completeJob.mockResolvedValue(completedJob);
const result = await controller.completeJob("job-123", {
result: { commitSha: "abc123" },
});
expect(result.status).toBe(RunnerJobStatus.COMPLETED);
expect(mockService.completeJob).toHaveBeenCalledWith("job-123", {
result: { commitSha: "abc123" },
});
});
});
describe("POST /coordinator/jobs/:id/fail", () => {
it("should fail a job", async () => {
const failedJob = {
...mockJob,
status: RunnerJobStatus.FAILED,
error: "Test failed",
};
mockService.failJob.mockResolvedValue(failedJob);
const result = await controller.failJob("job-123", {
error: "Test failed",
gateResults: { lint: true, test: false },
});
expect(result.status).toBe(RunnerJobStatus.FAILED);
expect(result.error).toBe("Test failed");
expect(mockService.failJob).toHaveBeenCalledWith("job-123", {
error: "Test failed",
gateResults: { lint: true, test: false },
});
});
});
describe("GET /coordinator/jobs/:id", () => {
it("should return job details", async () => {
const jobWithDetails = { ...mockJob, steps: [], events: [] };
mockService.getJobDetails.mockResolvedValue(jobWithDetails);
const result = await controller.getJobDetails("job-123");
expect(result).toEqual(jobWithDetails);
expect(mockService.getJobDetails).toHaveBeenCalledWith("job-123");
});
});
describe("GET /coordinator/health", () => {
it("should return integration health status", async () => {
mockService.getIntegrationHealth.mockResolvedValue(mockHealthStatus);
const result = await controller.getHealth();
expect(result.api).toBe(true);
expect(result.bullmq.connected).toBe(true);
expect(mockService.getIntegrationHealth).toHaveBeenCalled();
});
});
});

View File

@@ -0,0 +1,97 @@
import { Controller, Post, Patch, Get, Body, Param } from "@nestjs/common";
import { CoordinatorIntegrationService } from "./coordinator-integration.service";
import {
CreateCoordinatorJobDto,
UpdateJobStatusDto,
UpdateJobProgressDto,
CompleteJobDto,
FailJobDto,
} from "./dto";
import type { CoordinatorJobResult, CoordinatorHealthStatus } from "./interfaces";
/**
* CoordinatorIntegrationController - REST API for Python coordinator communication
*
* Endpoints:
* - POST /coordinator/jobs - Create a job from coordinator
* - PATCH /coordinator/jobs/:id/status - Update job status
* - PATCH /coordinator/jobs/:id/progress - Update job progress
* - POST /coordinator/jobs/:id/complete - Mark job as complete
* - POST /coordinator/jobs/:id/fail - Mark job as failed
* - GET /coordinator/jobs/:id - Get job details
* - GET /coordinator/health - Integration health check
*/
@Controller("coordinator")
export class CoordinatorIntegrationController {
constructor(private readonly service: CoordinatorIntegrationService) {}
/**
* Create a job from the coordinator
*/
@Post("jobs")
async createJob(@Body() dto: CreateCoordinatorJobDto): Promise<CoordinatorJobResult> {
return this.service.createJob(dto);
}
/**
* Update job status from the coordinator
*/
@Patch("jobs/:id/status")
async updateJobStatus(
@Param("id") id: string,
@Body() dto: UpdateJobStatusDto
): Promise<Awaited<ReturnType<typeof this.service.updateJobStatus>>> {
return this.service.updateJobStatus(id, dto);
}
/**
* Update job progress from the coordinator
*/
@Patch("jobs/:id/progress")
async updateJobProgress(
@Param("id") id: string,
@Body() dto: UpdateJobProgressDto
): Promise<Awaited<ReturnType<typeof this.service.updateJobProgress>>> {
return this.service.updateJobProgress(id, dto);
}
/**
* Mark job as complete from the coordinator
*/
@Post("jobs/:id/complete")
async completeJob(
@Param("id") id: string,
@Body() dto: CompleteJobDto
): Promise<Awaited<ReturnType<typeof this.service.completeJob>>> {
return this.service.completeJob(id, dto);
}
/**
* Mark job as failed from the coordinator
*/
@Post("jobs/:id/fail")
async failJob(
@Param("id") id: string,
@Body() dto: FailJobDto
): Promise<Awaited<ReturnType<typeof this.service.failJob>>> {
return this.service.failJob(id, dto);
}
/**
* Get job details with events and steps
*/
@Get("jobs/:id")
async getJobDetails(
@Param("id") id: string
): Promise<Awaited<ReturnType<typeof this.service.getJobDetails>>> {
return this.service.getJobDetails(id);
}
/**
* Integration health check
*/
@Get("health")
async getHealth(): Promise<CoordinatorHealthStatus> {
return this.service.getIntegrationHealth();
}
}

View File

@@ -0,0 +1,27 @@
import { Module } from "@nestjs/common";
import { CoordinatorIntegrationController } from "./coordinator-integration.controller";
import { CoordinatorIntegrationService } from "./coordinator-integration.service";
import { PrismaModule } from "../prisma/prisma.module";
import { BullMqModule } from "../bullmq/bullmq.module";
import { JobEventsModule } from "../job-events/job-events.module";
import { HeraldModule } from "../herald/herald.module";
/**
* CoordinatorIntegrationModule - Bridge between Python coordinator and NestJS API
*
* Provides REST endpoints for the M4.1 coordinator (Python FastAPI) to
* communicate with the M4.2 infrastructure (NestJS).
*
* Key integration points:
* - Job creation from coordinator webhook events
* - Job status updates during processing
* - Job completion and failure handling
* - Event bridging to Herald for Discord notifications
*/
@Module({
imports: [PrismaModule, BullMqModule, JobEventsModule, HeraldModule],
controllers: [CoordinatorIntegrationController],
providers: [CoordinatorIntegrationService],
exports: [CoordinatorIntegrationService],
})
export class CoordinatorIntegrationModule {}

View File

@@ -0,0 +1,310 @@
import { describe, it, expect, beforeEach, vi } from "vitest";
import { Test, TestingModule } from "@nestjs/testing";
import { NotFoundException, BadRequestException } from "@nestjs/common";
import { RunnerJobStatus } from "@prisma/client";
import { CoordinatorIntegrationService } from "./coordinator-integration.service";
import { PrismaService } from "../prisma/prisma.service";
import { JobEventsService } from "../job-events/job-events.service";
import { HeraldService } from "../herald/herald.service";
import { BullMqService } from "../bullmq/bullmq.service";
describe("CoordinatorIntegrationService", () => {
let service: CoordinatorIntegrationService;
let prismaService: PrismaService;
let jobEventsService: JobEventsService;
let heraldService: HeraldService;
let bullMqService: BullMqService;
const mockWorkspace = {
id: "workspace-123",
name: "Test Workspace",
slug: "test-workspace",
settings: {},
createdAt: new Date(),
updatedAt: new Date(),
};
const mockJob = {
id: "job-123",
workspaceId: "workspace-123",
type: "code-task",
status: RunnerJobStatus.PENDING,
priority: 10,
progressPercent: 0,
agentTaskId: null,
result: null,
error: null,
startedAt: null,
completedAt: null,
createdAt: new Date(),
updatedAt: new Date(),
};
const mockEvent = {
id: "event-123",
jobId: "job-123",
stepId: null,
type: "job.created",
timestamp: new Date(),
actor: "coordinator",
payload: {},
};
const mockPrismaService = {
workspace: {
findUnique: vi.fn(),
},
runnerJob: {
create: vi.fn(),
findUnique: vi.fn(),
update: vi.fn(),
},
};
const mockJobEventsService = {
emitEvent: vi.fn(),
emitJobCreated: vi.fn(),
emitJobStarted: vi.fn(),
emitJobCompleted: vi.fn(),
emitJobFailed: vi.fn(),
};
const mockHeraldService = {
broadcastJobEvent: vi.fn(),
};
const mockBullMqService = {
addJob: vi.fn(),
healthCheck: vi.fn(),
getHealthStatus: vi.fn(),
};
beforeEach(async () => {
vi.clearAllMocks();
const module: TestingModule = await Test.createTestingModule({
providers: [
CoordinatorIntegrationService,
{ provide: PrismaService, useValue: mockPrismaService },
{ provide: JobEventsService, useValue: mockJobEventsService },
{ provide: HeraldService, useValue: mockHeraldService },
{ provide: BullMqService, useValue: mockBullMqService },
],
}).compile();
service = module.get<CoordinatorIntegrationService>(CoordinatorIntegrationService);
prismaService = module.get<PrismaService>(PrismaService);
jobEventsService = module.get<JobEventsService>(JobEventsService);
heraldService = module.get<HeraldService>(HeraldService);
bullMqService = module.get<BullMqService>(BullMqService);
});
describe("createJob", () => {
it("should create a job and add it to the queue", async () => {
const dto = {
workspaceId: "workspace-123",
type: "code-task",
issueNumber: 42,
repository: "mosaic/stack",
priority: 10,
metadata: { assignedAgent: "sonnet" },
};
mockPrismaService.workspace.findUnique.mockResolvedValue(mockWorkspace);
mockPrismaService.runnerJob.create.mockResolvedValue(mockJob);
mockJobEventsService.emitJobCreated.mockResolvedValue(mockEvent);
mockBullMqService.addJob.mockResolvedValue({ id: "bullmq-job-123" });
const result = await service.createJob(dto);
expect(result).toHaveProperty("jobId", mockJob.id);
expect(result).toHaveProperty("status", "PENDING");
expect(mockPrismaService.runnerJob.create).toHaveBeenCalled();
expect(mockJobEventsService.emitJobCreated).toHaveBeenCalledWith(
mockJob.id,
expect.any(Object)
);
expect(mockBullMqService.addJob).toHaveBeenCalled();
});
it("should throw NotFoundException if workspace does not exist", async () => {
const dto = {
workspaceId: "non-existent",
type: "code-task",
issueNumber: 42,
repository: "mosaic/stack",
};
mockPrismaService.workspace.findUnique.mockResolvedValue(null);
await expect(service.createJob(dto)).rejects.toThrow(NotFoundException);
});
});
describe("updateJobStatus", () => {
it("should update job status to RUNNING", async () => {
const updatedJob = { ...mockJob, status: RunnerJobStatus.RUNNING, startedAt: new Date() };
mockPrismaService.runnerJob.findUnique.mockResolvedValue(mockJob);
mockPrismaService.runnerJob.update.mockResolvedValue(updatedJob);
mockJobEventsService.emitJobStarted.mockResolvedValue(mockEvent);
mockHeraldService.broadcastJobEvent.mockResolvedValue(undefined);
const result = await service.updateJobStatus("job-123", {
status: "RUNNING" as const,
agentId: "agent-42",
});
expect(result.status).toBe(RunnerJobStatus.RUNNING);
expect(mockJobEventsService.emitJobStarted).toHaveBeenCalled();
});
it("should throw NotFoundException if job does not exist", async () => {
mockPrismaService.runnerJob.findUnique.mockResolvedValue(null);
await expect(
service.updateJobStatus("non-existent", { status: "RUNNING" as const })
).rejects.toThrow(NotFoundException);
});
it("should throw BadRequestException for invalid status transition", async () => {
const completedJob = { ...mockJob, status: RunnerJobStatus.COMPLETED };
mockPrismaService.runnerJob.findUnique.mockResolvedValue(completedJob);
await expect(
service.updateJobStatus("job-123", { status: "RUNNING" as const })
).rejects.toThrow(BadRequestException);
});
});
describe("updateJobProgress", () => {
it("should update job progress percentage", async () => {
const runningJob = { ...mockJob, status: RunnerJobStatus.RUNNING };
const updatedJob = { ...runningJob, progressPercent: 50 };
mockPrismaService.runnerJob.findUnique.mockResolvedValue(runningJob);
mockPrismaService.runnerJob.update.mockResolvedValue(updatedJob);
mockJobEventsService.emitEvent.mockResolvedValue(mockEvent);
const result = await service.updateJobProgress("job-123", {
progressPercent: 50,
currentStep: "Running tests",
});
expect(result.progressPercent).toBe(50);
expect(mockJobEventsService.emitEvent).toHaveBeenCalledWith(
"job-123",
expect.objectContaining({ type: "job.progress" })
);
});
it("should throw BadRequestException if job is not running", async () => {
mockPrismaService.runnerJob.findUnique.mockResolvedValue(mockJob);
await expect(service.updateJobProgress("job-123", { progressPercent: 50 })).rejects.toThrow(
BadRequestException
);
});
});
describe("completeJob", () => {
it("should mark job as completed and broadcast", async () => {
const runningJob = { ...mockJob, status: RunnerJobStatus.RUNNING, startedAt: new Date() };
const completedJob = {
...runningJob,
status: RunnerJobStatus.COMPLETED,
progressPercent: 100,
completedAt: new Date(),
};
mockPrismaService.runnerJob.findUnique.mockResolvedValue(runningJob);
mockPrismaService.runnerJob.update.mockResolvedValue(completedJob);
mockJobEventsService.emitJobCompleted.mockResolvedValue(mockEvent);
mockHeraldService.broadcastJobEvent.mockResolvedValue(undefined);
const result = await service.completeJob("job-123", {
result: { commitSha: "abc123" },
});
expect(result.status).toBe(RunnerJobStatus.COMPLETED);
expect(result.progressPercent).toBe(100);
expect(mockJobEventsService.emitJobCompleted).toHaveBeenCalled();
expect(mockHeraldService.broadcastJobEvent).toHaveBeenCalled();
});
});
describe("failJob", () => {
it("should mark job as failed and broadcast", async () => {
const runningJob = { ...mockJob, status: RunnerJobStatus.RUNNING };
const failedJob = {
...runningJob,
status: RunnerJobStatus.FAILED,
error: "Test failed",
completedAt: new Date(),
};
mockPrismaService.runnerJob.findUnique.mockResolvedValue(runningJob);
mockPrismaService.runnerJob.update.mockResolvedValue(failedJob);
mockJobEventsService.emitJobFailed.mockResolvedValue(mockEvent);
mockHeraldService.broadcastJobEvent.mockResolvedValue(undefined);
const result = await service.failJob("job-123", {
error: "Test failed",
gateResults: { lint: false, test: false },
});
expect(result.status).toBe(RunnerJobStatus.FAILED);
expect(result.error).toBe("Test failed");
expect(mockJobEventsService.emitJobFailed).toHaveBeenCalled();
expect(mockHeraldService.broadcastJobEvent).toHaveBeenCalled();
});
});
describe("getIntegrationHealth", () => {
it("should return health status with all components", async () => {
mockBullMqService.getHealthStatus.mockResolvedValue({
connected: true,
queues: { main: 5, runner: 2 },
});
const result = await service.getIntegrationHealth();
expect(result).toHaveProperty("api", true);
expect(result).toHaveProperty("bullmq");
expect(result.bullmq.connected).toBe(true);
});
it("should handle BullMQ health check failure gracefully", async () => {
mockBullMqService.getHealthStatus.mockRejectedValue(new Error("Connection failed"));
const result = await service.getIntegrationHealth();
expect(result.api).toBe(true);
expect(result.bullmq.connected).toBe(false);
});
});
describe("getJobDetails", () => {
it("should return job with events and steps", async () => {
const jobWithDetails = {
...mockJob,
steps: [],
events: [mockEvent],
};
mockPrismaService.runnerJob.findUnique.mockResolvedValue(jobWithDetails);
const result = await service.getJobDetails("job-123");
expect(result).toHaveProperty("id", "job-123");
expect(result).toHaveProperty("events");
expect(result).toHaveProperty("steps");
});
it("should throw NotFoundException if job does not exist", async () => {
mockPrismaService.runnerJob.findUnique.mockResolvedValue(null);
await expect(service.getJobDetails("non-existent")).rejects.toThrow(NotFoundException);
});
});
});

View File

@@ -0,0 +1,372 @@
import { Injectable, Logger, NotFoundException, BadRequestException } from "@nestjs/common";
import { Prisma, RunnerJobStatus } from "@prisma/client";
import { PrismaService } from "../prisma/prisma.service";
import { JobEventsService } from "../job-events/job-events.service";
import { HeraldService } from "../herald/herald.service";
import { BullMqService } from "../bullmq/bullmq.service";
import { QUEUE_NAMES } from "../bullmq/queues";
import { JOB_PROGRESS } from "../job-events/event-types";
import {
CoordinatorJobStatus,
type CreateCoordinatorJobDto,
type UpdateJobStatusDto,
type UpdateJobProgressDto,
type CompleteJobDto,
type FailJobDto,
} from "./dto";
import type { CoordinatorJobResult, CoordinatorHealthStatus } from "./interfaces";
/**
* CoordinatorIntegrationService - Bridge between Python coordinator and NestJS API
*
* Responsibilities:
* - Create jobs from coordinator webhook events
* - Update job status as coordinator processes
* - Handle job completion and failure
* - Broadcast events via Herald
* - Provide integration health status
*/
@Injectable()
export class CoordinatorIntegrationService {
private readonly logger = new Logger(CoordinatorIntegrationService.name);
constructor(
private readonly prisma: PrismaService,
private readonly jobEvents: JobEventsService,
private readonly herald: HeraldService,
private readonly bullMq: BullMqService
) {}
/**
* Create a job from the coordinator
*/
async createJob(dto: CreateCoordinatorJobDto): Promise<CoordinatorJobResult> {
this.logger.log(`Creating job for issue #${String(dto.issueNumber)} from ${dto.repository}`);
// Verify workspace exists
const workspace = await this.prisma.workspace.findUnique({
where: { id: dto.workspaceId },
select: { id: true },
});
if (!workspace) {
throw new NotFoundException(`Workspace with ID ${dto.workspaceId} not found`);
}
// Create RunnerJob in database
const job = await this.prisma.runnerJob.create({
data: {
workspaceId: dto.workspaceId,
type: dto.type,
priority: dto.priority ?? 10,
status: RunnerJobStatus.PENDING,
progressPercent: 0,
},
});
// Emit job.created event
await this.jobEvents.emitJobCreated(job.id, {
issueNumber: dto.issueNumber,
repository: dto.repository,
type: dto.type,
priority: dto.priority ?? 10,
metadata: dto.metadata,
source: "coordinator",
});
// Add job to BullMQ queue
await this.bullMq.addJob(
QUEUE_NAMES.MAIN,
dto.type,
{
jobId: job.id,
workspaceId: dto.workspaceId,
issueNumber: dto.issueNumber,
repository: dto.repository,
metadata: dto.metadata,
},
{ priority: dto.priority ?? 10 }
);
this.logger.log(`Job ${job.id} created and queued for issue #${String(dto.issueNumber)}`);
return {
jobId: job.id,
status: job.status,
queueName: QUEUE_NAMES.MAIN,
};
}
/**
* Update job status from the coordinator
*/
async updateJobStatus(
jobId: string,
dto: UpdateJobStatusDto
): Promise<Awaited<ReturnType<typeof this.prisma.runnerJob.update>>> {
this.logger.log(`Updating job ${jobId} status to ${dto.status}`);
// Verify job exists
const job = await this.prisma.runnerJob.findUnique({
where: { id: jobId },
select: { id: true, status: true, workspaceId: true },
});
if (!job) {
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
}
// Validate status transition
if (!this.isValidStatusTransition(job.status, dto.status as RunnerJobStatus)) {
throw new BadRequestException(
`Invalid status transition from ${job.status} to ${dto.status}`
);
}
const updateData: Prisma.RunnerJobUpdateInput = {
status: dto.status as RunnerJobStatus,
};
// Set startedAt when transitioning to RUNNING
if (dto.status === CoordinatorJobStatus.RUNNING) {
updateData.startedAt = new Date();
}
const updatedJob = await this.prisma.runnerJob.update({
where: { id: jobId },
data: updateData,
});
// Emit appropriate event
if (dto.status === CoordinatorJobStatus.RUNNING) {
const event = await this.jobEvents.emitJobStarted(jobId, {
agentId: dto.agentId,
agentType: dto.agentType,
});
// Broadcast via Herald
await this.herald.broadcastJobEvent(jobId, event);
}
return updatedJob;
}
/**
* Update job progress from the coordinator
*/
async updateJobProgress(
jobId: string,
dto: UpdateJobProgressDto
): Promise<Awaited<ReturnType<typeof this.prisma.runnerJob.update>>> {
this.logger.log(`Updating job ${jobId} progress to ${String(dto.progressPercent)}%`);
// Verify job exists and is running
const job = await this.prisma.runnerJob.findUnique({
where: { id: jobId },
select: { id: true, status: true },
});
if (!job) {
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
}
if (job.status !== RunnerJobStatus.RUNNING) {
throw new BadRequestException(`Cannot update progress for job with status ${job.status}`);
}
const updatedJob = await this.prisma.runnerJob.update({
where: { id: jobId },
data: { progressPercent: dto.progressPercent },
});
// Emit progress event
await this.jobEvents.emitEvent(jobId, {
type: JOB_PROGRESS,
actor: "coordinator",
payload: {
progressPercent: dto.progressPercent,
currentStep: dto.currentStep,
tokensUsed: dto.tokensUsed,
},
});
return updatedJob;
}
/**
* Mark job as completed from the coordinator
*/
async completeJob(
jobId: string,
dto: CompleteJobDto
): Promise<Awaited<ReturnType<typeof this.prisma.runnerJob.update>>> {
this.logger.log(`Completing job ${jobId}`);
// Verify job exists
const job = await this.prisma.runnerJob.findUnique({
where: { id: jobId },
select: { id: true, status: true, startedAt: true },
});
if (!job) {
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
}
// Validate status transition
if (!this.isValidStatusTransition(job.status, RunnerJobStatus.COMPLETED)) {
throw new BadRequestException(`Cannot complete job with status ${job.status}`);
}
// Calculate duration if not provided
let durationSeconds = dto.durationSeconds;
if (durationSeconds === undefined && job.startedAt) {
durationSeconds = Math.round((new Date().getTime() - job.startedAt.getTime()) / 1000);
}
const updateData: Prisma.RunnerJobUpdateInput = {
status: RunnerJobStatus.COMPLETED,
progressPercent: 100,
completedAt: new Date(),
};
if (dto.result) {
updateData.result = dto.result as Prisma.InputJsonValue;
}
const updatedJob = await this.prisma.runnerJob.update({
where: { id: jobId },
data: updateData,
});
// Emit completion event
const event = await this.jobEvents.emitJobCompleted(jobId, {
result: dto.result,
tokensUsed: dto.tokensUsed,
durationSeconds,
});
// Broadcast via Herald
await this.herald.broadcastJobEvent(jobId, event);
return updatedJob;
}
/**
* Mark job as failed from the coordinator
*/
async failJob(
jobId: string,
dto: FailJobDto
): Promise<Awaited<ReturnType<typeof this.prisma.runnerJob.update>>> {
this.logger.log(`Failing job ${jobId}: ${dto.error}`);
// Verify job exists
const job = await this.prisma.runnerJob.findUnique({
where: { id: jobId },
select: { id: true, status: true },
});
if (!job) {
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
}
// Validate status transition
if (!this.isValidStatusTransition(job.status, RunnerJobStatus.FAILED)) {
throw new BadRequestException(`Cannot fail job with status ${job.status}`);
}
const updatedJob = await this.prisma.runnerJob.update({
where: { id: jobId },
data: {
status: RunnerJobStatus.FAILED,
error: dto.error,
completedAt: new Date(),
},
});
// Emit failure event
const event = await this.jobEvents.emitJobFailed(jobId, {
error: dto.error,
gateResults: dto.gateResults,
failedStep: dto.failedStep,
continuationPrompt: dto.continuationPrompt,
});
// Broadcast via Herald
await this.herald.broadcastJobEvent(jobId, event);
return updatedJob;
}
/**
* Get job details with events and steps
*/
async getJobDetails(
jobId: string
): Promise<Awaited<ReturnType<typeof this.prisma.runnerJob.findUnique>>> {
const job = await this.prisma.runnerJob.findUnique({
where: { id: jobId },
include: {
steps: {
orderBy: { ordinal: "asc" },
},
events: {
orderBy: { timestamp: "asc" },
},
},
});
if (!job) {
throw new NotFoundException(`RunnerJob with ID ${jobId} not found`);
}
return job;
}
/**
* Get integration health status
*/
async getIntegrationHealth(): Promise<CoordinatorHealthStatus> {
let bullmqStatus = { connected: false, queues: {} as Record<string, number> };
try {
bullmqStatus = await this.bullMq.getHealthStatus();
} catch (error) {
this.logger.error("Failed to get BullMQ health status", error);
}
return {
api: true,
bullmq: bullmqStatus,
timestamp: new Date(),
};
}
/**
* Validate status transitions
*/
private isValidStatusTransition(
currentStatus: RunnerJobStatus,
newStatus: RunnerJobStatus
): boolean {
// Define valid transitions
const validTransitions: Record<RunnerJobStatus, RunnerJobStatus[]> = {
[RunnerJobStatus.PENDING]: [
RunnerJobStatus.QUEUED,
RunnerJobStatus.RUNNING,
RunnerJobStatus.CANCELLED,
],
[RunnerJobStatus.QUEUED]: [RunnerJobStatus.RUNNING, RunnerJobStatus.CANCELLED],
[RunnerJobStatus.RUNNING]: [
RunnerJobStatus.COMPLETED,
RunnerJobStatus.FAILED,
RunnerJobStatus.CANCELLED,
],
[RunnerJobStatus.COMPLETED]: [],
[RunnerJobStatus.FAILED]: [],
[RunnerJobStatus.CANCELLED]: [],
};
return validTransitions[currentStatus].includes(newStatus);
}
}

View File

@@ -0,0 +1,20 @@
import { IsOptional, IsObject, IsNumber, Min } from "class-validator";
/**
* DTO for completing a job from the coordinator
*/
export class CompleteJobDto {
@IsOptional()
@IsObject()
result?: Record<string, unknown>;
@IsOptional()
@IsNumber()
@Min(0)
tokensUsed?: number;
@IsOptional()
@IsNumber()
@Min(0)
durationSeconds?: number;
}

View File

@@ -0,0 +1,28 @@
import { IsString, IsOptional, IsNumber, IsObject, Min, Max, IsUUID } from "class-validator";
/**
* DTO for creating a job from the coordinator
*/
export class CreateCoordinatorJobDto {
@IsUUID("4")
workspaceId!: string;
@IsString()
type!: string; // 'code-task', 'git-status', 'priority-calc'
@IsNumber()
issueNumber!: number;
@IsString()
repository!: string;
@IsOptional()
@IsNumber()
@Min(1)
@Max(100)
priority?: number;
@IsOptional()
@IsObject()
metadata?: Record<string, unknown>;
}

View File

@@ -0,0 +1,22 @@
import { IsString, IsOptional, IsObject } from "class-validator";
import type { QualityGateResult } from "../interfaces";
/**
* DTO for failing a job from the coordinator
*/
export class FailJobDto {
@IsString()
error!: string;
@IsOptional()
@IsObject()
gateResults?: QualityGateResult;
@IsOptional()
@IsString()
failedStep?: string;
@IsOptional()
@IsString()
continuationPrompt?: string;
}

View File

@@ -0,0 +1,5 @@
export * from "./create-coordinator-job.dto";
export * from "./update-job-status.dto";
export * from "./update-job-progress.dto";
export * from "./complete-job.dto";
export * from "./fail-job.dto";

View File

@@ -0,0 +1,19 @@
import { IsNumber, IsOptional, IsString, Min, Max } from "class-validator";
/**
* DTO for updating job progress from the coordinator
*/
export class UpdateJobProgressDto {
@IsNumber()
@Min(0)
@Max(100)
progressPercent!: number;
@IsOptional()
@IsString()
currentStep?: string;
@IsOptional()
@IsNumber()
tokensUsed?: number;
}

View File

@@ -0,0 +1,25 @@
import { IsString, IsOptional, IsEnum } from "class-validator";
/**
* Valid status values for coordinator status updates
*/
export enum CoordinatorJobStatus {
RUNNING = "RUNNING",
PENDING = "PENDING",
}
/**
* DTO for updating job status from the coordinator
*/
export class UpdateJobStatusDto {
@IsEnum(CoordinatorJobStatus)
status!: CoordinatorJobStatus;
@IsOptional()
@IsString()
agentId?: string;
@IsOptional()
@IsString()
agentType?: string;
}

View File

@@ -0,0 +1,5 @@
export * from "./coordinator-integration.module";
export * from "./coordinator-integration.service";
export * from "./coordinator-integration.controller";
export * from "./dto";
export * from "./interfaces";

View File

@@ -0,0 +1,41 @@
/**
* Result of job creation from coordinator
*/
export interface CoordinatorJobResult {
jobId: string;
status: string;
queueName: string;
estimatedStartTime?: Date;
}
/**
* Health status for coordinator integration
*/
export interface CoordinatorHealthStatus {
api: boolean;
bullmq: {
connected: boolean;
queues: Record<string, number>;
};
timestamp: Date;
}
/**
* Quality gate result from coordinator
*/
export interface QualityGateResult {
lint?: boolean;
typecheck?: boolean;
test?: boolean;
coverage?: boolean;
build?: boolean;
}
/**
* Agent assignment info from coordinator
*/
export interface AgentAssignment {
agentType: string; // 'sonnet', 'opus', 'haiku', 'glm'
agentId: string;
estimatedContext: number;
}

View File

@@ -0,0 +1 @@
export * from "./coordinator-job.interface";

View File

@@ -7,6 +7,7 @@
export const JOB_CREATED = "job.created";
export const JOB_QUEUED = "job.queued";
export const JOB_STARTED = "job.started";
export const JOB_PROGRESS = "job.progress";
export const JOB_COMPLETED = "job.completed";
export const JOB_FAILED = "job.failed";
export const JOB_CANCELLED = "job.cancelled";
@@ -36,6 +37,7 @@ export const ALL_EVENT_TYPES = [
JOB_CREATED,
JOB_QUEUED,
JOB_STARTED,
JOB_PROGRESS,
JOB_COMPLETED,
JOB_FAILED,
JOB_CANCELLED,

View File

@@ -0,0 +1,102 @@
# Issue #176: Coordinator Integration
## Objective
Integrate M4.2 infrastructure (NestJS API) with M4.1 coordinator (Python FastAPI) to enable seamless job orchestration between the two systems.
## Architecture Analysis
### M4.1 Coordinator (Python)
- FastAPI application at `apps/coordinator`
- Handles Gitea webhooks, queue management, agent orchestration
- Uses file-based JSON queue for persistence
- Has QueueManager, Coordinator, and OrchestrationLoop classes
- Exposes `/webhook/gitea` and `/health` endpoints
### M4.2 Infrastructure (NestJS)
- StitcherModule: Workflow orchestration, webhook handling, job dispatch
- RunnerJobsModule: CRUD for RunnerJob entities, BullMQ integration
- JobEventsModule: Event tracking and audit logging
- JobStepsModule: Step tracking for jobs
- HeraldModule: Status broadcasting to Discord
- BullMqModule: Queue infrastructure with Valkey backend
- BridgeModule: Discord integration
## Integration Design
### Flow 1: Webhook -> Job Creation
```
Gitea -> Coordinator (Python) -> NestJS API -> RunnerJob + BullMQ
^
| HTTP POST /api/coordinator/jobs
```
### Flow 2: Job Status Updates
```
Coordinator (Python) -> NestJS API -> JobEvent -> Herald -> Discord
^
| HTTP PATCH /api/coordinator/jobs/:id/status
```
### Flow 3: Job Completion
```
Coordinator (Python) -> NestJS API -> Complete RunnerJob -> Herald broadcast
^
| HTTP POST /api/coordinator/jobs/:id/complete
```
## Implementation Plan
### 1. Create Coordinator Integration Module
- `apps/api/src/coordinator-integration/`
- `coordinator-integration.module.ts` - NestJS module
- `coordinator-integration.controller.ts` - REST endpoints for Python coordinator
- `coordinator-integration.service.ts` - Business logic
- `dto/` - DTOs for coordinator communication
- `interfaces/` - Type definitions
### 2. Endpoints for Python Coordinator
- `POST /api/coordinator/jobs` - Create job from coordinator
- `PATCH /api/coordinator/jobs/:id/status` - Update job status
- `POST /api/coordinator/jobs/:id/complete` - Mark job complete
- `POST /api/coordinator/jobs/:id/fail` - Mark job failed
- `GET /api/coordinator/health` - Integration health check
### 3. Event Bridging
- When coordinator reports progress -> emit JobEvent
- When coordinator completes -> update RunnerJob + emit completion event
- Herald subscribes and broadcasts to Discord
## TDD Approach
1. Write tests for CoordinatorIntegrationService
2. Write tests for CoordinatorIntegrationController
3. Implement minimal code to pass tests
4. Refactor
## Progress
- [x] Analyze coordinator structure
- [x] Analyze M4.2 infrastructure
- [x] Design integration layer
- [x] Write failing tests for service
- [x] Implement service
- [x] Write failing tests for controller
- [x] Implement controller
- [x] Add DTOs and interfaces
- [x] Run quality gates
- [x] Commit
## Notes
- The Python coordinator uses httpx.AsyncClient for HTTP calls
- API auth can be handled via shared secret (API key)
- Events follow established patterns from job-events module