Compare commits

..

17 Commits

Author SHA1 Message Date
3289677056 Merge pull request 'chore(tasks): mark MS24 all done' (#756) from chore/ms24-ver-done into main 2026-03-09 02:58:46 +00:00
5a14a97cb4 chore(tasks): mark MS24 all done including API-002 and VER-001 2026-03-08 21:58:33 -05:00
aebf6b18db Merge pull request 'chore(tasks): correct MS24 task statuses' (#755) from chore/ms24-tasks-correction into main 2026-03-09 02:38:36 +00:00
6fbfb3c197 chore(tasks): correct MS24 task statuses — API-002 done, VER-001 not-started pending CI 2026-03-08 21:38:12 -05:00
348943c5f7 Merge pull request 'feat(api): MS24 Woodpecker CI webhook → agent notification' (#754) from feat/ms24-ci-webhook into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-09 02:33:37 +00:00
39d36e67c5 ci: trigger pipeline with woodpecker secrets
All checks were successful
ci/woodpecker/push/infra Pipeline was successful
ci/woodpecker/push/coordinator Pipeline was successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 21:27:49 -05:00
e8a2d32476 feat(api): add Woodpecker CI webhook notifications 2026-03-08 18:37:35 -05:00
3c0c10c9e5 Merge pull request 'chore(tasks): add MS24-API-002 Woodpecker webhook task' (#753) from chore/ms24-api-002-task into main 2026-03-08 23:30:12 +00:00
f59ce6a7a5 chore(tasks): add MS24-API-002 Woodpecker webhook task 2026-03-08 18:30:00 -05:00
11fa1734bd Merge pull request 'chore(tasks): mark MS24 tasks done' (#752) from chore/ms24-tasks-done into main 2026-03-08 23:25:43 +00:00
46815707a9 chore(tasks): mark MS24 tasks done 2026-03-08 18:25:28 -05:00
621df6ee70 chore(tasks): add MS24 queue integration tasks 2026-03-08 18:25:20 -05:00
ac406f19bc Merge pull request 'feat(web): MS24 queue notification feed component' (#751) from feat/ms24-queue-ui into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 23:21:23 +00:00
72d295edd6 feat(web): add queue notification feed
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 18:18:07 -05:00
6e9def3c5a Merge pull request 'feat(api): MS24 queue notifications module' (#750) from feat/ms24-queue-api into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 23:04:16 +00:00
8014930b70 Merge pull request 'fix(api): proxy mission-control routes to orchestrator' (#748) from fix/mission-control-proxy into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 22:51:08 +00:00
04bbdf3308 fix(api): proxy mission-control routes to orchestrator
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 14:04:12 -05:00
16 changed files with 1466 additions and 14 deletions

View File

@@ -271,6 +271,26 @@ steps:
depends_on: depends_on:
- docker-build-orchestrator - docker-build-orchestrator
notify-webhook:
image: curlimages/curl:8.6.0
environment:
MOSAIC_WEBHOOK_URL:
from_secret: mosaic_webhook_url
WOODPECKER_WEBHOOK_SECRET:
from_secret: woodpecker_webhook_secret
commands:
- |
BODY="{\"branch\":\"${CI_COMMIT_BRANCH}\",\"status\":\"${CI_PIPELINE_STATUS}\",\"buildUrl\":\"${CI_PIPELINE_LINK}\",\"repo\":\"${CI_REPO}\"}"
SIG=$(echo -n "$BODY" | openssl dgst -sha256 -hmac "$WOODPECKER_WEBHOOK_SECRET" | awk '{print $2}')
curl -s -o /dev/null -w "%{http_code}" -X POST "${MOSAIC_WEBHOOK_URL}/api/webhooks/woodpecker" \
-H "Content-Type: application/json" \
-H "X-Woodpecker-Signature: ${SIG}" \
-d "$BODY" || true
when:
- status: [success, failure]
depends_on:
- build
security-trivy-web: security-trivy-web:
image: aquasec/trivy:latest image: aquasec/trivy:latest
environment: environment:

View File

@@ -60,6 +60,7 @@ import { ContainerReaperModule } from "./container-reaper/container-reaper.modul
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module"; import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
import { OnboardingModule } from "./onboarding/onboarding.module"; import { OnboardingModule } from "./onboarding/onboarding.module";
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module"; import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
import { MissionControlProxyModule } from "./mission-control-proxy/mission-control-proxy.module";
import { OrchestratorModule } from "./orchestrator/orchestrator.module"; import { OrchestratorModule } from "./orchestrator/orchestrator.module";
import { QueueNotificationsModule } from "./queue-notifications/queue-notifications.module"; import { QueueNotificationsModule } from "./queue-notifications/queue-notifications.module";
@@ -143,6 +144,7 @@ import { QueueNotificationsModule } from "./queue-notifications/queue-notificati
FleetSettingsModule, FleetSettingsModule,
OnboardingModule, OnboardingModule,
ChatProxyModule, ChatProxyModule,
MissionControlProxyModule,
OrchestratorModule, OrchestratorModule,
QueueNotificationsModule, QueueNotificationsModule,
], ],

View File

@@ -0,0 +1,286 @@
import {
Body,
Controller,
Get,
Logger,
Param,
Post,
Req,
Res,
ServiceUnavailableException,
UseGuards,
} from "@nestjs/common";
import type { Request, Response } from "express";
import { AuthGuard } from "../auth/guards/auth.guard";
const ORCHESTRATOR_URL_KEY = "ORCHESTRATOR_URL";
const ORCHESTRATOR_API_KEY = "ORCHESTRATOR_API_KEY";
@Controller("mission-control")
@UseGuards(AuthGuard)
export class MissionControlProxyController {
private readonly logger = new Logger(MissionControlProxyController.name);
private readonly orchestratorUrl: string;
private readonly orchestratorApiKey: string;
constructor() {
this.orchestratorUrl = this.requireEnv(ORCHESTRATOR_URL_KEY);
this.orchestratorApiKey = this.requireEnv(ORCHESTRATOR_API_KEY);
}
@Get("sessions")
proxySessions(@Req() req: Request, @Res() res: Response): Promise<void> {
return this.proxyRequest("GET", "sessions", req, res);
}
@Get("sessions/:sessionId")
proxySession(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("GET", `sessions/${sessionId}`, req, res);
}
@Get("sessions/:sessionId/messages")
proxyMessages(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("GET", `sessions/${sessionId}/messages`, req, res);
}
@Get("audit-log")
proxyAuditLog(@Req() req: Request, @Res() res: Response): Promise<void> {
return this.proxyRequest("GET", "audit-log", req, res);
}
@Post("sessions/:sessionId/inject")
proxyInject(
@Param("sessionId") sessionId: string,
@Body() body: unknown,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/inject`, req, res, body);
}
@Post("sessions/:sessionId/pause")
proxyPause(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/pause`, req, res);
}
@Post("sessions/:sessionId/resume")
proxyResume(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/resume`, req, res);
}
@Post("sessions/:sessionId/kill")
proxyKill(
@Param("sessionId") sessionId: string,
@Body() body: unknown,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/kill`, req, res, body);
}
@Get("sessions/:sessionId/stream")
async proxySessionStream(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
const abortController = new AbortController();
req.once("close", () => {
abortController.abort();
});
try {
const upstream = await this.fetchUpstream(
"GET",
`sessions/${sessionId}/stream`,
req.query,
undefined,
abortController.signal
);
if (!upstream.ok || !upstream.body) {
await this.sendStandardResponse(upstream, res);
return;
}
res.status(upstream.status);
this.copyHeaderIfPresent(upstream, res, "content-type");
this.copyHeaderIfPresent(upstream, res, "cache-control");
this.copyHeaderIfPresent(upstream, res, "connection");
res.setHeader("X-Accel-Buffering", "no");
if (typeof res.flushHeaders === "function") {
res.flushHeaders();
}
for await (const chunk of upstream.body as unknown as AsyncIterable<Uint8Array>) {
if (res.writableEnded || res.destroyed) {
break;
}
res.write(Buffer.from(chunk));
}
if (!res.writableEnded && !res.destroyed) {
res.end();
}
} catch (error: unknown) {
if (this.isAbortError(error)) {
return;
}
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`Mission Control stream proxy request failed: ${message}`);
if (!res.headersSent) {
res.status(503).json({ message: "Failed to proxy Mission Control request" });
} else if (!res.writableEnded && !res.destroyed) {
res.end();
}
}
}
private async proxyRequest(
method: "GET" | "POST",
path: string,
req: Request,
res: Response,
body?: unknown
): Promise<void> {
const abortController = new AbortController();
req.once("close", () => {
abortController.abort();
});
try {
const upstream = await this.fetchUpstream(
method,
path,
req.query,
body,
abortController.signal
);
await this.sendStandardResponse(upstream, res);
} catch (error: unknown) {
if (this.isAbortError(error)) {
return;
}
this.handleProxyError(error);
}
}
private async fetchUpstream(
method: "GET" | "POST",
path: string,
query: Request["query"],
body: unknown,
signal: AbortSignal
): Promise<globalThis.Response> {
const url = new URL(`/api/mission-control/${path}`, this.orchestratorUrl);
this.appendQueryParams(url.searchParams, query);
const headers: Record<string, string> = {
"X-API-Key": this.orchestratorApiKey,
};
const requestInit: RequestInit = {
method,
headers,
signal,
};
if (method === "POST" && body !== undefined) {
headers["Content-Type"] = "application/json";
requestInit.body = JSON.stringify(body);
}
return fetch(url.toString(), requestInit);
}
private appendQueryParams(searchParams: URLSearchParams, query: Request["query"]): void {
for (const [key, value] of Object.entries(query)) {
this.appendQueryValue(searchParams, key, value);
}
}
private appendQueryValue(searchParams: URLSearchParams, key: string, value: unknown): void {
if (value === undefined || value === null) {
return;
}
if (Array.isArray(value)) {
for (const item of value) {
this.appendQueryValue(searchParams, key, item);
}
return;
}
if (typeof value === "string" || typeof value === "number" || typeof value === "boolean") {
searchParams.append(key, String(value));
}
}
private async sendStandardResponse(upstream: globalThis.Response, res: Response): Promise<void> {
res.status(upstream.status);
this.copyHeaderIfPresent(upstream, res, "content-type");
this.copyHeaderIfPresent(upstream, res, "cache-control");
this.copyHeaderIfPresent(upstream, res, "location");
const responseText = await upstream.text();
if (responseText.length === 0) {
res.end();
return;
}
res.send(responseText);
}
private copyHeaderIfPresent(
upstream: globalThis.Response,
res: Response,
headerName: string
): void {
const value = upstream.headers.get(headerName);
if (value) {
res.setHeader(headerName, value);
}
}
private handleProxyError(error: unknown): never {
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`Mission Control proxy request failed: ${message}`);
throw new ServiceUnavailableException("Failed to proxy Mission Control request");
}
private isAbortError(error: unknown): boolean {
return error instanceof Error && error.name === "AbortError";
}
private requireEnv(key: string): string {
const value = process.env[key];
if (typeof value !== "string" || value.trim().length === 0) {
throw new Error(`@mosaic/api: ${key} is required. Set it in your config or via ${key}.`);
}
return value;
}
}

View File

@@ -0,0 +1,9 @@
import { Module } from "@nestjs/common";
import { AuthModule } from "../auth/auth.module";
import { MissionControlProxyController } from "./mission-control-proxy.controller";
@Module({
imports: [AuthModule],
controllers: [MissionControlProxyController],
})
export class MissionControlProxyModule {}

View File

@@ -4,10 +4,11 @@ import { AuthModule } from "../auth/auth.module";
import { ApiKeyGuard } from "../common/guards/api-key.guard"; import { ApiKeyGuard } from "../common/guards/api-key.guard";
import { QueueNotificationsController } from "./queue-notifications.controller"; import { QueueNotificationsController } from "./queue-notifications.controller";
import { QueueNotificationsService } from "./queue-notifications.service"; import { QueueNotificationsService } from "./queue-notifications.service";
import { WoodpeckerWebhookController } from "./woodpecker-webhook.controller";
@Module({ @Module({
imports: [ConfigModule, AuthModule], imports: [ConfigModule, AuthModule],
controllers: [QueueNotificationsController], controllers: [QueueNotificationsController, WoodpeckerWebhookController],
providers: [QueueNotificationsService, ApiKeyGuard], providers: [QueueNotificationsService, ApiKeyGuard],
exports: [QueueNotificationsService], exports: [QueueNotificationsService],
}) })

View File

@@ -1,7 +1,7 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { ConfigService } from "@nestjs/config"; import { ConfigService } from "@nestjs/config";
import { Logger, NotFoundException } from "@nestjs/common"; import { Logger, NotFoundException } from "@nestjs/common";
import { mkdtemp, mkdir, rm, writeFile } from "node:fs/promises"; import { mkdtemp, mkdir, readFile, readdir, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { execFile } from "node:child_process"; import { execFile } from "node:child_process";
@@ -14,17 +14,23 @@ vi.mock("node:child_process", () => ({
describe("QueueNotificationsService", () => { describe("QueueNotificationsService", () => {
let service: QueueNotificationsService; let service: QueueNotificationsService;
let inboxDir: string; let inboxDir: string;
let agentStateDir: string;
let configService: ConfigService; let configService: ConfigService;
beforeEach(async () => { beforeEach(async () => {
vi.clearAllMocks(); vi.clearAllMocks();
inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-")); inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-"));
agentStateDir = await mkdtemp(join(tmpdir(), "agent-state-"));
configService = { configService = {
get: vi.fn((key: string) => { get: vi.fn((key: string) => {
if (key === "MOSAIC_QUEUE_INBOX_DIR") { if (key === "MOSAIC_QUEUE_INBOX_DIR") {
return inboxDir; return inboxDir;
} }
if (key === "MOSAIC_AGENT_STATE_DIR") {
return agentStateDir;
}
if (key === "MOSAIC_QUEUE_CLI") { if (key === "MOSAIC_QUEUE_CLI") {
return "/tmp/mosaic-queue-cli.js"; return "/tmp/mosaic-queue-cli.js";
} }
@@ -39,6 +45,7 @@ describe("QueueNotificationsService", () => {
afterEach(async () => { afterEach(async () => {
vi.restoreAllMocks(); vi.restoreAllMocks();
await rm(inboxDir, { recursive: true, force: true }); await rm(inboxDir, { recursive: true, force: true });
await rm(agentStateDir, { recursive: true, force: true });
}); });
describe("onModuleInit", () => { describe("onModuleInit", () => {
@@ -169,4 +176,93 @@ describe("QueueNotificationsService", () => {
); );
}); });
}); });
describe("notifyAgentCiResult", () => {
it("writes one notification per active agent-state record matching the branch", async () => {
await mkdir(join(agentStateDir, "active"), { recursive: true });
await writeFile(
join(agentStateDir, "active", "sage-landing-page.json"),
JSON.stringify({
taskId: "sage-landing-page",
status: "spawned",
startedAt: "2026-03-08T22:47:20Z",
lastUpdated: "2026-03-08T23:15:11.726Z",
agent: "sage",
branch: "feature/landing-page",
description: "Create apps/landing marketing site",
})
);
await writeFile(
join(agentStateDir, "active", "pixels-other.json"),
JSON.stringify({
taskId: "pixels-other",
status: "spawned",
startedAt: "2026-03-08T22:47:20Z",
lastUpdated: "2026-03-08T23:15:11.726Z",
agent: "pixels",
branch: "feature/something-else",
description: "Unrelated",
})
);
await expect(
service.notifyAgentCiResult({
branch: "feature/landing-page",
status: "success",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
})
).resolves.toEqual({ notified: 1 });
const inboxFiles = await readdir(join(inboxDir, "sage"));
expect(inboxFiles).toHaveLength(1);
const notification = JSON.parse(
await readFile(join(inboxDir, "sage", inboxFiles[0]!), "utf8")
) as Record<string, unknown>;
expect(notification).toMatchObject({
taskId: "sage-landing-page",
event: "completed",
targetAgent: "sage",
fromAgent: "mosaic-api",
retries: 0,
maxRetries: 3,
ttlSeconds: 600,
payload: {
branch: "feature/landing-page",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
ciStatus: "success",
},
});
expect(typeof notification.id).toBe("string");
expect(typeof notification.createdAt).toBe("string");
});
it("returns zero when no active agent-state branch matches the webhook payload", async () => {
await mkdir(join(agentStateDir, "active"), { recursive: true });
await writeFile(
join(agentStateDir, "active", "mosaic-task.json"),
JSON.stringify({
taskId: "mosaic-task",
status: "spawned",
startedAt: "2026-03-08T22:47:20Z",
lastUpdated: "2026-03-08T23:15:11.726Z",
agent: "mosaic",
branch: "feature/not-this-one",
description: "Unrelated",
})
);
await expect(
service.notifyAgentCiResult({
branch: "feature/landing-page",
status: "failure",
buildUrl: "https://ci.example/build/456",
repo: "mosaic/stack",
})
).resolves.toEqual({ notified: 0 });
});
});
}); });

View File

@@ -6,8 +6,9 @@ import {
OnModuleInit, OnModuleInit,
} from "@nestjs/common"; } from "@nestjs/common";
import { ConfigService } from "@nestjs/config"; import { ConfigService } from "@nestjs/config";
import { randomUUID } from "node:crypto";
import { execFile } from "node:child_process"; import { execFile } from "node:child_process";
import { access, readdir, readFile, stat } from "node:fs/promises"; import { access, mkdir, readdir, readFile, stat, writeFile } from "node:fs/promises";
import { homedir } from "node:os"; import { homedir } from "node:os";
import { basename, join } from "node:path"; import { basename, join } from "node:path";
import type { Response } from "express"; import type { Response } from "express";
@@ -29,6 +30,30 @@ export interface QueueTask {
description: string; description: string;
} }
interface AgentStateRecord {
taskId: string;
agent: string;
branch: string;
}
interface AgentCiNotification {
id: string;
taskId: string;
event: "completed" | "failed";
targetAgent: string;
fromAgent: string;
payload: {
branch: string;
buildUrl: string;
repo: string;
ciStatus: "success" | "failure";
};
createdAt: string;
retries: number;
maxRetries: number;
ttlSeconds: number;
}
@Injectable() @Injectable()
export class QueueNotificationsService implements OnModuleInit { export class QueueNotificationsService implements OnModuleInit {
private readonly logger = new Logger(QueueNotificationsService.name); private readonly logger = new Logger(QueueNotificationsService.name);
@@ -171,6 +196,52 @@ export class QueueNotificationsService implements OnModuleInit {
}); });
} }
async notifyAgentCiResult(opts: {
branch: string;
status: "success" | "failure";
buildUrl: string;
repo: string;
}): Promise<{ notified: number }> {
const activeRecords = await this.listActiveAgentStateRecords();
const matches = activeRecords.filter((record) => record.branch === opts.branch);
await Promise.all(
matches.map(async (record) => {
const notification: AgentCiNotification = {
id: randomUUID(),
taskId: record.taskId,
event: opts.status === "success" ? "completed" : "failed",
targetAgent: record.agent,
fromAgent: "mosaic-api",
payload: {
branch: opts.branch,
buildUrl: opts.buildUrl,
repo: opts.repo,
ciStatus: opts.status,
},
createdAt: new Date().toISOString(),
retries: 0,
maxRetries: 3,
ttlSeconds: 600,
};
const agentDir = join(this.getInboxDir(), record.agent);
// Path is built from the configured inbox root plus validated agent-state entries.
// eslint-disable-next-line security/detect-non-literal-fs-filename
await mkdir(agentDir, { recursive: true });
// Path is built from the configured inbox root plus validated agent-state entries.
// eslint-disable-next-line security/detect-non-literal-fs-filename
await writeFile(
join(agentDir, `${notification.id}.json`),
JSON.stringify(notification, null, 2),
"utf8"
);
})
);
return { notified: matches.length };
}
private async execQueueCli(args: string[]): Promise<string> { private async execQueueCli(args: string[]): Promise<string> {
const cliPath = this.getQueueCliPath(); const cliPath = this.getQueueCliPath();
@@ -198,6 +269,12 @@ export class QueueNotificationsService implements OnModuleInit {
); );
} }
private getAgentStateDir(): string {
return this.expandHomePath(
this.configService.get<string>("MOSAIC_AGENT_STATE_DIR") ?? "~/.openclaw/workspace/agents"
);
}
private getQueueCliPath(): string { private getQueueCliPath(): string {
return this.expandHomePath( return this.expandHomePath(
this.configService.get<string>("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js" this.configService.get<string>("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js"
@@ -228,4 +305,37 @@ export class QueueNotificationsService implements OnModuleInit {
private isIgnoredDirectory(name: string): boolean { private isIgnoredDirectory(name: string): boolean {
return name === "_acked" || name === "_dead-letter"; return name === "_acked" || name === "_dead-letter";
} }
private async listActiveAgentStateRecords(): Promise<AgentStateRecord[]> {
const activeDir = join(this.getAgentStateDir(), "active");
try {
// Path comes from controlled config and a fixed "active" subdirectory.
// eslint-disable-next-line security/detect-non-literal-fs-filename
const entries = await readdir(activeDir, { withFileTypes: true });
const records = await Promise.all(
entries
.filter((entry) => entry.isFile() && entry.name.endsWith(".json"))
.map(async (entry) => {
const filePath = join(activeDir, entry.name);
// Path comes from controlled config plus directory entries under the active root.
// eslint-disable-next-line security/detect-non-literal-fs-filename
const rawRecord = await readFile(filePath, "utf8");
return JSON.parse(rawRecord) as AgentStateRecord;
})
);
return records.filter((record) => {
return (
typeof record.taskId === "string" &&
typeof record.agent === "string" &&
typeof record.branch === "string"
);
});
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`Unable to read active agent-state records from ${activeDir}: ${message}`);
return [];
}
}
} }

View File

@@ -0,0 +1,132 @@
import { createHmac } from "node:crypto";
import { Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Test, type TestingModule } from "@nestjs/testing";
import type { Request } from "express";
import { describe, beforeEach, expect, it, vi } from "vitest";
import { QueueNotificationsService } from "./queue-notifications.service";
import {
type WoodpeckerWebhookPayload,
WoodpeckerWebhookController,
} from "./woodpecker-webhook.controller";
function signPayload(payload: WoodpeckerWebhookPayload, secret: string): string {
return createHmac("sha256", secret).update(JSON.stringify(payload)).digest("hex");
}
describe("WoodpeckerWebhookController", () => {
let controller: WoodpeckerWebhookController;
const mockService = {
notifyAgentCiResult: vi.fn(),
};
const mockConfigService = {
get: vi.fn(),
};
beforeEach(async () => {
vi.clearAllMocks();
mockConfigService.get.mockImplementation((key: string) => {
if (key === "WOODPECKER_WEBHOOK_SECRET") {
return "test-secret";
}
return undefined;
});
const module: TestingModule = await Test.createTestingModule({
controllers: [WoodpeckerWebhookController],
providers: [
{ provide: QueueNotificationsService, useValue: mockService },
{ provide: ConfigService, useValue: mockConfigService },
],
}).compile();
controller = module.get<WoodpeckerWebhookController>(WoodpeckerWebhookController);
});
it("accepts a valid signature and forwards the payload to the service", async () => {
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "success",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
};
const signature = signPayload(payload, "test-secret");
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 2 });
await expect(
controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
signature
)
).resolves.toEqual({ ok: true, notified: 2 });
expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload);
});
it("returns ok without notifying when the signature is invalid", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "failure",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
};
await expect(
controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
"bad-signature"
)
).resolves.toEqual({ ok: true, notified: 0 });
expect(mockService.notifyAgentCiResult).not.toHaveBeenCalled();
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("invalid Woodpecker webhook signature")
);
});
it("accepts the payload when the webhook secret is missing", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "success",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
};
mockConfigService.get.mockReturnValue(undefined);
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 1 });
await expect(controller.handleWebhook({} as Request, payload, "")).resolves.toEqual({
ok: true,
notified: 1,
});
expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload);
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("WOODPECKER_WEBHOOK_SECRET is not configured")
);
});
it("returns zero notifications when no active branch matches", async () => {
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "success",
buildUrl: "https://ci.example/build/999",
repo: "mosaic/stack",
};
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 0 });
await expect(
controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
signPayload(payload, "test-secret")
)
).resolves.toEqual({ ok: true, notified: 0 });
});
});

