Files
stack/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.ts
Jason Woltje 495d78115e
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
feat(orchestrator): add OpenClaw SSE bridge streaming
2026-03-07 16:20:27 -06:00

614 lines
16 KiB
TypeScript

import { HttpService } from "@nestjs/axios";
import { Injectable, ServiceUnavailableException } from "@nestjs/common";
import type {
AgentMessage,
AgentMessageRole,
AgentSession,
AgentSessionList,
AgentSessionStatus,
IAgentProvider,
InjectResult,
} from "@mosaic/shared";
import type { AgentProviderConfig } from "@prisma/client";
import { randomUUID } from "node:crypto";
import { EncryptionService } from "../../../security/encryption.service";
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
const DEFAULT_SESSION_LIMIT = 50;
const DEFAULT_MESSAGE_LIMIT = 50;
const MAX_MESSAGE_LIMIT = 200;
const OPENCLAW_PROVIDER_TYPE = "openclaw";
const API_TOKEN_KEYS = ["apiToken", "token", "bearerToken"] as const;
const DISPLAY_NAME_KEYS = ["displayName", "label"] as const;
type JsonRecord = Record<string, unknown>;
interface HttpErrorWithResponse {
response?: {
status?: number;
};
}
@Injectable()
export class OpenClawProvider implements IAgentProvider {
readonly providerId: string;
readonly providerType = OPENCLAW_PROVIDER_TYPE;
readonly displayName: string;
constructor(
private readonly config: AgentProviderConfig,
private readonly encryptionService: EncryptionService,
private readonly httpService: HttpService,
private readonly sseBridge: OpenClawSseBridge
) {
this.providerId = this.config.name;
this.displayName = this.resolveDisplayName();
}
validateBaseUrl(): void {
void this.resolveBaseUrl();
}
validateToken(): void {
void this.resolveApiToken();
}
async listSessions(cursor?: string, limit = DEFAULT_SESSION_LIMIT): Promise<AgentSessionList> {
const safeLimit = this.normalizeLimit(limit, DEFAULT_SESSION_LIMIT);
const params: Record<string, number | string> = { limit: safeLimit };
if (typeof cursor === "string" && cursor.length > 0) {
params.cursor = cursor;
}
try {
const response = await this.httpService.axiosRef.get(this.buildUrl("/api/sessions"), {
headers: this.authHeaders(),
params,
});
const page = this.extractSessionPage(response.data);
const sessions = page.records
.map((record) => this.toAgentSession(record))
.filter((session): session is AgentSession => session !== null);
return {
sessions,
total: page.total ?? sessions.length,
...(page.cursor !== undefined ? { cursor: page.cursor } : {}),
};
} catch (error) {
throw this.toServiceUnavailable("list sessions", error);
}
}
async getSession(sessionId: string): Promise<AgentSession | null> {
try {
const response = await this.httpService.axiosRef.get(
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}`),
{
headers: this.authHeaders(),
}
);
const payload = this.unwrapContainer(response.data, ["session", "data"]);
return this.toAgentSession(payload);
} catch (error) {
if (this.getHttpStatus(error) === 404) {
return null;
}
throw this.toServiceUnavailable(`get session ${sessionId}`, error);
}
}
async getMessages(
sessionId: string,
limit = DEFAULT_MESSAGE_LIMIT,
before?: string
): Promise<AgentMessage[]> {
const safeLimit = this.normalizeLimit(limit, DEFAULT_MESSAGE_LIMIT);
const params: Record<string, number | string> = {
sessionId,
limit: safeLimit,
};
if (typeof before === "string" && before.length > 0) {
params.before = before;
}
try {
const response = await this.httpService.axiosRef.get(this.buildUrl("/api/messages"), {
headers: this.authHeaders(),
params,
});
return this.extractMessageRecords(response.data)
.map((record) => this.toAgentMessage(record, sessionId))
.filter((message): message is AgentMessage => message !== null);
} catch (error) {
throw this.toServiceUnavailable(`get messages for session ${sessionId}`, error);
}
}
async injectMessage(sessionId: string, content: string): Promise<InjectResult> {
try {
const response = await this.httpService.axiosRef.post(
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/inject`),
{ content },
{
headers: this.authHeaders(),
}
);
const payload = this.isRecord(response.data) ? response.data : {};
return {
accepted: typeof payload.accepted === "boolean" ? payload.accepted : true,
...(this.readString(payload.messageId) !== undefined
? { messageId: this.readString(payload.messageId) }
: {}),
};
} catch (error) {
throw this.toServiceUnavailable(`inject message into session ${sessionId}`, error);
}
}
async pauseSession(sessionId: string): Promise<void> {
try {
await this.httpService.axiosRef.post(
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/pause`),
{},
{
headers: this.authHeaders(),
}
);
} catch (error) {
throw this.toServiceUnavailable(`pause session ${sessionId}`, error);
}
}
async resumeSession(sessionId: string): Promise<void> {
try {
await this.httpService.axiosRef.post(
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/resume`),
{},
{
headers: this.authHeaders(),
}
);
} catch (error) {
throw this.toServiceUnavailable(`resume session ${sessionId}`, error);
}
}
async killSession(sessionId: string, force = true): Promise<void> {
try {
await this.httpService.axiosRef.post(
this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/kill`),
{ force },
{
headers: this.authHeaders(),
}
);
} catch (error) {
throw this.toServiceUnavailable(`kill session ${sessionId}`, error);
}
}
async *streamMessages(sessionId: string): AsyncIterable<AgentMessage> {
try {
yield* this.sseBridge.streamSession(this.resolveBaseUrl(), sessionId, this.authHeaders());
} catch (error) {
throw this.toServiceUnavailable(`stream messages for session ${sessionId}`, error);
}
}
async isAvailable(): Promise<boolean> {
try {
this.validateBaseUrl();
this.validateToken();
await this.httpService.axiosRef.get(this.buildUrl("/api/sessions"), {
headers: this.authHeaders(),
params: { limit: 1 },
});
return true;
} catch {
return false;
}
}
private extractSessionPage(payload: unknown): {
records: unknown[];
total?: number;
cursor?: string;
} {
if (Array.isArray(payload)) {
return {
records: payload,
total: payload.length,
};
}
if (!this.isRecord(payload)) {
return {
records: [],
};
}
let records: unknown[] = [];
if (Array.isArray(payload.sessions)) {
records = payload.sessions;
} else if (Array.isArray(payload.items)) {
records = payload.items;
} else if (Array.isArray(payload.data)) {
records = payload.data;
}
const total = typeof payload.total === "number" ? payload.total : undefined;
const cursor = this.readString(payload.cursor) ?? this.readString(payload.nextCursor);
return {
records,
total,
...(cursor !== undefined ? { cursor } : {}),
};
}
private extractMessageRecords(payload: unknown): unknown[] {
if (Array.isArray(payload)) {
return payload;
}
if (!this.isRecord(payload)) {
return [];
}
if (Array.isArray(payload.messages)) {
return payload.messages;
}
if (Array.isArray(payload.items)) {
return payload.items;
}
if (Array.isArray(payload.data)) {
return payload.data;
}
return [];
}
private unwrapContainer(payload: unknown, keys: string[]): unknown {
if (!this.isRecord(payload)) {
return payload;
}
for (const key of keys) {
if (key in payload) {
return payload[key];
}
}
return payload;
}
private toAgentSession(record: unknown): AgentSession | null {
if (!this.isRecord(record)) {
return null;
}
const id =
this.readString(record.id) ??
this.readString(record.sessionId) ??
this.readString(record.key);
if (!id) {
return null;
}
const createdAt = this.parseDate(record.createdAt ?? record.spawnedAt ?? record.startedAt);
const updatedAt = this.parseDate(
record.updatedAt ?? record.completedAt ?? record.lastActivityAt ?? record.endedAt,
createdAt
);
const label =
this.readString(record.label) ??
this.readString(record.title) ??
this.readString(record.name) ??
undefined;
const parentSessionId = this.readString(record.parentSessionId) ?? undefined;
const metadata = this.toMetadata(record.metadata);
return {
id,
providerId: this.providerId,
providerType: this.providerType,
...(label !== undefined ? { label } : {}),
status: this.toSessionStatus(this.readString(record.status)),
...(parentSessionId !== undefined ? { parentSessionId } : {}),
createdAt,
updatedAt,
...(metadata !== undefined ? { metadata } : {}),
};
}
private toAgentMessage(value: unknown, fallbackSessionId?: string): AgentMessage | null {
if (typeof value === "string") {
const content = value.trim();
if (content.length === 0 || fallbackSessionId === undefined) {
return null;
}
return {
id: randomUUID(),
sessionId: fallbackSessionId,
role: "assistant",
content,
timestamp: new Date(),
};
}
let candidate: JsonRecord | null = null;
if (this.isRecord(value) && this.isRecord(value.message)) {
candidate = value.message;
} else if (this.isRecord(value)) {
candidate = value;
}
if (candidate === null) {
return null;
}
const sessionId = this.readString(candidate.sessionId) ?? fallbackSessionId;
if (!sessionId) {
return null;
}
const content = this.extractMessageContent(
candidate.content ?? candidate.text ?? candidate.message
);
if (content.length === 0) {
return null;
}
const metadata = this.toMetadata(candidate.metadata);
return {
id: this.readString(candidate.id) ?? this.readString(candidate.messageId) ?? randomUUID(),
sessionId,
role: this.toMessageRole(this.readString(candidate.role) ?? this.readString(candidate.type)),
content,
timestamp: this.parseDate(candidate.timestamp ?? candidate.createdAt),
...(metadata !== undefined ? { metadata } : {}),
};
}
private extractMessageContent(content: unknown): string {
if (typeof content === "string") {
return content.trim();
}
if (Array.isArray(content)) {
const parts: string[] = [];
for (const part of content) {
if (typeof part === "string") {
const trimmed = part.trim();
if (trimmed.length > 0) {
parts.push(trimmed);
}
continue;
}
if (!this.isRecord(part)) {
continue;
}
const text = this.readString(part.text) ?? this.readString(part.content);
if (text !== undefined && text.trim().length > 0) {
parts.push(text.trim());
}
}
return parts.join("\n\n").trim();
}
if (this.isRecord(content)) {
const text = this.readString(content.text) ?? this.readString(content.content);
return text?.trim() ?? "";
}
return "";
}
private toSessionStatus(status?: string): AgentSessionStatus {
switch (status?.toLowerCase()) {
case "active":
case "running":
return "active";
case "paused":
return "paused";
case "completed":
case "done":
case "succeeded":
return "completed";
case "failed":
case "error":
case "killed":
case "terminated":
case "cancelled":
return "failed";
case "idle":
case "pending":
case "queued":
default:
return "idle";
}
}
private toMessageRole(role?: string): AgentMessageRole {
switch (role?.toLowerCase()) {
case "assistant":
case "agent":
return "assistant";
case "system":
return "system";
case "tool":
return "tool";
case "operator":
case "user":
default:
return "user";
}
}
private normalizeLimit(value: number, fallback: number): number {
const normalized = Number.isFinite(value) ? Math.trunc(value) : fallback;
if (normalized < 1) {
return 1;
}
return Math.min(normalized, MAX_MESSAGE_LIMIT);
}
private parseDate(value: unknown, fallback = new Date()): Date {
if (value instanceof Date) {
return value;
}
if (typeof value === "string" || typeof value === "number") {
const parsed = new Date(value);
if (!Number.isNaN(parsed.getTime())) {
return parsed;
}
}
return fallback;
}
private toMetadata(value: unknown): Record<string, unknown> | undefined {
if (this.isRecord(value)) {
return value;
}
return undefined;
}
private resolveDisplayName(): string {
const credentials = this.readCredentials();
for (const key of DISPLAY_NAME_KEYS) {
const value = this.readString(credentials[key]);
if (value !== undefined) {
return value;
}
}
return this.config.name;
}
private resolveBaseUrl(): string {
const configRecord = this.config as unknown as JsonRecord;
const rawBaseUrl =
this.readString(this.config.gatewayUrl) ?? this.readString(configRecord.baseUrl);
if (rawBaseUrl === undefined) {
throw new Error(`OpenClaw provider ${this.providerId} is missing gateway URL`);
}
try {
const parsed = new URL(rawBaseUrl);
return parsed.toString().replace(/\/$/u, "");
} catch {
throw new Error(`OpenClaw provider ${this.providerId} has invalid gateway URL`);
}
}
private resolveApiToken(): string {
const configRecord = this.config as unknown as JsonRecord;
const credentials = this.readCredentials();
const rawToken =
this.readString(configRecord.apiToken) ??
this.readString(configRecord.token) ??
this.readString(configRecord.bearerToken) ??
this.findFirstString(credentials, API_TOKEN_KEYS);
if (rawToken === undefined) {
throw new Error(`OpenClaw provider ${this.providerId} is missing apiToken credentials`);
}
try {
return this.encryptionService.decryptIfNeeded(rawToken);
} catch (error) {
throw new Error(`Failed to decrypt API token: ${this.toErrorMessage(error)}`);
}
}
private readCredentials(): JsonRecord {
return this.isRecord(this.config.credentials) ? this.config.credentials : {};
}
private findFirstString(record: JsonRecord, keys: readonly string[]): string | undefined {
for (const key of keys) {
const value = this.readString(record[key]);
if (value !== undefined) {
return value;
}
}
return undefined;
}
private authHeaders(extraHeaders: Record<string, string> = {}): Record<string, string> {
return {
Authorization: `Bearer ${this.resolveApiToken()}`,
...extraHeaders,
};
}
private buildUrl(path: string): string {
return new URL(path, `${this.resolveBaseUrl()}/`).toString();
}
private isRecord(value: unknown): value is JsonRecord {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
private readString(value: unknown): string | undefined {
if (typeof value !== "string") {
return undefined;
}
const trimmed = value.trim();
return trimmed.length > 0 ? trimmed : undefined;
}
private getHttpStatus(error: unknown): number | undefined {
if (typeof error !== "object" || error === null || !("response" in error)) {
return undefined;
}
const response = (error as HttpErrorWithResponse).response;
return typeof response?.status === "number" ? response.status : undefined;
}
private toServiceUnavailable(operation: string, error: unknown): ServiceUnavailableException {
return new ServiceUnavailableException(
`OpenClaw provider ${this.providerId} failed to ${operation}: ${this.toErrorMessage(error)}`
);
}
private toErrorMessage(error: unknown): string {
if (error instanceof Error) {
return error.message;
}
return String(error);
}
}