feat(api): chat proxy (MS22-P1i) #615

Merged
jason.woltje merged 1 commits from feat/ms22-p1i-chat-proxy into main 2026-03-01 15:59:02 +00:00
6 changed files with 309 additions and 0 deletions
Showing only changes of commit 9a4d4e750b - Show all commits

View File

@@ -57,6 +57,7 @@ import { ContainerLifecycleModule } from "./container-lifecycle/container-lifecy
import { ContainerReaperModule } from "./container-reaper/container-reaper.module";
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
import { OnboardingModule } from "./onboarding/onboarding.module";
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
@Module({
imports: [
@@ -135,6 +136,7 @@ import { OnboardingModule } from "./onboarding/onboarding.module";
ContainerReaperModule,
FleetSettingsModule,
OnboardingModule,
ChatProxyModule,
],
controllers: [AppController, CsrfController],
providers: [

View File

@@ -0,0 +1,72 @@
import { Body, Controller, Post, Req, Res, UnauthorizedException, UseGuards } from "@nestjs/common";
import type { Response } from "express";
import { AuthGuard } from "../auth/guards/auth.guard";
import type { MaybeAuthenticatedRequest } from "../auth/types/better-auth-request.interface";
import { ChatStreamDto } from "./chat-proxy.dto";
import { ChatProxyService } from "./chat-proxy.service";
@Controller("chat")
@UseGuards(AuthGuard)
export class ChatProxyController {
constructor(private readonly chatProxyService: ChatProxyService) {}
// POST /api/chat/stream
// Request: { messages: Array<{role, content}> }
// Response: SSE stream of chat completion events
@Post("stream")
async streamChat(
@Body() body: ChatStreamDto,
@Req() req: MaybeAuthenticatedRequest,
@Res() res: Response
): Promise<void> {
const userId = req.user?.id;
if (!userId) {
throw new UnauthorizedException("No authenticated user found on request");
}
const abortController = new AbortController();
req.once("close", () => {
abortController.abort();
});
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
try {
const upstreamResponse = await this.chatProxyService.proxyChat(
userId,
body.messages,
abortController.signal
);
const upstreamContentType = upstreamResponse.headers.get("content-type");
if (upstreamContentType) {
res.setHeader("Content-Type", upstreamContentType);
}
if (!upstreamResponse.body) {
throw new Error("OpenClaw response did not include a stream body");
}
for await (const chunk of upstreamResponse.body as unknown as AsyncIterable<Uint8Array>) {
if (res.writableEnded || res.destroyed) {
break;
}
res.write(Buffer.from(chunk));
}
} catch (error: unknown) {
if (!res.writableEnded && !res.destroyed) {
const message = error instanceof Error ? error.message : String(error);
res.write("event: error\n");
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
}
} finally {
if (!res.writableEnded && !res.destroyed) {
res.end();
}
}
}
}

View File

@@ -0,0 +1,25 @@
import { Type } from "class-transformer";
import { ArrayMinSize, IsArray, IsNotEmpty, IsString, ValidateNested } from "class-validator";
export interface ChatMessage {
role: string;
content: string;
}
export class ChatMessageDto implements ChatMessage {
@IsString({ message: "role must be a string" })
@IsNotEmpty({ message: "role is required" })
role!: string;
@IsString({ message: "content must be a string" })
@IsNotEmpty({ message: "content is required" })
content!: string;
}
export class ChatStreamDto {
@IsArray({ message: "messages must be an array" })
@ArrayMinSize(1, { message: "messages must contain at least one message" })
@ValidateNested({ each: true })
@Type(() => ChatMessageDto)
messages!: ChatMessageDto[];
}

View File

@@ -0,0 +1,14 @@
import { Module } from "@nestjs/common";
import { AgentConfigModule } from "../agent-config/agent-config.module";
import { ContainerLifecycleModule } from "../container-lifecycle/container-lifecycle.module";
import { PrismaModule } from "../prisma/prisma.module";
import { ChatProxyController } from "./chat-proxy.controller";
import { ChatProxyService } from "./chat-proxy.service";
@Module({
imports: [PrismaModule, ContainerLifecycleModule, AgentConfigModule],
controllers: [ChatProxyController],
providers: [ChatProxyService],
exports: [ChatProxyService],
})
export class ChatProxyModule {}

View File

@@ -0,0 +1,107 @@
import { ServiceUnavailableException } from "@nestjs/common";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { ChatProxyService } from "./chat-proxy.service";
describe("ChatProxyService", () => {
const userId = "user-123";
const prisma = {
userAgentConfig: {
findUnique: vi.fn(),
},
};
const containerLifecycle = {
ensureRunning: vi.fn(),
touch: vi.fn(),
};
let service: ChatProxyService;
let fetchMock: ReturnType<typeof vi.fn>;
beforeEach(() => {
fetchMock = vi.fn();
vi.stubGlobal("fetch", fetchMock);
service = new ChatProxyService(prisma as never, containerLifecycle as never);
});
afterEach(() => {
vi.unstubAllGlobals();
vi.clearAllMocks();
});
describe("getContainerUrl", () => {
it("calls ensureRunning and touch for the user", async () => {
containerLifecycle.ensureRunning.mockResolvedValue({
url: "http://mosaic-user-user-123:19000",
token: "gateway-token",
});
containerLifecycle.touch.mockResolvedValue(undefined);
const url = await service.getContainerUrl(userId);
expect(url).toBe("http://mosaic-user-user-123:19000");
expect(containerLifecycle.ensureRunning).toHaveBeenCalledWith(userId);
expect(containerLifecycle.touch).toHaveBeenCalledWith(userId);
});
});
describe("proxyChat", () => {
it("forwards the request to the user's OpenClaw container", async () => {
containerLifecycle.ensureRunning.mockResolvedValue({
url: "http://mosaic-user-user-123:19000",
token: "gateway-token",
});
containerLifecycle.touch.mockResolvedValue(undefined);
fetchMock.mockResolvedValue(new Response("event: token\ndata: hello\n\n"));
const messages = [{ role: "user", content: "Hello from Mosaic" }];
const response = await service.proxyChat(userId, messages);
expect(response).toBeInstanceOf(Response);
expect(fetchMock).toHaveBeenCalledWith(
"http://mosaic-user-user-123:19000/v1/chat/completions",
expect.objectContaining({
method: "POST",
headers: {
"Content-Type": "application/json",
},
})
);
const [, request] = fetchMock.mock.calls[0] as [string, RequestInit];
const parsedBody = JSON.parse(String(request.body));
expect(parsedBody).toEqual({
messages,
model: "openclaw:default",
stream: true,
});
});
it("throws ServiceUnavailableException on connection refused errors", async () => {
containerLifecycle.ensureRunning.mockResolvedValue({
url: "http://mosaic-user-user-123:19000",
token: "gateway-token",
});
containerLifecycle.touch.mockResolvedValue(undefined);
fetchMock.mockRejectedValue(new Error("connect ECONNREFUSED 127.0.0.1:19000"));
await expect(service.proxyChat(userId, [])).rejects.toBeInstanceOf(
ServiceUnavailableException
);
});
it("throws ServiceUnavailableException on timeout errors", async () => {
containerLifecycle.ensureRunning.mockResolvedValue({
url: "http://mosaic-user-user-123:19000",
token: "gateway-token",
});
containerLifecycle.touch.mockResolvedValue(undefined);
fetchMock.mockRejectedValue(new Error("The operation was aborted due to timeout"));
await expect(service.proxyChat(userId, [])).rejects.toBeInstanceOf(
ServiceUnavailableException
);
});
});
});

View File

@@ -0,0 +1,89 @@
import { BadGatewayException, Injectable, ServiceUnavailableException } from "@nestjs/common";
import { ContainerLifecycleService } from "../container-lifecycle/container-lifecycle.service";
import { PrismaService } from "../prisma/prisma.service";
import type { ChatMessage } from "./chat-proxy.dto";
const DEFAULT_OPENCLAW_MODEL = "openclaw:default";
@Injectable()
export class ChatProxyService {
constructor(
private readonly prisma: PrismaService,
private readonly containerLifecycle: ContainerLifecycleService
) {}
// Get the user's OpenClaw container URL and mark it active.
async getContainerUrl(userId: string): Promise<string> {
const { url } = await this.containerLifecycle.ensureRunning(userId);
await this.containerLifecycle.touch(userId);
return url;
}
// Proxy chat request to OpenClaw.
async proxyChat(
userId: string,
messages: ChatMessage[],
signal?: AbortSignal
): Promise<Response> {
const containerUrl = await this.getContainerUrl(userId);
const model = await this.getPreferredModel(userId);
const requestInit: RequestInit = {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
messages,
model,
stream: true,
}),
};
if (signal) {
requestInit.signal = signal;
}
try {
const response = await fetch(`${containerUrl}/v1/chat/completions`, requestInit);
if (!response.ok) {
const detail = await this.readResponseText(response);
const status = `${String(response.status)} ${response.statusText}`.trim();
const message = detail
? `OpenClaw returned ${status}: ${detail}`
: `OpenClaw returned ${status}`;
throw new BadGatewayException(message);
}
return response;
} catch (error: unknown) {
if (error instanceof BadGatewayException) {
throw error;
}
const message = error instanceof Error ? error.message : String(error);
throw new ServiceUnavailableException(`Failed to proxy chat to OpenClaw: ${message}`);
}
}
private async getPreferredModel(userId: string): Promise<string> {
const config = await this.prisma.userAgentConfig.findUnique({
where: { userId },
select: { primaryModel: true },
});
const primaryModel = config?.primaryModel?.trim();
if (!primaryModel) {
return DEFAULT_OPENCLAW_MODEL;
}
return primaryModel;
}
private async readResponseText(response: Response): Promise<string | null> {
try {
const text = (await response.text()).trim();
return text.length > 0 ? text : null;
} catch {
return null;
}
}
}