"use client"; /** * useAgentStream hook * * Connects to the orchestrator SSE event stream at /api/orchestrator/events * and maintains a Map of agentId → AgentSession with accumulated output, * status, and lifecycle metadata. * * SSE event types consumed: * - agent:spawned — { agentId, type, jobId } * - agent:output — { agentId, data } (stdout/stderr lines) * - agent:completed — { agentId, exitCode, result } * - agent:error — { agentId, error } * * Features: * - Auto-reconnect with exponential backoff on connection loss * - Cleans up EventSource on unmount * - Accumulates output lines per agent */ import { useEffect, useRef, useState, useCallback } from "react"; // ========================================== // Types // ========================================== export type AgentStatus = "spawning" | "running" | "completed" | "error"; export interface AgentSession { /** Agent identifier from the orchestrator */ agentId: string; /** Agent type or name (e.g., "worker", "planner") */ agentType: string; /** Optional job ID this agent is associated with */ jobId?: string; /** Current lifecycle status */ status: AgentStatus; /** Accumulated output lines (stdout/stderr) */ outputLines: string[]; /** Timestamp when the agent was spawned */ startedAt: number; /** Timestamp when the agent completed or errored */ endedAt?: number; /** Exit code from agent:completed event */ exitCode?: number; /** Error message from agent:error event */ errorMessage?: string; } export interface UseAgentStreamReturn { /** Map of agentId → AgentSession */ agents: Map; /** Whether the SSE stream is currently connected */ isConnected: boolean; /** Connection error message, if any */ connectionError: string | null; /** Dismiss (remove) an agent tab by agentId */ dismissAgent: (agentId: string) => void; } // ========================================== // SSE payload shapes // ========================================== interface SpawnedPayload { agentId: string; type?: string; jobId?: string; } 
/** JSON body of an `agent:output` event. */
interface OutputPayload {
  agentId: string;
  data: string;
}

/** JSON body of an `agent:completed` event. */
interface CompletedPayload {
  agentId: string;
  exitCode?: number;
  result?: unknown;
}

/** JSON body of an `agent:error` event. */
interface ErrorPayload {
  agentId: string;
  error?: string;
}

// ==========================================
// Backoff config
// ==========================================

/** Delay before the first reconnect attempt. */
const RECONNECT_BASE_MS = 1_000;
/** Upper bound for the reconnect delay. */
const RECONNECT_MAX_MS = 30_000;
/** Factor applied to the delay after every failed connection. */
const RECONNECT_MULTIPLIER = 2;

// ==========================================
// Payload parsing
// ==========================================

/**
 * Parses the raw `data` of an SSE message into an agent payload.
 *
 * Only the common `agentId` field is validated; the remaining fields are
 * trusted to match `T`.
 *
 * @param raw - The `data` property of the received MessageEvent
 * @returns The parsed payload, or `null` when `raw` is not a JSON object
 *          carrying a string `agentId`
 */
function parseAgentPayload<T extends { agentId: string }>(raw: unknown): T | null {
  if (typeof raw !== "string") return null;

  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    // Malformed JSON — the caller ignores the event
    return null;
  }

  // Valid JSON such as `null`, `42` or `{}` must be rejected as well: the state
  // updaters dereference `payload.agentId` later, outside of any try/catch.
  if (typeof parsed !== "object" || parsed === null) return null;
  const agentId = (parsed as { agentId?: unknown }).agentId;
  if (typeof agentId !== "string") return null;

  return parsed as T;
}

// ==========================================
// Hook
// ==========================================

/**
 * Connects to the orchestrator SSE stream and tracks all agent sessions.
 *
 * The connection is opened on mount and closed on unmount. When the stream
 * errors, it is closed and re-opened after a delay that starts at
 * RECONNECT_BASE_MS, is multiplied by RECONNECT_MULTIPLIER on every failure,
 * is capped at RECONNECT_MAX_MS, and is reset once a connection opens.
 *
 * @returns Agent sessions map, connection status, and dismiss callback
 */
