feat: agent cycle visibility — WebSocket emits + dashboard polling (#461)

- Wire WebSocketGateway into RunnerJobsService: emit job:created,
  job:status, and job:progress on create/cancel/retry/updateStatus/
  updateProgress operations
- Add 30s polling interval to dashboard page for near-real-time updates
- Enhance OrchestratorSessions widget with progress bars and step
  status labels
- Update test mocks with WebSocketGateway provider

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-22 19:02:51 -06:00
parent 63178df643
commit 5d3045ab4f
7 changed files with 110 additions and 4 deletions

View File

@@ -4,6 +4,7 @@ import { RunnerJobsService } from "./runner-jobs.service";
import { PrismaModule } from "../prisma/prisma.module"; import { PrismaModule } from "../prisma/prisma.module";
import { BullMqModule } from "../bullmq/bullmq.module"; import { BullMqModule } from "../bullmq/bullmq.module";
import { AuthModule } from "../auth/auth.module"; import { AuthModule } from "../auth/auth.module";
import { WebSocketModule } from "../websocket/websocket.module";
/** /**
* Runner Jobs Module * Runner Jobs Module
@@ -12,7 +13,7 @@ import { AuthModule } from "../auth/auth.module";
* for asynchronous job processing. * for asynchronous job processing.
*/ */
@Module({ @Module({
imports: [PrismaModule, BullMqModule, AuthModule], imports: [PrismaModule, BullMqModule, AuthModule, WebSocketModule],
controllers: [RunnerJobsController], controllers: [RunnerJobsController],
providers: [RunnerJobsService], providers: [RunnerJobsService],
exports: [RunnerJobsService], exports: [RunnerJobsService],

View File

@@ -3,6 +3,7 @@ import { Test, TestingModule } from "@nestjs/testing";
import { RunnerJobsService } from "./runner-jobs.service"; import { RunnerJobsService } from "./runner-jobs.service";
import { PrismaService } from "../prisma/prisma.service"; import { PrismaService } from "../prisma/prisma.service";
import { BullMqService } from "../bullmq/bullmq.service"; import { BullMqService } from "../bullmq/bullmq.service";
import { WebSocketGateway } from "../websocket/websocket.gateway";
import { RunnerJobStatus } from "@prisma/client"; import { RunnerJobStatus } from "@prisma/client";
import { ConflictException, BadRequestException } from "@nestjs/common"; import { ConflictException, BadRequestException } from "@nestjs/common";
@@ -19,6 +20,12 @@ describe("RunnerJobsService - Concurrency", () => {
getQueue: vi.fn(), getQueue: vi.fn(),
}; };
const mockWebSocketGateway = {
emitJobCreated: vi.fn(),
emitJobStatusChanged: vi.fn(),
emitJobProgress: vi.fn(),
};
beforeEach(async () => { beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({ const module: TestingModule = await Test.createTestingModule({
providers: [ providers: [
@@ -37,6 +44,10 @@ describe("RunnerJobsService - Concurrency", () => {
provide: BullMqService, provide: BullMqService,
useValue: mockBullMqService, useValue: mockBullMqService,
}, },
{
provide: WebSocketGateway,
useValue: mockWebSocketGateway,
},
], ],
}).compile(); }).compile();

View File

@@ -3,6 +3,7 @@ import { Test, TestingModule } from "@nestjs/testing";
import { RunnerJobsService } from "./runner-jobs.service"; import { RunnerJobsService } from "./runner-jobs.service";
import { PrismaService } from "../prisma/prisma.service"; import { PrismaService } from "../prisma/prisma.service";
import { BullMqService } from "../bullmq/bullmq.service"; import { BullMqService } from "../bullmq/bullmq.service";
import { WebSocketGateway } from "../websocket/websocket.gateway";
import { RunnerJobStatus } from "@prisma/client"; import { RunnerJobStatus } from "@prisma/client";
import { NotFoundException, BadRequestException } from "@nestjs/common"; import { NotFoundException, BadRequestException } from "@nestjs/common";
import { CreateJobDto, QueryJobsDto } from "./dto"; import { CreateJobDto, QueryJobsDto } from "./dto";
@@ -32,6 +33,12 @@ describe("RunnerJobsService", () => {
getQueue: vi.fn(), getQueue: vi.fn(),
}; };
const mockWebSocketGateway = {
emitJobCreated: vi.fn(),
emitJobStatusChanged: vi.fn(),
emitJobProgress: vi.fn(),
};
beforeEach(async () => { beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({ const module: TestingModule = await Test.createTestingModule({
providers: [ providers: [
@@ -44,6 +51,10 @@ describe("RunnerJobsService", () => {
provide: BullMqService, provide: BullMqService,
useValue: mockBullMqService, useValue: mockBullMqService,
}, },
{
provide: WebSocketGateway,
useValue: mockWebSocketGateway,
},
], ],
}).compile(); }).compile();

View File

@@ -3,6 +3,7 @@ import { Prisma, RunnerJobStatus } from "@prisma/client";
import { Response } from "express"; import { Response } from "express";
import { PrismaService } from "../prisma/prisma.service"; import { PrismaService } from "../prisma/prisma.service";
import { BullMqService } from "../bullmq/bullmq.service"; import { BullMqService } from "../bullmq/bullmq.service";
import { WebSocketGateway } from "../websocket/websocket.gateway";
import { QUEUE_NAMES } from "../bullmq/queues"; import { QUEUE_NAMES } from "../bullmq/queues";
import { ConcurrentUpdateException } from "../common/exceptions/concurrent-update.exception"; import { ConcurrentUpdateException } from "../common/exceptions/concurrent-update.exception";
import type { CreateJobDto, QueryJobsDto } from "./dto"; import type { CreateJobDto, QueryJobsDto } from "./dto";
@@ -14,7 +15,8 @@ import type { CreateJobDto, QueryJobsDto } from "./dto";
export class RunnerJobsService { export class RunnerJobsService {
constructor( constructor(
private readonly prisma: PrismaService, private readonly prisma: PrismaService,
private readonly bullMq: BullMqService private readonly bullMq: BullMqService,
private readonly wsGateway: WebSocketGateway
) {} ) {}
/** /**
@@ -56,6 +58,8 @@ export class RunnerJobsService {
{ priority } { priority }
); );
this.wsGateway.emitJobCreated(workspaceId, job);
return job; return job;
} }
@@ -194,6 +198,13 @@ export class RunnerJobsService {
throw new NotFoundException(`RunnerJob with ID ${id} not found after cancel`); throw new NotFoundException(`RunnerJob with ID ${id} not found after cancel`);
} }
this.wsGateway.emitJobStatusChanged(workspaceId, id, {
id,
workspaceId,
status: job.status,
previousStatus: existingJob.status,
});
return job; return job;
}); });
} }
@@ -248,6 +259,8 @@ export class RunnerJobsService {
{ priority: existingJob.priority } { priority: existingJob.priority }
); );
this.wsGateway.emitJobCreated(workspaceId, newJob);
return newJob; return newJob;
} }
@@ -530,6 +543,13 @@ export class RunnerJobsService {
throw new NotFoundException(`RunnerJob with ID ${id} not found after update`); throw new NotFoundException(`RunnerJob with ID ${id} not found after update`);
} }
this.wsGateway.emitJobStatusChanged(workspaceId, id, {
id,
workspaceId,
status: updatedJob.status,
previousStatus: existingJob.status,
});
return updatedJob; return updatedJob;
}); });
} }
@@ -606,6 +626,12 @@ export class RunnerJobsService {
throw new NotFoundException(`RunnerJob with ID ${id} not found after update`); throw new NotFoundException(`RunnerJob with ID ${id} not found after update`);
} }
this.wsGateway.emitJobProgress(workspaceId, id, {
id,
workspaceId,
progressPercent: updatedJob.progressPercent,
});
return updatedJob; return updatedJob;
}); });
} }

View File

@@ -53,6 +53,28 @@ export default function DashboardPage(): ReactElement {
}; };
}, [workspaceId]); }, [workspaceId]);
useEffect(() => {
if (!workspaceId) return;
let cancelled = false;
const wsId = workspaceId;
const interval = setInterval(() => {
fetchDashboardSummary(wsId)
.then((summary) => {
if (!cancelled) setData(summary);
})
.catch((err: unknown) => {
console.error("[Dashboard] Refresh failed:", err);
});
}, 30_000);
return (): void => {
cancelled = true;
clearInterval(interval);
};
}, [workspaceId]);
if (isLoading) { if (isLoading) {
return ( return (
<div style={{ display: "flex", flexDirection: "column", gap: 16 }}> <div style={{ display: "flex", flexDirection: "column", gap: 16 }}>

View File

@@ -27,6 +27,7 @@ interface AgentNode {
name: string; name: string;
task: string; task: string;
status: DotVariant; status: DotVariant;
statusLabel: string;
} }
interface OrchestratorSession { interface OrchestratorSession {
@@ -36,6 +37,7 @@ interface OrchestratorSession {
badge: string; badge: string;
badgeVariant: BadgeVariant; badgeVariant: BadgeVariant;
duration: string; duration: string;
progress: number;
agents: AgentNode[]; agents: AgentNode[];
} }
@@ -105,6 +107,7 @@ function mapJobToSession(job: ActiveJob): OrchestratorSession {
name: step.name, name: step.name,
task: `Phase: ${step.phase}`, task: `Phase: ${step.phase}`,
status: statusToDotVariant(step.status), status: statusToDotVariant(step.status),
statusLabel: step.status.toLowerCase(),
})); }));
return { return {
@@ -114,6 +117,7 @@ function mapJobToSession(job: ActiveJob): OrchestratorSession {
badge: job.status, badge: job.status,
badgeVariant: statusToBadgeVariant(job.status), badgeVariant: statusToBadgeVariant(job.status),
duration: formatDuration(job.createdAt), duration: formatDuration(job.createdAt),
progress: job.progressPercent,
agents, agents,
}; };
} }
@@ -192,6 +196,16 @@ function AgentNodeItem({ agent }: AgentNodeItemProps): ReactElement {
</div> </div>
</div> </div>
<Dot variant={agent.status} /> <Dot variant={agent.status} />
<span
style={{
fontSize: "0.65rem",
fontFamily: "var(--mono)",
color: "var(--muted)",
textTransform: "uppercase",
}}
>
{agent.statusLabel}
</span>
</div> </div>
); );
} }
@@ -251,6 +265,27 @@ function OrchCard({ session }: OrchCardProps): ReactElement {
{session.duration} {session.duration}
</span> </span>
</div> </div>
{session.progress > 0 && (
<div
style={{
height: 4,
borderRadius: 2,
background: "var(--border)",
marginBottom: 10,
overflow: "hidden",
}}
>
<div
style={{
height: "100%",
width: `${String(session.progress)}%`,
background: "var(--primary)",
borderRadius: 2,
transition: "width 0.3s ease",
}}
/>
</div>
)}
<div style={{ display: "flex", flexDirection: "column", gap: 6 }}> <div style={{ display: "flex", flexDirection: "column", gap: 6 }}>
{session.agents.map((agent) => ( {session.agents.map((agent) => (
<AgentNodeItem key={agent.id} agent={agent} /> <AgentNodeItem key={agent.id} agent={agent} />

View File

@@ -11,6 +11,6 @@
| MS-P2-001 | done | phase-2 | Create dashboard summary API endpoint: aggregate task counts, project counts, recent activity, active jobs in single call | — | issue #459, commit e38aaa9, 7 files +430 lines | | MS-P2-001 | done | phase-2 | Create dashboard summary API endpoint: aggregate task counts, project counts, recent activity, active jobs in single call | — | issue #459, commit e38aaa9, 7 files +430 lines |
| MS-P2-002 | done | phase-2 | Wire dashboard widgets to real API data: ActivityFeed, DashboardMetrics, OrchestratorSessions replace mock with API calls | — | issue #459, commit 7c762e6 + remediation | | MS-P2-002 | done | phase-2 | Wire dashboard widgets to real API data: ActivityFeed, DashboardMetrics, OrchestratorSessions replace mock with API calls | — | issue #459, commit 7c762e6 + remediation |
| MS-P2-003 | done | phase-2 | Phase verification: create task via API, confirm visible in dashboard, all quality gates pass | — | issue #459, lint 8/8 typecheck 7/7 test 8/8 | | MS-P2-003 | done | phase-2 | Phase verification: create task via API, confirm visible in dashboard, all quality gates pass | — | issue #459, lint 8/8 typecheck 7/7 test 8/8 |
| MS-P3-001 | not-started | phase-3 | Wire WebSocket emits into RunnerJobsService: broadcast job status/progress/step events to workspace rooms | — | issue #461, est 20K | | MS-P3-001 | in-progress | phase-3 | Wire WebSocket emits into RunnerJobsService: broadcast job status/progress/step events to workspace rooms | — | issue #461, est 20K |
| MS-P3-002 | not-started | phase-3 | Dashboard auto-refresh + enhanced OrchestratorSessions: polling interval, progress bars, step status indicators, timestamps | — | issue #461, est 25K | | MS-P3-002 | in-progress | phase-3 | Dashboard auto-refresh + enhanced OrchestratorSessions: polling interval, progress bars, step status indicators, timestamps | — | issue #461, est 25K |
| MS-P3-003 | not-started | phase-3 | Phase verification: all quality gates pass, demonstrate agent job cycle visibility end-to-end | — | issue #461, est 10K, depends MS-P3-001+002 | | MS-P3-003 | not-started | phase-3 | Phase verification: all quality gates pass, demonstrate agent job cycle visibility end-to-end | — | issue #461, est 10K, depends MS-P3-001+002 |