Merge pull request 'feat(api): MS24 queue notifications module' (#750) from feat/ms24-queue-api into main
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
This commit was merged in pull request #750.
This commit is contained in:
@@ -56,6 +56,7 @@
|
||||
"bcryptjs": "^3.0.3",
|
||||
"better-auth": "^1.4.17",
|
||||
"bullmq": "^5.67.2",
|
||||
"chokidar": "^4.0.3",
|
||||
"class-transformer": "^0.5.1",
|
||||
"class-validator": "^0.14.3",
|
||||
"cookie-parser": "^1.4.7",
|
||||
|
||||
@@ -62,6 +62,7 @@ import { OnboardingModule } from "./onboarding/onboarding.module";
|
||||
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
|
||||
import { MissionControlProxyModule } from "./mission-control-proxy/mission-control-proxy.module";
|
||||
import { OrchestratorModule } from "./orchestrator/orchestrator.module";
|
||||
import { QueueNotificationsModule } from "./queue-notifications/queue-notifications.module";
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
@@ -145,6 +146,7 @@ import { OrchestratorModule } from "./orchestrator/orchestrator.module";
|
||||
ChatProxyModule,
|
||||
MissionControlProxyModule,
|
||||
OrchestratorModule,
|
||||
QueueNotificationsModule,
|
||||
],
|
||||
controllers: [AppController, CsrfController],
|
||||
providers: [
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { NotFoundException } from "@nestjs/common";
|
||||
import type { Response } from "express";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import { Test, type TestingModule } from "@nestjs/testing";
|
||||
import { QueueNotificationsController } from "./queue-notifications.controller";
|
||||
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||
import { ApiKeyGuard } from "../common/guards/api-key.guard";
|
||||
|
||||
describe("QueueNotificationsController", () => {
|
||||
let controller: QueueNotificationsController;
|
||||
|
||||
const mockService = {
|
||||
listNotifications: vi.fn(),
|
||||
streamNotifications: vi.fn(),
|
||||
ackNotification: vi.fn(),
|
||||
listTasks: vi.fn(),
|
||||
};
|
||||
|
||||
const mockConfigService = {
|
||||
get: vi.fn().mockReturnValue("coordinator-api-key"),
|
||||
};
|
||||
|
||||
beforeEach(async () => {
|
||||
vi.clearAllMocks();
|
||||
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
controllers: [QueueNotificationsController],
|
||||
providers: [
|
||||
{ provide: QueueNotificationsService, useValue: mockService },
|
||||
{ provide: ConfigService, useValue: mockConfigService },
|
||||
],
|
||||
})
|
||||
.overrideGuard(ApiKeyGuard)
|
||||
.useValue({ canActivate: () => true })
|
||||
.compile();
|
||||
|
||||
controller = module.get<QueueNotificationsController>(QueueNotificationsController);
|
||||
});
|
||||
|
||||
it("returns notification objects", async () => {
|
||||
mockService.listNotifications.mockResolvedValue([
|
||||
{
|
||||
id: "notif-1",
|
||||
agent: "mosaic",
|
||||
filename: "notif-1.json",
|
||||
payload: { type: "task.ready" },
|
||||
createdAt: new Date("2026-03-08T22:00:00.000Z"),
|
||||
},
|
||||
]);
|
||||
|
||||
await expect(controller.getNotifications()).resolves.toEqual([
|
||||
expect.objectContaining({
|
||||
id: "notif-1",
|
||||
agent: "mosaic",
|
||||
filename: "notif-1.json",
|
||||
payload: { type: "task.ready" },
|
||||
}),
|
||||
]);
|
||||
});
|
||||
|
||||
it("streams notifications through the response object", async () => {
|
||||
const res = {
|
||||
setHeader: vi.fn(),
|
||||
flushHeaders: vi.fn(),
|
||||
write: vi.fn(),
|
||||
on: vi.fn(),
|
||||
end: vi.fn(),
|
||||
} as unknown as Response;
|
||||
|
||||
mockService.streamNotifications.mockResolvedValue(undefined);
|
||||
|
||||
await controller.streamNotifications(res);
|
||||
|
||||
expect(mockService.streamNotifications).toHaveBeenCalledWith(res);
|
||||
});
|
||||
|
||||
it("acks a notification by id", async () => {
|
||||
mockService.ackNotification.mockResolvedValue({ success: true, id: "notif-2" });
|
||||
|
||||
await expect(controller.ackNotification("notif-2")).resolves.toEqual({
|
||||
success: true,
|
||||
id: "notif-2",
|
||||
});
|
||||
});
|
||||
|
||||
it("surfaces ack errors", async () => {
|
||||
mockService.ackNotification.mockRejectedValue(new NotFoundException("missing"));
|
||||
|
||||
await expect(controller.ackNotification("missing")).rejects.toThrow(NotFoundException);
|
||||
});
|
||||
|
||||
it("returns parsed queue tasks", async () => {
|
||||
mockService.listTasks.mockResolvedValue([
|
||||
{
|
||||
id: "task-1",
|
||||
project: "mosaic-stack",
|
||||
taskId: "MS24-API-001",
|
||||
status: "pending",
|
||||
description: "Build queue notifications module",
|
||||
},
|
||||
]);
|
||||
|
||||
await expect(controller.getTasks()).resolves.toEqual([
|
||||
{
|
||||
id: "task-1",
|
||||
project: "mosaic-stack",
|
||||
taskId: "MS24-API-001",
|
||||
status: "pending",
|
||||
description: "Build queue notifications module",
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("uses ApiKeyGuard at the controller level", () => {
|
||||
const guards = Reflect.getMetadata("__guards__", QueueNotificationsController) as unknown[];
|
||||
|
||||
expect(guards).toContain(ApiKeyGuard);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,36 @@
|
||||
import { Controller, Get, Param, Post, Res, UseGuards } from "@nestjs/common";
|
||||
import type { Response } from "express";
|
||||
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
|
||||
import { ApiKeyGuard } from "../common/guards/api-key.guard";
|
||||
import {
|
||||
QueueNotificationsService,
|
||||
type QueueNotification,
|
||||
type QueueTask,
|
||||
} from "./queue-notifications.service";
|
||||
|
||||
@Controller("queue")
|
||||
@UseGuards(ApiKeyGuard)
|
||||
export class QueueNotificationsController {
|
||||
constructor(private readonly queueNotificationsService: QueueNotificationsService) {}
|
||||
|
||||
@Get("notifications")
|
||||
async getNotifications(): Promise<QueueNotification[]> {
|
||||
return this.queueNotificationsService.listNotifications();
|
||||
}
|
||||
|
||||
@Get("notifications/stream")
|
||||
async streamNotifications(@Res() res: Response): Promise<void> {
|
||||
await this.queueNotificationsService.streamNotifications(res);
|
||||
}
|
||||
|
||||
@SkipCsrf()
|
||||
@Post("notifications/:id/ack")
|
||||
async ackNotification(@Param("id") id: string): Promise<{ success: true; id: string }> {
|
||||
return this.queueNotificationsService.ackNotification(id);
|
||||
}
|
||||
|
||||
@Get("tasks")
|
||||
async getTasks(): Promise<QueueTask[]> {
|
||||
return this.queueNotificationsService.listTasks();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { ConfigModule } from "@nestjs/config";
|
||||
import { AuthModule } from "../auth/auth.module";
|
||||
import { ApiKeyGuard } from "../common/guards/api-key.guard";
|
||||
import { QueueNotificationsController } from "./queue-notifications.controller";
|
||||
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||
|
||||
@Module({
|
||||
imports: [ConfigModule, AuthModule],
|
||||
controllers: [QueueNotificationsController],
|
||||
providers: [QueueNotificationsService, ApiKeyGuard],
|
||||
exports: [QueueNotificationsService],
|
||||
})
|
||||
export class QueueNotificationsModule {}
|
||||
@@ -0,0 +1,172 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import { Logger, NotFoundException } from "@nestjs/common";
|
||||
import { mkdtemp, mkdir, rm, writeFile } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { execFile } from "node:child_process";
|
||||
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||
|
||||
vi.mock("node:child_process", () => ({
|
||||
execFile: vi.fn(),
|
||||
}));
|
||||
|
||||
describe("QueueNotificationsService", () => {
|
||||
let service: QueueNotificationsService;
|
||||
let inboxDir: string;
|
||||
let configService: ConfigService;
|
||||
|
||||
beforeEach(async () => {
|
||||
vi.clearAllMocks();
|
||||
inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-"));
|
||||
configService = {
|
||||
get: vi.fn((key: string) => {
|
||||
if (key === "MOSAIC_QUEUE_INBOX_DIR") {
|
||||
return inboxDir;
|
||||
}
|
||||
|
||||
if (key === "MOSAIC_QUEUE_CLI") {
|
||||
return "/tmp/mosaic-queue-cli.js";
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}),
|
||||
} as unknown as ConfigService;
|
||||
|
||||
service = new QueueNotificationsService(configService);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
vi.restoreAllMocks();
|
||||
await rm(inboxDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
describe("onModuleInit", () => {
|
||||
it("logs a warning when the inbox directory does not exist", async () => {
|
||||
await rm(inboxDir, { recursive: true, force: true });
|
||||
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||
|
||||
await service.onModuleInit();
|
||||
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Queue notifications inbox directory does not exist")
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("listNotifications", () => {
|
||||
it("returns parsed notifications from agent inbox directories", async () => {
|
||||
await mkdir(join(inboxDir, "mosaic"), { recursive: true });
|
||||
await mkdir(join(inboxDir, "mosaic", "_acked"), { recursive: true });
|
||||
await mkdir(join(inboxDir, "sage"), { recursive: true });
|
||||
await writeFile(
|
||||
join(inboxDir, "mosaic", "notif-1.json"),
|
||||
JSON.stringify({ type: "task.ready", taskId: "MS24-API-001" })
|
||||
);
|
||||
await writeFile(
|
||||
join(inboxDir, "mosaic", "_acked", "notif-ignored.json"),
|
||||
JSON.stringify({ ignored: true })
|
||||
);
|
||||
await writeFile(join(inboxDir, "sage", "notif-2.json"), JSON.stringify({ type: "done" }));
|
||||
|
||||
const notifications = await service.listNotifications();
|
||||
|
||||
expect(notifications).toHaveLength(2);
|
||||
expect(notifications).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
id: "notif-1",
|
||||
agent: "mosaic",
|
||||
filename: "notif-1.json",
|
||||
payload: { type: "task.ready", taskId: "MS24-API-001" },
|
||||
}),
|
||||
expect.objectContaining({
|
||||
id: "notif-2",
|
||||
agent: "sage",
|
||||
filename: "notif-2.json",
|
||||
payload: { type: "done" },
|
||||
}),
|
||||
])
|
||||
);
|
||||
});
|
||||
|
||||
it("returns an empty array when the inbox directory is missing", async () => {
|
||||
await rm(inboxDir, { recursive: true, force: true });
|
||||
|
||||
await expect(service.listNotifications()).resolves.toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("ackNotification", () => {
|
||||
it("executes the queue CLI with node and ack args", async () => {
|
||||
await mkdir(join(inboxDir, "mosaic"), { recursive: true });
|
||||
await writeFile(join(inboxDir, "mosaic", "notif-3.json"), JSON.stringify({ ok: true }));
|
||||
vi.mocked(execFile).mockImplementation(
|
||||
(
|
||||
_command: string,
|
||||
_args: readonly string[],
|
||||
callback: (error: Error | null, stdout: string, stderr: string) => void
|
||||
) => callback(null, "acked", "")
|
||||
);
|
||||
|
||||
await expect(service.ackNotification("notif-3")).resolves.toEqual({
|
||||
success: true,
|
||||
id: "notif-3",
|
||||
});
|
||||
|
||||
expect(execFile).toHaveBeenCalledWith(
|
||||
"node",
|
||||
["/tmp/mosaic-queue-cli.js", "ack", "notif-3"],
|
||||
expect.any(Function)
|
||||
);
|
||||
});
|
||||
|
||||
it("throws NotFoundException when the notification does not exist", async () => {
|
||||
await expect(service.ackNotification("missing")).rejects.toThrow(NotFoundException);
|
||||
expect(execFile).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("listTasks", () => {
|
||||
it("parses tab-separated CLI output", async () => {
|
||||
vi.mocked(execFile).mockImplementation(
|
||||
(
|
||||
_command: string,
|
||||
_args: readonly string[],
|
||||
callback: (error: Error | null, stdout: string, stderr: string) => void
|
||||
) =>
|
||||
callback(
|
||||
null,
|
||||
[
|
||||
"task-1\tmosaic-stack/MS24-API-001\t[pending]\tBuild queue notifications module",
|
||||
"task-2\tmosaic-stack/MS24-API-002\t[done]\tWrite tests",
|
||||
].join("\n"),
|
||||
""
|
||||
)
|
||||
);
|
||||
|
||||
await expect(service.listTasks()).resolves.toEqual([
|
||||
{
|
||||
id: "task-1",
|
||||
project: "mosaic-stack",
|
||||
taskId: "MS24-API-001",
|
||||
status: "pending",
|
||||
description: "Build queue notifications module",
|
||||
},
|
||||
{
|
||||
id: "task-2",
|
||||
project: "mosaic-stack",
|
||||
taskId: "MS24-API-002",
|
||||
status: "done",
|
||||
description: "Write tests",
|
||||
},
|
||||
]);
|
||||
|
||||
expect(execFile).toHaveBeenCalledWith(
|
||||
"node",
|
||||
["/tmp/mosaic-queue-cli.js", "list", "mosaic-stack"],
|
||||
expect.any(Function)
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
231
apps/api/src/queue-notifications/queue-notifications.service.ts
Normal file
231
apps/api/src/queue-notifications/queue-notifications.service.ts
Normal file
@@ -0,0 +1,231 @@
|
||||
import {
|
||||
Injectable,
|
||||
InternalServerErrorException,
|
||||
Logger,
|
||||
NotFoundException,
|
||||
OnModuleInit,
|
||||
} from "@nestjs/common";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import { execFile } from "node:child_process";
|
||||
import { access, readdir, readFile, stat } from "node:fs/promises";
|
||||
import { homedir } from "node:os";
|
||||
import { basename, join } from "node:path";
|
||||
import type { Response } from "express";
|
||||
import chokidar from "chokidar";
|
||||
|
||||
export interface QueueNotification {
|
||||
id: string;
|
||||
agent: string;
|
||||
filename: string;
|
||||
payload: unknown;
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
export interface QueueTask {
|
||||
id: string;
|
||||
project: string;
|
||||
taskId: string;
|
||||
status: string;
|
||||
description: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class QueueNotificationsService implements OnModuleInit {
|
||||
private readonly logger = new Logger(QueueNotificationsService.name);
|
||||
|
||||
constructor(private readonly configService: ConfigService) {}
|
||||
|
||||
async onModuleInit(): Promise<void> {
|
||||
if (!(await this.inboxDirExists())) {
|
||||
this.logger.warn(`Queue notifications inbox directory does not exist: ${this.getInboxDir()}`);
|
||||
}
|
||||
}
|
||||
|
||||
async listNotifications(): Promise<QueueNotification[]> {
|
||||
const inboxDir = this.getInboxDir();
|
||||
|
||||
if (!(await this.inboxDirExists())) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Paths come from controlled config plus directory entries under the inbox root.
|
||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||
const agentEntries = await readdir(inboxDir, { withFileTypes: true });
|
||||
const notifications: QueueNotification[] = [];
|
||||
|
||||
for (const agentEntry of agentEntries) {
|
||||
if (!agentEntry.isDirectory() || this.isIgnoredDirectory(agentEntry.name)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const agentDir = join(inboxDir, agentEntry.name);
|
||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||
const files = await readdir(agentDir, { withFileTypes: true });
|
||||
|
||||
for (const fileEntry of files) {
|
||||
if (!fileEntry.isFile() || !fileEntry.name.endsWith(".json")) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const filePath = join(agentDir, fileEntry.name);
|
||||
const [rawPayload, fileStats] = await Promise.all([
|
||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||
readFile(filePath, "utf8"),
|
||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||
stat(filePath),
|
||||
]);
|
||||
|
||||
notifications.push({
|
||||
id: basename(fileEntry.name, ".json"),
|
||||
agent: agentEntry.name,
|
||||
filename: fileEntry.name,
|
||||
payload: JSON.parse(rawPayload) as unknown,
|
||||
createdAt: fileStats.birthtime,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return notifications.sort(
|
||||
(left, right) => right.createdAt.getTime() - left.createdAt.getTime()
|
||||
);
|
||||
}
|
||||
|
||||
async streamNotifications(res: Response): Promise<void> {
|
||||
res.setHeader("Content-Type", "text/event-stream");
|
||||
res.setHeader("Cache-Control", "no-cache");
|
||||
res.setHeader("Connection", "keep-alive");
|
||||
res.setHeader("X-Accel-Buffering", "no");
|
||||
|
||||
if (typeof res.flushHeaders === "function") {
|
||||
res.flushHeaders();
|
||||
}
|
||||
|
||||
const emitNotifications = async (): Promise<void> => {
|
||||
try {
|
||||
const notifications = await this.listNotifications();
|
||||
res.write(`data: ${JSON.stringify(notifications)}\n\n`);
|
||||
} catch (error: unknown) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
res.write(`event: error\n`);
|
||||
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
|
||||
}
|
||||
};
|
||||
|
||||
await emitNotifications();
|
||||
|
||||
const watcher = chokidar.watch(this.getInboxDir(), {
|
||||
ignoreInitial: true,
|
||||
persistent: true,
|
||||
ignored: (watchedPath: string) => {
|
||||
return watchedPath.includes("/_acked/") || watchedPath.includes("/_dead-letter/");
|
||||
},
|
||||
});
|
||||
|
||||
watcher.on("add", () => {
|
||||
void emitNotifications();
|
||||
});
|
||||
|
||||
watcher.on("unlink", () => {
|
||||
void emitNotifications();
|
||||
});
|
||||
|
||||
res.on("close", () => {
|
||||
void watcher.close();
|
||||
res.end();
|
||||
});
|
||||
}
|
||||
|
||||
async ackNotification(id: string): Promise<{ success: true; id: string }> {
|
||||
const notification = (await this.listNotifications()).find((entry) => entry.id === id);
|
||||
|
||||
if (!notification) {
|
||||
throw new NotFoundException(`Queue notification ${id} not found`);
|
||||
}
|
||||
|
||||
await this.execQueueCli(["ack", notification.id]);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
id: notification.id,
|
||||
};
|
||||
}
|
||||
|
||||
async listTasks(): Promise<QueueTask[]> {
|
||||
const stdout = await this.execQueueCli(["list", "mosaic-stack"]);
|
||||
|
||||
return stdout
|
||||
.split(/\r?\n/)
|
||||
.map((line) => line.trim())
|
||||
.filter((line) => line.length > 0)
|
||||
.map((line) => {
|
||||
const [rawId = "", projectTaskId = "", rawStatus = "", description = ""] = line.split("\t");
|
||||
const [project = "", taskId = ""] = projectTaskId.split("/");
|
||||
|
||||
return {
|
||||
id: rawId,
|
||||
project,
|
||||
taskId,
|
||||
status: rawStatus.replace(/^\[/, "").replace(/\]$/, ""),
|
||||
description,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
private async execQueueCli(args: string[]): Promise<string> {
|
||||
const cliPath = this.getQueueCliPath();
|
||||
|
||||
return new Promise<string>((resolve, reject) => {
|
||||
execFile("node", [cliPath, ...args], (error, stdout, stderr) => {
|
||||
if (error) {
|
||||
this.logger.error(
|
||||
`Queue CLI command failed: node ${cliPath} ${args.join(" ")} | ${stderr || error.message}`
|
||||
);
|
||||
reject(
|
||||
new InternalServerErrorException(`Queue CLI command failed: ${stderr || error.message}`)
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
resolve(stdout);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private getInboxDir(): string {
|
||||
return this.expandHomePath(
|
||||
this.configService.get<string>("MOSAIC_QUEUE_INBOX_DIR") ??
|
||||
"~/.openclaw/workspace/agent-inbox"
|
||||
);
|
||||
}
|
||||
|
||||
private getQueueCliPath(): string {
|
||||
return this.expandHomePath(
|
||||
this.configService.get<string>("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js"
|
||||
);
|
||||
}
|
||||
|
||||
private expandHomePath(value: string): string {
|
||||
if (value === "~") {
|
||||
return homedir();
|
||||
}
|
||||
|
||||
if (value.startsWith("~/")) {
|
||||
return join(homedir(), value.slice(2));
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
private async inboxDirExists(): Promise<boolean> {
|
||||
try {
|
||||
await access(this.getInboxDir());
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private isIgnoredDirectory(name: string): boolean {
|
||||
return name === "_acked" || name === "_dead-letter";
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user