333 lines
12 KiB
TypeScript
333 lines
12 KiB
TypeScript
"use client";
|
|
|
|
import { useCallback, useEffect, useMemo, useState } from "react";
|
|
import { formatDistanceToNow } from "date-fns";
|
|
import { BellOff, ChevronLeft, ChevronRight, Loader2 } from "lucide-react";
|
|
import { useToast } from "@mosaic/ui";
|
|
import { Badge } from "@/components/ui/badge";
|
|
import { Button } from "@/components/ui/button";
|
|
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
|
|
import { Collapsible } from "@/components/ui/collapsible";
|
|
import { ScrollArea } from "@/components/ui/scroll-area";
|
|
import { Skeleton } from "@/components/ui/skeleton";
|
|
import { apiGet, apiPost } from "@/lib/api/client";
|
|
|
|
/**
 * Props accepted by {@link QueueNotificationFeed}.
 */
export interface QueueNotificationFeedProps {
  /** Extra class names appended to the root card's own layout classes. */
  className?: string;
}
|
|
|
|
/**
 * A single pending queue notification as handled by this component.
 */
interface QueueNotification {
  /** Identifier used as the React key and in the acknowledge request URL. */
  id: string;
  /** Agent name; notifications are grouped and headed by this value. */
  agent: string;
  /** Not read anywhere in this file. NOTE(review): presumably the queue file backing the notification; confirm. */
  filename: string;
  /** Opaque payload; inspected defensively for a task id and an event type. */
  payload: unknown;
  /** Timestamp string parsed with `new Date(...)` for ordering and relative-age display. */
  createdAt: string;
}
|
|
|
|
/**
 * Loosely typed view of a notification payload once it is known to be a
 * non-array object. Every field is `unknown` so callers must narrow before use.
 */
interface QueuePayloadRecord {
  /** Candidate task identifier; used for display only when it is a non-blank string. */
  taskId?: unknown;
  /** Candidate event type (checked second, after `eventType`). */
  type?: unknown;
  /** Candidate event type (checked first). */
  eventType?: unknown;
  /** Candidate event type (checked last). */
  event?: unknown;
}
|
|
|
|
/**
 * Notifications belonging to one agent, as rendered in a single feed section.
 */
interface NotificationGroup {
  /** Agent name shared by every notification in the group. */
  agent: string;
  /** The agent's notifications in display order. */
  notifications: QueueNotification[];
}
|
|
|
|
/** URL fetched for the current list of pending notifications. */
const NOTIFICATIONS_ENDPOINT = "/api/orchestrator/api/queue/notifications";

/**
 * URL opened with `EventSource` for live notification updates.
 *
 * NOTE(review): unlike the list endpoint, this path has no `/api` segment after
 * `/api/orchestrator`. Left as-is; confirm against the server routes that the
 * difference is intentional.
 */
const NOTIFICATIONS_STREAM_ENDPOINT = "/api/orchestrator/queue/notifications/stream";
|
|
|
|
/**
 * Joins class names with single spaces, skipping anything that is not a
 * non-empty string.
 *
 * `false` and `null` are accepted (and skipped) in addition to `undefined`, so
 * conditional expressions such as `isActive && "active"` can be passed directly.
 *
 * @param classes - Candidate class names.
 * @returns The kept class names joined by spaces, or `""` when none remain.
 */
function joinClasses(...classes: (string | false | null | undefined)[]): string {
  return classes
    .filter((value): value is string => typeof value === "string" && value.length > 0)
    .join(" ");
}
|
|
|
|
/**
 * Narrows an opaque payload to a {@link QueuePayloadRecord}.
 *
 * @param payload - Arbitrary value taken from a notification.
 * @returns The same object reference when `payload` is a non-null, non-array
 *   object; otherwise `null`.
 */
function asPayloadRecord(payload: unknown): QueuePayloadRecord | null {
  const isPlainRecord =
    payload !== null && typeof payload === "object" && !Array.isArray(payload);

  // Safe to assert: every field of QueuePayloadRecord is optional and `unknown`.
  return isPlainRecord ? (payload as QueuePayloadRecord) : null;
}
|
|
|
|
/**
 * Picks the identifier shown for a notification.
 *
 * @param notification - Notification whose payload may carry a `taskId`.
 * @returns The payload's `taskId` with surrounding whitespace removed when it
 *   is a non-blank string; otherwise the notification's own `id`.
 */
