All checks were successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
212 lines
5.3 KiB
TypeScript
212 lines
5.3 KiB
TypeScript
import { Controller, Get, Query, Res, UseGuards } from "@nestjs/common";
|
|
import { AgentStatus } from "@prisma/client";
|
|
import type { Response } from "express";
|
|
import { AuthGuard } from "../auth/guards/auth.guard";
|
|
import { PrismaService } from "../prisma/prisma.service";
|
|
|
|
/** How often (ms) the SSE stream re-queries agents to detect changes. */
const AGENT_POLL_INTERVAL_MS = 5_000;

/** How often (ms) the SSE stream writes a keepalive comment line. */
const SSE_HEARTBEAT_MS = 15_000;

/** Number of recent events returned when the `limit` query parameter is omitted. */
const DEFAULT_EVENTS_LIMIT = 25;
|
|
|
|
/** Public representation of an agent as returned by the orchestrator endpoints. */
interface OrchestratorAgentDto {
  /** Agent identifier. */
  id: string;
  /** Agent name; may be null. */
  name: string | null;
  /** Current agent status (Prisma `AgentStatus` enum). */
  status: AgentStatus;
  /** Populated from the agent's `role`, falling back to "agent" when the role is nullish. */
  type: string;
  /** Creation time of the agent record. */
  createdAt: Date;
}
|
|
|
|
/** A single orchestrator event as exposed by the recent-events endpoint. */
interface OrchestratorEventDto {
  /** Event type; the recent-events endpoint emits `agent:<lower-cased status>`. */
  type: string;
  /** ISO-8601 timestamp string. */
  timestamp: string;
  /** Identifier of the agent the event refers to, when applicable. */
  agentId?: string;
  /** Identifier of a related task, when applicable. */
  taskId?: string;
  /** Free-form event payload. */
  data?: Record<string, unknown>;
}
|
|
|
|
/** Health summary returned by the orchestrator health endpoint. */
interface OrchestratorHealthDto {
  /**
   * Overall status: "unhealthy" when the database is unreachable, "degraded"
   * when at least one agent is in the error state, otherwise "healthy".
   */
  status: "healthy" | "degraded" | "unhealthy";
  /** Result of the database connectivity check. */
  database: "connected" | "disconnected";
  /** Agent counts by status. */
  agents: {
    /** Number of non-terminated agents. */
    total: number;
    /** Agents currently in the WORKING status. */
    working: number;
    /** Agents currently in the IDLE status. */
    idle: number;
    /** Agents currently in the ERROR status. */
    errored: number;
  };
  /** ISO-8601 time at which the report was generated. */
  timestamp: string;
}
|
|
|
|
/**
 * Read-only endpoints under `/orchestrator` exposing agent state: a plain
 * agent list, a derived list of recent events, a health summary, and a
 * Server-Sent Events stream of agent snapshots. All routes go through
 * `AuthGuard`.
 */
@Controller("orchestrator")
@UseGuards(AuthGuard)
export class OrchestratorController {
  constructor(private readonly prisma: PrismaService) {}

  /**
   * Lists every agent that is not in the TERMINATED status, newest first.
   */
  @Get("agents")
  async getAgents(): Promise<OrchestratorAgentDto[]> {
    return this.fetchActiveAgents();
  }

  /**
   * Returns up to `limit` events derived from the most recently created
   * non-terminated agents (one event per agent, typed by its current status).
   *
   * @param limit Raw `limit` query value. Missing, empty, or non-numeric
   *   values fall back to `DEFAULT_EVENTS_LIMIT`; the result is clamped to
   *   the range 1..100.
   */
  @Get("events/recent")
  async getRecentEvents(
    @Query("limit") limit?: string
  ): Promise<{ events: OrchestratorEventDto[] }> {
    const parsedLimit = limit ? Number.parseInt(limit, 10) : DEFAULT_EVENTS_LIMIT;
    // parseInt yields NaN for input such as "abc"; NaN would pass straight
    // through Math.min/Math.max and reach Prisma as `take: NaN`.
    const eventsLimit = Number.isNaN(parsedLimit) ? DEFAULT_EVENTS_LIMIT : parsedLimit;
    const safeLimit = Math.min(Math.max(eventsLimit, 1), 100);

    // Fetch recent agent activity to derive events
    const agents = await this.prisma.agent.findMany({
      where: {
        status: {
          not: AgentStatus.TERMINATED,
        },
      },
      orderBy: {
        createdAt: "desc",
      },
      take: safeLimit,
    });

    // Derive events from agent status; the timestamp is the agent's creation time
    const events: OrchestratorEventDto[] = agents.map((agent) => ({
      type: `agent:${agent.status.toLowerCase()}`,
      timestamp: agent.createdAt.toISOString(),
      agentId: agent.id,
      data: {
        name: agent.name,
        role: agent.role,
        model: agent.model,
      },
    }));

    return { events };
  }

