feat(orchestrator): add recent events API and monitor script

This commit is contained in:
Jason Woltje
2026-02-17 15:44:43 -06:00
parent 3258cd4f4d
commit 4d089cd020
8 changed files with 209 additions and 0 deletions

View File

@@ -52,6 +52,7 @@ Monitored via `apps/web/` (Agent Dashboard).
| POST | `/agents/:agentId/kill` | Kill a single agent |
| POST | `/agents/kill-all` | Kill all active agents |
| GET | `/agents/events` | SSE lifecycle/task events |
| GET | `/agents/events/recent` | Recent events (polling) |
### Queue

View File

@@ -4,11 +4,13 @@ import { ValkeyService } from "../../valkey/valkey.service";
import type { EventHandler, OrchestratorEvent } from "../../valkey/types";
type UnsubscribeFn = () => void;
const MAX_RECENT_EVENTS = 500;
@Injectable()
export class AgentEventsService implements OnModuleInit {
private readonly logger = new Logger(AgentEventsService.name);
private readonly subscribers = new Map<string, EventHandler>();
private readonly recentEvents: OrchestratorEvent[] = [];
private connected = false;
constructor(private readonly valkeyService: ValkeyService) {}
@@ -18,6 +20,7 @@ export class AgentEventsService implements OnModuleInit {
await this.valkeyService.subscribeToEvents(
(event) => {
this.appendRecentEvent(event);
this.subscribers.forEach((handler) => {
void handler(event);
});
@@ -67,4 +70,20 @@ export class AgentEventsService implements OnModuleInit {
},
};
}
getRecentEvents(limit = 100): OrchestratorEvent[] {
const safeLimit = Math.min(Math.max(Math.floor(limit), 1), MAX_RECENT_EVENTS);
if (safeLimit >= this.recentEvents.length) {
return [...this.recentEvents];
}
return this.recentEvents.slice(-safeLimit);
}
private appendRecentEvent(event: OrchestratorEvent): void {
this.recentEvents.push(event);
if (this.recentEvents.length > MAX_RECENT_EVENTS) {
this.recentEvents.shift();
}
}
}

View File

@@ -28,6 +28,7 @@ describe("AgentsController", () => {
subscribe: ReturnType<typeof vi.fn>;
getInitialSnapshot: ReturnType<typeof vi.fn>;
createHeartbeat: ReturnType<typeof vi.fn>;
getRecentEvents: ReturnType<typeof vi.fn>;
};
beforeEach(() => {
@@ -65,6 +66,7 @@ describe("AgentsController", () => {
timestamp: new Date().toISOString(),
data: { heartbeat: true },
}),
getRecentEvents: vi.fn().mockReturnValue([]),
};
// Create controller with mocked services
@@ -362,4 +364,39 @@ describe("AgentsController", () => {
});
});
});
describe("getRecentEvents", () => {
it("should return recent events with default limit", () => {
eventsService.getRecentEvents.mockReturnValue([
{
type: "task.completed",
timestamp: "2026-02-17T15:00:00.000Z",
taskId: "task-123",
},
]);
const result = controller.getRecentEvents();
expect(eventsService.getRecentEvents).toHaveBeenCalledWith(100);
expect(result).toEqual({
events: [
{
type: "task.completed",
timestamp: "2026-02-17T15:00:00.000Z",
taskId: "task-123",
},
],
});
});
it("should parse and pass custom limit", () => {
controller.getRecentEvents("25");
expect(eventsService.getRecentEvents).toHaveBeenCalledWith(25);
});
it("should fallback to default when limit is invalid", () => {
controller.getRecentEvents("invalid");
expect(eventsService.getRecentEvents).toHaveBeenCalledWith(100);
});
});
});

View File

@@ -13,6 +13,7 @@ import {
ParseUUIDPipe,
Sse,
MessageEvent,
Query,
} from "@nestjs/common";
import { Throttle } from "@nestjs/throttler";
import { Observable } from "rxjs";
@@ -128,6 +129,20 @@ export class AgentsController {
});
}
/**
* Return recent orchestrator events for non-streaming consumers.
*/
@Get("events/recent")
@Throttle({ status: { limit: 200, ttl: 60000 } })
getRecentEvents(@Query("limit") limit?: string): {
events: ReturnType<AgentEventsService["getRecentEvents"]>;
} {
const parsedLimit = Number.parseInt(limit ?? "100", 10);
return {
events: this.eventsService.getRecentEvents(Number.isNaN(parsedLimit) ? 100 : parsedLimit),
};
}
/**
* List all agents
* @returns Array of all agent sessions with their status

View File

@@ -0,0 +1,47 @@
import { NextResponse } from "next/server";
import type { NextRequest } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export async function GET(request: NextRequest): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
const limit = request.nextUrl.searchParams.get("limit");
const query = limit ? `?limit=${encodeURIComponent(limit)}` : "";
try {
const response = await fetch(`${getOrchestratorUrl()}/agents/events/recent${query}`, {
method: "GET",
headers: {
"Content-Type": "application/json",
"X-API-Key": orchestratorApiKey,
},
cache: "no-store",
});
const text = await response.text();
return new NextResponse(text, {
status: response.status,
headers: {
"Content-Type": response.headers.get("Content-Type") ?? "application/json",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}