feat(gateway): MCP client — connect to external MCP servers as agent tools (#127) (#141)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful

Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #141.
This commit is contained in:
2026-03-15 18:28:31 +00:00
committed by jason.woltje
parent f208f72dc0
commit 09e649fc7e
6 changed files with 385 additions and 2 deletions

View File

@@ -5,10 +5,11 @@ import { RoutingService } from './routing.service.js';
import { ProvidersController } from './providers.controller.js';
import { SessionsController } from './sessions.controller.js';
import { CoordModule } from '../coord/coord.module.js';
import { McpClientModule } from '../mcp-client/mcp-client.module.js';
@Global()
@Module({
imports: [CoordModule],
imports: [CoordModule, McpClientModule],
providers: [ProviderService, RoutingService, AgentService],
controllers: [ProvidersController, SessionsController],
exports: [AgentService, ProviderService, RoutingService],

View File

@@ -13,6 +13,7 @@ import { MEMORY } from '../memory/memory.tokens.js';
import { EmbeddingService } from '../memory/embedding.service.js';
import { CoordService } from '../coord/coord.service.js';
import { ProviderService } from './provider.service.js';
import { McpClientService } from '../mcp-client/mcp-client.service.js';
import { createBrainTools } from './tools/brain-tools.js';
import { createCoordTools } from './tools/coord-tools.js';
import { createMemoryTools } from './tools/memory-tools.js';
@@ -53,6 +54,7 @@ export class AgentService implements OnModuleDestroy {
@Inject(MEMORY) private readonly memory: Memory,
@Inject(EmbeddingService) private readonly embeddingService: EmbeddingService,
@Inject(CoordService) private readonly coordService: CoordService,
@Inject(McpClientService) private readonly mcpClientService: McpClientService,
) {
const fileBaseDir = process.env['AGENT_FILE_SANDBOX_DIR'] ?? process.cwd();
const gitDefaultCwd = process.env['AGENT_GIT_CWD'] ?? process.cwd();
@@ -96,6 +98,13 @@ export class AgentService implements OnModuleDestroy {
`Creating agent session: ${sessionId} (provider=${providerName}, model=${modelId})`,
);
// Combine static tools with dynamically discovered MCP client tools
const mcpTools = this.mcpClientService.getToolDefinitions();
const allCustomTools = [...this.customTools, ...mcpTools];
if (mcpTools.length > 0) {
this.logger.log(`Attaching ${mcpTools.length} MCP client tool(s) to session ${sessionId}`);
}
let piSession: PiAgentSession;
try {
const result = await createAgentSession({
@@ -103,7 +112,7 @@ export class AgentService implements OnModuleDestroy {
modelRegistry: this.providerService.getRegistry(),
model: model ?? undefined,
tools: [],
customTools: this.customTools,
customTools: allCustomTools,
});
piSession = result.session;
} catch (err) {

View File

@@ -0,0 +1,33 @@
/**
* DTOs for MCP client configuration and tool discovery.
*/
export interface McpServerConfigDto {
/** Unique name identifying this MCP server */
name: string;
/** URL of the MCP server (streamable HTTP or SSE endpoint) */
url: string;
/** Optional HTTP headers to send with requests (e.g., Authorization) */
headers?: Record<string, string>;
}
export interface McpToolDto {
/** Namespaced tool name: "<serverName>__<toolName>" */
name: string;
/** Human-readable description of the tool */
description: string;
/** JSON Schema for tool input parameters */
inputSchema: Record<string, unknown>;
/** MCP server this tool belongs to */
serverName: string;
/** Original tool name on the remote server */
remoteName: string;
}
export interface McpServerStatusDto {
name: string;
url: string;
connected: boolean;
toolCount: number;
error?: string;
}

View File

@@ -0,0 +1,8 @@
import { Module } from '@nestjs/common';
import { McpClientService } from './mcp-client.service.js';
@Module({
providers: [McpClientService],
exports: [McpClientService],
})
export class McpClientModule {}

View File

@@ -0,0 +1,331 @@
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
import { SSEClientTransport } from '@modelcontextprotocol/sdk/client/sse.js';
import { Type } from '@sinclair/typebox';
import type { ToolDefinition } from '@mariozechner/pi-coding-agent';
import type { McpServerConfigDto, McpToolDto, McpServerStatusDto } from './mcp-client.dto.js';
interface ConnectedServer {
config: McpServerConfigDto;
client: Client;
tools: McpToolDto[];
connected: boolean;
error?: string;
}
/**
* McpClientService connects to external MCP servers, discovers their tools,
* and bridges them into Pi SDK ToolDefinition format for agent sessions.
*
* Configuration is read from the MCP_SERVERS environment variable:
* MCP_SERVERS='[{"name":"my-server","url":"http://localhost:3001/mcp","headers":{"Authorization":"Bearer token"}}]'
*/
@Injectable()
export class McpClientService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(McpClientService.name);
private readonly servers = new Map<string, ConnectedServer>();
async onModuleInit(): Promise<void> {
const configs = this.loadConfigs();
if (configs.length === 0) {
this.logger.log('No external MCP servers configured (MCP_SERVERS not set)');
return;
}
this.logger.log(`Connecting to ${configs.length} external MCP server(s)`);
await Promise.allSettled(configs.map((cfg) => this.connectServer(cfg)));
}
async onModuleDestroy(): Promise<void> {
this.logger.log(`Disconnecting from ${this.servers.size} MCP server(s)`);
const disconnects = Array.from(this.servers.values()).map((s) => this.disconnectServer(s));
await Promise.allSettled(disconnects);
this.servers.clear();
}
/**
* Returns all bridged Pi SDK ToolDefinitions from all connected MCP servers.
*/
getToolDefinitions(): ToolDefinition[] {
const tools: ToolDefinition[] = [];
for (const server of this.servers.values()) {
if (!server.connected) continue;
for (const mcpTool of server.tools) {
tools.push(this.bridgeTool(server.client, mcpTool));
}
}
return tools;
}
/**
* Returns status information for all configured MCP servers.
*/
getServerStatuses(): McpServerStatusDto[] {
return Array.from(this.servers.values()).map((s) => ({
name: s.config.name,
url: s.config.url,
connected: s.connected,
toolCount: s.tools.length,
error: s.error,
}));
}
/**
* Attempts to reconnect a server that has been disconnected.
*/
async reconnectServer(serverName: string): Promise<void> {
const existing = this.servers.get(serverName);
if (!existing) {
throw new Error(`MCP server not found: ${serverName}`);
}
if (existing.connected) return;
this.logger.log(`Reconnecting to MCP server: ${serverName}`);
await this.connectServer(existing.config);
}
// ─── Private helpers ──────────────────────────────────────────────────────
private loadConfigs(): McpServerConfigDto[] {
const raw = process.env['MCP_SERVERS'];
if (!raw) return [];
try {
const parsed: unknown = JSON.parse(raw);
if (!Array.isArray(parsed)) {
this.logger.warn('MCP_SERVERS must be a JSON array — ignoring');
return [];
}
const configs: McpServerConfigDto[] = [];
for (const item of parsed) {
if (
typeof item === 'object' &&
item !== null &&
'name' in item &&
typeof (item as Record<string, unknown>)['name'] === 'string' &&
'url' in item &&
typeof (item as Record<string, unknown>)['url'] === 'string'
) {
const cfg = item as McpServerConfigDto;
configs.push({
name: cfg.name,
url: cfg.url,
headers: cfg.headers,
});
} else {
this.logger.warn(`Skipping invalid MCP server config entry: ${JSON.stringify(item)}`);
}
}
return configs;
} catch (err) {
this.logger.error(
`Failed to parse MCP_SERVERS: ${err instanceof Error ? err.message : String(err)}`,
);
return [];
}
}
private async connectServer(config: McpServerConfigDto): Promise<void> {
const serverEntry: ConnectedServer = {
config,
client: new Client({ name: 'mosaic-gateway', version: '1.0.0' }),
tools: [],
connected: false,
};
// Preserve existing entry if reconnecting
this.servers.set(config.name, serverEntry);
try {
const url = new URL(config.url);
const headers = config.headers ?? {};
// Attempt StreamableHTTP first, fall back to SSE
let connected = false;
try {
const transport = new StreamableHTTPClientTransport(url, { requestInit: { headers } });
await serverEntry.client.connect(transport);
connected = true;
this.logger.log(`Connected to MCP server "${config.name}" via StreamableHTTP`);
} catch (streamErr) {
this.logger.warn(
`StreamableHTTP failed for "${config.name}", trying SSE: ${streamErr instanceof Error ? streamErr.message : String(streamErr)}`,
);
// Reset client for SSE attempt
serverEntry.client = new Client({ name: 'mosaic-gateway', version: '1.0.0' });
try {
const transport = new SSEClientTransport(url, { requestInit: { headers } });
await serverEntry.client.connect(transport);
connected = true;
this.logger.log(`Connected to MCP server "${config.name}" via SSE`);
} catch (sseErr) {
throw new Error(
`Both transports failed for "${config.name}": SSE error: ${sseErr instanceof Error ? sseErr.message : String(sseErr)}`,
);
}
}
if (!connected) return;
// Discover tools
const toolsResult = await serverEntry.client.listTools();
serverEntry.tools = toolsResult.tools.map((t) => ({
name: `${config.name}__${t.name}`,
description: t.description ?? `Tool ${t.name} from MCP server ${config.name}`,
inputSchema: (t.inputSchema as Record<string, unknown>) ?? {},
serverName: config.name,
remoteName: t.name,
}));
serverEntry.connected = true;
this.logger.log(
`Discovered ${serverEntry.tools.length} tool(s) from MCP server "${config.name}"`,
);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
serverEntry.error = message;
serverEntry.connected = false;
this.logger.error(`Failed to connect to MCP server "${config.name}": ${message}`);
}
}
private async disconnectServer(server: ConnectedServer): Promise<void> {
try {
await server.client.close();
} catch (err) {
this.logger.warn(
`Error closing MCP client for "${server.config.name}": ${err instanceof Error ? err.message : String(err)}`,
);
}
}
/**
* Bridges a single McpToolDto into a Pi SDK ToolDefinition.
* The MCP inputSchema is converted to a TypeBox schema representation.
*/
private bridgeTool(client: Client, mcpTool: McpToolDto): ToolDefinition {
const schema = this.inputSchemaToTypeBox(mcpTool.inputSchema);
return {
name: mcpTool.name,
label: mcpTool.remoteName,
description: mcpTool.description,
parameters: schema,
execute: async (_toolCallId: string, params: unknown) => {
try {
const result = await client.callTool({
name: mcpTool.remoteName,
arguments: (params as Record<string, unknown>) ?? {},
});
// MCP callTool returns { content: [...], isError?: boolean }
const content = Array.isArray(result.content) ? result.content : [];
const textParts = content
.filter((c): c is { type: 'text'; text: string } => c.type === 'text')
.map((c) => c.text)
.join('\n');
if (result.isError) {
return {
content: [
{
type: 'text' as const,
text: `MCP tool error from "${mcpTool.serverName}/${mcpTool.remoteName}": ${textParts || 'Unknown error'}`,
},
],
details: undefined,
};
}
return {
content:
content.length > 0
? (content as { type: 'text'; text: string }[])
: [{ type: 'text' as const, text: '' }],
details: undefined,
};
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
this.logger.error(
`MCP tool call failed: ${mcpTool.serverName}/${mcpTool.remoteName}: ${message}`,
);
return {
content: [
{
type: 'text' as const,
text: `Failed to call MCP tool "${mcpTool.name}": ${message}`,
},
],
details: undefined,
};
}
},
};
}
/**
* Converts a JSON Schema object to a TypeBox-compatible schema.
* For simplicity, maps the inputSchema properties to TypeBox Type.Object.
* Unknown/complex schemas fall back to Type.Object with Type.Unknown values.
*/
private inputSchemaToTypeBox(
inputSchema: Record<string, unknown>,
): ReturnType<typeof Type.Object> {
const properties = inputSchema['properties'];
if (!properties || typeof properties !== 'object') {
return Type.Object({});
}
const required: string[] = Array.isArray(inputSchema['required'])
? (inputSchema['required'] as string[])
: [];
const tbProps: Record<string, ReturnType<typeof Type.String>> = {};
for (const [key, schemaDef] of Object.entries(properties as Record<string, unknown>)) {
const def = schemaDef as Record<string, unknown>;
const desc = typeof def['description'] === 'string' ? def['description'] : undefined;
const isOptional = !required.includes(key);
const base = this.jsonSchemaToTypeBox(def);
tbProps[key] = isOptional
? (Type.Optional(base) as unknown as ReturnType<typeof Type.String>)
: (base as unknown as ReturnType<typeof Type.String>);
if (desc && tbProps[key]) {
// Attach description via metadata
(tbProps[key] as Record<string, unknown>)['description'] = desc;
}
}
return Type.Object(tbProps as Parameters<typeof Type.Object>[0]);
}
private jsonSchemaToTypeBox(
def: Record<string, unknown>,
):
| ReturnType<typeof Type.String>
| ReturnType<typeof Type.Number>
| ReturnType<typeof Type.Boolean>
| ReturnType<typeof Type.Unknown> {
const type = def['type'];
const desc = typeof def['description'] === 'string' ? { description: def['description'] } : {};
switch (type) {
case 'string':
return Type.String(desc);
case 'number':
case 'integer':
return Type.Number(desc);
case 'boolean':
return Type.Boolean(desc);
default:
return Type.Unknown(desc);
}
}
}

View File

@@ -0,0 +1 @@
export const MCP_CLIENT_SERVICE = 'MCP_CLIENT_SERVICE';