Files
stack/apps/api/src/mcp/stdio-transport.ts
Jason Woltje b8805cee50 feat(#132): port MCP (Model Context Protocol) infrastructure
Implement MCP Phase 1 infrastructure for agent tool integration with
central hub, tool registry, and STDIO transport layers.

Components:
- McpHubService: Central registry for MCP server lifecycle
- StdioTransport: STDIO process communication with JSON-RPC 2.0
- ToolRegistryService: Tool catalog management
- McpController: REST API for MCP management

Endpoints:
- GET/POST /mcp/servers - List/register servers
- POST /mcp/servers/:id/start|stop - Lifecycle control
- DELETE /mcp/servers/:id - Unregister
- GET /mcp/tools - List tools
- POST /mcp/tools/:name/invoke - Invoke tool

Features:
- Full JSON-RPC 2.0 protocol support
- Process lifecycle management
- Buffered message parsing
- Type-safe with no explicit `any` types
- Proper cleanup on shutdown

Tests: 85 passing with 90.9% coverage

Fixes #132

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 13:07:58 -06:00

177 lines
4.3 KiB
TypeScript

import { spawn, type ChildProcess } from "node:child_process";
import { StringDecoder } from "node:string_decoder";
import type { McpRequest, McpResponse } from "./interfaces";
/**
 * STDIO transport for MCP server communication.
 *
 * Spawns a child process and exchanges newline-delimited JSON messages with
 * it: each request is written to the child's stdin as one JSON line, and each
 * non-empty line read from the child's stdout is parsed as a response and
 * matched to its pending request by `id`.
 */
export class StdioTransport {
  /** The spawned child, or undefined when nothing has been started or it has gone away. */
  private process?: ChildProcess;

  /** Requests written to the child that have not been answered yet, keyed by request id. */
  private pendingRequests = new Map<
    string | number,
    { resolve: (value: McpResponse) => void; reject: (error: Error) => void }
  >();

  /** Decoded stdout text that has not yet been terminated by a newline. */
  private buffer = "";

  /**
   * Streaming UTF-8 decoder for stdout. It holds back the bytes of a
   * multi-byte character that is split across two chunks, so the character is
   * decoded once it is complete instead of being replaced by U+FFFD.
   */
  private decoder = new StringDecoder("utf8");

  /**
   * @param command Executable to spawn.
   * @param args Arguments passed to the executable (defaults to none).
   * @param env Extra environment variables, merged over the current `process.env`.
   */
  constructor(
    private readonly command: string,
    private readonly args?: string[],
    private readonly env?: Record<string, string>
  ) {}

  /**
   * Start the child process.
   *
   * Does nothing if a child is already running. The returned promise resolves
   * once the child reports that it was spawned, and rejects if spawning fails
   * (for example because the command does not exist).
   */
  async start(): Promise<void> {
    if (this.isRunning()) {
      return;
    }

    // Discard partial output left over from a previous child so it cannot be
    // glued onto the first message of the new one.
    this.buffer = "";
    this.decoder = new StringDecoder("utf8");

    return new Promise<void>((resolve, reject) => {
      let child: ChildProcess;
      try {
        child = spawn(this.command, this.args ?? [], {
          env: { ...process.env, ...this.env },
          stdio: ["pipe", "pipe", "pipe"],
        });
      } catch (error: unknown) {
        reject(error instanceof Error ? error : new Error(String(error)));
        return;
      }
      this.process = child;

      // Every handler below checks `this.process === child` so that a late
      // event from an old child cannot disturb a child started afterwards.
      child.stdout?.on("data", (data: Buffer) => {
        if (this.process === child) {
          this.handleStdout(data);
        }
      });
      child.stderr?.on("data", (data: Buffer) => {
        console.error(`MCP stderr: ${data.toString()}`);
      });
      // Write failures are reported to the caller through the write callback
      // in send(); this listener only keeps a broken pipe from surfacing as
      // an unhandled 'error' event on the stream.
      child.stdin?.on("error", () => {
        /* handled per request in send() */
      });

      // Spawn failures are reported asynchronously through 'error', so the
      // promise must not resolve before the child confirms it was spawned.
      child.once("spawn", () => {
        resolve();
      });
      child.on("error", (error) => {
        if (this.process === child) {
          this.handleProcessError(error);
        }
        // No-op if the promise has already been resolved by 'spawn'.
        reject(error);
      });
      child.on("exit", (code) => {
        if (this.process === child) {
          this.handleProcessExit(code);
        }
      });
    });
  }

  /**
   * Send a request and wait for the response with the same `id`.
   *
   * @param request Request to serialize as one JSON line on the child's stdin.
   * @returns The response whose `id` equals `request.id`.
   * @throws Error (as a rejection) if the process is not running, its stdin is
   *   unavailable, a request with the same id is still pending, the write
   *   fails, or the process stops, errors or exits before answering.
   */
  async send(request: McpRequest): Promise<McpResponse> {
    if (!this.isRunning()) {
      throw new Error("Process not running");
    }
    const stdin = this.process?.stdin;
    if (!stdin) {
      // Without a stdin pipe nothing can be written, so the request could
      // never be answered.
      throw new Error("Process stdin not available");
    }
    if (this.pendingRequests.has(request.id)) {
      // Overwriting the entry would leave the earlier caller's promise
      // unsettled forever.
      throw new Error(`Duplicate request id: ${String(request.id)}`);
    }

    return new Promise<McpResponse>((resolve, reject) => {
      this.pendingRequests.set(request.id, { resolve, reject });
      const message = JSON.stringify(request) + "\n";
      stdin.write(message, (error) => {
        if (error) {
          this.pendingRequests.delete(request.id);
          reject(error);
        }
      });
    });
  }

  /**
   * Stop the child process.
   *
   * Rejects every pending request with "Process stopped", signals the child
   * with `kill()`, and resolves once the child emits `exit`. Does nothing if
   * no child is running.
   */
  async stop(): Promise<void> {
    const child = this.process;
    if (!child || child.killed) {
      return;
    }

    return new Promise<void>((resolve) => {
      child.once("exit", () => {
        // Only clear the slot if it still refers to the child being stopped.
        if (this.process === child) {
          delete this.process;
        }
        resolve();
      });

      this.rejectAllPending(new Error("Process stopped"));
      child.kill();
    });
  }

  /**
   * Check if the process is running.
   *
   * @returns true when a child is held and `kill()` has not been sent to it.
   */
  isRunning(): boolean {
    return this.process !== undefined && !this.process.killed;
  }

  /**
   * Handle a chunk of stdout data.
   *
   * Appends the decoded chunk to the buffer, dispatches every complete
   * (newline-terminated) line as a response, and keeps the trailing partial
   * line for the next chunk. Blank lines are skipped; lines that are not a
   * JSON object are logged and dropped.
   */
  private handleStdout(data: Buffer): void {
    this.buffer += this.decoder.write(data);

    const lines = this.buffer.split("\n");
    // The last element is whatever follows the final newline: an incomplete
    // message, or "" when the chunk ended exactly on a newline.
    this.buffer = lines.pop() ?? "";

    for (const line of lines) {
      const message = line.trim();
      if (!message) {
        continue;
      }

      let parsed: unknown;
      try {
        parsed = JSON.parse(message);
      } catch (error) {
        console.error("Failed to parse MCP response:", error);
        continue;
      }

      if (typeof parsed !== "object" || parsed === null) {
        console.error(
          "Failed to parse MCP response:",
          new Error("Message is not a JSON object")
        );
        continue;
      }

      this.handleResponse(parsed as McpResponse);
    }
  }

  /**
   * Handle a parsed response by resolving the pending request with the same
   * `id`. Responses that match no pending request are ignored.
   */
  private handleResponse(response: McpResponse): void {
    const pending = this.pendingRequests.get(response.id);
    if (pending) {
      this.pendingRequests.delete(response.id);
      pending.resolve(response);
    }
  }

  /**
   * Handle a process error: fail all pending requests with that error and
   * forget the child.
   */
  private handleProcessError(error: Error): void {
    this.rejectAllPending(error);
    delete this.process;
  }

  /**
   * Handle process exit: fail all pending requests with an error naming the
   * exit code ("unknown" when the code is null) and forget the child.
   */
  private handleProcessExit(code: number | null): void {
    const exitCode = code !== null ? String(code) : "unknown";
    this.rejectAllPending(new Error(`Process exited with code ${exitCode}`));
    delete this.process;
  }

  /**
   * Reject all pending requests with the given error and empty the map.
   */
  private rejectAllPending(error: Error): void {
    // Snapshot and clear first so the map is already empty while the
    // rejections are delivered.
    const waiting = [...this.pendingRequests.values()];
    this.pendingRequests.clear();
    for (const pending of waiting) {
      pending.reject(error);
    }
  }
}