feat(api): add OpenClawGatewayModule with agent registry (MS22-P1b)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
This commit is contained in:
273
apps/api/src/openclaw-gateway/openclaw-gateway.service.ts
Normal file
273
apps/api/src/openclaw-gateway/openclaw-gateway.service.ts
Normal file
@@ -0,0 +1,273 @@
|
||||
import { HttpService } from "@nestjs/axios";
|
||||
import {
|
||||
Injectable,
|
||||
Logger,
|
||||
NotFoundException,
|
||||
ServiceUnavailableException,
|
||||
UnauthorizedException,
|
||||
} from "@nestjs/common";
|
||||
import type { OpenClawAgent } from "@prisma/client";
|
||||
import type { Readable } from "node:stream";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
import type { ChatMessage } from "./openclaw-gateway.dto";
|
||||
|
||||
interface OpenAiSseChoiceDelta {
|
||||
content?: string;
|
||||
}
|
||||
|
||||
interface OpenAiSseChoice {
|
||||
delta?: OpenAiSseChoiceDelta;
|
||||
}
|
||||
|
||||
interface OpenAiSseError {
|
||||
message?: string;
|
||||
}
|
||||
|
||||
interface OpenAiSsePayload {
|
||||
choices?: OpenAiSseChoice[];
|
||||
error?: OpenAiSseError;
|
||||
}
|
||||
|
||||
type ParsedSseEvent = { done: true } | { done: false; content: string } | null;
|
||||
|
||||
interface GatewayErrorLike {
|
||||
message?: string;
|
||||
code?: string;
|
||||
response?: {
|
||||
status?: number;
|
||||
};
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class OpenClawGatewayService {
|
||||
private readonly logger = new Logger(OpenClawGatewayService.name);
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly httpService: HttpService
|
||||
) {}
|
||||
|
||||
async *streamChat(
|
||||
agentName: string,
|
||||
messages: ChatMessage[],
|
||||
workspaceId?: string
|
||||
): AsyncGenerator<string> {
|
||||
const agent = await this.prisma.openClawAgent.findUnique({
|
||||
where: { name: agentName },
|
||||
});
|
||||
|
||||
if (!agent) {
|
||||
throw new NotFoundException(`OpenClaw agent '${agentName}' not found`);
|
||||
}
|
||||
|
||||
if (!agent.isActive) {
|
||||
throw new ServiceUnavailableException(`OpenClaw agent '${agentName}' is inactive`);
|
||||
}
|
||||
|
||||
const token = this.resolveGatewayToken(agent.name);
|
||||
const endpoint = this.buildChatEndpoint(agent.gatewayUrl);
|
||||
|
||||
try {
|
||||
const response = await this.httpService.axiosRef.post<Readable>(
|
||||
endpoint,
|
||||
{
|
||||
model: `openclaw:${agent.agentId}`,
|
||||
messages,
|
||||
stream: true,
|
||||
},
|
||||
{
|
||||
responseType: "stream",
|
||||
timeout: 120000,
|
||||
headers: {
|
||||
Authorization: `Bearer ${token}`,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
}
|
||||
);
|
||||
|
||||
for await (const chunk of this.extractContentChunks(response.data)) {
|
||||
yield chunk;
|
||||
}
|
||||
} catch (error: unknown) {
|
||||
this.throwGatewayError(agent, endpoint, workspaceId, error);
|
||||
}
|
||||
}
|
||||
|
||||
private resolveGatewayToken(agentName: string): string {
|
||||
const envKey = this.getTokenEnvKey(agentName);
|
||||
const token = process.env[envKey];
|
||||
|
||||
if (!token) {
|
||||
throw new ServiceUnavailableException(
|
||||
`Missing gateway token for agent '${agentName}'. Set ${envKey}.`
|
||||
);
|
||||
}
|
||||
|
||||
return token;
|
||||
}
|
||||
|
||||
private getTokenEnvKey(agentName: string): string {
|
||||
return `OPENCLAW_TOKEN_${agentName.replace(/-/g, "_").toUpperCase()}`;
|
||||
}
|
||||
|
||||
private buildChatEndpoint(gatewayUrl: string): string {
|
||||
const sanitizedBaseUrl = gatewayUrl.replace(/\/+$/, "");
|
||||
return `${sanitizedBaseUrl}/v1/chat/completions`;
|
||||
}
|
||||
|
||||
private async *extractContentChunks(stream: Readable): AsyncGenerator<string> {
|
||||
let buffer = "";
|
||||
|
||||
for await (const rawChunk of stream) {
|
||||
buffer += this.chunkToString(rawChunk);
|
||||
|
||||
for (;;) {
|
||||
const delimiterMatch = /\r?\n\r?\n/.exec(buffer);
|
||||
const delimiterIndex = delimiterMatch?.index;
|
||||
|
||||
if (delimiterMatch === null || delimiterIndex === undefined) {
|
||||
break;
|
||||
}
|
||||
|
||||
const rawEvent = buffer.slice(0, delimiterIndex);
|
||||
buffer = buffer.slice(delimiterIndex + delimiterMatch[0].length);
|
||||
|
||||
const parsed = this.parseSseEvent(rawEvent);
|
||||
if (parsed === null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (parsed.done) {
|
||||
return;
|
||||
}
|
||||
|
||||
yield parsed.content;
|
||||
}
|
||||
}
|
||||
|
||||
const trailingEvent = this.parseSseEvent(buffer);
|
||||
if (trailingEvent !== null && !trailingEvent.done) {
|
||||
yield trailingEvent.content;
|
||||
}
|
||||
}
|
||||
|
||||
private parseSseEvent(rawEvent: string): ParsedSseEvent {
|
||||
const payload = this.extractSseDataPayload(rawEvent);
|
||||
if (!payload) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (payload === "[DONE]") {
|
||||
return { done: true };
|
||||
}
|
||||
|
||||
let parsedPayload: OpenAiSsePayload;
|
||||
|
||||
try {
|
||||
parsedPayload = JSON.parse(payload) as OpenAiSsePayload;
|
||||
} catch {
|
||||
this.logger.debug(`Skipping non-JSON OpenClaw SSE payload: ${payload}`);
|
||||
return null;
|
||||
}
|
||||
|
||||
if (parsedPayload.error?.message) {
|
||||
throw new ServiceUnavailableException(
|
||||
`OpenClaw gateway error: ${parsedPayload.error.message}`
|
||||
);
|
||||
}
|
||||
|
||||
const content = parsedPayload.choices?.[0]?.delta?.content;
|
||||
|
||||
if (typeof content === "string" && content.length > 0) {
|
||||
return { done: false, content };
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private extractSseDataPayload(rawEvent: string): string | null {
|
||||
if (rawEvent.trim().length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const dataLines = rawEvent
|
||||
.split(/\r?\n/)
|
||||
.filter((line) => line.startsWith("data:"))
|
||||
.map((line) => line.slice(5).trimStart());
|
||||
|
||||
if (dataLines.length === 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return dataLines.join("\n").trim();
|
||||
}
|
||||
|
||||
private chunkToString(chunk: unknown): string {
|
||||
if (typeof chunk === "string") {
|
||||
return chunk;
|
||||
}
|
||||
|
||||
if (Buffer.isBuffer(chunk)) {
|
||||
return chunk.toString("utf8");
|
||||
}
|
||||
|
||||
return String(chunk);
|
||||
}
|
||||
|
||||
private throwGatewayError(
|
||||
agent: OpenClawAgent,
|
||||
endpoint: string,
|
||||
workspaceId: string | undefined,
|
||||
error: unknown
|
||||
): never {
|
||||
if (error instanceof NotFoundException) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
if (error instanceof UnauthorizedException) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
if (error instanceof ServiceUnavailableException) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
const gatewayError = error as GatewayErrorLike;
|
||||
const statusCode = gatewayError.response?.status;
|
||||
const errorCode = gatewayError.code;
|
||||
const message = gatewayError.message ?? String(error);
|
||||
|
||||
const workspaceSuffix = workspaceId ? ` (workspace ${workspaceId})` : "";
|
||||
|
||||
if (statusCode === 401 || statusCode === 403) {
|
||||
this.logger.error(
|
||||
`OpenClaw auth failed for agent '${agent.name}' at ${endpoint}${workspaceSuffix}: ${message}`
|
||||
);
|
||||
throw new UnauthorizedException(`OpenClaw authentication failed for agent '${agent.name}'`);
|
||||
}
|
||||
|
||||
const isGatewayOfflineCode =
|
||||
errorCode === "ECONNREFUSED" ||
|
||||
errorCode === "ENOTFOUND" ||
|
||||
errorCode === "ETIMEDOUT" ||
|
||||
errorCode === "ECONNRESET";
|
||||
const isGatewayOfflineStatus =
|
||||
statusCode === 502 || statusCode === 503 || statusCode === 504 || statusCode === 522;
|
||||
|
||||
if (isGatewayOfflineCode || isGatewayOfflineStatus) {
|
||||
this.logger.warn(
|
||||
`OpenClaw gateway offline for agent '${agent.name}' at ${endpoint}${workspaceSuffix}: ${message}`
|
||||
);
|
||||
throw new ServiceUnavailableException(
|
||||
`OpenClaw gateway for agent '${agent.name}' is unavailable`
|
||||
);
|
||||
}
|
||||
|
||||
this.logger.error(
|
||||
`OpenClaw request failed for agent '${agent.name}' at ${endpoint}${workspaceSuffix}: ${message}`
|
||||
);
|
||||
throw new ServiceUnavailableException(
|
||||
`OpenClaw request failed for agent '${agent.name}': ${message}`
|
||||
);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user