From 4d089cd0200c11332c1de86f6e3a591af5efb757 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Tue, 17 Feb 2026 15:44:43 -0600 Subject: [PATCH] feat(orchestrator): add recent events API and monitor script --- AGENTS.md | 2 + apps/orchestrator/README.md | 1 + .../src/api/agents/agent-events.service.ts | 19 +++++ .../src/api/agents/agents.controller.spec.ts | 37 ++++++++++ .../src/api/agents/agents.controller.ts | 15 ++++ .../api/orchestrator/events/recent/route.ts | 47 ++++++++++++ docs/tasks.md | 16 +++++ scripts/agent/orchestrator-events.sh | 72 +++++++++++++++++++ 8 files changed, 209 insertions(+) create mode 100644 apps/web/src/app/api/orchestrator/events/recent/route.ts create mode 100755 scripts/agent/orchestrator-events.sh diff --git a/AGENTS.md b/AGENTS.md index 036762a..bd92747 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -25,6 +25,8 @@ Optional: ```bash bash scripts/agent/log-limitation.sh "Short Name" +bash scripts/agent/orchestrator-daemon.sh status +bash scripts/agent/orchestrator-events.sh recent --limit 50 ``` ## Repo Context diff --git a/apps/orchestrator/README.md b/apps/orchestrator/README.md index 0f470c9..ada1408 100644 --- a/apps/orchestrator/README.md +++ b/apps/orchestrator/README.md @@ -52,6 +52,7 @@ Monitored via `apps/web/` (Agent Dashboard). | POST | `/agents/:agentId/kill` | Kill a single agent | | POST | `/agents/kill-all` | Kill all active agents | | GET | `/agents/events` | SSE lifecycle/task events | +| GET | `/agents/events/recent` | Recent events (polling) | ### Queue diff --git a/apps/orchestrator/src/api/agents/agent-events.service.ts b/apps/orchestrator/src/api/agents/agent-events.service.ts index 9d91168..1941309 100644 --- a/apps/orchestrator/src/api/agents/agent-events.service.ts +++ b/apps/orchestrator/src/api/agents/agent-events.service.ts @@ -4,11 +4,13 @@ import { ValkeyService } from "../../valkey/valkey.service"; import type { EventHandler, OrchestratorEvent } from "../../valkey/types"; type UnsubscribeFn = () => void; +const MAX_RECENT_EVENTS = 500; @Injectable() export class AgentEventsService implements OnModuleInit { private readonly logger = new Logger(AgentEventsService.name); private readonly subscribers = new Map(); + private readonly recentEvents: OrchestratorEvent[] = []; private connected = false; constructor(private readonly valkeyService: ValkeyService) {} @@ -18,6 +20,7 @@ export class AgentEventsService implements OnModuleInit { await this.valkeyService.subscribeToEvents( (event) => { + this.appendRecentEvent(event); this.subscribers.forEach((handler) => { void handler(event); }); @@ -67,4 +70,20 @@ export class AgentEventsService implements OnModuleInit { }, }; } + + getRecentEvents(limit = 100): OrchestratorEvent[] { + const safeLimit = Math.min(Math.max(Math.floor(limit), 1), MAX_RECENT_EVENTS); + if (safeLimit >= this.recentEvents.length) { + return [...this.recentEvents]; + } + + return this.recentEvents.slice(-safeLimit); + } + + private appendRecentEvent(event: OrchestratorEvent): void { + this.recentEvents.push(event); + if (this.recentEvents.length > MAX_RECENT_EVENTS) { + this.recentEvents.shift(); + } + } } diff --git a/apps/orchestrator/src/api/agents/agents.controller.spec.ts b/apps/orchestrator/src/api/agents/agents.controller.spec.ts index 5cf6b42..75393ec 100644 --- a/apps/orchestrator/src/api/agents/agents.controller.spec.ts +++ b/apps/orchestrator/src/api/agents/agents.controller.spec.ts @@ -28,6 +28,7 @@ describe("AgentsController", () => { subscribe: ReturnType; getInitialSnapshot: ReturnType; createHeartbeat: ReturnType; + getRecentEvents: ReturnType; }; beforeEach(() => { @@ -65,6 +66,7 @@ describe("AgentsController", () => { timestamp: new Date().toISOString(), data: { heartbeat: true }, }), + getRecentEvents: vi.fn().mockReturnValue([]), }; // Create controller with mocked services @@ -362,4 +364,39 @@ describe("AgentsController", () => { }); }); }); + + describe("getRecentEvents", () => { + it("should return recent events with default limit", () => { + eventsService.getRecentEvents.mockReturnValue([ + { + type: "task.completed", + timestamp: "2026-02-17T15:00:00.000Z", + taskId: "task-123", + }, + ]); + + const result = controller.getRecentEvents(); + + expect(eventsService.getRecentEvents).toHaveBeenCalledWith(100); + expect(result).toEqual({ + events: [ + { + type: "task.completed", + timestamp: "2026-02-17T15:00:00.000Z", + taskId: "task-123", + }, + ], + }); + }); + + it("should parse and pass custom limit", () => { + controller.getRecentEvents("25"); + expect(eventsService.getRecentEvents).toHaveBeenCalledWith(25); + }); + + it("should fallback to default when limit is invalid", () => { + controller.getRecentEvents("invalid"); + expect(eventsService.getRecentEvents).toHaveBeenCalledWith(100); + }); + }); }); diff --git a/apps/orchestrator/src/api/agents/agents.controller.ts b/apps/orchestrator/src/api/agents/agents.controller.ts index e0ac25f..687129e 100644 --- a/apps/orchestrator/src/api/agents/agents.controller.ts +++ b/apps/orchestrator/src/api/agents/agents.controller.ts @@ -13,6 +13,7 @@ import { ParseUUIDPipe, Sse, MessageEvent, + Query, } from "@nestjs/common"; import { Throttle } from "@nestjs/throttler"; import { Observable } from "rxjs"; @@ -128,6 +129,20 @@ export class AgentsController { }); } + /** + * Return recent orchestrator events for non-streaming consumers. + */ + @Get("events/recent") + @Throttle({ status: { limit: 200, ttl: 60000 } }) + getRecentEvents(@Query("limit") limit?: string): { + events: ReturnType; + } { + const parsedLimit = Number.parseInt(limit ?? "100", 10); + return { + events: this.eventsService.getRecentEvents(Number.isNaN(parsedLimit) ? 100 : parsedLimit), + }; + } + /** * List all agents * @returns Array of all agent sessions with their status diff --git a/apps/web/src/app/api/orchestrator/events/recent/route.ts b/apps/web/src/app/api/orchestrator/events/recent/route.ts new file mode 100644 index 0000000..a680c63 --- /dev/null +++ b/apps/web/src/app/api/orchestrator/events/recent/route.ts @@ -0,0 +1,47 @@ +import { NextResponse } from "next/server"; +import type { NextRequest } from "next/server"; + +const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001"; + +function getOrchestratorUrl(): string { + return ( + process.env.ORCHESTRATOR_URL ?? + process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ?? + process.env.NEXT_PUBLIC_API_URL ?? + DEFAULT_ORCHESTRATOR_URL + ); +} + +export async function GET(request: NextRequest): Promise { + const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY; + if (!orchestratorApiKey) { + return NextResponse.json( + { error: "ORCHESTRATOR_API_KEY is not configured on the web server." }, + { status: 503 } + ); + } + + const limit = request.nextUrl.searchParams.get("limit"); + const query = limit ? `?limit=${encodeURIComponent(limit)}` : ""; + + try { + const response = await fetch(`${getOrchestratorUrl()}/agents/events/recent${query}`, { + method: "GET", + headers: { + "Content-Type": "application/json", + "X-API-Key": orchestratorApiKey, + }, + cache: "no-store", + }); + + const text = await response.text(); + return new NextResponse(text, { + status: response.status, + headers: { + "Content-Type": response.headers.get("Content-Type") ?? "application/json", + }, + }); + } catch { + return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 }); + } +} diff --git a/docs/tasks.md b/docs/tasks.md index 2c03a4e..ff581b5 100644 --- a/docs/tasks.md +++ b/docs/tasks.md @@ -365,3 +365,19 @@ - `pnpm --filter @mosaic/orchestrator test -- src/api/queue/queue.controller.spec.ts src/api/agents/agents.controller.spec.ts src/api/agents/agents-killswitch.controller.spec.ts src/queue/queue.service.spec.ts src/config/orchestrator.config.spec.ts`: pass (`26` files, `737` tests) - `pnpm --filter @mosaic/web test -- src/components/widgets/__tests__/TaskProgressWidget.test.tsx src/components/widgets/__tests__/AgentStatusWidget.test.tsx`: pass (`89` files, `1117` tests, `3` skipped) + +--- + +## 2026-02-17 Orchestrator Observability Follow-up + +**Orchestrator:** Jarvis (Codex runtime) +**Branch:** `feature/mosaic-stack-finalization` + +### Tasks + +| id | status | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | +| ------------ | ------ | --------------------------------------------------------------------------------------------------- | ----- | ----------------- | --------------------------------- | ------------ | ------ | ----- | ----------------- | ----------------- | -------- | ---- | +| ORCH-OBS-001 | done | Add recent event buffer + endpoint (`GET /agents/events/recent?limit=`) for non-SSE polling clients | #411 | orchestrator | feature/mosaic-stack-finalization | ORCH-FU-001 | | orch | 2026-02-17T16:20Z | 2026-02-17T16:28Z | 10K | 8K | +| ORCH-OBS-002 | done | Add web proxy route for recent orchestrator events (`/api/orchestrator/events/recent`) | #411 | web | feature/mosaic-stack-finalization | ORCH-OBS-001 | | orch | 2026-02-17T16:28Z | 2026-02-17T16:31Z | 5K | 4K | +| ORCH-OBS-003 | done | Add repo-level monitor script (`scripts/agent/orchestrator-events.sh`) for recent/watch modes | #411 | tooling | feature/mosaic-stack-finalization | ORCH-OBS-001 | | orch | 2026-02-17T16:31Z | 2026-02-17T16:36Z | 8K | 5K | +| ORCH-OBS-004 | done | Add tests/docs updates for recent events and operator command usage | #411 | orchestrator,docs | feature/mosaic-stack-finalization | ORCH-OBS-001 | | orch | 2026-02-17T16:36Z | 2026-02-17T16:40Z | 8K | 6K | diff --git a/scripts/agent/orchestrator-events.sh b/scripts/agent/orchestrator-events.sh new file mode 100755 index 0000000..ca806c2 --- /dev/null +++ b/scripts/agent/orchestrator-events.sh @@ -0,0 +1,72 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +# shellcheck source=./common.sh +source "$SCRIPT_DIR/common.sh" + +ensure_repo_root + +ORCH_URL="${ORCHESTRATOR_URL:-http://localhost:3001}" +API_KEY="${ORCHESTRATOR_API_KEY:-}" + +usage() { + cat < [--limit N] + +Commands: + recent Fetch recent orchestrator events (JSON) + watch Stream live orchestrator events (SSE) + +Options: + --limit N Number of recent events to return (default: 50) + +Environment: + ORCHESTRATOR_URL Orchestrator base URL (default: http://localhost:3001) + ORCHESTRATOR_API_KEY Required API key for orchestrator requests +USAGE +} + +cmd="${1:-recent}" +if [[ $# -gt 0 ]]; then + shift +fi + +limit=50 +while [[ $# -gt 0 ]]; do + case "$1" in + --limit) + limit="${2:-50}" + shift 2 + ;; + *) + echo "[agent-framework] unknown argument: $1" >&2 + usage + exit 1 + ;; + esac +done + +if [[ -z "$API_KEY" ]]; then + echo "[agent-framework] ORCHESTRATOR_API_KEY is required" >&2 + exit 1 +fi + +case "$cmd" in + recent) + curl -fsSL \ + -H "X-API-Key: $API_KEY" \ + -H "Content-Type: application/json" \ + "$ORCH_URL/agents/events/recent?limit=$limit" + ;; + watch) + curl -NfsSL \ + -H "X-API-Key: $API_KEY" \ + -H "Accept: text/event-stream" \ + "$ORCH_URL/agents/events" + ;; + *) + usage + exit 1 + ;; +esac