Compare commits
20 Commits
feat/ms24-
...
feat/ms-ga
| Author | SHA1 | Date | |
|---|---|---|---|
| 5f0a7c847c | |||
| 639abfaefa | |||
| 6e2b9a307e | |||
| 3289677056 | |||
| 5a14a97cb4 | |||
| aebf6b18db | |||
| 6fbfb3c197 | |||
| 348943c5f7 | |||
| 39d36e67c5 | |||
| e8a2d32476 | |||
| 3c0c10c9e5 | |||
| f59ce6a7a5 | |||
| 11fa1734bd | |||
| 46815707a9 | |||
| 621df6ee70 | |||
| ac406f19bc | |||
| 72d295edd6 | |||
| 6e9def3c5a | |||
| 8014930b70 | |||
| 04bbdf3308 |
@@ -292,6 +292,11 @@ GITEA_REPO_NAME=stack
|
|||||||
# Configure in Gitea: Repository Settings → Webhooks → Add Webhook
|
# Configure in Gitea: Repository Settings → Webhooks → Add Webhook
|
||||||
GITEA_WEBHOOK_SECRET=REPLACE_WITH_RANDOM_WEBHOOK_SECRET
|
GITEA_WEBHOOK_SECRET=REPLACE_WITH_RANDOM_WEBHOOK_SECRET
|
||||||
|
|
||||||
|
# Gatekeeper merge automation
|
||||||
|
# Uses the same Gitea host to validate PR webhooks and to merge approved PRs after green CI.
|
||||||
|
GITEA_API_TOKEN=REPLACE_WITH_GATEKEEPER_GITEA_API_TOKEN
|
||||||
|
GATEKEEPER_ENABLED=true
|
||||||
|
|
||||||
# Coordinator API Key (service-to-service authentication)
|
# Coordinator API Key (service-to-service authentication)
|
||||||
# CRITICAL: Generate a random API key with at least 32 characters
|
# CRITICAL: Generate a random API key with at least 32 characters
|
||||||
# Example: openssl rand -base64 32
|
# Example: openssl rand -base64 32
|
||||||
|
|||||||
@@ -271,6 +271,26 @@ steps:
|
|||||||
depends_on:
|
depends_on:
|
||||||
- docker-build-orchestrator
|
- docker-build-orchestrator
|
||||||
|
|
||||||
|
notify-webhook:
|
||||||
|
image: curlimages/curl:8.6.0
|
||||||
|
environment:
|
||||||
|
MOSAIC_WEBHOOK_URL:
|
||||||
|
from_secret: mosaic_webhook_url
|
||||||
|
WOODPECKER_WEBHOOK_SECRET:
|
||||||
|
from_secret: woodpecker_webhook_secret
|
||||||
|
commands:
|
||||||
|
- |
|
||||||
|
BODY="{\"branch\":\"${CI_COMMIT_BRANCH}\",\"status\":\"${CI_PIPELINE_STATUS}\",\"buildUrl\":\"${CI_PIPELINE_LINK}\",\"repo\":\"${CI_REPO}\",\"prNumber\":${CI_COMMIT_PULL_REQUEST:-null},\"headSha\":\"${CI_COMMIT_SHA}\"}"
|
||||||
|
SIG=$(echo -n "$BODY" | openssl dgst -sha256 -hmac "$WOODPECKER_WEBHOOK_SECRET" | awk '{print $2}')
|
||||||
|
curl -s -o /dev/null -w "%{http_code}" -X POST "${MOSAIC_WEBHOOK_URL}/api/webhooks/woodpecker" \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-H "X-Woodpecker-Signature: ${SIG}" \
|
||||||
|
-d "$BODY" || true
|
||||||
|
when:
|
||||||
|
- status: [success, failure]
|
||||||
|
depends_on:
|
||||||
|
- build
|
||||||
|
|
||||||
security-trivy-web:
|
security-trivy-web:
|
||||||
image: aquasec/trivy:latest
|
image: aquasec/trivy:latest
|
||||||
environment:
|
environment:
|
||||||
|
|||||||
@@ -24,6 +24,11 @@ ENCRYPTION_KEY=0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef
|
|||||||
# In development, a random key is generated if not set
|
# In development, a random key is generated if not set
|
||||||
CSRF_SECRET=fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210
|
CSRF_SECRET=fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210
|
||||||
|
|
||||||
|
# Gatekeeper merge automation
|
||||||
|
GITEA_WEBHOOK_SECRET=replace-with-gitea-webhook-secret
|
||||||
|
GITEA_API_TOKEN=replace-with-gitea-api-token
|
||||||
|
GATEKEEPER_ENABLED=true
|
||||||
|
|
||||||
# OpenTelemetry Configuration
|
# OpenTelemetry Configuration
|
||||||
# Enable/disable OpenTelemetry tracing (default: true)
|
# Enable/disable OpenTelemetry tracing (default: true)
|
||||||
OTEL_ENABLED=true
|
OTEL_ENABLED=true
|
||||||
|
|||||||
@@ -12,7 +12,7 @@
|
|||||||
"lint:fix": "eslint \"src/**/*.ts\" --fix",
|
"lint:fix": "eslint \"src/**/*.ts\" --fix",
|
||||||
"typecheck": "tsc --noEmit",
|
"typecheck": "tsc --noEmit",
|
||||||
"clean": "rm -rf dist",
|
"clean": "rm -rf dist",
|
||||||
"test": "vitest run",
|
"test": "node scripts/vitest-runner.mjs",
|
||||||
"test:watch": "vitest",
|
"test:watch": "vitest",
|
||||||
"test:coverage": "vitest run --coverage",
|
"test:coverage": "vitest run --coverage",
|
||||||
"test:e2e": "vitest run --config ./vitest.e2e.config.ts",
|
"test:e2e": "vitest run --config ./vitest.e2e.config.ts",
|
||||||
|
|||||||
@@ -0,0 +1,14 @@
|
|||||||
|
CREATE TABLE "pending_merges" (
|
||||||
|
"id" UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||||
|
"repo" TEXT NOT NULL,
|
||||||
|
"pr_number" INTEGER NOT NULL,
|
||||||
|
"head_sha" TEXT NOT NULL,
|
||||||
|
"state" TEXT NOT NULL DEFAULT 'pending',
|
||||||
|
"review_result" JSONB,
|
||||||
|
"ci_status" TEXT,
|
||||||
|
"requester" TEXT,
|
||||||
|
"gitea_merge_url" TEXT,
|
||||||
|
"created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
"updated_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
|
||||||
|
CONSTRAINT "pending_merges_repo_pr_sha_key" UNIQUE ("repo", "pr_number", "head_sha")
|
||||||
|
);
|
||||||
@@ -261,6 +261,23 @@ model User {
|
|||||||
@@map("users")
|
@@map("users")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
model PendingMerge {
|
||||||
|
id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
|
||||||
|
repo String
|
||||||
|
prNumber Int @map("pr_number")
|
||||||
|
headSha String @map("head_sha")
|
||||||
|
state String @default("pending")
|
||||||
|
reviewResult Json? @map("review_result")
|
||||||
|
ciStatus String? @map("ci_status")
|
||||||
|
requester String?
|
||||||
|
giteaMergeUrl String? @map("gitea_merge_url")
|
||||||
|
createdAt DateTime @default(now()) @map("created_at") @db.Timestamptz
|
||||||
|
updatedAt DateTime @updatedAt @map("updated_at") @db.Timestamptz
|
||||||
|
|
||||||
|
@@unique([repo, prNumber, headSha])
|
||||||
|
@@map("pending_merges")
|
||||||
|
}
|
||||||
|
|
||||||
model UserPreference {
|
model UserPreference {
|
||||||
id String @id @default(uuid()) @db.Uuid
|
id String @id @default(uuid()) @db.Uuid
|
||||||
userId String @unique @map("user_id") @db.Uuid
|
userId String @unique @map("user_id") @db.Uuid
|
||||||
|
|||||||
36
apps/api/scripts/vitest-runner.mjs
Normal file
36
apps/api/scripts/vitest-runner.mjs
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
import { spawnSync } from "node:child_process";
|
||||||
|
import { join } from "node:path";
|
||||||
|
|
||||||
|
const rawArgs = process.argv.slice(2);
|
||||||
|
const passthroughArgs = [];
|
||||||
|
|
||||||
|
for (let index = 0; index < rawArgs.length; index += 1) {
|
||||||
|
const arg = rawArgs[index];
|
||||||
|
|
||||||
|
if (arg === "--testPathPattern") {
|
||||||
|
const value = rawArgs[index + 1];
|
||||||
|
if (value) {
|
||||||
|
passthroughArgs.push(value);
|
||||||
|
index += 1;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (arg.startsWith("--testPathPattern=")) {
|
||||||
|
passthroughArgs.push(arg.slice("--testPathPattern=".length));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
passthroughArgs.push(arg);
|
||||||
|
}
|
||||||
|
|
||||||
|
const vitestBin = join(process.cwd(), "node_modules", ".bin", "vitest");
|
||||||
|
const result = spawnSync(vitestBin, ["run", ...passthroughArgs], {
|
||||||
|
stdio: "inherit",
|
||||||
|
});
|
||||||
|
|
||||||
|
if (result.error) {
|
||||||
|
throw result.error;
|
||||||
|
}
|
||||||
|
|
||||||
|
process.exit(result.status ?? 1);
|
||||||
@@ -346,7 +346,9 @@ describe("AdminService", () => {
|
|||||||
data: { deactivatedAt: expect.any(Date) },
|
data: { deactivatedAt: expect.any(Date) },
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
expect(mockPrismaService.session.deleteMany).toHaveBeenCalledWith({ where: { userId: mockUserId } });
|
expect(mockPrismaService.session.deleteMany).toHaveBeenCalledWith({
|
||||||
|
where: { userId: mockUserId },
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should throw NotFoundException if user does not exist", async () => {
|
it("should throw NotFoundException if user does not exist", async () => {
|
||||||
|
|||||||
@@ -60,8 +60,10 @@ import { ContainerReaperModule } from "./container-reaper/container-reaper.modul
|
|||||||
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
|
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
|
||||||
import { OnboardingModule } from "./onboarding/onboarding.module";
|
import { OnboardingModule } from "./onboarding/onboarding.module";
|
||||||
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
|
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
|
||||||
|
import { MissionControlProxyModule } from "./mission-control-proxy/mission-control-proxy.module";
|
||||||
import { OrchestratorModule } from "./orchestrator/orchestrator.module";
|
import { OrchestratorModule } from "./orchestrator/orchestrator.module";
|
||||||
import { QueueNotificationsModule } from "./queue-notifications/queue-notifications.module";
|
import { QueueNotificationsModule } from "./queue-notifications/queue-notifications.module";
|
||||||
|
import { GatekeeperModule } from "./gatekeeper/gatekeeper.module";
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [
|
||||||
@@ -143,7 +145,9 @@ import { QueueNotificationsModule } from "./queue-notifications/queue-notificati
|
|||||||
FleetSettingsModule,
|
FleetSettingsModule,
|
||||||
OnboardingModule,
|
OnboardingModule,
|
||||||
ChatProxyModule,
|
ChatProxyModule,
|
||||||
|
MissionControlProxyModule,
|
||||||
OrchestratorModule,
|
OrchestratorModule,
|
||||||
|
GatekeeperModule,
|
||||||
QueueNotificationsModule,
|
QueueNotificationsModule,
|
||||||
],
|
],
|
||||||
controllers: [AppController, CsrfController],
|
controllers: [AppController, CsrfController],
|
||||||
|
|||||||
@@ -211,9 +211,7 @@ describe("AuthGuard", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
|
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
|
||||||
await expect(guard.canActivate(context)).rejects.toThrow(
|
await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
|
||||||
"Invalid user data in session"
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should throw UnauthorizedException when user is missing email", async () => {
|
it("should throw UnauthorizedException when user is missing email", async () => {
|
||||||
@@ -227,9 +225,7 @@ describe("AuthGuard", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
|
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
|
||||||
await expect(guard.canActivate(context)).rejects.toThrow(
|
await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
|
||||||
"Invalid user data in session"
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should throw UnauthorizedException when user is missing name", async () => {
|
it("should throw UnauthorizedException when user is missing name", async () => {
|
||||||
@@ -243,9 +239,7 @@ describe("AuthGuard", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
|
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
|
||||||
await expect(guard.canActivate(context)).rejects.toThrow(
|
await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
|
||||||
"Invalid user data in session"
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should throw UnauthorizedException when user is a string", async () => {
|
it("should throw UnauthorizedException when user is a string", async () => {
|
||||||
@@ -259,9 +253,7 @@ describe("AuthGuard", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
|
await expect(guard.canActivate(context)).rejects.toThrow(UnauthorizedException);
|
||||||
await expect(guard.canActivate(context)).rejects.toThrow(
|
await expect(guard.canActivate(context)).rejects.toThrow("Invalid user data in session");
|
||||||
"Invalid user data in session"
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("should reject when user is null (typeof null === 'object' causes TypeError on 'in' operator)", async () => {
|
it("should reject when user is null (typeof null === 'object' causes TypeError on 'in' operator)", async () => {
|
||||||
@@ -277,9 +269,7 @@ describe("AuthGuard", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
await expect(guard.canActivate(context)).rejects.toThrow(TypeError);
|
await expect(guard.canActivate(context)).rejects.toThrow(TypeError);
|
||||||
await expect(guard.canActivate(context)).rejects.not.toBeInstanceOf(
|
await expect(guard.canActivate(context)).rejects.not.toBeInstanceOf(UnauthorizedException);
|
||||||
UnauthorizedException
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -154,9 +154,7 @@ describe("CoordinatorIntegrationService", () => {
|
|||||||
// Mock transaction that passes through the callback
|
// Mock transaction that passes through the callback
|
||||||
mockPrismaService.$transaction.mockImplementation(async (callback) => {
|
mockPrismaService.$transaction.mockImplementation(async (callback) => {
|
||||||
const mockTx = {
|
const mockTx = {
|
||||||
$queryRaw: vi
|
$queryRaw: vi.fn().mockResolvedValue([
|
||||||
.fn()
|
|
||||||
.mockResolvedValue([
|
|
||||||
{
|
{
|
||||||
id: mockJob.id,
|
id: mockJob.id,
|
||||||
status: mockJob.status,
|
status: mockJob.status,
|
||||||
@@ -204,9 +202,7 @@ describe("CoordinatorIntegrationService", () => {
|
|||||||
// Mock transaction with completed job
|
// Mock transaction with completed job
|
||||||
mockPrismaService.$transaction.mockImplementation(async (callback) => {
|
mockPrismaService.$transaction.mockImplementation(async (callback) => {
|
||||||
const mockTx = {
|
const mockTx = {
|
||||||
$queryRaw: vi
|
$queryRaw: vi.fn().mockResolvedValue([
|
||||||
.fn()
|
|
||||||
.mockResolvedValue([
|
|
||||||
{
|
{
|
||||||
id: mockJob.id,
|
id: mockJob.id,
|
||||||
status: RunnerJobStatus.COMPLETED,
|
status: RunnerJobStatus.COMPLETED,
|
||||||
@@ -271,9 +267,7 @@ describe("CoordinatorIntegrationService", () => {
|
|||||||
// Mock transaction with running job
|
// Mock transaction with running job
|
||||||
mockPrismaService.$transaction.mockImplementation(async (callback) => {
|
mockPrismaService.$transaction.mockImplementation(async (callback) => {
|
||||||
const mockTx = {
|
const mockTx = {
|
||||||
$queryRaw: vi
|
$queryRaw: vi.fn().mockResolvedValue([
|
||||||
.fn()
|
|
||||||
.mockResolvedValue([
|
|
||||||
{
|
{
|
||||||
id: mockJob.id,
|
id: mockJob.id,
|
||||||
status: RunnerJobStatus.RUNNING,
|
status: RunnerJobStatus.RUNNING,
|
||||||
@@ -315,9 +309,7 @@ describe("CoordinatorIntegrationService", () => {
|
|||||||
// Mock transaction with running job
|
// Mock transaction with running job
|
||||||
mockPrismaService.$transaction.mockImplementation(async (callback) => {
|
mockPrismaService.$transaction.mockImplementation(async (callback) => {
|
||||||
const mockTx = {
|
const mockTx = {
|
||||||
$queryRaw: vi
|
$queryRaw: vi.fn().mockResolvedValue([
|
||||||
.fn()
|
|
||||||
.mockResolvedValue([
|
|
||||||
{
|
{
|
||||||
id: mockJob.id,
|
id: mockJob.id,
|
||||||
status: RunnerJobStatus.RUNNING,
|
status: RunnerJobStatus.RUNNING,
|
||||||
|
|||||||
113
apps/api/src/gatekeeper/dto/gitea-pr-webhook.dto.ts
Normal file
113
apps/api/src/gatekeeper/dto/gitea-pr-webhook.dto.ts
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
import {
|
||||||
|
IsArray,
|
||||||
|
IsIn,
|
||||||
|
IsInt,
|
||||||
|
IsObject,
|
||||||
|
IsOptional,
|
||||||
|
IsString,
|
||||||
|
MaxLength,
|
||||||
|
Min,
|
||||||
|
MinLength,
|
||||||
|
ValidateNested,
|
||||||
|
} from "class-validator";
|
||||||
|
import { Type } from "class-transformer";
|
||||||
|
|
||||||
|
class GiteaLabelDto {
|
||||||
|
@IsString()
|
||||||
|
@MinLength(1)
|
||||||
|
@MaxLength(255)
|
||||||
|
name!: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
class GiteaRepoDto {
|
||||||
|
@IsString()
|
||||||
|
@MinLength(1)
|
||||||
|
@MaxLength(512)
|
||||||
|
full_name!: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
class GiteaBranchRefDto {
|
||||||
|
@IsString()
|
||||||
|
@MinLength(1)
|
||||||
|
@MaxLength(255)
|
||||||
|
ref!: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
class GiteaHeadDto {
|
||||||
|
@IsString()
|
||||||
|
@MinLength(7)
|
||||||
|
@MaxLength(128)
|
||||||
|
sha!: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
class GiteaPullRequestDto {
|
||||||
|
@IsInt()
|
||||||
|
@Min(1)
|
||||||
|
number!: number;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
body?: string;
|
||||||
|
|
||||||
|
@ValidateNested()
|
||||||
|
@Type(() => GiteaBranchRefDto)
|
||||||
|
base!: GiteaBranchRefDto;
|
||||||
|
|
||||||
|
@ValidateNested()
|
||||||
|
@Type(() => GiteaHeadDto)
|
||||||
|
head!: GiteaHeadDto;
|
||||||
|
|
||||||
|
@IsArray()
|
||||||
|
@ValidateNested({ each: true })
|
||||||
|
@Type(() => GiteaLabelDto)
|
||||||
|
labels!: GiteaLabelDto[];
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
html_url?: string;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsString()
|
||||||
|
url?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
class GiteaSenderDto {
|
||||||
|
@IsString()
|
||||||
|
@MinLength(1)
|
||||||
|
@MaxLength(255)
|
||||||
|
login!: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class GiteaPrWebhookDto {
|
||||||
|
@IsString()
|
||||||
|
@IsIn(["pull_request"])
|
||||||
|
@MaxLength(64)
|
||||||
|
type!: string;
|
||||||
|
|
||||||
|
@IsString()
|
||||||
|
@IsIn(["opened", "labeled", "synchronize"])
|
||||||
|
@MaxLength(64)
|
||||||
|
action!: "opened" | "labeled" | "synchronize";
|
||||||
|
|
||||||
|
@ValidateNested()
|
||||||
|
@Type(() => GiteaRepoDto)
|
||||||
|
repository!: GiteaRepoDto;
|
||||||
|
|
||||||
|
@ValidateNested()
|
||||||
|
@Type(() => GiteaPullRequestDto)
|
||||||
|
pull_request!: GiteaPullRequestDto;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@ValidateNested()
|
||||||
|
@Type(() => GiteaLabelDto)
|
||||||
|
label?: GiteaLabelDto;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@ValidateNested()
|
||||||
|
@Type(() => GiteaSenderDto)
|
||||||
|
sender?: GiteaSenderDto;
|
||||||
|
|
||||||
|
@IsOptional()
|
||||||
|
@IsObject()
|
||||||
|
review_result?: Record<string, unknown>;
|
||||||
|
}
|
||||||
98
apps/api/src/gatekeeper/gatekeeper.controller.spec.ts
Normal file
98
apps/api/src/gatekeeper/gatekeeper.controller.spec.ts
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
import { createHmac } from "node:crypto";
|
||||||
|
import { Logger } from "@nestjs/common";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { Test, type TestingModule } from "@nestjs/testing";
|
||||||
|
import type { Request } from "express";
|
||||||
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import { GatekeeperController } from "./gatekeeper.controller";
|
||||||
|
import { GatekeeperService } from "./gatekeeper.service";
|
||||||
|
|
||||||
|
describe("GatekeeperController", () => {
|
||||||
|
let controller: GatekeeperController;
|
||||||
|
|
||||||
|
const gatekeeperService = {
|
||||||
|
handlePrEvent: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
const configService = {
|
||||||
|
get: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
const payload = {
|
||||||
|
type: "pull_request",
|
||||||
|
action: "labeled",
|
||||||
|
label: { name: "auto-merge" },
|
||||||
|
repository: { full_name: "mosaic/stack" },
|
||||||
|
sender: { login: "jason" },
|
||||||
|
pull_request: {
|
||||||
|
number: 7,
|
||||||
|
body: "ready",
|
||||||
|
base: { ref: "main" },
|
||||||
|
head: { sha: "abcdef1234567890" },
|
||||||
|
labels: [{ name: "auto-merge" }],
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
configService.get.mockImplementation((key: string) => {
|
||||||
|
if (key === "GITEA_WEBHOOK_SECRET") {
|
||||||
|
return "secret";
|
||||||
|
}
|
||||||
|
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
gatekeeperService.handlePrEvent.mockResolvedValue(undefined);
|
||||||
|
|
||||||
|
const module: TestingModule = await Test.createTestingModule({
|
||||||
|
controllers: [GatekeeperController],
|
||||||
|
providers: [
|
||||||
|
{ provide: GatekeeperService, useValue: gatekeeperService },
|
||||||
|
{ provide: ConfigService, useValue: configService },
|
||||||
|
],
|
||||||
|
}).compile();
|
||||||
|
|
||||||
|
controller = module.get(GatekeeperController);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("accepts a valid signature and schedules processing", async () => {
|
||||||
|
const signature = createHmac("sha256", "secret").update(JSON.stringify(payload)).digest("hex");
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
controller.handleWebhook(
|
||||||
|
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
|
||||||
|
payload,
|
||||||
|
signature
|
||||||
|
)
|
||||||
|
).resolves.toEqual({ ok: true });
|
||||||
|
|
||||||
|
expect(gatekeeperService.handlePrEvent).toHaveBeenCalledWith(payload);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("ignores invalid signatures", async () => {
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
|
||||||
|
await expect(controller.handleWebhook({} as Request, payload, "bad")).resolves.toEqual({
|
||||||
|
ok: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(gatekeeperService.handlePrEvent).not.toHaveBeenCalled();
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("invalid Gitea webhook signature")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("accepts requests without validation when no secret is configured", async () => {
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
configService.get.mockReturnValue(undefined);
|
||||||
|
|
||||||
|
await expect(controller.handleWebhook({} as Request, payload, "")).resolves.toEqual({
|
||||||
|
ok: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(gatekeeperService.handlePrEvent).toHaveBeenCalledWith(payload);
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("GITEA_WEBHOOK_SECRET is not configured")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
67
apps/api/src/gatekeeper/gatekeeper.controller.ts
Normal file
67
apps/api/src/gatekeeper/gatekeeper.controller.ts
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
import { Body, Controller, Headers, Logger, Post, Req, type RawBodyRequest } from "@nestjs/common";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { createHmac, timingSafeEqual } from "node:crypto";
|
||||||
|
import type { Request } from "express";
|
||||||
|
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
|
||||||
|
import { GiteaPrWebhookDto } from "./dto/gitea-pr-webhook.dto";
|
||||||
|
import { GatekeeperService } from "./gatekeeper.service";
|
||||||
|
|
||||||
|
@Controller("gatekeeper/webhook")
|
||||||
|
export class GatekeeperController {
|
||||||
|
private readonly logger = new Logger(GatekeeperController.name);
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private readonly gatekeeperService: GatekeeperService,
|
||||||
|
private readonly configService: ConfigService
|
||||||
|
) {}
|
||||||
|
|
||||||
|
@SkipCsrf()
|
||||||
|
@Post("gitea")
|
||||||
|
handleWebhook(
|
||||||
|
@Req() req: RawBodyRequest<Request>,
|
||||||
|
@Body() body: GiteaPrWebhookDto,
|
||||||
|
@Headers("x-gitea-signature") signature: string | undefined
|
||||||
|
): Promise<{ ok: boolean }> {
|
||||||
|
const secret = this.configService.get<string>("GITEA_WEBHOOK_SECRET");
|
||||||
|
|
||||||
|
if (secret && !this.isValidSignature(this.getRequestBody(req, body), signature, secret)) {
|
||||||
|
this.logger.warn("Received invalid Gitea webhook signature");
|
||||||
|
return Promise.resolve({ ok: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!secret) {
|
||||||
|
this.logger.warn("GITEA_WEBHOOK_SECRET is not configured; accepting Gitea webhook");
|
||||||
|
}
|
||||||
|
|
||||||
|
void this.gatekeeperService.handlePrEvent(body).catch((error: unknown) => {
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
this.logger.error(`Failed to process Gitea PR webhook: ${message}`);
|
||||||
|
});
|
||||||
|
|
||||||
|
return Promise.resolve({ ok: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
private getRequestBody(req: RawBodyRequest<Request>, body: GiteaPrWebhookDto): Buffer {
|
||||||
|
if (Buffer.isBuffer(req.rawBody)) {
|
||||||
|
return req.rawBody;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Buffer.from(JSON.stringify(body));
|
||||||
|
}
|
||||||
|
|
||||||
|
private isValidSignature(body: Buffer, signature: string | undefined, secret: string): boolean {
|
||||||
|
if (!signature) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const expected = createHmac("sha256", secret).update(body).digest("hex");
|
||||||
|
const actual = Buffer.from(signature);
|
||||||
|
const expectedBuffer = Buffer.from(expected);
|
||||||
|
|
||||||
|
if (actual.length !== expectedBuffer.length) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return timingSafeEqual(actual, expectedBuffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
12
apps/api/src/gatekeeper/gatekeeper.module.ts
Normal file
12
apps/api/src/gatekeeper/gatekeeper.module.ts
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
import { Module } from "@nestjs/common";
|
||||||
|
import { ConfigModule } from "@nestjs/config";
|
||||||
|
import { GatekeeperController } from "./gatekeeper.controller";
|
||||||
|
import { GatekeeperService } from "./gatekeeper.service";
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
imports: [ConfigModule],
|
||||||
|
controllers: [GatekeeperController],
|
||||||
|
providers: [GatekeeperService],
|
||||||
|
exports: [GatekeeperService],
|
||||||
|
})
|
||||||
|
export class GatekeeperModule {}
|
||||||
199
apps/api/src/gatekeeper/gatekeeper.service.spec.ts
Normal file
199
apps/api/src/gatekeeper/gatekeeper.service.spec.ts
Normal file
@@ -0,0 +1,199 @@
|
|||||||
|
import { Logger } from "@nestjs/common";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { Test, type TestingModule } from "@nestjs/testing";
|
||||||
|
import { describe, beforeEach, expect, it, vi } from "vitest";
|
||||||
|
import { PrismaService } from "../prisma/prisma.service";
|
||||||
|
import type { GiteaPrWebhookDto } from "./dto/gitea-pr-webhook.dto";
|
||||||
|
import { GatekeeperService } from "./gatekeeper.service";
|
||||||
|
|
||||||
|
describe("GatekeeperService", () => {
|
||||||
|
let service: GatekeeperService;
|
||||||
|
|
||||||
|
const prisma = {
|
||||||
|
pendingMerge: {
|
||||||
|
upsert: vi.fn(),
|
||||||
|
update: vi.fn(),
|
||||||
|
findFirst: vi.fn(),
|
||||||
|
findUnique: vi.fn(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const config = {
|
||||||
|
get: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
const basePayload: GiteaPrWebhookDto = {
|
||||||
|
type: "pull_request",
|
||||||
|
action: "labeled",
|
||||||
|
repository: { full_name: "mosaic/stack" },
|
||||||
|
label: { name: "auto-merge" },
|
||||||
|
sender: { login: "jason" },
|
||||||
|
pull_request: {
|
||||||
|
number: 42,
|
||||||
|
body: "Implements Gatekeeper",
|
||||||
|
base: { ref: "main" },
|
||||||
|
head: { sha: "abcdef1234567890" },
|
||||||
|
labels: [{ name: "auto-merge" }],
|
||||||
|
url: "https://git.mosaicstack.dev/api/v1/repos/mosaic/stack/pulls/42",
|
||||||
|
html_url: "https://git.mosaicstack.dev/mosaic/stack/pulls/42",
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
config.get.mockImplementation((key: string) => {
|
||||||
|
switch (key) {
|
||||||
|
case "GATEKEEPER_ENABLED":
|
||||||
|
return "true";
|
||||||
|
case "GITEA_API_TOKEN":
|
||||||
|
return "token";
|
||||||
|
default:
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const module: TestingModule = await Test.createTestingModule({
|
||||||
|
providers: [
|
||||||
|
GatekeeperService,
|
||||||
|
{ provide: PrismaService, useValue: prisma },
|
||||||
|
{ provide: ConfigService, useValue: config },
|
||||||
|
],
|
||||||
|
}).compile();
|
||||||
|
|
||||||
|
service = module.get(GatekeeperService);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("moves labeled auto-merge PRs into awaiting_ci when review passes", async () => {
|
||||||
|
prisma.pendingMerge.upsert.mockResolvedValue({ id: "merge-1" });
|
||||||
|
prisma.pendingMerge.update.mockResolvedValue({});
|
||||||
|
|
||||||
|
await service.handlePrEvent(basePayload);
|
||||||
|
|
||||||
|
expect(prisma.pendingMerge.upsert).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
where: {
|
||||||
|
repo_prNumber_headSha: {
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
prNumber: 42,
|
||||||
|
headSha: "abcdef1234567890",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
);
|
||||||
|
expect(prisma.pendingMerge.update).toHaveBeenNthCalledWith(
|
||||||
|
1,
|
||||||
|
expect.objectContaining({ data: expect.objectContaining({ state: "reviewing" }) })
|
||||||
|
);
|
||||||
|
expect(prisma.pendingMerge.update).toHaveBeenNthCalledWith(
|
||||||
|
2,
|
||||||
|
expect.objectContaining({
|
||||||
|
data: expect.objectContaining({
|
||||||
|
state: "awaiting_ci",
|
||||||
|
reviewResult: { passed: true, issues: [] },
|
||||||
|
}),
|
||||||
|
})
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejects review failures and records the reason", async () => {
|
||||||
|
prisma.pendingMerge.upsert.mockResolvedValue({ id: "merge-2" });
|
||||||
|
prisma.pendingMerge.findUnique.mockResolvedValue({
|
||||||
|
id: "merge-2",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
prNumber: 42,
|
||||||
|
headSha: "abcdef1234567890",
|
||||||
|
});
|
||||||
|
prisma.pendingMerge.update.mockResolvedValue({});
|
||||||
|
const commentSpy = vi
|
||||||
|
.spyOn(
|
||||||
|
service as unknown as {
|
||||||
|
postPullRequestComment: (repo: string, prNumber: number, body: string) => Promise<void>;
|
||||||
|
},
|
||||||
|
"postPullRequestComment"
|
||||||
|
)
|
||||||
|
.mockResolvedValue();
|
||||||
|
|
||||||
|
await service.handlePrEvent({
|
||||||
|
...basePayload,
|
||||||
|
pull_request: {
|
||||||
|
...basePayload.pull_request,
|
||||||
|
body: "",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(prisma.pendingMerge.update).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
where: { id: "merge-2" },
|
||||||
|
data: {
|
||||||
|
state: "rejected",
|
||||||
|
reviewResult: {
|
||||||
|
passed: false,
|
||||||
|
issues: ["PR description must not be empty"],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
);
|
||||||
|
expect(commentSpy).toHaveBeenCalledWith(
|
||||||
|
"mosaic/stack",
|
||||||
|
42,
|
||||||
|
expect.stringContaining("PR description must not be empty")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("attempts merge on green CI for awaiting_ci records", async () => {
|
||||||
|
prisma.pendingMerge.findFirst.mockResolvedValue({
|
||||||
|
id: "merge-3",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
prNumber: 42,
|
||||||
|
headSha: "abcdef1234567890",
|
||||||
|
state: "awaiting_ci",
|
||||||
|
giteaMergeUrl: "https://git.mosaicstack.dev/api/v1/repos/mosaic/stack/pulls/42",
|
||||||
|
});
|
||||||
|
prisma.pendingMerge.update.mockResolvedValue({});
|
||||||
|
const mergeSpy = vi.spyOn(service, "attemptMerge").mockResolvedValue();
|
||||||
|
|
||||||
|
await service.handleCiEvent("mosaic/stack", 42, "abcdef1234567890", "success");
|
||||||
|
|
||||||
|
expect(prisma.pendingMerge.update).toHaveBeenCalledWith(
|
||||||
|
expect.objectContaining({
|
||||||
|
where: { id: "merge-3" },
|
||||||
|
data: expect.objectContaining({ ciStatus: "success" }),
|
||||||
|
})
|
||||||
|
);
|
||||||
|
expect(mergeSpy).toHaveBeenCalledWith("merge-3");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejects failed CI results", async () => {
|
||||||
|
prisma.pendingMerge.findFirst.mockResolvedValue({
|
||||||
|
id: "merge-4",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
prNumber: 42,
|
||||||
|
headSha: "abcdef1234567890",
|
||||||
|
state: "awaiting_ci",
|
||||||
|
});
|
||||||
|
prisma.pendingMerge.update.mockResolvedValue({});
|
||||||
|
const rejectSpy = vi.spyOn(service, "rejectMerge").mockResolvedValue();
|
||||||
|
|
||||||
|
await service.handleCiEvent("mosaic/stack", 42, "abcdef1234567890", "failure");
|
||||||
|
|
||||||
|
expect(rejectSpy).toHaveBeenCalledWith("merge-4", "CI reported failure");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("skips all work when Gatekeeper is disabled", async () => {
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
config.get.mockImplementation((key: string) => {
|
||||||
|
if (key === "GATEKEEPER_ENABLED") {
|
||||||
|
return "false";
|
||||||
|
}
|
||||||
|
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
|
||||||
|
await service.handlePrEvent(basePayload);
|
||||||
|
await service.handleCiEvent("mosaic/stack", 42, "abcdef1234567890", "success");
|
||||||
|
|
||||||
|
expect(prisma.pendingMerge.upsert).not.toHaveBeenCalled();
|
||||||
|
expect(prisma.pendingMerge.findFirst).not.toHaveBeenCalled();
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("Gatekeeper is disabled"));
|
||||||
|
});
|
||||||
|
});
|
||||||
311
apps/api/src/gatekeeper/gatekeeper.service.ts
Normal file
311
apps/api/src/gatekeeper/gatekeeper.service.ts
Normal file
@@ -0,0 +1,311 @@
|
|||||||
|
import { Injectable, Logger } from "@nestjs/common";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { Prisma } from "@prisma/client";
|
||||||
|
import { PrismaService } from "../prisma/prisma.service";
|
||||||
|
import type { GiteaPrWebhookDto } from "./dto/gitea-pr-webhook.dto";
|
||||||
|
|
||||||
|
export interface ReviewResult {
|
||||||
|
passed: boolean;
|
||||||
|
issues: string[];
|
||||||
|
}
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class GatekeeperService {
|
||||||
|
private readonly logger = new Logger(GatekeeperService.name);
|
||||||
|
|
||||||
|
private get giteaApiBaseUrl(): string {
|
||||||
|
return `${this.configService.getOrThrow<string>("GITEA_URL")}/api/v1`;
|
||||||
|
}
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private readonly prisma: PrismaService,
|
||||||
|
private readonly configService: ConfigService
|
||||||
|
) {}
|
||||||
|
|
||||||
|
async handlePrEvent(payload: GiteaPrWebhookDto): Promise<void> {
|
||||||
|
if (!this.isEnabled()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payload.type !== "pull_request") {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const action = payload.action;
|
||||||
|
const hasAutoMergeLabel = this.hasAutoMergeLabel(payload);
|
||||||
|
|
||||||
|
if (!["opened", "labeled", "synchronize"].includes(action)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (action === "labeled" && payload.label?.name !== "auto-merge") {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const merge = await this.prisma.pendingMerge.upsert({
|
||||||
|
where: {
|
||||||
|
repo_prNumber_headSha: {
|
||||||
|
repo: payload.repository.full_name,
|
||||||
|
prNumber: payload.pull_request.number,
|
||||||
|
headSha: payload.pull_request.head.sha,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
create: {
|
||||||
|
repo: payload.repository.full_name,
|
||||||
|
prNumber: payload.pull_request.number,
|
||||||
|
headSha: payload.pull_request.head.sha,
|
||||||
|
...(payload.sender?.login ? { requester: payload.sender.login } : {}),
|
||||||
|
giteaMergeUrl:
|
||||||
|
payload.pull_request.url ??
|
||||||
|
`${this.giteaApiBaseUrl}/repos/${payload.repository.full_name}/pulls/${String(payload.pull_request.number)}`,
|
||||||
|
},
|
||||||
|
update: {
|
||||||
|
headSha: payload.pull_request.head.sha,
|
||||||
|
...(payload.sender?.login ? { requester: payload.sender.login } : {}),
|
||||||
|
giteaMergeUrl:
|
||||||
|
payload.pull_request.url ??
|
||||||
|
`${this.giteaApiBaseUrl}/repos/${payload.repository.full_name}/pulls/${String(payload.pull_request.number)}`,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
if (action === "synchronize") {
|
||||||
|
await this.prisma.pendingMerge.update({
|
||||||
|
where: { id: merge.id },
|
||||||
|
data: {
|
||||||
|
state: hasAutoMergeLabel ? "pending" : "rejected",
|
||||||
|
ciStatus: null,
|
||||||
|
reviewResult: Prisma.DbNull,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
if (hasAutoMergeLabel) {
|
||||||
|
await this.runReview(merge.id, payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasAutoMergeLabel) {
|
||||||
|
await this.runReview(merge.id, payload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async handleCiEvent(
|
||||||
|
repo: string,
|
||||||
|
prNumber: number,
|
||||||
|
headSha: string,
|
||||||
|
status: "success" | "failure"
|
||||||
|
): Promise<void> {
|
||||||
|
if (!this.isEnabled()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const merge = await this.prisma.pendingMerge.findFirst({
|
||||||
|
where: {
|
||||||
|
repo,
|
||||||
|
prNumber,
|
||||||
|
headSha,
|
||||||
|
},
|
||||||
|
orderBy: {
|
||||||
|
createdAt: "desc",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!merge) {
|
||||||
|
this.logger.debug(`No pending merge found for ${repo}#${String(prNumber)} @ ${headSha}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.prisma.pendingMerge.update({
|
||||||
|
where: { id: merge.id },
|
||||||
|
data: {
|
||||||
|
ciStatus: status,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
if (status === "failure") {
|
||||||
|
await this.rejectMerge(merge.id, "CI reported failure");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (merge.state === "awaiting_ci") {
|
||||||
|
await this.attemptMerge(merge.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
reviewPr(payload: GiteaPrWebhookDto): Promise<ReviewResult> {
|
||||||
|
const issues: string[] = [];
|
||||||
|
|
||||||
|
if (!this.hasAutoMergeLabel(payload)) {
|
||||||
|
issues.push("PR must have the auto-merge label");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!payload.pull_request.body?.trim()) {
|
||||||
|
issues.push("PR description must not be empty");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payload.pull_request.base.ref !== "main") {
|
||||||
|
issues.push("PR base branch must be main");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!/^[0-9a-f]{7,128}$/i.test(payload.pull_request.head.sha)) {
|
||||||
|
issues.push("PR head SHA must be a valid git commit hash");
|
||||||
|
}
|
||||||
|
|
||||||
|
return Promise.resolve({
|
||||||
|
passed: issues.length === 0,
|
||||||
|
issues,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async attemptMerge(mergeId: string): Promise<void> {
|
||||||
|
const merge = await this.prisma.pendingMerge.findUnique({
|
||||||
|
where: { id: mergeId },
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!merge) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const token = this.configService.get<string>("GITEA_API_TOKEN");
|
||||||
|
if (!token) {
|
||||||
|
await this.rejectMerge(merge.id, "GITEA_API_TOKEN is not configured");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.prisma.pendingMerge.update({
|
||||||
|
where: { id: merge.id },
|
||||||
|
data: { state: "merging" },
|
||||||
|
});
|
||||||
|
|
||||||
|
const mergeUrl =
|
||||||
|
merge.giteaMergeUrl ??
|
||||||
|
`${this.giteaApiBaseUrl}/repos/${merge.repo}/pulls/${String(merge.prNumber)}`;
|
||||||
|
const response = await fetch(`${mergeUrl}/merge`, {
|
||||||
|
method: "POST",
|
||||||
|
headers: {
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
Authorization: `token ${token}`,
|
||||||
|
},
|
||||||
|
body: JSON.stringify({
|
||||||
|
Do: "merge",
|
||||||
|
force_merge: true,
|
||||||
|
merge_message_field: "Auto-merged by Gatekeeper",
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
const reason = await response.text();
|
||||||
|
await this.rejectMerge(
|
||||||
|
merge.id,
|
||||||
|
`Gitea merge API rejected the request: ${String(response.status)} ${reason}`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.prisma.pendingMerge.update({
|
||||||
|
where: { id: merge.id },
|
||||||
|
data: { state: "merged" },
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async rejectMerge(mergeId: string, reason: string): Promise<void> {
|
||||||
|
const merge = await this.prisma.pendingMerge.findUnique({
|
||||||
|
where: { id: mergeId },
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!merge) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.prisma.pendingMerge.update({
|
||||||
|
where: { id: merge.id },
|
||||||
|
data: {
|
||||||
|
state: "rejected",
|
||||||
|
reviewResult: {
|
||||||
|
passed: false,
|
||||||
|
issues: [reason],
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await this.postPullRequestComment(
|
||||||
|
merge.repo,
|
||||||
|
merge.prNumber,
|
||||||
|
`Gatekeeper rejected auto-merge for \`${merge.headSha}\`: ${reason}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async runReview(mergeId: string, payload: GiteaPrWebhookDto): Promise<void> {
|
||||||
|
await this.prisma.pendingMerge.update({
|
||||||
|
where: { id: mergeId },
|
||||||
|
data: { state: "reviewing" },
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = await this.reviewPr(payload);
|
||||||
|
if (!result.passed) {
|
||||||
|
await this.rejectMerge(mergeId, result.issues.join("; "));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const reviewResult: Prisma.InputJsonValue = {
|
||||||
|
passed: result.passed,
|
||||||
|
issues: result.issues,
|
||||||
|
};
|
||||||
|
|
||||||
|
await this.prisma.pendingMerge.update({
|
||||||
|
where: { id: mergeId },
|
||||||
|
data: {
|
||||||
|
state: "awaiting_ci",
|
||||||
|
reviewResult,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private hasAutoMergeLabel(payload: GiteaPrWebhookDto): boolean {
|
||||||
|
return payload.pull_request.labels.some((label) => label.name === "auto-merge");
|
||||||
|
}
|
||||||
|
|
||||||
|
private async postPullRequestComment(
|
||||||
|
repo: string,
|
||||||
|
prNumber: number,
|
||||||
|
body: string
|
||||||
|
): Promise<void> {
|
||||||
|
const token = this.configService.get<string>("GITEA_API_TOKEN");
|
||||||
|
if (!token) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Skipping PR comment for ${repo}#${String(prNumber)}; GITEA_API_TOKEN is missing`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const response = await fetch(
|
||||||
|
`${this.giteaApiBaseUrl}/repos/${repo}/issues/${String(prNumber)}/comments`,
|
||||||
|
{
|
||||||
|
method: "POST",
|
||||||
|
headers: {
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
Authorization: `token ${token}`,
|
||||||
|
},
|
||||||
|
body: JSON.stringify({ body }),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Failed to post Gatekeeper PR comment for ${repo}#${String(prNumber)}: ${String(response.status)}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private isEnabled(): boolean {
|
||||||
|
const raw = this.configService.get<string>("GATEKEEPER_ENABLED");
|
||||||
|
const enabled = raw !== "false";
|
||||||
|
|
||||||
|
if (!enabled) {
|
||||||
|
this.logger.warn("Gatekeeper is disabled via GATEKEEPER_ENABLED");
|
||||||
|
}
|
||||||
|
|
||||||
|
return enabled;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,286 @@
|
|||||||
|
import {
|
||||||
|
Body,
|
||||||
|
Controller,
|
||||||
|
Get,
|
||||||
|
Logger,
|
||||||
|
Param,
|
||||||
|
Post,
|
||||||
|
Req,
|
||||||
|
Res,
|
||||||
|
ServiceUnavailableException,
|
||||||
|
UseGuards,
|
||||||
|
} from "@nestjs/common";
|
||||||
|
import type { Request, Response } from "express";
|
||||||
|
import { AuthGuard } from "../auth/guards/auth.guard";
|
||||||
|
|
||||||
|
const ORCHESTRATOR_URL_KEY = "ORCHESTRATOR_URL";
|
||||||
|
const ORCHESTRATOR_API_KEY = "ORCHESTRATOR_API_KEY";
|
||||||
|
|
||||||
|
@Controller("mission-control")
|
||||||
|
@UseGuards(AuthGuard)
|
||||||
|
export class MissionControlProxyController {
|
||||||
|
private readonly logger = new Logger(MissionControlProxyController.name);
|
||||||
|
private readonly orchestratorUrl: string;
|
||||||
|
private readonly orchestratorApiKey: string;
|
||||||
|
|
||||||
|
constructor() {
|
||||||
|
this.orchestratorUrl = this.requireEnv(ORCHESTRATOR_URL_KEY);
|
||||||
|
this.orchestratorApiKey = this.requireEnv(ORCHESTRATOR_API_KEY);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions")
|
||||||
|
proxySessions(@Req() req: Request, @Res() res: Response): Promise<void> {
|
||||||
|
return this.proxyRequest("GET", "sessions", req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions/:sessionId")
|
||||||
|
proxySession(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("GET", `sessions/${sessionId}`, req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions/:sessionId/messages")
|
||||||
|
proxyMessages(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("GET", `sessions/${sessionId}/messages`, req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("audit-log")
|
||||||
|
proxyAuditLog(@Req() req: Request, @Res() res: Response): Promise<void> {
|
||||||
|
return this.proxyRequest("GET", "audit-log", req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/inject")
|
||||||
|
proxyInject(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Body() body: unknown,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("POST", `sessions/${sessionId}/inject`, req, res, body);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/pause")
|
||||||
|
proxyPause(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("POST", `sessions/${sessionId}/pause`, req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/resume")
|
||||||
|
proxyResume(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("POST", `sessions/${sessionId}/resume`, req, res);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Post("sessions/:sessionId/kill")
|
||||||
|
proxyKill(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Body() body: unknown,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
return this.proxyRequest("POST", `sessions/${sessionId}/kill`, req, res, body);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Get("sessions/:sessionId/stream")
|
||||||
|
async proxySessionStream(
|
||||||
|
@Param("sessionId") sessionId: string,
|
||||||
|
@Req() req: Request,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
const abortController = new AbortController();
|
||||||
|
req.once("close", () => {
|
||||||
|
abortController.abort();
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
const upstream = await this.fetchUpstream(
|
||||||
|
"GET",
|
||||||
|
`sessions/${sessionId}/stream`,
|
||||||
|
req.query,
|
||||||
|
undefined,
|
||||||
|
abortController.signal
|
||||||
|
);
|
||||||
|
|
||||||
|
if (!upstream.ok || !upstream.body) {
|
||||||
|
await this.sendStandardResponse(upstream, res);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
res.status(upstream.status);
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "content-type");
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "cache-control");
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "connection");
|
||||||
|
res.setHeader("X-Accel-Buffering", "no");
|
||||||
|
|
||||||
|
if (typeof res.flushHeaders === "function") {
|
||||||
|
res.flushHeaders();
|
||||||
|
}
|
||||||
|
|
||||||
|
for await (const chunk of upstream.body as unknown as AsyncIterable<Uint8Array>) {
|
||||||
|
if (res.writableEnded || res.destroyed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
res.write(Buffer.from(chunk));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!res.writableEnded && !res.destroyed) {
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
|
} catch (error: unknown) {
|
||||||
|
if (this.isAbortError(error)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
this.logger.warn(`Mission Control stream proxy request failed: ${message}`);
|
||||||
|
|
||||||
|
if (!res.headersSent) {
|
||||||
|
res.status(503).json({ message: "Failed to proxy Mission Control request" });
|
||||||
|
} else if (!res.writableEnded && !res.destroyed) {
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async proxyRequest(
|
||||||
|
method: "GET" | "POST",
|
||||||
|
path: string,
|
||||||
|
req: Request,
|
||||||
|
res: Response,
|
||||||
|
body?: unknown
|
||||||
|
): Promise<void> {
|
||||||
|
const abortController = new AbortController();
|
||||||
|
req.once("close", () => {
|
||||||
|
abortController.abort();
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
const upstream = await this.fetchUpstream(
|
||||||
|
method,
|
||||||
|
path,
|
||||||
|
req.query,
|
||||||
|
body,
|
||||||
|
abortController.signal
|
||||||
|
);
|
||||||
|
await this.sendStandardResponse(upstream, res);
|
||||||
|
} catch (error: unknown) {
|
||||||
|
if (this.isAbortError(error)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.handleProxyError(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async fetchUpstream(
|
||||||
|
method: "GET" | "POST",
|
||||||
|
path: string,
|
||||||
|
query: Request["query"],
|
||||||
|
body: unknown,
|
||||||
|
signal: AbortSignal
|
||||||
|
): Promise<globalThis.Response> {
|
||||||
|
const url = new URL(`/api/mission-control/${path}`, this.orchestratorUrl);
|
||||||
|
this.appendQueryParams(url.searchParams, query);
|
||||||
|
|
||||||
|
const headers: Record<string, string> = {
|
||||||
|
"X-API-Key": this.orchestratorApiKey,
|
||||||
|
};
|
||||||
|
|
||||||
|
const requestInit: RequestInit = {
|
||||||
|
method,
|
||||||
|
headers,
|
||||||
|
signal,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (method === "POST" && body !== undefined) {
|
||||||
|
headers["Content-Type"] = "application/json";
|
||||||
|
requestInit.body = JSON.stringify(body);
|
||||||
|
}
|
||||||
|
|
||||||
|
return fetch(url.toString(), requestInit);
|
||||||
|
}
|
||||||
|
|
||||||
|
private appendQueryParams(searchParams: URLSearchParams, query: Request["query"]): void {
|
||||||
|
for (const [key, value] of Object.entries(query)) {
|
||||||
|
this.appendQueryValue(searchParams, key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private appendQueryValue(searchParams: URLSearchParams, key: string, value: unknown): void {
|
||||||
|
if (value === undefined || value === null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Array.isArray(value)) {
|
||||||
|
for (const item of value) {
|
||||||
|
this.appendQueryValue(searchParams, key, item);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (typeof value === "string" || typeof value === "number" || typeof value === "boolean") {
|
||||||
|
searchParams.append(key, String(value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async sendStandardResponse(upstream: globalThis.Response, res: Response): Promise<void> {
|
||||||
|
res.status(upstream.status);
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "content-type");
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "cache-control");
|
||||||
|
this.copyHeaderIfPresent(upstream, res, "location");
|
||||||
|
|
||||||
|
const responseText = await upstream.text();
|
||||||
|
|
||||||
|
if (responseText.length === 0) {
|
||||||
|
res.end();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
res.send(responseText);
|
||||||
|
}
|
||||||
|
|
||||||
|
private copyHeaderIfPresent(
|
||||||
|
upstream: globalThis.Response,
|
||||||
|
res: Response,
|
||||||
|
headerName: string
|
||||||
|
): void {
|
||||||
|
const value = upstream.headers.get(headerName);
|
||||||
|
if (value) {
|
||||||
|
res.setHeader(headerName, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private handleProxyError(error: unknown): never {
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
this.logger.warn(`Mission Control proxy request failed: ${message}`);
|
||||||
|
throw new ServiceUnavailableException("Failed to proxy Mission Control request");
|
||||||
|
}
|
||||||
|
|
||||||
|
private isAbortError(error: unknown): boolean {
|
||||||
|
return error instanceof Error && error.name === "AbortError";
|
||||||
|
}
|
||||||
|
|
||||||
|
private requireEnv(key: string): string {
|
||||||
|
const value = process.env[key];
|
||||||
|
|
||||||
|
if (typeof value !== "string" || value.trim().length === 0) {
|
||||||
|
throw new Error(`@mosaic/api: ${key} is required. Set it in your config or via ${key}.`);
|
||||||
|
}
|
||||||
|
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,9 @@
|
|||||||
|
import { Module } from "@nestjs/common";
|
||||||
|
import { AuthModule } from "../auth/auth.module";
|
||||||
|
import { MissionControlProxyController } from "./mission-control-proxy.controller";
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
imports: [AuthModule],
|
||||||
|
controllers: [MissionControlProxyController],
|
||||||
|
})
|
||||||
|
export class MissionControlProxyModule {}
|
||||||
@@ -2,12 +2,14 @@ import { Module } from "@nestjs/common";
|
|||||||
import { ConfigModule } from "@nestjs/config";
|
import { ConfigModule } from "@nestjs/config";
|
||||||
import { AuthModule } from "../auth/auth.module";
|
import { AuthModule } from "../auth/auth.module";
|
||||||
import { ApiKeyGuard } from "../common/guards/api-key.guard";
|
import { ApiKeyGuard } from "../common/guards/api-key.guard";
|
||||||
|
import { GatekeeperModule } from "../gatekeeper/gatekeeper.module";
|
||||||
import { QueueNotificationsController } from "./queue-notifications.controller";
|
import { QueueNotificationsController } from "./queue-notifications.controller";
|
||||||
import { QueueNotificationsService } from "./queue-notifications.service";
|
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||||
|
import { WoodpeckerWebhookController } from "./woodpecker-webhook.controller";
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [ConfigModule, AuthModule],
|
imports: [ConfigModule, AuthModule, GatekeeperModule],
|
||||||
controllers: [QueueNotificationsController],
|
controllers: [QueueNotificationsController, WoodpeckerWebhookController],
|
||||||
providers: [QueueNotificationsService, ApiKeyGuard],
|
providers: [QueueNotificationsService, ApiKeyGuard],
|
||||||
exports: [QueueNotificationsService],
|
exports: [QueueNotificationsService],
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
import { ConfigService } from "@nestjs/config";
|
import { ConfigService } from "@nestjs/config";
|
||||||
import { Logger, NotFoundException } from "@nestjs/common";
|
import { Logger, NotFoundException } from "@nestjs/common";
|
||||||
import { mkdtemp, mkdir, rm, writeFile } from "node:fs/promises";
|
import { mkdtemp, mkdir, readFile, readdir, rm, writeFile } from "node:fs/promises";
|
||||||
import { tmpdir } from "node:os";
|
import { tmpdir } from "node:os";
|
||||||
import { join } from "node:path";
|
import { join } from "node:path";
|
||||||
import { execFile } from "node:child_process";
|
import { execFile } from "node:child_process";
|
||||||
@@ -14,17 +14,23 @@ vi.mock("node:child_process", () => ({
|
|||||||
describe("QueueNotificationsService", () => {
|
describe("QueueNotificationsService", () => {
|
||||||
let service: QueueNotificationsService;
|
let service: QueueNotificationsService;
|
||||||
let inboxDir: string;
|
let inboxDir: string;
|
||||||
|
let agentStateDir: string;
|
||||||
let configService: ConfigService;
|
let configService: ConfigService;
|
||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
vi.clearAllMocks();
|
vi.clearAllMocks();
|
||||||
inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-"));
|
inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-"));
|
||||||
|
agentStateDir = await mkdtemp(join(tmpdir(), "agent-state-"));
|
||||||
configService = {
|
configService = {
|
||||||
get: vi.fn((key: string) => {
|
get: vi.fn((key: string) => {
|
||||||
if (key === "MOSAIC_QUEUE_INBOX_DIR") {
|
if (key === "MOSAIC_QUEUE_INBOX_DIR") {
|
||||||
return inboxDir;
|
return inboxDir;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (key === "MOSAIC_AGENT_STATE_DIR") {
|
||||||
|
return agentStateDir;
|
||||||
|
}
|
||||||
|
|
||||||
if (key === "MOSAIC_QUEUE_CLI") {
|
if (key === "MOSAIC_QUEUE_CLI") {
|
||||||
return "/tmp/mosaic-queue-cli.js";
|
return "/tmp/mosaic-queue-cli.js";
|
||||||
}
|
}
|
||||||
@@ -39,6 +45,7 @@ describe("QueueNotificationsService", () => {
|
|||||||
afterEach(async () => {
|
afterEach(async () => {
|
||||||
vi.restoreAllMocks();
|
vi.restoreAllMocks();
|
||||||
await rm(inboxDir, { recursive: true, force: true });
|
await rm(inboxDir, { recursive: true, force: true });
|
||||||
|
await rm(agentStateDir, { recursive: true, force: true });
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("onModuleInit", () => {
|
describe("onModuleInit", () => {
|
||||||
@@ -169,4 +176,93 @@ describe("QueueNotificationsService", () => {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("notifyAgentCiResult", () => {
|
||||||
|
it("writes one notification per active agent-state record matching the branch", async () => {
|
||||||
|
await mkdir(join(agentStateDir, "active"), { recursive: true });
|
||||||
|
await writeFile(
|
||||||
|
join(agentStateDir, "active", "sage-landing-page.json"),
|
||||||
|
JSON.stringify({
|
||||||
|
taskId: "sage-landing-page",
|
||||||
|
status: "spawned",
|
||||||
|
startedAt: "2026-03-08T22:47:20Z",
|
||||||
|
lastUpdated: "2026-03-08T23:15:11.726Z",
|
||||||
|
agent: "sage",
|
||||||
|
branch: "feature/landing-page",
|
||||||
|
description: "Create apps/landing marketing site",
|
||||||
|
})
|
||||||
|
);
|
||||||
|
await writeFile(
|
||||||
|
join(agentStateDir, "active", "pixels-other.json"),
|
||||||
|
JSON.stringify({
|
||||||
|
taskId: "pixels-other",
|
||||||
|
status: "spawned",
|
||||||
|
startedAt: "2026-03-08T22:47:20Z",
|
||||||
|
lastUpdated: "2026-03-08T23:15:11.726Z",
|
||||||
|
agent: "pixels",
|
||||||
|
branch: "feature/something-else",
|
||||||
|
description: "Unrelated",
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
service.notifyAgentCiResult({
|
||||||
|
branch: "feature/landing-page",
|
||||||
|
status: "success",
|
||||||
|
buildUrl: "https://ci.example/build/123",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
})
|
||||||
|
).resolves.toEqual({ notified: 1 });
|
||||||
|
|
||||||
|
const inboxFiles = await readdir(join(inboxDir, "sage"));
|
||||||
|
expect(inboxFiles).toHaveLength(1);
|
||||||
|
|
||||||
|
const notification = JSON.parse(
|
||||||
|
await readFile(join(inboxDir, "sage", inboxFiles[0]!), "utf8")
|
||||||
|
) as Record<string, unknown>;
|
||||||
|
|
||||||
|
expect(notification).toMatchObject({
|
||||||
|
taskId: "sage-landing-page",
|
||||||
|
event: "completed",
|
||||||
|
targetAgent: "sage",
|
||||||
|
fromAgent: "mosaic-api",
|
||||||
|
retries: 0,
|
||||||
|
maxRetries: 3,
|
||||||
|
ttlSeconds: 600,
|
||||||
|
payload: {
|
||||||
|
branch: "feature/landing-page",
|
||||||
|
buildUrl: "https://ci.example/build/123",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
ciStatus: "success",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
expect(typeof notification.id).toBe("string");
|
||||||
|
expect(typeof notification.createdAt).toBe("string");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns zero when no active agent-state branch matches the webhook payload", async () => {
|
||||||
|
await mkdir(join(agentStateDir, "active"), { recursive: true });
|
||||||
|
await writeFile(
|
||||||
|
join(agentStateDir, "active", "mosaic-task.json"),
|
||||||
|
JSON.stringify({
|
||||||
|
taskId: "mosaic-task",
|
||||||
|
status: "spawned",
|
||||||
|
startedAt: "2026-03-08T22:47:20Z",
|
||||||
|
lastUpdated: "2026-03-08T23:15:11.726Z",
|
||||||
|
agent: "mosaic",
|
||||||
|
branch: "feature/not-this-one",
|
||||||
|
description: "Unrelated",
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
service.notifyAgentCiResult({
|
||||||
|
branch: "feature/landing-page",
|
||||||
|
status: "failure",
|
||||||
|
buildUrl: "https://ci.example/build/456",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
})
|
||||||
|
).resolves.toEqual({ notified: 0 });
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -6,8 +6,9 @@ import {
|
|||||||
OnModuleInit,
|
OnModuleInit,
|
||||||
} from "@nestjs/common";
|
} from "@nestjs/common";
|
||||||
import { ConfigService } from "@nestjs/config";
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { randomUUID } from "node:crypto";
|
||||||
import { execFile } from "node:child_process";
|
import { execFile } from "node:child_process";
|
||||||
import { access, readdir, readFile, stat } from "node:fs/promises";
|
import { access, mkdir, readdir, readFile, stat, writeFile } from "node:fs/promises";
|
||||||
import { homedir } from "node:os";
|
import { homedir } from "node:os";
|
||||||
import { basename, join } from "node:path";
|
import { basename, join } from "node:path";
|
||||||
import type { Response } from "express";
|
import type { Response } from "express";
|
||||||
@@ -29,6 +30,30 @@ export interface QueueTask {
|
|||||||
description: string;
|
description: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface AgentStateRecord {
|
||||||
|
taskId: string;
|
||||||
|
agent: string;
|
||||||
|
branch: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface AgentCiNotification {
|
||||||
|
id: string;
|
||||||
|
taskId: string;
|
||||||
|
event: "completed" | "failed";
|
||||||
|
targetAgent: string;
|
||||||
|
fromAgent: string;
|
||||||
|
payload: {
|
||||||
|
branch: string;
|
||||||
|
buildUrl: string;
|
||||||
|
repo: string;
|
||||||
|
ciStatus: "success" | "failure";
|
||||||
|
};
|
||||||
|
createdAt: string;
|
||||||
|
retries: number;
|
||||||
|
maxRetries: number;
|
||||||
|
ttlSeconds: number;
|
||||||
|
}
|
||||||
|
|
||||||
@Injectable()
|
@Injectable()
|
||||||
export class QueueNotificationsService implements OnModuleInit {
|
export class QueueNotificationsService implements OnModuleInit {
|
||||||
private readonly logger = new Logger(QueueNotificationsService.name);
|
private readonly logger = new Logger(QueueNotificationsService.name);
|
||||||
@@ -171,6 +196,52 @@ export class QueueNotificationsService implements OnModuleInit {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async notifyAgentCiResult(opts: {
|
||||||
|
branch: string;
|
||||||
|
status: "success" | "failure";
|
||||||
|
buildUrl: string;
|
||||||
|
repo: string;
|
||||||
|
}): Promise<{ notified: number }> {
|
||||||
|
const activeRecords = await this.listActiveAgentStateRecords();
|
||||||
|
const matches = activeRecords.filter((record) => record.branch === opts.branch);
|
||||||
|
|
||||||
|
await Promise.all(
|
||||||
|
matches.map(async (record) => {
|
||||||
|
const notification: AgentCiNotification = {
|
||||||
|
id: randomUUID(),
|
||||||
|
taskId: record.taskId,
|
||||||
|
event: opts.status === "success" ? "completed" : "failed",
|
||||||
|
targetAgent: record.agent,
|
||||||
|
fromAgent: "mosaic-api",
|
||||||
|
payload: {
|
||||||
|
branch: opts.branch,
|
||||||
|
buildUrl: opts.buildUrl,
|
||||||
|
repo: opts.repo,
|
||||||
|
ciStatus: opts.status,
|
||||||
|
},
|
||||||
|
createdAt: new Date().toISOString(),
|
||||||
|
retries: 0,
|
||||||
|
maxRetries: 3,
|
||||||
|
ttlSeconds: 600,
|
||||||
|
};
|
||||||
|
|
||||||
|
const agentDir = join(this.getInboxDir(), record.agent);
|
||||||
|
// Path is built from the configured inbox root plus validated agent-state entries.
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
await mkdir(agentDir, { recursive: true });
|
||||||
|
// Path is built from the configured inbox root plus validated agent-state entries.
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
await writeFile(
|
||||||
|
join(agentDir, `${notification.id}.json`),
|
||||||
|
JSON.stringify(notification, null, 2),
|
||||||
|
"utf8"
|
||||||
|
);
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
return { notified: matches.length };
|
||||||
|
}
|
||||||
|
|
||||||
private async execQueueCli(args: string[]): Promise<string> {
|
private async execQueueCli(args: string[]): Promise<string> {
|
||||||
const cliPath = this.getQueueCliPath();
|
const cliPath = this.getQueueCliPath();
|
||||||
|
|
||||||
@@ -198,6 +269,12 @@ export class QueueNotificationsService implements OnModuleInit {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private getAgentStateDir(): string {
|
||||||
|
return this.expandHomePath(
|
||||||
|
this.configService.get<string>("MOSAIC_AGENT_STATE_DIR") ?? "~/.openclaw/workspace/agents"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
private getQueueCliPath(): string {
|
private getQueueCliPath(): string {
|
||||||
return this.expandHomePath(
|
return this.expandHomePath(
|
||||||
this.configService.get<string>("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js"
|
this.configService.get<string>("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js"
|
||||||
@@ -228,4 +305,37 @@ export class QueueNotificationsService implements OnModuleInit {
|
|||||||
private isIgnoredDirectory(name: string): boolean {
|
private isIgnoredDirectory(name: string): boolean {
|
||||||
return name === "_acked" || name === "_dead-letter";
|
return name === "_acked" || name === "_dead-letter";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async listActiveAgentStateRecords(): Promise<AgentStateRecord[]> {
|
||||||
|
const activeDir = join(this.getAgentStateDir(), "active");
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Path comes from controlled config and a fixed "active" subdirectory.
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
const entries = await readdir(activeDir, { withFileTypes: true });
|
||||||
|
const records = await Promise.all(
|
||||||
|
entries
|
||||||
|
.filter((entry) => entry.isFile() && entry.name.endsWith(".json"))
|
||||||
|
.map(async (entry) => {
|
||||||
|
const filePath = join(activeDir, entry.name);
|
||||||
|
// Path comes from controlled config plus directory entries under the active root.
|
||||||
|
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||||
|
const rawRecord = await readFile(filePath, "utf8");
|
||||||
|
return JSON.parse(rawRecord) as AgentStateRecord;
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
return records.filter((record) => {
|
||||||
|
return (
|
||||||
|
typeof record.taskId === "string" &&
|
||||||
|
typeof record.agent === "string" &&
|
||||||
|
typeof record.branch === "string"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
} catch (error: unknown) {
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
this.logger.warn(`Unable to read active agent-state records from ${activeDir}: ${message}`);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,163 @@
|
|||||||
|
import { createHmac } from "node:crypto";
|
||||||
|
import { Logger } from "@nestjs/common";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { Test, type TestingModule } from "@nestjs/testing";
|
||||||
|
import type { Request } from "express";
|
||||||
|
import { describe, beforeEach, expect, it, vi } from "vitest";
|
||||||
|
import { GatekeeperService } from "../gatekeeper/gatekeeper.service";
|
||||||
|
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||||
|
import {
|
||||||
|
type WoodpeckerWebhookPayload,
|
||||||
|
WoodpeckerWebhookController,
|
||||||
|
} from "./woodpecker-webhook.controller";
|
||||||
|
|
||||||
|
function signPayload(payload: WoodpeckerWebhookPayload, secret: string): string {
|
||||||
|
return createHmac("sha256", secret).update(JSON.stringify(payload)).digest("hex");
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("WoodpeckerWebhookController", () => {
|
||||||
|
let controller: WoodpeckerWebhookController;
|
||||||
|
|
||||||
|
const mockService = {
|
||||||
|
notifyAgentCiResult: vi.fn(),
|
||||||
|
};
|
||||||
|
const mockGatekeeperService = {
|
||||||
|
handleCiEvent: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
const mockConfigService = {
|
||||||
|
get: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
mockConfigService.get.mockImplementation((key: string) => {
|
||||||
|
if (key === "WOODPECKER_WEBHOOK_SECRET") {
|
||||||
|
return "test-secret";
|
||||||
|
}
|
||||||
|
|
||||||
|
return undefined;
|
||||||
|
});
|
||||||
|
|
||||||
|
const module: TestingModule = await Test.createTestingModule({
|
||||||
|
controllers: [WoodpeckerWebhookController],
|
||||||
|
providers: [
|
||||||
|
{ provide: QueueNotificationsService, useValue: mockService },
|
||||||
|
{ provide: GatekeeperService, useValue: mockGatekeeperService },
|
||||||
|
{ provide: ConfigService, useValue: mockConfigService },
|
||||||
|
],
|
||||||
|
}).compile();
|
||||||
|
|
||||||
|
controller = module.get<WoodpeckerWebhookController>(WoodpeckerWebhookController);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("accepts a valid signature and forwards the payload to the service", async () => {
|
||||||
|
const payload: WoodpeckerWebhookPayload = {
|
||||||
|
branch: "feat/ms24-ci-webhook",
|
||||||
|
status: "success",
|
||||||
|
buildUrl: "https://ci.example/build/123",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
prNumber: 42,
|
||||||
|
headSha: "abcdef1234567890",
|
||||||
|
};
|
||||||
|
const signature = signPayload(payload, "test-secret");
|
||||||
|
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 2 });
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
controller.handleWebhook(
|
||||||
|
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
|
||||||
|
payload,
|
||||||
|
signature
|
||||||
|
)
|
||||||
|
).resolves.toEqual({ ok: true, notified: 2 });
|
||||||
|
|
||||||
|
expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload);
|
||||||
|
expect(mockGatekeeperService.handleCiEvent).toHaveBeenCalledWith(
|
||||||
|
"mosaic/stack",
|
||||||
|
42,
|
||||||
|
"abcdef1234567890",
|
||||||
|
"success"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns ok without notifying when the signature is invalid", async () => {
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
const payload: WoodpeckerWebhookPayload = {
|
||||||
|
branch: "feat/ms24-ci-webhook",
|
||||||
|
status: "failure",
|
||||||
|
buildUrl: "https://ci.example/build/123",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
};
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
controller.handleWebhook(
|
||||||
|
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
|
||||||
|
payload,
|
||||||
|
"bad-signature"
|
||||||
|
)
|
||||||
|
).resolves.toEqual({ ok: true, notified: 0 });
|
||||||
|
|
||||||
|
expect(mockService.notifyAgentCiResult).not.toHaveBeenCalled();
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("invalid Woodpecker webhook signature")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("accepts the payload when the webhook secret is missing", async () => {
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
const payload: WoodpeckerWebhookPayload = {
|
||||||
|
branch: "feat/ms24-ci-webhook",
|
||||||
|
status: "success",
|
||||||
|
buildUrl: "https://ci.example/build/123",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
};
|
||||||
|
mockConfigService.get.mockReturnValue(undefined);
|
||||||
|
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 1 });
|
||||||
|
|
||||||
|
await expect(controller.handleWebhook({} as Request, payload, "")).resolves.toEqual({
|
||||||
|
ok: true,
|
||||||
|
notified: 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(mockService.notifyAgentCiResult).toHaveBeenCalledWith(payload);
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("WOODPECKER_WEBHOOK_SECRET is not configured")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns zero notifications when no active branch matches", async () => {
|
||||||
|
const payload: WoodpeckerWebhookPayload = {
|
||||||
|
branch: "feat/ms24-ci-webhook",
|
||||||
|
status: "success",
|
||||||
|
buildUrl: "https://ci.example/build/999",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
};
|
||||||
|
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 0 });
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
controller.handleWebhook(
|
||||||
|
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
|
||||||
|
payload,
|
||||||
|
signPayload(payload, "test-secret")
|
||||||
|
)
|
||||||
|
).resolves.toEqual({ ok: true, notified: 0 });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("does not call Gatekeeper when the PR metadata is missing", async () => {
|
||||||
|
const payload: WoodpeckerWebhookPayload = {
|
||||||
|
branch: "feat/ms24-ci-webhook",
|
||||||
|
status: "success",
|
||||||
|
buildUrl: "https://ci.example/build/555",
|
||||||
|
repo: "mosaic/stack",
|
||||||
|
};
|
||||||
|
mockService.notifyAgentCiResult.mockResolvedValue({ notified: 1 });
|
||||||
|
|
||||||
|
await controller.handleWebhook(
|
||||||
|
{ rawBody: Buffer.from(JSON.stringify(payload)) } as Request,
|
||||||
|
payload,
|
||||||
|
signPayload(payload, "test-secret")
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(mockGatekeeperService.handleCiEvent).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,90 @@
|
|||||||
|
import { Body, Controller, Headers, Logger, Post, Req, type RawBodyRequest } from "@nestjs/common";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { createHmac, timingSafeEqual } from "node:crypto";
|
||||||
|
import type { Request } from "express";
|
||||||
|
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
|
||||||
|
import { GatekeeperService } from "../gatekeeper/gatekeeper.service";
|
||||||
|
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||||
|
|
||||||
|
export interface WoodpeckerWebhookPayload {
|
||||||
|
branch: string;
|
||||||
|
status: "success" | "failure";
|
||||||
|
buildUrl: string;
|
||||||
|
repo: string;
|
||||||
|
prNumber?: number;
|
||||||
|
headSha?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Controller("webhooks")
|
||||||
|
export class WoodpeckerWebhookController {
|
||||||
|
private readonly logger = new Logger(WoodpeckerWebhookController.name);
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private readonly queueService: QueueNotificationsService,
|
||||||
|
private readonly gatekeeperService: GatekeeperService,
|
||||||
|
private readonly configService: ConfigService
|
||||||
|
) {}
|
||||||
|
|
||||||
|
@SkipCsrf()
|
||||||
|
@Post("woodpecker")
|
||||||
|
async handleWebhook(
|
||||||
|
@Req() req: RawBodyRequest<Request>,
|
||||||
|
@Body() body: WoodpeckerWebhookPayload,
|
||||||
|
@Headers("x-woodpecker-signature") signature: string
|
||||||
|
): Promise<{ ok: boolean; notified: number }> {
|
||||||
|
const secret = this.configService.get<string>("WOODPECKER_WEBHOOK_SECRET");
|
||||||
|
|
||||||
|
if (!secret) {
|
||||||
|
this.logger.warn("WOODPECKER_WEBHOOK_SECRET is not configured; accepting Woodpecker webhook");
|
||||||
|
const result = await this.queueService.notifyAgentCiResult(body);
|
||||||
|
await this.forwardCiStatusToGatekeeper(body);
|
||||||
|
return { ok: true, notified: result.notified };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.isValidSignature(this.getRequestBody(req, body), signature, secret)) {
|
||||||
|
this.logger.warn("Received invalid Woodpecker webhook signature");
|
||||||
|
return { ok: true, notified: 0 };
|
||||||
|
}
|
||||||
|
|
||||||
|
const result = await this.queueService.notifyAgentCiResult(body);
|
||||||
|
await this.forwardCiStatusToGatekeeper(body);
|
||||||
|
return { ok: true, notified: result.notified };
|
||||||
|
}
|
||||||
|
|
||||||
|
private async forwardCiStatusToGatekeeper(body: WoodpeckerWebhookPayload): Promise<void> {
|
||||||
|
if (typeof body.prNumber === "number" && typeof body.headSha === "string") {
|
||||||
|
await this.gatekeeperService.handleCiEvent(
|
||||||
|
body.repo,
|
||||||
|
body.prNumber,
|
||||||
|
body.headSha,
|
||||||
|
body.status
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private getRequestBody(req: RawBodyRequest<Request>, body: WoodpeckerWebhookPayload): Buffer {
|
||||||
|
const rawBody = req.rawBody;
|
||||||
|
|
||||||
|
if (Buffer.isBuffer(rawBody)) {
|
||||||
|
return rawBody;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Buffer.from(JSON.stringify(body));
|
||||||
|
}
|
||||||
|
|
||||||
|
private isValidSignature(body: Buffer, signature: string | undefined, secret: string): boolean {
|
||||||
|
if (!signature) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const expected = createHmac("sha256", secret).update(body).digest("hex");
|
||||||
|
const signatureBuffer = Buffer.from(signature);
|
||||||
|
const expectedBuffer = Buffer.from(expected);
|
||||||
|
|
||||||
|
if (signatureBuffer.length !== expectedBuffer.length) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return timingSafeEqual(signatureBuffer, expectedBuffer);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -37,7 +37,7 @@
|
|||||||
"ioredis": "^5.9.2",
|
"ioredis": "^5.9.2",
|
||||||
"reflect-metadata": "^0.2.2",
|
"reflect-metadata": "^0.2.2",
|
||||||
"rxjs": "^7.8.1",
|
"rxjs": "^7.8.1",
|
||||||
"simple-git": "^3.27.0",
|
"simple-git": "^3.32.3",
|
||||||
"zod": "^3.24.1"
|
"zod": "^3.24.1"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
|||||||
@@ -0,0 +1,57 @@
|
|||||||
|
import { NextResponse } from "next/server";
|
||||||
|
|
||||||
|
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
|
||||||
|
|
||||||
|
function getOrchestratorUrl(): string {
|
||||||
|
return (
|
||||||
|
process.env.ORCHESTRATOR_URL ??
|
||||||
|
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
|
||||||
|
process.env.NEXT_PUBLIC_API_URL ??
|
||||||
|
DEFAULT_ORCHESTRATOR_URL
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
export const dynamic = "force-dynamic";
|
||||||
|
|
||||||
|
export async function GET(): Promise<NextResponse> {
|
||||||
|
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
|
||||||
|
if (!orchestratorApiKey) {
|
||||||
|
return NextResponse.json(
|
||||||
|
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
|
||||||
|
{ status: 503 }
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const upstream = await fetch(`${getOrchestratorUrl()}/api/queue/notifications/stream`, {
|
||||||
|
method: "GET",
|
||||||
|
headers: {
|
||||||
|
"X-API-Key": orchestratorApiKey,
|
||||||
|
Accept: "text/event-stream",
|
||||||
|
},
|
||||||
|
cache: "no-store",
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!upstream.ok || upstream.body === null) {
|
||||||
|
const message = await upstream.text();
|
||||||
|
return new NextResponse(message || "Failed to connect to queue notifications stream", {
|
||||||
|
status: upstream.status || 502,
|
||||||
|
headers: {
|
||||||
|
"Content-Type": upstream.headers.get("Content-Type") ?? "text/plain; charset=utf-8",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return new NextResponse(upstream.body, {
|
||||||
|
status: upstream.status,
|
||||||
|
headers: {
|
||||||
|
"Content-Type": "text/event-stream",
|
||||||
|
"Cache-Control": "no-cache, no-transform",
|
||||||
|
Connection: "keep-alive",
|
||||||
|
"X-Accel-Buffering": "no",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
} catch {
|
||||||
|
return NextResponse.json({ error: "Unable to reach orchestrator." }, { status: 502 });
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
"use client";
|
"use client";
|
||||||
|
|
||||||
import { useEffect, useState } from "react";
|
import { useEffect, useMemo, useState } from "react";
|
||||||
import Link from "next/link";
|
import Link from "next/link";
|
||||||
import { usePathname } from "next/navigation";
|
import { usePathname } from "next/navigation";
|
||||||
import Image from "next/image";
|
import Image from "next/image";
|
||||||
@@ -29,6 +29,10 @@ interface NavGroup {
|
|||||||
items: NavItemConfig[];
|
items: NavItemConfig[];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
interface QueueNotification {
|
||||||
|
id: string;
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// SVG Icons (16x16 viewBox, stroke="currentColor", strokeWidth="1.5")
|
// SVG Icons (16x16 viewBox, stroke="currentColor", strokeWidth="1.5")
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
@@ -685,6 +689,72 @@ function CollapseToggle({ collapsed, onToggle }: CollapseToggleProps): React.JSX
|
|||||||
export function AppSidebar(): React.JSX.Element {
|
export function AppSidebar(): React.JSX.Element {
|
||||||
const pathname = usePathname();
|
const pathname = usePathname();
|
||||||
const { collapsed, toggleCollapsed, mobileOpen, setMobileOpen, isMobile } = useSidebar();
|
const { collapsed, toggleCollapsed, mobileOpen, setMobileOpen, isMobile } = useSidebar();
|
||||||
|
const [missionControlBadgeCount, setMissionControlBadgeCount] = useState<number>(0);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
let isActive = true;
|
||||||
|
|
||||||
|
const loadNotificationCount = async (): Promise<void> => {
|
||||||
|
try {
|
||||||
|
const response = await fetch("/api/orchestrator/api/queue/notifications", {
|
||||||
|
method: "GET",
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const payload = (await response.json()) as QueueNotification[];
|
||||||
|
if (isActive) {
|
||||||
|
setMissionControlBadgeCount(Array.isArray(payload) ? payload.length : 0);
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Ignore badge failures in the nav.
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
void loadNotificationCount();
|
||||||
|
|
||||||
|
if (typeof EventSource === "undefined") {
|
||||||
|
return (): void => {
|
||||||
|
isActive = false;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const source = new EventSource("/api/orchestrator/queue/notifications/stream");
|
||||||
|
|
||||||
|
source.onmessage = (event: MessageEvent<string>): void => {
|
||||||
|
try {
|
||||||
|
const payload = JSON.parse(event.data) as QueueNotification[];
|
||||||
|
if (isActive) {
|
||||||
|
setMissionControlBadgeCount(Array.isArray(payload) ? payload.length : 0);
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// Ignore malformed badge updates.
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
return (): void => {
|
||||||
|
isActive = false;
|
||||||
|
source.close();
|
||||||
|
};
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
const navGroups = useMemo(
|
||||||
|
() =>
|
||||||
|
NAV_GROUPS.map((group) => ({
|
||||||
|
...group,
|
||||||
|
items: group.items.map((item) =>
|
||||||
|
item.href === "/mission-control" && missionControlBadgeCount > 0
|
||||||
|
? {
|
||||||
|
...item,
|
||||||
|
badge: { label: String(missionControlBadgeCount) },
|
||||||
|
}
|
||||||
|
: item
|
||||||
|
),
|
||||||
|
})),
|
||||||
|
[missionControlBadgeCount]
|
||||||
|
);
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<>
|
<>
|
||||||
@@ -722,7 +792,7 @@ export function AppSidebar(): React.JSX.Element {
|
|||||||
}}
|
}}
|
||||||
aria-label="Main navigation"
|
aria-label="Main navigation"
|
||||||
>
|
>
|
||||||
{NAV_GROUPS.map((group) => (
|
{navGroups.map((group) => (
|
||||||
<div key={group.label} style={{ marginBottom: "18px" }}>
|
<div key={group.label} style={{ marginBottom: "18px" }}>
|
||||||
{/* Group label — hidden when collapsed */}
|
{/* Group label — hidden when collapsed */}
|
||||||
{!collapsed && (
|
{!collapsed && (
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ interface MockButtonProps extends ButtonHTMLAttributes<HTMLButtonElement> {
|
|||||||
|
|
||||||
const mockGlobalAgentRoster = vi.fn();
|
const mockGlobalAgentRoster = vi.fn();
|
||||||
const mockMissionControlPanel = vi.fn();
|
const mockMissionControlPanel = vi.fn();
|
||||||
|
const mockQueueNotificationFeed = vi.fn();
|
||||||
|
|
||||||
vi.mock("@/components/mission-control/AuditLogDrawer", () => ({
|
vi.mock("@/components/mission-control/AuditLogDrawer", () => ({
|
||||||
AuditLogDrawer: ({ trigger }: { trigger: ReactNode }): React.JSX.Element => (
|
AuditLogDrawer: ({ trigger }: { trigger: ReactNode }): React.JSX.Element => (
|
||||||
@@ -31,6 +32,13 @@ vi.mock("@/components/mission-control/MissionControlPanel", () => ({
|
|||||||
MIN_PANEL_COUNT: 1,
|
MIN_PANEL_COUNT: 1,
|
||||||
}));
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/mission-control/QueueNotificationFeed", () => ({
|
||||||
|
QueueNotificationFeed: (props: unknown): React.JSX.Element => {
|
||||||
|
mockQueueNotificationFeed(props);
|
||||||
|
return <div data-testid="queue-notification-feed" />;
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
vi.mock("@/components/ui/button", () => ({
|
vi.mock("@/components/ui/button", () => ({
|
||||||
Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => (
|
Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => (
|
||||||
<button {...props}>{children}</button>
|
<button {...props}>{children}</button>
|
||||||
@@ -66,5 +74,6 @@ describe("MissionControlLayout", (): void => {
|
|||||||
expect(region.querySelector("main")).toBeInTheDocument();
|
expect(region.querySelector("main")).toBeInTheDocument();
|
||||||
expect(screen.getByTestId("global-agent-roster")).toBeInTheDocument();
|
expect(screen.getByTestId("global-agent-roster")).toBeInTheDocument();
|
||||||
expect(screen.getByTestId("mission-control-panel")).toBeInTheDocument();
|
expect(screen.getByTestId("mission-control-panel")).toBeInTheDocument();
|
||||||
|
expect(screen.getByTestId("queue-notification-feed")).toBeInTheDocument();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import {
|
|||||||
MissionControlPanel,
|
MissionControlPanel,
|
||||||
type PanelConfig,
|
type PanelConfig,
|
||||||
} from "@/components/mission-control/MissionControlPanel";
|
} from "@/components/mission-control/MissionControlPanel";
|
||||||
|
import { QueueNotificationFeed } from "@/components/mission-control/QueueNotificationFeed";
|
||||||
import { Button } from "@/components/ui/button";
|
import { Button } from "@/components/ui/button";
|
||||||
|
|
||||||
const INITIAL_PANELS: PanelConfig[] = [{}];
|
const INITIAL_PANELS: PanelConfig[] = [{}];
|
||||||
@@ -94,7 +95,7 @@ export function MissionControlLayout(): React.JSX.Element {
|
|||||||
/>
|
/>
|
||||||
</header>
|
</header>
|
||||||
|
|
||||||
<div className="grid min-h-0 flex-1 gap-4 xl:grid-cols-[280px_minmax(0,1fr)]">
|
<div className="grid min-h-0 flex-1 gap-4 xl:grid-cols-[280px_minmax(0,1fr)_320px]">
|
||||||
<aside className="h-full min-h-0">
|
<aside className="h-full min-h-0">
|
||||||
<GlobalAgentRoster
|
<GlobalAgentRoster
|
||||||
onSelectSession={handleSelectSession}
|
onSelectSession={handleSelectSession}
|
||||||
@@ -109,6 +110,9 @@ export function MissionControlLayout(): React.JSX.Element {
|
|||||||
onExpandPanel={handleExpandPanel}
|
onExpandPanel={handleExpandPanel}
|
||||||
/>
|
/>
|
||||||
</main>
|
</main>
|
||||||
|
<aside className="h-full min-h-0">
|
||||||
|
<QueueNotificationFeed />
|
||||||
|
</aside>
|
||||||
</div>
|
</div>
|
||||||
</section>
|
</section>
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -0,0 +1,225 @@
|
|||||||
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import { act, render, screen, waitFor } from "@testing-library/react";
|
||||||
|
import userEvent from "@testing-library/user-event";
|
||||||
|
import type { ButtonHTMLAttributes, HTMLAttributes, ReactNode } from "react";
|
||||||
|
|
||||||
|
vi.mock("date-fns", () => ({
|
||||||
|
formatDistanceToNow: (): string => "5 minutes ago",
|
||||||
|
}));
|
||||||
|
|
||||||
|
interface MockButtonProps extends ButtonHTMLAttributes<HTMLButtonElement> {
|
||||||
|
children: ReactNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface MockContainerProps extends HTMLAttributes<HTMLElement> {
|
||||||
|
children: ReactNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface MockEventSourceInstance {
|
||||||
|
url: string;
|
||||||
|
onerror: ((event: Event) => void) | null;
|
||||||
|
onmessage: ((event: MessageEvent<string>) => void) | null;
|
||||||
|
close: ReturnType<typeof vi.fn>;
|
||||||
|
emitMessage: (payload: unknown) => void;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface QueueNotification {
|
||||||
|
id: string;
|
||||||
|
agent: string;
|
||||||
|
filename: string;
|
||||||
|
payload: unknown;
|
||||||
|
createdAt: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
const mockApiGet = vi.fn<(endpoint: string) => Promise<QueueNotification[]>>();
|
||||||
|
const mockApiPost = vi.fn<(endpoint: string) => Promise<{ success: true; id: string }>>();
|
||||||
|
const mockShowToast = vi.fn<(message: string, variant?: string) => void>();
|
||||||
|
|
||||||
|
let mockEventSourceInstances: MockEventSourceInstance[] = [];
|
||||||
|
|
||||||
|
const MockEventSource = vi.fn(function (this: MockEventSourceInstance, url: string): void {
|
||||||
|
this.url = url;
|
||||||
|
this.onerror = null;
|
||||||
|
this.onmessage = null;
|
||||||
|
this.close = vi.fn();
|
||||||
|
this.emitMessage = (payload: unknown): void => {
|
||||||
|
this.onmessage?.(new MessageEvent("message", { data: JSON.stringify(payload) }));
|
||||||
|
};
|
||||||
|
|
||||||
|
mockEventSourceInstances.push(this);
|
||||||
|
});
|
||||||
|
|
||||||
|
vi.mock("@/lib/api/client", () => ({
|
||||||
|
apiGet: (endpoint: string): Promise<QueueNotification[]> => mockApiGet(endpoint),
|
||||||
|
apiPost: (endpoint: string): Promise<{ success: true; id: string }> => mockApiPost(endpoint),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@mosaic/ui", () => ({
|
||||||
|
useToast: (): { showToast: typeof mockShowToast; removeToast: ReturnType<typeof vi.fn> } => ({
|
||||||
|
showToast: mockShowToast,
|
||||||
|
removeToast: vi.fn(),
|
||||||
|
}),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/button", () => ({
|
||||||
|
Button: ({ children, ...props }: MockButtonProps): React.JSX.Element => (
|
||||||
|
<button {...props}>{children}</button>
|
||||||
|
),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/badge", () => ({
|
||||||
|
Badge: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<span {...props}>{children}</span>
|
||||||
|
),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/card", () => ({
|
||||||
|
Card: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<section {...props}>{children}</section>
|
||||||
|
),
|
||||||
|
CardHeader: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<header {...props}>{children}</header>
|
||||||
|
),
|
||||||
|
CardContent: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<div {...props}>{children}</div>
|
||||||
|
),
|
||||||
|
CardTitle: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<h2 {...props}>{children}</h2>
|
||||||
|
),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/collapsible", () => ({
|
||||||
|
Collapsible: ({ children }: MockContainerProps): React.JSX.Element => <div>{children}</div>,
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/scroll-area", () => ({
|
||||||
|
ScrollArea: ({ children, ...props }: MockContainerProps): React.JSX.Element => (
|
||||||
|
<div {...props}>{children}</div>
|
||||||
|
),
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mock("@/components/ui/skeleton", () => ({
|
||||||
|
Skeleton: (props: HTMLAttributes<HTMLDivElement>): React.JSX.Element => <div {...props} />,
|
||||||
|
}));
|
||||||
|
|
||||||
|
import { QueueNotificationFeed } from "./QueueNotificationFeed";
|
||||||
|
|
||||||
|
function latestEventSource(): MockEventSourceInstance {
|
||||||
|
const instance = mockEventSourceInstances[mockEventSourceInstances.length - 1];
|
||||||
|
if (!instance) {
|
||||||
|
throw new Error("Expected an EventSource instance");
|
||||||
|
}
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeNotification(overrides: Partial<QueueNotification>): QueueNotification {
|
||||||
|
return {
|
||||||
|
id: "notif-1",
|
||||||
|
agent: "mosaic",
|
||||||
|
filename: "notif-1.json",
|
||||||
|
payload: {
|
||||||
|
taskId: "MS24-WEB-001",
|
||||||
|
eventType: "task.ready",
|
||||||
|
},
|
||||||
|
createdAt: "2026-03-08T23:00:00.000Z",
|
||||||
|
...overrides,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("QueueNotificationFeed", (): void => {
|
||||||
|
beforeEach((): void => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
vi.stubGlobal("EventSource", MockEventSource);
|
||||||
|
vi.stubGlobal("fetch", vi.fn());
|
||||||
|
mockEventSourceInstances = [];
|
||||||
|
mockApiGet.mockResolvedValue([]);
|
||||||
|
mockApiPost.mockResolvedValue({ success: true, id: "notif-1" });
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach((): void => {
|
||||||
|
vi.unstubAllGlobals();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("loads and renders notifications grouped by agent", async (): Promise<void> => {
|
||||||
|
mockApiGet.mockResolvedValue([
|
||||||
|
makeNotification({ id: "notif-1", agent: "mosaic" }),
|
||||||
|
makeNotification({
|
||||||
|
id: "notif-2",
|
||||||
|
agent: "dyor",
|
||||||
|
payload: { taskId: "MS24-WEB-002", eventType: "task.blocked" },
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
|
||||||
|
render(<QueueNotificationFeed />);
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(mockApiGet).toHaveBeenCalledWith("/api/orchestrator/api/queue/notifications");
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(screen.getByText("mosaic")).toBeInTheDocument();
|
||||||
|
expect(screen.getByText("dyor")).toBeInTheDocument();
|
||||||
|
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
|
||||||
|
expect(screen.getByText("task.ready")).toBeInTheDocument();
|
||||||
|
expect(screen.getAllByText("5 minutes ago")).toHaveLength(2);
|
||||||
|
expect(MockEventSource).toHaveBeenCalledWith("/api/orchestrator/queue/notifications/stream");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("acknowledges a notification and removes it from the list", async (): Promise<void> => {
|
||||||
|
const user = userEvent.setup();
|
||||||
|
mockApiGet.mockResolvedValue([makeNotification({ id: "notif-ack" })]);
|
||||||
|
|
||||||
|
render(<QueueNotificationFeed />);
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
|
||||||
|
});
|
||||||
|
|
||||||
|
await user.click(screen.getByRole("button", { name: "ACK notification MS24-WEB-001" }));
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(mockApiPost).toHaveBeenCalledWith(
|
||||||
|
"/api/orchestrator/api/queue/notifications/notif-ack/ack"
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(screen.getByText("No pending notifications")).toBeInTheDocument();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("renders the empty state when there are no notifications", async (): Promise<void> => {
|
||||||
|
mockApiGet.mockResolvedValue([]);
|
||||||
|
|
||||||
|
render(<QueueNotificationFeed />);
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(screen.getByText("No pending notifications")).toBeInTheDocument();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("refreshes the list when an SSE message arrives", async (): Promise<void> => {
|
||||||
|
mockApiGet.mockResolvedValue([makeNotification({ id: "notif-before" })]);
|
||||||
|
|
||||||
|
render(<QueueNotificationFeed />);
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(screen.getByText("MS24-WEB-001")).toBeInTheDocument();
|
||||||
|
});
|
||||||
|
|
||||||
|
act(() => {
|
||||||
|
latestEventSource().emitMessage([
|
||||||
|
makeNotification({
|
||||||
|
id: "notif-after",
|
||||||
|
agent: "sage",
|
||||||
|
payload: { taskId: "MS24-WEB-003", eventType: "task.failed" },
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
await waitFor((): void => {
|
||||||
|
expect(screen.getByText("sage")).toBeInTheDocument();
|
||||||
|
expect(screen.getByText("MS24-WEB-003")).toBeInTheDocument();
|
||||||
|
expect(screen.getByText("task.failed")).toBeInTheDocument();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,332 @@
|
|||||||
|
"use client";
|
||||||
|
|
||||||
|
import { useCallback, useEffect, useMemo, useState } from "react";
|
||||||
|
import { formatDistanceToNow } from "date-fns";
|
||||||
|
import { BellOff, ChevronLeft, ChevronRight, Loader2 } from "lucide-react";
|
||||||
|
import { useToast } from "@mosaic/ui";
|
||||||
|
import { Badge } from "@/components/ui/badge";
|
||||||
|
import { Button } from "@/components/ui/button";
|
||||||
|
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
|
||||||
|
import { Collapsible } from "@/components/ui/collapsible";
|
||||||
|
import { ScrollArea } from "@/components/ui/scroll-area";
|
||||||
|
import { Skeleton } from "@/components/ui/skeleton";
|
||||||
|
import { apiGet, apiPost } from "@/lib/api/client";
|
||||||
|
|
||||||
|
export interface QueueNotificationFeedProps {
|
||||||
|
className?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface QueueNotification {
|
||||||
|
id: string;
|
||||||
|
agent: string;
|
||||||
|
filename: string;
|
||||||
|
payload: unknown;
|
||||||
|
createdAt: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface QueuePayloadRecord {
|
||||||
|
taskId?: unknown;
|
||||||
|
type?: unknown;
|
||||||
|
eventType?: unknown;
|
||||||
|
event?: unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface NotificationGroup {
|
||||||
|
agent: string;
|
||||||
|
notifications: QueueNotification[];
|
||||||
|
}
|
||||||
|
|
||||||
|
const NOTIFICATIONS_ENDPOINT = "/api/orchestrator/api/queue/notifications";
|
||||||
|
const NOTIFICATIONS_STREAM_ENDPOINT = "/api/orchestrator/queue/notifications/stream";
|
||||||
|
|
||||||
|
function joinClasses(...classes: (string | undefined)[]): string {
|
||||||
|
return classes.filter((value) => typeof value === "string" && value.length > 0).join(" ");
|
||||||
|
}
|
||||||
|
|
||||||
|
function asPayloadRecord(payload: unknown): QueuePayloadRecord | null {
|
||||||
|
if (payload === null || typeof payload !== "object" || Array.isArray(payload)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return payload as QueuePayloadRecord;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getNotificationTaskId(notification: QueueNotification): string {
|
||||||
|
const payload = asPayloadRecord(notification.payload);
|
||||||
|
return typeof payload?.taskId === "string" && payload.taskId.trim().length > 0
|
||||||
|
? payload.taskId
|
||||||
|
: notification.id;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getNotificationEventType(notification: QueueNotification): string {
|
||||||
|
const payload = asPayloadRecord(notification.payload);
|
||||||
|
const candidates = [payload?.eventType, payload?.type, payload?.event];
|
||||||
|
|
||||||
|
for (const candidate of candidates) {
|
||||||
|
if (typeof candidate === "string" && candidate.trim().length > 0) {
|
||||||
|
return candidate;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return "notification";
|
||||||
|
}
|
||||||
|
|
||||||
|
function formatNotificationAge(createdAt: string): string {
|
||||||
|
const parsedDate = new Date(createdAt);
|
||||||
|
if (Number.isNaN(parsedDate.getTime())) {
|
||||||
|
return "just now";
|
||||||
|
}
|
||||||
|
|
||||||
|
return formatDistanceToNow(parsedDate, { addSuffix: true });
|
||||||
|
}
|
||||||
|
|
||||||
|
function groupNotificationsByAgent(notifications: QueueNotification[]): NotificationGroup[] {
|
||||||
|
const grouped = new Map<string, QueueNotification[]>();
|
||||||
|
|
||||||
|
for (const notification of notifications) {
|
||||||
|
const current = grouped.get(notification.agent) ?? [];
|
||||||
|
current.push(notification);
|
||||||
|
grouped.set(notification.agent, current);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Array.from(grouped.entries())
|
||||||
|
.sort(([leftAgent], [rightAgent]) => leftAgent.localeCompare(rightAgent))
|
||||||
|
.map(([agent, items]) => ({
|
||||||
|
agent,
|
||||||
|
notifications: [...items].sort(
|
||||||
|
(left, right) => new Date(right.createdAt).getTime() - new Date(left.createdAt).getTime()
|
||||||
|
),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
export function QueueNotificationFeed({
|
||||||
|
className,
|
||||||
|
}: QueueNotificationFeedProps): React.JSX.Element {
|
||||||
|
const { showToast } = useToast();
|
||||||
|
const [notifications, setNotifications] = useState<QueueNotification[]>([]);
|
||||||
|
const [isLoading, setIsLoading] = useState(true);
|
||||||
|
const [errorMessage, setErrorMessage] = useState<string | null>(null);
|
||||||
|
const [acknowledgingIds, setAcknowledgingIds] = useState<Record<string, boolean>>({});
|
||||||
|
const [isCollapsed, setIsCollapsed] = useState(false);
|
||||||
|
const [now, setNow] = useState(Date.now());
|
||||||
|
|
||||||
|
const refreshNotifications = useCallback(async (): Promise<void> => {
|
||||||
|
try {
|
||||||
|
const payload = await apiGet<QueueNotification[]>(NOTIFICATIONS_ENDPOINT);
|
||||||
|
setNotifications(payload);
|
||||||
|
setErrorMessage(null);
|
||||||
|
} catch (error) {
|
||||||
|
const message =
|
||||||
|
error instanceof Error && error.message.trim().length > 0
|
||||||
|
? error.message
|
||||||
|
: "Failed to load queue notifications.";
|
||||||
|
setErrorMessage(message);
|
||||||
|
} finally {
|
||||||
|
setIsLoading(false);
|
||||||
|
}
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
void refreshNotifications();
|
||||||
|
}, [refreshNotifications]);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
if (typeof EventSource === "undefined") {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
const source = new EventSource(NOTIFICATIONS_STREAM_ENDPOINT);
|
||||||
|
|
||||||
|
source.onmessage = (event: MessageEvent<string>): void => {
|
||||||
|
try {
|
||||||
|
const payload = JSON.parse(event.data) as QueueNotification[];
|
||||||
|
setNotifications(payload);
|
||||||
|
setErrorMessage(null);
|
||||||
|
setIsLoading(false);
|
||||||
|
} catch {
|
||||||
|
setErrorMessage("Received an invalid notification stream payload.");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
source.onerror = (): void => {
|
||||||
|
setErrorMessage((current) => current ?? "Queue notification stream disconnected.");
|
||||||
|
};
|
||||||
|
|
||||||
|
return (): void => {
|
||||||
|
source.close();
|
||||||
|
};
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
const intervalId = window.setInterval(() => {
|
||||||
|
setNow(Date.now());
|
||||||
|
}, 60_000);
|
||||||
|
|
||||||
|
return (): void => {
|
||||||
|
window.clearInterval(intervalId);
|
||||||
|
};
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
const groupedNotifications = useMemo(
|
||||||
|
() => groupNotificationsByAgent(notifications),
|
||||||
|
[notifications]
|
||||||
|
);
|
||||||
|
|
||||||
|
const pendingCount = notifications.length;
|
||||||
|
|
||||||
|
const handleAck = useCallback(
|
||||||
|
async (notificationId: string): Promise<void> => {
|
||||||
|
setAcknowledgingIds((current) => ({
|
||||||
|
...current,
|
||||||
|
[notificationId]: true,
|
||||||
|
}));
|
||||||
|
|
||||||
|
try {
|
||||||
|
await apiPost<{ success: true; id: string }>(
|
||||||
|
`/api/orchestrator/api/queue/notifications/${encodeURIComponent(notificationId)}/ack`
|
||||||
|
);
|
||||||
|
setNotifications((current) => current.filter((item) => item.id !== notificationId));
|
||||||
|
} catch (error) {
|
||||||
|
const message =
|
||||||
|
error instanceof Error && error.message.trim().length > 0
|
||||||
|
? error.message
|
||||||
|
: "Failed to acknowledge notification.";
|
||||||
|
showToast(message, "error");
|
||||||
|
} finally {
|
||||||
|
setAcknowledgingIds((current) => {
|
||||||
|
const { [notificationId]: _omitted, ...remaining } = current;
|
||||||
|
return remaining;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
[showToast]
|
||||||
|
);
|
||||||
|
|
||||||
|
return (
|
||||||
|
<Card className={joinClasses("flex h-full min-h-0 flex-col", className)}>
|
||||||
|
<CardHeader className="pb-2">
|
||||||
|
<div className="flex items-start justify-between gap-2">
|
||||||
|
<div className="flex min-w-0 items-center gap-2">
|
||||||
|
<CardTitle className="text-base">Queue Notifications</CardTitle>
|
||||||
|
<Badge variant={pendingCount > 0 ? "status-info" : "secondary"}>{pendingCount}</Badge>
|
||||||
|
</div>
|
||||||
|
<Button
|
||||||
|
type="button"
|
||||||
|
variant="ghost"
|
||||||
|
size="icon"
|
||||||
|
className="h-7 w-7"
|
||||||
|
onClick={() => {
|
||||||
|
setIsCollapsed((current) => !current);
|
||||||
|
}}
|
||||||
|
aria-label={isCollapsed ? "Expand queue notifications" : "Collapse queue notifications"}
|
||||||
|
title={isCollapsed ? "Expand queue notifications" : "Collapse queue notifications"}
|
||||||
|
>
|
||||||
|
{isCollapsed ? (
|
||||||
|
<ChevronLeft className="h-4 w-4" aria-hidden="true" />
|
||||||
|
) : (
|
||||||
|
<ChevronRight className="h-4 w-4" aria-hidden="true" />
|
||||||
|
)}
|
||||||
|
</Button>
|
||||||
|
</div>
|
||||||
|
</CardHeader>
|
||||||
|
<Collapsible open={!isCollapsed} className="min-h-0 flex-1">
|
||||||
|
{!isCollapsed ? (
|
||||||
|
<CardContent className="min-h-0 flex-1 px-3 pb-3">
|
||||||
|
{isLoading ? (
|
||||||
|
<ScrollArea className="h-full">
|
||||||
|
<div className="space-y-2 pr-1">
|
||||||
|
{Array.from({ length: 6 }).map((_, index) => (
|
||||||
|
<Skeleton
|
||||||
|
key={`queue-notification-skeleton-${String(index)}`}
|
||||||
|
className="h-14 w-full"
|
||||||
|
/>
|
||||||
|
))}
|
||||||
|
</div>
|
||||||
|
</ScrollArea>
|
||||||
|
) : errorMessage && notifications.length === 0 ? (
|
||||||
|
<div className="flex h-full items-center justify-center px-4 text-center text-sm text-red-500">
|
||||||
|
{errorMessage}
|
||||||
|
</div>
|
||||||
|
) : groupedNotifications.length === 0 ? (
|
||||||
|
<div className="flex h-full flex-col items-center justify-center gap-2 text-center text-sm text-muted-foreground">
|
||||||
|
<BellOff className="h-5 w-5" aria-hidden="true" />
|
||||||
|
<span>No pending notifications</span>
|
||||||
|
</div>
|
||||||
|
) : (
|
||||||
|
<ScrollArea className="h-full">
|
||||||
|
<div className="space-y-4 pr-1">
|
||||||
|
{groupedNotifications.map((group) => (
|
||||||
|
<section key={group.agent} className="space-y-2">
|
||||||
|
<div className="flex items-center justify-between gap-2">
|
||||||
|
<h3 className="text-sm font-semibold text-foreground">{group.agent}</h3>
|
||||||
|
<span className="text-xs text-muted-foreground">
|
||||||
|
{group.notifications.length}
|
||||||
|
</span>
|
||||||
|
</div>
|
||||||
|
<div className="space-y-2">
|
||||||
|
{group.notifications.map((notification) => {
|
||||||
|
const taskId = getNotificationTaskId(notification);
|
||||||
|
const eventType = getNotificationEventType(notification);
|
||||||
|
const isAcknowledging = acknowledgingIds[notification.id] ?? false;
|
||||||
|
|
||||||
|
return (
|
||||||
|
<article
|
||||||
|
key={notification.id}
|
||||||
|
className="rounded-lg border border-border/70 bg-card px-3 py-2"
|
||||||
|
>
|
||||||
|
<div className="flex items-start justify-between gap-3">
|
||||||
|
<div className="min-w-0 space-y-1">
|
||||||
|
<div className="flex flex-wrap items-center gap-2">
|
||||||
|
<span className="font-mono text-xs text-foreground">
|
||||||
|
{taskId}
|
||||||
|
</span>
|
||||||
|
<Badge variant="secondary">{eventType}</Badge>
|
||||||
|
</div>
|
||||||
|
<time
|
||||||
|
className="block text-xs text-muted-foreground"
|
||||||
|
dateTime={notification.createdAt}
|
||||||
|
>
|
||||||
|
{formatNotificationAge(notification.createdAt)}
|
||||||
|
</time>
|
||||||
|
</div>
|
||||||
|
<Button
|
||||||
|
type="button"
|
||||||
|
variant="outline"
|
||||||
|
size="sm"
|
||||||
|
disabled={isAcknowledging}
|
||||||
|
onClick={() => {
|
||||||
|
void handleAck(notification.id);
|
||||||
|
}}
|
||||||
|
aria-label={`ACK notification ${taskId}`}
|
||||||
|
>
|
||||||
|
{isAcknowledging ? (
|
||||||
|
<span className="flex items-center gap-2">
|
||||||
|
<Loader2
|
||||||
|
className="h-4 w-4 animate-spin"
|
||||||
|
aria-hidden="true"
|
||||||
|
/>
|
||||||
|
ACK
|
||||||
|
</span>
|
||||||
|
) : (
|
||||||
|
"ACK"
|
||||||
|
)}
|
||||||
|
</Button>
|
||||||
|
</div>
|
||||||
|
</article>
|
||||||
|
);
|
||||||
|
})}
|
||||||
|
</div>
|
||||||
|
</section>
|
||||||
|
))}
|
||||||
|
</div>
|
||||||
|
</ScrollArea>
|
||||||
|
)}
|
||||||
|
</CardContent>
|
||||||
|
) : null}
|
||||||
|
</Collapsible>
|
||||||
|
<span className="sr-only" aria-live="polite">
|
||||||
|
{pendingCount} pending notifications, refreshed at {new Date(now).toISOString()}
|
||||||
|
</span>
|
||||||
|
</Card>
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -385,6 +385,9 @@ services:
|
|||||||
NEXT_PUBLIC_APP_URL: ${NEXT_PUBLIC_APP_URL:-http://localhost:3000}
|
NEXT_PUBLIC_APP_URL: ${NEXT_PUBLIC_APP_URL:-http://localhost:3000}
|
||||||
NEXT_PUBLIC_API_URL: ${NEXT_PUBLIC_API_URL:-http://localhost:3001}
|
NEXT_PUBLIC_API_URL: ${NEXT_PUBLIC_API_URL:-http://localhost:3001}
|
||||||
TRUSTED_ORIGINS: ${TRUSTED_ORIGINS:-}
|
TRUSTED_ORIGINS: ${TRUSTED_ORIGINS:-}
|
||||||
|
GITEA_WEBHOOK_SECRET: ${GITEA_WEBHOOK_SECRET:-}
|
||||||
|
GITEA_API_TOKEN: ${GITEA_API_TOKEN:-}
|
||||||
|
GATEKEEPER_ENABLED: ${GATEKEEPER_ENABLED:-true}
|
||||||
volumes:
|
volumes:
|
||||||
- openbao_init:/openbao/init:ro
|
- openbao_init:/openbao/init:ro
|
||||||
ports:
|
ports:
|
||||||
|
|||||||
@@ -94,6 +94,21 @@ OIDC_REDIRECT_URI=http://localhost:3001/auth/oauth2/callback/authentik
|
|||||||
|
|
||||||
See [Authentik Setup](2-authentik.md) for complete OIDC configuration.
|
See [Authentik Setup](2-authentik.md) for complete OIDC configuration.
|
||||||
|
|
||||||
|
## Webhooks and Merge Automation
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Gitea webhook validation secret for /api/gatekeeper/webhook/gitea
|
||||||
|
GITEA_WEBHOOK_SECRET=your-random-webhook-secret
|
||||||
|
|
||||||
|
# Personal access token used by Gatekeeper to comment on and merge PRs
|
||||||
|
GITEA_API_TOKEN=your-gitea-api-token
|
||||||
|
|
||||||
|
# Master switch for the Gatekeeper auto-merge workflow
|
||||||
|
GATEKEEPER_ENABLED=true
|
||||||
|
```
|
||||||
|
|
||||||
|
Use a dedicated Gitea token with the minimum repository scope needed to comment on pull requests and perform merges.
|
||||||
|
|
||||||
## Cache and Storage
|
## Cache and Storage
|
||||||
|
|
||||||
### Valkey (Redis-compatible)
|
### Valkey (Redis-compatible)
|
||||||
|
|||||||
@@ -120,7 +120,7 @@ Target version: `v0.0.23`
|
|||||||
### Phase 0 — Backend Core (Foundation)
|
### Phase 0 — Backend Core (Foundation)
|
||||||
|
|
||||||
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
|
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
|
||||||
| ----------- | ----------- | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- |
|
| ----------- | ------ | ------------- | ------------------------------------------------------------------------------------------------ | ----- | ------------ | ---------------------- | ----------------------------------------------- | ----------------------------------------------------------- | ----- | ---------- | ------------ | -------- | ---- | --------------------------------------------- |
|
||||||
| MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD |
|
| MS23-P0-001 | done | p0-foundation | Prisma schema: AgentConversationMessage, AgentSessionTree, AgentProviderConfig, OperatorAuditLog | #693 | api | feat/ms23-p0-schema | — | MS23-P0-002,MS23-P0-003,MS23-P0-004,MS23-P0-005,MS23-P1-001 | codex | 2026-03-06 | 2026-03-06 | 15K | — | taskSource field per mosaic-queue note in PRD |
|
||||||
| MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
|
| MS23-P0-002 | done | p0-foundation | Agent message ingestion: wire spawner/lifecycle to write messages to DB | #693 | orchestrator | feat/ms23-p0-ingestion | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
|
||||||
| MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
|
| MS23-P0-003 | done | p0-foundation | Orchestrator API: GET /agents/:id/messages + SSE stream endpoint | #693 | orchestrator | feat/ms23-p0-stream | MS23-P0-001 | MS23-P0-006 | codex | 2026-03-06 | 2026-03-07 | 20K | — | |
|
||||||
@@ -183,3 +183,29 @@ Target version: `v0.0.23`
|
|||||||
| **Total** | **29** | **~478K** |
|
| **Total** | **29** | **~478K** |
|
||||||
|
|
||||||
Recommended dispatch: Codex for Phase 2 UI + routine API tasks; Sonnet for complex streaming logic (P0-003, P1-005, P3-002).
|
Recommended dispatch: Codex for Phase 2 UI + routine API tasks; Sonnet for complex streaming logic (P0-003, P1-005, P3-002).
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## MS24 — Queue Integration in Mission Control
|
||||||
|
|
||||||
|
PRD: Issue #749
|
||||||
|
Milestone: `0.0.24`
|
||||||
|
Target version: `v0.0.24`
|
||||||
|
|
||||||
|
> Single-writer: orchestrator (Jarvis/OpenClaw) only. Workers read but never modify.
|
||||||
|
|
||||||
|
| id | status | milestone | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
|
||||||
|
| ------------ | ------ | --------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----- | ----- | -------------------- | ------------ | ------------ | ----- | ---------- | ------------ | -------- | ---- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||||
|
| MS24-API-001 | done | p0-api | QueueNotificationsModule: GET /notifications, GET /notifications/stream (SSE), POST /notifications/:id/ack, GET /tasks. Auth: ApiKeyGuard. Unit tests included. | #749 | api | feat/ms24-queue-api | — | MS24-WEB-001 | — | — | — | 25K | — | Guard: ApiKeyGuard from src/common/guards/api-key.guard.ts (COORDINATOR_API_KEY). SSE via raw express Response + chokidar. Fail-soft if inbox dir missing. |
|
||||||
|
| MS24-API-002 | done | p0-api | Woodpecker CI webhook → agent notification: POST /api/webhooks/woodpecker, HMAC-SHA256 verify, scan agent-state active dir, fire mosaic-queue notification to matched agent | #749 | api | feat/ms24-ci-webhook | MS24-API-001 | MS24-VER-001 | codex | 2026-03-08 | — | 15K | — | event: completed/failed. @SkipCsrf, no auth guard. Also adds notify-webhook step to .woodpecker/ci.yml |
|
||||||
|
| MS24-WEB-001 | done | p0-ui | QueueNotificationFeed component + MissionControlLayout wiring (right sidebar panel, badge count) | #749 | web | feat/ms24-queue-ui | MS24-API-001 | MS24-VER-001 | — | — | — | 20K | — | SSE to /api/queue/notifications/stream. ACK button. Collapsible panel in MissionControlLayout.tsx. |
|
||||||
|
| MS24-VER-001 | done | p0-verify | CI green, no regressions, deploy to prod, tag v0.0.24 | #749 | stack | — | MS24-WEB-001 | — | — | — | — | 5K | — | |
|
||||||
|
|
||||||
|
### MS24 Budget Summary
|
||||||
|
|
||||||
|
| Phase | Tasks | Estimate |
|
||||||
|
| --------- | ----- | -------- |
|
||||||
|
| API | 1 | ~25K |
|
||||||
|
| UI | 1 | ~20K |
|
||||||
|
| Verify | 1 | ~5K |
|
||||||
|
| **Total** | **3** | **~50K** |
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ Images are tagged based on branch and event type:
|
|||||||
### Tag Meanings
|
### Tag Meanings
|
||||||
|
|
||||||
| Tag | Purpose | Stability |
|
| Tag | Purpose | Stability |
|
||||||
| -------------------------- | ---------------------------------- | --------- |
|
| -------------------------- | -------------------------------- | --------- |
|
||||||
| `latest` | Current build from `main` | Latest |
|
| `latest` | Current build from `main` | Latest |
|
||||||
| `v*` (e.g., `v1.0.0`) | Versioned release | Immutable |
|
| `v*` (e.g., `v1.0.0`) | Versioned release | Immutable |
|
||||||
| `{sha}` (e.g., `658ec077`) | Specific commit for traceability | Immutable |
|
| `{sha}` (e.g., `658ec077`) | Specific commit for traceability | Immutable |
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
## Objective
|
## Objective
|
||||||
|
|
||||||
Implement rate limiting on all federation endpoints to prevent denial-of-service (DoS) attacks. Federation endpoints currently have no rate limiting, allowing attackers to:
|
Implement rate limiting on all federation endpoints to prevent denial-of-service (DoS) attacks. Federation endpoints currently have no rate limiting, allowing attackers to:
|
||||||
|
|
||||||
- Overwhelm the server with connection requests
|
- Overwhelm the server with connection requests
|
||||||
- Flood token validation endpoints
|
- Flood token validation endpoints
|
||||||
- Exhaust system resources
|
- Exhaust system resources
|
||||||
@@ -12,6 +13,7 @@ Implement rate limiting on all federation endpoints to prevent denial-of-service
|
|||||||
**Severity:** P0 (Critical) - Blocks production deployment
|
**Severity:** P0 (Critical) - Blocks production deployment
|
||||||
**Attack Vector:** Unauthenticated public endpoints allow unlimited requests
|
**Attack Vector:** Unauthenticated public endpoints allow unlimited requests
|
||||||
**Risk:** System can be brought down by flooding requests to:
|
**Risk:** System can be brought down by flooding requests to:
|
||||||
|
|
||||||
1. `POST /api/v1/federation/incoming/connect` (Public, no auth)
|
1. `POST /api/v1/federation/incoming/connect` (Public, no auth)
|
||||||
2. `POST /api/v1/federation/auth/validate` (Public, no auth)
|
2. `POST /api/v1/federation/auth/validate` (Public, no auth)
|
||||||
3. All other endpoints (authenticated, but can be abused)
|
3. All other endpoints (authenticated, but can be abused)
|
||||||
@@ -19,15 +21,19 @@ Implement rate limiting on all federation endpoints to prevent denial-of-service
|
|||||||
## Approach
|
## Approach
|
||||||
|
|
||||||
### 1. Install @nestjs/throttler
|
### 1. Install @nestjs/throttler
|
||||||
|
|
||||||
Use NestJS's official rate limiting package which integrates with the framework's guard system.
|
Use NestJS's official rate limiting package which integrates with the framework's guard system.
|
||||||
|
|
||||||
### 2. Configure Rate Limits
|
### 2. Configure Rate Limits
|
||||||
|
|
||||||
Tiered rate limiting strategy:
|
Tiered rate limiting strategy:
|
||||||
|
|
||||||
- **Public endpoints:** Strict limits (5 req/min per IP)
|
- **Public endpoints:** Strict limits (5 req/min per IP)
|
||||||
- **Authenticated endpoints:** Moderate limits (20 req/min per user)
|
- **Authenticated endpoints:** Moderate limits (20 req/min per user)
|
||||||
- **Admin endpoints:** Higher limits (50 req/min per user)
|
- **Admin endpoints:** Higher limits (50 req/min per user)
|
||||||
|
|
||||||
### 3. Implementation Strategy
|
### 3. Implementation Strategy
|
||||||
|
|
||||||
1. Add `@nestjs/throttler` dependency
|
1. Add `@nestjs/throttler` dependency
|
||||||
2. Configure ThrottlerModule globally
|
2. Configure ThrottlerModule globally
|
||||||
3. Apply custom rate limits per endpoint using decorators
|
3. Apply custom rate limits per endpoint using decorators
|
||||||
@@ -53,6 +59,7 @@ Tiered rate limiting strategy:
|
|||||||
**COMPLETE** - Rate limiting successfully implemented on all federation endpoints.
|
**COMPLETE** - Rate limiting successfully implemented on all federation endpoints.
|
||||||
|
|
||||||
**Security Impact:** MITIGATED
|
**Security Impact:** MITIGATED
|
||||||
|
|
||||||
- DoS vulnerability eliminated via rate limiting
|
- DoS vulnerability eliminated via rate limiting
|
||||||
- Public endpoints protected with strict limits (3 req/sec)
|
- Public endpoints protected with strict limits (3 req/sec)
|
||||||
- Authenticated endpoints have moderate limits (20 req/min)
|
- Authenticated endpoints have moderate limits (20 req/min)
|
||||||
@@ -61,6 +68,7 @@ Tiered rate limiting strategy:
|
|||||||
## Baseline Quality Status
|
## Baseline Quality Status
|
||||||
|
|
||||||
**Pre-existing Technical Debt** (NOT introduced by this fix):
|
**Pre-existing Technical Debt** (NOT introduced by this fix):
|
||||||
|
|
||||||
- 29 TypeScript errors in apps/api (federation + runner-jobs)
|
- 29 TypeScript errors in apps/api (federation + runner-jobs)
|
||||||
- Federation: Missing Prisma schema types (`FederationConnectionStatus`, `Instance`, `federatedIdentity`)
|
- Federation: Missing Prisma schema types (`FederationConnectionStatus`, `Instance`, `federatedIdentity`)
|
||||||
- Runner Jobs: Missing `version` field in schema
|
- Runner Jobs: Missing `version` field in schema
|
||||||
@@ -68,6 +76,7 @@ Tiered rate limiting strategy:
|
|||||||
- **My changes introduced 0 new errors**
|
- **My changes introduced 0 new errors**
|
||||||
|
|
||||||
**Quality Assessment:**
|
**Quality Assessment:**
|
||||||
|
|
||||||
- ✅ Tier 1 (Baseline): No regression (error count unchanged)
|
- ✅ Tier 1 (Baseline): No regression (error count unchanged)
|
||||||
- ✅ Tier 2 (Modified Files): 0 new errors in files I touched
|
- ✅ Tier 2 (Modified Files): 0 new errors in files I touched
|
||||||
- ✅ Tier 3 (New Code): Rate limiting configuration is syntactically correct
|
- ✅ Tier 3 (New Code): Rate limiting configuration is syntactically correct
|
||||||
@@ -75,6 +84,7 @@ Tiered rate limiting strategy:
|
|||||||
## Testing Status
|
## Testing Status
|
||||||
|
|
||||||
**Blocked:** Federation module tests cannot run until Prisma schema is added. Pre-existing error:
|
**Blocked:** Federation module tests cannot run until Prisma schema is added. Pre-existing error:
|
||||||
|
|
||||||
```
|
```
|
||||||
TypeError: Cannot read properties of undefined (reading 'PENDING')
|
TypeError: Cannot read properties of undefined (reading 'PENDING')
|
||||||
FederationConnectionStatus is undefined
|
FederationConnectionStatus is undefined
|
||||||
@@ -83,6 +93,7 @@ FederationConnectionStatus is undefined
|
|||||||
This is NOT caused by my changes - it's pre-existing technical debt from incomplete M7 federation implementation.
|
This is NOT caused by my changes - it's pre-existing technical debt from incomplete M7 federation implementation.
|
||||||
|
|
||||||
**Manual Verification:**
|
**Manual Verification:**
|
||||||
|
|
||||||
- TypeScript compilation: No new errors introduced
|
- TypeScript compilation: No new errors introduced
|
||||||
- Rate limiting decorators: Correctly applied to all endpoints
|
- Rate limiting decorators: Correctly applied to all endpoints
|
||||||
- ThrottlerModule: Properly configured with 3 tiers
|
- ThrottlerModule: Properly configured with 3 tiers
|
||||||
@@ -91,6 +102,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
|
|||||||
## Testing
|
## Testing
|
||||||
|
|
||||||
### Rate Limit Tests
|
### Rate Limit Tests
|
||||||
|
|
||||||
1. Public endpoint exceeds limit → 429 Too Many Requests
|
1. Public endpoint exceeds limit → 429 Too Many Requests
|
||||||
2. Authenticated endpoint exceeds limit → 429 Too Many Requests
|
2. Authenticated endpoint exceeds limit → 429 Too Many Requests
|
||||||
3. Within limits → 200 OK
|
3. Within limits → 200 OK
|
||||||
@@ -99,6 +111,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
|
|||||||
6. Different users have independent limits
|
6. Different users have independent limits
|
||||||
|
|
||||||
### Security Tests
|
### Security Tests
|
||||||
|
|
||||||
1. Cannot bypass rate limit with different user agents
|
1. Cannot bypass rate limit with different user agents
|
||||||
2. Cannot bypass rate limit with different headers
|
2. Cannot bypass rate limit with different headers
|
||||||
3. Rate limit counter resets after time window
|
3. Rate limit counter resets after time window
|
||||||
@@ -107,6 +120,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
|
|||||||
## Federation Endpoints Requiring Rate Limiting
|
## Federation Endpoints Requiring Rate Limiting
|
||||||
|
|
||||||
### FederationController (`/api/v1/federation`)
|
### FederationController (`/api/v1/federation`)
|
||||||
|
|
||||||
- `GET /instance` - Public (5 req/min per IP)
|
- `GET /instance` - Public (5 req/min per IP)
|
||||||
- `POST /instance/regenerate-keys` - Admin (10 req/min per user)
|
- `POST /instance/regenerate-keys` - Admin (10 req/min per user)
|
||||||
- `POST /connections/initiate` - Auth (10 req/min per user)
|
- `POST /connections/initiate` - Auth (10 req/min per user)
|
||||||
@@ -118,6 +132,7 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
|
|||||||
- `POST /incoming/connect` - **Public (3 req/min per IP)** ← CRITICAL
|
- `POST /incoming/connect` - **Public (3 req/min per IP)** ← CRITICAL
|
||||||
|
|
||||||
### FederationAuthController (`/api/v1/federation/auth`)
|
### FederationAuthController (`/api/v1/federation/auth`)
|
||||||
|
|
||||||
- `POST /initiate` - Auth (10 req/min per user)
|
- `POST /initiate` - Auth (10 req/min per user)
|
||||||
- `POST /link` - Auth (5 req/min per user)
|
- `POST /link` - Auth (5 req/min per user)
|
||||||
- `GET /identities` - Auth (30 req/min per user)
|
- `GET /identities` - Auth (30 req/min per user)
|
||||||
@@ -127,18 +142,21 @@ This is NOT caused by my changes - it's pre-existing technical debt from incompl
|
|||||||
## Notes
|
## Notes
|
||||||
|
|
||||||
### Design Decisions
|
### Design Decisions
|
||||||
|
|
||||||
- Use IP-based rate limiting for public endpoints
|
- Use IP-based rate limiting for public endpoints
|
||||||
- Use user-based rate limiting for authenticated endpoints
|
- Use user-based rate limiting for authenticated endpoints
|
||||||
- Store rate limit state in Valkey (Redis-compatible) for scalability
|
- Store rate limit state in Valkey (Redis-compatible) for scalability
|
||||||
- Include rate limit headers in responses (X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset)
|
- Include rate limit headers in responses (X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset)
|
||||||
|
|
||||||
### Attack Vectors Mitigated
|
### Attack Vectors Mitigated
|
||||||
|
|
||||||
1. **Connection Request Flooding:** Attacker sends unlimited connection requests to `/incoming/connect`
|
1. **Connection Request Flooding:** Attacker sends unlimited connection requests to `/incoming/connect`
|
||||||
2. **Token Validation Abuse:** Attacker floods `/auth/validate` to exhaust resources
|
2. **Token Validation Abuse:** Attacker floods `/auth/validate` to exhaust resources
|
||||||
3. **Authenticated User Abuse:** Compromised credentials used to flood authenticated endpoints
|
3. **Authenticated User Abuse:** Compromised credentials used to flood authenticated endpoints
|
||||||
4. **Resource Exhaustion:** Prevents CPU/memory exhaustion from processing excessive requests
|
4. **Resource Exhaustion:** Prevents CPU/memory exhaustion from processing excessive requests
|
||||||
|
|
||||||
### Future Enhancements (Not in Scope)
|
### Future Enhancements (Not in Scope)
|
||||||
|
|
||||||
- Circuit breaker pattern for failing instances
|
- Circuit breaker pattern for failing instances
|
||||||
- Geographic rate limiting
|
- Geographic rate limiting
|
||||||
- Adaptive rate limiting based on system load
|
- Adaptive rate limiting based on system load
|
||||||
|
|||||||
@@ -7,11 +7,13 @@ The initial implementation (commit 6878d57) was high quality but included placeh
|
|||||||
## Security-Critical Issues
|
## Security-Critical Issues
|
||||||
|
|
||||||
### 1. JWT Token Validation (CRITICAL)
|
### 1. JWT Token Validation (CRITICAL)
|
||||||
|
|
||||||
**Problem**: `validateToken()` always returns `valid: false`
|
**Problem**: `validateToken()` always returns `valid: false`
|
||||||
**Risk**: Cannot verify authenticity of federated tokens
|
**Risk**: Cannot verify authenticity of federated tokens
|
||||||
**Solution**: Implement proper JWT validation with signature verification
|
**Solution**: Implement proper JWT validation with signature verification
|
||||||
|
|
||||||
### 2. OIDC Discovery (CRITICAL)
|
### 2. OIDC Discovery (CRITICAL)
|
||||||
|
|
||||||
**Problem**: `generateAuthUrl()` returns hardcoded placeholder URL
|
**Problem**: `generateAuthUrl()` returns hardcoded placeholder URL
|
||||||
**Risk**: Cannot initiate real federated authentication flows
|
**Risk**: Cannot initiate real federated authentication flows
|
||||||
**Solution**: Implement OIDC discovery and proper authorization URL generation
|
**Solution**: Implement OIDC discovery and proper authorization URL generation
|
||||||
@@ -19,9 +21,11 @@ The initial implementation (commit 6878d57) was high quality but included placeh
|
|||||||
## Implementation Plan
|
## Implementation Plan
|
||||||
|
|
||||||
### 1. Add Dependencies
|
### 1. Add Dependencies
|
||||||
|
|
||||||
- [x] Add `jose` library for JWT handling (industry-standard, secure)
|
- [x] Add `jose` library for JWT handling (industry-standard, secure)
|
||||||
|
|
||||||
### 2. Implement JWT Validation
|
### 2. Implement JWT Validation
|
||||||
|
|
||||||
- [ ] Fetch OIDC discovery metadata from issuer
|
- [ ] Fetch OIDC discovery metadata from issuer
|
||||||
- [ ] Cache JWKS (JSON Web Key Set) for performance
|
- [ ] Cache JWKS (JSON Web Key Set) for performance
|
||||||
- [ ] Verify JWT signature using remote public key
|
- [ ] Verify JWT signature using remote public key
|
||||||
@@ -31,6 +35,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
|
|||||||
- [ ] Return proper validation results
|
- [ ] Return proper validation results
|
||||||
|
|
||||||
### 3. Implement OIDC Discovery
|
### 3. Implement OIDC Discovery
|
||||||
|
|
||||||
- [ ] Fetch `.well-known/openid-configuration` from remote instance
|
- [ ] Fetch `.well-known/openid-configuration` from remote instance
|
||||||
- [ ] Cache discovery metadata
|
- [ ] Cache discovery metadata
|
||||||
- [ ] Generate proper OAuth2 authorization URL
|
- [ ] Generate proper OAuth2 authorization URL
|
||||||
@@ -39,6 +44,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
|
|||||||
- [ ] Support standard OIDC scopes (openid, profile, email)
|
- [ ] Support standard OIDC scopes (openid, profile, email)
|
||||||
|
|
||||||
### 4. Update Tests
|
### 4. Update Tests
|
||||||
|
|
||||||
- [ ] Replace mock-based tests with real behavior tests
|
- [ ] Replace mock-based tests with real behavior tests
|
||||||
- [ ] Test valid JWT validation
|
- [ ] Test valid JWT validation
|
||||||
- [ ] Test expired/invalid token rejection
|
- [ ] Test expired/invalid token rejection
|
||||||
@@ -47,6 +53,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
|
|||||||
- [ ] Maintain 85%+ test coverage
|
- [ ] Maintain 85%+ test coverage
|
||||||
|
|
||||||
### 5. Security Considerations
|
### 5. Security Considerations
|
||||||
|
|
||||||
- Cache JWKS to avoid excessive network calls
|
- Cache JWKS to avoid excessive network calls
|
||||||
- Validate token expiration strictly
|
- Validate token expiration strictly
|
||||||
- Use PKCE to prevent authorization code interception
|
- Use PKCE to prevent authorization code interception
|
||||||
@@ -57,6 +64,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
|
|||||||
## Implementation Notes
|
## Implementation Notes
|
||||||
|
|
||||||
**PKCE Flow**:
|
**PKCE Flow**:
|
||||||
|
|
||||||
1. Generate random code_verifier (base64url-encoded random bytes)
|
1. Generate random code_verifier (base64url-encoded random bytes)
|
||||||
2. Generate code_challenge = base64url(SHA256(code_verifier))
|
2. Generate code_challenge = base64url(SHA256(code_verifier))
|
||||||
3. Store code_verifier in session/database
|
3. Store code_verifier in session/database
|
||||||
@@ -64,6 +72,7 @@ The initial implementation (commit 6878d57) was high quality but included placeh
|
|||||||
5. Send code_verifier in token exchange
|
5. Send code_verifier in token exchange
|
||||||
|
|
||||||
**JWT Validation Flow**:
|
**JWT Validation Flow**:
|
||||||
|
|
||||||
1. Parse JWT without verification to get header
|
1. Parse JWT without verification to get header
|
||||||
2. Fetch JWKS from issuer (cache for 1 hour)
|
2. Fetch JWKS from issuer (cache for 1 hour)
|
||||||
3. Find matching key by kid (key ID)
|
3. Find matching key by kid (key ID)
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
|
|||||||
## What Was Implemented
|
## What Was Implemented
|
||||||
|
|
||||||
### Database Schema
|
### Database Schema
|
||||||
|
|
||||||
- **FederationEventSubscription Model**: New table for storing event subscriptions
|
- **FederationEventSubscription Model**: New table for storing event subscriptions
|
||||||
- Fields: id, workspaceId, connectionId, eventType, metadata, isActive, timestamps
|
- Fields: id, workspaceId, connectionId, eventType, metadata, isActive, timestamps
|
||||||
- Unique constraint on (workspaceId, connectionId, eventType)
|
- Unique constraint on (workspaceId, connectionId, eventType)
|
||||||
@@ -21,6 +22,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
|
|||||||
### Core Services
|
### Core Services
|
||||||
|
|
||||||
**EventService** (`event.service.ts`)
|
**EventService** (`event.service.ts`)
|
||||||
|
|
||||||
- `subscribeToEventType()`: Subscribe to events from remote instance
|
- `subscribeToEventType()`: Subscribe to events from remote instance
|
||||||
- `unsubscribeFromEventType()`: Remove event subscription
|
- `unsubscribeFromEventType()`: Remove event subscription
|
||||||
- `publishEvent()`: Publish events to all subscribed connections
|
- `publishEvent()`: Publish events to all subscribed connections
|
||||||
@@ -35,6 +37,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
|
|||||||
**EventController** (`event.controller.ts`)
|
**EventController** (`event.controller.ts`)
|
||||||
|
|
||||||
**Authenticated Endpoints (require AuthGuard):**
|
**Authenticated Endpoints (require AuthGuard):**
|
||||||
|
|
||||||
- `POST /api/v1/federation/events/subscribe` - Subscribe to event type
|
- `POST /api/v1/federation/events/subscribe` - Subscribe to event type
|
||||||
- `POST /api/v1/federation/events/unsubscribe` - Unsubscribe from event type
|
- `POST /api/v1/federation/events/unsubscribe` - Unsubscribe from event type
|
||||||
- `POST /api/v1/federation/events/publish` - Publish event to subscribers
|
- `POST /api/v1/federation/events/publish` - Publish event to subscribers
|
||||||
@@ -43,12 +46,14 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
|
|||||||
- `GET /api/v1/federation/events/messages/:id` - Get single event message
|
- `GET /api/v1/federation/events/messages/:id` - Get single event message
|
||||||
|
|
||||||
**Public Endpoints (signature-verified):**
|
**Public Endpoints (signature-verified):**
|
||||||
|
|
||||||
- `POST /api/v1/federation/incoming/event` - Receive event from remote instance
|
- `POST /api/v1/federation/incoming/event` - Receive event from remote instance
|
||||||
- `POST /api/v1/federation/incoming/event/ack` - Receive event acknowledgment
|
- `POST /api/v1/federation/incoming/event/ack` - Receive event acknowledgment
|
||||||
|
|
||||||
### Type Definitions
|
### Type Definitions
|
||||||
|
|
||||||
**Added to `message.types.ts`:**
|
**Added to `message.types.ts`:**
|
||||||
|
|
||||||
- `EventMessage`: Outgoing event structure
|
- `EventMessage`: Outgoing event structure
|
||||||
- `EventAck`: Event acknowledgment structure
|
- `EventAck`: Event acknowledgment structure
|
||||||
- `EventMessageDetails`: Event message response type
|
- `EventMessageDetails`: Event message response type
|
||||||
@@ -57,6 +62,7 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
|
|||||||
### Data Transfer Objects
|
### Data Transfer Objects
|
||||||
|
|
||||||
**event.dto.ts:**
|
**event.dto.ts:**
|
||||||
|
|
||||||
- `SubscribeToEventDto`: Subscribe request
|
- `SubscribeToEventDto`: Subscribe request
|
||||||
- `UnsubscribeFromEventDto`: Unsubscribe request
|
- `UnsubscribeFromEventDto`: Unsubscribe request
|
||||||
- `PublishEventDto`: Publish event request
|
- `PublishEventDto`: Publish event request
|
||||||
@@ -66,12 +72,14 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
|
|||||||
## Testing
|
## Testing
|
||||||
|
|
||||||
### Test Coverage
|
### Test Coverage
|
||||||
|
|
||||||
- **EventService**: 18 unit tests, **89.09% coverage** ✅
|
- **EventService**: 18 unit tests, **89.09% coverage** ✅
|
||||||
- **EventController**: 11 unit tests, **83.87% coverage** ✅
|
- **EventController**: 11 unit tests, **83.87% coverage** ✅
|
||||||
- **Total**: 29 tests, all passing
|
- **Total**: 29 tests, all passing
|
||||||
- **Coverage**: Exceeds 85% minimum requirement
|
- **Coverage**: Exceeds 85% minimum requirement
|
||||||
|
|
||||||
### Test Scenarios Covered
|
### Test Scenarios Covered
|
||||||
|
|
||||||
- Subscription creation and deletion
|
- Subscription creation and deletion
|
||||||
- Event publishing to multiple subscribers
|
- Event publishing to multiple subscribers
|
||||||
- Failed delivery handling
|
- Failed delivery handling
|
||||||
@@ -84,17 +92,21 @@ Successfully implemented EVENT message type for federation, enabling pub/sub eve
|
|||||||
## Design Patterns
|
## Design Patterns
|
||||||
|
|
||||||
### Consistency with Existing Code
|
### Consistency with Existing Code
|
||||||
|
|
||||||
- Follows patterns from `QueryService` and `CommandService`
|
- Follows patterns from `QueryService` and `CommandService`
|
||||||
- Reuses existing `SignatureService` for message verification
|
- Reuses existing `SignatureService` for message verification
|
||||||
- Reuses existing `FederationService` for instance identity
|
- Reuses existing `FederationService` for instance identity
|
||||||
- Uses existing `FederationMessage` model with new `eventType` field
|
- Uses existing `FederationMessage` model with new `eventType` field
|
||||||
|
|
||||||
### Event Type Naming Convention
|
### Event Type Naming Convention
|
||||||
|
|
||||||
Hierarchical dot-notation:
|
Hierarchical dot-notation:
|
||||||
|
|
||||||
- `entity.action` (e.g., "task.created", "user.updated")
|
- `entity.action` (e.g., "task.created", "user.updated")
|
||||||
- `entity.action.detail` (e.g., "task.status.changed")
|
- `entity.action.detail` (e.g., "task.status.changed")
|
||||||
|
|
||||||
### Security Features
|
### Security Features
|
||||||
|
|
||||||
- All events signature-verified (RSA)
|
- All events signature-verified (RSA)
|
||||||
- Timestamp validation (prevents replay attacks)
|
- Timestamp validation (prevents replay attacks)
|
||||||
- Connection status validation (only active connections)
|
- Connection status validation (only active connections)
|
||||||
@@ -103,14 +115,18 @@ Hierarchical dot-notation:
|
|||||||
## Technical Details
|
## Technical Details
|
||||||
|
|
||||||
### Database Migration
|
### Database Migration
|
||||||
|
|
||||||
File: `20260203_add_federation_event_subscriptions/migration.sql`
|
File: `20260203_add_federation_event_subscriptions/migration.sql`
|
||||||
|
|
||||||
- Adds `eventType` column to `federation_messages`
|
- Adds `eventType` column to `federation_messages`
|
||||||
- Creates `federation_event_subscriptions` table
|
- Creates `federation_event_subscriptions` table
|
||||||
- Adds appropriate indexes for performance
|
- Adds appropriate indexes for performance
|
||||||
- Establishes foreign key relationships
|
- Establishes foreign key relationships
|
||||||
|
|
||||||
### Integration
|
### Integration
|
||||||
|
|
||||||
Updated `federation.module.ts`:
|
Updated `federation.module.ts`:
|
||||||
|
|
||||||
- Added `EventService` to providers
|
- Added `EventService` to providers
|
||||||
- Added `EventController` to controllers
|
- Added `EventController` to controllers
|
||||||
- Exported `EventService` for use by other modules
|
- Exported `EventService` for use by other modules
|
||||||
@@ -126,6 +142,7 @@ Updated `federation.module.ts`:
|
|||||||
## Files Created/Modified
|
## Files Created/Modified
|
||||||
|
|
||||||
### New Files (7)
|
### New Files (7)
|
||||||
|
|
||||||
- `apps/api/src/federation/event.service.ts` (470 lines)
|
- `apps/api/src/federation/event.service.ts` (470 lines)
|
||||||
- `apps/api/src/federation/event.service.spec.ts` (1,088 lines)
|
- `apps/api/src/federation/event.service.spec.ts` (1,088 lines)
|
||||||
- `apps/api/src/federation/event.controller.ts` (199 lines)
|
- `apps/api/src/federation/event.controller.ts` (199 lines)
|
||||||
@@ -135,11 +152,13 @@ Updated `federation.module.ts`:
|
|||||||
- `docs/scratchpads/90-event-subscriptions.md` (185 lines)
|
- `docs/scratchpads/90-event-subscriptions.md` (185 lines)
|
||||||
|
|
||||||
### Modified Files (3)
|
### Modified Files (3)
|
||||||
|
|
||||||
- `apps/api/src/federation/types/message.types.ts` (+118 lines)
|
- `apps/api/src/federation/types/message.types.ts` (+118 lines)
|
||||||
- `apps/api/src/federation/federation.module.ts` (+3 lines)
|
- `apps/api/src/federation/federation.module.ts` (+3 lines)
|
||||||
- `apps/api/prisma/schema.prisma` (+27 lines)
|
- `apps/api/prisma/schema.prisma` (+27 lines)
|
||||||
|
|
||||||
### Total Changes
|
### Total Changes
|
||||||
|
|
||||||
- **2,395 lines added**
|
- **2,395 lines added**
|
||||||
- **5 lines removed**
|
- **5 lines removed**
|
||||||
- **10 files changed**
|
- **10 files changed**
|
||||||
@@ -147,20 +166,25 @@ Updated `federation.module.ts`:
|
|||||||
## Key Features
|
## Key Features
|
||||||
|
|
||||||
### Server-Side Event Filtering
|
### Server-Side Event Filtering
|
||||||
|
|
||||||
Events are only sent to instances with active subscriptions for that event type. This prevents unnecessary network traffic and processing.
|
Events are only sent to instances with active subscriptions for that event type. This prevents unnecessary network traffic and processing.
|
||||||
|
|
||||||
### Acknowledgment Protocol
|
### Acknowledgment Protocol
|
||||||
|
|
||||||
Simple ACK pattern confirms event delivery:
|
Simple ACK pattern confirms event delivery:
|
||||||
|
|
||||||
1. Publisher sends event
|
1. Publisher sends event
|
||||||
2. Receiver processes and returns ACK
|
2. Receiver processes and returns ACK
|
||||||
3. Publisher updates delivery status
|
3. Publisher updates delivery status
|
||||||
|
|
||||||
### Error Handling
|
### Error Handling
|
||||||
|
|
||||||
- Failed deliveries marked as FAILED with error message
|
- Failed deliveries marked as FAILED with error message
|
||||||
- Connection errors logged but don't crash the system
|
- Connection errors logged but don't crash the system
|
||||||
- Invalid signatures rejected immediately
|
- Invalid signatures rejected immediately
|
||||||
|
|
||||||
### Subscription Management
|
### Subscription Management
|
||||||
|
|
||||||
- Subscriptions persist in database
|
- Subscriptions persist in database
|
||||||
- Can be activated/deactivated without deletion
|
- Can be activated/deactivated without deletion
|
||||||
- Support for metadata (extensibility)
|
- Support for metadata (extensibility)
|
||||||
@@ -168,6 +192,7 @@ Simple ACK pattern confirms event delivery:
|
|||||||
## Future Enhancements (Not Implemented)
|
## Future Enhancements (Not Implemented)
|
||||||
|
|
||||||
These were considered but deferred to future issues:
|
These were considered but deferred to future issues:
|
||||||
|
|
||||||
- Event replay/history
|
- Event replay/history
|
||||||
- Event filtering by payload fields
|
- Event filtering by payload fields
|
||||||
- Webhook support for event delivery
|
- Webhook support for event delivery
|
||||||
@@ -179,11 +204,13 @@ These were considered but deferred to future issues:
|
|||||||
## Performance Considerations
|
## Performance Considerations
|
||||||
|
|
||||||
### Scalability
|
### Scalability
|
||||||
|
|
||||||
- Database indexes on eventType, connectionId, workspaceId
|
- Database indexes on eventType, connectionId, workspaceId
|
||||||
- Efficient queries with proper WHERE clauses
|
- Efficient queries with proper WHERE clauses
|
||||||
- Server-side filtering reduces network overhead
|
- Server-side filtering reduces network overhead
|
||||||
|
|
||||||
### Monitoring
|
### Monitoring
|
||||||
|
|
||||||
- All operations logged with appropriate level
|
- All operations logged with appropriate level
|
||||||
- Failed deliveries tracked in database
|
- Failed deliveries tracked in database
|
||||||
- Delivery timestamps recorded for analytics
|
- Delivery timestamps recorded for analytics
|
||||||
@@ -191,12 +218,14 @@ These were considered but deferred to future issues:
|
|||||||
## Documentation
|
## Documentation
|
||||||
|
|
||||||
### Inline Documentation
|
### Inline Documentation
|
||||||
|
|
||||||
- JSDoc comments on all public methods
|
- JSDoc comments on all public methods
|
||||||
- Clear parameter descriptions
|
- Clear parameter descriptions
|
||||||
- Return type documentation
|
- Return type documentation
|
||||||
- Usage examples in comments
|
- Usage examples in comments
|
||||||
|
|
||||||
### Scratchpad Documentation
|
### Scratchpad Documentation
|
||||||
|
|
||||||
- Complete implementation plan
|
- Complete implementation plan
|
||||||
- Design decisions documented
|
- Design decisions documented
|
||||||
- Testing strategy outlined
|
- Testing strategy outlined
|
||||||
@@ -205,6 +234,7 @@ These were considered but deferred to future issues:
|
|||||||
## Integration Testing Recommendations
|
## Integration Testing Recommendations
|
||||||
|
|
||||||
While unit tests are comprehensive, recommend integration testing:
|
While unit tests are comprehensive, recommend integration testing:
|
||||||
|
|
||||||
1. Set up two federated instances
|
1. Set up two federated instances
|
||||||
2. Subscribe from Instance A to Instance B events
|
2. Subscribe from Instance A to Instance B events
|
||||||
3. Publish event from Instance B
|
3. Publish event from Instance B
|
||||||
@@ -214,6 +244,7 @@ While unit tests are comprehensive, recommend integration testing:
|
|||||||
## Conclusion
|
## Conclusion
|
||||||
|
|
||||||
FED-007 (EVENT Subscriptions) is **complete and ready for code review**. The implementation:
|
FED-007 (EVENT Subscriptions) is **complete and ready for code review**. The implementation:
|
||||||
|
|
||||||
- ✅ Follows TDD principles
|
- ✅ Follows TDD principles
|
||||||
- ✅ Meets 85%+ code coverage requirement
|
- ✅ Meets 85%+ code coverage requirement
|
||||||
- ✅ Passes all quality gates (lint, typecheck, tests)
|
- ✅ Passes all quality gates (lint, typecheck, tests)
|
||||||
|
|||||||
40
docs/scratchpads/ms-gate-001-gatekeeper.md
Normal file
40
docs/scratchpads/ms-gate-001-gatekeeper.md
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
# MS-GATE-001 Scratchpad
|
||||||
|
|
||||||
|
## Objective
|
||||||
|
|
||||||
|
Build the API Gatekeeper module for PR auto-merge orchestration using Gitea PR webhooks, Woodpecker CI webhooks, and a `pending_merges` Prisma model.
|
||||||
|
|
||||||
|
## Constraints
|
||||||
|
|
||||||
|
- Work in `/home/jwoltje/src/mosaic-stack-worktrees/gate-001`
|
||||||
|
- Do not merge or deploy
|
||||||
|
- Must pass:
|
||||||
|
- `pnpm format:check`
|
||||||
|
- `SKIP_ENV_VALIDATION=true pnpm turbo typecheck`
|
||||||
|
- `SKIP_ENV_VALIDATION=true pnpm turbo lint`
|
||||||
|
- `SKIP_ENV_VALIDATION=true pnpm turbo test --filter=@mosaic/api -- --testPathPattern="gatekeeper"`
|
||||||
|
|
||||||
|
## ASSUMPTION
|
||||||
|
|
||||||
|
- Woodpecker PR pipelines expose `CI_COMMIT_PULL_REQUEST` and `CI_COMMIT_SHA`, so the webhook notifier can send `prNumber` and `headSha`.
|
||||||
|
- Rationale: Gatekeeper needs an exact PR/head tuple to safely match CI back to `pending_merges`.
|
||||||
|
|
||||||
|
## Plan
|
||||||
|
|
||||||
|
1. Add Prisma model + SQL migration for `pending_merges`
|
||||||
|
2. Add Gatekeeper NestJS module/controller/service/DTO/tests
|
||||||
|
3. Wire Woodpecker webhook -> Gatekeeper CI handler
|
||||||
|
4. Add env/config documentation and compose variables
|
||||||
|
5. Run quality gates, review, remediate, push, open PR
|
||||||
|
|
||||||
|
## Progress
|
||||||
|
|
||||||
|
- [x] Context loaded
|
||||||
|
- [ ] Tests added first
|
||||||
|
- [ ] Implementation complete
|
||||||
|
- [ ] Quality gates green
|
||||||
|
- [ ] Push + PR opened
|
||||||
|
|
||||||
|
## Verification
|
||||||
|
|
||||||
|
- Pending
|
||||||
11
pnpm-lock.yaml
generated
11
pnpm-lock.yaml
generated
@@ -368,8 +368,8 @@ importers:
|
|||||||
specifier: ^7.8.1
|
specifier: ^7.8.1
|
||||||
version: 7.8.2
|
version: 7.8.2
|
||||||
simple-git:
|
simple-git:
|
||||||
specifier: ^3.27.0
|
specifier: ^3.32.3
|
||||||
version: 3.30.0
|
version: 3.33.0
|
||||||
zod:
|
zod:
|
||||||
specifier: ^3.24.1
|
specifier: ^3.24.1
|
||||||
version: 3.25.76
|
version: 3.25.76
|
||||||
@@ -1628,6 +1628,7 @@ packages:
|
|||||||
|
|
||||||
'@mosaicstack/telemetry-client@0.1.1':
|
'@mosaicstack/telemetry-client@0.1.1':
|
||||||
resolution: {integrity: sha512-1udg6p4cs8rhQgQ2pKCfi7EpRlJieRRhA5CIqthRQ6HQZLgQ0wH+632jEulov3rlHSM1iplIQ+AAe5DWrvSkEA==, tarball: https://git.mosaicstack.dev/api/packages/mosaic/npm/%40mosaicstack%2Ftelemetry-client/-/0.1.1/telemetry-client-0.1.1.tgz}
|
resolution: {integrity: sha512-1udg6p4cs8rhQgQ2pKCfi7EpRlJieRRhA5CIqthRQ6HQZLgQ0wH+632jEulov3rlHSM1iplIQ+AAe5DWrvSkEA==, tarball: https://git.mosaicstack.dev/api/packages/mosaic/npm/%40mosaicstack%2Ftelemetry-client/-/0.1.1/telemetry-client-0.1.1.tgz}
|
||||||
|
engines: {node: '>=18'}
|
||||||
|
|
||||||
'@mrleebo/prisma-ast@0.13.1':
|
'@mrleebo/prisma-ast@0.13.1':
|
||||||
resolution: {integrity: sha512-XyroGQXcHrZdvmrGJvsA9KNeOOgGMg1Vg9OlheUsBOSKznLMDl+YChxbkboRHvtFYJEMRYmlV3uoo/njCw05iw==}
|
resolution: {integrity: sha512-XyroGQXcHrZdvmrGJvsA9KNeOOgGMg1Vg9OlheUsBOSKznLMDl+YChxbkboRHvtFYJEMRYmlV3uoo/njCw05iw==}
|
||||||
@@ -6794,8 +6795,8 @@ packages:
|
|||||||
simple-get@4.0.1:
|
simple-get@4.0.1:
|
||||||
resolution: {integrity: sha512-brv7p5WgH0jmQJr1ZDDfKDOSeWWg+OVypG99A/5vYGPqJ6pxiaHLy8nxtFjBA7oMa01ebA9gfh1uMCFqOuXxvA==}
|
resolution: {integrity: sha512-brv7p5WgH0jmQJr1ZDDfKDOSeWWg+OVypG99A/5vYGPqJ6pxiaHLy8nxtFjBA7oMa01ebA9gfh1uMCFqOuXxvA==}
|
||||||
|
|
||||||
simple-git@3.30.0:
|
simple-git@3.33.0:
|
||||||
resolution: {integrity: sha512-q6lxyDsCmEal/MEGhP1aVyQ3oxnagGlBDOVSIB4XUVLl1iZh0Pah6ebC9V4xBap/RfgP2WlI8EKs0WS0rMEJHg==}
|
resolution: {integrity: sha512-D4V/tGC2sjsoNhoMybKyGoE+v8A60hRawKQ1iFRA1zwuDgGZCBJ4ByOzZ5J8joBbi4Oam0qiPH+GhzmSBwbJng==}
|
||||||
|
|
||||||
sisteransi@1.0.5:
|
sisteransi@1.0.5:
|
||||||
resolution: {integrity: sha512-bLGGlR1QxBcynn2d5YmDX4MGjlZvy2MRBDRNHLJ8VI6l6+9FUiyTFNJ0IveOSP0bcXgVDPRcfGqA0pjaqUpfVg==}
|
resolution: {integrity: sha512-bLGGlR1QxBcynn2d5YmDX4MGjlZvy2MRBDRNHLJ8VI6l6+9FUiyTFNJ0IveOSP0bcXgVDPRcfGqA0pjaqUpfVg==}
|
||||||
@@ -14529,7 +14530,7 @@ snapshots:
|
|||||||
once: 1.4.0
|
once: 1.4.0
|
||||||
simple-concat: 1.0.1
|
simple-concat: 1.0.1
|
||||||
|
|
||||||
simple-git@3.30.0:
|
simple-git@3.33.0:
|
||||||
dependencies:
|
dependencies:
|
||||||
'@kwsites/file-exists': 1.1.1
|
'@kwsites/file-exists': 1.1.1
|
||||||
'@kwsites/promise-deferred': 1.1.1
|
'@kwsites/promise-deferred': 1.1.1
|
||||||
|
|||||||
Reference in New Issue
Block a user