diff --git a/apps/web/src/app/api/orchestrator/queue/notifications/stream/route.ts b/apps/web/src/app/api/orchestrator/queue/notifications/stream/route.ts new file mode 100644 index 0000000..2c7f9c9 --- /dev/null +++ b/apps/web/src/app/api/orchestrator/queue/notifications/stream/route.ts @@ -0,0 +1,57 @@ +import { NextResponse } from "next/server"; + +const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001"; + +function getOrchestratorUrl(): string { + return ( + process.env.ORCHESTRATOR_URL ?? + process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ?? + process.env.NEXT_PUBLIC_API_URL ?? + DEFAULT_ORCHESTRATOR_URL + ); +} + +export const dynamic = "force-dynamic"; + +export async function GET(): Promise { + const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY; + if (!orchestratorApiKey) { + return NextResponse.json( + { error: "ORCHESTRATOR_API_KEY is not configured on the web server." }, + { status: 503 } + ); + } + + try { + const upstream = await fetch(`${getOrchestratorUrl()}/api/queue/notifications/stream`, { + method: "GET", + headers: { + "X-API-Key": orchestratorApiKey, + Accept: "text/event-stream", + }, + cache: "no-store", + }); + + if (!upstream.ok || upstream.body === null) { + const message = await upstream.text(); + return new NextResponse(message || "Failed to connect to queue notifications stream", { + status: upstream.status || 502, + headers: { + "Content-Type": upstream.headers.get("Content-Type") ?? "text/plain; charset=utf-8", + }, + }); + } + + return new NextResponse(upstream.body, { + status: upstream.status, + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache, no-transform", + Connection: "keep-alive", + "X-Accel-Buffering": "no", + }, + }); + } catch { + return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 }); + } +} diff --git a/apps/web/src/components/layout/AppSidebar.tsx b/apps/web/src/components/layout/AppSidebar.tsx index 6126c42..92f5c68 100644 --- a/apps/web/src/components/layout/AppSidebar.tsx +++ b/apps/web/src/components/layout/AppSidebar.tsx @@ -1,6 +1,6 @@ "use client"; -import { useEffect, useState } from "react"; +import { useEffect, useMemo, useState } from "react"; import Link from "next/link"; import { usePathname } from "next/navigation"; import Image from "next/image"; @@ -29,6 +29,10 @@ interface NavGroup { items: NavItemConfig[]; } +interface QueueNotification { + id: string; +} + // --------------------------------------------------------------------------- // SVG Icons (16x16 viewBox, stroke="currentColor", strokeWidth="1.5") // --------------------------------------------------------------------------- @@ -685,6 +689,72 @@ function CollapseToggle({ collapsed, onToggle }: CollapseToggleProps): React.JSX export function AppSidebar(): React.JSX.Element { const pathname = usePathname(); const { collapsed, toggleCollapsed, mobileOpen, setMobileOpen, isMobile } = useSidebar(); + const [missionControlBadgeCount, setMissionControlBadgeCount] = useState(0); + + useEffect(() => { + let isActive = true; + + const loadNotificationCount = async (): Promise => { + try { + const response = await fetch("/api/orchestrator/api/queue/notifications", { + method: "GET", + }); + + if (!response.ok) { + return; + } + + const payload = (await response.json()) as QueueNotification[]; + if (isActive) { + setMissionControlBadgeCount(Array.isArray(payload) ? payload.length : 0); + } + } catch { + // Ignore badge failures in the nav. + } + }; + + void loadNotificationCount(); + + if (typeof EventSource === "undefined") { + return (): void => { + isActive = false; + }; + } + + const source = new EventSource("/api/orchestrator/queue/notifications/stream"); + + source.onmessage = (event: MessageEvent): void => { + try { + const payload = JSON.parse(event.data) as QueueNotification[]; + if (isActive) { + setMissionControlBadgeCount(Array.isArray(payload) ? payload.length : 0); + } + } catch { + // Ignore malformed badge updates. + } + }; + + return (): void => { + isActive = false; + source.close(); + }; + }, []); + + const navGroups = useMemo( + () => + NAV_GROUPS.map((group) => ({ + ...group, + items: group.items.map((item) => + item.href === "/mission-control" && missionControlBadgeCount > 0 + ? { + ...item, + badge: { label: String(missionControlBadgeCount) }, + } + : item + ), + })), + [missionControlBadgeCount] + ); return ( <> @@ -722,7 +792,7 @@ export function AppSidebar(): React.JSX.Element { }} aria-label="Main navigation" > - {NAV_GROUPS.map((group) => ( + {navGroups.map((group) => (
{/* Group label — hidden when collapsed */} {!collapsed && ( diff --git a/apps/web/src/components/mission-control/MissionControlLayout.test.tsx b/apps/web/src/components/mission-control/MissionControlLayout.test.tsx index 3b4b35f..15a5f3a 100644 --- a/apps/web/src/components/mission-control/MissionControlLayout.test.tsx +++ b/apps/web/src/components/mission-control/MissionControlLayout.test.tsx @@ -8,6 +8,7 @@ interface MockButtonProps extends ButtonHTMLAttributes { const mockGlobalAgentRoster = vi.fn(); const mockMissionControlPanel = vi.fn(); +const mockQueueNotificationFeed = vi.fn(); vi.mock("@/components/mission-control/AuditLogDrawer", () => ({ AuditLogDrawer: ({ trigger }: { trigger: ReactNode }): React.JSX.Element => ( @@ -31,6 +32,13 @@ vi.mock("@/components/mission-control/MissionControlPanel", () => ({ MIN_PANEL_COUNT: 1, })); +vi.mock("@/components/mission-control/QueueNotificationFeed", () => ({ + QueueNotificationFeed: (props: unknown): React.JSX.Element => { + mockQueueNotificationFeed(props); + return
; + }, +})); + vi.mock("@/components/ui/button", () => ({ Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => ( @@ -66,5 +74,6 @@ describe("MissionControlLayout", (): void => { expect(region.querySelector("main")).toBeInTheDocument(); expect(screen.getByTestId("global-agent-roster")).toBeInTheDocument(); expect(screen.getByTestId("mission-control-panel")).toBeInTheDocument(); + expect(screen.getByTestId("queue-notification-feed")).toBeInTheDocument(); }); }); diff --git a/apps/web/src/components/mission-control/MissionControlLayout.tsx b/apps/web/src/components/mission-control/MissionControlLayout.tsx index 889d6bd..b565962 100644 --- a/apps/web/src/components/mission-control/MissionControlLayout.tsx +++ b/apps/web/src/components/mission-control/MissionControlLayout.tsx @@ -9,6 +9,7 @@ import { MissionControlPanel, type PanelConfig, } from "@/components/mission-control/MissionControlPanel"; +import { QueueNotificationFeed } from "@/components/mission-control/QueueNotificationFeed"; import { Button } from "@/components/ui/button"; const INITIAL_PANELS: PanelConfig[] = [{}]; @@ -94,7 +95,7 @@ export function MissionControlLayout(): React.JSX.Element { /> -
+
); diff --git a/apps/web/src/components/mission-control/QueueNotificationFeed.test.tsx b/apps/web/src/components/mission-control/QueueNotificationFeed.test.tsx new file mode 100644 index 0000000..9d58ef9 --- /dev/null +++ b/apps/web/src/components/mission-control/QueueNotificationFeed.test.tsx @@ -0,0 +1,225 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { act, render, screen, waitFor } from "@testing-library/react"; +import userEvent from "@testing-library/user-event"; +import type { ButtonHTMLAttributes, HTMLAttributes, ReactNode } from "react"; + +vi.mock("date-fns", () => ({ + formatDistanceToNow: (): string => "5 minutes ago", +})); + +interface MockButtonProps extends ButtonHTMLAttributes { + children: ReactNode; +} + +interface MockContainerProps extends HTMLAttributes { + children: ReactNode; +} + +interface MockEventSourceInstance { + url: string; + onerror: ((event: Event) => void) | null; + onmessage: ((event: MessageEvent) => void) | null; + close: ReturnType; + emitMessage: (payload: unknown) => void; +} + +interface QueueNotification { + id: string; + agent: string; + filename: string; + payload: unknown; + createdAt: string; +} + +const mockApiGet = vi.fn<(endpoint: string) => Promise>(); +const mockApiPost = vi.fn<(endpoint: string) => Promise<{ success: true; id: string }>>(); +const mockShowToast = vi.fn<(message: string, variant?: string) => void>(); + +let mockEventSourceInstances: MockEventSourceInstance[] = []; + +const MockEventSource = vi.fn(function (this: MockEventSourceInstance, url: string): void { + this.url = url; + this.onerror = null; + this.onmessage = null; + this.close = vi.fn(); + this.emitMessage = (payload: unknown): void => { + this.onmessage?.(new MessageEvent("message", { data: JSON.stringify(payload) })); + }; + + mockEventSourceInstances.push(this); +}); + +vi.mock("@/lib/api/client", () => ({ + apiGet: (endpoint: string): Promise => mockApiGet(endpoint), + apiPost: (endpoint: string): Promise<{ success: true; id: string }> => mockApiPost(endpoint), +})); + +vi.mock("@mosaic/ui", () => ({ + useToast: (): { showToast: typeof mockShowToast; removeToast: ReturnType } => ({ + showToast: mockShowToast, + removeToast: vi.fn(), + }), +})); + +vi.mock("@/components/ui/button", () => ({ + Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => ( + + ), +})); + +vi.mock("@/components/ui/badge", () => ({ + Badge: ({ children, ...props }: MockContainerProps): React.JSX.Element => ( + {children} + ), +})); + +vi.mock("@/components/ui/card", () => ({ + Card: ({ children, ...props }: MockContainerProps): React.JSX.Element => ( +
{children}
+ ), + CardHeader: ({ children, ...props }: MockContainerProps): React.JSX.Element => ( +
{children}
+ ), + CardContent: ({ children, ...props }: MockContainerProps): React.JSX.Element => ( +
{children}
+ ), + CardTitle: ({ children, ...props }: MockContainerProps): React.JSX.Element => ( +

{children}

+ ), +})); + +vi.mock("@/components/ui/collapsible", () => ({ + Collapsible: ({ children }: MockContainerProps): React.JSX.Element =>
{children}
, +})); + +vi.mock("@/components/ui/scroll-area", () => ({ + ScrollArea: ({ children, ...props }: MockContainerProps): React.JSX.Element => ( +
{children}
+ ), +})); + +vi.mock("@/components/ui/skeleton", () => ({ + Skeleton: (props: HTMLAttributes): React.JSX.Element =>
, +})); + +import { QueueNotificationFeed } from "./QueueNotificationFeed"; + +function latestEventSource(): MockEventSourceInstance { + const instance = mockEventSourceInstances[mockEventSourceInstances.length - 1]; + if (!instance) { + throw new Error("Expected an EventSource instance"); + } + return instance; +} + +function makeNotification(overrides: Partial): QueueNotification { + return { + id: "notif-1", + agent: "mosaic", + filename: "notif-1.json", + payload: { + taskId: "MS24-WEB-001", + eventType: "task.ready", + }, + createdAt: "2026-03-08T23:00:00.000Z", + ...overrides, + }; +} + +describe("QueueNotificationFeed", (): void => { + beforeEach((): void => { + vi.clearAllMocks(); + vi.stubGlobal("EventSource", MockEventSource); + vi.stubGlobal("fetch", vi.fn()); + mockEventSourceInstances = []; + mockApiGet.mockResolvedValue([]); + mockApiPost.mockResolvedValue({ success: true, id: "notif-1" }); + }); + + afterEach((): void => { + vi.unstubAllGlobals(); + }); + + it("loads and renders notifications grouped by agent", async (): Promise => { + mockApiGet.mockResolvedValue([ + makeNotification({ id: "notif-1", agent: "mosaic" }), + makeNotification({ + id: "notif-2", + agent: "dyor", + payload: { taskId: "MS24-WEB-002", eventType: "task.blocked" }, + }), + ]); + + render(); + + await waitFor((): void => { + expect(mockApiGet).toHaveBeenCalledWith("/api/orchestrator/api/queue/notifications"); + }); + + expect(screen.getByText("mosaic")).toBeInTheDocument(); + expect(screen.getByText("dyor")).toBeInTheDocument(); + expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument(); + expect(screen.getByText("task.ready")).toBeInTheDocument(); + expect(screen.getAllByText("5 minutes ago")).toHaveLength(2); + expect(MockEventSource).toHaveBeenCalledWith("/api/orchestrator/queue/notifications/stream"); + }); + + it("acknowledges a notification and removes it from the list", async (): Promise => { + const user = userEvent.setup(); + mockApiGet.mockResolvedValue([makeNotification({ id: "notif-ack" })]); + + render(); + + await waitFor((): void => { + expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument(); + }); + + await user.click(screen.getByRole("button", { name: "ACK notification MS24-WEB-001" })); + + await waitFor((): void => { + expect(mockApiPost).toHaveBeenCalledWith( + "/api/orchestrator/api/queue/notifications/notif-ack/ack" + ); + }); + + await waitFor((): void => { + expect(screen.getByText("No pending notifications")).toBeInTheDocument(); + }); + }); + + it("renders the empty state when there are no notifications", async (): Promise => { + mockApiGet.mockResolvedValue([]); + + render(); + + await waitFor((): void => { + expect(screen.getByText("No pending notifications")).toBeInTheDocument(); + }); + }); + + it("refreshes the list when an SSE message arrives", async (): Promise => { + mockApiGet.mockResolvedValue([makeNotification({ id: "notif-before" })]); + + render(); + + await waitFor((): void => { + expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument(); + }); + + act(() => { + latestEventSource().emitMessage([ + makeNotification({ + id: "notif-after", + agent: "sage", + payload: { taskId: "MS24-WEB-003", eventType: "task.failed" }, + }), + ]); + }); + + await waitFor((): void => { + expect(screen.getByText("sage")).toBeInTheDocument(); + expect(screen.getByText("MS24-WEB-003")).toBeInTheDocument(); + expect(screen.getByText("task.failed")).toBeInTheDocument(); + }); + }); +}); diff --git a/apps/web/src/components/mission-control/QueueNotificationFeed.tsx b/apps/web/src/components/mission-control/QueueNotificationFeed.tsx new file mode 100644 index 0000000..211fa6a --- /dev/null +++ b/apps/web/src/components/mission-control/QueueNotificationFeed.tsx @@ -0,0 +1,332 @@ +"use client"; + +import { useCallback, useEffect, useMemo, useState } from "react"; +import { formatDistanceToNow } from "date-fns"; +import { BellOff, ChevronLeft, ChevronRight, Loader2 } from "lucide-react"; +import { useToast } from "@mosaic/ui"; +import { Badge } from "@/components/ui/badge"; +import { Button } from "@/components/ui/button"; +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { Collapsible } from "@/components/ui/collapsible"; +import { ScrollArea } from "@/components/ui/scroll-area"; +import { Skeleton } from "@/components/ui/skeleton"; +import { apiGet, apiPost } from "@/lib/api/client"; + +export interface QueueNotificationFeedProps { + className?: string; +} + +interface QueueNotification { + id: string; + agent: string; + filename: string; + payload: unknown; + createdAt: string; +} + +interface QueuePayloadRecord { + taskId?: unknown; + type?: unknown; + eventType?: unknown; + event?: unknown; +} + +interface NotificationGroup { + agent: string; + notifications: QueueNotification[]; +} + +const NOTIFICATIONS_ENDPOINT = "/api/orchestrator/api/queue/notifications"; +const NOTIFICATIONS_STREAM_ENDPOINT = "/api/orchestrator/queue/notifications/stream"; + +function joinClasses(...classes: (string | undefined)[]): string { + return classes.filter((value) => typeof value === "string" && value.length > 0).join(" "); +} + +function asPayloadRecord(payload: unknown): QueuePayloadRecord | null { + if (payload === null || typeof payload !== "object" || Array.isArray(payload)) { + return null; + } + + return payload as QueuePayloadRecord; +} + +function getNotificationTaskId(notification: QueueNotification): string { + const payload = asPayloadRecord(notification.payload); + return typeof payload?.taskId === "string" && payload.taskId.trim().length > 0 + ? payload.taskId + : notification.id; +} + +function getNotificationEventType(notification: QueueNotification): string { + const payload = asPayloadRecord(notification.payload); + const candidates = [payload?.eventType, payload?.type, payload?.event]; + + for (const candidate of candidates) { + if (typeof candidate === "string" && candidate.trim().length > 0) { + return candidate; + } + } + + return "notification"; +} + +function formatNotificationAge(createdAt: string): string { + const parsedDate = new Date(createdAt); + if (Number.isNaN(parsedDate.getTime())) { + return "just now"; + } + + return formatDistanceToNow(parsedDate, { addSuffix: true }); +} + +function groupNotificationsByAgent(notifications: QueueNotification[]): NotificationGroup[] { + const grouped = new Map(); + + for (const notification of notifications) { + const current = grouped.get(notification.agent) ?? []; + current.push(notification); + grouped.set(notification.agent, current); + } + + return Array.from(grouped.entries()) + .sort(([leftAgent], [rightAgent]) => leftAgent.localeCompare(rightAgent)) + .map(([agent, items]) => ({ + agent, + notifications: [...items].sort( + (left, right) => new Date(right.createdAt).getTime() - new Date(left.createdAt).getTime() + ), + })); +} + +export function QueueNotificationFeed({ + className, +}: QueueNotificationFeedProps): React.JSX.Element { + const { showToast } = useToast(); + const [notifications, setNotifications] = useState([]); + const [isLoading, setIsLoading] = useState(true); + const [errorMessage, setErrorMessage] = useState(null); + const [acknowledgingIds, setAcknowledgingIds] = useState>({}); + const [isCollapsed, setIsCollapsed] = useState(false); + const [now, setNow] = useState(Date.now()); + + const refreshNotifications = useCallback(async (): Promise => { + try { + const payload = await apiGet(NOTIFICATIONS_ENDPOINT); + setNotifications(payload); + setErrorMessage(null); + } catch (error) { + const message = + error instanceof Error && error.message.trim().length > 0 + ? error.message + : "Failed to load queue notifications."; + setErrorMessage(message); + } finally { + setIsLoading(false); + } + }, []); + + useEffect(() => { + void refreshNotifications(); + }, [refreshNotifications]); + + useEffect(() => { + if (typeof EventSource === "undefined") { + return undefined; + } + + const source = new EventSource(NOTIFICATIONS_STREAM_ENDPOINT); + + source.onmessage = (event: MessageEvent): void => { + try { + const payload = JSON.parse(event.data) as QueueNotification[]; + setNotifications(payload); + setErrorMessage(null); + setIsLoading(false); + } catch { + setErrorMessage("Received an invalid notification stream payload."); + } + }; + + source.onerror = (): void => { + setErrorMessage((current) => current ?? "Queue notification stream disconnected."); + }; + + return (): void => { + source.close(); + }; + }, []); + + useEffect(() => { + const intervalId = window.setInterval(() => { + setNow(Date.now()); + }, 60_000); + + return (): void => { + window.clearInterval(intervalId); + }; + }, []); + + const groupedNotifications = useMemo( + () => groupNotificationsByAgent(notifications), + [notifications] + ); + + const pendingCount = notifications.length; + + const handleAck = useCallback( + async (notificationId: string): Promise => { + setAcknowledgingIds((current) => ({ + ...current, + [notificationId]: true, + })); + + try { + await apiPost<{ success: true; id: string }>( + `/api/orchestrator/api/queue/notifications/${encodeURIComponent(notificationId)}/ack` + ); + setNotifications((current) => current.filter((item) => item.id !== notificationId)); + } catch (error) { + const message = + error instanceof Error && error.message.trim().length > 0 + ? error.message + : "Failed to acknowledge notification."; + showToast(message, "error"); + } finally { + setAcknowledgingIds((current) => { + const { [notificationId]: _omitted, ...remaining } = current; + return remaining; + }); + } + }, + [showToast] + ); + + return ( + + +
+
+ Queue Notifications + 0 ? "status-info" : "secondary"}>{pendingCount} +
+ +
+
+ + {!isCollapsed ? ( + + {isLoading ? ( + +
+ {Array.from({ length: 6 }).map((_, index) => ( + + ))} +
+
+ ) : errorMessage && notifications.length === 0 ? ( +
+ {errorMessage} +
+ ) : groupedNotifications.length === 0 ? ( +
+
+ ) : ( + +
+ {groupedNotifications.map((group) => ( +
+
+

{group.agent}

+ + {group.notifications.length} + +
+
+ {group.notifications.map((notification) => { + const taskId = getNotificationTaskId(notification); + const eventType = getNotificationEventType(notification); + const isAcknowledging = acknowledgingIds[notification.id] ?? false; + + return ( +
+
+
+
+ + {taskId} + + {eventType} +
+ +
+ +
+
+ ); + })} +
+
+ ))} +
+
+ )} +
+ ) : null} +
+ + {pendingCount} pending notifications, refreshed at {new Date(now).toISOString()} + +
+ ); +}