Compare commits

...

4 Commits

Author SHA1 Message Date
5f0a7c847c fix: use double quotes for ConfigService key (prettier)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
2026-03-10 22:50:58 -05:00
639abfaefa fix: bump simple-git to 3.32.3 and use ConfigService for giteaApiBaseUrl
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 22:44:03 -05:00
6e2b9a307e feat(gatekeeper): add PR merge automation service
Some checks failed
ci/woodpecker/push/ci Pipeline failed
2026-03-10 21:35:11 -05:00
3289677056 Merge pull request 'chore(tasks): mark MS24 all done' (#756) from chore/ms24-ver-done into main 2026-03-09 02:58:46 +00:00
29 changed files with 1099 additions and 70 deletions

View File

@@ -292,6 +292,11 @@ GITEA_REPO_NAME=stack
# Configure in Gitea: Repository Settings → Webhooks → Add Webhook
GITEA_WEBHOOK_SECRET=REPLACE_WITH_RANDOM_WEBHOOK_SECRET
# Gatekeeper merge automation
# Uses the same Gitea host to validate PR webhooks and to merge approved PRs after green CI.
GITEA_API_TOKEN=REPLACE_WITH_GATEKEEPER_GITEA_API_TOKEN
GATEKEEPER_ENABLED=true
# Coordinator API Key (service-to-service authentication)
# CRITICAL: Generate a random API key with at least 32 characters
# Example: openssl rand -base64 32

View File

@@ -280,7 +280,7 @@ steps:
from_secret: woodpecker_webhook_secret
commands:
- |
BODY="{\"branch\":\"${CI_COMMIT_BRANCH}\",\"status\":\"${CI_PIPELINE_STATUS}\",\"buildUrl\":\"${CI_PIPELINE_LINK}\",\"repo\":\"${CI_REPO}\"}"
BODY="{\"branch\":\"${CI_COMMIT_BRANCH}\",\"status\":\"${CI_PIPELINE_STATUS}\",\"buildUrl\":\"${CI_PIPELINE_LINK}\",\"repo\":\"${CI_REPO}\",\"prNumber\":${CI_COMMIT_PULL_REQUEST:-null},\"headSha\":\"${CI_COMMIT_SHA}\"}"
SIG=$(echo -n "$BODY" | openssl dgst -sha256 -hmac "$WOODPECKER_WEBHOOK_SECRET" | awk '{print $2}')
curl -s -o /dev/null -w "%{http_code}" -X POST "${MOSAIC_WEBHOOK_URL}/api/webhooks/woodpecker" \
-H "Content-Type: application/json" \

View File

@@ -24,6 +24,11 @@ ENCRYPTION_KEY=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
# In development, a random key is generated if not set
CSRF_SECRET=fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210
# Gatekeeper merge automation
GITEA_WEBHOOK_SECRET=replace-with-gitea-webhook-secret
GITEA_API_TOKEN=replace-with-gitea-api-token
GATEKEEPER_ENABLED=true
# OpenTelemetry Configuration
# Enable/disable OpenTelemetry tracing (default: true)
OTEL_ENABLED=true

View File

@@ -12,7 +12,7 @@
"lint:fix": "eslint \"src/**/*.ts\" --fix",
"typecheck": "tsc --noEmit",
"clean": "rm -rf dist",
"test": "vitest run",
"test": "node scripts/vitest-runner.mjs",
"test:watch": "vitest",
"test:coverage": "vitest run --coverage",
"test:e2e": "vitest run --config ./vitest.e2e.config.ts",

View File

@@ -0,0 +1,14 @@
CREATE TABLE "pending_merges" (
"id" UUID PRIMARY KEY DEFAULT gen_random_uuid(),
"repo" TEXT NOT NULL,
"pr_number" INTEGER NOT NULL,
"head_sha" TEXT NOT NULL,
"state" TEXT NOT NULL DEFAULT 'pending',
"review_result" JSONB,
"ci_status" TEXT,
"requester" TEXT,
"gitea_merge_url" TEXT,
"created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
"updated_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT "pending_merges_repo_pr_sha_key" UNIQUE ("repo", "pr_number", "head_sha")
);

View File

@@ -261,6 +261,23 @@ model User {
@@map("users")
}
model PendingMerge {
id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
repo String
prNumber Int @map("pr_number")
headSha String @map("head_sha")
state String @default("pending")
reviewResult Json? @map("review_result")
ciStatus String? @map("ci_status")
requester String?
giteaMergeUrl String? @map("gitea_merge_url")
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz
updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz
@@unique([repo, prNumber, headSha])
@@map("pending_merges")
}
model UserPreference {
id String @id @default(uuid()) @db.Uuid
userId String @unique @map("user_id") @db.Uuid

View File

@@ -0,0 +1,36 @@
import { spawnSync } from "node:child_process";
import { join } from "node:path";
const rawArgs = process.argv.slice(2);
const passthroughArgs = [];
for (let index = 0; index < rawArgs.length; index += 1) {
const arg = rawArgs[index];
if (arg === "--testPathPattern") {
const value = rawArgs[index + 1];
if (value) {
passthroughArgs.push(value);
index += 1;
}
continue;
}
if (arg.startsWith("--testPathPattern=")) {
passthroughArgs.push(arg.slice("--testPathPattern=".length));
continue;
}
passthroughArgs.push(arg);
}
const vitestBin = join(process.cwd(), "node_modules", ".bin", "vitest");
const result = spawnSync(vitestBin, ["run", ...passthroughArgs], {
stdio: "inherit",
});
if (result.error) {
throw result.error;
}
process.exit(result.status ?? 1);

View File

@@ -346,7 +346,9 @@ describe("AdminService", () => {
data: { deactivatedAt: expect.any(Date) },
})
);
expect(mockPrismaService.session.deleteMany).toHaveBeenCalledWith({ where: { userId: mockUserId } });
expect(mockPrismaService.session.deleteMany).toHaveBeenCalledWith({
where: { userId: mockUserId },
});
});
it("should throw NotFoundException if user does not exist", async () => {

View File

@@ -63,6 +63,7 @@ import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
import { MissionControlProxyModule } from "./mission-control-proxy/mission-control-proxy.module";
import { OrchestratorModule } from "./orchestrator/orchestrator.module";
import { QueueNotificationsModule } from "./queue-notifications/queue-notifications.module";
import { GatekeeperModule } from "./gatekeeper/gatekeeper.module";
@Module({
imports: [
@@ -146,6 +147,7 @@ import { QueueNotificationsModule } from "./queue-notifications/queue-notificati
ChatProxyModule,
MissionControlProxyModule,
OrchestratorModule,
GatekeeperModule,
QueueNotificationsModule,
],
controllers: [AppController, CsrfController],

View File

@@ -211,9 +211,7 @@ describe("AuthGuard", () => {
});
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
await expect(guard.canActivate(context)).rejects.toThrow(
"Invalid user data in session"
);
await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
});
it("should throw UnauthorizedException when user is missing email", async () => {
@@ -227,9 +225,7 @@ describe("AuthGuard", () => {
});
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
await expect(guard.canActivate(context)).rejects.toThrow(
"Invalid user data in session"
);
await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
});
it("should throw UnauthorizedException when user is missing name", async () => {
@@ -243,9 +239,7 @@ describe("AuthGuard", () => {
});
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
await expect(guard.canActivate(context)).rejects.toThrow(
"Invalid user data in session"
);
await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
});
it("should throw UnauthorizedException when user is a string", async () => {
@@ -259,9 +253,7 @@ describe("AuthGuard", () => {
});
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
await expect(guard.canActivate(context)).rejects.toThrow(
"Invalid user data in session"
);
await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
});
it("should reject when user is null (typeof null === 'object' causes TypeError on 'in' operator)", async () => {
@@ -277,9 +269,7 @@ describe("AuthGuard", () => {
});
await expect(guard.canActivate(context)).rejects.toThrow(TypeError);
await expect(guard.canActivate(context)).rejects.not.toBeInstanceOf(
UnauthorizedException
);
await expect(guard.canActivate(context)).rejects.not.toBeInstanceOf(UnauthorizedException);
});
});

