Compare commits

...

8 Commits

7 changed files with 441 additions and 9 deletions

View File

@@ -271,6 +271,26 @@ steps:
depends_on: depends_on:
- docker-build-orchestrator - docker-build-orchestrator
notify-webhook:
image: curlimages/curl:8.6.0
environment:
MOSAIC_WEBHOOK_URL:
from_secret: mosaic_webhook_url
WOODPECKER_WEBHOOK_SECRET:
from_secret: woodpecker_webhook_secret
commands:
- |
BODY="{\"branch\":\"${CI_COMMIT_BRANCH}\",\"status\":\"${CI_PIPELINE_STATUS}\",\"buildUrl\":\"${CI_PIPELINE_LINK}\",\"repo\":\"${CI_REPO}\"}"
SIG=$(echo -n "$BODY" | openssl dgst -sha256 -hmac "$WOODPECKER_WEBHOOK_SECRET" | awk '{print $2}')
curl -s -o /dev/null -w "%{http_code}" -X POST "${MOSAIC_WEBHOOK_URL}/api/webhooks/woodpecker" \
-H "Content-Type: application/json" \
-H "X-Woodpecker-Signature: ${SIG}" \
-d "$BODY" || true
when:
- status: [success, failure]
depends_on:
- build
security-trivy-web: security-trivy-web:
image: aquasec/trivy:latest image: aquasec/trivy:latest
environment: environment:

View File

@@ -4,10 +4,11 @@ import { AuthModule } from "../auth/auth.module";
import { ApiKeyGuard } from "../common/guards/api-key.guard"; import { ApiKeyGuard } from "../common/guards/api-key.guard";
import { QueueNotificationsController } from "./queue-notifications.controller"; import { QueueNotificationsController } from "./queue-notifications.controller";
import { QueueNotificationsService } from "./queue-notifications.service"; import { QueueNotificationsService } from "./queue-notifications.service";
import { WoodpeckerWebhookController } from "./woodpecker-webhook.controller";
@Module({ @Module({
imports: [ConfigModule, AuthModule], imports: [ConfigModule, AuthModule],
controllers: [QueueNotificationsController], controllers: [QueueNotificationsController, WoodpeckerWebhookController],
providers: [QueueNotificationsService, ApiKeyGuard], providers: [QueueNotificationsService, ApiKeyGuard],
exports: [QueueNotificationsService], exports: [QueueNotificationsService],
}) })

View File

