feat(orchestrator): MS23-P3-001 OpenClawProvider
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
This commit is contained in:
@@ -22,6 +22,7 @@
|
|||||||
"@anthropic-ai/sdk": "^0.72.1",
|
"@anthropic-ai/sdk": "^0.72.1",
|
||||||
"@mosaic/config": "workspace:*",
|
"@mosaic/config": "workspace:*",
|
||||||
"@mosaic/shared": "workspace:*",
|
"@mosaic/shared": "workspace:*",
|
||||||
|
"@nestjs/axios": "^4.0.1",
|
||||||
"@nestjs/bullmq": "^11.0.4",
|
"@nestjs/bullmq": "^11.0.4",
|
||||||
"@nestjs/common": "^11.1.12",
|
"@nestjs/common": "^11.1.12",
|
||||||
"@nestjs/config": "^4.0.2",
|
"@nestjs/config": "^4.0.2",
|
||||||
|
|||||||
@@ -0,0 +1,17 @@
|
|||||||
|
import { HttpService } from "@nestjs/axios";
|
||||||
|
import { Injectable } from "@nestjs/common";
|
||||||
|
import type { AgentProviderConfig } from "@prisma/client";
|
||||||
|
import { EncryptionService } from "../../../security/encryption.service";
|
||||||
|
import { OpenClawProvider } from "./openclaw.provider";
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class OpenClawProviderFactory {
|
||||||
|
constructor(
|
||||||
|
private readonly encryptionService: EncryptionService,
|
||||||
|
private readonly httpService: HttpService
|
||||||
|
) {}
|
||||||
|
|
||||||
|
createProvider(config: AgentProviderConfig): OpenClawProvider {
|
||||||
|
return new OpenClawProvider(config, this.encryptionService, this.httpService);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,270 @@
|
|||||||
|
import type { HttpService } from "@nestjs/axios";
|
||||||
|
import { ServiceUnavailableException } from "@nestjs/common";
|
||||||
|
import type { AgentProviderConfig } from "@prisma/client";
|
||||||
|
import { Readable } from "node:stream";
|
||||||
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import { EncryptionService } from "../../../security/encryption.service";
|
||||||
|
import { OpenClawProvider } from "./openclaw.provider";
|
||||||
|
|
||||||
|
describe("OpenClawProvider", () => {
|
||||||
|
let provider: OpenClawProvider;
|
||||||
|
let httpService: {
|
||||||
|
axiosRef: {
|
||||||
|
get: ReturnType<typeof vi.fn>;
|
||||||
|
post: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
let encryptionService: {
|
||||||
|
decryptIfNeeded: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
|
||||||
|
const config: AgentProviderConfig = {
|
||||||
|
id: "cfg-openclaw-1",
|
||||||
|
workspaceId: "workspace-1",
|
||||||
|
name: "openclaw-home",
|
||||||
|
provider: "openclaw",
|
||||||
|
gatewayUrl: "https://gateway.example.com/",
|
||||||
|
credentials: {
|
||||||
|
apiToken: "enc:token-value",
|
||||||
|
displayName: "Home OpenClaw",
|
||||||
|
},
|
||||||
|
isActive: true,
|
||||||
|
createdAt: new Date("2026-03-07T15:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T15:00:00.000Z"),
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
httpService = {
|
||||||
|
axiosRef: {
|
||||||
|
get: vi.fn(),
|
||||||
|
post: vi.fn(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
encryptionService = {
|
||||||
|
decryptIfNeeded: vi.fn().mockReturnValue("plain-token"),
|
||||||
|
};
|
||||||
|
|
||||||
|
provider = new OpenClawProvider(
|
||||||
|
config,
|
||||||
|
encryptionService as unknown as EncryptionService,
|
||||||
|
httpService as unknown as HttpService
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("maps listSessions from OpenClaw API", async () => {
|
||||||
|
httpService.axiosRef.get.mockResolvedValue({
|
||||||
|
data: {
|
||||||
|
sessions: [
|
||||||
|
{
|
||||||
|
id: "session-1",
|
||||||
|
status: "running",
|
||||||
|
createdAt: "2026-03-07T15:01:00.000Z",
|
||||||
|
updatedAt: "2026-03-07T15:02:00.000Z",
|
||||||
|
},
|
||||||
|
],
|
||||||
|
total: 1,
|
||||||
|
cursor: "next-cursor",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = await provider.listSessions("cursor-1", 25);
|
||||||
|
|
||||||
|
expect(httpService.axiosRef.get).toHaveBeenCalledWith(
|
||||||
|
"https://gateway.example.com/api/sessions",
|
||||||
|
{
|
||||||
|
headers: {
|
||||||
|
Authorization: "Bearer plain-token",
|
||||||
|
},
|
||||||
|
params: {
|
||||||
|
cursor: "cursor-1",
|
||||||
|
limit: 25,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
expect(result).toEqual({
|
||||||
|
sessions: [
|
||||||
|
{
|
||||||
|
id: "session-1",
|
||||||
|
providerId: "openclaw-home",
|
||||||
|
providerType: "openclaw",
|
||||||
|
status: "active",
|
||||||
|
createdAt: new Date("2026-03-07T15:01:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T15:02:00.000Z"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
total: 1,
|
||||||
|
cursor: "next-cursor",
|
||||||
|
});
|
||||||
|
expect(encryptionService.decryptIfNeeded).toHaveBeenCalledWith("enc:token-value");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns null from getSession when OpenClaw returns 404", async () => {
|
||||||
|
httpService.axiosRef.get.mockRejectedValue({
|
||||||
|
response: {
|
||||||
|
status: 404,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
await expect(provider.getSession("missing-session")).resolves.toBeNull();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("maps getMessages response", async () => {
|
||||||
|
httpService.axiosRef.get.mockResolvedValue({
|
||||||
|
data: {
|
||||||
|
messages: [
|
||||||
|
{
|
||||||
|
id: "message-1",
|
||||||
|
sessionId: "session-1",
|
||||||
|
role: "agent",
|
||||||
|
content: "hello",
|
||||||
|
timestamp: "2026-03-07T15:03:00.000Z",
|
||||||
|
metadata: {
|
||||||
|
tokens: 128,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const result = await provider.getMessages("session-1", 20, "before-cursor");
|
||||||
|
|
||||||
|
expect(httpService.axiosRef.get).toHaveBeenCalledWith(
|
||||||
|
"https://gateway.example.com/api/messages",
|
||||||
|
{
|
||||||
|
headers: {
|
||||||
|
Authorization: "Bearer plain-token",
|
||||||
|
},
|
||||||
|
params: {
|
||||||
|
sessionId: "session-1",
|
||||||
|
limit: 20,
|
||||||
|
before: "before-cursor",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
expect(result).toEqual([
|
||||||
|
{
|
||||||
|
id: "message-1",
|
||||||
|
sessionId: "session-1",
|
||||||
|
role: "assistant",
|
||||||
|
content: "hello",
|
||||||
|
timestamp: new Date("2026-03-07T15:03:00.000Z"),
|
||||||
|
metadata: {
|
||||||
|
tokens: 128,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("maps inject and control endpoints", async () => {
|
||||||
|
httpService.axiosRef.post
|
||||||
|
.mockResolvedValueOnce({
|
||||||
|
data: {
|
||||||
|
accepted: true,
|
||||||
|
messageId: "message-2",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
.mockResolvedValueOnce({ data: {} })
|
||||||
|
.mockResolvedValueOnce({ data: {} })
|
||||||
|
.mockResolvedValueOnce({ data: {} });
|
||||||
|
|
||||||
|
await expect(provider.injectMessage("session-1", "barge in")).resolves.toEqual({
|
||||||
|
accepted: true,
|
||||||
|
messageId: "message-2",
|
||||||
|
});
|
||||||
|
|
||||||
|
await provider.pauseSession("session-1");
|
||||||
|
await provider.resumeSession("session-1");
|
||||||
|
await provider.killSession("session-1", false);
|
||||||
|
|
||||||
|
expect(httpService.axiosRef.post).toHaveBeenNthCalledWith(
|
||||||
|
1,
|
||||||
|
"https://gateway.example.com/api/sessions/session-1/inject",
|
||||||
|
{ content: "barge in" },
|
||||||
|
{
|
||||||
|
headers: {
|
||||||
|
Authorization: "Bearer plain-token",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
expect(httpService.axiosRef.post).toHaveBeenNthCalledWith(
|
||||||
|
2,
|
||||||
|
"https://gateway.example.com/api/sessions/session-1/pause",
|
||||||
|
{},
|
||||||
|
{
|
||||||
|
headers: {
|
||||||
|
Authorization: "Bearer plain-token",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
expect(httpService.axiosRef.post).toHaveBeenNthCalledWith(
|
||||||
|
3,
|
||||||
|
"https://gateway.example.com/api/sessions/session-1/resume",
|
||||||
|
{},
|
||||||
|
{
|
||||||
|
headers: {
|
||||||
|
Authorization: "Bearer plain-token",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
expect(httpService.axiosRef.post).toHaveBeenNthCalledWith(
|
||||||
|
4,
|
||||||
|
"https://gateway.example.com/api/sessions/session-1/kill",
|
||||||
|
{ force: false },
|
||||||
|
{
|
||||||
|
headers: {
|
||||||
|
Authorization: "Bearer plain-token",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("parses SSE stream messages", async () => {
|
||||||
|
const stream = Readable.from([
|
||||||
|
'data: {"id":"message-stream","sessionId":"session-stream","role":"assistant","content":"stream hello","timestamp":"2026-03-07T16:00:00.000Z"}\n\n',
|
||||||
|
"data: [DONE]\n\n",
|
||||||
|
]);
|
||||||
|
|
||||||
|
httpService.axiosRef.get.mockResolvedValue({
|
||||||
|
data: stream,
|
||||||
|
});
|
||||||
|
|
||||||
|
const messages: Array<unknown> = [];
|
||||||
|
for await (const message of provider.streamMessages("session-stream")) {
|
||||||
|
messages.push(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(httpService.axiosRef.get).toHaveBeenCalledWith(
|
||||||
|
"https://gateway.example.com/api/sessions/session-stream/stream",
|
||||||
|
{
|
||||||
|
headers: {
|
||||||
|
Authorization: "Bearer plain-token",
|
||||||
|
Accept: "text/event-stream",
|
||||||
|
},
|
||||||
|
responseType: "stream",
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(messages).toEqual([
|
||||||
|
{
|
||||||
|
id: "message-stream",
|
||||||
|
sessionId: "session-stream",
|
||||||
|
role: "assistant",
|
||||||
|
content: "stream hello",
|
||||||
|
timestamp: new Date("2026-03-07T16:00:00.000Z"),
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("throws ServiceUnavailableException for request failures", async () => {
|
||||||
|
httpService.axiosRef.get.mockRejectedValue(new Error("gateway unreachable"));
|
||||||
|
|
||||||
|
await expect(provider.listSessions()).rejects.toBeInstanceOf(ServiceUnavailableException);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("returns false from isAvailable when gateway check fails", async () => {
|
||||||
|
httpService.axiosRef.get.mockRejectedValue(new Error("gateway unreachable"));
|
||||||
|
|
||||||
|
await expect(provider.isAvailable()).resolves.toBe(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,703 @@
|
|||||||
|
import { HttpService } from "@nestjs/axios";
|
||||||
|
import { Injectable, ServiceUnavailableException } from "@nestjs/common";
|
||||||
|
import type {
|
||||||
|
AgentMessage,
|
||||||
|
AgentMessageRole,
|
||||||
|
AgentSession,
|
||||||
|
AgentSessionList,
|
||||||
|
AgentSessionStatus,
|
||||||
|
IAgentProvider,
|
||||||
|
InjectResult,
|
||||||
|
} from "@mosaic/shared";
|
||||||
|
import type { AgentProviderConfig } from "@prisma/client";
|
||||||
|
import { randomUUID } from "node:crypto";
|
||||||
|
import { EncryptionService } from "../../../security/encryption.service";
|
||||||
|
|
||||||
|
const DEFAULT_SESSION_LIMIT = 50;
|
||||||
|
const DEFAULT_MESSAGE_LIMIT = 50;
|
||||||
|
const MAX_MESSAGE_LIMIT = 200;
|
||||||
|
const OPENCLAW_PROVIDER_TYPE = "openclaw";
|
||||||
|
const API_TOKEN_KEYS = ["apiToken", "token", "bearerToken"] as const;
|
||||||
|
const DISPLAY_NAME_KEYS = ["displayName", "label"] as const;
|
||||||
|
|
||||||
|
type JsonRecord = Record<string, unknown>;
|
||||||
|
type AsyncChunkStream = AsyncIterable<string | Uint8Array | Buffer>;
|
||||||
|
|
||||||
|
interface HttpErrorWithResponse {
|
||||||
|
response?: {
|
||||||
|
status?: number;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class OpenClawProvider implements IAgentProvider {
|
||||||
|
readonly providerId: string;
|
||||||
|
readonly providerType = OPENCLAW_PROVIDER_TYPE;
|
||||||
|
readonly displayName: string;
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private readonly config: AgentProviderConfig,
|
||||||
|
private readonly encryptionService: EncryptionService,
|
||||||
|
private readonly httpService: HttpService
|
||||||
|
) {
|
||||||
|
this.providerId = this.config.name;
|
||||||
|
this.displayName = this.resolveDisplayName();
|
||||||
|
}
|
||||||
|
|
||||||
|
validateBaseUrl(): void {
|
||||||
|
void this.resolveBaseUrl();
|
||||||
|
}
|
||||||
|
|
||||||
|
validateToken(): void {
|
||||||
|
void this.resolveApiToken();
|
||||||
|
}
|
||||||
|
|
||||||
|
async listSessions(cursor?: string, limit = DEFAULT_SESSION_LIMIT): Promise<AgentSessionList> {
|
||||||
|
const safeLimit = this.normalizeLimit(limit, DEFAULT_SESSION_LIMIT);
|
||||||
|
const params: Record<string, number | string> = { limit: safeLimit };
|
||||||
|
if (typeof cursor === "string" && cursor.length > 0) {
|
||||||
|
params.cursor = cursor;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await this.httpService.axiosRef.get(this.buildUrl("/api/sessions"), {
|
||||||
|
headers: this.authHeaders(),
|
||||||
|
params,
|
||||||
|
});
|
||||||
|
|
||||||
|
const page = this.extractSessionPage(response.data);
|
||||||
|
const sessions = page.records
|
||||||
|
.map((record) => this.toAgentSession(record))
|
||||||
|
.filter((session): session is AgentSession => session !== null);
|
||||||
|
|
||||||
|
return {
|
||||||
|
sessions,
|
||||||
|
total: page.total ?? sessions.length,
|
||||||
|
...(page.cursor !== undefined ? { cursor: page.cursor } : {}),
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
throw this.toServiceUnavailable("list sessions", error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async getSession(sessionId: string): Promise<AgentSession | null> {
|
||||||
|
try {
|
||||||
|
const response = await this.httpService.axiosRef.get(
|
||||||
|
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}`),
|
||||||
|
{
|
||||||
|
headers: this.authHeaders(),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const payload = this.unwrapContainer(response.data, ["session", "data"]);
|
||||||
|
return this.toAgentSession(payload);
|
||||||
|
} catch (error) {
|
||||||
|
if (this.getHttpStatus(error) === 404) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
throw this.toServiceUnavailable(`get session ${sessionId}`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async getMessages(
|
||||||
|
sessionId: string,
|
||||||
|
limit = DEFAULT_MESSAGE_LIMIT,
|
||||||
|
before?: string
|
||||||
|
): Promise<AgentMessage[]> {
|
||||||
|
const safeLimit = this.normalizeLimit(limit, DEFAULT_MESSAGE_LIMIT);
|
||||||
|
const params: Record<string, number | string> = {
|
||||||
|
sessionId,
|
||||||
|
limit: safeLimit,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (typeof before === "string" && before.length > 0) {
|
||||||
|
params.before = before;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await this.httpService.axiosRef.get(this.buildUrl("/api/messages"), {
|
||||||
|
headers: this.authHeaders(),
|
||||||
|
params,
|
||||||
|
});
|
||||||
|
|
||||||
|
return this.extractMessageRecords(response.data)
|
||||||
|
.map((record) => this.toAgentMessage(record, sessionId))
|
||||||
|
.filter((message): message is AgentMessage => message !== null);
|
||||||
|
} catch (error) {
|
||||||
|
throw this.toServiceUnavailable(`get messages for session ${sessionId}`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async injectMessage(sessionId: string, content: string): Promise<InjectResult> {
|
||||||
|
try {
|
||||||
|
const response = await this.httpService.axiosRef.post(
|
||||||
|
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/inject`),
|
||||||
|
{ content },
|
||||||
|
{
|
||||||
|
headers: this.authHeaders(),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const payload = this.isRecord(response.data) ? response.data : {};
|
||||||
|
|
||||||
|
return {
|
||||||
|
accepted: typeof payload.accepted === "boolean" ? payload.accepted : true,
|
||||||
|
...(this.readString(payload.messageId) !== undefined
|
||||||
|
? { messageId: this.readString(payload.messageId) }
|
||||||
|
: {}),
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
throw this.toServiceUnavailable(`inject message into session ${sessionId}`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async pauseSession(sessionId: string): Promise<void> {
|
||||||
|
try {
|
||||||
|
await this.httpService.axiosRef.post(
|
||||||
|
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/pause`),
|
||||||
|
{},
|
||||||
|
{
|
||||||
|
headers: this.authHeaders(),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
throw this.toServiceUnavailable(`pause session ${sessionId}`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async resumeSession(sessionId: string): Promise<void> {
|
||||||
|
try {
|
||||||
|
await this.httpService.axiosRef.post(
|
||||||
|
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/resume`),
|
||||||
|
{},
|
||||||
|
{
|
||||||
|
headers: this.authHeaders(),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
throw this.toServiceUnavailable(`resume session ${sessionId}`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async killSession(sessionId: string, force = true): Promise<void> {
|
||||||
|
try {
|
||||||
|
await this.httpService.axiosRef.post(
|
||||||
|
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/kill`),
|
||||||
|
{ force },
|
||||||
|
{
|
||||||
|
headers: this.authHeaders(),
|
||||||
|
}
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
throw this.toServiceUnavailable(`kill session ${sessionId}`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async *streamMessages(sessionId: string): AsyncIterable<AgentMessage> {
|
||||||
|
try {
|
||||||
|
const response = await this.httpService.axiosRef.get(
|
||||||
|
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/stream`),
|
||||||
|
{
|
||||||
|
headers: this.authHeaders({
|
||||||
|
Accept: "text/event-stream",
|
||||||
|
}),
|
||||||
|
responseType: "stream",
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const stream = this.asAsyncChunkStream(response.data);
|
||||||
|
if (!stream) {
|
||||||
|
throw new Error("OpenClaw stream response is not readable");
|
||||||
|
}
|
||||||
|
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
let buffer = "";
|
||||||
|
let streamDone = false;
|
||||||
|
|
||||||
|
for await (const chunk of stream) {
|
||||||
|
const textChunk =
|
||||||
|
typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true });
|
||||||
|
buffer += textChunk.replace(/\r\n/gu, "\n");
|
||||||
|
|
||||||
|
const events = buffer.split("\n\n");
|
||||||
|
buffer = events.pop() ?? "";
|
||||||
|
|
||||||
|
for (const event of events) {
|
||||||
|
const data = this.extractSseData(event);
|
||||||
|
if (data === null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data === "[DONE]") {
|
||||||
|
streamDone = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
const message = this.toAgentMessage(this.tryParseJson(data) ?? data, sessionId);
|
||||||
|
if (message !== null) {
|
||||||
|
yield message;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (streamDone) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!streamDone && buffer.trim().length > 0) {
|
||||||
|
const data = this.extractSseData(buffer.trim());
|
||||||
|
if (data !== null && data !== "[DONE]") {
|
||||||
|
const message = this.toAgentMessage(this.tryParseJson(data) ?? data, sessionId);
|
||||||
|
if (message !== null) {
|
||||||
|
yield message;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
throw this.toServiceUnavailable(`stream messages for session ${sessionId}`, error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async isAvailable(): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
this.validateBaseUrl();
|
||||||
|
this.validateToken();
|
||||||
|
|
||||||
|
await this.httpService.axiosRef.get(this.buildUrl("/api/sessions"), {
|
||||||
|
headers: this.authHeaders(),
|
||||||
|
params: { limit: 1 },
|
||||||
|
});
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private extractSessionPage(payload: unknown): {
|
||||||
|
records: unknown[];
|
||||||
|
total?: number;
|
||||||
|
cursor?: string;
|
||||||
|
} {
|
||||||
|
if (Array.isArray(payload)) {
|
||||||
|
return {
|
||||||
|
records: payload,
|
||||||
|
total: payload.length,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.isRecord(payload)) {
|
||||||
|
return {
|
||||||
|
records: [],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let records: unknown[] = [];
|
||||||
|
if (Array.isArray(payload.sessions)) {
|
||||||
|
records = payload.sessions;
|
||||||
|
} else if (Array.isArray(payload.items)) {
|
||||||
|
records = payload.items;
|
||||||
|
} else if (Array.isArray(payload.data)) {
|
||||||
|
records = payload.data;
|
||||||
|
}
|
||||||
|
|
||||||
|
const total = typeof payload.total === "number" ? payload.total : undefined;
|
||||||
|
const cursor = this.readString(payload.cursor) ?? this.readString(payload.nextCursor);
|
||||||
|
|
||||||
|
return {
|
||||||
|
records,
|
||||||
|
total,
|
||||||
|
...(cursor !== undefined ? { cursor } : {}),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private extractMessageRecords(payload: unknown): unknown[] {
|
||||||
|
if (Array.isArray(payload)) {
|
||||||
|
return payload;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.isRecord(payload)) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Array.isArray(payload.messages)) {
|
||||||
|
return payload.messages;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Array.isArray(payload.items)) {
|
||||||
|
return payload.items;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Array.isArray(payload.data)) {
|
||||||
|
return payload.data;
|
||||||
|
}
|
||||||
|
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
private unwrapContainer(payload: unknown, keys: string[]): unknown {
|
||||||
|
if (!this.isRecord(payload)) {
|
||||||
|
return payload;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const key of keys) {
|
||||||
|
if (key in payload) {
|
||||||
|
return payload[key];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return payload;
|
||||||
|
}
|
||||||
|
|
||||||
|
private toAgentSession(record: unknown): AgentSession | null {
|
||||||
|
if (!this.isRecord(record)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const id =
|
||||||
|
this.readString(record.id) ??
|
||||||
|
this.readString(record.sessionId) ??
|
||||||
|
this.readString(record.key);
|
||||||
|
if (!id) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const createdAt = this.parseDate(record.createdAt ?? record.spawnedAt ?? record.startedAt);
|
||||||
|
const updatedAt = this.parseDate(
|
||||||
|
record.updatedAt ?? record.completedAt ?? record.lastActivityAt ?? record.endedAt,
|
||||||
|
createdAt
|
||||||
|
);
|
||||||
|
|
||||||
|
const label =
|
||||||
|
this.readString(record.label) ??
|
||||||
|
this.readString(record.title) ??
|
||||||
|
this.readString(record.name) ??
|
||||||
|
undefined;
|
||||||
|
|
||||||
|
const parentSessionId = this.readString(record.parentSessionId) ?? undefined;
|
||||||
|
const metadata = this.toMetadata(record.metadata);
|
||||||
|
|
||||||
|
return {
|
||||||
|
id,
|
||||||
|
providerId: this.providerId,
|
||||||
|
providerType: this.providerType,
|
||||||
|
...(label !== undefined ? { label } : {}),
|
||||||
|
status: this.toSessionStatus(this.readString(record.status)),
|
||||||
|
...(parentSessionId !== undefined ? { parentSessionId } : {}),
|
||||||
|
createdAt,
|
||||||
|
updatedAt,
|
||||||
|
...(metadata !== undefined ? { metadata } : {}),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private toAgentMessage(value: unknown, fallbackSessionId?: string): AgentMessage | null {
|
||||||
|
if (typeof value === "string") {
|
||||||
|
const content = value.trim();
|
||||||
|
if (content.length === 0 || fallbackSessionId === undefined) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
id: randomUUID(),
|
||||||
|
sessionId: fallbackSessionId,
|
||||||
|
role: "assistant",
|
||||||
|
content,
|
||||||
|
timestamp: new Date(),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let candidate: JsonRecord | null = null;
|
||||||
|
|
||||||
|
if (this.isRecord(value) && this.isRecord(value.message)) {
|
||||||
|
candidate = value.message;
|
||||||
|
} else if (this.isRecord(value)) {
|
||||||
|
candidate = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (candidate === null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const sessionId = this.readString(candidate.sessionId) ?? fallbackSessionId;
|
||||||
|
if (!sessionId) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const content = this.extractMessageContent(
|
||||||
|
candidate.content ?? candidate.text ?? candidate.message
|
||||||
|
);
|
||||||
|
if (content.length === 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
const metadata = this.toMetadata(candidate.metadata);
|
||||||
|
|
||||||
|
return {
|
||||||
|
id: this.readString(candidate.id) ?? this.readString(candidate.messageId) ?? randomUUID(),
|
||||||
|
sessionId,
|
||||||
|
role: this.toMessageRole(this.readString(candidate.role) ?? this.readString(candidate.type)),
|
||||||
|
content,
|
||||||
|
timestamp: this.parseDate(candidate.timestamp ?? candidate.createdAt),
|
||||||
|
...(metadata !== undefined ? { metadata } : {}),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private extractMessageContent(content: unknown): string {
|
||||||
|
if (typeof content === "string") {
|
||||||
|
return content.trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Array.isArray(content)) {
|
||||||
|
const parts: string[] = [];
|
||||||
|
|
||||||
|
for (const part of content) {
|
||||||
|
if (typeof part === "string") {
|
||||||
|
const trimmed = part.trim();
|
||||||
|
if (trimmed.length > 0) {
|
||||||
|
parts.push(trimmed);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.isRecord(part)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const text = this.readString(part.text) ?? this.readString(part.content);
|
||||||
|
if (text !== undefined && text.trim().length > 0) {
|
||||||
|
parts.push(text.trim());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return parts.join("\n\n").trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.isRecord(content)) {
|
||||||
|
const text = this.readString(content.text) ?? this.readString(content.content);
|
||||||
|
return text?.trim() ?? "";
|
||||||
|
}
|
||||||
|
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
private toSessionStatus(status?: string): AgentSessionStatus {
|
||||||
|
switch (status?.toLowerCase()) {
|
||||||
|
case "active":
|
||||||
|
case "running":
|
||||||
|
return "active";
|
||||||
|
case "paused":
|
||||||
|
return "paused";
|
||||||
|
case "completed":
|
||||||
|
case "done":
|
||||||
|
case "succeeded":
|
||||||
|
return "completed";
|
||||||
|
case "failed":
|
||||||
|
case "error":
|
||||||
|
case "killed":
|
||||||
|
case "terminated":
|
||||||
|
case "cancelled":
|
||||||
|
return "failed";
|
||||||
|
case "idle":
|
||||||
|
case "pending":
|
||||||
|
case "queued":
|
||||||
|
default:
|
||||||
|
return "idle";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private toMessageRole(role?: string): AgentMessageRole {
|
||||||
|
switch (role?.toLowerCase()) {
|
||||||
|
case "assistant":
|
||||||
|
case "agent":
|
||||||
|
return "assistant";
|
||||||
|
case "system":
|
||||||
|
return "system";
|
||||||
|
case "tool":
|
||||||
|
return "tool";
|
||||||
|
case "operator":
|
||||||
|
case "user":
|
||||||
|
default:
|
||||||
|
return "user";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private normalizeLimit(value: number, fallback: number): number {
|
||||||
|
const normalized = Number.isFinite(value) ? Math.trunc(value) : fallback;
|
||||||
|
if (normalized < 1) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Math.min(normalized, MAX_MESSAGE_LIMIT);
|
||||||
|
}
|
||||||
|
|
||||||
|
private parseDate(value: unknown, fallback = new Date()): Date {
|
||||||
|
if (value instanceof Date) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (typeof value === "string" || typeof value === "number") {
|
||||||
|
const parsed = new Date(value);
|
||||||
|
if (!Number.isNaN(parsed.getTime())) {
|
||||||
|
return parsed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return fallback;
|
||||||
|
}
|
||||||
|
|
||||||
|
private toMetadata(value: unknown): Record<string, unknown> | undefined {
|
||||||
|
if (this.isRecord(value)) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
private resolveDisplayName(): string {
|
||||||
|
const credentials = this.readCredentials();
|
||||||
|
|
||||||
|
for (const key of DISPLAY_NAME_KEYS) {
|
||||||
|
const value = this.readString(credentials[key]);
|
||||||
|
if (value !== undefined) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.config.name;
|
||||||
|
}
|
||||||
|
|
||||||
|
private resolveBaseUrl(): string {
|
||||||
|
const configRecord = this.config as unknown as JsonRecord;
|
||||||
|
const rawBaseUrl =
|
||||||
|
this.readString(this.config.gatewayUrl) ?? this.readString(configRecord.baseUrl);
|
||||||
|
|
||||||
|
if (rawBaseUrl === undefined) {
|
||||||
|
throw new Error(`OpenClaw provider ${this.providerId} is missing gateway URL`);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const parsed = new URL(rawBaseUrl);
|
||||||
|
return parsed.toString().replace(/\/$/u, "");
|
||||||
|
} catch {
|
||||||
|
throw new Error(`OpenClaw provider ${this.providerId} has invalid gateway URL`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private resolveApiToken(): string {
|
||||||
|
const configRecord = this.config as unknown as JsonRecord;
|
||||||
|
const credentials = this.readCredentials();
|
||||||
|
|
||||||
|
const rawToken =
|
||||||
|
this.readString(configRecord.apiToken) ??
|
||||||
|
this.readString(configRecord.token) ??
|
||||||
|
this.readString(configRecord.bearerToken) ??
|
||||||
|
this.findFirstString(credentials, API_TOKEN_KEYS);
|
||||||
|
|
||||||
|
if (rawToken === undefined) {
|
||||||
|
throw new Error(`OpenClaw provider ${this.providerId} is missing apiToken credentials`);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
return this.encryptionService.decryptIfNeeded(rawToken);
|
||||||
|
} catch (error) {
|
||||||
|
throw new Error(`Failed to decrypt API token: ${this.toErrorMessage(error)}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private readCredentials(): JsonRecord {
|
||||||
|
return this.isRecord(this.config.credentials) ? this.config.credentials : {};
|
||||||
|
}
|
||||||
|
|
||||||
|
private findFirstString(record: JsonRecord, keys: readonly string[]): string | undefined {
|
||||||
|
for (const key of keys) {
|
||||||
|
const value = this.readString(record[key]);
|
||||||
|
if (value !== undefined) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
private authHeaders(extraHeaders: Record<string, string> = {}): Record<string, string> {
|
||||||
|
return {
|
||||||
|
Authorization: `Bearer ${this.resolveApiToken()}`,
|
||||||
|
...extraHeaders,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private buildUrl(path: string): string {
|
||||||
|
return new URL(path, `${this.resolveBaseUrl()}/`).toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
private extractSseData(rawEvent: string): string | null {
|
||||||
|
const lines = rawEvent.split("\n");
|
||||||
|
const dataLines = lines
|
||||||
|
.filter((line) => line.startsWith("data:"))
|
||||||
|
.map((line) => line.slice(5).trimStart());
|
||||||
|
|
||||||
|
if (dataLines.length > 0) {
|
||||||
|
return dataLines.join("\n").trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
const trimmed = rawEvent.trim();
|
||||||
|
if (trimmed.startsWith("{") || trimmed.startsWith("[")) {
|
||||||
|
return trimmed;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private tryParseJson(value: string): unknown {
|
||||||
|
try {
|
||||||
|
return JSON.parse(value) as unknown;
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private asAsyncChunkStream(value: unknown): AsyncChunkStream | null {
|
||||||
|
if (value !== null && typeof value === "object" && Symbol.asyncIterator in value) {
|
||||||
|
return value as AsyncChunkStream;
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private isRecord(value: unknown): value is JsonRecord {
|
||||||
|
return typeof value === "object" && value !== null && !Array.isArray(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private readString(value: unknown): string | undefined {
|
||||||
|
if (typeof value !== "string") {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
const trimmed = value.trim();
|
||||||
|
return trimmed.length > 0 ? trimmed : undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
private getHttpStatus(error: unknown): number | undefined {
|
||||||
|
if (typeof error !== "object" || error === null || !("response" in error)) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
const response = (error as HttpErrorWithResponse).response;
|
||||||
|
return typeof response?.status === "number" ? response.status : undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
private toServiceUnavailable(operation: string, error: unknown): ServiceUnavailableException {
|
||||||
|
return new ServiceUnavailableException(
|
||||||
|
`OpenClaw provider ${this.providerId} failed to ${operation}: ${this.toErrorMessage(error)}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private toErrorMessage(error: unknown): string {
|
||||||
|
if (error instanceof Error) {
|
||||||
|
return error.message;
|
||||||
|
}
|
||||||
|
|
||||||
|
return String(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
131
apps/orchestrator/src/api/providers/providers.module.spec.ts
Normal file
131
apps/orchestrator/src/api/providers/providers.module.spec.ts
Normal file
@@ -0,0 +1,131 @@
|
|||||||
|
import { Logger } from "@nestjs/common";
|
||||||
|
import type { AgentProviderConfig } from "@prisma/client";
|
||||||
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import { PrismaService } from "../../prisma/prisma.service";
|
||||||
|
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||||
|
import { OpenClawProviderFactory } from "./openclaw/openclaw.provider-factory";
|
||||||
|
import { ProvidersModule } from "./providers.module";
|
||||||
|
|
||||||
|
type MockOpenClawProvider = {
|
||||||
|
providerId: string;
|
||||||
|
validateBaseUrl: ReturnType<typeof vi.fn>;
|
||||||
|
validateToken: ReturnType<typeof vi.fn>;
|
||||||
|
isAvailable: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
|
||||||
|
describe("ProvidersModule", () => {
|
||||||
|
let moduleRef: ProvidersModule;
|
||||||
|
let prisma: {
|
||||||
|
agentProviderConfig: {
|
||||||
|
findMany: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
let registry: {
|
||||||
|
registerProvider: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
let factory: {
|
||||||
|
createProvider: ReturnType<typeof vi.fn>;
|
||||||
|
};
|
||||||
|
|
||||||
|
const config: AgentProviderConfig = {
|
||||||
|
id: "cfg-openclaw-1",
|
||||||
|
workspaceId: "workspace-1",
|
||||||
|
name: "openclaw-home",
|
||||||
|
provider: "openclaw",
|
||||||
|
gatewayUrl: "https://gateway.example.com",
|
||||||
|
credentials: { apiToken: "enc:token-value" },
|
||||||
|
isActive: true,
|
||||||
|
createdAt: new Date("2026-03-07T15:00:00.000Z"),
|
||||||
|
updatedAt: new Date("2026-03-07T15:00:00.000Z"),
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
prisma = {
|
||||||
|
agentProviderConfig: {
|
||||||
|
findMany: vi.fn(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
registry = {
|
||||||
|
registerProvider: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
factory = {
|
||||||
|
createProvider: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
moduleRef = new ProvidersModule(
|
||||||
|
prisma as unknown as PrismaService,
|
||||||
|
registry as unknown as AgentProviderRegistry,
|
||||||
|
factory as unknown as OpenClawProviderFactory
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("registers reachable OpenClaw providers", async () => {
|
||||||
|
const provider: MockOpenClawProvider = {
|
||||||
|
providerId: "openclaw-home",
|
||||||
|
validateBaseUrl: vi.fn(),
|
||||||
|
validateToken: vi.fn(),
|
||||||
|
isAvailable: vi.fn().mockResolvedValue(true),
|
||||||
|
};
|
||||||
|
|
||||||
|
prisma.agentProviderConfig.findMany.mockResolvedValue([config]);
|
||||||
|
factory.createProvider.mockReturnValue(provider);
|
||||||
|
|
||||||
|
await moduleRef.onModuleInit();
|
||||||
|
|
||||||
|
expect(prisma.agentProviderConfig.findMany).toHaveBeenCalledWith({
|
||||||
|
where: {
|
||||||
|
provider: "openclaw",
|
||||||
|
isActive: true,
|
||||||
|
},
|
||||||
|
orderBy: [{ createdAt: "asc" }, { id: "asc" }],
|
||||||
|
});
|
||||||
|
expect(factory.createProvider).toHaveBeenCalledWith(config);
|
||||||
|
expect(provider.validateBaseUrl).toHaveBeenCalledTimes(1);
|
||||||
|
expect(provider.validateToken).toHaveBeenCalledTimes(1);
|
||||||
|
expect(provider.isAvailable).toHaveBeenCalledTimes(1);
|
||||||
|
expect(registry.registerProvider).toHaveBeenCalledWith(provider);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("skips provider registration when gateway is unreachable", async () => {
|
||||||
|
const warnSpy = vi.spyOn(Logger.prototype, "warn").mockImplementation(() => undefined);
|
||||||
|
const provider: MockOpenClawProvider = {
|
||||||
|
providerId: "openclaw-home",
|
||||||
|
validateBaseUrl: vi.fn(),
|
||||||
|
validateToken: vi.fn(),
|
||||||
|
isAvailable: vi.fn().mockResolvedValue(false),
|
||||||
|
};
|
||||||
|
|
||||||
|
prisma.agentProviderConfig.findMany.mockResolvedValue([config]);
|
||||||
|
factory.createProvider.mockReturnValue(provider);
|
||||||
|
|
||||||
|
await moduleRef.onModuleInit();
|
||||||
|
|
||||||
|
expect(registry.registerProvider).not.toHaveBeenCalled();
|
||||||
|
expect(warnSpy).toHaveBeenCalledWith(
|
||||||
|
expect.stringContaining("Skipping OpenClaw provider openclaw-home")
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("skips provider registration when token decryption fails", async () => {
|
||||||
|
const errorSpy = vi.spyOn(Logger.prototype, "error").mockImplementation(() => undefined);
|
||||||
|
const provider: MockOpenClawProvider = {
|
||||||
|
providerId: "openclaw-home",
|
||||||
|
validateBaseUrl: vi.fn(),
|
||||||
|
validateToken: vi.fn().mockImplementation(() => {
|
||||||
|
throw new Error("Failed to decrypt API token");
|
||||||
|
}),
|
||||||
|
isAvailable: vi.fn().mockResolvedValue(true),
|
||||||
|
};
|
||||||
|
|
||||||
|
prisma.agentProviderConfig.findMany.mockResolvedValue([config]);
|
||||||
|
factory.createProvider.mockReturnValue(provider);
|
||||||
|
|
||||||
|
await moduleRef.onModuleInit();
|
||||||
|
|
||||||
|
expect(registry.registerProvider).not.toHaveBeenCalled();
|
||||||
|
expect(errorSpy).toHaveBeenCalledWith(expect.stringContaining("token decryption failed"));
|
||||||
|
expect(provider.isAvailable).not.toHaveBeenCalled();
|
||||||
|
});
|
||||||
|
});
|
||||||
94
apps/orchestrator/src/api/providers/providers.module.ts
Normal file
94
apps/orchestrator/src/api/providers/providers.module.ts
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
import { HttpModule } from "@nestjs/axios";
|
||||||
|
import { Logger, Module, OnModuleInit } from "@nestjs/common";
|
||||||
|
import type { AgentProviderConfig } from "@prisma/client";
|
||||||
|
import { PrismaModule } from "../../prisma/prisma.module";
|
||||||
|
import { PrismaService } from "../../prisma/prisma.service";
|
||||||
|
import { EncryptionService } from "../../security/encryption.service";
|
||||||
|
import { AgentProviderRegistry } from "../agents/agent-provider.registry";
|
||||||
|
import { AgentsModule } from "../agents/agents.module";
|
||||||
|
import { OpenClawProviderFactory } from "./openclaw/openclaw.provider-factory";
|
||||||
|
|
||||||
|
const OPENCLAW_PROVIDER_TYPE = "openclaw";
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
imports: [
|
||||||
|
AgentsModule,
|
||||||
|
PrismaModule,
|
||||||
|
HttpModule.register({
|
||||||
|
timeout: 10000,
|
||||||
|
maxRedirects: 5,
|
||||||
|
}),
|
||||||
|
],
|
||||||
|
providers: [EncryptionService, OpenClawProviderFactory],
|
||||||
|
})
|
||||||
|
export class ProvidersModule implements OnModuleInit {
|
||||||
|
private readonly logger = new Logger(ProvidersModule.name);
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
private readonly prisma: PrismaService,
|
||||||
|
private readonly registry: AgentProviderRegistry,
|
||||||
|
private readonly openClawProviderFactory: OpenClawProviderFactory
|
||||||
|
) {}
|
||||||
|
|
||||||
|
async onModuleInit(): Promise<void> {
|
||||||
|
const configs = await this.prisma.agentProviderConfig.findMany({
|
||||||
|
where: {
|
||||||
|
provider: OPENCLAW_PROVIDER_TYPE,
|
||||||
|
isActive: true,
|
||||||
|
},
|
||||||
|
orderBy: [{ createdAt: "asc" }, { id: "asc" }],
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const config of configs) {
|
||||||
|
await this.registerProvider(config);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async registerProvider(config: AgentProviderConfig): Promise<void> {
|
||||||
|
const provider = this.openClawProviderFactory.createProvider(config);
|
||||||
|
|
||||||
|
try {
|
||||||
|
provider.validateBaseUrl();
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Skipping OpenClaw provider ${config.name}: invalid configuration (${this.toErrorMessage(error)})`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
provider.validateToken();
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(
|
||||||
|
`Skipping OpenClaw provider ${config.name}: token decryption failed (${this.toErrorMessage(error)})`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const available = await provider.isAvailable();
|
||||||
|
if (!available) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Skipping OpenClaw provider ${config.name}: gateway ${config.gatewayUrl} is unreachable`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Skipping OpenClaw provider ${config.name}: gateway ${config.gatewayUrl} is unreachable (${this.toErrorMessage(error)})`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.registry.registerProvider(provider);
|
||||||
|
this.logger.log(`Registered OpenClaw provider ${provider.providerId}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
private toErrorMessage(error: unknown): string {
|
||||||
|
if (error instanceof Error) {
|
||||||
|
return error.message;
|
||||||
|
}
|
||||||
|
|
||||||
|
return String(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -7,6 +7,7 @@ import { AgentsModule } from "./api/agents/agents.module";
|
|||||||
import { MissionControlModule } from "./api/mission-control/mission-control.module";
|
import { MissionControlModule } from "./api/mission-control/mission-control.module";
|
||||||
import { QueueApiModule } from "./api/queue/queue-api.module";
|
import { QueueApiModule } from "./api/queue/queue-api.module";
|
||||||
import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module";
|
import { AgentProvidersModule } from "./api/agent-providers/agent-providers.module";
|
||||||
|
import { ProvidersModule } from "./api/providers/providers.module";
|
||||||
import { CoordinatorModule } from "./coordinator/coordinator.module";
|
import { CoordinatorModule } from "./coordinator/coordinator.module";
|
||||||
import { BudgetModule } from "./budget/budget.module";
|
import { BudgetModule } from "./budget/budget.module";
|
||||||
import { CIModule } from "./ci";
|
import { CIModule } from "./ci";
|
||||||
@@ -54,6 +55,7 @@ import { orchestratorConfig } from "./config/orchestrator.config";
|
|||||||
HealthModule,
|
HealthModule,
|
||||||
AgentsModule,
|
AgentsModule,
|
||||||
AgentProvidersModule,
|
AgentProvidersModule,
|
||||||
|
ProvidersModule,
|
||||||
MissionControlModule,
|
MissionControlModule,
|
||||||
QueueApiModule,
|
QueueApiModule,
|
||||||
CoordinatorModule,
|
CoordinatorModule,
|
||||||
|
|||||||
85
apps/orchestrator/src/security/encryption.service.ts
Normal file
85
apps/orchestrator/src/security/encryption.service.ts
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
import { Injectable } from "@nestjs/common";
|
||||||
|
import { ConfigService } from "@nestjs/config";
|
||||||
|
import { createDecipheriv, hkdfSync } from "node:crypto";
|
||||||
|
|
||||||
|
const ALGORITHM = "aes-256-gcm";
|
||||||
|
const ENCRYPTED_PREFIX = "enc:";
|
||||||
|
const IV_LENGTH = 12;
|
||||||
|
const AUTH_TAG_LENGTH = 16;
|
||||||
|
const DERIVED_KEY_LENGTH = 32;
|
||||||
|
const HKDF_SALT = "mosaic.crypto.v1";
|
||||||
|
const HKDF_INFO = "mosaic-db-secret-encryption";
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class EncryptionService {
|
||||||
|
private key: Buffer | null = null;
|
||||||
|
|
||||||
|
constructor(private readonly configService: ConfigService) {}
|
||||||
|
|
||||||
|
decryptIfNeeded(value: string): string {
|
||||||
|
if (!this.isEncrypted(value)) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.decrypt(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
decrypt(encrypted: string): string {
|
||||||
|
if (!this.isEncrypted(encrypted)) {
|
||||||
|
throw new Error("Value is not encrypted");
|
||||||
|
}
|
||||||
|
|
||||||
|
const payloadBase64 = encrypted.slice(ENCRYPTED_PREFIX.length);
|
||||||
|
|
||||||
|
try {
|
||||||
|
const payload = Buffer.from(payloadBase64, "base64");
|
||||||
|
if (payload.length < IV_LENGTH + AUTH_TAG_LENGTH) {
|
||||||
|
throw new Error("Encrypted payload is too short");
|
||||||
|
}
|
||||||
|
|
||||||
|
const iv = payload.subarray(0, IV_LENGTH);
|
||||||
|
const authTag = payload.subarray(payload.length - AUTH_TAG_LENGTH);
|
||||||
|
const ciphertext = payload.subarray(IV_LENGTH, payload.length - AUTH_TAG_LENGTH);
|
||||||
|
|
||||||
|
const decipher = createDecipheriv(ALGORITHM, this.getOrCreateKey(), iv);
|
||||||
|
decipher.setAuthTag(authTag);
|
||||||
|
|
||||||
|
return Buffer.concat([decipher.update(ciphertext), decipher.final()]).toString("utf8");
|
||||||
|
} catch {
|
||||||
|
throw new Error("Failed to decrypt value");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
isEncrypted(value: string): boolean {
|
||||||
|
return value.startsWith(ENCRYPTED_PREFIX);
|
||||||
|
}
|
||||||
|
|
||||||
|
private getOrCreateKey(): Buffer {
|
||||||
|
if (this.key !== null) {
|
||||||
|
return this.key;
|
||||||
|
}
|
||||||
|
|
||||||
|
const secret = this.configService.get<string>("MOSAIC_SECRET_KEY");
|
||||||
|
if (!secret) {
|
||||||
|
throw new Error(
|
||||||
|
"orchestrator: MOSAIC_SECRET_KEY is required. Set it in your config or via MOSAIC_SECRET_KEY."
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (secret.length < 32) {
|
||||||
|
throw new Error("MOSAIC_SECRET_KEY must be at least 32 characters");
|
||||||
|
}
|
||||||
|
|
||||||
|
this.key = Buffer.from(
|
||||||
|
hkdfSync(
|
||||||
|
"sha256",
|
||||||
|
Buffer.from(secret, "utf8"),
|
||||||
|
Buffer.from(HKDF_SALT, "utf8"),
|
||||||
|
Buffer.from(HKDF_INFO, "utf8"),
|
||||||
|
DERIVED_KEY_LENGTH
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
return this.key;
|
||||||
|
}
|
||||||
|
}
|
||||||
3
pnpm-lock.yaml
generated
3
pnpm-lock.yaml
generated
@@ -319,6 +319,9 @@ importers:
|
|||||||
'@mosaic/shared':
|
'@mosaic/shared':
|
||||||
specifier: workspace:*
|
specifier: workspace:*
|
||||||
version: link:../../packages/shared
|
version: link:../../packages/shared
|
||||||
|
'@nestjs/axios':
|
||||||
|
specifier: ^4.0.1
|
||||||
|
version: 4.0.1(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(axios@1.13.5)(rxjs@7.8.2)
|
||||||
'@nestjs/bullmq':
|
'@nestjs/bullmq':
|
||||||
specifier: ^11.0.4
|
specifier: ^11.0.4
|
||||||
version: 11.0.4(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(bullmq@5.67.2)
|
version: 11.0.4(@nestjs/common@11.1.12(class-transformer@0.5.1)(class-validator@0.14.3)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@11.1.12)(bullmq@5.67.2)
|
||||||
|
|||||||
Reference in New Issue
Block a user