All checks were successful
ci/woodpecker/push/ci Pipeline was successful
Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
332 lines
11 KiB
TypeScript
332 lines
11 KiB
TypeScript
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
|
|
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
|
|
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
|
|
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';
|
|
import { Type } from '@sinclair/typebox';
|
|
import type { ToolDefinition } from '@mariozechner/pi-coding-agent';
|
|
import type { McpServerConfigDto, McpToolDto, McpServerStatusDto } from './mcp-client.dto.js';
|
|
|
|
/**
 * Bookkeeping record kept per configured MCP server: its configuration,
 * the SDK client used to talk to it, and the outcome of the last connect.
 */
interface ConnectedServer {
  /** Configuration entry (name, url, optional headers) this record was built from. */
  config: McpServerConfigDto;

  /** MCP SDK client instance used for this server. */
  client: Client;

  /** Tools discovered on the server; empty until discovery succeeds. */
  tools: McpToolDto[];

  /** True once the connection and tool discovery have both succeeded. */
  connected: boolean;

  /** Message of the last connection failure, if any. */
  error?: string;
}
|
|
|
|
/**
 * McpClientService connects to external MCP servers, discovers their tools,
 * and bridges them into Pi SDK ToolDefinition format for agent sessions.
 *
 * Configuration is read from the MCP_SERVERS environment variable:
 *   MCP_SERVERS='[{"name":"my-server","url":"http://localhost:3001/mcp","headers":{"Authorization":"Bearer token"}}]'
 *
 * Each server is tried over StreamableHTTP first and SSE second. Bridged tool
 * names are namespaced as `<server name>__<remote tool name>`.
 */
