Compare commits
1 Commits
feat/usage
...
feat/ms22-
| Author | SHA1 | Date | |
|---|---|---|---|
| 9a4d4e750b |
@@ -57,6 +57,7 @@ import { ContainerLifecycleModule } from "./container-lifecycle/container-lifecy
|
|||||||
import { ContainerReaperModule } from "./container-reaper/container-reaper.module";
|
import { ContainerReaperModule } from "./container-reaper/container-reaper.module";
|
||||||
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
|
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
|
||||||
import { OnboardingModule } from "./onboarding/onboarding.module";
|
import { OnboardingModule } from "./onboarding/onboarding.module";
|
||||||
|
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [
|
||||||
@@ -135,6 +136,7 @@ import { OnboardingModule } from "./onboarding/onboarding.module";
|
|||||||
ContainerReaperModule,
|
ContainerReaperModule,
|
||||||
FleetSettingsModule,
|
FleetSettingsModule,
|
||||||
OnboardingModule,
|
OnboardingModule,
|
||||||
|
ChatProxyModule,
|
||||||
],
|
],
|
||||||
controllers: [AppController, CsrfController],
|
controllers: [AppController, CsrfController],
|
||||||
providers: [
|
providers: [
|
||||||
|
|||||||
72
apps/api/src/chat-proxy/chat-proxy.controller.ts
Normal file
72
apps/api/src/chat-proxy/chat-proxy.controller.ts
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
import { Body, Controller, Post, Req, Res, UnauthorizedException, UseGuards } from "@nestjs/common";
|
||||||
|
import type { Response } from "express";
|
||||||
|
import { AuthGuard } from "../auth/guards/auth.guard";
|
||||||
|
import type { MaybeAuthenticatedRequest } from "../auth/types/better-auth-request.interface";
|
||||||
|
import { ChatStreamDto } from "./chat-proxy.dto";
|
||||||
|
import { ChatProxyService } from "./chat-proxy.service";
|
||||||
|
|
||||||
|
@Controller("chat")
|
||||||
|
@UseGuards(AuthGuard)
|
||||||
|
export class ChatProxyController {
|
||||||
|
constructor(private readonly chatProxyService: ChatProxyService) {}
|
||||||
|
|
||||||
|
// POST /api/chat/stream
|
||||||
|
// Request: { messages: Array<{role, content}> }
|
||||||
|
// Response: SSE stream of chat completion events
|
||||||
|
@Post("stream")
|
||||||
|
async streamChat(
|
||||||
|
@Body() body: ChatStreamDto,
|
||||||
|
@Req() req: MaybeAuthenticatedRequest,
|
||||||
|
@Res() res: Response
|
||||||
|
): Promise<void> {
|
||||||
|
const userId = req.user?.id;
|
||||||
|
if (!userId) {
|
||||||
|
throw new UnauthorizedException("No authenticated user found on request");
|
||||||
|
}
|
||||||
|
|
||||||
|
const abortController = new AbortController();
|
||||||
|
req.once("close", () => {
|
||||||
|
abortController.abort();
|
||||||
|
});
|
||||||
|
|
||||||
|
res.setHeader("Content-Type", "text/event-stream");
|
||||||
|
res.setHeader("Cache-Control", "no-cache");
|
||||||
|
res.setHeader("Connection", "keep-alive");
|
||||||
|
res.setHeader("X-Accel-Buffering", "no");
|
||||||
|
|
||||||
|
try {
|
||||||
|
const upstreamResponse = await this.chatProxyService.proxyChat(
|
||||||
|
userId,
|
||||||
|
body.messages,
|
||||||
|
abortController.signal
|
||||||
|
);
|
||||||
|
|
||||||
|
const upstreamContentType = upstreamResponse.headers.get("content-type");
|
||||||
|
if (upstreamContentType) {
|
||||||
|
res.setHeader("Content-Type", upstreamContentType);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!upstreamResponse.body) {
|
||||||
|
throw new Error("OpenClaw response did not include a stream body");
|
||||||
|
}
|
||||||
|
|
||||||
|
for await (const chunk of upstreamResponse.body as unknown as AsyncIterable<Uint8Array>) {
|
||||||
|
if (res.writableEnded || res.destroyed) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
res.write(Buffer.from(chunk));
|
||||||
|
}
|
||||||
|
} catch (error: unknown) {
|
||||||
|
if (!res.writableEnded && !res.destroyed) {
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
res.write("event: error\n");
|
||||||
|
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (!res.writableEnded && !res.destroyed) {
|
||||||
|
res.end();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
25
apps/api/src/chat-proxy/chat-proxy.dto.ts
Normal file
25
apps/api/src/chat-proxy/chat-proxy.dto.ts
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
import { Type } from "class-transformer";
|
||||||
|
import { ArrayMinSize, IsArray, IsNotEmpty, IsString, ValidateNested } from "class-validator";
|
||||||
|
|
||||||
|
export interface ChatMessage {
|
||||||
|
role: string;
|
||||||
|
content: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ChatMessageDto implements ChatMessage {
|
||||||
|
@IsString({ message: "role must be a string" })
|
||||||
|
@IsNotEmpty({ message: "role is required" })
|
||||||
|
role!: string;
|
||||||
|
|
||||||
|
@IsString({ message: "content must be a string" })
|
||||||
|
@IsNotEmpty({ message: "content is required" })
|
||||||
|
content!: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ChatStreamDto {
|
||||||
|
@IsArray({ message: "messages must be an array" })
|
||||||
|
@ArrayMinSize(1, { message: "messages must contain at least one message" })
|
||||||
|
@ValidateNested({ each: true })
|
||||||
|
@Type(() => ChatMessageDto)
|
||||||
|
messages!: ChatMessageDto[];
|
||||||
|
}
|
||||||
14
apps/api/src/chat-proxy/chat-proxy.module.ts
Normal file
14
apps/api/src/chat-proxy/chat-proxy.module.ts
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
import { Module } from "@nestjs/common";
|
||||||
|
import { AgentConfigModule } from "../agent-config/agent-config.module";
|
||||||
|
import { ContainerLifecycleModule } from "../container-lifecycle/container-lifecycle.module";
|
||||||
|
import { PrismaModule } from "../prisma/prisma.module";
|
||||||
|
import { ChatProxyController } from "./chat-proxy.controller";
|
||||||
|
import { ChatProxyService } from "./chat-proxy.service";
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
imports: [PrismaModule, ContainerLifecycleModule, AgentConfigModule],
|
||||||
|
controllers: [ChatProxyController],
|
||||||
|
providers: [ChatProxyService],
|
||||||
|
exports: [ChatProxyService],
|
||||||
|
})
|
||||||
|
export class ChatProxyModule {}
|
||||||
107
apps/api/src/chat-proxy/chat-proxy.service.spec.ts
Normal file
107
apps/api/src/chat-proxy/chat-proxy.service.spec.ts
Normal file
@@ -0,0 +1,107 @@
|
|||||||
|
import { ServiceUnavailableException } from "@nestjs/common";
|
||||||
|
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
import { ChatProxyService } from "./chat-proxy.service";
|
||||||
|
|
||||||
|
describe("ChatProxyService", () => {
|
||||||
|
const userId = "user-123";
|
||||||
|
|
||||||
|
const prisma = {
|
||||||
|
userAgentConfig: {
|
||||||
|
findUnique: vi.fn(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const containerLifecycle = {
|
||||||
|
ensureRunning: vi.fn(),
|
||||||
|
touch: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let service: ChatProxyService;
|
||||||
|
let fetchMock: ReturnType<typeof vi.fn>;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
fetchMock = vi.fn();
|
||||||
|
vi.stubGlobal("fetch", fetchMock);
|
||||||
|
service = new ChatProxyService(prisma as never, containerLifecycle as never);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
vi.unstubAllGlobals();
|
||||||
|
vi.clearAllMocks();
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("getContainerUrl", () => {
|
||||||
|
it("calls ensureRunning and touch for the user", async () => {
|
||||||
|
containerLifecycle.ensureRunning.mockResolvedValue({
|
||||||
|
url: "http://mosaic-user-user-123:19000",
|
||||||
|
token: "gateway-token",
|
||||||
|
});
|
||||||
|
containerLifecycle.touch.mockResolvedValue(undefined);
|
||||||
|
|
||||||
|
const url = await service.getContainerUrl(userId);
|
||||||
|
|
||||||
|
expect(url).toBe("http://mosaic-user-user-123:19000");
|
||||||
|
expect(containerLifecycle.ensureRunning).toHaveBeenCalledWith(userId);
|
||||||
|
expect(containerLifecycle.touch).toHaveBeenCalledWith(userId);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("proxyChat", () => {
|
||||||
|
it("forwards the request to the user's OpenClaw container", async () => {
|
||||||
|
containerLifecycle.ensureRunning.mockResolvedValue({
|
||||||
|
url: "http://mosaic-user-user-123:19000",
|
||||||
|
token: "gateway-token",
|
||||||
|
});
|
||||||
|
containerLifecycle.touch.mockResolvedValue(undefined);
|
||||||
|
fetchMock.mockResolvedValue(new Response("event: token\ndata: hello\n\n"));
|
||||||
|
|
||||||
|
const messages = [{ role: "user", content: "Hello from Mosaic" }];
|
||||||
|
const response = await service.proxyChat(userId, messages);
|
||||||
|
|
||||||
|
expect(response).toBeInstanceOf(Response);
|
||||||
|
expect(fetchMock).toHaveBeenCalledWith(
|
||||||
|
"http://mosaic-user-user-123:19000/v1/chat/completions",
|
||||||
|
expect.objectContaining({
|
||||||
|
method: "POST",
|
||||||
|
headers: {
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
const [, request] = fetchMock.mock.calls[0] as [string, RequestInit];
|
||||||
|
const parsedBody = JSON.parse(String(request.body));
|
||||||
|
expect(parsedBody).toEqual({
|
||||||
|
messages,
|
||||||
|
model: "openclaw:default",
|
||||||
|
stream: true,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("throws ServiceUnavailableException on connection refused errors", async () => {
|
||||||
|
containerLifecycle.ensureRunning.mockResolvedValue({
|
||||||
|
url: "http://mosaic-user-user-123:19000",
|
||||||
|
token: "gateway-token",
|
||||||
|
});
|
||||||
|
containerLifecycle.touch.mockResolvedValue(undefined);
|
||||||
|
fetchMock.mockRejectedValue(new Error("connect ECONNREFUSED 127.0.0.1:19000"));
|
||||||
|
|
||||||
|
await expect(service.proxyChat(userId, [])).rejects.toBeInstanceOf(
|
||||||
|
ServiceUnavailableException
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("throws ServiceUnavailableException on timeout errors", async () => {
|
||||||
|
containerLifecycle.ensureRunning.mockResolvedValue({
|
||||||
|
url: "http://mosaic-user-user-123:19000",
|
||||||
|
token: "gateway-token",
|
||||||
|
});
|
||||||
|
containerLifecycle.touch.mockResolvedValue(undefined);
|
||||||
|
fetchMock.mockRejectedValue(new Error("The operation was aborted due to timeout"));
|
||||||
|
|
||||||
|
await expect(service.proxyChat(userId, [])).rejects.toBeInstanceOf(
|
||||||
|
ServiceUnavailableException
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
89
apps/api/src/chat-proxy/chat-proxy.service.ts
Normal file
89
apps/api/src/chat-proxy/chat-proxy.service.ts
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
import { BadGatewayException, Injectable, ServiceUnavailableException } from "@nestjs/common";
|
||||||
|
import { ContainerLifecycleService } from "../container-lifecycle/container-lifecycle.service";
|
||||||
|
import { PrismaService } from "../prisma/prisma.service";
|
||||||
|
import type { ChatMessage } from "./chat-proxy.dto";
|
||||||
|
|
||||||
|
const DEFAULT_OPENCLAW_MODEL = "openclaw:default";
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class ChatProxyService {
|
||||||
|
constructor(
|
||||||
|
private readonly prisma: PrismaService,
|
||||||
|
private readonly containerLifecycle: ContainerLifecycleService
|
||||||
|
) {}
|
||||||
|
|
||||||
|
// Get the user's OpenClaw container URL and mark it active.
|
||||||
|
async getContainerUrl(userId: string): Promise<string> {
|
||||||
|
const { url } = await this.containerLifecycle.ensureRunning(userId);
|
||||||
|
await this.containerLifecycle.touch(userId);
|
||||||
|
return url;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Proxy chat request to OpenClaw.
|
||||||
|
async proxyChat(
|
||||||
|
userId: string,
|
||||||
|
messages: ChatMessage[],
|
||||||
|
signal?: AbortSignal
|
||||||
|
): Promise<Response> {
|
||||||
|
const containerUrl = await this.getContainerUrl(userId);
|
||||||
|
const model = await this.getPreferredModel(userId);
|
||||||
|
const requestInit: RequestInit = {
|
||||||
|
method: "POST",
|
||||||
|
headers: { "Content-Type": "application/json" },
|
||||||
|
body: JSON.stringify({
|
||||||
|
messages,
|
||||||
|
model,
|
||||||
|
stream: true,
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
|
||||||
|
if (signal) {
|
||||||
|
requestInit.signal = signal;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await fetch(`${containerUrl}/v1/chat/completions`, requestInit);
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
const detail = await this.readResponseText(response);
|
||||||
|
const status = `${String(response.status)} ${response.statusText}`.trim();
|
||||||
|
const message = detail
|
||||||
|
? `OpenClaw returned ${status}: ${detail}`
|
||||||
|
: `OpenClaw returned ${status}`;
|
||||||
|
throw new BadGatewayException(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
return response;
|
||||||
|
} catch (error: unknown) {
|
||||||
|
if (error instanceof BadGatewayException) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
|
||||||
|
const message = error instanceof Error ? error.message : String(error);
|
||||||
|
throw new ServiceUnavailableException(`Failed to proxy chat to OpenClaw: ${message}`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async getPreferredModel(userId: string): Promise<string> {
|
||||||
|
const config = await this.prisma.userAgentConfig.findUnique({
|
||||||
|
where: { userId },
|
||||||
|
select: { primaryModel: true },
|
||||||
|
});
|
||||||
|
|
||||||
|
const primaryModel = config?.primaryModel?.trim();
|
||||||
|
if (!primaryModel) {
|
||||||
|
return DEFAULT_OPENCLAW_MODEL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return primaryModel;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async readResponseText(response: Response): Promise<string | null> {
|
||||||
|
try {
|
||||||
|
const text = (await response.text()).trim();
|
||||||
|
return text.length > 0 ? text : null;
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user