import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { Client } from '@modelcontextprotocol/sdk/client/index.js'; import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js'; import { Type } from '@sinclair/typebox'; import type { ToolDefinition } from '@mariozechner/pi-coding-agent'; import type { McpServerConfigDto, McpToolDto, McpServerStatusDto } from './mcp-client.dto.js'; interface ConnectedServer { config: McpServerConfigDto; client: Client; tools: McpToolDto[]; connected: boolean; error?: string; } /** * McpClientService connects to external MCP servers, discovers their tools, * and bridges them into Pi SDK ToolDefinition format for agent sessions. * * Configuration is read from the MCP_SERVERS environment variable: * MCP_SERVERS='[{"name":"my-server","url":"http://localhost:3001/mcp","headers":{"Authorization":"Bearer token"}}]' */ @Injectable() export class McpClientService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(McpClientService.name); private readonly servers = new Map(); async onModuleInit(): Promise { const configs = this.loadConfigs(); if (configs.length === 0) { this.logger.log('No external MCP servers configured (MCP_SERVERS not set)'); return; } this.logger.log(`Connecting to ${configs.length} external MCP server(s)`); await Promise.allSettled(configs.map((cfg) => this.connectServer(cfg))); } async onModuleDestroy(): Promise { this.logger.log(`Disconnecting from ${this.servers.size} MCP server(s)`); const disconnects = Array.from(this.servers.values()).map((s) => this.disconnectServer(s)); await Promise.allSettled(disconnects); this.servers.clear(); } /** * Returns all bridged Pi SDK ToolDefinitions from all connected MCP servers. */ getToolDefinitions(): ToolDefinition[] { const tools: ToolDefinition[] = []; for (const server of this.servers.values()) { if (!server.connected) continue; for (const mcpTool of server.tools) { tools.push(this.bridgeTool(server.client, mcpTool)); } } return tools; } /** * Returns status information for all configured MCP servers. */ getServerStatuses(): McpServerStatusDto[] { return Array.from(this.servers.values()).map((s) => ({ name: s.config.name, url: s.config.url, connected: s.connected, toolCount: s.tools.length, error: s.error, })); } /** * Attempts to reconnect a server that has been disconnected. */ async reconnectServer(serverName: string): Promise { const existing = this.servers.get(serverName); if (!existing) { throw new Error(`MCP server not found: ${serverName}`); } if (existing.connected) return; this.logger.log(`Reconnecting to MCP server: ${serverName}`); await this.connectServer(existing.config); } // ─── Private helpers ────────────────────────────────────────────────────── private loadConfigs(): McpServerConfigDto[] { const raw = process.env['MCP_SERVERS']; if (!raw) return []; try { const parsed: unknown = JSON.parse(raw); if (!Array.isArray(parsed)) { this.logger.warn('MCP_SERVERS must be a JSON array — ignoring'); return []; } const configs: McpServerConfigDto[] = []; for (const item of parsed) { if ( typeof item === 'object' && item !== null && 'name' in item && typeof (item as Record)['name'] === 'string' && 'url' in item && typeof (item as Record)['url'] === 'string' ) { const cfg = item as McpServerConfigDto; configs.push({ name: cfg.name, url: cfg.url, headers: cfg.headers, }); } else { this.logger.warn(`Skipping invalid MCP server config entry: ${JSON.stringify(item)}`); } } return configs; } catch (err) { this.logger.error( `Failed to parse MCP_SERVERS: ${err instanceof Error ? err.message : String(err)}`, ); return []; } } private async connectServer(config: McpServerConfigDto): Promise { const serverEntry: ConnectedServer = { config, client: new Client({ name: 'mosaic-gateway', version: '1.0.0' }), tools: [], connected: false, }; // Preserve existing entry if reconnecting this.servers.set(config.name, serverEntry); try { const url = new URL(config.url); const headers = config.headers ?? {}; // Attempt StreamableHTTP first, fall back to SSE let connected = false; try { const transport = new StreamableHTTPClientTransport(url, { requestInit: { headers } }); await serverEntry.client.connect(transport); connected = true; this.logger.log(`Connected to MCP server "${config.name}" via StreamableHTTP`); } catch (streamErr) { this.logger.warn( `StreamableHTTP failed for "${config.name}", trying SSE: ${streamErr instanceof Error ? streamErr.message : String(streamErr)}`, ); // Reset client for SSE attempt serverEntry.client = new Client({ name: 'mosaic-gateway', version: '1.0.0' }); try { const transport = new SSEClientTransport(url, { requestInit: { headers } }); await serverEntry.client.connect(transport); connected = true; this.logger.log(`Connected to MCP server "${config.name}" via SSE`); } catch (sseErr) { throw new Error( `Both transports failed for "${config.name}": SSE error: ${sseErr instanceof Error ? sseErr.message : String(sseErr)}`, ); } } if (!connected) return; // Discover tools const toolsResult = await serverEntry.client.listTools(); serverEntry.tools = toolsResult.tools.map((t) => ({ name: `${config.name}__${t.name}`, description: t.description ?? `Tool ${t.name} from MCP server ${config.name}`, inputSchema: (t.inputSchema as Record) ?? {}, serverName: config.name, remoteName: t.name, })); serverEntry.connected = true; this.logger.log( `Discovered ${serverEntry.tools.length} tool(s) from MCP server "${config.name}"`, ); } catch (err) { const message = err instanceof Error ? err.message : String(err); serverEntry.error = message; serverEntry.connected = false; this.logger.error(`Failed to connect to MCP server "${config.name}": ${message}`); } } private async disconnectServer(server: ConnectedServer): Promise { try { await server.client.close(); } catch (err) { this.logger.warn( `Error closing MCP client for "${server.config.name}": ${err instanceof Error ? err.message : String(err)}`, ); } } /** * Bridges a single McpToolDto into a Pi SDK ToolDefinition. * The MCP inputSchema is converted to a TypeBox schema representation. */ private bridgeTool(client: Client, mcpTool: McpToolDto): ToolDefinition { const schema = this.inputSchemaToTypeBox(mcpTool.inputSchema); return { name: mcpTool.name, label: mcpTool.remoteName, description: mcpTool.description, parameters: schema, execute: async (_toolCallId: string, params: unknown) => { try { const result = await client.callTool({ name: mcpTool.remoteName, arguments: (params as Record) ?? {}, }); // MCP callTool returns { content: [...], isError?: boolean } const content = Array.isArray(result.content) ? result.content : []; const textParts = content .filter((c): c is { type: 'text'; text: string } => c.type === 'text') .map((c) => c.text) .join('\n'); if (result.isError) { return { content: [ { type: 'text' as const, text: `MCP tool error from "${mcpTool.serverName}/${mcpTool.remoteName}": ${textParts || 'Unknown error'}`, }, ], details: undefined, }; } return { content: content.length > 0 ? (content as { type: 'text'; text: string }[]) : [{ type: 'text' as const, text: '' }], details: undefined, }; } catch (err) { const message = err instanceof Error ? err.message : String(err); this.logger.error( `MCP tool call failed: ${mcpTool.serverName}/${mcpTool.remoteName}: ${message}`, ); return { content: [ { type: 'text' as const, text: `Failed to call MCP tool "${mcpTool.name}": ${message}`, }, ], details: undefined, }; } }, }; } /** * Converts a JSON Schema object to a TypeBox-compatible schema. * For simplicity, maps the inputSchema properties to TypeBox Type.Object. * Unknown/complex schemas fall back to Type.Object with Type.Unknown values. */ private inputSchemaToTypeBox( inputSchema: Record, ): ReturnType { const properties = inputSchema['properties']; if (!properties || typeof properties !== 'object') { return Type.Object({}); } const required: string[] = Array.isArray(inputSchema['required']) ? (inputSchema['required'] as string[]) : []; const tbProps: Record> = {}; for (const [key, schemaDef] of Object.entries(properties as Record)) { const def = schemaDef as Record; const desc = typeof def['description'] === 'string' ? def['description'] : undefined; const isOptional = !required.includes(key); const base = this.jsonSchemaToTypeBox(def); tbProps[key] = isOptional ? (Type.Optional(base) as unknown as ReturnType) : (base as unknown as ReturnType); if (desc && tbProps[key]) { // Attach description via metadata (tbProps[key] as Record)['description'] = desc; } } return Type.Object(tbProps as Parameters[0]); } private jsonSchemaToTypeBox( def: Record, ): | ReturnType | ReturnType | ReturnType | ReturnType { const type = def['type']; const desc = typeof def['description'] === 'string' ? { description: def['description'] } : {}; switch (type) { case 'string': return Type.String(desc); case 'number': case 'integer': return Type.Number(desc); case 'boolean': return Type.Boolean(desc); default: return Type.Unknown(desc); } } }