feat(orchestrator): add OpenClaw SSE bridge streaming
All checks were successful
ci/woodpecker/push/ci Pipeline was successful

This commit is contained in:
2026-03-07 16:20:27 -06:00
parent 563d59ad5d
commit 495d78115e
6 changed files with 608 additions and 124 deletions

View File

@@ -1,9 +1,9 @@
import type { HttpService } from "@nestjs/axios";
import { ServiceUnavailableException } from "@nestjs/common";
import type { AgentProviderConfig } from "@prisma/client";
import { Readable } from "node:stream";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { EncryptionService } from "../../../security/encryption.service";
import { OpenClawSseBridge } from "./openclaw-sse.bridge";
import { OpenClawProvider } from "./openclaw.provider";
describe("OpenClawProvider", () => {
@@ -17,6 +17,9 @@ describe("OpenClawProvider", () => {
let encryptionService: {
decryptIfNeeded: ReturnType<typeof vi.fn>;
};
let sseBridge: {
streamSession: ReturnType<typeof vi.fn>;
};
const config: AgentProviderConfig = {
id: "cfg-openclaw-1",
@@ -45,10 +48,15 @@ describe("OpenClawProvider", () => {
decryptIfNeeded: vi.fn().mockReturnValue("plain-token"),
};
sseBridge = {
streamSession: vi.fn(),
};
provider = new OpenClawProvider(
config,
encryptionService as unknown as EncryptionService,
httpService as unknown as HttpService
httpService as unknown as HttpService,
sseBridge as unknown as OpenClawSseBridge
);
});
@@ -219,41 +227,34 @@ describe("OpenClawProvider", () => {
);
});
it("parses SSE stream messages", async () => {
const stream = Readable.from([
'data: {"id":"message-stream","sessionId":"session-stream","role":"assistant","content":"stream hello","timestamp":"2026-03-07T16:00:00.000Z"}\n\n',
"data: [DONE]\n\n",
]);
it("delegates streaming to OpenClawSseBridge", async () => {
const streamedMessage = {
id: "message-stream",
sessionId: "session-stream",
role: "assistant",
content: "stream hello",
timestamp: new Date("2026-03-07T16:00:00.000Z"),
};
httpService.axiosRef.get.mockResolvedValue({
data: stream,
});
sseBridge.streamSession.mockReturnValue(
(async function* () {
yield streamedMessage;
})()
);
const messages: Array<unknown> = [];
for await (const message of provider.streamMessages("session-stream")) {
messages.push(message);
}
expect(httpService.axiosRef.get).toHaveBeenCalledWith(
"https://gateway.example.com/api/sessions/session-stream/stream",
expect(sseBridge.streamSession).toHaveBeenCalledWith(
"https://gateway.example.com",
"session-stream",
{
headers: {
Authorization: "Bearer plain-token",
Accept: "text/event-stream",
},
responseType: "stream",
Authorization: "Bearer plain-token",
}
);
expect(messages).toEqual([
{
id: "message-stream",
sessionId: "session-stream",
role: "assistant",
content: "stream hello",
timestamp: new Date("2026-03-07T16:00:00.000Z"),
},
]);
expect(messages).toEqual([streamedMessage]);
});
it("throws ServiceUnavailableException for request failures", async () => {