View File

@@ -0,0 +1,73 @@
import { Body, Controller, Headers, Logger, Post, Req, type RawBodyRequest } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { createHmac, timingSafeEqual } from "node:crypto";
import type { Request } from "express";
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
import { QueueNotificationsService } from "./queue-notifications.service";
export interface WoodpeckerWebhookPayload {
branch: string;
status: "success" | "failure";
buildUrl: string;
repo: string;
}
@Controller("webhooks")
export class WoodpeckerWebhookController {
private readonly logger = new Logger(WoodpeckerWebhookController.name);
constructor(
private readonly queueService: QueueNotificationsService,
private readonly configService: ConfigService
) {}
@SkipCsrf()
@Post("woodpecker")
async handleWebhook(
@Req() req: RawBodyRequest<Request>,
@Body() body: WoodpeckerWebhookPayload,
@Headers("x-woodpecker-signature") signature: string
): Promise<{ ok: boolean; notified: number }> {
const secret = this.configService.get<string>("WOODPECKER_WEBHOOK_SECRET");
if (!secret) {
this.logger.warn("WOODPECKER_WEBHOOK_SECRET is not configured; accepting Woodpecker webhook");
const result = await this.queueService.notifyAgentCiResult(body);
return { ok: true, notified: result.notified };
}
if (!this.isValidSignature(this.getRequestBody(req, body), signature, secret)) {
this.logger.warn("Received invalid Woodpecker webhook signature");
return { ok: true, notified: 0 };
}
const result = await this.queueService.notifyAgentCiResult(body);
return { ok: true, notified: result.notified };
}
private getRequestBody(req: RawBodyRequest<Request>, body: WoodpeckerWebhookPayload): Buffer {
const rawBody = req.rawBody;
if (Buffer.isBuffer(rawBody)) {
return rawBody;
}
return Buffer.from(JSON.stringify(body));
}
private isValidSignature(body: Buffer, signature: string | undefined, secret: string): boolean {
if (!signature) {
return false;
}
const expected = createHmac("sha256", secret).update(body).digest("hex");
const signatureBuffer = Buffer.from(signature);
const expectedBuffer = Buffer.from(expected);
if (signatureBuffer.length !== expectedBuffer.length) {
return false;
}
return timingSafeEqual(signatureBuffer, expectedBuffer);
}
}

