import type { HttpService } from "@nestjs/axios";
import { ServiceUnavailableException } from "@nestjs/common";
import type { AgentMessage } from "@mosaic/shared";
import type { AgentProviderConfig } from "@prisma/client";
import { Readable } from "node:stream";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { EncryptionService } from "../../../security/encryption.service";
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
import { OpenClawProvider } from "./openclaw.provider";

/**
 * Gate suite for the OpenClaw provider.
 *
 * Every test drives a real `OpenClawProvider` (wired to a real `OpenClawSseBridge`) against a
 * mocked axios instance and a mocked encryption service, so no network or crypto is involved.
 */
describe("Phase 3 gate: OpenClaw provider config registered in DB → provider loaded on boot → sessions returned from /api/mission-control/sessions → inject/pause/kill proxied to gateway", () => {
  let provider: OpenClawProvider;

  // Only the `axiosRef.get` / `axiosRef.post` surface is mocked; the object is cast to
  // `HttpService` when handed to the provider and the bridge.
  let httpService: {
    axiosRef: {
      get: ReturnType<typeof vi.fn>;
      post: ReturnType<typeof vi.fn>;
    };
  };

  let encryptionService: {
    decryptIfNeeded: ReturnType<typeof vi.fn>;
  };

  // Provider configuration fixture shared by every test in this suite.
  const config: AgentProviderConfig = {
    id: "cfg-openclaw-1",
    workspaceId: "workspace-1",
    name: "openclaw-home",
    provider: "openclaw",
    gatewayUrl: "https://gateway.example.com",
    credentials: {
      apiToken: "enc:token",
    },
    isActive: true,
    createdAt: new Date("2026-03-07T15:00:00.000Z"),
    updatedAt: new Date("2026-03-07T15:00:00.000Z"),
  };

  beforeEach(() => {
    // Fresh mocks per test so call counts and stubbed responses never leak between cases.
    httpService = {
      axiosRef: {
        get: vi.fn(),
        post: vi.fn(),
      },
    };

    encryptionService = {
      decryptIfNeeded: vi.fn().mockReturnValue("plain-token"),
    };

    provider = new OpenClawProvider(
      config,
      encryptionService as unknown as EncryptionService,
      httpService as unknown as HttpService,
      new OpenClawSseBridge(httpService as unknown as HttpService)
    );
  });

  afterEach(() => {
    // The malformed-stream test installs fake timers; always restore the real ones.
    vi.useRealTimers();
  });

  it("maps listSessions from mocked OpenClaw gateway HTTP responses", async () => {
    httpService.axiosRef.get.mockResolvedValue({
      data: {
        sessions: [
          {
            id: "session-1",
            status: "running",
            createdAt: "2026-03-07T15:01:00.000Z",
            updatedAt: "2026-03-07T15:02:00.000Z",
          },
        ],
        total: 1,
      },
    });

    // The gateway's "running" status is expected to surface as "active", and the ISO
    // timestamp strings as Date instances.
    await expect(provider.listSessions()).resolves.toEqual({
      sessions: [
        {
          id: "session-1",
          providerId: "openclaw-home",
          providerType: "openclaw",
          status: "active",
          createdAt: new Date("2026-03-07T15:01:00.000Z"),
          updatedAt: new Date("2026-03-07T15:02:00.000Z"),
        },
      ],
      total: 1,
    });

    // The bearer token must be the value returned by the encryption service mock.
    expect(httpService.axiosRef.get).toHaveBeenCalledWith(
      "https://gateway.example.com/api/sessions",
      {
        headers: {
          Authorization: "Bearer plain-token",
        },
        params: {
          limit: 50,
        },
      }
    );
  });

  it("maps streamMessages from mock SSE events into AgentMessage output", async () => {
    httpService.axiosRef.get.mockResolvedValue({
      data: Readable.from([
        'event: message\ndata: {"id":"msg-1","role":"assistant","content":"hello from stream","timestamp":"2026-03-07T15:03:00.000Z"}\n\n',
        'event: status\ndata: {"status":"paused","timestamp":"2026-03-07T15:04:00.000Z"}\n\n',
        "data: [DONE]\n\n",
      ]),
    });

    const messages = await collectMessages(provider.streamMessages("session-1"));

    // Two messages are expected: the `message` event as-is and the `status` event as a
    // system message. The trailing `[DONE]` frame yields no message.
    expect(messages).toEqual([
      {
        id: "msg-1",
        sessionId: "session-1",
        role: "assistant",
        content: "hello from stream",
        timestamp: new Date("2026-03-07T15:03:00.000Z"),
      },
      {
        id: expect.any(String),
        sessionId: "session-1",
        role: "system",
        content: "Session status changed to paused",
        timestamp: new Date("2026-03-07T15:04:00.000Z"),
        metadata: {
          status: "paused",
          timestamp: "2026-03-07T15:04:00.000Z",
        },
      },
    ]);
  });

  it("handles unavailable gateway errors", async () => {
    httpService.axiosRef.get.mockRejectedValue(new Error("gateway unavailable"));

    // listSessions is invoked once per assertion: one checks the exception type, the other
    // checks that the underlying message is preserved.
    await expect(provider.listSessions()).rejects.toBeInstanceOf(ServiceUnavailableException);
    await expect(provider.listSessions()).rejects.toThrow("gateway unavailable");
  });

  it("handles bad token decryption errors", async () => {
    encryptionService.decryptIfNeeded.mockImplementation(() => {
      throw new Error("bad token");
    });

    // The raw "bad token" error is expected to be replaced by a provider-level message.
    await expect(provider.listSessions()).rejects.toBeInstanceOf(ServiceUnavailableException);
    await expect(provider.listSessions()).rejects.toThrow("Failed to decrypt API token");
  });

  it("handles malformed SSE stream responses", async () => {
    vi.useFakeTimers();

    // `data` is a plain object here rather than a readable stream.
    httpService.axiosRef.get.mockResolvedValue({
      data: {
        malformed: true,
      },
    });

    const streamPromise = collectMessages(provider.streamMessages("session-malformed"));

    // Attach the rejection expectation before advancing the clock so the eventual rejection
    // is already handled when it fires.
    const rejection = expect(streamPromise).rejects.toThrow(
      "OpenClaw provider openclaw-home failed to stream messages for session session-malformed"
    );

    // Step the fake clock five times, 2000 ms each.
    // NOTE(review): presumably one step per retry delay — confirm against the retry policy
    // of the provider / SSE bridge, which is not visible in this file.
    for (let attempt = 0; attempt < 5; attempt += 1) {
      await vi.advanceTimersByTimeAsync(2000);
    }

    await rejection;

    // Six GET calls in total are expected before the stream gives up.
    expect(httpService.axiosRef.get).toHaveBeenCalledTimes(6);
  });
});

/**
 * Drains an async iterable of agent messages into an array, preserving the yield order.
 *
 * @param stream - Async iterable to consume until it completes.
 * @returns Every message yielded by `stream`, in order.
 * @throws Re-throws whatever error the iterable throws while being consumed.
 */
async function collectMessages(stream: AsyncIterable<AgentMessage>): Promise<AgentMessage[]> {
  const messages: AgentMessage[] = [];

  for await (const message of stream) {
    messages.push(message);
  }

  return messages;
}