Compare commits
24 Commits
fix/missio
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 3289677056 | |||
| 5a14a97cb4 | |||
| aebf6b18db | |||
| 6fbfb3c197 | |||
| 348943c5f7 | |||
| 39d36e67c5 | |||
| e8a2d32476 | |||
| 3c0c10c9e5 | |||
| f59ce6a7a5 | |||
| 11fa1734bd | |||
| 46815707a9 | |||
| 621df6ee70 | |||
| ac406f19bc | |||
| 72d295edd6 | |||
| 6e9def3c5a | |||
| 456d53fc7f | |||
| 8014930b70 | |||
| 06f2cc4be3 | |||
| 04bbdf3308 | |||
| a6f1438f40 | |||
| 523662656e | |||
| 27120ac3f2 | |||
| ad9921107c | |||
| 3c288f9849 |
@@ -271,6 +271,26 @@ steps:
|
|||||||
depends_on:
|
depends_on:
|
||||||
- docker-build-orchestrator
|
- docker-build-orchestrator
|
||||||
|
|
||||||
|
notify-webhook:
|
||||||
|
image: curlimages/curl:8.6.0
|
||||||
|
environment:
|
||||||
|
MOSAIC_WEBHOOK_URL:
|
||||||
|
from_secret: mosaic_webhook_url
|
||||||
|
WOODPECKER_WEBHOOK_SECRET:
|
||||||
|
from_secret: woodpecker_webhook_secret
|
||||||
|
commands:
|
||||||
|
- |
|
||||||
|
BODY="{\"branch\":\"${CI_COMMIT_BRANCH}\",\"status\":\"${CI_PIPELINE_STATUS}\",\"buildUrl\":\"${CI_PIPELINE_LINK}\",\"repo\":\"${CI_REPO}\"}"
|
||||||
|
SIG=$(echo -n "$BODY" | openssl dgst -sha256 -hmac "$WOODPECKER_WEBHOOK_SECRET" | awk '{print $2}')
|
||||||
|
curl -s -o /dev/null -w "%{http_code}" -X POST "${MOSAIC_WEBHOOK_URL}/api/webhooks/woodpecker" \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-H "X-Woodpecker-Signature: ${SIG}" \
|
||||||
|
-d "$BODY" || true
|
||||||
|
when:
|
||||||
|
- status: [success, failure]
|
||||||
|
depends_on:
|
||||||
|
- build
|
||||||
|
|
||||||
security-trivy-web:
|
security-trivy-web:
|
||||||
image: aquasec/trivy:latest
|
image: aquasec/trivy:latest
|
||||||
environment:
|
environment:
|
||||||
|
|||||||
@@ -56,6 +56,7 @@
|
|||||||
"bcryptjs": "^3.0.3",
|
"bcryptjs": "^3.0.3",
|
||||||
"better-auth": "^1.4.17",
|
"better-auth": "^1.4.17",
|
||||||
"bullmq": "^5.67.2",
|
"bullmq": "^5.67.2",
|
||||||
|
"chokidar": "^4.0.3",
|
||||||
"class-transformer": "^0.5.1",
|
"class-transformer": "^0.5.1",
|
||||||
"class-validator": "^0.14.3",
|
"class-validator": "^0.14.3",
|
||||||
"cookie-parser": "^1.4.7",
|
"cookie-parser": "^1.4.7",
|
||||||
|
|||||||
@@ -60,7 +60,9 @@ import { ContainerReaperModule } from "./container-reaper/container-reaper.modul
|
|||||||
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
|
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
|
||||||
import { OnboardingModule } from "./onboarding/onboarding.module";
|
import { OnboardingModule } from "./onboarding/onboarding.module";
|
||||||
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
|
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
|
||||||
|
import { MissionControlProxyModule } from "./mission-control-proxy/mission-control-proxy.module";
|
||||||
import { OrchestratorModule } from "./orchestrator/orchestrator.module";
|
import { OrchestratorModule } from "./orchestrator/orchestrator.module";
|
||||||
|
import { QueueNotificationsModule } from "./queue-notifications/queue-notifications.module";
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [
|
||||||
@@ -142,7 +144,9 @@ import { OrchestratorModule } from "./orchestrator/orchestrator.module";
|
|||||||
FleetSettingsModule,
|
FleetSettingsModule,
|
||||||
OnboardingModule,
|
OnboardingModule,
|
||||||
ChatProxyModule,
|
ChatProxyModule,
|
||||||
|
MissionControlProxyModule,
|
||||||
OrchestratorModule,
|
OrchestratorModule,
|
||||||
|
QueueNotificationsModule,
|
||||||
],
|
],
|
||||||
controllers: [AppController, CsrfController],
|
controllers: [AppController, CsrfController],
|
||||||
providers: [
|
providers: [
|
||||||
|
|||||||
@@ -0,0 +1,286 @@
|
|||||||
|
import {
|
||||||
|
Body,
|
||||||
|
Controller,
|
||||||
|
Get,
|
||||||
|
Logger,
|
||||||
|
Param,
|
||||||
|
Post,
|
||||||
|
Req,
|
||||||
|
Res,
|
||||||
|
ServiceUnavailableException,
|
||||||
|
UseGuards,
|
||||||
|
} from "@nestjs/common";
|
||||||
|
import type { Request, Response } from "express";
|
||||||
|
import { AuthGuard } from "../auth/guards/auth.guard";
|
||||||
|
|
||||||
|
const ORCHESTRATOR_URL_KEY = "ORCHESTRATOR_URL";
|
||||||
|
const ORCHESTRATOR_API_KEY = "ORCHESTRATOR_API_KEY";
|
||||||
|
|
||||||
|
@Controller("mission-control")
|
||||||
|
@UseGuards(AuthGuard)
|
||||||
|
export class MissionControlProxyController {
|
||||||
|
private readonly logger = new Logger(MissionControlProxyController.name);
|
||||||
|
private readonly orchestratorUrl: string;
|
||||||
|
private readonly orchestratorApiKey: string;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.orchestratorUrl = this.requireEnv(ORCHESTRATOR_URL_KEY);
|
||||||
|
this.orchestratorApiKey = this.requireEnv(ORCHESTRATOR_API_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions")
|
||||||
|
proxySessions(@Req() req: Request, @Res() res: Response): Promise<void> {
|
||||||
|
return this.proxyRequest("GET", "sessions", req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions/:sessionId")
|
||||||
|
proxySession(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("GET", `sessions/${sessionId}`, req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions/:sessionId/messages")
|
||||||
|
proxyMessages(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("GET", `sessions/${sessionId}/messages`, req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("audit-log")
|
||||||
|
proxyAuditLog(@Req() req: Request, @Res() res: Response): Promise<void> {
|
||||||
|
return this.proxyRequest("GET", "audit-log", req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/inject")
|
||||||
|
proxyInject(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Body() body: unknown,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("POST", `sessions/${sessionId}/inject`, req, res, body);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/pause")
|
||||||
|
proxyPause(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("POST", `sessions/${sessionId}/pause`, req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/resume")
|
||||||
|
proxyResume(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("POST", `sessions/${sessionId}/resume`, req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/kill")
|
||||||
|
proxyKill(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Body() body: unknown,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("POST", `sessions/${sessionId}/kill`, req, res, body);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions/:sessionId/stream")
|
||||||
|
async proxySessionStream(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
const abortController = new AbortController();
|
||||||
|
req.once("close", () => {
|
||||||
|
abortController.abort();
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
const upstream = await this.fetchUpstream(
|
||||||
|
"GET",
|
||||||
|
`sessions/${sessionId}/stream`,
|
||||||
|
req.query,
|
||||||
|
undefined,
|
||||||
|
abortController.signal
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!upstream.ok || !upstream.body) {
|
||||||
|
await this.sendStandardResponse(upstream, res);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
res.status(upstream.status);
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "content-type");
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "cache-control");
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "connection");
|
||||||
|
res.setHeader("X-Accel-Buffering", "no");
|
||||||
|
|
||||||
|
if (typeof res.flushHeaders === "function") {
|
||||||
|
res.flushHeaders();
|
||||||
|
}
|
||||||
|
|
||||||
|
for await (const chunk of upstream.body as unknown as AsyncIterable<Uint8Array>) {
|
||||||
|
if (res.writableEnded || res.destroyed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
res.write(Buffer.from(chunk));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!res.writableEnded && !res.destroyed) {
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
|
} catch (error: unknown) {
|
||||||
|
if (this.isAbortError(error)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
this.logger.warn(`Mission Control stream proxy request failed: ${message}`);
|
||||||
|
|
||||||
|
if (!res.headersSent) {
|
||||||
|
res.status(503).json({ message: "Failed to proxy Mission Control request" });
|
||||||
|
} else if (!res.writableEnded && !res.destroyed) {
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async proxyRequest(
|
||||||
|
method: "GET" | "POST",
|
||||||
|
path: string,
|
||||||
|
req: Request,
|
||||||
|
res: Response,
|
||||||
|
body?: unknown
|
||||||
|
): Promise<void> {
|
||||||
|
const abortController = new AbortController();
|
||||||
|
req.once("close", () => {
|
||||||
|
abortController.abort();
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
const upstream = await this.fetchUpstream(
|
||||||
|
method,
|
||||||
|
path,
|
||||||
|
req.query,
|
||||||
|
body,
|
||||||
|
abortController.signal
|
||||||
|
);
|
||||||
|
await this.sendStandardResponse(upstream, res);
|
||||||
|
} catch (error: unknown) {
|
||||||
|
if (this.isAbortError(error)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.handleProxyError(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async fetchUpstream(
|
||||||
|
method: "GET" | "POST",
|
||||||
|
path: string,
|
||||||
|
query: Request["query"],
|
||||||
|
body: unknown,
|
||||||
|
signal: AbortSignal
|
||||||
|
): Promise<globalThis.Response> {
|
||||||
|
const url = new URL(`/api/mission-control/${path}`, this.orchestratorUrl);
|
||||||
|
this.appendQueryParams(url.searchParams, query);
|
||||||
|
|
||||||
|
const headers: Record<string, string> = {
|
||||||
|
"X-API-Key": this.orchestratorApiKey,
|
||||||
|
};
|
||||||
|
|
||||||
|
const requestInit: RequestInit = {
|
||||||
|
method,
|
||||||
|
headers,
|
||||||
|
signal,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (method === "POST" && body !== undefined) {
|
||||||
|
headers["Content-Type"] = "application/json";
|
||||||
|
requestInit.body = JSON.stringify(body);
|
||||||
|
}
|
||||||
|
|
||||||
|
return fetch(url.toString(), requestInit);
|
||||||
|
}
|
||||||
|
|
||||||
|
private appendQueryParams(searchParams: URLSearchParams, query: Request["query"]): void {
|
||||||
|
for (const [key, value] of Object.entries(query)) {
|
||||||
|
this.appendQueryValue(searchParams, key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private appendQueryValue(searchParams: URLSearchParams, key: string, value: unknown): void {
|
||||||
|
if (value === undefined || value === null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Array.isArray(value)) {
|
||||||
|
for (const item of value) {
|
||||||
|
this.appendQueryValue(searchParams, key, item);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (typeof value === "string" || typeof value === "number" || typeof value === "boolean") {
|
||||||
|
searchParams.append(key, String(value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async sendStandardResponse(upstream: globalThis.Response, res: Response): Promise<void> {
|
||||||
|
res.status(upstream.status);
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "content-type");
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "cache-control");
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "location");
|
||||||
|
|
||||||
|
const responseText = await upstream.text();
|
||||||
|
|
||||||
|
if (responseText.length === 0) {
|
||||||
|
res.end();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
res.send(responseText);
|
||||||
|
}
|
||||||
|
|
||||||
|
private copyHeaderIfPresent(
|
||||||
|
upstream: globalThis.Response,
|
||||||
|
res: Response,
|
||||||
|
headerName: string
|
||||||
|
): void {
|
||||||
|
const value = upstream.headers.get(headerName);
|
||||||
|
if (value) {
|
||||||
|
res.setHeader(headerName, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private handleProxyError(error: unknown): never {
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
this.logger.warn(`Mission Control proxy request failed: ${message}`);
|
||||||
|
throw new ServiceUnavailableException("Failed to proxy Mission Control request");
|
||||||
|
}
|
||||||
|
|
||||||
|
private isAbortError(error: unknown): boolean {
|
||||||
|
return error instanceof Error && error.name === "AbortError";
|
||||||
|
}
|
||||||
|
|
||||||
|
private requireEnv(key: string): string {
|
||||||
|
const value = process.env[key];
|
||||||
|
|
||||||
|
if (typeof value !== "string" || value.trim().length === 0) {
|
||||||
|
throw new Error(`@mosaic/api: ${key} is required. Set it in your config or via ${key}.`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,9 @@
|
|||||||
|
import { Module } from "@nestjs/common";
|
||||||
|
import { AuthModule } from "../auth/auth.module";
|
||||||
|
import { MissionControlProxyController } from "./mission-control-proxy.controller";
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
imports: [AuthModule],
|
||||||
|
controllers: [MissionControlProxyController],
|
||||||
|
})
|
||||||
|
export class MissionControlProxyModule {}
|
||||||
@@ -0,0 +1,120 @@
|
|||||||
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import { NotFoundException } from "@nestjs/common";
|
||||||
|
import type { Response } from "express";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { Test, type TestingModule } from "@nestjs/testing";
|
||||||
|
import { QueueNotificationsController } from "./queue-notifications.controller";
|
||||||
|
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||||
|
import { ApiKeyGuard } from "../common/guards/api-key.guard";
|
||||||
|
|
||||||
|
describe("QueueNotificationsController", () => {
|
||||||
|
let controller: QueueNotificationsController;
|
||||||
|
|
||||||
|
const mockService = {
|
||||||
|
listNotifications: vi.fn(),
|
||||||
|
streamNotifications: vi.fn(),
|
||||||
|
ackNotification: vi.fn(),
|
||||||
|
listTasks: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
const mockConfigService = {
|
||||||
|
get: vi.fn().mockReturnValue("coordinator-api-key"),
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
|
||||||
|
const module: TestingModule = await Test.createTestingModule({
|
||||||
|
controllers: [QueueNotificationsController],
|
||||||
|
providers: [
|
||||||
|
{ provide: QueueNotificationsService, useValue: mockService },
|
||||||
|
{ provide: ConfigService, useValue: mockConfigService },
|
||||||
|
],
|
||||||
|
})
|
||||||
|
.overrideGuard(ApiKeyGuard)
|
||||||
|
.useValue({ canActivate: () => true })
|
||||||
|
.compile();
|
||||||
|
|
||||||
|
controller = module.get<QueueNotificationsController>(QueueNotificationsController);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns notification objects", async () => {
|
||||||
|
mockService.listNotifications.mockResolvedValue([
|
||||||
|
{
|
||||||
|
id: "notif-1",
|
||||||
|
agent: "mosaic",
|
||||||
|
filename: "notif-1.json",
|
||||||
|
payload: { type: "task.ready" },
|
||||||
|
createdAt: new Date("2026-03-08T22:00:00.000Z"),
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
|
||||||
|
await expect(controller.getNotifications()).resolves.toEqual([
|
||||||
|
expect.objectContaining({
|
||||||
|
id: "notif-1",
|
||||||
|
agent: "mosaic",
|
||||||
|
filename: "notif-1.json",
|
||||||
|
payload: { type: "task.ready" },
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("streams notifications through the response object", async () => {
|
||||||
|
const res = {
|
||||||
|
setHeader: vi.fn(),
|
||||||
|
flushHeaders: vi.fn(),
|
||||||
|
write: vi.fn(),
|
||||||
|
on: vi.fn(),
|
||||||
|
end: vi.fn(),
|
||||||
|
} as unknown as Response;
|
||||||
|
|
||||||
|
mockService.streamNotifications.mockResolvedValue(undefined);
|
||||||
|
|
||||||
|
await controller.streamNotifications(res);
|
||||||
|
|
||||||
|
expect(mockService.streamNotifications).toHaveBeenCalledWith(res);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("acks a notification by id", async () => {
|
||||||
|
mockService.ackNotification.mockResolvedValue({ success: true, id: "notif-2" });
|
||||||
|
|
||||||
|
await expect(controller.ackNotification("notif-2")).resolves.toEqual({
|
||||||
|
success: true,
|
||||||
|
id: "notif-2",
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("surfaces ack errors", async () => {
|
||||||
|
mockService.ackNotification.mockRejectedValue(new NotFoundException("missing"));
|
||||||
|
|
||||||
|
await expect(controller.ackNotification("missing")).rejects.toThrow(NotFoundException);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns parsed queue tasks", async () => {
|
||||||
|
mockService.listTasks.mockResolvedValue([
|
||||||
|
{
|
||||||
|
id: "task-1",
|
||||||
|
project: "mosaic-stack",
|
||||||
|
taskId: "MS24-API-001",
|
||||||
|
status: "pending",
|
||||||
|
description: "Build queue notifications module",
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
|
||||||
|
await expect(controller.getTasks()).resolves.toEqual([
|
||||||
|
{
|
||||||
|
id: "task-1",
|
||||||
|
project: "mosaic-stack",
|
||||||
|
taskId: "MS24-API-001",
|
||||||
|
status: "pending",
|
||||||
|
description: "Build queue notifications module",
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("uses ApiKeyGuard at the controller level", () => {
|
||||||
|
const guards = Reflect.getMetadata("__guards__", QueueNotificationsController) as unknown[];
|
||||||
|
|
||||||
|
expect(guards).toContain(ApiKeyGuard);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,36 @@
|
|||||||
|
import { Controller, Get, Param, Post, Res, UseGuards } from "@nestjs/common";
|
||||||
|
import type { Response } from "express";
|
||||||
|
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
|
||||||
|
import { ApiKeyGuard } from "../common/guards/api-key.guard";
|
||||||
|
import {
|
||||||
|
QueueNotificationsService,
|
||||||
|
type QueueNotification,
|
||||||
|
type QueueTask,
|
||||||
|
} from "./queue-notifications.service";
|
||||||
|
|
||||||
|
@Controller("queue")
|
||||||
|
@UseGuards(ApiKeyGuard)
|
||||||
|
export class QueueNotificationsController {
|
||||||
|
constructor(private readonly queueNotificationsService: QueueNotificationsService) {}
|
||||||
|
|
||||||
|
@Get("notifications")
|
||||||
|
async getNotifications(): Promise<QueueNotification[]> {
|
||||||
|
return this.queueNotificationsService.listNotifications();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("notifications/stream")
|
||||||
|
async streamNotifications(@Res() res: Response): Promise<void> {
|
||||||
|
await this.queueNotificationsService.streamNotifications(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SkipCsrf()
|
||||||
|
@Post("notifications/:id/ack")
|
||||||
|
async ackNotification(@Param("id") id: string): Promise<{ success: true; id: string }> {
|
||||||
|
return this.queueNotificationsService.ackNotification(id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("tasks")
|
||||||
|
async getTasks(): Promise<QueueTask[]> {
|
||||||
|
return this.queueNotificationsService.listTasks();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
import { Module } from "@nestjs/common";
|
||||||
|
import { ConfigModule } from "@nestjs/config";
|
||||||
|
import { AuthModule } from "../auth/auth.module";
|
||||||
|
import { ApiKeyGuard } from "../common/guards/api-key.guard";
|
||||||
|
import { QueueNotificationsController } from "./queue-notifications.controller";
|
||||||
|
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||||
|
import { WoodpeckerWebhookController } from "./woodpecker-webhook.controller";
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
imports: [ConfigModule, AuthModule],
|
||||||
|
controllers: [QueueNotificationsController, WoodpeckerWebhookController],
|
||||||
|
providers: [QueueNotificationsService, ApiKeyGuard],
|
||||||
|
exports: [QueueNotificationsService],
|
||||||
|
})
|
||||||
|
export class QueueNotificationsModule {}
|
||||||
@@ -0,0 +1,268 @@
|
|||||||
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { Logger, NotFoundException } from "@nestjs/common";
|
||||||
|
import { mkdtemp, mkdir, readFile, readdir, rm, writeFile } from "node:fs/promises";
|
||||||
|
import { tmpdir } from "node:os";
|
||||||
|
import { join } from "node:path";
|
||||||
|
import { execFile } from "node:child_process";
|
||||||
|
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||||
|
|
||||||
|
vi.mock("node:child_process", () => ({
|
||||||
|
execFile: vi.fn(),
|
||||||
|
}));
|
||||||
|
|
||||||
|
describe("QueueNotificationsService", () => {
|
||||||
|
let service: QueueNotificationsService;
|
||||||
|
let inboxDir: string;
|
||||||
|
let agentStateDir: string;
|
||||||
|
let configService: ConfigService;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-"));
|
||||||
|
agentStateDir = await mkdtemp(join(tmpdir(), "agent-state-"));
|
||||||
|
configService = {
|
||||||
|
get: vi.fn((key: string) => {
|
||||||
|
if (key === "MOSAIC_QUEUE_INBOX_DIR") {
|
||||||
|
return inboxDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (key === "MOSAIC_AGENT_STATE_DIR") {
|
||||||
|
return agentStateDir;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (key === "MOSAIC_QUEUE_CLI") {
|
||||||
|
return "/tmp/mosaic-queue-cli.js";
|
||||||
|
}
|
||||||
|
|
||||||
|
return undefined;
|
||||||
|
}),
|
||||||
|
} as unknown as ConfigService;
|
||||||
|
|
||||||
|
service = new QueueNotificationsService(configService);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
vi.restoreAllMocks();
|
||||||
|
await rm(inboxDir, { recursive: true, force: true });
|
||||||
|
await rm(agentStateDir, { recursive: true, force: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("onModuleInit", () => {
|
||||||
|
it("logs a warning when the inbox directory does not exist", async () => {
|
||||||
|
await rm(inboxDir, { recursive: true, force: true });
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
|
||||||
|
await service.onModuleInit();
|
||||||
|
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("Queue notifications inbox directory does not exist")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("listNotifications", () => {
|
||||||
|
it("returns parsed notifications from agent inbox directories", async () => {
|
||||||
|
await mkdir(join(inboxDir, "mosaic"), { recursive: true });
|
||||||
|
await mkdir(join(inboxDir, "mosaic", "_acked"), { recursive: true });
|
||||||
|
await mkdir(join(inboxDir, "sage"), { recursive: true });
|
||||||
|
await writeFile(
|
||||||
|
join(inboxDir, "mosaic", "notif-1.json"),
|
||||||
|
JSON.stringify({ type: "task.ready", taskId: "MS24-API-001" })
|
||||||
|
);
|
||||||
|
await writeFile(
|
||||||
|
join(inboxDir, "mosaic", "_acked", "notif-ignored.json"),
|
||||||
|
JSON.stringify({ ignored: true })
|
||||||
|
);
|
||||||
|
await writeFile(join(inboxDir, "sage", "notif-2.json"), JSON.stringify({ type: "done" }));
|
||||||
|
|
||||||
|
const notifications = await service.listNotifications();
|
||||||
|
|
||||||
|
expect(notifications).toHaveLength(2);
|
||||||
|
expect(notifications).toEqual(
|
||||||
|
expect.arrayContaining([
|
||||||
|
expect.objectContaining({
|
||||||
|
id: "notif-1",
|
||||||
|
agent: "mosaic",
|
||||||
|
filename: "notif-1.json",
|
||||||
|
payload: { type: "task.ready", taskId: "MS24-API-001" },
|
||||||
|
}),
|
||||||
|
expect.objectContaining({
|
||||||
|
id: "notif-2",
|
||||||
|
agent: "sage",
|
||||||
|
filename: "notif-2.json",
|
||||||
|
payload: { type: "done" },
|
||||||
|
}),
|
||||||
|
])
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns an empty array when the inbox directory is missing", async () => {
|
||||||
|
await rm(inboxDir, { recursive: true, force: true });
|
||||||
|
|
||||||
|
await expect(service.listNotifications()).resolves.toEqual([]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("ackNotification", () => {
|
||||||
|
it("executes the queue CLI with node and ack args", async () => {
|
||||||
|
await mkdir(join(inboxDir, "mosaic"), { recursive: true });
|
||||||
|
await writeFile(join(inboxDir, "mosaic", "notif-3.json"), JSON.stringify({ ok: true }));
|
||||||
|
vi.mocked(execFile).mockImplementation(
|
||||||
|
(
|
||||||
|
_command: string,
|
||||||
|
_args: readonly string[],
|
||||||
|
callback: (error: Error | null, stdout: string, stderr: string) => void
|
||||||
|
) => callback(null, "acked", "")
|
||||||
|
);
|
||||||
|
|
||||||
|
await expect(service.ackNotification("notif-3")).resolves.toEqual({
|
||||||
|
success: true,
|
||||||
|
id: "notif-3",
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(execFile).toHaveBeenCalledWith(
|
||||||
|
"node",
|
||||||
|
["/tmp/mosaic-queue-cli.js", "ack", "notif-3"],
|
||||||
|
expect.any(Function)
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("throws NotFoundException when the notification does not exist", async () => {
|
||||||
|
await expect(service.ackNotification("missing")).rejects.toThrow(NotFoundException);
|
||||||
|
expect(execFile).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("listTasks", () => {
|
||||||
|
it("parses tab-separated CLI output", async () => {
|
||||||
|
vi.mocked(execFile).mockImplementation(
|
||||||
|
(
|
||||||
|
_command: string,
|
||||||
|
_args: readonly string[],
|
||||||
|
callback: (error: Error | null, stdout: string, stderr: string) => void
|
||||||
|
) =>
|
||||||
|
callback(
|
||||||
|
null,
|
||||||
|
[
|
||||||
|
"task-1\tmosaic-stack/MS24-API-001\t[pending]\tBuild queue notifications module",
|
||||||
|
"task-2\tmosaic-stack/MS24-API-002\t[done]\tWrite tests",
|
||||||
|
].join("\n"),
|
||||||
|
""
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
await expect(service.listTasks()).resolves.toEqual([
|
||||||
|
{
|
||||||
|
id: "task-1",
|
||||||
|
project: "mosaic-stack",
|
||||||
|
taskId: "MS24-API-001",
|
||||||
|
status: "pending",
|
||||||
|
description: "Build queue notifications module",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
id: "task-2",
|
||||||
|
project: "mosaic-stack",
|
||||||
|
taskId: "MS24-API-002",
|
||||||
|
status: "done",
|
||||||
|
description: "Write tests",
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect(execFile).toHaveBeenCalledWith(
|
||||||
|
"node",
|
||||||
|
["/tmp/mosaic-queue-cli.js", "list", "mosaic-stack"],
|
||||||
|
expect.any(Function)
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("notifyAgentCiResult", () => {
|
||||||
|
it("writes one notification per active agent-state record matching the branch", async () => {
|
||||||
|
await mkdir(join(agentStateDir, "active"), { recursive: true });
|
||||||
|
await writeFile(
|
||||||
|
join(agentStateDir, "active", "sage-landing-page.json"),
|
||||||
|
JSON.stringify({
|
||||||
|
taskId: "sage-landing-page",
|
||||||
|
status: "spawned",
|
||||||
|
startedAt: "2026-03-08T22:47:20Z",
|
||||||
|
lastUpdated: "2026-03-08T23:15:11.726Z",
|
||||||
|
agent: "sage",
|
||||||
|
branch: "feature/landing-page",
|
||||||
|
description: "Create apps/landing marketing site",
|
||||||
|
})
|
||||||
|
);
|
||||||
|
await writeFile(
|
||||||
|
join(agentStateDir, "active", "pixels-other.json"),
|
||||||
|
JSON.stringify({
|
||||||
|
taskId: "pixels-other",
|
||||||
|
status: "spawned",
|
||||||
|
startedAt: "2026-03-08T22:47:20Z",
|
||||||
|
lastUpdated: "2026-03-08T23:15:11.726Z",
|
||||||
|
agent: "pixels",
|
||||||
|
branch: "feature/something-else",
|
||||||
|
description: "Unrelated",
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
service.notifyAgentCiResult({
|
||||||
|
branch: "feature/landing-page",
|
||||||
|
status: "success",
|
||||||
|
buildUrl: "https://ci.example/build/123",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
})
|
||||||
|
).resolves.toEqual({ notified: 1 });
|
||||||
|
|
||||||
|
const inboxFiles = await readdir(join(inboxDir, "sage"));
|
||||||
|
expect(inboxFiles).toHaveLength(1);
|
||||||
|
|
||||||
|
const notification = JSON.parse(
|
||||||
|
await readFile(join(inboxDir, "sage", inboxFiles[0]!), "utf8")
|
||||||
|
) as Record<string, unknown>;
|
||||||
|
|
||||||
|
expect(notification).toMatchObject({
|
||||||
|
taskId: "sage-landing-page",
|
||||||
|
event: "completed",
|
||||||
|
targetAgent: "sage",
|
||||||
|
fromAgent: "mosaic-api",
|
||||||
|
retries: 0,
|
||||||
|
maxRetries: 3,
|
||||||
|
ttlSeconds: 600,
|
||||||
|
payload: {
|
||||||
|
branch: "feature/landing-page",
|
||||||
|
buildUrl: "https://ci.example/build/123",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
ciStatus: "success",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
expect(typeof notification.id).toBe("string");
|
||||||
|
expect(typeof notification.createdAt).toBe("string");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns zero when no active agent-state branch matches the webhook payload", async () => {
|
||||||
|
await mkdir(join(agentStateDir, "active"), { recursive: true });
|
||||||
|
await writeFile(
|
||||||
|
join(agentStateDir, "active", "mosaic-task.json"),
|
||||||
|
JSON.stringify({
|
||||||
|
taskId: "mosaic-task",
|
||||||
|
status: "spawned",
|
||||||
|
startedAt: "2026-03-08T22:47:20Z",
|
||||||
|
lastUpdated: "2026-03-08T23:15:11.726Z",
|
||||||
|
agent: "mosaic",
|
||||||
|
branch: "feature/not-this-one",
|
||||||
|
description: "Unrelated",
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
service.notifyAgentCiResult({
|
||||||
|
branch: "feature/landing-page",
|
||||||
|
status: "failure",
|
||||||
|
buildUrl: "https://ci.example/build/456",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
})
|
||||||
|
).resolves.toEqual({ notified: 0 });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
341
apps/api/src/queue-notifications/queue-notifications.service.ts
Normal file
341
apps/api/src/queue-notifications/queue-notifications.service.ts
Normal file
@@ -0,0 +1,341 @@
|
|||||||
|
import {
|
||||||
|
Injectable,
|
||||||
|
InternalServerErrorException,
|
||||||
|
Logger,
|
||||||
|
NotFoundException,
|
||||||
|
OnModuleInit,
|
||||||
|
} from "@nestjs/common";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { randomUUID } from "node:crypto";
|
||||||
|
import { execFile } from "node:child_process";
|
||||||
|
import { access, mkdir, readdir, readFile, stat, writeFile } from "node:fs/promises";
|
||||||
|
import { homedir } from "node:os";
|
||||||
|
import { basename, join } from "node:path";
|
||||||
|
import type { Response } from "express";
|
||||||
|
import chokidar from "chokidar";
|
||||||
|
|
||||||
|
export interface QueueNotification {
|
||||||
|
id: string;
|
||||||
|
agent: string;
|
||||||
|
filename: string;
|
||||||
|
payload: unknown;
|
||||||
|
createdAt: Date;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface QueueTask {
|
||||||
|
id: string;
|
||||||
|
project: string;
|
||||||
|
taskId: string;
|
||||||
|
status: string;
|
||||||
|
description: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface AgentStateRecord {
|
||||||
|
taskId: string;
|
||||||
|
agent: string;
|
||||||
|
branch: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface AgentCiNotification {
|
||||||
|
id: string;
|
||||||
|
taskId: string;
|
||||||
|
event: "completed" | "failed";
|
||||||
|
targetAgent: string;
|
||||||
|
fromAgent: string;
|
||||||
|
payload: {
|
||||||
|
branch: string;
|
||||||
|
buildUrl: string;
|
||||||
|
repo: string;
|
||||||
|
ciStatus: "success" | "failure";
|
||||||
|
};
|
||||||
|
createdAt: string;
|
||||||
|
retries: number;
|
||||||
|
maxRetries: number;
|
||||||
|
ttlSeconds: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class QueueNotificationsService implements OnModuleInit {
|
||||||
|
private readonly logger = new Logger(QueueNotificationsService.name);
|
||||||
|
|
||||||
|
constructor(private readonly configService: ConfigService) {}
|
||||||
|
|
||||||
|
async onModuleInit(): Promise<void> {
|
||||||
|
if (!(await this.inboxDirExists())) {
|
||||||
|
this.logger.warn(`Queue notifications inbox directory does not exist: ${this.getInboxDir()}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async listNotifications(): Promise<QueueNotification[]> {
|
||||||
|
const inboxDir = this.getInboxDir();
|
||||||
|
|
||||||
|
if (!(await this.inboxDirExists())) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
// Paths come from controlled config plus directory entries under the inbox root.
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
const agentEntries = await readdir(inboxDir, { withFileTypes: true });
|
||||||
|
const notifications: QueueNotification[] = [];
|
||||||
|
|
||||||
|
for (const agentEntry of agentEntries) {
|
||||||
|
if (!agentEntry.isDirectory() || this.isIgnoredDirectory(agentEntry.name)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const agentDir = join(inboxDir, agentEntry.name);
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
const files = await readdir(agentDir, { withFileTypes: true });
|
||||||
|
|
||||||
|
for (const fileEntry of files) {
|
||||||
|
if (!fileEntry.isFile() || !fileEntry.name.endsWith(".json")) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const filePath = join(agentDir, fileEntry.name);
|
||||||
|
const [rawPayload, fileStats] = await Promise.all([
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
readFile(filePath, "utf8"),
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
stat(filePath),
|
||||||
|
]);
|
||||||
|
|
||||||
|
notifications.push({
|
||||||
|
id: basename(fileEntry.name, ".json"),
|
||||||
|
agent: agentEntry.name,
|
||||||
|
filename: fileEntry.name,
|
||||||
|
payload: JSON.parse(rawPayload) as unknown,
|
||||||
|
createdAt: fileStats.birthtime,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return notifications.sort(
|
||||||
|
(left, right) => right.createdAt.getTime() - left.createdAt.getTime()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async streamNotifications(res: Response): Promise<void> {
|
||||||
|
res.setHeader("Content-Type", "text/event-stream");
|
||||||
|
res.setHeader("Cache-Control", "no-cache");
|
||||||
|
res.setHeader("Connection", "keep-alive");
|
||||||
|
res.setHeader("X-Accel-Buffering", "no");
|
||||||
|
|
||||||
|
if (typeof res.flushHeaders === "function") {
|
||||||
|
res.flushHeaders();
|
||||||
|
}
|
||||||
|
|
||||||
|
const emitNotifications = async (): Promise<void> => {
|
||||||
|
try {
|
||||||
|
const notifications = await this.listNotifications();
|
||||||
|
res.write(`data: ${JSON.stringify(notifications)}\n\n`);
|
||||||
|
} catch (error: unknown) {
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
res.write(`event: error\n`);
|
||||||
|
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
await emitNotifications();
|
||||||
|
|
||||||
|
const watcher = chokidar.watch(this.getInboxDir(), {
|
||||||
|
ignoreInitial: true,
|
||||||
|
persistent: true,
|
||||||
|
ignored: (watchedPath: string) => {
|
||||||
|
return watchedPath.includes("/_acked/") || watchedPath.includes("/_dead-letter/");
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
watcher.on("add", () => {
|
||||||
|
void emitNotifications();
|
||||||
|
});
|
||||||
|
|
||||||
|
watcher.on("unlink", () => {
|
||||||
|
void emitNotifications();
|
||||||
|
});
|
||||||
|
|
||||||
|
res.on("close", () => {
|
||||||
|
void watcher.close();
|
||||||
|
res.end();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async ackNotification(id: string): Promise<{ success: true; id: string }> {
|
||||||
|
const notification = (await this.listNotifications()).find((entry) => entry.id === id);
|
||||||
|
|
||||||
|
if (!notification) {
|
||||||
|
throw new NotFoundException(`Queue notification ${id} not found`);
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.execQueueCli(["ack", notification.id]);
|
||||||
|
|
||||||
|
return {
|
||||||
|
success: true,
|
||||||
|
id: notification.id,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async listTasks(): Promise<QueueTask[]> {
|
||||||
|
const stdout = await this.execQueueCli(["list", "mosaic-stack"]);
|
||||||
|
|
||||||
|
return stdout
|
||||||
|
.split(/\r?\n/)
|
||||||
|
.map((line) => line.trim())
|
||||||
|
.filter((line) => line.length > 0)
|
||||||
|
.map((line) => {
|
||||||
|
const [rawId = "", projectTaskId = "", rawStatus = "", description = ""] = line.split("\t");
|
||||||
|
const [project = "", taskId = ""] = projectTaskId.split("/");
|
||||||
|
|
||||||
|
return {
|
||||||
|
id: rawId,
|
||||||
|
project,
|
||||||
|
taskId,
|
||||||
|
status: rawStatus.replace(/^\[/, "").replace(/\]$/, ""),
|
||||||
|
description,
|
||||||
|
};
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async notifyAgentCiResult(opts: {
|
||||||
|
branch: string;
|
||||||
|
status: "success" | "failure";
|
||||||
|
buildUrl: string;
|
||||||
|
repo: string;
|
||||||
|
}): Promise<{ notified: number }> {
|
||||||
|
const activeRecords = await this.listActiveAgentStateRecords();
|
||||||
|
const matches = activeRecords.filter((record) => record.branch === opts.branch);
|
||||||
|
|
||||||
|
await Promise.all(
|
||||||
|
matches.map(async (record) => {
|
||||||
|
const notification: AgentCiNotification = {
|
||||||
|
id: randomUUID(),
|
||||||
|
taskId: record.taskId,
|
||||||
|
event: opts.status === "success" ? "completed" : "failed",
|
||||||
|
targetAgent: record.agent,
|
||||||
|
fromAgent: "mosaic-api",
|
||||||
|
payload: {
|
||||||
|
branch: opts.branch,
|
||||||
|
buildUrl: opts.buildUrl,
|
||||||
|
repo: opts.repo,
|
||||||
|
ciStatus: opts.status,
|
||||||
|
},
|
||||||
|
createdAt: new Date().toISOString(),
|
||||||
|
retries: 0,
|
||||||
|
maxRetries: 3,
|
||||||
|
ttlSeconds: 600,
|
||||||
|
};
|
||||||
|
|
||||||
|
const agentDir = join(this.getInboxDir(), record.agent);
|
||||||
|
// Path is built from the configured inbox root plus validated agent-state entries.
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
await mkdir(agentDir, { recursive: true });
|
||||||
|
// Path is built from the configured inbox root plus validated agent-state entries.
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
await writeFile(
|
||||||
|
join(agentDir, `${notification.id}.json`),
|
||||||
|
JSON.stringify(notification, null, 2),
|
||||||
|
"utf8"
|
||||||
|
);
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
return { notified: matches.length };
|
||||||
|
}
|
||||||
|
|
||||||
|
private async execQueueCli(args: string[]): Promise<string> {
|
||||||
|
const cliPath = this.getQueueCliPath();
|
||||||
|
|
||||||
|
return new Promise<string>((resolve, reject) => {
|
||||||
|
execFile("node", [cliPath, ...args], (error, stdout, stderr) => {
|
||||||
|
if (error) {
|
||||||
|
this.logger.error(
|
||||||
|
`Queue CLI command failed: node ${cliPath} ${args.join(" ")} | ${stderr || error.message}`
|
||||||
|
);
|
||||||
|
reject(
|
||||||
|
new InternalServerErrorException(`Queue CLI command failed: ${stderr || error.message}`)
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
resolve(stdout);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private getInboxDir(): string {
|
||||||
|
return this.expandHomePath(
|
||||||
|
this.configService.get<string>("MOSAIC_QUEUE_INBOX_DIR") ??
|
||||||
|
"~/.openclaw/workspace/agent-inbox"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private getAgentStateDir(): string {
|
||||||
|
return this.expandHomePath(
|
||||||
|
this.configService.get<string>("MOSAIC_AGENT_STATE_DIR") ?? "~/.openclaw/workspace/agents"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private getQueueCliPath(): string {
|
||||||
|
return this.expandHomePath(
|
||||||
|
this.configService.get<string>("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private expandHomePath(value: string): string {
|
||||||
|
if (value === "~") {
|
||||||
|
return homedir();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value.startsWith("~/")) {
|
||||||
|
return join(homedir(), value.slice(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async inboxDirExists(): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
await access(this.getInboxDir());
|
||||||
|
return true;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private isIgnoredDirectory(name: string): boolean {
|
||||||
|
return name === "_acked" || name === "_dead-letter";
|
||||||
|
}
|
||||||
|
|
||||||
|
private async listActiveAgentStateRecords(): Promise<AgentStateRecord[]> {
|
||||||
|
const activeDir = join(this.getAgentStateDir(), "active");
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Path comes from controlled config and a fixed "active" subdirectory.
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
const entries = await readdir(activeDir, { withFileTypes: true });
|
||||||
|
const records = await Promise.all(
|
||||||
|
entries
|
||||||
|
.filter((entry) => entry.isFile() && entry.name.endsWith(".json"))
|
||||||
|
.map(async (entry) => {
|
||||||
|
const filePath = join(activeDir, entry.name);
|
||||||
|
// Path comes from controlled config plus directory entries under the active root.
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
const rawRecord = await readFile(filePath, "utf8");
|
||||||
|
return JSON.parse(rawRecord) as AgentStateRecord;
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
return records.filter((record) => {
|
||||||
|
return (
|
||||||
|
typeof record.taskId === "string" &&
|
||||||
|
typeof record.agent === "string" &&
|
||||||
|
typeof record.branch === "string"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
} catch (error: unknown) {
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
this.logger.warn(`Unable to read active agent-state records from ${activeDir}: ${message}`);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,132 @@
|
|||||||
|
import { createHmac } from "node:crypto";
|
||||||
|
import { Logger } from "@nestjs/common";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { Test, type TestingModule } from "@nestjs/testing";
|
||||||
|
import type { Request } from "express";
|
||||||
|
import { describe, beforeEach, expect, it, vi } from "vitest";
|
||||||
|
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||||
|
import {
|
||||||
|
type WoodpeckerWebhookPayload,
|
||||||
|
WoodpeckerWebhookController,
|
||||||
|
} from "./woodpecker-webhook.controller";
|
||||||
|
|
||||||
|
function signPayload(payload: WoodpeckerWebhookPayload, secret: string): string {
|
||||||
|
return createHmac("sha256", secret).update(JSON.stringify(payload)).digest("hex");
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("WoodpeckerWebhookController", () => {
|
||||||
|
let controller: WoodpeckerWebhookController;
|
||||||
|
|
||||||
|
const mockService = {
|
||||||
|
notifyAgentCiResult: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
const mockConfigService = {
|
||||||
|
get: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
mockConfigService.get.mockImplementation((key: string) => {
|
||||||
|
if (key === "WOODPECKER_WEBHOOK_SECRET") {
|
||||||
|
return "test-secret";
|
||||||
|
}
|
||||||
|
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
|
||||||
|
const module: TestingModule = await Test.createTestingModule({
|
||||||
|
controllers: [WoodpeckerWebhookController],
|
||||||
|
providers: [
|
||||||
|
{ provide: QueueNotificationsService, useValue: mockService },
|
||||||
|
{ provide: ConfigService, useValue: mockConfigService },
|
||||||
|
],
|
||||||
|
}).compile();
|
||||||
|
|
||||||
|
controller = module.get<WoodpeckerWebhookController>(WoodpeckerWebhookController);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("accepts a valid signature and forwards the payload to the service", async () => {
|
||||||
|
const payload: WoodpeckerWebhookPayload = {
|
||||||
|
branch: "feat/ms24-ci-webhook",
|
||||||
|
status: "success",
|
||||||
|
buildUrl: "https://ci.example/build/123",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
};
|
||||||
|
const signature = signPayload(payload, "test-secret");
|
||||||
|
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 2 });
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
controller.handleWebhook(
|
||||||
|
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
|
||||||
|
payload,
|
||||||
|
signature
|
||||||
|
)
|
||||||
|
).resolves.toEqual({ ok: true, notified: 2 });
|
||||||
|
|
||||||
|
expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns ok without notifying when the signature is invalid", async () => {
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
const payload: WoodpeckerWebhookPayload = {
|
||||||
|
branch: "feat/ms24-ci-webhook",
|
||||||
|
status: "failure",
|
||||||
|
buildUrl: "https://ci.example/build/123",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
};
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
controller.handleWebhook(
|
||||||
|
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
|
||||||
|
payload,
|
||||||
|
"bad-signature"
|
||||||
|
)
|
||||||
|
).resolves.toEqual({ ok: true, notified: 0 });
|
||||||
|
|
||||||
|
expect(mockService.notifyAgentCiResult).not.toHaveBeenCalled();
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("invalid Woodpecker webhook signature")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("accepts the payload when the webhook secret is missing", async () => {
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
const payload: WoodpeckerWebhookPayload = {
|
||||||
|
branch: "feat/ms24-ci-webhook",
|
||||||
|
status: "success",
|
||||||
|
buildUrl: "https://ci.example/build/123",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
};
|
||||||
|
mockConfigService.get.mockReturnValue(undefined);
|
||||||
|
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 1 });
|
||||||
|
|
||||||
|
await expect(controller.handleWebhook({} as Request, payload, "")).resolves.toEqual({
|
||||||
|
ok: true,
|
||||||
|
notified: 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload);
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("WOODPECKER_WEBHOOK_SECRET is not configured")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns zero notifications when no active branch matches", async () => {
|
||||||
|
const payload: WoodpeckerWebhookPayload = {
|
||||||
|
branch: "feat/ms24-ci-webhook",
|
||||||
|
status: "success",
|
||||||
|
buildUrl: "https://ci.example/build/999",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
};
|
||||||
|
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 0 });
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
controller.handleWebhook(
|
||||||
|
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
|
||||||
|
payload,
|
||||||
|
signPayload(payload, "test-secret")
|
||||||
|
)
|
||||||
|
).resolves.toEqual({ ok: true, notified: 0 });
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,73 @@
|
|||||||
|
import { Body, Controller, Headers, Logger, Post, Req, type RawBodyRequest } from "@nestjs/common";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { createHmac, timingSafeEqual } from "node:crypto";
|
||||||
|
import type { Request } from "express";
|
||||||
|
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
|
||||||
|
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||||
|
|
||||||
|
export interface WoodpeckerWebhookPayload {
|
||||||
|
branch: string;
|
||||||
|
status: "success" | "failure";
|
||||||
|
buildUrl: string;
|
||||||
|
repo: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Controller("webhooks")
|
||||||
|
export class WoodpeckerWebhookController {
|
||||||
|
private readonly logger = new Logger(WoodpeckerWebhookController.name);
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private readonly queueService: QueueNotificationsService,
|
||||||
|
private readonly configService: ConfigService
|
||||||
|
) {}
|
||||||
|
|
||||||
|
@SkipCsrf()
|
||||||
|
@Post("woodpecker")
|
||||||
|
async handleWebhook(
|
||||||
|
@Req() req: RawBodyRequest<Request>,
|
||||||
|
@Body() body: WoodpeckerWebhookPayload,
|
||||||
|
@Headers("x-woodpecker-signature") signature: string
|
||||||
|
): Promise<{ ok: boolean; notified: number }> {
|
||||||
|
const secret = this.configService.get<string>("WOODPECKER_WEBHOOK_SECRET");
|
||||||
|
|
||||||
|
if (!secret) {
|
||||||
|
this.logger.warn("WOODPECKER_WEBHOOK_SECRET is not configured; accepting Woodpecker webhook");
|
||||||
|
const result = await this.queueService.notifyAgentCiResult(body);
|
||||||
|
return { ok: true, notified: result.notified };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.isValidSignature(this.getRequestBody(req, body), signature, secret)) {
|
||||||
|
this.logger.warn("Received invalid Woodpecker webhook signature");
|
||||||
|
return { ok: true, notified: 0 };
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = await this.queueService.notifyAgentCiResult(body);
|
||||||
|
return { ok: true, notified: result.notified };
|
||||||
|
}
|
||||||
|
|
||||||
|
private getRequestBody(req: RawBodyRequest<Request>, body: WoodpeckerWebhookPayload): Buffer {
|
||||||
|
const rawBody = req.rawBody;
|
||||||
|
|
||||||
|
if (Buffer.isBuffer(rawBody)) {
|
||||||
|
return rawBody;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Buffer.from(JSON.stringify(body));
|
||||||
|
}
|
||||||
|
|
||||||
|
private isValidSignature(body: Buffer, signature: string | undefined, secret: string): boolean {
|
||||||
|
if (!signature) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const expected = createHmac("sha256", secret).update(body).digest("hex");
|
||||||
|
const signatureBuffer = Buffer.from(signature);
|
||||||
|
const expectedBuffer = Buffer.from(expected);
|
||||||
|
|
||||||
|
if (signatureBuffer.length !== expectedBuffer.length) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return timingSafeEqual(signatureBuffer, expectedBuffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
93
apps/web/src/app/api/orchestrator/[...path]/route.ts
Normal file
93
apps/web/src/app/api/orchestrator/[...path]/route.ts
Normal file
@@ -0,0 +1,93 @@
|
|||||||
|
import { type NextRequest, NextResponse } from "next/server";
|
||||||
|
|
||||||
|
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
|
||||||
|
|
||||||
|
function getOrchestratorUrl(): string {
|
||||||
|
return (
|
||||||
|
process.env.ORCHESTRATOR_URL ??
|
||||||
|
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
|
||||||
|
process.env.NEXT_PUBLIC_API_URL ??
|
||||||
|
DEFAULT_ORCHESTRATOR_URL
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generic catch-all proxy for orchestrator API routes.
|
||||||
|
*
|
||||||
|
* Forwards any request to /api/orchestrator/<path> → ORCHESTRATOR_URL/<path>
|
||||||
|
* with the ORCHESTRATOR_API_KEY injected server-side so it never reaches the browser.
|
||||||
|
*
|
||||||
|
* Supports GET, POST, PATCH, DELETE, PUT.
|
||||||
|
*
|
||||||
|
* Example:
|
||||||
|
* GET /api/orchestrator/mission-control/sessions
|
||||||
|
* → GET ORCHESTRATOR_URL/api/mission-control/sessions
|
||||||
|
* POST /api/orchestrator/mission-control/sessions/abc/kill
|
||||||
|
* → POST ORCHESTRATOR_URL/api/mission-control/sessions/abc/kill
|
||||||
|
*/
|
||||||
|
async function proxyToOrchestrator(
|
||||||
|
request: NextRequest,
|
||||||
|
context: { params: Promise<{ path: string[] }> }
|
||||||
|
): Promise<NextResponse> {
|
||||||
|
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
|
||||||
|
if (!orchestratorApiKey) {
|
||||||
|
return NextResponse.json(
|
||||||
|
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
|
||||||
|
{ status: 503 }
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
const { path } = await context.params;
|
||||||
|
const upstreamPath = `/${path.join("/")}`;
|
||||||
|
const search = request.nextUrl.search;
|
||||||
|
const upstreamUrl = `${getOrchestratorUrl()}${upstreamPath}${search}`;
|
||||||
|
|
||||||
|
const controller = new AbortController();
|
||||||
|
const timeout = setTimeout(() => {
|
||||||
|
controller.abort();
|
||||||
|
}, 30_000);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const headers: Record<string, string> = {
|
||||||
|
"X-API-Key": orchestratorApiKey,
|
||||||
|
};
|
||||||
|
|
||||||
|
const contentType = request.headers.get("Content-Type");
|
||||||
|
if (contentType) {
|
||||||
|
headers["Content-Type"] = contentType;
|
||||||
|
}
|
||||||
|
|
||||||
|
const hasBody = request.method !== "GET" && request.method !== "HEAD";
|
||||||
|
const body = hasBody ? await request.text() : null;
|
||||||
|
|
||||||
|
const upstream = await fetch(upstreamUrl, {
|
||||||
|
method: request.method,
|
||||||
|
headers,
|
||||||
|
...(body !== null ? { body } : {}),
|
||||||
|
cache: "no-store",
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
const responseText = await upstream.text();
|
||||||
|
return new NextResponse(responseText, {
|
||||||
|
status: upstream.status,
|
||||||
|
headers: {
|
||||||
|
"Content-Type": upstream.headers.get("Content-Type") ?? "application/json",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
const message =
|
||||||
|
error instanceof Error && error.name === "AbortError"
|
||||||
|
? "Orchestrator request timed out."
|
||||||
|
: "Unable to reach orchestrator.";
|
||||||
|
return NextResponse.json({ error: message }, { status: 502 });
|
||||||
|
} finally {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const GET = proxyToOrchestrator;
|
||||||
|
export const POST = proxyToOrchestrator;
|
||||||
|
export const PATCH = proxyToOrchestrator;
|
||||||
|
export const PUT = proxyToOrchestrator;
|
||||||
|
export const DELETE = proxyToOrchestrator;
|
||||||
@@ -0,0 +1,57 @@
|
|||||||
|
import { NextResponse } from "next/server";
|
||||||
|
|
||||||
|
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
|
||||||
|
|
||||||
|
function getOrchestratorUrl(): string {
|
||||||
|
return (
|
||||||
|
process.env.ORCHESTRATOR_URL ??
|
||||||
|
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
|
||||||
|
process.env.NEXT_PUBLIC_API_URL ??
|
||||||
|
DEFAULT_ORCHESTRATOR_URL
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
export const dynamic = "force-dynamic";
|
||||||
|
|
||||||
|
export async function GET(): Promise<NextResponse> {
|
||||||
|
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
|
||||||
|
if (!orchestratorApiKey) {
|
||||||
|
return NextResponse.json(
|
||||||
|
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
|
||||||
|
{ status: 503 }
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const upstream = await fetch(`${getOrchestratorUrl()}/api/queue/notifications/stream`, {
|
||||||
|
method: "GET",
|
||||||
|
headers: {
|
||||||
|
"X-API-Key": orchestratorApiKey,
|
||||||
|
Accept: "text/event-stream",
|
||||||
|
},
|
||||||
|
cache: "no-store",
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!upstream.ok || upstream.body === null) {
|
||||||
|
const message = await upstream.text();
|
||||||
|
return new NextResponse(message || "Failed to connect to queue notifications stream", {
|
||||||
|
status: upstream.status || 502,
|
||||||
|
headers: {
|
||||||
|
"Content-Type": upstream.headers.get("Content-Type") ?? "text/plain; charset=utf-8",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return new NextResponse(upstream.body, {
|
||||||
|
status: upstream.status,
|
||||||
|
headers: {
|
||||||
|
"Content-Type": "text/event-stream",
|
||||||
|
"Cache-Control": "no-cache, no-transform",
|
||||||
|
Connection: "keep-alive",
|
||||||
|
"X-Accel-Buffering": "no",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
} catch {
|
||||||
|
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
"use client";
|
"use client";
|
||||||
|
|
||||||
import { useEffect, useState } from "react";
|
import { useEffect, useMemo, useState } from "react";
|
||||||
import Link from "next/link";
|
import Link from "next/link";
|
||||||
import { usePathname } from "next/navigation";
|
import { usePathname } from "next/navigation";
|
||||||
import Image from "next/image";
|
import Image from "next/image";
|
||||||
@@ -29,6 +29,10 @@ interface NavGroup {
|
|||||||
items: NavItemConfig[];
|
items: NavItemConfig[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface QueueNotification {
|
||||||
|
id: string;
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// SVG Icons (16x16 viewBox, stroke="currentColor", strokeWidth="1.5")
|
// SVG Icons (16x16 viewBox, stroke="currentColor", strokeWidth="1.5")
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
@@ -685,6 +689,72 @@ function CollapseToggle({ collapsed, onToggle }: CollapseToggleProps): React.JSX
|
|||||||
export function AppSidebar(): React.JSX.Element {
|
export function AppSidebar(): React.JSX.Element {
|
||||||
const pathname = usePathname();
|
const pathname = usePathname();
|
||||||
const { collapsed, toggleCollapsed, mobileOpen, setMobileOpen, isMobile } = useSidebar();
|
const { collapsed, toggleCollapsed, mobileOpen, setMobileOpen, isMobile } = useSidebar();
|
||||||
|
const [missionControlBadgeCount, setMissionControlBadgeCount] = useState<number>(0);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
let isActive = true;
|
||||||
|
|
||||||
|
const loadNotificationCount = async (): Promise<void> => {
|
||||||
|
try {
|
||||||
|
const response = await fetch("/api/orchestrator/api/queue/notifications", {
|
||||||
|
method: "GET",
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const payload = (await response.json()) as QueueNotification[];
|
||||||
|
if (isActive) {
|
||||||
|
setMissionControlBadgeCount(Array.isArray(payload) ? payload.length : 0);
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Ignore badge failures in the nav.
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
void loadNotificationCount();
|
||||||
|
|
||||||
|
if (typeof EventSource === "undefined") {
|
||||||
|
return (): void => {
|
||||||
|
isActive = false;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const source = new EventSource("/api/orchestrator/queue/notifications/stream");
|
||||||
|
|
||||||
|
source.onmessage = (event: MessageEvent<string>): void => {
|
||||||
|
try {
|
||||||
|
const payload = JSON.parse(event.data) as QueueNotification[];
|
||||||
|
if (isActive) {
|
||||||
|
setMissionControlBadgeCount(Array.isArray(payload) ? payload.length : 0);
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Ignore malformed badge updates.
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return (): void => {
|
||||||
|
isActive = false;
|
||||||
|
source.close();
|
||||||
|
};
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
const navGroups = useMemo(
|
||||||
|
() =>
|
||||||
|
NAV_GROUPS.map((group) => ({
|
||||||
|
...group,
|
||||||
|
items: group.items.map((item) =>
|
||||||
|
item.href === "/mission-control" && missionControlBadgeCount > 0
|
||||||
|
? {
|
||||||
|
...item,
|
||||||
|
badge: { label: String(missionControlBadgeCount) },
|
||||||
|
}
|
||||||
|
: item
|
||||||
|
),
|
||||||
|
})),
|
||||||
|
[missionControlBadgeCount]
|
||||||
|
);
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<>
|
<>
|
||||||
@@ -722,7 +792,7 @@ export function AppSidebar(): React.JSX.Element {
|
|||||||
}}
|
}}
|
||||||
aria-label="Main navigation"
|
aria-label="Main navigation"
|
||||||
>
|
>
|
||||||
{NAV_GROUPS.map((group) => (
|
{navGroups.map((group) => (
|
||||||
<div key={group.label} style={{ marginBottom: "18px" }}>
|
<div key={group.label} style={{ marginBottom: "18px" }}>
|
||||||
{/* Group label — hidden when collapsed */}
|
{/* Group label — hidden when collapsed */}
|
||||||
{!collapsed && (
|
{!collapsed && (
|
||||||
|
|||||||
@@ -158,7 +158,9 @@ async function fetchAuditLog(
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return await apiGet<AuditLogResponse>(`/api/mission-control/audit-log?${params.toString()}`);
|
return await apiGet<AuditLogResponse>(
|
||||||
|
`/api/orchestrator/api/mission-control/audit-log?${params.toString()}`
|
||||||
|
);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
if (isRateLimitError(error)) {
|
if (isRateLimitError(error)) {
|
||||||
return createEmptyAuditLogResponse(page, "Rate limited - retrying...");
|
return createEmptyAuditLogResponse(page, "Rate limited - retrying...");
|
||||||
|
|||||||
@@ -59,9 +59,12 @@ describe("BargeInInput", (): void => {
|
|||||||
await user.click(screen.getByRole("button", { name: "Send" }));
|
await user.click(screen.getByRole("button", { name: "Send" }));
|
||||||
|
|
||||||
await waitFor((): void => {
|
await waitFor((): void => {
|
||||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-1/inject", {
|
expect(mockApiPost).toHaveBeenCalledWith(
|
||||||
content: "execute plan",
|
"/api/orchestrator/api/mission-control/sessions/session-1/inject",
|
||||||
});
|
{
|
||||||
|
content: "execute plan",
|
||||||
|
}
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(onSent).toHaveBeenCalledTimes(1);
|
expect(onSent).toHaveBeenCalledTimes(1);
|
||||||
@@ -83,12 +86,18 @@ describe("BargeInInput", (): void => {
|
|||||||
|
|
||||||
const calls = mockApiPost.mock.calls as [string, unknown?][];
|
const calls = mockApiPost.mock.calls as [string, unknown?][];
|
||||||
|
|
||||||
expect(calls[0]).toEqual(["/api/mission-control/sessions/session-2/pause", undefined]);
|
expect(calls[0]).toEqual([
|
||||||
|
"/api/orchestrator/api/mission-control/sessions/session-2/pause",
|
||||||
|
undefined,
|
||||||
|
]);
|
||||||
expect(calls[1]).toEqual([
|
expect(calls[1]).toEqual([
|
||||||
"/api/mission-control/sessions/session-2/inject",
|
"/api/orchestrator/api/mission-control/sessions/session-2/inject",
|
||||||
{ content: "hello world" },
|
{ content: "hello world" },
|
||||||
]);
|
]);
|
||||||
expect(calls[2]).toEqual(["/api/mission-control/sessions/session-2/resume", undefined]);
|
expect(calls[2]).toEqual([
|
||||||
|
"/api/orchestrator/api/mission-control/sessions/session-2/resume",
|
||||||
|
undefined,
|
||||||
|
]);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("submits with Enter and does not submit on Shift+Enter", async (): Promise<void> => {
|
it("submits with Enter and does not submit on Shift+Enter", async (): Promise<void> => {
|
||||||
@@ -105,9 +114,12 @@ describe("BargeInInput", (): void => {
|
|||||||
fireEvent.keyDown(textarea, { key: "Enter", code: "Enter", shiftKey: false });
|
fireEvent.keyDown(textarea, { key: "Enter", code: "Enter", shiftKey: false });
|
||||||
|
|
||||||
await waitFor((): void => {
|
await waitFor((): void => {
|
||||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-3/inject", {
|
expect(mockApiPost).toHaveBeenCalledWith(
|
||||||
content: "first",
|
"/api/orchestrator/api/mission-control/sessions/session-3/inject",
|
||||||
});
|
{
|
||||||
|
content: "first",
|
||||||
|
}
|
||||||
|
);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ export function BargeInInput({ sessionId, onSent }: BargeInInputProps): React.JS
|
|||||||
}
|
}
|
||||||
|
|
||||||
const encodedSessionId = encodeURIComponent(sessionId);
|
const encodedSessionId = encodeURIComponent(sessionId);
|
||||||
const baseEndpoint = `/api/mission-control/sessions/${encodedSessionId}`;
|
const baseEndpoint = `/api/orchestrator/api/mission-control/sessions/${encodedSessionId}`;
|
||||||
let didPause = false;
|
let didPause = false;
|
||||||
let didInject = false;
|
let didInject = false;
|
||||||
|
|
||||||
|
|||||||
@@ -177,9 +177,12 @@ describe("GlobalAgentRoster", (): void => {
|
|||||||
fireEvent.click(screen.getByRole("button", { name: "Kill session killme12" }));
|
fireEvent.click(screen.getByRole("button", { name: "Kill session killme12" }));
|
||||||
|
|
||||||
await waitFor((): void => {
|
await waitFor((): void => {
|
||||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/killme123456/kill", {
|
expect(mockApiPost).toHaveBeenCalledWith(
|
||||||
force: false,
|
"/api/orchestrator/api/mission-control/sessions/killme123456/kill",
|
||||||
});
|
{
|
||||||
|
force: false,
|
||||||
|
}
|
||||||
|
);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -83,7 +83,7 @@ function groupByProvider(sessions: MissionControlSession[]): ProviderSessionGrou
|
|||||||
|
|
||||||
async function fetchSessions(): Promise<MissionControlSession[]> {
|
async function fetchSessions(): Promise<MissionControlSession[]> {
|
||||||
const payload = await apiGet<MissionControlSession[] | SessionsPayload>(
|
const payload = await apiGet<MissionControlSession[] | SessionsPayload>(
|
||||||
"/api/mission-control/sessions"
|
"/api/orchestrator/api/mission-control/sessions"
|
||||||
);
|
);
|
||||||
return Array.isArray(payload) ? payload : payload.sessions;
|
return Array.isArray(payload) ? payload : payload.sessions;
|
||||||
}
|
}
|
||||||
@@ -118,9 +118,12 @@ export function GlobalAgentRoster({
|
|||||||
|
|
||||||
const killMutation = useMutation({
|
const killMutation = useMutation({
|
||||||
mutationFn: async (sessionId: string): Promise<string> => {
|
mutationFn: async (sessionId: string): Promise<string> => {
|
||||||
await apiPost<{ message: string }>(`/api/mission-control/sessions/${sessionId}/kill`, {
|
await apiPost<{ message: string }>(
|
||||||
force: false,
|
`/api/orchestrator/api/mission-control/sessions/${sessionId}/kill`,
|
||||||
});
|
{
|
||||||
|
force: false,
|
||||||
|
}
|
||||||
|
);
|
||||||
return sessionId;
|
return sessionId;
|
||||||
},
|
},
|
||||||
onSuccess: (): void => {
|
onSuccess: (): void => {
|
||||||
|
|||||||
@@ -112,14 +112,20 @@ describe("KillAllDialog", (): void => {
|
|||||||
await user.click(screen.getByRole("button", { name: "Kill All Agents" }));
|
await user.click(screen.getByRole("button", { name: "Kill All Agents" }));
|
||||||
|
|
||||||
await waitFor((): void => {
|
await waitFor((): void => {
|
||||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/internal-1/kill", {
|
expect(mockApiPost).toHaveBeenCalledWith(
|
||||||
force: true,
|
"/api/orchestrator/api/mission-control/sessions/internal-1/kill",
|
||||||
});
|
{
|
||||||
|
force: true,
|
||||||
|
}
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(mockApiPost).not.toHaveBeenCalledWith("/api/mission-control/sessions/external-1/kill", {
|
expect(mockApiPost).not.toHaveBeenCalledWith(
|
||||||
force: true,
|
"/api/orchestrator/api/mission-control/sessions/external-1/kill",
|
||||||
});
|
{
|
||||||
|
force: true,
|
||||||
|
}
|
||||||
|
);
|
||||||
expect(onComplete).toHaveBeenCalledTimes(1);
|
expect(onComplete).toHaveBeenCalledTimes(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -141,12 +147,18 @@ describe("KillAllDialog", (): void => {
|
|||||||
await user.click(screen.getByRole("button", { name: "Kill All Agents" }));
|
await user.click(screen.getByRole("button", { name: "Kill All Agents" }));
|
||||||
|
|
||||||
await waitFor((): void => {
|
await waitFor((): void => {
|
||||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/internal-2/kill", {
|
expect(mockApiPost).toHaveBeenCalledWith(
|
||||||
force: true,
|
"/api/orchestrator/api/mission-control/sessions/internal-2/kill",
|
||||||
});
|
{
|
||||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/external-2/kill", {
|
force: true,
|
||||||
force: true,
|
}
|
||||||
});
|
);
|
||||||
|
expect(mockApiPost).toHaveBeenCalledWith(
|
||||||
|
"/api/orchestrator/api/mission-control/sessions/external-2/kill",
|
||||||
|
{
|
||||||
|
force: true,
|
||||||
|
}
|
||||||
|
);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -96,9 +96,12 @@ export function KillAllDialog({ sessions, onComplete }: KillAllDialogProps): Rea
|
|||||||
|
|
||||||
const killRequests = targetSessions.map(async (session) => {
|
const killRequests = targetSessions.map(async (session) => {
|
||||||
try {
|
try {
|
||||||
await apiPost<{ message: string }>(`/api/mission-control/sessions/${session.id}/kill`, {
|
await apiPost<{ message: string }>(
|
||||||
force: true,
|
`/api/orchestrator/api/mission-control/sessions/${session.id}/kill`,
|
||||||
});
|
{
|
||||||
|
force: true,
|
||||||
|
}
|
||||||
|
);
|
||||||
return true;
|
return true;
|
||||||
} catch {
|
} catch {
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ interface MockButtonProps extends ButtonHTMLAttributes<HTMLButtonElement> {
|
|||||||
|
|
||||||
const mockGlobalAgentRoster = vi.fn();
|
const mockGlobalAgentRoster = vi.fn();
|
||||||
const mockMissionControlPanel = vi.fn();
|
const mockMissionControlPanel = vi.fn();
|
||||||
|
const mockQueueNotificationFeed = vi.fn();
|
||||||
|
|
||||||
vi.mock("@/components/mission-control/AuditLogDrawer", () => ({
|
vi.mock("@/components/mission-control/AuditLogDrawer", () => ({
|
||||||
AuditLogDrawer: ({ trigger }: { trigger: ReactNode }): React.JSX.Element => (
|
AuditLogDrawer: ({ trigger }: { trigger: ReactNode }): React.JSX.Element => (
|
||||||
@@ -31,6 +32,13 @@ vi.mock("@/components/mission-control/MissionControlPanel", () => ({
|
|||||||
MIN_PANEL_COUNT: 1,
|
MIN_PANEL_COUNT: 1,
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/mission-control/QueueNotificationFeed", () => ({
|
||||||
|
QueueNotificationFeed: (props: unknown): React.JSX.Element => {
|
||||||
|
mockQueueNotificationFeed(props);
|
||||||
|
return <div data-testid="queue-notification-feed" />;
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
vi.mock("@/components/ui/button", () => ({
|
vi.mock("@/components/ui/button", () => ({
|
||||||
Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => (
|
Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => (
|
||||||
<button {...props}>{children}</button>
|
<button {...props}>{children}</button>
|
||||||
@@ -66,5 +74,6 @@ describe("MissionControlLayout", (): void => {
|
|||||||
expect(region.querySelector("main")).toBeInTheDocument();
|
expect(region.querySelector("main")).toBeInTheDocument();
|
||||||
expect(screen.getByTestId("global-agent-roster")).toBeInTheDocument();
|
expect(screen.getByTestId("global-agent-roster")).toBeInTheDocument();
|
||||||
expect(screen.getByTestId("mission-control-panel")).toBeInTheDocument();
|
expect(screen.getByTestId("mission-control-panel")).toBeInTheDocument();
|
||||||
|
expect(screen.getByTestId("queue-notification-feed")).toBeInTheDocument();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import {
|
|||||||
MissionControlPanel,
|
MissionControlPanel,
|
||||||
type PanelConfig,
|
type PanelConfig,
|
||||||
} from "@/components/mission-control/MissionControlPanel";
|
} from "@/components/mission-control/MissionControlPanel";
|
||||||
|
import { QueueNotificationFeed } from "@/components/mission-control/QueueNotificationFeed";
|
||||||
import { Button } from "@/components/ui/button";
|
import { Button } from "@/components/ui/button";
|
||||||
|
|
||||||
const INITIAL_PANELS: PanelConfig[] = [{}];
|
const INITIAL_PANELS: PanelConfig[] = [{}];
|
||||||
@@ -94,7 +95,7 @@ export function MissionControlLayout(): React.JSX.Element {
|
|||||||
/>
|
/>
|
||||||
</header>
|
</header>
|
||||||
|
|
||||||
<div className="grid min-h-0 flex-1 gap-4 xl:grid-cols-[280px_minmax(0,1fr)]">
|
<div className="grid min-h-0 flex-1 gap-4 xl:grid-cols-[280px_minmax(0,1fr)_320px]">
|
||||||
<aside className="h-full min-h-0">
|
<aside className="h-full min-h-0">
|
||||||
<GlobalAgentRoster
|
<GlobalAgentRoster
|
||||||
onSelectSession={handleSelectSession}
|
onSelectSession={handleSelectSession}
|
||||||
@@ -109,6 +110,9 @@ export function MissionControlLayout(): React.JSX.Element {
|
|||||||
onExpandPanel={handleExpandPanel}
|
onExpandPanel={handleExpandPanel}
|
||||||
/>
|
/>
|
||||||
</main>
|
</main>
|
||||||
|
<aside className="h-full min-h-0">
|
||||||
|
<QueueNotificationFeed />
|
||||||
|
</aside>
|
||||||
</div>
|
</div>
|
||||||
</section>
|
</section>
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -89,7 +89,7 @@ describe("PanelControls", (): void => {
|
|||||||
|
|
||||||
await waitFor((): void => {
|
await waitFor((): void => {
|
||||||
expect(mockApiPost).toHaveBeenCalledWith(
|
expect(mockApiPost).toHaveBeenCalledWith(
|
||||||
"/api/mission-control/sessions/session%20with%20space/pause",
|
"/api/orchestrator/api/mission-control/sessions/session%20with%20space/pause",
|
||||||
undefined
|
undefined
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
@@ -114,9 +114,12 @@ describe("PanelControls", (): void => {
|
|||||||
await user.click(screen.getByRole("button", { name: "Confirm" }));
|
await user.click(screen.getByRole("button", { name: "Confirm" }));
|
||||||
|
|
||||||
await waitFor((): void => {
|
await waitFor((): void => {
|
||||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-4/kill", {
|
expect(mockApiPost).toHaveBeenCalledWith(
|
||||||
force: false,
|
"/api/orchestrator/api/mission-control/sessions/session-4/kill",
|
||||||
});
|
{
|
||||||
|
force: false,
|
||||||
|
}
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(onStatusChange).toHaveBeenCalledWith("killed");
|
expect(onStatusChange).toHaveBeenCalledWith("killed");
|
||||||
@@ -137,9 +140,12 @@ describe("PanelControls", (): void => {
|
|||||||
await user.click(screen.getByRole("button", { name: "Confirm" }));
|
await user.click(screen.getByRole("button", { name: "Confirm" }));
|
||||||
|
|
||||||
await waitFor((): void => {
|
await waitFor((): void => {
|
||||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-5/kill", {
|
expect(mockApiPost).toHaveBeenCalledWith(
|
||||||
force: true,
|
"/api/orchestrator/api/mission-control/sessions/session-5/kill",
|
||||||
});
|
{
|
||||||
|
force: true,
|
||||||
|
}
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(onStatusChange).toHaveBeenCalledWith("killed");
|
expect(onStatusChange).toHaveBeenCalledWith("killed");
|
||||||
|
|||||||
@@ -50,23 +50,23 @@ export function PanelControls({
|
|||||||
switch (action) {
|
switch (action) {
|
||||||
case "pause":
|
case "pause":
|
||||||
await apiPost<{ message: string }>(
|
await apiPost<{ message: string }>(
|
||||||
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/pause`
|
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/pause`
|
||||||
);
|
);
|
||||||
return { nextStatus: "paused" };
|
return { nextStatus: "paused" };
|
||||||
case "resume":
|
case "resume":
|
||||||
await apiPost<{ message: string }>(
|
await apiPost<{ message: string }>(
|
||||||
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/resume`
|
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/resume`
|
||||||
);
|
);
|
||||||
return { nextStatus: "active" };
|
return { nextStatus: "active" };
|
||||||
case "graceful-kill":
|
case "graceful-kill":
|
||||||
await apiPost<{ message: string }>(
|
await apiPost<{ message: string }>(
|
||||||
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
|
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
|
||||||
{ force: false }
|
{ force: false }
|
||||||
);
|
);
|
||||||
return { nextStatus: "killed" };
|
return { nextStatus: "killed" };
|
||||||
case "force-kill":
|
case "force-kill":
|
||||||
await apiPost<{ message: string }>(
|
await apiPost<{ message: string }>(
|
||||||
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
|
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
|
||||||
{ force: true }
|
{ force: true }
|
||||||
);
|
);
|
||||||
return { nextStatus: "killed" };
|
return { nextStatus: "killed" };
|
||||||
|
|||||||
@@ -0,0 +1,225 @@
|
|||||||
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import { act, render, screen, waitFor } from "@testing-library/react";
|
||||||
|
import userEvent from "@testing-library/user-event";
|
||||||
|
import type { ButtonHTMLAttributes, HTMLAttributes, ReactNode } from "react";
|
||||||
|
|
||||||
|
vi.mock("date-fns", () => ({
|
||||||
|
formatDistanceToNow: (): string => "5 minutes ago",
|
||||||
|
}));
|
||||||
|
|
||||||
|
interface MockButtonProps extends ButtonHTMLAttributes<HTMLButtonElement> {
|
||||||
|
children: ReactNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface MockContainerProps extends HTMLAttributes<HTMLElement> {
|
||||||
|
children: ReactNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface MockEventSourceInstance {
|
||||||
|
url: string;
|
||||||
|
onerror: ((event: Event) => void) | null;
|
||||||
|
onmessage: ((event: MessageEvent<string>) => void) | null;
|
||||||
|
close: ReturnType<typeof vi.fn>;
|
||||||
|
emitMessage: (payload: unknown) => void;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface QueueNotification {
|
||||||
|
id: string;
|
||||||
|
agent: string;
|
||||||
|
filename: string;
|
||||||
|
payload: unknown;
|
||||||
|
createdAt: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
const mockApiGet = vi.fn<(endpoint: string) => Promise<QueueNotification[]>>();
|
||||||
|
const mockApiPost = vi.fn<(endpoint: string) => Promise<{ success: true; id: string }>>();
|
||||||
|
const mockShowToast = vi.fn<(message: string, variant?: string) => void>();
|
||||||
|
|
||||||
|
let mockEventSourceInstances: MockEventSourceInstance[] = [];
|
||||||
|
|
||||||
|
const MockEventSource = vi.fn(function (this: MockEventSourceInstance, url: string): void {
|
||||||
|
this.url = url;
|
||||||
|
this.onerror = null;
|
||||||
|
this.onmessage = null;
|
||||||
|
this.close = vi.fn();
|
||||||
|
this.emitMessage = (payload: unknown): void => {
|
||||||
|
this.onmessage?.(new MessageEvent("message", { data: JSON.stringify(payload) }));
|
||||||
|
};
|
||||||
|
|
||||||
|
mockEventSourceInstances.push(this);
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.mock("@/lib/api/client", () => ({
|
||||||
|
apiGet: (endpoint: string): Promise<QueueNotification[]> => mockApiGet(endpoint),
|
||||||
|
apiPost: (endpoint: string): Promise<{ success: true; id: string }> => mockApiPost(endpoint),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@mosaic/ui", () => ({
|
||||||
|
useToast: (): { showToast: typeof mockShowToast; removeToast: ReturnType<typeof vi.fn> } => ({
|
||||||
|
showToast: mockShowToast,
|
||||||
|
removeToast: vi.fn(),
|
||||||
|
}),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/button", () => ({
|
||||||
|
Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => (
|
||||||
|
<button {...props}>{children}</button>
|
||||||
|
),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/badge", () => ({
|
||||||
|
Badge: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<span {...props}>{children}</span>
|
||||||
|
),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/card", () => ({
|
||||||
|
Card: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<section {...props}>{children}</section>
|
||||||
|
),
|
||||||
|
CardHeader: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<header {...props}>{children}</header>
|
||||||
|
),
|
||||||
|
CardContent: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<div {...props}>{children}</div>
|
||||||
|
),
|
||||||
|
CardTitle: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<h2 {...props}>{children}</h2>
|
||||||
|
),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/collapsible", () => ({
|
||||||
|
Collapsible: ({ children }: MockContainerProps): React.JSX.Element => <div>{children}</div>,
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/scroll-area", () => ({
|
||||||
|
ScrollArea: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<div {...props}>{children}</div>
|
||||||
|
),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/skeleton", () => ({
|
||||||
|
Skeleton: (props: HTMLAttributes<HTMLDivElement>): React.JSX.Element => <div {...props} />,
|
||||||
|
}));
|
||||||
|
|
||||||
|
import { QueueNotificationFeed } from "./QueueNotificationFeed";
|
||||||
|
|
||||||
|
function latestEventSource(): MockEventSourceInstance {
|
||||||
|
const instance = mockEventSourceInstances[mockEventSourceInstances.length - 1];
|
||||||
|
if (!instance) {
|
||||||
|
throw new Error("Expected an EventSource instance");
|
||||||
|
}
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeNotification(overrides: Partial<QueueNotification>): QueueNotification {
|
||||||
|
return {
|
||||||
|
id: "notif-1",
|
||||||
|
agent: "mosaic",
|
||||||
|
filename: "notif-1.json",
|
||||||
|
payload: {
|
||||||
|
taskId: "MS24-WEB-001",
|
||||||
|
eventType: "task.ready",
|
||||||
|
},
|
||||||
|
createdAt: "2026-03-08T23:00:00.000Z",
|
||||||
|
...overrides,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("QueueNotificationFeed", (): void => {
|
||||||
|
beforeEach((): void => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
vi.stubGlobal("EventSource", MockEventSource);
|
||||||
|
vi.stubGlobal("fetch", vi.fn());
|
||||||
|
mockEventSourceInstances = [];
|
||||||
|
mockApiGet.mockResolvedValue([]);
|
||||||
|
mockApiPost.mockResolvedValue({ success: true, id: "notif-1" });
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach((): void => {
|
||||||
|
vi.unstubAllGlobals();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("loads and renders notifications grouped by agent", async (): Promise<void> => {
|
||||||
|
mockApiGet.mockResolvedValue([
|
||||||
|
makeNotification({ id: "notif-1", agent: "mosaic" }),
|
||||||
|
makeNotification({
|
||||||
|
id: "notif-2",
|
||||||
|
agent: "dyor",
|
||||||
|
payload: { taskId: "MS24-WEB-002", eventType: "task.blocked" },
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
|
||||||
|
render(<QueueNotificationFeed />);
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(mockApiGet).toHaveBeenCalledWith("/api/orchestrator/api/queue/notifications");
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(screen.getByText("mosaic")).toBeInTheDocument();
|
||||||
|
expect(screen.getByText("dyor")).toBeInTheDocument();
|
||||||
|
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
|
||||||
|
expect(screen.getByText("task.ready")).toBeInTheDocument();
|
||||||
|
expect(screen.getAllByText("5 minutes ago")).toHaveLength(2);
|
||||||
|
expect(MockEventSource).toHaveBeenCalledWith("/api/orchestrator/queue/notifications/stream");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("acknowledges a notification and removes it from the list", async (): Promise<void> => {
|
||||||
|
const user = userEvent.setup();
|
||||||
|
mockApiGet.mockResolvedValue([makeNotification({ id: "notif-ack" })]);
|
||||||
|
|
||||||
|
render(<QueueNotificationFeed />);
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
|
||||||
|
});
|
||||||
|
|
||||||
|
await user.click(screen.getByRole("button", { name: "ACK notification MS24-WEB-001" }));
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(mockApiPost).toHaveBeenCalledWith(
|
||||||
|
"/api/orchestrator/api/queue/notifications/notif-ack/ack"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(screen.getByText("No pending notifications")).toBeInTheDocument();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("renders the empty state when there are no notifications", async (): Promise<void> => {
|
||||||
|
mockApiGet.mockResolvedValue([]);
|
||||||
|
|
||||||
|
render(<QueueNotificationFeed />);
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(screen.getByText("No pending notifications")).toBeInTheDocument();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("refreshes the list when an SSE message arrives", async (): Promise<void> => {
|
||||||
|
mockApiGet.mockResolvedValue([makeNotification({ id: "notif-before" })]);
|
||||||
|
|
||||||
|
render(<QueueNotificationFeed />);
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
|
||||||
|
});
|
||||||
|
|
||||||
|
act(() => {
|
||||||
|
latestEventSource().emitMessage([
|
||||||
|
makeNotification({
|
||||||
|
id: "notif-after",
|
||||||
|
agent: "sage",
|
||||||
|
payload: { taskId: "MS24-WEB-003", eventType: "task.failed" },
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(screen.getByText("sage")).toBeInTheDocument();
|
||||||
|
expect(screen.getByText("MS24-WEB-003")).toBeInTheDocument();
|
||||||
|
expect(screen.getByText("task.failed")).toBeInTheDocument();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,332 @@
|
|||||||
|
"use client";
|
||||||
|
|
||||||
|
import { useCallback, useEffect, useMemo, useState } from "react";
|
||||||
|
import { formatDistanceToNow } from "date-fns";
|
||||||
|
import { BellOff, ChevronLeft, ChevronRight, Loader2 } from "lucide-react";
|
||||||
|
import { useToast } from "@mosaic/ui";
|
||||||
|
import { Badge } from "@/components/ui/badge";
|
||||||
|
import { Button } from "@/components/ui/button";
|
||||||
|
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
|
||||||
|
import { Collapsible } from "@/components/ui/collapsible";
|
||||||
|
import { ScrollArea } from "@/components/ui/scroll-area";
|
||||||
|
import { Skeleton } from "@/components/ui/skeleton";
|
||||||
|
import { apiGet, apiPost } from "@/lib/api/client";
|
||||||
|
|
||||||
|
export interface QueueNotificationFeedProps {
|
||||||
|
className?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface QueueNotification {
|
||||||
|
id: string;
|
||||||
|
agent: string;
|
||||||
|
filename: string;
|
||||||
|
payload: unknown;
|
||||||
|
createdAt: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface QueuePayloadRecord {
|
||||||
|
taskId?: unknown;
|
||||||
|
type?: unknown;
|
||||||
|
eventType?: unknown;
|
||||||
|
event?: unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface NotificationGroup {
|
||||||
|
agent: string;
|
||||||
|
notifications: QueueNotification[];
|
||||||
|
}
|
||||||
|
|
||||||
|
const NOTIFICATIONS_ENDPOINT = "/api/orchestrator/api/queue/notifications";
|
||||||
|
const NOTIFICATIONS_STREAM_ENDPOINT = "/api/orchestrator/queue/notifications/stream";
|
||||||
|
|
||||||
|
function joinClasses(...classes: (string | undefined)[]): string {
|
||||||
|
return classes.filter((value) => typeof value === "string" && value.length > 0).join(" ");
|
||||||
|
}
|
||||||
|
|
||||||
|
function asPayloadRecord(payload: unknown): QueuePayloadRecord | null {
|
||||||
|
if (payload === null || typeof payload !== "object" || Array.isArray(payload)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return payload as QueuePayloadRecord;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getNotificationTaskId(notification: QueueNotification): string {
|
||||||
|
const payload = asPayloadRecord(notification.payload);
|
||||||
|
return typeof payload?.taskId === "string" && payload.taskId.trim().length > 0
|
||||||
|
? payload.taskId
|
||||||
|
: notification.id;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getNotificationEventType(notification: QueueNotification): string {
|
||||||
|
const payload = asPayloadRecord(notification.payload);
|
||||||
|
const candidates = [payload?.eventType, payload?.type, payload?.event];
|
||||||
|
|
||||||
|
for (const candidate of candidates) {
|
||||||
|
if (typeof candidate === "string" && candidate.trim().length > 0) {
|
||||||
|
return candidate;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return "notification";
|
||||||
|
}
|
||||||
|
|
||||||
|
function formatNotificationAge(createdAt: string): string {
|
||||||
|
const parsedDate = new Date(createdAt);
|
||||||
|
if (Number.isNaN(parsedDate.getTime())) {
|
||||||
|
return "just now";
|
||||||
|
}
|
||||||
|
|
||||||
|
return formatDistanceToNow(parsedDate, { addSuffix: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
function groupNotificationsByAgent(notifications: QueueNotification[]): NotificationGroup[] {
|
||||||
|
const grouped = new Map<string, QueueNotification[]>();
|
||||||
|
|
||||||
|
for (const notification of notifications) {
|
||||||
|
const current = grouped.get(notification.agent) ?? [];
|
||||||
|
current.push(notification);
|
||||||
|
grouped.set(notification.agent, current);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Array.from(grouped.entries())
|
||||||
|
.sort(([leftAgent], [rightAgent]) => leftAgent.localeCompare(rightAgent))
|
||||||
|
.map(([agent, items]) => ({
|
||||||
|
agent,
|
||||||
|
notifications: [...items].sort(
|
||||||
|
(left, right) => new Date(right.createdAt).getTime() - new Date(left.createdAt).getTime()
|
||||||
|
),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
export function QueueNotificationFeed({
|
||||||
|
className,
|
||||||
|
}: QueueNotificationFeedProps): React.JSX.Element {
|
||||||
|
const { showToast } = useToast();
|
||||||
|
const [notifications, setNotifications] = useState<QueueNotification[]>([]);
|
||||||
|
const [isLoading, setIsLoading] = useState(true);
|
||||||
|
const [errorMessage, setErrorMessage] = useState<string | null>(null);
|
||||||
|
const [acknowledgingIds, setAcknowledgingIds] = useState<Record<string, boolean>>({});
|
||||||
|
const [isCollapsed, setIsCollapsed] = useState(false);
|
||||||
|
const [now, setNow] = useState(Date.now());
|
||||||
|
|
||||||
|
const refreshNotifications = useCallback(async (): Promise<void> => {
|
||||||
|
try {
|
||||||
|
const payload = await apiGet<QueueNotification[]>(NOTIFICATIONS_ENDPOINT);
|
||||||
|
setNotifications(payload);
|
||||||
|
setErrorMessage(null);
|
||||||
|
} catch (error) {
|
||||||
|
const message =
|
||||||
|
error instanceof Error && error.message.trim().length > 0
|
||||||
|
? error.message
|
||||||
|
: "Failed to load queue notifications.";
|
||||||
|
setErrorMessage(message);
|
||||||
|
} finally {
|
||||||
|
setIsLoading(false);
|
||||||
|
}
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
void refreshNotifications();
|
||||||
|
}, [refreshNotifications]);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
if (typeof EventSource === "undefined") {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
const source = new EventSource(NOTIFICATIONS_STREAM_ENDPOINT);
|
||||||
|
|
||||||
|
source.onmessage = (event: MessageEvent<string>): void => {
|
||||||
|
try {
|
||||||
|
const payload = JSON.parse(event.data) as QueueNotification[];
|
||||||
|
setNotifications(payload);
|
||||||
|
setErrorMessage(null);
|
||||||
|
setIsLoading(false);
|
||||||
|
} catch {
|
||||||
|
setErrorMessage("Received an invalid notification stream payload.");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
source.onerror = (): void => {
|
||||||
|
setErrorMessage((current) => current ?? "Queue notification stream disconnected.");
|
||||||
|
};
|
||||||
|
|
||||||
|
return (): void => {
|
||||||
|
source.close();
|
||||||
|
};
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
const intervalId = window.setInterval(() => {
|
||||||
|
setNow(Date.now());
|
||||||
|
}, 60_000);
|
||||||
|
|
||||||
|
return (): void => {
|
||||||
|
window.clearInterval(intervalId);
|
||||||
|
};
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
const groupedNotifications = useMemo(
|
||||||
|
() => groupNotificationsByAgent(notifications),
|
||||||
|
[notifications]
|
||||||
|
);
|
||||||
|
|
||||||
|
const pendingCount = notifications.length;
|
||||||
|
|
||||||
|
const handleAck = useCallback(
|
||||||
|
async (notificationId: string): Promise<void> => {
|
||||||
|
setAcknowledgingIds((current) => ({
|
||||||
|
...current,
|
||||||
|
[notificationId]: true,
|
||||||
|
}));
|
||||||
|
|
||||||
|
try {
|
||||||
|
await apiPost<{ success: true; id: string }>(
|
||||||
|
`/api/orchestrator/api/queue/notifications/${encodeURIComponent(notificationId)}/ack`
|
||||||
|
);
|
||||||
|
setNotifications((current) => current.filter((item) => item.id !== notificationId));
|
||||||
|
} catch (error) {
|
||||||
|
const message =
|
||||||
|
error instanceof Error && error.message.trim().length > 0
|
||||||
|
? error.message
|
||||||
|
: "Failed to acknowledge notification.";
|
||||||
|
showToast(message, "error");
|
||||||
|
} finally {
|
||||||
|
setAcknowledgingIds((current) => {
|
||||||
|
const { [notificationId]: _omitted, ...remaining } = current;
|
||||||
|
return remaining;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
[showToast]
|
||||||
|
);
|
||||||
|
|
||||||
|
return (
|
||||||
|
<Card className={joinClasses("flex h-full min-h-0 flex-col", className)}>
|
||||||
|
<CardHeader className="pb-2">
|
||||||
|
<div className="flex items-start justify-between gap-2">
|
||||||
|
<div className="flex min-w-0 items-center gap-2">
|
||||||
|
<CardTitle className="text-base">Queue Notifications</CardTitle>
|
||||||
|
<Badge variant={pendingCount > 0 ? "status-info" : "secondary"}>{pendingCount}</Badge>
|
||||||
|
</div>
|
||||||
|
<Button
|
||||||
|
type="button"
|
||||||
|
variant="ghost"
|
||||||
|
size="icon"
|
||||||
|
className="h-7 w-7"
|
||||||
|
onClick={() => {
|
||||||
|
setIsCollapsed((current) => !current);
|
||||||
|
}}
|
||||||
|
aria-label={isCollapsed ? "Expand queue notifications" : "Collapse queue notifications"}
|
||||||
|
title={isCollapsed ? "Expand queue notifications" : "Collapse queue notifications"}
|
||||||
|
>
|
||||||
|
{isCollapsed ? (
|
||||||
|
<ChevronLeft className="h-4 w-4" aria-hidden="true" />
|
||||||
|
) : (
|
||||||
|
<ChevronRight className="h-4 w-4" aria-hidden="true" />
|
||||||
|
)}
|
||||||
|
</Button>
|
||||||
|
</div>
|
||||||
|
</CardHeader>
|
||||||
|
<Collapsible open={!isCollapsed} className="min-h-0 flex-1">
|
||||||
|
{!isCollapsed ? (
|
||||||
|
<CardContent className="min-h-0 flex-1 px-3 pb-3">
|
||||||
|
{isLoading ? (
|
||||||
|
<ScrollArea className="h-full">
|
||||||
|
<div className="space-y-2 pr-1">
|
||||||
|
{Array.from({ length: 6 }).map((_, index) => (
|
||||||
|
<Skeleton
|
||||||
|
key={`queue-notification-skeleton-${String(index)}`}
|
||||||
|
className="h-14 w-full"
|
||||||
|
/>
|
||||||
|
))}
|
||||||
|
</div>
|
||||||
|
</ScrollArea>
|
||||||
|
) : errorMessage && notifications.length === 0 ? (
|
||||||
|
<div className="flex h-full items-center justify-center px-4 text-center text-sm text-red-500">
|
||||||
|
{errorMessage}
|
||||||
|
</div>
|
||||||
|
) : groupedNotifications.length === 0 ? (
|
||||||
|
<div className="flex h-full flex-col items-center justify-center gap-2 text-center text-sm text-muted-foreground">
|
||||||
|
<BellOff className="h-5 w-5" aria-hidden="true" />
|
||||||
|
<span>No pending notifications</span>
|
||||||
|
</div>
|
||||||
|
) : (
|
||||||
|
<ScrollArea className="h-full">
|
||||||
|
<div className="space-y-4 pr-1">
|
||||||
|
{groupedNotifications.map((group) => (
|
||||||
|
<section key={group.agent} className="space-y-2">
|
||||||
|
<div className="flex items-center justify-between gap-2">
|
||||||
|
<h3 className="text-sm font-semibold text-foreground">{group.agent}</h3>
|
||||||
|
<span className="text-xs text-muted-foreground">
|
||||||
|
{group.notifications.length}
|
||||||
|
</span>
|
||||||
|
</div>
|
||||||
|
<div className="space-y-2">
|
||||||
|
{group.notifications.map((notification) => {
|
||||||
|
const taskId = getNotificationTaskId(notification);
|
||||||
|
const eventType = getNotificationEventType(notification);
|
||||||
|
const isAcknowledging = acknowledgingIds[notification.id] ?? false;
|
||||||
|
|
||||||
|
return (
|
||||||
|
<article
|
||||||
|
key={notification.id}
|
||||||
|
className="rounded-lg border border-border/70 bg-card px-3 py-2"
|
||||||
|
>
|
||||||
|
<div className="flex items-start justify-between gap-3">
|
||||||
|
<div className="min-w-0 space-y-1">
|
||||||
|
<div className="flex flex-wrap items-center gap-2">
|
||||||
|
<span className="font-mono text-xs text-foreground">
|
||||||
|
{taskId}
|
||||||
|
</span>
|
||||||
|
<Badge variant="secondary">{eventType}</Badge>
|
||||||
|
</div>
|
||||||
|
<time
|
||||||
|
className="block text-xs text-muted-foreground"
|
||||||
|
dateTime={notification.createdAt}
|
||||||
|
>
|
||||||
|
{formatNotificationAge(notification.createdAt)}
|
||||||
|
</time>
|
||||||
|
</div>
|
||||||
|
<Button
|
||||||
|
type="button"
|
||||||
|
variant="outline"
|
||||||
|
size="sm"
|
||||||
|
disabled={isAcknowledging}
|
||||||
|
onClick={() => {
|
||||||
|
void handleAck(notification.id);
|
||||||
|
}}
|
||||||
|
aria-label={`ACK notification ${taskId}`}
|
||||||
|
>
|
||||||
|
{isAcknowledging ? (
|
||||||
|
<span className="flex items-center gap-2">
|
||||||
|
<Loader2
|
||||||
|
className="h-4 w-4 animate-spin"
|
||||||
|
aria-hidden="true"
|
||||||
|
/>
|
||||||
|
ACK
|
||||||
|
</span>
|
||||||
|
) : (
|
||||||
|
"ACK"
|
||||||
|
)}
|
||||||
|
</Button>
|
||||||
|
</div>
|
||||||
|
</article>
|
||||||
|
);
|
||||||
|
})}
|
||||||
|
</div>
|
||||||
|
</section>
|
||||||
|
))}
|
||||||
|
</div>
|
||||||
|
</ScrollArea>
|
||||||
|
)}
|
||||||
|
</CardContent>
|
||||||
|
) : null}
|
||||||
|
</Collapsible>
|
||||||
|
<span className="sr-only" aria-live="polite">
|
||||||
|
{pendingCount} pending notifications, refreshed at {new Date(now).toISOString()}
|
||||||
|
</span>
|
||||||
|
</Card>
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -316,6 +316,8 @@ services:
|
|||||||
SANDBOX_ENABLED: "true"
|
SANDBOX_ENABLED: "true"
|
||||||
# API key for authenticating requests from the web proxy
|
# API key for authenticating requests from the web proxy
|
||||||
ORCHESTRATOR_API_KEY: ${ORCHESTRATOR_API_KEY}
|
ORCHESTRATOR_API_KEY: ${ORCHESTRATOR_API_KEY}
|
||||||
|
# Prisma database connection (uses the shared openbrain postgres)
|
||||||
|
DATABASE_URL: postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@openbrain_brain-db:5432/${POSTGRES_DB:-mosaic}
|
||||||
volumes:
|
volumes:
|
||||||
- /var/run/docker.sock:/var/run/docker.sock:ro
|
- /var/run/docker.sock:/var/run/docker.sock:ro
|
||||||
- orchestrator_workspace:/workspace
|
- orchestrator_workspace:/workspace
|
||||||
@@ -331,6 +333,7 @@ services:
|
|||||||
start_period: 40s
|
start_period: 40s
|
||||||
networks:
|
networks:
|
||||||
- internal
|
- internal
|
||||||
|
- openbrain-brain-internal
|
||||||
cap_drop:
|
cap_drop:
|
||||||
- ALL
|
- ALL
|
||||||
cap_add:
|
cap_add:
|
||||||
@@ -403,6 +406,7 @@ services:
|
|||||||
networks:
|
networks:
|
||||||
- internal
|
- internal
|
||||||
- traefik-public
|
- traefik-public
|
||||||
|
- openbrain-brain-internal
|
||||||
deploy:
|
deploy:
|
||||||
restart_policy:
|
restart_policy:
|
||||||
condition: on-failure
|
condition: on-failure
|
||||||
|
|||||||
@@ -119,14 +119,14 @@ Target version: `v0.0.23`
|
|||||||
|
|
||||||
### Phase 0 — Backend Core (Foundation)
|
### Phase 0 — Backend Core (Foundation)
|
||||||
|
|
||||||
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
|
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
|
||||||
| ----------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- |
|
| ----------- | ------ | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- |
|
||||||
| MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD |
|
| MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD |
|
||||||
| MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
|
| MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
|
||||||
| MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
|
| MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
|
||||||
| MS23-P0-004 | done | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-07 | 2026-03-07 | 15K | — | |
|
| MS23-P0-004 | done | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-07 | 2026-03-07 | 15K | — | |
|
||||||
| MS23-P0-005 | done | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
|
| MS23-P0-005 | done | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
|
||||||
| MS23-P0-006 | done | p0-foundation | Unit + integration tests for all P0 orchestrator endpoints | #693 | orchestrator | test/ms23-p0 | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005 | MS23-P1-001 | codex | 2026-03-07 | 2026-03-07 | 20K | — | Phase 0 gate: SSE stream verified via curl |
|
| MS23-P0-006 | done | p0-foundation | Unit + integration tests for all P0 orchestrator endpoints | #693 | orchestrator | test/ms23-p0 | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005 | MS23-P1-001 | codex | 2026-03-07 | 2026-03-07 | 20K | — | Phase 0 gate: SSE stream verified via curl |
|
||||||
|
|
||||||
### Phase 1 — Provider Interface (Plugin Architecture)
|
### Phase 1 — Provider Interface (Plugin Architecture)
|
||||||
|
|
||||||
@@ -183,3 +183,29 @@ Target version: `v0.0.23`
|
|||||||
| **Total** | **29** | **~478K** |
|
| **Total** | **29** | **~478K** |
|
||||||
|
|
||||||
Recommended dispatch: Codex for Phase 2 UI + routine API tasks; Sonnet for complex streaming logic (P0-003, P1-005, P3-002).
|
Recommended dispatch: Codex for Phase 2 UI + routine API tasks; Sonnet for complex streaming logic (P0-003, P1-005, P3-002).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## MS24 — Queue Integration in Mission Control
|
||||||
|
|
||||||
|
PRD: Issue #749
|
||||||
|
Milestone: `0.0.24`
|
||||||
|
Target version: `v0.0.24`
|
||||||
|
|
||||||
|
> Single-writer: orchestrator (Jarvis/OpenClaw) only. Workers read but never modify.
|
||||||
|
|
||||||
|
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
|
||||||
|
| ------------ | ------ | --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----- | ----- | -------------------- | ------------ | ------------ | ----- | ---------- | ------------ | -------- | ---- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||||
|
| MS24-API-001 | done | p0-api | QueueNotificationsModule: GET /notifications, GET /notifications/stream (SSE), POST /notifications/:id/ack, GET /tasks. Auth: ApiKeyGuard. Unit tests included. | #749 | api | feat/ms24-queue-api | — | MS24-WEB-001 | — | — | — | 25K | — | Guard: ApiKeyGuard from src/common/guards/api-key.guard.ts (COORDINATOR_API_KEY). SSE via raw express Response + chokidar. Fail-soft if inbox dir missing. |
|
||||||
|
| MS24-API-002 | done | p0-api | Woodpecker CI webhook → agent notification: POST /api/webhooks/woodpecker, HMAC-SHA256 verify, scan agent-state active dir, fire mosaic-queue notification to matched agent | #749 | api | feat/ms24-ci-webhook | MS24-API-001 | MS24-VER-001 | codex | 2026-03-08 | — | 15K | — | event: completed/failed. @SkipCsrf, no auth guard. Also adds notify-webhook step to .woodpecker/ci.yml |
|
||||||
|
| MS24-WEB-001 | done | p0-ui | QueueNotificationFeed component + MissionControlLayout wiring (right sidebar panel, badge count) | #749 | web | feat/ms24-queue-ui | MS24-API-001 | MS24-VER-001 | — | — | — | 20K | — | SSE to /api/queue/notifications/stream. ACK button. Collapsible panel in MissionControlLayout.tsx. |
|
||||||
|
| MS24-VER-001 | done | p0-verify | CI green, no regressions, deploy to prod, tag v0.0.24 | #749 | stack | — | MS24-WEB-001 | — | — | — | — | 5K | — | |
|
||||||
|
|
||||||
|
### MS24 Budget Summary
|
||||||
|
|
||||||
|
| Phase | Tasks | Estimate |
|
||||||
|
| --------- | ----- | -------- |
|
||||||
|
| API | 1 | ~25K |
|
||||||
|
| UI | 1 | ~20K |
|
||||||
|
| Verify | 1 | ~5K |
|
||||||
|
| **Total** | **3** | **~50K** |
|
||||||
|
|||||||
5
pnpm-lock.yaml
generated
5
pnpm-lock.yaml
generated
@@ -162,6 +162,9 @@ importers:
|
|||||||
bullmq:
|
bullmq:
|
||||||
specifier: ^5.67.2
|
specifier: ^5.67.2
|
||||||
version: 5.67.2
|
version: 5.67.2
|
||||||
|
chokidar:
|
||||||
|
specifier: ^4.0.3
|
||||||
|
version: 4.0.3
|
||||||
class-transformer:
|
class-transformer:
|
||||||
specifier: ^0.5.1
|
specifier: ^0.5.1
|
||||||
version: 0.5.1
|
version: 0.5.1
|
||||||
@@ -1625,7 +1628,6 @@ packages:
|
|||||||
|
|
||||||
'@mosaicstack/telemetry-client@0.1.1':
|
'@mosaicstack/telemetry-client@0.1.1':
|
||||||
resolution: {integrity: sha512-1udg6p4cs8rhQgQ2pKCfi7EpRlJieRRhA5CIqthRQ6HQZLgQ0wH+632jEulov3rlHSM1iplIQ+AAe5DWrvSkEA==, tarball: https://git.mosaicstack.dev/api/packages/mosaic/npm/%40mosaicstack%2Ftelemetry-client/-/0.1.1/telemetry-client-0.1.1.tgz}
|
resolution: {integrity: sha512-1udg6p4cs8rhQgQ2pKCfi7EpRlJieRRhA5CIqthRQ6HQZLgQ0wH+632jEulov3rlHSM1iplIQ+AAe5DWrvSkEA==, tarball: https://git.mosaicstack.dev/api/packages/mosaic/npm/%40mosaicstack%2Ftelemetry-client/-/0.1.1/telemetry-client-0.1.1.tgz}
|
||||||
engines: {node: '>=18'}
|
|
||||||
|
|
||||||
'@mrleebo/prisma-ast@0.13.1':
|
'@mrleebo/prisma-ast@0.13.1':
|
||||||
resolution: {integrity: sha512-XyroGQXcHrZdvmrGJvsA9KNeOOgGMg1Vg9OlheUsBOSKznLMDl+YChxbkboRHvtFYJEMRYmlV3uoo/njCw05iw==}
|
resolution: {integrity: sha512-XyroGQXcHrZdvmrGJvsA9KNeOOgGMg1Vg9OlheUsBOSKznLMDl+YChxbkboRHvtFYJEMRYmlV3uoo/njCw05iw==}
|
||||||
@@ -5176,6 +5178,7 @@ packages:
|
|||||||
|
|
||||||
glob@10.5.0:
|
glob@10.5.0:
|
||||||
resolution: {integrity: sha512-DfXN8DfhJ7NH3Oe7cFmu3NCu1wKbkReJ8TorzSAFbSKrlNaQSKfIzqYqVY8zlbs2NLBbWpRiU52GX2PbaBVNkg==}
|
resolution: {integrity: sha512-DfXN8DfhJ7NH3Oe7cFmu3NCu1wKbkReJ8TorzSAFbSKrlNaQSKfIzqYqVY8zlbs2NLBbWpRiU52GX2PbaBVNkg==}
|
||||||
|
deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me
|
||||||
hasBin: true
|
hasBin: true
|
||||||
|
|
||||||
glob@13.0.0:
|
glob@13.0.0:
|
||||||
|
|||||||
Reference in New Issue
Block a user