From 495d78115edd00d3bffbff03eb98133a086319ea Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sat, 7 Mar 2026 16:20:27 -0600 Subject: [PATCH] feat(orchestrator): add OpenClaw SSE bridge streaming --- .../openclaw/openclaw-sse.bridge.spec.ts | 145 ++++++ .../providers/openclaw/openclaw-sse.bridge.ts | 420 ++++++++++++++++++ .../openclaw/openclaw.provider-factory.ts | 11 +- .../openclaw/openclaw.provider.spec.ts | 55 +-- .../providers/openclaw/openclaw.provider.ts | 98 +--- .../src/api/providers/providers.module.ts | 3 +- 6 files changed, 608 insertions(+), 124 deletions(-) create mode 100644 apps/orchestrator/src/api/providers/openclaw/openclaw-sse.bridge.spec.ts create mode 100644 apps/orchestrator/src/api/providers/openclaw/openclaw-sse.bridge.ts diff --git a/apps/orchestrator/src/api/providers/openclaw/openclaw-sse.bridge.spec.ts b/apps/orchestrator/src/api/providers/openclaw/openclaw-sse.bridge.spec.ts new file mode 100644 index 0000000..ac85638 --- /dev/null +++ b/apps/orchestrator/src/api/providers/openclaw/openclaw-sse.bridge.spec.ts @@ -0,0 +1,145 @@ +import type { HttpService } from "@nestjs/axios"; +import type { AgentMessage } from "@mosaic/shared"; +import { Readable } from "node:stream"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { OpenClawSseBridge } from "./openclaw-sse.bridge"; + +describe("OpenClawSseBridge", () => { + let bridge: OpenClawSseBridge; + let httpService: { + axiosRef: { + get: ReturnType; + }; + }; + + beforeEach(() => { + httpService = { + axiosRef: { + get: vi.fn(), + }, + }; + + bridge = new OpenClawSseBridge(httpService as unknown as HttpService); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("maps message and status events, and skips heartbeats", async () => { + httpService.axiosRef.get.mockResolvedValue({ + data: Readable.from([ + 'event: message\ndata: {"id":"msg-1","role":"assistant","content":"hello","timestamp":"2026-03-07T16:00:00.000Z"}\n\n', + "event: heartbeat\ndata: {}\n\n", + 'event: status\ndata: {"status":"paused","timestamp":"2026-03-07T16:00:01.000Z"}\n\n', + "data: [DONE]\n\n", + ]), + }); + + const messages = await collectMessages( + bridge.streamSession("https://gateway.example.com/", "session-1", { + Authorization: "Bearer test-token", + }) + ); + + expect(httpService.axiosRef.get).toHaveBeenCalledWith( + "https://gateway.example.com/api/sessions/session-1/stream", + { + headers: { + Authorization: "Bearer test-token", + Accept: "text/event-stream", + }, + responseType: "stream", + } + ); + + expect(messages).toHaveLength(2); + expect(messages[0]).toEqual({ + id: "msg-1", + sessionId: "session-1", + role: "assistant", + content: "hello", + timestamp: new Date("2026-03-07T16:00:00.000Z"), + }); + + expect(messages[1]).toEqual({ + id: expect.any(String), + sessionId: "session-1", + role: "system", + content: "Session status changed to paused", + timestamp: new Date("2026-03-07T16:00:01.000Z"), + metadata: { + status: "paused", + timestamp: "2026-03-07T16:00:01.000Z", + }, + }); + }); + + it("retries after disconnect and resumes streaming", async () => { + vi.useFakeTimers(); + + httpService.axiosRef.get + .mockResolvedValueOnce({ + data: Readable.from([ + 'event: message\ndata: {"id":"msg-1","content":"first","timestamp":"2026-03-07T16:10:00.000Z"}\n\n', + ]), + }) + .mockResolvedValueOnce({ + data: Readable.from(["data: [DONE]\n\n"]), + }); + + const consumePromise = collectMessages( + bridge.streamSession("https://gateway.example.com", "session-1", { + Authorization: "Bearer test-token", + }) + ); + + await vi.advanceTimersByTimeAsync(2000); + + const messages = await consumePromise; + + expect(httpService.axiosRef.get).toHaveBeenCalledTimes(2); + expect(messages).toEqual([ + { + id: "msg-1", + sessionId: "session-1", + role: "user", + content: "first", + timestamp: new Date("2026-03-07T16:10:00.000Z"), + }, + ]); + }); + + it("throws after exhausting reconnect retries", async () => { + vi.useFakeTimers(); + + httpService.axiosRef.get.mockRejectedValue(new Error("socket closed")); + + const consumePromise = collectMessages( + bridge.streamSession("https://gateway.example.com", "session-1", { + Authorization: "Bearer test-token", + }) + ); + + const rejection = expect(consumePromise).rejects.toThrow( + "Failed to reconnect OpenClaw stream for session session-1 after 5 retries: socket closed" + ); + + for (let attempt = 0; attempt < 5; attempt += 1) { + await vi.advanceTimersByTimeAsync(2000); + } + + await rejection; + expect(httpService.axiosRef.get).toHaveBeenCalledTimes(6); + }); +}); + +async function collectMessages(stream: AsyncIterable): Promise { + const messages: AgentMessage[] = []; + + for await (const message of stream) { + messages.push(message); + } + + return messages; +} diff --git a/apps/orchestrator/src/api/providers/openclaw/openclaw-sse.bridge.ts b/apps/orchestrator/src/api/providers/openclaw/openclaw-sse.bridge.ts new file mode 100644 index 0000000..657c84e --- /dev/null +++ b/apps/orchestrator/src/api/providers/openclaw/openclaw-sse.bridge.ts @@ -0,0 +1,420 @@ +import { HttpService } from "@nestjs/axios"; +import { Injectable } from "@nestjs/common"; +import type { AgentMessage, AgentMessageRole } from "@mosaic/shared"; +import { randomUUID } from "node:crypto"; + +const STREAM_RETRY_DELAY_MS = 2000; +const STREAM_MAX_RETRIES = 5; + +type JsonRecord = Record; +type AsyncChunkStream = AsyncIterable; + +type ParsedStreamEvent = + | { + type: "message"; + message: AgentMessage; + } + | { + type: "done"; + }; + +@Injectable() +export class OpenClawSseBridge { + constructor(private readonly httpService: HttpService) {} + + async *streamSession( + baseUrl: string, + sessionId: string, + headers: Record + ): AsyncIterable { + let retryCount = 0; + let lastError: unknown = new Error("OpenClaw stream disconnected"); + + while (retryCount <= STREAM_MAX_RETRIES) { + try { + const response = await this.httpService.axiosRef.get( + this.buildStreamUrl(baseUrl, sessionId), + { + headers: { + ...headers, + Accept: "text/event-stream", + }, + responseType: "stream", + } + ); + + const stream = this.asAsyncChunkStream(response.data); + if (stream === null) { + throw new Error("OpenClaw stream response is not readable"); + } + + retryCount = 0; + let streamCompleted = false; + + for await (const event of this.parseStream(stream, sessionId)) { + if (event.type === "done") { + streamCompleted = true; + break; + } + + yield event.message; + } + + if (streamCompleted) { + return; + } + + lastError = new Error("OpenClaw stream disconnected"); + } catch (error) { + lastError = error; + } + + if (retryCount >= STREAM_MAX_RETRIES) { + throw new Error( + `Failed to reconnect OpenClaw stream for session ${sessionId} after ${String(STREAM_MAX_RETRIES)} retries: ${this.toErrorMessage(lastError)}` + ); + } + + retryCount += 1; + await this.delay(STREAM_RETRY_DELAY_MS); + } + } + + private async *parseStream( + stream: AsyncChunkStream, + sessionId: string + ): AsyncGenerator { + const decoder = new TextDecoder(); + let buffer = ""; + + for await (const chunk of stream) { + const textChunk = typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true }); + buffer += textChunk.replace(/\r\n/gu, "\n"); + + const rawEvents = buffer.split("\n\n"); + buffer = rawEvents.pop() ?? ""; + + for (const rawEvent of rawEvents) { + const parsedEvent = this.parseRawEvent(rawEvent); + if (parsedEvent === null) { + continue; + } + + if (parsedEvent.data === "[DONE]") { + yield { + type: "done", + }; + return; + } + + const payload = this.tryParseJson(parsedEvent.data) ?? parsedEvent.data; + const message = this.mapEventToMessage(parsedEvent.type, payload, sessionId); + if (message !== null) { + yield { + type: "message", + message, + }; + } + } + } + + buffer += decoder.decode(); + + const trailingEvent = this.parseRawEvent(buffer.trim()); + if (trailingEvent === null) { + return; + } + + if (trailingEvent.data === "[DONE]") { + yield { + type: "done", + }; + return; + } + + const payload = this.tryParseJson(trailingEvent.data) ?? trailingEvent.data; + const message = this.mapEventToMessage(trailingEvent.type, payload, sessionId); + if (message !== null) { + yield { + type: "message", + message, + }; + } + } + + private parseRawEvent(rawEvent: string): { type: string; data: string } | null { + if (rawEvent.trim().length === 0) { + return null; + } + + let type = "message"; + const dataLines: string[] = []; + + for (const line of rawEvent.split("\n")) { + const trimmedLine = line.trimEnd(); + if (trimmedLine.length === 0 || trimmedLine.startsWith(":")) { + continue; + } + + if (trimmedLine.startsWith("event:")) { + type = trimmedLine.slice(6).trim().toLowerCase(); + continue; + } + + if (trimmedLine.startsWith("data:")) { + dataLines.push(trimmedLine.slice(5).trimStart()); + } + } + + if (dataLines.length > 0) { + return { + type, + data: dataLines.join("\n").trim(), + }; + } + + const trimmedEvent = rawEvent.trim(); + if (trimmedEvent.startsWith("{") || trimmedEvent.startsWith("[")) { + return { + type, + data: trimmedEvent, + }; + } + + return null; + } + + private mapEventToMessage( + eventType: string, + payload: unknown, + fallbackSessionId: string + ): AgentMessage | null { + switch (eventType) { + case "heartbeat": + return null; + case "status": + return this.toStatusMessage(payload, fallbackSessionId); + case "message": + default: + return this.toAgentMessage(payload, fallbackSessionId); + } + } + + private toStatusMessage(value: unknown, sessionId: string): AgentMessage | null { + if (typeof value === "string") { + const status = value.trim(); + if (status.length === 0) { + return null; + } + + return { + id: randomUUID(), + sessionId, + role: "system", + content: `Session status changed to ${status}`, + timestamp: new Date(), + metadata: { + status, + }, + }; + } + + if (!this.isRecord(value)) { + return null; + } + + const status = this.readString(value.status); + if (!status) { + return null; + } + + return { + id: randomUUID(), + sessionId, + role: "system", + content: `Session status changed to ${status}`, + timestamp: this.parseDate(value.timestamp ?? value.updatedAt), + metadata: value, + }; + } + + private toAgentMessage(value: unknown, fallbackSessionId: string): AgentMessage | null { + if (typeof value === "string") { + const content = value.trim(); + if (content.length === 0) { + return null; + } + + return { + id: randomUUID(), + sessionId: fallbackSessionId, + role: "assistant", + content, + timestamp: new Date(), + }; + } + + let candidate: JsonRecord | null = null; + + if (this.isRecord(value) && this.isRecord(value.message)) { + candidate = value.message; + } else if (this.isRecord(value)) { + candidate = value; + } + + if (candidate === null) { + return null; + } + + const sessionId = this.readString(candidate.sessionId) ?? fallbackSessionId; + if (!sessionId) { + return null; + } + + const content = this.extractMessageContent( + candidate.content ?? candidate.text ?? candidate.message + ); + if (content.length === 0) { + return null; + } + + const metadata = this.toMetadata(candidate.metadata); + + return { + id: this.readString(candidate.id) ?? this.readString(candidate.messageId) ?? randomUUID(), + sessionId, + role: this.toMessageRole(this.readString(candidate.role) ?? this.readString(candidate.type)), + content, + timestamp: this.parseDate(candidate.timestamp ?? candidate.createdAt), + ...(metadata !== undefined ? { metadata } : {}), + }; + } + + private extractMessageContent(content: unknown): string { + if (typeof content === "string") { + return content.trim(); + } + + if (Array.isArray(content)) { + const parts: string[] = []; + + for (const part of content) { + if (typeof part === "string") { + const trimmed = part.trim(); + if (trimmed.length > 0) { + parts.push(trimmed); + } + continue; + } + + if (!this.isRecord(part)) { + continue; + } + + const text = this.readString(part.text) ?? this.readString(part.content); + if (text !== undefined && text.trim().length > 0) { + parts.push(text.trim()); + } + } + + return parts.join("\n\n").trim(); + } + + if (this.isRecord(content)) { + const text = this.readString(content.text) ?? this.readString(content.content); + return text?.trim() ?? ""; + } + + return ""; + } + + private toMessageRole(role?: string): AgentMessageRole { + switch (role?.toLowerCase()) { + case "assistant": + case "agent": + return "assistant"; + case "system": + return "system"; + case "tool": + return "tool"; + case "operator": + case "user": + default: + return "user"; + } + } + + private parseDate(value: unknown, fallback = new Date()): Date { + if (value instanceof Date) { + return value; + } + + if (typeof value === "string" || typeof value === "number") { + const parsed = new Date(value); + if (!Number.isNaN(parsed.getTime())) { + return parsed; + } + } + + return fallback; + } + + private toMetadata(value: unknown): Record | undefined { + if (this.isRecord(value)) { + return value; + } + + return undefined; + } + + private buildStreamUrl(baseUrl: string, sessionId: string): string { + const normalizedBaseUrl = baseUrl.replace(/\/$/u, ""); + return new URL( + `/api/sessions/${encodeURIComponent(sessionId)}/stream`, + `${normalizedBaseUrl}/` + ).toString(); + } + + private tryParseJson(value: string): unknown { + try { + return JSON.parse(value) as unknown; + } catch { + return null; + } + } + + private asAsyncChunkStream(value: unknown): AsyncChunkStream | null { + if (value !== null && typeof value === "object" && Symbol.asyncIterator in value) { + return value as AsyncChunkStream; + } + + return null; + } + + private isRecord(value: unknown): value is JsonRecord { + return typeof value === "object" && value !== null && !Array.isArray(value); + } + + private readString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : undefined; + } + + private async delay(ms: number): Promise { + await new Promise((resolve) => { + setTimeout(resolve, ms); + }); + } + + private toErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + + return String(error); + } +} diff --git a/apps/orchestrator/src/api/providers/openclaw/openclaw.provider-factory.ts b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider-factory.ts index 4af3579..d8369c2 100644 --- a/apps/orchestrator/src/api/providers/openclaw/openclaw.provider-factory.ts +++ b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider-factory.ts @@ -2,16 +2,23 @@ import { HttpService } from "@nestjs/axios"; import { Injectable } from "@nestjs/common"; import type { AgentProviderConfig } from "@prisma/client"; import { EncryptionService } from "../../../security/encryption.service"; +import { OpenClawSseBridge } from "./openclaw-sse.bridge"; import { OpenClawProvider } from "./openclaw.provider"; @Injectable() export class OpenClawProviderFactory { constructor( private readonly encryptionService: EncryptionService, - private readonly httpService: HttpService + private readonly httpService: HttpService, + private readonly openClawSseBridge: OpenClawSseBridge ) {} createProvider(config: AgentProviderConfig): OpenClawProvider { - return new OpenClawProvider(config, this.encryptionService, this.httpService); + return new OpenClawProvider( + config, + this.encryptionService, + this.httpService, + this.openClawSseBridge + ); } } diff --git a/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.spec.ts b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.spec.ts index 7b56758..bc066bf 100644 --- a/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.spec.ts +++ b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.spec.ts @@ -1,9 +1,9 @@ import type { HttpService } from "@nestjs/axios"; import { ServiceUnavailableException } from "@nestjs/common"; import type { AgentProviderConfig } from "@prisma/client"; -import { Readable } from "node:stream"; import { beforeEach, describe, expect, it, vi } from "vitest"; import { EncryptionService } from "../../../security/encryption.service"; +import { OpenClawSseBridge } from "./openclaw-sse.bridge"; import { OpenClawProvider } from "./openclaw.provider"; describe("OpenClawProvider", () => { @@ -17,6 +17,9 @@ describe("OpenClawProvider", () => { let encryptionService: { decryptIfNeeded: ReturnType; }; + let sseBridge: { + streamSession: ReturnType; + }; const config: AgentProviderConfig = { id: "cfg-openclaw-1", @@ -45,10 +48,15 @@ describe("OpenClawProvider", () => { decryptIfNeeded: vi.fn().mockReturnValue("plain-token"), }; + sseBridge = { + streamSession: vi.fn(), + }; + provider = new OpenClawProvider( config, encryptionService as unknown as EncryptionService, - httpService as unknown as HttpService + httpService as unknown as HttpService, + sseBridge as unknown as OpenClawSseBridge ); }); @@ -219,41 +227,34 @@ describe("OpenClawProvider", () => { ); }); - it("parses SSE stream messages", async () => { - const stream = Readable.from([ - 'data: {"id":"message-stream","sessionId":"session-stream","role":"assistant","content":"stream hello","timestamp":"2026-03-07T16:00:00.000Z"}\n\n', - "data: [DONE]\n\n", - ]); + it("delegates streaming to OpenClawSseBridge", async () => { + const streamedMessage = { + id: "message-stream", + sessionId: "session-stream", + role: "assistant", + content: "stream hello", + timestamp: new Date("2026-03-07T16:00:00.000Z"), + }; - httpService.axiosRef.get.mockResolvedValue({ - data: stream, - }); + sseBridge.streamSession.mockReturnValue( + (async function* () { + yield streamedMessage; + })() + ); const messages: Array = []; for await (const message of provider.streamMessages("session-stream")) { messages.push(message); } - expect(httpService.axiosRef.get).toHaveBeenCalledWith( - "https://gateway.example.com/api/sessions/session-stream/stream", + expect(sseBridge.streamSession).toHaveBeenCalledWith( + "https://gateway.example.com", + "session-stream", { - headers: { - Authorization: "Bearer plain-token", - Accept: "text/event-stream", - }, - responseType: "stream", + Authorization: "Bearer plain-token", } ); - - expect(messages).toEqual([ - { - id: "message-stream", - sessionId: "session-stream", - role: "assistant", - content: "stream hello", - timestamp: new Date("2026-03-07T16:00:00.000Z"), - }, - ]); + expect(messages).toEqual([streamedMessage]); }); it("throws ServiceUnavailableException for request failures", async () => { diff --git a/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.ts b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.ts index 1836764..7a33629 100644 --- a/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.ts +++ b/apps/orchestrator/src/api/providers/openclaw/openclaw.provider.ts @@ -12,6 +12,7 @@ import type { import type { AgentProviderConfig } from "@prisma/client"; import { randomUUID } from "node:crypto"; import { EncryptionService } from "../../../security/encryption.service"; +import { OpenClawSseBridge } from "./openclaw-sse.bridge"; const DEFAULT_SESSION_LIMIT = 50; const DEFAULT_MESSAGE_LIMIT = 50; @@ -21,7 +22,6 @@ const API_TOKEN_KEYS = ["apiToken", "token", "bearerToken"] as const; const DISPLAY_NAME_KEYS = ["displayName", "label"] as const; type JsonRecord = Record; -type AsyncChunkStream = AsyncIterable; interface HttpErrorWithResponse { response?: { @@ -38,7 +38,8 @@ export class OpenClawProvider implements IAgentProvider { constructor( private readonly config: AgentProviderConfig, private readonly encryptionService: EncryptionService, - private readonly httpService: HttpService + private readonly httpService: HttpService, + private readonly sseBridge: OpenClawSseBridge ) { this.providerId = this.config.name; this.displayName = this.resolveDisplayName(); @@ -196,64 +197,7 @@ export class OpenClawProvider implements IAgentProvider { async *streamMessages(sessionId: string): AsyncIterable { try { - const response = await this.httpService.axiosRef.get( - this.buildUrl(`/api/sessions/${encodeURIComponent(sessionId)}/stream`), - { - headers: this.authHeaders({ - Accept: "text/event-stream", - }), - responseType: "stream", - } - ); - - const stream = this.asAsyncChunkStream(response.data); - if (!stream) { - throw new Error("OpenClaw stream response is not readable"); - } - - const decoder = new TextDecoder(); - let buffer = ""; - let streamDone = false; - - for await (const chunk of stream) { - const textChunk = - typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true }); - buffer += textChunk.replace(/\r\n/gu, "\n"); - - const events = buffer.split("\n\n"); - buffer = events.pop() ?? ""; - - for (const event of events) { - const data = this.extractSseData(event); - if (data === null) { - continue; - } - - if (data === "[DONE]") { - streamDone = true; - break; - } - - const message = this.toAgentMessage(this.tryParseJson(data) ?? data, sessionId); - if (message !== null) { - yield message; - } - } - - if (streamDone) { - break; - } - } - - if (!streamDone && buffer.trim().length > 0) { - const data = this.extractSseData(buffer.trim()); - if (data !== null && data !== "[DONE]") { - const message = this.toAgentMessage(this.tryParseJson(data) ?? data, sessionId); - if (message !== null) { - yield message; - } - } - } + yield* this.sseBridge.streamSession(this.resolveBaseUrl(), sessionId, this.authHeaders()); } catch (error) { throw this.toServiceUnavailable(`stream messages for session ${sessionId}`, error); } @@ -631,40 +575,6 @@ export class OpenClawProvider implements IAgentProvider { return new URL(path, `${this.resolveBaseUrl()}/`).toString(); } - private extractSseData(rawEvent: string): string | null { - const lines = rawEvent.split("\n"); - const dataLines = lines - .filter((line) => line.startsWith("data:")) - .map((line) => line.slice(5).trimStart()); - - if (dataLines.length > 0) { - return dataLines.join("\n").trim(); - } - - const trimmed = rawEvent.trim(); - if (trimmed.startsWith("{") || trimmed.startsWith("[")) { - return trimmed; - } - - return null; - } - - private tryParseJson(value: string): unknown { - try { - return JSON.parse(value) as unknown; - } catch { - return null; - } - } - - private asAsyncChunkStream(value: unknown): AsyncChunkStream | null { - if (value !== null && typeof value === "object" && Symbol.asyncIterator in value) { - return value as AsyncChunkStream; - } - - return null; - } - private isRecord(value: unknown): value is JsonRecord { return typeof value === "object" && value !== null && !Array.isArray(value); } diff --git a/apps/orchestrator/src/api/providers/providers.module.ts b/apps/orchestrator/src/api/providers/providers.module.ts index e87c802..ee7a1b1 100644 --- a/apps/orchestrator/src/api/providers/providers.module.ts +++ b/apps/orchestrator/src/api/providers/providers.module.ts @@ -7,6 +7,7 @@ import { EncryptionService } from "../../security/encryption.service"; import { AgentProviderRegistry } from "../agents/agent-provider.registry"; import { AgentsModule } from "../agents/agents.module"; import { OpenClawProviderFactory } from "./openclaw/openclaw.provider-factory"; +import { OpenClawSseBridge } from "./openclaw/openclaw-sse.bridge"; const OPENCLAW_PROVIDER_TYPE = "openclaw"; @@ -19,7 +20,7 @@ const OPENCLAW_PROVIDER_TYPE = "openclaw"; maxRedirects: 5, }), ], - providers: [EncryptionService, OpenClawProviderFactory], + providers: [EncryptionService, OpenClawSseBridge, OpenClawProviderFactory], }) export class ProvidersModule implements OnModuleInit { private readonly logger = new Logger(ProvidersModule.name);