diff --git a/apps/orchestrator/package.json b/apps/orchestrator/package.json index d0e7681..8889d7a 100644 --- a/apps/orchestrator/package.json +++ b/apps/orchestrator/package.json @@ -22,6 +22,7 @@ "@anthropic-ai/sdk": "^0.72.1", "@mosaic/config": "workspace:*", "@mosaic/shared": "workspace:*", + "@nestjs/axios": "^4.0.1", "@nestjs/bullmq": "^11.0.4", "@nestjs/common": "^11.1.12", "@nestjs/config": "^4.0.2", diff --git a/apps/orchestrator/src/api/providers/openclaw/openclaw.provider-factory.ts b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider-factory.ts new file mode 100644 index 0000000..4af3579 --- /dev/null +++ b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider-factory.ts @@ -0,0 +1,17 @@ +import { HttpService } from "@nestjs/axios"; +import { Injectable } from "@nestjs/common"; +import type { AgentProviderConfig } from "@prisma/client"; +import { EncryptionService } from "../../../security/encryption.service"; +import { OpenClawProvider } from "./openclaw.provider"; + +@Injectable() +export class OpenClawProviderFactory { + constructor( + private readonly encryptionService: EncryptionService, + private readonly httpService: HttpService + ) {} + + createProvider(config: AgentProviderConfig): OpenClawProvider { + return new OpenClawProvider(config, this.encryptionService, this.httpService); + } +} diff --git a/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.spec.ts b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.spec.ts new file mode 100644 index 0000000..7b56758 --- /dev/null +++ b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.spec.ts @@ -0,0 +1,270 @@ +import type { HttpService } from "@nestjs/axios"; +import { ServiceUnavailableException } from "@nestjs/common"; +import type { AgentProviderConfig } from "@prisma/client"; +import { Readable } from "node:stream"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { EncryptionService } from "../../../security/encryption.service"; +import { OpenClawProvider } from "./openclaw.provider"; + +describe("OpenClawProvider", () => { + let provider: OpenClawProvider; + let httpService: { + axiosRef: { + get: ReturnType; + post: ReturnType; + }; + }; + let encryptionService: { + decryptIfNeeded: ReturnType; + }; + + const config: AgentProviderConfig = { + id: "cfg-openclaw-1", + workspaceId: "workspace-1", + name: "openclaw-home", + provider: "openclaw", + gatewayUrl: "https://gateway.example.com/", + credentials: { + apiToken: "enc:token-value", + displayName: "Home OpenClaw", + }, + isActive: true, + createdAt: new Date("2026-03-07T15:00:00.000Z"), + updatedAt: new Date("2026-03-07T15:00:00.000Z"), + }; + + beforeEach(() => { + httpService = { + axiosRef: { + get: vi.fn(), + post: vi.fn(), + }, + }; + + encryptionService = { + decryptIfNeeded: vi.fn().mockReturnValue("plain-token"), + }; + + provider = new OpenClawProvider( + config, + encryptionService as unknown as EncryptionService, + httpService as unknown as HttpService + ); + }); + + it("maps listSessions from OpenClaw API", async () => { + httpService.axiosRef.get.mockResolvedValue({ + data: { + sessions: [ + { + id: "session-1", + status: "running", + createdAt: "2026-03-07T15:01:00.000Z", + updatedAt: "2026-03-07T15:02:00.000Z", + }, + ], + total: 1, + cursor: "next-cursor", + }, + }); + + const result = await provider.listSessions("cursor-1", 25); + + expect(httpService.axiosRef.get).toHaveBeenCalledWith( + "https://gateway.example.com/api/sessions", + { + headers: { + Authorization: "Bearer plain-token", + }, + params: { + cursor: "cursor-1", + limit: 25, + }, + } + ); + expect(result).toEqual({ + sessions: [ + { + id: "session-1", + providerId: "openclaw-home", + providerType: "openclaw", + status: "active", + createdAt: new Date("2026-03-07T15:01:00.000Z"), + updatedAt: new Date("2026-03-07T15:02:00.000Z"), + }, + ], + total: 1, + cursor: "next-cursor", + }); + expect(encryptionService.decryptIfNeeded).toHaveBeenCalledWith("enc:token-value"); + }); + + it("returns null from getSession when OpenClaw returns 404", async () => { + httpService.axiosRef.get.mockRejectedValue({ + response: { + status: 404, + }, + }); + + await expect(provider.getSession("missing-session")).resolves.toBeNull(); + }); + + it("maps getMessages response", async () => { + httpService.axiosRef.get.mockResolvedValue({ + data: { + messages: [ + { + id: "message-1", + sessionId: "session-1", + role: "agent", + content: "hello", + timestamp: "2026-03-07T15:03:00.000Z", + metadata: { + tokens: 128, + }, + }, + ], + }, + }); + + const result = await provider.getMessages("session-1", 20, "before-cursor"); + + expect(httpService.axiosRef.get).toHaveBeenCalledWith( + "https://gateway.example.com/api/messages", + { + headers: { + Authorization: "Bearer plain-token", + }, + params: { + sessionId: "session-1", + limit: 20, + before: "before-cursor", + }, + } + ); + expect(result).toEqual([ + { + id: "message-1", + sessionId: "session-1", + role: "assistant", + content: "hello", + timestamp: new Date("2026-03-07T15:03:00.000Z"), + metadata: { + tokens: 128, + }, + }, + ]); + }); + + it("maps inject and control endpoints", async () => { + httpService.axiosRef.post + .mockResolvedValueOnce({ + data: { + accepted: true, + messageId: "message-2", + }, + }) + .mockResolvedValueOnce({ data: {} }) + .mockResolvedValueOnce({ data: {} }) + .mockResolvedValueOnce({ data: {} }); + + await expect(provider.injectMessage("session-1", "barge in")).resolves.toEqual({ + accepted: true, + messageId: "message-2", + }); + + await provider.pauseSession("session-1"); + await provider.resumeSession("session-1"); + await provider.killSession("session-1", false); + + expect(httpService.axiosRef.post).toHaveBeenNthCalledWith( + 1, + "https://gateway.example.com/api/sessions/session-1/inject", + { content: "barge in" }, + { + headers: { + Authorization: "Bearer plain-token", + }, + } + ); + expect(httpService.axiosRef.post).toHaveBeenNthCalledWith( + 2, + "https://gateway.example.com/api/sessions/session-1/pause", + {}, + { + headers: { + Authorization: "Bearer plain-token", + }, + } + ); + expect(httpService.axiosRef.post).toHaveBeenNthCalledWith( + 3, + "https://gateway.example.com/api/sessions/session-1/resume", + {}, + { + headers: { + Authorization: "Bearer plain-token", + }, + } + ); + expect(httpService.axiosRef.post).toHaveBeenNthCalledWith( + 4, + "https://gateway.example.com/api/sessions/session-1/kill", + { force: false }, + { + headers: { + Authorization: "Bearer plain-token", + }, + } + ); + }); + + it("parses SSE stream messages", async () => { + const stream = Readable.from([ + 'data: {"id":"message-stream","sessionId":"session-stream","role":"assistant","content":"stream hello","timestamp":"2026-03-07T16:00:00.000Z"}\n\n', + "data: [DONE]\n\n", + ]); + + httpService.axiosRef.get.mockResolvedValue({ + data: stream, + }); + + const messages: Array = []; + for await (const message of provider.streamMessages("session-stream")) { + messages.push(message); + } + + expect(httpService.axiosRef.get).toHaveBeenCalledWith( + "https://gateway.example.com/api/sessions/session-stream/stream", + { + headers: { + Authorization: "Bearer plain-token", + Accept: "text/event-stream", + }, + responseType: "stream", + } + ); + + expect(messages).toEqual([ + { + id: "message-stream", + sessionId: "session-stream", + role: "assistant", + content: "stream hello", + timestamp: new Date("2026-03-07T16:00:00.000Z"), + }, + ]); + }); + + it("throws ServiceUnavailableException for request failures", async () => { + httpService.axiosRef.get.mockRejectedValue(new Error("gateway unreachable")); + + await expect(provider.listSessions()).rejects.toBeInstanceOf(ServiceUnavailableException); + }); + + it("returns false from isAvailable when gateway check fails", async () => { + httpService.axiosRef.get.mockRejectedValue(new Error("gateway unreachable")); + + await expect(provider.isAvailable()).resolves.toBe(false); + }); +}); diff --git a/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.ts b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.ts new file mode 100644 index 0000000..1836764 --- /dev/null +++ b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.ts @@ -0,0 +1,703 @@ +import { HttpService } from "@nestjs/axios"; +import { Injectable, ServiceUnavailableException } from "@nestjs/common"; +import type { + AgentMessage, + AgentMessageRole, + AgentSession, + AgentSessionList, + AgentSessionStatus, + IAgentProvider, + InjectResult, +} from "@mosaic/shared"; +import type { AgentProviderConfig } from "@prisma/client"; +import { randomUUID } from "node:crypto"; +import { EncryptionService } from "../../../security/encryption.service"; + +const DEFAULT_SESSION_LIMIT = 50; +const DEFAULT_MESSAGE_LIMIT = 50; +const MAX_MESSAGE_LIMIT = 200; +const OPENCLAW_PROVIDER_TYPE = "openclaw"; +const API_TOKEN_KEYS = ["apiToken", "token", "bearerToken"] as const; +const DISPLAY_NAME_KEYS = ["displayName", "label"] as const; + +type JsonRecord = Record; +type AsyncChunkStream = AsyncIterable; + +interface HttpErrorWithResponse { + response?: { + status?: number; + }; +} + +@Injectable() +export class OpenClawProvider implements IAgentProvider { + readonly providerId: string; + readonly providerType = OPENCLAW_PROVIDER_TYPE; + readonly displayName: string; + + constructor( + private readonly config: AgentProviderConfig, + private readonly encryptionService: EncryptionService, + private readonly httpService: HttpService + ) { + this.providerId = this.config.name; + this.displayName = this.resolveDisplayName(); + } + + validateBaseUrl(): void { + void this.resolveBaseUrl(); + } + + validateToken(): void { + void this.resolveApiToken(); + } + + async listSessions(cursor?: string, limit = DEFAULT_SESSION_LIMIT): Promise { + const safeLimit = this.normalizeLimit(limit, DEFAULT_SESSION_LIMIT); + const params: Record = { limit: safeLimit }; + if (typeof cursor === "string" && cursor.length > 0) { + params.cursor = cursor; + } + + try { + const response = await this.httpService.axiosRef.get(this.buildUrl("/api/sessions"), { + headers: this.authHeaders(), + params, + }); + + const page = this.extractSessionPage(response.data); + const sessions = page.records + .map((record) => this.toAgentSession(record)) + .filter((session): session is AgentSession => session !== null); + + return { + sessions, + total: page.total ?? sessions.length, + ...(page.cursor !== undefined ? { cursor: page.cursor } : {}), + }; + } catch (error) { + throw this.toServiceUnavailable("list sessions", error); + } + } + + async getSession(sessionId: string): Promise { + try { + const response = await this.httpService.axiosRef.get( + this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}`), + { + headers: this.authHeaders(), + } + ); + + const payload = this.unwrapContainer(response.data, ["session", "data"]); + return this.toAgentSession(payload); + } catch (error) { + if (this.getHttpStatus(error) === 404) { + return null; + } + + throw this.toServiceUnavailable(`get session ${sessionId}`, error); + } + } + + async getMessages( + sessionId: string, + limit = DEFAULT_MESSAGE_LIMIT, + before?: string + ): Promise { + const safeLimit = this.normalizeLimit(limit, DEFAULT_MESSAGE_LIMIT); + const params: Record = { + sessionId, + limit: safeLimit, + }; + + if (typeof before === "string" && before.length > 0) { + params.before = before; + } + + try { + const response = await this.httpService.axiosRef.get(this.buildUrl("/api/messages"), { + headers: this.authHeaders(), + params, + }); + + return this.extractMessageRecords(response.data) + .map((record) => this.toAgentMessage(record, sessionId)) + .filter((message): message is AgentMessage => message !== null); + } catch (error) { + throw this.toServiceUnavailable(`get messages for session ${sessionId}`, error); + } + } + + async injectMessage(sessionId: string, content: string): Promise { + try { + const response = await this.httpService.axiosRef.post( + this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/inject`), + { content }, + { + headers: this.authHeaders(), + } + ); + + const payload = this.isRecord(response.data) ? response.data : {}; + + return { + accepted: typeof payload.accepted === "boolean" ? payload.accepted : true, + ...(this.readString(payload.messageId) !== undefined + ? { messageId: this.readString(payload.messageId) } + : {}), + }; + } catch (error) { + throw this.toServiceUnavailable(`inject message into session ${sessionId}`, error); + } + } + + async pauseSession(sessionId: string): Promise { + try { + await this.httpService.axiosRef.post( + this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/pause`), + {}, + { + headers: this.authHeaders(), + } + ); + } catch (error) { + throw this.toServiceUnavailable(`pause session ${sessionId}`, error); + } + } + + async resumeSession(sessionId: string): Promise { + try { + await this.httpService.axiosRef.post( + this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/resume`), + {}, + { + headers: this.authHeaders(), + } + ); + } catch (error) { + throw this.toServiceUnavailable(`resume session ${sessionId}`, error); + } + } + + async killSession(sessionId: string, force = true): Promise { + try { + await this.httpService.axiosRef.post( + this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/kill`), + { force }, + { + headers: this.authHeaders(), + } + ); + } catch (error) { + throw this.toServiceUnavailable(`kill session ${sessionId}`, error); + } + } + + async *streamMessages(sessionId: string): AsyncIterable { + try { + const response = await this.httpService.axiosRef.get( + this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/stream`), + { + headers: this.authHeaders({ + Accept: "text/event-stream", + }), + responseType: "stream", + } + ); + + const stream = this.asAsyncChunkStream(response.data); + if (!stream) { + throw new Error("OpenClaw stream response is not readable"); + } + + const decoder = new TextDecoder(); + let buffer = ""; + let streamDone = false; + + for await (const chunk of stream) { + const textChunk = + typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true }); + buffer += textChunk.replace(/\r\n/gu, "\n"); + + const events = buffer.split("\n\n"); + buffer = events.pop() ?? ""; + + for (const event of events) { + const data = this.extractSseData(event); + if (data === null) { + continue; + } + + if (data === "[DONE]") { + streamDone = true; + break; + } + + const message = this.toAgentMessage(this.tryParseJson(data) ?? data, sessionId); + if (message !== null) { + yield message; + } + } + + if (streamDone) { + break; + } + } + + if (!streamDone && buffer.trim().length > 0) { + const data = this.extractSseData(buffer.trim()); + if (data !== null && data !== "[DONE]") { + const message = this.toAgentMessage(this.tryParseJson(data) ?? data, sessionId); + if (message !== null) { + yield message; + } + } + } + } catch (error) { + throw this.toServiceUnavailable(`stream messages for session ${sessionId}`, error); + } + } + + async isAvailable(): Promise { + try { + this.validateBaseUrl(); + this.validateToken(); + + await this.httpService.axiosRef.get(this.buildUrl("/api/sessions"), { + headers: this.authHeaders(), + params: { limit: 1 }, + }); + + return true; + } catch { + return false; + } + } + + private extractSessionPage(payload: unknown): { + records: unknown[]; + total?: number; + cursor?: string; + } { + if (Array.isArray(payload)) { + return { + records: payload, + total: payload.length, + }; + } + + if (!this.isRecord(payload)) { + return { + records: [], + }; + } + + let records: unknown[] = []; + if (Array.isArray(payload.sessions)) { + records = payload.sessions; + } else if (Array.isArray(payload.items)) { + records = payload.items; + } else if (Array.isArray(payload.data)) { + records = payload.data; + } + + const total = typeof payload.total === "number" ? payload.total : undefined; + const cursor = this.readString(payload.cursor) ?? this.readString(payload.nextCursor); + + return { + records, + total, + ...(cursor !== undefined ? { cursor } : {}), + }; + } + + private extractMessageRecords(payload: unknown): unknown[] { + if (Array.isArray(payload)) { + return payload; + } + + if (!this.isRecord(payload)) { + return []; + } + + if (Array.isArray(payload.messages)) { + return payload.messages; + } + + if (Array.isArray(payload.items)) { + return payload.items; + } + + if (Array.isArray(payload.data)) { + return payload.data; + } + + return []; + } + + private unwrapContainer(payload: unknown, keys: string[]): unknown { + if (!this.isRecord(payload)) { + return payload; + } + + for (const key of keys) { + if (key in payload) { + return payload[key]; + } + } + + return payload; + } + + private toAgentSession(record: unknown): AgentSession | null { + if (!this.isRecord(record)) { + return null; + } + + const id = + this.readString(record.id) ?? + this.readString(record.sessionId) ?? + this.readString(record.key); + if (!id) { + return null; + } + + const createdAt = this.parseDate(record.createdAt ?? record.spawnedAt ?? record.startedAt); + const updatedAt = this.parseDate( + record.updatedAt ?? record.completedAt ?? record.lastActivityAt ?? record.endedAt, + createdAt + ); + + const label = + this.readString(record.label) ?? + this.readString(record.title) ?? + this.readString(record.name) ?? + undefined; + + const parentSessionId = this.readString(record.parentSessionId) ?? undefined; + const metadata = this.toMetadata(record.metadata); + + return { + id, + providerId: this.providerId, + providerType: this.providerType, + ...(label !== undefined ? { label } : {}), + status: this.toSessionStatus(this.readString(record.status)), + ...(parentSessionId !== undefined ? { parentSessionId } : {}), + createdAt, + updatedAt, + ...(metadata !== undefined ? { metadata } : {}), + }; + } + + private toAgentMessage(value: unknown, fallbackSessionId?: string): AgentMessage | null { + if (typeof value === "string") { + const content = value.trim(); + if (content.length === 0 || fallbackSessionId === undefined) { + return null; + } + + return { + id: randomUUID(), + sessionId: fallbackSessionId, + role: "assistant", + content, + timestamp: new Date(), + }; + } + + let candidate: JsonRecord | null = null; + + if (this.isRecord(value) && this.isRecord(value.message)) { + candidate = value.message; + } else if (this.isRecord(value)) { + candidate = value; + } + + if (candidate === null) { + return null; + } + + const sessionId = this.readString(candidate.sessionId) ?? fallbackSessionId; + if (!sessionId) { + return null; + } + + const content = this.extractMessageContent( + candidate.content ?? candidate.text ?? candidate.message + ); + if (content.length === 0) { + return null; + } + + const metadata = this.toMetadata(candidate.metadata); + + return { + id: this.readString(candidate.id) ?? this.readString(candidate.messageId) ?? randomUUID(), + sessionId, + role: this.toMessageRole(this.readString(candidate.role) ?? this.readString(candidate.type)), + content, + timestamp: this.parseDate(candidate.timestamp ?? candidate.createdAt), + ...(metadata !== undefined ? { metadata } : {}), + }; + } + + private extractMessageContent(content: unknown): string { + if (typeof content === "string") { + return content.trim(); + } + + if (Array.isArray(content)) { + const parts: string[] = []; + + for (const part of content) { + if (typeof part === "string") { + const trimmed = part.trim(); + if (trimmed.length > 0) { + parts.push(trimmed); + } + continue; + } + + if (!this.isRecord(part)) { + continue; + } + + const text = this.readString(part.text) ?? this.readString(part.content); + if (text !== undefined && text.trim().length > 0) { + parts.push(text.trim()); + } + } + + return parts.join("\n\n").trim(); + } + + if (this.isRecord(content)) { + const text = this.readString(content.text) ?? this.readString(content.content); + return text?.trim() ?? ""; + } + + return ""; + } + + private toSessionStatus(status?: string): AgentSessionStatus { + switch (status?.toLowerCase()) { + case "active": + case "running": + return "active"; + case "paused": + return "paused"; + case "completed": + case "done": + case "succeeded": + return "completed"; + case "failed": + case "error": + case "killed": + case "terminated": + case "cancelled": + return "failed"; + case "idle": + case "pending": + case "queued": + default: + return "idle"; + } + } + + private toMessageRole(role?: string): AgentMessageRole { + switch (role?.toLowerCase()) { + case "assistant": + case "agent": + return "assistant"; + case "system": + return "system"; + case "tool": + return "tool"; + case "operator": + case "user": + default: + return "user"; + } + } + + private normalizeLimit(value: number, fallback: number): number { + const normalized = Number.isFinite(value) ? Math.trunc(value) : fallback; + if (normalized < 1) { + return 1; + } + + return Math.min(normalized, MAX_MESSAGE_LIMIT); + } + + private parseDate(value: unknown, fallback = new Date()): Date { + if (value instanceof Date) { + return value; + } + + if (typeof value === "string" || typeof value === "number") { + const parsed = new Date(value); + if (!Number.isNaN(parsed.getTime())) { + return parsed; + } + } + + return fallback; + } + + private toMetadata(value: unknown): Record | undefined { + if (this.isRecord(value)) { + return value; + } + + return undefined; + } + + private resolveDisplayName(): string { + const credentials = this.readCredentials(); + + for (const key of DISPLAY_NAME_KEYS) { + const value = this.readString(credentials[key]); + if (value !== undefined) { + return value; + } + } + + return this.config.name; + } + + private resolveBaseUrl(): string { + const configRecord = this.config as unknown as JsonRecord; + const rawBaseUrl = + this.readString(this.config.gatewayUrl) ?? this.readString(configRecord.baseUrl); + + if (rawBaseUrl === undefined) { + throw new Error(`OpenClaw provider ${this.providerId} is missing gateway URL`); + } + + try { + const parsed = new URL(rawBaseUrl); + return parsed.toString().replace(/\/$/u, ""); + } catch { + throw new Error(`OpenClaw provider ${this.providerId} has invalid gateway URL`); + } + } + + private resolveApiToken(): string { + const configRecord = this.config as unknown as JsonRecord; + const credentials = this.readCredentials(); + + const rawToken = + this.readString(configRecord.apiToken) ?? + this.readString(configRecord.token) ?? + this.readString(configRecord.bearerToken) ?? + this.findFirstString(credentials, API_TOKEN_KEYS); + + if (rawToken === undefined) { + throw new Error(`OpenClaw provider ${this.providerId} is missing apiToken credentials`); + } + + try { + return this.encryptionService.decryptIfNeeded(rawToken); + } catch (error) { + throw new Error(`Failed to decrypt API token: ${this.toErrorMessage(error)}`); + } + } + + private readCredentials(): JsonRecord { + return this.isRecord(this.config.credentials) ? this.config.credentials : {}; + } + + private findFirstString(record: JsonRecord, keys: readonly string[]): string | undefined { + for (const key of keys) { + const value = this.readString(record[key]); + if (value !== undefined) { + return value; + } + } + + return undefined; + } + + private authHeaders(extraHeaders: Record = {}): Record { + return { + Authorization: `Bearer ${this.resolveApiToken()}`, + ...extraHeaders, + }; + } + + private buildUrl(path: string): string { + return new URL(path, `${this.resolveBaseUrl()}/`).toString(); + } + + private extractSseData(rawEvent: string): string | null { + const lines = rawEvent.split("\n"); + const dataLines = lines + .filter((line) => line.startsWith("data:")) + .map((line) => line.slice(5).trimStart()); + + if (dataLines.length > 0) { + return dataLines.join("\n").trim(); + } + + const trimmed = rawEvent.trim(); + if (trimmed.startsWith("{") || trimmed.startsWith("[")) { + return trimmed; + } + + return null; + } + + private tryParseJson(value: string): unknown { + try { + return JSON.parse(value) as unknown; + } catch { + return null; + } + } + + private asAsyncChunkStream(value: unknown): AsyncChunkStream | null { + if (value !== null && typeof value === "object" && Symbol.asyncIterator in value) { + return value as AsyncChunkStream; + } + + return null; + } + + private isRecord(value: unknown): value is JsonRecord { + return typeof value === "object" && value !== null && !Array.isArray(value); + } + + private readString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; + } + + private getHttpStatus(error: unknown): number | undefined { + if (typeof error !== "object" || error === null || !("response" in error)) { + return undefined; + } + + const response = (error as HttpErrorWithResponse).response; + return typeof response?.status === "number" ? response.status : undefined; + } + + private toServiceUnavailable(operation: string, error: unknown): ServiceUnavailableException { + return new ServiceUnavailableException( + `OpenClaw provider ${this.providerId} failed to ${operation}: ${this.toErrorMessage(error)}` + ); + } + + private toErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + + return String(error); + } +} diff --git a/apps/orchestrator/src/api/providers/providers.module.spec.ts b/apps/orchestrator/src/api/providers/providers.module.spec.ts new file mode 100644 index 0000000..ee2eb51 --- /dev/null +++ b/apps/orchestrator/src/api/providers/providers.module.spec.ts @@ -0,0 +1,131 @@ +import { Logger } from "@nestjs/common"; +import type { AgentProviderConfig } from "@prisma/client"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { PrismaService } from "../../prisma/prisma.service"; +import { AgentProviderRegistry } from "../agents/agent-provider.registry"; +import { OpenClawProviderFactory } from "./openclaw/openclaw.provider-factory"; +import { ProvidersModule } from "./providers.module"; + +type MockOpenClawProvider = { + providerId: string; + validateBaseUrl: ReturnType; + validateToken: ReturnType; + isAvailable: ReturnType; +}; + +describe("ProvidersModule", () => { + let moduleRef: ProvidersModule; + let prisma: { + agentProviderConfig: { + findMany: ReturnType; + }; + }; + let registry: { + registerProvider: ReturnType; + }; + let factory: { + createProvider: ReturnType; + }; + + const config: AgentProviderConfig = { + id: "cfg-openclaw-1", + workspaceId: "workspace-1", + name: "openclaw-home", + provider: "openclaw", + gatewayUrl: "https://gateway.example.com", + credentials: { apiToken: "enc:token-value" }, + isActive: true, + createdAt: new Date("2026-03-07T15:00:00.000Z"), + updatedAt: new Date("2026-03-07T15:00:00.000Z"), + }; + + beforeEach(() => { + prisma = { + agentProviderConfig: { + findMany: vi.fn(), + }, + }; + + registry = { + registerProvider: vi.fn(), + }; + + factory = { + createProvider: vi.fn(), + }; + + moduleRef = new ProvidersModule( + prisma as unknown as PrismaService, + registry as unknown as AgentProviderRegistry, + factory as unknown as OpenClawProviderFactory + ); + }); + + it("registers reachable OpenClaw providers", async () => { + const provider: MockOpenClawProvider = { + providerId: "openclaw-home", + validateBaseUrl: vi.fn(), + validateToken: vi.fn(), + isAvailable: vi.fn().mockResolvedValue(true), + }; + + prisma.agentProviderConfig.findMany.mockResolvedValue([config]); + factory.createProvider.mockReturnValue(provider); + + await moduleRef.onModuleInit(); + + expect(prisma.agentProviderConfig.findMany).toHaveBeenCalledWith({ + where: { + provider: "openclaw", + isActive: true, + }, + orderBy: [{ createdAt: "asc" }, { id: "asc" }], + }); + expect(factory.createProvider).toHaveBeenCalledWith(config); + expect(provider.validateBaseUrl).toHaveBeenCalledTimes(1); + expect(provider.validateToken).toHaveBeenCalledTimes(1); + expect(provider.isAvailable).toHaveBeenCalledTimes(1); + expect(registry.registerProvider).toHaveBeenCalledWith(provider); + }); + + it("skips provider registration when gateway is unreachable", async () => { + const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined); + const provider: MockOpenClawProvider = { + providerId: "openclaw-home", + validateBaseUrl: vi.fn(), + validateToken: vi.fn(), + isAvailable: vi.fn().mockResolvedValue(false), + }; + + prisma.agentProviderConfig.findMany.mockResolvedValue([config]); + factory.createProvider.mockReturnValue(provider); + + await moduleRef.onModuleInit(); + + expect(registry.registerProvider).not.toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining("Skipping OpenClaw provider openclaw-home") + ); + }); + + it("skips provider registration when token decryption fails", async () => { + const errorSpy = vi.spyOn(Logger.prototype, "error").mockImplementation(() => undefined); + const provider: MockOpenClawProvider = { + providerId: "openclaw-home", + validateBaseUrl: vi.fn(), + validateToken: vi.fn().mockImplementation(() => { + throw new Error("Failed to decrypt API token"); + }), + isAvailable: vi.fn().mockResolvedValue(true), + }; + + prisma.agentProviderConfig.findMany.mockResolvedValue([config]); + factory.createProvider.mockReturnValue(provider); + + await moduleRef.onModuleInit(); + + expect(registry.registerProvider).not.toHaveBeenCalled(); + expect(errorSpy).toHaveBeenCalledWith(expect.stringContaining("token decryption failed")); + expect(provider.isAvailable).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/orchestrator/src/api/providers/providers.module.ts b/apps/orchestrator/src/api/providers/providers.module.ts new file mode 100644 index 0000000..e87c802 --- /dev/null +++ b/apps/orchestrator/src/api/providers/providers.module.ts @@ -0,0 +1,94 @@ +import { HttpModule } from "@nestjs/axios"; +import { Logger, Module, OnModuleInit } from "@nestjs/common"; +import type { AgentProviderConfig } from "@prisma/client"; +import { PrismaModule } from "../../prisma/prisma.module"; +import { PrismaService } from "../../prisma/prisma.service"; +import { EncryptionService } from "../../security/encryption.service"; +import { AgentProviderRegistry } from "../agents/agent-provider.registry"; +import { AgentsModule } from "../agents/agents.module"; +import { OpenClawProviderFactory } from "./openclaw/openclaw.provider-factory"; + +const OPENCLAW_PROVIDER_TYPE = "openclaw"; + +@Module({ + imports: [ + AgentsModule, + PrismaModule, + HttpModule.register({ + timeout: 10000, + maxRedirects: 5, + }), + ], + providers: [EncryptionService, OpenClawProviderFactory], +}) +export class ProvidersModule implements OnModuleInit { + private readonly logger = new Logger(ProvidersModule.name); + + constructor( + private readonly prisma: PrismaService, + private readonly registry: AgentProviderRegistry, + private readonly openClawProviderFactory: OpenClawProviderFactory + ) {} + + async onModuleInit(): Promise { + const configs = await this.prisma.agentProviderConfig.findMany({ + where: { + provider: OPENCLAW_PROVIDER_TYPE, + isActive: true, + }, + orderBy: [{ createdAt: "asc" }, { id: "asc" }], + }); + + for (const config of configs) { + await this.registerProvider(config); + } + } + + private async registerProvider(config: AgentProviderConfig): Promise { + const provider = this.openClawProviderFactory.createProvider(config); + + try { + provider.validateBaseUrl(); + } catch (error) { + this.logger.warn( + `Skipping OpenClaw provider ${config.name}: invalid configuration (${this.toErrorMessage(error)})` + ); + return; + } + + try { + provider.validateToken(); + } catch (error) { + this.logger.error( + `Skipping OpenClaw provider ${config.name}: token decryption failed (${this.toErrorMessage(error)})` + ); + return; + } + + try { + const available = await provider.isAvailable(); + if (!available) { + this.logger.warn( + `Skipping OpenClaw provider ${config.name}: gateway ${config.gatewayUrl} is unreachable` + ); + return; + } + } catch (error) { + this.logger.warn( + `Skipping OpenClaw provider ${config.name}: gateway ${config.gatewayUrl} is unreachable (${this.toErrorMessage(error)})` + ); + return; + } + + this.registry.registerProvider(provider); + this.logger.log(`Registered OpenClaw provider ${provider.providerId}`); + } + + private toErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + + return String(error); + } +} diff --git a/apps/orchestrator/src/app.module.ts b/apps/orchestrator/src/app.module.ts index 6aeb7bd..cd87df8 100644 --- a/apps/orchestrator/src/app.module.ts +++ b/apps/orchestrator/src/app.module.ts @@ -7,6 +7,7 @@ import { AgentsModule } from "./api/agents/agents.module"; import { MissionControlModule } from "./api/mission-control/mission-control.module"; import { QueueApiModule } from "./api/queue/queue-api.module"; import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module"; +import { ProvidersModule } from "./api/providers/providers.module"; import { CoordinatorModule } from "./coordinator/coordinator.module"; import { BudgetModule } from "./budget/budget.module"; import { CIModule } from "./ci"; @@ -54,6 +55,7 @@ import { orchestratorConfig } from "./config/orchestrator.config"; HealthModule, AgentsModule, AgentProvidersModule, + ProvidersModule, MissionControlModule, QueueApiModule, CoordinatorModule, diff --git a/apps/orchestrator/src/security/encryption.service.ts b/apps/orchestrator/src/security/encryption.service.ts new file mode 100644 index 0000000..bf4c576 --- /dev/null +++ b/apps/orchestrator/src/security/encryption.service.ts @@ -0,0 +1,85 @@ +import { Injectable } from "@nestjs/common"; +import { ConfigService } from "@nestjs/config"; +import { createDecipheriv, hkdfSync } from "node:crypto"; + +const ALGORITHM = "aes-256-gcm"; +const ENCRYPTED_PREFIX = "enc:"; +const IV_LENGTH = 12; +const AUTH_TAG_LENGTH = 16; +const DERIVED_KEY_LENGTH = 32; +const HKDF_SALT = "mosaic.crypto.v1"; +const HKDF_INFO = "mosaic-db-secret-encryption"; + +@Injectable() +export class EncryptionService { + private key: Buffer | null = null; + + constructor(private readonly configService: ConfigService) {} + + decryptIfNeeded(value: string): string { + if (!this.isEncrypted(value)) { + return value; + } + + return this.decrypt(value); + } + + decrypt(encrypted: string): string { + if (!this.isEncrypted(encrypted)) { + throw new Error("Value is not encrypted"); + } + + const payloadBase64 = encrypted.slice(ENCRYPTED_PREFIX.length); + + try { + const payload = Buffer.from(payloadBase64, "base64"); + if (payload.length < IV_LENGTH + AUTH_TAG_LENGTH) { + throw new Error("Encrypted payload is too short"); + } + + const iv = payload.subarray(0, IV_LENGTH); + const authTag = payload.subarray(payload.length - AUTH_TAG_LENGTH); + const ciphertext = payload.subarray(IV_LENGTH, payload.length - AUTH_TAG_LENGTH); + + const decipher = createDecipheriv(ALGORITHM, this.getOrCreateKey(), iv); + decipher.setAuthTag(authTag); + + return Buffer.concat([decipher.update(ciphertext), decipher.final()]).toString("utf8"); + } catch { + throw new Error("Failed to decrypt value"); + } + } + + isEncrypted(value: string): boolean { + return value.startsWith(ENCRYPTED_PREFIX); + } + + private getOrCreateKey(): Buffer { + if (this.key !== null) { + return this.key; + } + + const secret = this.configService.get("MOSAIC_SECRET_KEY"); + if (!secret) { + throw new Error( + "orchestrator: MOSAIC_SECRET_KEY is required. Set it in your config or via MOSAIC_SECRET_KEY." + ); + } + + if (secret.length < 32) { + throw new Error("MOSAIC_SECRET_KEY must be at least 32 characters"); + } + + this.key = Buffer.from( + hkdfSync( + "sha256", + Buffer.from(secret, "utf8"), + Buffer.from(HKDF_SALT, "utf8"), + Buffer.from(HKDF_INFO, "utf8"), + DERIVED_KEY_LENGTH + ) + ); + + return this.key; + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index abe75f5..86ce44b 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -319,6 +319,9 @@ importers: '@mosaic/shared': specifier: workspace:* version: link:../../packages/shared + '@nestjs/axios': + specifier: ^4.0.1 + version: 4.0.1(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(axios@1.13.5)(rxjs@7.8.2) '@nestjs/bullmq': specifier: ^11.0.4 version: 11.0.4(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(bullmq@5.67.2)