feat(web): add agent output terminal tabs for orchestrator sessions (#522)
All checks were successful
ci/woodpecker/push/web Pipeline was successful

Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #522.
This commit is contained in:
2026-02-26 04:04:26 +00:00
committed by jason.woltje
parent b110c469c4
commit 9b2520ce1f
7 changed files with 1991 additions and 10 deletions

View File

@@ -0,0 +1,542 @@
/**
* @file useAgentStream.test.ts
* @description Unit tests for the useAgentStream hook
*
* Tests cover:
* - SSE event parsing (agent:spawned, agent:output, agent:completed, agent:error)
* - Agent lifecycle state transitions
* - Auto-reconnect behavior on connection loss
* - Cleanup on unmount
* - Dismiss agent
*/
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
import { renderHook, act } from "@testing-library/react";
import { useAgentStream } from "../useAgentStream";
// ==========================================
// Mock EventSource
// ==========================================
interface MockEventSourceInstance {
url: string;
onopen: (() => void) | null;
onerror: ((event: Event) => void) | null;
onmessage: ((event: MessageEvent) => void) | null;
close: ReturnType<typeof vi.fn>;
addEventListener: ReturnType<typeof vi.fn>;
dispatchEvent: (type: string, data: string) => void;
_listeners: Map<string, ((event: MessageEvent<string>) => void)[]>;
readyState: number;
}
let mockEventSourceInstances: MockEventSourceInstance[] = [];
const MockEventSource = vi.fn(function (this: MockEventSourceInstance, url: string): void {
this.url = url;
this.onopen = null;
this.onerror = null;
this.onmessage = null;
this.close = vi.fn();
this.readyState = 0;
this._listeners = new Map();
this.addEventListener = vi.fn(
(type: string, handler: (event: MessageEvent<string>) => void): void => {
if (!this._listeners.has(type)) {
this._listeners.set(type, []);
}
const list = this._listeners.get(type);
if (list) list.push(handler);
}
);
this.dispatchEvent = (type: string, data: string): void => {
const handlers = this._listeners.get(type) ?? [];
const event = new MessageEvent(type, { data });
for (const handler of handlers) {
handler(event);
}
};
mockEventSourceInstances.push(this);
});
// Add static constants
Object.assign(MockEventSource, {
CONNECTING: 0,
OPEN: 1,
CLOSED: 2,
});
vi.stubGlobal("EventSource", MockEventSource);
// ==========================================
// Helpers
// ==========================================
function getLatestES(): MockEventSourceInstance {
const es = mockEventSourceInstances[mockEventSourceInstances.length - 1];
if (!es) throw new Error("No EventSource instance created");
return es;
}
function triggerOpen(): void {
const es = getLatestES();
if (es.onopen) es.onopen();
}
function triggerError(): void {
const es = getLatestES();
if (es.onerror) es.onerror(new Event("error"));
}
function emitEvent(type: string, data: unknown): void {
const es = getLatestES();
es.dispatchEvent(type, JSON.stringify(data));
}
// ==========================================
// Tests
// ==========================================
describe("useAgentStream", () => {
beforeEach(() => {
vi.clearAllMocks();
vi.useFakeTimers();
mockEventSourceInstances = [];
});
afterEach(() => {
vi.runAllTimers();
vi.useRealTimers();
});
// ==========================================
// Initialization
// ==========================================
describe("initialization", () => {
it("creates an EventSource connecting to /api/orchestrator/events", () => {
renderHook(() => useAgentStream());
expect(MockEventSource).toHaveBeenCalledWith("/api/orchestrator/events");
});
it("starts with isConnected=false before onopen fires", () => {
const { result } = renderHook(() => useAgentStream());
expect(result.current.isConnected).toBe(false);
});
it("starts with an empty agents map", () => {
const { result } = renderHook(() => useAgentStream());
expect(result.current.agents.size).toBe(0);
});
it("sets isConnected=true when EventSource opens", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
});
expect(result.current.isConnected).toBe(true);
});
it("clears connectionError when EventSource opens", () => {
const { result } = renderHook(() => useAgentStream());
// Trigger an error first to set connectionError
act(() => {
triggerError();
});
// Start a fresh reconnect and open it
act(() => {
vi.advanceTimersByTime(2000);
});
act(() => {
triggerOpen();
});
expect(result.current.connectionError).toBeNull();
});
});
// ==========================================
// SSE event: agent:spawned
// ==========================================
describe("agent:spawned event", () => {
it("adds an agent with status=spawning", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker", jobId: "job-abc" });
});
expect(result.current.agents.has("agent-1")).toBe(true);
expect(result.current.agents.get("agent-1")?.status).toBe("spawning");
});
it("sets agentType from the type field", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "planner" });
});
expect(result.current.agents.get("agent-1")?.agentType).toBe("planner");
});
it("defaults agentType to 'agent' when type is missing", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-2" });
});
expect(result.current.agents.get("agent-2")?.agentType).toBe("agent");
});
it("stores jobId when present", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-3", type: "worker", jobId: "job-xyz" });
});
expect(result.current.agents.get("agent-3")?.jobId).toBe("job-xyz");
});
it("starts with empty outputLines", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
});
expect(result.current.agents.get("agent-1")?.outputLines).toEqual([]);
});
});
// ==========================================
// SSE event: agent:output
// ==========================================
describe("agent:output event", () => {
it("appends output to the agent's outputLines", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
emitEvent("agent:output", { agentId: "agent-1", data: "Hello world\n" });
});
expect(result.current.agents.get("agent-1")?.outputLines).toContain("Hello world\n");
});
it("transitions status from spawning to running on first output", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
emitEvent("agent:output", { agentId: "agent-1", data: "Starting...\n" });
});
expect(result.current.agents.get("agent-1")?.status).toBe("running");
});
it("accumulates multiple output lines", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
emitEvent("agent:output", { agentId: "agent-1", data: "Line 1\n" });
emitEvent("agent:output", { agentId: "agent-1", data: "Line 2\n" });
emitEvent("agent:output", { agentId: "agent-1", data: "Line 3\n" });
});
const lines = result.current.agents.get("agent-1")?.outputLines ?? [];
expect(lines).toHaveLength(3);
expect(lines[0]).toBe("Line 1\n");
expect(lines[1]).toBe("Line 2\n");
expect(lines[2]).toBe("Line 3\n");
});
it("creates a new agent entry if output arrives before spawned event", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:output", { agentId: "unknown-agent", data: "Surprise output\n" });
});
expect(result.current.agents.has("unknown-agent")).toBe(true);
expect(result.current.agents.get("unknown-agent")?.status).toBe("running");
});
});
// ==========================================
// SSE event: agent:completed
// ==========================================
describe("agent:completed event", () => {
it("sets status to completed", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
emitEvent("agent:output", { agentId: "agent-1", data: "Working...\n" });
emitEvent("agent:completed", { agentId: "agent-1", exitCode: 0 });
});
expect(result.current.agents.get("agent-1")?.status).toBe("completed");
});
it("stores the exitCode", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
emitEvent("agent:completed", { agentId: "agent-1", exitCode: 42 });
});
expect(result.current.agents.get("agent-1")?.exitCode).toBe(42);
});
it("sets endedAt timestamp", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
emitEvent("agent:completed", { agentId: "agent-1", exitCode: 0 });
});
expect(result.current.agents.get("agent-1")?.endedAt).toBeDefined();
});
it("ignores completed event for unknown agent", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:completed", { agentId: "ghost-agent", exitCode: 0 });
});
expect(result.current.agents.has("ghost-agent")).toBe(false);
});
});
// ==========================================
// SSE event: agent:error
// ==========================================
describe("agent:error event", () => {
it("sets status to error", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
emitEvent("agent:error", { agentId: "agent-1", error: "Out of memory" });
});
expect(result.current.agents.get("agent-1")?.status).toBe("error");
});
it("stores the error message", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
emitEvent("agent:error", { agentId: "agent-1", error: "Segfault" });
});
expect(result.current.agents.get("agent-1")?.errorMessage).toBe("Segfault");
});
it("sets endedAt on error", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
emitEvent("agent:error", { agentId: "agent-1", error: "Crash" });
});
expect(result.current.agents.get("agent-1")?.endedAt).toBeDefined();
});
it("ignores error event for unknown agent", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:error", { agentId: "ghost-agent", error: "Crash" });
});
expect(result.current.agents.has("ghost-agent")).toBe(false);
});
});
// ==========================================
// Reconnect behavior
// ==========================================
describe("auto-reconnect", () => {
it("sets isConnected=false on error", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
});
act(() => {
triggerError();
});
expect(result.current.isConnected).toBe(false);
});
it("sets connectionError on error", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
triggerError();
});
expect(result.current.connectionError).not.toBeNull();
});
it("creates a new EventSource after reconnect delay", () => {
renderHook(() => useAgentStream());
const initialCount = mockEventSourceInstances.length;
act(() => {
triggerOpen();
triggerError();
});
act(() => {
vi.advanceTimersByTime(1500); // initial backoff = 1000ms
});
expect(mockEventSourceInstances.length).toBeGreaterThan(initialCount);
});
it("closes the old EventSource before reconnecting", () => {
renderHook(() => useAgentStream());
act(() => {
triggerOpen();
triggerError();
});
const closedInstance = mockEventSourceInstances[0];
expect(closedInstance?.close).toHaveBeenCalled();
});
});
// ==========================================
// Cleanup on unmount
// ==========================================
describe("cleanup on unmount", () => {
it("closes EventSource when the hook is unmounted", () => {
const { unmount } = renderHook(() => useAgentStream());
const es = getLatestES();
unmount();
expect(es.close).toHaveBeenCalled();
});
it("does not attempt to reconnect after unmount", () => {
const { unmount } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
triggerError();
});
const countBeforeUnmount = mockEventSourceInstances.length;
unmount();
act(() => {
vi.advanceTimersByTime(5000);
});
// No new instances created after unmount
expect(mockEventSourceInstances.length).toBe(countBeforeUnmount);
});
});
// ==========================================
// Dismiss agent
// ==========================================
describe("dismissAgent", () => {
it("removes the agent from the map", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
emitEvent("agent:completed", { agentId: "agent-1", exitCode: 0 });
});
act(() => {
result.current.dismissAgent("agent-1");
});
expect(result.current.agents.has("agent-1")).toBe(false);
});
it("is a no-op for unknown agentId", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
emitEvent("agent:spawned", { agentId: "agent-1", type: "worker" });
});
act(() => {
result.current.dismissAgent("nonexistent-agent");
});
expect(result.current.agents.has("agent-1")).toBe(true);
});
});
// ==========================================
// Malformed event handling
// ==========================================
describe("malformed events", () => {
it("ignores malformed JSON without throwing", () => {
const { result } = renderHook(() => useAgentStream());
act(() => {
triggerOpen();
// Dispatch raw bad JSON
const es = getLatestES();
es.dispatchEvent("agent:spawned", "NOT JSON {{{");
});
// Should not crash, agents map stays empty
expect(result.current.agents.size).toBe(0);
});
});
});

