"use client"; import { useCallback, useEffect, useMemo, useState } from "react"; import { formatDistanceToNow } from "date-fns"; import { BellOff, ChevronLeft, ChevronRight, Loader2 } from "lucide-react"; import { useToast } from "@mosaic/ui"; import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; import { Collapsible } from "@/components/ui/collapsible"; import { ScrollArea } from "@/components/ui/scroll-area"; import { Skeleton } from "@/components/ui/skeleton"; import { apiGet, apiPost } from "@/lib/api/client"; export interface QueueNotificationFeedProps { className?: string; } interface QueueNotification { id: string; agent: string; filename: string; payload: unknown; createdAt: string; } interface QueuePayloadRecord { taskId?: unknown; type?: unknown; eventType?: unknown; event?: unknown; } interface NotificationGroup { agent: string; notifications: QueueNotification[]; } const NOTIFICATIONS_ENDPOINT = "/api/orchestrator/api/queue/notifications"; const NOTIFICATIONS_STREAM_ENDPOINT = "/api/orchestrator/queue/notifications/stream"; function joinClasses(...classes: (string | undefined)[]): string { return classes.filter((value) => typeof value === "string" && value.length > 0).join(" "); } function asPayloadRecord(payload: unknown): QueuePayloadRecord | null { if (payload === null || typeof payload !== "object" || Array.isArray(payload)) { return null; } return payload as QueuePayloadRecord; } function getNotificationTaskId(notification: QueueNotification): string { const payload = asPayloadRecord(notification.payload); return typeof payload?.taskId === "string" && payload.taskId.trim().length > 0 ? payload.taskId : notification.id; } function getNotificationEventType(notification: QueueNotification): string { const payload = asPayloadRecord(notification.payload); const candidates = [payload?.eventType, payload?.type, payload?.event]; for (const candidate of candidates) { if (typeof candidate === "string" && candidate.trim().length > 0) { return candidate; } } return "notification"; } function formatNotificationAge(createdAt: string): string { const parsedDate = new Date(createdAt); if (Number.isNaN(parsedDate.getTime())) { return "just now"; } return formatDistanceToNow(parsedDate, { addSuffix: true }); } function groupNotificationsByAgent(notifications: QueueNotification[]): NotificationGroup[] { const grouped = new Map(); for (const notification of notifications) { const current = grouped.get(notification.agent) ?? []; current.push(notification); grouped.set(notification.agent, current); } return Array.from(grouped.entries()) .sort(([leftAgent], [rightAgent]) => leftAgent.localeCompare(rightAgent)) .map(([agent, items]) => ({ agent, notifications: [...items].sort( (left, right) => new Date(right.createdAt).getTime() - new Date(left.createdAt).getTime() ), })); } export function QueueNotificationFeed({ className, }: QueueNotificationFeedProps): React.JSX.Element { const { showToast } = useToast(); const [notifications, setNotifications] = useState([]); const [isLoading, setIsLoading] = useState(true); const [errorMessage, setErrorMessage] = useState(null); const [acknowledgingIds, setAcknowledgingIds] = useState>({}); const [isCollapsed, setIsCollapsed] = useState(false); const [now, setNow] = useState(Date.now()); const refreshNotifications = useCallback(async (): Promise => { try { const payload = await apiGet(NOTIFICATIONS_ENDPOINT); setNotifications(payload); setErrorMessage(null); } catch (error) { const message = error instanceof Error && error.message.trim().length > 0 ? error.message : "Failed to load queue notifications."; setErrorMessage(message); } finally { setIsLoading(false); } }, []); useEffect(() => { void refreshNotifications(); }, [refreshNotifications]); useEffect(() => { if (typeof EventSource === "undefined") { return undefined; } const source = new EventSource(NOTIFICATIONS_STREAM_ENDPOINT); source.onmessage = (event: MessageEvent): void => { try { const payload = JSON.parse(event.data) as QueueNotification[]; setNotifications(payload); setErrorMessage(null); setIsLoading(false); } catch { setErrorMessage("Received an invalid notification stream payload."); } }; source.onerror = (): void => { setErrorMessage((current) => current ?? "Queue notification stream disconnected."); }; return (): void => { source.close(); }; }, []); useEffect(() => { const intervalId = window.setInterval(() => { setNow(Date.now()); }, 60_000); return (): void => { window.clearInterval(intervalId); }; }, []); const groupedNotifications = useMemo( () => groupNotificationsByAgent(notifications), [notifications] ); const pendingCount = notifications.length; const handleAck = useCallback( async (notificationId: string): Promise => { setAcknowledgingIds((current) => ({ ...current, [notificationId]: true, })); try { await apiPost<{ success: true; id: string }>( `/api/orchestrator/api/queue/notifications/${encodeURIComponent(notificationId)}/ack` ); setNotifications((current) => current.filter((item) => item.id !== notificationId)); } catch (error) { const message = error instanceof Error && error.message.trim().length > 0 ? error.message : "Failed to acknowledge notification."; showToast(message, "error"); } finally { setAcknowledgingIds((current) => { const { [notificationId]: _omitted, ...remaining } = current; return remaining; }); } }, [showToast] ); return (
Queue Notifications 0 ? "status-info" : "secondary"}>{pendingCount}
{!isCollapsed ? ( {isLoading ? (
{Array.from({ length: 6 }).map((_, index) => ( ))}
) : errorMessage && notifications.length === 0 ? (
{errorMessage}
) : groupedNotifications.length === 0 ? (
) : (
{groupedNotifications.map((group) => (

{group.agent}

{group.notifications.length}
{group.notifications.map((notification) => { const taskId = getNotificationTaskId(notification); const eventType = getNotificationEventType(notification); const isAcknowledging = acknowledgingIds[notification.id] ?? false; return (
{taskId} {eventType}
); })}
))}
)}
) : null}
{pendingCount} pending notifications, refreshed at {new Date(now).toISOString()}
); }