export function useAgentStream(): UseAgentStreamReturn {
  const [agents, setAgents] = useState<Map<string, AgentSession>>(() => new Map());
  const [isConnected, setIsConnected] = useState(false);
  const [connectionError, setConnectionError] = useState<string | null>(null);

  const eventSourceRef = useRef<EventSource | null>(null);
  const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  const reconnectDelayRef = useRef<number>(RECONNECT_BASE_MS);
  const isMountedRef = useRef(true);

  // ==========================================
  // Agent state update helpers
  // ==========================================

  /** Registers a freshly spawned agent, replacing any session with the same id. */
  const handleAgentSpawned = useCallback((payload: SpawnedPayload): void => {
    setAgents((prev) => {
      const next = new Map(prev);
      next.set(payload.agentId, {
        agentId: payload.agentId,
        agentType: payload.type ?? "agent",
        ...(payload.jobId !== undefined ? { jobId: payload.jobId } : {}),
        status: "spawning",
        outputLines: [],
        startedAt: Date.now(),
      });
      return next;
    });
  }, []);

  /** Appends one output line to an agent, creating the session if needed. */
  const handleAgentOutput = useCallback((payload: OutputPayload): void => {
    setAgents((prev) => {
      const existing = prev.get(payload.agentId);
      const next = new Map(prev);

      if (existing === undefined) {
        // First output for an agent we haven't seen spawned — create it
        next.set(payload.agentId, {
          agentId: payload.agentId,
          agentType: "agent",
          status: "running",
          outputLines: [payload.data],
          startedAt: Date.now(),
        });
      } else {
        next.set(payload.agentId, {
          ...existing,
          // Only "spawning" is promoted; completed/error sessions keep their status
          status: existing.status === "spawning" ? "running" : existing.status,
          outputLines: [...existing.outputLines, payload.data],
        });
      }

      return next;
    });
  }, []);

  /** Marks a known agent as completed; events for unknown agents are ignored. */
  const handleAgentCompleted = useCallback((payload: CompletedPayload): void => {
    setAgents((prev) => {
      const existing = prev.get(payload.agentId);
      if (existing === undefined) return prev;

      const next = new Map(prev);
      next.set(payload.agentId, {
        ...existing,
        status: "completed",
        endedAt: Date.now(),
        ...(payload.exitCode !== undefined ? { exitCode: payload.exitCode } : {}),
      });
      return next;
    });
  }, []);

  /** Marks a known agent as errored; events for unknown agents are ignored. */
  const handleAgentError = useCallback((payload: ErrorPayload): void => {
    setAgents((prev) => {
      const existing = prev.get(payload.agentId);
      if (existing === undefined) return prev;

      const next = new Map(prev);
      next.set(payload.agentId, {
        ...existing,
        status: "error",
        endedAt: Date.now(),
        ...(payload.error !== undefined ? { errorMessage: payload.error } : {}),
      });
      return next;
    });
  }, []);

  // ==========================================
  // SSE connection
  // ==========================================

  /** Opens the SSE connection, closing any previous one first. */
  const connect = useCallback((): void => {
    if (!isMountedRef.current) return;
    if (typeof EventSource === "undefined") return;

    // A pending reconnect is superseded by this connection attempt
    if (reconnectTimerRef.current !== null) {
      clearTimeout(reconnectTimerRef.current);
      reconnectTimerRef.current = null;
    }

    // Clean up any existing connection
    if (eventSourceRef.current) {
      eventSourceRef.current.close();
      eventSourceRef.current = null;
    }

    const es = new EventSource("/api/orchestrator/events");
    eventSourceRef.current = es;

    es.onopen = (): void => {
      if (!isMountedRef.current) return;
      setIsConnected(true);
      setConnectionError(null);
      reconnectDelayRef.current = RECONNECT_BASE_MS;
    };

    es.onerror = (): void => {
      if (!isMountedRef.current) return;
      setIsConnected(false);
      es.close();
      // Do not drop the ref if a newer connection has already replaced this one
      if (eventSourceRef.current === es) {
        eventSourceRef.current = null;
      }

      // Schedule reconnect with backoff
      const delay = reconnectDelayRef.current;
      reconnectDelayRef.current = Math.min(delay * RECONNECT_MULTIPLIER, RECONNECT_MAX_MS);
      const delaySecs = Math.round(delay / 1000).toString();
      setConnectionError(`SSE connection lost. Reconnecting in ${delaySecs}s...`);

      if (reconnectTimerRef.current !== null) {
        clearTimeout(reconnectTimerRef.current);
      }
      reconnectTimerRef.current = setTimeout(() => {
        reconnectTimerRef.current = null;
        if (isMountedRef.current) {
          connect();
        }
      }, delay);
    };

    // Routes one named SSE event to its handler, ignoring malformed payloads
    const listen = <T extends { agentId: string }>(
      eventName: string,
      handler: (payload: T) => void,
    ): void => {
      es.addEventListener(eventName, (event: MessageEvent) => {
        if (!isMountedRef.current) return;
        const payload = parseAgentPayload<T>(event.data);
        if (payload !== null) {
          handler(payload);
        }
      });
    };

    listen<SpawnedPayload>("agent:spawned", handleAgentSpawned);
    listen<OutputPayload>("agent:output", handleAgentOutput);
    listen<CompletedPayload>("agent:completed", handleAgentCompleted);
    listen<ErrorPayload>("agent:error", handleAgentError);
  }, [handleAgentSpawned, handleAgentOutput, handleAgentCompleted, handleAgentError]);

  // ==========================================
  // Mount / unmount
  // ==========================================

  useEffect(() => {
    isMountedRef.current = true;
    connect();

    return (): void => {
      isMountedRef.current = false;
      if (reconnectTimerRef.current !== null) {
        clearTimeout(reconnectTimerRef.current);
        reconnectTimerRef.current = null;
      }
      if (eventSourceRef.current) {
        eventSourceRef.current.close();
        eventSourceRef.current = null;
      }
    };
  }, [connect]);

  // ==========================================
  // Dismiss agent
  // ==========================================

  /** Removes an agent session from the map; unknown ids leave the state untouched. */
  const dismissAgent = useCallback((agentId: string): void => {
    setAgents((prev) => {
      if (!prev.has(agentId)) return prev;
      const next = new Map(prev);
      next.delete(agentId);
      return next;
    });
  }, []);

  return {
    agents,
    isConnected,
    connectionError,
    dismissAgent,
  };
}