feat: agent cycle visibility — WebSocket emits + dashboard polling (#461)
- Wire WebSocketGateway into RunnerJobsService: emit job:created, job:status, and job:progress on create/cancel/retry/updateStatus/ updateProgress operations - Add 30s polling interval to dashboard page for near-real-time updates - Enhance OrchestratorSessions widget with progress bars and step status labels - Update test mocks with WebSocketGateway provider Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -4,6 +4,7 @@ import { RunnerJobsService } from "./runner-jobs.service";
|
|||||||
import { PrismaModule } from "../prisma/prisma.module";
|
import { PrismaModule } from "../prisma/prisma.module";
|
||||||
import { BullMqModule } from "../bullmq/bullmq.module";
|
import { BullMqModule } from "../bullmq/bullmq.module";
|
||||||
import { AuthModule } from "../auth/auth.module";
|
import { AuthModule } from "../auth/auth.module";
|
||||||
|
import { WebSocketModule } from "../websocket/websocket.module";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runner Jobs Module
|
* Runner Jobs Module
|
||||||
@@ -12,7 +13,7 @@ import { AuthModule } from "../auth/auth.module";
|
|||||||
* for asynchronous job processing.
|
* for asynchronous job processing.
|
||||||
*/
|
*/
|
||||||
@Module({
|
@Module({
|
||||||
imports: [PrismaModule, BullMqModule, AuthModule],
|
imports: [PrismaModule, BullMqModule, AuthModule, WebSocketModule],
|
||||||
controllers: [RunnerJobsController],
|
controllers: [RunnerJobsController],
|
||||||
providers: [RunnerJobsService],
|
providers: [RunnerJobsService],
|
||||||
exports: [RunnerJobsService],
|
exports: [RunnerJobsService],
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import { Test, TestingModule } from "@nestjs/testing";
|
|||||||
import { RunnerJobsService } from "./runner-jobs.service";
|
import { RunnerJobsService } from "./runner-jobs.service";
|
||||||
import { PrismaService } from "../prisma/prisma.service";
|
import { PrismaService } from "../prisma/prisma.service";
|
||||||
import { BullMqService } from "../bullmq/bullmq.service";
|
import { BullMqService } from "../bullmq/bullmq.service";
|
||||||
|
import { WebSocketGateway } from "../websocket/websocket.gateway";
|
||||||
import { RunnerJobStatus } from "@prisma/client";
|
import { RunnerJobStatus } from "@prisma/client";
|
||||||
import { ConflictException, BadRequestException } from "@nestjs/common";
|
import { ConflictException, BadRequestException } from "@nestjs/common";
|
||||||
|
|
||||||
@@ -19,6 +20,12 @@ describe("RunnerJobsService - Concurrency", () => {
|
|||||||
getQueue: vi.fn(),
|
getQueue: vi.fn(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const mockWebSocketGateway = {
|
||||||
|
emitJobCreated: vi.fn(),
|
||||||
|
emitJobStatusChanged: vi.fn(),
|
||||||
|
emitJobProgress: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
const module: TestingModule = await Test.createTestingModule({
|
const module: TestingModule = await Test.createTestingModule({
|
||||||
providers: [
|
providers: [
|
||||||
@@ -37,6 +44,10 @@ describe("RunnerJobsService - Concurrency", () => {
|
|||||||
provide: BullMqService,
|
provide: BullMqService,
|
||||||
useValue: mockBullMqService,
|
useValue: mockBullMqService,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
provide: WebSocketGateway,
|
||||||
|
useValue: mockWebSocketGateway,
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import { Test, TestingModule } from "@nestjs/testing";
|
|||||||
import { RunnerJobsService } from "./runner-jobs.service";
|
import { RunnerJobsService } from "./runner-jobs.service";
|
||||||
import { PrismaService } from "../prisma/prisma.service";
|
import { PrismaService } from "../prisma/prisma.service";
|
||||||
import { BullMqService } from "../bullmq/bullmq.service";
|
import { BullMqService } from "../bullmq/bullmq.service";
|
||||||
|
import { WebSocketGateway } from "../websocket/websocket.gateway";
|
||||||
import { RunnerJobStatus } from "@prisma/client";
|
import { RunnerJobStatus } from "@prisma/client";
|
||||||
import { NotFoundException, BadRequestException } from "@nestjs/common";
|
import { NotFoundException, BadRequestException } from "@nestjs/common";
|
||||||
import { CreateJobDto, QueryJobsDto } from "./dto";
|
import { CreateJobDto, QueryJobsDto } from "./dto";
|
||||||
@@ -32,6 +33,12 @@ describe("RunnerJobsService", () => {
|
|||||||
getQueue: vi.fn(),
|
getQueue: vi.fn(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const mockWebSocketGateway = {
|
||||||
|
emitJobCreated: vi.fn(),
|
||||||
|
emitJobStatusChanged: vi.fn(),
|
||||||
|
emitJobProgress: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
const module: TestingModule = await Test.createTestingModule({
|
const module: TestingModule = await Test.createTestingModule({
|
||||||
providers: [
|
providers: [
|
||||||
@@ -44,6 +51,10 @@ describe("RunnerJobsService", () => {
|
|||||||
provide: BullMqService,
|
provide: BullMqService,
|
||||||
useValue: mockBullMqService,
|
useValue: mockBullMqService,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
provide: WebSocketGateway,
|
||||||
|
useValue: mockWebSocketGateway,
|
||||||
|
},
|
||||||
],
|
],
|
||||||
}).compile();
|
}).compile();
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import { Prisma, RunnerJobStatus } from "@prisma/client";
|
|||||||
import { Response } from "express";
|
import { Response } from "express";
|
||||||
import { PrismaService } from "../prisma/prisma.service";
|
import { PrismaService } from "../prisma/prisma.service";
|
||||||
import { BullMqService } from "../bullmq/bullmq.service";
|
import { BullMqService } from "../bullmq/bullmq.service";
|
||||||
|
import { WebSocketGateway } from "../websocket/websocket.gateway";
|
||||||
import { QUEUE_NAMES } from "../bullmq/queues";
|
import { QUEUE_NAMES } from "../bullmq/queues";
|
||||||
import { ConcurrentUpdateException } from "../common/exceptions/concurrent-update.exception";
|
import { ConcurrentUpdateException } from "../common/exceptions/concurrent-update.exception";
|
||||||
import type { CreateJobDto, QueryJobsDto } from "./dto";
|
import type { CreateJobDto, QueryJobsDto } from "./dto";
|
||||||
@@ -14,7 +15,8 @@ import type { CreateJobDto, QueryJobsDto } from "./dto";
|
|||||||
export class RunnerJobsService {
|
export class RunnerJobsService {
|
||||||
constructor(
|
constructor(
|
||||||
private readonly prisma: PrismaService,
|
private readonly prisma: PrismaService,
|
||||||
private readonly bullMq: BullMqService
|
private readonly bullMq: BullMqService,
|
||||||
|
private readonly wsGateway: WebSocketGateway
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -56,6 +58,8 @@ export class RunnerJobsService {
|
|||||||
{ priority }
|
{ priority }
|
||||||
);
|
);
|
||||||
|
|
||||||
|
this.wsGateway.emitJobCreated(workspaceId, job);
|
||||||
|
|
||||||
return job;
|
return job;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -194,6 +198,13 @@ export class RunnerJobsService {
|
|||||||
throw new NotFoundException(`RunnerJob with ID ${id} not found after cancel`);
|
throw new NotFoundException(`RunnerJob with ID ${id} not found after cancel`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.wsGateway.emitJobStatusChanged(workspaceId, id, {
|
||||||
|
id,
|
||||||
|
workspaceId,
|
||||||
|
status: job.status,
|
||||||
|
previousStatus: existingJob.status,
|
||||||
|
});
|
||||||
|
|
||||||
return job;
|
return job;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -248,6 +259,8 @@ export class RunnerJobsService {
|
|||||||
{ priority: existingJob.priority }
|
{ priority: existingJob.priority }
|
||||||
);
|
);
|
||||||
|
|
||||||
|
this.wsGateway.emitJobCreated(workspaceId, newJob);
|
||||||
|
|
||||||
return newJob;
|
return newJob;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -530,6 +543,13 @@ export class RunnerJobsService {
|
|||||||
throw new NotFoundException(`RunnerJob with ID ${id} not found after update`);
|
throw new NotFoundException(`RunnerJob with ID ${id} not found after update`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.wsGateway.emitJobStatusChanged(workspaceId, id, {
|
||||||
|
id,
|
||||||
|
workspaceId,
|
||||||
|
status: updatedJob.status,
|
||||||
|
previousStatus: existingJob.status,
|
||||||
|
});
|
||||||
|
|
||||||
return updatedJob;
|
return updatedJob;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -606,6 +626,12 @@ export class RunnerJobsService {
|
|||||||
throw new NotFoundException(`RunnerJob with ID ${id} not found after update`);
|
throw new NotFoundException(`RunnerJob with ID ${id} not found after update`);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.wsGateway.emitJobProgress(workspaceId, id, {
|
||||||
|
id,
|
||||||
|
workspaceId,
|
||||||
|
progressPercent: updatedJob.progressPercent,
|
||||||
|
});
|
||||||
|
|
||||||
return updatedJob;
|
return updatedJob;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -53,6 +53,28 @@ export default function DashboardPage(): ReactElement {
|
|||||||
};
|
};
|
||||||
}, [workspaceId]);
|
}, [workspaceId]);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
if (!workspaceId) return;
|
||||||
|
|
||||||
|
let cancelled = false;
|
||||||
|
const wsId = workspaceId;
|
||||||
|
|
||||||
|
const interval = setInterval(() => {
|
||||||
|
fetchDashboardSummary(wsId)
|
||||||
|
.then((summary) => {
|
||||||
|
if (!cancelled) setData(summary);
|
||||||
|
})
|
||||||
|
.catch((err: unknown) => {
|
||||||
|
console.error("[Dashboard] Refresh failed:", err);
|
||||||
|
});
|
||||||
|
}, 30_000);
|
||||||
|
|
||||||
|
return (): void => {
|
||||||
|
cancelled = true;
|
||||||
|
clearInterval(interval);
|
||||||
|
};
|
||||||
|
}, [workspaceId]);
|
||||||
|
|
||||||
if (isLoading) {
|
if (isLoading) {
|
||||||
return (
|
return (
|
||||||
<div style={{ display: "flex", flexDirection: "column", gap: 16 }}>
|
<div style={{ display: "flex", flexDirection: "column", gap: 16 }}>
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ interface AgentNode {
|
|||||||
name: string;
|
name: string;
|
||||||
task: string;
|
task: string;
|
||||||
status: DotVariant;
|
status: DotVariant;
|
||||||
|
statusLabel: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface OrchestratorSession {
|
interface OrchestratorSession {
|
||||||
@@ -36,6 +37,7 @@ interface OrchestratorSession {
|
|||||||
badge: string;
|
badge: string;
|
||||||
badgeVariant: BadgeVariant;
|
badgeVariant: BadgeVariant;
|
||||||
duration: string;
|
duration: string;
|
||||||
|
progress: number;
|
||||||
agents: AgentNode[];
|
agents: AgentNode[];
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -105,6 +107,7 @@ function mapJobToSession(job: ActiveJob): OrchestratorSession {
|
|||||||
name: step.name,
|
name: step.name,
|
||||||
task: `Phase: ${step.phase}`,
|
task: `Phase: ${step.phase}`,
|
||||||
status: statusToDotVariant(step.status),
|
status: statusToDotVariant(step.status),
|
||||||
|
statusLabel: step.status.toLowerCase(),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
return {
|
return {
|
||||||
@@ -114,6 +117,7 @@ function mapJobToSession(job: ActiveJob): OrchestratorSession {
|
|||||||
badge: job.status,
|
badge: job.status,
|
||||||
badgeVariant: statusToBadgeVariant(job.status),
|
badgeVariant: statusToBadgeVariant(job.status),
|
||||||
duration: formatDuration(job.createdAt),
|
duration: formatDuration(job.createdAt),
|
||||||
|
progress: job.progressPercent,
|
||||||
agents,
|
agents,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
@@ -192,6 +196,16 @@ function AgentNodeItem({ agent }: AgentNodeItemProps): ReactElement {
|
|||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
<Dot variant={agent.status} />
|
<Dot variant={agent.status} />
|
||||||
|
<span
|
||||||
|
style={{
|
||||||
|
fontSize: "0.65rem",
|
||||||
|
fontFamily: "var(--mono)",
|
||||||
|
color: "var(--muted)",
|
||||||
|
textTransform: "uppercase",
|
||||||
|
}}
|
||||||
|
>
|
||||||
|
{agent.statusLabel}
|
||||||
|
</span>
|
||||||
</div>
|
</div>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@@ -251,6 +265,27 @@ function OrchCard({ session }: OrchCardProps): ReactElement {
|
|||||||
{session.duration}
|
{session.duration}
|
||||||
</span>
|
</span>
|
||||||
</div>
|
</div>
|
||||||
|
{session.progress > 0 && (
|
||||||
|
<div
|
||||||
|
style={{
|
||||||
|
height: 4,
|
||||||
|
borderRadius: 2,
|
||||||
|
background: "var(--border)",
|
||||||
|
marginBottom: 10,
|
||||||
|
overflow: "hidden",
|
||||||
|
}}
|
||||||
|
>
|
||||||
|
<div
|
||||||
|
style={{
|
||||||
|
height: "100%",
|
||||||
|
width: `${String(session.progress)}%`,
|
||||||
|
background: "var(--primary)",
|
||||||
|
borderRadius: 2,
|
||||||
|
transition: "width 0.3s ease",
|
||||||
|
}}
|
||||||
|
/>
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
<div style={{ display: "flex", flexDirection: "column", gap: 6 }}>
|
<div style={{ display: "flex", flexDirection: "column", gap: 6 }}>
|
||||||
{session.agents.map((agent) => (
|
{session.agents.map((agent) => (
|
||||||
<AgentNodeItem key={agent.id} agent={agent} />
|
<AgentNodeItem key={agent.id} agent={agent} />
|
||||||
|
|||||||
@@ -11,6 +11,6 @@
|
|||||||
| MS-P2-001 | done | phase-2 | Create dashboard summary API endpoint: aggregate task counts, project counts, recent activity, active jobs in single call | — | issue #459, commit e38aaa9, 7 files +430 lines |
|
| MS-P2-001 | done | phase-2 | Create dashboard summary API endpoint: aggregate task counts, project counts, recent activity, active jobs in single call | — | issue #459, commit e38aaa9, 7 files +430 lines |
|
||||||
| MS-P2-002 | done | phase-2 | Wire dashboard widgets to real API data: ActivityFeed, DashboardMetrics, OrchestratorSessions replace mock with API calls | — | issue #459, commit 7c762e6 + remediation |
|
| MS-P2-002 | done | phase-2 | Wire dashboard widgets to real API data: ActivityFeed, DashboardMetrics, OrchestratorSessions replace mock with API calls | — | issue #459, commit 7c762e6 + remediation |
|
||||||
| MS-P2-003 | done | phase-2 | Phase verification: create task via API, confirm visible in dashboard, all quality gates pass | — | issue #459, lint 8/8 typecheck 7/7 test 8/8 |
|
| MS-P2-003 | done | phase-2 | Phase verification: create task via API, confirm visible in dashboard, all quality gates pass | — | issue #459, lint 8/8 typecheck 7/7 test 8/8 |
|
||||||
| MS-P3-001 | not-started | phase-3 | Wire WebSocket emits into RunnerJobsService: broadcast job status/progress/step events to workspace rooms | — | issue #461, est 20K |
|
| MS-P3-001 | in-progress | phase-3 | Wire WebSocket emits into RunnerJobsService: broadcast job status/progress/step events to workspace rooms | — | issue #461, est 20K |
|
||||||
| MS-P3-002 | not-started | phase-3 | Dashboard auto-refresh + enhanced OrchestratorSessions: polling interval, progress bars, step status indicators, timestamps | — | issue #461, est 25K |
|
| MS-P3-002 | in-progress | phase-3 | Dashboard auto-refresh + enhanced OrchestratorSessions: polling interval, progress bars, step status indicators, timestamps | — | issue #461, est 25K |
|
||||||
| MS-P3-003 | not-started | phase-3 | Phase verification: all quality gates pass, demonstrate agent job cycle visibility end-to-end | — | issue #461, est 10K, depends MS-P3-001+002 |
|
| MS-P3-003 | not-started | phase-3 | Phase verification: all quality gates pass, demonstrate agent job cycle visibility end-to-end | — | issue #461, est 10K, depends MS-P3-001+002 |
|
||||||
|
|||||||
Reference in New Issue
Block a user