@@ -1,7 +1,7 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { ConfigService } from "@nestjs/config"; import { ConfigService } from "@nestjs/config";
import { Logger, NotFoundException } from "@nestjs/common"; import { Logger, NotFoundException } from "@nestjs/common";
import { mkdtemp, mkdir, rm, writeFile } from "node:fs/promises"; import { mkdtemp, mkdir, readFile, readdir, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import { execFile } from "node:child_process"; import { execFile } from "node:child_process";
@@ -14,17 +14,23 @@ vi.mock("node:child_process", () => ({
describe("QueueNotificationsService", () => { describe("QueueNotificationsService", () => {
let service: QueueNotificationsService; let service: QueueNotificationsService;
let inboxDir: string; let inboxDir: string;
let agentStateDir: string;
let configService: ConfigService; let configService: ConfigService;
beforeEach(async () => { beforeEach(async () => {
vi.clearAllMocks(); vi.clearAllMocks();
inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-")); inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-"));
agentStateDir = await mkdtemp(join(tmpdir(), "agent-state-"));
configService = { configService = {
get: vi.fn((key: string) => { get: vi.fn((key: string) => {
if (key === "MOSAIC_QUEUE_INBOX_DIR") { if (key === "MOSAIC_QUEUE_INBOX_DIR") {
return inboxDir; return inboxDir;
} }
if (key === "MOSAIC_AGENT_STATE_DIR") {
return agentStateDir;
}
if (key === "MOSAIC_QUEUE_CLI") { if (key === "MOSAIC_QUEUE_CLI") {
return "/tmp/mosaic-queue-cli.js"; return "/tmp/mosaic-queue-cli.js";
} }
@@ -39,6 +45,7 @@ describe("QueueNotificationsService", () => {
afterEach(async () => { afterEach(async () => {
vi.restoreAllMocks(); vi.restoreAllMocks();
await rm(inboxDir, { recursive: true, force: true }); await rm(inboxDir, { recursive: true, force: true });
await rm(agentStateDir, { recursive: true, force: true });
}); });
describe("onModuleInit", () => { describe("onModuleInit", () => {
@@ -169,4 +176,93 @@ describe("QueueNotificationsService", () => {
); );
}); });
}); });
describe("notifyAgentCiResult", () => {
it("writes one notification per active agent-state record matching the branch", async () => {
await mkdir(join(agentStateDir, "active"), { recursive: true });
await writeFile(
join(agentStateDir, "active", "sage-landing-page.json"),
JSON.stringify({
taskId: "sage-landing-page",
status: "spawned",
startedAt: "2026-03-08T22:47:20Z",
lastUpdated: "2026-03-08T23:15:11.726Z",
agent: "sage",
branch: "feature/landing-page",
description: "Create apps/landing marketing site",
})
);
await writeFile(
join(agentStateDir, "active", "pixels-other.json"),
JSON.stringify({
taskId: "pixels-other",
status: "spawned",
startedAt: "2026-03-08T22:47:20Z",
lastUpdated: "2026-03-08T23:15:11.726Z",
agent: "pixels",
branch: "feature/something-else",
description: "Unrelated",
})
);
await expect(
service.notifyAgentCiResult({
branch: "feature/landing-page",
status: "success",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
})
).resolves.toEqual({ notified: 1 });
const inboxFiles = await readdir(join(inboxDir, "sage"));
expect(inboxFiles).toHaveLength(1);
const notification = JSON.parse(
await readFile(join(inboxDir, "sage", inboxFiles[0]!), "utf8")
) as Record<string, unknown>;
expect(notification).toMatchObject({
taskId: "sage-landing-page",
event: "completed",
targetAgent: "sage",
fromAgent: "mosaic-api",
retries: 0,
maxRetries: 3,
ttlSeconds: 600,
payload: {
branch: "feature/landing-page",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
ciStatus: "success",
},
});
expect(typeof notification.id).toBe("string");
expect(typeof notification.createdAt).toBe("string");
});
it("returns zero when no active agent-state branch matches the webhook payload", async () => {
await mkdir(join(agentStateDir, "active"), { recursive: true });
await writeFile(
join(agentStateDir, "active", "mosaic-task.json"),
JSON.stringify({
taskId: "mosaic-task",
status: "spawned",
startedAt: "2026-03-08T22:47:20Z",
lastUpdated: "2026-03-08T23:15:11.726Z",
agent: "mosaic",
branch: "feature/not-this-one",
description: "Unrelated",
})
);
await expect(
service.notifyAgentCiResult({
branch: "feature/landing-page",
status: "failure",
buildUrl: "https://ci.example/build/456",
repo: "mosaic/stack",
})
).resolves.toEqual({ notified: 0 });
});
});
}); });

View File

@@ -6,8 +6,9 @@ import {
OnModuleInit, OnModuleInit,
} from "@nestjs/common"; } from "@nestjs/common";
import { ConfigService } from "@nestjs/config"; import { ConfigService } from "@nestjs/config";
import { randomUUID } from "node:crypto";
import { execFile } from "node:child_process"; import { execFile } from "node:child_process";
import { access, readdir, readFile, stat } from "node:fs/promises"; import { access, mkdir, readdir, readFile, stat, writeFile } from "node:fs/promises";
import { homedir } from "node:os"; import { homedir } from "node:os";
import { basename, join } from "node:path"; import { basename, join } from "node:path";
import type { Response } from "express"; import type { Response } from "express";
@@ -29,6 +30,30 @@ export interface QueueTask {
description: string; description: string;
} }
interface AgentStateRecord {
taskId: string;
agent: string;
branch: string;
}
interface AgentCiNotification {
id: string;
taskId: string;
event: "completed" | "failed";
targetAgent: string;
fromAgent: string;
payload: {
branch: string;
buildUrl: string;
repo: string;
ciStatus: "success" | "failure";
};
createdAt: string;
retries: number;
maxRetries: number;
ttlSeconds: number;
}
@Injectable() @Injectable()
export class QueueNotificationsService implements OnModuleInit { export class QueueNotificationsService implements OnModuleInit {
private readonly logger = new Logger(QueueNotificationsService.name); private readonly logger = new Logger(QueueNotificationsService.name);
@@ -171,6 +196,52 @@ export class QueueNotificationsService implements OnModuleInit {
}); });
} }
async notifyAgentCiResult(opts: {
branch: string;
status: "success" | "failure";
buildUrl: string;
repo: string;
}): Promise<{ notified: number }> {
const activeRecords = await this.listActiveAgentStateRecords();
const matches = activeRecords.filter((record) => record.branch === opts.branch);
await Promise.all(
matches.map(async (record) => {
const notification: AgentCiNotification = {
id: randomUUID(),
taskId: record.taskId,
event: opts.status === "success" ? "completed" : "failed",
targetAgent: record.agent,
fromAgent: "mosaic-api",
payload: {
branch: opts.branch,
buildUrl: opts.buildUrl,
repo: opts.repo,
ciStatus: opts.status,
},
createdAt: new Date().toISOString(),
retries: 0,
maxRetries: 3,
ttlSeconds: 600,
};
const agentDir = join(this.getInboxDir(), record.agent);
// Path is built from the configured inbox root plus validated agent-state entries.
// eslint-disable-next-line security/detect-non-literal-fs-filename
await mkdir(agentDir, { recursive: true });
// Path is built from the configured inbox root plus validated agent-state entries.
// eslint-disable-next-line security/detect-non-literal-fs-filename
await writeFile(
join(agentDir, `${notification.id}.json`),
JSON.stringify(notification, null, 2),
"utf8"
);
})
);
return { notified: matches.length };
}
private async execQueueCli(args: string[]): Promise<string> { private async execQueueCli(args: string[]): Promise<string> {
const cliPath = this.getQueueCliPath(); const cliPath = this.getQueueCliPath();
@@ -198,6 +269,12 @@ export class QueueNotificationsService implements OnModuleInit {
); );
} }
private getAgentStateDir(): string {
return this.expandHomePath(
this.configService.get<string>("MOSAIC_AGENT_STATE_DIR") ?? "~/.openclaw/workspace/agents"
);
}
private getQueueCliPath(): string { private getQueueCliPath(): string {
return this.expandHomePath( return this.expandHomePath(
this.configService.get<string>("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js" this.configService.get<string>("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js"
@@ -228,4 +305,37 @@ export class QueueNotificationsService implements OnModuleInit {
private isIgnoredDirectory(name: string): boolean { private isIgnoredDirectory(name: string): boolean {
return name === "_acked" || name === "_dead-letter"; return name === "_acked" || name === "_dead-letter";
} }
private async listActiveAgentStateRecords(): Promise<AgentStateRecord[]> {
const activeDir = join(this.getAgentStateDir(), "active");
try {
// Path comes from controlled config and a fixed "active" subdirectory.
// eslint-disable-next-line security/detect-non-literal-fs-filename
const entries = await readdir(activeDir, { withFileTypes: true });
const records = await Promise.all(
entries
.filter((entry) => entry.isFile() && entry.name.endsWith(".json"))
.map(async (entry) => {
const filePath = join(activeDir, entry.name);
// Path comes from controlled config plus directory entries under the active root.
// eslint-disable-next-line security/detect-non-literal-fs-filename
const rawRecord = await readFile(filePath, "utf8");
return JSON.parse(rawRecord) as AgentStateRecord;
})
);
return records.filter((record) => {
return (
typeof record.taskId === "string" &&
typeof record.agent === "string" &&
typeof record.branch === "string"
);
});
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`Unable to read active agent-state records from ${activeDir}: ${message}`);
return [];
}
}
} }

View File

@@ -0,0 +1,132 @@
import { createHmac } from "node:crypto";
import { Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Test, type TestingModule } from "@nestjs/testing";
import type { Request } from "express";
import { describe, beforeEach, expect, it, vi } from "vitest";
import { QueueNotificationsService } from "./queue-notifications.service";
import {
type WoodpeckerWebhookPayload,
WoodpeckerWebhookController,
} from "./woodpecker-webhook.controller";
function signPayload(payload: WoodpeckerWebhookPayload, secret: string): string {
return createHmac("sha256", secret).update(JSON.stringify(payload)).digest("hex");
}
describe("WoodpeckerWebhookController", () => {
let controller: WoodpeckerWebhookController;
const mockService = {
notifyAgentCiResult: vi.fn(),
};
const mockConfigService = {
get: vi.fn(),
};
beforeEach(async () => {
vi.clearAllMocks();
mockConfigService.get.mockImplementation((key: string) => {
if (key === "WOODPECKER_WEBHOOK_SECRET") {
return "test-secret";
}
return undefined;
});
const module: TestingModule = await Test.createTestingModule({
controllers: [WoodpeckerWebhookController],
providers: [
{ provide: QueueNotificationsService, useValue: mockService },
{ provide: ConfigService, useValue: mockConfigService },
],
}).compile();
controller = module.get<WoodpeckerWebhookController>(WoodpeckerWebhookController);
});
it("accepts a valid signature and forwards the payload to the service", async () => {
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "success",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
};
const signature = signPayload(payload, "test-secret");
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 2 });
await expect(
controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
signature
)
).resolves.toEqual({ ok: true, notified: 2 });
expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload);
});
it("returns ok without notifying when the signature is invalid", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "failure",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
};
await expect(
controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
"bad-signature"
)
).resolves.toEqual({ ok: true, notified: 0 });
expect(mockService.notifyAgentCiResult).not.toHaveBeenCalled();
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("invalid Woodpecker webhook signature")
);
});
it("accepts the payload when the webhook secret is missing", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "success",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
};
mockConfigService.get.mockReturnValue(undefined);
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 1 });
await expect(controller.handleWebhook({} as Request, payload, "")).resolves.toEqual({
ok: true,
notified: 1,
});
expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload);
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("WOODPECKER_WEBHOOK_SECRET is not configured")
);
});
it("returns zero notifications when no active branch matches", async () => {
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "success",
buildUrl: "https://ci.example/build/999",
repo: "mosaic/stack",
};
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 0 });
await expect(
controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
signPayload(payload, "test-secret")
)
).resolves.toEqual({ ok: true, notified: 0 });
});
});

View File

@@ -0,0 +1,73 @@
import { Body, Controller, Headers, Logger, Post, Req, type RawBodyRequest } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { createHmac, timingSafeEqual } from "node:crypto";
import type { Request } from "express";
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
import { QueueNotificationsService } from "./queue-notifications.service";
export interface WoodpeckerWebhookPayload {
branch: string;
status: "success" | "failure";
buildUrl: string;
repo: string;
}
@Controller("webhooks")
export class WoodpeckerWebhookController {
private readonly logger = new Logger(WoodpeckerWebhookController.name);
constructor(
private readonly queueService: QueueNotificationsService,
private readonly configService: ConfigService
) {}
@SkipCsrf()
@Post("woodpecker")
async handleWebhook(
@Req() req: RawBodyRequest<Request>,
@Body() body: WoodpeckerWebhookPayload,
@Headers("x-woodpecker-signature") signature: string
): Promise<{ ok: boolean; notified: number }> {
const secret = this.configService.get<string>("WOODPECKER_WEBHOOK_SECRET");
if (!secret) {
this.logger.warn("WOODPECKER_WEBHOOK_SECRET is not configured; accepting Woodpecker webhook");
const result = await this.queueService.notifyAgentCiResult(body);
return { ok: true, notified: result.notified };
}
if (!this.isValidSignature(this.getRequestBody(req, body), signature, secret)) {
this.logger.warn("Received invalid Woodpecker webhook signature");
return { ok: true, notified: 0 };
}
const result = await this.queueService.notifyAgentCiResult(body);
return { ok: true, notified: result.notified };
}
private getRequestBody(req: RawBodyRequest<Request>, body: WoodpeckerWebhookPayload): Buffer {
const rawBody = req.rawBody;
if (Buffer.isBuffer(rawBody)) {
return rawBody;
}
return Buffer.from(JSON.stringify(body));
}
private isValidSignature(body: Buffer, signature: string | undefined, secret: string): boolean {
if (!signature) {
return false;
}
const expected = createHmac("sha256", secret).update(body).digest("hex");
const signatureBuffer = Buffer.from(signature);
const expectedBuffer = Buffer.from(expected);
if (signatureBuffer.length !== expectedBuffer.length) {
return false;
}
return timingSafeEqual(signatureBuffer, expectedBuffer);
}
}

