Compare commits
1 Commits
feat/ms22-
...
feat/ms22-
| Author | SHA1 | Date | |
|---|---|---|---|
| 2c4f290c78 |
@@ -57,7 +57,6 @@ import { ContainerLifecycleModule } from "./container-lifecycle/container-lifecy
|
|||||||
import { ContainerReaperModule } from "./container-reaper/container-reaper.module";
|
import { ContainerReaperModule } from "./container-reaper/container-reaper.module";
|
||||||
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
|
import { FleetSettingsModule } from "./fleet-settings/fleet-settings.module";
|
||||||
import { OnboardingModule } from "./onboarding/onboarding.module";
|
import { OnboardingModule } from "./onboarding/onboarding.module";
|
||||||
import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
|
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
imports: [
|
imports: [
|
||||||
@@ -136,7 +135,6 @@ import { ChatProxyModule } from "./chat-proxy/chat-proxy.module";
|
|||||||
ContainerReaperModule,
|
ContainerReaperModule,
|
||||||
FleetSettingsModule,
|
FleetSettingsModule,
|
||||||
OnboardingModule,
|
OnboardingModule,
|
||||||
ChatProxyModule,
|
|
||||||
],
|
],
|
||||||
controllers: [AppController, CsrfController],
|
controllers: [AppController, CsrfController],
|
||||||
providers: [
|
providers: [
|
||||||
|
|||||||
@@ -1,72 +0,0 @@
|
|||||||
import { Body, Controller, Post, Req, Res, UnauthorizedException, UseGuards } from "@nestjs/common";
|
|
||||||
import type { Response } from "express";
|
|
||||||
import { AuthGuard } from "../auth/guards/auth.guard";
|
|
||||||
import type { MaybeAuthenticatedRequest } from "../auth/types/better-auth-request.interface";
|
|
||||||
import { ChatStreamDto } from "./chat-proxy.dto";
|
|
||||||
import { ChatProxyService } from "./chat-proxy.service";
|
|
||||||
|
|
||||||
@Controller("chat")
|
|
||||||
@UseGuards(AuthGuard)
|
|
||||||
export class ChatProxyController {
|
|
||||||
constructor(private readonly chatProxyService: ChatProxyService) {}
|
|
||||||
|
|
||||||
// POST /api/chat/stream
|
|
||||||
// Request: { messages: Array<{role, content}> }
|
|
||||||
// Response: SSE stream of chat completion events
|
|
||||||
@Post("stream")
|
|
||||||
async streamChat(
|
|
||||||
@Body() body: ChatStreamDto,
|
|
||||||
@Req() req: MaybeAuthenticatedRequest,
|
|
||||||
@Res() res: Response
|
|
||||||
): Promise<void> {
|
|
||||||
const userId = req.user?.id;
|
|
||||||
if (!userId) {
|
|
||||||
throw new UnauthorizedException("No authenticated user found on request");
|
|
||||||
}
|
|
||||||
|
|
||||||
const abortController = new AbortController();
|
|
||||||
req.once("close", () => {
|
|
||||||
abortController.abort();
|
|
||||||
});
|
|
||||||
|
|
||||||
res.setHeader("Content-Type", "text/event-stream");
|
|
||||||
res.setHeader("Cache-Control", "no-cache");
|
|
||||||
res.setHeader("Connection", "keep-alive");
|
|
||||||
res.setHeader("X-Accel-Buffering", "no");
|
|
||||||
|
|
||||||
try {
|
|
||||||
const upstreamResponse = await this.chatProxyService.proxyChat(
|
|
||||||
userId,
|
|
||||||
body.messages,
|
|
||||||
abortController.signal
|
|
||||||
);
|
|
||||||
|
|
||||||
const upstreamContentType = upstreamResponse.headers.get("content-type");
|
|
||||||
if (upstreamContentType) {
|
|
||||||
res.setHeader("Content-Type", upstreamContentType);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!upstreamResponse.body) {
|
|
||||||
throw new Error("OpenClaw response did not include a stream body");
|
|
||||||
}
|
|
||||||
|
|
||||||
for await (const chunk of upstreamResponse.body as unknown as AsyncIterable<Uint8Array>) {
|
|
||||||
if (res.writableEnded || res.destroyed) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
res.write(Buffer.from(chunk));
|
|
||||||
}
|
|
||||||
} catch (error: unknown) {
|
|
||||||
if (!res.writableEnded && !res.destroyed) {
|
|
||||||
const message = error instanceof Error ? error.message : String(error);
|
|
||||||
res.write("event: error\n");
|
|
||||||
res.write(`data: ${JSON.stringify({ error: message })}\n\n`);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (!res.writableEnded && !res.destroyed) {
|
|
||||||
res.end();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
import { Type } from "class-transformer";
|
|
||||||
import { ArrayMinSize, IsArray, IsNotEmpty, IsString, ValidateNested } from "class-validator";
|
|
||||||
|
|
||||||
export interface ChatMessage {
|
|
||||||
role: string;
|
|
||||||
content: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export class ChatMessageDto implements ChatMessage {
|
|
||||||
@IsString({ message: "role must be a string" })
|
|
||||||
@IsNotEmpty({ message: "role is required" })
|
|
||||||
role!: string;
|
|
||||||
|
|
||||||
@IsString({ message: "content must be a string" })
|
|
||||||
@IsNotEmpty({ message: "content is required" })
|
|
||||||
content!: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export class ChatStreamDto {
|
|
||||||
@IsArray({ message: "messages must be an array" })
|
|
||||||
@ArrayMinSize(1, { message: "messages must contain at least one message" })
|
|
||||||
@ValidateNested({ each: true })
|
|
||||||
@Type(() => ChatMessageDto)
|
|
||||||
messages!: ChatMessageDto[];
|
|
||||||
}
|
|
||||||
@@ -1,14 +0,0 @@
|
|||||||
import { Module } from "@nestjs/common";
|
|
||||||
import { AgentConfigModule } from "../agent-config/agent-config.module";
|
|
||||||
import { ContainerLifecycleModule } from "../container-lifecycle/container-lifecycle.module";
|
|
||||||
import { PrismaModule } from "../prisma/prisma.module";
|
|
||||||
import { ChatProxyController } from "./chat-proxy.controller";
|
|
||||||
import { ChatProxyService } from "./chat-proxy.service";
|
|
||||||
|
|
||||||
@Module({
|
|
||||||
imports: [PrismaModule, ContainerLifecycleModule, AgentConfigModule],
|
|
||||||
controllers: [ChatProxyController],
|
|
||||||
providers: [ChatProxyService],
|
|
||||||
exports: [ChatProxyService],
|
|
||||||
})
|
|
||||||
export class ChatProxyModule {}
|
|
||||||
@@ -1,107 +0,0 @@
|
|||||||
import { ServiceUnavailableException } from "@nestjs/common";
|
|
||||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
|
||||||
import { ChatProxyService } from "./chat-proxy.service";
|
|
||||||
|
|
||||||
describe("ChatProxyService", () => {
|
|
||||||
const userId = "user-123";
|
|
||||||
|
|
||||||
const prisma = {
|
|
||||||
userAgentConfig: {
|
|
||||||
findUnique: vi.fn(),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
const containerLifecycle = {
|
|
||||||
ensureRunning: vi.fn(),
|
|
||||||
touch: vi.fn(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let service: ChatProxyService;
|
|
||||||
let fetchMock: ReturnType<typeof vi.fn>;
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
fetchMock = vi.fn();
|
|
||||||
vi.stubGlobal("fetch", fetchMock);
|
|
||||||
service = new ChatProxyService(prisma as never, containerLifecycle as never);
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(() => {
|
|
||||||
vi.unstubAllGlobals();
|
|
||||||
vi.clearAllMocks();
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("getContainerUrl", () => {
|
|
||||||
it("calls ensureRunning and touch for the user", async () => {
|
|
||||||
containerLifecycle.ensureRunning.mockResolvedValue({
|
|
||||||
url: "http://mosaic-user-user-123:19000",
|
|
||||||
token: "gateway-token",
|
|
||||||
});
|
|
||||||
containerLifecycle.touch.mockResolvedValue(undefined);
|
|
||||||
|
|
||||||
const url = await service.getContainerUrl(userId);
|
|
||||||
|
|
||||||
expect(url).toBe("http://mosaic-user-user-123:19000");
|
|
||||||
expect(containerLifecycle.ensureRunning).toHaveBeenCalledWith(userId);
|
|
||||||
expect(containerLifecycle.touch).toHaveBeenCalledWith(userId);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("proxyChat", () => {
|
|
||||||
it("forwards the request to the user's OpenClaw container", async () => {
|
|
||||||
containerLifecycle.ensureRunning.mockResolvedValue({
|
|
||||||
url: "http://mosaic-user-user-123:19000",
|
|
||||||
token: "gateway-token",
|
|
||||||
});
|
|
||||||
containerLifecycle.touch.mockResolvedValue(undefined);
|
|
||||||
fetchMock.mockResolvedValue(new Response("event: token\ndata: hello\n\n"));
|
|
||||||
|
|
||||||
const messages = [{ role: "user", content: "Hello from Mosaic" }];
|
|
||||||
const response = await service.proxyChat(userId, messages);
|
|
||||||
|
|
||||||
expect(response).toBeInstanceOf(Response);
|
|
||||||
expect(fetchMock).toHaveBeenCalledWith(
|
|
||||||
"http://mosaic-user-user-123:19000/v1/chat/completions",
|
|
||||||
expect.objectContaining({
|
|
||||||
method: "POST",
|
|
||||||
headers: {
|
|
||||||
"Content-Type": "application/json",
|
|
||||||
},
|
|
||||||
})
|
|
||||||
);
|
|
||||||
|
|
||||||
const [, request] = fetchMock.mock.calls[0] as [string, RequestInit];
|
|
||||||
const parsedBody = JSON.parse(String(request.body));
|
|
||||||
expect(parsedBody).toEqual({
|
|
||||||
messages,
|
|
||||||
model: "openclaw:default",
|
|
||||||
stream: true,
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it("throws ServiceUnavailableException on connection refused errors", async () => {
|
|
||||||
containerLifecycle.ensureRunning.mockResolvedValue({
|
|
||||||
url: "http://mosaic-user-user-123:19000",
|
|
||||||
token: "gateway-token",
|
|
||||||
});
|
|
||||||
containerLifecycle.touch.mockResolvedValue(undefined);
|
|
||||||
fetchMock.mockRejectedValue(new Error("connect ECONNREFUSED 127.0.0.1:19000"));
|
|
||||||
|
|
||||||
await expect(service.proxyChat(userId, [])).rejects.toBeInstanceOf(
|
|
||||||
ServiceUnavailableException
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("throws ServiceUnavailableException on timeout errors", async () => {
|
|
||||||
containerLifecycle.ensureRunning.mockResolvedValue({
|
|
||||||
url: "http://mosaic-user-user-123:19000",
|
|
||||||
token: "gateway-token",
|
|
||||||
});
|
|
||||||
containerLifecycle.touch.mockResolvedValue(undefined);
|
|
||||||
fetchMock.mockRejectedValue(new Error("The operation was aborted due to timeout"));
|
|
||||||
|
|
||||||
await expect(service.proxyChat(userId, [])).rejects.toBeInstanceOf(
|
|
||||||
ServiceUnavailableException
|
|
||||||
);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
@@ -1,89 +0,0 @@
|
|||||||
import { BadGatewayException, Injectable, ServiceUnavailableException } from "@nestjs/common";
|
|
||||||
import { ContainerLifecycleService } from "../container-lifecycle/container-lifecycle.service";
|
|
||||||
import { PrismaService } from "../prisma/prisma.service";
|
|
||||||
import type { ChatMessage } from "./chat-proxy.dto";
|
|
||||||
|
|
||||||
const DEFAULT_OPENCLAW_MODEL = "openclaw:default";
|
|
||||||
|
|
||||||
@Injectable()
|
|
||||||
export class ChatProxyService {
|
|
||||||
constructor(
|
|
||||||
private readonly prisma: PrismaService,
|
|
||||||
private readonly containerLifecycle: ContainerLifecycleService
|
|
||||||
) {}
|
|
||||||
|
|
||||||
// Get the user's OpenClaw container URL and mark it active.
|
|
||||||
async getContainerUrl(userId: string): Promise<string> {
|
|
||||||
const { url } = await this.containerLifecycle.ensureRunning(userId);
|
|
||||||
await this.containerLifecycle.touch(userId);
|
|
||||||
return url;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Proxy chat request to OpenClaw.
|
|
||||||
async proxyChat(
|
|
||||||
userId: string,
|
|
||||||
messages: ChatMessage[],
|
|
||||||
signal?: AbortSignal
|
|
||||||
): Promise<Response> {
|
|
||||||
const containerUrl = await this.getContainerUrl(userId);
|
|
||||||
const model = await this.getPreferredModel(userId);
|
|
||||||
const requestInit: RequestInit = {
|
|
||||||
method: "POST",
|
|
||||||
headers: { "Content-Type": "application/json" },
|
|
||||||
body: JSON.stringify({
|
|
||||||
messages,
|
|
||||||
model,
|
|
||||||
stream: true,
|
|
||||||
}),
|
|
||||||
};
|
|
||||||
|
|
||||||
if (signal) {
|
|
||||||
requestInit.signal = signal;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
const response = await fetch(`${containerUrl}/v1/chat/completions`, requestInit);
|
|
||||||
|
|
||||||
if (!response.ok) {
|
|
||||||
const detail = await this.readResponseText(response);
|
|
||||||
const status = `${String(response.status)} ${response.statusText}`.trim();
|
|
||||||
const message = detail
|
|
||||||
? `OpenClaw returned ${status}: ${detail}`
|
|
||||||
: `OpenClaw returned ${status}`;
|
|
||||||
throw new BadGatewayException(message);
|
|
||||||
}
|
|
||||||
|
|
||||||
return response;
|
|
||||||
} catch (error: unknown) {
|
|
||||||
if (error instanceof BadGatewayException) {
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
|
|
||||||
const message = error instanceof Error ? error.message : String(error);
|
|
||||||
throw new ServiceUnavailableException(`Failed to proxy chat to OpenClaw: ${message}`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async getPreferredModel(userId: string): Promise<string> {
|
|
||||||
const config = await this.prisma.userAgentConfig.findUnique({
|
|
||||||
where: { userId },
|
|
||||||
select: { primaryModel: true },
|
|
||||||
});
|
|
||||||
|
|
||||||
const primaryModel = config?.primaryModel?.trim();
|
|
||||||
if (!primaryModel) {
|
|
||||||
return DEFAULT_OPENCLAW_MODEL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return primaryModel;
|
|
||||||
}
|
|
||||||
|
|
||||||
private async readResponseText(response: Response): Promise<string | null> {
|
|
||||||
try {
|
|
||||||
const text = (await response.text()).trim();
|
|
||||||
return text.length > 0 ? text : null;
|
|
||||||
} catch {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,3 +0,0 @@
|
|||||||
DATABASE_URL=postgresql://mosaic:changeme@postgres:5432/mosaic
|
|
||||||
DATABASE_PASSWORD=changeme
|
|
||||||
MOSAIC_SECRET_KEY=your-secret-key-at-least-32-characters-long
|
|
||||||
@@ -1,40 +0,0 @@
|
|||||||
# Mosaic Docker (Core Services)
|
|
||||||
|
|
||||||
This folder includes the Compose stack for **core Mosaic services only**:
|
|
||||||
|
|
||||||
- `mosaic-api`
|
|
||||||
- `mosaic-web`
|
|
||||||
- `postgres`
|
|
||||||
|
|
||||||
User OpenClaw containers are **not** defined in Compose. They are created and managed dynamically by the API's `ContainerLifecycleService` through Docker socket access.
|
|
||||||
|
|
||||||
## Start the stack
|
|
||||||
|
|
||||||
```bash
|
|
||||||
docker compose -f docker/mosaic-compose.yml up -d
|
|
||||||
```
|
|
||||||
|
|
||||||
## Required environment variables
|
|
||||||
|
|
||||||
- `DATABASE_URL`
|
|
||||||
- `MOSAIC_SECRET_KEY`
|
|
||||||
- `DATABASE_PASSWORD`
|
|
||||||
|
|
||||||
Use [`docker/.env.example`](./.env.example) as a starting point.
|
|
||||||
|
|
||||||
## Architecture overview
|
|
||||||
|
|
||||||
See the design doc: [`docs/design/MS22-DB-CENTRIC-ARCHITECTURE.md`](../docs/design/MS22-DB-CENTRIC-ARCHITECTURE.md)
|
|
||||||
|
|
||||||
`mosaic-agents` is an internal-only bridge network reserved for dynamically created user containers.
|
|
||||||
|
|
||||||
## OpenClaw entrypoint behavior
|
|
||||||
|
|
||||||
`docker/openclaw-entrypoint.sh` is intended for dynamically created user OpenClaw containers:
|
|
||||||
|
|
||||||
1. Validates required env vars (`MOSAIC_API_URL`, `AGENT_TOKEN`, `AGENT_ID`).
|
|
||||||
2. Fetches agent-specific OpenClaw config from Mosaic API internal endpoint.
|
|
||||||
3. Writes the config to `/tmp/openclaw.json`.
|
|
||||||
4. Starts OpenClaw gateway with `OPENCLAW_CONFIG_PATH=/tmp/openclaw.json`.
|
|
||||||
|
|
||||||
`docker/openclaw-healthcheck.sh` probes `http://localhost:18789/health` for container health.
|
|
||||||
@@ -1,53 +0,0 @@
|
|||||||
services:
|
|
||||||
mosaic-api:
|
|
||||||
image: mosaic/api:latest
|
|
||||||
environment:
|
|
||||||
DATABASE_URL: ${DATABASE_URL}
|
|
||||||
MOSAIC_SECRET_KEY: ${MOSAIC_SECRET_KEY}
|
|
||||||
volumes:
|
|
||||||
- /var/run/docker.sock:/var/run/docker.sock
|
|
||||||
networks:
|
|
||||||
- internal
|
|
||||||
healthcheck:
|
|
||||||
test: ["CMD", "curl", "-f", "http://localhost:4000/api/health"]
|
|
||||||
interval: 30s
|
|
||||||
timeout: 10s
|
|
||||||
retries: 3
|
|
||||||
|
|
||||||
mosaic-web:
|
|
||||||
image: mosaic/web:latest
|
|
||||||
environment:
|
|
||||||
NEXT_PUBLIC_API_URL: http://mosaic-api:4000
|
|
||||||
ports:
|
|
||||||
- "3000:3000"
|
|
||||||
networks:
|
|
||||||
- internal
|
|
||||||
depends_on:
|
|
||||||
mosaic-api:
|
|
||||||
condition: service_healthy
|
|
||||||
|
|
||||||
postgres:
|
|
||||||
image: postgres:17-alpine
|
|
||||||
environment:
|
|
||||||
POSTGRES_DB: mosaic
|
|
||||||
POSTGRES_USER: mosaic
|
|
||||||
POSTGRES_PASSWORD: ${DATABASE_PASSWORD}
|
|
||||||
volumes:
|
|
||||||
- postgres-data:/var/lib/postgresql/data
|
|
||||||
networks:
|
|
||||||
- internal
|
|
||||||
healthcheck:
|
|
||||||
test: ["CMD-SHELL", "pg_isready -U mosaic"]
|
|
||||||
interval: 10s
|
|
||||||
timeout: 5s
|
|
||||||
retries: 5
|
|
||||||
|
|
||||||
networks:
|
|
||||||
internal:
|
|
||||||
driver: bridge
|
|
||||||
mosaic-agents:
|
|
||||||
driver: bridge
|
|
||||||
internal: true
|
|
||||||
|
|
||||||
volumes:
|
|
||||||
postgres-data:
|
|
||||||
@@ -1,20 +0,0 @@
|
|||||||
#!/bin/sh
|
|
||||||
set -e
|
|
||||||
: "${MOSAIC_API_URL:?MOSAIC_API_URL is required}"
|
|
||||||
: "${AGENT_TOKEN:?AGENT_TOKEN is required}"
|
|
||||||
: "${AGENT_ID:?AGENT_ID is required}"
|
|
||||||
|
|
||||||
echo "[entrypoint] Fetching config for agent ${AGENT_ID}..."
|
|
||||||
HTTP_CODE=$(curl -sf -w "%{http_code}" \
|
|
||||||
"${MOSAIC_API_URL}/api/internal/agent-config/${AGENT_ID}" \
|
|
||||||
-H "Authorization: Bearer ${AGENT_TOKEN}" \
|
|
||||||
-o /tmp/openclaw.json)
|
|
||||||
|
|
||||||
if [ "$HTTP_CODE" != "200" ]; then
|
|
||||||
echo "[entrypoint] ERROR: Config fetch failed with HTTP ${HTTP_CODE}"
|
|
||||||
exit 1
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "[entrypoint] Config loaded. Starting OpenClaw gateway..."
|
|
||||||
export OPENCLAW_CONFIG_PATH=/tmp/openclaw.json
|
|
||||||
exec openclaw gateway run --bind lan --auth token
|
|
||||||
@@ -1,2 +0,0 @@
|
|||||||
#!/bin/sh
|
|
||||||
curl -sf http://localhost:18789/health || exit 1
|
|
||||||
Reference in New Issue
Block a user