Compare commits

..

27 Commits

Author SHA1 Message Date
5f0a7c847c fix: use double quotes for ConfigService key (prettier)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-10 22:50:58 -05:00
639abfaefa fix: bump simple-git to 3.32.3 and use ConfigService for giteaApiBaseUrl
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 22:44:03 -05:00
6e2b9a307e feat(gatekeeper): add PR merge automation service
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-10 21:35:11 -05:00
3289677056 Merge pull request 'chore(tasks): mark MS24 all done' (#756) from chore/ms24-ver-done into main 2026-03-09 02:58:46 +00:00
5a14a97cb4 chore(tasks): mark MS24 all done including API-002 and VER-001 2026-03-08 21:58:33 -05:00
aebf6b18db Merge pull request 'chore(tasks): correct MS24 task statuses' (#755) from chore/ms24-tasks-correction into main 2026-03-09 02:38:36 +00:00
6fbfb3c197 chore(tasks): correct MS24 task statuses — API-002 done, VER-001 not-started pending CI 2026-03-08 21:38:12 -05:00
348943c5f7 Merge pull request 'feat(api): MS24 Woodpecker CI webhook → agent notification' (#754) from feat/ms24-ci-webhook into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-09 02:33:37 +00:00
39d36e67c5 ci: trigger pipeline with woodpecker secrets
All checks were successful
ci/woodpecker/push/infra Pipeline was successful
ci/woodpecker/push/coordinator Pipeline was successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 21:27:49 -05:00
e8a2d32476 feat(api): add Woodpecker CI webhook notifications 2026-03-08 18:37:35 -05:00
3c0c10c9e5 Merge pull request 'chore(tasks): add MS24-API-002 Woodpecker webhook task' (#753) from chore/ms24-api-002-task into main 2026-03-08 23:30:12 +00:00
f59ce6a7a5 chore(tasks): add MS24-API-002 Woodpecker webhook task 2026-03-08 18:30:00 -05:00
11fa1734bd Merge pull request 'chore(tasks): mark MS24 tasks done' (#752) from chore/ms24-tasks-done into main 2026-03-08 23:25:43 +00:00
46815707a9 chore(tasks): mark MS24 tasks done 2026-03-08 18:25:28 -05:00
621df6ee70 chore(tasks): add MS24 queue integration tasks 2026-03-08 18:25:20 -05:00
ac406f19bc Merge pull request 'feat(web): MS24 queue notification feed component' (#751) from feat/ms24-queue-ui into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 23:21:23 +00:00
72d295edd6 feat(web): add queue notification feed
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 18:18:07 -05:00
6e9def3c5a Merge pull request 'feat(api): MS24 queue notifications module' (#750) from feat/ms24-queue-api into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 23:04:16 +00:00
456d53fc7f fix(api): add SkipCsrf to queue notification ack endpoint
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 17:54:35 -05:00
8014930b70 Merge pull request 'fix(api): proxy mission-control routes to orchestrator' (#748) from fix/mission-control-proxy into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 22:51:08 +00:00
06f2cc4be3 feat(api): add queue notifications module
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 17:27:41 -05:00
04bbdf3308 fix(api): proxy mission-control routes to orchestrator
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-08 14:04:12 -05:00
a6f1438f40 fix(web): route Mission Control API calls through orchestrator proxy (#747)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-08 17:00:27 +00:00
523662656e Merge pull request 'fix(deploy): use consistent network alias for openbrain-brain-internal' (#746) from fix/compose-network-alias-consistency into main 2026-03-08 16:35:32 +00:00
27120ac3f2 fix(deploy): use consistent network alias for openbrain-brain-internal
All checks were successful
ci/woodpecker/push/ci Infra-only: compose YAML fix, no app code
Service definitions were using 'openbrain_brain-internal' (underscore) but the
networks block defines the alias as 'openbrain-brain-internal' (hyphen), with
name: openbrain_brain-internal pointing to the actual Docker network.

This caused 'undefined network' errors on every Portainer deploy for
orchestrator and synapse services.

Fixed: all service network references now use 'openbrain-brain-internal'.
2026-03-08 11:34:38 -05:00
ad9921107c fix(deploy): add DATABASE_URL and openbrain network to orchestrator + synapse (#745)
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-08 15:51:22 +00:00
3c288f9849 fix(web): add ReactQueryProvider to root layout for Mission Control (#744)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
2026-03-08 15:50:40 +00:00
55 changed files with 3353 additions and 128 deletions

View File

@@ -292,6 +292,11 @@ GITEA_REPO_NAME=stack
# Configure in Gitea: Repository Settings → Webhooks → Add Webhook # Configure in Gitea: Repository Settings → Webhooks → Add Webhook
GITEA_WEBHOOK_SECRET=REPLACE_WITH_RANDOM_WEBHOOK_SECRET GITEA_WEBHOOK_SECRET=REPLACE_WITH_RANDOM_WEBHOOK_SECRET
# Gatekeeper merge automation
# Uses the same Gitea host to validate PR webhooks and to merge approved PRs after green CI.
GITEA_API_TOKEN=REPLACE_WITH_GATEKEEPER_GITEA_API_TOKEN
GATEKEEPER_ENABLED=true
# Coordinator API Key (service-to-service authentication) # Coordinator API Key (service-to-service authentication)
# CRITICAL: Generate a random API key with at least 32 characters # CRITICAL: Generate a random API key with at least 32 characters
# Example: openssl rand -base64 32 # Example: openssl rand -base64 32

View File

@@ -271,6 +271,26 @@ steps:
depends_on: depends_on:
- docker-build-orchestrator - docker-build-orchestrator
notify-webhook:
image: curlimages/curl:8.6.0
environment:
MOSAIC_WEBHOOK_URL:
from_secret: mosaic_webhook_url
WOODPECKER_WEBHOOK_SECRET:
from_secret: woodpecker_webhook_secret
commands:
- |
BODY="{\"branch\":\"${CI_COMMIT_BRANCH}\",\"status\":\"${CI_PIPELINE_STATUS}\",\"buildUrl\":\"${CI_PIPELINE_LINK}\",\"repo\":\"${CI_REPO}\",\"prNumber\":${CI_COMMIT_PULL_REQUEST:-null},\"headSha\":\"${CI_COMMIT_SHA}\"}"
SIG=$(echo -n "$BODY" | openssl dgst -sha256 -hmac "$WOODPECKER_WEBHOOK_SECRET" | awk '{print $2}')
curl -s -o /dev/null -w "%{http_code}" -X POST "${MOSAIC_WEBHOOK_URL}/api/webhooks/woodpecker" \
-H "Content-Type: application/json" \
-H "X-Woodpecker-Signature: ${SIG}" \
-d "$BODY" || true
when:
- status: [success, failure]
depends_on:
- build
security-trivy-web: security-trivy-web:
image: aquasec/trivy:latest image: aquasec/trivy:latest
environment: environment:

View File

@@ -24,6 +24,11 @@ ENCRYPTION_KEY=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
# In development, a random key is generated if not set # In development, a random key is generated if not set
CSRF_SECRET=fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210 CSRF_SECRET=fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210
# Gatekeeper merge automation
GITEA_WEBHOOK_SECRET=replace-with-gitea-webhook-secret
GITEA_API_TOKEN=replace-with-gitea-api-token
GATEKEEPER_ENABLED=true
# OpenTelemetry Configuration # OpenTelemetry Configuration
# Enable/disable OpenTelemetry tracing (default: true) # Enable/disable OpenTelemetry tracing (default: true)
OTEL_ENABLED=true OTEL_ENABLED=true

View File

@@ -12,7 +12,7 @@
"lint:fix": "eslint \"src/**/*.ts\" --fix", "lint:fix": "eslint \"src/**/*.ts\" --fix",
"typecheck": "tsc --noEmit", "typecheck": "tsc --noEmit",
"clean": "rm -rf dist", "clean": "rm -rf dist",
"test": "vitest run", "test": "node scripts/vitest-runner.mjs",
"test:watch": "vitest", "test:watch": "vitest",
"test:coverage": "vitest run --coverage", "test:coverage": "vitest run --coverage",
"test:e2e": "vitest run --config ./vitest.e2e.config.ts", "test:e2e": "vitest run --config ./vitest.e2e.config.ts",
@@ -56,6 +56,7 @@
"bcryptjs": "^3.0.3", "bcryptjs": "^3.0.3",
"better-auth": "^1.4.17", "better-auth": "^1.4.17",
"bullmq": "^5.67.2", "bullmq": "^5.67.2",
"chokidar": "^4.0.3",
"class-transformer": "^0.5.1", "class-transformer": "^0.5.1",
"class-validator": "^0.14.3", "class-validator": "^0.14.3",
"cookie-parser": "^1.4.7", "cookie-parser": "^1.4.7",

View File

@@ -0,0 +1,14 @@
CREATE TABLE "pending_merges" (
"id" UUID PRIMARY KEY DEFAULT gen_random_uuid(),
"repo" TEXT NOT NULL,
"pr_number" INTEGER NOT NULL,
"head_sha" TEXT NOT NULL,
"state" TEXT NOT NULL DEFAULT 'pending',
"review_result" JSONB,
"ci_status" TEXT,
"requester" TEXT,
"gitea_merge_url" TEXT,
"created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
"updated_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT "pending_merges_repo_pr_sha_key" UNIQUE ("repo", "pr_number", "head_sha")
);

View File

@@ -261,6 +261,23 @@ model User {
@@map("users") @@map("users")
} }
model PendingMerge {
id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
repo String
prNumber Int @map("pr_number")
headSha String @map("head_sha")
state String @default("pending")
reviewResult Json? @map("review_result")
ciStatus String? @map("ci_status")
requester String?
giteaMergeUrl String? @map("gitea_merge_url")
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz
updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz
@@unique([repo, prNumber, headSha])
@@map("pending_merges")
}
model UserPreference { model UserPreference {
id String @id @default(uuid()) @db.Uuid id String @id @default(uuid()) @db.Uuid
userId String @unique @map("user_id") @db.Uuid userId String @unique @map("user_id") @db.Uuid

View File

@@ -0,0 +1,36 @@
import { spawnSync } from "node:child_process";
import { join } from "node:path";
const rawArgs = process.argv.slice(2);
const passthroughArgs = [];
for (let index = 0; index < rawArgs.length; index += 1) {
const arg = rawArgs[index];
if (arg === "--testPathPattern") {
const value = rawArgs[index + 1];
if (value) {
passthroughArgs.push(value);
index += 1;
}
continue;
}
if (arg.startsWith("--testPathPattern=")) {
passthroughArgs.push(arg.slice("--testPathPattern=".length));
continue;
}
passthroughArgs.push(arg);
}
const vitestBin = join(process.cwd(), "node_modules", ".bin", "vitest");
const result = spawnSync(vitestBin, ["run", ...passthroughArgs], {
stdio: "inherit",
});
if (result.error) {
throw result.error;
}
process.exit(result.status ?? 1);

View File

@@ -346,7 +346,9 @@ describe("AdminService", () => {
data: { deactivatedAt: expect.any(Date) }, data: { deactivatedAt: expect.any(Date) },
}) })
); );
expect(mockPrismaService.session.deleteMany).toHaveBeenCalledWith({ where: { userId: mockUserId } }); expect(mockPrismaService.session.deleteMany).toHaveBeenCalledWith({
where: { userId: mockUserId },
});
}); });
it("should throw NotFoundException if user does not exist", async () => { it("should throw NotFoundException if user does not exist", async () => {

View File

@@ -60,7 +60,10 @@ import { ContainerReaperModule } from "./container-reaper/container-reaper.modul
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module"; import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
import { OnboardingModule } from "./onboarding/onboarding.module"; import { OnboardingModule } from "./onboarding/onboarding.module";
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module"; import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
import { MissionControlProxyModule } from "./mission-control-proxy/mission-control-proxy.module";
import { OrchestratorModule } from "./orchestrator/orchestrator.module"; import { OrchestratorModule } from "./orchestrator/orchestrator.module";
import { QueueNotificationsModule } from "./queue-notifications/queue-notifications.module";
import { GatekeeperModule } from "./gatekeeper/gatekeeper.module";
@Module({ @Module({
imports: [ imports: [
@@ -142,7 +145,10 @@ import { OrchestratorModule } from "./orchestrator/orchestrator.module";
FleetSettingsModule, FleetSettingsModule,
OnboardingModule, OnboardingModule,
ChatProxyModule, ChatProxyModule,
MissionControlProxyModule,
OrchestratorModule, OrchestratorModule,
GatekeeperModule,
QueueNotificationsModule,
], ],
controllers: [AppController, CsrfController], controllers: [AppController, CsrfController],
providers: [ providers: [

View File

@@ -211,9 +211,7 @@ describe("AuthGuard", () => {
}); });
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException); await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
await expect(guard.canActivate(context)).rejects.toThrow( await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
"Invalid user data in session"
);
}); });
it("should throw UnauthorizedException when user is missing email", async () => { it("should throw UnauthorizedException when user is missing email", async () => {
@@ -227,9 +225,7 @@ describe("AuthGuard", () => {
}); });
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException); await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
await expect(guard.canActivate(context)).rejects.toThrow( await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
"Invalid user data in session"
);
}); });
it("should throw UnauthorizedException when user is missing name", async () => { it("should throw UnauthorizedException when user is missing name", async () => {
@@ -243,9 +239,7 @@ describe("AuthGuard", () => {
}); });
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException); await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
await expect(guard.canActivate(context)).rejects.toThrow( await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
"Invalid user data in session"
);
}); });
it("should throw UnauthorizedException when user is a string", async () => { it("should throw UnauthorizedException when user is a string", async () => {
@@ -259,9 +253,7 @@ describe("AuthGuard", () => {
}); });
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException); await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
await expect(guard.canActivate(context)).rejects.toThrow( await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
"Invalid user data in session"
);
}); });
it("should reject when user is null (typeof null === 'object' causes TypeError on 'in' operator)", async () => { it("should reject when user is null (typeof null === 'object' causes TypeError on 'in' operator)", async () => {
@@ -277,9 +269,7 @@ describe("AuthGuard", () => {
}); });
await expect(guard.canActivate(context)).rejects.toThrow(TypeError); await expect(guard.canActivate(context)).rejects.toThrow(TypeError);
await expect(guard.canActivate(context)).rejects.not.toBeInstanceOf( await expect(guard.canActivate(context)).rejects.not.toBeInstanceOf(UnauthorizedException);
UnauthorizedException
);
}); });
}); });

View File

