import { Injectable, Logger, Inject, OnModuleDestroy } from '@nestjs/common';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { randomUUID } from 'node:crypto';
import { z } from 'zod';
import type { Brain } from '@mosaicstack/brain';
import type { Memory } from '@mosaicstack/memory';
import { BRAIN } from '../brain/brain.tokens.js';
import { MEMORY } from '../memory/memory.tokens.js';
import { EmbeddingService } from '../memory/embedding.service.js';
import { CoordService } from '../coord/coord.service.js';

/** Everything the service tracks for one live MCP session. */
interface SessionEntry {
  server: McpServer;
  transport: StreamableHTTPServerTransport;
  createdAt: Date;
  userId: string;
}

// Allowed values for enum-like tool inputs. Validating them in the input schema
// replaces unchecked `as` casts on arbitrary client-supplied strings.
const TASK_STATUSES = ['not-started', 'in-progress', 'blocked', 'done', 'cancelled'] as const;
const TASK_PRIORITIES = ['low', 'medium', 'high', 'critical'] as const;
const PREFERENCE_CATEGORIES = [
  'communication',
  'coding',
  'workflow',
  'appearance',
  'general',
] as const;
const INSIGHT_CATEGORIES = [
  'decision',
  'learning',
  'preference',
  'fact',
  'pattern',
  'general',
] as const;

/** Wraps plain text in the MCP tool-result envelope. */
function textResult(text: string) {
  return { content: [{ type: 'text' as const, text }] };
}

/** Pretty-prints a value as JSON (2-space indent) inside the MCP tool-result envelope. */
function jsonResult(value: unknown) {
  return textResult(JSON.stringify(value, null, 2));
}

/** Builds a tool result that is flagged as an error for the client. */
function errorResult(text: string) {
  return { ...textResult(text), isError: true };
}

/**
 * Owns the gateway's MCP sessions. Each session gets its own McpServer and
 * streamable-HTTP transport, and is kept in an in-memory map keyed by session ID.
 */
@Injectable()
export class McpService implements OnModuleDestroy {
  private readonly logger = new Logger(McpService.name);
  private readonly sessions = new Map<string, SessionEntry>();

  constructor(
    @Inject(BRAIN) private readonly brain: Brain,
    @Inject(MEMORY) private readonly memory: Memory,
    @Inject(EmbeddingService) private readonly embeddings: EmbeddingService,
    @Inject(CoordService) private readonly coordService: CoordService,
  ) {}

  /**
   * Creates a new MCP session with its own server + transport pair.
   *
   * @param userId - User the session belongs to; user-scoped tools are bound to it.
   * @returns The generated session ID and the transport for use by the controller.
   */
  createSession(userId: string): { sessionId: string; transport: StreamableHTTPServerTransport } {
    const sessionId = randomUUID();

    const transport = new StreamableHTTPServerTransport({
      sessionIdGenerator: () => sessionId,
      onsessioninitialized: (id) => {
        this.logger.log(`MCP session initialized: ${id} for user ${userId}`);
      },
    });

    const server = new McpServer(
      { name: 'mosaic-gateway', version: '1.0.0' },
      { capabilities: { tools: {} } },
    );
    this.registerTools(server, userId);

    transport.onclose = () => {
      this.logger.log(`MCP session closed: ${sessionId}`);
      this.sessions.delete(sessionId);
    };

    // Connecting is fire-and-forget so the controller gets the transport immediately.
    void server.connect(transport).catch((err: unknown) => {
      this.logger.error(
        `MCP server connect error for session ${sessionId}: ${err instanceof Error ? err.message : String(err)}`,
      );
      // A session whose server never attached cannot serve requests; do not keep it.
      this.sessions.delete(sessionId);
    });

    this.sessions.set(sessionId, { server, transport, createdAt: new Date(), userId });
    return { sessionId, transport };
  }

  /**
   * Returns the transport for an existing session, or null if not found.
   */
  getSession(sessionId: string): StreamableHTTPServerTransport | null {
    return this.sessions.get(sessionId)?.transport ?? null;
  }

  /**
   * Registers all platform tools on the given McpServer instance.
   *
   * @param server - Server to register the tools on.
   * @param userId - Session user; user-scoped tools only operate on this user's data.
   */
  private registerTools(server: McpServer, userId: string): void {
    this.registerBrainTools(server, userId);
    this.registerMemoryTools(server, userId);
    this.registerCoordTools(server);
  }

  /**
   * Guards user-scoped tools: a client may omit `userId` or repeat the session
   * user's ID, but may not name a different user.
   *
   * @returns An error tool result when access must be denied, otherwise null.
   */
  private rejectForeignUser(
    tool: string,
    sessionUserId: string,
    requestedUserId: string | undefined,
  ) {
    if (requestedUserId === undefined || requestedUserId === sessionUserId) return null;
    this.logger.warn(
      `MCP tool ${tool} denied: session user ${sessionUserId} requested data for user ${requestedUserId}`,
    );
    return errorResult('Access denied: userId does not match the authenticated session user');
  }