View File

@@ -0,0 +1,319 @@
"use client";
/**
* useAgentStream hook
*
* Connects to the orchestrator SSE event stream at /api/orchestrator/events
* and maintains a Map of agentId → AgentSession with accumulated output,
* status, and lifecycle metadata.
*
* SSE event types consumed:
* - agent:spawned — { agentId, type, jobId }
* - agent:output — { agentId, data } (stdout/stderr lines)
* - agent:completed — { agentId, exitCode, result }
* - agent:error — { agentId, error }
*
* Features:
* - Auto-reconnect with exponential backoff on connection loss
* - Cleans up EventSource on unmount
* - Accumulates output lines per agent
*/
import { useEffect, useRef, useState, useCallback } from "react";
// ==========================================
// Types
// ==========================================
export type AgentStatus = "spawning" | "running" | "completed" | "error";
export interface AgentSession {
/** Agent identifier from the orchestrator */
agentId: string;
/** Agent type or name (e.g., "worker", "planner") */
agentType: string;
/** Optional job ID this agent is associated with */
jobId?: string;
/** Current lifecycle status */
status: AgentStatus;
/** Accumulated output lines (stdout/stderr) */
outputLines: string[];
/** Timestamp when the agent was spawned */
startedAt: number;
/** Timestamp when the agent completed or errored */
endedAt?: number;
/** Exit code from agent:completed event */
exitCode?: number;
/** Error message from agent:error event */
errorMessage?: string;
}
export interface UseAgentStreamReturn {
/** Map of agentId → AgentSession */
agents: Map<string, AgentSession>;
/** Whether the SSE stream is currently connected */
isConnected: boolean;
/** Connection error message, if any */
connectionError: string | null;
/** Dismiss (remove) an agent tab by agentId */
dismissAgent: (agentId: string) => void;
}
// ==========================================
// SSE payload shapes
// ==========================================
interface SpawnedPayload {
agentId: string;
type?: string;
jobId?: string;
}
interface OutputPayload {
agentId: string;
data: string;
}
interface CompletedPayload {
agentId: string;
exitCode?: number;
result?: unknown;
}
interface ErrorPayload {
agentId: string;
error?: string;
}
// ==========================================
// Backoff config
// ==========================================
const RECONNECT_BASE_MS = 1_000;
const RECONNECT_MAX_MS = 30_000;
const RECONNECT_MULTIPLIER = 2;
// ==========================================
// Hook
// ==========================================
/**
* Connects to the orchestrator SSE stream and tracks all agent sessions.
*
* @returns Agent sessions map, connection status, and dismiss callback
*/
export function useAgentStream(): UseAgentStreamReturn {
const [agents, setAgents] = useState<Map<string, AgentSession>>(new Map());
const [isConnected, setIsConnected] = useState(false);
const [connectionError, setConnectionError] = useState<string | null>(null);
const eventSourceRef = useRef<EventSource | null>(null);
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const reconnectDelayRef = useRef<number>(RECONNECT_BASE_MS);
const isMountedRef = useRef(true);
// ==========================================
// Agent state update helpers
// ==========================================
const handleAgentSpawned = useCallback((payload: SpawnedPayload): void => {
setAgents((prev) => {
const next = new Map(prev);
next.set(payload.agentId, {
agentId: payload.agentId,
agentType: payload.type ?? "agent",
...(payload.jobId !== undefined ? { jobId: payload.jobId } : {}),
status: "spawning",
outputLines: [],
startedAt: Date.now(),
});
return next;
});
}, []);
const handleAgentOutput = useCallback((payload: OutputPayload): void => {
setAgents((prev) => {
const existing = prev.get(payload.agentId);
if (!existing) {
// First output for an agent we haven't seen spawned — create it
const next = new Map(prev);
next.set(payload.agentId, {
agentId: payload.agentId,
agentType: "agent",
status: "running",
outputLines: [payload.data],
startedAt: Date.now(),
});
return next;
}
const next = new Map(prev);
next.set(payload.agentId, {
...existing,
status: existing.status === "spawning" ? "running" : existing.status,
outputLines: [...existing.outputLines, payload.data],
});
return next;
});
}, []);
const handleAgentCompleted = useCallback((payload: CompletedPayload): void => {
setAgents((prev) => {
const existing = prev.get(payload.agentId);
if (!existing) return prev;
const next = new Map(prev);
next.set(payload.agentId, {
...existing,
status: "completed",
endedAt: Date.now(),
...(payload.exitCode !== undefined ? { exitCode: payload.exitCode } : {}),
});
return next;
});
}, []);
const handleAgentError = useCallback((payload: ErrorPayload): void => {
setAgents((prev) => {
const existing = prev.get(payload.agentId);
if (!existing) return prev;
const next = new Map(prev);
next.set(payload.agentId, {
...existing,
status: "error",
endedAt: Date.now(),
...(payload.error !== undefined ? { errorMessage: payload.error } : {}),
});
return next;
});
}, []);
// ==========================================
// SSE connection
// ==========================================
const connect = useCallback((): void => {
if (!isMountedRef.current) return;
if (typeof EventSource === "undefined") return;
// Clean up any existing connection
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
const es = new EventSource("/api/orchestrator/events");
eventSourceRef.current = es;
es.onopen = (): void => {
if (!isMountedRef.current) return;
setIsConnected(true);
setConnectionError(null);
reconnectDelayRef.current = RECONNECT_BASE_MS;
};
es.onerror = (): void => {
if (!isMountedRef.current) return;
setIsConnected(false);
es.close();
eventSourceRef.current = null;
// Schedule reconnect with backoff
const delay = reconnectDelayRef.current;
reconnectDelayRef.current = Math.min(delay * RECONNECT_MULTIPLIER, RECONNECT_MAX_MS);
const delaySecs = Math.round(delay / 1000).toString();
setConnectionError(`SSE connection lost. Reconnecting in ${delaySecs}s...`);
reconnectTimerRef.current = setTimeout(() => {
if (isMountedRef.current) {
connect();
}
}, delay);
};
es.addEventListener("agent:spawned", (event: MessageEvent<string>) => {
if (!isMountedRef.current) return;
try {
const payload = JSON.parse(event.data) as SpawnedPayload;
handleAgentSpawned(payload);
} catch {
// Ignore malformed events
}
});
es.addEventListener("agent:output", (event: MessageEvent<string>) => {
if (!isMountedRef.current) return;
try {
const payload = JSON.parse(event.data) as OutputPayload;
handleAgentOutput(payload);
} catch {
// Ignore malformed events
}
});
es.addEventListener("agent:completed", (event: MessageEvent<string>) => {
if (!isMountedRef.current) return;
try {
const payload = JSON.parse(event.data) as CompletedPayload;
handleAgentCompleted(payload);
} catch {
// Ignore malformed events
}
});
es.addEventListener("agent:error", (event: MessageEvent<string>) => {
if (!isMountedRef.current) return;
try {
const payload = JSON.parse(event.data) as ErrorPayload;
handleAgentError(payload);
} catch {
// Ignore malformed events
}
});
}, [handleAgentSpawned, handleAgentOutput, handleAgentCompleted, handleAgentError]);
// ==========================================
// Mount / unmount
// ==========================================
useEffect(() => {
isMountedRef.current = true;
connect();
return (): void => {
isMountedRef.current = false;
if (reconnectTimerRef.current !== null) {
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = null;
}
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
};
}, [connect]);
// ==========================================
// Dismiss agent
// ==========================================
const dismissAgent = useCallback((agentId: string): void => {
setAgents((prev) => {
const next = new Map(prev);
next.delete(agentId);
return next;
});
}, []);
return {
agents,
isConnected,
connectionError,
dismissAgent,
};
}