From 5891a4b08054b146ddb346d58ec715c20fb83481 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 15 Mar 2026 13:25:15 -0500 Subject: [PATCH] =?UTF-8?q?feat(gateway):=20MCP=20client=20=E2=80=94=20con?= =?UTF-8?q?nect=20to=20external=20MCP=20servers=20as=20agent=20tools=20(#1?= =?UTF-8?q?27)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds McpClientModule: discovers and connects to external MCP servers via StreamableHTTP (SSE fallback), bridges their tools into Pi SDK agent sessions. Configure via MCP_SERVERS env var (JSON array of {name, url, headers?}). Co-Authored-By: Claude Sonnet 4.6 --- apps/gateway/src/agent/agent.module.ts | 3 +- apps/gateway/src/agent/agent.service.ts | 11 +- apps/gateway/src/mcp-client/mcp-client.dto.ts | 33 ++ .../src/mcp-client/mcp-client.module.ts | 8 + .../src/mcp-client/mcp-client.service.ts | 331 ++++++++++++++++++ .../src/mcp-client/mcp-client.tokens.ts | 1 + 6 files changed, 385 insertions(+), 2 deletions(-) create mode 100644 apps/gateway/src/mcp-client/mcp-client.dto.ts create mode 100644 apps/gateway/src/mcp-client/mcp-client.module.ts create mode 100644 apps/gateway/src/mcp-client/mcp-client.service.ts create mode 100644 apps/gateway/src/mcp-client/mcp-client.tokens.ts diff --git a/apps/gateway/src/agent/agent.module.ts b/apps/gateway/src/agent/agent.module.ts index 7e66767..867ce42 100644 --- a/apps/gateway/src/agent/agent.module.ts +++ b/apps/gateway/src/agent/agent.module.ts @@ -5,10 +5,11 @@ import { RoutingService } from './routing.service.js'; import { ProvidersController } from './providers.controller.js'; import { SessionsController } from './sessions.controller.js'; import { CoordModule } from '../coord/coord.module.js'; +import { McpClientModule } from '../mcp-client/mcp-client.module.js'; @Global() @Module({ - imports: [CoordModule], + imports: [CoordModule, McpClientModule], providers: [ProviderService, RoutingService, AgentService], controllers: [ProvidersController, SessionsController], exports: [AgentService, ProviderService, RoutingService], diff --git a/apps/gateway/src/agent/agent.service.ts b/apps/gateway/src/agent/agent.service.ts index 4910c7e..85852ed 100644 --- a/apps/gateway/src/agent/agent.service.ts +++ b/apps/gateway/src/agent/agent.service.ts @@ -13,6 +13,7 @@ import { MEMORY } from '../memory/memory.tokens.js'; import { EmbeddingService } from '../memory/embedding.service.js'; import { CoordService } from '../coord/coord.service.js'; import { ProviderService } from './provider.service.js'; +import { McpClientService } from '../mcp-client/mcp-client.service.js'; import { createBrainTools } from './tools/brain-tools.js'; import { createCoordTools } from './tools/coord-tools.js'; import { createMemoryTools } from './tools/memory-tools.js'; @@ -53,6 +54,7 @@ export class AgentService implements OnModuleDestroy { @Inject(MEMORY) private readonly memory: Memory, @Inject(EmbeddingService) private readonly embeddingService: EmbeddingService, @Inject(CoordService) private readonly coordService: CoordService, + @Inject(McpClientService) private readonly mcpClientService: McpClientService, ) { const fileBaseDir = process.env['AGENT_FILE_SANDBOX_DIR'] ?? process.cwd(); const gitDefaultCwd = process.env['AGENT_GIT_CWD'] ?? process.cwd(); @@ -96,6 +98,13 @@ export class AgentService implements OnModuleDestroy { `Creating agent session: ${sessionId} (provider=${providerName}, model=${modelId})`, ); + // Combine static tools with dynamically discovered MCP client tools + const mcpTools = this.mcpClientService.getToolDefinitions(); + const allCustomTools = [...this.customTools, ...mcpTools]; + if (mcpTools.length > 0) { + this.logger.log(`Attaching ${mcpTools.length} MCP client tool(s) to session ${sessionId}`); + } + let piSession: PiAgentSession; try { const result = await createAgentSession({ @@ -103,7 +112,7 @@ export class AgentService implements OnModuleDestroy { modelRegistry: this.providerService.getRegistry(), model: model ?? undefined, tools: [], - customTools: this.customTools, + customTools: allCustomTools, }); piSession = result.session; } catch (err) { diff --git a/apps/gateway/src/mcp-client/mcp-client.dto.ts b/apps/gateway/src/mcp-client/mcp-client.dto.ts new file mode 100644 index 0000000..9fa3fd2 --- /dev/null +++ b/apps/gateway/src/mcp-client/mcp-client.dto.ts @@ -0,0 +1,33 @@ +/** + * DTOs for MCP client configuration and tool discovery. + */ + +export interface McpServerConfigDto { + /** Unique name identifying this MCP server */ + name: string; + /** URL of the MCP server (streamable HTTP or SSE endpoint) */ + url: string; + /** Optional HTTP headers to send with requests (e.g., Authorization) */ + headers?: Record; +} + +export interface McpToolDto { + /** Namespaced tool name: "__" */ + name: string; + /** Human-readable description of the tool */ + description: string; + /** JSON Schema for tool input parameters */ + inputSchema: Record; + /** MCP server this tool belongs to */ + serverName: string; + /** Original tool name on the remote server */ + remoteName: string; +} + +export interface McpServerStatusDto { + name: string; + url: string; + connected: boolean; + toolCount: number; + error?: string; +} diff --git a/apps/gateway/src/mcp-client/mcp-client.module.ts b/apps/gateway/src/mcp-client/mcp-client.module.ts new file mode 100644 index 0000000..91ee579 --- /dev/null +++ b/apps/gateway/src/mcp-client/mcp-client.module.ts @@ -0,0 +1,8 @@ +import { Module } from '@nestjs/common'; +import { McpClientService } from './mcp-client.service.js'; + +@Module({ + providers: [McpClientService], + exports: [McpClientService], +}) +export class McpClientModule {} diff --git a/apps/gateway/src/mcp-client/mcp-client.service.ts b/apps/gateway/src/mcp-client/mcp-client.service.ts new file mode 100644 index 0000000..28910e0 --- /dev/null +++ b/apps/gateway/src/mcp-client/mcp-client.service.ts @@ -0,0 +1,331 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { Client } from '@modelcontextprotocol/sdk/client/index.js'; +import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; +import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js'; +import { Type } from '@sinclair/typebox'; +import type { ToolDefinition } from '@mariozechner/pi-coding-agent'; +import type { McpServerConfigDto, McpToolDto, McpServerStatusDto } from './mcp-client.dto.js'; + +interface ConnectedServer { + config: McpServerConfigDto; + client: Client; + tools: McpToolDto[]; + connected: boolean; + error?: string; +} + +/** + * McpClientService connects to external MCP servers, discovers their tools, + * and bridges them into Pi SDK ToolDefinition format for agent sessions. + * + * Configuration is read from the MCP_SERVERS environment variable: + * MCP_SERVERS='[{"name":"my-server","url":"http://localhost:3001/mcp","headers":{"Authorization":"Bearer token"}}]' + */ +@Injectable() +export class McpClientService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(McpClientService.name); + private readonly servers = new Map(); + + async onModuleInit(): Promise { + const configs = this.loadConfigs(); + if (configs.length === 0) { + this.logger.log('No external MCP servers configured (MCP_SERVERS not set)'); + return; + } + + this.logger.log(`Connecting to ${configs.length} external MCP server(s)`); + await Promise.allSettled(configs.map((cfg) => this.connectServer(cfg))); + } + + async onModuleDestroy(): Promise { + this.logger.log(`Disconnecting from ${this.servers.size} MCP server(s)`); + const disconnects = Array.from(this.servers.values()).map((s) => this.disconnectServer(s)); + await Promise.allSettled(disconnects); + this.servers.clear(); + } + + /** + * Returns all bridged Pi SDK ToolDefinitions from all connected MCP servers. + */ + getToolDefinitions(): ToolDefinition[] { + const tools: ToolDefinition[] = []; + for (const server of this.servers.values()) { + if (!server.connected) continue; + for (const mcpTool of server.tools) { + tools.push(this.bridgeTool(server.client, mcpTool)); + } + } + return tools; + } + + /** + * Returns status information for all configured MCP servers. + */ + getServerStatuses(): McpServerStatusDto[] { + return Array.from(this.servers.values()).map((s) => ({ + name: s.config.name, + url: s.config.url, + connected: s.connected, + toolCount: s.tools.length, + error: s.error, + })); + } + + /** + * Attempts to reconnect a server that has been disconnected. + */ + async reconnectServer(serverName: string): Promise { + const existing = this.servers.get(serverName); + if (!existing) { + throw new Error(`MCP server not found: ${serverName}`); + } + if (existing.connected) return; + + this.logger.log(`Reconnecting to MCP server: ${serverName}`); + await this.connectServer(existing.config); + } + + // ─── Private helpers ────────────────────────────────────────────────────── + + private loadConfigs(): McpServerConfigDto[] { + const raw = process.env['MCP_SERVERS']; + if (!raw) return []; + + try { + const parsed: unknown = JSON.parse(raw); + if (!Array.isArray(parsed)) { + this.logger.warn('MCP_SERVERS must be a JSON array — ignoring'); + return []; + } + + const configs: McpServerConfigDto[] = []; + for (const item of parsed) { + if ( + typeof item === 'object' && + item !== null && + 'name' in item && + typeof (item as Record)['name'] === 'string' && + 'url' in item && + typeof (item as Record)['url'] === 'string' + ) { + const cfg = item as McpServerConfigDto; + configs.push({ + name: cfg.name, + url: cfg.url, + headers: cfg.headers, + }); + } else { + this.logger.warn(`Skipping invalid MCP server config entry: ${JSON.stringify(item)}`); + } + } + + return configs; + } catch (err) { + this.logger.error( + `Failed to parse MCP_SERVERS: ${err instanceof Error ? err.message : String(err)}`, + ); + return []; + } + } + + private async connectServer(config: McpServerConfigDto): Promise { + const serverEntry: ConnectedServer = { + config, + client: new Client({ name: 'mosaic-gateway', version: '1.0.0' }), + tools: [], + connected: false, + }; + + // Preserve existing entry if reconnecting + this.servers.set(config.name, serverEntry); + + try { + const url = new URL(config.url); + const headers = config.headers ?? {}; + + // Attempt StreamableHTTP first, fall back to SSE + let connected = false; + + try { + const transport = new StreamableHTTPClientTransport(url, { requestInit: { headers } }); + await serverEntry.client.connect(transport); + connected = true; + this.logger.log(`Connected to MCP server "${config.name}" via StreamableHTTP`); + } catch (streamErr) { + this.logger.warn( + `StreamableHTTP failed for "${config.name}", trying SSE: ${streamErr instanceof Error ? streamErr.message : String(streamErr)}`, + ); + + // Reset client for SSE attempt + serverEntry.client = new Client({ name: 'mosaic-gateway', version: '1.0.0' }); + + try { + const transport = new SSEClientTransport(url, { requestInit: { headers } }); + await serverEntry.client.connect(transport); + connected = true; + this.logger.log(`Connected to MCP server "${config.name}" via SSE`); + } catch (sseErr) { + throw new Error( + `Both transports failed for "${config.name}": SSE error: ${sseErr instanceof Error ? sseErr.message : String(sseErr)}`, + ); + } + } + + if (!connected) return; + + // Discover tools + const toolsResult = await serverEntry.client.listTools(); + serverEntry.tools = toolsResult.tools.map((t) => ({ + name: `${config.name}__${t.name}`, + description: t.description ?? `Tool ${t.name} from MCP server ${config.name}`, + inputSchema: (t.inputSchema as Record) ?? {}, + serverName: config.name, + remoteName: t.name, + })); + + serverEntry.connected = true; + this.logger.log( + `Discovered ${serverEntry.tools.length} tool(s) from MCP server "${config.name}"`, + ); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + serverEntry.error = message; + serverEntry.connected = false; + this.logger.error(`Failed to connect to MCP server "${config.name}": ${message}`); + } + } + + private async disconnectServer(server: ConnectedServer): Promise { + try { + await server.client.close(); + } catch (err) { + this.logger.warn( + `Error closing MCP client for "${server.config.name}": ${err instanceof Error ? err.message : String(err)}`, + ); + } + } + + /** + * Bridges a single McpToolDto into a Pi SDK ToolDefinition. + * The MCP inputSchema is converted to a TypeBox schema representation. + */ + private bridgeTool(client: Client, mcpTool: McpToolDto): ToolDefinition { + const schema = this.inputSchemaToTypeBox(mcpTool.inputSchema); + + return { + name: mcpTool.name, + label: mcpTool.remoteName, + description: mcpTool.description, + parameters: schema, + execute: async (_toolCallId: string, params: unknown) => { + try { + const result = await client.callTool({ + name: mcpTool.remoteName, + arguments: (params as Record) ?? {}, + }); + + // MCP callTool returns { content: [...], isError?: boolean } + const content = Array.isArray(result.content) ? result.content : []; + const textParts = content + .filter((c): c is { type: 'text'; text: string } => c.type === 'text') + .map((c) => c.text) + .join('\n'); + + if (result.isError) { + return { + content: [ + { + type: 'text' as const, + text: `MCP tool error from "${mcpTool.serverName}/${mcpTool.remoteName}": ${textParts || 'Unknown error'}`, + }, + ], + details: undefined, + }; + } + + return { + content: + content.length > 0 + ? (content as { type: 'text'; text: string }[]) + : [{ type: 'text' as const, text: '' }], + details: undefined, + }; + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.logger.error( + `MCP tool call failed: ${mcpTool.serverName}/${mcpTool.remoteName}: ${message}`, + ); + return { + content: [ + { + type: 'text' as const, + text: `Failed to call MCP tool "${mcpTool.name}": ${message}`, + }, + ], + details: undefined, + }; + } + }, + }; + } + + /** + * Converts a JSON Schema object to a TypeBox-compatible schema. + * For simplicity, maps the inputSchema properties to TypeBox Type.Object. + * Unknown/complex schemas fall back to Type.Object with Type.Unknown values. + */ + private inputSchemaToTypeBox( + inputSchema: Record, + ): ReturnType { + const properties = inputSchema['properties']; + + if (!properties || typeof properties !== 'object') { + return Type.Object({}); + } + + const required: string[] = Array.isArray(inputSchema['required']) + ? (inputSchema['required'] as string[]) + : []; + + const tbProps: Record> = {}; + + for (const [key, schemaDef] of Object.entries(properties as Record)) { + const def = schemaDef as Record; + const desc = typeof def['description'] === 'string' ? def['description'] : undefined; + const isOptional = !required.includes(key); + const base = this.jsonSchemaToTypeBox(def); + tbProps[key] = isOptional + ? (Type.Optional(base) as unknown as ReturnType) + : (base as unknown as ReturnType); + if (desc && tbProps[key]) { + // Attach description via metadata + (tbProps[key] as Record)['description'] = desc; + } + } + + return Type.Object(tbProps as Parameters[0]); + } + + private jsonSchemaToTypeBox( + def: Record, + ): + | ReturnType + | ReturnType + | ReturnType + | ReturnType { + const type = def['type']; + const desc = typeof def['description'] === 'string' ? { description: def['description'] } : {}; + + switch (type) { + case 'string': + return Type.String(desc); + case 'number': + case 'integer': + return Type.Number(desc); + case 'boolean': + return Type.Boolean(desc); + default: + return Type.Unknown(desc); + } + } +} diff --git a/apps/gateway/src/mcp-client/mcp-client.tokens.ts b/apps/gateway/src/mcp-client/mcp-client.tokens.ts new file mode 100644 index 0000000..5ad36c3 --- /dev/null +++ b/apps/gateway/src/mcp-client/mcp-client.tokens.ts @@ -0,0 +1 @@ +export const MCP_CLIENT_SERVICE = 'MCP_CLIENT_SERVICE';