import { HttpService } from "@nestjs/axios"; import { Injectable } from "@nestjs/common"; import type { AgentMessage, AgentMessageRole } from "@mosaic/shared"; import { randomUUID } from "node:crypto"; const STREAM_RETRY_DELAY_MS = 2000; const STREAM_MAX_RETRIES = 5; type JsonRecord = Record; type AsyncChunkStream = AsyncIterable; type ParsedStreamEvent = | { type: "message"; message: AgentMessage; } | { type: "done"; }; @Injectable() export class OpenClawSseBridge { constructor(private readonly httpService: HttpService) {} async *streamSession( baseUrl: string, sessionId: string, headers: Record ): AsyncIterable { let retryCount = 0; let lastError: unknown = new Error("OpenClaw stream disconnected"); while (retryCount <= STREAM_MAX_RETRIES) { try { const response = await this.httpService.axiosRef.get( this.buildStreamUrl(baseUrl, sessionId), { headers: { ...headers, Accept: "text/event-stream", }, responseType: "stream", } ); const stream = this.asAsyncChunkStream(response.data); if (stream === null) { throw new Error("OpenClaw stream response is not readable"); } retryCount = 0; let streamCompleted = false; for await (const event of this.parseStream(stream, sessionId)) { if (event.type === "done") { streamCompleted = true; break; } yield event.message; } if (streamCompleted) { return; } lastError = new Error("OpenClaw stream disconnected"); } catch (error) { lastError = error; } if (retryCount >= STREAM_MAX_RETRIES) { throw new Error( `Failed to reconnect OpenClaw stream for session ${sessionId} after ${String(STREAM_MAX_RETRIES)} retries: ${this.toErrorMessage(lastError)}` ); } retryCount += 1; await this.delay(STREAM_RETRY_DELAY_MS); } } private async *parseStream( stream: AsyncChunkStream, sessionId: string ): AsyncGenerator { const decoder = new TextDecoder(); let buffer = ""; for await (const chunk of stream) { const textChunk = typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true }); buffer += textChunk.replace(/\r\n/gu, "\n"); const rawEvents = buffer.split("\n\n"); buffer = rawEvents.pop() ?? ""; for (const rawEvent of rawEvents) { const parsedEvent = this.parseRawEvent(rawEvent); if (parsedEvent === null) { continue; } if (parsedEvent.data === "[DONE]") { yield { type: "done", }; return; } const payload = this.tryParseJson(parsedEvent.data) ?? parsedEvent.data; const message = this.mapEventToMessage(parsedEvent.type, payload, sessionId); if (message !== null) { yield { type: "message", message, }; } } } buffer += decoder.decode(); const trailingEvent = this.parseRawEvent(buffer.trim()); if (trailingEvent === null) { return; } if (trailingEvent.data === "[DONE]") { yield { type: "done", }; return; } const payload = this.tryParseJson(trailingEvent.data) ?? trailingEvent.data; const message = this.mapEventToMessage(trailingEvent.type, payload, sessionId); if (message !== null) { yield { type: "message", message, }; } } private parseRawEvent(rawEvent: string): { type: string; data: string } | null { if (rawEvent.trim().length === 0) { return null; } let type = "message"; const dataLines: string[] = []; for (const line of rawEvent.split("\n")) { const trimmedLine = line.trimEnd(); if (trimmedLine.length === 0 || trimmedLine.startsWith(":")) { continue; } if (trimmedLine.startsWith("event:")) { type = trimmedLine.slice(6).trim().toLowerCase(); continue; } if (trimmedLine.startsWith("data:")) { dataLines.push(trimmedLine.slice(5).trimStart()); } } if (dataLines.length > 0) { return { type, data: dataLines.join("\n").trim(), }; } const trimmedEvent = rawEvent.trim(); if (trimmedEvent.startsWith("{") || trimmedEvent.startsWith("[")) { return { type, data: trimmedEvent, }; } return null; } private mapEventToMessage( eventType: string, payload: unknown, fallbackSessionId: string ): AgentMessage | null { switch (eventType) { case "heartbeat": return null; case "status": return this.toStatusMessage(payload, fallbackSessionId); case "message": default: return this.toAgentMessage(payload, fallbackSessionId); } } private toStatusMessage(value: unknown, sessionId: string): AgentMessage | null { if (typeof value === "string") { const status = value.trim(); if (status.length === 0) { return null; } return { id: randomUUID(), sessionId, role: "system", content: `Session status changed to ${status}`, timestamp: new Date(), metadata: { status, }, }; } if (!this.isRecord(value)) { return null; } const status = this.readString(value.status); if (!status) { return null; } return { id: randomUUID(), sessionId, role: "system", content: `Session status changed to ${status}`, timestamp: this.parseDate(value.timestamp ?? value.updatedAt), metadata: value, }; } private toAgentMessage(value: unknown, fallbackSessionId: string): AgentMessage | null { if (typeof value === "string") { const content = value.trim(); if (content.length === 0) { return null; } return { id: randomUUID(), sessionId: fallbackSessionId, role: "assistant", content, timestamp: new Date(), }; } let candidate: JsonRecord | null = null; if (this.isRecord(value) && this.isRecord(value.message)) { candidate = value.message; } else if (this.isRecord(value)) { candidate = value; } if (candidate === null) { return null; } const sessionId = this.readString(candidate.sessionId) ?? fallbackSessionId; if (!sessionId) { return null; } const content = this.extractMessageContent( candidate.content ?? candidate.text ?? candidate.message ); if (content.length === 0) { return null; } const metadata = this.toMetadata(candidate.metadata); return { id: this.readString(candidate.id) ?? this.readString(candidate.messageId) ?? randomUUID(), sessionId, role: this.toMessageRole(this.readString(candidate.role) ?? this.readString(candidate.type)), content, timestamp: this.parseDate(candidate.timestamp ?? candidate.createdAt), ...(metadata !== undefined ? { metadata } : {}), }; } private extractMessageContent(content: unknown): string { if (typeof content === "string") { return content.trim(); } if (Array.isArray(content)) { const parts: string[] = []; for (const part of content) { if (typeof part === "string") { const trimmed = part.trim(); if (trimmed.length > 0) { parts.push(trimmed); } continue; } if (!this.isRecord(part)) { continue; } const text = this.readString(part.text) ?? this.readString(part.content); if (text !== undefined && text.trim().length > 0) { parts.push(text.trim()); } } return parts.join("\n\n").trim(); } if (this.isRecord(content)) { const text = this.readString(content.text) ?? this.readString(content.content); return text?.trim() ?? ""; } return ""; } private toMessageRole(role?: string): AgentMessageRole { switch (role?.toLowerCase()) { case "assistant": case "agent": return "assistant"; case "system": return "system"; case "tool": return "tool"; case "operator": case "user": default: return "user"; } } private parseDate(value: unknown, fallback = new Date()): Date { if (value instanceof Date) { return value; } if (typeof value === "string" || typeof value === "number") { const parsed = new Date(value); if (!Number.isNaN(parsed.getTime())) { return parsed; } } return fallback; } private toMetadata(value: unknown): Record | undefined { if (this.isRecord(value)) { return value; } return undefined; } private buildStreamUrl(baseUrl: string, sessionId: string): string { const normalizedBaseUrl = baseUrl.replace(/\/$/u, ""); return new URL( `/api/sessions/${encodeURIComponent(sessionId)}/stream`, `${normalizedBaseUrl}/` ).toString(); } private tryParseJson(value: string): unknown { try { return JSON.parse(value) as unknown; } catch { return null; } } private asAsyncChunkStream(value: unknown): AsyncChunkStream | null { if (value !== null && typeof value === "object" && Symbol.asyncIterator in value) { return value as AsyncChunkStream; } return null; } private isRecord(value: unknown): value is JsonRecord { return typeof value === "object" && value !== null && !Array.isArray(value); } private readString(value: unknown): string | undefined { if (typeof value !== "string") { return undefined; } const trimmed = value.trim(); return trimmed.length > 0 ? trimmed : undefined; } private async delay(ms: number): Promise { await new Promise((resolve) => { setTimeout(resolve, ms); }); } private toErrorMessage(error: unknown): string { if (error instanceof Error) { return error.message; } return String(error); } }