Compare commits
24 Commits
feat/ms23-
...
feat/ms24-
| Author | SHA1 | Date | |
|---|---|---|---|
| 456d53fc7f | |||
| 06f2cc4be3 | |||
| a6f1438f40 | |||
| 523662656e | |||
| 27120ac3f2 | |||
| ad9921107c | |||
| 3c288f9849 | |||
| 51d6302401 | |||
| cf490510bf | |||
| 3d91334df7 | |||
| e80b624ca6 | |||
| 65536fcb75 | |||
| 53915dc621 | |||
| 398ee06920 | |||
| 2182717f59 | |||
| fe55363f38 | |||
| d60165572a | |||
| ff73fbd391 | |||
| 95ec63a868 | |||
| 2ab736b68b | |||
| 30e0168983 | |||
| 495d78115e | |||
| 54ee5cf945 | |||
| 563d59ad5d |
@@ -56,6 +56,7 @@
|
||||
"bcryptjs": "^3.0.3",
|
||||
"better-auth": "^1.4.17",
|
||||
"bullmq": "^5.67.2",
|
||||
"chokidar": "^4.0.3",
|
||||
"class-transformer": "^0.5.1",
|
||||
"class-validator": "^0.14.3",
|
||||
"cookie-parser": "^1.4.7",
|
||||
|
||||
@@ -61,6 +61,7 @@ import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
|
||||
import { OnboardingModule } from "./onboarding/onboarding.module";
|
||||
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
|
||||
import { OrchestratorModule } from "./orchestrator/orchestrator.module";
|
||||
import { QueueNotificationsModule } from "./queue-notifications/queue-notifications.module";
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
@@ -143,6 +144,7 @@ import { OrchestratorModule } from "./orchestrator/orchestrator.module";
|
||||
OnboardingModule,
|
||||
ChatProxyModule,
|
||||
OrchestratorModule,
|
||||
QueueNotificationsModule,
|
||||
],
|
||||
controllers: [AppController, CsrfController],
|
||||
providers: [
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { NotFoundException } from "@nestjs/common";
|
||||
import type { Response } from "express";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import { Test, type TestingModule } from "@nestjs/testing";
|
||||
import { QueueNotificationsController } from "./queue-notifications.controller";
|
||||
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||
import { ApiKeyGuard } from "../common/guards/api-key.guard";
|
||||
|
||||
describe("QueueNotificationsController", () => {
|
||||
let controller: QueueNotificationsController;
|
||||
|
||||
const mockService = {
|
||||
listNotifications: vi.fn(),
|
||||
streamNotifications: vi.fn(),
|
||||
ackNotification: vi.fn(),
|
||||
listTasks: vi.fn(),
|
||||
};
|
||||
|
||||
const mockConfigService = {
|
||||
get: vi.fn().mockReturnValue("coordinator-api-key"),
|
||||
};
|
||||
|
||||
beforeEach(async () => {
|
||||
vi.clearAllMocks();
|
||||
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
controllers: [QueueNotificationsController],
|
||||
providers: [
|
||||
{ provide: QueueNotificationsService, useValue: mockService },
|
||||
{ provide: ConfigService, useValue: mockConfigService },
|
||||
],
|
||||
})
|
||||
.overrideGuard(ApiKeyGuard)
|
||||
.useValue({ canActivate: () => true })
|
||||
.compile();
|
||||
|
||||
controller = module.get<QueueNotificationsController>(QueueNotificationsController);
|
||||
});
|
||||
|
||||
it("returns notification objects", async () => {
|
||||
mockService.listNotifications.mockResolvedValue([
|
||||
{
|
||||
id: "notif-1",
|
||||
agent: "mosaic",
|
||||
filename: "notif-1.json",
|
||||
payload: { type: "task.ready" },
|
||||
createdAt: new Date("2026-03-08T22:00:00.000Z"),
|
||||
},
|
||||
]);
|
||||
|
||||
await expect(controller.getNotifications()).resolves.toEqual([
|
||||
expect.objectContaining({
|
||||
id: "notif-1",
|
||||
agent: "mosaic",
|
||||
filename: "notif-1.json",
|
||||
payload: { type: "task.ready" },
|
||||
}),
|
||||
]);
|
||||
});
|
||||
|
||||
it("streams notifications through the response object", async () => {
|
||||
const res = {
|
||||
setHeader: vi.fn(),
|
||||
flushHeaders: vi.fn(),
|
||||
write: vi.fn(),
|
||||
on: vi.fn(),
|
||||
end: vi.fn(),
|
||||
} as unknown as Response;
|
||||
|
||||
mockService.streamNotifications.mockResolvedValue(undefined);
|
||||
|
||||
await controller.streamNotifications(res);
|
||||
|
||||
expect(mockService.streamNotifications).toHaveBeenCalledWith(res);
|
||||
});
|
||||
|
||||
it("acks a notification by id", async () => {
|
||||
mockService.ackNotification.mockResolvedValue({ success: true, id: "notif-2" });
|
||||
|
||||
await expect(controller.ackNotification("notif-2")).resolves.toEqual({
|
||||
success: true,
|
||||
id: "notif-2",
|
||||
});
|
||||
});
|
||||
|
||||
it("surfaces ack errors", async () => {
|
||||
mockService.ackNotification.mockRejectedValue(new NotFoundException("missing"));
|
||||
|
||||
await expect(controller.ackNotification("missing")).rejects.toThrow(NotFoundException);
|
||||
});
|
||||
|
||||
it("returns parsed queue tasks", async () => {
|
||||
mockService.listTasks.mockResolvedValue([
|
||||
{
|
||||
id: "task-1",
|
||||
project: "mosaic-stack",
|
||||
taskId: "MS24-API-001",
|
||||
status: "pending",
|
||||
description: "Build queue notifications module",
|
||||
},
|
||||
]);
|
||||
|
||||
await expect(controller.getTasks()).resolves.toEqual([
|
||||
{
|
||||
id: "task-1",
|
||||
project: "mosaic-stack",
|
||||
taskId: "MS24-API-001",
|
||||
status: "pending",
|
||||
description: "Build queue notifications module",
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("uses ApiKeyGuard at the controller level", () => {
|
||||
const guards = Reflect.getMetadata("__guards__", QueueNotificationsController) as unknown[];
|
||||
|
||||
expect(guards).toContain(ApiKeyGuard);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,36 @@
|
||||
import { Controller, Get, Param, Post, Res, UseGuards } from "@nestjs/common";
|
||||
import type { Response } from "express";
|
||||
import { SkipCsrf } from "../common/decorators/skip-csrf.decorator";
|
||||
import { ApiKeyGuard } from "../common/guards/api-key.guard";
|
||||
import {
|
||||
QueueNotificationsService,
|
||||
type QueueNotification,
|
||||
type QueueTask,
|
||||
} from "./queue-notifications.service";
|
||||
|
||||
@Controller("queue")
|
||||
@UseGuards(ApiKeyGuard)
|
||||
export class QueueNotificationsController {
|
||||
constructor(private readonly queueNotificationsService: QueueNotificationsService) {}
|
||||
|
||||
@Get("notifications")
|
||||
async getNotifications(): Promise<QueueNotification[]> {
|
||||
return this.queueNotificationsService.listNotifications();
|
||||
}
|
||||
|
||||
@Get("notifications/stream")
|
||||
async streamNotifications(@Res() res: Response): Promise<void> {
|
||||
await this.queueNotificationsService.streamNotifications(res);
|
||||
}
|
||||
|
||||
@SkipCsrf()
|
||||
@Post("notifications/:id/ack")
|
||||
async ackNotification(@Param("id") id: string): Promise<{ success: true; id: string }> {
|
||||
return this.queueNotificationsService.ackNotification(id);
|
||||
}
|
||||
|
||||
@Get("tasks")
|
||||
async getTasks(): Promise<QueueTask[]> {
|
||||
return this.queueNotificationsService.listTasks();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { ConfigModule } from "@nestjs/config";
|
||||
import { AuthModule } from "../auth/auth.module";
|
||||
import { ApiKeyGuard } from "../common/guards/api-key.guard";
|
||||
import { QueueNotificationsController } from "./queue-notifications.controller";
|
||||
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||
|
||||
@Module({
|
||||
imports: [ConfigModule, AuthModule],
|
||||
controllers: [QueueNotificationsController],
|
||||
providers: [QueueNotificationsService, ApiKeyGuard],
|
||||
exports: [QueueNotificationsService],
|
||||
})
|
||||
export class QueueNotificationsModule {}
|
||||
@@ -0,0 +1,172 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import { Logger, NotFoundException } from "@nestjs/common";
|
||||
import { mkdtemp, mkdir, rm, writeFile } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { execFile } from "node:child_process";
|
||||
import { QueueNotificationsService } from "./queue-notifications.service";
|
||||
|
||||
vi.mock("node:child_process", () => ({
|
||||
execFile: vi.fn(),
|
||||
}));
|
||||
|
||||
describe("QueueNotificationsService", () => {
|
||||
let service: QueueNotificationsService;
|
||||
let inboxDir: string;
|
||||
let configService: ConfigService;
|
||||
|
||||
beforeEach(async () => {
|
||||
vi.clearAllMocks();
|
||||
inboxDir = await mkdtemp(join(tmpdir(), "queue-notifications-"));
|
||||
configService = {
|
||||
get: vi.fn((key: string) => {
|
||||
if (key === "MOSAIC_QUEUE_INBOX_DIR") {
|
||||
return inboxDir;
|
||||
}
|
||||
|
||||
if (key === "MOSAIC_QUEUE_CLI") {
|
||||
return "/tmp/mosaic-queue-cli.js";
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}),
|
||||
} as unknown as ConfigService;
|
||||
|
||||
service = new QueueNotificationsService(configService);
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
vi.restoreAllMocks();
|
||||
await rm(inboxDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
describe("onModuleInit", () => {
|
||||
it("logs a warning when the inbox directory does not exist", async () => {
|
||||
await rm(inboxDir, { recursive: true, force: true });
|
||||
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||
|
||||
await service.onModuleInit();
|
||||
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining("Queue notifications inbox directory does not exist")
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("listNotifications", () => {
|
||||
it("returns parsed notifications from agent inbox directories", async () => {
|
||||
await mkdir(join(inboxDir, "mosaic"), { recursive: true });
|
||||
await mkdir(join(inboxDir, "mosaic", "_acked"), { recursive: true });
|
||||
await mkdir(join(inboxDir, "sage"), { recursive: true });
|
||||
await writeFile(
|
||||
join(inboxDir, "mosaic", "notif-1.json"),
|
||||
JSON.stringify({ type: "task.ready", taskId: "MS24-API-001" })
|
||||
);
|
||||
await writeFile(
|
||||
join(inboxDir, "mosaic", "_acked", "notif-ignored.json"),
|
||||
JSON.stringify({ ignored: true })
|
||||
);
|
||||
await writeFile(join(inboxDir, "sage", "notif-2.json"), JSON.stringify({ type: "done" }));
|
||||
|
||||
const notifications = await service.listNotifications();
|
||||
|
||||
expect(notifications).toHaveLength(2);
|
||||
expect(notifications).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
id: "notif-1",
|
||||
agent: "mosaic",
|
||||
filename: "notif-1.json",
|
||||
payload: { type: "task.ready", taskId: "MS24-API-001" },
|
||||
}),
|
||||
expect.objectContaining({
|
||||
id: "notif-2",
|
||||
agent: "sage",
|
||||
filename: "notif-2.json",
|
||||
payload: { type: "done" },
|
||||
}),
|
||||
])
|
||||
);
|
||||
});
|
||||
|
||||
it("returns an empty array when the inbox directory is missing", async () => {
|
||||
await rm(inboxDir, { recursive: true, force: true });
|
||||
|
||||
await expect(service.listNotifications()).resolves.toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("ackNotification", () => {
|
||||
it("executes the queue CLI with node and ack args", async () => {
|
||||
await mkdir(join(inboxDir, "mosaic"), { recursive: true });
|
||||
await writeFile(join(inboxDir, "mosaic", "notif-3.json"), JSON.stringify({ ok: true }));
|
||||
vi.mocked(execFile).mockImplementation(
|
||||
(
|
||||
_command: string,
|
||||
_args: readonly string[],
|
||||
callback: (error: Error | null, stdout: string, stderr: string) => void
|
||||
) => callback(null, "acked", "")
|
||||
);
|
||||
|
||||
await expect(service.ackNotification("notif-3")).resolves.toEqual({
|
||||
success: true,
|
||||
id: "notif-3",
|
||||
});
|
||||
|
||||
expect(execFile).toHaveBeenCalledWith(
|
||||
"node",
|
||||
["/tmp/mosaic-queue-cli.js", "ack", "notif-3"],
|
||||
expect.any(Function)
|
||||
);
|
||||
});
|
||||
|
||||
it("throws NotFoundException when the notification does not exist", async () => {
|
||||
await expect(service.ackNotification("missing")).rejects.toThrow(NotFoundException);
|
||||
expect(execFile).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("listTasks", () => {
|
||||
it("parses tab-separated CLI output", async () => {
|
||||
vi.mocked(execFile).mockImplementation(
|
||||
(
|
||||
_command: string,
|
||||
_args: readonly string[],
|
||||
callback: (error: Error | null, stdout: string, stderr: string) => void
|
||||
) =>
|
||||
callback(
|
||||
null,
|
||||
[
|
||||
"task-1\tmosaic-stack/MS24-API-001\t[pending]\tBuild queue notifications module",
|
||||
"task-2\tmosaic-stack/MS24-API-002\t[done]\tWrite tests",
|
||||
].join("\n"),
|
||||
""
|
||||
)
|
||||
);
|
||||
|
||||
await expect(service.listTasks()).resolves.toEqual([
|
||||
{
|
||||
id: "task-1",
|
||||
project: "mosaic-stack",
|
||||
taskId: "MS24-API-001",
|
||||
status: "pending",
|
||||
description: "Build queue notifications module",
|
||||
},
|
||||
{
|
||||
id: "task-2",
|
||||
project: "mosaic-stack",
|
||||
taskId: "MS24-API-002",
|
||||
status: "done",
|
||||
description: "Write tests",
|
||||
},
|
||||
]);
|
||||
|
||||
expect(execFile).toHaveBeenCalledWith(
|
||||
"node",
|
||||
["/tmp/mosaic-queue-cli.js", "list", "mosaic-stack"],
|
||||
expect.any(Function)
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
231
apps/api/src/queue-notifications/queue-notifications.service.ts
Normal file
231
apps/api/src/queue-notifications/queue-notifications.service.ts
Normal file
@@ -0,0 +1,231 @@
|
||||
import {
|
||||
Injectable,
|
||||
InternalServerErrorException,
|
||||
Logger,
|
||||
NotFoundException,
|
||||
OnModuleInit,
|
||||
} from "@nestjs/common";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import { execFile } from "node:child_process";
|
||||
import { access, readdir, readFile, stat } from "node:fs/promises";
|
||||
import { homedir } from "node:os";
|
||||
import { basename, join } from "node:path";
|
||||
import type { Response } from "express";
|
||||
import chokidar from "chokidar";
|
||||
|
||||
export interface QueueNotification {
|
||||
id: string;
|
||||
agent: string;
|
||||
filename: string;
|
||||
payload: unknown;
|
||||
createdAt: Date;
|
||||
}
|
||||
|
||||
export interface QueueTask {
|
||||
id: string;
|
||||
project: string;
|
||||
taskId: string;
|
||||
status: string;
|
||||
description: string;
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class QueueNotificationsService implements OnModuleInit {
|
||||
private readonly logger = new Logger(QueueNotificationsService.name);
|
||||
|
||||
constructor(private readonly configService: ConfigService) {}
|
||||
|
||||
async onModuleInit(): Promise<void> {
|
||||
if (!(await this.inboxDirExists())) {
|
||||
this.logger.warn(`Queue notifications inbox directory does not exist: ${this.getInboxDir()}`);
|
||||
}
|
||||
}
|
||||
|
||||
async listNotifications(): Promise<QueueNotification[]> {
|
||||
const inboxDir = this.getInboxDir();
|
||||
|
||||
if (!(await this.inboxDirExists())) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Paths come from controlled config plus directory entries under the inbox root.
|
||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||
const agentEntries = await readdir(inboxDir, { withFileTypes: true });
|
||||
const notifications: QueueNotification[] = [];
|
||||
|
||||
for (const agentEntry of agentEntries) {
|
||||
if (!agentEntry.isDirectory() || this.isIgnoredDirectory(agentEntry.name)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const agentDir = join(inboxDir, agentEntry.name);
|
||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||
const files = await readdir(agentDir, { withFileTypes: true });
|
||||
|
||||
for (const fileEntry of files) {
|
||||
if (!fileEntry.isFile() || !fileEntry.name.endsWith(".json")) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const filePath = join(agentDir, fileEntry.name);
|
||||
const [rawPayload, fileStats] = await Promise.all([
|
||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||
readFile(filePath, "utf8"),
|
||||
// eslint-disable-next-line security/detect-non-literal-fs-filename
|
||||
stat(filePath),
|
||||
]);
|
||||
|
||||
notifications.push({
|
||||
id: basename(fileEntry.name, ".json"),
|
||||
agent: agentEntry.name,
|
||||
filename: fileEntry.name,
|
||||
payload: JSON.parse(rawPayload) as unknown,
|
||||
createdAt: fileStats.birthtime,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return notifications.sort(
|
||||
(left, right) => right.createdAt.getTime() - left.createdAt.getTime()
|
||||
);
|
||||
}
|
||||
|
||||
async streamNotifications(res: Response): Promise<void> {
|
||||
res.setHeader("Content-Type", "text/event-stream");
|
||||
res.setHeader("Cache-Control", "no-cache");
|
||||
res.setHeader("Connection", "keep-alive");
|
||||
res.setHeader("X-Accel-Buffering", "no");
|
||||
|
||||
if (typeof res.flushHeaders === "function") {
|
||||
res.flushHeaders();
|
||||
}
|
||||
|
||||
const emitNotifications = async (): Promise<void> => {
|
||||
try {
|
||||
const notifications = await this.listNotifications();
|
||||
res.write(`data: ${JSON.stringify(notifications)}\n\n`);
|
||||
} catch (error: unknown) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
res.write(`event: error\n`);
|
||||
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
|
||||
}
|
||||
};
|
||||
|
||||
await emitNotifications();
|
||||
|
||||
const watcher = chokidar.watch(this.getInboxDir(), {
|
||||
ignoreInitial: true,
|
||||
persistent: true,
|
||||
ignored: (watchedPath: string) => {
|
||||
return watchedPath.includes("/_acked/") || watchedPath.includes("/_dead-letter/");
|
||||
},
|
||||
});
|
||||
|
||||
watcher.on("add", () => {
|
||||
void emitNotifications();
|
||||
});
|
||||
|
||||
watcher.on("unlink", () => {
|
||||
void emitNotifications();
|
||||
});
|
||||
|
||||
res.on("close", () => {
|
||||
void watcher.close();
|
||||
res.end();
|
||||
});
|
||||
}
|
||||
|
||||
async ackNotification(id: string): Promise<{ success: true; id: string }> {
|
||||
const notification = (await this.listNotifications()).find((entry) => entry.id === id);
|
||||
|
||||
if (!notification) {
|
||||
throw new NotFoundException(`Queue notification ${id} not found`);
|
||||
}
|
||||
|
||||
await this.execQueueCli(["ack", notification.id]);
|
||||
|
||||
return {
|
||||
success: true,
|
||||
id: notification.id,
|
||||
};
|
||||
}
|
||||
|
||||
async listTasks(): Promise<QueueTask[]> {
|
||||
const stdout = await this.execQueueCli(["list", "mosaic-stack"]);
|
||||
|
||||
return stdout
|
||||
.split(/\r?\n/)
|
||||
.map((line) => line.trim())
|
||||
.filter((line) => line.length > 0)
|
||||
.map((line) => {
|
||||
const [rawId = "", projectTaskId = "", rawStatus = "", description = ""] = line.split("\t");
|
||||
const [project = "", taskId = ""] = projectTaskId.split("/");
|
||||
|
||||
return {
|
||||
id: rawId,
|
||||
project,
|
||||
taskId,
|
||||
status: rawStatus.replace(/^\[/, "").replace(/\]$/, ""),
|
||||
description,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
private async execQueueCli(args: string[]): Promise<string> {
|
||||
const cliPath = this.getQueueCliPath();
|
||||
|
||||
return new Promise<string>((resolve, reject) => {
|
||||
execFile("node", [cliPath, ...args], (error, stdout, stderr) => {
|
||||
if (error) {
|
||||
this.logger.error(
|
||||
`Queue CLI command failed: node ${cliPath} ${args.join(" ")} | ${stderr || error.message}`
|
||||
);
|
||||
reject(
|
||||
new InternalServerErrorException(`Queue CLI command failed: ${stderr || error.message}`)
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
resolve(stdout);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
private getInboxDir(): string {
|
||||
return this.expandHomePath(
|
||||
this.configService.get<string>("MOSAIC_QUEUE_INBOX_DIR") ??
|
||||
"~/.openclaw/workspace/agent-inbox"
|
||||
);
|
||||
}
|
||||
|
||||
private getQueueCliPath(): string {
|
||||
return this.expandHomePath(
|
||||
this.configService.get<string>("MOSAIC_QUEUE_CLI") ?? "~/src/mosaic-queue/dist/cli.js"
|
||||
);
|
||||
}
|
||||
|
||||
private expandHomePath(value: string): string {
|
||||
if (value === "~") {
|
||||
return homedir();
|
||||
}
|
||||
|
||||
if (value.startsWith("~/")) {
|
||||
return join(homedir(), value.slice(2));
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
private async inboxDirExists(): Promise<boolean> {
|
||||
try {
|
||||
await access(this.getInboxDir());
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private isIgnoredDirectory(name: string): boolean {
|
||||
return name === "_acked" || name === "_dead-letter";
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@mosaic/orchestrator",
|
||||
"version": "0.0.20",
|
||||
"version": "0.0.23",
|
||||
"private": true,
|
||||
"scripts": {
|
||||
"build": "nest build",
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { PrismaModule } from "../../prisma/prisma.module";
|
||||
import { OrchestratorApiKeyGuard } from "../../common/guards/api-key.guard";
|
||||
import { EncryptionService } from "../../security/encryption.service";
|
||||
import { AgentProvidersController } from "./agent-providers.controller";
|
||||
import { AgentProvidersService } from "./agent-providers.service";
|
||||
|
||||
@Module({
|
||||
imports: [PrismaModule],
|
||||
controllers: [AgentProvidersController],
|
||||
providers: [OrchestratorApiKeyGuard, AgentProvidersService],
|
||||
providers: [OrchestratorApiKeyGuard, EncryptionService, AgentProvidersService],
|
||||
})
|
||||
export class AgentProvidersModule {}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { NotFoundException } from "@nestjs/common";
|
||||
import { EncryptionService } from "../../security/encryption.service";
|
||||
import { AgentProvidersService } from "./agent-providers.service";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
|
||||
@@ -14,6 +15,9 @@ describe("AgentProvidersService", () => {
|
||||
delete: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
let encryptionService: {
|
||||
encryptIfNeeded: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
prisma = {
|
||||
@@ -26,7 +30,14 @@ describe("AgentProvidersService", () => {
|
||||
},
|
||||
};
|
||||
|
||||
service = new AgentProvidersService(prisma as unknown as PrismaService);
|
||||
encryptionService = {
|
||||
encryptIfNeeded: vi.fn((value: string) => `enc:${value}`),
|
||||
};
|
||||
|
||||
service = new AgentProvidersService(
|
||||
prisma as unknown as PrismaService,
|
||||
encryptionService as unknown as EncryptionService
|
||||
);
|
||||
});
|
||||
|
||||
it("lists all provider configs", async () => {
|
||||
@@ -111,6 +122,42 @@ describe("AgentProvidersService", () => {
|
||||
credentials: {},
|
||||
},
|
||||
});
|
||||
expect(encryptionService.encryptIfNeeded).not.toHaveBeenCalled();
|
||||
expect(result).toEqual(created);
|
||||
});
|
||||
|
||||
it("encrypts openclaw token credentials when creating provider config", async () => {
|
||||
const created = {
|
||||
id: "cfg-openclaw",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "OpenClaw",
|
||||
provider: "openclaw",
|
||||
gatewayUrl: "https://openclaw.example.com",
|
||||
credentials: { apiToken: "enc:top-secret" },
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
};
|
||||
prisma.agentProviderConfig.create.mockResolvedValue(created);
|
||||
|
||||
const result = await service.create({
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "OpenClaw",
|
||||
provider: "openclaw",
|
||||
gatewayUrl: "https://openclaw.example.com",
|
||||
credentials: { apiToken: "top-secret" },
|
||||
});
|
||||
|
||||
expect(encryptionService.encryptIfNeeded).toHaveBeenCalledWith("top-secret");
|
||||
expect(prisma.agentProviderConfig.create).toHaveBeenCalledWith({
|
||||
data: {
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "OpenClaw",
|
||||
provider: "openclaw",
|
||||
gatewayUrl: "https://openclaw.example.com",
|
||||
credentials: { apiToken: "enc:top-secret" },
|
||||
},
|
||||
});
|
||||
expect(result).toEqual(created);
|
||||
});
|
||||
|
||||
@@ -156,6 +203,47 @@ describe("AgentProvidersService", () => {
|
||||
isActive: false,
|
||||
},
|
||||
});
|
||||
expect(encryptionService.encryptIfNeeded).not.toHaveBeenCalled();
|
||||
expect(result).toEqual(updated);
|
||||
});
|
||||
|
||||
it("encrypts openclaw token credentials when updating provider config", async () => {
|
||||
prisma.agentProviderConfig.findUnique.mockResolvedValue({
|
||||
id: "cfg-openclaw",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "OpenClaw",
|
||||
provider: "openclaw",
|
||||
gatewayUrl: "https://openclaw.example.com",
|
||||
credentials: { apiToken: "enc:existing" },
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
});
|
||||
|
||||
const updated = {
|
||||
id: "cfg-openclaw",
|
||||
workspaceId: "8bcd7eda-a122-4d6c-adfd-b152f6f75369",
|
||||
name: "OpenClaw",
|
||||
provider: "openclaw",
|
||||
gatewayUrl: "https://openclaw.example.com",
|
||||
credentials: { apiToken: "enc:rotated-token" },
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T18:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T19:00:00.000Z"),
|
||||
};
|
||||
prisma.agentProviderConfig.update.mockResolvedValue(updated);
|
||||
|
||||
const result = await service.update("cfg-openclaw", {
|
||||
credentials: { apiToken: "rotated-token" },
|
||||
});
|
||||
|
||||
expect(encryptionService.encryptIfNeeded).toHaveBeenCalledWith("rotated-token");
|
||||
expect(prisma.agentProviderConfig.update).toHaveBeenCalledWith({
|
||||
where: { id: "cfg-openclaw" },
|
||||
data: {
|
||||
credentials: { apiToken: "enc:rotated-token" },
|
||||
},
|
||||
});
|
||||
expect(result).toEqual(updated);
|
||||
});
|
||||
|
||||
|
||||
@@ -1,12 +1,19 @@
|
||||
import { Injectable, NotFoundException } from "@nestjs/common";
|
||||
import type { AgentProviderConfig, Prisma } from "@prisma/client";
|
||||
import { EncryptionService } from "../../security/encryption.service";
|
||||
import { PrismaService } from "../../prisma/prisma.service";
|
||||
import { CreateAgentProviderDto } from "./dto/create-agent-provider.dto";
|
||||
import { UpdateAgentProviderDto } from "./dto/update-agent-provider.dto";
|
||||
|
||||
const OPENCLAW_PROVIDER_TYPE = "openclaw";
|
||||
const OPENCLAW_TOKEN_KEYS = ["apiToken", "token", "bearerToken"] as const;
|
||||
|
||||
@Injectable()
|
||||
export class AgentProvidersService {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly encryptionService: EncryptionService
|
||||
) {}
|
||||
|
||||
async list(): Promise<AgentProviderConfig[]> {
|
||||
return this.prisma.agentProviderConfig.findMany({
|
||||
@@ -27,20 +34,23 @@ export class AgentProvidersService {
|
||||
}
|
||||
|
||||
async create(dto: CreateAgentProviderDto): Promise<AgentProviderConfig> {
|
||||
const credentials = this.sanitizeCredentials(dto.provider, dto.credentials ?? {});
|
||||
|
||||
return this.prisma.agentProviderConfig.create({
|
||||
data: {
|
||||
workspaceId: dto.workspaceId,
|
||||
name: dto.name,
|
||||
provider: dto.provider,
|
||||
gatewayUrl: dto.gatewayUrl,
|
||||
credentials: this.toJsonValue(dto.credentials ?? {}),
|
||||
credentials: this.toJsonValue(credentials),
|
||||
...(dto.isActive !== undefined ? { isActive: dto.isActive } : {}),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async update(id: string, dto: UpdateAgentProviderDto): Promise<AgentProviderConfig> {
|
||||
await this.getById(id);
|
||||
const existingConfig = await this.getById(id);
|
||||
const provider = dto.provider ?? existingConfig.provider;
|
||||
|
||||
const data: Prisma.AgentProviderConfigUpdateInput = {
|
||||
...(dto.workspaceId !== undefined ? { workspaceId: dto.workspaceId } : {}),
|
||||
@@ -48,7 +58,9 @@ export class AgentProvidersService {
|
||||
...(dto.provider !== undefined ? { provider: dto.provider } : {}),
|
||||
...(dto.gatewayUrl !== undefined ? { gatewayUrl: dto.gatewayUrl } : {}),
|
||||
...(dto.isActive !== undefined ? { isActive: dto.isActive } : {}),
|
||||
...(dto.credentials !== undefined ? { credentials: this.toJsonValue(dto.credentials) } : {}),
|
||||
...(dto.credentials !== undefined
|
||||
? { credentials: this.toJsonValue(this.sanitizeCredentials(provider, dto.credentials)) }
|
||||
: {}),
|
||||
};
|
||||
|
||||
return this.prisma.agentProviderConfig.update({
|
||||
@@ -65,6 +77,25 @@ export class AgentProvidersService {
|
||||
});
|
||||
}
|
||||
|
||||
private sanitizeCredentials(
|
||||
provider: string,
|
||||
credentials: Record<string, unknown>
|
||||
): Record<string, unknown> {
|
||||
if (provider.toLowerCase() !== OPENCLAW_PROVIDER_TYPE) {
|
||||
return credentials;
|
||||
}
|
||||
|
||||
const nextCredentials: Record<string, unknown> = { ...credentials };
|
||||
for (const key of OPENCLAW_TOKEN_KEYS) {
|
||||
const tokenValue = nextCredentials[key];
|
||||
if (typeof tokenValue === "string" && tokenValue.length > 0) {
|
||||
nextCredentials[key] = this.encryptionService.encryptIfNeeded(tokenValue);
|
||||
}
|
||||
}
|
||||
|
||||
return nextCredentials;
|
||||
}
|
||||
|
||||
private toJsonValue(value: Record<string, unknown>): Prisma.InputJsonValue {
|
||||
return value as Prisma.InputJsonValue;
|
||||
}
|
||||
|
||||
@@ -146,7 +146,7 @@ export class AgentsController {
|
||||
* Return recent orchestrator events for non-streaming consumers.
|
||||
*/
|
||||
@Get("events/recent")
|
||||
@Throttle({ status: { limit: 200, ttl: 60000 } })
|
||||
@Throttle({ default: { limit: 1000, ttl: 60000 } })
|
||||
getRecentEvents(@Query("limit") limit?: string): {
|
||||
events: ReturnType<AgentEventsService["getRecentEvents"]>;
|
||||
} {
|
||||
|
||||
@@ -0,0 +1,145 @@
|
||||
import type { HttpService } from "@nestjs/axios";
|
||||
import type { AgentMessage } from "@mosaic/shared";
|
||||
import { Readable } from "node:stream";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
|
||||
|
||||
describe("OpenClawSseBridge", () => {
|
||||
let bridge: OpenClawSseBridge;
|
||||
let httpService: {
|
||||
axiosRef: {
|
||||
get: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
httpService = {
|
||||
axiosRef: {
|
||||
get: vi.fn(),
|
||||
},
|
||||
};
|
||||
|
||||
bridge = new OpenClawSseBridge(httpService as unknown as HttpService);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("maps message and status events, and skips heartbeats", async () => {
|
||||
httpService.axiosRef.get.mockResolvedValue({
|
||||
data: Readable.from([
|
||||
'event: message\ndata: {"id":"msg-1","role":"assistant","content":"hello","timestamp":"2026-03-07T16:00:00.000Z"}\n\n',
|
||||
"event: heartbeat\ndata: {}\n\n",
|
||||
'event: status\ndata: {"status":"paused","timestamp":"2026-03-07T16:00:01.000Z"}\n\n',
|
||||
"data: [DONE]\n\n",
|
||||
]),
|
||||
});
|
||||
|
||||
const messages = await collectMessages(
|
||||
bridge.streamSession("https://gateway.example.com/", "session-1", {
|
||||
Authorization: "Bearer test-token",
|
||||
})
|
||||
);
|
||||
|
||||
expect(httpService.axiosRef.get).toHaveBeenCalledWith(
|
||||
"https://gateway.example.com/api/sessions/session-1/stream",
|
||||
{
|
||||
headers: {
|
||||
Authorization: "Bearer test-token",
|
||||
Accept: "text/event-stream",
|
||||
},
|
||||
responseType: "stream",
|
||||
}
|
||||
);
|
||||
|
||||
expect(messages).toHaveLength(2);
|
||||
expect(messages[0]).toEqual({
|
||||
id: "msg-1",
|
||||
sessionId: "session-1",
|
||||
role: "assistant",
|
||||
content: "hello",
|
||||
timestamp: new Date("2026-03-07T16:00:00.000Z"),
|
||||
});
|
||||
|
||||
expect(messages[1]).toEqual({
|
||||
id: expect.any(String),
|
||||
sessionId: "session-1",
|
||||
role: "system",
|
||||
content: "Session status changed to paused",
|
||||
timestamp: new Date("2026-03-07T16:00:01.000Z"),
|
||||
metadata: {
|
||||
status: "paused",
|
||||
timestamp: "2026-03-07T16:00:01.000Z",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("retries after disconnect and resumes streaming", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
httpService.axiosRef.get
|
||||
.mockResolvedValueOnce({
|
||||
data: Readable.from([
|
||||
'event: message\ndata: {"id":"msg-1","content":"first","timestamp":"2026-03-07T16:10:00.000Z"}\n\n',
|
||||
]),
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
data: Readable.from(["data: [DONE]\n\n"]),
|
||||
});
|
||||
|
||||
const consumePromise = collectMessages(
|
||||
bridge.streamSession("https://gateway.example.com", "session-1", {
|
||||
Authorization: "Bearer test-token",
|
||||
})
|
||||
);
|
||||
|
||||
await vi.advanceTimersByTimeAsync(2000);
|
||||
|
||||
const messages = await consumePromise;
|
||||
|
||||
expect(httpService.axiosRef.get).toHaveBeenCalledTimes(2);
|
||||
expect(messages).toEqual([
|
||||
{
|
||||
id: "msg-1",
|
||||
sessionId: "session-1",
|
||||
role: "user",
|
||||
content: "first",
|
||||
timestamp: new Date("2026-03-07T16:10:00.000Z"),
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("throws after exhausting reconnect retries", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
httpService.axiosRef.get.mockRejectedValue(new Error("socket closed"));
|
||||
|
||||
const consumePromise = collectMessages(
|
||||
bridge.streamSession("https://gateway.example.com", "session-1", {
|
||||
Authorization: "Bearer test-token",
|
||||
})
|
||||
);
|
||||
|
||||
const rejection = expect(consumePromise).rejects.toThrow(
|
||||
"Failed to reconnect OpenClaw stream for session session-1 after 5 retries: socket closed"
|
||||
);
|
||||
|
||||
for (let attempt = 0; attempt < 5; attempt += 1) {
|
||||
await vi.advanceTimersByTimeAsync(2000);
|
||||
}
|
||||
|
||||
await rejection;
|
||||
expect(httpService.axiosRef.get).toHaveBeenCalledTimes(6);
|
||||
});
|
||||
});
|
||||
|
||||
async function collectMessages(stream: AsyncIterable<AgentMessage>): Promise<AgentMessage[]> {
|
||||
const messages: AgentMessage[] = [];
|
||||
|
||||
for await (const message of stream) {
|
||||
messages.push(message);
|
||||
}
|
||||
|
||||
return messages;
|
||||
}
|
||||
@@ -0,0 +1,420 @@
|
||||
import { HttpService } from "@nestjs/axios";
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import type { AgentMessage, AgentMessageRole } from "@mosaic/shared";
|
||||
import { randomUUID } from "node:crypto";
|
||||
|
||||
const STREAM_RETRY_DELAY_MS = 2000;
|
||||
const STREAM_MAX_RETRIES = 5;
|
||||
|
||||
type JsonRecord = Record<string, unknown>;
|
||||
type AsyncChunkStream = AsyncIterable<string | Uint8Array | Buffer>;
|
||||
|
||||
type ParsedStreamEvent =
|
||||
| {
|
||||
type: "message";
|
||||
message: AgentMessage;
|
||||
}
|
||||
| {
|
||||
type: "done";
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class OpenClawSseBridge {
|
||||
constructor(private readonly httpService: HttpService) {}
|
||||
|
||||
async *streamSession(
|
||||
baseUrl: string,
|
||||
sessionId: string,
|
||||
headers: Record<string, string>
|
||||
): AsyncIterable<AgentMessage> {
|
||||
let retryCount = 0;
|
||||
let lastError: unknown = new Error("OpenClaw stream disconnected");
|
||||
|
||||
while (retryCount <= STREAM_MAX_RETRIES) {
|
||||
try {
|
||||
const response = await this.httpService.axiosRef.get(
|
||||
this.buildStreamUrl(baseUrl, sessionId),
|
||||
{
|
||||
headers: {
|
||||
...headers,
|
||||
Accept: "text/event-stream",
|
||||
},
|
||||
responseType: "stream",
|
||||
}
|
||||
);
|
||||
|
||||
const stream = this.asAsyncChunkStream(response.data);
|
||||
if (stream === null) {
|
||||
throw new Error("OpenClaw stream response is not readable");
|
||||
}
|
||||
|
||||
retryCount = 0;
|
||||
let streamCompleted = false;
|
||||
|
||||
for await (const event of this.parseStream(stream, sessionId)) {
|
||||
if (event.type === "done") {
|
||||
streamCompleted = true;
|
||||
break;
|
||||
}
|
||||
|
||||
yield event.message;
|
||||
}
|
||||
|
||||
if (streamCompleted) {
|
||||
return;
|
||||
}
|
||||
|
||||
lastError = new Error("OpenClaw stream disconnected");
|
||||
} catch (error) {
|
||||
lastError = error;
|
||||
}
|
||||
|
||||
if (retryCount >= STREAM_MAX_RETRIES) {
|
||||
throw new Error(
|
||||
`Failed to reconnect OpenClaw stream for session ${sessionId} after ${String(STREAM_MAX_RETRIES)} retries: ${this.toErrorMessage(lastError)}`
|
||||
);
|
||||
}
|
||||
|
||||
retryCount += 1;
|
||||
await this.delay(STREAM_RETRY_DELAY_MS);
|
||||
}
|
||||
}
|
||||
|
||||
private async *parseStream(
|
||||
stream: AsyncChunkStream,
|
||||
sessionId: string
|
||||
): AsyncGenerator<ParsedStreamEvent> {
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = "";
|
||||
|
||||
for await (const chunk of stream) {
|
||||
const textChunk = typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true });
|
||||
buffer += textChunk.replace(/\r\n/gu, "\n");
|
||||
|
||||
const rawEvents = buffer.split("\n\n");
|
||||
buffer = rawEvents.pop() ?? "";
|
||||
|
||||
for (const rawEvent of rawEvents) {
|
||||
const parsedEvent = this.parseRawEvent(rawEvent);
|
||||
if (parsedEvent === null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (parsedEvent.data === "[DONE]") {
|
||||
yield {
|
||||
type: "done",
|
||||
};
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = this.tryParseJson(parsedEvent.data) ?? parsedEvent.data;
|
||||
const message = this.mapEventToMessage(parsedEvent.type, payload, sessionId);
|
||||
if (message !== null) {
|
||||
yield {
|
||||
type: "message",
|
||||
message,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
buffer += decoder.decode();
|
||||
|
||||
const trailingEvent = this.parseRawEvent(buffer.trim());
|
||||
if (trailingEvent === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (trailingEvent.data === "[DONE]") {
|
||||
yield {
|
||||
type: "done",
|
||||
};
|
||||
return;
|
||||
}
|
||||
|
||||
const payload = this.tryParseJson(trailingEvent.data) ?? trailingEvent.data;
|
||||
const message = this.mapEventToMessage(trailingEvent.type, payload, sessionId);
|
||||
if (message !== null) {
|
||||
yield {
|
||||
type: "message",
|
||||
message,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
private parseRawEvent(rawEvent: string): { type: string; data: string } | null {
|
||||
if (rawEvent.trim().length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
let type = "message";
|
||||
const dataLines: string[] = [];
|
||||
|
||||
for (const line of rawEvent.split("\n")) {
|
||||
const trimmedLine = line.trimEnd();
|
||||
if (trimmedLine.length === 0 || trimmedLine.startsWith(":")) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (trimmedLine.startsWith("event:")) {
|
||||
type = trimmedLine.slice(6).trim().toLowerCase();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (trimmedLine.startsWith("data:")) {
|
||||
dataLines.push(trimmedLine.slice(5).trimStart());
|
||||
}
|
||||
}
|
||||
|
||||
if (dataLines.length > 0) {
|
||||
return {
|
||||
type,
|
||||
data: dataLines.join("\n").trim(),
|
||||
};
|
||||
}
|
||||
|
||||
const trimmedEvent = rawEvent.trim();
|
||||
if (trimmedEvent.startsWith("{") || trimmedEvent.startsWith("[")) {
|
||||
return {
|
||||
type,
|
||||
data: trimmedEvent,
|
||||
};
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private mapEventToMessage(
|
||||
eventType: string,
|
||||
payload: unknown,
|
||||
fallbackSessionId: string
|
||||
): AgentMessage | null {
|
||||
switch (eventType) {
|
||||
case "heartbeat":
|
||||
return null;
|
||||
case "status":
|
||||
return this.toStatusMessage(payload, fallbackSessionId);
|
||||
case "message":
|
||||
default:
|
||||
return this.toAgentMessage(payload, fallbackSessionId);
|
||||
}
|
||||
}
|
||||
|
||||
private toStatusMessage(value: unknown, sessionId: string): AgentMessage | null {
|
||||
if (typeof value === "string") {
|
||||
const status = value.trim();
|
||||
if (status.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
id: randomUUID(),
|
||||
sessionId,
|
||||
role: "system",
|
||||
content: `Session status changed to ${status}`,
|
||||
timestamp: new Date(),
|
||||
metadata: {
|
||||
status,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
if (!this.isRecord(value)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const status = this.readString(value.status);
|
||||
if (!status) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
id: randomUUID(),
|
||||
sessionId,
|
||||
role: "system",
|
||||
content: `Session status changed to ${status}`,
|
||||
timestamp: this.parseDate(value.timestamp ?? value.updatedAt),
|
||||
metadata: value,
|
||||
};
|
||||
}
|
||||
|
||||
private toAgentMessage(value: unknown, fallbackSessionId: string): AgentMessage | null {
|
||||
if (typeof value === "string") {
|
||||
const content = value.trim();
|
||||
if (content.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
id: randomUUID(),
|
||||
sessionId: fallbackSessionId,
|
||||
role: "assistant",
|
||||
content,
|
||||
timestamp: new Date(),
|
||||
};
|
||||
}
|
||||
|
||||
let candidate: JsonRecord | null = null;
|
||||
|
||||
if (this.isRecord(value) && this.isRecord(value.message)) {
|
||||
candidate = value.message;
|
||||
} else if (this.isRecord(value)) {
|
||||
candidate = value;
|
||||
}
|
||||
|
||||
if (candidate === null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const sessionId = this.readString(candidate.sessionId) ?? fallbackSessionId;
|
||||
if (!sessionId) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const content = this.extractMessageContent(
|
||||
candidate.content ?? candidate.text ?? candidate.message
|
||||
);
|
||||
if (content.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const metadata = this.toMetadata(candidate.metadata);
|
||||
|
||||
return {
|
||||
id: this.readString(candidate.id) ?? this.readString(candidate.messageId) ?? randomUUID(),
|
||||
sessionId,
|
||||
role: this.toMessageRole(this.readString(candidate.role) ?? this.readString(candidate.type)),
|
||||
content,
|
||||
timestamp: this.parseDate(candidate.timestamp ?? candidate.createdAt),
|
||||
...(metadata !== undefined ? { metadata } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
private extractMessageContent(content: unknown): string {
|
||||
if (typeof content === "string") {
|
||||
return content.trim();
|
||||
}
|
||||
|
||||
if (Array.isArray(content)) {
|
||||
const parts: string[] = [];
|
||||
|
||||
for (const part of content) {
|
||||
if (typeof part === "string") {
|
||||
const trimmed = part.trim();
|
||||
if (trimmed.length > 0) {
|
||||
parts.push(trimmed);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!this.isRecord(part)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const text = this.readString(part.text) ?? this.readString(part.content);
|
||||
if (text !== undefined && text.trim().length > 0) {
|
||||
parts.push(text.trim());
|
||||
}
|
||||
}
|
||||
|
||||
return parts.join("\n\n").trim();
|
||||
}
|
||||
|
||||
if (this.isRecord(content)) {
|
||||
const text = this.readString(content.text) ?? this.readString(content.content);
|
||||
return text?.trim() ?? "";
|
||||
}
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
private toMessageRole(role?: string): AgentMessageRole {
|
||||
switch (role?.toLowerCase()) {
|
||||
case "assistant":
|
||||
case "agent":
|
||||
return "assistant";
|
||||
case "system":
|
||||
return "system";
|
||||
case "tool":
|
||||
return "tool";
|
||||
case "operator":
|
||||
case "user":
|
||||
default:
|
||||
return "user";
|
||||
}
|
||||
}
|
||||
|
||||
private parseDate(value: unknown, fallback = new Date()): Date {
|
||||
if (value instanceof Date) {
|
||||
return value;
|
||||
}
|
||||
|
||||
if (typeof value === "string" || typeof value === "number") {
|
||||
const parsed = new Date(value);
|
||||
if (!Number.isNaN(parsed.getTime())) {
|
||||
return parsed;
|
||||
}
|
||||
}
|
||||
|
||||
return fallback;
|
||||
}
|
||||
|
||||
private toMetadata(value: unknown): Record<string, unknown> | undefined {
|
||||
if (this.isRecord(value)) {
|
||||
return value;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
private buildStreamUrl(baseUrl: string, sessionId: string): string {
|
||||
const normalizedBaseUrl = baseUrl.replace(/\/$/u, "");
|
||||
return new URL(
|
||||
`/api/sessions/${encodeURIComponent(sessionId)}/stream`,
|
||||
`${normalizedBaseUrl}/`
|
||||
).toString();
|
||||
}
|
||||
|
||||
private tryParseJson(value: string): unknown {
|
||||
try {
|
||||
return JSON.parse(value) as unknown;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private asAsyncChunkStream(value: unknown): AsyncChunkStream | null {
|
||||
if (value !== null && typeof value === "object" && Symbol.asyncIterator in value) {
|
||||
return value as AsyncChunkStream;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private isRecord(value: unknown): value is JsonRecord {
|
||||
return typeof value === "object" && value !== null && !Array.isArray(value);
|
||||
}
|
||||
|
||||
private readString(value: unknown): string | undefined {
|
||||
if (typeof value !== "string") {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const trimmed = value.trim();
|
||||
return trimmed.length > 0 ? trimmed : undefined;
|
||||
}
|
||||
|
||||
private async delay(ms: number): Promise<void> {
|
||||
await new Promise((resolve) => {
|
||||
setTimeout(resolve, ms);
|
||||
});
|
||||
}
|
||||
|
||||
private toErrorMessage(error: unknown): string {
|
||||
if (error instanceof Error) {
|
||||
return error.message;
|
||||
}
|
||||
|
||||
return String(error);
|
||||
}
|
||||
}
|
||||
@@ -2,16 +2,23 @@ import { HttpService } from "@nestjs/axios";
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import type { AgentProviderConfig } from "@prisma/client";
|
||||
import { EncryptionService } from "../../../security/encryption.service";
|
||||
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
|
||||
import { OpenClawProvider } from "./openclaw.provider";
|
||||
|
||||
@Injectable()
|
||||
export class OpenClawProviderFactory {
|
||||
constructor(
|
||||
private readonly encryptionService: EncryptionService,
|
||||
private readonly httpService: HttpService
|
||||
private readonly httpService: HttpService,
|
||||
private readonly openClawSseBridge: OpenClawSseBridge
|
||||
) {}
|
||||
|
||||
createProvider(config: AgentProviderConfig): OpenClawProvider {
|
||||
return new OpenClawProvider(config, this.encryptionService, this.httpService);
|
||||
return new OpenClawProvider(
|
||||
config,
|
||||
this.encryptionService,
|
||||
this.httpService,
|
||||
this.openClawSseBridge
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,183 @@
|
||||
import type { HttpService } from "@nestjs/axios";
|
||||
import { ServiceUnavailableException } from "@nestjs/common";
|
||||
import type { AgentMessage } from "@mosaic/shared";
|
||||
import type { AgentProviderConfig } from "@prisma/client";
|
||||
import { Readable } from "node:stream";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { EncryptionService } from "../../../security/encryption.service";
|
||||
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
|
||||
import { OpenClawProvider } from "./openclaw.provider";
|
||||
|
||||
describe("Phase 3 gate: OpenClaw provider config registered in DB → provider loaded on boot → sessions returned from /api/mission-control/sessions → inject/pause/kill proxied to gateway", () => {
|
||||
let provider: OpenClawProvider;
|
||||
let httpService: {
|
||||
axiosRef: {
|
||||
get: ReturnType<typeof vi.fn>;
|
||||
post: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
let encryptionService: {
|
||||
decryptIfNeeded: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
const config: AgentProviderConfig = {
|
||||
id: "cfg-openclaw-1",
|
||||
workspaceId: "workspace-1",
|
||||
name: "openclaw-home",
|
||||
provider: "openclaw",
|
||||
gatewayUrl: "https://gateway.example.com",
|
||||
credentials: {
|
||||
apiToken: "enc:token",
|
||||
},
|
||||
isActive: true,
|
||||
createdAt: new Date("2026-03-07T15:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T15:00:00.000Z"),
|
||||
};
|
||||
|
||||
beforeEach(() => {
|
||||
httpService = {
|
||||
axiosRef: {
|
||||
get: vi.fn(),
|
||||
post: vi.fn(),
|
||||
},
|
||||
};
|
||||
|
||||
encryptionService = {
|
||||
decryptIfNeeded: vi.fn().mockReturnValue("plain-token"),
|
||||
};
|
||||
|
||||
provider = new OpenClawProvider(
|
||||
config,
|
||||
encryptionService as unknown as EncryptionService,
|
||||
httpService as unknown as HttpService,
|
||||
new OpenClawSseBridge(httpService as unknown as HttpService)
|
||||
);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("maps listSessions from mocked OpenClaw gateway HTTP responses", async () => {
|
||||
httpService.axiosRef.get.mockResolvedValue({
|
||||
data: {
|
||||
sessions: [
|
||||
{
|
||||
id: "session-1",
|
||||
status: "running",
|
||||
createdAt: "2026-03-07T15:01:00.000Z",
|
||||
updatedAt: "2026-03-07T15:02:00.000Z",
|
||||
},
|
||||
],
|
||||
total: 1,
|
||||
},
|
||||
});
|
||||
|
||||
await expect(provider.listSessions()).resolves.toEqual({
|
||||
sessions: [
|
||||
{
|
||||
id: "session-1",
|
||||
providerId: "openclaw-home",
|
||||
providerType: "openclaw",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T15:01:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T15:02:00.000Z"),
|
||||
},
|
||||
],
|
||||
total: 1,
|
||||
});
|
||||
|
||||
expect(httpService.axiosRef.get).toHaveBeenCalledWith(
|
||||
"https://gateway.example.com/api/sessions",
|
||||
{
|
||||
headers: {
|
||||
Authorization: "Bearer plain-token",
|
||||
},
|
||||
params: {
|
||||
limit: 50,
|
||||
},
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
it("maps streamMessages from mock SSE events into AgentMessage output", async () => {
|
||||
httpService.axiosRef.get.mockResolvedValue({
|
||||
data: Readable.from([
|
||||
'event: message\ndata: {"id":"msg-1","role":"assistant","content":"hello from stream","timestamp":"2026-03-07T15:03:00.000Z"}\n\n',
|
||||
'event: status\ndata: {"status":"paused","timestamp":"2026-03-07T15:04:00.000Z"}\n\n',
|
||||
"data: [DONE]\n\n",
|
||||
]),
|
||||
});
|
||||
|
||||
const messages = await collectMessages(provider.streamMessages("session-1"));
|
||||
|
||||
expect(messages).toEqual([
|
||||
{
|
||||
id: "msg-1",
|
||||
sessionId: "session-1",
|
||||
role: "assistant",
|
||||
content: "hello from stream",
|
||||
timestamp: new Date("2026-03-07T15:03:00.000Z"),
|
||||
},
|
||||
{
|
||||
id: expect.any(String),
|
||||
sessionId: "session-1",
|
||||
role: "system",
|
||||
content: "Session status changed to paused",
|
||||
timestamp: new Date("2026-03-07T15:04:00.000Z"),
|
||||
metadata: {
|
||||
status: "paused",
|
||||
timestamp: "2026-03-07T15:04:00.000Z",
|
||||
},
|
||||
},
|
||||
]);
|
||||
});
|
||||
|
||||
it("handles unavailable gateway errors", async () => {
|
||||
httpService.axiosRef.get.mockRejectedValue(new Error("gateway unavailable"));
|
||||
|
||||
await expect(provider.listSessions()).rejects.toBeInstanceOf(ServiceUnavailableException);
|
||||
await expect(provider.listSessions()).rejects.toThrow("gateway unavailable");
|
||||
});
|
||||
|
||||
it("handles bad token decryption errors", async () => {
|
||||
encryptionService.decryptIfNeeded.mockImplementation(() => {
|
||||
throw new Error("bad token");
|
||||
});
|
||||
|
||||
await expect(provider.listSessions()).rejects.toBeInstanceOf(ServiceUnavailableException);
|
||||
await expect(provider.listSessions()).rejects.toThrow("Failed to decrypt API token");
|
||||
});
|
||||
|
||||
it("handles malformed SSE stream responses", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
httpService.axiosRef.get.mockResolvedValue({
|
||||
data: {
|
||||
malformed: true,
|
||||
},
|
||||
});
|
||||
|
||||
const streamPromise = collectMessages(provider.streamMessages("session-malformed"));
|
||||
const rejection = expect(streamPromise).rejects.toThrow(
|
||||
"OpenClaw provider openclaw-home failed to stream messages for session session-malformed"
|
||||
);
|
||||
|
||||
for (let attempt = 0; attempt < 5; attempt += 1) {
|
||||
await vi.advanceTimersByTimeAsync(2000);
|
||||
}
|
||||
|
||||
await rejection;
|
||||
expect(httpService.axiosRef.get).toHaveBeenCalledTimes(6);
|
||||
});
|
||||
});
|
||||
|
||||
async function collectMessages(stream: AsyncIterable<AgentMessage>): Promise<AgentMessage[]> {
|
||||
const messages: AgentMessage[] = [];
|
||||
|
||||
for await (const message of stream) {
|
||||
messages.push(message);
|
||||
}
|
||||
|
||||
return messages;
|
||||
}
|
||||
@@ -1,9 +1,9 @@
|
||||
import type { HttpService } from "@nestjs/axios";
|
||||
import { ServiceUnavailableException } from "@nestjs/common";
|
||||
import type { AgentProviderConfig } from "@prisma/client";
|
||||
import { Readable } from "node:stream";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { EncryptionService } from "../../../security/encryption.service";
|
||||
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
|
||||
import { OpenClawProvider } from "./openclaw.provider";
|
||||
|
||||
describe("OpenClawProvider", () => {
|
||||
@@ -17,6 +17,9 @@ describe("OpenClawProvider", () => {
|
||||
let encryptionService: {
|
||||
decryptIfNeeded: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
let sseBridge: {
|
||||
streamSession: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
const config: AgentProviderConfig = {
|
||||
id: "cfg-openclaw-1",
|
||||
@@ -45,10 +48,15 @@ describe("OpenClawProvider", () => {
|
||||
decryptIfNeeded: vi.fn().mockReturnValue("plain-token"),
|
||||
};
|
||||
|
||||
sseBridge = {
|
||||
streamSession: vi.fn(),
|
||||
};
|
||||
|
||||
provider = new OpenClawProvider(
|
||||
config,
|
||||
encryptionService as unknown as EncryptionService,
|
||||
httpService as unknown as HttpService
|
||||
httpService as unknown as HttpService,
|
||||
sseBridge as unknown as OpenClawSseBridge
|
||||
);
|
||||
});
|
||||
|
||||
@@ -219,41 +227,34 @@ describe("OpenClawProvider", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("parses SSE stream messages", async () => {
|
||||
const stream = Readable.from([
|
||||
'data: {"id":"message-stream","sessionId":"session-stream","role":"assistant","content":"stream hello","timestamp":"2026-03-07T16:00:00.000Z"}\n\n',
|
||||
"data: [DONE]\n\n",
|
||||
]);
|
||||
it("delegates streaming to OpenClawSseBridge", async () => {
|
||||
const streamedMessage = {
|
||||
id: "message-stream",
|
||||
sessionId: "session-stream",
|
||||
role: "assistant",
|
||||
content: "stream hello",
|
||||
timestamp: new Date("2026-03-07T16:00:00.000Z"),
|
||||
};
|
||||
|
||||
httpService.axiosRef.get.mockResolvedValue({
|
||||
data: stream,
|
||||
});
|
||||
sseBridge.streamSession.mockReturnValue(
|
||||
(async function* () {
|
||||
yield streamedMessage;
|
||||
})()
|
||||
);
|
||||
|
||||
const messages: Array<unknown> = [];
|
||||
for await (const message of provider.streamMessages("session-stream")) {
|
||||
messages.push(message);
|
||||
}
|
||||
|
||||
expect(httpService.axiosRef.get).toHaveBeenCalledWith(
|
||||
"https://gateway.example.com/api/sessions/session-stream/stream",
|
||||
expect(sseBridge.streamSession).toHaveBeenCalledWith(
|
||||
"https://gateway.example.com",
|
||||
"session-stream",
|
||||
{
|
||||
headers: {
|
||||
Authorization: "Bearer plain-token",
|
||||
Accept: "text/event-stream",
|
||||
},
|
||||
responseType: "stream",
|
||||
}
|
||||
);
|
||||
|
||||
expect(messages).toEqual([
|
||||
{
|
||||
id: "message-stream",
|
||||
sessionId: "session-stream",
|
||||
role: "assistant",
|
||||
content: "stream hello",
|
||||
timestamp: new Date("2026-03-07T16:00:00.000Z"),
|
||||
},
|
||||
]);
|
||||
expect(messages).toEqual([streamedMessage]);
|
||||
});
|
||||
|
||||
it("throws ServiceUnavailableException for request failures", async () => {
|
||||
|
||||
@@ -12,6 +12,7 @@ import type {
|
||||
import type { AgentProviderConfig } from "@prisma/client";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { EncryptionService } from "../../../security/encryption.service";
|
||||
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
|
||||
|
||||
const DEFAULT_SESSION_LIMIT = 50;
|
||||
const DEFAULT_MESSAGE_LIMIT = 50;
|
||||
@@ -21,7 +22,6 @@ const API_TOKEN_KEYS = ["apiToken", "token", "bearerToken"] as const;
|
||||
const DISPLAY_NAME_KEYS = ["displayName", "label"] as const;
|
||||
|
||||
type JsonRecord = Record<string, unknown>;
|
||||
type AsyncChunkStream = AsyncIterable<string | Uint8Array | Buffer>;
|
||||
|
||||
interface HttpErrorWithResponse {
|
||||
response?: {
|
||||
@@ -38,7 +38,8 @@ export class OpenClawProvider implements IAgentProvider {
|
||||
constructor(
|
||||
private readonly config: AgentProviderConfig,
|
||||
private readonly encryptionService: EncryptionService,
|
||||
private readonly httpService: HttpService
|
||||
private readonly httpService: HttpService,
|
||||
private readonly sseBridge: OpenClawSseBridge
|
||||
) {
|
||||
this.providerId = this.config.name;
|
||||
this.displayName = this.resolveDisplayName();
|
||||
@@ -196,64 +197,7 @@ export class OpenClawProvider implements IAgentProvider {
|
||||
|
||||
async *streamMessages(sessionId: string): AsyncIterable<AgentMessage> {
|
||||
try {
|
||||
const response = await this.httpService.axiosRef.get(
|
||||
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/stream`),
|
||||
{
|
||||
headers: this.authHeaders({
|
||||
Accept: "text/event-stream",
|
||||
}),
|
||||
responseType: "stream",
|
||||
}
|
||||
);
|
||||
|
||||
const stream = this.asAsyncChunkStream(response.data);
|
||||
if (!stream) {
|
||||
throw new Error("OpenClaw stream response is not readable");
|
||||
}
|
||||
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = "";
|
||||
let streamDone = false;
|
||||
|
||||
for await (const chunk of stream) {
|
||||
const textChunk =
|
||||
typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true });
|
||||
buffer += textChunk.replace(/\r\n/gu, "\n");
|
||||
|
||||
const events = buffer.split("\n\n");
|
||||
buffer = events.pop() ?? "";
|
||||
|
||||
for (const event of events) {
|
||||
const data = this.extractSseData(event);
|
||||
if (data === null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (data === "[DONE]") {
|
||||
streamDone = true;
|
||||
break;
|
||||
}
|
||||
|
||||
const message = this.toAgentMessage(this.tryParseJson(data) ?? data, sessionId);
|
||||
if (message !== null) {
|
||||
yield message;
|
||||
}
|
||||
}
|
||||
|
||||
if (streamDone) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!streamDone && buffer.trim().length > 0) {
|
||||
const data = this.extractSseData(buffer.trim());
|
||||
if (data !== null && data !== "[DONE]") {
|
||||
const message = this.toAgentMessage(this.tryParseJson(data) ?? data, sessionId);
|
||||
if (message !== null) {
|
||||
yield message;
|
||||
}
|
||||
}
|
||||
}
|
||||
yield* this.sseBridge.streamSession(this.resolveBaseUrl(), sessionId, this.authHeaders());
|
||||
} catch (error) {
|
||||
throw this.toServiceUnavailable(`stream messages for session ${sessionId}`, error);
|
||||
}
|
||||
@@ -631,40 +575,6 @@ export class OpenClawProvider implements IAgentProvider {
|
||||
return new URL(path, `${this.resolveBaseUrl()}/`).toString();
|
||||
}
|
||||
|
||||
private extractSseData(rawEvent: string): string | null {
|
||||
const lines = rawEvent.split("\n");
|
||||
const dataLines = lines
|
||||
.filter((line) => line.startsWith("data:"))
|
||||
.map((line) => line.slice(5).trimStart());
|
||||
|
||||
if (dataLines.length > 0) {
|
||||
return dataLines.join("\n").trim();
|
||||
}
|
||||
|
||||
const trimmed = rawEvent.trim();
|
||||
if (trimmed.startsWith("{") || trimmed.startsWith("[")) {
|
||||
return trimmed;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private tryParseJson(value: string): unknown {
|
||||
try {
|
||||
return JSON.parse(value) as unknown;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private asAsyncChunkStream(value: unknown): AsyncChunkStream | null {
|
||||
if (value !== null && typeof value === "object" && Symbol.asyncIterator in value) {
|
||||
return value as AsyncChunkStream;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private isRecord(value: unknown): value is JsonRecord {
|
||||
return typeof value === "object" && value !== null && !Array.isArray(value);
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import { EncryptionService } from "../../security/encryption.service";
|
||||
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||
import { AgentsModule } from "../agents/agents.module";
|
||||
import { OpenClawProviderFactory } from "./openclaw/openclaw.provider-factory";
|
||||
import { OpenClawSseBridge } from "./openclaw/openclaw-sse.bridge";
|
||||
|
||||
const OPENCLAW_PROVIDER_TYPE = "openclaw";
|
||||
|
||||
@@ -19,7 +20,7 @@ const OPENCLAW_PROVIDER_TYPE = "openclaw";
|
||||
maxRedirects: 5,
|
||||
}),
|
||||
],
|
||||
providers: [EncryptionService, OpenClawProviderFactory],
|
||||
providers: [EncryptionService, OpenClawSseBridge, OpenClawProviderFactory],
|
||||
})
|
||||
export class ProvidersModule implements OnModuleInit {
|
||||
private readonly logger = new Logger(ProvidersModule.name);
|
||||
|
||||
@@ -4,6 +4,6 @@ import { AuthGuard } from "./guards/auth.guard";
|
||||
|
||||
@Module({
|
||||
providers: [OrchestratorApiKeyGuard, AuthGuard],
|
||||
exports: [AuthGuard],
|
||||
exports: [OrchestratorApiKeyGuard, AuthGuard],
|
||||
})
|
||||
export class AuthModule {}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { Injectable } from "@nestjs/common";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import { createDecipheriv, hkdfSync } from "node:crypto";
|
||||
import { createCipheriv, createDecipheriv, hkdfSync, randomBytes } from "node:crypto";
|
||||
|
||||
const ALGORITHM = "aes-256-gcm";
|
||||
const ENCRYPTED_PREFIX = "enc:";
|
||||
@@ -16,6 +16,27 @@ export class EncryptionService {
|
||||
|
||||
constructor(private readonly configService: ConfigService) {}
|
||||
|
||||
encryptIfNeeded(value: string): string {
|
||||
if (this.isEncrypted(value)) {
|
||||
return value;
|
||||
}
|
||||
|
||||
return this.encrypt(value);
|
||||
}
|
||||
|
||||
encrypt(plaintext: string): string {
|
||||
try {
|
||||
const iv = randomBytes(IV_LENGTH);
|
||||
const cipher = createCipheriv(ALGORITHM, this.getOrCreateKey(), iv);
|
||||
const ciphertext = Buffer.concat([cipher.update(plaintext, "utf8"), cipher.final()]);
|
||||
const authTag = cipher.getAuthTag();
|
||||
const payload = Buffer.concat([iv, ciphertext, authTag]);
|
||||
return `${ENCRYPTED_PREFIX}${payload.toString("base64")}`;
|
||||
} catch {
|
||||
throw new Error("Failed to encrypt value");
|
||||
}
|
||||
}
|
||||
|
||||
decryptIfNeeded(value: string): string {
|
||||
if (!this.isEncrypted(value)) {
|
||||
return value;
|
||||
|
||||
315
apps/orchestrator/tests/integration/ms23-p3-gate.spec.ts
Normal file
315
apps/orchestrator/tests/integration/ms23-p3-gate.spec.ts
Normal file
@@ -0,0 +1,315 @@
|
||||
import type { HttpService } from "@nestjs/axios";
|
||||
import type {
|
||||
AgentMessage,
|
||||
AgentSession,
|
||||
AgentSessionList,
|
||||
IAgentProvider,
|
||||
InjectResult,
|
||||
} from "@mosaic/shared";
|
||||
import type { AgentProviderConfig } from "@prisma/client";
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import type { InternalAgentProvider } from "../../src/api/agents/internal-agent.provider";
|
||||
import { AgentProviderRegistry } from "../../src/api/agents/agent-provider.registry";
|
||||
import { MissionControlController } from "../../src/api/mission-control/mission-control.controller";
|
||||
import { MissionControlService } from "../../src/api/mission-control/mission-control.service";
|
||||
import { OpenClawProviderFactory } from "../../src/api/providers/openclaw/openclaw.provider-factory";
|
||||
import { OpenClawSseBridge } from "../../src/api/providers/openclaw/openclaw-sse.bridge";
|
||||
import { ProvidersModule } from "../../src/api/providers/providers.module";
|
||||
import type { PrismaService } from "../../src/prisma/prisma.service";
|
||||
import type { EncryptionService } from "../../src/security/encryption.service";
|
||||
|
||||
type MockProvider = IAgentProvider & {
|
||||
listSessions: ReturnType<typeof vi.fn>;
|
||||
getSession: ReturnType<typeof vi.fn>;
|
||||
injectMessage: ReturnType<typeof vi.fn>;
|
||||
pauseSession: ReturnType<typeof vi.fn>;
|
||||
killSession: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
|
||||
type MockPrisma = {
|
||||
agentProviderConfig: {
|
||||
create: ReturnType<typeof vi.fn>;
|
||||
findMany: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
operatorAuditLog: {
|
||||
create: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
|
||||
const emptyMessageStream = async function* (): AsyncIterable<AgentMessage> {
|
||||
return;
|
||||
};
|
||||
|
||||
describe("MS23-P3-004 API integration", () => {
|
||||
let controller: MissionControlController;
|
||||
let providersModule: ProvidersModule;
|
||||
let registry: AgentProviderRegistry;
|
||||
let prisma: MockPrisma;
|
||||
let httpService: {
|
||||
axiosRef: {
|
||||
get: ReturnType<typeof vi.fn>;
|
||||
post: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
};
|
||||
|
||||
const gatewayUrl = "https://openclaw-gateway.example.com";
|
||||
const internalSession: AgentSession = {
|
||||
id: "session-internal-1",
|
||||
providerId: "internal",
|
||||
providerType: "internal",
|
||||
status: "active",
|
||||
createdAt: new Date("2026-03-07T16:00:00.000Z"),
|
||||
updatedAt: new Date("2026-03-07T16:02:00.000Z"),
|
||||
};
|
||||
|
||||
const openClawGatewaySession = {
|
||||
id: "session-openclaw-1",
|
||||
status: "running",
|
||||
createdAt: "2026-03-07T16:01:00.000Z",
|
||||
updatedAt: "2026-03-07T16:03:00.000Z",
|
||||
};
|
||||
|
||||
const createInternalProvider = (session: AgentSession): MockProvider => ({
|
||||
providerId: "internal",
|
||||
providerType: "internal",
|
||||
displayName: "Internal",
|
||||
listSessions: vi.fn().mockResolvedValue({ sessions: [session], total: 1 } as AgentSessionList),
|
||||
getSession: vi.fn().mockImplementation(async (sessionId: string) => {
|
||||
return sessionId === session.id ? session : null;
|
||||
}),
|
||||
getMessages: vi.fn().mockResolvedValue([]),
|
||||
injectMessage: vi.fn().mockResolvedValue({ accepted: true } as InjectResult),
|
||||
pauseSession: vi.fn().mockResolvedValue(undefined),
|
||||
resumeSession: vi.fn().mockResolvedValue(undefined),
|
||||
killSession: vi.fn().mockResolvedValue(undefined),
|
||||
streamMessages: vi.fn().mockReturnValue(emptyMessageStream()),
|
||||
isAvailable: vi.fn().mockResolvedValue(true),
|
||||
});
|
||||
|
||||
beforeEach(() => {
|
||||
const providerConfigs: AgentProviderConfig[] = [];
|
||||
|
||||
prisma = {
|
||||
agentProviderConfig: {
|
||||
create: vi.fn().mockImplementation(async (args: { data: Record<string, unknown> }) => {
|
||||
const now = new Date("2026-03-07T15:00:00.000Z");
|
||||
const record: AgentProviderConfig = {
|
||||
id: `cfg-${String(providerConfigs.length + 1)}`,
|
||||
workspaceId: String(args.data.workspaceId),
|
||||
name: String(args.data.name),
|
||||
provider: String(args.data.provider),
|
||||
gatewayUrl: String(args.data.gatewayUrl),
|
||||
credentials: (args.data.credentials ?? {}) as AgentProviderConfig["credentials"],
|
||||
isActive: args.data.isActive !== false,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
};
|
||||
|
||||
providerConfigs.push(record);
|
||||
return record;
|
||||
}),
|
||||
findMany: vi
|
||||
.fn()
|
||||
.mockImplementation(
|
||||
async (args: { where?: { provider?: string; isActive?: boolean } }) => {
|
||||
const where = args.where ?? {};
|
||||
return providerConfigs.filter((config) => {
|
||||
if (where.provider !== undefined && config.provider !== where.provider) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (where.isActive !== undefined && config.isActive !== where.isActive) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
}
|
||||
),
|
||||
},
|
||||
operatorAuditLog: {
|
||||
create: vi.fn().mockResolvedValue(undefined),
|
||||
},
|
||||
};
|
||||
|
||||
httpService = {
|
||||
axiosRef: {
|
||||
get: vi.fn().mockImplementation(async (url: string) => {
|
||||
if (url === `${gatewayUrl}/api/sessions`) {
|
||||
return {
|
||||
data: {
|
||||
sessions: [openClawGatewaySession],
|
||||
total: 1,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
if (url === `${gatewayUrl}/api/sessions/${openClawGatewaySession.id}`) {
|
||||
return {
|
||||
data: {
|
||||
session: openClawGatewaySession,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
throw new Error(`Unexpected GET ${url}`);
|
||||
}),
|
||||
post: vi.fn().mockImplementation(async (url: string) => {
|
||||
if (url.endsWith("/inject")) {
|
||||
return { data: { accepted: true, messageId: "msg-inject-1" } };
|
||||
}
|
||||
|
||||
if (url.endsWith("/pause") || url.endsWith("/kill")) {
|
||||
return { data: {} };
|
||||
}
|
||||
|
||||
throw new Error(`Unexpected POST ${url}`);
|
||||
}),
|
||||
},
|
||||
};
|
||||
|
||||
const internalProvider = createInternalProvider(internalSession);
|
||||
registry = new AgentProviderRegistry(internalProvider as unknown as InternalAgentProvider);
|
||||
registry.onModuleInit();
|
||||
|
||||
const encryptionService = {
|
||||
decryptIfNeeded: vi.fn().mockReturnValue("plain-openclaw-token"),
|
||||
};
|
||||
|
||||
const sseBridge = new OpenClawSseBridge(httpService as unknown as HttpService);
|
||||
const openClawProviderFactory = new OpenClawProviderFactory(
|
||||
encryptionService as unknown as EncryptionService,
|
||||
httpService as unknown as HttpService,
|
||||
sseBridge
|
||||
);
|
||||
|
||||
providersModule = new ProvidersModule(
|
||||
prisma as unknown as PrismaService,
|
||||
registry,
|
||||
openClawProviderFactory
|
||||
);
|
||||
|
||||
const missionControlService = new MissionControlService(
|
||||
registry,
|
||||
prisma as unknown as PrismaService
|
||||
);
|
||||
|
||||
controller = new MissionControlController(missionControlService);
|
||||
});
|
||||
|
||||
it("Phase 3 gate: OpenClaw provider config registered in DB → provider loaded on boot → sessions returned from /api/mission-control/sessions → inject/pause/kill proxied to gateway", async () => {
|
||||
await prisma.agentProviderConfig.create({
|
||||
data: {
|
||||
workspaceId: "workspace-ms23",
|
||||
name: "openclaw-home",
|
||||
provider: "openclaw",
|
||||
gatewayUrl,
|
||||
credentials: {
|
||||
apiToken: "enc:test-openclaw-token",
|
||||
},
|
||||
isActive: true,
|
||||
},
|
||||
});
|
||||
|
||||
await providersModule.onModuleInit();
|
||||
|
||||
// Equivalent to GET /api/mission-control/sessions
|
||||
const sessionsResponse = await controller.listSessions();
|
||||
|
||||
expect(sessionsResponse.sessions.map((session) => session.id)).toEqual([
|
||||
"session-openclaw-1",
|
||||
"session-internal-1",
|
||||
]);
|
||||
expect(sessionsResponse.sessions).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
id: "session-internal-1",
|
||||
providerId: "internal",
|
||||
}),
|
||||
expect.objectContaining({
|
||||
id: "session-openclaw-1",
|
||||
providerId: "openclaw-home",
|
||||
providerType: "openclaw",
|
||||
}),
|
||||
])
|
||||
);
|
||||
|
||||
const operatorRequest = {
|
||||
user: {
|
||||
id: "operator-ms23",
|
||||
},
|
||||
};
|
||||
|
||||
await expect(
|
||||
controller.injectMessage(
|
||||
"session-openclaw-1",
|
||||
{
|
||||
message: "Ship it",
|
||||
},
|
||||
operatorRequest
|
||||
)
|
||||
).resolves.toEqual({ accepted: true, messageId: "msg-inject-1" });
|
||||
|
||||
await expect(controller.pauseSession("session-openclaw-1", operatorRequest)).resolves.toEqual({
|
||||
message: "Session session-openclaw-1 paused",
|
||||
});
|
||||
|
||||
await expect(
|
||||
controller.killSession(
|
||||
"session-openclaw-1",
|
||||
{
|
||||
force: false,
|
||||
},
|
||||
operatorRequest
|
||||
)
|
||||
).resolves.toEqual({ message: "Session session-openclaw-1 killed" });
|
||||
|
||||
expect(httpService.axiosRef.post).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
`${gatewayUrl}/api/sessions/session-openclaw-1/inject`,
|
||||
{ content: "Ship it" },
|
||||
{
|
||||
headers: {
|
||||
Authorization: "Bearer plain-openclaw-token",
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
expect(httpService.axiosRef.post).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
`${gatewayUrl}/api/sessions/session-openclaw-1/pause`,
|
||||
{},
|
||||
{
|
||||
headers: {
|
||||
Authorization: "Bearer plain-openclaw-token",
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
expect(httpService.axiosRef.post).toHaveBeenNthCalledWith(
|
||||
3,
|
||||
`${gatewayUrl}/api/sessions/session-openclaw-1/kill`,
|
||||
{ force: false },
|
||||
{
|
||||
headers: {
|
||||
Authorization: "Bearer plain-openclaw-token",
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
expect(prisma.operatorAuditLog.create).toHaveBeenNthCalledWith(1, {
|
||||
data: {
|
||||
sessionId: "session-openclaw-1",
|
||||
userId: "operator-ms23",
|
||||
provider: "openclaw-home",
|
||||
action: "inject",
|
||||
content: "Ship it",
|
||||
metadata: {
|
||||
payload: {
|
||||
message: "Ship it",
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -4,7 +4,7 @@ export default defineConfig({
|
||||
test: {
|
||||
globals: true,
|
||||
environment: "node",
|
||||
include: ["**/*.e2e-spec.ts"],
|
||||
include: ["tests/integration/**/*.e2e-spec.ts", "tests/integration/**/*.spec.ts"],
|
||||
testTimeout: 30000,
|
||||
},
|
||||
});
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@mosaic/web",
|
||||
"version": "0.0.20",
|
||||
"version": "0.0.23",
|
||||
"private": true,
|
||||
"scripts": {
|
||||
"build": "next build",
|
||||
|
||||
@@ -0,0 +1,528 @@
|
||||
"use client";
|
||||
|
||||
import {
|
||||
useCallback,
|
||||
useEffect,
|
||||
useState,
|
||||
type ChangeEvent,
|
||||
type ReactElement,
|
||||
type SyntheticEvent,
|
||||
} from "react";
|
||||
import { Pencil, Trash2 } from "lucide-react";
|
||||
import { FleetSettingsNav } from "@/components/settings/FleetSettingsNav";
|
||||
import {
|
||||
createAgentProvider,
|
||||
deleteAgentProvider,
|
||||
fetchAgentProviders,
|
||||
updateAgentProvider,
|
||||
type AgentProviderConfig,
|
||||
type CreateAgentProviderRequest,
|
||||
type UpdateAgentProviderRequest,
|
||||
} from "@/lib/api/agent-providers";
|
||||
import {
|
||||
AlertDialog,
|
||||
AlertDialogAction,
|
||||
AlertDialogCancel,
|
||||
AlertDialogContent,
|
||||
AlertDialogDescription,
|
||||
AlertDialogFooter,
|
||||
AlertDialogHeader,
|
||||
AlertDialogTitle,
|
||||
} from "@/components/ui/alert-dialog";
|
||||
import { Badge } from "@/components/ui/badge";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card";
|
||||
import {
|
||||
Dialog,
|
||||
DialogContent,
|
||||
DialogDescription,
|
||||
DialogFooter,
|
||||
DialogHeader,
|
||||
DialogTitle,
|
||||
} from "@/components/ui/dialog";
|
||||
import { Input } from "@/components/ui/input";
|
||||
import { Label } from "@/components/ui/label";
|
||||
import {
|
||||
Select,
|
||||
SelectContent,
|
||||
SelectItem,
|
||||
SelectTrigger,
|
||||
SelectValue,
|
||||
} from "@/components/ui/select";
|
||||
import { Switch } from "@/components/ui/switch";
|
||||
|
||||
interface ProviderFormData {
|
||||
name: string;
|
||||
provider: "openclaw";
|
||||
gatewayUrl: string;
|
||||
apiToken: string;
|
||||
isActive: boolean;
|
||||
}
|
||||
|
||||
const NAME_PATTERN = /^[a-zA-Z0-9-]+$/;
|
||||
|
||||
const INITIAL_FORM: ProviderFormData = {
|
||||
name: "",
|
||||
provider: "openclaw",
|
||||
gatewayUrl: "",
|
||||
apiToken: "",
|
||||
isActive: true,
|
||||
};
|
||||
|
||||
function getErrorMessage(error: unknown, fallback: string): string {
|
||||
if (error instanceof Error && error.message.trim().length > 0) {
|
||||
return error.message;
|
||||
}
|
||||
|
||||
return fallback;
|
||||
}
|
||||
|
||||
function isValidHttpsUrl(value: string): boolean {
|
||||
try {
|
||||
const parsed = new URL(value);
|
||||
return parsed.protocol === "https:";
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function formatCreatedDate(value: string): string {
|
||||
const parsed = new Date(value);
|
||||
if (Number.isNaN(parsed.getTime())) {
|
||||
return "Unknown";
|
||||
}
|
||||
|
||||
return new Intl.DateTimeFormat(undefined, {
|
||||
year: "numeric",
|
||||
month: "short",
|
||||
day: "numeric",
|
||||
}).format(parsed);
|
||||
}
|
||||
|
||||
function validateForm(form: ProviderFormData, isEditing: boolean): string | null {
|
||||
const name = form.name.trim();
|
||||
if (name.length === 0) {
|
||||
return "Name is required.";
|
||||
}
|
||||
|
||||
if (!NAME_PATTERN.test(name)) {
|
||||
return "Name must contain only letters, numbers, and hyphens.";
|
||||
}
|
||||
|
||||
const gatewayUrl = form.gatewayUrl.trim();
|
||||
if (gatewayUrl.length === 0) {
|
||||
return "Gateway URL is required.";
|
||||
}
|
||||
|
||||
if (!isValidHttpsUrl(gatewayUrl)) {
|
||||
return "Gateway URL must be a valid https:// URL.";
|
||||
}
|
||||
|
||||
if (!isEditing && form.apiToken.trim().length === 0) {
|
||||
return "API token is required when creating a provider.";
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
export default function AgentProvidersSettingsPage(): ReactElement {
|
||||
const [providers, setProviders] = useState<AgentProviderConfig[]>([]);
|
||||
const [isLoading, setIsLoading] = useState<boolean>(true);
|
||||
const [isRefreshing, setIsRefreshing] = useState<boolean>(false);
|
||||
const [error, setError] = useState<string | null>(null);
|
||||
const [successMessage, setSuccessMessage] = useState<string | null>(null);
|
||||
|
||||
const [isDialogOpen, setIsDialogOpen] = useState<boolean>(false);
|
||||
const [editingProvider, setEditingProvider] = useState<AgentProviderConfig | null>(null);
|
||||
const [form, setForm] = useState<ProviderFormData>(INITIAL_FORM);
|
||||
const [formError, setFormError] = useState<string | null>(null);
|
||||
const [isSaving, setIsSaving] = useState<boolean>(false);
|
||||
|
||||
const [deleteTarget, setDeleteTarget] = useState<AgentProviderConfig | null>(null);
|
||||
const [isDeleting, setIsDeleting] = useState<boolean>(false);
|
||||
|
||||
const loadProviders = useCallback(async (showLoadingState: boolean): Promise<void> => {
|
||||
if (showLoadingState) {
|
||||
setIsLoading(true);
|
||||
} else {
|
||||
setIsRefreshing(true);
|
||||
}
|
||||
|
||||
try {
|
||||
const data = await fetchAgentProviders();
|
||||
setProviders(data);
|
||||
setError(null);
|
||||
} catch (loadError: unknown) {
|
||||
setError(getErrorMessage(loadError, "Failed to load agent providers."));
|
||||
} finally {
|
||||
setIsLoading(false);
|
||||
setIsRefreshing(false);
|
||||
}
|
||||
}, []);
|
||||
|
||||
useEffect(() => {
|
||||
void loadProviders(true);
|
||||
}, [loadProviders]);
|
||||
|
||||
function openCreateDialog(): void {
|
||||
setEditingProvider(null);
|
||||
setForm(INITIAL_FORM);
|
||||
setFormError(null);
|
||||
setIsDialogOpen(true);
|
||||
}
|
||||
|
||||
function openEditDialog(provider: AgentProviderConfig): void {
|
||||
setEditingProvider(provider);
|
||||
setForm({
|
||||
name: provider.name,
|
||||
provider: "openclaw",
|
||||
gatewayUrl: provider.gatewayUrl,
|
||||
apiToken: "",
|
||||
isActive: provider.isActive,
|
||||
});
|
||||
setFormError(null);
|
||||
setIsDialogOpen(true);
|
||||
}
|
||||
|
||||
function closeDialog(): void {
|
||||
if (isSaving) {
|
||||
return;
|
||||
}
|
||||
|
||||
setIsDialogOpen(false);
|
||||
setEditingProvider(null);
|
||||
setForm(INITIAL_FORM);
|
||||
setFormError(null);
|
||||
}
|
||||
|
||||
async function handleSubmit(event: SyntheticEvent): Promise<void> {
|
||||
event.preventDefault();
|
||||
setFormError(null);
|
||||
setSuccessMessage(null);
|
||||
|
||||
const validationError = validateForm(form, editingProvider !== null);
|
||||
if (validationError !== null) {
|
||||
setFormError(validationError);
|
||||
return;
|
||||
}
|
||||
|
||||
const name = form.name.trim();
|
||||
const gatewayUrl = form.gatewayUrl.trim();
|
||||
const apiToken = form.apiToken.trim();
|
||||
|
||||
try {
|
||||
setIsSaving(true);
|
||||
|
||||
if (editingProvider) {
|
||||
const updatePayload: UpdateAgentProviderRequest = {
|
||||
name,
|
||||
provider: form.provider,
|
||||
gatewayUrl,
|
||||
isActive: form.isActive,
|
||||
};
|
||||
|
||||
if (apiToken.length > 0) {
|
||||
updatePayload.credentials = { apiToken };
|
||||
}
|
||||
|
||||
await updateAgentProvider(editingProvider.id, updatePayload);
|
||||
setSuccessMessage(`Updated provider "${name}".`);
|
||||
} else {
|
||||
const createPayload: CreateAgentProviderRequest = {
|
||||
name,
|
||||
provider: form.provider,
|
||||
gatewayUrl,
|
||||
credentials: { apiToken },
|
||||
isActive: form.isActive,
|
||||
};
|
||||
|
||||
await createAgentProvider(createPayload);
|
||||
setSuccessMessage(`Added provider "${name}".`);
|
||||
}
|
||||
|
||||
setIsDialogOpen(false);
|
||||
setEditingProvider(null);
|
||||
setForm(INITIAL_FORM);
|
||||
await loadProviders(false);
|
||||
} catch (saveError: unknown) {
|
||||
setFormError(getErrorMessage(saveError, "Unable to save agent provider."));
|
||||
} finally {
|
||||
setIsSaving(false);
|
||||
}
|
||||
}
|
||||
|
||||
async function handleDeleteProvider(): Promise<void> {
|
||||
if (!deleteTarget) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
setIsDeleting(true);
|
||||
await deleteAgentProvider(deleteTarget.id);
|
||||
setSuccessMessage(`Deleted provider "${deleteTarget.name}".`);
|
||||
setDeleteTarget(null);
|
||||
await loadProviders(false);
|
||||
} catch (deleteError: unknown) {
|
||||
setError(getErrorMessage(deleteError, "Failed to delete agent provider."));
|
||||
} finally {
|
||||
setIsDeleting(false);
|
||||
}
|
||||
}
|
||||
|
||||
return (
|
||||
<div className="max-w-6xl mx-auto p-6 space-y-6">
|
||||
<div className="space-y-4">
|
||||
<div>
|
||||
<h1 className="text-3xl font-bold">Agent Providers</h1>
|
||||
<p className="text-muted-foreground mt-1">
|
||||
Register OpenClaw gateways and API tokens used for external agent sessions.
|
||||
</p>
|
||||
</div>
|
||||
<FleetSettingsNav />
|
||||
</div>
|
||||
|
||||
<Card>
|
||||
<CardHeader className="flex flex-col gap-3 sm:flex-row sm:items-center sm:justify-between">
|
||||
<div>
|
||||
<CardTitle>OpenClaw Gateways</CardTitle>
|
||||
<CardDescription>
|
||||
Add one or more OpenClaw gateway endpoints and control which ones are active.
|
||||
</CardDescription>
|
||||
</div>
|
||||
<div className="flex items-center gap-2">
|
||||
<Button
|
||||
variant="outline"
|
||||
onClick={() => {
|
||||
void loadProviders(false);
|
||||
}}
|
||||
disabled={isLoading || isRefreshing}
|
||||
>
|
||||
{isRefreshing ? "Refreshing..." : "Refresh"}
|
||||
</Button>
|
||||
<Button onClick={openCreateDialog}>Add Provider</Button>
|
||||
</div>
|
||||
</CardHeader>
|
||||
|
||||
<CardContent className="space-y-3">
|
||||
{error ? (
|
||||
<p className="text-sm text-destructive" role="alert">
|
||||
{error}
|
||||
</p>
|
||||
) : null}
|
||||
|
||||
{successMessage ? <p className="text-sm text-emerald-600">{successMessage}</p> : null}
|
||||
|
||||
{isLoading ? (
|
||||
<p className="text-sm text-muted-foreground">Loading agent providers...</p>
|
||||
) : providers.length === 0 ? (
|
||||
<p className="text-sm text-muted-foreground">
|
||||
No agent providers configured yet. Add one to register an OpenClaw gateway.
|
||||
</p>
|
||||
) : (
|
||||
providers.map((provider) => (
|
||||
<div
|
||||
key={provider.id}
|
||||
className="rounded-lg border p-4 flex flex-col gap-4 md:flex-row md:items-start md:justify-between"
|
||||
>
|
||||
<div className="space-y-2 min-w-0">
|
||||
<div className="flex items-center gap-2 flex-wrap">
|
||||
<p className="font-semibold truncate">{provider.name}</p>
|
||||
<Badge variant={provider.isActive ? "default" : "secondary"}>
|
||||
{provider.isActive ? "Active" : "Inactive"}
|
||||
</Badge>
|
||||
<Badge variant="outline">{provider.provider}</Badge>
|
||||
</div>
|
||||
<p className="text-sm text-muted-foreground break-all">
|
||||
Gateway URL: {provider.gatewayUrl}
|
||||
</p>
|
||||
<p className="text-sm text-muted-foreground">
|
||||
Created: {formatCreatedDate(provider.createdAt)}
|
||||
</p>
|
||||
</div>
|
||||
|
||||
<div className="flex items-center gap-2">
|
||||
<Button
|
||||
variant="outline"
|
||||
size="sm"
|
||||
onClick={() => {
|
||||
openEditDialog(provider);
|
||||
}}
|
||||
>
|
||||
<Pencil className="h-4 w-4 mr-2" />
|
||||
Edit
|
||||
</Button>
|
||||
<Button
|
||||
variant="destructive"
|
||||
size="sm"
|
||||
onClick={() => {
|
||||
setDeleteTarget(provider);
|
||||
}}
|
||||
>
|
||||
<Trash2 className="h-4 w-4 mr-2" />
|
||||
Delete
|
||||
</Button>
|
||||
</div>
|
||||
</div>
|
||||
))
|
||||
)}
|
||||
</CardContent>
|
||||
</Card>
|
||||
|
||||
<Dialog
|
||||
open={isDialogOpen}
|
||||
onOpenChange={(nextOpen) => {
|
||||
if (!nextOpen) {
|
||||
closeDialog();
|
||||
return;
|
||||
}
|
||||
|
||||
setIsDialogOpen(true);
|
||||
}}
|
||||
>
|
||||
<DialogContent>
|
||||
<DialogHeader>
|
||||
<DialogTitle>
|
||||
{editingProvider ? "Edit Agent Provider" : "Add Agent Provider"}
|
||||
</DialogTitle>
|
||||
<DialogDescription>
|
||||
Configure an OpenClaw gateway URL and API token for agent provider registration.
|
||||
</DialogDescription>
|
||||
</DialogHeader>
|
||||
|
||||
<form onSubmit={(event) => void handleSubmit(event)} className="space-y-4">
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="agent-provider-name">Name</Label>
|
||||
<Input
|
||||
id="agent-provider-name"
|
||||
value={form.name}
|
||||
onChange={(event: ChangeEvent<HTMLInputElement>) => {
|
||||
setForm((previous) => ({ ...previous, name: event.target.value }));
|
||||
}}
|
||||
placeholder="openclaw-primary"
|
||||
maxLength={100}
|
||||
disabled={isSaving}
|
||||
required
|
||||
/>
|
||||
<p className="text-xs text-muted-foreground">
|
||||
Use letters, numbers, and hyphens only.
|
||||
</p>
|
||||
</div>
|
||||
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="agent-provider-type">Provider Type</Label>
|
||||
<Select
|
||||
value={form.provider}
|
||||
onValueChange={(value) => {
|
||||
if (value === "openclaw") {
|
||||
setForm((previous) => ({ ...previous, provider: value }));
|
||||
}
|
||||
}}
|
||||
disabled={isSaving}
|
||||
>
|
||||
<SelectTrigger id="agent-provider-type">
|
||||
<SelectValue placeholder="Select provider type" />
|
||||
</SelectTrigger>
|
||||
<SelectContent>
|
||||
<SelectItem value="openclaw">openclaw</SelectItem>
|
||||
</SelectContent>
|
||||
</Select>
|
||||
</div>
|
||||
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="agent-provider-gateway-url">Gateway URL</Label>
|
||||
<Input
|
||||
id="agent-provider-gateway-url"
|
||||
value={form.gatewayUrl}
|
||||
onChange={(event: ChangeEvent<HTMLInputElement>) => {
|
||||
setForm((previous) => ({ ...previous, gatewayUrl: event.target.value }));
|
||||
}}
|
||||
placeholder="https://my-openclaw.example.com"
|
||||
disabled={isSaving}
|
||||
required
|
||||
/>
|
||||
</div>
|
||||
|
||||
<div className="space-y-2">
|
||||
<Label htmlFor="agent-provider-api-token">API Token</Label>
|
||||
<Input
|
||||
id="agent-provider-api-token"
|
||||
type="password"
|
||||
value={form.apiToken}
|
||||
onChange={(event: ChangeEvent<HTMLInputElement>) => {
|
||||
setForm((previous) => ({ ...previous, apiToken: event.target.value }));
|
||||
}}
|
||||
placeholder={
|
||||
editingProvider ? "Leave blank to keep existing token" : "Enter API token"
|
||||
}
|
||||
autoComplete="new-password"
|
||||
disabled={isSaving}
|
||||
/>
|
||||
<p className="text-xs text-muted-foreground">
|
||||
{editingProvider
|
||||
? "Leave blank to keep the currently stored token."
|
||||
: "Required when creating a provider."}
|
||||
</p>
|
||||
</div>
|
||||
|
||||
<div className="flex items-center justify-between rounded-md border px-3 py-2">
|
||||
<div>
|
||||
<Label htmlFor="agent-provider-active">Provider Status</Label>
|
||||
<p className="text-xs text-muted-foreground">
|
||||
Inactive providers remain saved but are excluded from routing.
|
||||
</p>
|
||||
</div>
|
||||
<Switch
|
||||
id="agent-provider-active"
|
||||
checked={form.isActive}
|
||||
onCheckedChange={(checked) => {
|
||||
setForm((previous) => ({ ...previous, isActive: checked }));
|
||||
}}
|
||||
disabled={isSaving}
|
||||
/>
|
||||
</div>
|
||||
|
||||
{formError ? (
|
||||
<p className="text-sm text-destructive" role="alert">
|
||||
{formError}
|
||||
</p>
|
||||
) : null}
|
||||
|
||||
<DialogFooter>
|
||||
<Button type="button" variant="outline" onClick={closeDialog} disabled={isSaving}>
|
||||
Cancel
|
||||
</Button>
|
||||
<Button type="submit" disabled={isSaving}>
|
||||
{isSaving ? "Saving..." : editingProvider ? "Save Changes" : "Create Provider"}
|
||||
</Button>
|
||||
</DialogFooter>
|
||||
</form>
|
||||
</DialogContent>
|
||||
</Dialog>
|
||||
|
||||
<AlertDialog
|
||||
open={deleteTarget !== null}
|
||||
onOpenChange={(open) => {
|
||||
if (!open && !isDeleting) {
|
||||
setDeleteTarget(null);
|
||||
}
|
||||
}}
|
||||
>
|
||||
<AlertDialogContent>
|
||||
<AlertDialogHeader>
|
||||
<AlertDialogTitle>Delete Agent Provider</AlertDialogTitle>
|
||||
<AlertDialogDescription>
|
||||
Delete provider "{deleteTarget?.name}"? This permanently removes its gateway and token
|
||||
configuration.
|
||||
</AlertDialogDescription>
|
||||
</AlertDialogHeader>
|
||||
<AlertDialogFooter>
|
||||
<AlertDialogCancel disabled={isDeleting}>Cancel</AlertDialogCancel>
|
||||
<AlertDialogAction onClick={handleDeleteProvider} disabled={isDeleting}>
|
||||
{isDeleting ? "Deleting..." : "Delete Provider"}
|
||||
</AlertDialogAction>
|
||||
</AlertDialogFooter>
|
||||
</AlertDialogContent>
|
||||
</AlertDialog>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
@@ -227,6 +227,33 @@ const categories: CategoryConfig[] = [
|
||||
</svg>
|
||||
),
|
||||
},
|
||||
{
|
||||
title: "Agent Providers",
|
||||
description:
|
||||
"Register OpenClaw gateway URLs and API tokens for external agent provider routing.",
|
||||
href: "/settings/agent-providers",
|
||||
accent: "var(--ms-blue-400)",
|
||||
iconBg: "rgba(47, 128, 255, 0.12)",
|
||||
icon: (
|
||||
<svg
|
||||
width="20"
|
||||
height="20"
|
||||
viewBox="0 0 20 20"
|
||||
fill="none"
|
||||
stroke="currentColor"
|
||||
strokeWidth="1.5"
|
||||
strokeLinecap="round"
|
||||
strokeLinejoin="round"
|
||||
aria-hidden="true"
|
||||
>
|
||||
<path d="M4 6.5h12" />
|
||||
<path d="M6.5 10h7" />
|
||||
<path d="M4 13.5h12" />
|
||||
<circle cx="5.5" cy="10" r="1.5" />
|
||||
<circle cx="14.5" cy="10" r="1.5" />
|
||||
</svg>
|
||||
),
|
||||
},
|
||||
{
|
||||
title: "Agent Config",
|
||||
description: "Choose primary and fallback models, plus optional personality/SOUL instructions.",
|
||||
|
||||
93
apps/web/src/app/api/orchestrator/[...path]/route.ts
Normal file
93
apps/web/src/app/api/orchestrator/[...path]/route.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import { type NextRequest, NextResponse } from "next/server";
|
||||
|
||||
const DEFAULT_ORCHESTRATOR_URL = "http://localhost:3001";
|
||||
|
||||
function getOrchestratorUrl(): string {
|
||||
return (
|
||||
process.env.ORCHESTRATOR_URL ??
|
||||
process.env.NEXT_PUBLIC_ORCHESTRATOR_URL ??
|
||||
process.env.NEXT_PUBLIC_API_URL ??
|
||||
DEFAULT_ORCHESTRATOR_URL
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic catch-all proxy for orchestrator API routes.
|
||||
*
|
||||
* Forwards any request to /api/orchestrator/<path> → ORCHESTRATOR_URL/<path>
|
||||
* with the ORCHESTRATOR_API_KEY injected server-side so it never reaches the browser.
|
||||
*
|
||||
* Supports GET, POST, PATCH, DELETE, PUT.
|
||||
*
|
||||
* Example:
|
||||
* GET /api/orchestrator/mission-control/sessions
|
||||
* → GET ORCHESTRATOR_URL/api/mission-control/sessions
|
||||
* POST /api/orchestrator/mission-control/sessions/abc/kill
|
||||
* → POST ORCHESTRATOR_URL/api/mission-control/sessions/abc/kill
|
||||
*/
|
||||
async function proxyToOrchestrator(
|
||||
request: NextRequest,
|
||||
context: { params: Promise<{ path: string[] }> }
|
||||
): Promise<NextResponse> {
|
||||
const orchestratorApiKey = process.env.ORCHESTRATOR_API_KEY;
|
||||
if (!orchestratorApiKey) {
|
||||
return NextResponse.json(
|
||||
{ error: "ORCHESTRATOR_API_KEY is not configured on the web server." },
|
||||
{ status: 503 }
|
||||
);
|
||||
}
|
||||
|
||||
const { path } = await context.params;
|
||||
const upstreamPath = `/${path.join("/")}`;
|
||||
const search = request.nextUrl.search;
|
||||
const upstreamUrl = `${getOrchestratorUrl()}${upstreamPath}${search}`;
|
||||
|
||||
const controller = new AbortController();
|
||||
const timeout = setTimeout(() => {
|
||||
controller.abort();
|
||||
}, 30_000);
|
||||
|
||||
try {
|
||||
const headers: Record<string, string> = {
|
||||
"X-API-Key": orchestratorApiKey,
|
||||
};
|
||||
|
||||
const contentType = request.headers.get("Content-Type");
|
||||
if (contentType) {
|
||||
headers["Content-Type"] = contentType;
|
||||
}
|
||||
|
||||
const hasBody = request.method !== "GET" && request.method !== "HEAD";
|
||||
const body = hasBody ? await request.text() : null;
|
||||
|
||||
const upstream = await fetch(upstreamUrl, {
|
||||
method: request.method,
|
||||
headers,
|
||||
...(body !== null ? { body } : {}),
|
||||
cache: "no-store",
|
||||
signal: controller.signal,
|
||||
});
|
||||
|
||||
const responseText = await upstream.text();
|
||||
return new NextResponse(responseText, {
|
||||
status: upstream.status,
|
||||
headers: {
|
||||
"Content-Type": upstream.headers.get("Content-Type") ?? "application/json",
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
const message =
|
||||
error instanceof Error && error.name === "AbortError"
|
||||
? "Orchestrator request timed out."
|
||||
: "Unable to reach orchestrator.";
|
||||
return NextResponse.json({ error: message }, { status: 502 });
|
||||
} finally {
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
}
|
||||
|
||||
export const GET = proxyToOrchestrator;
|
||||
export const POST = proxyToOrchestrator;
|
||||
export const PATCH = proxyToOrchestrator;
|
||||
export const PUT = proxyToOrchestrator;
|
||||
export const DELETE = proxyToOrchestrator;
|
||||
@@ -4,6 +4,7 @@ import { Outfit, Fira_Code } from "next/font/google";
|
||||
import { AuthProvider } from "@/lib/auth/auth-context";
|
||||
import { ErrorBoundary } from "@/components/error-boundary";
|
||||
import { ThemeProvider } from "@/providers/ThemeProvider";
|
||||
import { ReactQueryProvider } from "@/providers/ReactQueryProvider";
|
||||
import "./globals.css";
|
||||
|
||||
export const dynamic = "force-dynamic";
|
||||
@@ -56,9 +57,11 @@ export default function RootLayout({ children }: { children: ReactNode }): React
|
||||
</head>
|
||||
<body>
|
||||
<ThemeProvider>
|
||||
<ReactQueryProvider>
|
||||
<ErrorBoundary>
|
||||
<AuthProvider>{children}</AuthProvider>
|
||||
</ErrorBoundary>
|
||||
</ReactQueryProvider>
|
||||
</ThemeProvider>
|
||||
</body>
|
||||
</html>
|
||||
|
||||
@@ -44,6 +44,25 @@ interface AuditLogResponse {
|
||||
total: number;
|
||||
page: number;
|
||||
pages: number;
|
||||
notice?: string;
|
||||
}
|
||||
|
||||
function createEmptyAuditLogResponse(page: number, notice?: string): AuditLogResponse {
|
||||
return {
|
||||
items: [],
|
||||
total: 0,
|
||||
page,
|
||||
pages: 0,
|
||||
...(notice !== undefined ? { notice } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function isRateLimitError(error: unknown): boolean {
|
||||
if (!(error instanceof Error)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return /429|rate limit|too many requests/i.test(error.message);
|
||||
}
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
@@ -138,7 +157,17 @@ async function fetchAuditLog(
|
||||
params.set("sessionId", normalizedSessionId);
|
||||
}
|
||||
|
||||
return apiGet<AuditLogResponse>(`/api/mission-control/audit-log?${params.toString()}`);
|
||||
try {
|
||||
return await apiGet<AuditLogResponse>(
|
||||
`/api/orchestrator/api/mission-control/audit-log?${params.toString()}`
|
||||
);
|
||||
} catch (error) {
|
||||
if (isRateLimitError(error)) {
|
||||
return createEmptyAuditLogResponse(page, "Rate limited - retrying...");
|
||||
}
|
||||
|
||||
return createEmptyAuditLogResponse(page);
|
||||
}
|
||||
}
|
||||
|
||||
export function AuditLogDrawer({ sessionId, trigger }: AuditLogDrawerProps): React.JSX.Element {
|
||||
@@ -180,11 +209,10 @@ export function AuditLogDrawer({ sessionId, trigger }: AuditLogDrawerProps): Rea
|
||||
const totalItems = auditLogQuery.data?.total ?? 0;
|
||||
const totalPages = auditLogQuery.data?.pages ?? 0;
|
||||
const items = auditLogQuery.data?.items ?? [];
|
||||
const notice = auditLogQuery.data?.notice;
|
||||
|
||||
const canGoPrevious = page > 1;
|
||||
const canGoNext = totalPages > 0 && page < totalPages;
|
||||
const errorMessage =
|
||||
auditLogQuery.error instanceof Error ? auditLogQuery.error.message : "Failed to load audit log";
|
||||
|
||||
return (
|
||||
<Sheet open={open} onOpenChange={setOpen}>
|
||||
@@ -237,10 +265,13 @@ export function AuditLogDrawer({ sessionId, trigger }: AuditLogDrawerProps): Rea
|
||||
Loading audit log...
|
||||
</td>
|
||||
</tr>
|
||||
) : auditLogQuery.error ? (
|
||||
) : notice ? (
|
||||
<tr>
|
||||
<td colSpan={5} className="px-3 py-6 text-center text-sm text-red-500">
|
||||
{errorMessage}
|
||||
<td
|
||||
colSpan={5}
|
||||
className="px-3 py-6 text-center text-sm text-muted-foreground"
|
||||
>
|
||||
{notice}
|
||||
</td>
|
||||
</tr>
|
||||
) : items.length === 0 ? (
|
||||
|
||||
@@ -59,9 +59,12 @@ describe("BargeInInput", (): void => {
|
||||
await user.click(screen.getByRole("button", { name: "Send" }));
|
||||
|
||||
await waitFor((): void => {
|
||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-1/inject", {
|
||||
expect(mockApiPost).toHaveBeenCalledWith(
|
||||
"/api/orchestrator/api/mission-control/sessions/session-1/inject",
|
||||
{
|
||||
content: "execute plan",
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
expect(onSent).toHaveBeenCalledTimes(1);
|
||||
@@ -83,12 +86,18 @@ describe("BargeInInput", (): void => {
|
||||
|
||||
const calls = mockApiPost.mock.calls as [string, unknown?][];
|
||||
|
||||
expect(calls[0]).toEqual(["/api/mission-control/sessions/session-2/pause", undefined]);
|
||||
expect(calls[0]).toEqual([
|
||||
"/api/orchestrator/api/mission-control/sessions/session-2/pause",
|
||||
undefined,
|
||||
]);
|
||||
expect(calls[1]).toEqual([
|
||||
"/api/mission-control/sessions/session-2/inject",
|
||||
"/api/orchestrator/api/mission-control/sessions/session-2/inject",
|
||||
{ content: "hello world" },
|
||||
]);
|
||||
expect(calls[2]).toEqual(["/api/mission-control/sessions/session-2/resume", undefined]);
|
||||
expect(calls[2]).toEqual([
|
||||
"/api/orchestrator/api/mission-control/sessions/session-2/resume",
|
||||
undefined,
|
||||
]);
|
||||
});
|
||||
|
||||
it("submits with Enter and does not submit on Shift+Enter", async (): Promise<void> => {
|
||||
@@ -105,9 +114,12 @@ describe("BargeInInput", (): void => {
|
||||
fireEvent.keyDown(textarea, { key: "Enter", code: "Enter", shiftKey: false });
|
||||
|
||||
await waitFor((): void => {
|
||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-3/inject", {
|
||||
expect(mockApiPost).toHaveBeenCalledWith(
|
||||
"/api/orchestrator/api/mission-control/sessions/session-3/inject",
|
||||
{
|
||||
content: "first",
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -39,7 +39,7 @@ export function BargeInInput({ sessionId, onSent }: BargeInInputProps): React.JS
|
||||
}
|
||||
|
||||
const encodedSessionId = encodeURIComponent(sessionId);
|
||||
const baseEndpoint = `/api/mission-control/sessions/${encodedSessionId}`;
|
||||
const baseEndpoint = `/api/orchestrator/api/mission-control/sessions/${encodedSessionId}`;
|
||||
let didPause = false;
|
||||
let didInject = false;
|
||||
|
||||
|
||||
@@ -177,9 +177,12 @@ describe("GlobalAgentRoster", (): void => {
|
||||
fireEvent.click(screen.getByRole("button", { name: "Kill session killme12" }));
|
||||
|
||||
await waitFor((): void => {
|
||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/killme123456/kill", {
|
||||
expect(mockApiPost).toHaveBeenCalledWith(
|
||||
"/api/orchestrator/api/mission-control/sessions/killme123456/kill",
|
||||
{
|
||||
force: false,
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -83,7 +83,7 @@ function groupByProvider(sessions: MissionControlSession[]): ProviderSessionGrou
|
||||
|
||||
async function fetchSessions(): Promise<MissionControlSession[]> {
|
||||
const payload = await apiGet<MissionControlSession[] | SessionsPayload>(
|
||||
"/api/mission-control/sessions"
|
||||
"/api/orchestrator/api/mission-control/sessions"
|
||||
);
|
||||
return Array.isArray(payload) ? payload : payload.sessions;
|
||||
}
|
||||
@@ -118,9 +118,12 @@ export function GlobalAgentRoster({
|
||||
|
||||
const killMutation = useMutation({
|
||||
mutationFn: async (sessionId: string): Promise<string> => {
|
||||
await apiPost<{ message: string }>(`/api/mission-control/sessions/${sessionId}/kill`, {
|
||||
await apiPost<{ message: string }>(
|
||||
`/api/orchestrator/api/mission-control/sessions/${sessionId}/kill`,
|
||||
{
|
||||
force: false,
|
||||
});
|
||||
}
|
||||
);
|
||||
return sessionId;
|
||||
},
|
||||
onSuccess: (): void => {
|
||||
|
||||
@@ -112,14 +112,20 @@ describe("KillAllDialog", (): void => {
|
||||
await user.click(screen.getByRole("button", { name: "Kill All Agents" }));
|
||||
|
||||
await waitFor((): void => {
|
||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/internal-1/kill", {
|
||||
expect(mockApiPost).toHaveBeenCalledWith(
|
||||
"/api/orchestrator/api/mission-control/sessions/internal-1/kill",
|
||||
{
|
||||
force: true,
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
expect(mockApiPost).not.toHaveBeenCalledWith("/api/mission-control/sessions/external-1/kill", {
|
||||
expect(mockApiPost).not.toHaveBeenCalledWith(
|
||||
"/api/orchestrator/api/mission-control/sessions/external-1/kill",
|
||||
{
|
||||
force: true,
|
||||
});
|
||||
}
|
||||
);
|
||||
expect(onComplete).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
@@ -141,12 +147,18 @@ describe("KillAllDialog", (): void => {
|
||||
await user.click(screen.getByRole("button", { name: "Kill All Agents" }));
|
||||
|
||||
await waitFor((): void => {
|
||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/internal-2/kill", {
|
||||
expect(mockApiPost).toHaveBeenCalledWith(
|
||||
"/api/orchestrator/api/mission-control/sessions/internal-2/kill",
|
||||
{
|
||||
force: true,
|
||||
});
|
||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/external-2/kill", {
|
||||
}
|
||||
);
|
||||
expect(mockApiPost).toHaveBeenCalledWith(
|
||||
"/api/orchestrator/api/mission-control/sessions/external-2/kill",
|
||||
{
|
||||
force: true,
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -96,9 +96,12 @@ export function KillAllDialog({ sessions, onComplete }: KillAllDialogProps): Rea
|
||||
|
||||
const killRequests = targetSessions.map(async (session) => {
|
||||
try {
|
||||
await apiPost<{ message: string }>(`/api/mission-control/sessions/${session.id}/kill`, {
|
||||
await apiPost<{ message: string }>(
|
||||
`/api/orchestrator/api/mission-control/sessions/${session.id}/kill`,
|
||||
{
|
||||
force: true,
|
||||
});
|
||||
}
|
||||
);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
|
||||
@@ -89,7 +89,7 @@ describe("PanelControls", (): void => {
|
||||
|
||||
await waitFor((): void => {
|
||||
expect(mockApiPost).toHaveBeenCalledWith(
|
||||
"/api/mission-control/sessions/session%20with%20space/pause",
|
||||
"/api/orchestrator/api/mission-control/sessions/session%20with%20space/pause",
|
||||
undefined
|
||||
);
|
||||
});
|
||||
@@ -114,9 +114,12 @@ describe("PanelControls", (): void => {
|
||||
await user.click(screen.getByRole("button", { name: "Confirm" }));
|
||||
|
||||
await waitFor((): void => {
|
||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-4/kill", {
|
||||
expect(mockApiPost).toHaveBeenCalledWith(
|
||||
"/api/orchestrator/api/mission-control/sessions/session-4/kill",
|
||||
{
|
||||
force: false,
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
expect(onStatusChange).toHaveBeenCalledWith("killed");
|
||||
@@ -137,9 +140,12 @@ describe("PanelControls", (): void => {
|
||||
await user.click(screen.getByRole("button", { name: "Confirm" }));
|
||||
|
||||
await waitFor((): void => {
|
||||
expect(mockApiPost).toHaveBeenCalledWith("/api/mission-control/sessions/session-5/kill", {
|
||||
expect(mockApiPost).toHaveBeenCalledWith(
|
||||
"/api/orchestrator/api/mission-control/sessions/session-5/kill",
|
||||
{
|
||||
force: true,
|
||||
});
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
expect(onStatusChange).toHaveBeenCalledWith("killed");
|
||||
|
||||
@@ -50,23 +50,23 @@ export function PanelControls({
|
||||
switch (action) {
|
||||
case "pause":
|
||||
await apiPost<{ message: string }>(
|
||||
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/pause`
|
||||
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/pause`
|
||||
);
|
||||
return { nextStatus: "paused" };
|
||||
case "resume":
|
||||
await apiPost<{ message: string }>(
|
||||
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/resume`
|
||||
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/resume`
|
||||
);
|
||||
return { nextStatus: "active" };
|
||||
case "graceful-kill":
|
||||
await apiPost<{ message: string }>(
|
||||
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
|
||||
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
|
||||
{ force: false }
|
||||
);
|
||||
return { nextStatus: "killed" };
|
||||
case "force-kill":
|
||||
await apiPost<{ message: string }>(
|
||||
`/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
|
||||
`/api/orchestrator/api/mission-control/sessions/${encodeURIComponent(sessionId)}/kill`,
|
||||
{ force: true }
|
||||
);
|
||||
return { nextStatus: "killed" };
|
||||
|
||||
@@ -11,6 +11,7 @@ interface FleetSettingsLink {
|
||||
|
||||
const FLEET_SETTINGS_LINKS: FleetSettingsLink[] = [
|
||||
{ href: "/settings/providers", label: "Providers" },
|
||||
{ href: "/settings/agent-providers", label: "Agent Providers" },
|
||||
{ href: "/settings/agent-config", label: "Agent Config" },
|
||||
{ href: "/settings/auth", label: "Authentication" },
|
||||
];
|
||||
|
||||
79
apps/web/src/lib/api/agent-providers.test.ts
Normal file
79
apps/web/src/lib/api/agent-providers.test.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import * as client from "./client";
|
||||
import {
|
||||
createAgentProvider,
|
||||
deleteAgentProvider,
|
||||
fetchAgentProviders,
|
||||
updateAgentProvider,
|
||||
} from "./agent-providers";
|
||||
|
||||
vi.mock("./client");
|
||||
|
||||
beforeEach((): void => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe("fetchAgentProviders", (): void => {
|
||||
it("calls provider list endpoint", async (): Promise<void> => {
|
||||
vi.mocked(client.apiGet).mockResolvedValueOnce([] as never);
|
||||
|
||||
await fetchAgentProviders();
|
||||
|
||||
expect(client.apiGet).toHaveBeenCalledWith("/api/agent-providers");
|
||||
});
|
||||
});
|
||||
|
||||
describe("createAgentProvider", (): void => {
|
||||
it("posts create payload", async (): Promise<void> => {
|
||||
vi.mocked(client.apiPost).mockResolvedValueOnce({ id: "provider-1" } as never);
|
||||
|
||||
await createAgentProvider({
|
||||
name: "openclaw-primary",
|
||||
provider: "openclaw",
|
||||
gatewayUrl: "https://openclaw.example.com",
|
||||
credentials: {
|
||||
apiToken: "top-secret",
|
||||
},
|
||||
isActive: true,
|
||||
});
|
||||
|
||||
expect(client.apiPost).toHaveBeenCalledWith("/api/agent-providers", {
|
||||
name: "openclaw-primary",
|
||||
provider: "openclaw",
|
||||
gatewayUrl: "https://openclaw.example.com",
|
||||
credentials: {
|
||||
apiToken: "top-secret",
|
||||
},
|
||||
isActive: true,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("updateAgentProvider", (): void => {
|
||||
it("sends PUT request with update payload", async (): Promise<void> => {
|
||||
vi.mocked(client.apiRequest).mockResolvedValueOnce({ id: "provider-1" } as never);
|
||||
|
||||
await updateAgentProvider("provider-1", {
|
||||
gatewayUrl: "https://new-openclaw.example.com",
|
||||
isActive: false,
|
||||
});
|
||||
|
||||
expect(client.apiRequest).toHaveBeenCalledWith("/api/agent-providers/provider-1", {
|
||||
method: "PUT",
|
||||
body: JSON.stringify({
|
||||
gatewayUrl: "https://new-openclaw.example.com",
|
||||
isActive: false,
|
||||
}),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("deleteAgentProvider", (): void => {
|
||||
it("calls delete endpoint", async (): Promise<void> => {
|
||||
vi.mocked(client.apiDelete).mockResolvedValueOnce(undefined as never);
|
||||
|
||||
await deleteAgentProvider("provider-1");
|
||||
|
||||
expect(client.apiDelete).toHaveBeenCalledWith("/api/agent-providers/provider-1");
|
||||
});
|
||||
});
|
||||
61
apps/web/src/lib/api/agent-providers.ts
Normal file
61
apps/web/src/lib/api/agent-providers.ts
Normal file
@@ -0,0 +1,61 @@
|
||||
import { apiDelete, apiGet, apiPost, apiRequest } from "./client";
|
||||
|
||||
export type AgentProviderType = "openclaw";
|
||||
|
||||
export interface AgentProviderCredentials {
|
||||
apiToken?: string;
|
||||
}
|
||||
|
||||
export interface AgentProviderConfig {
|
||||
id: string;
|
||||
workspaceId: string;
|
||||
name: string;
|
||||
provider: AgentProviderType;
|
||||
gatewayUrl: string;
|
||||
credentials: AgentProviderCredentials | null;
|
||||
isActive: boolean;
|
||||
createdAt: string;
|
||||
updatedAt: string;
|
||||
}
|
||||
|
||||
export interface CreateAgentProviderRequest {
|
||||
name: string;
|
||||
provider: AgentProviderType;
|
||||
gatewayUrl: string;
|
||||
credentials: {
|
||||
apiToken: string;
|
||||
};
|
||||
isActive: boolean;
|
||||
}
|
||||
|
||||
export interface UpdateAgentProviderRequest {
|
||||
name?: string;
|
||||
provider?: AgentProviderType;
|
||||
gatewayUrl?: string;
|
||||
credentials?: AgentProviderCredentials;
|
||||
isActive?: boolean;
|
||||
}
|
||||
|
||||
export async function fetchAgentProviders(): Promise<AgentProviderConfig[]> {
|
||||
return apiGet<AgentProviderConfig[]>("/api/agent-providers");
|
||||
}
|
||||
|
||||
export async function createAgentProvider(
|
||||
data: CreateAgentProviderRequest
|
||||
): Promise<AgentProviderConfig> {
|
||||
return apiPost<AgentProviderConfig>("/api/agent-providers", data);
|
||||
}
|
||||
|
||||
export async function updateAgentProvider(
|
||||
providerId: string,
|
||||
data: UpdateAgentProviderRequest
|
||||
): Promise<AgentProviderConfig> {
|
||||
return apiRequest<AgentProviderConfig>(`/api/agent-providers/${providerId}`, {
|
||||
method: "PUT",
|
||||
body: JSON.stringify(data),
|
||||
});
|
||||
}
|
||||
|
||||
export async function deleteAgentProvider(providerId: string): Promise<void> {
|
||||
await apiDelete<unknown>(`/api/agent-providers/${providerId}`);
|
||||
}
|
||||
@@ -18,4 +18,5 @@ export * from "./projects";
|
||||
export * from "./workspaces";
|
||||
export * from "./admin";
|
||||
export * from "./fleet-settings";
|
||||
export * from "./agent-providers";
|
||||
export * from "./activity";
|
||||
|
||||
28
apps/web/src/providers/ReactQueryProvider.tsx
Normal file
28
apps/web/src/providers/ReactQueryProvider.tsx
Normal file
@@ -0,0 +1,28 @@
|
||||
"use client";
|
||||
|
||||
import { QueryClient, QueryClientProvider } from "@tanstack/react-query";
|
||||
import { useState, type ReactNode } from "react";
|
||||
|
||||
interface ReactQueryProviderProps {
|
||||
children: ReactNode;
|
||||
}
|
||||
|
||||
export function ReactQueryProvider({ children }: ReactQueryProviderProps): React.JSX.Element {
|
||||
// Create a stable QueryClient per component mount (one per app session)
|
||||
const [queryClient] = useState(
|
||||
() =>
|
||||
new QueryClient({
|
||||
defaultOptions: {
|
||||
queries: {
|
||||
// Don't refetch on window focus in a dashboard context
|
||||
refetchOnWindowFocus: false,
|
||||
// Stale time of 30s — short enough for live data, avoids hammering
|
||||
staleTime: 30_000,
|
||||
retry: 1,
|
||||
},
|
||||
},
|
||||
})
|
||||
);
|
||||
|
||||
return <QueryClientProvider client={queryClient}>{children}</QueryClientProvider>;
|
||||
}
|
||||
@@ -316,6 +316,8 @@ services:
|
||||
SANDBOX_ENABLED: "true"
|
||||
# API key for authenticating requests from the web proxy
|
||||
ORCHESTRATOR_API_KEY: ${ORCHESTRATOR_API_KEY}
|
||||
# Prisma database connection (uses the shared openbrain postgres)
|
||||
DATABASE_URL: postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@openbrain_brain-db:5432/${POSTGRES_DB:-mosaic}
|
||||
volumes:
|
||||
- /var/run/docker.sock:/var/run/docker.sock:ro
|
||||
- orchestrator_workspace:/workspace
|
||||
@@ -331,6 +333,7 @@ services:
|
||||
start_period: 40s
|
||||
networks:
|
||||
- internal
|
||||
- openbrain-brain-internal
|
||||
cap_drop:
|
||||
- ALL
|
||||
cap_add:
|
||||
@@ -403,6 +406,7 @@ services:
|
||||
networks:
|
||||
- internal
|
||||
- traefik-public
|
||||
- openbrain-brain-internal
|
||||
deploy:
|
||||
restart_policy:
|
||||
condition: on-failure
|
||||
|
||||
@@ -62,6 +62,39 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Updated Makefile with Traefik deployment shortcuts
|
||||
- Enhanced docker-compose.override.yml.example with Traefik examples
|
||||
|
||||
## [0.0.23] - 2026-03-07
|
||||
|
||||
### Added
|
||||
|
||||
- **Mission Control Dashboard** — real-time agent orchestration UI at `/mission-control`
|
||||
- Live SSE message streams per agent (`OrchestratorPanel`)
|
||||
- Barge-in input with optional pause-before-send
|
||||
- Pause / Resume / Graceful Kill / Force Kill controls per agent panel
|
||||
- Global agent roster sidebar with tree view and per-agent kill
|
||||
- KillAllDialog with scope selector (requires typing `KILL ALL` to confirm)
|
||||
- AuditLogDrawer with paginated operator action history
|
||||
- Responsive panel grid: up to 6 panels, add/remove, full-screen expand
|
||||
- **Agent Provider Interface** — extensible `IAgentProvider` plugin system
|
||||
- `InternalAgentProvider` wrapping existing orchestrator services
|
||||
- `AgentProviderRegistry` aggregating sessions across providers
|
||||
- `AgentProviderConfig` CRUD API (`/api/agent-providers`)
|
||||
- Mission Control proxy API (`/api/mission-control/*`) with SSE proxying and audit log
|
||||
- **OpenClaw Provider Adapter** — connect external OpenClaw instances
|
||||
- `OpenClawProvider` implementing `IAgentProvider` against OpenClaw REST API
|
||||
- Dedicated `OpenClawSseBridge` with retry logic (5 retries, 2s backoff)
|
||||
- Provider config UI in Settings for registering OpenClaw gateways
|
||||
- Tokens encrypted at rest via `EncryptionService` (AES-256-GCM)
|
||||
- **OperatorAuditLog** — every inject/pause/resume/kill persisted to DB
|
||||
|
||||
### Changed
|
||||
|
||||
- Orchestrator app: extended with `AgentsModule` exports for provider registry
|
||||
- Settings navigation: added "Agent Providers" section
|
||||
|
||||
### Fixed
|
||||
|
||||
- Flaky web tests: async query timing in Kanban and OnboardingWizard tests
|
||||
|
||||
## [0.0.1] - 2026-01-28
|
||||
|
||||
### Added
|
||||
@@ -79,5 +112,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Documentation structure (Bookstack-compatible hierarchy)
|
||||
- Development workflow and coding standards
|
||||
|
||||
[Unreleased]: https://git.mosaicstack.dev/mosaic/stack/compare/v0.0.1...HEAD
|
||||
[Unreleased]: https://git.mosaicstack.dev/mosaic/stack/compare/v0.0.23...HEAD
|
||||
[0.0.23]: https://git.mosaicstack.dev/mosaic/stack/releases/tag/v0.0.23
|
||||
[0.0.1]: https://git.mosaicstack.dev/mosaic/stack/releases/tag/v0.0.1
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Mosaic Stack Roadmap
|
||||
|
||||
**Last Updated:** 2026-01-29
|
||||
**Last Updated:** 2026-03-07
|
||||
**Authoritative Source:** [Issues & Milestones](https://git.mosaicstack.dev/mosaic/stack/issues)
|
||||
|
||||
## Versioning Policy
|
||||
@@ -12,6 +12,20 @@
|
||||
| `0.x.y` | Pre-stable iteration, API may change with notice |
|
||||
| `1.0.0` | Stable release, public API contract |
|
||||
|
||||
## Release Track (Current)
|
||||
|
||||
### ✅ v0.0.23 — Mission Control Dashboard (Complete)
|
||||
|
||||
- Mission Control dashboard shipped at `/mission-control`
|
||||
- Agent provider plugin system and Mission Control proxy API shipped
|
||||
- OpenClaw provider adapter shipped with encrypted token storage
|
||||
- Operator audit logging persisted for inject/pause/resume/kill actions
|
||||
|
||||
### 📋 v0.0.24 — Placeholder
|
||||
|
||||
- Scope TBD (to be defined after v0.0.23 production deployment)
|
||||
- Initial release notes and roadmap breakdown pending
|
||||
|
||||
---
|
||||
|
||||
## Milestone Overview
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "mosaic-stack",
|
||||
"version": "0.0.20",
|
||||
"version": "0.0.23",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"packageManager": "pnpm@10.19.0",
|
||||
|
||||
5
pnpm-lock.yaml
generated
5
pnpm-lock.yaml
generated
@@ -162,6 +162,9 @@ importers:
|
||||
bullmq:
|
||||
specifier: ^5.67.2
|
||||
version: 5.67.2
|
||||
chokidar:
|
||||
specifier: ^4.0.3
|
||||
version: 4.0.3
|
||||
class-transformer:
|
||||
specifier: ^0.5.1
|
||||
version: 0.5.1
|
||||
@@ -1625,7 +1628,6 @@ packages:
|
||||
|
||||
'@mosaicstack/telemetry-client@0.1.1':
|
||||
resolution: {integrity: sha512-1udg6p4cs8rhQgQ2pKCfi7EpRlJieRRhA5CIqthRQ6HQZLgQ0wH+632jEulov3rlHSM1iplIQ+AAe5DWrvSkEA==, tarball: https://git.mosaicstack.dev/api/packages/mosaic/npm/%40mosaicstack%2Ftelemetry-client/-/0.1.1/telemetry-client-0.1.1.tgz}
|
||||
engines: {node: '>=18'}
|
||||
|
||||
'@mrleebo/prisma-ast@0.13.1':
|
||||
resolution: {integrity: sha512-XyroGQXcHrZdvmrGJvsA9KNeOOgGMg1Vg9OlheUsBOSKznLMDl+YChxbkboRHvtFYJEMRYmlV3uoo/njCw05iw==}
|
||||
@@ -5176,6 +5178,7 @@ packages:
|
||||
|
||||
glob@10.5.0:
|
||||
resolution: {integrity: sha512-DfXN8DfhJ7NH3Oe7cFmu3NCu1wKbkReJ8TorzSAFbSKrlNaQSKfIzqYqVY8zlbs2NLBbWpRiU52GX2PbaBVNkg==}
|
||||
deprecated: Old versions of glob are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me
|
||||
hasBin: true
|
||||
|
||||
glob@13.0.0:
|
||||
|
||||
Reference in New Issue
Block a user