Phase 3: Agent Cycle Visibility (#461) (#462)
All checks were successful
ci/woodpecker/push/api Pipeline was successful
ci/woodpecker/push/web Pipeline was successful

Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #462.
This commit is contained in:
2026-02-23 01:07:29 +00:00
committed by jason.woltje
parent 7581d26567
commit 458cac7cdd
9 changed files with 136 additions and 12 deletions

View File

@@ -4,6 +4,7 @@ import { RunnerJobsService } from "./runner-jobs.service";
import { PrismaModule } from "../prisma/prisma.module";
import { BullMqModule } from "../bullmq/bullmq.module";
import { AuthModule } from "../auth/auth.module";
import { WebSocketModule } from "../websocket/websocket.module";
/**
* Runner Jobs Module
@@ -12,7 +13,7 @@ import { AuthModule } from "../auth/auth.module";
* for asynchronous job processing.
*/
@Module({
imports: [PrismaModule, BullMqModule, AuthModule],
imports: [PrismaModule, BullMqModule, AuthModule, WebSocketModule],
controllers: [RunnerJobsController],
providers: [RunnerJobsService],
exports: [RunnerJobsService],

View File

@@ -3,6 +3,7 @@ import { Test, TestingModule } from "@nestjs/testing";
import { RunnerJobsService } from "./runner-jobs.service";
import { PrismaService } from "../prisma/prisma.service";
import { BullMqService } from "../bullmq/bullmq.service";
import { WebSocketGateway } from "../websocket/websocket.gateway";
import { RunnerJobStatus } from "@prisma/client";
import { ConflictException, BadRequestException } from "@nestjs/common";
@@ -19,6 +20,12 @@ describe("RunnerJobsService - Concurrency", () => {
getQueue: vi.fn(),
};
const mockWebSocketGateway = {
emitJobCreated: vi.fn(),
emitJobStatusChanged: vi.fn(),
emitJobProgress: vi.fn(),
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
@@ -37,6 +44,10 @@ describe("RunnerJobsService - Concurrency", () => {
provide: BullMqService,
useValue: mockBullMqService,
},
{
provide: WebSocketGateway,
useValue: mockWebSocketGateway,
},
],
}).compile();

View File

@@ -3,6 +3,7 @@ import { Test, TestingModule } from "@nestjs/testing";
import { RunnerJobsService } from "./runner-jobs.service";
import { PrismaService } from "../prisma/prisma.service";
import { BullMqService } from "../bullmq/bullmq.service";
import { WebSocketGateway } from "../websocket/websocket.gateway";
import { RunnerJobStatus } from "@prisma/client";
import { NotFoundException, BadRequestException } from "@nestjs/common";
import { CreateJobDto, QueryJobsDto } from "./dto";
@@ -32,6 +33,12 @@ describe("RunnerJobsService", () => {
getQueue: vi.fn(),
};
const mockWebSocketGateway = {
emitJobCreated: vi.fn(),
emitJobStatusChanged: vi.fn(),
emitJobProgress: vi.fn(),
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
@@ -44,6 +51,10 @@ describe("RunnerJobsService", () => {
provide: BullMqService,
useValue: mockBullMqService,
},
{
provide: WebSocketGateway,
useValue: mockWebSocketGateway,
},
],
}).compile();

View File

@@ -3,6 +3,7 @@ import { Prisma, RunnerJobStatus } from "@prisma/client";
import { Response } from "express";
import { PrismaService } from "../prisma/prisma.service";
import { BullMqService } from "../bullmq/bullmq.service";
import { WebSocketGateway } from "../websocket/websocket.gateway";
import { QUEUE_NAMES } from "../bullmq/queues";
import { ConcurrentUpdateException } from "../common/exceptions/concurrent-update.exception";
import type { CreateJobDto, QueryJobsDto } from "./dto";
@@ -14,7 +15,8 @@ import type { CreateJobDto, QueryJobsDto } from "./dto";
export class RunnerJobsService {
constructor(
private readonly prisma: PrismaService,
private readonly bullMq: BullMqService
private readonly bullMq: BullMqService,
private readonly wsGateway: WebSocketGateway
) {}
/**
@@ -56,6 +58,8 @@ export class RunnerJobsService {
{ priority }
);
this.wsGateway.emitJobCreated(workspaceId, job);
return job;
}
@@ -194,6 +198,13 @@ export class RunnerJobsService {
throw new NotFoundException(`RunnerJob with ID ${id} not found after cancel`);
}
this.wsGateway.emitJobStatusChanged(workspaceId, id, {
id,
workspaceId,
status: job.status,
previousStatus: existingJob.status,
});
return job;
});
}
@@ -248,6 +259,8 @@ export class RunnerJobsService {
{ priority: existingJob.priority }
);
this.wsGateway.emitJobCreated(workspaceId, newJob);
return newJob;
}
@@ -530,6 +543,13 @@ export class RunnerJobsService {
throw new NotFoundException(`RunnerJob with ID ${id} not found after update`);
}
this.wsGateway.emitJobStatusChanged(workspaceId, id, {
id,
workspaceId,
status: updatedJob.status,
previousStatus: existingJob.status,
});
return updatedJob;
});
}
@@ -606,6 +626,12 @@ export class RunnerJobsService {
throw new NotFoundException(`RunnerJob with ID ${id} not found after update`);
}
this.wsGateway.emitJobProgress(workspaceId, id, {
id,
workspaceId,
progressPercent: updatedJob.progressPercent,
});
return updatedJob;
});
}