import type { HttpService } from "@nestjs/axios"; import type { AgentMessage } from "@mosaic/shared"; import { Readable } from "node:stream"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { OpenClawSseBridge } from "./openclaw-sse.bridge"; describe("OpenClawSseBridge", () => { let bridge: OpenClawSseBridge; let httpService: { axiosRef: { get: ReturnType; }; }; beforeEach(() => { httpService = { axiosRef: { get: vi.fn(), }, }; bridge = new OpenClawSseBridge(httpService as unknown as HttpService); }); afterEach(() => { vi.useRealTimers(); }); it("maps message and status events, and skips heartbeats", async () => { httpService.axiosRef.get.mockResolvedValue({ data: Readable.from([ 'event: message\ndata: {"id":"msg-1","role":"assistant","content":"hello","timestamp":"2026-03-07T16:00:00.000Z"}\n\n', "event: heartbeat\ndata: {}\n\n", 'event: status\ndata: {"status":"paused","timestamp":"2026-03-07T16:00:01.000Z"}\n\n', "data: [DONE]\n\n", ]), }); const messages = await collectMessages( bridge.streamSession("https://gateway.example.com/", "session-1", { Authorization: "Bearer test-token", }) ); expect(httpService.axiosRef.get).toHaveBeenCalledWith( "https://gateway.example.com/api/sessions/session-1/stream", { headers: { Authorization: "Bearer test-token", Accept: "text/event-stream", }, responseType: "stream", } ); expect(messages).toHaveLength(2); expect(messages[0]).toEqual({ id: "msg-1", sessionId: "session-1", role: "assistant", content: "hello", timestamp: new Date("2026-03-07T16:00:00.000Z"), }); expect(messages[1]).toEqual({ id: expect.any(String), sessionId: "session-1", role: "system", content: "Session status changed to paused", timestamp: new Date("2026-03-07T16:00:01.000Z"), metadata: { status: "paused", timestamp: "2026-03-07T16:00:01.000Z", }, }); }); it("retries after disconnect and resumes streaming", async () => { vi.useFakeTimers(); httpService.axiosRef.get .mockResolvedValueOnce({ data: Readable.from([ 'event: message\ndata: {"id":"msg-1","content":"first","timestamp":"2026-03-07T16:10:00.000Z"}\n\n', ]), }) .mockResolvedValueOnce({ data: Readable.from(["data: [DONE]\n\n"]), }); const consumePromise = collectMessages( bridge.streamSession("https://gateway.example.com", "session-1", { Authorization: "Bearer test-token", }) ); await vi.advanceTimersByTimeAsync(2000); const messages = await consumePromise; expect(httpService.axiosRef.get).toHaveBeenCalledTimes(2); expect(messages).toEqual([ { id: "msg-1", sessionId: "session-1", role: "user", content: "first", timestamp: new Date("2026-03-07T16:10:00.000Z"), }, ]); }); it("throws after exhausting reconnect retries", async () => { vi.useFakeTimers(); httpService.axiosRef.get.mockRejectedValue(new Error("socket closed")); const consumePromise = collectMessages( bridge.streamSession("https://gateway.example.com", "session-1", { Authorization: "Bearer test-token", }) ); const rejection = expect(consumePromise).rejects.toThrow( "Failed to reconnect OpenClaw stream for session session-1 after 5 retries: socket closed" ); for (let attempt = 0; attempt < 5; attempt += 1) { await vi.advanceTimersByTimeAsync(2000); } await rejection; expect(httpService.axiosRef.get).toHaveBeenCalledTimes(6); }); }); async function collectMessages(stream: AsyncIterable): Promise { const messages: AgentMessage[] = []; for await (const message of stream) { messages.push(message); } return messages; }