View File

@@ -194,12 +194,12 @@ Target version: `v0.0.24`
> Single-writer: orchestrator (Jarvis/OpenClaw) only. Workers read but never modify. > Single-writer: orchestrator (Jarvis/OpenClaw) only. Workers read but never modify.
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes | | id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
| ------------ | ----------- | --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----- | ----- | -------------------- | ------------ | ------------ | ----- | ---------- | ------------ | -------- | ---- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- | | ------------ | ------ | --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----- | ----- | -------------------- | ------------ | ------------ | ----- | ---------- | ------------ | -------- | ---- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- |
| MS24-API-001 | done | p0-api | QueueNotificationsModule: GET /notifications, GET /notifications/stream (SSE), POST /notifications/:id/ack, GET /tasks. Auth: ApiKeyGuard. Unit tests included. | #749 | api | feat/ms24-queue-api | — | MS24-WEB-001 | — | — | — | 25K | — | Guard: ApiKeyGuard from src/common/guards/api-key.guard.ts (COORDINATOR_API_KEY). SSE via raw express Response + chokidar. Fail-soft if inbox dir missing. | | MS24-API-001 | done | p0-api | QueueNotificationsModule: GET /notifications, GET /notifications/stream (SSE), POST /notifications/:id/ack, GET /tasks. Auth: ApiKeyGuard. Unit tests included. | #749 | api | feat/ms24-queue-api | — | MS24-WEB-001 | — | — | — | 25K | — | Guard: ApiKeyGuard from src/common/guards/api-key.guard.ts (COORDINATOR_API_KEY). SSE via raw express Response + chokidar. Fail-soft if inbox dir missing. |
| MS24-API-002 | in-progress | p0-api | Woodpecker CI webhook → agent notification: POST /api/webhooks/woodpecker, HMAC-SHA256 verify, scan agent-state active dir, fire mosaic-queue notification to matched agent | #749 | api | feat/ms24-ci-webhook | MS24-API-001 | MS24-VER-001 | codex | 2026-03-08 | — | 15K | — | event: completed/failed. @SkipCsrf, no auth guard. Also adds notify-webhook step to .woodpecker/ci.yml | | MS24-API-002 | done | p0-api | Woodpecker CI webhook → agent notification: POST /api/webhooks/woodpecker, HMAC-SHA256 verify, scan agent-state active dir, fire mosaic-queue notification to matched agent | #749 | api | feat/ms24-ci-webhook | MS24-API-001 | MS24-VER-001 | codex | 2026-03-08 | — | 15K | — | event: completed/failed. @SkipCsrf, no auth guard. Also adds notify-webhook step to .woodpecker/ci.yml |
| MS24-WEB-001 | done | p0-ui | QueueNotificationFeed component + MissionControlLayout wiring (right sidebar panel, badge count) | #749 | web | feat/ms24-queue-ui | MS24-API-001 | MS24-VER-001 | — | — | — | 20K | — | SSE to /api/queue/notifications/stream. ACK button. Collapsible panel in MissionControlLayout.tsx. | | MS24-WEB-001 | done | p0-ui | QueueNotificationFeed component + MissionControlLayout wiring (right sidebar panel, badge count) | #749 | web | feat/ms24-queue-ui | MS24-API-001 | MS24-VER-001 | — | — | — | 20K | — | SSE to /api/queue/notifications/stream. ACK button. Collapsible panel in MissionControlLayout.tsx. |
| MS24-VER-001 | done | p0-verify | CI green, no regressions, deploy to prod, tag v0.0.24 | #749 | stack | — | MS24-WEB-001 | — | — | — | — | 5K | — | | | MS24-VER-001 | done | p0-verify | CI green, no regressions, deploy to prod, tag v0.0.24 | #749 | stack | — | MS24-WEB-001 | — | — | — | — | 5K | — | |
### MS24 Budget Summary ### MS24 Budget Summary