All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
116 lines
2.9 KiB
TypeScript
116 lines
2.9 KiB
TypeScript
import { Controller, Get, Res, UseGuards } from "@nestjs/common";
|
|
import { AgentStatus } from "@prisma/client";
|
|
import type { Response } from "express";
|
|
import { AuthGuard } from "../auth/guards/auth.guard";
|
|
import { PrismaService } from "../prisma/prisma.service";
|
|
|
|
const AGENT_POLL_INTERVAL_MS = 5_000;
|
|
const SSE_HEARTBEAT_MS = 15_000;
|
|
|
|
interface OrchestratorAgentDto {
|
|
id: string;
|
|
name: string | null;
|
|
status: AgentStatus;
|
|
type: string;
|
|
createdAt: Date;
|
|
}
|
|
|
|
@Controller("orchestrator")
|
|
@UseGuards(AuthGuard)
|
|
export class OrchestratorController {
|
|
constructor(private readonly prisma: PrismaService) {}
|
|
|
|
@Get("agents")
|
|
async getAgents(): Promise<OrchestratorAgentDto[]> {
|
|
return this.fetchActiveAgents();
|
|
}
|
|
|
|
@Get("events")
|
|
async streamEvents(@Res() res: Response): Promise<void> {
|
|
res.setHeader("Content-Type", "text/event-stream");
|
|
res.setHeader("Cache-Control", "no-cache");
|
|
res.setHeader("Connection", "keep-alive");
|
|
res.setHeader("X-Accel-Buffering", "no");
|
|
|
|
if (typeof res.flushHeaders === "function") {
|
|
res.flushHeaders();
|
|
}
|
|
|
|
let isClosed = false;
|
|
let previousSnapshot = "";
|
|
|
|
const emitSnapshotIfChanged = async (): Promise<void> => {
|
|
if (isClosed) {
|
|
return;
|
|
}
|
|
|
|
try {
|
|
const agents = await this.fetchActiveAgents();
|
|
const snapshot = JSON.stringify(agents);
|
|
|
|
if (snapshot !== previousSnapshot) {
|
|
previousSnapshot = snapshot;
|
|
res.write(
|
|
`data: ${JSON.stringify({
|
|
type: "agents:updated",
|
|
agents,
|
|
timestamp: new Date().toISOString(),
|
|
})}\n\n`
|
|
);
|
|
}
|
|
} catch (error: unknown) {
|
|
const message = error instanceof Error ? error.message : String(error);
|
|
res.write(`event: error\n`);
|
|
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
|
|
}
|
|
};
|
|
|
|
await emitSnapshotIfChanged();
|
|
|
|
const pollInterval = setInterval(() => {
|
|
void emitSnapshotIfChanged();
|
|
}, AGENT_POLL_INTERVAL_MS);
|
|
|
|
const heartbeatInterval = setInterval(() => {
|
|
if (!isClosed) {
|
|
res.write(": keepalive\n\n");
|
|
}
|
|
}, SSE_HEARTBEAT_MS);
|
|
|
|
res.on("close", () => {
|
|
isClosed = true;
|
|
clearInterval(pollInterval);
|
|
clearInterval(heartbeatInterval);
|
|
res.end();
|
|
});
|
|
}
|
|
|
|
private async fetchActiveAgents(): Promise<OrchestratorAgentDto[]> {
|
|
const agents = await this.prisma.agent.findMany({
|
|
where: {
|
|
status: {
|
|
not: AgentStatus.TERMINATED,
|
|
},
|
|
},
|
|
orderBy: {
|
|
createdAt: "desc",
|
|
},
|
|
select: {
|
|
id: true,
|
|
name: true,
|
|
status: true,
|
|
role: true,
|
|
createdAt: true,
|
|
},
|
|
});
|
|
|
|
return agents.map((agent) => ({
|
|
id: agent.id,
|
|
name: agent.name,
|
|
status: agent.status,
|
|
type: agent.role ?? "agent",
|
|
createdAt: agent.createdAt,
|
|
}));
|
|
}
|
|
}
|