View File

@@ -0,0 +1,57 @@
import { NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export const dynamic = "force-dynamic";
export async function GET(): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
try {
const upstream = await fetch(`${getOrchestratorUrl()}/api/queue/notifications/stream`, {
method: "GET",
headers: {
"X-API-Key": orchestratorApiKey,
Accept: "text/event-stream",
},
cache: "no-store",
});
if (!upstream.ok || upstream.body === null) {
const message = await upstream.text();
return new NextResponse(message || "Failed to connect to queue notifications stream", {
status: upstream.status || 502,
headers: {
"Content-Type": upstream.headers.get("Content-Type") ?? "text/plain; charset=utf-8",
},
});
}
return new NextResponse(upstream.body, {
status: upstream.status,
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -1,6 +1,6 @@
"use client"; "use client";
import { useEffect, useState } from "react"; import { useEffect, useMemo, useState } from "react";
import Link from "next/link"; import Link from "next/link";
import { usePathname } from "next/navigation"; import { usePathname } from "next/navigation";
import Image from "next/image"; import Image from "next/image";
@@ -29,6 +29,10 @@ interface NavGroup {
items: NavItemConfig[]; items: NavItemConfig[];
} }
interface QueueNotification {
id: string;
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// SVG Icons (16x16 viewBox, stroke="currentColor", strokeWidth="1.5") // SVG Icons (16x16 viewBox, stroke="currentColor", strokeWidth="1.5")
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -685,6 +689,72 @@ function CollapseToggle({ collapsed, onToggle }: CollapseToggleProps): React.JSX
export function AppSidebar(): React.JSX.Element { export function AppSidebar(): React.JSX.Element {
const pathname = usePathname(); const pathname = usePathname();
const { collapsed, toggleCollapsed, mobileOpen, setMobileOpen, isMobile } = useSidebar(); const { collapsed, toggleCollapsed, mobileOpen, setMobileOpen, isMobile } = useSidebar();
const [missionControlBadgeCount, setMissionControlBadgeCount] = useState<number>(0);
useEffect(() => {
let isActive = true;
const loadNotificationCount = async (): Promise<void> => {
try {
const response = await fetch("/api/orchestrator/api/queue/notifications", {
method: "GET",
});
if (!response.ok) {
return;
}
const payload = (await response.json()) as QueueNotification[];
if (isActive) {
setMissionControlBadgeCount(Array.isArray(payload) ? payload.length : 0);
}
} catch {
// Ignore badge failures in the nav.
}
};
void loadNotificationCount();
if (typeof EventSource === "undefined") {
return (): void => {
isActive = false;
};
}
const source = new EventSource("/api/orchestrator/queue/notifications/stream");
source.onmessage = (event: MessageEvent<string>): void => {
try {
const payload = JSON.parse(event.data) as QueueNotification[];
if (isActive) {
setMissionControlBadgeCount(Array.isArray(payload) ? payload.length : 0);
}
} catch {
// Ignore malformed badge updates.
}
};
return (): void => {
isActive = false;
source.close();
};
}, []);
const navGroups = useMemo(
() =>
NAV_GROUPS.map((group) => ({
...group,
items: group.items.map((item) =>
item.href === "/mission-control" && missionControlBadgeCount > 0
? {
...item,
badge: { label: String(missionControlBadgeCount) },
}
: item
),
})),
[missionControlBadgeCount]
);
return ( return (
<> <>
@@ -722,7 +792,7 @@ export function AppSidebar(): React.JSX.Element {
}} }}
aria-label="Main navigation" aria-label="Main navigation"
> >
{NAV_GROUPS.map((group) => ( {navGroups.map((group) => (
<div key={group.label} style={{ marginBottom: "18px" }}> <div key={group.label} style={{ marginBottom: "18px" }}>
{/* Group label — hidden when collapsed */} {/* Group label — hidden when collapsed */}
{!collapsed && ( {!collapsed && (

View File

@@ -8,6 +8,7 @@ interface MockButtonProps extends ButtonHTMLAttributes<HTMLButtonElement> {
const mockGlobalAgentRoster = vi.fn(); const mockGlobalAgentRoster = vi.fn();
const mockMissionControlPanel = vi.fn(); const mockMissionControlPanel = vi.fn();
const mockQueueNotificationFeed = vi.fn();
vi.mock("@/components/mission-control/AuditLogDrawer", () => ({ vi.mock("@/components/mission-control/AuditLogDrawer", () => ({
AuditLogDrawer: ({ trigger }: { trigger: ReactNode }): React.JSX.Element => ( AuditLogDrawer: ({ trigger }: { trigger: ReactNode }): React.JSX.Element => (
@@ -31,6 +32,13 @@ vi.mock("@/components/mission-control/MissionControlPanel", () => ({
MIN_PANEL_COUNT: 1, MIN_PANEL_COUNT: 1,
})); }));
vi.mock("@/components/mission-control/QueueNotificationFeed", () => ({
QueueNotificationFeed: (props: unknown): React.JSX.Element => {
mockQueueNotificationFeed(props);
return <div data-testid="queue-notification-feed" />;
},
}));
vi.mock("@/components/ui/button", () => ({ vi.mock("@/components/ui/button", () => ({
Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => ( Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => (
<button {...props}>{children}</button> <button {...props}>{children}</button>
@@ -66,5 +74,6 @@ describe("MissionControlLayout", (): void => {
expect(region.querySelector("main")).toBeInTheDocument(); expect(region.querySelector("main")).toBeInTheDocument();
expect(screen.getByTestId("global-agent-roster")).toBeInTheDocument(); expect(screen.getByTestId("global-agent-roster")).toBeInTheDocument();
expect(screen.getByTestId("mission-control-panel")).toBeInTheDocument(); expect(screen.getByTestId("mission-control-panel")).toBeInTheDocument();
expect(screen.getByTestId("queue-notification-feed")).toBeInTheDocument();
}); });
}); });

View File

@@ -9,6 +9,7 @@ import {
MissionControlPanel, MissionControlPanel,
type PanelConfig, type PanelConfig,
} from "@/components/mission-control/MissionControlPanel"; } from "@/components/mission-control/MissionControlPanel";
import { QueueNotificationFeed } from "@/components/mission-control/QueueNotificationFeed";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
const INITIAL_PANELS: PanelConfig[] = [{}]; const INITIAL_PANELS: PanelConfig[] = [{}];
@@ -94,7 +95,7 @@ export function MissionControlLayout(): React.JSX.Element {
/> />
</header> </header>
<div className="grid min-h-0 flex-1 gap-4 xl:grid-cols-[280px_minmax(0,1fr)]"> <div className="grid min-h-0 flex-1 gap-4 xl:grid-cols-[280px_minmax(0,1fr)_320px]">
<aside className="h-full min-h-0"> <aside className="h-full min-h-0">
<GlobalAgentRoster <GlobalAgentRoster
onSelectSession={handleSelectSession} onSelectSession={handleSelectSession}
@@ -109,6 +110,9 @@ export function MissionControlLayout(): React.JSX.Element {
onExpandPanel={handleExpandPanel} onExpandPanel={handleExpandPanel}
/> />
</main> </main>
<aside className="h-full min-h-0">
<QueueNotificationFeed />
</aside>
</div> </div>
</section> </section>
); );

View File

@@ -0,0 +1,225 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { act, render, screen, waitFor } from "@testing-library/react";
import userEvent from "@testing-library/user-event";
import type { ButtonHTMLAttributes, HTMLAttributes, ReactNode } from "react";
vi.mock("date-fns", () => ({
formatDistanceToNow: (): string => "5 minutes ago",
}));
interface MockButtonProps extends ButtonHTMLAttributes<HTMLButtonElement> {
children: ReactNode;
}
interface MockContainerProps extends HTMLAttributes<HTMLElement> {
children: ReactNode;
}
interface MockEventSourceInstance {
url: string;
onerror: ((event: Event) => void) | null;
onmessage: ((event: MessageEvent<string>) => void) | null;
close: ReturnType<typeof vi.fn>;
emitMessage: (payload: unknown) => void;
}
interface QueueNotification {
id: string;
agent: string;
filename: string;
payload: unknown;
createdAt: string;
}
const mockApiGet = vi.fn<(endpoint: string) => Promise<QueueNotification[]>>();
const mockApiPost = vi.fn<(endpoint: string) => Promise<{ success: true; id: string }>>();
const mockShowToast = vi.fn<(message: string, variant?: string) => void>();
let mockEventSourceInstances: MockEventSourceInstance[] = [];
const MockEventSource = vi.fn(function (this: MockEventSourceInstance, url: string): void {
this.url = url;
this.onerror = null;
this.onmessage = null;
this.close = vi.fn();
this.emitMessage = (payload: unknown): void => {
this.onmessage?.(new MessageEvent("message", { data: JSON.stringify(payload) }));
};
mockEventSourceInstances.push(this);
});
vi.mock("@/lib/api/client", () => ({
apiGet: (endpoint: string): Promise<QueueNotification[]> => mockApiGet(endpoint),
apiPost: (endpoint: string): Promise<{ success: true; id: string }> => mockApiPost(endpoint),
}));
vi.mock("@mosaic/ui", () => ({
useToast: (): { showToast: typeof mockShowToast; removeToast: ReturnType<typeof vi.fn> } => ({
showToast: mockShowToast,
removeToast: vi.fn(),
}),
}));
vi.mock("@/components/ui/button", () => ({
Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => (
<button {...props}>{children}</button>
),
}));
vi.mock("@/components/ui/badge", () => ({
Badge: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<span {...props}>{children}</span>
),
}));
vi.mock("@/components/ui/card", () => ({
Card: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<section {...props}>{children}</section>
),
CardHeader: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<header {...props}>{children}</header>
),
CardContent: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<div {...props}>{children}</div>
),
CardTitle: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<h2 {...props}>{children}</h2>
),
}));
vi.mock("@/components/ui/collapsible", () => ({
Collapsible: ({ children }: MockContainerProps): React.JSX.Element => <div>{children}</div>,
}));
vi.mock("@/components/ui/scroll-area", () => ({
ScrollArea: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<div {...props}>{children}</div>
),
}));
vi.mock("@/components/ui/skeleton", () => ({
Skeleton: (props: HTMLAttributes<HTMLDivElement>): React.JSX.Element => <div {...props} />,
}));
import { QueueNotificationFeed } from "./QueueNotificationFeed";
function latestEventSource(): MockEventSourceInstance {
const instance = mockEventSourceInstances[mockEventSourceInstances.length - 1];
if (!instance) {
throw new Error("Expected an EventSource instance");
}
return instance;
}
function makeNotification(overrides: Partial<QueueNotification>): QueueNotification {
return {
id: "notif-1",
agent: "mosaic",
filename: "notif-1.json",
payload: {
taskId: "MS24-WEB-001",
eventType: "task.ready",
},
createdAt: "2026-03-08T23:00:00.000Z",
...overrides,
};
}
describe("QueueNotificationFeed", (): void => {
beforeEach((): void => {
vi.clearAllMocks();
vi.stubGlobal("EventSource", MockEventSource);
vi.stubGlobal("fetch", vi.fn());
mockEventSourceInstances = [];
mockApiGet.mockResolvedValue([]);
mockApiPost.mockResolvedValue({ success: true, id: "notif-1" });
});
afterEach((): void => {
vi.unstubAllGlobals();
});
it("loads and renders notifications grouped by agent", async (): Promise<void> => {
mockApiGet.mockResolvedValue([
makeNotification({ id: "notif-1", agent: "mosaic" }),
makeNotification({
id: "notif-2",
agent: "dyor",
payload: { taskId: "MS24-WEB-002", eventType: "task.blocked" },
}),
]);
render(<QueueNotificationFeed />);
await waitFor((): void => {
expect(mockApiGet).toHaveBeenCalledWith("/api/orchestrator/api/queue/notifications");
});
expect(screen.getByText("mosaic")).toBeInTheDocument();
expect(screen.getByText("dyor")).toBeInTheDocument();
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
expect(screen.getByText("task.ready")).toBeInTheDocument();
expect(screen.getAllByText("5 minutes ago")).toHaveLength(2);
expect(MockEventSource).toHaveBeenCalledWith("/api/orchestrator/queue/notifications/stream");
});
it("acknowledges a notification and removes it from the list", async (): Promise<void> => {
const user = userEvent.setup();
mockApiGet.mockResolvedValue([makeNotification({ id: "notif-ack" })]);
render(<QueueNotificationFeed />);
await waitFor((): void => {
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
});
await user.click(screen.getByRole("button", { name: "ACK notification MS24-WEB-001" }));
await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith(
"/api/orchestrator/api/queue/notifications/notif-ack/ack"
);
});
await waitFor((): void => {
expect(screen.getByText("No pending notifications")).toBeInTheDocument();
});
});
it("renders the empty state when there are no notifications", async (): Promise<void> => {
mockApiGet.mockResolvedValue([]);
render(<QueueNotificationFeed />);
await waitFor((): void => {
expect(screen.getByText("No pending notifications")).toBeInTheDocument();
});
});
it("refreshes the list when an SSE message arrives", async (): Promise<void> => {
mockApiGet.mockResolvedValue([makeNotification({ id: "notif-before" })]);
render(<QueueNotificationFeed />);
await waitFor((): void => {
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
});
act(() => {
latestEventSource().emitMessage([
makeNotification({
id: "notif-after",
agent: "sage",
payload: { taskId: "MS24-WEB-003", eventType: "task.failed" },
}),
]);
});
await waitFor((): void => {
expect(screen.getByText("sage")).toBeInTheDocument();
expect(screen.getByText("MS24-WEB-003")).toBeInTheDocument();
expect(screen.getByText("task.failed")).toBeInTheDocument();
});
});
});

View File

@@ -0,0 +1,332 @@
"use client";
import { useCallback, useEffect, useMemo, useState } from "react";
import { formatDistanceToNow } from "date-fns";
import { BellOff, ChevronLeft, ChevronRight, Loader2 } from "lucide-react";
import { useToast } from "@mosaic/ui";
import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Collapsible } from "@/components/ui/collapsible";
import { ScrollArea } from "@/components/ui/scroll-area";
import { Skeleton } from "@/components/ui/skeleton";
import { apiGet, apiPost } from "@/lib/api/client";
export interface QueueNotificationFeedProps {
className?: string;
}
interface QueueNotification {
id: string;
agent: string;
filename: string;
payload: unknown;
createdAt: string;
}
interface QueuePayloadRecord {
taskId?: unknown;
type?: unknown;
eventType?: unknown;
event?: unknown;
}
interface NotificationGroup {
agent: string;
notifications: QueueNotification[];
}
const NOTIFICATIONS_ENDPOINT = "/api/orchestrator/api/queue/notifications";
const NOTIFICATIONS_STREAM_ENDPOINT = "/api/orchestrator/queue/notifications/stream";
function joinClasses(...classes: (string | undefined)[]): string {
return classes.filter((value) => typeof value === "string" && value.length > 0).join(" ");
}
function asPayloadRecord(payload: unknown): QueuePayloadRecord | null {
if (payload === null || typeof payload !== "object" || Array.isArray(payload)) {
return null;
}
return payload as QueuePayloadRecord;
}
function getNotificationTaskId(notification: QueueNotification): string {
const payload = asPayloadRecord(notification.payload);
return typeof payload?.taskId === "string" && payload.taskId.trim().length > 0
? payload.taskId
: notification.id;
}
function getNotificationEventType(notification: QueueNotification): string {
const payload = asPayloadRecord(notification.payload);
const candidates = [payload?.eventType, payload?.type, payload?.event];
for (const candidate of candidates) {
if (typeof candidate === "string" && candidate.trim().length > 0) {
return candidate;
}
}
return "notification";
}
function formatNotificationAge(createdAt: string): string {
const parsedDate = new Date(createdAt);
if (Number.isNaN(parsedDate.getTime())) {
return "just now";
}
return formatDistanceToNow(parsedDate, { addSuffix: true });
}
function groupNotificationsByAgent(notifications: QueueNotification[]): NotificationGroup[] {
const grouped = new Map<string, QueueNotification[]>();
for (const notification of notifications) {
const current = grouped.get(notification.agent) ?? [];
current.push(notification);
grouped.set(notification.agent, current);
}
return Array.from(grouped.entries())
.sort(([leftAgent], [rightAgent]) => leftAgent.localeCompare(rightAgent))
.map(([agent, items]) => ({
agent,
notifications: [...items].sort(
(left, right) => new Date(right.createdAt).getTime() - new Date(left.createdAt).getTime()
),
}));
}
export function QueueNotificationFeed({
className,
}: QueueNotificationFeedProps): React.JSX.Element {
const { showToast } = useToast();
const [notifications, setNotifications] = useState<QueueNotification[]>([]);
const [isLoading, setIsLoading] = useState(true);
const [errorMessage, setErrorMessage] = useState<string | null>(null);
const [acknowledgingIds, setAcknowledgingIds] = useState<Record<string, boolean>>({});
const [isCollapsed, setIsCollapsed] = useState(false);
const [now, setNow] = useState(Date.now());
const refreshNotifications = useCallback(async (): Promise<void> => {
try {
const payload = await apiGet<QueueNotification[]>(NOTIFICATIONS_ENDPOINT);
setNotifications(payload);
setErrorMessage(null);
} catch (error) {
const message =
error instanceof Error && error.message.trim().length > 0
? error.message
: "Failed to load queue notifications.";
setErrorMessage(message);
} finally {
setIsLoading(false);
}
}, []);
useEffect(() => {
void refreshNotifications();
}, [refreshNotifications]);
useEffect(() => {
if (typeof EventSource === "undefined") {
return undefined;
}
const source = new EventSource(NOTIFICATIONS_STREAM_ENDPOINT);
source.onmessage = (event: MessageEvent<string>): void => {
try {
const payload = JSON.parse(event.data) as QueueNotification[];
setNotifications(payload);
setErrorMessage(null);
setIsLoading(false);
} catch {
setErrorMessage("Received an invalid notification stream payload.");
}
};
source.onerror = (): void => {
setErrorMessage((current) => current ?? "Queue notification stream disconnected.");
};
return (): void => {
source.close();
};
}, []);
useEffect(() => {
const intervalId = window.setInterval(() => {
setNow(Date.now());
}, 60_000);
return (): void => {
window.clearInterval(intervalId);
};
}, []);
const groupedNotifications = useMemo(
() => groupNotificationsByAgent(notifications),
[notifications]
);
const pendingCount = notifications.length;
const handleAck = useCallback(
async (notificationId: string): Promise<void> => {
setAcknowledgingIds((current) => ({
...current,
[notificationId]: true,
}));
try {
await apiPost<{ success: true; id: string }>(
`/api/orchestrator/api/queue/notifications/${encodeURIComponent(notificationId)}/ack`
);
setNotifications((current) => current.filter((item) => item.id !== notificationId));
} catch (error) {
const message =
error instanceof Error && error.message.trim().length > 0
? error.message
: "Failed to acknowledge notification.";
showToast(message, "error");
} finally {
setAcknowledgingIds((current) => {
const { [notificationId]: _omitted, ...remaining } = current;
return remaining;
});
}
},
[showToast]
);
return (
<Card className={joinClasses("flex h-full min-h-0 flex-col", className)}>
<CardHeader className="pb-2">
<div className="flex items-start justify-between gap-2">
<div className="flex min-w-0 items-center gap-2">
<CardTitle className="text-base">Queue Notifications</CardTitle>
<Badge variant={pendingCount > 0 ? "status-info" : "secondary"}>{pendingCount}</Badge>
</div>
<Button
type="button"
variant="ghost"
size="icon"
className="h-7 w-7"
onClick={() => {
setIsCollapsed((current) => !current);
}}
aria-label={isCollapsed ? "Expand queue notifications" : "Collapse queue notifications"}
title={isCollapsed ? "Expand queue notifications" : "Collapse queue notifications"}
>
{isCollapsed ? (
<ChevronLeft className="h-4 w-4" aria-hidden="true" />
) : (
<ChevronRight className="h-4 w-4" aria-hidden="true" />
)}
</Button>
</div>
</CardHeader>
<Collapsible open={!isCollapsed} className="min-h-0 flex-1">
{!isCollapsed ? (
<CardContent className="min-h-0 flex-1 px-3 pb-3">
{isLoading ? (
<ScrollArea className="h-full">
<div className="space-y-2 pr-1">
{Array.from({ length: 6 }).map((_, index) => (
<Skeleton
key={`queue-notification-skeleton-${String(index)}`}
className="h-14 w-full"
/>
))}
</div>
</ScrollArea>
) : errorMessage && notifications.length === 0 ? (
<div className="flex h-full items-center justify-center px-4 text-center text-sm text-red-500">
{errorMessage}
</div>
) : groupedNotifications.length === 0 ? (
<div className="flex h-full flex-col items-center justify-center gap-2 text-center text-sm text-muted-foreground">
<BellOff className="h-5 w-5" aria-hidden="true" />
<span>No pending notifications</span>
</div>
) : (
<ScrollArea className="h-full">
<div className="space-y-4 pr-1">
{groupedNotifications.map((group) => (
<section key={group.agent} className="space-y-2">
<div className="flex items-center justify-between gap-2">
<h3 className="text-sm font-semibold text-foreground">{group.agent}</h3>
<span className="text-xs text-muted-foreground">
{group.notifications.length}
</span>
</div>
<div className="space-y-2">
{group.notifications.map((notification) => {
const taskId = getNotificationTaskId(notification);
const eventType = getNotificationEventType(notification);
const isAcknowledging = acknowledgingIds[notification.id] ?? false;
return (
<article
key={notification.id}
className="rounded-lg border border-border/70 bg-card px-3 py-2"
>
<div className="flex items-start justify-between gap-3">
<div className="min-w-0 space-y-1">
<div className="flex flex-wrap items-center gap-2">
<span className="font-mono text-xs text-foreground">
{taskId}
</span>
<Badge variant="secondary">{eventType}</Badge>
</div>
<time
className="block text-xs text-muted-foreground"
dateTime={notification.createdAt}
>
{formatNotificationAge(notification.createdAt)}
</time>
</div>
<Button
type="button"
variant="outline"
size="sm"
disabled={isAcknowledging}
onClick={() => {
void handleAck(notification.id);
}}
aria-label={`ACK notification ${taskId}`}
>
{isAcknowledging ? (
<span className="flex items-center gap-2">
<Loader2
className="h-4 w-4 animate-spin"
aria-hidden="true"
/>
ACK
</span>
) : (
"ACK"
)}
</Button>
</div>
</article>
);
})}
</div>
</section>
))}
</div>
</ScrollArea>
)}
</CardContent>
) : null}
</Collapsible>
<span className="sr-only" aria-live="polite">
{pendingCount} pending notifications, refreshed at {new Date(now).toISOString()}
</span>
</Card>
);
}

View File

@@ -120,7 +120,7 @@ Target version: `v0.0.23`
### Phase 0 — Backend Core (Foundation) ### Phase 0 — Backend Core (Foundation)
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes | | id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
| ----------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- | | ----------- | ------ | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- |
| MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD | | MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD |
| MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | | | MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
| MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | | | MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
@@ -183,3 +183,29 @@ Target version: `v0.0.23`
| **Total** | **29** | **~478K** | | **Total** | **29** | **~478K** |
Recommended dispatch: Codex for Phase 2 UI + routine API tasks; Sonnet for complex streaming logic (P0-003, P1-005, P3-002). Recommended dispatch: Codex for Phase 2 UI + routine API tasks; Sonnet for complex streaming logic (P0-003, P1-005, P3-002).
---
## MS24 — Queue Integration in Mission Control
PRD: Issue #749
Milestone: `0.0.24`
Target version: `v0.0.24`
> Single-writer: orchestrator (Jarvis/OpenClaw) only. Workers read but never modify.
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
| ------------ | ------ | --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----- | ----- | -------------------- | ------------ | ------------ | ----- | ---------- | ------------ | -------- | ---- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- |
| MS24-API-001 | done | p0-api | QueueNotificationsModule: GET /notifications, GET /notifications/stream (SSE), POST /notifications/:id/ack, GET /tasks. Auth: ApiKeyGuard. Unit tests included. | #749 | api | feat/ms24-queue-api | — | MS24-WEB-001 | — | — | — | 25K | — | Guard: ApiKeyGuard from src/common/guards/api-key.guard.ts (COORDINATOR_API_KEY). SSE via raw express Response + chokidar. Fail-soft if inbox dir missing. |
| MS24-API-002 | done | p0-api | Woodpecker CI webhook → agent notification: POST /api/webhooks/woodpecker, HMAC-SHA256 verify, scan agent-state active dir, fire mosaic-queue notification to matched agent | #749 | api | feat/ms24-ci-webhook | MS24-API-001 | MS24-VER-001 | codex | 2026-03-08 | — | 15K | — | event: completed/failed. @SkipCsrf, no auth guard. Also adds notify-webhook step to .woodpecker/ci.yml |
| MS24-WEB-001 | done | p0-ui | QueueNotificationFeed component + MissionControlLayout wiring (right sidebar panel, badge count) | #749 | web | feat/ms24-queue-ui | MS24-API-001 | MS24-VER-001 | — | — | — | 20K | — | SSE to /api/queue/notifications/stream. ACK button. Collapsible panel in MissionControlLayout.tsx. |
| MS24-VER-001 | done | p0-verify | CI green, no regressions, deploy to prod, tag v0.0.24 | #749 | stack | — | MS24-WEB-001 | — | — | — | — | 5K | — | |
### MS24 Budget Summary
| Phase | Tasks | Estimate |
| --------- | ----- | -------- |
| API | 1 | ~25K |
| UI | 1 | ~20K |
| Verify | 1 | ~5K |
| **Total** | **3** | **~50K** |