View File

@@ -154,16 +154,14 @@ describe("CoordinatorIntegrationService", () => {
// Mock transaction that passes through the callback
mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = {
$queryRaw: vi
.fn()
.mockResolvedValue([
{
id: mockJob.id,
status: mockJob.status,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
$queryRaw: vi.fn().mockResolvedValue([
{
id: mockJob.id,
status: mockJob.status,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
runnerJob: {
update: vi.fn().mockResolvedValue(updatedJob),
},
@@ -204,16 +202,14 @@ describe("CoordinatorIntegrationService", () => {
// Mock transaction with completed job
mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = {
$queryRaw: vi
.fn()
.mockResolvedValue([
{
id: mockJob.id,
status: RunnerJobStatus.COMPLETED,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
$queryRaw: vi.fn().mockResolvedValue([
{
id: mockJob.id,
status: RunnerJobStatus.COMPLETED,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
runnerJob: {
update: vi.fn(),
},
@@ -271,16 +267,14 @@ describe("CoordinatorIntegrationService", () => {
// Mock transaction with running job
mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = {
$queryRaw: vi
.fn()
.mockResolvedValue([
{
id: mockJob.id,
status: RunnerJobStatus.RUNNING,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
$queryRaw: vi.fn().mockResolvedValue([
{
id: mockJob.id,
status: RunnerJobStatus.RUNNING,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
runnerJob: {
update: vi.fn().mockResolvedValue(completedJob),
},
@@ -315,16 +309,14 @@ describe("CoordinatorIntegrationService", () => {
// Mock transaction with running job
mockPrismaService.$transaction.mockImplementation(async (callback) => {
const mockTx = {
$queryRaw: vi
.fn()
.mockResolvedValue([
{
id: mockJob.id,
status: RunnerJobStatus.RUNNING,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
$queryRaw: vi.fn().mockResolvedValue([
{
id: mockJob.id,
status: RunnerJobStatus.RUNNING,
workspace_id: mockJob.workspaceId,
version: 1,
},
]),
runnerJob: {
update: vi.fn().mockResolvedValue(failedJob),
},

View File

@@ -0,0 +1,113 @@
import {
IsArray,
IsIn,
IsInt,
IsObject,
IsOptional,
IsString,
MaxLength,
Min,
MinLength,
ValidateNested,
} from "class-validator";
import { Type } from "class-transformer";
class GiteaLabelDto {
@IsString()
@MinLength(1)
@MaxLength(255)
name!: string;
}
class GiteaRepoDto {
@IsString()
@MinLength(1)
@MaxLength(512)
full_name!: string;
}
class GiteaBranchRefDto {
@IsString()
@MinLength(1)
@MaxLength(255)
ref!: string;
}
class GiteaHeadDto {
@IsString()
@MinLength(7)
@MaxLength(128)
sha!: string;
}
class GiteaPullRequestDto {
@IsInt()
@Min(1)
number!: number;
@IsOptional()
@IsString()
body?: string;
@ValidateNested()
@Type(() => GiteaBranchRefDto)
base!: GiteaBranchRefDto;
@ValidateNested()
@Type(() => GiteaHeadDto)
head!: GiteaHeadDto;
@IsArray()
@ValidateNested({ each: true })
@Type(() => GiteaLabelDto)
labels!: GiteaLabelDto[];
@IsOptional()
@IsString()
html_url?: string;
@IsOptional()
@IsString()
url?: string;
}
class GiteaSenderDto {
@IsString()
@MinLength(1)
@MaxLength(255)
login!: string;
}
export class GiteaPrWebhookDto {
@IsString()
@IsIn(["pull_request"])
@MaxLength(64)
type!: string;
@IsString()
@IsIn(["opened", "labeled", "synchronize"])
@MaxLength(64)
action!: "opened" | "labeled" | "synchronize";
@ValidateNested()
@Type(() => GiteaRepoDto)
repository!: GiteaRepoDto;
@ValidateNested()
@Type(() => GiteaPullRequestDto)
pull_request!: GiteaPullRequestDto;
@IsOptional()
@ValidateNested()
@Type(() => GiteaLabelDto)
label?: GiteaLabelDto;
@IsOptional()
@ValidateNested()
@Type(() => GiteaSenderDto)
sender?: GiteaSenderDto;
@IsOptional()
@IsObject()
review_result?: Record<string, unknown>;
}

View File

@@ -0,0 +1,98 @@
import { createHmac } from "node:crypto";
import { Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Test, type TestingModule } from "@nestjs/testing";
import type { Request } from "express";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { GatekeeperController } from "./gatekeeper.controller";
import { GatekeeperService } from "./gatekeeper.service";
describe("GatekeeperController", () => {
let controller: GatekeeperController;
const gatekeeperService = {
handlePrEvent: vi.fn(),
};
const configService = {
get: vi.fn(),
};
const payload = {
type: "pull_request",
action: "labeled",
label: { name: "auto-merge" },
repository: { full_name: "mosaic/stack" },
sender: { login: "jason" },
pull_request: {
number: 7,
body: "ready",
base: { ref: "main" },
head: { sha: "abcdef1234567890" },
labels: [{ name: "auto-merge" }],
},
};
beforeEach(async () => {
vi.clearAllMocks();
configService.get.mockImplementation((key: string) => {
if (key === "GITEA_WEBHOOK_SECRET") {
return "secret";
}
return undefined;
});
gatekeeperService.handlePrEvent.mockResolvedValue(undefined);
const module: TestingModule = await Test.createTestingModule({
controllers: [GatekeeperController],
providers: [
{ provide: GatekeeperService, useValue: gatekeeperService },
{ provide: ConfigService, useValue: configService },
],
}).compile();
controller = module.get(GatekeeperController);
});
it("accepts a valid signature and schedules processing", async () => {
const signature = createHmac("sha256", "secret").update(JSON.stringify(payload)).digest("hex");
await expect(
controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
signature
)
).resolves.toEqual({ ok: true });
expect(gatekeeperService.handlePrEvent).toHaveBeenCalledWith(payload);
});
it("ignores invalid signatures", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
await expect(controller.handleWebhook({} as Request, payload, "bad")).resolves.toEqual({
ok: true,
});
expect(gatekeeperService.handlePrEvent).not.toHaveBeenCalled();
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("invalid Gitea webhook signature")
);
});
it("accepts requests without validation when no secret is configured", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
configService.get.mockReturnValue(undefined);
await expect(controller.handleWebhook({} as Request, payload, "")).resolves.toEqual({
ok: true,
});
expect(gatekeeperService.handlePrEvent).toHaveBeenCalledWith(payload);
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("GITEA_WEBHOOK_SECRET is not configured")
);
});
});

View File

@@ -0,0 +1,67 @@
import { Body, Controller, Headers, Logger, Post, Req, type RawBodyRequest } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { createHmac, timingSafeEqual } from "node:crypto";
import type { Request } from "express";
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
import { GiteaPrWebhookDto } from "./dto/gitea-pr-webhook.dto";
import { GatekeeperService } from "./gatekeeper.service";
@Controller("gatekeeper/webhook")
export class GatekeeperController {
private readonly logger = new Logger(GatekeeperController.name);
constructor(
private readonly gatekeeperService: GatekeeperService,
private readonly configService: ConfigService
) {}
@SkipCsrf()
@Post("gitea")
handleWebhook(
@Req() req: RawBodyRequest<Request>,
@Body() body: GiteaPrWebhookDto,
@Headers("x-gitea-signature") signature: string | undefined
): Promise<{ ok: boolean }> {
const secret = this.configService.get<string>("GITEA_WEBHOOK_SECRET");
if (secret && !this.isValidSignature(this.getRequestBody(req, body), signature, secret)) {
this.logger.warn("Received invalid Gitea webhook signature");
return Promise.resolve({ ok: true });
}
if (!secret) {
this.logger.warn("GITEA_WEBHOOK_SECRET is not configured; accepting Gitea webhook");
}
void this.gatekeeperService.handlePrEvent(body).catch((error: unknown) => {
const message = error instanceof Error ? error.message : String(error);
this.logger.error(`Failed to process Gitea PR webhook: ${message}`);
});
return Promise.resolve({ ok: true });
}
private getRequestBody(req: RawBodyRequest<Request>, body: GiteaPrWebhookDto): Buffer {
if (Buffer.isBuffer(req.rawBody)) {
return req.rawBody;
}
return Buffer.from(JSON.stringify(body));
}
private isValidSignature(body: Buffer, signature: string | undefined, secret: string): boolean {
if (!signature) {
return false;
}
const expected = createHmac("sha256", secret).update(body).digest("hex");
const actual = Buffer.from(signature);
const expectedBuffer = Buffer.from(expected);
if (actual.length !== expectedBuffer.length) {
return false;
}
return timingSafeEqual(actual, expectedBuffer);
}
}

View File

@@ -0,0 +1,12 @@
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { GatekeeperController } from "./gatekeeper.controller";
import { GatekeeperService } from "./gatekeeper.service";
@Module({
imports: [ConfigModule],
controllers: [GatekeeperController],
providers: [GatekeeperService],
exports: [GatekeeperService],
})
export class GatekeeperModule {}

View File

@@ -0,0 +1,199 @@
import { Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Test, type TestingModule } from "@nestjs/testing";
import { describe, beforeEach, expect, it, vi } from "vitest";
import { PrismaService } from "../prisma/prisma.service";
import type { GiteaPrWebhookDto } from "./dto/gitea-pr-webhook.dto";
import { GatekeeperService } from "./gatekeeper.service";
describe("GatekeeperService", () => {
let service: GatekeeperService;
const prisma = {
pendingMerge: {
upsert: vi.fn(),
update: vi.fn(),
findFirst: vi.fn(),
findUnique: vi.fn(),
},
};
const config = {
get: vi.fn(),
};
const basePayload: GiteaPrWebhookDto = {
type: "pull_request",
action: "labeled",
repository: { full_name: "mosaic/stack" },
label: { name: "auto-merge" },
sender: { login: "jason" },
pull_request: {
number: 42,
body: "Implements Gatekeeper",
base: { ref: "main" },
head: { sha: "abcdef1234567890" },
labels: [{ name: "auto-merge" }],
url: "https://git.mosaicstack.dev/api/v1/repos/mosaic/stack/pulls/42",
html_url: "https://git.mosaicstack.dev/mosaic/stack/pulls/42",
},
};
beforeEach(async () => {
vi.clearAllMocks();
config.get.mockImplementation((key: string) => {
switch (key) {
case "GATEKEEPER_ENABLED":
return "true";
case "GITEA_API_TOKEN":
return "token";
default:
return undefined;
}
});
const module: TestingModule = await Test.createTestingModule({
providers: [
GatekeeperService,
{ provide: PrismaService, useValue: prisma },
{ provide: ConfigService, useValue: config },
],
}).compile();
service = module.get(GatekeeperService);
});
it("moves labeled auto-merge PRs into awaiting_ci when review passes", async () => {
prisma.pendingMerge.upsert.mockResolvedValue({ id: "merge-1" });
prisma.pendingMerge.update.mockResolvedValue({});
await service.handlePrEvent(basePayload);
expect(prisma.pendingMerge.upsert).toHaveBeenCalledWith(
expect.objectContaining({
where: {
repo_prNumber_headSha: {
repo: "mosaic/stack",
prNumber: 42,
headSha: "abcdef1234567890",
},
},
})
);
expect(prisma.pendingMerge.update).toHaveBeenNthCalledWith(
1,
expect.objectContaining({ data: expect.objectContaining({ state: "reviewing" }) })
);
expect(prisma.pendingMerge.update).toHaveBeenNthCalledWith(
2,
expect.objectContaining({
data: expect.objectContaining({
state: "awaiting_ci",
reviewResult: { passed: true, issues: [] },
}),
})
);
});
it("rejects review failures and records the reason", async () => {
prisma.pendingMerge.upsert.mockResolvedValue({ id: "merge-2" });
prisma.pendingMerge.findUnique.mockResolvedValue({
id: "merge-2",
repo: "mosaic/stack",
prNumber: 42,
headSha: "abcdef1234567890",
});
prisma.pendingMerge.update.mockResolvedValue({});
const commentSpy = vi
.spyOn(
service as unknown as {
postPullRequestComment: (repo: string, prNumber: number, body: string) => Promise<void>;
},
"postPullRequestComment"
)
.mockResolvedValue();
await service.handlePrEvent({
...basePayload,
pull_request: {
...basePayload.pull_request,
body: "",
},
});
expect(prisma.pendingMerge.update).toHaveBeenCalledWith(
expect.objectContaining({
where: { id: "merge-2" },
data: {
state: "rejected",
reviewResult: {
passed: false,
issues: ["PR description must not be empty"],
},
},
})
);
expect(commentSpy).toHaveBeenCalledWith(
"mosaic/stack",
42,
expect.stringContaining("PR description must not be empty")
);
});
it("attempts merge on green CI for awaiting_ci records", async () => {
prisma.pendingMerge.findFirst.mockResolvedValue({
id: "merge-3",
repo: "mosaic/stack",
prNumber: 42,
headSha: "abcdef1234567890",
state: "awaiting_ci",
giteaMergeUrl: "https://git.mosaicstack.dev/api/v1/repos/mosaic/stack/pulls/42",
});
prisma.pendingMerge.update.mockResolvedValue({});
const mergeSpy = vi.spyOn(service, "attemptMerge").mockResolvedValue();
await service.handleCiEvent("mosaic/stack", 42, "abcdef1234567890", "success");
expect(prisma.pendingMerge.update).toHaveBeenCalledWith(
expect.objectContaining({
where: { id: "merge-3" },
data: expect.objectContaining({ ciStatus: "success" }),
})
);
expect(mergeSpy).toHaveBeenCalledWith("merge-3");
});
it("rejects failed CI results", async () => {
prisma.pendingMerge.findFirst.mockResolvedValue({
id: "merge-4",
repo: "mosaic/stack",
prNumber: 42,
headSha: "abcdef1234567890",
state: "awaiting_ci",
});
prisma.pendingMerge.update.mockResolvedValue({});
const rejectSpy = vi.spyOn(service, "rejectMerge").mockResolvedValue();
await service.handleCiEvent("mosaic/stack", 42, "abcdef1234567890", "failure");
expect(rejectSpy).toHaveBeenCalledWith("merge-4", "CI reported failure");
});
it("skips all work when Gatekeeper is disabled", async () => {
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
config.get.mockImplementation((key: string) => {
if (key === "GATEKEEPER_ENABLED") {
return "false";
}
return undefined;
});
await service.handlePrEvent(basePayload);
await service.handleCiEvent("mosaic/stack", 42, "abcdef1234567890", "success");
expect(prisma.pendingMerge.upsert).not.toHaveBeenCalled();
expect(prisma.pendingMerge.findFirst).not.toHaveBeenCalled();
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("Gatekeeper is disabled"));
});
});

View File

@@ -0,0 +1,311 @@
import { Injectable, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { Prisma } from "@prisma/client";
import { PrismaService } from "../prisma/prisma.service";
import type { GiteaPrWebhookDto } from "./dto/gitea-pr-webhook.dto";
export interface ReviewResult {
passed: boolean;
issues: string[];
}
@Injectable()
export class GatekeeperService {
private readonly logger = new Logger(GatekeeperService.name);
private get giteaApiBaseUrl(): string {
return `${this.configService.getOrThrow<string>("GITEA_URL")}/api/v1`;
}
constructor(
private readonly prisma: PrismaService,
private readonly configService: ConfigService
) {}
async handlePrEvent(payload: GiteaPrWebhookDto): Promise<void> {
if (!this.isEnabled()) {
return;
}
if (payload.type !== "pull_request") {
return;
}
const action = payload.action;
const hasAutoMergeLabel = this.hasAutoMergeLabel(payload);
if (!["opened", "labeled", "synchronize"].includes(action)) {
return;
}
if (action === "labeled" && payload.label?.name !== "auto-merge") {
return;
}
const merge = await this.prisma.pendingMerge.upsert({
where: {
repo_prNumber_headSha: {
repo: payload.repository.full_name,
prNumber: payload.pull_request.number,
headSha: payload.pull_request.head.sha,
},
},
create: {
repo: payload.repository.full_name,
prNumber: payload.pull_request.number,
headSha: payload.pull_request.head.sha,
...(payload.sender?.login ? { requester: payload.sender.login } : {}),
giteaMergeUrl:
payload.pull_request.url ??
`${this.giteaApiBaseUrl}/repos/${payload.repository.full_name}/pulls/${String(payload.pull_request.number)}`,
},
update: {
headSha: payload.pull_request.head.sha,
...(payload.sender?.login ? { requester: payload.sender.login } : {}),
giteaMergeUrl:
payload.pull_request.url ??
`${this.giteaApiBaseUrl}/repos/${payload.repository.full_name}/pulls/${String(payload.pull_request.number)}`,
},
});
if (action === "synchronize") {
await this.prisma.pendingMerge.update({
where: { id: merge.id },
data: {
state: hasAutoMergeLabel ? "pending" : "rejected",
ciStatus: null,
reviewResult: Prisma.DbNull,
},
});
if (hasAutoMergeLabel) {
await this.runReview(merge.id, payload);
}
return;
}
if (hasAutoMergeLabel) {
await this.runReview(merge.id, payload);
}
}
async handleCiEvent(
repo: string,
prNumber: number,
headSha: string,
status: "success" | "failure"
): Promise<void> {
if (!this.isEnabled()) {
return;
}
const merge = await this.prisma.pendingMerge.findFirst({
where: {
repo,
prNumber,
headSha,
},
orderBy: {
createdAt: "desc",
},
});
if (!merge) {
this.logger.debug(`No pending merge found for ${repo}#${String(prNumber)} @ ${headSha}`);
return;
}
await this.prisma.pendingMerge.update({
where: { id: merge.id },
data: {
ciStatus: status,
},
});
if (status === "failure") {
await this.rejectMerge(merge.id, "CI reported failure");
return;
}
if (merge.state === "awaiting_ci") {
await this.attemptMerge(merge.id);
}
}
reviewPr(payload: GiteaPrWebhookDto): Promise<ReviewResult> {
const issues: string[] = [];
if (!this.hasAutoMergeLabel(payload)) {
issues.push("PR must have the auto-merge label");
}
if (!payload.pull_request.body?.trim()) {
issues.push("PR description must not be empty");
}
if (payload.pull_request.base.ref !== "main") {
issues.push("PR base branch must be main");
}
if (!/^[0-9a-f]{7,128}$/i.test(payload.pull_request.head.sha)) {
issues.push("PR head SHA must be a valid git commit hash");
}
return Promise.resolve({
passed: issues.length === 0,
issues,
});
}
async attemptMerge(mergeId: string): Promise<void> {
const merge = await this.prisma.pendingMerge.findUnique({
where: { id: mergeId },
});
if (!merge) {
return;
}
const token = this.configService.get<string>("GITEA_API_TOKEN");
if (!token) {
await this.rejectMerge(merge.id, "GITEA_API_TOKEN is not configured");
return;
}
await this.prisma.pendingMerge.update({
where: { id: merge.id },
data: { state: "merging" },
});
const mergeUrl =
merge.giteaMergeUrl ??
`${this.giteaApiBaseUrl}/repos/${merge.repo}/pulls/${String(merge.prNumber)}`;
const response = await fetch(`${mergeUrl}/merge`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `token ${token}`,
},
body: JSON.stringify({
Do: "merge",
force_merge: true,
merge_message_field: "Auto-merged by Gatekeeper",
}),
});
if (!response.ok) {
const reason = await response.text();
await this.rejectMerge(
merge.id,
`Gitea merge API rejected the request: ${String(response.status)} ${reason}`
);
return;
}
await this.prisma.pendingMerge.update({
where: { id: merge.id },
data: { state: "merged" },
});
}
async rejectMerge(mergeId: string, reason: string): Promise<void> {
const merge = await this.prisma.pendingMerge.findUnique({
where: { id: mergeId },
});
if (!merge) {
return;
}
await this.prisma.pendingMerge.update({
where: { id: merge.id },
data: {
state: "rejected",
reviewResult: {
passed: false,
issues: [reason],
},
},
});
await this.postPullRequestComment(
merge.repo,
merge.prNumber,
`Gatekeeper rejected auto-merge for \`${merge.headSha}\`: ${reason}`
);
}
private async runReview(mergeId: string, payload: GiteaPrWebhookDto): Promise<void> {
await this.prisma.pendingMerge.update({
where: { id: mergeId },
data: { state: "reviewing" },
});
const result = await this.reviewPr(payload);
if (!result.passed) {
await this.rejectMerge(mergeId, result.issues.join("; "));
return;
}
const reviewResult: Prisma.InputJsonValue = {
passed: result.passed,
issues: result.issues,
};
await this.prisma.pendingMerge.update({
where: { id: mergeId },
data: {
state: "awaiting_ci",
reviewResult,
},
});
}
private hasAutoMergeLabel(payload: GiteaPrWebhookDto): boolean {
return payload.pull_request.labels.some((label) => label.name === "auto-merge");
}
private async postPullRequestComment(
repo: string,
prNumber: number,
body: string
): Promise<void> {
const token = this.configService.get<string>("GITEA_API_TOKEN");
if (!token) {
this.logger.warn(
`Skipping PR comment for ${repo}#${String(prNumber)}; GITEA_API_TOKEN is missing`
);
return;
}
const response = await fetch(
`${this.giteaApiBaseUrl}/repos/${repo}/issues/${String(prNumber)}/comments`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `token ${token}`,
},
body: JSON.stringify({ body }),
}
);
if (!response.ok) {
this.logger.warn(
`Failed to post Gatekeeper PR comment for ${repo}#${String(prNumber)}: ${String(response.status)}`
);
}
}
private isEnabled(): boolean {
const raw = this.configService.get<string>("GATEKEEPER_ENABLED");
const enabled = raw !== "false";
if (!enabled) {
this.logger.warn("Gatekeeper is disabled via GATEKEEPER_ENABLED");
}
return enabled;
}
}

View File

@@ -2,12 +2,13 @@ import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { AuthModule } from "../auth/auth.module";
import { ApiKeyGuard } from "../common/guards/api-key.guard";
import { GatekeeperModule } from "../gatekeeper/gatekeeper.module";
import { QueueNotificationsController } from "./queue-notifications.controller";
import { QueueNotificationsService } from "./queue-notifications.service";
import { WoodpeckerWebhookController } from "./woodpecker-webhook.controller";
@Module({
imports: [ConfigModule, AuthModule],
imports: [ConfigModule, AuthModule, GatekeeperModule],
controllers: [QueueNotificationsController, WoodpeckerWebhookController],
providers: [QueueNotificationsService, ApiKeyGuard],
exports: [QueueNotificationsService],

View File

@@ -4,6 +4,7 @@ import { ConfigService } from "@nestjs/config";
import { Test, type TestingModule } from "@nestjs/testing";
import type { Request } from "express";
import { describe, beforeEach, expect, it, vi } from "vitest";
import { GatekeeperService } from "../gatekeeper/gatekeeper.service";
import { QueueNotificationsService } from "./queue-notifications.service";
import {
type WoodpeckerWebhookPayload,
@@ -20,6 +21,9 @@ describe("WoodpeckerWebhookController", () => {
const mockService = {
notifyAgentCiResult: vi.fn(),
};
const mockGatekeeperService = {
handleCiEvent: vi.fn(),
};
const mockConfigService = {
get: vi.fn(),
@@ -39,6 +43,7 @@ describe("WoodpeckerWebhookController", () => {
controllers: [WoodpeckerWebhookController],
providers: [
{ provide: QueueNotificationsService, useValue: mockService },
{ provide: GatekeeperService, useValue: mockGatekeeperService },
{ provide: ConfigService, useValue: mockConfigService },
],
}).compile();
@@ -52,6 +57,8 @@ describe("WoodpeckerWebhookController", () => {
status: "success",
buildUrl: "https://ci.example/build/123",
repo: "mosaic/stack",
prNumber: 42,
headSha: "abcdef1234567890",
};
const signature = signPayload(payload, "test-secret");
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 2 });
@@ -65,6 +72,12 @@ describe("WoodpeckerWebhookController", () => {
).resolves.toEqual({ ok: true, notified: 2 });
expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload);
expect(mockGatekeeperService.handleCiEvent).toHaveBeenCalledWith(
"mosaic/stack",
42,
"abcdef1234567890",
"success"
);
});
it("returns ok without notifying when the signature is invalid", async () => {
@@ -129,4 +142,22 @@ describe("WoodpeckerWebhookController", () => {
)
).resolves.toEqual({ ok: true, notified: 0 });
});
it("does not call Gatekeeper when the PR metadata is missing", async () => {
const payload: WoodpeckerWebhookPayload = {
branch: "feat/ms24-ci-webhook",
status: "success",
buildUrl: "https://ci.example/build/555",
repo: "mosaic/stack",
};
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 1 });
await controller.handleWebhook(
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
payload,
signPayload(payload, "test-secret")
);
expect(mockGatekeeperService.handleCiEvent).not.toHaveBeenCalled();
});
});

View File

@@ -3,6 +3,7 @@ import { ConfigService } from "@nestjs/config";
import { createHmac, timingSafeEqual } from "node:crypto";
import type { Request } from "express";
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
import { GatekeeperService } from "../gatekeeper/gatekeeper.service";
import { QueueNotificationsService } from "./queue-notifications.service";
export interface WoodpeckerWebhookPayload {
@@ -10,6 +11,8 @@ export interface WoodpeckerWebhookPayload {
status: "success" | "failure";
buildUrl: string;
repo: string;
prNumber?: number;
headSha?: string;
}
@Controller("webhooks")
@@ -18,6 +21,7 @@ export class WoodpeckerWebhookController {
constructor(
private readonly queueService: QueueNotificationsService,
private readonly gatekeeperService: GatekeeperService,
private readonly configService: ConfigService
) {}
@@ -33,6 +37,7 @@ export class WoodpeckerWebhookController {
if (!secret) {
this.logger.warn("WOODPECKER_WEBHOOK_SECRET is not configured; accepting Woodpecker webhook");
const result = await this.queueService.notifyAgentCiResult(body);
await this.forwardCiStatusToGatekeeper(body);
return { ok: true, notified: result.notified };
}
@@ -42,9 +47,21 @@ export class WoodpeckerWebhookController {
}
const result = await this.queueService.notifyAgentCiResult(body);
await this.forwardCiStatusToGatekeeper(body);
return { ok: true, notified: result.notified };
}
private async forwardCiStatusToGatekeeper(body: WoodpeckerWebhookPayload): Promise<void> {
if (typeof body.prNumber === "number" && typeof body.headSha === "string") {
await this.gatekeeperService.handleCiEvent(
body.repo,
body.prNumber,
body.headSha,
body.status
);
}
}
private getRequestBody(req: RawBodyRequest<Request>, body: WoodpeckerWebhookPayload): Buffer {
const rawBody = req.rawBody;

View File

@@ -37,7 +37,7 @@
"ioredis": "^5.9.2",
"reflect-metadata": "^0.2.2",
"rxjs": "^7.8.1",
"simple-git": "^3.27.0",
"simple-git": "^3.32.3",
"zod": "^3.24.1"
},
"devDependencies": {

View File

@@ -385,6 +385,9 @@ services:
NEXT_PUBLIC_APP_URL: ${NEXT_PUBLIC_APP_URL:-http://localhost:3000}
NEXT_PUBLIC_API_URL: ${NEXT_PUBLIC_API_URL:-http://localhost:3001}
TRUSTED_ORIGINS: ${TRUSTED_ORIGINS:-}
GITEA_WEBHOOK_SECRET: ${GITEA_WEBHOOK_SECRET:-}
GITEA_API_TOKEN: ${GITEA_API_TOKEN:-}
GATEKEEPER_ENABLED: ${GATEKEEPER_ENABLED:-true}
volumes:
- openbao_init:/openbao/init:ro
ports:

View File

@@ -94,6 +94,21 @@ OIDC_REDIRECT_URI=http://localhost:3001/auth/oauth2/callback/authentik
See [Authentik Setup](2-authentik.md) for complete OIDC configuration.
## Webhooks and Merge Automation
```bash
# Gitea webhook validation secret for /api/gatekeeper/webhook/gitea
GITEA_WEBHOOK_SECRET=your-random-webhook-secret
# Personal access token used by Gatekeeper to comment on and merge PRs
GITEA_API_TOKEN=your-gitea-api-token
# Master switch for the Gatekeeper auto-merge workflow
GATEKEEPER_ENABLED=true
```
Use a dedicated Gitea token with the minimum repository scope needed to comment on pull requests and perform merges.
## Cache and Storage
### Valkey (Redis-compatible)

View File

@@ -13,11 +13,11 @@ Images are tagged based on branch and event type:
### Tag Meanings
| Tag | Purpose | Stability |
| -------------------------- | ---------------------------------- | --------- |
| `latest` | Current build from `main` | Latest |
| `v*` (e.g., `v1.0.0`) | Versioned release | Immutable |
| `{sha}` (e.g., `658ec077`) | Specific commit for traceability | Immutable |
| Tag | Purpose | Stability |
| -------------------------- | -------------------------------- | --------- |
| `latest` | Current build from `main` | Latest |
| `v*` (e.g., `v1.0.0`) | Versioned release | Immutable |
| `{sha}` (e.g., `658ec077`) | Specific commit for traceability | Immutable |
## Retention Policy Configuration

View File

@@ -3,6 +3,7 @@
## Objective
Implement rate limiting on all federation endpoints to prevent denial-of-service (DoS) attacks. Federation endpoints currently have no rate limiting, allowing attackers to:
- Overwhelm the server with connection requests
- Flood token validation endpoints
- Exhaust system resources
@@ -12,6 +13,7 @@ Implement rate limiting on all federation endpoints to prevent denial-of-service
**Severity:** P0 (Critical) - Blocks production deployment
**Attack Vector:** Unauthenticated public endpoints allow unlimited requests
**Risk:** System can be brought down by flooding requests to:
1. `POST /api/v1/federation/incoming/connect` (Public, no auth)
2. `POST /api/v1/federation/auth/validate` (Public, no auth)
3. All other endpoints (authenticated, but can be abused)
@@ -19,15 +21,19 @@ Implement rate limiting on all federation endpoints to prevent denial-of-service
## Approach
### 1. Install @nestjs/throttler
Use NestJS's official rate limiting package which integrates with the framework's guard system.
### 2. Configure Rate Limits
Tiered rate limiting strategy:
- **Public endpoints:** Strict limits (5 req/min per IP)
- **Authenticated endpoints:** Moderate limits (20 req/min per user)
- **Admin endpoints:** Higher limits (50 req/min per user)
### 3. Implementation Strategy
1. Add `@nestjs/throttler` dependency
2. Configure ThrottlerModule globally
3. Apply custom rate limits per endpoint using decorators
@@ -53,6 +59,7 @@ Tiered rate limiting strategy:
**COMPLETE** - Rate limiting successfully implemented on all federation endpoints.
**Security Impact:** MITIGATED
- DoS vulnerability eliminated via rate limiting
- Public endpoints protected with strict limits (3 req/sec)
- Authenticated endpoints have moderate limits (20 req/min)
@@ -61,6 +68,7 @@ Tiered rate limiting strategy:
## Baseline Quality Status
**Pre-existing Technical Debt** (NOT introduced by this fix):
- 29 TypeScript errors in apps/api (federation + runner-jobs)
- Federation: Missing Prisma schema types (`FederationConnectionStatus`, `Instance`, `federatedIdentity`)
- Runner Jobs: Missing `version` field in schema
@@ -68,6 +76,7 @@ Tiered rate limiting strategy:
- **My changes introduced 0 new errors**
**Quality Assessment:**
- ✅ Tier 1 (Baseline): No regression (error count unchanged)
- ✅ Tier 2 (Modified Files): 0 new errors in files I touched
- ✅ Tier 3 (New Code): Rate limiting configuration is syntactically correct
@@ -75,6 +84,7 @@ Tiered rate limiting strategy:
## Testing Status
**Blocked:** Federation module tests cannot run until Prisma schema is added. Pre-existing error:
```
TypeError: Cannot read properties of undefined (reading 'PENDING')
FederationConnectionStatus is undefined
@@ -83,6 +93,7 @@ FederationConnectionStatus is undefined
This is NOT caused by my changes - it's pre-existing technical debt from incomplete M7 federation implementation.
**Manual Verification:**
- TypeScript compilation: No new errors introduced
- Rate limiting decorators: Correctly applied to all endpoints
- ThrottlerModule: Properly configured with 3 tiers
@@ -91,6 +102,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
## Testing
### Rate Limit Tests
1. Public endpoint exceeds limit → 429 Too Many Requests
2. Authenticated endpoint exceeds limit → 429 Too Many Requests
3. Within limits → 200 OK
@@ -99,6 +111,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
6. Different users have independent limits
### Security Tests
1. Cannot bypass rate limit with different user agents
2. Cannot bypass rate limit with different headers
3. Rate limit counter resets after time window
@@ -107,6 +120,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
## Federation Endpoints Requiring Rate Limiting
### FederationController (`/api/v1/federation`)
- `GET /instance` - Public (5 req/min per IP)
- `POST /instance/regenerate-keys` - Admin (10 req/min per user)
- `POST /connections/initiate` - Auth (10 req/min per user)
@@ -118,6 +132,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
- `POST /incoming/connect` - **Public (3 req/min per IP)** ← CRITICAL
### FederationAuthController (`/api/v1/federation/auth`)
- `POST /initiate` - Auth (10 req/min per user)
- `POST /link` - Auth (5 req/min per user)
- `GET /identities` - Auth (30 req/min per user)
@@ -127,18 +142,21 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
## Notes
### Design Decisions
- Use IP-based rate limiting for public endpoints
- Use user-based rate limiting for authenticated endpoints
- Store rate limit state in Valkey (Redis-compatible) for scalability
- Include rate limit headers in responses (X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset)
### Attack Vectors Mitigated
1. **Connection Request Flooding:** Attacker sends unlimited connection requests to `/incoming/connect`
2. **Token Validation Abuse:** Attacker floods `/auth/validate` to exhaust resources
3. **Authenticated User Abuse:** Compromised credentials used to flood authenticated endpoints
4. **Resource Exhaustion:** Prevents CPU/memory exhaustion from processing excessive requests
### Future Enhancements (Not in Scope)
- Circuit breaker pattern for failing instances
- Geographic rate limiting
- Adaptive rate limiting based on system load

View File

@@ -7,11 +7,13 @@ The initial implementation (commit 6878d57) was high quality but included placeh
## Security-Critical Issues
### 1. JWT Token Validation (CRITICAL)
**Problem**: `validateToken()` always returns `valid: false`
**Risk**: Cannot verify authenticity of federated tokens
**Solution**: Implement proper JWT validation with signature verification
### 2. OIDC Discovery (CRITICAL)
**Problem**: `generateAuthUrl()` returns hardcoded placeholder URL
**Risk**: Cannot initiate real federated authentication flows
**Solution**: Implement OIDC discovery and proper authorization URL generation
@@ -19,9 +21,11 @@ The initial implementation (commit 6878d57) was high quality but included placeh
## Implementation Plan
### 1. Add Dependencies
- [x] Add `jose` library for JWT handling (industry-standard, secure)
### 2. Implement JWT Validation
- [ ] Fetch OIDC discovery metadata from issuer
- [ ] Cache JWKS (JSON Web Key Set) for performance
- [ ] Verify JWT signature using remote public key
@@ -31,6 +35,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
- [ ] Return proper validation results
### 3. Implement OIDC Discovery
- [ ] Fetch `.well-known/openid-configuration` from remote instance
- [ ] Cache discovery metadata
- [ ] Generate proper OAuth2 authorization URL
@@ -39,6 +44,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
- [ ] Support standard OIDC scopes (openid, profile, email)
### 4. Update Tests
- [ ] Replace mock-based tests with real behavior tests
- [ ] Test valid JWT validation
- [ ] Test expired/invalid token rejection
@@ -47,6 +53,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
- [ ] Maintain 85%+ test coverage
### 5. Security Considerations
- Cache JWKS to avoid excessive network calls
- Validate token expiration strictly
- Use PKCE to prevent authorization code interception
@@ -57,6 +64,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
## Implementation Notes
**PKCE Flow**:
1. Generate random code_verifier (base64url-encoded random bytes)
2. Generate code_challenge = base64url(SHA256(code_verifier))
3. Store code_verifier in session/database
@@ -64,6 +72,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
5. Send code_verifier in token exchange
**JWT Validation Flow**:
1. Parse JWT without verification to get header
2. Fetch JWKS from issuer (cache for 1 hour)
3. Find matching key by kid (key ID)

View File

@@ -12,6 +12,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
## What Was Implemented
### Database Schema
- **FederationEventSubscription Model**: New table for storing event subscriptions
- Fields: id, workspaceId, connectionId, eventType, metadata, isActive, timestamps
- Unique constraint on (workspaceId, connectionId, eventType)
@@ -21,6 +22,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
### Core Services
**EventService** (`event.service.ts`)
- `subscribeToEventType()`: Subscribe to events from remote instance
- `unsubscribeFromEventType()`: Remove event subscription
- `publishEvent()`: Publish events to all subscribed connections
@@ -35,6 +37,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
**EventController** (`event.controller.ts`)
**Authenticated Endpoints (require AuthGuard):**
- `POST /api/v1/federation/events/subscribe` - Subscribe to event type
- `POST /api/v1/federation/events/unsubscribe` - Unsubscribe from event type
- `POST /api/v1/federation/events/publish` - Publish event to subscribers
@@ -43,12 +46,14 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
- `GET /api/v1/federation/events/messages/:id` - Get single event message
**Public Endpoints (signature-verified):**
- `POST /api/v1/federation/incoming/event` - Receive event from remote instance
- `POST /api/v1/federation/incoming/event/ack` - Receive event acknowledgment
### Type Definitions
**Added to `message.types.ts`:**
- `EventMessage`: Outgoing event structure
- `EventAck`: Event acknowledgment structure
- `EventMessageDetails`: Event message response type
@@ -57,6 +62,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
### Data Transfer Objects
**event.dto.ts:**
- `SubscribeToEventDto`: Subscribe request
- `UnsubscribeFromEventDto`: Unsubscribe request
- `PublishEventDto`: Publish event request
@@ -66,12 +72,14 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
## Testing
### Test Coverage
- **EventService**: 18 unit tests, **89.09% coverage**
- **EventController**: 11 unit tests, **83.87% coverage**
- **Total**: 29 tests, all passing
- **Coverage**: Exceeds 85% minimum requirement
### Test Scenarios Covered
- Subscription creation and deletion
- Event publishing to multiple subscribers
- Failed delivery handling
@@ -84,17 +92,21 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
## Design Patterns
### Consistency with Existing Code
- Follows patterns from `QueryService` and `CommandService`
- Reuses existing `SignatureService` for message verification
- Reuses existing `FederationService` for instance identity
- Uses existing `FederationMessage` model with new `eventType` field
### Event Type Naming Convention
Hierarchical dot-notation:
- `entity.action` (e.g., "task.created", "user.updated")
- `entity.action.detail` (e.g., "task.status.changed")
### Security Features
- All events signature-verified (RSA)
- Timestamp validation (prevents replay attacks)
- Connection status validation (only active connections)
@@ -103,14 +115,18 @@ Hierarchical dot-notation:
## Technical Details
### Database Migration
File: `20260203_add_federation_event_subscriptions/migration.sql`
- Adds `eventType` column to `federation_messages`
- Creates `federation_event_subscriptions` table
- Adds appropriate indexes for performance
- Establishes foreign key relationships
### Integration
Updated `federation.module.ts`:
- Added `EventService` to providers
- Added `EventController` to controllers
- Exported `EventService` for use by other modules
@@ -126,6 +142,7 @@ Updated `federation.module.ts`:
## Files Created/Modified
### New Files (7)
- `apps/api/src/federation/event.service.ts` (470 lines)
- `apps/api/src/federation/event.service.spec.ts` (1,088 lines)
- `apps/api/src/federation/event.controller.ts` (199 lines)
@@ -135,11 +152,13 @@ Updated `federation.module.ts`:
- `docs/scratchpads/90-event-subscriptions.md` (185 lines)
### Modified Files (3)
- `apps/api/src/federation/types/message.types.ts` (+118 lines)
- `apps/api/src/federation/federation.module.ts` (+3 lines)
- `apps/api/prisma/schema.prisma` (+27 lines)
### Total Changes
- **2,395 lines added**
- **5 lines removed**
- **10 files changed**
@@ -147,20 +166,25 @@ Updated `federation.module.ts`:
## Key Features
### Server-Side Event Filtering
Events are only sent to instances with active subscriptions for that event type. This prevents unnecessary network traffic and processing.
### Acknowledgment Protocol
Simple ACK pattern confirms event delivery:
1. Publisher sends event
2. Receiver processes and returns ACK
3. Publisher updates delivery status
### Error Handling
- Failed deliveries marked as FAILED with error message
- Connection errors logged but don't crash the system
- Invalid signatures rejected immediately
### Subscription Management
- Subscriptions persist in database
- Can be activated/deactivated without deletion
- Support for metadata (extensibility)
@@ -168,6 +192,7 @@ Simple ACK pattern confirms event delivery:
## Future Enhancements (Not Implemented)
These were considered but deferred to future issues:
- Event replay/history
- Event filtering by payload fields
- Webhook support for event delivery
@@ -179,11 +204,13 @@ These were considered but deferred to future issues:
## Performance Considerations
### Scalability
- Database indexes on eventType, connectionId, workspaceId
- Efficient queries with proper WHERE clauses
- Server-side filtering reduces network overhead
### Monitoring
- All operations logged with appropriate level
- Failed deliveries tracked in database
- Delivery timestamps recorded for analytics
@@ -191,12 +218,14 @@ These were considered but deferred to future issues:
## Documentation
### Inline Documentation
- JSDoc comments on all public methods
- Clear parameter descriptions
- Return type documentation
- Usage examples in comments
### Scratchpad Documentation
- Complete implementation plan
- Design decisions documented
- Testing strategy outlined
@@ -205,6 +234,7 @@ These were considered but deferred to future issues:
## Integration Testing Recommendations
While unit tests are comprehensive, recommend integration testing:
1. Set up two federated instances
2. Subscribe from Instance A to Instance B events
3. Publish event from Instance B
@@ -214,6 +244,7 @@ While unit tests are comprehensive, recommend integration testing:
## Conclusion
FED-007 (EVENT Subscriptions) is **complete and ready for code review**. The implementation:
- ✅ Follows TDD principles
- ✅ Meets 85%+ code coverage requirement
- ✅ Passes all quality gates (lint, typecheck, tests)

View File

@@ -0,0 +1,40 @@
# MS-GATE-001 Scratchpad
## Objective
Build the API Gatekeeper module for PR auto-merge orchestration using Gitea PR webhooks, Woodpecker CI webhooks, and a `pending_merges` Prisma model.
## Constraints
- Work in `/home/jwoltje/src/mosaic-stack-worktrees/gate-001`
- Do not merge or deploy
- Must pass:
- `pnpm format:check`
- `SKIP_ENV_VALIDATION=true pnpm turbo typecheck`
- `SKIP_ENV_VALIDATION=true pnpm turbo lint`
- `SKIP_ENV_VALIDATION=true pnpm turbo test --filter=@mosaic/api -- --testPathPattern="gatekeeper"`
## ASSUMPTION
- Woodpecker PR pipelines expose `CI_COMMIT_PULL_REQUEST` and `CI_COMMIT_SHA`, so the webhook notifier can send `prNumber` and `headSha`.
- Rationale: Gatekeeper needs an exact PR/head tuple to safely match CI back to `pending_merges`.
## Plan
1. Add Prisma model + SQL migration for `pending_merges`
2. Add Gatekeeper NestJS module/controller/service/DTO/tests
3. Wire Woodpecker webhook -> Gatekeeper CI handler
4. Add env/config documentation and compose variables
5. Run quality gates, review, remediate, push, open PR
## Progress
- [x] Context loaded
- [ ] Tests added first
- [ ] Implementation complete
- [ ] Quality gates green
- [ ] Push + PR opened
## Verification
- Pending

11
pnpm-lock.yaml generated
View File

@@ -368,8 +368,8 @@ importers:
specifier: ^7.8.1
version: 7.8.2
simple-git:
specifier: ^3.27.0
version: 3.30.0
specifier: ^3.32.3
version: 3.33.0
zod:
specifier: ^3.24.1
version: 3.25.76
@@ -1628,6 +1628,7 @@ packages:
'@mosaicstack/telemetry-client@0.1.1':
resolution: {integrity: sha512-1udg6p4cs8rhQgQ2pKCfi7EpRlJieRRhA5CIqthRQ6HQZLgQ0wH+632jEulov3rlHSM1iplIQ+AAe5DWrvSkEA==, tarball: https://git.mosaicstack.dev/api/packages/mosaic/npm/%40mosaicstack%2Ftelemetry-client/-/0.1.1/telemetry-client-0.1.1.tgz}
engines: {node: '>=18'}
'@mrleebo/prisma-ast@0.13.1':
resolution: {integrity: sha512-XyroGQXcHrZdvmrGJvsA9KNeOOgGMg1Vg9OlheUsBOSKznLMDl+YChxbkboRHvtFYJEMRYmlV3uoo/njCw05iw==}
@@ -6794,8 +6795,8 @@ packages:
simple-get@4.0.1:
resolution: {integrity: sha512-brv7p5WgH0jmQJr1ZDDfKDOSeWWg+OVypG99A/5vYGPqJ6pxiaHLy8nxtFjBA7oMa01ebA9gfh1uMCFqOuXxvA==}
simple-git@3.30.0:
resolution: {integrity: sha512-q6lxyDsCmEal/MEGhP1aVyQ3oxnagGlBDOVSIB4XUVLl1iZh0Pah6ebC9V4xBap/RfgP2WlI8EKs0WS0rMEJHg==}
simple-git@3.33.0:
resolution: {integrity: sha512-D4V/tGC2sjsoNhoMybKyGoE+v8A60hRawKQ1iFRA1zwuDgGZCBJ4ByOzZ5J8joBbi4Oam0qiPH+GhzmSBwbJng==}
sisteransi@1.0.5:
resolution: {integrity: sha512-bLGGlR1QxBcynn2d5YmDX4MGjlZvy2MRBDRNHLJ8VI6l6+9FUiyTFNJ0IveOSP0bcXgVDPRcfGqA0pjaqUpfVg==}
@@ -14529,7 +14530,7 @@ snapshots:
once: 1.4.0
simple-concat: 1.0.1
simple-git@3.30.0:
simple-git@3.33.0:
dependencies:
'@kwsites/file-exists': 1.1.1
'@kwsites/promise-deferred': 1.1.1