feat(gateway): add MCP server endpoint with streamable HTTP transport #137
@@ -15,6 +15,7 @@
|
|||||||
"@fastify/helmet": "^13.0.2",
|
"@fastify/helmet": "^13.0.2",
|
||||||
"@mariozechner/pi-ai": "~0.57.1",
|
"@mariozechner/pi-ai": "~0.57.1",
|
||||||
"@mariozechner/pi-coding-agent": "~0.57.1",
|
"@mariozechner/pi-coding-agent": "~0.57.1",
|
||||||
|
"@modelcontextprotocol/sdk": "^1.27.1",
|
||||||
"@mosaic/auth": "workspace:^",
|
"@mosaic/auth": "workspace:^",
|
||||||
"@mosaic/brain": "workspace:^",
|
"@mosaic/brain": "workspace:^",
|
||||||
"@mosaic/coord": "workspace:^",
|
"@mosaic/coord": "workspace:^",
|
||||||
@@ -47,7 +48,8 @@
|
|||||||
"reflect-metadata": "^0.2.0",
|
"reflect-metadata": "^0.2.0",
|
||||||
"rxjs": "^7.8.0",
|
"rxjs": "^7.8.0",
|
||||||
"socket.io": "^4.8.0",
|
"socket.io": "^4.8.0",
|
||||||
"uuid": "^11.0.0"
|
"uuid": "^11.0.0",
|
||||||
|
"zod": "^4.3.6"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@types/node": "^22.0.0",
|
"@types/node": "^22.0.0",
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import { MemoryModule } from './memory/memory.module.js';
|
|||||||
import { LogModule } from './log/log.module.js';
|
import { LogModule } from './log/log.module.js';
|
||||||
import { SkillsModule } from './skills/skills.module.js';
|
import { SkillsModule } from './skills/skills.module.js';
|
||||||
import { PluginModule } from './plugin/plugin.module.js';
|
import { PluginModule } from './plugin/plugin.module.js';
|
||||||
|
import { McpModule } from './mcp/mcp.module.js';
|
||||||
import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
|
import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
|
||||||
|
|
||||||
@Module({
|
@Module({
|
||||||
@@ -34,6 +35,7 @@ import { ThrottlerGuard, ThrottlerModule } from '@nestjs/throttler';
|
|||||||
LogModule,
|
LogModule,
|
||||||
SkillsModule,
|
SkillsModule,
|
||||||
PluginModule,
|
PluginModule,
|
||||||
|
McpModule,
|
||||||
],
|
],
|
||||||
controllers: [HealthController],
|
controllers: [HealthController],
|
||||||
providers: [
|
providers: [
|
||||||
|
|||||||
@@ -13,6 +13,8 @@ import { FastifyAdapter, type NestFastifyApplication } from '@nestjs/platform-fa
|
|||||||
import helmet from '@fastify/helmet';
|
import helmet from '@fastify/helmet';
|
||||||
import { AppModule } from './app.module.js';
|
import { AppModule } from './app.module.js';
|
||||||
import { mountAuthHandler } from './auth/auth.controller.js';
|
import { mountAuthHandler } from './auth/auth.controller.js';
|
||||||
|
import { mountMcpHandler } from './mcp/mcp.controller.js';
|
||||||
|
import { McpService } from './mcp/mcp.service.js';
|
||||||
|
|
||||||
async function bootstrap(): Promise<void> {
|
async function bootstrap(): Promise<void> {
|
||||||
const logger = new Logger('Bootstrap');
|
const logger = new Logger('Bootstrap');
|
||||||
@@ -50,6 +52,7 @@ async function bootstrap(): Promise<void> {
|
|||||||
);
|
);
|
||||||
|
|
||||||
mountAuthHandler(app);
|
mountAuthHandler(app);
|
||||||
|
mountMcpHandler(app, app.get(McpService));
|
||||||
|
|
||||||
const port = Number(process.env['GATEWAY_PORT'] ?? 4000);
|
const port = Number(process.env['GATEWAY_PORT'] ?? 4000);
|
||||||
await app.listen(port, '0.0.0.0');
|
await app.listen(port, '0.0.0.0');
|
||||||
|
|||||||
142
apps/gateway/src/mcp/mcp.controller.ts
Normal file
142
apps/gateway/src/mcp/mcp.controller.ts
Normal file
@@ -0,0 +1,142 @@
|
|||||||
|
import type { IncomingMessage, ServerResponse } from 'node:http';
|
||||||
|
import { Logger } from '@nestjs/common';
|
||||||
|
import { fromNodeHeaders } from 'better-auth/node';
|
||||||
|
import type { Auth } from '@mosaic/auth';
|
||||||
|
import type { NestFastifyApplication } from '@nestjs/platform-fastify';
|
||||||
|
import type { McpService } from './mcp.service.js';
|
||||||
|
import { AUTH } from '../auth/auth.tokens.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mounts the MCP streamable HTTP transport endpoint at /mcp on the Fastify instance.
|
||||||
|
*
|
||||||
|
* This follows the same low-level Fastify hook pattern used by the auth controller,
|
||||||
|
* bypassing NestJS routing to directly delegate to the MCP SDK transport handlers.
|
||||||
|
*
|
||||||
|
* Endpoint: POST /mcp (and GET /mcp for SSE stream reconnect)
|
||||||
|
* Auth: Requires a valid BetterAuth session (cookie or Authorization header).
|
||||||
|
* Session: Stateful — each initialized client gets a session ID via Mcp-Session-Id header.
|
||||||
|
*/
|
||||||
|
export function mountMcpHandler(app: NestFastifyApplication, mcpService: McpService): void {
|
||||||
|
const auth = app.get<Auth>(AUTH);
|
||||||
|
const logger = new Logger('McpController');
|
||||||
|
const fastify = app.getHttpAdapter().getInstance();
|
||||||
|
|
||||||
|
fastify.addHook(
|
||||||
|
'onRequest',
|
||||||
|
(
|
||||||
|
req: { raw: IncomingMessage; url: string; method: string },
|
||||||
|
reply: { raw: ServerResponse; hijack: () => void },
|
||||||
|
done: () => void,
|
||||||
|
) => {
|
||||||
|
if (!req.url.startsWith('/mcp')) {
|
||||||
|
done();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.hijack();
|
||||||
|
|
||||||
|
handleMcpRequest(req, reply, auth, mcpService, logger).catch((err: unknown) => {
|
||||||
|
logger.error(
|
||||||
|
`MCP request handler error: ${err instanceof Error ? err.message : String(err)}`,
|
||||||
|
);
|
||||||
|
if (!reply.raw.headersSent) {
|
||||||
|
reply.raw.writeHead(500, { 'Content-Type': 'application/json' });
|
||||||
|
}
|
||||||
|
if (!reply.raw.writableEnded) {
|
||||||
|
reply.raw.end(JSON.stringify({ error: 'Internal server error' }));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function handleMcpRequest(
|
||||||
|
req: { raw: IncomingMessage; url: string; method: string },
|
||||||
|
reply: { raw: ServerResponse; hijack: () => void },
|
||||||
|
auth: Auth,
|
||||||
|
mcpService: McpService,
|
||||||
|
logger: Logger,
|
||||||
|
): Promise<void> {
|
||||||
|
// ─── Authentication ─────────────────────────────────────────────────────
|
||||||
|
const headers = fromNodeHeaders(req.raw.headers);
|
||||||
|
const result = await auth.api.getSession({ headers });
|
||||||
|
|
||||||
|
if (!result) {
|
||||||
|
reply.raw.writeHead(401, { 'Content-Type': 'application/json' });
|
||||||
|
reply.raw.end(JSON.stringify({ error: 'Unauthorized: valid session required' }));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const userId = result.user.id;
|
||||||
|
|
||||||
|
// ─── Session routing ─────────────────────────────────────────────────────
|
||||||
|
const sessionId = req.raw.headers['mcp-session-id'];
|
||||||
|
|
||||||
|
if (typeof sessionId === 'string' && sessionId.length > 0) {
|
||||||
|
// Existing session request
|
||||||
|
const transport = mcpService.getSession(sessionId);
|
||||||
|
if (!transport) {
|
||||||
|
logger.warn(`MCP session not found: ${sessionId}`);
|
||||||
|
reply.raw.writeHead(404, { 'Content-Type': 'application/json' });
|
||||||
|
reply.raw.end(JSON.stringify({ error: 'Session not found' }));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
await transport.handleRequest(req.raw, reply.raw);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Initialize new session ───────────────────────────────────────────────
|
||||||
|
// Only POST requests can initialize a new session (must be initialize message)
|
||||||
|
if (req.method !== 'POST') {
|
||||||
|
reply.raw.writeHead(400, { 'Content-Type': 'application/json' });
|
||||||
|
reply.raw.end(
|
||||||
|
JSON.stringify({
|
||||||
|
error: 'New session must be established via POST with initialize message',
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse body to verify this is an initialize request before creating a session
|
||||||
|
let body: unknown;
|
||||||
|
try {
|
||||||
|
body = await readRequestBody(req.raw);
|
||||||
|
} catch (err) {
|
||||||
|
logger.warn(
|
||||||
|
`Failed to parse MCP request body: ${err instanceof Error ? err.message : String(err)}`,
|
||||||
|
);
|
||||||
|
reply.raw.writeHead(400, { 'Content-Type': 'application/json' });
|
||||||
|
reply.raw.end(JSON.stringify({ error: 'Invalid request body' }));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create new session and handle this initializing request
|
||||||
|
const { transport } = mcpService.createSession(userId);
|
||||||
|
logger.log(`New MCP session created for user ${userId}`);
|
||||||
|
|
||||||
|
await transport.handleRequest(req.raw, reply.raw, body);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads and parses the JSON body from a Node.js IncomingMessage.
|
||||||
|
*/
|
||||||
|
function readRequestBody(req: IncomingMessage): Promise<unknown> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const chunks: Buffer[] = [];
|
||||||
|
req.on('data', (chunk: Buffer) => chunks.push(chunk));
|
||||||
|
req.on('end', () => {
|
||||||
|
const raw = Buffer.concat(chunks).toString('utf8');
|
||||||
|
if (!raw) {
|
||||||
|
resolve(undefined);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
resolve(JSON.parse(raw));
|
||||||
|
} catch (err) {
|
||||||
|
reject(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
req.on('error', reject);
|
||||||
|
});
|
||||||
|
}
|
||||||
19
apps/gateway/src/mcp/mcp.dto.ts
Normal file
19
apps/gateway/src/mcp/mcp.dto.ts
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
/**
|
||||||
|
* MCP (Model Context Protocol) DTOs
|
||||||
|
*
|
||||||
|
* Defines the data transfer objects for the MCP streamable HTTP transport.
|
||||||
|
* See: https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http
|
||||||
|
*/
|
||||||
|
|
||||||
|
export interface McpToolDescriptor {
|
||||||
|
name: string;
|
||||||
|
description: string;
|
||||||
|
inputSchema: Record<string, unknown>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface McpServerInfo {
|
||||||
|
name: string;
|
||||||
|
version: string;
|
||||||
|
protocolVersion: string;
|
||||||
|
tools: McpToolDescriptor[];
|
||||||
|
}
|
||||||
10
apps/gateway/src/mcp/mcp.module.ts
Normal file
10
apps/gateway/src/mcp/mcp.module.ts
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
import { Module } from '@nestjs/common';
|
||||||
|
import { McpService } from './mcp.service.js';
|
||||||
|
import { CoordModule } from '../coord/coord.module.js';
|
||||||
|
|
||||||
|
@Module({
|
||||||
|
imports: [CoordModule],
|
||||||
|
providers: [McpService],
|
||||||
|
exports: [McpService],
|
||||||
|
})
|
||||||
|
export class McpModule {}
|
||||||
429
apps/gateway/src/mcp/mcp.service.ts
Normal file
429
apps/gateway/src/mcp/mcp.service.ts
Normal file
@@ -0,0 +1,429 @@
|
|||||||
|
import { Injectable, Logger, Inject, OnModuleDestroy } from '@nestjs/common';
|
||||||
|
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
|
||||||
|
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
|
||||||
|
import { randomUUID } from 'node:crypto';
|
||||||
|
import { z } from 'zod';
|
||||||
|
import type { Brain } from '@mosaic/brain';
|
||||||
|
import type { Memory } from '@mosaic/memory';
|
||||||
|
import { BRAIN } from '../brain/brain.tokens.js';
|
||||||
|
import { MEMORY } from '../memory/memory.tokens.js';
|
||||||
|
import { EmbeddingService } from '../memory/embedding.service.js';
|
||||||
|
import { CoordService } from '../coord/coord.service.js';
|
||||||
|
|
||||||
|
interface SessionEntry {
|
||||||
|
server: McpServer;
|
||||||
|
transport: StreamableHTTPServerTransport;
|
||||||
|
createdAt: Date;
|
||||||
|
userId: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Injectable()
|
||||||
|
export class McpService implements OnModuleDestroy {
|
||||||
|
private readonly logger = new Logger(McpService.name);
|
||||||
|
private readonly sessions = new Map<string, SessionEntry>();
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
@Inject(BRAIN) private readonly brain: Brain,
|
||||||
|
@Inject(MEMORY) private readonly memory: Memory,
|
||||||
|
@Inject(EmbeddingService) private readonly embeddings: EmbeddingService,
|
||||||
|
@Inject(CoordService) private readonly coordService: CoordService,
|
||||||
|
) {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new MCP session with its own server + transport pair.
|
||||||
|
* Returns the transport for use by the controller.
|
||||||
|
*/
|
||||||
|
createSession(userId: string): { sessionId: string; transport: StreamableHTTPServerTransport } {
|
||||||
|
const sessionId = randomUUID();
|
||||||
|
|
||||||
|
const transport = new StreamableHTTPServerTransport({
|
||||||
|
sessionIdGenerator: () => sessionId,
|
||||||
|
onsessioninitialized: (id) => {
|
||||||
|
this.logger.log(`MCP session initialized: ${id} for user ${userId}`);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const server = new McpServer(
|
||||||
|
{ name: 'mosaic-gateway', version: '1.0.0' },
|
||||||
|
{ capabilities: { tools: {} } },
|
||||||
|
);
|
||||||
|
|
||||||
|
this.registerTools(server, userId);
|
||||||
|
|
||||||
|
transport.onclose = () => {
|
||||||
|
this.logger.log(`MCP session closed: ${sessionId}`);
|
||||||
|
this.sessions.delete(sessionId);
|
||||||
|
};
|
||||||
|
|
||||||
|
server.connect(transport).catch((err: unknown) => {
|
||||||
|
this.logger.error(
|
||||||
|
`MCP server connect error for session ${sessionId}: ${err instanceof Error ? err.message : String(err)}`,
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.sessions.set(sessionId, { server, transport, createdAt: new Date(), userId });
|
||||||
|
return { sessionId, transport };
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the transport for an existing session, or null if not found.
|
||||||
|
*/
|
||||||
|
getSession(sessionId: string): StreamableHTTPServerTransport | null {
|
||||||
|
return this.sessions.get(sessionId)?.transport ?? null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Registers all platform tools on the given McpServer instance.
|
||||||
|
*/
|
||||||
|
private registerTools(server: McpServer, _userId: string): void {
|
||||||
|
// ─── Brain: Project tools ────────────────────────────────────────────
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'brain_list_projects',
|
||||||
|
{
|
||||||
|
description: 'List all projects in the brain.',
|
||||||
|
inputSchema: z.object({}),
|
||||||
|
},
|
||||||
|
async () => {
|
||||||
|
const projects = await this.brain.projects.findAll();
|
||||||
|
return {
|
||||||
|
content: [{ type: 'text' as const, text: JSON.stringify(projects, null, 2) }],
|
||||||
|
};
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'brain_get_project',
|
||||||
|
{
|
||||||
|
description: 'Get a project by ID.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
id: z.string().describe('Project ID (UUID)'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ id }) => {
|
||||||
|
const project = await this.brain.projects.findById(id);
|
||||||
|
return {
|
||||||
|
content: [
|
||||||
|
{
|
||||||
|
type: 'text' as const,
|
||||||
|
text: project ? JSON.stringify(project, null, 2) : `Project not found: ${id}`,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
// ─── Brain: Task tools ───────────────────────────────────────────────
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'brain_list_tasks',
|
||||||
|
{
|
||||||
|
description: 'List tasks, optionally filtered by project, mission, or status.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
projectId: z.string().optional().describe('Filter by project ID'),
|
||||||
|
missionId: z.string().optional().describe('Filter by mission ID'),
|
||||||
|
status: z.string().optional().describe('Filter by status'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ projectId, missionId, status }) => {
|
||||||
|
type TaskStatus = 'not-started' | 'in-progress' | 'blocked' | 'done' | 'cancelled';
|
||||||
|
let tasks;
|
||||||
|
if (projectId) tasks = await this.brain.tasks.findByProject(projectId);
|
||||||
|
else if (missionId) tasks = await this.brain.tasks.findByMission(missionId);
|
||||||
|
else if (status) tasks = await this.brain.tasks.findByStatus(status as TaskStatus);
|
||||||
|
else tasks = await this.brain.tasks.findAll();
|
||||||
|
return { content: [{ type: 'text' as const, text: JSON.stringify(tasks, null, 2) }] };
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'brain_create_task',
|
||||||
|
{
|
||||||
|
description: 'Create a new task in the brain.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
title: z.string().describe('Task title'),
|
||||||
|
description: z.string().optional().describe('Task description'),
|
||||||
|
projectId: z.string().optional().describe('Project ID'),
|
||||||
|
missionId: z.string().optional().describe('Mission ID'),
|
||||||
|
priority: z.string().optional().describe('Priority: low, medium, high, critical'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async (params) => {
|
||||||
|
type Priority = 'low' | 'medium' | 'high' | 'critical';
|
||||||
|
const task = await this.brain.tasks.create({
|
||||||
|
...params,
|
||||||
|
priority: params.priority as Priority | undefined,
|
||||||
|
});
|
||||||
|
return { content: [{ type: 'text' as const, text: JSON.stringify(task, null, 2) }] };
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'brain_update_task',
|
||||||
|
{
|
||||||
|
description: 'Update an existing task.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
id: z.string().describe('Task ID'),
|
||||||
|
title: z.string().optional(),
|
||||||
|
description: z.string().optional(),
|
||||||
|
status: z
|
||||||
|
.string()
|
||||||
|
.optional()
|
||||||
|
.describe('not-started, in-progress, blocked, done, cancelled'),
|
||||||
|
priority: z.string().optional(),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ id, ...updates }) => {
|
||||||
|
type TaskStatus = 'not-started' | 'in-progress' | 'blocked' | 'done' | 'cancelled';
|
||||||
|
type Priority = 'low' | 'medium' | 'high' | 'critical';
|
||||||
|
const task = await this.brain.tasks.update(id, {
|
||||||
|
...updates,
|
||||||
|
status: updates.status as TaskStatus | undefined,
|
||||||
|
priority: updates.priority as Priority | undefined,
|
||||||
|
});
|
||||||
|
return {
|
||||||
|
content: [
|
||||||
|
{
|
||||||
|
type: 'text' as const,
|
||||||
|
text: task ? JSON.stringify(task, null, 2) : `Task not found: ${id}`,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
// ─── Brain: Mission tools ────────────────────────────────────────────
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'brain_list_missions',
|
||||||
|
{
|
||||||
|
description: 'List all missions, optionally filtered by project.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
projectId: z.string().optional().describe('Filter by project ID'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ projectId }) => {
|
||||||
|
const missions = projectId
|
||||||
|
? await this.brain.missions.findByProject(projectId)
|
||||||
|
: await this.brain.missions.findAll();
|
||||||
|
return { content: [{ type: 'text' as const, text: JSON.stringify(missions, null, 2) }] };
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'brain_list_conversations',
|
||||||
|
{
|
||||||
|
description: 'List conversations for a user.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
userId: z.string().describe('User ID'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ userId }) => {
|
||||||
|
const conversations = await this.brain.conversations.findAll(userId);
|
||||||
|
return {
|
||||||
|
content: [{ type: 'text' as const, text: JSON.stringify(conversations, null, 2) }],
|
||||||
|
};
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
// ─── Memory tools ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'memory_search',
|
||||||
|
{
|
||||||
|
description:
|
||||||
|
'Search across stored insights and knowledge using natural language. Returns semantically similar results.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
userId: z.string().describe('User ID to search memory for'),
|
||||||
|
query: z.string().describe('Natural language search query'),
|
||||||
|
limit: z.number().optional().describe('Max results (default 5)'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ userId, query, limit }) => {
|
||||||
|
if (!this.embeddings.available) {
|
||||||
|
return {
|
||||||
|
content: [
|
||||||
|
{
|
||||||
|
type: 'text' as const,
|
||||||
|
text: 'Semantic search unavailable — no embedding provider configured',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
const embedding = await this.embeddings.embed(query);
|
||||||
|
const results = await this.memory.insights.searchByEmbedding(userId, embedding, limit ?? 5);
|
||||||
|
return { content: [{ type: 'text' as const, text: JSON.stringify(results, null, 2) }] };
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'memory_get_preferences',
|
||||||
|
{
|
||||||
|
description: 'Retrieve stored preferences for a user.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
userId: z.string().describe('User ID'),
|
||||||
|
category: z
|
||||||
|
.string()
|
||||||
|
.optional()
|
||||||
|
.describe('Filter by category: communication, coding, workflow, appearance, general'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ userId, category }) => {
|
||||||
|
type Cat = 'communication' | 'coding' | 'workflow' | 'appearance' | 'general';
|
||||||
|
const prefs = category
|
||||||
|
? await this.memory.preferences.findByUserAndCategory(userId, category as Cat)
|
||||||
|
: await this.memory.preferences.findByUser(userId);
|
||||||
|
return { content: [{ type: 'text' as const, text: JSON.stringify(prefs, null, 2) }] };
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'memory_save_preference',
|
||||||
|
{
|
||||||
|
description:
|
||||||
|
'Store a learned user preference (e.g., "prefers tables over paragraphs", "timezone: America/Chicago").',
|
||||||
|
inputSchema: z.object({
|
||||||
|
userId: z.string().describe('User ID'),
|
||||||
|
key: z.string().describe('Preference key'),
|
||||||
|
value: z.string().describe('Preference value (JSON string)'),
|
||||||
|
category: z
|
||||||
|
.string()
|
||||||
|
.optional()
|
||||||
|
.describe('Category: communication, coding, workflow, appearance, general'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ userId, key, value, category }) => {
|
||||||
|
type Cat = 'communication' | 'coding' | 'workflow' | 'appearance' | 'general';
|
||||||
|
let parsedValue: unknown;
|
||||||
|
try {
|
||||||
|
parsedValue = JSON.parse(value);
|
||||||
|
} catch {
|
||||||
|
parsedValue = value;
|
||||||
|
}
|
||||||
|
const pref = await this.memory.preferences.upsert({
|
||||||
|
userId,
|
||||||
|
key,
|
||||||
|
value: parsedValue,
|
||||||
|
category: (category as Cat) ?? 'general',
|
||||||
|
source: 'agent',
|
||||||
|
});
|
||||||
|
return { content: [{ type: 'text' as const, text: JSON.stringify(pref, null, 2) }] };
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'memory_save_insight',
|
||||||
|
{
|
||||||
|
description:
|
||||||
|
'Store a learned insight, decision, or knowledge extracted from the current interaction.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
userId: z.string().describe('User ID'),
|
||||||
|
content: z.string().describe('The insight or knowledge to store'),
|
||||||
|
category: z
|
||||||
|
.string()
|
||||||
|
.optional()
|
||||||
|
.describe('Category: decision, learning, preference, fact, pattern, general'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ userId, content, category }) => {
|
||||||
|
type Cat = 'decision' | 'learning' | 'preference' | 'fact' | 'pattern' | 'general';
|
||||||
|
const embedding = this.embeddings.available ? await this.embeddings.embed(content) : null;
|
||||||
|
const insight = await this.memory.insights.create({
|
||||||
|
userId,
|
||||||
|
content,
|
||||||
|
embedding,
|
||||||
|
source: 'agent',
|
||||||
|
category: (category as Cat) ?? 'learning',
|
||||||
|
});
|
||||||
|
return { content: [{ type: 'text' as const, text: JSON.stringify(insight, null, 2) }] };
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
// ─── Coord tools ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'coord_mission_status',
|
||||||
|
{
|
||||||
|
description:
|
||||||
|
'Get the current orchestration mission status including milestones, tasks, and active session.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
projectPath: z
|
||||||
|
.string()
|
||||||
|
.optional()
|
||||||
|
.describe('Project path. Defaults to gateway working directory.'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ projectPath }) => {
|
||||||
|
const resolvedPath = projectPath ?? process.cwd();
|
||||||
|
const status = await this.coordService.getMissionStatus(resolvedPath);
|
||||||
|
return {
|
||||||
|
content: [
|
||||||
|
{
|
||||||
|
type: 'text' as const,
|
||||||
|
text: status ? JSON.stringify(status, null, 2) : 'No active coord mission found.',
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'coord_list_tasks',
|
||||||
|
{
|
||||||
|
description: 'List all tasks from the orchestration TASKS.md file.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
projectPath: z
|
||||||
|
.string()
|
||||||
|
.optional()
|
||||||
|
.describe('Project path. Defaults to gateway working directory.'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ projectPath }) => {
|
||||||
|
const resolvedPath = projectPath ?? process.cwd();
|
||||||
|
const tasks = await this.coordService.listTasks(resolvedPath);
|
||||||
|
return { content: [{ type: 'text' as const, text: JSON.stringify(tasks, null, 2) }] };
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
server.registerTool(
|
||||||
|
'coord_task_detail',
|
||||||
|
{
|
||||||
|
description: 'Get detailed status for a specific orchestration task.',
|
||||||
|
inputSchema: z.object({
|
||||||
|
taskId: z.string().describe('Task ID (e.g. P2-005)'),
|
||||||
|
projectPath: z
|
||||||
|
.string()
|
||||||
|
.optional()
|
||||||
|
.describe('Project path. Defaults to gateway working directory.'),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
async ({ taskId, projectPath }) => {
|
||||||
|
const resolvedPath = projectPath ?? process.cwd();
|
||||||
|
const detail = await this.coordService.getTaskStatus(resolvedPath, taskId);
|
||||||
|
return {
|
||||||
|
content: [
|
||||||
|
{
|
||||||
|
type: 'text' as const,
|
||||||
|
text: detail
|
||||||
|
? JSON.stringify(detail, null, 2)
|
||||||
|
: `Task ${taskId} not found in coord mission.`,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
async onModuleDestroy(): Promise<void> {
|
||||||
|
this.logger.log(`Closing ${this.sessions.size} MCP sessions on shutdown`);
|
||||||
|
const closePromises = Array.from(this.sessions.values()).map(({ transport }) =>
|
||||||
|
transport.close().catch((err: unknown) => {
|
||||||
|
this.logger.warn(
|
||||||
|
`Error closing MCP transport: ${err instanceof Error ? err.message : String(err)}`,
|
||||||
|
);
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
await Promise.all(closePromises);
|
||||||
|
this.sessions.clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
1
apps/gateway/src/mcp/mcp.tokens.ts
Normal file
1
apps/gateway/src/mcp/mcp.tokens.ts
Normal file
@@ -0,0 +1 @@
|
|||||||
|
export const MCP_SERVICE = 'MCP_SERVICE';
|
||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import { useCallback, useEffect, useRef, useState } from 'react';
|
import { useCallback, useEffect, useRef, useState } from 'react';
|
||||||
import { api } from '@/lib/api';
|
import { api } from '@/lib/api';
|
||||||
import { getSocket } from '@/lib/socket';
|
import { destroySocket, getSocket } from '@/lib/socket';
|
||||||
import type { Conversation, Message } from '@/lib/types';
|
import type { Conversation, Message } from '@/lib/types';
|
||||||
import { ConversationList } from '@/components/chat/conversation-list';
|
import { ConversationList } from '@/components/chat/conversation-list';
|
||||||
import { MessageBubble } from '@/components/chat/message-bubble';
|
import { MessageBubble } from '@/components/chat/message-bubble';
|
||||||
@@ -17,6 +17,15 @@ export default function ChatPage(): React.ReactElement {
|
|||||||
const [isStreaming, setIsStreaming] = useState(false);
|
const [isStreaming, setIsStreaming] = useState(false);
|
||||||
const messagesEndRef = useRef<HTMLDivElement>(null);
|
const messagesEndRef = useRef<HTMLDivElement>(null);
|
||||||
|
|
||||||
|
// Track the active conversation ID in a ref so socket event handlers always
|
||||||
|
// see the current value without needing to be re-registered.
|
||||||
|
const activeIdRef = useRef<string | null>(null);
|
||||||
|
activeIdRef.current = activeId;
|
||||||
|
|
||||||
|
// Accumulate streamed text in a ref so agent:end can read the full content
|
||||||
|
// without stale-closure issues.
|
||||||
|
const streamingTextRef = useRef('');
|
||||||
|
|
||||||
// Load conversations on mount
|
// Load conversations on mount
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
api<Conversation[]>('/api/conversations')
|
api<Conversation[]>('/api/conversations')
|
||||||
@@ -30,6 +39,10 @@ export default function ChatPage(): React.ReactElement {
|
|||||||
setMessages([]);
|
setMessages([]);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// Clear streaming state when switching conversations
|
||||||
|
setIsStreaming(false);
|
||||||
|
setStreamingText('');
|
||||||
|
streamingTextRef.current = '';
|
||||||
api<Message[]>(`/api/conversations/${activeId}/messages`)
|
api<Message[]>(`/api/conversations/${activeId}/messages`)
|
||||||
.then(setMessages)
|
.then(setMessages)
|
||||||
.catch(() => {});
|
.catch(() => {});
|
||||||
@@ -40,50 +53,81 @@ export default function ChatPage(): React.ReactElement {
|
|||||||
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
|
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
|
||||||
}, [messages, streamingText]);
|
}, [messages, streamingText]);
|
||||||
|
|
||||||
// Socket.io setup
|
// Socket.io setup — connect once for the page lifetime
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
const socket = getSocket();
|
const socket = getSocket();
|
||||||
socket.connect();
|
|
||||||
|
|
||||||
socket.on('agent:text', (data: { conversationId: string; text: string }) => {
|
function onAgentStart(data: { conversationId: string }): void {
|
||||||
setStreamingText((prev) => prev + data.text);
|
// Only update state if the event belongs to the currently viewed conversation
|
||||||
});
|
if (activeIdRef.current !== data.conversationId) return;
|
||||||
|
|
||||||
socket.on('agent:start', () => {
|
|
||||||
setIsStreaming(true);
|
setIsStreaming(true);
|
||||||
setStreamingText('');
|
setStreamingText('');
|
||||||
});
|
streamingTextRef.current = '';
|
||||||
|
}
|
||||||
|
|
||||||
socket.on('agent:end', (data: { conversationId: string }) => {
|
function onAgentText(data: { conversationId: string; text: string }): void {
|
||||||
|
if (activeIdRef.current !== data.conversationId) return;
|
||||||
|
streamingTextRef.current += data.text;
|
||||||
|
setStreamingText((prev) => prev + data.text);
|
||||||
|
}
|
||||||
|
|
||||||
|
function onAgentEnd(data: { conversationId: string }): void {
|
||||||
|
if (activeIdRef.current !== data.conversationId) return;
|
||||||
|
const finalText = streamingTextRef.current;
|
||||||
setIsStreaming(false);
|
setIsStreaming(false);
|
||||||
setStreamingText('');
|
setStreamingText('');
|
||||||
// Reload messages to get the final persisted version
|
streamingTextRef.current = '';
|
||||||
api<Message[]>(`/api/conversations/${data.conversationId}/messages`)
|
// Append the completed assistant message to the local message list.
|
||||||
.then(setMessages)
|
// The Pi agent session is in-memory so the assistant response is not
|
||||||
.catch(() => {});
|
// persisted to the DB — we build the local UI state instead.
|
||||||
});
|
if (finalText) {
|
||||||
|
setMessages((prev) => [
|
||||||
|
...prev,
|
||||||
|
{
|
||||||
|
id: `assistant-${Date.now()}`,
|
||||||
|
conversationId: data.conversationId,
|
||||||
|
role: 'assistant' as const,
|
||||||
|
content: finalText,
|
||||||
|
createdAt: new Date().toISOString(),
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
socket.on('error', (data: { error: string }) => {
|
function onError(data: { error: string; conversationId?: string }): void {
|
||||||
setIsStreaming(false);
|
setIsStreaming(false);
|
||||||
setStreamingText('');
|
setStreamingText('');
|
||||||
|
streamingTextRef.current = '';
|
||||||
setMessages((prev) => [
|
setMessages((prev) => [
|
||||||
...prev,
|
...prev,
|
||||||
{
|
{
|
||||||
id: `error-${Date.now()}`,
|
id: `error-${Date.now()}`,
|
||||||
conversationId: '',
|
conversationId: data.conversationId ?? '',
|
||||||
role: 'system',
|
role: 'system' as const,
|
||||||
content: `Error: ${data.error}`,
|
content: `Error: ${data.error}`,
|
||||||
createdAt: new Date().toISOString(),
|
createdAt: new Date().toISOString(),
|
||||||
},
|
},
|
||||||
]);
|
]);
|
||||||
});
|
}
|
||||||
|
|
||||||
|
socket.on('agent:start', onAgentStart);
|
||||||
|
socket.on('agent:text', onAgentText);
|
||||||
|
socket.on('agent:end', onAgentEnd);
|
||||||
|
socket.on('error', onError);
|
||||||
|
|
||||||
|
// Connect if not already connected
|
||||||
|
if (!socket.connected) {
|
||||||
|
socket.connect();
|
||||||
|
}
|
||||||
|
|
||||||
return () => {
|
return () => {
|
||||||
socket.off('agent:text');
|
socket.off('agent:start', onAgentStart);
|
||||||
socket.off('agent:start');
|
socket.off('agent:text', onAgentText);
|
||||||
socket.off('agent:end');
|
socket.off('agent:end', onAgentEnd);
|
||||||
socket.off('error');
|
socket.off('error', onError);
|
||||||
socket.disconnect();
|
// Fully tear down the socket when the chat page unmounts so we get a
|
||||||
|
// fresh authenticated connection next time the page is visited.
|
||||||
|
destroySocket();
|
||||||
};
|
};
|
||||||
}, []);
|
}, []);
|
||||||
|
|
||||||
@@ -112,24 +156,34 @@ export default function ChatPage(): React.ReactElement {
|
|||||||
convId = conv.id;
|
convId = conv.id;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Optimistic user message
|
// Optimistic user message in local UI state
|
||||||
const userMsg: Message = {
|
setMessages((prev) => [
|
||||||
id: `temp-${Date.now()}`,
|
...prev,
|
||||||
|
{
|
||||||
|
id: `user-${Date.now()}`,
|
||||||
conversationId: convId,
|
conversationId: convId,
|
||||||
role: 'user',
|
role: 'user' as const,
|
||||||
content,
|
content,
|
||||||
createdAt: new Date().toISOString(),
|
createdAt: new Date().toISOString(),
|
||||||
};
|
},
|
||||||
setMessages((prev) => [...prev, userMsg]);
|
]);
|
||||||
|
|
||||||
// Persist user message
|
// Persist the user message to the DB so conversation history is
|
||||||
await api<Message>(`/api/conversations/${convId}/messages`, {
|
// available when the page is reloaded or a new session starts.
|
||||||
|
api<Message>(`/api/conversations/${convId}/messages`, {
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
body: { role: 'user', content },
|
body: { role: 'user', content },
|
||||||
|
}).catch(() => {
|
||||||
|
// Non-fatal: the agent can still process the message even if
|
||||||
|
// REST persistence fails.
|
||||||
});
|
});
|
||||||
|
|
||||||
// Send to WebSocket for streaming response
|
// Send to WebSocket — gateway creates/resumes the agent session and
|
||||||
|
// streams the response back via agent:start / agent:text / agent:end.
|
||||||
const socket = getSocket();
|
const socket = getSocket();
|
||||||
|
if (!socket.connected) {
|
||||||
|
socket.connect();
|
||||||
|
}
|
||||||
socket.emit('message', { conversationId: convId, content });
|
socket.emit('message', { conversationId: convId, content });
|
||||||
},
|
},
|
||||||
[activeId],
|
[activeId],
|
||||||
|
|||||||
@@ -5,16 +5,22 @@ interface StreamingMessageProps {
|
|||||||
text: string;
|
text: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function StreamingMessage({ text }: StreamingMessageProps): React.ReactElement | null {
|
export function StreamingMessage({ text }: StreamingMessageProps): React.ReactElement {
|
||||||
if (!text) return null;
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<div className="flex justify-start">
|
<div className="flex justify-start">
|
||||||
<div className="max-w-[75%] rounded-xl border border-surface-border bg-surface-elevated px-4 py-3 text-sm text-text-primary">
|
<div className="max-w-[75%] rounded-xl border border-surface-border bg-surface-elevated px-4 py-3 text-sm text-text-primary">
|
||||||
|
{text ? (
|
||||||
<div className="whitespace-pre-wrap break-words">{text}</div>
|
<div className="whitespace-pre-wrap break-words">{text}</div>
|
||||||
<div className="mt-1 flex items-center gap-1 text-xs text-text-muted">
|
) : (
|
||||||
|
<div className="flex items-center gap-2 text-text-muted">
|
||||||
<span className="inline-block h-2 w-2 animate-pulse rounded-full bg-blue-500" />
|
<span className="inline-block h-2 w-2 animate-pulse rounded-full bg-blue-500" />
|
||||||
Thinking...
|
<span className="inline-block h-2 w-2 animate-pulse rounded-full bg-blue-500 [animation-delay:0.2s]" />
|
||||||
|
<span className="inline-block h-2 w-2 animate-pulse rounded-full bg-blue-500 [animation-delay:0.4s]" />
|
||||||
|
</div>
|
||||||
|
)}
|
||||||
|
<div className="mt-1 flex items-center gap-1 text-xs text-text-muted">
|
||||||
|
<span className="inline-block h-1.5 w-1.5 animate-pulse rounded-full bg-blue-500" />
|
||||||
|
{text ? 'Responding...' : 'Thinking...'}
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|||||||
@@ -9,7 +9,24 @@ export function getSocket(): Socket {
|
|||||||
socket = io(`${GATEWAY_URL}/chat`, {
|
socket = io(`${GATEWAY_URL}/chat`, {
|
||||||
withCredentials: true,
|
withCredentials: true,
|
||||||
autoConnect: false,
|
autoConnect: false,
|
||||||
|
transports: ['websocket', 'polling'],
|
||||||
|
});
|
||||||
|
|
||||||
|
// Reset singleton reference when socket is fully closed so the next
|
||||||
|
// getSocket() call creates a fresh instance instead of returning a
|
||||||
|
// closed/dead socket.
|
||||||
|
socket.on('disconnect', () => {
|
||||||
|
socket = null;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
return socket;
|
return socket;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Tear down the singleton socket and reset the reference. */
|
||||||
|
export function destroySocket(): void {
|
||||||
|
if (socket) {
|
||||||
|
socket.offAny();
|
||||||
|
socket.disconnect();
|
||||||
|
socket = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
597
pnpm-lock.yaml
generated
597
pnpm-lock.yaml
generated
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user