Files
stack/apps/api/src/mcp/stdio-transport.spec.ts
Jason Woltje b8805cee50 feat(#132): port MCP (Model Context Protocol) infrastructure
Implement MCP Phase 1 infrastructure for agent tool integration with
central hub, tool registry, and STDIO transport layers.

Components:
- McpHubService: Central registry for MCP server lifecycle
- StdioTransport: STDIO process communication with JSON-RPC 2.0
- ToolRegistryService: Tool catalog management
- McpController: REST API for MCP management

Endpoints:
- GET/POST /mcp/servers - List/register servers
- POST /mcp/servers/:id/start|stop - Lifecycle control
- DELETE /mcp/servers/:id - Unregister
- GET /mcp/tools - List tools
- POST /mcp/tools/:name/invoke - Invoke tool

Features:
- Full JSON-RPC 2.0 protocol support
- Process lifecycle management
- Buffered message parsing
- Type-safe with no explicit any types
- Proper cleanup on shutdown

Tests: 85 passing with 90.9% coverage

Fixes #132

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 13:07:58 -06:00

307 lines
8.5 KiB
TypeScript

import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
import { StdioTransport } from "./stdio-transport";
import type { McpRequest, McpResponse } from "./interfaces";
import { EventEmitter } from "node:events";
/**
 * Structural view of the fake child process returned by the mocked `spawn`.
 *
 * This is a type-only declaration (erased at compile time), so it is safe to
 * keep outside the hoisted `vi.mock` factory. The concrete class stays inside
 * the factory and is only instantiated when `spawn` is actually called.
 */
interface MockChildProcessHandle extends EventEmitter {
  stdin: {
    write: (chunk: string | Uint8Array, callback?: (error?: Error | null) => void) => boolean;
    end: () => void;
  };
  stdout: EventEmitter;
  stderr: EventEmitter;
  kill: () => void;
  killed: boolean;
  pid: number;
}

// Most recent fake process created by the mocked `spawn`; reassigned on every spawn call.
let mockProcess: MockChildProcessHandle;

// Replace node:child_process so no real process is ever launched by these specs.
vi.mock("node:child_process", () => {
  return {
    spawn: vi.fn(() => {
      class MockChildProcess extends EventEmitter {
        stdin = {
          // Accepts the write and invokes the completion callback immediately, if one was given.
          write: vi.fn((_chunk: string | Uint8Array, callback?: (error?: Error | null) => void) => {
            if (callback) callback();
            return true;
          }),
          end: vi.fn(),
        };
        stdout = new EventEmitter();
        stderr = new EventEmitter();
        killed = false;
        pid = 12345;
        // Marks the process as killed and emits "exit" with code 0 on a later timer tick.
        kill = vi.fn(() => {
          this.killed = true;
          setTimeout(() => this.emit("exit", 0), 0);
        });
      }
      mockProcess = new MockChildProcess();
      return mockProcess;
    }),
  };
});
describe("StdioTransport", () => {
// Shared fixtures: the transport under test plus sample constructor arguments.
let transport: StdioTransport;
const command = "test-command";
const args = ["arg1", "arg2"];
const env = { TEST_VAR: "value" };

// Clear recorded mock calls before every spec.
beforeEach(() => {
  vi.clearAllMocks();
});

// Stop any transport that a spec left running; nothing to do otherwise.
afterEach(async () => {
  if (!transport?.isRunning()) {
    return;
  }
  await transport.stop();
});
// Construction specs: every supported argument combination must yield a
// transport that exists and has not been started yet.
describe("constructor", () => {
  it("should create transport with command only", () => {
    transport = new StdioTransport(command);
    expect(transport).toBeDefined();
    expect(transport.isRunning()).toBe(false);
  });

  it("should create transport with command and args", () => {
    transport = new StdioTransport(command, args);
    expect(transport).toBeDefined();
    // `toBeDefined()` cannot fail on a `new` expression; also pin the initial state.
    expect(transport.isRunning()).toBe(false);
  });

  it("should create transport with command, args, and env", () => {
    transport = new StdioTransport(command, args, env);
    expect(transport).toBeDefined();
    // `toBeDefined()` cannot fail on a `new` expression; also pin the initial state.
    expect(transport.isRunning()).toBe(false);
  });
});
// Start-up specs: starting launches the (mocked) process, and a second start
// on a running transport must not launch another one.
describe("start", () => {
  it("should start the child process", async () => {
    transport = new StdioTransport(command, args, env);
    await transport.start();
    expect(transport.isRunning()).toBe(true);
  });

  it("should not start if already running", async () => {
    transport = new StdioTransport(command);
    await transport.start();
    const firstStart = transport.isRunning();
    // The mocked spawn reassigns `mockProcess` on every call, so an unchanged
    // reference after the second start() shows that no new process was spawned.
    const processAfterFirstStart = mockProcess;

    await transport.start();
    const secondStart = transport.isRunning();

    expect(firstStart).toBe(true);
    expect(secondStart).toBe(true);
    expect(mockProcess).toBe(processAfterFirstStart);
  });
});
// Request/response specs: replies are injected by emitting "data" events on
// the fake process's stdout a few milliseconds after send() has been called.
describe("send", () => {
  // Serializes a message as one JSON line (trailing "\n") ready to emit on stdout.
  const toLine = (message: unknown): Buffer => Buffer.from(JSON.stringify(message) + "\n");

  it("should send request and receive response", async () => {
    transport = new StdioTransport(command);
    await transport.start();

    const outgoing: McpRequest = {
      jsonrpc: "2.0",
      id: 1,
      method: "test",
      params: { foo: "bar" },
    };
    const reply: McpResponse = {
      jsonrpc: "2.0",
      id: 1,
      result: { success: true },
    };

    // Issue the request first, then deliver the reply after a short delay.
    const pending = transport.send(outgoing);
    setTimeout(() => {
      mockProcess.stdout.emit("data", toLine(reply));
    }, 10);

    const received = await pending;
    expect(received).toEqual(reply);
  });

  it("should throw error if not running", async () => {
    transport = new StdioTransport(command);
    const outgoing: McpRequest = {
      jsonrpc: "2.0",
      id: 1,
      method: "test",
    };

    await expect(transport.send(outgoing)).rejects.toThrow("Process not running");
  });

  it("should handle error responses", async () => {
    transport = new StdioTransport(command);
    await transport.start();

    const outgoing: McpRequest = {
      jsonrpc: "2.0",
      id: 1,
      method: "test",
    };
    const failure: McpResponse = {
      jsonrpc: "2.0",
      id: 1,
      error: {
        code: -32601,
        message: "Method not found",
      },
    };

    const pending = transport.send(outgoing);
    setTimeout(() => {
      mockProcess.stdout.emit("data", toLine(failure));
    }, 10);

    // An error payload is delivered as a resolved response, not a rejection.
    const received = await pending;
    expect(received.error).toBeDefined();
    expect(received.error?.code).toBe(-32601);
  });

  it("should handle multiple pending requests", async () => {
    transport = new StdioTransport(command);
    await transport.start();

    const first: McpRequest = { jsonrpc: "2.0", id: 1, method: "test1" };
    const second: McpRequest = { jsonrpc: "2.0", id: 2, method: "test2" };
    const pendingFirst = transport.send(first);
    const pendingSecond = transport.send(second);

    // Replies arrive in reverse order, within a single timer tick.
    setTimeout(() => {
      mockProcess.stdout.emit("data", toLine({ jsonrpc: "2.0", id: 2, result: "result2" }));
      mockProcess.stdout.emit("data", toLine({ jsonrpc: "2.0", id: 1, result: "result1" }));
    }, 10);

    const [receivedFirst, receivedSecond] = await Promise.all([pendingFirst, pendingSecond]);
    expect(receivedFirst.id).toBe(1);
    expect(receivedSecond.id).toBe(2);
  });

  it("should handle partial JSON messages", async () => {
    transport = new StdioTransport(command);
    await transport.start();

    const outgoing: McpRequest = {
      jsonrpc: "2.0",
      id: 1,
      method: "test",
    };
    const serialized = JSON.stringify({
      jsonrpc: "2.0",
      id: 1,
      result: { success: true },
    });
    // Split the serialized reply after 20 characters; only the tail carries the newline.
    const head = serialized.substring(0, 20);
    const tail = serialized.substring(20) + "\n";

    const pending = transport.send(outgoing);
    setTimeout(() => {
      mockProcess.stdout.emit("data", Buffer.from(head));
      mockProcess.stdout.emit("data", Buffer.from(tail));
    }, 10);

    const received = await pending;
    expect(received.id).toBe(1);
  });
});
// Shutdown specs: stop() ends a running transport, tolerates a transport that
// was never started, and fails any request still awaiting a reply.
describe("stop", () => {
  it("should stop the running process", async () => {
    transport = new StdioTransport(command);
    await transport.start();
    expect(transport.isRunning()).toBe(true);

    await transport.stop();

    expect(transport.isRunning()).toBe(false);
  });

  it("should not throw error if already stopped", async () => {
    transport = new StdioTransport(command);

    await expect(transport.stop()).resolves.not.toThrow();
  });

  it("should reject pending requests on stop", async () => {
    transport = new StdioTransport(command);
    await transport.start();

    const outgoing: McpRequest = {
      jsonrpc: "2.0",
      id: 1,
      method: "test",
    };

    // Attach the handler before stopping so the rejection is captured as a value.
    const outcome = transport.send(outgoing).catch((reason) => reason);

    // Stop right away, while the request is still outstanding.
    await transport.stop();

    const settled = await outcome;
    expect(settled).toBeInstanceOf(Error);
  });
});
// State-reporting specs: isRunning() across the not-started, started and
// stopped stages of a transport.
describe("isRunning", () => {
  it("should return false when not started", () => {
    transport = new StdioTransport(command);

    const running = transport.isRunning();

    expect(running).toBe(false);
  });

  it("should return true when started", async () => {
    transport = new StdioTransport(command);
    await transport.start();

    const running = transport.isRunning();

    expect(running).toBe(true);
  });

  it("should return false after stopped", async () => {
    transport = new StdioTransport(command);
    await transport.start();
    await transport.stop();

    const running = transport.isRunning();

    expect(running).toBe(false);
  });
});
// Failure specs: events emitted by the fake child process must flip the
// transport to not-running and fail any request still awaiting a reply.
// All specs drive the shared `mockProcess` handle produced by the mocked
// spawn instead of reaching into the transport's private state.
describe("error handling", () => {
  it("should handle process exit", async () => {
    transport = new StdioTransport(command);
    await transport.start();

    mockProcess.emit("exit", 0);

    expect(transport.isRunning()).toBe(false);
  });

  it("should handle process errors", async () => {
    transport = new StdioTransport(command);
    await transport.start();

    mockProcess.emit("error", new Error("Process error"));

    expect(transport.isRunning()).toBe(false);
  });

  it("should reject pending requests on process error", async () => {
    transport = new StdioTransport(command);
    await transport.start();

    const request: McpRequest = {
      jsonrpc: "2.0",
      id: 1,
      method: "test",
    };

    const sendPromise = transport.send(request);
    // Fail the process shortly after the request has been issued.
    setTimeout(() => {
      mockProcess.emit("error", new Error("Process crashed"));
    }, 10);

    await expect(sendPromise).rejects.toThrow();
  });
});
});