function getNotificationTaskId(notification: QueueNotification): string {
  const candidate = asPayloadRecord(notification.payload)?.taskId;

  if (typeof candidate === "string") {
    // Return the same trimmed value that is validated, so padded IDs do not
    // leak into the rendered label.
    const trimmed = candidate.trim();
    if (trimmed.length > 0) {
      return trimmed;
    }
  }

  return notification.id;
}
|
|
|
|
/**
 * Picks the event-type label shown for a notification.
 *
 * The payload fields `eventType`, `type` and `event` are checked in that order;
 * the first one holding a non-blank string wins.
 *
 * @param notification - Notification whose payload may describe its event type.
 * @returns The winning value with surrounding whitespace removed, or the
 *   literal `"notification"` when no field qualifies.
 */
function getNotificationEventType(notification: QueueNotification): string {
  const payload = asPayloadRecord(notification.payload);

  for (const candidate of [payload?.eventType, payload?.type, payload?.event]) {
    if (typeof candidate !== "string") {
      continue;
    }

    // Return the same trimmed value that is validated, so padded labels do not
    // reach the badge.
    const trimmed = candidate.trim();
    if (trimmed.length > 0) {
      return trimmed;
    }
  }

  return "notification";
}
|
|
|
|
/**
 * Renders a notification timestamp as a relative age.
 *
 * @param createdAt - Timestamp string handed to `new Date(...)`.
 * @returns `"just now"` when the string does not parse to a valid date;
 *   otherwise the result of `formatDistanceToNow` with `addSuffix: true`.
 */
function formatNotificationAge(createdAt: string): string {
  const parsedDate = new Date(createdAt);

  // An unparseable timestamp yields an Invalid Date whose time value is NaN.
  if (Number.isNaN(parsedDate.getTime())) {
    return "just now";
  }

  return formatDistanceToNow(parsedDate, { addSuffix: true });
}
|
|
|
|
/**
 * Groups notifications by agent for display.
 *
 * Groups are ordered by agent name using `localeCompare`. Within a group,
 * notifications are ordered newest first by `createdAt`; entries whose
 * `createdAt` cannot be parsed are placed last. Entries with equal timestamps
 * keep their input order. The input array and its items are not mutated.
 *
 * @param notifications - Notifications in any order.
 * @returns One group per distinct agent; an empty array for empty input.
 */
function groupNotificationsByAgent(notifications: QueueNotification[]): NotificationGroup[] {
  const grouped = new Map<string, QueueNotification[]>();

  for (const notification of notifications) {
    const bucket = grouped.get(notification.agent);
    if (bucket === undefined) {
      grouped.set(notification.agent, [notification]);
    } else {
      bucket.push(notification);
    }
  }

  // Unparseable dates give NaN, and a comparator that returns NaN is
  // inconsistent, leaving the resulting order unspecified. Map them to
  // -Infinity so they sort after every real timestamp.
  const toSortKey = (notification: QueueNotification): number => {
    const timestamp = new Date(notification.createdAt).getTime();
    return Number.isNaN(timestamp) ? Number.NEGATIVE_INFINITY : timestamp;
  };

  return Array.from(grouped.entries())
    .sort(([leftAgent], [rightAgent]) => leftAgent.localeCompare(rightAgent))
    .map(([agent, items]) => ({
      agent,
      notifications: items
        // Parse each timestamp once instead of on every comparison.
        .map((item) => ({ item, sortKey: toSortKey(item) }))
        // Compare instead of subtracting: -Infinity - -Infinity would be NaN.
        .sort((left, right) => {
          if (left.sortKey === right.sortKey) {
            return 0;
          }
          return right.sortKey > left.sortKey ? 1 : -1;
        })
        .map(({ item }) => item),
    }));
}
|
|
|
|
/** Error shown when a stream message cannot be parsed or has an unexpected shape. */
const INVALID_STREAM_PAYLOAD_MESSAGE = "Received an invalid notification stream payload.";

/**
 * Runtime check applied to payloads before they are stored in state.
 *
 * Only what rendering would otherwise crash on is verified: the payload must be
 * an array, and every entry must be an object with string `id` and `agent`
 * (used as React keys and for `localeCompare` when grouping).
 */
function isQueueNotificationList(payload: unknown): payload is QueueNotification[] {
  return (
    Array.isArray(payload) &&
    payload.every((entry: unknown) => {
      if (entry === null || typeof entry !== "object") {
        return false;
      }
      const record = entry as Record<string, unknown>;
      return typeof record.id === "string" && typeof record.agent === "string";
    })
  );
}

/**
 * Card listing pending queue notifications, grouped by agent, with an ACK
 * button per notification and a collapse toggle in the header.
 *
 * Data is loaded once from `NOTIFICATIONS_ENDPOINT` and then replaced wholesale
 * by every message received from `NOTIFICATIONS_STREAM_ENDPOINT` (when
 * `EventSource` is available). Payloads that fail the shape check are rejected
 * with an error message instead of being stored.
 *
 * @param props - See {@link QueueNotificationFeedProps}.
 * @returns The rendered card.
 */
export function QueueNotificationFeed({
  className,
}: QueueNotificationFeedProps): React.JSX.Element {
  const { showToast } = useToast();
  const [notifications, setNotifications] = useState<QueueNotification[]>([]);
  const [isLoading, setIsLoading] = useState(true);
  const [errorMessage, setErrorMessage] = useState<string | null>(null);
  // Keys are the ids of notifications with an ACK request in flight.
  const [acknowledgingIds, setAcknowledgingIds] = useState<Record<string, boolean>>({});
  const [isCollapsed, setIsCollapsed] = useState(false);
  // Ticks once a minute; the resulting re-render recomputes the relative ages.
  const [now, setNow] = useState(Date.now());

  const refreshNotifications = useCallback(async (): Promise<void> => {
    try {
      // Treat the response as unknown until its shape has been checked.
      const payload: unknown = await apiGet<QueueNotification[]>(NOTIFICATIONS_ENDPOINT);
      if (!isQueueNotificationList(payload)) {
        throw new Error("Received an invalid queue notification payload.");
      }
      setNotifications(payload);
      setErrorMessage(null);
    } catch (error) {
      const message =
        error instanceof Error && error.message.trim().length > 0
          ? error.message
          : "Failed to load queue notifications.";
      setErrorMessage(message);
    } finally {
      setIsLoading(false);
    }
  }, []);

  // Initial load.
  useEffect(() => {
    void refreshNotifications();
  }, [refreshNotifications]);

  // Live updates: each stream message carries the full notification list.
  useEffect(() => {
    if (typeof EventSource === "undefined") {
      return undefined;
    }

    const source = new EventSource(NOTIFICATIONS_STREAM_ENDPOINT);

    source.onmessage = (event: MessageEvent<string>): void => {
      let payload: unknown;
      try {
        payload = JSON.parse(event.data);
      } catch {
        setErrorMessage(INVALID_STREAM_PAYLOAD_MESSAGE);
        return;
      }

      // Well-formed JSON of the wrong shape must not reach state: a non-array
      // or an entry without a string agent would throw while rendering.
      if (!isQueueNotificationList(payload)) {
        setErrorMessage(INVALID_STREAM_PAYLOAD_MESSAGE);
        return;
      }

      setNotifications(payload);
      setErrorMessage(null);
      setIsLoading(false);
    };

    source.onerror = (): void => {
      // Keep an earlier, more specific error if one is already displayed.
      setErrorMessage((current) => current ?? "Queue notification stream disconnected.");
    };

    return (): void => {
      source.close();
    };
  }, []);

  useEffect(() => {
    const intervalId = window.setInterval(() => {
      setNow(Date.now());
    }, 60_000);

    return (): void => {
      window.clearInterval(intervalId);
    };
  }, []);

  const groupedNotifications = useMemo(
    () => groupNotificationsByAgent(notifications),
    [notifications]
  );

  const pendingCount = notifications.length;

  const handleAck = useCallback(
    async (notificationId: string): Promise<void> => {
      setAcknowledgingIds((current) => ({
        ...current,
        [notificationId]: true,
      }));

      try {
        await apiPost<{ success: true; id: string }>(
          `${NOTIFICATIONS_ENDPOINT}/${encodeURIComponent(notificationId)}/ack`
        );
        // Remove locally right away rather than waiting for the next stream message.
        setNotifications((current) => current.filter((item) => item.id !== notificationId));
      } catch (error) {
        const message =
          error instanceof Error && error.message.trim().length > 0
            ? error.message
            : "Failed to acknowledge notification.";
        showToast(message, "error");
      } finally {
        setAcknowledgingIds((current) => {
          const { [notificationId]: _omitted, ...remaining } = current;
          return remaining;
        });
      }
    },
    [showToast]
  );

  return (
    <Card className={joinClasses("flex h-full min-h-0 flex-col", className)}>
      <CardHeader className="pb-2">
        <div className="flex items-start justify-between gap-2">
          <div className="flex min-w-0 items-center gap-2">
            <CardTitle className="text-base">Queue Notifications</CardTitle>
            <Badge variant={pendingCount > 0 ? "status-info" : "secondary"}>{pendingCount}</Badge>
          </div>
          <Button
            type="button"
            variant="ghost"
            size="icon"
            className="h-7 w-7"
            onClick={() => {
              setIsCollapsed((current) => !current);
            }}
            aria-expanded={!isCollapsed}
            aria-label={isCollapsed ? "Expand queue notifications" : "Collapse queue notifications"}
            title={isCollapsed ? "Expand queue notifications" : "Collapse queue notifications"}
          >
            {isCollapsed ? (
              <ChevronLeft className="h-4 w-4" aria-hidden="true" />
            ) : (
              <ChevronRight className="h-4 w-4" aria-hidden="true" />
            )}
          </Button>
        </div>
      </CardHeader>
      <Collapsible open={!isCollapsed} className="min-h-0 flex-1">
        {!isCollapsed ? (
          <CardContent className="min-h-0 flex-1 px-3 pb-3">
            {isLoading ? (
              <ScrollArea className="h-full">
                <div className="space-y-2 pr-1">
                  {Array.from({ length: 6 }).map((_, index) => (
                    <Skeleton
                      key={`queue-notification-skeleton-${String(index)}`}
                      className="h-14 w-full"
                    />
                  ))}
                </div>
              </ScrollArea>
            ) : errorMessage && notifications.length === 0 ? (
              <div className="flex h-full items-center justify-center px-4 text-center text-sm text-red-500">
                {errorMessage}
              </div>
            ) : groupedNotifications.length === 0 ? (
              <div className="flex h-full flex-col items-center justify-center gap-2 text-center text-sm text-muted-foreground">
                <BellOff className="h-5 w-5" aria-hidden="true" />
                <span>No pending notifications</span>
              </div>
            ) : (
              <ScrollArea className="h-full">
                <div className="space-y-4 pr-1">
                  {groupedNotifications.map((group) => (
                    <section key={group.agent} className="space-y-2">
                      <div className="flex items-center justify-between gap-2">
                        <h3 className="text-sm font-semibold text-foreground">{group.agent}</h3>
                        <span className="text-xs text-muted-foreground">
                          {group.notifications.length}
                        </span>
                      </div>
                      <div className="space-y-2">
                        {group.notifications.map((notification) => {
                          const taskId = getNotificationTaskId(notification);
                          const eventType = getNotificationEventType(notification);
                          const isAcknowledging = acknowledgingIds[notification.id] ?? false;

                          return (
                            <article
                              key={notification.id}
                              className="rounded-lg border border-border/70 bg-card px-3 py-2"
                            >
                              <div className="flex items-start justify-between gap-3">
                                <div className="min-w-0 space-y-1">
                                  <div className="flex flex-wrap items-center gap-2">
                                    <span className="font-mono text-xs text-foreground">
                                      {taskId}
                                    </span>
                                    <Badge variant="secondary">{eventType}</Badge>
                                  </div>
                                  <time
                                    className="block text-xs text-muted-foreground"
                                    dateTime={notification.createdAt}
                                  >
                                    {formatNotificationAge(notification.createdAt)}
                                  </time>
                                </div>
                                <Button
                                  type="button"
                                  variant="outline"
                                  size="sm"
                                  disabled={isAcknowledging}
                                  onClick={() => {
                                    void handleAck(notification.id);
                                  }}
                                  aria-label={`ACK notification ${taskId}`}
                                >
                                  {isAcknowledging ? (
                                    <span className="flex items-center gap-2">
                                      <Loader2
                                        className="h-4 w-4 animate-spin"
                                        aria-hidden="true"
                                      />
                                      ACK
                                    </span>
                                  ) : (
                                    "ACK"
                                  )}
                                </Button>
                              </div>
                            </article>
                          );
                        })}
                      </div>
                    </section>
                  ))}
                </div>
              </ScrollArea>
            )}
          </CardContent>
        ) : null}
      </Collapsible>
      <span className="sr-only" aria-live="polite">
        {pendingCount} pending notifications, refreshed at {new Date(now).toISOString()}
      </span>
    </Card>
  );
}
|