  /**
   * Reports database connectivity and agent counts.
   *
   * Never throws on database failure: any error from the connectivity probe
   * or the agent query is reported as `database: "disconnected"` /
   * `status: "unhealthy"` with zeroed agent counts.
   */
  @Get("health")
  async getHealth(): Promise<OrchestratorHealthDto> {
    let databaseConnected = false;
    let agents: OrchestratorAgentDto[] = [];

    try {
      // Check database connectivity
      await this.prisma.$queryRaw`SELECT 1`;
      databaseConnected = true;

      // Get agent counts
      agents = await this.fetchActiveAgents();
    } catch {
      // Deliberate best-effort: a failing query is surfaced through the
      // response body rather than as an HTTP error.
      databaseConnected = false;
    }

    const working = agents.filter((a) => a.status === AgentStatus.WORKING).length;
    const idle = agents.filter((a) => a.status === AgentStatus.IDLE).length;
    const errored = agents.filter((a) => a.status === AgentStatus.ERROR).length;

    let status: OrchestratorHealthDto["status"] = "healthy";
    if (!databaseConnected) {
      status = "unhealthy";
    } else if (errored > 0) {
      status = "degraded";
    }

    return {
      status,
      database: databaseConnected ? "connected" : "disconnected",
      agents: {
        total: agents.length,
        working,
        idle,
        errored,
      },
      timestamp: new Date().toISOString(),
    };
  }

  /**
   * Server-Sent Events stream of agent snapshots.
   *
   * Sends an `agents:updated` message immediately, then re-queries every
   * `AGENT_POLL_INTERVAL_MS` and emits a new message only when the serialized
   * agent list differs from the previous one. A comment-only keepalive is
   * written every `SSE_HEARTBEAT_MS`. Query failures are sent to the client
   * as an `error` event; the stream stays open.
   *
   * @param res Express response, written to directly (Nest does not
   *   serialize a return value for this route).
   */
  @Get("events")
  async streamEvents(@Res() res: Response): Promise<void> {
    res.setHeader("Content-Type", "text/event-stream");
    res.setHeader("Cache-Control", "no-cache");
    res.setHeader("Connection", "keep-alive");
    res.setHeader("X-Accel-Buffering", "no");

    if (typeof res.flushHeaders === "function") {
      res.flushHeaders();
    }

    let isClosed = false;
    let previousSnapshot = "";
    let pollInterval: ReturnType<typeof setInterval> | undefined;
    let heartbeatInterval: ReturnType<typeof setInterval> | undefined;

    // Register the close handler BEFORE the first await. If the client
    // disconnects while the initial query is in flight, "close" fires once;
    // a listener attached afterwards would never run and the timers below
    // would leak for the lifetime of the process.
    res.on("close", () => {
      isClosed = true;
      if (pollInterval !== undefined) {
        clearInterval(pollInterval);
      }
      if (heartbeatInterval !== undefined) {
        clearInterval(heartbeatInterval);
      }
      res.end();
    });

    const emitSnapshotIfChanged = async (): Promise<void> => {
      if (isClosed) {
        return;
      }

      try {
        const agents = await this.fetchActiveAgents();

        // The connection may have closed while the query was running.
        if (isClosed) {
          return;
        }

        const snapshot = JSON.stringify(agents);

        if (snapshot !== previousSnapshot) {
          previousSnapshot = snapshot;
          res.write(
            `data: ${JSON.stringify({
              type: "agents:updated",
              agents,
              timestamp: new Date().toISOString(),
            })}\n\n`
          );
        }
      } catch (error: unknown) {
        if (isClosed) {
          return;
        }

        const message = error instanceof Error ? error.message : String(error);
        res.write(`event: error\n`);
        res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
      }
    };

    await emitSnapshotIfChanged();

    // Client went away during the initial snapshot: do not start any timers.
    if (isClosed) {
      return;
    }

    pollInterval = setInterval(() => {
      void emitSnapshotIfChanged();
    }, AGENT_POLL_INTERVAL_MS);

    heartbeatInterval = setInterval(() => {
      if (!isClosed) {
        res.write(": keepalive\n\n");
      }
    }, SSE_HEARTBEAT_MS);
  }

  /**
   * Loads all agents whose status is not TERMINATED, newest first, and maps
   * them to the public DTO shape (`role` is exposed as `type`, defaulting to
   * "agent" when the role is nullish).
   */
  private async fetchActiveAgents(): Promise<OrchestratorAgentDto[]> {
    const agents = await this.prisma.agent.findMany({
      where: {
        status: {
          not: AgentStatus.TERMINATED,
        },
      },
      orderBy: {
        createdAt: "desc",
      },
      select: {
        id: true,
        name: true,
        status: true,
        role: true,
        createdAt: true,
      },
    });

    return agents.map((agent) => ({
      id: agent.id,
      name: agent.name,
      status: agent.status,
      type: agent.role ?? "agent",
      createdAt: agent.createdAt,
    }));
  }
}
|