feat(gateway): add MCP server endpoint with streamable HTTP transport (#52)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful

- Install @modelcontextprotocol/sdk and zod in apps/gateway
- Create McpModule with McpService and controller at /mcp endpoint
- Implement stateful streamable HTTP transport (MCP spec 2025-03-26)
- Expose 14 tools: brain (projects/tasks/missions), memory, coord
- Require valid BetterAuth session for all MCP connections
- Per-session McpServer + StreamableHTTPServerTransport instances
- Follows same Fastify onRequest hook pattern as auth handler
- Add mcp.dto.ts with McpToolDescriptor and McpServerInfo interfaces

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-15 13:04:27 -05:00
parent a8dc498794
commit e2de76ca9f
9 changed files with 1196 additions and 11 deletions

View File

@@ -15,6 +15,7 @@
"@fastify/helmet": "^13.0.2",
"@mariozechner/pi-ai": "~0.57.1",
"@mariozechner/pi-coding-agent": "~0.57.1",
"@modelcontextprotocol/sdk": "^1.27.1",
"@mosaic/auth": "workspace:^",
"@mosaic/brain": "workspace:^",
"@mosaic/coord": "workspace:^",
@@ -47,7 +48,8 @@
"reflect-metadata": "^0.2.0",
"rxjs": "^7.8.0",
"socket.io": "^4.8.0",
"uuid": "^11.0.0"
"uuid": "^11.0.0",
"zod": "^4.3.6"
},
"devDependencies": {
"@types/node": "^22.0.0",

View File

@@ -15,6 +15,7 @@ import { MemoryModule } from './memory/memory.module.js';
import { LogModule } from './log/log.module.js';
import { SkillsModule } from './skills/skills.module.js';
import { PluginModule } from './plugin/plugin.module.js';
import { McpModule } from './mcp/mcp.module.js';
import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
@Module({
@@ -34,6 +35,7 @@ import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
LogModule,
SkillsModule,
PluginModule,
McpModule,
],
controllers: [HealthController],
providers: [

View File

@@ -13,6 +13,8 @@ import { FastifyAdapter, type NestFastifyApplication } from '@nestjs/platform-fa
import helmet from '@fastify/helmet';
import { AppModule } from './app.module.js';
import { mountAuthHandler } from './auth/auth.controller.js';
import { mountMcpHandler } from './mcp/mcp.controller.js';
import { McpService } from './mcp/mcp.service.js';
async function bootstrap(): Promise<void> {
const logger = new Logger('Bootstrap');
@@ -50,6 +52,7 @@ async function bootstrap(): Promise<void> {
);
mountAuthHandler(app);
mountMcpHandler(app, app.get(McpService));
const port = Number(process.env['GATEWAY_PORT'] ?? 4000);
await app.listen(port, '0.0.0.0');

View File

@@ -0,0 +1,142 @@
import type { IncomingMessage, ServerResponse } from 'node:http';
import { Logger } from '@nestjs/common';
import { fromNodeHeaders } from 'better-auth/node';
import type { Auth } from '@mosaic/auth';
import type { NestFastifyApplication } from '@nestjs/platform-fastify';
import type { McpService } from './mcp.service.js';
import { AUTH } from '../auth/auth.tokens.js';
/**
* Mounts the MCP streamable HTTP transport endpoint at /mcp on the Fastify instance.
*
* This follows the same low-level Fastify hook pattern used by the auth controller,
* bypassing NestJS routing to directly delegate to the MCP SDK transport handlers.
*
* Endpoint: POST /mcp (and GET /mcp for SSE stream reconnect)
* Auth: Requires a valid BetterAuth session (cookie or Authorization header).
* Session: Stateful — each initialized client gets a session ID via Mcp-Session-Id header.
*/
export function mountMcpHandler(app: NestFastifyApplication, mcpService: McpService): void {
const auth = app.get<Auth>(AUTH);
const logger = new Logger('McpController');
const fastify = app.getHttpAdapter().getInstance();
fastify.addHook(
'onRequest',
(
req: { raw: IncomingMessage; url: string; method: string },
reply: { raw: ServerResponse; hijack: () => void },
done: () => void,
) => {
if (!req.url.startsWith('/mcp')) {
done();
return;
}
reply.hijack();
handleMcpRequest(req, reply, auth, mcpService, logger).catch((err: unknown) => {
logger.error(
`MCP request handler error: ${err instanceof Error ? err.message : String(err)}`,
);
if (!reply.raw.headersSent) {
reply.raw.writeHead(500, { 'Content-Type': 'application/json' });
}
if (!reply.raw.writableEnded) {
reply.raw.end(JSON.stringify({ error: 'Internal server error' }));
}
});
},
);
}
async function handleMcpRequest(
req: { raw: IncomingMessage; url: string; method: string },
reply: { raw: ServerResponse; hijack: () => void },
auth: Auth,
mcpService: McpService,
logger: Logger,
): Promise<void> {
// ─── Authentication ─────────────────────────────────────────────────────
const headers = fromNodeHeaders(req.raw.headers);
const result = await auth.api.getSession({ headers });
if (!result) {
reply.raw.writeHead(401, { 'Content-Type': 'application/json' });
reply.raw.end(JSON.stringify({ error: 'Unauthorized: valid session required' }));
return;
}
const userId = result.user.id;
// ─── Session routing ─────────────────────────────────────────────────────
const sessionId = req.raw.headers['mcp-session-id'];
if (typeof sessionId === 'string' && sessionId.length > 0) {
// Existing session request
const transport = mcpService.getSession(sessionId);
if (!transport) {
logger.warn(`MCP session not found: ${sessionId}`);
reply.raw.writeHead(404, { 'Content-Type': 'application/json' });
reply.raw.end(JSON.stringify({ error: 'Session not found' }));
return;
}
await transport.handleRequest(req.raw, reply.raw);
return;
}
// ─── Initialize new session ───────────────────────────────────────────────
// Only POST requests can initialize a new session (must be initialize message)
if (req.method !== 'POST') {
reply.raw.writeHead(400, { 'Content-Type': 'application/json' });
reply.raw.end(
JSON.stringify({
error: 'New session must be established via POST with initialize message',
}),
);
return;
}
// Parse body to verify this is an initialize request before creating a session
let body: unknown;
try {
body = await readRequestBody(req.raw);
} catch (err) {
logger.warn(
`Failed to parse MCP request body: ${err instanceof Error ? err.message : String(err)}`,
);
reply.raw.writeHead(400, { 'Content-Type': 'application/json' });
reply.raw.end(JSON.stringify({ error: 'Invalid request body' }));
return;
}
// Create new session and handle this initializing request
const { transport } = mcpService.createSession(userId);
logger.log(`New MCP session created for user ${userId}`);
await transport.handleRequest(req.raw, reply.raw, body);
}
/**
* Reads and parses the JSON body from a Node.js IncomingMessage.
*/
function readRequestBody(req: IncomingMessage): Promise<unknown> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
req.on('data', (chunk: Buffer) => chunks.push(chunk));
req.on('end', () => {
const raw = Buffer.concat(chunks).toString('utf8');
if (!raw) {
resolve(undefined);
return;
}
try {
resolve(JSON.parse(raw));
} catch (err) {
reject(err);
}
});
req.on('error', reject);
});
}

View File

@@ -0,0 +1,19 @@
/**
* MCP (Model Context Protocol) DTOs
*
* Defines the data transfer objects for the MCP streamable HTTP transport.
* See: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http
*/
export interface McpToolDescriptor {
name: string;
description: string;
inputSchema: Record<string, unknown>;
}
export interface McpServerInfo {
name: string;
version: string;
protocolVersion: string;
tools: McpToolDescriptor[];
}

View File

@@ -0,0 +1,10 @@
import { Module } from '@nestjs/common';
import { McpService } from './mcp.service.js';
import { CoordModule } from '../coord/coord.module.js';
@Module({
imports: [CoordModule],
providers: [McpService],
exports: [McpService],
})
export class McpModule {}

View File

@@ -0,0 +1,429 @@
import { Injectable, Logger, Inject, OnModuleDestroy } from '@nestjs/common';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import type { Brain } from '@mosaic/brain';
import type { Memory } from '@mosaic/memory';
import { BRAIN } from '../brain/brain.tokens.js';
import { MEMORY } from '../memory/memory.tokens.js';
import { EmbeddingService } from '../memory/embedding.service.js';
import { CoordService } from '../coord/coord.service.js';
interface SessionEntry {
server: McpServer;
transport: StreamableHTTPServerTransport;
createdAt: Date;
userId: string;
}
@Injectable()
export class McpService implements OnModuleDestroy {
private readonly logger = new Logger(McpService.name);
private readonly sessions = new Map<string, SessionEntry>();
constructor(
@Inject(BRAIN) private readonly brain: Brain,
@Inject(MEMORY) private readonly memory: Memory,
@Inject(EmbeddingService) private readonly embeddings: EmbeddingService,
@Inject(CoordService) private readonly coordService: CoordService,
) {}
/**
* Creates a new MCP session with its own server + transport pair.
* Returns the transport for use by the controller.
*/
createSession(userId: string): { sessionId: string; transport: StreamableHTTPServerTransport } {
const sessionId = randomUUID();
const transport = new StreamableHTTPServerTransport({
sessionIdGenerator: () => sessionId,
onsessioninitialized: (id) => {
this.logger.log(`MCP session initialized: ${id} for user ${userId}`);
},
});
const server = new McpServer(
{ name: 'mosaic-gateway', version: '1.0.0' },
{ capabilities: { tools: {} } },
);
this.registerTools(server, userId);
transport.onclose = () => {
this.logger.log(`MCP session closed: ${sessionId}`);
this.sessions.delete(sessionId);
};
server.connect(transport).catch((err: unknown) => {
this.logger.error(
`MCP server connect error for session ${sessionId}: ${err instanceof Error ? err.message : String(err)}`,
);
});
this.sessions.set(sessionId, { server, transport, createdAt: new Date(), userId });
return { sessionId, transport };
}
/**
* Returns the transport for an existing session, or null if not found.
*/
getSession(sessionId: string): StreamableHTTPServerTransport | null {
return this.sessions.get(sessionId)?.transport ?? null;
}
/**
* Registers all platform tools on the given McpServer instance.
*/
private registerTools(server: McpServer, _userId: string): void {
// ─── Brain: Project tools ────────────────────────────────────────────
server.registerTool(
'brain_list_projects',
{
description: 'List all projects in the brain.',
inputSchema: z.object({}),
},
async () => {
const projects = await this.brain.projects.findAll();
return {
content: [{ type: 'text' as const, text: JSON.stringify(projects, null, 2) }],
};
},
);
server.registerTool(
'brain_get_project',
{
description: 'Get a project by ID.',
inputSchema: z.object({
id: z.string().describe('Project ID (UUID)'),
}),
},
async ({ id }) => {
const project = await this.brain.projects.findById(id);
return {
content: [
{
type: 'text' as const,
text: project ? JSON.stringify(project, null, 2) : `Project not found: ${id}`,
},
],
};
},
);
// ─── Brain: Task tools ───────────────────────────────────────────────
server.registerTool(
'brain_list_tasks',
{
description: 'List tasks, optionally filtered by project, mission, or status.',
inputSchema: z.object({
projectId: z.string().optional().describe('Filter by project ID'),
missionId: z.string().optional().describe('Filter by mission ID'),
status: z.string().optional().describe('Filter by status'),
}),
},
async ({ projectId, missionId, status }) => {
type TaskStatus = 'not-started' | 'in-progress' | 'blocked' | 'done' | 'cancelled';
let tasks;
if (projectId) tasks = await this.brain.tasks.findByProject(projectId);
else if (missionId) tasks = await this.brain.tasks.findByMission(missionId);
else if (status) tasks = await this.brain.tasks.findByStatus(status as TaskStatus);
else tasks = await this.brain.tasks.findAll();
return { content: [{ type: 'text' as const, text: JSON.stringify(tasks, null, 2) }] };
},
);
server.registerTool(
'brain_create_task',
{
description: 'Create a new task in the brain.',
inputSchema: z.object({
title: z.string().describe('Task title'),
description: z.string().optional().describe('Task description'),
projectId: z.string().optional().describe('Project ID'),
missionId: z.string().optional().describe('Mission ID'),
priority: z.string().optional().describe('Priority: low, medium, high, critical'),
}),
},
async (params) => {
type Priority = 'low' | 'medium' | 'high' | 'critical';
const task = await this.brain.tasks.create({
...params,
priority: params.priority as Priority | undefined,
});
return { content: [{ type: 'text' as const, text: JSON.stringify(task, null, 2) }] };
},
);
server.registerTool(
'brain_update_task',
{
description: 'Update an existing task.',
inputSchema: z.object({
id: z.string().describe('Task ID'),
title: z.string().optional(),
description: z.string().optional(),
status: z
.string()
.optional()
.describe('not-started, in-progress, blocked, done, cancelled'),
priority: z.string().optional(),
}),
},
async ({ id, ...updates }) => {
type TaskStatus = 'not-started' | 'in-progress' | 'blocked' | 'done' | 'cancelled';
type Priority = 'low' | 'medium' | 'high' | 'critical';
const task = await this.brain.tasks.update(id, {
...updates,
status: updates.status as TaskStatus | undefined,
priority: updates.priority as Priority | undefined,
});
return {
content: [
{
type: 'text' as const,
text: task ? JSON.stringify(task, null, 2) : `Task not found: ${id}`,
},
],
};
},
);
// ─── Brain: Mission tools ────────────────────────────────────────────
server.registerTool(
'brain_list_missions',
{
description: 'List all missions, optionally filtered by project.',
inputSchema: z.object({
projectId: z.string().optional().describe('Filter by project ID'),
}),
},
async ({ projectId }) => {
const missions = projectId
? await this.brain.missions.findByProject(projectId)
: await this.brain.missions.findAll();
return { content: [{ type: 'text' as const, text: JSON.stringify(missions, null, 2) }] };
},
);
server.registerTool(
'brain_list_conversations',
{
description: 'List conversations for a user.',
inputSchema: z.object({
userId: z.string().describe('User ID'),
}),
},
async ({ userId }) => {
const conversations = await this.brain.conversations.findAll(userId);
return {
content: [{ type: 'text' as const, text: JSON.stringify(conversations, null, 2) }],
};
},
);
// ─── Memory tools ────────────────────────────────────────────────────
server.registerTool(
'memory_search',
{
description:
'Search across stored insights and knowledge using natural language. Returns semantically similar results.',
inputSchema: z.object({
userId: z.string().describe('User ID to search memory for'),
query: z.string().describe('Natural language search query'),
limit: z.number().optional().describe('Max results (default 5)'),
}),
},
async ({ userId, query, limit }) => {
if (!this.embeddings.available) {
return {
content: [
{
type: 'text' as const,
text: 'Semantic search unavailable — no embedding provider configured',
},
],
};
}
const embedding = await this.embeddings.embed(query);
const results = await this.memory.insights.searchByEmbedding(userId, embedding, limit ?? 5);
return { content: [{ type: 'text' as const, text: JSON.stringify(results, null, 2) }] };
},
);
server.registerTool(
'memory_get_preferences',
{
description: 'Retrieve stored preferences for a user.',
inputSchema: z.object({
userId: z.string().describe('User ID'),
category: z
.string()
.optional()
.describe('Filter by category: communication, coding, workflow, appearance, general'),
}),
},
async ({ userId, category }) => {
type Cat = 'communication' | 'coding' | 'workflow' | 'appearance' | 'general';
const prefs = category
? await this.memory.preferences.findByUserAndCategory(userId, category as Cat)
: await this.memory.preferences.findByUser(userId);
return { content: [{ type: 'text' as const, text: JSON.stringify(prefs, null, 2) }] };
},
);
server.registerTool(
'memory_save_preference',
{
description:
'Store a learned user preference (e.g., "prefers tables over paragraphs", "timezone: America/Chicago").',
inputSchema: z.object({
userId: z.string().describe('User ID'),
key: z.string().describe('Preference key'),
value: z.string().describe('Preference value (JSON string)'),
category: z
.string()
.optional()
.describe('Category: communication, coding, workflow, appearance, general'),
}),
},
async ({ userId, key, value, category }) => {
type Cat = 'communication' | 'coding' | 'workflow' | 'appearance' | 'general';
let parsedValue: unknown;
try {
parsedValue = JSON.parse(value);
} catch {
parsedValue = value;
}
const pref = await this.memory.preferences.upsert({
userId,
key,
value: parsedValue,
category: (category as Cat) ?? 'general',
source: 'agent',
});
return { content: [{ type: 'text' as const, text: JSON.stringify(pref, null, 2) }] };
},
);
server.registerTool(
'memory_save_insight',
{
description:
'Store a learned insight, decision, or knowledge extracted from the current interaction.',
inputSchema: z.object({
userId: z.string().describe('User ID'),
content: z.string().describe('The insight or knowledge to store'),
category: z
.string()
.optional()
.describe('Category: decision, learning, preference, fact, pattern, general'),
}),
},
async ({ userId, content, category }) => {
type Cat = 'decision' | 'learning' | 'preference' | 'fact' | 'pattern' | 'general';
const embedding = this.embeddings.available ? await this.embeddings.embed(content) : null;
const insight = await this.memory.insights.create({
userId,
content,
embedding,
source: 'agent',
category: (category as Cat) ?? 'learning',
});
return { content: [{ type: 'text' as const, text: JSON.stringify(insight, null, 2) }] };
},
);
// ─── Coord tools ─────────────────────────────────────────────────────
server.registerTool(
'coord_mission_status',
{
description:
'Get the current orchestration mission status including milestones, tasks, and active session.',
inputSchema: z.object({
projectPath: z
.string()
.optional()
.describe('Project path. Defaults to gateway working directory.'),
}),
},
async ({ projectPath }) => {
const resolvedPath = projectPath ?? process.cwd();
const status = await this.coordService.getMissionStatus(resolvedPath);
return {
content: [
{
type: 'text' as const,
text: status ? JSON.stringify(status, null, 2) : 'No active coord mission found.',
},
],
};
},
);
server.registerTool(
'coord_list_tasks',
{
description: 'List all tasks from the orchestration TASKS.md file.',
inputSchema: z.object({
projectPath: z
.string()
.optional()
.describe('Project path. Defaults to gateway working directory.'),
}),
},
async ({ projectPath }) => {
const resolvedPath = projectPath ?? process.cwd();
const tasks = await this.coordService.listTasks(resolvedPath);
return { content: [{ type: 'text' as const, text: JSON.stringify(tasks, null, 2) }] };
},
);
server.registerTool(
'coord_task_detail',
{
description: 'Get detailed status for a specific orchestration task.',
inputSchema: z.object({
taskId: z.string().describe('Task ID (e.g. P2-005)'),
projectPath: z
.string()
.optional()
.describe('Project path. Defaults to gateway working directory.'),
}),
},
async ({ taskId, projectPath }) => {
const resolvedPath = projectPath ?? process.cwd();
const detail = await this.coordService.getTaskStatus(resolvedPath, taskId);
return {
content: [
{
type: 'text' as const,
text: detail
? JSON.stringify(detail, null, 2)
: `Task ${taskId} not found in coord mission.`,
},
],
};
},
);
}
async onModuleDestroy(): Promise<void> {
this.logger.log(`Closing ${this.sessions.size} MCP sessions on shutdown`);
const closePromises = Array.from(this.sessions.values()).map(({ transport }) =>
transport.close().catch((err: unknown) => {
this.logger.warn(
`Error closing MCP transport: ${err instanceof Error ? err.message : String(err)}`,
);
}),
);
await Promise.all(closePromises);
this.sessions.clear();
}
}

View File

@@ -0,0 +1 @@
export const MCP_SERVICE = 'MCP_SERVICE';

597
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff