diff --git a/.woodpecker/ci.yml b/.woodpecker/ci.yml index 2ffb48a..77cf6b9 100644 --- a/.woodpecker/ci.yml +++ b/.woodpecker/ci.yml @@ -271,6 +271,26 @@ steps: depends_on: - docker-build-orchestrator + notify-webhook: + image: curlimages/curl:8.6.0 + environment: + MOSAIC_WEBHOOK_URL: + from_secret: mosaic_webhook_url + WOODPECKER_WEBHOOK_SECRET: + from_secret: woodpecker_webhook_secret + commands: + - | + BODY="{\"branch\":\"${CI_COMMIT_BRANCH}\",\"status\":\"${CI_PIPELINE_STATUS}\",\"buildUrl\":\"${CI_PIPELINE_LINK}\",\"repo\":\"${CI_REPO}\"}" + SIG=$(echo -n "$BODY" | openssl dgst -sha256 -hmac "$WOODPECKER_WEBHOOK_SECRET" | awk '{print $2}') + curl -s -o /dev/null -w "%{http_code}" -X POST "${MOSAIC_WEBHOOK_URL}/api/webhooks/woodpecker" \ + -H "Content-Type: application/json" \ + -H "X-Woodpecker-Signature: ${SIG}" \ + -d "$BODY" || true + when: + - status: [success, failure] + depends_on: + - build + security-trivy-web: image: aquasec/trivy:latest environment: diff --git a/apps/api/src/queue-notifications/queue-notifications.module.ts b/apps/api/src/queue-notifications/queue-notifications.module.ts index 59f17a0..61a3b9c 100644 --- a/apps/api/src/queue-notifications/queue-notifications.module.ts +++ b/apps/api/src/queue-notifications/queue-notifications.module.ts @@ -4,10 +4,11 @@ import { AuthModule } from "../auth/auth.module"; import { ApiKeyGuard } from "../common/guards/api-key.guard"; import { QueueNotificationsController } from "./queue-notifications.controller"; import { QueueNotificationsService } from "./queue-notifications.service"; +import { WoodpeckerWebhookController } from "./woodpecker-webhook.controller"; @Module({ imports: [ConfigModule, AuthModule], - controllers: [QueueNotificationsController], + controllers: [QueueNotificationsController, WoodpeckerWebhookController], providers: [QueueNotificationsService, ApiKeyGuard], exports: [QueueNotificationsService], }) diff --git a/apps/api/src/queue-notifications/queue-notifications.service.spec.ts b/apps/api/src/queue-notifications/queue-notifications.service.spec.ts index b9b5722..89f7012 100644 --- a/apps/api/src/queue-notifications/queue-notifications.service.spec.ts +++ b/apps/api/src/queue-notifications/queue-notifications.service.spec.ts @@ -1,7 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { ConfigService } from "@nestjs/config"; import { Logger, NotFoundException } from "@nestjs/common"; -import { mkdtemp, mkdir, rm, writeFile } from "node:fs/promises"; +import { mkdtemp, mkdir, readFile, readdir, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { execFile } from "node:child_process"; @@ -14,17 +14,23 @@ vi.mock("node:child_process", () => ({ describe("QueueNotificationsService", () => { let service: QueueNotificationsService; let inboxDir: string; + let agentStateDir: string; let configService: ConfigService; beforeEach(async () => { vi.clearAllMocks(); inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-")); + agentStateDir = await mkdtemp(join(tmpdir(), "agent-state-")); configService = { get: vi.fn((key: string) => { if (key === "MOSAIC_QUEUE_INBOX_DIR") { return inboxDir; } + if (key === "MOSAIC_AGENT_STATE_DIR") { + return agentStateDir; + } + if (key === "MOSAIC_QUEUE_CLI") { return "/tmp/mosaic-queue-cli.js"; } @@ -39,6 +45,7 @@ describe("QueueNotificationsService", () => { afterEach(async () => { vi.restoreAllMocks(); await rm(inboxDir, { recursive: true, force: true }); + await rm(agentStateDir, { recursive: true, force: true }); }); describe("onModuleInit", () => { @@ -169,4 +176,93 @@ describe("QueueNotificationsService", () => { ); }); }); + + describe("notifyAgentCiResult", () => { + it("writes one notification per active agent-state record matching the branch", async () => { + await mkdir(join(agentStateDir, "active"), { recursive: true }); + await writeFile( + join(agentStateDir, "active", "sage-landing-page.json"), + JSON.stringify({ + taskId: "sage-landing-page", + status: "spawned", + startedAt: "2026-03-08T22:47:20Z", + lastUpdated: "2026-03-08T23:15:11.726Z", + agent: "sage", + branch: "feature/landing-page", + description: "Create apps/landing marketing site", + }) + ); + await writeFile( + join(agentStateDir, "active", "pixels-other.json"), + JSON.stringify({ + taskId: "pixels-other", + status: "spawned", + startedAt: "2026-03-08T22:47:20Z", + lastUpdated: "2026-03-08T23:15:11.726Z", + agent: "pixels", + branch: "feature/something-else", + description: "Unrelated", + }) + ); + + await expect( + service.notifyAgentCiResult({ + branch: "feature/landing-page", + status: "success", + buildUrl: "https://ci.example/build/123", + repo: "mosaic/stack", + }) + ).resolves.toEqual({ notified: 1 }); + + const inboxFiles = await readdir(join(inboxDir, "sage")); + expect(inboxFiles).toHaveLength(1); + + const notification = JSON.parse( + await readFile(join(inboxDir, "sage", inboxFiles[0]!), "utf8") + ) as Record; + + expect(notification).toMatchObject({ + taskId: "sage-landing-page", + event: "completed", + targetAgent: "sage", + fromAgent: "mosaic-api", + retries: 0, + maxRetries: 3, + ttlSeconds: 600, + payload: { + branch: "feature/landing-page", + buildUrl: "https://ci.example/build/123", + repo: "mosaic/stack", + ciStatus: "success", + }, + }); + expect(typeof notification.id).toBe("string"); + expect(typeof notification.createdAt).toBe("string"); + }); + + it("returns zero when no active agent-state branch matches the webhook payload", async () => { + await mkdir(join(agentStateDir, "active"), { recursive: true }); + await writeFile( + join(agentStateDir, "active", "mosaic-task.json"), + JSON.stringify({ + taskId: "mosaic-task", + status: "spawned", + startedAt: "2026-03-08T22:47:20Z", + lastUpdated: "2026-03-08T23:15:11.726Z", + agent: "mosaic", + branch: "feature/not-this-one", + description: "Unrelated", + }) + ); + + await expect( + service.notifyAgentCiResult({ + branch: "feature/landing-page", + status: "failure", + buildUrl: "https://ci.example/build/456", + repo: "mosaic/stack", + }) + ).resolves.toEqual({ notified: 0 }); + }); + }); }); diff --git a/apps/api/src/queue-notifications/queue-notifications.service.ts b/apps/api/src/queue-notifications/queue-notifications.service.ts index 6db96a9..ca69515 100644 --- a/apps/api/src/queue-notifications/queue-notifications.service.ts +++ b/apps/api/src/queue-notifications/queue-notifications.service.ts @@ -6,8 +6,9 @@ import { OnModuleInit, } from "@nestjs/common"; import { ConfigService } from "@nestjs/config"; +import { randomUUID } from "node:crypto"; import { execFile } from "node:child_process"; -import { access, readdir, readFile, stat } from "node:fs/promises"; +import { access, mkdir, readdir, readFile, stat, writeFile } from "node:fs/promises"; import { homedir } from "node:os"; import { basename, join } from "node:path"; import type { Response } from "express"; @@ -29,6 +30,30 @@ export interface QueueTask { description: string; } +interface AgentStateRecord { + taskId: string; + agent: string; + branch: string; +} + +interface AgentCiNotification { + id: string; + taskId: string; + event: "completed" | "failed"; + targetAgent: string; + fromAgent: string; + payload: { + branch: string; + buildUrl: string; + repo: string; + ciStatus: "success" | "failure"; + }; + createdAt: string; + retries: number; + maxRetries: number; + ttlSeconds: number; +} + @Injectable() export class QueueNotificationsService implements OnModuleInit { private readonly logger = new Logger(QueueNotificationsService.name); @@ -171,6 +196,52 @@ export class QueueNotificationsService implements OnModuleInit { }); } + async notifyAgentCiResult(opts: { + branch: string; + status: "success" | "failure"; + buildUrl: string; + repo: string; + }): Promise<{ notified: number }> { + const activeRecords = await this.listActiveAgentStateRecords(); + const matches = activeRecords.filter((record) => record.branch === opts.branch); + + await Promise.all( + matches.map(async (record) => { + const notification: AgentCiNotification = { + id: randomUUID(), + taskId: record.taskId, + event: opts.status === "success" ? "completed" : "failed", + targetAgent: record.agent, + fromAgent: "mosaic-api", + payload: { + branch: opts.branch, + buildUrl: opts.buildUrl, + repo: opts.repo, + ciStatus: opts.status, + }, + createdAt: new Date().toISOString(), + retries: 0, + maxRetries: 3, + ttlSeconds: 600, + }; + + const agentDir = join(this.getInboxDir(), record.agent); + // Path is built from the configured inbox root plus validated agent-state entries. + // eslint-disable-next-line security/detect-non-literal-fs-filename + await mkdir(agentDir, { recursive: true }); + // Path is built from the configured inbox root plus validated agent-state entries. + // eslint-disable-next-line security/detect-non-literal-fs-filename + await writeFile( + join(agentDir, `${notification.id}.json`), + JSON.stringify(notification, null, 2), + "utf8" + ); + }) + ); + + return { notified: matches.length }; + } + private async execQueueCli(args: string[]): Promise { const cliPath = this.getQueueCliPath(); @@ -198,6 +269,12 @@ export class QueueNotificationsService implements OnModuleInit { ); } + private getAgentStateDir(): string { + return this.expandHomePath( + this.configService.get("MOSAIC_AGENT_STATE_DIR") ?? "~/.openclaw/workspace/agents" + ); + } + private getQueueCliPath(): string { return this.expandHomePath( this.configService.get("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js" @@ -228,4 +305,37 @@ export class QueueNotificationsService implements OnModuleInit { private isIgnoredDirectory(name: string): boolean { return name === "_acked" || name === "_dead-letter"; } + + private async listActiveAgentStateRecords(): Promise { + const activeDir = join(this.getAgentStateDir(), "active"); + + try { + // Path comes from controlled config and a fixed "active" subdirectory. + // eslint-disable-next-line security/detect-non-literal-fs-filename + const entries = await readdir(activeDir, { withFileTypes: true }); + const records = await Promise.all( + entries + .filter((entry) => entry.isFile() && entry.name.endsWith(".json")) + .map(async (entry) => { + const filePath = join(activeDir, entry.name); + // Path comes from controlled config plus directory entries under the active root. + // eslint-disable-next-line security/detect-non-literal-fs-filename + const rawRecord = await readFile(filePath, "utf8"); + return JSON.parse(rawRecord) as AgentStateRecord; + }) + ); + + return records.filter((record) => { + return ( + typeof record.taskId === "string" && + typeof record.agent === "string" && + typeof record.branch === "string" + ); + }); + } catch (error: unknown) { + const message = error instanceof Error ? error.message : String(error); + this.logger.warn(`Unable to read active agent-state records from ${activeDir}: ${message}`); + return []; + } + } } diff --git a/apps/api/src/queue-notifications/woodpecker-webhook.controller.spec.ts b/apps/api/src/queue-notifications/woodpecker-webhook.controller.spec.ts new file mode 100644 index 0000000..3aec8b7 --- /dev/null +++ b/apps/api/src/queue-notifications/woodpecker-webhook.controller.spec.ts @@ -0,0 +1,132 @@ +import { createHmac } from "node:crypto"; +import { Logger } from "@nestjs/common"; +import { ConfigService } from "@nestjs/config"; +import { Test, type TestingModule } from "@nestjs/testing"; +import type { Request } from "express"; +import { describe, beforeEach, expect, it, vi } from "vitest"; +import { QueueNotificationsService } from "./queue-notifications.service"; +import { + type WoodpeckerWebhookPayload, + WoodpeckerWebhookController, +} from "./woodpecker-webhook.controller"; + +function signPayload(payload: WoodpeckerWebhookPayload, secret: string): string { + return createHmac("sha256", secret).update(JSON.stringify(payload)).digest("hex"); +} + +describe("WoodpeckerWebhookController", () => { + let controller: WoodpeckerWebhookController; + + const mockService = { + notifyAgentCiResult: vi.fn(), + }; + + const mockConfigService = { + get: vi.fn(), + }; + + beforeEach(async () => { + vi.clearAllMocks(); + mockConfigService.get.mockImplementation((key: string) => { + if (key === "WOODPECKER_WEBHOOK_SECRET") { + return "test-secret"; + } + + return undefined; + }); + + const module: TestingModule = await Test.createTestingModule({ + controllers: [WoodpeckerWebhookController], + providers: [ + { provide: QueueNotificationsService, useValue: mockService }, + { provide: ConfigService, useValue: mockConfigService }, + ], + }).compile(); + + controller = module.get(WoodpeckerWebhookController); + }); + + it("accepts a valid signature and forwards the payload to the service", async () => { + const payload: WoodpeckerWebhookPayload = { + branch: "feat/ms24-ci-webhook", + status: "success", + buildUrl: "https://ci.example/build/123", + repo: "mosaic/stack", + }; + const signature = signPayload(payload, "test-secret"); + mockService.notifyAgentCiResult.mockResolvedValue({ notified: 2 }); + + await expect( + controller.handleWebhook( + { rawBody: Buffer.from(JSON.stringify(payload)) } as Request, + payload, + signature + ) + ).resolves.toEqual({ ok: true, notified: 2 }); + + expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload); + }); + + it("returns ok without notifying when the signature is invalid", async () => { + const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined); + const payload: WoodpeckerWebhookPayload = { + branch: "feat/ms24-ci-webhook", + status: "failure", + buildUrl: "https://ci.example/build/123", + repo: "mosaic/stack", + }; + + await expect( + controller.handleWebhook( + { rawBody: Buffer.from(JSON.stringify(payload)) } as Request, + payload, + "bad-signature" + ) + ).resolves.toEqual({ ok: true, notified: 0 }); + + expect(mockService.notifyAgentCiResult).not.toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining("invalid Woodpecker webhook signature") + ); + }); + + it("accepts the payload when the webhook secret is missing", async () => { + const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined); + const payload: WoodpeckerWebhookPayload = { + branch: "feat/ms24-ci-webhook", + status: "success", + buildUrl: "https://ci.example/build/123", + repo: "mosaic/stack", + }; + mockConfigService.get.mockReturnValue(undefined); + mockService.notifyAgentCiResult.mockResolvedValue({ notified: 1 }); + + await expect(controller.handleWebhook({} as Request, payload, "")).resolves.toEqual({ + ok: true, + notified: 1, + }); + + expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining("WOODPECKER_WEBHOOK_SECRET is not configured") + ); + }); + + it("returns zero notifications when no active branch matches", async () => { + const payload: WoodpeckerWebhookPayload = { + branch: "feat/ms24-ci-webhook", + status: "success", + buildUrl: "https://ci.example/build/999", + repo: "mosaic/stack", + }; + mockService.notifyAgentCiResult.mockResolvedValue({ notified: 0 }); + + await expect( + controller.handleWebhook( + { rawBody: Buffer.from(JSON.stringify(payload)) } as Request, + payload, + signPayload(payload, "test-secret") + ) + ).resolves.toEqual({ ok: true, notified: 0 }); + }); +}); diff --git a/apps/api/src/queue-notifications/woodpecker-webhook.controller.ts b/apps/api/src/queue-notifications/woodpecker-webhook.controller.ts new file mode 100644 index 0000000..b24be07 --- /dev/null +++ b/apps/api/src/queue-notifications/woodpecker-webhook.controller.ts @@ -0,0 +1,73 @@ +import { Body, Controller, Headers, Logger, Post, Req, type RawBodyRequest } from "@nestjs/common"; +import { ConfigService } from "@nestjs/config"; +import { createHmac, timingSafeEqual } from "node:crypto"; +import type { Request } from "express"; +import { SkipCsrf } from "../common/decorators/skip-csrf.decorator"; +import { QueueNotificationsService } from "./queue-notifications.service"; + +export interface WoodpeckerWebhookPayload { + branch: string; + status: "success" | "failure"; + buildUrl: string; + repo: string; +} + +@Controller("webhooks") +export class WoodpeckerWebhookController { + private readonly logger = new Logger(WoodpeckerWebhookController.name); + + constructor( + private readonly queueService: QueueNotificationsService, + private readonly configService: ConfigService + ) {} + + @SkipCsrf() + @Post("woodpecker") + async handleWebhook( + @Req() req: RawBodyRequest, + @Body() body: WoodpeckerWebhookPayload, + @Headers("x-woodpecker-signature") signature: string + ): Promise<{ ok: boolean; notified: number }> { + const secret = this.configService.get("WOODPECKER_WEBHOOK_SECRET"); + + if (!secret) { + this.logger.warn("WOODPECKER_WEBHOOK_SECRET is not configured; accepting Woodpecker webhook"); + const result = await this.queueService.notifyAgentCiResult(body); + return { ok: true, notified: result.notified }; + } + + if (!this.isValidSignature(this.getRequestBody(req, body), signature, secret)) { + this.logger.warn("Received invalid Woodpecker webhook signature"); + return { ok: true, notified: 0 }; + } + + const result = await this.queueService.notifyAgentCiResult(body); + return { ok: true, notified: result.notified }; + } + + private getRequestBody(req: RawBodyRequest, body: WoodpeckerWebhookPayload): Buffer { + const rawBody = req.rawBody; + + if (Buffer.isBuffer(rawBody)) { + return rawBody; + } + + return Buffer.from(JSON.stringify(body)); + } + + private isValidSignature(body: Buffer, signature: string | undefined, secret: string): boolean { + if (!signature) { + return false; + } + + const expected = createHmac("sha256", secret).update(body).digest("hex"); + const signatureBuffer = Buffer.from(signature); + const expectedBuffer = Buffer.from(expected); + + if (signatureBuffer.length !== expectedBuffer.length) { + return false; + } + + return timingSafeEqual(signatureBuffer, expectedBuffer); + } +}