fix(api): widget throttling and orchestrator endpoints (#624)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #624.
This commit is contained in:
194
apps/api/src/orchestrator/orchestrator.controller.spec.ts
Normal file
194
apps/api/src/orchestrator/orchestrator.controller.spec.ts
Normal file
@@ -0,0 +1,194 @@
|
||||
import { beforeEach, describe, expect, it, vi, afterEach } from "vitest";
|
||||
import type { Response } from "express";
|
||||
import { AgentStatus } from "@prisma/client";
|
||||
import { OrchestratorController } from "./orchestrator.controller";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
import { AuthGuard } from "../auth/guards/auth.guard";
|
||||
|
||||
describe("OrchestratorController", () => {
|
||||
const mockPrismaService = {
|
||||
agent: {
|
||||
findMany: vi.fn(),
|
||||
},
|
||||
};
|
||||
|
||||
let controller: OrchestratorController;
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
controller = new OrchestratorController(mockPrismaService as unknown as PrismaService);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
describe("getAgents", () => {
|
||||
it("returns active agents with API widget shape", async () => {
|
||||
mockPrismaService.agent.findMany.mockResolvedValue([
|
||||
{
|
||||
id: "agent-1",
|
||||
name: "Planner",
|
||||
status: AgentStatus.WORKING,
|
||||
role: "planner",
|
||||
createdAt: new Date("2026-02-28T10:00:00.000Z"),
|
||||
},
|
||||
]);
|
||||
|
||||
const result = await controller.getAgents();
|
||||
|
||||
expect(result).toEqual([
|
||||
{
|
||||
id: "agent-1",
|
||||
name: "Planner",
|
||||
status: AgentStatus.WORKING,
|
||||
type: "planner",
|
||||
createdAt: new Date("2026-02-28T10:00:00.000Z"),
|
||||
},
|
||||
]);
|
||||
|
||||
expect(mockPrismaService.agent.findMany).toHaveBeenCalledWith({
|
||||
where: {
|
||||
status: {
|
||||
not: AgentStatus.TERMINATED,
|
||||
},
|
||||
},
|
||||
orderBy: {
|
||||
createdAt: "desc",
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
name: true,
|
||||
status: true,
|
||||
role: true,
|
||||
createdAt: true,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("falls back to type=agent when role is missing", async () => {
|
||||
mockPrismaService.agent.findMany.mockResolvedValue([
|
||||
{
|
||||
id: "agent-2",
|
||||
name: null,
|
||||
status: AgentStatus.IDLE,
|
||||
role: null,
|
||||
createdAt: new Date("2026-02-28T11:00:00.000Z"),
|
||||
},
|
||||
]);
|
||||
|
||||
const result = await controller.getAgents();
|
||||
|
||||
expect(result[0]).toMatchObject({
|
||||
id: "agent-2",
|
||||
type: "agent",
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("streamEvents", () => {
|
||||
it("sets SSE headers and writes initial data payload", async () => {
|
||||
const onHandlers: Record<string, (() => void) | undefined> = {};
|
||||
const mockRes = {
|
||||
setHeader: vi.fn(),
|
||||
write: vi.fn(),
|
||||
end: vi.fn(),
|
||||
on: vi.fn((event: string, handler: () => void) => {
|
||||
onHandlers[event] = handler;
|
||||
return mockRes;
|
||||
}),
|
||||
} as unknown as Response;
|
||||
|
||||
mockPrismaService.agent.findMany.mockResolvedValue([
|
||||
{
|
||||
id: "agent-1",
|
||||
name: "Worker",
|
||||
status: AgentStatus.WORKING,
|
||||
role: "worker",
|
||||
createdAt: new Date("2026-02-28T12:00:00.000Z"),
|
||||
},
|
||||
]);
|
||||
|
||||
await controller.streamEvents(mockRes);
|
||||
|
||||
expect(mockRes.setHeader).toHaveBeenCalledWith("Content-Type", "text/event-stream");
|
||||
expect(mockRes.setHeader).toHaveBeenCalledWith("Cache-Control", "no-cache");
|
||||
expect(mockRes.setHeader).toHaveBeenCalledWith("Connection", "keep-alive");
|
||||
expect(mockRes.setHeader).toHaveBeenCalledWith("X-Accel-Buffering", "no");
|
||||
|
||||
expect(mockRes.write).toHaveBeenCalledWith(
|
||||
expect.stringContaining('"type":"agents:updated"')
|
||||
);
|
||||
expect(typeof onHandlers.close).toBe("function");
|
||||
});
|
||||
|
||||
it("polls every 5 seconds and only emits when payload changes", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
const onHandlers: Record<string, (() => void) | undefined> = {};
|
||||
const mockRes = {
|
||||
setHeader: vi.fn(),
|
||||
write: vi.fn(),
|
||||
end: vi.fn(),
|
||||
on: vi.fn((event: string, handler: () => void) => {
|
||||
onHandlers[event] = handler;
|
||||
return mockRes;
|
||||
}),
|
||||
} as unknown as Response;
|
||||
|
||||
const firstPayload = [
|
||||
{
|
||||
id: "agent-1",
|
||||
name: "Worker",
|
||||
status: AgentStatus.WORKING,
|
||||
role: "worker",
|
||||
createdAt: new Date("2026-02-28T12:00:00.000Z"),
|
||||
},
|
||||
];
|
||||
const secondPayload = [
|
||||
{
|
||||
id: "agent-1",
|
||||
name: "Worker",
|
||||
status: AgentStatus.WAITING,
|
||||
role: "worker",
|
||||
createdAt: new Date("2026-02-28T12:00:00.000Z"),
|
||||
},
|
||||
];
|
||||
|
||||
mockPrismaService.agent.findMany
|
||||
.mockResolvedValueOnce(firstPayload)
|
||||
.mockResolvedValueOnce(firstPayload)
|
||||
.mockResolvedValueOnce(secondPayload);
|
||||
|
||||
await controller.streamEvents(mockRes);
|
||||
|
||||
// 1 initial data event
|
||||
const getDataEventCalls = () =>
|
||||
mockRes.write.mock.calls.filter(
|
||||
(call) => typeof call[0] === "string" && call[0].startsWith("data: ")
|
||||
);
|
||||
|
||||
expect(getDataEventCalls()).toHaveLength(1);
|
||||
|
||||
// No change after first poll => no new data event
|
||||
await vi.advanceTimersByTimeAsync(5000);
|
||||
expect(getDataEventCalls()).toHaveLength(1);
|
||||
|
||||
// Status changed on second poll => emits new data event
|
||||
await vi.advanceTimersByTimeAsync(5000);
|
||||
expect(getDataEventCalls()).toHaveLength(2);
|
||||
|
||||
onHandlers.close?.();
|
||||
expect(mockRes.end).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("security", () => {
|
||||
it("uses AuthGuard at the controller level", () => {
|
||||
const guards = Reflect.getMetadata("__guards__", OrchestratorController) as unknown[];
|
||||
const guardClasses = guards.map((guard) => guard);
|
||||
|
||||
expect(guardClasses).toContain(AuthGuard);
|
||||
});
|
||||
});
|
||||
});
|
||||
115
apps/api/src/orchestrator/orchestrator.controller.ts
Normal file
115
apps/api/src/orchestrator/orchestrator.controller.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
import { Controller, Get, Res, UseGuards } from "@nestjs/common";
|
||||
import { AgentStatus } from "@prisma/client";
|
||||
import type { Response } from "express";
|
||||
import { AuthGuard } from "../auth/guards/auth.guard";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
|
||||
const AGENT_POLL_INTERVAL_MS = 5_000;
|
||||
const SSE_HEARTBEAT_MS = 15_000;
|
||||
|
||||
interface OrchestratorAgentDto {
|
||||
id: string;
|
||||
name: string | null;
|
||||
status: AgentStatus;
|
||||
type: string;
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
@Controller("orchestrator")
|
||||
@UseGuards(AuthGuard)
|
||||
export class OrchestratorController {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
@Get("agents")
|
||||
async getAgents(): Promise<OrchestratorAgentDto[]> {
|
||||
return this.fetchActiveAgents();
|
||||
}
|
||||
|
||||
@Get("events")
|
||||
async streamEvents(@Res() res: Response): Promise<void> {
|
||||
res.setHeader("Content-Type", "text/event-stream");
|
||||
res.setHeader("Cache-Control", "no-cache");
|
||||
res.setHeader("Connection", "keep-alive");
|
||||
res.setHeader("X-Accel-Buffering", "no");
|
||||
|
||||
if (typeof res.flushHeaders === "function") {
|
||||
res.flushHeaders();
|
||||
}
|
||||
|
||||
let isClosed = false;
|
||||
let previousSnapshot = "";
|
||||
|
||||
const emitSnapshotIfChanged = async (): Promise<void> => {
|
||||
if (isClosed) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const agents = await this.fetchActiveAgents();
|
||||
const snapshot = JSON.stringify(agents);
|
||||
|
||||
if (snapshot !== previousSnapshot) {
|
||||
previousSnapshot = snapshot;
|
||||
res.write(
|
||||
`data: ${JSON.stringify({
|
||||
type: "agents:updated",
|
||||
agents,
|
||||
timestamp: new Date().toISOString(),
|
||||
})}\n\n`
|
||||
);
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
res.write(`event: error\n`);
|
||||
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
|
||||
}
|
||||
};
|
||||
|
||||
await emitSnapshotIfChanged();
|
||||
|
||||
const pollInterval = setInterval(() => {
|
||||
void emitSnapshotIfChanged();
|
||||
}, AGENT_POLL_INTERVAL_MS);
|
||||
|
||||
const heartbeatInterval = setInterval(() => {
|
||||
if (!isClosed) {
|
||||
res.write(": keepalive\n\n");
|
||||
}
|
||||
}, SSE_HEARTBEAT_MS);
|
||||
|
||||
res.on("close", () => {
|
||||
isClosed = true;
|
||||
clearInterval(pollInterval);
|
||||
clearInterval(heartbeatInterval);
|
||||
res.end();
|
||||
});
|
||||
}
|
||||
|
||||
private async fetchActiveAgents(): Promise<OrchestratorAgentDto[]> {
|
||||
const agents = await this.prisma.agent.findMany({
|
||||
where: {
|
||||
status: {
|
||||
not: AgentStatus.TERMINATED,
|
||||
},
|
||||
},
|
||||
orderBy: {
|
||||
createdAt: "desc",
|
||||
},
|
||||
select: {
|
||||
id: true,
|
||||
name: true,
|
||||
status: true,
|
||||
role: true,
|
||||
createdAt: true,
|
||||
},
|
||||
});
|
||||
|
||||
return agents.map((agent) => ({
|
||||
id: agent.id,
|
||||
name: agent.name,
|
||||
status: agent.status,
|
||||
type: agent.role ?? "agent",
|
||||
createdAt: agent.createdAt,
|
||||
}));
|
||||
}
|
||||
}
|
||||
10
apps/api/src/orchestrator/orchestrator.module.ts
Normal file
10
apps/api/src/orchestrator/orchestrator.module.ts
Normal file
@@ -0,0 +1,10 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { AuthModule } from "../auth/auth.module";
|
||||
import { PrismaModule } from "../prisma/prisma.module";
|
||||
import { OrchestratorController } from "./orchestrator.controller";
|
||||
|
||||
@Module({
|
||||
imports: [AuthModule, PrismaModule],
|
||||
controllers: [OrchestratorController],
|
||||
})
|
||||
export class OrchestratorModule {}
|
||||
Reference in New Issue
Block a user