From c847b74bda274d0e38f25ad75f26bc73fda928cb Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 1 Mar 2026 07:59:39 -0600 Subject: [PATCH] feat(api): add OpenClawGatewayModule with agent registry (MS22-P1b) --- .../migration.sql | 18 ++ apps/api/prisma/schema.prisma | 13 + apps/api/src/app.module.ts | 2 + .../agent-registry.controller.ts | 36 +++ .../agent-registry.service.ts | 65 +++++ .../openclaw-gateway.controller.ts | 40 +++ .../openclaw-gateway/openclaw-gateway.dto.ts | 115 ++++++++ .../openclaw-gateway.module.ts | 21 ++ .../openclaw-gateway.service.spec.ts | 212 ++++++++++++++ .../openclaw-gateway.service.ts | 273 ++++++++++++++++++ 10 files changed, 795 insertions(+) create mode 100644 apps/api/prisma/migrations/20260301075200_add_openclaw_agent_registry/migration.sql create mode 100644 apps/api/src/openclaw-gateway/agent-registry.controller.ts create mode 100644 apps/api/src/openclaw-gateway/agent-registry.service.ts create mode 100644 apps/api/src/openclaw-gateway/openclaw-gateway.controller.ts create mode 100644 apps/api/src/openclaw-gateway/openclaw-gateway.dto.ts create mode 100644 apps/api/src/openclaw-gateway/openclaw-gateway.module.ts create mode 100644 apps/api/src/openclaw-gateway/openclaw-gateway.service.spec.ts create mode 100644 apps/api/src/openclaw-gateway/openclaw-gateway.service.ts diff --git a/apps/api/prisma/migrations/20260301075200_add_openclaw_agent_registry/migration.sql b/apps/api/prisma/migrations/20260301075200_add_openclaw_agent_registry/migration.sql new file mode 100644 index 0000000..0a9ef9c --- /dev/null +++ b/apps/api/prisma/migrations/20260301075200_add_openclaw_agent_registry/migration.sql @@ -0,0 +1,18 @@ +-- CreateTable +CREATE TABLE "OpenClawAgent" ( + "id" TEXT NOT NULL, + "name" TEXT NOT NULL, + "displayName" TEXT NOT NULL, + "role" TEXT NOT NULL, + "gatewayUrl" TEXT NOT NULL, + "agentId" TEXT NOT NULL DEFAULT 'main', + "model" TEXT NOT NULL, + "isActive" BOOLEAN NOT NULL DEFAULT true, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL, + + CONSTRAINT "OpenClawAgent_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "OpenClawAgent_name_key" ON "OpenClawAgent"("name"); diff --git a/apps/api/prisma/schema.prisma b/apps/api/prisma/schema.prisma index 936538b..f93a37c 100644 --- a/apps/api/prisma/schema.prisma +++ b/apps/api/prisma/schema.prisma @@ -1407,6 +1407,19 @@ model Instance { @@map("instances") } +model OpenClawAgent { + id String @id @default(cuid()) + name String @unique // "jarvis-main", "jarvis-projects", etc. + displayName String // "Jarvis (Main)", etc. + role String // "orchestrator" | "developer" | "researcher" | "operations" + gatewayUrl String // "http://jarvis-main:18789" + agentId String @default("main") // OpenClaw agent id within that instance + model String // "zai/glm-5", "ollama/cogito" + isActive Boolean @default(true) + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt +} + model FederationConnection { id String @id @default(uuid()) @db.Uuid workspaceId String @map("workspace_id") @db.Uuid diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index ba05285..f94c3fd 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -49,6 +49,7 @@ import { AdminModule } from "./admin/admin.module"; import { TeamsModule } from "./teams/teams.module"; import { ImportModule } from "./import/import.module"; import { ConversationArchiveModule } from "./conversation-archive/conversation-archive.module"; +import { OpenClawGatewayModule } from "./openclaw-gateway/openclaw-gateway.module"; import { RlsContextInterceptor } from "./common/interceptors/rls-context.interceptor"; @Module({ @@ -121,6 +122,7 @@ import { RlsContextInterceptor } from "./common/interceptors/rls-context.interce TeamsModule, ImportModule, ConversationArchiveModule, + OpenClawGatewayModule, ], controllers: [AppController, CsrfController], providers: [ diff --git a/apps/api/src/openclaw-gateway/agent-registry.controller.ts b/apps/api/src/openclaw-gateway/agent-registry.controller.ts new file mode 100644 index 0000000..0f869b3 --- /dev/null +++ b/apps/api/src/openclaw-gateway/agent-registry.controller.ts @@ -0,0 +1,36 @@ +import { Body, Controller, Get, Param, Patch, Post, Query } from "@nestjs/common"; +import type { OpenClawAgent } from "@prisma/client"; +import { AgentRegistryService } from "./agent-registry.service"; +import type { + CreateOpenClawAgentDto, + QueryOpenClawAgentsDto, + UpdateOpenClawAgentDto, +} from "./openclaw-gateway.dto"; + +@Controller("openclaw/agents") +export class AgentRegistryController { + constructor(private readonly agentRegistryService: AgentRegistryService) {} + + @Get() + async listAgents(@Query() query: QueryOpenClawAgentsDto): Promise { + return this.agentRegistryService.listAgents(query); + } + + @Get(":name") + async getAgent(@Param("name") name: string): Promise { + return this.agentRegistryService.getAgent(name); + } + + @Post() + async createAgent(@Body() dto: CreateOpenClawAgentDto): Promise { + return this.agentRegistryService.createAgent(dto); + } + + @Patch(":name") + async updateAgent( + @Param("name") name: string, + @Body() dto: UpdateOpenClawAgentDto + ): Promise { + return this.agentRegistryService.updateAgent(name, dto); + } +} diff --git a/apps/api/src/openclaw-gateway/agent-registry.service.ts b/apps/api/src/openclaw-gateway/agent-registry.service.ts new file mode 100644 index 0000000..8edb187 --- /dev/null +++ b/apps/api/src/openclaw-gateway/agent-registry.service.ts @@ -0,0 +1,65 @@ +import { Injectable, NotFoundException } from "@nestjs/common"; +import { Prisma, type OpenClawAgent } from "@prisma/client"; +import { PrismaService } from "../prisma/prisma.service"; +import type { + CreateOpenClawAgentDto, + QueryOpenClawAgentsDto, + UpdateOpenClawAgentDto, +} from "./openclaw-gateway.dto"; + +@Injectable() +export class AgentRegistryService { + constructor(private readonly prisma: PrismaService) {} + + async listAgents(query: QueryOpenClawAgentsDto): Promise { + const where = query.isActive === undefined ? {} : { isActive: query.isActive }; + return this.prisma.openClawAgent.findMany({ + where, + orderBy: { name: "asc" }, + }); + } + + async getAgent(name: string): Promise { + const agent = await this.prisma.openClawAgent.findUnique({ + where: { name }, + }); + + if (!agent) { + throw new NotFoundException(`OpenClaw agent '${name}' not found`); + } + + return agent; + } + + async createAgent(dto: CreateOpenClawAgentDto): Promise { + return this.prisma.openClawAgent.create({ + data: { + name: dto.name, + displayName: dto.displayName, + role: dto.role, + gatewayUrl: dto.gatewayUrl, + agentId: dto.agentId ?? "main", + model: dto.model, + isActive: dto.isActive ?? true, + }, + }); + } + + async updateAgent(name: string, dto: UpdateOpenClawAgentDto): Promise { + await this.getAgent(name); + + const data: Prisma.OpenClawAgentUpdateInput = {}; + if (dto.name !== undefined) data.name = dto.name; + if (dto.displayName !== undefined) data.displayName = dto.displayName; + if (dto.role !== undefined) data.role = dto.role; + if (dto.gatewayUrl !== undefined) data.gatewayUrl = dto.gatewayUrl; + if (dto.agentId !== undefined) data.agentId = dto.agentId; + if (dto.model !== undefined) data.model = dto.model; + if (dto.isActive !== undefined) data.isActive = dto.isActive; + + return this.prisma.openClawAgent.update({ + where: { name }, + data, + }); + } +} diff --git a/apps/api/src/openclaw-gateway/openclaw-gateway.controller.ts b/apps/api/src/openclaw-gateway/openclaw-gateway.controller.ts new file mode 100644 index 0000000..d8740a4 --- /dev/null +++ b/apps/api/src/openclaw-gateway/openclaw-gateway.controller.ts @@ -0,0 +1,40 @@ +import { Body, Controller, HttpCode, HttpStatus, Post, Res } from "@nestjs/common"; +import type { Response } from "express"; +import { OpenClawGatewayService } from "./openclaw-gateway.service"; +import { ChatRequestDto } from "./openclaw-gateway.dto"; + +@Controller("openclaw") +export class OpenClawGatewayController { + constructor(private readonly openClawGatewayService: OpenClawGatewayService) {} + + @Post("chat") + @HttpCode(HttpStatus.OK) + async chat(@Body() dto: ChatRequestDto, @Res() res: Response): Promise { + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Accel-Buffering", "no"); + + if (typeof res.flushHeaders === "function") { + res.flushHeaders(); + } + + try { + for await (const content of this.openClawGatewayService.streamChat( + dto.agent, + dto.messages, + dto.workspaceId + )) { + res.write(`data: ${JSON.stringify({ content })}\n\n`); + } + + res.write("data: [DONE]\n\n"); + } catch (error: unknown) { + const errorMessage = error instanceof Error ? error.message : String(error); + res.write("event: error\n"); + res.write(`data: ${JSON.stringify({ error: errorMessage })}\n\n`); + } finally { + res.end(); + } + } +} diff --git a/apps/api/src/openclaw-gateway/openclaw-gateway.dto.ts b/apps/api/src/openclaw-gateway/openclaw-gateway.dto.ts new file mode 100644 index 0000000..7b6d3b8 --- /dev/null +++ b/apps/api/src/openclaw-gateway/openclaw-gateway.dto.ts @@ -0,0 +1,115 @@ +import { PartialType } from "@nestjs/mapped-types"; +import { + IsArray, + IsBoolean, + IsIn, + IsOptional, + IsString, + Matches, + MaxLength, + MinLength, + ValidateNested, +} from "class-validator"; +import { Type, Transform } from "class-transformer"; + +export type ChatRole = "system" | "user" | "assistant"; + +export interface ChatMessage { + role: ChatRole; + content: string; +} + +export class ChatMessageDto implements ChatMessage { + @IsString({ message: "role must be a string" }) + @IsIn(["system", "user", "assistant"], { + message: "role must be one of: system, user, assistant", + }) + role!: ChatRole; + + @IsString({ message: "content must be a string" }) + @MinLength(1, { message: "content must not be empty" }) + content!: string; +} + +export class ChatRequestDto { + @IsString({ message: "agent must be a string" }) + @MinLength(1, { message: "agent must not be empty" }) + @MaxLength(100, { message: "agent must not exceed 100 characters" }) + @Matches(/^[a-z0-9-]+$/, { + message: "agent must contain only lowercase letters, numbers, and hyphens", + }) + agent!: string; + + @IsArray({ message: "messages must be an array" }) + @ValidateNested({ each: true }) + @Type(() => ChatMessageDto) + messages!: ChatMessageDto[]; + + @IsOptional() + @IsString({ message: "workspaceId must be a string" }) + workspaceId?: string; +} + +export class CreateOpenClawAgentDto { + @IsString({ message: "name must be a string" }) + @MinLength(1, { message: "name must not be empty" }) + @MaxLength(100, { message: "name must not exceed 100 characters" }) + @Matches(/^[a-z0-9-]+$/, { + message: "name must contain only lowercase letters, numbers, and hyphens", + }) + name!: string; + + @IsString({ message: "displayName must be a string" }) + @MinLength(1, { message: "displayName must not be empty" }) + @MaxLength(255, { message: "displayName must not exceed 255 characters" }) + displayName!: string; + + @IsString({ message: "role must be a string" }) + @IsIn(["orchestrator", "developer", "researcher", "operations"], { + message: "role must be one of: orchestrator, developer, researcher, operations", + }) + role!: string; + + @IsString({ message: "gatewayUrl must be a string" }) + @MinLength(1, { message: "gatewayUrl must not be empty" }) + @MaxLength(2048, { message: "gatewayUrl must not exceed 2048 characters" }) + @Matches(/^https?:\/\/[^\s]+$/i, { + message: "gatewayUrl must be a valid HTTP(S) URL", + }) + gatewayUrl!: string; + + @IsOptional() + @IsString({ message: "agentId must be a string" }) + @MinLength(1, { message: "agentId must not be empty" }) + @MaxLength(100, { message: "agentId must not exceed 100 characters" }) + agentId?: string; + + @IsString({ message: "model must be a string" }) + @MinLength(1, { message: "model must not be empty" }) + @MaxLength(255, { message: "model must not exceed 255 characters" }) + model!: string; + + @IsOptional() + @IsBoolean({ message: "isActive must be a boolean" }) + isActive?: boolean; +} + +export class UpdateOpenClawAgentDto extends PartialType(CreateOpenClawAgentDto) {} + +export class QueryOpenClawAgentsDto { + @IsOptional() + @Transform(({ value }: { value: unknown }) => { + if (typeof value === "boolean") { + return value; + } + if (value === "true" || value === "1") { + return true; + } + if (value === "false" || value === "0") { + return false; + } + return value; + }) + @IsBoolean({ message: "isActive must be a boolean" }) + isActive?: boolean; +} diff --git a/apps/api/src/openclaw-gateway/openclaw-gateway.module.ts b/apps/api/src/openclaw-gateway/openclaw-gateway.module.ts new file mode 100644 index 0000000..ac8536a --- /dev/null +++ b/apps/api/src/openclaw-gateway/openclaw-gateway.module.ts @@ -0,0 +1,21 @@ +import { Module } from "@nestjs/common"; +import { HttpModule } from "@nestjs/axios"; +import { PrismaModule } from "../prisma/prisma.module"; +import { OpenClawGatewayService } from "./openclaw-gateway.service"; +import { OpenClawGatewayController } from "./openclaw-gateway.controller"; +import { AgentRegistryService } from "./agent-registry.service"; +import { AgentRegistryController } from "./agent-registry.controller"; + +@Module({ + imports: [ + PrismaModule, + HttpModule.register({ + timeout: 120000, + maxRedirects: 3, + }), + ], + controllers: [OpenClawGatewayController, AgentRegistryController], + providers: [OpenClawGatewayService, AgentRegistryService], + exports: [OpenClawGatewayService, AgentRegistryService], +}) +export class OpenClawGatewayModule {} diff --git a/apps/api/src/openclaw-gateway/openclaw-gateway.service.spec.ts b/apps/api/src/openclaw-gateway/openclaw-gateway.service.spec.ts new file mode 100644 index 0000000..cc5049b --- /dev/null +++ b/apps/api/src/openclaw-gateway/openclaw-gateway.service.spec.ts @@ -0,0 +1,212 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; +import { Test, TestingModule } from "@nestjs/testing"; +import { + NotFoundException, + ServiceUnavailableException, + UnauthorizedException, +} from "@nestjs/common"; +import { HttpService } from "@nestjs/axios"; +import { Readable } from "node:stream"; +import { PrismaService } from "../prisma/prisma.service"; +import { OpenClawGatewayService } from "./openclaw-gateway.service"; +import type { ChatMessage } from "./openclaw-gateway.dto"; + +describe("OpenClawGatewayService", () => { + let service: OpenClawGatewayService; + + const mockPrisma = { + openClawAgent: { + findUnique: vi.fn(), + }, + }; + + const mockHttpService = { + axiosRef: { + post: vi.fn(), + }, + }; + + const tokenEnvKey = "OPENCLAW_TOKEN_JARVIS_MAIN"; + + beforeEach(async () => { + vi.clearAllMocks(); + process.env[tokenEnvKey] = "test-token"; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + OpenClawGatewayService, + { provide: PrismaService, useValue: mockPrisma }, + { provide: HttpService, useValue: mockHttpService }, + ], + }).compile(); + + service = module.get(OpenClawGatewayService); + }); + + afterEach(() => { + delete process.env[tokenEnvKey]; + }); + + it("streams content chunks from OpenClaw SSE responses", async () => { + const messages: ChatMessage[] = [{ role: "user", content: "Hello" }]; + + mockPrisma.openClawAgent.findUnique.mockResolvedValue({ + id: "agent-1", + name: "jarvis-main", + displayName: "Jarvis (Main)", + role: "orchestrator", + gatewayUrl: "http://jarvis-main:18789", + agentId: "main", + model: "zai/glm-5", + isActive: true, + createdAt: new Date(), + updatedAt: new Date(), + }); + + mockHttpService.axiosRef.post.mockResolvedValue({ + data: Readable.from([ + 'data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n', + 'data: {"choices":[{"delta":{"content":" world"}}]}\n\n', + "data: [DONE]\n\n", + ]), + }); + + const chunks: string[] = []; + for await (const chunk of service.streamChat("jarvis-main", messages)) { + chunks.push(chunk); + } + + expect(chunks).toEqual(["Hello", " world"]); + expect(mockHttpService.axiosRef.post).toHaveBeenCalledWith( + "http://jarvis-main:18789/v1/chat/completions", + { + model: "openclaw:main", + messages, + stream: true, + }, + expect.objectContaining({ + responseType: "stream", + headers: expect.objectContaining({ + Authorization: "Bearer test-token", + "Content-Type": "application/json", + }), + }) + ); + }); + + it("throws NotFoundException when agent is not registered", async () => { + mockPrisma.openClawAgent.findUnique.mockResolvedValue(null); + + await expect( + (async () => { + for await (const _chunk of service.streamChat("missing-agent", [])) { + // no-op + } + })() + ).rejects.toBeInstanceOf(NotFoundException); + }); + + it("throws ServiceUnavailableException when agent is inactive", async () => { + mockPrisma.openClawAgent.findUnique.mockResolvedValue({ + id: "agent-1", + name: "jarvis-main", + displayName: "Jarvis (Main)", + role: "orchestrator", + gatewayUrl: "http://jarvis-main:18789", + agentId: "main", + model: "zai/glm-5", + isActive: false, + createdAt: new Date(), + updatedAt: new Date(), + }); + + await expect( + (async () => { + for await (const _chunk of service.streamChat("jarvis-main", [])) { + // no-op + } + })() + ).rejects.toBeInstanceOf(ServiceUnavailableException); + }); + + it("throws ServiceUnavailableException when token env var is missing", async () => { + delete process.env[tokenEnvKey]; + + mockPrisma.openClawAgent.findUnique.mockResolvedValue({ + id: "agent-1", + name: "jarvis-main", + displayName: "Jarvis (Main)", + role: "orchestrator", + gatewayUrl: "http://jarvis-main:18789", + agentId: "main", + model: "zai/glm-5", + isActive: true, + createdAt: new Date(), + updatedAt: new Date(), + }); + + await expect( + (async () => { + for await (const _chunk of service.streamChat("jarvis-main", [])) { + // no-op + } + })() + ).rejects.toBeInstanceOf(ServiceUnavailableException); + }); + + it("throws UnauthorizedException when OpenClaw returns 401", async () => { + mockPrisma.openClawAgent.findUnique.mockResolvedValue({ + id: "agent-1", + name: "jarvis-main", + displayName: "Jarvis (Main)", + role: "orchestrator", + gatewayUrl: "http://jarvis-main:18789", + agentId: "main", + model: "zai/glm-5", + isActive: true, + createdAt: new Date(), + updatedAt: new Date(), + }); + + mockHttpService.axiosRef.post.mockRejectedValue({ + message: "Request failed with status code 401", + response: { status: 401 }, + }); + + await expect( + (async () => { + for await (const _chunk of service.streamChat("jarvis-main", [])) { + // no-op + } + })() + ).rejects.toBeInstanceOf(UnauthorizedException); + }); + + it("throws ServiceUnavailableException when gateway is offline", async () => { + mockPrisma.openClawAgent.findUnique.mockResolvedValue({ + id: "agent-1", + name: "jarvis-main", + displayName: "Jarvis (Main)", + role: "orchestrator", + gatewayUrl: "http://jarvis-main:18789", + agentId: "main", + model: "zai/glm-5", + isActive: true, + createdAt: new Date(), + updatedAt: new Date(), + }); + + mockHttpService.axiosRef.post.mockRejectedValue({ + message: "connect ECONNREFUSED 127.0.0.1:18789", + code: "ECONNREFUSED", + }); + + await expect( + (async () => { + for await (const _chunk of service.streamChat("jarvis-main", [])) { + // no-op + } + })() + ).rejects.toBeInstanceOf(ServiceUnavailableException); + }); +}); diff --git a/apps/api/src/openclaw-gateway/openclaw-gateway.service.ts b/apps/api/src/openclaw-gateway/openclaw-gateway.service.ts new file mode 100644 index 0000000..2ef8622 --- /dev/null +++ b/apps/api/src/openclaw-gateway/openclaw-gateway.service.ts @@ -0,0 +1,273 @@ +import { HttpService } from "@nestjs/axios"; +import { + Injectable, + Logger, + NotFoundException, + ServiceUnavailableException, + UnauthorizedException, +} from "@nestjs/common"; +import type { OpenClawAgent } from "@prisma/client"; +import type { Readable } from "node:stream"; +import { PrismaService } from "../prisma/prisma.service"; +import type { ChatMessage } from "./openclaw-gateway.dto"; + +interface OpenAiSseChoiceDelta { + content?: string; +} + +interface OpenAiSseChoice { + delta?: OpenAiSseChoiceDelta; +} + +interface OpenAiSseError { + message?: string; +} + +interface OpenAiSsePayload { + choices?: OpenAiSseChoice[]; + error?: OpenAiSseError; +} + +type ParsedSseEvent = { done: true } | { done: false; content: string } | null; + +interface GatewayErrorLike { + message?: string; + code?: string; + response?: { + status?: number; + }; +} + +@Injectable() +export class OpenClawGatewayService { + private readonly logger = new Logger(OpenClawGatewayService.name); + + constructor( + private readonly prisma: PrismaService, + private readonly httpService: HttpService + ) {} + + async *streamChat( + agentName: string, + messages: ChatMessage[], + workspaceId?: string + ): AsyncGenerator { + const agent = await this.prisma.openClawAgent.findUnique({ + where: { name: agentName }, + }); + + if (!agent) { + throw new NotFoundException(`OpenClaw agent '${agentName}' not found`); + } + + if (!agent.isActive) { + throw new ServiceUnavailableException(`OpenClaw agent '${agentName}' is inactive`); + } + + const token = this.resolveGatewayToken(agent.name); + const endpoint = this.buildChatEndpoint(agent.gatewayUrl); + + try { + const response = await this.httpService.axiosRef.post( + endpoint, + { + model: `openclaw:${agent.agentId}`, + messages, + stream: true, + }, + { + responseType: "stream", + timeout: 120000, + headers: { + Authorization: `Bearer ${token}`, + "Content-Type": "application/json", + }, + } + ); + + for await (const chunk of this.extractContentChunks(response.data)) { + yield chunk; + } + } catch (error: unknown) { + this.throwGatewayError(agent, endpoint, workspaceId, error); + } + } + + private resolveGatewayToken(agentName: string): string { + const envKey = this.getTokenEnvKey(agentName); + const token = process.env[envKey]; + + if (!token) { + throw new ServiceUnavailableException( + `Missing gateway token for agent '${agentName}'. Set ${envKey}.` + ); + } + + return token; + } + + private getTokenEnvKey(agentName: string): string { + return `OPENCLAW_TOKEN_${agentName.replace(/-/g, "_").toUpperCase()}`; + } + + private buildChatEndpoint(gatewayUrl: string): string { + const sanitizedBaseUrl = gatewayUrl.replace(/\/+$/, ""); + return `${sanitizedBaseUrl}/v1/chat/completions`; + } + + private async *extractContentChunks(stream: Readable): AsyncGenerator { + let buffer = ""; + + for await (const rawChunk of stream) { + buffer += this.chunkToString(rawChunk); + + for (;;) { + const delimiterMatch = /\r?\n\r?\n/.exec(buffer); + const delimiterIndex = delimiterMatch?.index; + + if (delimiterMatch === null || delimiterIndex === undefined) { + break; + } + + const rawEvent = buffer.slice(0, delimiterIndex); + buffer = buffer.slice(delimiterIndex + delimiterMatch[0].length); + + const parsed = this.parseSseEvent(rawEvent); + if (parsed === null) { + continue; + } + + if (parsed.done) { + return; + } + + yield parsed.content; + } + } + + const trailingEvent = this.parseSseEvent(buffer); + if (trailingEvent !== null && !trailingEvent.done) { + yield trailingEvent.content; + } + } + + private parseSseEvent(rawEvent: string): ParsedSseEvent { + const payload = this.extractSseDataPayload(rawEvent); + if (!payload) { + return null; + } + + if (payload === "[DONE]") { + return { done: true }; + } + + let parsedPayload: OpenAiSsePayload; + + try { + parsedPayload = JSON.parse(payload) as OpenAiSsePayload; + } catch { + this.logger.debug(`Skipping non-JSON OpenClaw SSE payload: ${payload}`); + return null; + } + + if (parsedPayload.error?.message) { + throw new ServiceUnavailableException( + `OpenClaw gateway error: ${parsedPayload.error.message}` + ); + } + + const content = parsedPayload.choices?.[0]?.delta?.content; + + if (typeof content === "string" && content.length > 0) { + return { done: false, content }; + } + + return null; + } + + private extractSseDataPayload(rawEvent: string): string | null { + if (rawEvent.trim().length === 0) { + return null; + } + + const dataLines = rawEvent + .split(/\r?\n/) + .filter((line) => line.startsWith("data:")) + .map((line) => line.slice(5).trimStart()); + + if (dataLines.length === 0) { + return null; + } + + return dataLines.join("\n").trim(); + } + + private chunkToString(chunk: unknown): string { + if (typeof chunk === "string") { + return chunk; + } + + if (Buffer.isBuffer(chunk)) { + return chunk.toString("utf8"); + } + + return String(chunk); + } + + private throwGatewayError( + agent: OpenClawAgent, + endpoint: string, + workspaceId: string | undefined, + error: unknown + ): never { + if (error instanceof NotFoundException) { + throw error; + } + + if (error instanceof UnauthorizedException) { + throw error; + } + + if (error instanceof ServiceUnavailableException) { + throw error; + } + + const gatewayError = error as GatewayErrorLike; + const statusCode = gatewayError.response?.status; + const errorCode = gatewayError.code; + const message = gatewayError.message ?? String(error); + + const workspaceSuffix = workspaceId ? ` (workspace ${workspaceId})` : ""; + + if (statusCode === 401 || statusCode === 403) { + this.logger.error( + `OpenClaw auth failed for agent '${agent.name}' at ${endpoint}${workspaceSuffix}: ${message}` + ); + throw new UnauthorizedException(`OpenClaw authentication failed for agent '${agent.name}'`); + } + + const isGatewayOfflineCode = + errorCode === "ECONNREFUSED" || + errorCode === "ENOTFOUND" || + errorCode === "ETIMEDOUT" || + errorCode === "ECONNRESET"; + const isGatewayOfflineStatus = + statusCode === 502 || statusCode === 503 || statusCode === 504 || statusCode === 522; + + if (isGatewayOfflineCode || isGatewayOfflineStatus) { + this.logger.warn( + `OpenClaw gateway offline for agent '${agent.name}' at ${endpoint}${workspaceSuffix}: ${message}` + ); + throw new ServiceUnavailableException( + `OpenClaw gateway for agent '${agent.name}' is unavailable` + ); + } + + this.logger.error( + `OpenClaw request failed for agent '${agent.name}' at ${endpoint}${workspaceSuffix}: ${message}` + ); + throw new ServiceUnavailableException( + `OpenClaw request failed for agent '${agent.name}': ${message}` + ); + } +}