Compare commits

..

2 Commits

Author SHA1 Message Date
811a32eb61 fix(web): fix lint/prettier and TypeScript errors in proxy route
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
- Remove unnecessary null-coalesce on request.nextUrl.search
- Fix body typing: use null instead of undefined for no-body requests
- Use conditional spread to satisfy fetch() overload types
- Auto-fix prettier formatting across all changed files
2026-03-08 11:54:11 -05:00
23036cb1dd fix(web): route Mission Control API calls through orchestrator proxy
Some checks failed
ci/woodpecker/push/ci Pipeline failed
The Mission Control components were calling /api/mission-control/* which
routes to mosaic-api.woltje.com (the main API) — those routes don't exist
there, causing 404s.

These routes live on the orchestrator service and must go through the
Next.js server-side proxy (which injects ORCHESTRATOR_API_KEY):

Changes:
- Add catch-all proxy route at /api/orchestrator/[...path]/route.ts
  Forwards any GET/POST/PATCH/PUT/DELETE to ORCHESTRATOR_URL/<path>
  Replaces the need for per-endpoint proxy files for new routes.
- Update all Mission Control components to call
  /api/orchestrator/api/mission-control/* instead of /api/mission-control/*
- Update test expectations to match new paths
2026-03-08 11:45:31 -05:00
8 changed files with 1 additions and 580 deletions

View File

@@ -56,7 +56,6 @@
"bcryptjs": "^3.0.3",
"better-auth": "^1.4.17",
"bullmq": "^5.67.2",
"chokidar": "^4.0.3",
"class-transformer": "^0.5.1",
"class-validator": "^0.14.3",
"cookie-parser": "^1.4.7",

View File

@@ -61,7 +61,6 @@ import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
import { OnboardingModule } from "./onboarding/onboarding.module";
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
import { OrchestratorModule } from "./orchestrator/orchestrator.module";
import { QueueNotificationsModule } from "./queue-notifications/queue-notifications.module";
@Module({
imports: [
@@ -144,7 +143,6 @@ import { QueueNotificationsModule } from "./queue-notifications/queue-notificati
OnboardingModule,
ChatProxyModule,
OrchestratorModule,
QueueNotificationsModule,
],
controllers: [AppController, CsrfController],
providers: [

View File

@@ -1,120 +0,0 @@
import { beforeEach, describe, expect, it, vi } from "vitest";
import { NotFoundException } from "@nestjs/common";
import type { Response } from "express";
import { ConfigService } from "@nestjs/config";
import { Test, type TestingModule } from "@nestjs/testing";
import { QueueNotificationsController } from "./queue-notifications.controller";
import { QueueNotificationsService } from "./queue-notifications.service";
import { ApiKeyGuard } from "../common/guards/api-key.guard";
describe("QueueNotificationsController", () => {
let controller: QueueNotificationsController;
const mockService = {
listNotifications: vi.fn(),
streamNotifications: vi.fn(),
ackNotification: vi.fn(),
listTasks: vi.fn(),
};
const mockConfigService = {
get: vi.fn().mockReturnValue("coordinator-api-key"),
};
beforeEach(async () => {
vi.clearAllMocks();
const module: TestingModule = await Test.createTestingModule({
controllers: [QueueNotificationsController],
providers: [
{ provide: QueueNotificationsService, useValue: mockService },
{ provide: ConfigService, useValue: mockConfigService },
],
})
.overrideGuard(ApiKeyGuard)
.useValue({ canActivate: () => true })
.compile();
controller = module.get<QueueNotificationsController>(QueueNotificationsController);
});
it("returns notification objects", async () => {
mockService.listNotifications.mockResolvedValue([
{
id: "notif-1",
agent: "mosaic",
filename: "notif-1.json",
payload: { type: "task.ready" },
createdAt: new Date("2026-03-08T22:00:00.000Z"),
},
]);
await expect(controller.getNotifications()).resolves.toEqual([
expect.objectContaining({
id: "notif-1",
agent: "mosaic",
filename: "notif-1.json",
payload: { type: "task.ready" },
}),
]);
});
it("streams notifications through the response object", async () => {
const res = {
setHeader: vi.fn(),
flushHeaders: vi.fn(),
write: vi.fn(),
on: vi.fn(),
end: vi.fn(),
} as unknown as Response;
mockService.streamNotifications.mockResolvedValue(undefined);
await controller.streamNotifications(res);
expect(mockService.streamNotifications).toHaveBeenCalledWith(res);
});
it("acks a notification by id", async () => {
mockService.ackNotification.mockResolvedValue({ success: true, id: "notif-2" });
await expect(controller.ackNotification("notif-2")).resolves.toEqual({
success: true,
id: "notif-2",
});
});
it("surfaces ack errors", async () => {
mockService.ackNotification.mockRejectedValue(new NotFoundException("missing"));
await expect(controller.ackNotification("missing")).rejects.toThrow(NotFoundException);
});
it("returns parsed queue tasks", async () => {
mockService.listTasks.mockResolvedValue([
{
id: "task-1",
project: "mosaic-stack",
taskId: "MS24-API-001",
status: "pending",
description: "Build queue notifications module",
},
]);
await expect(controller.getTasks()).resolves.toEqual([
{
id: "task-1",
project: "mosaic-stack",
taskId: "MS24-API-001",
status: "pending",
description: "Build queue notifications module",
},
]);
});
it("uses ApiKeyGuard at the controller level", () => {
const guards = Reflect.getMetadata("__guards__", QueueNotificationsController) as unknown[];
expect(guards).toContain(ApiKeyGuard);
});
});

View File

@@ -1,36 +0,0 @@
import { Controller, Get, Param, Post, Res, UseGuards } from "@nestjs/common";
import type { Response } from "express";
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
import { ApiKeyGuard } from "../common/guards/api-key.guard";
import {
QueueNotificationsService,
type QueueNotification,
type QueueTask,
} from "./queue-notifications.service";
@Controller("queue")
@UseGuards(ApiKeyGuard)
export class QueueNotificationsController {
constructor(private readonly queueNotificationsService: QueueNotificationsService) {}
@Get("notifications")
async getNotifications(): Promise<QueueNotification[]> {
return this.queueNotificationsService.listNotifications();
}
@Get("notifications/stream")
async streamNotifications(@Res() res: Response): Promise<void> {
await this.queueNotificationsService.streamNotifications(res);
}
@SkipCsrf()
@Post("notifications/:id/ack")
async ackNotification(@Param("id") id: string): Promise<{ success: true; id: string }> {
return this.queueNotificationsService.ackNotification(id);
}
@Get("tasks")
async getTasks(): Promise<QueueTask[]> {
return this.queueNotificationsService.listTasks();
}
}

View File

@@ -1,14 +0,0 @@
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { AuthModule } from "../auth/auth.module";
import { ApiKeyGuard } from "../common/guards/api-key.guard";
import { QueueNotificationsController } from "./queue-notifications.controller";
import { QueueNotificationsService } from "./queue-notifications.service";
@Module({
imports: [ConfigModule, AuthModule],
controllers: [QueueNotificationsController],
providers: [QueueNotificationsService, ApiKeyGuard],
exports: [QueueNotificationsService],
})
export class QueueNotificationsModule {}

View File

@@ -1,172 +0,0 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { ConfigService } from "@nestjs/config";
import { Logger, NotFoundException } from "@nestjs/common";
import { mkdtemp, mkdir, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { execFile } from "node:child_process";
import { QueueNotificationsService } from "./queue-notifications.service";
vi.mock("node:child_process", () => ({
execFile: vi.fn(),
}));
describe("QueueNotificationsService", () => {
let service: QueueNotificationsService;
let inboxDir: string;
let configService: ConfigService;
beforeEach(async () => {
vi.clearAllMocks();
inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-"));
configService = {
get: vi.fn((key: string) => {
if (key === "MOSAIC_QUEUE_INBOX_DIR") {
return inboxDir;
}
if (key === "MOSAIC_QUEUE_CLI") {
return "/tmp/mosaic-queue-cli.js";
}
return undefined;
}),
} as unknown as ConfigService;
service = new QueueNotificationsService(configService);
});
afterEach(async () => {
vi.restoreAllMocks();
await rm(inboxDir, { recursive: true, force: true });
});
describe("onModuleInit", () => {
it("logs a warning when the inbox directory does not exist", async () => {
await rm(inboxDir, { recursive: true, force: true });
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
await service.onModuleInit();
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("Queue notifications inbox directory does not exist")
);
});
});
describe("listNotifications", () => {
it("returns parsed notifications from agent inbox directories", async () => {
await mkdir(join(inboxDir, "mosaic"), { recursive: true });
await mkdir(join(inboxDir, "mosaic", "_acked"), { recursive: true });
await mkdir(join(inboxDir, "sage"), { recursive: true });
await writeFile(
join(inboxDir, "mosaic", "notif-1.json"),
JSON.stringify({ type: "task.ready", taskId: "MS24-API-001" })
);
await writeFile(
join(inboxDir, "mosaic", "_acked", "notif-ignored.json"),
JSON.stringify({ ignored: true })
);
await writeFile(join(inboxDir, "sage", "notif-2.json"), JSON.stringify({ type: "done" }));
const notifications = await service.listNotifications();
expect(notifications).toHaveLength(2);
expect(notifications).toEqual(
expect.arrayContaining([
expect.objectContaining({
id: "notif-1",
agent: "mosaic",
filename: "notif-1.json",
payload: { type: "task.ready", taskId: "MS24-API-001" },
}),
expect.objectContaining({
id: "notif-2",
agent: "sage",
filename: "notif-2.json",
payload: { type: "done" },
}),
])
);
});
it("returns an empty array when the inbox directory is missing", async () => {
await rm(inboxDir, { recursive: true, force: true });
await expect(service.listNotifications()).resolves.toEqual([]);
});
});
describe("ackNotification", () => {
it("executes the queue CLI with node and ack args", async () => {
await mkdir(join(inboxDir, "mosaic"), { recursive: true });
await writeFile(join(inboxDir, "mosaic", "notif-3.json"), JSON.stringify({ ok: true }));
vi.mocked(execFile).mockImplementation(
(
_command: string,
_args: readonly string[],
callback: (error: Error | null, stdout: string, stderr: string) => void
) => callback(null, "acked", "")
);
await expect(service.ackNotification("notif-3")).resolves.toEqual({
success: true,
id: "notif-3",
});
expect(execFile).toHaveBeenCalledWith(
"node",
["/tmp/mosaic-queue-cli.js", "ack", "notif-3"],
expect.any(Function)
);
});
it("throws NotFoundException when the notification does not exist", async () => {
await expect(service.ackNotification("missing")).rejects.toThrow(NotFoundException);
expect(execFile).not.toHaveBeenCalled();
});
});
describe("listTasks", () => {
it("parses tab-separated CLI output", async () => {
vi.mocked(execFile).mockImplementation(
(
_command: string,
_args: readonly string[],
callback: (error: Error | null, stdout: string, stderr: string) => void
) =>
callback(
null,
[
"task-1\tmosaic-stack/MS24-API-001\t[pending]\tBuild queue notifications module",
"task-2\tmosaic-stack/MS24-API-002\t[done]\tWrite tests",
].join("\n"),
""
)
);
await expect(service.listTasks()).resolves.toEqual([
{
id: "task-1",
project: "mosaic-stack",
taskId: "MS24-API-001",
status: "pending",
description: "Build queue notifications module",
},
{
id: "task-2",
project: "mosaic-stack",
taskId: "MS24-API-002",
status: "done",
description: "Write tests",
},
]);
expect(execFile).toHaveBeenCalledWith(
"node",
["/tmp/mosaic-queue-cli.js", "list", "mosaic-stack"],
expect.any(Function)
);
});
});
});

View File

@@ -1,231 +0,0 @@
import {
Injectable,
InternalServerErrorException,
Logger,
NotFoundException,
OnModuleInit,
} from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { execFile } from "node:child_process";
import { access, readdir, readFile, stat } from "node:fs/promises";
import { homedir } from "node:os";
import { basename, join } from "node:path";
import type { Response } from "express";
import chokidar from "chokidar";
export interface QueueNotification {
id: string;
agent: string;
filename: string;
payload: unknown;
createdAt: Date;
}
export interface QueueTask {
id: string;
project: string;
taskId: string;
status: string;
description: string;
}
@Injectable()
export class QueueNotificationsService implements OnModuleInit {
private readonly logger = new Logger(QueueNotificationsService.name);
constructor(private readonly configService: ConfigService) {}
async onModuleInit(): Promise<void> {
if (!(await this.inboxDirExists())) {
this.logger.warn(`Queue notifications inbox directory does not exist: ${this.getInboxDir()}`);
}
}
async listNotifications(): Promise<QueueNotification[]> {
const inboxDir = this.getInboxDir();
if (!(await this.inboxDirExists())) {
return [];
}
// Paths come from controlled config plus directory entries under the inbox root.
// eslint-disable-next-line security/detect-non-literal-fs-filename
const agentEntries = await readdir(inboxDir, { withFileTypes: true });
const notifications: QueueNotification[] = [];
for (const agentEntry of agentEntries) {
if (!agentEntry.isDirectory() || this.isIgnoredDirectory(agentEntry.name)) {
continue;
}
const agentDir = join(inboxDir, agentEntry.name);
// eslint-disable-next-line security/detect-non-literal-fs-filename
const files = await readdir(agentDir, { withFileTypes: true });
for (const fileEntry of files) {
if (!fileEntry.isFile() || !fileEntry.name.endsWith(".json")) {
continue;
}
const filePath = join(agentDir, fileEntry.name);
const [rawPayload, fileStats] = await Promise.all([
// eslint-disable-next-line security/detect-non-literal-fs-filename
readFile(filePath, "utf8"),
// eslint-disable-next-line security/detect-non-literal-fs-filename
stat(filePath),
]);
notifications.push({
id: basename(fileEntry.name, ".json"),
agent: agentEntry.name,
filename: fileEntry.name,
payload: JSON.parse(rawPayload) as unknown,
createdAt: fileStats.birthtime,
});
}
}
return notifications.sort(
(left, right) => right.createdAt.getTime() - left.createdAt.getTime()
);
}
async streamNotifications(res: Response): Promise<void> {
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
if (typeof res.flushHeaders === "function") {
res.flushHeaders();
}
const emitNotifications = async (): Promise<void> => {
try {
const notifications = await this.listNotifications();
res.write(`data: ${JSON.stringify(notifications)}\n\n`);
} catch (error: unknown) {
const message = error instanceof Error ? error.message : String(error);
res.write(`event: error\n`);
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
}
};
await emitNotifications();
const watcher = chokidar.watch(this.getInboxDir(), {
ignoreInitial: true,
persistent: true,
ignored: (watchedPath: string) => {
return watchedPath.includes("/_acked/") || watchedPath.includes("/_dead-letter/");
},
});
watcher.on("add", () => {
void emitNotifications();
});
watcher.on("unlink", () => {
void emitNotifications();
});
res.on("close", () => {
void watcher.close();
res.end();
});
}
async ackNotification(id: string): Promise<{ success: true; id: string }> {
const notification = (await this.listNotifications()).find((entry) => entry.id === id);
if (!notification) {
throw new NotFoundException(`Queue notification ${id} not found`);
}
await this.execQueueCli(["ack", notification.id]);
return {
success: true,
id: notification.id,
};
}
async listTasks(): Promise<QueueTask[]> {
const stdout = await this.execQueueCli(["list", "mosaic-stack"]);
return stdout
.split(/\r?\n/)
.map((line) => line.trim())
.filter((line) => line.length > 0)
.map((line) => {
const [rawId = "", projectTaskId = "", rawStatus = "", description = ""] = line.split("\t");
const [project = "", taskId = ""] = projectTaskId.split("/");
return {
id: rawId,
project,
taskId,
status: rawStatus.replace(/^\[/, "").replace(/\]$/, ""),
description,
};
});
}
private async execQueueCli(args: string[]): Promise<string> {
const cliPath = this.getQueueCliPath();
return new Promise<string>((resolve, reject) => {
execFile("node", [cliPath, ...args], (error, stdout, stderr) => {
if (error) {
this.logger.error(
`Queue CLI command failed: node ${cliPath} ${args.join(" ")} | ${stderr || error.message}`
);
reject(
new InternalServerErrorException(`Queue CLI command failed: ${stderr || error.message}`)
);
return;
}
resolve(stdout);
});
});
}
private getInboxDir(): string {
return this.expandHomePath(
this.configService.get<string>("MOSAIC_QUEUE_INBOX_DIR") ??
"~/.openclaw/workspace/agent-inbox"
);
}
private getQueueCliPath(): string {
return this.expandHomePath(
this.configService.get<string>("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js"
);
}
private expandHomePath(value: string): string {
if (value === "~") {
return homedir();
}
if (value.startsWith("~/")) {
return join(homedir(), value.slice(2));
}
return value;
}
private async inboxDirExists(): Promise<boolean> {
try {
await access(this.getInboxDir());
return true;
} catch {
return false;
}
}
private isIgnoredDirectory(name: string): boolean {
return name === "_acked" || name === "_dead-letter";
}
}

5
pnpm-lock.yaml generated
View File

@@ -162,9 +162,6 @@ importers:
bullmq:
specifier: ^5.67.2
version: 5.67.2
chokidar:
specifier: ^4.0.3
version: 4.0.3
class-transformer:
specifier: ^0.5.1
version: 0.5.1
@@ -1628,6 +1625,7 @@ packages:
'@mosaicstack/telemetry-client@0.1.1':
resolution: {integrity: sha512-1udg6p4cs8rhQgQ2pKCfi7EpRlJieRRhA5CIqthRQ6HQZLgQ0wH+632jEulov3rlHSM1iplIQ+AAe5DWrvSkEA==, tarball: https://git.mosaicstack.dev/api/packages/mosaic/npm/%40mosaicstack%2Ftelemetry-client/-/0.1.1/telemetry-client-0.1.1.tgz}
engines: {node: '>=18'}
'@mrleebo/prisma-ast@0.13.1':
resolution: {integrity: sha512-XyroGQXcHrZdvmrGJvsA9KNeOOgGMg1Vg9OlheUsBOSKznLMDl+YChxbkboRHvtFYJEMRYmlV3uoo/njCw05iw==}
@@ -5178,7 +5176,6 @@ packages:
glob@10.5.0:
resolution: {integrity: sha512-DfXN8DfhJ7NH3Oe7cFmu3NCu1wKbkReJ8TorzSAFbSKrlNaQSKfIzqYqVY8zlbs2NLBbWpRiU52GX2PbaBVNkg==}
deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me
hasBin: true
glob@13.0.0: