Compare commits
2 Commits
fix/logs-p
...
feat/ms22-
| Author | SHA1 | Date | |
|---|---|---|---|
| b13ff68e22 | |||
| c847b74bda |
@@ -80,8 +80,8 @@
|
||||
"session_id": "sess-002",
|
||||
"runtime": "unknown",
|
||||
"started_at": "2026-02-28T20:30:13Z",
|
||||
"ended_at": "",
|
||||
"ended_reason": "",
|
||||
"ended_at": "2026-03-01T14:04:00Z",
|
||||
"ended_reason": "completed",
|
||||
"milestone_at_end": "",
|
||||
"tasks_completed": [],
|
||||
"last_task_id": ""
|
||||
|
||||
@@ -1,8 +0,0 @@
|
||||
{
|
||||
"session_id": "sess-002",
|
||||
"runtime": "unknown",
|
||||
"pid": 3178395,
|
||||
"started_at": "2026-02-28T20:30:13Z",
|
||||
"project_path": "/tmp/ms21-ui-001",
|
||||
"milestone_id": ""
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
-- CreateTable
|
||||
CREATE TABLE "OpenClawAgent" (
|
||||
"id" TEXT NOT NULL,
|
||||
"name" TEXT NOT NULL,
|
||||
"displayName" TEXT NOT NULL,
|
||||
"role" TEXT NOT NULL,
|
||||
"gatewayUrl" TEXT NOT NULL,
|
||||
"agentId" TEXT NOT NULL DEFAULT 'main',
|
||||
"model" TEXT NOT NULL,
|
||||
"isActive" BOOLEAN NOT NULL DEFAULT true,
|
||||
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
"updatedAt" TIMESTAMP(3) NOT NULL,
|
||||
|
||||
CONSTRAINT "OpenClawAgent_pkey" PRIMARY KEY ("id")
|
||||
);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE UNIQUE INDEX "OpenClawAgent_name_key" ON "OpenClawAgent"("name");
|
||||
@@ -1407,6 +1407,19 @@ model Instance {
|
||||
@@map("instances")
|
||||
}
|
||||
|
||||
model OpenClawAgent {
|
||||
id String @id @default(cuid())
|
||||
name String @unique // "mosaic-main", "mosaic-projects", etc.
|
||||
displayName String // "Main Orchestrator", "Projects", etc.
|
||||
role String // "orchestrator" | "developer" | "researcher" | "operations"
|
||||
gatewayUrl String // "http://mosaic-main:18789"
|
||||
agentId String @default("main") // OpenClaw agent id within that instance
|
||||
model String // "zai/glm-5", "ollama/cogito"
|
||||
isActive Boolean @default(true)
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
}
|
||||
|
||||
model FederationConnection {
|
||||
id String @id @default(uuid()) @db.Uuid
|
||||
workspaceId String @map("workspace_id") @db.Uuid
|
||||
|
||||
@@ -49,6 +49,7 @@ import { AdminModule } from "./admin/admin.module";
|
||||
import { TeamsModule } from "./teams/teams.module";
|
||||
import { ImportModule } from "./import/import.module";
|
||||
import { ConversationArchiveModule } from "./conversation-archive/conversation-archive.module";
|
||||
import { OpenClawGatewayModule } from "./openclaw-gateway/openclaw-gateway.module";
|
||||
import { RlsContextInterceptor } from "./common/interceptors/rls-context.interceptor";
|
||||
|
||||
@Module({
|
||||
@@ -121,6 +122,7 @@ import { RlsContextInterceptor } from "./common/interceptors/rls-context.interce
|
||||
TeamsModule,
|
||||
ImportModule,
|
||||
ConversationArchiveModule,
|
||||
OpenClawGatewayModule,
|
||||
],
|
||||
controllers: [AppController, CsrfController],
|
||||
providers: [
|
||||
|
||||
36
apps/api/src/openclaw-gateway/agent-registry.controller.ts
Normal file
36
apps/api/src/openclaw-gateway/agent-registry.controller.ts
Normal file
@@ -0,0 +1,36 @@
|
||||
import { Body, Controller, Get, Param, Patch, Post, Query } from "@nestjs/common";
|
||||
import type { OpenClawAgent } from "@prisma/client";
|
||||
import { AgentRegistryService } from "./agent-registry.service";
|
||||
import type {
|
||||
CreateOpenClawAgentDto,
|
||||
QueryOpenClawAgentsDto,
|
||||
UpdateOpenClawAgentDto,
|
||||
} from "./openclaw-gateway.dto";
|
||||
|
||||
@Controller("openclaw/agents")
|
||||
export class AgentRegistryController {
|
||||
constructor(private readonly agentRegistryService: AgentRegistryService) {}
|
||||
|
||||
@Get()
|
||||
async listAgents(@Query() query: QueryOpenClawAgentsDto): Promise<OpenClawAgent[]> {
|
||||
return this.agentRegistryService.listAgents(query);
|
||||
}
|
||||
|
||||
@Get(":name")
|
||||
async getAgent(@Param("name") name: string): Promise<OpenClawAgent> {
|
||||
return this.agentRegistryService.getAgent(name);
|
||||
}
|
||||
|
||||
@Post()
|
||||
async createAgent(@Body() dto: CreateOpenClawAgentDto): Promise<OpenClawAgent> {
|
||||
return this.agentRegistryService.createAgent(dto);
|
||||
}
|
||||
|
||||
@Patch(":name")
|
||||
async updateAgent(
|
||||
@Param("name") name: string,
|
||||
@Body() dto: UpdateOpenClawAgentDto
|
||||
): Promise<OpenClawAgent> {
|
||||
return this.agentRegistryService.updateAgent(name, dto);
|
||||
}
|
||||
}
|
||||
65
apps/api/src/openclaw-gateway/agent-registry.service.ts
Normal file
65
apps/api/src/openclaw-gateway/agent-registry.service.ts
Normal file
@@ -0,0 +1,65 @@
|
||||
import { Injectable, NotFoundException } from "@nestjs/common";
|
||||
import { Prisma, type OpenClawAgent } from "@prisma/client";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
import type {
|
||||
CreateOpenClawAgentDto,
|
||||
QueryOpenClawAgentsDto,
|
||||
UpdateOpenClawAgentDto,
|
||||
} from "./openclaw-gateway.dto";
|
||||
|
||||
@Injectable()
|
||||
export class AgentRegistryService {
|
||||
constructor(private readonly prisma: PrismaService) {}
|
||||
|
||||
async listAgents(query: QueryOpenClawAgentsDto): Promise<OpenClawAgent[]> {
|
||||
const where = query.isActive === undefined ? {} : { isActive: query.isActive };
|
||||
return this.prisma.openClawAgent.findMany({
|
||||
where,
|
||||
orderBy: { name: "asc" },
|
||||
});
|
||||
}
|
||||
|
||||
async getAgent(name: string): Promise<OpenClawAgent> {
|
||||
const agent = await this.prisma.openClawAgent.findUnique({
|
||||
where: { name },
|
||||
});
|
||||
|
||||
if (!agent) {
|
||||
throw new NotFoundException(`OpenClaw agent '${name}' not found`);
|
||||
}
|
||||
|
||||
return agent;
|
||||
}
|
||||
|
||||
async createAgent(dto: CreateOpenClawAgentDto): Promise<OpenClawAgent> {
|
||||
return this.prisma.openClawAgent.create({
|
||||
data: {
|
||||
name: dto.name,
|
||||
displayName: dto.displayName,
|
||||
role: dto.role,
|
||||
gatewayUrl: dto.gatewayUrl,
|
||||
agentId: dto.agentId ?? "main",
|
||||
model: dto.model,
|
||||
isActive: dto.isActive ?? true,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async updateAgent(name: string, dto: UpdateOpenClawAgentDto): Promise<OpenClawAgent> {
|
||||
await this.getAgent(name);
|
||||
|
||||
const data: Prisma.OpenClawAgentUpdateInput = {};
|
||||
if (dto.name !== undefined) data.name = dto.name;
|
||||
if (dto.displayName !== undefined) data.displayName = dto.displayName;
|
||||
if (dto.role !== undefined) data.role = dto.role;
|
||||
if (dto.gatewayUrl !== undefined) data.gatewayUrl = dto.gatewayUrl;
|
||||
if (dto.agentId !== undefined) data.agentId = dto.agentId;
|
||||
if (dto.model !== undefined) data.model = dto.model;
|
||||
if (dto.isActive !== undefined) data.isActive = dto.isActive;
|
||||
|
||||
return this.prisma.openClawAgent.update({
|
||||
where: { name },
|
||||
data,
|
||||
});
|
||||
}
|
||||
}
|
||||
40
apps/api/src/openclaw-gateway/openclaw-gateway.controller.ts
Normal file
40
apps/api/src/openclaw-gateway/openclaw-gateway.controller.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
import { Body, Controller, HttpCode, HttpStatus, Post, Res } from "@nestjs/common";
|
||||
import type { Response } from "express";
|
||||
import { OpenClawGatewayService } from "./openclaw-gateway.service";
|
||||
import { ChatRequestDto } from "./openclaw-gateway.dto";
|
||||
|
||||
@Controller("openclaw")
|
||||
export class OpenClawGatewayController {
|
||||
constructor(private readonly openClawGatewayService: OpenClawGatewayService) {}
|
||||
|
||||
@Post("chat")
|
||||
@HttpCode(HttpStatus.OK)
|
||||
async chat(@Body() dto: ChatRequestDto, @Res() res: Response): Promise<void> {
|
||||
res.setHeader("Content-Type", "text/event-stream");
|
||||
res.setHeader("Cache-Control", "no-cache");
|
||||
res.setHeader("Connection", "keep-alive");
|
||||
res.setHeader("X-Accel-Buffering", "no");
|
||||
|
||||
if (typeof res.flushHeaders === "function") {
|
||||
res.flushHeaders();
|
||||
}
|
||||
|
||||
try {
|
||||
for await (const content of this.openClawGatewayService.streamChat(
|
||||
dto.agent,
|
||||
dto.messages,
|
||||
dto.workspaceId
|
||||
)) {
|
||||
res.write(`data: ${JSON.stringify({ content })}\n\n`);
|
||||
}
|
||||
|
||||
res.write("data: [DONE]\n\n");
|
||||
} catch (error: unknown) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
res.write("event: error\n");
|
||||
res.write(`data: ${JSON.stringify({ error: errorMessage })}\n\n`);
|
||||
} finally {
|
||||
res.end();
|
||||
}
|
||||
}
|
||||
}
|
||||
115
apps/api/src/openclaw-gateway/openclaw-gateway.dto.ts
Normal file
115
apps/api/src/openclaw-gateway/openclaw-gateway.dto.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
import { PartialType } from "@nestjs/mapped-types";
|
||||
import {
|
||||
IsArray,
|
||||
IsBoolean,
|
||||
IsIn,
|
||||
IsOptional,
|
||||
IsString,
|
||||
Matches,
|
||||
MaxLength,
|
||||
MinLength,
|
||||
ValidateNested,
|
||||
} from "class-validator";
|
||||
import { Type, Transform } from "class-transformer";
|
||||
|
||||
export type ChatRole = "system" | "user" | "assistant";
|
||||
|
||||
export interface ChatMessage {
|
||||
role: ChatRole;
|
||||
content: string;
|
||||
}
|
||||
|
||||
export class ChatMessageDto implements ChatMessage {
|
||||
@IsString({ message: "role must be a string" })
|
||||
@IsIn(["system", "user", "assistant"], {
|
||||
message: "role must be one of: system, user, assistant",
|
||||
})
|
||||
role!: ChatRole;
|
||||
|
||||
@IsString({ message: "content must be a string" })
|
||||
@MinLength(1, { message: "content must not be empty" })
|
||||
content!: string;
|
||||
}
|
||||
|
||||
export class ChatRequestDto {
|
||||
@IsString({ message: "agent must be a string" })
|
||||
@MinLength(1, { message: "agent must not be empty" })
|
||||
@MaxLength(100, { message: "agent must not exceed 100 characters" })
|
||||
@Matches(/^[a-z0-9-]+$/, {
|
||||
message: "agent must contain only lowercase letters, numbers, and hyphens",
|
||||
})
|
||||
agent!: string;
|
||||
|
||||
@IsArray({ message: "messages must be an array" })
|
||||
@ValidateNested({ each: true })
|
||||
@Type(() => ChatMessageDto)
|
||||
messages!: ChatMessageDto[];
|
||||
|
||||
@IsOptional()
|
||||
@IsString({ message: "workspaceId must be a string" })
|
||||
workspaceId?: string;
|
||||
}
|
||||
|
||||
export class CreateOpenClawAgentDto {
|
||||
@IsString({ message: "name must be a string" })
|
||||
@MinLength(1, { message: "name must not be empty" })
|
||||
@MaxLength(100, { message: "name must not exceed 100 characters" })
|
||||
@Matches(/^[a-z0-9-]+$/, {
|
||||
message: "name must contain only lowercase letters, numbers, and hyphens",
|
||||
})
|
||||
name!: string;
|
||||
|
||||
@IsString({ message: "displayName must be a string" })
|
||||
@MinLength(1, { message: "displayName must not be empty" })
|
||||
@MaxLength(255, { message: "displayName must not exceed 255 characters" })
|
||||
displayName!: string;
|
||||
|
||||
@IsString({ message: "role must be a string" })
|
||||
@IsIn(["orchestrator", "developer", "researcher", "operations"], {
|
||||
message: "role must be one of: orchestrator, developer, researcher, operations",
|
||||
})
|
||||
role!: string;
|
||||
|
||||
@IsString({ message: "gatewayUrl must be a string" })
|
||||
@MinLength(1, { message: "gatewayUrl must not be empty" })
|
||||
@MaxLength(2048, { message: "gatewayUrl must not exceed 2048 characters" })
|
||||
@Matches(/^https?:\/\/[^\s]+$/i, {
|
||||
message: "gatewayUrl must be a valid HTTP(S) URL",
|
||||
})
|
||||
gatewayUrl!: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsString({ message: "agentId must be a string" })
|
||||
@MinLength(1, { message: "agentId must not be empty" })
|
||||
@MaxLength(100, { message: "agentId must not exceed 100 characters" })
|
||||
agentId?: string;
|
||||
|
||||
@IsString({ message: "model must be a string" })
|
||||
@MinLength(1, { message: "model must not be empty" })
|
||||
@MaxLength(255, { message: "model must not exceed 255 characters" })
|
||||
model!: string;
|
||||
|
||||
@IsOptional()
|
||||
@IsBoolean({ message: "isActive must be a boolean" })
|
||||
isActive?: boolean;
|
||||
}
|
||||
|
||||
export class UpdateOpenClawAgentDto extends PartialType(CreateOpenClawAgentDto) {}
|
||||
|
||||
export class QueryOpenClawAgentsDto {
|
||||
@IsOptional()
|
||||
@Transform(({ value }: { value: unknown }) => {
|
||||
if (typeof value === "boolean") {
|
||||
return value;
|
||||
}
|
||||
if (value === "true" || value === "1") {
|
||||
return true;
|
||||
}
|
||||
if (value === "false" || value === "0") {
|
||||
return false;
|
||||
}
|
||||
return value;
|
||||
})
|
||||
@IsBoolean({ message: "isActive must be a boolean" })
|
||||
isActive?: boolean;
|
||||
}
|
||||
21
apps/api/src/openclaw-gateway/openclaw-gateway.module.ts
Normal file
21
apps/api/src/openclaw-gateway/openclaw-gateway.module.ts
Normal file
@@ -0,0 +1,21 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { HttpModule } from "@nestjs/axios";
|
||||
import { PrismaModule } from "../prisma/prisma.module";
|
||||
import { OpenClawGatewayService } from "./openclaw-gateway.service";
|
||||
import { OpenClawGatewayController } from "./openclaw-gateway.controller";
|
||||
import { AgentRegistryService } from "./agent-registry.service";
|
||||
import { AgentRegistryController } from "./agent-registry.controller";
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
PrismaModule,
|
||||
HttpModule.register({
|
||||
timeout: 120000,
|
||||
maxRedirects: 3,
|
||||
}),
|
||||
],
|
||||
controllers: [OpenClawGatewayController, AgentRegistryController],
|
||||
providers: [OpenClawGatewayService, AgentRegistryService],
|
||||
exports: [OpenClawGatewayService, AgentRegistryService],
|
||||
})
|
||||
export class OpenClawGatewayModule {}
|
||||
212
apps/api/src/openclaw-gateway/openclaw-gateway.service.spec.ts
Normal file
212
apps/api/src/openclaw-gateway/openclaw-gateway.service.spec.ts
Normal file
@@ -0,0 +1,212 @@
|
||||
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
|
||||
import { Test, TestingModule } from "@nestjs/testing";
|
||||
import {
|
||||
NotFoundException,
|
||||
ServiceUnavailableException,
|
||||
UnauthorizedException,
|
||||
} from "@nestjs/common";
|
||||
import { HttpService } from "@nestjs/axios";
|
||||
import { Readable } from "node:stream";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
import { OpenClawGatewayService } from "./openclaw-gateway.service";
|
||||
import type { ChatMessage } from "./openclaw-gateway.dto";
|
||||
|
||||
describe("OpenClawGatewayService", () => {
|
||||
let service: OpenClawGatewayService;
|
||||
|
||||
const mockPrisma = {
|
||||
openClawAgent: {
|
||||
findUnique: vi.fn(),
|
||||
},
|
||||
};
|
||||
|
||||
const mockHttpService = {
|
||||
axiosRef: {
|
||||
post: vi.fn(),
|
||||
},
|
||||
};
|
||||
|
||||
const tokenEnvKey = "OPENCLAW_TOKEN_MOSAIC_MAIN";
|
||||
|
||||
beforeEach(async () => {
|
||||
vi.clearAllMocks();
|
||||
process.env[tokenEnvKey] = "test-token";
|
||||
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
OpenClawGatewayService,
|
||||
{ provide: PrismaService, useValue: mockPrisma },
|
||||
{ provide: HttpService, useValue: mockHttpService },
|
||||
],
|
||||
}).compile();
|
||||
|
||||
service = module.get<OpenClawGatewayService>(OpenClawGatewayService);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
delete process.env[tokenEnvKey];
|
||||
});
|
||||
|
||||
it("streams content chunks from OpenClaw SSE responses", async () => {
|
||||
const messages: ChatMessage[] = [{ role: "user", content: "Hello" }];
|
||||
|
||||
mockPrisma.openClawAgent.findUnique.mockResolvedValue({
|
||||
id: "agent-1",
|
||||
name: "mosaic-main",
|
||||
displayName: "Main Orchestrator",
|
||||
role: "orchestrator",
|
||||
gatewayUrl: "http://mosaic-main:18789",
|
||||
agentId: "main",
|
||||
model: "zai/glm-5",
|
||||
isActive: true,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
|
||||
mockHttpService.axiosRef.post.mockResolvedValue({
|
||||
data: Readable.from([
|
||||
'data: {"choices":[{"delta":{"content":"Hello"}}]}\n\n',
|
||||
'data: {"choices":[{"delta":{"content":" world"}}]}\n\n',
|
||||
"data: [DONE]\n\n",
|
||||
]),
|
||||
});
|
||||
|
||||
const chunks: string[] = [];
|
||||
for await (const chunk of service.streamChat("mosaic-main", messages)) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
|
||||
expect(chunks).toEqual(["Hello", " world"]);
|
||||
expect(mockHttpService.axiosRef.post).toHaveBeenCalledWith(
|
||||
"http://mosaic-main:18789/v1/chat/completions",
|
||||
{
|
||||
model: "openclaw:main",
|
||||
messages,
|
||||
stream: true,
|
||||
},
|
||||
expect.objectContaining({
|
||||
responseType: "stream",
|
||||
headers: expect.objectContaining({
|
||||
Authorization: "Bearer test-token",
|
||||
"Content-Type": "application/json",
|
||||
}),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("throws NotFoundException when agent is not registered", async () => {
|
||||
mockPrisma.openClawAgent.findUnique.mockResolvedValue(null);
|
||||
|
||||
await expect(
|
||||
(async () => {
|
||||
for await (const _chunk of service.streamChat("missing-agent", [])) {
|
||||
// no-op
|
||||
}
|
||||
})()
|
||||
).rejects.toBeInstanceOf(NotFoundException);
|
||||
});
|
||||
|
||||
it("throws ServiceUnavailableException when agent is inactive", async () => {
|
||||
mockPrisma.openClawAgent.findUnique.mockResolvedValue({
|
||||
id: "agent-1",
|
||||
name: "mosaic-main",
|
||||
displayName: "Main Orchestrator",
|
||||
role: "orchestrator",
|
||||
gatewayUrl: "http://mosaic-main:18789",
|
||||
agentId: "main",
|
||||
model: "zai/glm-5",
|
||||
isActive: false,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
|
||||
await expect(
|
||||
(async () => {
|
||||
for await (const _chunk of service.streamChat("mosaic-main", [])) {
|
||||
// no-op
|
||||
}
|
||||
})()
|
||||
).rejects.toBeInstanceOf(ServiceUnavailableException);
|
||||
});
|
||||
|
||||
it("throws ServiceUnavailableException when token env var is missing", async () => {
|
||||
delete process.env[tokenEnvKey];
|
||||
|
||||
mockPrisma.openClawAgent.findUnique.mockResolvedValue({
|
||||
id: "agent-1",
|
||||
name: "mosaic-main",
|
||||
displayName: "Main Orchestrator",
|
||||
role: "orchestrator",
|
||||
gatewayUrl: "http://mosaic-main:18789",
|
||||
agentId: "main",
|
||||
model: "zai/glm-5",
|
||||
isActive: true,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
|
||||
await expect(
|
||||
(async () => {
|
||||
for await (const _chunk of service.streamChat("mosaic-main", [])) {
|
||||
// no-op
|
||||
}
|
||||
})()
|
||||
).rejects.toBeInstanceOf(ServiceUnavailableException);
|
||||
});
|
||||
|
||||
it("throws UnauthorizedException when OpenClaw returns 401", async () => {
|
||||
mockPrisma.openClawAgent.findUnique.mockResolvedValue({
|
||||
id: "agent-1",
|
||||
name: "mosaic-main",
|
||||
displayName: "Main Orchestrator",
|
||||
role: "orchestrator",
|
||||
gatewayUrl: "http://mosaic-main:18789",
|
||||
agentId: "main",
|
||||
model: "zai/glm-5",
|
||||
isActive: true,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
|
||||
mockHttpService.axiosRef.post.mockRejectedValue({
|
||||
message: "Request failed with status code 401",
|
||||
response: { status: 401 },
|
||||
});
|
||||
|
||||
await expect(
|
||||
(async () => {
|
||||
for await (const _chunk of service.streamChat("mosaic-main", [])) {
|
||||
// no-op
|
||||
}
|
||||
})()
|
||||
).rejects.toBeInstanceOf(UnauthorizedException);
|
||||
});
|
||||
|
||||
it("throws ServiceUnavailableException when gateway is offline", async () => {
|
||||
mockPrisma.openClawAgent.findUnique.mockResolvedValue({
|
||||
id: "agent-1",
|
||||
name: "mosaic-main",
|
||||
displayName: "Main Orchestrator",
|
||||
role: "orchestrator",
|
||||
gatewayUrl: "http://mosaic-main:18789",
|
||||
agentId: "main",
|
||||
model: "zai/glm-5",
|
||||
isActive: true,
|
||||
createdAt: new Date(),
|
||||
updatedAt: new Date(),
|
||||
});
|
||||
|
||||
mockHttpService.axiosRef.post.mockRejectedValue({
|
||||
message: "connect ECONNREFUSED 127.0.0.1:18789",
|
||||
code: "ECONNREFUSED",
|
||||
});
|
||||
|
||||
await expect(
|
||||
(async () => {
|
||||
for await (const _chunk of service.streamChat("mosaic-main", [])) {
|
||||
// no-op
|
||||
}
|
||||
})()
|
||||
).rejects.toBeInstanceOf(ServiceUnavailableException);
|
||||
});
|
||||
});
|
||||
273
apps/api/src/openclaw-gateway/openclaw-gateway.service.ts
Normal file
273
apps/api/src/openclaw-gateway/openclaw-gateway.service.ts
Normal file
@@ -0,0 +1,273 @@
|
||||
import { HttpService } from "@nestjs/axios";
|
||||
import {
|
||||
Injectable,
|
||||
Logger,
|
||||
NotFoundException,
|
||||
ServiceUnavailableException,
|
||||
UnauthorizedException,
|
||||
} from "@nestjs/common";
|
||||
import type { OpenClawAgent } from "@prisma/client";
|
||||
import type { Readable } from "node:stream";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
import type { ChatMessage } from "./openclaw-gateway.dto";
|
||||
|
||||
interface OpenAiSseChoiceDelta {
|
||||
content?: string;
|
||||
}
|
||||
|
||||
interface OpenAiSseChoice {
|
||||
delta?: OpenAiSseChoiceDelta;
|
||||
}
|
||||
|
||||
interface OpenAiSseError {
|
||||
message?: string;
|
||||
}
|
||||
|
||||
interface OpenAiSsePayload {
|
||||
choices?: OpenAiSseChoice[];
|
||||
error?: OpenAiSseError;
|
||||
}
|
||||
|
||||
type ParsedSseEvent = { done: true } | { done: false; content: string } | null;
|
||||
|
||||
interface GatewayErrorLike {
|
||||
message?: string;
|
||||
code?: string;
|
||||
response?: {
|
||||
status?: number;
|
||||
};
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class OpenClawGatewayService {
|
||||
private readonly logger = new Logger(OpenClawGatewayService.name);
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly httpService: HttpService
|
||||
) {}
|
||||
|
||||
async *streamChat(
|
||||
agentName: string,
|
||||
messages: ChatMessage[],
|
||||
workspaceId?: string
|
||||
): AsyncGenerator<string> {
|
||||
const agent = await this.prisma.openClawAgent.findUnique({
|
||||
where: { name: agentName },
|
||||
});
|
||||
|
||||
if (!agent) {
|
||||
throw new NotFoundException(`OpenClaw agent '${agentName}' not found`);
|
||||
}
|
||||
|
||||
if (!agent.isActive) {
|
||||
throw new ServiceUnavailableException(`OpenClaw agent '${agentName}' is inactive`);
|
||||
}
|
||||
|
||||
const token = this.resolveGatewayToken(agent.name);
|
||||
const endpoint = this.buildChatEndpoint(agent.gatewayUrl);
|
||||
|
||||
try {
|
||||
const response = await this.httpService.axiosRef.post<Readable>(
|
||||
endpoint,
|
||||
{
|
||||
model: `openclaw:${agent.agentId}`,
|
||||
messages,
|
||||
stream: true,
|
||||
},
|
||||
{
|
||||
responseType: "stream",
|
||||
timeout: 120000,
|
||||
headers: {
|
||||
Authorization: `Bearer ${token}`,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
for await (const chunk of this.extractContentChunks(response.data)) {
|
||||
yield chunk;
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
this.throwGatewayError(agent, endpoint, workspaceId, error);
|
||||
}
|
||||
}
|
||||
|
||||
private resolveGatewayToken(agentName: string): string {
|
||||
const envKey = this.getTokenEnvKey(agentName);
|
||||
const token = process.env[envKey];
|
||||
|
||||
if (!token) {
|
||||
throw new ServiceUnavailableException(
|
||||
`Missing gateway token for agent '${agentName}'. Set ${envKey}.`
|
||||
);
|
||||
}
|
||||
|
||||
return token;
|
||||
}
|
||||
|
||||
private getTokenEnvKey(agentName: string): string {
|
||||
return `OPENCLAW_TOKEN_${agentName.replace(/-/g, "_").toUpperCase()}`;
|
||||
}
|
||||
|
||||
private buildChatEndpoint(gatewayUrl: string): string {
|
||||
const sanitizedBaseUrl = gatewayUrl.replace(/\/+$/, "");
|
||||
return `${sanitizedBaseUrl}/v1/chat/completions`;
|
||||
}
|
||||
|
||||
private async *extractContentChunks(stream: Readable): AsyncGenerator<string> {
|
||||
let buffer = "";
|
||||
|
||||
for await (const rawChunk of stream) {
|
||||
buffer += this.chunkToString(rawChunk);
|
||||
|
||||
for (;;) {
|
||||
const delimiterMatch = /\r?\n\r?\n/.exec(buffer);
|
||||
const delimiterIndex = delimiterMatch?.index;
|
||||
|
||||
if (delimiterMatch === null || delimiterIndex === undefined) {
|
||||
break;
|
||||
}
|
||||
|
||||
const rawEvent = buffer.slice(0, delimiterIndex);
|
||||
buffer = buffer.slice(delimiterIndex + delimiterMatch[0].length);
|
||||
|
||||
const parsed = this.parseSseEvent(rawEvent);
|
||||
if (parsed === null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (parsed.done) {
|
||||
return;
|
||||
}
|
||||
|
||||
yield parsed.content;
|
||||
}
|
||||
}
|
||||
|
||||
const trailingEvent = this.parseSseEvent(buffer);
|
||||
if (trailingEvent !== null && !trailingEvent.done) {
|
||||
yield trailingEvent.content;
|
||||
}
|
||||
}
|
||||
|
||||
private parseSseEvent(rawEvent: string): ParsedSseEvent {
|
||||
const payload = this.extractSseDataPayload(rawEvent);
|
||||
if (!payload) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (payload === "[DONE]") {
|
||||
return { done: true };
|
||||
}
|
||||
|
||||
let parsedPayload: OpenAiSsePayload;
|
||||
|
||||
try {
|
||||
parsedPayload = JSON.parse(payload) as OpenAiSsePayload;
|
||||
} catch {
|
||||
this.logger.debug(`Skipping non-JSON OpenClaw SSE payload: ${payload}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
if (parsedPayload.error?.message) {
|
||||
throw new ServiceUnavailableException(
|
||||
`OpenClaw gateway error: ${parsedPayload.error.message}`
|
||||
);
|
||||
}
|
||||
|
||||
const content = parsedPayload.choices?.[0]?.delta?.content;
|
||||
|
||||
if (typeof content === "string" && content.length > 0) {
|
||||
return { done: false, content };
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private extractSseDataPayload(rawEvent: string): string | null {
|
||||
if (rawEvent.trim().length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const dataLines = rawEvent
|
||||
.split(/\r?\n/)
|
||||
.filter((line) => line.startsWith("data:"))
|
||||
.map((line) => line.slice(5).trimStart());
|
||||
|
||||
if (dataLines.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return dataLines.join("\n").trim();
|
||||
}
|
||||
|
||||
private chunkToString(chunk: unknown): string {
|
||||
if (typeof chunk === "string") {
|
||||
return chunk;
|
||||
}
|
||||
|
||||
if (Buffer.isBuffer(chunk)) {
|
||||
return chunk.toString("utf8");
|
||||
}
|
||||
|
||||
return String(chunk);
|
||||
}
|
||||
|
||||
private throwGatewayError(
|
||||
agent: OpenClawAgent,
|
||||
endpoint: string,
|
||||
workspaceId: string | undefined,
|
||||
error: unknown
|
||||
): never {
|
||||
if (error instanceof NotFoundException) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
if (error instanceof UnauthorizedException) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
if (error instanceof ServiceUnavailableException) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
const gatewayError = error as GatewayErrorLike;
|
||||
const statusCode = gatewayError.response?.status;
|
||||
const errorCode = gatewayError.code;
|
||||
const message = gatewayError.message ?? String(error);
|
||||
|
||||
const workspaceSuffix = workspaceId ? ` (workspace ${workspaceId})` : "";
|
||||
|
||||
if (statusCode === 401 || statusCode === 403) {
|
||||
this.logger.error(
|
||||
`OpenClaw auth failed for agent '${agent.name}' at ${endpoint}${workspaceSuffix}: ${message}`
|
||||
);
|
||||
throw new UnauthorizedException(`OpenClaw authentication failed for agent '${agent.name}'`);
|
||||
}
|
||||
|
||||
const isGatewayOfflineCode =
|
||||
errorCode === "ECONNREFUSED" ||
|
||||
errorCode === "ENOTFOUND" ||
|
||||
errorCode === "ETIMEDOUT" ||
|
||||
errorCode === "ECONNRESET";
|
||||
const isGatewayOfflineStatus =
|
||||
statusCode === 502 || statusCode === 503 || statusCode === 504 || statusCode === 522;
|
||||
|
||||
if (isGatewayOfflineCode || isGatewayOfflineStatus) {
|
||||
this.logger.warn(
|
||||
`OpenClaw gateway offline for agent '${agent.name}' at ${endpoint}${workspaceSuffix}: ${message}`
|
||||
);
|
||||
throw new ServiceUnavailableException(
|
||||
`OpenClaw gateway for agent '${agent.name}' is unavailable`
|
||||
);
|
||||
}
|
||||
|
||||
this.logger.error(
|
||||
`OpenClaw request failed for agent '${agent.name}' at ${endpoint}${workspaceSuffix}: ${message}`
|
||||
);
|
||||
throw new ServiceUnavailableException(
|
||||
`OpenClaw request failed for agent '${agent.name}': ${message}`
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user