Implement MCP Phase 1 infrastructure for agent tool integration with central hub, tool registry, and STDIO transport layers. Components: - McpHubService: Central registry for MCP server lifecycle - StdioTransport: STDIO process communication with JSON-RPC 2.0 - ToolRegistryService: Tool catalog management - McpController: REST API for MCP management Endpoints: - GET/POST /mcp/servers - List/register servers - POST /mcp/servers/:id/start|stop - Lifecycle control - DELETE /mcp/servers/:id - Unregister - GET /mcp/tools - List tools - POST /mcp/tools/:name/invoke - Invoke tool Features: - Full JSON-RPC 2.0 protocol support - Process lifecycle management - Buffered message parsing - Type-safe with no explicit any types - Proper cleanup on shutdown Tests: 85 passing with 90.9% coverage Fixes #132 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
307 lines
8.5 KiB
TypeScript
307 lines
8.5 KiB
TypeScript
import { describe, it, expect, beforeEach, afterEach, vi } from "vitest";
|
|
import { StdioTransport } from "./stdio-transport";
|
|
import type { McpRequest, McpResponse } from "./interfaces";
|
|
import { EventEmitter } from "node:events";
|
|
|
|
// Mock child_process
|
|
let mockProcess: any;
|
|
|
|
vi.mock("node:child_process", () => {
|
|
return {
|
|
spawn: vi.fn(() => {
|
|
class MockChildProcess extends EventEmitter {
|
|
stdin = {
|
|
write: vi.fn((data: any, callback?: any) => {
|
|
if (callback) callback();
|
|
return true;
|
|
}),
|
|
end: vi.fn(),
|
|
};
|
|
stdout = new EventEmitter();
|
|
stderr = new EventEmitter();
|
|
kill = vi.fn(() => {
|
|
this.killed = true;
|
|
setTimeout(() => this.emit("exit", 0), 0);
|
|
});
|
|
killed = false;
|
|
pid = 12345;
|
|
}
|
|
|
|
mockProcess = new MockChildProcess();
|
|
return mockProcess;
|
|
}),
|
|
};
|
|
});
|
|
|
|
describe("StdioTransport", () => {
|
|
let transport: StdioTransport;
|
|
const command = "test-command";
|
|
const args = ["arg1", "arg2"];
|
|
const env = { TEST_VAR: "value" };
|
|
|
|
beforeEach(() => {
|
|
vi.clearAllMocks();
|
|
});
|
|
|
|
afterEach(async () => {
|
|
if (transport && transport.isRunning()) {
|
|
await transport.stop();
|
|
}
|
|
});
|
|
|
|
describe("constructor", () => {
|
|
it("should create transport with command only", () => {
|
|
transport = new StdioTransport(command);
|
|
expect(transport).toBeDefined();
|
|
expect(transport.isRunning()).toBe(false);
|
|
});
|
|
|
|
it("should create transport with command and args", () => {
|
|
transport = new StdioTransport(command, args);
|
|
expect(transport).toBeDefined();
|
|
});
|
|
|
|
it("should create transport with command, args, and env", () => {
|
|
transport = new StdioTransport(command, args, env);
|
|
expect(transport).toBeDefined();
|
|
});
|
|
});
|
|
|
|
describe("start", () => {
|
|
it("should start the child process", async () => {
|
|
transport = new StdioTransport(command, args, env);
|
|
await transport.start();
|
|
expect(transport.isRunning()).toBe(true);
|
|
});
|
|
|
|
it("should not start if already running", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
const firstStart = transport.isRunning();
|
|
await transport.start();
|
|
const secondStart = transport.isRunning();
|
|
|
|
expect(firstStart).toBe(true);
|
|
expect(secondStart).toBe(true);
|
|
});
|
|
});
|
|
|
|
describe("send", () => {
|
|
it("should send request and receive response", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
|
|
const request: McpRequest = {
|
|
jsonrpc: "2.0",
|
|
id: 1,
|
|
method: "test",
|
|
params: { foo: "bar" },
|
|
};
|
|
|
|
const expectedResponse: McpResponse = {
|
|
jsonrpc: "2.0",
|
|
id: 1,
|
|
result: { success: true },
|
|
};
|
|
|
|
// Simulate response after a short delay
|
|
const sendPromise = transport.send(request);
|
|
setTimeout(() => {
|
|
mockProcess.stdout.emit("data", Buffer.from(JSON.stringify(expectedResponse) + "\n"));
|
|
}, 10);
|
|
|
|
const response = await sendPromise;
|
|
expect(response).toEqual(expectedResponse);
|
|
});
|
|
|
|
it("should throw error if not running", async () => {
|
|
transport = new StdioTransport(command);
|
|
const request: McpRequest = {
|
|
jsonrpc: "2.0",
|
|
id: 1,
|
|
method: "test",
|
|
};
|
|
|
|
await expect(transport.send(request)).rejects.toThrow("Process not running");
|
|
});
|
|
|
|
it("should handle error responses", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
|
|
const request: McpRequest = {
|
|
jsonrpc: "2.0",
|
|
id: 1,
|
|
method: "test",
|
|
};
|
|
|
|
const errorResponse: McpResponse = {
|
|
jsonrpc: "2.0",
|
|
id: 1,
|
|
error: {
|
|
code: -32601,
|
|
message: "Method not found",
|
|
},
|
|
};
|
|
|
|
const sendPromise = transport.send(request);
|
|
setTimeout(() => {
|
|
mockProcess.stdout.emit("data", Buffer.from(JSON.stringify(errorResponse) + "\n"));
|
|
}, 10);
|
|
|
|
const response = await sendPromise;
|
|
expect(response.error).toBeDefined();
|
|
expect(response.error?.code).toBe(-32601);
|
|
});
|
|
|
|
it("should handle multiple pending requests", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
|
|
const request1: McpRequest = { jsonrpc: "2.0", id: 1, method: "test1" };
|
|
const request2: McpRequest = { jsonrpc: "2.0", id: 2, method: "test2" };
|
|
|
|
const response1Promise = transport.send(request1);
|
|
const response2Promise = transport.send(request2);
|
|
|
|
setTimeout(() => {
|
|
mockProcess.stdout.emit(
|
|
"data",
|
|
Buffer.from(JSON.stringify({ jsonrpc: "2.0", id: 2, result: "result2" }) + "\n")
|
|
);
|
|
mockProcess.stdout.emit(
|
|
"data",
|
|
Buffer.from(JSON.stringify({ jsonrpc: "2.0", id: 1, result: "result1" }) + "\n")
|
|
);
|
|
}, 10);
|
|
|
|
const [response1, response2] = await Promise.all([response1Promise, response2Promise]);
|
|
expect(response1.id).toBe(1);
|
|
expect(response2.id).toBe(2);
|
|
});
|
|
|
|
it("should handle partial JSON messages", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
|
|
const request: McpRequest = {
|
|
jsonrpc: "2.0",
|
|
id: 1,
|
|
method: "test",
|
|
};
|
|
|
|
const fullResponse = JSON.stringify({
|
|
jsonrpc: "2.0",
|
|
id: 1,
|
|
result: { success: true },
|
|
});
|
|
|
|
const sendPromise = transport.send(request);
|
|
setTimeout(() => {
|
|
// Send response in chunks
|
|
mockProcess.stdout.emit("data", Buffer.from(fullResponse.substring(0, 20)));
|
|
mockProcess.stdout.emit("data", Buffer.from(fullResponse.substring(20) + "\n"));
|
|
}, 10);
|
|
|
|
const response = await sendPromise;
|
|
expect(response.id).toBe(1);
|
|
});
|
|
});
|
|
|
|
describe("stop", () => {
|
|
it("should stop the running process", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
expect(transport.isRunning()).toBe(true);
|
|
|
|
await transport.stop();
|
|
expect(transport.isRunning()).toBe(false);
|
|
});
|
|
|
|
it("should not throw error if already stopped", async () => {
|
|
transport = new StdioTransport(command);
|
|
await expect(transport.stop()).resolves.not.toThrow();
|
|
});
|
|
|
|
it("should reject pending requests on stop", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
|
|
const request: McpRequest = {
|
|
jsonrpc: "2.0",
|
|
id: 1,
|
|
method: "test",
|
|
};
|
|
|
|
const sendPromise = transport.send(request).catch((error) => error);
|
|
|
|
// Stop immediately
|
|
await transport.stop();
|
|
|
|
const result = await sendPromise;
|
|
expect(result).toBeInstanceOf(Error);
|
|
});
|
|
});
|
|
|
|
describe("isRunning", () => {
|
|
it("should return false when not started", () => {
|
|
transport = new StdioTransport(command);
|
|
expect(transport.isRunning()).toBe(false);
|
|
});
|
|
|
|
it("should return true when started", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
expect(transport.isRunning()).toBe(true);
|
|
});
|
|
|
|
it("should return false after stopped", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
await transport.stop();
|
|
expect(transport.isRunning()).toBe(false);
|
|
});
|
|
});
|
|
|
|
describe("error handling", () => {
|
|
it("should handle process exit", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
|
|
const mockProcess = (transport as any).process;
|
|
mockProcess.emit("exit", 0);
|
|
|
|
expect(transport.isRunning()).toBe(false);
|
|
});
|
|
|
|
it("should handle process errors", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
|
|
const mockProcess = (transport as any).process;
|
|
mockProcess.emit("error", new Error("Process error"));
|
|
|
|
expect(transport.isRunning()).toBe(false);
|
|
});
|
|
|
|
it("should reject pending requests on process error", async () => {
|
|
transport = new StdioTransport(command);
|
|
await transport.start();
|
|
|
|
const request: McpRequest = {
|
|
jsonrpc: "2.0",
|
|
id: 1,
|
|
method: "test",
|
|
};
|
|
|
|
const sendPromise = transport.send(request);
|
|
|
|
setTimeout(() => {
|
|
mockProcess.emit("error", new Error("Process crashed"));
|
|
}, 10);
|
|
|
|
await expect(sendPromise).rejects.toThrow();
|
|
});
|
|
});
|
|
});
|