  /** Registers the project, task, mission, and conversation tools backed by the brain. */
  private registerBrainTools(server: McpServer, userId: string): void {
    // ─── Brain: Project tools ────────────────────────────────────────────
    server.registerTool(
      'brain_list_projects',
      {
        description: 'List all projects in the brain.',
        inputSchema: z.object({}),
      },
      async () => jsonResult(await this.brain.projects.findAll()),
    );

    server.registerTool(
      'brain_get_project',
      {
        description: 'Get a project by ID.',
        inputSchema: z.object({
          id: z.string().describe('Project ID (UUID)'),
        }),
      },
      async ({ id }) => {
        const project = await this.brain.projects.findById(id);
        return project ? jsonResult(project) : textResult(`Project not found: ${id}`);
      },
    );

    // ─── Brain: Task tools ───────────────────────────────────────────────
    server.registerTool(
      'brain_list_tasks',
      {
        description: 'List tasks, optionally filtered by project, mission, or status.',
        inputSchema: z.object({
          projectId: z.string().optional().describe('Filter by project ID'),
          missionId: z.string().optional().describe('Filter by mission ID'),
          status: z.enum(TASK_STATUSES).optional().describe('Filter by status'),
        }),
      },
      async ({ projectId, missionId, status }) => {
        // Only one filter is applied; precedence is project, then mission, then status.
        if (projectId) return jsonResult(await this.brain.tasks.findByProject(projectId));
        if (missionId) return jsonResult(await this.brain.tasks.findByMission(missionId));
        if (status) return jsonResult(await this.brain.tasks.findByStatus(status));
        return jsonResult(await this.brain.tasks.findAll());
      },
    );

    server.registerTool(
      'brain_create_task',
      {
        description: 'Create a new task in the brain.',
        inputSchema: z.object({
          title: z.string().describe('Task title'),
          description: z.string().optional().describe('Task description'),
          projectId: z.string().optional().describe('Project ID'),
          missionId: z.string().optional().describe('Mission ID'),
          priority: z
            .enum(TASK_PRIORITIES)
            .optional()
            .describe('Priority: low, medium, high, critical'),
        }),
      },
      async (params) => jsonResult(await this.brain.tasks.create({ ...params })),
    );

    server.registerTool(
      'brain_update_task',
      {
        description: 'Update an existing task.',
        inputSchema: z.object({
          id: z.string().describe('Task ID'),
          title: z.string().optional(),
          description: z.string().optional(),
          status: z
            .enum(TASK_STATUSES)
            .optional()
            .describe('not-started, in-progress, blocked, done, cancelled'),
          priority: z.enum(TASK_PRIORITIES).optional(),
        }),
      },
      async ({ id, ...updates }) => {
        const task = await this.brain.tasks.update(id, { ...updates });
        return task ? jsonResult(task) : textResult(`Task not found: ${id}`);
      },
    );

    // ─── Brain: Mission tools ────────────────────────────────────────────
    server.registerTool(
      'brain_list_missions',
      {
        description: 'List all missions, optionally filtered by project.',
        inputSchema: z.object({
          projectId: z.string().optional().describe('Filter by project ID'),
        }),
      },
      async ({ projectId }) => {
        const missions = projectId
          ? await this.brain.missions.findByProject(projectId)
          : await this.brain.missions.findAll();
        return jsonResult(missions);
      },
    );

    // ─── Brain: Conversation tools ───────────────────────────────────────
    server.registerTool(
      'brain_list_conversations',
      {
        description: 'List conversations for a user.',
        inputSchema: z.object({
          userId: z
            .string()
            .optional()
            .describe('User ID (optional; must match the authenticated session user)'),
        }),
      },
      async ({ userId: requestedUserId }) => {
        const denied = this.rejectForeignUser('brain_list_conversations', userId, requestedUserId);
        if (denied) return denied;
        return jsonResult(await this.brain.conversations.findAll(userId));
      },
    );
  }

  /** Registers the insight and preference tools backed by the memory store. */
  private registerMemoryTools(server: McpServer, userId: string): void {
    // ─── Memory tools ────────────────────────────────────────────────────
    server.registerTool(
      'memory_search',
      {
        description:
          'Search across stored insights and knowledge using natural language. Returns semantically similar results.',
        inputSchema: z.object({
          userId: z
            .string()
            .optional()
            .describe('User ID (optional; must match the authenticated session user)'),
          query: z.string().describe('Natural language search query'),
          limit: z.number().int().positive().optional().describe('Max results (default 5)'),
        }),
      },
      async ({ userId: requestedUserId, query, limit }) => {
        const denied = this.rejectForeignUser('memory_search', userId, requestedUserId);
        if (denied) return denied;

        if (!this.embeddings.available) {
          return textResult('Semantic search unavailable — no embedding provider configured');
        }

        const embedding = await this.embeddings.embed(query);
        const results = await this.memory.insights.searchByEmbedding(userId, embedding, limit ?? 5);
        return jsonResult(results);
      },
    );

    server.registerTool(
      'memory_get_preferences',
      {
        description: 'Retrieve stored preferences for a user.',
        inputSchema: z.object({
          userId: z
            .string()
            .optional()
            .describe('User ID (optional; must match the authenticated session user)'),
          category: z
            .enum(PREFERENCE_CATEGORIES)
            .optional()
            .describe('Filter by category: communication, coding, workflow, appearance, general'),
        }),
      },
      async ({ userId: requestedUserId, category }) => {
        const denied = this.rejectForeignUser('memory_get_preferences', userId, requestedUserId);
        if (denied) return denied;

        const prefs = category
          ? await this.memory.preferences.findByUserAndCategory(userId, category)
          : await this.memory.preferences.findByUser(userId);
        return jsonResult(prefs);
      },
    );

    server.registerTool(
      'memory_save_preference',
      {
        description:
          'Store a learned user preference (e.g., "prefers tables over paragraphs", "timezone: America/Chicago").',
        inputSchema: z.object({
          userId: z
            .string()
            .optional()
            .describe('User ID (optional; must match the authenticated session user)'),
          key: z.string().describe('Preference key'),
          value: z.string().describe('Preference value (JSON string)'),
          category: z
            .enum(PREFERENCE_CATEGORIES)
            .optional()
            .describe('Category: communication, coding, workflow, appearance, general'),
        }),
      },
      async ({ userId: requestedUserId, key, value, category }) => {
        const denied = this.rejectForeignUser('memory_save_preference', userId, requestedUserId);
        if (denied) return denied;

        // Values that are not valid JSON are stored as the raw string.
        let parsedValue: unknown;
        try {
          parsedValue = JSON.parse(value);
        } catch {
          parsedValue = value;
        }

        const pref = await this.memory.preferences.upsert({
          userId,
          key,
          value: parsedValue,
          category: category ?? 'general',
          source: 'agent',
        });
        return jsonResult(pref);
      },
    );

    server.registerTool(
      'memory_save_insight',
      {
        description:
          'Store a learned insight, decision, or knowledge extracted from the current interaction.',
        inputSchema: z.object({
          userId: z
            .string()
            .optional()
            .describe('User ID (optional; must match the authenticated session user)'),
          content: z.string().describe('The insight or knowledge to store'),
          category: z
            .enum(INSIGHT_CATEGORIES)
            .optional()
            .describe('Category: decision, learning, preference, fact, pattern, general'),
        }),
      },
      async ({ userId: requestedUserId, content, category }) => {
        const denied = this.rejectForeignUser('memory_save_insight', userId, requestedUserId);
        if (denied) return denied;

        // Without an embedding provider the insight is stored with a null embedding.
        const embedding = this.embeddings.available ? await this.embeddings.embed(content) : null;
        const insight = await this.memory.insights.create({
          userId,
          content,
          embedding,
          source: 'agent',
          category: category ?? 'learning',
        });
        return jsonResult(insight);
      },
    );
  }

  /**
   * Registers the orchestration (coord) tools.
   *
   * NOTE(review): `projectPath` is client-supplied and passed straight to
   * CoordService — confirm that service restricts which paths may be read.
   */
  private registerCoordTools(server: McpServer): void {
    // ─── Coord tools ─────────────────────────────────────────────────────
    const projectPathSchema = z
      .string()
      .optional()
      .describe('Project path. Defaults to gateway working directory.');

    server.registerTool(
      'coord_mission_status',
      {
        description:
          'Get the current orchestration mission status including milestones, tasks, and active session.',
        inputSchema: z.object({ projectPath: projectPathSchema }),
      },
      async ({ projectPath }) => {
        const status = await this.coordService.getMissionStatus(projectPath ?? process.cwd());
        return status ? jsonResult(status) : textResult('No active coord mission found.');
      },
    );

    server.registerTool(
      'coord_list_tasks',
      {
        description: 'List all tasks from the orchestration TASKS.md file.',
        inputSchema: z.object({ projectPath: projectPathSchema }),
      },
      async ({ projectPath }) =>
        jsonResult(await this.coordService.listTasks(projectPath ?? process.cwd())),
    );

    server.registerTool(
      'coord_task_detail',
      {
        description: 'Get detailed status for a specific orchestration task.',
        inputSchema: z.object({
          taskId: z.string().describe('Task ID (e.g. P2-005)'),
          projectPath: projectPathSchema,
        }),
      },
      async ({ taskId, projectPath }) => {
        const detail = await this.coordService.getTaskStatus(projectPath ?? process.cwd(), taskId);
        return detail
          ? jsonResult(detail)
          : textResult(`Task ${taskId} not found in coord mission.`);
      },
    );
  }

  /**
   * Closes every open transport on module shutdown and empties the session map.
   * A transport that fails to close is logged and does not block the others.
   */
  async onModuleDestroy(): Promise<void> {
    this.logger.log(`Closing ${this.sessions.size} MCP sessions on shutdown`);

    // Snapshot the entries first: each transport's onclose handler removes its
    // own session from the map while the closes are in flight.
    const closePromises = Array.from(this.sessions.values()).map(({ transport }) =>
      transport.close().catch((err: unknown) => {
        this.logger.warn(
          `Error closing MCP transport: ${err instanceof Error ? err.message : String(err)}`,
        );
      }),
    );

    await Promise.all(closePromises);
    this.sessions.clear();
  }
}