@@ -154,16 +154,14 @@ describe("CoordinatorIntegrationService", () => {
// Mock transaction that passes through the callback // Mock transaction that passes through the callback
mockPrismaService.$transaction.mockImplementation(async (callback) => { mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = { const mockTx = {
$queryRaw: vi $queryRaw: vi.fn().mockResolvedValue([
.fn() {
.mockResolvedValue([ id: mockJob.id,
{ status: mockJob.status,
id: mockJob.id, workspace_id: mockJob.workspaceId,
status: mockJob.status, version: 1,
workspace_id: mockJob.workspaceId, },
version: 1, ]),
},
]),
runnerJob: { runnerJob: {
update: vi.fn().mockResolvedValue(updatedJob), update: vi.fn().mockResolvedValue(updatedJob),
}, },
@@ -204,16 +202,14 @@ describe("CoordinatorIntegrationService", () => {
// Mock transaction with completed job // Mock transaction with completed job
mockPrismaService.$transaction.mockImplementation(async (callback) => { mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = { const mockTx = {
$queryRaw: vi $queryRaw: vi.fn().mockResolvedValue([
.fn() {
.mockResolvedValue([ id: mockJob.id,
{ status: RunnerJobStatus.COMPLETED,
id: mockJob.id, workspace_id: mockJob.workspaceId,
status: RunnerJobStatus.COMPLETED, version: 1,
workspace_id: mockJob.workspaceId, },
version: 1, ]),
},
]),
runnerJob: { runnerJob: {
update: vi.fn(), update: vi.fn(),
}, },
@@ -271,16 +267,14 @@ describe("CoordinatorIntegrationService", () => {
// Mock transaction with running job // Mock transaction with running job
mockPrismaService.$transaction.mockImplementation(async (callback) => { mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = { const mockTx = {
$queryRaw: vi $queryRaw: vi.fn().mockResolvedValue([
.fn() {
.mockResolvedValue([ id: mockJob.id,
{ status: RunnerJobStatus.RUNNING,
id: mockJob.id, workspace_id: mockJob.workspaceId,
status: RunnerJobStatus.RUNNING, version: 1,
workspace_id: mockJob.workspaceId, },
version: 1, ]),
},
]),
runnerJob: { runnerJob: {
update: vi.fn().mockResolvedValue(completedJob), update: vi.fn().mockResolvedValue(completedJob),
}, },
@@ -315,16 +309,14 @@ describe("CoordinatorIntegrationService", () => {
// Mock transaction with running job // Mock transaction with running job
mockPrismaService.$transaction.mockImplementation(async (callback) => { mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = { const mockTx = {
$queryRaw: vi $queryRaw: vi.fn().mockResolvedValue([
.fn() {
.mockResolvedValue([ id: mockJob.id,
{ status: RunnerJobStatus.RUNNING,
id: mockJob.id, workspace_id: mockJob.workspaceId,
status: RunnerJobStatus.RUNNING, version: 1,
workspace_id: mockJob.workspaceId, },
version: 1, ]),
},
]),
runnerJob: { runnerJob: {
update: vi.fn().mockResolvedValue(failedJob), update: vi.fn().mockResolvedValue(failedJob),
}, },

View File

@@ -0,0 +1,113 @@
import {
IsArray,
IsIn,
IsInt,
IsObject,
IsOptional,
IsString,
MaxLength,
Min,
MinLength,
ValidateNested,
} from "class-validator";
import { Type } from "class-transformer";
class GiteaLabelDto {
@IsString()
@MinLength(1)
@MaxLength(255)
name!: string;
}
class GiteaRepoDto {
@IsString()
@MinLength(1)
@MaxLength(512)
full_name!: string;
}
class GiteaBranchRefDto {
@IsString()
@MinLength(1)
@MaxLength(255)
ref!: string;
}
class GiteaHeadDto {
@IsString()
@MinLength(7)
@MaxLength(128)
sha!: string;
}
class GiteaPullRequestDto {
@IsInt()
@Min(1)
number!: number;
@IsOptional()
@IsString()
body?: string;
@ValidateNested()
@Type(() => GiteaBranchRefDto)
base!: GiteaBranchRefDto;
@ValidateNested()
@Type(() => GiteaHeadDto)
head!: GiteaHeadDto;
@IsArray()
@ValidateNested({ each: true })
@Type(() => GiteaLabelDto)
labels!: GiteaLabelDto[];
@IsOptional()
@IsString()
html_url?: string;
@IsOptional()
@IsString()
url?: string;
}
class GiteaSenderDto {
@IsString()
@MinLength(1)
@MaxLength(255)
login!: string;
}
export class GiteaPrWebhookDto {
@IsString()
@IsIn(["pull_request"])
@MaxLength(64)
type!: string;
@IsString()
@IsIn(["opened", "labeled", "synchronize"])
@MaxLength(64)
action!: "opened" | "labeled" | "synchronize";
@ValidateNested()
@Type(() => GiteaRepoDto)
repository!: GiteaRepoDto;
@ValidateNested()
@Type(() => GiteaPullRequestDto)
pull_request!: GiteaPullRequestDto;
@IsOptional()
@ValidateNested()
@Type(() => GiteaLabelDto)
label?: GiteaLabelDto;
@IsOptional()
@ValidateNested()
@Type(() => GiteaSenderDto)
sender?: GiteaSenderDto;
@IsOptional()
@IsObject()
review_result?: Record<string, unknown>;
}

View File

@@ -0,0 +1,98 @@
import { createHmac } from "node:crypto";
import { Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Test, type TestingModule } from "@nestjs/testing";
import type { Request } from "express";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { GatekeeperController } from "./gatekeeper.controller";
import { GatekeeperService } from "./gatekeeper.service";
describe("GatekeeperController", () => {
let controller: GatekeeperController;
const gatekeeperService = {
handlePrEvent: vi.fn(),
};
const configService = {
get: vi.fn(),
};
const payload = {
type: "pull_request",
action: "labeled",
label: { name: "auto-merge" },
repository: { full_name: "mosaic/stack" },
sender: { login: "jason" },
pull_request: {
number: 7,
body: "ready",
base: { ref: "main" },
head: { sha: "abcdef1234567890" },
labels: [{ name: "auto-merge" }],
},
};
beforeEach(async () => {
vi.clearAllMocks();
configService.get.mockImplementation((key: string) => {
if (key === "GITEA_WEBHOOK_SECRET") {
return "secret";
}
return undefined;
});
gatekeeperService.handlePrEvent.mockResolvedValue(undefined);
const module: TestingModule = await Test.createTestingModule({
controllers: [GatekeeperController],
providers: [
{ provide: GatekeeperService, useValue: gatekeeperService },
{ provide: ConfigService, useValue: configService },
],
}).compile();
controller = module.get(GatekeeperController);
});
it("accepts a valid signature and schedules processing", async () => {
const signature = createHmac("sha256", "secret").update(JSON.stringify(payload)).digest("hex");
await expect(
controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
signature
)
).resolves.toEqual({ ok: true });
expect(gatekeeperService.handlePrEvent).toHaveBeenCalledWith(payload);
});
it("ignores invalid signatures", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
await expect(controller.handleWebhook({} as Request, payload, "bad")).resolves.toEqual({
ok: true,
});
expect(gatekeeperService.handlePrEvent).not.toHaveBeenCalled();
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("invalid Gitea webhook signature")
);
});
it("accepts requests without validation when no secret is configured", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
configService.get.mockReturnValue(undefined);
await expect(controller.handleWebhook({} as Request, payload, "")).resolves.toEqual({
ok: true,
});
expect(gatekeeperService.handlePrEvent).toHaveBeenCalledWith(payload);
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("GITEA_WEBHOOK_SECRET is not configured")
);
});
});

View File

@@ -0,0 +1,67 @@
import { Body, Controller, Headers, Logger, Post, Req, type RawBodyRequest } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { createHmac, timingSafeEqual } from "node:crypto";
import type { Request } from "express";
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
import { GiteaPrWebhookDto } from "./dto/gitea-pr-webhook.dto";
import { GatekeeperService } from "./gatekeeper.service";
@Controller("gatekeeper/webhook")
export class GatekeeperController {
private readonly logger = new Logger(GatekeeperController.name);
constructor(
private readonly gatekeeperService: GatekeeperService,
private readonly configService: ConfigService
) {}
@SkipCsrf()
@Post("gitea")
handleWebhook(
@Req() req: RawBodyRequest<Request>,
@Body() body: GiteaPrWebhookDto,
@Headers("x-gitea-signature") signature: string | undefined
): Promise<{ ok: boolean }> {
const secret = this.configService.get<string>("GITEA_WEBHOOK_SECRET");
if (secret && !this.isValidSignature(this.getRequestBody(req, body), signature, secret)) {
this.logger.warn("Received invalid Gitea webhook signature");
return Promise.resolve({ ok: true });
}
if (!secret) {
this.logger.warn("GITEA_WEBHOOK_SECRET is not configured; accepting Gitea webhook");
}
void this.gatekeeperService.handlePrEvent(body).catch((error: unknown) => {
const message = error instanceof Error ? error.message : String(error);
this.logger.error(`Failed to process Gitea PR webhook: ${message}`);
});
return Promise.resolve({ ok: true });
}
private getRequestBody(req: RawBodyRequest<Request>, body: GiteaPrWebhookDto): Buffer {
if (Buffer.isBuffer(req.rawBody)) {
return req.rawBody;
}
return Buffer.from(JSON.stringify(body));
}
private isValidSignature(body: Buffer, signature: string | undefined, secret: string): boolean {
if (!signature) {
return false;
}
const expected = createHmac("sha256", secret).update(body).digest("hex");
const actual = Buffer.from(signature);
const expectedBuffer = Buffer.from(expected);
if (actual.length !== expectedBuffer.length) {
return false;
}
return timingSafeEqual(actual, expectedBuffer);
}
}

View File

@@ -0,0 +1,12 @@
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { GatekeeperController } from "./gatekeeper.controller";
import { GatekeeperService } from "./gatekeeper.service";
@Module({
imports: [ConfigModule],
controllers: [GatekeeperController],
providers: [GatekeeperService],
exports: [GatekeeperService],
})
export class GatekeeperModule {}

View File

@@ -0,0 +1,199 @@
import { Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Test, type TestingModule } from "@nestjs/testing";
import { describe, beforeEach, expect, it, vi } from "vitest";
import { PrismaService } from "../prisma/prisma.service";
import type { GiteaPrWebhookDto } from "./dto/gitea-pr-webhook.dto";
import { GatekeeperService } from "./gatekeeper.service";
describe("GatekeeperService", () => {
let service: GatekeeperService;
const prisma = {
pendingMerge: {
upsert: vi.fn(),
update: vi.fn(),
findFirst: vi.fn(),
findUnique: vi.fn(),
},
};
const config = {
get: vi.fn(),
};
const basePayload: GiteaPrWebhookDto = {
type: "pull_request",
action: "labeled",
repository: { full_name: "mosaic/stack" },
label: { name: "auto-merge" },
sender: { login: "jason" },
pull_request: {
number: 42,
body: "Implements Gatekeeper",
base: { ref: "main" },
head: { sha: "abcdef1234567890" },
labels: [{ name: "auto-merge" }],
url: "https://git.mosaicstack.dev/api/v1/repos/mosaic/stack/pulls/42",
html_url: "https://git.mosaicstack.dev/mosaic/stack/pulls/42",
},
};
beforeEach(async () => {
vi.clearAllMocks();
config.get.mockImplementation((key: string) => {
switch (key) {
case "GATEKEEPER_ENABLED":
return "true";
case "GITEA_API_TOKEN":
return "token";
default:
return undefined;
}
});
const module: TestingModule = await Test.createTestingModule({
providers: [
GatekeeperService,
{ provide: PrismaService, useValue: prisma },
{ provide: ConfigService, useValue: config },
],
}).compile();
service = module.get(GatekeeperService);
});
it("moves labeled auto-merge PRs into awaiting_ci when review passes", async () => {
prisma.pendingMerge.upsert.mockResolvedValue({ id: "merge-1" });
prisma.pendingMerge.update.mockResolvedValue({});
await service.handlePrEvent(basePayload);
expect(prisma.pendingMerge.upsert).toHaveBeenCalledWith(
expect.objectContaining({
where: {
repo_prNumber_headSha: {
repo: "mosaic/stack",
prNumber: 42,
headSha: "abcdef1234567890",
},
},
})
);
expect(prisma.pendingMerge.update).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ data: expect.objectContaining({ state: "reviewing" }) })
);
expect(prisma.pendingMerge.update).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
data: expect.objectContaining({
state: "awaiting_ci",
reviewResult: { passed: true, issues: [] },
}),
})
);
});
it("rejects review failures and records the reason", async () => {
prisma.pendingMerge.upsert.mockResolvedValue({ id: "merge-2" });
prisma.pendingMerge.findUnique.mockResolvedValue({
id: "merge-2",
repo: "mosaic/stack",
prNumber: 42,
headSha: "abcdef1234567890",
});
prisma.pendingMerge.update.mockResolvedValue({});
const commentSpy = vi
.spyOn(
service as unknown as {
postPullRequestComment: (repo: string, prNumber: number, body: string) => Promise<void>;
},
"postPullRequestComment"
)
.mockResolvedValue();
await service.handlePrEvent({
...basePayload,
pull_request: {
...basePayload.pull_request,
body: "",
},
});
expect(prisma.pendingMerge.update).toHaveBeenCalledWith(
expect.objectContaining({
where: { id: "merge-2" },
data: {
state: "rejected",
reviewResult: {
passed: false,
issues: ["PR description must not be empty"],
},
},
})
);
expect(commentSpy).toHaveBeenCalledWith(
"mosaic/stack",
42,
expect.stringContaining("PR description must not be empty")
);
});
it("attempts merge on green CI for awaiting_ci records", async () => {
prisma.pendingMerge.findFirst.mockResolvedValue({
id: "merge-3",
repo: "mosaic/stack",
prNumber: 42,
headSha: "abcdef1234567890",
state: "awaiting_ci",
giteaMergeUrl: "https://git.mosaicstack.dev/api/v1/repos/mosaic/stack/pulls/42",
});
prisma.pendingMerge.update.mockResolvedValue({});
const mergeSpy = vi.spyOn(service, "attemptMerge").mockResolvedValue();
await service.handleCiEvent("mosaic/stack", 42, "abcdef1234567890", "success");
expect(prisma.pendingMerge.update).toHaveBeenCalledWith(
expect.objectContaining({
where: { id: "merge-3" },
data: expect.objectContaining({ ciStatus: "success" }),
})
);
expect(mergeSpy).toHaveBeenCalledWith("merge-3");
});
it("rejects failed CI results", async () => {
prisma.pendingMerge.findFirst.mockResolvedValue({
id: "merge-4",
repo: "mosaic/stack",
prNumber: 42,
headSha: "abcdef1234567890",
state: "awaiting_ci",
});
prisma.pendingMerge.update.mockResolvedValue({});
const rejectSpy = vi.spyOn(service, "rejectMerge").mockResolvedValue();
await service.handleCiEvent("mosaic/stack", 42, "abcdef1234567890", "failure");
expect(rejectSpy).toHaveBeenCalledWith("merge-4", "CI reported failure");
});
it("skips all work when Gatekeeper is disabled", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
config.get.mockImplementation((key: string) => {
if (key === "GATEKEEPER_ENABLED") {
return "false";
}
return undefined;
});
await service.handlePrEvent(basePayload);
await service.handleCiEvent("mosaic/stack", 42, "abcdef1234567890", "success");
expect(prisma.pendingMerge.upsert).not.toHaveBeenCalled();
expect(prisma.pendingMerge.findFirst).not.toHaveBeenCalled();
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("Gatekeeper is disabled"));
});
});

View File

@@ -0,0 +1,311 @@
import { Injectable, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Prisma } from "@prisma/client";
import { PrismaService } from "../prisma/prisma.service";
import type { GiteaPrWebhookDto } from "./dto/gitea-pr-webhook.dto";
export interface ReviewResult {
passed: boolean;
issues: string[];
}
@Injectable()
export class GatekeeperService {
private readonly logger = new Logger(GatekeeperService.name);
private get giteaApiBaseUrl(): string {
return `${this.configService.getOrThrow<string>("GITEA_URL")}/api/v1`;
}
constructor(
private readonly prisma: PrismaService,
private readonly configService: ConfigService
) {}
async handlePrEvent(payload: GiteaPrWebhookDto): Promise<void> {
if (!this.isEnabled()) {
return;
}
if (payload.type !== "pull_request") {
return;
}
const action = payload.action;
const hasAutoMergeLabel = this.hasAutoMergeLabel(payload);
if (!["opened", "labeled", "synchronize"].includes(action)) {
return;
}
if (action === "labeled" && payload.label?.name !== "auto-merge") {
return;
}
const merge = await this.prisma.pendingMerge.upsert({
where: {
repo_prNumber_headSha: {
repo: payload.repository.full_name,
prNumber: payload.pull_request.number,
headSha: payload.pull_request.head.sha,
},
},
create: {
repo: payload.repository.full_name,
prNumber: payload.pull_request.number,
headSha: payload.pull_request.head.sha,
...(payload.sender?.login ? { requester: payload.sender.login } : {}),
giteaMergeUrl:
payload.pull_request.url ??
`${this.giteaApiBaseUrl}/repos/${payload.repository.full_name}/pulls/${String(payload.pull_request.number)}`,
},
update: {
headSha: payload.pull_request.head.sha,
...(payload.sender?.login ? { requester: payload.sender.login } : {}),
giteaMergeUrl:
payload.pull_request.url ??
`${this.giteaApiBaseUrl}/repos/${payload.repository.full_name}/pulls/${String(payload.pull_request.number)}`,
},
});
if (action === "synchronize") {
await this.prisma.pendingMerge.update({
where: { id: merge.id },
data: {
state: hasAutoMergeLabel ? "pending" : "rejected",
ciStatus: null,
reviewResult: Prisma.DbNull,
},
});
if (hasAutoMergeLabel) {
await this.runReview(merge.id, payload);
}
return;
}
if (hasAutoMergeLabel) {
await this.runReview(merge.id, payload);
}
}
async handleCiEvent(
repo: string,
prNumber: number,
headSha: string,
status: "success" | "failure"
): Promise<void> {
if (!this.isEnabled()) {
return;
}
const merge = await this.prisma.pendingMerge.findFirst({
where: {
repo,
prNumber,
headSha,
},
orderBy: {
createdAt: "desc",
},
});
if (!merge) {
this.logger.debug(`No pending merge found for ${repo}#${String(prNumber)} @ ${headSha}`);
return;
}
await this.prisma.pendingMerge.update({
where: { id: merge.id },
data: {
ciStatus: status,
},
});
if (status === "failure") {
await this.rejectMerge(merge.id, "CI reported failure");
return;
}
if (merge.state === "awaiting_ci") {
await this.attemptMerge(merge.id);
}
}
reviewPr(payload: GiteaPrWebhookDto): Promise<ReviewResult> {
const issues: string[] = [];
if (!this.hasAutoMergeLabel(payload)) {
issues.push("PR must have the auto-merge label");
}
if (!payload.pull_request.body?.trim()) {
issues.push("PR description must not be empty");
}
if (payload.pull_request.base.ref !== "main") {
issues.push("PR base branch must be main");
}
if (!/^[0-9a-f]{7,128}$/i.test(payload.pull_request.head.sha)) {
issues.push("PR head SHA must be a valid git commit hash");
}
return Promise.resolve({
passed: issues.length === 0,
issues,
});
}
async attemptMerge(mergeId: string): Promise<void> {
const merge = await this.prisma.pendingMerge.findUnique({
where: { id: mergeId },
});
if (!merge) {
return;
}
const token = this.configService.get<string>("GITEA_API_TOKEN");
if (!token) {
await this.rejectMerge(merge.id, "GITEA_API_TOKEN is not configured");
return;
}
await this.prisma.pendingMerge.update({
where: { id: merge.id },
data: { state: "merging" },
});
const mergeUrl =
merge.giteaMergeUrl ??
`${this.giteaApiBaseUrl}/repos/${merge.repo}/pulls/${String(merge.prNumber)}`;
const response = await fetch(`${mergeUrl}/merge`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `token ${token}`,
},
body: JSON.stringify({
Do: "merge",
force_merge: true,
merge_message_field: "Auto-merged by Gatekeeper",
}),
});
if (!response.ok) {
const reason = await response.text();
await this.rejectMerge(
merge.id,
`Gitea merge API rejected the request: ${String(response.status)} ${reason}`
);
return;
}
await this.prisma.pendingMerge.update({
where: { id: merge.id },
data: { state: "merged" },
});
}
async rejectMerge(mergeId: string, reason: string): Promise<void> {
const merge = await this.prisma.pendingMerge.findUnique({
where: { id: mergeId },
});
if (!merge) {
return;
}
await this.prisma.pendingMerge.update({
where: { id: merge.id },
data: {
state: "rejected",
reviewResult: {
passed: false,
issues: [reason],
},
},
});
await this.postPullRequestComment(
merge.repo,
merge.prNumber,
`Gatekeeper rejected auto-merge for \`${merge.headSha}\`: ${reason}`
);
}
private async runReview(mergeId: string, payload: GiteaPrWebhookDto): Promise<void> {
await this.prisma.pendingMerge.update({
where: { id: mergeId },
data: { state: "reviewing" },
});
const result = await this.reviewPr(payload);
if (!result.passed) {
await this.rejectMerge(mergeId, result.issues.join("; "));
return;
}
const reviewResult: Prisma.InputJsonValue = {
passed: result.passed,
issues: result.issues,
};
await this.prisma.pendingMerge.update({
where: { id: mergeId },
data: {
state: "awaiting_ci",
reviewResult,
},
});
}
private hasAutoMergeLabel(payload: GiteaPrWebhookDto): boolean {
return payload.pull_request.labels.some((label) => label.name === "auto-merge");
}
private async postPullRequestComment(
repo: string,
prNumber: number,
body: string
): Promise<void> {
const token = this.configService.get<string>("GITEA_API_TOKEN");
if (!token) {
this.logger.warn(
`Skipping PR comment for ${repo}#${String(prNumber)}; GITEA_API_TOKEN is missing`
);
return;
}
const response = await fetch(
`${this.giteaApiBaseUrl}/repos/${repo}/issues/${String(prNumber)}/comments`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `token ${token}`,
},
body: JSON.stringify({ body }),
}
);
if (!response.ok) {
this.logger.warn(
`Failed to post Gatekeeper PR comment for ${repo}#${String(prNumber)}: ${String(response.status)}`
);
}
}
private isEnabled(): boolean {
const raw = this.configService.get<string>("GATEKEEPER_ENABLED");
const enabled = raw !== "false";
if (!enabled) {
this.logger.warn("Gatekeeper is disabled via GATEKEEPER_ENABLED");
}
return enabled;
}
}

View File

@@ -0,0 +1,286 @@
import {
Body,
Controller,
Get,
Logger,
Param,
Post,
Req,
Res,
ServiceUnavailableException,
UseGuards,
} from "@nestjs/common";
import type { Request, Response } from "express";
import { AuthGuard } from "../auth/guards/auth.guard";
const ORCHESTRATOR_URL_KEY = "ORCHESTRATOR_URL";
const ORCHESTRATOR_API_KEY = "ORCHESTRATOR_API_KEY";
@Controller("mission-control")
@UseGuards(AuthGuard)
export class MissionControlProxyController {
private readonly logger = new Logger(MissionControlProxyController.name);
private readonly orchestratorUrl: string;
private readonly orchestratorApiKey: string;
constructor() {
this.orchestratorUrl = this.requireEnv(ORCHESTRATOR_URL_KEY);
this.orchestratorApiKey = this.requireEnv(ORCHESTRATOR_API_KEY);
}
@Get("sessions")
proxySessions(@Req() req: Request, @Res() res: Response): Promise<void> {
return this.proxyRequest("GET", "sessions", req, res);
}
@Get("sessions/:sessionId")
proxySession(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("GET", `sessions/${sessionId}`, req, res);
}
@Get("sessions/:sessionId/messages")
proxyMessages(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("GET", `sessions/${sessionId}/messages`, req, res);
}
@Get("audit-log")
proxyAuditLog(@Req() req: Request, @Res() res: Response): Promise<void> {
return this.proxyRequest("GET", "audit-log", req, res);
}
@Post("sessions/:sessionId/inject")
proxyInject(
@Param("sessionId") sessionId: string,
@Body() body: unknown,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/inject`, req, res, body);
}
@Post("sessions/:sessionId/pause")
proxyPause(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/pause`, req, res);
}
@Post("sessions/:sessionId/resume")
proxyResume(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/resume`, req, res);
}
@Post("sessions/:sessionId/kill")
proxyKill(
@Param("sessionId") sessionId: string,
@Body() body: unknown,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
return this.proxyRequest("POST", `sessions/${sessionId}/kill`, req, res, body);
}
@Get("sessions/:sessionId/stream")
async proxySessionStream(
@Param("sessionId") sessionId: string,
@Req() req: Request,
@Res() res: Response
): Promise<void> {
const abortController = new AbortController();
req.once("close", () => {
abortController.abort();
});
try {
const upstream = await this.fetchUpstream(
"GET",
`sessions/${sessionId}/stream`,
req.query,
undefined,
abortController.signal
);
if (!upstream.ok || !upstream.body) {
await this.sendStandardResponse(upstream, res);
return;
}
res.status(upstream.status);
this.copyHeaderIfPresent(upstream, res, "content-type");
this.copyHeaderIfPresent(upstream, res, "cache-control");
this.copyHeaderIfPresent(upstream, res, "connection");
res.setHeader("X-Accel-Buffering", "no");
if (typeof res.flushHeaders === "function") {
res.flushHeaders();
}
for await (const chunk of upstream.body as unknown as AsyncIterable<Uint8Array>) {
if (res.writableEnded || res.destroyed) {
break;
}
res.write(Buffer.from(chunk));
}
if (!res.writableEnded && !res.destroyed) {
res.end();
}
} catch (error: unknown) {
if (this.isAbortError(error)) {
return;
}
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`Mission Control stream proxy request failed: ${message}`);
if (!res.headersSent) {
res.status(503).json({ message: "Failed to proxy Mission Control request" });
} else if (!res.writableEnded && !res.destroyed) {
res.end();
}
}
}
private async proxyRequest(
method: "GET" | "POST",
path: string,
req: Request,
res: Response,
body?: unknown
): Promise<void> {
const abortController = new AbortController();
req.once("close", () => {
abortController.abort();
});
try {
const upstream = await this.fetchUpstream(
method,
path,
req.query,
body,
abortController.signal
);
await this.sendStandardResponse(upstream, res);
} catch (error: unknown) {
if (this.isAbortError(error)) {
return;
}
this.handleProxyError(error);
}
}
private async fetchUpstream(
method: "GET" | "POST",
path: string,
query: Request["query"],
body: unknown,
signal: AbortSignal
): Promise<globalThis.Response> {
const url = new URL(`/api/mission-control/${path}`, this.orchestratorUrl);
this.appendQueryParams(url.searchParams, query);
const headers: Record<string, string> = {
"X-API-Key": this.orchestratorApiKey,
};
const requestInit: RequestInit = {
method,
headers,
signal,
};
if (method === "POST" && body !== undefined) {
headers["Content-Type"] = "application/json";
requestInit.body = JSON.stringify(body);
}
return fetch(url.toString(), requestInit);
}
private appendQueryParams(searchParams: URLSearchParams, query: Request["query"]): void {
for (const [key, value] of Object.entries(query)) {
this.appendQueryValue(searchParams, key, value);
}
}
private appendQueryValue(searchParams: URLSearchParams, key: string, value: unknown): void {
if (value === undefined || value === null) {
return;
}
if (Array.isArray(value)) {
for (const item of value) {
this.appendQueryValue(searchParams, key, item);
}
return;
}
if (typeof value === "string" || typeof value === "number" || typeof value === "boolean") {
searchParams.append(key, String(value));
}
}
private async sendStandardResponse(upstream: globalThis.Response, res: Response): Promise<void> {
res.status(upstream.status);
this.copyHeaderIfPresent(upstream, res, "content-type");
this.copyHeaderIfPresent(upstream, res, "cache-control");
this.copyHeaderIfPresent(upstream, res, "location");
const responseText = await upstream.text();
if (responseText.length === 0) {
res.end();
return;
}
res.send(responseText);
}
private copyHeaderIfPresent(
upstream: globalThis.Response,
res: Response,
headerName: string
): void {
const value = upstream.headers.get(headerName);
if (value) {
res.setHeader(headerName, value);
}
}
private handleProxyError(error: unknown): never {
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`Mission Control proxy request failed: ${message}`);
throw new ServiceUnavailableException("Failed to proxy Mission Control request");
}
private isAbortError(error: unknown): boolean {
return error instanceof Error && error.name === "AbortError";
}
private requireEnv(key: string): string {
const value = process.env[key];
if (typeof value !== "string" || value.trim().length === 0) {
throw new Error(`@mosaic/api: ${key} is required. Set it in your config or via ${key}.`);
}
return value;
}
}

View File

@@ -0,0 +1,9 @@
import { Module } from "@nestjs/common";
import { AuthModule } from "../auth/auth.module";
import { MissionControlProxyController } from "./mission-control-proxy.controller";
@Module({
imports: [AuthModule],
controllers: [MissionControlProxyController],
})
export class MissionControlProxyModule {}

View File

@@ -0,0 +1,120 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { NotFoundException } from "@nestjs/common";
import type { Response } from "express";
import { ConfigService } from "@nestjs/config";
import { Test, type TestingModule } from "@nestjs/testing";
import { QueueNotificationsController } from "./queue-notifications.controller";
import { QueueNotificationsService } from "./queue-notifications.service";
import { ApiKeyGuard } from "../common/guards/api-key.guard";
describe("QueueNotificationsController", () => {
let controller: QueueNotificationsController;
const mockService = {
listNotifications: vi.fn(),
streamNotifications: vi.fn(),
ackNotification: vi.fn(),
listTasks: vi.fn(),
};
const mockConfigService = {
get: vi.fn().mockReturnValue("coordinator-api-key"),
};
beforeEach(async () => {
vi.clearAllMocks();
const module: TestingModule = await Test.createTestingModule({
controllers: [QueueNotificationsController],
providers: [
{ provide: QueueNotificationsService, useValue: mockService },
{ provide: ConfigService, useValue: mockConfigService },
],
})
.overrideGuard(ApiKeyGuard)
.useValue({ canActivate: () => true })
.compile();
controller = module.get<QueueNotificationsController>(QueueNotificationsController);
});
it("returns notification objects", async () => {
mockService.listNotifications.mockResolvedValue([
{
id: "notif-1",
agent: "mosaic",
filename: "notif-1.json",
payload: { type: "task.ready" },
createdAt: new Date("2026-03-08T22:00:00.000Z"),
},
]);
await expect(controller.getNotifications()).resolves.toEqual([
expect.objectContaining({
id: "notif-1",
agent: "mosaic",
filename: "notif-1.json",
payload: { type: "task.ready" },
}),
]);
});
it("streams notifications through the response object", async () => {
const res = {
setHeader: vi.fn(),
flushHeaders: vi.fn(),
write: vi.fn(),
on: vi.fn(),
end: vi.fn(),
} as unknown as Response;
mockService.streamNotifications.mockResolvedValue(undefined);
await controller.streamNotifications(res);
expect(mockService.streamNotifications).toHaveBeenCalledWith(res);
});
it("acks a notification by id", async () => {
mockService.ackNotification.mockResolvedValue({ success: true, id: "notif-2" });
await expect(controller.ackNotification("notif-2")).resolves.toEqual({
success: true,
id: "notif-2",
});
});
it("surfaces ack errors", async () => {
mockService.ackNotification.mockRejectedValue(new NotFoundException("missing"));
await expect(controller.ackNotification("missing")).rejects.toThrow(NotFoundException);
});
it("returns parsed queue tasks", async () => {
mockService.listTasks.mockResolvedValue([
{
id: "task-1",
project: "mosaic-stack",
taskId: "MS24-API-001",
status: "pending",
description: "Build queue notifications module",
},
]);
await expect(controller.getTasks()).resolves.toEqual([
{
id: "task-1",
project: "mosaic-stack",
taskId: "MS24-API-001",
status: "pending",
description: "Build queue notifications module",
},
]);
});
it("uses ApiKeyGuard at the controller level", () => {
const guards = Reflect.getMetadata("__guards__", QueueNotificationsController) as unknown[];
expect(guards).toContain(ApiKeyGuard);
});
});

View File

@@ -0,0 +1,36 @@
import { Controller, Get, Param, Post, Res, UseGuards } from "@nestjs/common";
import type { Response } from "express";
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
import { ApiKeyGuard } from "../common/guards/api-key.guard";
import {
QueueNotificationsService,
type QueueNotification,
type QueueTask,
} from "./queue-notifications.service";
@Controller("queue")
@UseGuards(ApiKeyGuard)
export class QueueNotificationsController {
constructor(private readonly queueNotificationsService: QueueNotificationsService) {}
@Get("notifications")
async getNotifications(): Promise<QueueNotification[]> {
return this.queueNotificationsService.listNotifications();
}
@Get("notifications/stream")
async streamNotifications(@Res() res: Response): Promise<void> {
await this.queueNotificationsService.streamNotifications(res);
}
@SkipCsrf()
@Post("notifications/:id/ack")
async ackNotification(@Param("id") id: string): Promise<{ success: true; id: string }> {
return this.queueNotificationsService.ackNotification(id);
}
@Get("tasks")
async getTasks(): Promise<QueueTask[]> {
return this.queueNotificationsService.listTasks();
}
}

View File

@@ -0,0 +1,16 @@
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { AuthModule } from "../auth/auth.module";
import { ApiKeyGuard } from "../common/guards/api-key.guard";
import { GatekeeperModule } from "../gatekeeper/gatekeeper.module";
import { QueueNotificationsController } from "./queue-notifications.controller";
import { QueueNotificationsService } from "./queue-notifications.service";
import { WoodpeckerWebhookController } from "./woodpecker-webhook.controller";
@Module({
imports: [ConfigModule, AuthModule, GatekeeperModule],
controllers: [QueueNotificationsController, WoodpeckerWebhookController],
providers: [QueueNotificationsService, ApiKeyGuard],
exports: [QueueNotificationsService],
})
export class QueueNotificationsModule {}

View File

@@ -0,0 +1,268 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { ConfigService } from "@nestjs/config";
import { Logger, NotFoundException } from "@nestjs/common";
import { mkdtemp, mkdir, readFile, readdir, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { execFile } from "node:child_process";
import { QueueNotificationsService } from "./queue-notifications.service";
vi.mock("node:child_process", () => ({
execFile: vi.fn(),
}));
describe("QueueNotificationsService", () => {
let service: QueueNotificationsService;
let inboxDir: string;
let agentStateDir: string;
let configService: ConfigService;
beforeEach(async () => {
vi.clearAllMocks();
inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-"));
agentStateDir = await mkdtemp(join(tmpdir(), "agent-state-"));
configService = {
get: vi.fn((key: string) => {
if (key === "MOSAIC_QUEUE_INBOX_DIR") {
return inboxDir;
}
if (key === "MOSAIC_AGENT_STATE_DIR") {
return agentStateDir;
}
if (key === "MOSAIC_QUEUE_CLI") {
return "/tmp/mosaic-queue-cli.js";
}
return undefined;
}),
} as unknown as ConfigService;
service = new QueueNotificationsService(configService);
});
afterEach(async () => {
vi.restoreAllMocks();
await rm(inboxDir, { recursive: true, force: true });
await rm(agentStateDir, { recursive: true, force: true });
});
describe("onModuleInit", () => {
it("logs a warning when the inbox directory does not exist", async () => {
await rm(inboxDir, { recursive: true, force: true });
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
await service.onModuleInit();
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("Queue notifications inbox directory does not exist")
);
});
});
describe("listNotifications", () => {
it("returns parsed notifications from agent inbox directories", async () => {
await mkdir(join(inboxDir, "mosaic"), { recursive: true });
await mkdir(join(inboxDir, "mosaic", "_acked"), { recursive: true });
await mkdir(join(inboxDir, "sage"), { recursive: true });
await writeFile(
join(inboxDir, "mosaic", "notif-1.json"),
JSON.stringify({ type: "task.ready", taskId: "MS24-API-001" })
);
await writeFile(
join(inboxDir, "mosaic", "_acked", "notif-ignored.json"),
JSON.stringify({ ignored: true })
);
await writeFile(join(inboxDir, "sage", "notif-2.json"), JSON.stringify({ type: "done" }));
const notifications = await service.listNotifications();
expect(notifications).toHaveLength(2);
expect(notifications).toEqual(
expect.arrayContaining([
expect.objectContaining({
id: "notif-1",
agent: "mosaic",
filename: "notif-1.json",
payload: { type: "task.ready", taskId: "MS24-API-001" },
}),
expect.objectContaining({
id: "notif-2",
agent: "sage",
filename: "notif-2.json",
payload: { type: "done" },
}),
])
);
});
it("returns an empty array when the inbox directory is missing", async () => {
await rm(inboxDir, { recursive: true, force: true });
await expect(service.listNotifications()).resolves.toEqual([]);
});
});
describe("ackNotification", () => {
it("executes the queue CLI with node and ack args", async () => {
await mkdir(join(inboxDir, "mosaic"), { recursive: true });
await writeFile(join(inboxDir, "mosaic", "notif-3.json"), JSON.stringify({ ok: true }));
vi.mocked(execFile).mockImplementation(
(
_command: string,
_args: readonly string[],
callback: (error: Error | null, stdout: string, stderr: string) => void
) => callback(null, "acked", "")
);
await expect(service.ackNotification("notif-3")).resolves.toEqual({
success: true,
id: "notif-3",
});
expect(execFile).toHaveBeenCalledWith(
"node",
["/tmp/mosaic-queue-cli.js", "ack", "notif-3"],
expect.any(Function)
);
});
it("throws NotFoundException when the notification does not exist", async () => {
await expect(service.ackNotification("missing")).rejects.toThrow(NotFoundException);
expect(execFile).not.toHaveBeenCalled();
});
});
describe("listTasks", () => {
it("parses tab-separated CLI output", async () => {
vi.mocked(execFile).mockImplementation(
(
_command: string,
_args: readonly string[],
callback: (error: Error | null, stdout: string, stderr: string) => void
) =>
callback(
null,
[
"task-1\tmosaic-stack/MS24-API-001\t[pending]\tBuild queue notifications module",
"task-2\tmosaic-stack/MS24-API-002\t[done]\tWrite tests",
].join("\n"),
""
)
);
await expect(service.listTasks()).resolves.toEqual([
{
id: "task-1",
project: "mosaic-stack",
taskId: "MS24-API-001",
status: "pending",
description: "Build queue notifications module",
},
{
id: "task-2",
project: "mosaic-stack",
taskId: "MS24-API-002",
status: "done",
description: "Write tests",
},
]);
expect(execFile).toHaveBeenCalledWith(
"node",
["/tmp/mosaic-queue-cli.js", "list", "mosaic-stack"],
expect.any(Function)
);
});
});
describe("notifyAgentCiResult", () => {
it("writes one notification per active agent-state record matching the branch", async () => {
await mkdir(join(agentStateDir, "active"), { recursive: true });
await writeFile(
join(agentStateDir, "active", "sage-landing-page.json"),
JSON.stringify({
taskId: "sage-landing-page",
status: "spawned",
startedAt: "2026-03-08T22:47:20Z",
lastUpdated: "2026-03-08T23:15:11.726Z",
agent: "sage",
branch: "feature/landing-page",
description: "Create apps/landing marketing site",
})
);
await writeFile(
join(agentStateDir, "active", "pixels-other.json"),
JSON.stringify({
taskId: "pixels-other",
status: "spawned",
startedAt: "2026-03-08T22:47:20Z",
lastUpdated: "2026-03-08T23:15:11.726Z",
agent: "pixels",
branch: "feature/something-else",
description: "Unrelated",
})
);
await expect(
service.notifyAgentCiResult({
branch: "feature/landing-page",
status: "success",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
})
).resolves.toEqual({ notified: 1 });
const inboxFiles = await readdir(join(inboxDir, "sage"));
expect(inboxFiles).toHaveLength(1);
const notification = JSON.parse(
await readFile(join(inboxDir, "sage", inboxFiles[0]!), "utf8")
) as Record<string, unknown>;
expect(notification).toMatchObject({
taskId: "sage-landing-page",
event: "completed",
targetAgent: "sage",
fromAgent: "mosaic-api",
retries: 0,
maxRetries: 3,
ttlSeconds: 600,
payload: {
branch: "feature/landing-page",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
ciStatus: "success",
},
});
expect(typeof notification.id).toBe("string");
expect(typeof notification.createdAt).toBe("string");
});
it("returns zero when no active agent-state branch matches the webhook payload", async () => {
await mkdir(join(agentStateDir, "active"), { recursive: true });
await writeFile(
join(agentStateDir, "active", "mosaic-task.json"),
JSON.stringify({
taskId: "mosaic-task",
status: "spawned",
startedAt: "2026-03-08T22:47:20Z",
lastUpdated: "2026-03-08T23:15:11.726Z",
agent: "mosaic",
branch: "feature/not-this-one",
description: "Unrelated",
})
);
await expect(
service.notifyAgentCiResult({
branch: "feature/landing-page",
status: "failure",
buildUrl: "https://ci.example/build/456",
repo: "mosaic/stack",
})
).resolves.toEqual({ notified: 0 });
});
});
});

View File

@@ -0,0 +1,341 @@
import {
Injectable,
InternalServerErrorException,
Logger,
NotFoundException,
OnModuleInit,
} from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { randomUUID } from "node:crypto";
import { execFile } from "node:child_process";
import { access, mkdir, readdir, readFile, stat, writeFile } from "node:fs/promises";
import { homedir } from "node:os";
import { basename, join } from "node:path";
import type { Response } from "express";
import chokidar from "chokidar";
export interface QueueNotification {
id: string;
agent: string;
filename: string;
payload: unknown;
createdAt: Date;
}
export interface QueueTask {
id: string;
project: string;
taskId: string;
status: string;
description: string;
}
interface AgentStateRecord {
taskId: string;
agent: string;
branch: string;
}
interface AgentCiNotification {
id: string;
taskId: string;
event: "completed" | "failed";
targetAgent: string;
fromAgent: string;
payload: {
branch: string;
buildUrl: string;
repo: string;
ciStatus: "success" | "failure";
};
createdAt: string;
retries: number;
maxRetries: number;
ttlSeconds: number;
}
@Injectable()
export class QueueNotificationsService implements OnModuleInit {
private readonly logger = new Logger(QueueNotificationsService.name);
constructor(private readonly configService: ConfigService) {}
async onModuleInit(): Promise<void> {
if (!(await this.inboxDirExists())) {
this.logger.warn(`Queue notifications inbox directory does not exist: ${this.getInboxDir()}`);
}
}
async listNotifications(): Promise<QueueNotification[]> {
const inboxDir = this.getInboxDir();
if (!(await this.inboxDirExists())) {
return [];
}
// Paths come from controlled config plus directory entries under the inbox root.
// eslint-disable-next-line security/detect-non-literal-fs-filename
const agentEntries = await readdir(inboxDir, { withFileTypes: true });
const notifications: QueueNotification[] = [];
for (const agentEntry of agentEntries) {
if (!agentEntry.isDirectory() || this.isIgnoredDirectory(agentEntry.name)) {
continue;
}
const agentDir = join(inboxDir, agentEntry.name);
// eslint-disable-next-line security/detect-non-literal-fs-filename
const files = await readdir(agentDir, { withFileTypes: true });
for (const fileEntry of files) {
if (!fileEntry.isFile() || !fileEntry.name.endsWith(".json")) {
continue;
}
const filePath = join(agentDir, fileEntry.name);
const [rawPayload, fileStats] = await Promise.all([
// eslint-disable-next-line security/detect-non-literal-fs-filename
readFile(filePath, "utf8"),
// eslint-disable-next-line security/detect-non-literal-fs-filename
stat(filePath),
]);
notifications.push({
id: basename(fileEntry.name, ".json"),
agent: agentEntry.name,
filename: fileEntry.name,
payload: JSON.parse(rawPayload) as unknown,
createdAt: fileStats.birthtime,
});
}
}
return notifications.sort(
(left, right) => right.createdAt.getTime() - left.createdAt.getTime()
);
}
async streamNotifications(res: Response): Promise<void> {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
if (typeof res.flushHeaders === "function") {
res.flushHeaders();
}
const emitNotifications = async (): Promise<void> => {
try {
const notifications = await this.listNotifications();
res.write(`data: ${JSON.stringify(notifications)}\n\n`);
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
res.write(`event: error\n`);
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
}
};
await emitNotifications();
const watcher = chokidar.watch(this.getInboxDir(), {
ignoreInitial: true,
persistent: true,
ignored: (watchedPath: string) => {
return watchedPath.includes("/_acked/") || watchedPath.includes("/_dead-letter/");
},
});
watcher.on("add", () => {
void emitNotifications();
});
watcher.on("unlink", () => {
void emitNotifications();
});
res.on("close", () => {
void watcher.close();
res.end();
});
}
async ackNotification(id: string): Promise<{ success: true; id: string }> {
const notification = (await this.listNotifications()).find((entry) => entry.id === id);
if (!notification) {
throw new NotFoundException(`Queue notification ${id} not found`);
}
await this.execQueueCli(["ack", notification.id]);
return {
success: true,
id: notification.id,
};
}
async listTasks(): Promise<QueueTask[]> {
const stdout = await this.execQueueCli(["list", "mosaic-stack"]);
return stdout
.split(/\r?\n/)
.map((line) => line.trim())
.filter((line) => line.length > 0)
.map((line) => {
const [rawId = "", projectTaskId = "", rawStatus = "", description = ""] = line.split("\t");
const [project = "", taskId = ""] = projectTaskId.split("/");
return {
id: rawId,
project,
taskId,
status: rawStatus.replace(/^\[/, "").replace(/\]$/, ""),
description,
};
});
}
async notifyAgentCiResult(opts: {
branch: string;
status: "success" | "failure";
buildUrl: string;
repo: string;
}): Promise<{ notified: number }> {
const activeRecords = await this.listActiveAgentStateRecords();
const matches = activeRecords.filter((record) => record.branch === opts.branch);
await Promise.all(
matches.map(async (record) => {
const notification: AgentCiNotification = {
id: randomUUID(),
taskId: record.taskId,
event: opts.status === "success" ? "completed" : "failed",
targetAgent: record.agent,
fromAgent: "mosaic-api",
payload: {
branch: opts.branch,
buildUrl: opts.buildUrl,
repo: opts.repo,
ciStatus: opts.status,
},
createdAt: new Date().toISOString(),
retries: 0,
maxRetries: 3,
ttlSeconds: 600,
};
const agentDir = join(this.getInboxDir(), record.agent);
// Path is built from the configured inbox root plus validated agent-state entries.
// eslint-disable-next-line security/detect-non-literal-fs-filename
await mkdir(agentDir, { recursive: true });
// Path is built from the configured inbox root plus validated agent-state entries.
// eslint-disable-next-line security/detect-non-literal-fs-filename
await writeFile(
join(agentDir, `${notification.id}.json`),
JSON.stringify(notification, null, 2),
"utf8"
);
})
);
return { notified: matches.length };
}
private async execQueueCli(args: string[]): Promise<string> {
const cliPath = this.getQueueCliPath();
return new Promise<string>((resolve, reject) => {
execFile("node", [cliPath, ...args], (error, stdout, stderr) => {
if (error) {
this.logger.error(
`Queue CLI command failed: node ${cliPath} ${args.join(" ")} | ${stderr || error.message}`
);
reject(
new InternalServerErrorException(`Queue CLI command failed: ${stderr || error.message}`)
);
return;
}
resolve(stdout);
});
});
}
private getInboxDir(): string {
return this.expandHomePath(
this.configService.get<string>("MOSAIC_QUEUE_INBOX_DIR") ??
"~/.openclaw/workspace/agent-inbox"
);
}
private getAgentStateDir(): string {
return this.expandHomePath(
this.configService.get<string>("MOSAIC_AGENT_STATE_DIR") ?? "~/.openclaw/workspace/agents"
);
}
private getQueueCliPath(): string {
return this.expandHomePath(
this.configService.get<string>("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js"
);
}
private expandHomePath(value: string): string {
if (value === "~") {
return homedir();
}
if (value.startsWith("~/")) {
return join(homedir(), value.slice(2));
}
return value;
}
private async inboxDirExists(): Promise<boolean> {
try {
await access(this.getInboxDir());
return true;
} catch {
return false;
}
}
private isIgnoredDirectory(name: string): boolean {
return name === "_acked" || name === "_dead-letter";
}
private async listActiveAgentStateRecords(): Promise<AgentStateRecord[]> {
const activeDir = join(this.getAgentStateDir(), "active");
try {
// Path comes from controlled config and a fixed "active" subdirectory.
// eslint-disable-next-line security/detect-non-literal-fs-filename
const entries = await readdir(activeDir, { withFileTypes: true });
const records = await Promise.all(
entries
.filter((entry) => entry.isFile() && entry.name.endsWith(".json"))
.map(async (entry) => {
const filePath = join(activeDir, entry.name);
// Path comes from controlled config plus directory entries under the active root.
// eslint-disable-next-line security/detect-non-literal-fs-filename
const rawRecord = await readFile(filePath, "utf8");
return JSON.parse(rawRecord) as AgentStateRecord;
})
);
return records.filter((record) => {
return (
typeof record.taskId === "string" &&
typeof record.agent === "string" &&
typeof record.branch === "string"
);
});
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
this.logger.warn(`Unable to read active agent-state records from ${activeDir}: ${message}`);
return [];
}
}
}

View File

@@ -0,0 +1,163 @@
import { createHmac } from "node:crypto";
import { Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Test, type TestingModule } from "@nestjs/testing";
import type { Request } from "express";
import { describe, beforeEach, expect, it, vi } from "vitest";
import { GatekeeperService } from "../gatekeeper/gatekeeper.service";
import { QueueNotificationsService } from "./queue-notifications.service";
import {
type WoodpeckerWebhookPayload,
WoodpeckerWebhookController,
} from "./woodpecker-webhook.controller";
function signPayload(payload: WoodpeckerWebhookPayload, secret: string): string {
return createHmac("sha256", secret).update(JSON.stringify(payload)).digest("hex");
}
describe("WoodpeckerWebhookController", () => {
let controller: WoodpeckerWebhookController;
const mockService = {
notifyAgentCiResult: vi.fn(),
};
const mockGatekeeperService = {
handleCiEvent: vi.fn(),
};
const mockConfigService = {
get: vi.fn(),
};
beforeEach(async () => {
vi.clearAllMocks();
mockConfigService.get.mockImplementation((key: string) => {
if (key === "WOODPECKER_WEBHOOK_SECRET") {
return "test-secret";
}
return undefined;
});
const module: TestingModule = await Test.createTestingModule({
controllers: [WoodpeckerWebhookController],
providers: [
{ provide: QueueNotificationsService, useValue: mockService },
{ provide: GatekeeperService, useValue: mockGatekeeperService },
{ provide: ConfigService, useValue: mockConfigService },
],
}).compile();
controller = module.get<WoodpeckerWebhookController>(WoodpeckerWebhookController);
});
it("accepts a valid signature and forwards the payload to the service", async () => {
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "success",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
prNumber: 42,
headSha: "abcdef1234567890",
};
const signature = signPayload(payload, "test-secret");
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 2 });
await expect(
controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
signature
)
).resolves.toEqual({ ok: true, notified: 2 });
expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload);
expect(mockGatekeeperService.handleCiEvent).toHaveBeenCalledWith(
"mosaic/stack",
42,
"abcdef1234567890",
"success"
);
});
it("returns ok without notifying when the signature is invalid", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "failure",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
};
await expect(
controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
"bad-signature"
)
).resolves.toEqual({ ok: true, notified: 0 });
expect(mockService.notifyAgentCiResult).not.toHaveBeenCalled();
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("invalid Woodpecker webhook signature")
);
});
it("accepts the payload when the webhook secret is missing", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "success",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
};
mockConfigService.get.mockReturnValue(undefined);
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 1 });
await expect(controller.handleWebhook({} as Request, payload, "")).resolves.toEqual({
ok: true,
notified: 1,
});
expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload);
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("WOODPECKER_WEBHOOK_SECRET is not configured")
);
});
it("returns zero notifications when no active branch matches", async () => {
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "success",
buildUrl: "https://ci.example/build/999",
repo: "mosaic/stack",
};
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 0 });
await expect(
controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
signPayload(payload, "test-secret")
)
).resolves.toEqual({ ok: true, notified: 0 });
});
it("does not call Gatekeeper when the PR metadata is missing", async () => {
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "success",
buildUrl: "https://ci.example/build/555",
repo: "mosaic/stack",
};
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 1 });
await controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
signPayload(payload, "test-secret")
);
expect(mockGatekeeperService.handleCiEvent).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,90 @@
import { Body, Controller, Headers, Logger, Post, Req, type RawBodyRequest } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { createHmac, timingSafeEqual } from "node:crypto";
import type { Request } from "express";
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
import { GatekeeperService } from "../gatekeeper/gatekeeper.service";
import { QueueNotificationsService } from "./queue-notifications.service";
export interface WoodpeckerWebhookPayload {
branch: string;
status: "success" | "failure";
buildUrl: string;
repo: string;
prNumber?: number;
headSha?: string;
}
@Controller("webhooks")
export class WoodpeckerWebhookController {
private readonly logger = new Logger(WoodpeckerWebhookController.name);
constructor(
private readonly queueService: QueueNotificationsService,
private readonly gatekeeperService: GatekeeperService,
private readonly configService: ConfigService
) {}
@SkipCsrf()
@Post("woodpecker")
async handleWebhook(
@Req() req: RawBodyRequest<Request>,
@Body() body: WoodpeckerWebhookPayload,
@Headers("x-woodpecker-signature") signature: string
): Promise<{ ok: boolean; notified: number }> {
const secret = this.configService.get<string>("WOODPECKER_WEBHOOK_SECRET");
if (!secret) {
this.logger.warn("WOODPECKER_WEBHOOK_SECRET is not configured; accepting Woodpecker webhook");
const result = await this.queueService.notifyAgentCiResult(body);
await this.forwardCiStatusToGatekeeper(body);
return { ok: true, notified: result.notified };
}
if (!this.isValidSignature(this.getRequestBody(req, body), signature, secret)) {
this.logger.warn("Received invalid Woodpecker webhook signature");
return { ok: true, notified: 0 };
}
const result = await this.queueService.notifyAgentCiResult(body);
await this.forwardCiStatusToGatekeeper(body);
return { ok: true, notified: result.notified };
}
private async forwardCiStatusToGatekeeper(body: WoodpeckerWebhookPayload): Promise<void> {
if (typeof body.prNumber === "number" && typeof body.headSha === "string") {
await this.gatekeeperService.handleCiEvent(
body.repo,
body.prNumber,
body.headSha,
body.status
);
}
}
private getRequestBody(req: RawBodyRequest<Request>, body: WoodpeckerWebhookPayload): Buffer {
const rawBody = req.rawBody;
if (Buffer.isBuffer(rawBody)) {
return rawBody;
}
return Buffer.from(JSON.stringify(body));
}
private isValidSignature(body: Buffer, signature: string | undefined, secret: string): boolean {
if (!signature) {
return false;
}
const expected = createHmac("sha256", secret).update(body).digest("hex");
const signatureBuffer = Buffer.from(signature);
const expectedBuffer = Buffer.from(expected);
if (signatureBuffer.length !== expectedBuffer.length) {
return false;
}
return timingSafeEqual(signatureBuffer, expectedBuffer);
}
}

View File

@@ -37,7 +37,7 @@
"ioredis": "^5.9.2", "ioredis": "^5.9.2",
"reflect-metadata": "^0.2.2", "reflect-metadata": "^0.2.2",
"rxjs": "^7.8.1", "rxjs": "^7.8.1",
"simple-git": "^3.27.0", "simple-git": "^3.32.3",
"zod": "^3.24.1" "zod": "^3.24.1"
}, },
"devDependencies": { "devDependencies": {

View File

@@ -0,0 +1,93 @@
import { type NextRequest, NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
/**
* Generic catch-all proxy for orchestrator API routes.
*
* Forwards any request to /api/orchestrator/<path> → ORCHESTRATOR_URL/<path>
* with the ORCHESTRATOR_API_KEY injected server-side so it never reaches the browser.
*
* Supports GET, POST, PATCH, DELETE, PUT.
*
* Example:
* GET /api/orchestrator/mission-control/sessions
* → GET ORCHESTRATOR_URL/api/mission-control/sessions
* POST /api/orchestrator/mission-control/sessions/abc/kill
* → POST ORCHESTRATOR_URL/api/mission-control/sessions/abc/kill
*/
async function proxyToOrchestrator(
request: NextRequest,
context: { params: Promise<{ path: string[] }> }
): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
const { path } = await context.params;
const upstreamPath = `/${path.join("/")}`;
const search = request.nextUrl.search;
const upstreamUrl = `${getOrchestratorUrl()}${upstreamPath}${search}`;
const controller = new AbortController();
const timeout = setTimeout(() => {
controller.abort();
}, 30_000);
try {
const headers: Record<string, string> = {
"X-API-Key": orchestratorApiKey,
};
const contentType = request.headers.get("Content-Type");
if (contentType) {
headers["Content-Type"] = contentType;
}
const hasBody = request.method !== "GET" && request.method !== "HEAD";
const body = hasBody ? await request.text() : null;
const upstream = await fetch(upstreamUrl, {
method: request.method,
headers,
...(body !== null ? { body } : {}),
cache: "no-store",
signal: controller.signal,
});
const responseText = await upstream.text();
return new NextResponse(responseText, {
status: upstream.status,
headers: {
"Content-Type": upstream.headers.get("Content-Type") ?? "application/json",
},
});
} catch (error) {
const message =
error instanceof Error && error.name === "AbortError"
? "Orchestrator request timed out."
: "Unable to reach orchestrator.";
return NextResponse.json({ error: message }, { status: 502 });
} finally {
clearTimeout(timeout);
}
}
export const GET = proxyToOrchestrator;
export const POST = proxyToOrchestrator;
export const PATCH = proxyToOrchestrator;
export const PUT = proxyToOrchestrator;
export const DELETE = proxyToOrchestrator;

View File

@@ -0,0 +1,57 @@
import { NextResponse } from "next/server";
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
function getOrchestratorUrl(): string {
return (
process.env.ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
process.env.NEXT_PUBLIC_API_URL ??
DEFAULT_ORCHESTRATOR_URL
);
}
export const dynamic = "force-dynamic";
export async function GET(): Promise<NextResponse> {
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
if (!orchestratorApiKey) {
return NextResponse.json(
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
{ status: 503 }
);
}
try {
const upstream = await fetch(`${getOrchestratorUrl()}/api/queue/notifications/stream`, {
method: "GET",
headers: {
"X-API-Key": orchestratorApiKey,
Accept: "text/event-stream",
},
cache: "no-store",
});
if (!upstream.ok || upstream.body === null) {
const message = await upstream.text();
return new NextResponse(message || "Failed to connect to queue notifications stream", {
status: upstream.status || 502,
headers: {
"Content-Type": upstream.headers.get("Content-Type") ?? "text/plain; charset=utf-8",
},
});
}
return new NextResponse(upstream.body, {
status: upstream.status,
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
},
});
} catch {
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
}
}

View File

@@ -4,6 +4,7 @@ import { Outfit, Fira_Code } from "next/font/google";
import { AuthProvider } from "@/lib/auth/auth-context"; import { AuthProvider } from "@/lib/auth/auth-context";
import { ErrorBoundary } from "@/components/error-boundary"; import { ErrorBoundary } from "@/components/error-boundary";
import { ThemeProvider } from "@/providers/ThemeProvider"; import { ThemeProvider } from "@/providers/ThemeProvider";
import { ReactQueryProvider } from "@/providers/ReactQueryProvider";
import "./globals.css"; import "./globals.css";
export const dynamic = "force-dynamic"; export const dynamic = "force-dynamic";
@@ -56,9 +57,11 @@ export default function RootLayout({ children }: { children: ReactNode }): React
</head> </head>
<body> <body>
<ThemeProvider> <ThemeProvider>
<ErrorBoundary> <ReactQueryProvider>
<AuthProvider>{children}</AuthProvider> <ErrorBoundary>
</ErrorBoundary> <AuthProvider>{children}</AuthProvider>
</ErrorBoundary>
</ReactQueryProvider>
</ThemeProvider> </ThemeProvider>
</body> </body>
</html> </html>

View File

@@ -1,6 +1,6 @@
"use client"; "use client";
import { useEffect, useState } from "react"; import { useEffect, useMemo, useState } from "react";
import Link from "next/link"; import Link from "next/link";
import { usePathname } from "next/navigation"; import { usePathname } from "next/navigation";
import Image from "next/image"; import Image from "next/image";
@@ -29,6 +29,10 @@ interface NavGroup {
items: NavItemConfig[]; items: NavItemConfig[];
} }
interface QueueNotification {
id: string;
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// SVG Icons (16x16 viewBox, stroke="currentColor", strokeWidth="1.5") // SVG Icons (16x16 viewBox, stroke="currentColor", strokeWidth="1.5")
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -685,6 +689,72 @@ function CollapseToggle({ collapsed, onToggle }: CollapseToggleProps): React.JSX
export function AppSidebar(): React.JSX.Element { export function AppSidebar(): React.JSX.Element {
const pathname = usePathname(); const pathname = usePathname();
const { collapsed, toggleCollapsed, mobileOpen, setMobileOpen, isMobile } = useSidebar(); const { collapsed, toggleCollapsed, mobileOpen, setMobileOpen, isMobile } = useSidebar();
const [missionControlBadgeCount, setMissionControlBadgeCount] = useState<number>(0);
useEffect(() => {
let isActive = true;
const loadNotificationCount = async (): Promise<void> => {
try {
const response = await fetch("/api/orchestrator/api/queue/notifications", {
method: "GET",
});
if (!response.ok) {
return;
}
const payload = (await response.json()) as QueueNotification[];
if (isActive) {
setMissionControlBadgeCount(Array.isArray(payload) ? payload.length : 0);
}
} catch {
// Ignore badge failures in the nav.
}
};
void loadNotificationCount();
if (typeof EventSource === "undefined") {
return (): void => {
isActive = false;
};
}
const source = new EventSource("/api/orchestrator/queue/notifications/stream");
source.onmessage = (event: MessageEvent<string>): void => {
try {
const payload = JSON.parse(event.data) as QueueNotification[];
if (isActive) {
setMissionControlBadgeCount(Array.isArray(payload) ? payload.length : 0);
}
} catch {
// Ignore malformed badge updates.
}
};
return (): void => {
isActive = false;
source.close();
};
}, []);
const navGroups = useMemo(
() =>
NAV_GROUPS.map((group) => ({
...group,
items: group.items.map((item) =>
item.href === "/mission-control" && missionControlBadgeCount > 0
? {
...item,
badge: { label: String(missionControlBadgeCount) },
}
: item
),
})),
[missionControlBadgeCount]
);
return ( return (
<> <>
@@ -722,7 +792,7 @@ export function AppSidebar(): React.JSX.Element {
}} }}
aria-label="Main navigation" aria-label="Main navigation"
> >
{NAV_GROUPS.map((group) => ( {navGroups.map((group) => (
<div key={group.label} style={{ marginBottom: "18px" }}> <div key={group.label} style={{ marginBottom: "18px" }}>
{/* Group label — hidden when collapsed */} {/* Group label — hidden when collapsed */}
{!collapsed && ( {!collapsed && (

View File

@@ -158,7 +158,9 @@ async function fetchAuditLog(
} }
try { try {
return await apiGet<AuditLogResponse>(`/api/mission-control/audit-log?${params.toString()}`); return await apiGet<AuditLogResponse>(
`/api/orchestrator/api/mission-control/audit-log?${params.toString()}`
);
} catch (error) { } catch (error) {
if (isRateLimitError(error)) { if (isRateLimitError(error)) {
return createEmptyAuditLogResponse(page, "Rate limited - retrying..."); return createEmptyAuditLogResponse(page, "Rate limited - retrying...");

View File

@@ -59,9 +59,12 @@ describe("BargeInInput", (): void => {
await user.click(screen.getByRole("button", { name: "Send" })); await user.click(screen.getByRole("button", { name: "Send" }));
await waitFor((): void => { await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-1/inject", { expect(mockApiPost).toHaveBeenCalledWith(
content: "execute plan", "/api/orchestrator/api/mission-control/sessions/session-1/inject",
}); {
content: "execute plan",
}
);
}); });
expect(onSent).toHaveBeenCalledTimes(1); expect(onSent).toHaveBeenCalledTimes(1);
@@ -83,12 +86,18 @@ describe("BargeInInput", (): void => {
const calls = mockApiPost.mock.calls as [string, unknown?][]; const calls = mockApiPost.mock.calls as [string, unknown?][];
expect(calls[0]).toEqual(["/api/mission-control/sessions/session-2/pause", undefined]); expect(calls[0]).toEqual([
"/api/orchestrator/api/mission-control/sessions/session-2/pause",
undefined,
]);
expect(calls[1]).toEqual([ expect(calls[1]).toEqual([
"/api/mission-control/sessions/session-2/inject", "/api/orchestrator/api/mission-control/sessions/session-2/inject",
{ content: "hello world" }, { content: "hello world" },
]); ]);
expect(calls[2]).toEqual(["/api/mission-control/sessions/session-2/resume", undefined]); expect(calls[2]).toEqual([
"/api/orchestrator/api/mission-control/sessions/session-2/resume",
undefined,
]);
}); });
it("submits with Enter and does not submit on Shift+Enter", async (): Promise<void> => { it("submits with Enter and does not submit on Shift+Enter", async (): Promise<void> => {
@@ -105,9 +114,12 @@ describe("BargeInInput", (): void => {
fireEvent.keyDown(textarea, { key: "Enter", code: "Enter", shiftKey: false }); fireEvent.keyDown(textarea, { key: "Enter", code: "Enter", shiftKey: false });
await waitFor((): void => { await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-3/inject", { expect(mockApiPost).toHaveBeenCalledWith(
content: "first", "/api/orchestrator/api/mission-control/sessions/session-3/inject",
}); {
content: "first",
}
);
}); });
}); });

View File

@@ -39,7 +39,7 @@ export function BargeInInput({ sessionId, onSent }: BargeInInputProps): React.JS
} }
const encodedSessionId = encodeURIComponent(sessionId); const encodedSessionId = encodeURIComponent(sessionId);
const baseEndpoint = `/api/mission-control/sessions/${encodedSessionId}`; const baseEndpoint = `/api/orchestrator/api/mission-control/sessions/${encodedSessionId}`;
let didPause = false; let didPause = false;
let didInject = false; let didInject = false;

View File

@@ -177,9 +177,12 @@ describe("GlobalAgentRoster", (): void => {
fireEvent.click(screen.getByRole("button", { name: "Kill session killme12" })); fireEvent.click(screen.getByRole("button", { name: "Kill session killme12" }));
await waitFor((): void => { await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/killme123456/kill", { expect(mockApiPost).toHaveBeenCalledWith(
force: false, "/api/orchestrator/api/mission-control/sessions/killme123456/kill",
}); {
force: false,
}
);
}); });
}); });

View File

@@ -83,7 +83,7 @@ function groupByProvider(sessions: MissionControlSession[]): ProviderSessionGrou
async function fetchSessions(): Promise<MissionControlSession[]> { async function fetchSessions(): Promise<MissionControlSession[]> {
const payload = await apiGet<MissionControlSession[] | SessionsPayload>( const payload = await apiGet<MissionControlSession[] | SessionsPayload>(
"/api/mission-control/sessions" "/api/orchestrator/api/mission-control/sessions"
); );
return Array.isArray(payload) ? payload : payload.sessions; return Array.isArray(payload) ? payload : payload.sessions;
} }
@@ -118,9 +118,12 @@ export function GlobalAgentRoster({
const killMutation = useMutation({ const killMutation = useMutation({
mutationFn: async (sessionId: string): Promise<string> => { mutationFn: async (sessionId: string): Promise<string> => {
await apiPost<{ message: string }>(`/api/mission-control/sessions/${sessionId}/kill`, { await apiPost<{ message: string }>(
force: false, `/api/orchestrator/api/mission-control/sessions/${sessionId}/kill`,
}); {
force: false,
}
);
return sessionId; return sessionId;
}, },
onSuccess: (): void => { onSuccess: (): void => {

View File

@@ -112,14 +112,20 @@ describe("KillAllDialog", (): void => {
await user.click(screen.getByRole("button", { name: "Kill All Agents" })); await user.click(screen.getByRole("button", { name: "Kill All Agents" }));
await waitFor((): void => { await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/internal-1/kill", { expect(mockApiPost).toHaveBeenCalledWith(
force: true, "/api/orchestrator/api/mission-control/sessions/internal-1/kill",
}); {
force: true,
}
);
}); });
expect(mockApiPost).not.toHaveBeenCalledWith("/api/mission-control/sessions/external-1/kill", { expect(mockApiPost).not.toHaveBeenCalledWith(
force: true, "/api/orchestrator/api/mission-control/sessions/external-1/kill",
}); {
force: true,
}
);
expect(onComplete).toHaveBeenCalledTimes(1); expect(onComplete).toHaveBeenCalledTimes(1);
}); });
@@ -141,12 +147,18 @@ describe("KillAllDialog", (): void => {
await user.click(screen.getByRole("button", { name: "Kill All Agents" })); await user.click(screen.getByRole("button", { name: "Kill All Agents" }));
await waitFor((): void => { await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/internal-2/kill", { expect(mockApiPost).toHaveBeenCalledWith(
force: true, "/api/orchestrator/api/mission-control/sessions/internal-2/kill",
}); {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/external-2/kill", { force: true,
force: true, }
}); );
expect(mockApiPost).toHaveBeenCalledWith(
"/api/orchestrator/api/mission-control/sessions/external-2/kill",
{
force: true,
}
);
}); });
}); });

View File

@@ -96,9 +96,12 @@ export function KillAllDialog({ sessions, onComplete }: KillAllDialogProps): Rea
const killRequests = targetSessions.map(async (session) => { const killRequests = targetSessions.map(async (session) => {
try { try {
await apiPost<{ message: string }>(`/api/mission-control/sessions/${session.id}/kill`, { await apiPost<{ message: string }>(
force: true, `/api/orchestrator/api/mission-control/sessions/${session.id}/kill`,
}); {
force: true,
}
);
return true; return true;
} catch { } catch {
return false; return false;

View File

@@ -8,6 +8,7 @@ interface MockButtonProps extends ButtonHTMLAttributes<HTMLButtonElement> {
const mockGlobalAgentRoster = vi.fn(); const mockGlobalAgentRoster = vi.fn();
const mockMissionControlPanel = vi.fn(); const mockMissionControlPanel = vi.fn();
const mockQueueNotificationFeed = vi.fn();
vi.mock("@/components/mission-control/AuditLogDrawer", () => ({ vi.mock("@/components/mission-control/AuditLogDrawer", () => ({
AuditLogDrawer: ({ trigger }: { trigger: ReactNode }): React.JSX.Element => ( AuditLogDrawer: ({ trigger }: { trigger: ReactNode }): React.JSX.Element => (
@@ -31,6 +32,13 @@ vi.mock("@/components/mission-control/MissionControlPanel", () => ({
MIN_PANEL_COUNT: 1, MIN_PANEL_COUNT: 1,
})); }));
vi.mock("@/components/mission-control/QueueNotificationFeed", () => ({
QueueNotificationFeed: (props: unknown): React.JSX.Element => {
mockQueueNotificationFeed(props);
return <div data-testid="queue-notification-feed" />;
},
}));
vi.mock("@/components/ui/button", () => ({ vi.mock("@/components/ui/button", () => ({
Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => ( Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => (
<button {...props}>{children}</button> <button {...props}>{children}</button>
@@ -66,5 +74,6 @@ describe("MissionControlLayout", (): void => {
expect(region.querySelector("main")).toBeInTheDocument(); expect(region.querySelector("main")).toBeInTheDocument();
expect(screen.getByTestId("global-agent-roster")).toBeInTheDocument(); expect(screen.getByTestId("global-agent-roster")).toBeInTheDocument();
expect(screen.getByTestId("mission-control-panel")).toBeInTheDocument(); expect(screen.getByTestId("mission-control-panel")).toBeInTheDocument();
expect(screen.getByTestId("queue-notification-feed")).toBeInTheDocument();
}); });
}); });

View File

@@ -9,6 +9,7 @@ import {
MissionControlPanel, MissionControlPanel,
type PanelConfig, type PanelConfig,
} from "@/components/mission-control/MissionControlPanel"; } from "@/components/mission-control/MissionControlPanel";
import { QueueNotificationFeed } from "@/components/mission-control/QueueNotificationFeed";
import { Button } from "@/components/ui/button"; import { Button } from "@/components/ui/button";
const INITIAL_PANELS: PanelConfig[] = [{}]; const INITIAL_PANELS: PanelConfig[] = [{}];
@@ -94,7 +95,7 @@ export function MissionControlLayout(): React.JSX.Element {
/> />
</header> </header>
<div className="grid min-h-0 flex-1 gap-4 xl:grid-cols-[280px_minmax(0,1fr)]"> <div className="grid min-h-0 flex-1 gap-4 xl:grid-cols-[280px_minmax(0,1fr)_320px]">
<aside className="h-full min-h-0"> <aside className="h-full min-h-0">
<GlobalAgentRoster <GlobalAgentRoster
onSelectSession={handleSelectSession} onSelectSession={handleSelectSession}
@@ -109,6 +110,9 @@ export function MissionControlLayout(): React.JSX.Element {
onExpandPanel={handleExpandPanel} onExpandPanel={handleExpandPanel}
/> />
</main> </main>
<aside className="h-full min-h-0">
<QueueNotificationFeed />
</aside>
</div> </div>
</section> </section>
); );

View File

@@ -89,7 +89,7 @@ describe("PanelControls", (): void => {
await waitFor((): void => { await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith( expect(mockApiPost).toHaveBeenCalledWith(
"/api/mission-control/sessions/session%20with%20space/pause", "/api/orchestrator/api/mission-control/sessions/session%20with%20space/pause",
undefined undefined
); );
}); });
@@ -114,9 +114,12 @@ describe("PanelControls", (): void => {
await user.click(screen.getByRole("button", { name: "Confirm" })); await user.click(screen.getByRole("button", { name: "Confirm" }));
await waitFor((): void => { await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-4/kill", { expect(mockApiPost).toHaveBeenCalledWith(
force: false, "/api/orchestrator/api/mission-control/sessions/session-4/kill",
}); {
force: false,
}
);
}); });
expect(onStatusChange).toHaveBeenCalledWith("killed"); expect(onStatusChange).toHaveBeenCalledWith("killed");
@@ -137,9 +140,12 @@ describe("PanelControls", (): void => {
await user.click(screen.getByRole("button", { name: "Confirm" })); await user.click(screen.getByRole("button", { name: "Confirm" }));
await waitFor((): void => { await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-5/kill", { expect(mockApiPost).toHaveBeenCalledWith(
force: true, "/api/orchestrator/api/mission-control/sessions/session-5/kill",
}); {
force: true,
}
);
}); });
expect(onStatusChange).toHaveBeenCalledWith("killed"); expect(onStatusChange).toHaveBeenCalledWith("killed");

View File

@@ -50,23 +50,23 @@ export function PanelControls({
switch (action) { switch (action) {
case "pause": case "pause":
await apiPost<{ message: string }>( await apiPost<{ message: string }>(
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/pause` `/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/pause`
); );
return { nextStatus: "paused" }; return { nextStatus: "paused" };
case "resume": case "resume":
await apiPost<{ message: string }>( await apiPost<{ message: string }>(
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/resume` `/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/resume`
); );
return { nextStatus: "active" }; return { nextStatus: "active" };
case "graceful-kill": case "graceful-kill":
await apiPost<{ message: string }>( await apiPost<{ message: string }>(
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`, `/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
{ force: false } { force: false }
); );
return { nextStatus: "killed" }; return { nextStatus: "killed" };
case "force-kill": case "force-kill":
await apiPost<{ message: string }>( await apiPost<{ message: string }>(
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`, `/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
{ force: true } { force: true }
); );
return { nextStatus: "killed" }; return { nextStatus: "killed" };

View File

@@ -0,0 +1,225 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { act, render, screen, waitFor } from "@testing-library/react";
import userEvent from "@testing-library/user-event";
import type { ButtonHTMLAttributes, HTMLAttributes, ReactNode } from "react";
vi.mock("date-fns", () => ({
formatDistanceToNow: (): string => "5 minutes ago",
}));
interface MockButtonProps extends ButtonHTMLAttributes<HTMLButtonElement> {
children: ReactNode;
}
interface MockContainerProps extends HTMLAttributes<HTMLElement> {
children: ReactNode;
}
interface MockEventSourceInstance {
url: string;
onerror: ((event: Event) => void) | null;
onmessage: ((event: MessageEvent<string>) => void) | null;
close: ReturnType<typeof vi.fn>;
emitMessage: (payload: unknown) => void;
}
interface QueueNotification {
id: string;
agent: string;
filename: string;
payload: unknown;
createdAt: string;
}
const mockApiGet = vi.fn<(endpoint: string) => Promise<QueueNotification[]>>();
const mockApiPost = vi.fn<(endpoint: string) => Promise<{ success: true; id: string }>>();
const mockShowToast = vi.fn<(message: string, variant?: string) => void>();
let mockEventSourceInstances: MockEventSourceInstance[] = [];
const MockEventSource = vi.fn(function (this: MockEventSourceInstance, url: string): void {
this.url = url;
this.onerror = null;
this.onmessage = null;
this.close = vi.fn();
this.emitMessage = (payload: unknown): void => {
this.onmessage?.(new MessageEvent("message", { data: JSON.stringify(payload) }));
};
mockEventSourceInstances.push(this);
});
vi.mock("@/lib/api/client", () => ({
apiGet: (endpoint: string): Promise<QueueNotification[]> => mockApiGet(endpoint),
apiPost: (endpoint: string): Promise<{ success: true; id: string }> => mockApiPost(endpoint),
}));
vi.mock("@mosaic/ui", () => ({
useToast: (): { showToast: typeof mockShowToast; removeToast: ReturnType<typeof vi.fn> } => ({
showToast: mockShowToast,
removeToast: vi.fn(),
}),
}));
vi.mock("@/components/ui/button", () => ({
Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => (
<button {...props}>{children}</button>
),
}));
vi.mock("@/components/ui/badge", () => ({
Badge: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<span {...props}>{children}</span>
),
}));
vi.mock("@/components/ui/card", () => ({
Card: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<section {...props}>{children}</section>
),
CardHeader: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<header {...props}>{children}</header>
),
CardContent: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<div {...props}>{children}</div>
),
CardTitle: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<h2 {...props}>{children}</h2>
),
}));
vi.mock("@/components/ui/collapsible", () => ({
Collapsible: ({ children }: MockContainerProps): React.JSX.Element => <div>{children}</div>,
}));
vi.mock("@/components/ui/scroll-area", () => ({
ScrollArea: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
<div {...props}>{children}</div>
),
}));
vi.mock("@/components/ui/skeleton", () => ({
Skeleton: (props: HTMLAttributes<HTMLDivElement>): React.JSX.Element => <div {...props} />,
}));
import { QueueNotificationFeed } from "./QueueNotificationFeed";
function latestEventSource(): MockEventSourceInstance {
const instance = mockEventSourceInstances[mockEventSourceInstances.length - 1];
if (!instance) {
throw new Error("Expected an EventSource instance");
}
return instance;
}
function makeNotification(overrides: Partial<QueueNotification>): QueueNotification {
return {
id: "notif-1",
agent: "mosaic",
filename: "notif-1.json",
payload: {
taskId: "MS24-WEB-001",
eventType: "task.ready",
},
createdAt: "2026-03-08T23:00:00.000Z",
...overrides,
};
}
describe("QueueNotificationFeed", (): void => {
beforeEach((): void => {
vi.clearAllMocks();
vi.stubGlobal("EventSource", MockEventSource);
vi.stubGlobal("fetch", vi.fn());
mockEventSourceInstances = [];
mockApiGet.mockResolvedValue([]);
mockApiPost.mockResolvedValue({ success: true, id: "notif-1" });
});
afterEach((): void => {
vi.unstubAllGlobals();
});
it("loads and renders notifications grouped by agent", async (): Promise<void> => {
mockApiGet.mockResolvedValue([
makeNotification({ id: "notif-1", agent: "mosaic" }),
makeNotification({
id: "notif-2",
agent: "dyor",
payload: { taskId: "MS24-WEB-002", eventType: "task.blocked" },
}),
]);
render(<QueueNotificationFeed />);
await waitFor((): void => {
expect(mockApiGet).toHaveBeenCalledWith("/api/orchestrator/api/queue/notifications");
});
expect(screen.getByText("mosaic")).toBeInTheDocument();
expect(screen.getByText("dyor")).toBeInTheDocument();
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
expect(screen.getByText("task.ready")).toBeInTheDocument();
expect(screen.getAllByText("5 minutes ago")).toHaveLength(2);
expect(MockEventSource).toHaveBeenCalledWith("/api/orchestrator/queue/notifications/stream");
});
it("acknowledges a notification and removes it from the list", async (): Promise<void> => {
const user = userEvent.setup();
mockApiGet.mockResolvedValue([makeNotification({ id: "notif-ack" })]);
render(<QueueNotificationFeed />);
await waitFor((): void => {
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
});
await user.click(screen.getByRole("button", { name: "ACK notification MS24-WEB-001" }));
await waitFor((): void => {
expect(mockApiPost).toHaveBeenCalledWith(
"/api/orchestrator/api/queue/notifications/notif-ack/ack"
);
});
await waitFor((): void => {
expect(screen.getByText("No pending notifications")).toBeInTheDocument();
});
});
it("renders the empty state when there are no notifications", async (): Promise<void> => {
mockApiGet.mockResolvedValue([]);
render(<QueueNotificationFeed />);
await waitFor((): void => {
expect(screen.getByText("No pending notifications")).toBeInTheDocument();
});
});
it("refreshes the list when an SSE message arrives", async (): Promise<void> => {
mockApiGet.mockResolvedValue([makeNotification({ id: "notif-before" })]);
render(<QueueNotificationFeed />);
await waitFor((): void => {
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
});
act(() => {
latestEventSource().emitMessage([
makeNotification({
id: "notif-after",
agent: "sage",
payload: { taskId: "MS24-WEB-003", eventType: "task.failed" },
}),
]);
});
await waitFor((): void => {
expect(screen.getByText("sage")).toBeInTheDocument();
expect(screen.getByText("MS24-WEB-003")).toBeInTheDocument();
expect(screen.getByText("task.failed")).toBeInTheDocument();
});
});
});

View File

@@ -0,0 +1,332 @@
"use client";
import { useCallback, useEffect, useMemo, useState } from "react";
import { formatDistanceToNow } from "date-fns";
import { BellOff, ChevronLeft, ChevronRight, Loader2 } from "lucide-react";
import { useToast } from "@mosaic/ui";
import { Badge } from "@/components/ui/badge";
import { Button } from "@/components/ui/button";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Collapsible } from "@/components/ui/collapsible";
import { ScrollArea } from "@/components/ui/scroll-area";
import { Skeleton } from "@/components/ui/skeleton";
import { apiGet, apiPost } from "@/lib/api/client";
export interface QueueNotificationFeedProps {
className?: string;
}
interface QueueNotification {
id: string;
agent: string;
filename: string;
payload: unknown;
createdAt: string;
}
interface QueuePayloadRecord {
taskId?: unknown;
type?: unknown;
eventType?: unknown;
event?: unknown;
}
interface NotificationGroup {
agent: string;
notifications: QueueNotification[];
}
const NOTIFICATIONS_ENDPOINT = "/api/orchestrator/api/queue/notifications";
const NOTIFICATIONS_STREAM_ENDPOINT = "/api/orchestrator/queue/notifications/stream";
function joinClasses(...classes: (string | undefined)[]): string {
return classes.filter((value) => typeof value === "string" && value.length > 0).join(" ");
}
function asPayloadRecord(payload: unknown): QueuePayloadRecord | null {
if (payload === null || typeof payload !== "object" || Array.isArray(payload)) {
return null;
}
return payload as QueuePayloadRecord;
}
function getNotificationTaskId(notification: QueueNotification): string {
const payload = asPayloadRecord(notification.payload);
return typeof payload?.taskId === "string" && payload.taskId.trim().length > 0
? payload.taskId
: notification.id;
}
function getNotificationEventType(notification: QueueNotification): string {
const payload = asPayloadRecord(notification.payload);
const candidates = [payload?.eventType, payload?.type, payload?.event];
for (const candidate of candidates) {
if (typeof candidate === "string" && candidate.trim().length > 0) {
return candidate;
}
}
return "notification";
}
function formatNotificationAge(createdAt: string): string {
const parsedDate = new Date(createdAt);
if (Number.isNaN(parsedDate.getTime())) {
return "just now";
}
return formatDistanceToNow(parsedDate, { addSuffix: true });
}
function groupNotificationsByAgent(notifications: QueueNotification[]): NotificationGroup[] {
const grouped = new Map<string, QueueNotification[]>();
for (const notification of notifications) {
const current = grouped.get(notification.agent) ?? [];
current.push(notification);
grouped.set(notification.agent, current);
}
return Array.from(grouped.entries())
.sort(([leftAgent], [rightAgent]) => leftAgent.localeCompare(rightAgent))
.map(([agent, items]) => ({
agent,
notifications: [...items].sort(
(left, right) => new Date(right.createdAt).getTime() - new Date(left.createdAt).getTime()
),
}));
}
export function QueueNotificationFeed({
className,
}: QueueNotificationFeedProps): React.JSX.Element {
const { showToast } = useToast();
const [notifications, setNotifications] = useState<QueueNotification[]>([]);
const [isLoading, setIsLoading] = useState(true);
const [errorMessage, setErrorMessage] = useState<string | null>(null);
const [acknowledgingIds, setAcknowledgingIds] = useState<Record<string, boolean>>({});
const [isCollapsed, setIsCollapsed] = useState(false);
const [now, setNow] = useState(Date.now());
const refreshNotifications = useCallback(async (): Promise<void> => {
try {
const payload = await apiGet<QueueNotification[]>(NOTIFICATIONS_ENDPOINT);
setNotifications(payload);
setErrorMessage(null);
} catch (error) {
const message =
error instanceof Error && error.message.trim().length > 0
? error.message
: "Failed to load queue notifications.";
setErrorMessage(message);
} finally {
setIsLoading(false);
}
}, []);
useEffect(() => {
void refreshNotifications();
}, [refreshNotifications]);
useEffect(() => {
if (typeof EventSource === "undefined") {
return undefined;
}
const source = new EventSource(NOTIFICATIONS_STREAM_ENDPOINT);
source.onmessage = (event: MessageEvent<string>): void => {
try {
const payload = JSON.parse(event.data) as QueueNotification[];
setNotifications(payload);
setErrorMessage(null);
setIsLoading(false);
} catch {
setErrorMessage("Received an invalid notification stream payload.");
}
};
source.onerror = (): void => {
setErrorMessage((current) => current ?? "Queue notification stream disconnected.");
};
return (): void => {
source.close();
};
}, []);
useEffect(() => {
const intervalId = window.setInterval(() => {
setNow(Date.now());
}, 60_000);
return (): void => {
window.clearInterval(intervalId);
};
}, []);
const groupedNotifications = useMemo(
() => groupNotificationsByAgent(notifications),
[notifications]
);
const pendingCount = notifications.length;
const handleAck = useCallback(
async (notificationId: string): Promise<void> => {
setAcknowledgingIds((current) => ({
...current,
[notificationId]: true,
}));
try {
await apiPost<{ success: true; id: string }>(
`/api/orchestrator/api/queue/notifications/${encodeURIComponent(notificationId)}/ack`
);
setNotifications((current) => current.filter((item) => item.id !== notificationId));
} catch (error) {
const message =
error instanceof Error && error.message.trim().length > 0
? error.message
: "Failed to acknowledge notification.";
showToast(message, "error");
} finally {
setAcknowledgingIds((current) => {
const { [notificationId]: _omitted, ...remaining } = current;
return remaining;
});
}
},
[showToast]
);
return (
<Card className={joinClasses("flex h-full min-h-0 flex-col", className)}>
<CardHeader className="pb-2">
<div className="flex items-start justify-between gap-2">
<div className="flex min-w-0 items-center gap-2">
<CardTitle className="text-base">Queue Notifications</CardTitle>
<Badge variant={pendingCount > 0 ? "status-info" : "secondary"}>{pendingCount}</Badge>
</div>
<Button
type="button"
variant="ghost"
size="icon"
className="h-7 w-7"
onClick={() => {
setIsCollapsed((current) => !current);
}}
aria-label={isCollapsed ? "Expand queue notifications" : "Collapse queue notifications"}
title={isCollapsed ? "Expand queue notifications" : "Collapse queue notifications"}
>
{isCollapsed ? (
<ChevronLeft className="h-4 w-4" aria-hidden="true" />
) : (
<ChevronRight className="h-4 w-4" aria-hidden="true" />
)}
</Button>
</div>
</CardHeader>
<Collapsible open={!isCollapsed} className="min-h-0 flex-1">
{!isCollapsed ? (
<CardContent className="min-h-0 flex-1 px-3 pb-3">
{isLoading ? (
<ScrollArea className="h-full">
<div className="space-y-2 pr-1">
{Array.from({ length: 6 }).map((_, index) => (
<Skeleton
key={`queue-notification-skeleton-${String(index)}`}
className="h-14 w-full"
/>
))}
</div>
</ScrollArea>
) : errorMessage && notifications.length === 0 ? (
<div className="flex h-full items-center justify-center px-4 text-center text-sm text-red-500">
{errorMessage}
</div>
) : groupedNotifications.length === 0 ? (
<div className="flex h-full flex-col items-center justify-center gap-2 text-center text-sm text-muted-foreground">
<BellOff className="h-5 w-5" aria-hidden="true" />
<span>No pending notifications</span>
</div>
) : (
<ScrollArea className="h-full">
<div className="space-y-4 pr-1">
{groupedNotifications.map((group) => (
<section key={group.agent} className="space-y-2">
<div className="flex items-center justify-between gap-2">
<h3 className="text-sm font-semibold text-foreground">{group.agent}</h3>
<span className="text-xs text-muted-foreground">
{group.notifications.length}
</span>
</div>
<div className="space-y-2">
{group.notifications.map((notification) => {
const taskId = getNotificationTaskId(notification);
const eventType = getNotificationEventType(notification);
const isAcknowledging = acknowledgingIds[notification.id] ?? false;
return (
<article
key={notification.id}
className="rounded-lg border border-border/70 bg-card px-3 py-2"
>
<div className="flex items-start justify-between gap-3">
<div className="min-w-0 space-y-1">
<div className="flex flex-wrap items-center gap-2">
<span className="font-mono text-xs text-foreground">
{taskId}
</span>
<Badge variant="secondary">{eventType}</Badge>
</div>
<time
className="block text-xs text-muted-foreground"
dateTime={notification.createdAt}
>
{formatNotificationAge(notification.createdAt)}
</time>
</div>
<Button
type="button"
variant="outline"
size="sm"
disabled={isAcknowledging}
onClick={() => {
void handleAck(notification.id);
}}
aria-label={`ACK notification ${taskId}`}
>
{isAcknowledging ? (
<span className="flex items-center gap-2">
<Loader2
className="h-4 w-4 animate-spin"
aria-hidden="true"
/>
ACK
</span>
) : (
"ACK"
)}
</Button>
</div>
</article>
);
})}
</div>
</section>
))}
</div>
</ScrollArea>
)}
</CardContent>
) : null}
</Collapsible>
<span className="sr-only" aria-live="polite">
{pendingCount} pending notifications, refreshed at {new Date(now).toISOString()}
</span>
</Card>
);
}

View File

@@ -0,0 +1,28 @@
"use client";
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
import { useState, type ReactNode } from "react";
interface ReactQueryProviderProps {
children: ReactNode;
}
export function ReactQueryProvider({ children }: ReactQueryProviderProps): React.JSX.Element {
// Create a stable QueryClient per component mount (one per app session)
const [queryClient] = useState(
() =>
new QueryClient({
defaultOptions: {
queries: {
// Don't refetch on window focus in a dashboard context
refetchOnWindowFocus: false,
// Stale time of 30s — short enough for live data, avoids hammering
staleTime: 30_000,
retry: 1,
},
},
})
);
return <QueryClientProvider client={queryClient}>{children}</QueryClientProvider>;
}

View File

@@ -333,7 +333,7 @@ services:
start_period: 40s start_period: 40s
networks: networks:
- internal - internal
- openbrain_brain-internal - openbrain-brain-internal
cap_drop: cap_drop:
- ALL - ALL
cap_add: cap_add:
@@ -406,7 +406,7 @@ services:
networks: networks:
- internal - internal
- traefik-public - traefik-public
- openbrain_brain-internal - openbrain-brain-internal
deploy: deploy:
restart_policy: restart_policy:
condition: on-failure condition: on-failure

View File

@@ -385,6 +385,9 @@ services:
NEXT_PUBLIC_APP_URL: ${NEXT_PUBLIC_APP_URL:-http://localhost:3000} NEXT_PUBLIC_APP_URL: ${NEXT_PUBLIC_APP_URL:-http://localhost:3000}
NEXT_PUBLIC_API_URL: ${NEXT_PUBLIC_API_URL:-http://localhost:3001} NEXT_PUBLIC_API_URL: ${NEXT_PUBLIC_API_URL:-http://localhost:3001}
TRUSTED_ORIGINS: ${TRUSTED_ORIGINS:-} TRUSTED_ORIGINS: ${TRUSTED_ORIGINS:-}
GITEA_WEBHOOK_SECRET: ${GITEA_WEBHOOK_SECRET:-}
GITEA_API_TOKEN: ${GITEA_API_TOKEN:-}
GATEKEEPER_ENABLED: ${GATEKEEPER_ENABLED:-true}
volumes: volumes:
- openbao_init:/openbao/init:ro - openbao_init:/openbao/init:ro
ports: ports:

View File

@@ -94,6 +94,21 @@ OIDC_REDIRECT_URI=http://localhost:3001/auth/oauth2/callback/authentik
See [Authentik Setup](2-authentik.md) for complete OIDC configuration. See [Authentik Setup](2-authentik.md) for complete OIDC configuration.
## Webhooks and Merge Automation
```bash
# Gitea webhook validation secret for /api/gatekeeper/webhook/gitea
GITEA_WEBHOOK_SECRET=your-random-webhook-secret
# Personal access token used by Gatekeeper to comment on and merge PRs
GITEA_API_TOKEN=your-gitea-api-token
# Master switch for the Gatekeeper auto-merge workflow
GATEKEEPER_ENABLED=true
```
Use a dedicated Gitea token with the minimum repository scope needed to comment on pull requests and perform merges.
## Cache and Storage ## Cache and Storage
### Valkey (Redis-compatible) ### Valkey (Redis-compatible)

View File

@@ -119,14 +119,14 @@ Target version: `v0.0.23`
### Phase 0 — Backend Core (Foundation) ### Phase 0 — Backend Core (Foundation)
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes | | id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
| ----------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- | | ----------- | ------ | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- |
| MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD | | MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD |
| MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | | | MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
| MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | | | MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
| MS23-P0-004 | done | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-07 | 2026-03-07 | 15K | — | | | MS23-P0-004 | done | p0-foundation | Orchestrator API: POST /agents/:id/inject + pause/resume endpoints | #693 | orchestrator | feat/ms23-p0-controls | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-07 | 2026-03-07 | 15K | — | |
| MS23-P0-005 | done | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | | | MS23-P0-005 | done | p0-foundation | Subagent tree: parentAgentId on spawn registration + GET /agents/tree | #693 | orchestrator | feat/ms23-p0-tree | MS23-P0-001 | MS23-P0-006 | — | — | — | 15K | — | |
| MS23-P0-006 | done | p0-foundation | Unit + integration tests for all P0 orchestrator endpoints | #693 | orchestrator | test/ms23-p0 | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005 | MS23-P1-001 | codex | 2026-03-07 | 2026-03-07 | 20K | — | Phase 0 gate: SSE stream verified via curl | | MS23-P0-006 | done | p0-foundation | Unit + integration tests for all P0 orchestrator endpoints | #693 | orchestrator | test/ms23-p0 | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005 | MS23-P1-001 | codex | 2026-03-07 | 2026-03-07 | 20K | — | Phase 0 gate: SSE stream verified via curl |
### Phase 1 — Provider Interface (Plugin Architecture) ### Phase 1 — Provider Interface (Plugin Architecture)
@@ -183,3 +183,29 @@ Target version: `v0.0.23`
| **Total** | **29** | **~478K** | | **Total** | **29** | **~478K** |
Recommended dispatch: Codex for Phase 2 UI + routine API tasks; Sonnet for complex streaming logic (P0-003, P1-005, P3-002). Recommended dispatch: Codex for Phase 2 UI + routine API tasks; Sonnet for complex streaming logic (P0-003, P1-005, P3-002).
---
## MS24 — Queue Integration in Mission Control
PRD: Issue #749
Milestone: `0.0.24`
Target version: `v0.0.24`
> Single-writer: orchestrator (Jarvis/OpenClaw) only. Workers read but never modify.
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
| ------------ | ------ | --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----- | ----- | -------------------- | ------------ | ------------ | ----- | ---------- | ------------ | -------- | ---- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- |
| MS24-API-001 | done | p0-api | QueueNotificationsModule: GET /notifications, GET /notifications/stream (SSE), POST /notifications/:id/ack, GET /tasks. Auth: ApiKeyGuard. Unit tests included. | #749 | api | feat/ms24-queue-api | — | MS24-WEB-001 | — | — | — | 25K | — | Guard: ApiKeyGuard from src/common/guards/api-key.guard.ts (COORDINATOR_API_KEY). SSE via raw express Response + chokidar. Fail-soft if inbox dir missing. |
| MS24-API-002 | done | p0-api | Woodpecker CI webhook → agent notification: POST /api/webhooks/woodpecker, HMAC-SHA256 verify, scan agent-state active dir, fire mosaic-queue notification to matched agent | #749 | api | feat/ms24-ci-webhook | MS24-API-001 | MS24-VER-001 | codex | 2026-03-08 | — | 15K | — | event: completed/failed. @SkipCsrf, no auth guard. Also adds notify-webhook step to .woodpecker/ci.yml |
| MS24-WEB-001 | done | p0-ui | QueueNotificationFeed component + MissionControlLayout wiring (right sidebar panel, badge count) | #749 | web | feat/ms24-queue-ui | MS24-API-001 | MS24-VER-001 | — | — | — | 20K | — | SSE to /api/queue/notifications/stream. ACK button. Collapsible panel in MissionControlLayout.tsx. |
| MS24-VER-001 | done | p0-verify | CI green, no regressions, deploy to prod, tag v0.0.24 | #749 | stack | — | MS24-WEB-001 | — | — | — | — | 5K | — | |
### MS24 Budget Summary
| Phase | Tasks | Estimate |
| --------- | ----- | -------- |
| API | 1 | ~25K |
| UI | 1 | ~20K |
| Verify | 1 | ~5K |
| **Total** | **3** | **~50K** |

View File

@@ -13,11 +13,11 @@ Images are tagged based on branch and event type:
### Tag Meanings ### Tag Meanings
| Tag | Purpose | Stability | | Tag | Purpose | Stability |
| -------------------------- | ---------------------------------- | --------- | | -------------------------- | -------------------------------- | --------- |
| `latest` | Current build from `main` | Latest | | `latest` | Current build from `main` | Latest |
| `v*` (e.g., `v1.0.0`) | Versioned release | Immutable | | `v*` (e.g., `v1.0.0`) | Versioned release | Immutable |
| `{sha}` (e.g., `658ec077`) | Specific commit for traceability | Immutable | | `{sha}` (e.g., `658ec077`) | Specific commit for traceability | Immutable |
## Retention Policy Configuration ## Retention Policy Configuration

View File

@@ -3,6 +3,7 @@
## Objective ## Objective
Implement rate limiting on all federation endpoints to prevent denial-of-service (DoS) attacks. Federation endpoints currently have no rate limiting, allowing attackers to: Implement rate limiting on all federation endpoints to prevent denial-of-service (DoS) attacks. Federation endpoints currently have no rate limiting, allowing attackers to:
- Overwhelm the server with connection requests - Overwhelm the server with connection requests
- Flood token validation endpoints - Flood token validation endpoints
- Exhaust system resources - Exhaust system resources
@@ -12,6 +13,7 @@ Implement rate limiting on all federation endpoints to prevent denial-of-service
**Severity:** P0 (Critical) - Blocks production deployment **Severity:** P0 (Critical) - Blocks production deployment
**Attack Vector:** Unauthenticated public endpoints allow unlimited requests **Attack Vector:** Unauthenticated public endpoints allow unlimited requests
**Risk:** System can be brought down by flooding requests to: **Risk:** System can be brought down by flooding requests to:
1. `POST /api/v1/federation/incoming/connect` (Public, no auth) 1. `POST /api/v1/federation/incoming/connect` (Public, no auth)
2. `POST /api/v1/federation/auth/validate` (Public, no auth) 2. `POST /api/v1/federation/auth/validate` (Public, no auth)
3. All other endpoints (authenticated, but can be abused) 3. All other endpoints (authenticated, but can be abused)
@@ -19,15 +21,19 @@ Implement rate limiting on all federation endpoints to prevent denial-of-service
## Approach ## Approach
### 1. Install @nestjs/throttler ### 1. Install @nestjs/throttler
Use NestJS's official rate limiting package which integrates with the framework's guard system. Use NestJS's official rate limiting package which integrates with the framework's guard system.
### 2. Configure Rate Limits ### 2. Configure Rate Limits
Tiered rate limiting strategy: Tiered rate limiting strategy:
- **Public endpoints:** Strict limits (5 req/min per IP) - **Public endpoints:** Strict limits (5 req/min per IP)
- **Authenticated endpoints:** Moderate limits (20 req/min per user) - **Authenticated endpoints:** Moderate limits (20 req/min per user)
- **Admin endpoints:** Higher limits (50 req/min per user) - **Admin endpoints:** Higher limits (50 req/min per user)
### 3. Implementation Strategy ### 3. Implementation Strategy
1. Add `@nestjs/throttler` dependency 1. Add `@nestjs/throttler` dependency
2. Configure ThrottlerModule globally 2. Configure ThrottlerModule globally
3. Apply custom rate limits per endpoint using decorators 3. Apply custom rate limits per endpoint using decorators
@@ -53,6 +59,7 @@ Tiered rate limiting strategy:
**COMPLETE** - Rate limiting successfully implemented on all federation endpoints. **COMPLETE** - Rate limiting successfully implemented on all federation endpoints.
**Security Impact:** MITIGATED **Security Impact:** MITIGATED
- DoS vulnerability eliminated via rate limiting - DoS vulnerability eliminated via rate limiting
- Public endpoints protected with strict limits (3 req/sec) - Public endpoints protected with strict limits (3 req/sec)
- Authenticated endpoints have moderate limits (20 req/min) - Authenticated endpoints have moderate limits (20 req/min)
@@ -61,6 +68,7 @@ Tiered rate limiting strategy:
## Baseline Quality Status ## Baseline Quality Status
**Pre-existing Technical Debt** (NOT introduced by this fix): **Pre-existing Technical Debt** (NOT introduced by this fix):
- 29 TypeScript errors in apps/api (federation + runner-jobs) - 29 TypeScript errors in apps/api (federation + runner-jobs)
- Federation: Missing Prisma schema types (`FederationConnectionStatus`, `Instance`, `federatedIdentity`) - Federation: Missing Prisma schema types (`FederationConnectionStatus`, `Instance`, `federatedIdentity`)
- Runner Jobs: Missing `version` field in schema - Runner Jobs: Missing `version` field in schema
@@ -68,6 +76,7 @@ Tiered rate limiting strategy:
- **My changes introduced 0 new errors** - **My changes introduced 0 new errors**
**Quality Assessment:** **Quality Assessment:**
- ✅ Tier 1 (Baseline): No regression (error count unchanged) - ✅ Tier 1 (Baseline): No regression (error count unchanged)
- ✅ Tier 2 (Modified Files): 0 new errors in files I touched - ✅ Tier 2 (Modified Files): 0 new errors in files I touched
- ✅ Tier 3 (New Code): Rate limiting configuration is syntactically correct - ✅ Tier 3 (New Code): Rate limiting configuration is syntactically correct
@@ -75,6 +84,7 @@ Tiered rate limiting strategy:
## Testing Status ## Testing Status
**Blocked:** Federation module tests cannot run until Prisma schema is added. Pre-existing error: **Blocked:** Federation module tests cannot run until Prisma schema is added. Pre-existing error:
``` ```
TypeError: Cannot read properties of undefined (reading 'PENDING') TypeError: Cannot read properties of undefined (reading 'PENDING')
FederationConnectionStatus is undefined FederationConnectionStatus is undefined
@@ -83,6 +93,7 @@ FederationConnectionStatus is undefined
This is NOT caused by my changes - it's pre-existing technical debt from incomplete M7 federation implementation. This is NOT caused by my changes - it's pre-existing technical debt from incomplete M7 federation implementation.
**Manual Verification:** **Manual Verification:**
- TypeScript compilation: No new errors introduced - TypeScript compilation: No new errors introduced
- Rate limiting decorators: Correctly applied to all endpoints - Rate limiting decorators: Correctly applied to all endpoints
- ThrottlerModule: Properly configured with 3 tiers - ThrottlerModule: Properly configured with 3 tiers
@@ -91,6 +102,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
## Testing ## Testing
### Rate Limit Tests ### Rate Limit Tests
1. Public endpoint exceeds limit → 429 Too Many Requests 1. Public endpoint exceeds limit → 429 Too Many Requests
2. Authenticated endpoint exceeds limit → 429 Too Many Requests 2. Authenticated endpoint exceeds limit → 429 Too Many Requests
3. Within limits → 200 OK 3. Within limits → 200 OK
@@ -99,6 +111,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
6. Different users have independent limits 6. Different users have independent limits
### Security Tests ### Security Tests
1. Cannot bypass rate limit with different user agents 1. Cannot bypass rate limit with different user agents
2. Cannot bypass rate limit with different headers 2. Cannot bypass rate limit with different headers
3. Rate limit counter resets after time window 3. Rate limit counter resets after time window
@@ -107,6 +120,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
## Federation Endpoints Requiring Rate Limiting ## Federation Endpoints Requiring Rate Limiting
### FederationController (`/api/v1/federation`) ### FederationController (`/api/v1/federation`)
- `GET /instance` - Public (5 req/min per IP) - `GET /instance` - Public (5 req/min per IP)
- `POST /instance/regenerate-keys` - Admin (10 req/min per user) - `POST /instance/regenerate-keys` - Admin (10 req/min per user)
- `POST /connections/initiate` - Auth (10 req/min per user) - `POST /connections/initiate` - Auth (10 req/min per user)
@@ -118,6 +132,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
- `POST /incoming/connect` - **Public (3 req/min per IP)** ← CRITICAL - `POST /incoming/connect` - **Public (3 req/min per IP)** ← CRITICAL
### FederationAuthController (`/api/v1/federation/auth`) ### FederationAuthController (`/api/v1/federation/auth`)
- `POST /initiate` - Auth (10 req/min per user) - `POST /initiate` - Auth (10 req/min per user)
- `POST /link` - Auth (5 req/min per user) - `POST /link` - Auth (5 req/min per user)
- `GET /identities` - Auth (30 req/min per user) - `GET /identities` - Auth (30 req/min per user)
@@ -127,18 +142,21 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
## Notes ## Notes
### Design Decisions ### Design Decisions
- Use IP-based rate limiting for public endpoints - Use IP-based rate limiting for public endpoints
- Use user-based rate limiting for authenticated endpoints - Use user-based rate limiting for authenticated endpoints
- Store rate limit state in Valkey (Redis-compatible) for scalability - Store rate limit state in Valkey (Redis-compatible) for scalability
- Include rate limit headers in responses (X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset) - Include rate limit headers in responses (X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset)
### Attack Vectors Mitigated ### Attack Vectors Mitigated
1. **Connection Request Flooding:** Attacker sends unlimited connection requests to `/incoming/connect` 1. **Connection Request Flooding:** Attacker sends unlimited connection requests to `/incoming/connect`
2. **Token Validation Abuse:** Attacker floods `/auth/validate` to exhaust resources 2. **Token Validation Abuse:** Attacker floods `/auth/validate` to exhaust resources
3. **Authenticated User Abuse:** Compromised credentials used to flood authenticated endpoints 3. **Authenticated User Abuse:** Compromised credentials used to flood authenticated endpoints
4. **Resource Exhaustion:** Prevents CPU/memory exhaustion from processing excessive requests 4. **Resource Exhaustion:** Prevents CPU/memory exhaustion from processing excessive requests
### Future Enhancements (Not in Scope) ### Future Enhancements (Not in Scope)
- Circuit breaker pattern for failing instances - Circuit breaker pattern for failing instances
- Geographic rate limiting - Geographic rate limiting
- Adaptive rate limiting based on system load - Adaptive rate limiting based on system load

View File

@@ -7,11 +7,13 @@ The initial implementation (commit 6878d57) was high quality but included placeh
## Security-Critical Issues ## Security-Critical Issues
### 1. JWT Token Validation (CRITICAL) ### 1. JWT Token Validation (CRITICAL)
**Problem**: `validateToken()` always returns `valid: false` **Problem**: `validateToken()` always returns `valid: false`
**Risk**: Cannot verify authenticity of federated tokens **Risk**: Cannot verify authenticity of federated tokens
**Solution**: Implement proper JWT validation with signature verification **Solution**: Implement proper JWT validation with signature verification
### 2. OIDC Discovery (CRITICAL) ### 2. OIDC Discovery (CRITICAL)
**Problem**: `generateAuthUrl()` returns hardcoded placeholder URL **Problem**: `generateAuthUrl()` returns hardcoded placeholder URL
**Risk**: Cannot initiate real federated authentication flows **Risk**: Cannot initiate real federated authentication flows
**Solution**: Implement OIDC discovery and proper authorization URL generation **Solution**: Implement OIDC discovery and proper authorization URL generation
@@ -19,9 +21,11 @@ The initial implementation (commit 6878d57) was high quality but included placeh
## Implementation Plan ## Implementation Plan
### 1. Add Dependencies ### 1. Add Dependencies
- [x] Add `jose` library for JWT handling (industry-standard, secure) - [x] Add `jose` library for JWT handling (industry-standard, secure)
### 2. Implement JWT Validation ### 2. Implement JWT Validation
- [ ] Fetch OIDC discovery metadata from issuer - [ ] Fetch OIDC discovery metadata from issuer
- [ ] Cache JWKS (JSON Web Key Set) for performance - [ ] Cache JWKS (JSON Web Key Set) for performance
- [ ] Verify JWT signature using remote public key - [ ] Verify JWT signature using remote public key
@@ -31,6 +35,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
- [ ] Return proper validation results - [ ] Return proper validation results
### 3. Implement OIDC Discovery ### 3. Implement OIDC Discovery
- [ ] Fetch `.well-known/openid-configuration` from remote instance - [ ] Fetch `.well-known/openid-configuration` from remote instance
- [ ] Cache discovery metadata - [ ] Cache discovery metadata
- [ ] Generate proper OAuth2 authorization URL - [ ] Generate proper OAuth2 authorization URL
@@ -39,6 +44,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
- [ ] Support standard OIDC scopes (openid, profile, email) - [ ] Support standard OIDC scopes (openid, profile, email)
### 4. Update Tests ### 4. Update Tests
- [ ] Replace mock-based tests with real behavior tests - [ ] Replace mock-based tests with real behavior tests
- [ ] Test valid JWT validation - [ ] Test valid JWT validation
- [ ] Test expired/invalid token rejection - [ ] Test expired/invalid token rejection
@@ -47,6 +53,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
- [ ] Maintain 85%+ test coverage - [ ] Maintain 85%+ test coverage
### 5. Security Considerations ### 5. Security Considerations
- Cache JWKS to avoid excessive network calls - Cache JWKS to avoid excessive network calls
- Validate token expiration strictly - Validate token expiration strictly
- Use PKCE to prevent authorization code interception - Use PKCE to prevent authorization code interception
@@ -57,6 +64,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
## Implementation Notes ## Implementation Notes
**PKCE Flow**: **PKCE Flow**:
1. Generate random code_verifier (base64url-encoded random bytes) 1. Generate random code_verifier (base64url-encoded random bytes)
2. Generate code_challenge = base64url(SHA256(code_verifier)) 2. Generate code_challenge = base64url(SHA256(code_verifier))
3. Store code_verifier in session/database 3. Store code_verifier in session/database
@@ -64,6 +72,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
5. Send code_verifier in token exchange 5. Send code_verifier in token exchange
**JWT Validation Flow**: **JWT Validation Flow**:
1. Parse JWT without verification to get header 1. Parse JWT without verification to get header
2. Fetch JWKS from issuer (cache for 1 hour) 2. Fetch JWKS from issuer (cache for 1 hour)
3. Find matching key by kid (key ID) 3. Find matching key by kid (key ID)

View File

@@ -12,6 +12,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
## What Was Implemented ## What Was Implemented
### Database Schema ### Database Schema
- **FederationEventSubscription Model**: New table for storing event subscriptions - **FederationEventSubscription Model**: New table for storing event subscriptions
- Fields: id, workspaceId, connectionId, eventType, metadata, isActive, timestamps - Fields: id, workspaceId, connectionId, eventType, metadata, isActive, timestamps
- Unique constraint on (workspaceId, connectionId, eventType) - Unique constraint on (workspaceId, connectionId, eventType)
@@ -21,6 +22,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
### Core Services ### Core Services
**EventService** (`event.service.ts`) **EventService** (`event.service.ts`)
- `subscribeToEventType()`: Subscribe to events from remote instance - `subscribeToEventType()`: Subscribe to events from remote instance
- `unsubscribeFromEventType()`: Remove event subscription - `unsubscribeFromEventType()`: Remove event subscription
- `publishEvent()`: Publish events to all subscribed connections - `publishEvent()`: Publish events to all subscribed connections
@@ -35,6 +37,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
**EventController** (`event.controller.ts`) **EventController** (`event.controller.ts`)
**Authenticated Endpoints (require AuthGuard):** **Authenticated Endpoints (require AuthGuard):**
- `POST /api/v1/federation/events/subscribe` - Subscribe to event type - `POST /api/v1/federation/events/subscribe` - Subscribe to event type
- `POST /api/v1/federation/events/unsubscribe` - Unsubscribe from event type - `POST /api/v1/federation/events/unsubscribe` - Unsubscribe from event type
- `POST /api/v1/federation/events/publish` - Publish event to subscribers - `POST /api/v1/federation/events/publish` - Publish event to subscribers
@@ -43,12 +46,14 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
- `GET /api/v1/federation/events/messages/:id` - Get single event message - `GET /api/v1/federation/events/messages/:id` - Get single event message
**Public Endpoints (signature-verified):** **Public Endpoints (signature-verified):**
- `POST /api/v1/federation/incoming/event` - Receive event from remote instance - `POST /api/v1/federation/incoming/event` - Receive event from remote instance
- `POST /api/v1/federation/incoming/event/ack` - Receive event acknowledgment - `POST /api/v1/federation/incoming/event/ack` - Receive event acknowledgment
### Type Definitions ### Type Definitions
**Added to `message.types.ts`:** **Added to `message.types.ts`:**
- `EventMessage`: Outgoing event structure - `EventMessage`: Outgoing event structure
- `EventAck`: Event acknowledgment structure - `EventAck`: Event acknowledgment structure
- `EventMessageDetails`: Event message response type - `EventMessageDetails`: Event message response type
@@ -57,6 +62,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
### Data Transfer Objects ### Data Transfer Objects
**event.dto.ts:** **event.dto.ts:**
- `SubscribeToEventDto`: Subscribe request - `SubscribeToEventDto`: Subscribe request
- `UnsubscribeFromEventDto`: Unsubscribe request - `UnsubscribeFromEventDto`: Unsubscribe request
- `PublishEventDto`: Publish event request - `PublishEventDto`: Publish event request
@@ -66,12 +72,14 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
## Testing ## Testing
### Test Coverage ### Test Coverage
- **EventService**: 18 unit tests, **89.09% coverage** - **EventService**: 18 unit tests, **89.09% coverage**
- **EventController**: 11 unit tests, **83.87% coverage** - **EventController**: 11 unit tests, **83.87% coverage**
- **Total**: 29 tests, all passing - **Total**: 29 tests, all passing
- **Coverage**: Exceeds 85% minimum requirement - **Coverage**: Exceeds 85% minimum requirement
### Test Scenarios Covered ### Test Scenarios Covered
- Subscription creation and deletion - Subscription creation and deletion
- Event publishing to multiple subscribers - Event publishing to multiple subscribers
- Failed delivery handling - Failed delivery handling
@@ -84,17 +92,21 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
## Design Patterns ## Design Patterns
### Consistency with Existing Code ### Consistency with Existing Code
- Follows patterns from `QueryService` and `CommandService` - Follows patterns from `QueryService` and `CommandService`
- Reuses existing `SignatureService` for message verification - Reuses existing `SignatureService` for message verification
- Reuses existing `FederationService` for instance identity - Reuses existing `FederationService` for instance identity
- Uses existing `FederationMessage` model with new `eventType` field - Uses existing `FederationMessage` model with new `eventType` field
### Event Type Naming Convention ### Event Type Naming Convention
Hierarchical dot-notation: Hierarchical dot-notation:
- `entity.action` (e.g., "task.created", "user.updated") - `entity.action` (e.g., "task.created", "user.updated")
- `entity.action.detail` (e.g., "task.status.changed") - `entity.action.detail` (e.g., "task.status.changed")
### Security Features ### Security Features
- All events signature-verified (RSA) - All events signature-verified (RSA)
- Timestamp validation (prevents replay attacks) - Timestamp validation (prevents replay attacks)
- Connection status validation (only active connections) - Connection status validation (only active connections)
@@ -103,14 +115,18 @@ Hierarchical dot-notation:
## Technical Details ## Technical Details
### Database Migration ### Database Migration
File: `20260203_add_federation_event_subscriptions/migration.sql` File: `20260203_add_federation_event_subscriptions/migration.sql`
- Adds `eventType` column to `federation_messages` - Adds `eventType` column to `federation_messages`
- Creates `federation_event_subscriptions` table - Creates `federation_event_subscriptions` table
- Adds appropriate indexes for performance - Adds appropriate indexes for performance
- Establishes foreign key relationships - Establishes foreign key relationships
### Integration ### Integration
Updated `federation.module.ts`: Updated `federation.module.ts`:
- Added `EventService` to providers - Added `EventService` to providers
- Added `EventController` to controllers - Added `EventController` to controllers
- Exported `EventService` for use by other modules - Exported `EventService` for use by other modules
@@ -126,6 +142,7 @@ Updated `federation.module.ts`:
## Files Created/Modified ## Files Created/Modified
### New Files (7) ### New Files (7)
- `apps/api/src/federation/event.service.ts` (470 lines) - `apps/api/src/federation/event.service.ts` (470 lines)
- `apps/api/src/federation/event.service.spec.ts` (1,088 lines) - `apps/api/src/federation/event.service.spec.ts` (1,088 lines)
- `apps/api/src/federation/event.controller.ts` (199 lines) - `apps/api/src/federation/event.controller.ts` (199 lines)
@@ -135,11 +152,13 @@ Updated `federation.module.ts`:
- `docs/scratchpads/90-event-subscriptions.md` (185 lines) - `docs/scratchpads/90-event-subscriptions.md` (185 lines)
### Modified Files (3) ### Modified Files (3)
- `apps/api/src/federation/types/message.types.ts` (+118 lines) - `apps/api/src/federation/types/message.types.ts` (+118 lines)
- `apps/api/src/federation/federation.module.ts` (+3 lines) - `apps/api/src/federation/federation.module.ts` (+3 lines)
- `apps/api/prisma/schema.prisma` (+27 lines) - `apps/api/prisma/schema.prisma` (+27 lines)
### Total Changes ### Total Changes
- **2,395 lines added** - **2,395 lines added**
- **5 lines removed** - **5 lines removed**
- **10 files changed** - **10 files changed**
@@ -147,20 +166,25 @@ Updated `federation.module.ts`:
## Key Features ## Key Features
### Server-Side Event Filtering ### Server-Side Event Filtering
Events are only sent to instances with active subscriptions for that event type. This prevents unnecessary network traffic and processing. Events are only sent to instances with active subscriptions for that event type. This prevents unnecessary network traffic and processing.
### Acknowledgment Protocol ### Acknowledgment Protocol
Simple ACK pattern confirms event delivery: Simple ACK pattern confirms event delivery:
1. Publisher sends event 1. Publisher sends event
2. Receiver processes and returns ACK 2. Receiver processes and returns ACK
3. Publisher updates delivery status 3. Publisher updates delivery status
### Error Handling ### Error Handling
- Failed deliveries marked as FAILED with error message - Failed deliveries marked as FAILED with error message
- Connection errors logged but don't crash the system - Connection errors logged but don't crash the system
- Invalid signatures rejected immediately - Invalid signatures rejected immediately
### Subscription Management ### Subscription Management
- Subscriptions persist in database - Subscriptions persist in database
- Can be activated/deactivated without deletion - Can be activated/deactivated without deletion
- Support for metadata (extensibility) - Support for metadata (extensibility)
@@ -168,6 +192,7 @@ Simple ACK pattern confirms event delivery:
## Future Enhancements (Not Implemented) ## Future Enhancements (Not Implemented)
These were considered but deferred to future issues: These were considered but deferred to future issues:
- Event replay/history - Event replay/history
- Event filtering by payload fields - Event filtering by payload fields
- Webhook support for event delivery - Webhook support for event delivery
@@ -179,11 +204,13 @@ These were considered but deferred to future issues:
## Performance Considerations ## Performance Considerations
### Scalability ### Scalability
- Database indexes on eventType, connectionId, workspaceId - Database indexes on eventType, connectionId, workspaceId
- Efficient queries with proper WHERE clauses - Efficient queries with proper WHERE clauses
- Server-side filtering reduces network overhead - Server-side filtering reduces network overhead
### Monitoring ### Monitoring
- All operations logged with appropriate level - All operations logged with appropriate level
- Failed deliveries tracked in database - Failed deliveries tracked in database
- Delivery timestamps recorded for analytics - Delivery timestamps recorded for analytics
@@ -191,12 +218,14 @@ These were considered but deferred to future issues:
## Documentation ## Documentation
### Inline Documentation ### Inline Documentation
- JSDoc comments on all public methods - JSDoc comments on all public methods
- Clear parameter descriptions - Clear parameter descriptions
- Return type documentation - Return type documentation
- Usage examples in comments - Usage examples in comments
### Scratchpad Documentation ### Scratchpad Documentation
- Complete implementation plan - Complete implementation plan
- Design decisions documented - Design decisions documented
- Testing strategy outlined - Testing strategy outlined
@@ -205,6 +234,7 @@ These were considered but deferred to future issues:
## Integration Testing Recommendations ## Integration Testing Recommendations
While unit tests are comprehensive, recommend integration testing: While unit tests are comprehensive, recommend integration testing:
1. Set up two federated instances 1. Set up two federated instances
2. Subscribe from Instance A to Instance B events 2. Subscribe from Instance A to Instance B events
3. Publish event from Instance B 3. Publish event from Instance B
@@ -214,6 +244,7 @@ While unit tests are comprehensive, recommend integration testing:
## Conclusion ## Conclusion
FED-007 (EVENT Subscriptions) is **complete and ready for code review**. The implementation: FED-007 (EVENT Subscriptions) is **complete and ready for code review**. The implementation:
- ✅ Follows TDD principles - ✅ Follows TDD principles
- ✅ Meets 85%+ code coverage requirement - ✅ Meets 85%+ code coverage requirement
- ✅ Passes all quality gates (lint, typecheck, tests) - ✅ Passes all quality gates (lint, typecheck, tests)

View File

@@ -0,0 +1,40 @@
# MS-GATE-001 Scratchpad
## Objective
Build the API Gatekeeper module for PR auto-merge orchestration using Gitea PR webhooks, Woodpecker CI webhooks, and a `pending_merges` Prisma model.
## Constraints
- Work in `/home/jwoltje/src/mosaic-stack-worktrees/gate-001`
- Do not merge or deploy
- Must pass:
- `pnpm format:check`
- `SKIP_ENV_VALIDATION=true pnpm turbo typecheck`
- `SKIP_ENV_VALIDATION=true pnpm turbo lint`
- `SKIP_ENV_VALIDATION=true pnpm turbo test --filter=@mosaic/api -- --testPathPattern="gatekeeper"`
## ASSUMPTION
- Woodpecker PR pipelines expose `CI_COMMIT_PULL_REQUEST` and `CI_COMMIT_SHA`, so the webhook notifier can send `prNumber` and `headSha`.
- Rationale: Gatekeeper needs an exact PR/head tuple to safely match CI back to `pending_merges`.
## Plan
1. Add Prisma model + SQL migration for `pending_merges`
2. Add Gatekeeper NestJS module/controller/service/DTO/tests
3. Wire Woodpecker webhook -> Gatekeeper CI handler
4. Add env/config documentation and compose variables
5. Run quality gates, review, remediate, push, open PR
## Progress
- [x] Context loaded
- [ ] Tests added first
- [ ] Implementation complete
- [ ] Quality gates green
- [ ] Push + PR opened
## Verification
- Pending

14
pnpm-lock.yaml generated
View File

@@ -162,6 +162,9 @@ importers:
bullmq: bullmq:
specifier: ^5.67.2 specifier: ^5.67.2
version: 5.67.2 version: 5.67.2
chokidar:
specifier: ^4.0.3
version: 4.0.3
class-transformer: class-transformer:
specifier: ^0.5.1 specifier: ^0.5.1
version: 0.5.1 version: 0.5.1
@@ -365,8 +368,8 @@ importers:
specifier: ^7.8.1 specifier: ^7.8.1
version: 7.8.2 version: 7.8.2
simple-git: simple-git:
specifier: ^3.27.0 specifier: ^3.32.3
version: 3.30.0 version: 3.33.0
zod: zod:
specifier: ^3.24.1 specifier: ^3.24.1
version: 3.25.76 version: 3.25.76
@@ -5176,6 +5179,7 @@ packages:
glob@10.5.0: glob@10.5.0:
resolution: {integrity: sha512-DfXN8DfhJ7NH3Oe7cFmu3NCu1wKbkReJ8TorzSAFbSKrlNaQSKfIzqYqVY8zlbs2NLBbWpRiU52GX2PbaBVNkg==} resolution: {integrity: sha512-DfXN8DfhJ7NH3Oe7cFmu3NCu1wKbkReJ8TorzSAFbSKrlNaQSKfIzqYqVY8zlbs2NLBbWpRiU52GX2PbaBVNkg==}
deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me
hasBin: true hasBin: true
glob@13.0.0: glob@13.0.0:
@@ -6791,8 +6795,8 @@ packages:
simple-get@4.0.1: simple-get@4.0.1:
resolution: {integrity: sha512-brv7p5WgH0jmQJr1ZDDfKDOSeWWg+OVypG99A/5vYGPqJ6pxiaHLy8nxtFjBA7oMa01ebA9gfh1uMCFqOuXxvA==} resolution: {integrity: sha512-brv7p5WgH0jmQJr1ZDDfKDOSeWWg+OVypG99A/5vYGPqJ6pxiaHLy8nxtFjBA7oMa01ebA9gfh1uMCFqOuXxvA==}
simple-git@3.30.0: simple-git@3.33.0:
resolution: {integrity: sha512-q6lxyDsCmEal/MEGhP1aVyQ3oxnagGlBDOVSIB4XUVLl1iZh0Pah6ebC9V4xBap/RfgP2WlI8EKs0WS0rMEJHg==} resolution: {integrity: sha512-D4V/tGC2sjsoNhoMybKyGoE+v8A60hRawKQ1iFRA1zwuDgGZCBJ4ByOzZ5J8joBbi4Oam0qiPH+GhzmSBwbJng==}
sisteransi@1.0.5: sisteransi@1.0.5:
resolution: {integrity: sha512-bLGGlR1QxBcynn2d5YmDX4MGjlZvy2MRBDRNHLJ8VI6l6+9FUiyTFNJ0IveOSP0bcXgVDPRcfGqA0pjaqUpfVg==} resolution: {integrity: sha512-bLGGlR1QxBcynn2d5YmDX4MGjlZvy2MRBDRNHLJ8VI6l6+9FUiyTFNJ0IveOSP0bcXgVDPRcfGqA0pjaqUpfVg==}
@@ -14526,7 +14530,7 @@ snapshots:
once: 1.4.0 once: 1.4.0
simple-concat: 1.0.1 simple-concat: 1.0.1
simple-git@3.30.0: simple-git@3.33.0:
dependencies: dependencies:
'@kwsites/file-exists': 1.1.1 '@kwsites/file-exists': 1.1.1
'@kwsites/promise-deferred': 1.1.1 '@kwsites/promise-deferred': 1.1.1