Files
stack/apps/api/src/mcp/mcp-hub.service.ts
Jason Woltje b8805cee50 feat(#132): port MCP (Model Context Protocol) infrastructure
Implement MCP Phase 1 infrastructure for agent tool integration with
central hub, tool registry, and STDIO transport layers.

Components:
- McpHubService: Central registry for MCP server lifecycle
- StdioTransport: STDIO process communication with JSON-RPC 2.0
- ToolRegistryService: Tool catalog management
- McpController: REST API for MCP management

Endpoints:
- GET/POST /mcp/servers - List/register servers
- POST /mcp/servers/:id/start|stop - Lifecycle control
- DELETE /mcp/servers/:id - Unregister
- GET /mcp/tools - List tools
- POST /mcp/tools/:name/invoke - Invoke tool

Features:
- Full JSON-RPC 2.0 protocol support
- Process lifecycle management
- Buffered message parsing
- Type-safe with no explicit any types
- Proper cleanup on shutdown

Tests: 85 passing with 90.9% coverage

Fixes #132

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 13:07:58 -06:00

171 lines
4.0 KiB
TypeScript

import { Injectable, OnModuleDestroy } from "@nestjs/common";
import { StdioTransport } from "./stdio-transport";
import { ToolRegistryService } from "./tool-registry.service";
import type { McpServer, McpServerConfig, McpRequest, McpResponse } from "./interfaces";
/**
 * Registry entry kept by the hub for each MCP server: the `McpServer` record
 * (config, status, optional error) extended with the `StdioTransport` used to
 * exchange requests with the server.
 */
interface McpServerWithTransport extends McpServer {
// Optional because the hub assigns it only after a successful start and
// deletes it again when the server is stopped or a start attempt fails.
transport?: StdioTransport;
}
/** Cancellation flag shared between an in-flight start attempt and `stopServer`. */
interface StartToken {
  cancelled: boolean;
}

/** A start attempt whose transport has not finished starting yet. */
interface PendingStart {
  token: StartToken;
  done: Promise<void>;
}

/**
 * Central hub for managing MCP servers.
 *
 * Keeps an in-memory registry of servers keyed by `config.id`, drives each
 * server through its lifecycle (`stopped` -> `starting` -> `running` or
 * `error`) via a `StdioTransport`, and routes requests to the transport of a
 * running server.
 *
 * All lifecycle methods are async and may interleave (e.g. two REST calls for
 * the same server), so start attempts are tracked explicitly: concurrent
 * starts share one attempt, and an attempt that is stopped, replaced or
 * unregistered while it is still starting tears down its own transport
 * instead of leaving an orphaned process behind.
 */
@Injectable()
export class McpHubService implements OnModuleDestroy {
  private readonly servers = new Map<string, McpServerWithTransport>();

  /** Start attempts currently awaiting `transport.start()`, keyed by server id. */
  private readonly pendingStarts = new Map<string, PendingStart>();

  constructor(private readonly toolRegistry: ToolRegistryService) {}

  /**
   * Register a new MCP server, or replace the configuration of an existing one.
   *
   * An existing entry that is not already stopped (running, starting or in
   * error) is stopped first so its transport and tools are released before the
   * entry is replaced. The new entry always begins in the `stopped` state.
   *
   * @param config - Server configuration; `config.id` is the registry key.
   * @throws Whatever `stopServer` throws if the previous entry cannot be stopped.
   */
  async registerServer(config: McpServerConfig): Promise<void> {
    const existing = this.servers.get(config.id);
    if (existing && existing.status !== "stopped") {
      // Covers "starting" and "error" too: stopServer cancels an in-flight
      // start and clears any tools still attributed to the old entry.
      await this.stopServer(config.id);
    }

    const server: McpServerWithTransport = {
      config,
      status: "stopped",
    };
    this.servers.set(config.id, server);
  }

  /**
   * Start an MCP server process.
   *
   * Resolves once the server is `running`. Calling it on a running server is a
   * no-op; calling it while a start is already in flight joins that attempt
   * rather than spawning a second process.
   *
   * @param serverId - Id the server was registered under.
   * @throws Error if the server is not registered.
   * @throws The transport's error if it fails to start (the server is then
   *   left in the `error` state with `error` set to the message).
   * @throws Error if the server is stopped, replaced or unregistered before
   *   the transport finishes starting.
   */
  async startServer(serverId: string): Promise<void> {
    const server = this.servers.get(serverId);
    if (!server) {
      throw new Error(`Server ${serverId} not found`);
    }
    if (server.status === "running") {
      return;
    }

    const inFlight = this.pendingStarts.get(serverId);
    if (inFlight) {
      // Another caller is already starting this server: share its outcome.
      await inFlight.done;
      return;
    }

    const token: StartToken = { cancelled: false };
    // launch() runs synchronously up to its first await, so the status is
    // already "starting" by the time the attempt is recorded below.
    const pending: PendingStart = { token, done: this.launch(serverId, server, token) };
    this.pendingStarts.set(serverId, pending);
    try {
      await pending.done;
    } finally {
      // stopServer may already have removed (and a later start replaced) the
      // entry; only clear it if it is still ours.
      if (this.pendingStarts.get(serverId) === pending) {
        this.pendingStarts.delete(serverId);
      }
    }
  }

  /**
   * Stop an MCP server.
   *
   * No-op if the server is already stopped. Otherwise cancels any in-flight
   * start, stops the transport if one is attached, marks the server `stopped`
   * and clears the tools it provided.
   *
   * @param serverId - Id the server was registered under.
   * @throws Error if the server is not registered.
   * @throws Whatever `transport.stop()` throws; in that case the server state
   *   is left untouched so the stop can be retried.
   */
  async stopServer(serverId: string): Promise<void> {
    const server = this.servers.get(serverId);
    if (!server) {
      throw new Error(`Server ${serverId} not found`);
    }
    if (server.status === "stopped") {
      return;
    }

    // A start that has not finished has no transport on the server yet, so it
    // cannot be stopped directly; flag it so it cleans up after itself.
    const pending = this.pendingStarts.get(serverId);
    if (pending) {
      pending.token.cancelled = true;
      this.pendingStarts.delete(serverId);
    }

    const transport = server.transport;
    if (transport) {
      await transport.stop();
    }
    server.status = "stopped";
    delete server.process;
    delete server.transport;
    // Clear tools provided by this server
    this.toolRegistry.clearServerTools(serverId);
  }

  /**
   * Get server status.
   *
   * @param serverId - Id the server was registered under.
   * @returns The live registry entry (not a copy), or `undefined` if the id is
   *   not registered.
   */
  getServerStatus(serverId: string): McpServer | undefined {
    return this.servers.get(serverId);
  }

  /**
   * List all servers.
   *
   * @returns A new array holding the live registry entries, in registration order.
   */
  listServers(): McpServer[] {
    return Array.from(this.servers.values());
  }

  /**
   * Unregister a server, stopping it first if it is not already stopped.
   *
   * @param serverId - Id the server was registered under.
   * @throws Error if the server is not registered.
   * @throws Whatever `stopServer` throws; the server then stays registered.
   */
  async unregisterServer(serverId: string): Promise<void> {
    const server = this.servers.get(serverId);
    if (!server) {
      throw new Error(`Server ${serverId} not found`);
    }
    // Stop on any non-stopped status so a starting server is cancelled and an
    // errored one still has its tools cleared.
    if (server.status !== "stopped") {
      await this.stopServer(serverId);
    }
    this.servers.delete(serverId);
  }

  /**
   * Send request to a server.
   *
   * @param serverId - Id the server was registered under.
   * @param request - Request forwarded unchanged to the server's transport.
   * @returns The response produced by the transport.
   * @throws Error if the server is not registered, is not running, or has no
   *   transport attached.
   */
  async sendRequest(serverId: string, request: McpRequest): Promise<McpResponse> {
    const server = this.servers.get(serverId);
    if (!server) {
      throw new Error(`Server ${serverId} not found`);
    }
    if (server.status !== "running") {
      throw new Error(`Server ${serverId} is not running`);
    }
    if (!server.transport) {
      throw new Error(`Server ${serverId} transport not initialized`);
    }
    return server.transport.send(request);
  }

  /**
   * Cleanup on module destroy: stop every registered server in parallel.
   *
   * Failures are logged per server and never rejected, so one server that
   * refuses to stop does not prevent the others from being stopped.
   */
  async onModuleDestroy(): Promise<void> {
    const stopPromises = Array.from(this.servers.keys()).map((serverId) =>
      this.stopServer(serverId).catch((error: unknown) => {
        console.error(`Failed to stop server ${serverId}:`, error);
      })
    );
    await Promise.all(stopPromises);
  }

  /**
   * Run a single start attempt: create the transport, start it, and publish it
   * on the server record only if the attempt is still current afterwards.
   */
  private async launch(
    serverId: string,
    server: McpServerWithTransport,
    token: StartToken
  ): Promise<void> {
    server.status = "starting";
    delete server.error;

    let transport: StdioTransport;
    try {
      transport = new StdioTransport(
        server.config.command,
        server.config.args,
        server.config.env
      );
      await transport.start();
    } catch (error) {
      // If the attempt was superseded, the record now reflects whatever
      // superseded it (stop / re-register); do not overwrite that state.
      if (!this.isSuperseded(serverId, server, token)) {
        server.status = "error";
        server.error = error instanceof Error ? error.message : "Unknown error";
        delete server.transport;
      }
      throw error;
    }

    if (this.isSuperseded(serverId, server, token)) {
      // Nobody else holds a reference to this transport, so it must be shut
      // down here or its process would be orphaned.
      await transport.stop();
      throw new Error(`Server ${serverId} was stopped or replaced while starting`);
    }

    server.status = "running";
    // Store transport for later use
    server.transport = transport;
  }

  /** True when a start attempt was cancelled or its server record is no longer the registered one. */
  private isSuperseded(
    serverId: string,
    server: McpServerWithTransport,
    token: StartToken
  ): boolean {
    return token.cancelled || this.servers.get(serverId) !== server;
  }
}