@Injectable()
export class McpClientService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(McpClientService.name);
  private readonly servers = new Map<string, ConnectedServer>();

  /**
   * Loads the server configuration and connects to every configured server in
   * parallel. A failing server never aborts start-up; its failure is recorded
   * on the server entry and reported by {@link getServerStatuses}.
   */
  async onModuleInit(): Promise<void> {
    const configs = this.loadConfigs();
    if (configs.length === 0) {
      this.logger.log('No external MCP servers configured (MCP_SERVERS not set)');
      return;
    }

    this.logger.log(`Connecting to ${configs.length} external MCP server(s)`);
    await Promise.allSettled(configs.map((cfg) => this.connectServer(cfg)));
  }

  /** Closes every client (best effort) and forgets all server entries. */
  async onModuleDestroy(): Promise<void> {
    this.logger.log(`Disconnecting from ${this.servers.size} MCP server(s)`);
    const disconnects = Array.from(this.servers.values()).map((s) => this.disconnectServer(s));
    await Promise.allSettled(disconnects);
    this.servers.clear();
  }

  /**
   * Returns all bridged Pi SDK ToolDefinitions from all connected MCP servers.
   * Servers whose entry is not flagged as connected contribute nothing.
   */
  getToolDefinitions(): ToolDefinition[] {
    const tools: ToolDefinition[] = [];
    for (const server of this.servers.values()) {
      if (!server.connected) continue;
      for (const mcpTool of server.tools) {
        tools.push(this.bridgeTool(server.client, mcpTool));
      }
    }
    return tools;
  }

  /**
   * Returns status information for all configured MCP servers.
   */
  getServerStatuses(): McpServerStatusDto[] {
    return Array.from(this.servers.values()).map((s) => ({
      name: s.config.name,
      url: s.config.url,
      connected: s.connected,
      toolCount: s.tools.length,
      error: s.error,
    }));
  }

  /**
   * Attempts to reconnect a server whose entry is flagged as not connected.
   * Does nothing when the entry is already flagged as connected.
   *
   * @param serverName Name of a configured server.
   * @throws Error when no server with that name is configured.
   */
  async reconnectServer(serverName: string): Promise<void> {
    const existing = this.servers.get(serverName);
    if (!existing) {
      throw new Error(`MCP server not found: ${serverName}`);
    }
    if (existing.connected) return;

    this.logger.log(`Reconnecting to MCP server: ${serverName}`);
    await this.connectServer(existing.config);
  }

  // ─── Private helpers ──────────────────────────────────────────────────────

  /**
   * Parses and validates MCP_SERVERS.
   *
   * Invalid entries and entries that reuse an earlier name are skipped with a
   * warning. Entry contents are never logged because `headers` may carry
   * credentials.
   *
   * @returns The valid server configurations, or an empty array when the
   *          variable is unset, not valid JSON, or not a JSON array.
   */
  private loadConfigs(): McpServerConfigDto[] {
    const raw = process.env['MCP_SERVERS'];
    if (!raw) return [];

    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch (err) {
      this.logger.error(`Failed to parse MCP_SERVERS: ${this.errorMessage(err)}`);
      return [];
    }

    if (!Array.isArray(parsed)) {
      this.logger.warn('MCP_SERVERS must be a JSON array — ignoring');
      return [];
    }

    const configs: McpServerConfigDto[] = [];
    const seenNames = new Set<string>();

    parsed.forEach((item: unknown, index: number) => {
      const cfg = this.parseServerConfig(item);
      if (!cfg) {
        // Deliberately log the position only: the entry may contain secrets.
        this.logger.warn(
          `Skipping invalid MCP server config entry at index ${index} ` +
            '(expected string "name" and "url", and optional string-valued "headers")',
        );
        return;
      }
      if (seenNames.has(cfg.name)) {
        // Two entries with one name would fight over the same map slot.
        this.logger.warn(
          `Skipping duplicate MCP server config entry "${cfg.name}" at index ${index}`,
        );
        return;
      }
      seenNames.add(cfg.name);
      configs.push(cfg);
    });

    return configs;
  }

  /**
   * Validates one raw MCP_SERVERS entry.
   *
   * @returns The normalized configuration, or undefined when `name`/`url` are
   *          not strings or `headers` is present but not an object of strings.
   */
  private parseServerConfig(item: unknown): McpServerConfigDto | undefined {
    if (!this.isPlainObject(item)) return undefined;

    const name = item['name'];
    const url = item['url'];
    if (typeof name !== 'string' || typeof url !== 'string') return undefined;

    const rawHeaders = item['headers'];
    if (rawHeaders === undefined || rawHeaders === null) {
      return { name, url, headers: undefined };
    }
    if (
      !this.isPlainObject(rawHeaders) ||
      !Object.values(rawHeaders).every((value) => typeof value === 'string')
    ) {
      return undefined;
    }

    // Safe: every value was just checked to be a string.
    return { name, url, headers: rawHeaders as Record<string, string> };
  }

  /**
   * Connects to one server and discovers its tools.
   *
   * A fresh entry replaces any existing entry for the same name, and the
   * replaced entry's client is closed. StreamableHTTP is attempted first with
   * SSE as fallback. Failures never propagate: they are recorded on the entry
   * (`connected = false`, `error` set) and the client is closed.
   */
  private async connectServer(config: McpServerConfigDto): Promise<void> {
    const previous = this.servers.get(config.name);

    const serverEntry: ConnectedServer = {
      config,
      client: this.createClient(),
      tools: [],
      connected: false,
    };
    this.servers.set(config.name, serverEntry);

    // Release the client of the entry we just replaced (reconnect case).
    if (previous) {
      await this.disconnectServer(previous);
    }

    try {
      const url = new URL(config.url);
      const requestInit = { headers: config.headers ?? {} };

      try {
        await serverEntry.client.connect(new StreamableHTTPClientTransport(url, { requestInit }));
        this.logger.log(`Connected to MCP server "${config.name}" via StreamableHTTP`);
      } catch (streamErr) {
        const streamMessage = this.errorMessage(streamErr);
        this.logger.warn(
          `StreamableHTTP failed for "${config.name}", trying SSE: ${streamMessage}`,
        );

        // Dispose of the failed client and start the SSE attempt from a clean one.
        await this.closeClient(serverEntry.client, config.name);
        serverEntry.client = this.createClient();

        try {
          await serverEntry.client.connect(new SSEClientTransport(url, { requestInit }));
          this.logger.log(`Connected to MCP server "${config.name}" via SSE`);
        } catch (sseErr) {
          throw new Error(
            `Both transports failed for "${config.name}": ` +
              `StreamableHTTP error: ${streamMessage}; SSE error: ${this.errorMessage(sseErr)}`,
          );
        }
      }

      // Discover tools
      const remoteTools = await this.listAllTools(serverEntry.client);
      serverEntry.tools = remoteTools.map((t) => ({
        name: `${config.name}__${t.name}`,
        description: t.description ?? `Tool ${t.name} from MCP server ${config.name}`,
        inputSchema: (t.inputSchema as Record<string, unknown>) ?? {},
        serverName: config.name,
        remoteName: t.name,
      }));

      serverEntry.connected = true;
      this.logger.log(
        `Discovered ${serverEntry.tools.length} tool(s) from MCP server "${config.name}"`,
      );
    } catch (err) {
      const message = this.errorMessage(err);
      serverEntry.error = message;
      serverEntry.connected = false;
      this.logger.error(`Failed to connect to MCP server "${config.name}": ${message}`);

      // Do not leave a half-open connection behind for an entry marked as failed.
      await this.closeClient(serverEntry.client, config.name);
    }
  }

  /**
   * Fetches the complete tool list, following `nextCursor` across pages.
   * Stops early if the server hands back a cursor it already returned, so a
   * misbehaving server cannot keep this loop running forever.
   */
  private async listAllTools(
    client: Client,
  ): Promise<Awaited<ReturnType<Client['listTools']>>['tools']> {
    const tools: Awaited<ReturnType<Client['listTools']>>['tools'] = [];
    const seenCursors = new Set<string>();
    let cursor: string | undefined;

    do {
      const page = await client.listTools(cursor ? { cursor } : undefined);
      tools.push(...page.tools);
      cursor = page.nextCursor;
      if (cursor) {
        if (seenCursors.has(cursor)) break;
        seenCursors.add(cursor);
      }
    } while (cursor);

    return tools;
  }

  /** Closes the client of a server entry; close errors are logged, not thrown. */
  private async disconnectServer(server: ConnectedServer): Promise<void> {
    await this.closeClient(server.client, server.config.name);
  }

  /** Closes a single client; close errors are logged as warnings, not thrown. */
  private async closeClient(client: Client, serverName: string): Promise<void> {
    try {
      await client.close();
    } catch (err) {
      this.logger.warn(`Error closing MCP client for "${serverName}": ${this.errorMessage(err)}`);
    }
  }

  /** Creates an unconnected MCP client identifying itself as the gateway. */
  private createClient(): Client {
    return new Client({ name: 'mosaic-gateway', version: '1.0.0' });
  }

  /** Extracts a printable message from an arbitrary thrown value. */
  private errorMessage(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  /** True for non-null, non-array objects. */
  private isPlainObject(value: unknown): value is Record<string, unknown> {
    return typeof value === 'object' && value !== null && !Array.isArray(value);
  }

  /**
   * Bridges a single McpToolDto into a Pi SDK ToolDefinition.
   * The MCP inputSchema is converted to a TypeBox schema representation.
   *
   * The returned `execute` never throws: remote errors (`isError`) and call
   * failures are both reported back as a single text content item.
   */
  private bridgeTool(client: Client, mcpTool: McpToolDto): ToolDefinition {
    const schema = this.inputSchemaToTypeBox(mcpTool.inputSchema);

    const textResult = (text: string) => ({
      content: [{ type: 'text' as const, text }],
      details: undefined,
    });

    return {
      name: mcpTool.name,
      label: mcpTool.remoteName,
      description: mcpTool.description,
      parameters: schema,
      execute: async (_toolCallId: string, params: unknown) => {
        try {
          const result = await client.callTool({
            name: mcpTool.remoteName,
            arguments: (params as Record<string, unknown>) ?? {},
          });

          // MCP callTool returns { content: [...], isError?: boolean }
          const content = Array.isArray(result.content) ? result.content : [];

          if (result.isError) {
            const textParts = content
              .filter((c): c is { type: 'text'; text: string } => c.type === 'text')
              .map((c) => c.text)
              .join('\n');
            return textResult(
              `MCP tool error from "${mcpTool.serverName}/${mcpTool.remoteName}": ${textParts || 'Unknown error'}`,
            );
          }

          // NOTE(review): non-text content items are passed through under a
          // text-typed cast — confirm the Pi SDK accepts them as-is.
          return {
            content:
              content.length > 0
                ? (content as { type: 'text'; text: string }[])
                : [{ type: 'text' as const, text: '' }],
            details: undefined,
          };
        } catch (err) {
          const message = this.errorMessage(err);
          this.logger.error(
            `MCP tool call failed: ${mcpTool.serverName}/${mcpTool.remoteName}: ${message}`,
          );
          return textResult(`Failed to call MCP tool "${mcpTool.name}": ${message}`);
        }
      },
    };
  }

  /**
   * Converts a JSON Schema object to a TypeBox-compatible schema.
   * For simplicity, maps the inputSchema properties to TypeBox Type.Object.
   * Unknown/complex schemas fall back to Type.Object with Type.Unknown values.
   *
   * A missing, non-object or array-valued `properties` yields an empty object
   * schema. Property definitions that are not objects (e.g. boolean schemas or
   * null) are treated as Type.Unknown instead of crashing the conversion.
   * Properties not listed in `required` are wrapped in Type.Optional.
   */
  private inputSchemaToTypeBox(
    inputSchema: Record<string, unknown>,
  ): ReturnType<typeof Type.Object> {
    const properties = inputSchema['properties'];
    if (!this.isPlainObject(properties)) {
      return Type.Object({});
    }

    const requiredKeys = new Set<unknown>(
      Array.isArray(inputSchema['required']) ? inputSchema['required'] : [],
    );

    const tbProps: Parameters<typeof Type.Object>[0] = {};
    for (const [key, schemaDef] of Object.entries(properties)) {
      // The description is applied by jsonSchemaToTypeBox via schema options.
      const base = this.jsonSchemaToTypeBox(this.isPlainObject(schemaDef) ? schemaDef : {});
      tbProps[key] = requiredKeys.has(key) ? base : Type.Optional(base);
    }

    return Type.Object(tbProps);
  }

  /**
   * Maps one JSON Schema property definition to a TypeBox schema, carrying the
   * `description` over when it is a string. Only the primitive types string,
   * number, integer and boolean are mapped; anything else becomes Type.Unknown.
   */
  private jsonSchemaToTypeBox(
    def: Record<string, unknown>,
  ):
    | ReturnType<typeof Type.String>
    | ReturnType<typeof Type.Number>
    | ReturnType<typeof Type.Integer>
    | ReturnType<typeof Type.Boolean>
    | ReturnType<typeof Type.Unknown> {
    const type = def['type'];
    const desc = typeof def['description'] === 'string' ? { description: def['description'] } : {};

    switch (type) {
      case 'string':
        return Type.String(desc);
      case 'number':
        return Type.Number(desc);
      case 'integer':
        // Keep the integer constraint instead of widening it to number.
        return Type.Integer(desc);
      case 'boolean':
        return Type.Boolean(desc);
      default:
        return Type.Unknown(desc);
    }
  }
}
|