diff --git a/apps/api/src/runner-jobs/runner-jobs.module.ts b/apps/api/src/runner-jobs/runner-jobs.module.ts index 828fff2..0cda3f7 100644 --- a/apps/api/src/runner-jobs/runner-jobs.module.ts +++ b/apps/api/src/runner-jobs/runner-jobs.module.ts @@ -4,6 +4,7 @@ import { RunnerJobsService } from "./runner-jobs.service"; import { PrismaModule } from "../prisma/prisma.module"; import { BullMqModule } from "../bullmq/bullmq.module"; import { AuthModule } from "../auth/auth.module"; +import { WebSocketModule } from "../websocket/websocket.module"; /** * Runner Jobs Module @@ -12,7 +13,7 @@ import { AuthModule } from "../auth/auth.module"; * for asynchronous job processing. */ @Module({ - imports: [PrismaModule, BullMqModule, AuthModule], + imports: [PrismaModule, BullMqModule, AuthModule, WebSocketModule], controllers: [RunnerJobsController], providers: [RunnerJobsService], exports: [RunnerJobsService], diff --git a/apps/api/src/runner-jobs/runner-jobs.service.concurrency.spec.ts b/apps/api/src/runner-jobs/runner-jobs.service.concurrency.spec.ts index c5b4d54..9e96baf 100644 --- a/apps/api/src/runner-jobs/runner-jobs.service.concurrency.spec.ts +++ b/apps/api/src/runner-jobs/runner-jobs.service.concurrency.spec.ts @@ -3,6 +3,7 @@ import { Test, TestingModule } from "@nestjs/testing"; import { RunnerJobsService } from "./runner-jobs.service"; import { PrismaService } from "../prisma/prisma.service"; import { BullMqService } from "../bullmq/bullmq.service"; +import { WebSocketGateway } from "../websocket/websocket.gateway"; import { RunnerJobStatus } from "@prisma/client"; import { ConflictException, BadRequestException } from "@nestjs/common"; @@ -19,6 +20,12 @@ describe("RunnerJobsService - Concurrency", () => { getQueue: vi.fn(), }; + const mockWebSocketGateway = { + emitJobCreated: vi.fn(), + emitJobStatusChanged: vi.fn(), + emitJobProgress: vi.fn(), + }; + beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ providers: [ @@ -37,6 +44,10 @@ describe("RunnerJobsService - Concurrency", () => { provide: BullMqService, useValue: mockBullMqService, }, + { + provide: WebSocketGateway, + useValue: mockWebSocketGateway, + }, ], }).compile(); diff --git a/apps/api/src/runner-jobs/runner-jobs.service.spec.ts b/apps/api/src/runner-jobs/runner-jobs.service.spec.ts index 2632192..ac0ca10 100644 --- a/apps/api/src/runner-jobs/runner-jobs.service.spec.ts +++ b/apps/api/src/runner-jobs/runner-jobs.service.spec.ts @@ -3,6 +3,7 @@ import { Test, TestingModule } from "@nestjs/testing"; import { RunnerJobsService } from "./runner-jobs.service"; import { PrismaService } from "../prisma/prisma.service"; import { BullMqService } from "../bullmq/bullmq.service"; +import { WebSocketGateway } from "../websocket/websocket.gateway"; import { RunnerJobStatus } from "@prisma/client"; import { NotFoundException, BadRequestException } from "@nestjs/common"; import { CreateJobDto, QueryJobsDto } from "./dto"; @@ -32,6 +33,12 @@ describe("RunnerJobsService", () => { getQueue: vi.fn(), }; + const mockWebSocketGateway = { + emitJobCreated: vi.fn(), + emitJobStatusChanged: vi.fn(), + emitJobProgress: vi.fn(), + }; + beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ providers: [ @@ -44,6 +51,10 @@ describe("RunnerJobsService", () => { provide: BullMqService, useValue: mockBullMqService, }, + { + provide: WebSocketGateway, + useValue: mockWebSocketGateway, + }, ], }).compile(); diff --git a/apps/api/src/runner-jobs/runner-jobs.service.ts b/apps/api/src/runner-jobs/runner-jobs.service.ts index 8149a23..02d340d 100644 --- a/apps/api/src/runner-jobs/runner-jobs.service.ts +++ b/apps/api/src/runner-jobs/runner-jobs.service.ts @@ -3,6 +3,7 @@ import { Prisma, RunnerJobStatus } from "@prisma/client"; import { Response } from "express"; import { PrismaService } from "../prisma/prisma.service"; import { BullMqService } from "../bullmq/bullmq.service"; +import { WebSocketGateway } from "../websocket/websocket.gateway"; import { QUEUE_NAMES } from "../bullmq/queues"; import { ConcurrentUpdateException } from "../common/exceptions/concurrent-update.exception"; import type { CreateJobDto, QueryJobsDto } from "./dto"; @@ -14,7 +15,8 @@ import type { CreateJobDto, QueryJobsDto } from "./dto"; export class RunnerJobsService { constructor( private readonly prisma: PrismaService, - private readonly bullMq: BullMqService + private readonly bullMq: BullMqService, + private readonly wsGateway: WebSocketGateway ) {} /** @@ -56,6 +58,8 @@ export class RunnerJobsService { { priority } ); + this.wsGateway.emitJobCreated(workspaceId, job); + return job; } @@ -194,6 +198,13 @@ export class RunnerJobsService { throw new NotFoundException(`RunnerJob with ID ${id} not found after cancel`); } + this.wsGateway.emitJobStatusChanged(workspaceId, id, { + id, + workspaceId, + status: job.status, + previousStatus: existingJob.status, + }); + return job; }); } @@ -248,6 +259,8 @@ export class RunnerJobsService { { priority: existingJob.priority } ); + this.wsGateway.emitJobCreated(workspaceId, newJob); + return newJob; } @@ -530,6 +543,13 @@ export class RunnerJobsService { throw new NotFoundException(`RunnerJob with ID ${id} not found after update`); } + this.wsGateway.emitJobStatusChanged(workspaceId, id, { + id, + workspaceId, + status: updatedJob.status, + previousStatus: existingJob.status, + }); + return updatedJob; }); } @@ -606,6 +626,12 @@ export class RunnerJobsService { throw new NotFoundException(`RunnerJob with ID ${id} not found after update`); } + this.wsGateway.emitJobProgress(workspaceId, id, { + id, + workspaceId, + progressPercent: updatedJob.progressPercent, + }); + return updatedJob; }); } diff --git a/apps/web/src/app/(authenticated)/page.tsx b/apps/web/src/app/(authenticated)/page.tsx index 2105b48..dbaf667 100644 --- a/apps/web/src/app/(authenticated)/page.tsx +++ b/apps/web/src/app/(authenticated)/page.tsx @@ -53,6 +53,28 @@ export default function DashboardPage(): ReactElement { }; }, [workspaceId]); + useEffect(() => { + if (!workspaceId) return; + + let cancelled = false; + const wsId = workspaceId; + + const interval = setInterval(() => { + fetchDashboardSummary(wsId) + .then((summary) => { + if (!cancelled) setData(summary); + }) + .catch((err: unknown) => { + console.error("[Dashboard] Refresh failed:", err); + }); + }, 30_000); + + return (): void => { + cancelled = true; + clearInterval(interval); + }; + }, [workspaceId]); + if (isLoading) { return (