From 029c190c05787a528f6db6bc2928b413aaf4dc45 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 1 Mar 2026 15:59:00 +0000 Subject: [PATCH] feat(api): chat proxy (MS22-P1i) (#615) Co-authored-by: Jason Woltje Co-committed-by: Jason Woltje --- apps/api/src/app.module.ts | 2 + .../src/chat-proxy/chat-proxy.controller.ts | 72 ++++++++++++ apps/api/src/chat-proxy/chat-proxy.dto.ts | 25 ++++ apps/api/src/chat-proxy/chat-proxy.module.ts | 14 +++ .../src/chat-proxy/chat-proxy.service.spec.ts | 107 ++++++++++++++++++ apps/api/src/chat-proxy/chat-proxy.service.ts | 89 +++++++++++++++ 6 files changed, 309 insertions(+) create mode 100644 apps/api/src/chat-proxy/chat-proxy.controller.ts create mode 100644 apps/api/src/chat-proxy/chat-proxy.dto.ts create mode 100644 apps/api/src/chat-proxy/chat-proxy.module.ts create mode 100644 apps/api/src/chat-proxy/chat-proxy.service.spec.ts create mode 100644 apps/api/src/chat-proxy/chat-proxy.service.ts diff --git a/apps/api/src/app.module.ts b/apps/api/src/app.module.ts index 147bb6f..92cfa5c 100644 --- a/apps/api/src/app.module.ts +++ b/apps/api/src/app.module.ts @@ -57,6 +57,7 @@ import { ContainerLifecycleModule } from "./container-lifecycle/container-lifecy import { ContainerReaperModule } from "./container-reaper/container-reaper.module"; import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module"; import { OnboardingModule } from "./onboarding/onboarding.module"; +import { ChatProxyModule } from "./chat-proxy/chat-proxy.module"; @Module({ imports: [ @@ -135,6 +136,7 @@ import { OnboardingModule } from "./onboarding/onboarding.module"; ContainerReaperModule, FleetSettingsModule, OnboardingModule, + ChatProxyModule, ], controllers: [AppController, CsrfController], providers: [ diff --git a/apps/api/src/chat-proxy/chat-proxy.controller.ts b/apps/api/src/chat-proxy/chat-proxy.controller.ts new file mode 100644 index 0000000..dc38807 --- /dev/null +++ b/apps/api/src/chat-proxy/chat-proxy.controller.ts @@ -0,0 +1,72 @@ +import { Body, Controller, Post, Req, Res, UnauthorizedException, UseGuards } from "@nestjs/common"; +import type { Response } from "express"; +import { AuthGuard } from "../auth/guards/auth.guard"; +import type { MaybeAuthenticatedRequest } from "../auth/types/better-auth-request.interface"; +import { ChatStreamDto } from "./chat-proxy.dto"; +import { ChatProxyService } from "./chat-proxy.service"; + +@Controller("chat") +@UseGuards(AuthGuard) +export class ChatProxyController { + constructor(private readonly chatProxyService: ChatProxyService) {} + + // POST /api/chat/stream + // Request: { messages: Array<{role, content}> } + // Response: SSE stream of chat completion events + @Post("stream") + async streamChat( + @Body() body: ChatStreamDto, + @Req() req: MaybeAuthenticatedRequest, + @Res() res: Response + ): Promise { + const userId = req.user?.id; + if (!userId) { + throw new UnauthorizedException("No authenticated user found on request"); + } + + const abortController = new AbortController(); + req.once("close", () => { + abortController.abort(); + }); + + res.setHeader("Content-Type", "text/event-stream"); + res.setHeader("Cache-Control", "no-cache"); + res.setHeader("Connection", "keep-alive"); + res.setHeader("X-Accel-Buffering", "no"); + + try { + const upstreamResponse = await this.chatProxyService.proxyChat( + userId, + body.messages, + abortController.signal + ); + + const upstreamContentType = upstreamResponse.headers.get("content-type"); + if (upstreamContentType) { + res.setHeader("Content-Type", upstreamContentType); + } + + if (!upstreamResponse.body) { + throw new Error("OpenClaw response did not include a stream body"); + } + + for await (const chunk of upstreamResponse.body as unknown as AsyncIterable) { + if (res.writableEnded || res.destroyed) { + break; + } + + res.write(Buffer.from(chunk)); + } + } catch (error: unknown) { + if (!res.writableEnded && !res.destroyed) { + const message = error instanceof Error ? error.message : String(error); + res.write("event: error\n"); + res.write(`data: ${JSON.stringify({ error: message })}\n\n`); + } + } finally { + if (!res.writableEnded && !res.destroyed) { + res.end(); + } + } + } +} diff --git a/apps/api/src/chat-proxy/chat-proxy.dto.ts b/apps/api/src/chat-proxy/chat-proxy.dto.ts new file mode 100644 index 0000000..1c77cd9 --- /dev/null +++ b/apps/api/src/chat-proxy/chat-proxy.dto.ts @@ -0,0 +1,25 @@ +import { Type } from "class-transformer"; +import { ArrayMinSize, IsArray, IsNotEmpty, IsString, ValidateNested } from "class-validator"; + +export interface ChatMessage { + role: string; + content: string; +} + +export class ChatMessageDto implements ChatMessage { + @IsString({ message: "role must be a string" }) + @IsNotEmpty({ message: "role is required" }) + role!: string; + + @IsString({ message: "content must be a string" }) + @IsNotEmpty({ message: "content is required" }) + content!: string; +} + +export class ChatStreamDto { + @IsArray({ message: "messages must be an array" }) + @ArrayMinSize(1, { message: "messages must contain at least one message" }) + @ValidateNested({ each: true }) + @Type(() => ChatMessageDto) + messages!: ChatMessageDto[]; +} diff --git a/apps/api/src/chat-proxy/chat-proxy.module.ts b/apps/api/src/chat-proxy/chat-proxy.module.ts new file mode 100644 index 0000000..0a010db --- /dev/null +++ b/apps/api/src/chat-proxy/chat-proxy.module.ts @@ -0,0 +1,14 @@ +import { Module } from "@nestjs/common"; +import { AgentConfigModule } from "../agent-config/agent-config.module"; +import { ContainerLifecycleModule } from "../container-lifecycle/container-lifecycle.module"; +import { PrismaModule } from "../prisma/prisma.module"; +import { ChatProxyController } from "./chat-proxy.controller"; +import { ChatProxyService } from "./chat-proxy.service"; + +@Module({ + imports: [PrismaModule, ContainerLifecycleModule, AgentConfigModule], + controllers: [ChatProxyController], + providers: [ChatProxyService], + exports: [ChatProxyService], +}) +export class ChatProxyModule {} diff --git a/apps/api/src/chat-proxy/chat-proxy.service.spec.ts b/apps/api/src/chat-proxy/chat-proxy.service.spec.ts new file mode 100644 index 0000000..17b3c33 --- /dev/null +++ b/apps/api/src/chat-proxy/chat-proxy.service.spec.ts @@ -0,0 +1,107 @@ +import { ServiceUnavailableException } from "@nestjs/common"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { ChatProxyService } from "./chat-proxy.service"; + +describe("ChatProxyService", () => { + const userId = "user-123"; + + const prisma = { + userAgentConfig: { + findUnique: vi.fn(), + }, + }; + + const containerLifecycle = { + ensureRunning: vi.fn(), + touch: vi.fn(), + }; + + let service: ChatProxyService; + let fetchMock: ReturnType; + + beforeEach(() => { + fetchMock = vi.fn(); + vi.stubGlobal("fetch", fetchMock); + service = new ChatProxyService(prisma as never, containerLifecycle as never); + }); + + afterEach(() => { + vi.unstubAllGlobals(); + vi.clearAllMocks(); + }); + + describe("getContainerUrl", () => { + it("calls ensureRunning and touch for the user", async () => { + containerLifecycle.ensureRunning.mockResolvedValue({ + url: "http://mosaic-user-user-123:19000", + token: "gateway-token", + }); + containerLifecycle.touch.mockResolvedValue(undefined); + + const url = await service.getContainerUrl(userId); + + expect(url).toBe("http://mosaic-user-user-123:19000"); + expect(containerLifecycle.ensureRunning).toHaveBeenCalledWith(userId); + expect(containerLifecycle.touch).toHaveBeenCalledWith(userId); + }); + }); + + describe("proxyChat", () => { + it("forwards the request to the user's OpenClaw container", async () => { + containerLifecycle.ensureRunning.mockResolvedValue({ + url: "http://mosaic-user-user-123:19000", + token: "gateway-token", + }); + containerLifecycle.touch.mockResolvedValue(undefined); + fetchMock.mockResolvedValue(new Response("event: token\ndata: hello\n\n")); + + const messages = [{ role: "user", content: "Hello from Mosaic" }]; + const response = await service.proxyChat(userId, messages); + + expect(response).toBeInstanceOf(Response); + expect(fetchMock).toHaveBeenCalledWith( + "http://mosaic-user-user-123:19000/v1/chat/completions", + expect.objectContaining({ + method: "POST", + headers: { + "Content-Type": "application/json", + }, + }) + ); + + const [, request] = fetchMock.mock.calls[0] as [string, RequestInit]; + const parsedBody = JSON.parse(String(request.body)); + expect(parsedBody).toEqual({ + messages, + model: "openclaw:default", + stream: true, + }); + }); + + it("throws ServiceUnavailableException on connection refused errors", async () => { + containerLifecycle.ensureRunning.mockResolvedValue({ + url: "http://mosaic-user-user-123:19000", + token: "gateway-token", + }); + containerLifecycle.touch.mockResolvedValue(undefined); + fetchMock.mockRejectedValue(new Error("connect ECONNREFUSED 127.0.0.1:19000")); + + await expect(service.proxyChat(userId, [])).rejects.toBeInstanceOf( + ServiceUnavailableException + ); + }); + + it("throws ServiceUnavailableException on timeout errors", async () => { + containerLifecycle.ensureRunning.mockResolvedValue({ + url: "http://mosaic-user-user-123:19000", + token: "gateway-token", + }); + containerLifecycle.touch.mockResolvedValue(undefined); + fetchMock.mockRejectedValue(new Error("The operation was aborted due to timeout")); + + await expect(service.proxyChat(userId, [])).rejects.toBeInstanceOf( + ServiceUnavailableException + ); + }); + }); +}); diff --git a/apps/api/src/chat-proxy/chat-proxy.service.ts b/apps/api/src/chat-proxy/chat-proxy.service.ts new file mode 100644 index 0000000..f936d6a --- /dev/null +++ b/apps/api/src/chat-proxy/chat-proxy.service.ts @@ -0,0 +1,89 @@ +import { BadGatewayException, Injectable, ServiceUnavailableException } from "@nestjs/common"; +import { ContainerLifecycleService } from "../container-lifecycle/container-lifecycle.service"; +import { PrismaService } from "../prisma/prisma.service"; +import type { ChatMessage } from "./chat-proxy.dto"; + +const DEFAULT_OPENCLAW_MODEL = "openclaw:default"; + +@Injectable() +export class ChatProxyService { + constructor( + private readonly prisma: PrismaService, + private readonly containerLifecycle: ContainerLifecycleService + ) {} + + // Get the user's OpenClaw container URL and mark it active. + async getContainerUrl(userId: string): Promise { + const { url } = await this.containerLifecycle.ensureRunning(userId); + await this.containerLifecycle.touch(userId); + return url; + } + + // Proxy chat request to OpenClaw. + async proxyChat( + userId: string, + messages: ChatMessage[], + signal?: AbortSignal + ): Promise { + const containerUrl = await this.getContainerUrl(userId); + const model = await this.getPreferredModel(userId); + const requestInit: RequestInit = { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + messages, + model, + stream: true, + }), + }; + + if (signal) { + requestInit.signal = signal; + } + + try { + const response = await fetch(`${containerUrl}/v1/chat/completions`, requestInit); + + if (!response.ok) { + const detail = await this.readResponseText(response); + const status = `${String(response.status)} ${response.statusText}`.trim(); + const message = detail + ? `OpenClaw returned ${status}: ${detail}` + : `OpenClaw returned ${status}`; + throw new BadGatewayException(message); + } + + return response; + } catch (error: unknown) { + if (error instanceof BadGatewayException) { + throw error; + } + + const message = error instanceof Error ? error.message : String(error); + throw new ServiceUnavailableException(`Failed to proxy chat to OpenClaw: ${message}`); + } + } + + private async getPreferredModel(userId: string): Promise { + const config = await this.prisma.userAgentConfig.findUnique({ + where: { userId }, + select: { primaryModel: true }, + }); + + const primaryModel = config?.primaryModel?.trim(); + if (!primaryModel) { + return DEFAULT_OPENCLAW_MODEL; + } + + return primaryModel; + } + + private async readResponseText(response: Response): Promise { + try { + const text = (await response.text()).trim(); + return text.length > 0 ? text : null; + } catch { + return null; + } + } +}