feat: communication spine — gateway, TUI, Discord #61

Merged
jason.woltje merged 2 commits from feat/communication-spine into main 2026-03-13 01:33:32 +00:00
14 changed files with 5009 additions and 10 deletions
Showing only changes of commit 98380e610d - Show all commits

View File

@@ -5,7 +5,7 @@
"main": "dist/main.js",
"scripts": {
"build": "tsc",
"dev": "tsc --watch",
"dev": "tsx watch src/main.ts",
"lint": "eslint src",
"typecheck": "tsc --noEmit",
"test": "vitest run"
@@ -14,12 +14,19 @@
"@nestjs/common": "^11.0.0",
"@nestjs/core": "^11.0.0",
"@nestjs/platform-fastify": "^11.0.0",
"@nestjs/platform-socket.io": "^11.0.0",
"@nestjs/websockets": "^11.0.0",
"@mariozechner/pi-coding-agent": "~0.57.1",
"fastify": "^5.0.0",
"reflect-metadata": "^0.2.0",
"rxjs": "^7.8.0"
"rxjs": "^7.8.0",
"socket.io": "^4.8.0",
"uuid": "^11.0.0"
},
"devDependencies": {
"@types/node": "^22.0.0",
"@types/uuid": "^10.0.0",
"tsx": "^4.0.0",
"typescript": "^5.8.0",
"vitest": "^2.0.0"
}

View File

@@ -0,0 +1,9 @@
import { Global, Module } from '@nestjs/common';
import { AgentService } from './agent.service.js';
@Global()
@Module({
providers: [AgentService],
exports: [AgentService],
})
export class AgentModule {}

View File

@@ -0,0 +1,94 @@
import { Injectable, Logger, type OnModuleDestroy } from '@nestjs/common';
import {
createAgentSession,
SessionManager,
type AgentSession as PiAgentSession,
type AgentSessionEvent,
} from '@mariozechner/pi-coding-agent';
export interface AgentSession {
id: string;
piSession: PiAgentSession;
listeners: Set<(event: AgentSessionEvent) => void>;
unsubscribe: () => void;
}
@Injectable()
export class AgentService implements OnModuleDestroy {
private readonly logger = new Logger(AgentService.name);
private readonly sessions = new Map<string, AgentSession>();
async createSession(sessionId: string): Promise<AgentSession> {
if (this.sessions.has(sessionId)) {
return this.sessions.get(sessionId)!;
}
this.logger.log(`Creating agent session: ${sessionId}`);
const { session: piSession } = await createAgentSession({
sessionManager: SessionManager.inMemory(),
tools: [],
});
const listeners = new Set<(event: AgentSessionEvent) => void>();
const unsubscribe = piSession.subscribe((event) => {
for (const listener of listeners) {
try {
listener(event);
} catch (err) {
this.logger.error(`Event listener error in session ${sessionId}`, err);
}
}
});
const session: AgentSession = {
id: sessionId,
piSession,
listeners,
unsubscribe,
};
this.sessions.set(sessionId, session);
this.logger.log(`Agent session ${sessionId} ready`);
return session;
}
getSession(sessionId: string): AgentSession | undefined {
return this.sessions.get(sessionId);
}
async prompt(sessionId: string, message: string): Promise<void> {
const session = this.sessions.get(sessionId);
if (!session) {
throw new Error(`No agent session found: ${sessionId}`);
}
await session.piSession.prompt(message);
}
onEvent(sessionId: string, listener: (event: AgentSessionEvent) => void): () => void {
const session = this.sessions.get(sessionId);
if (!session) {
throw new Error(`No agent session found: ${sessionId}`);
}
session.listeners.add(listener);
return () => session.listeners.delete(listener);
}
async destroySession(sessionId: string): Promise<void> {
const session = this.sessions.get(sessionId);
if (session) {
this.logger.log(`Destroying agent session ${sessionId}`);
session.unsubscribe();
session.listeners.clear();
this.sessions.delete(sessionId);
}
}
async onModuleDestroy(): Promise<void> {
this.logger.log('Shutting down all agent sessions');
const stops = Array.from(this.sessions.keys()).map((id) => this.destroySession(id));
await Promise.allSettled(stops);
}
}

View File

@@ -1,7 +1,10 @@
import { Module } from '@nestjs/common';
import { HealthController } from './health/health.controller.js';
import { AgentModule } from './agent/agent.module.js';
import { ChatModule } from './chat/chat.module.js';
@Module({
imports: [AgentModule, ChatModule],
controllers: [HealthController],
})
export class AppModule {}

View File

@@ -0,0 +1,55 @@
import { Controller, Post, Body, Logger } from '@nestjs/common';
import type { AgentSessionEvent } from '@mariozechner/pi-coding-agent';
import { AgentService } from '../agent/agent.service.js';
import { v4 as uuid } from 'uuid';
interface ChatRequest {
conversationId?: string;
content: string;
}
interface ChatResponse {
conversationId: string;
text: string;
}
@Controller('api/chat')
export class ChatController {
private readonly logger = new Logger(ChatController.name);
constructor(private readonly agentService: AgentService) {}
@Post()
async chat(@Body() body: ChatRequest): Promise<ChatResponse> {
const conversationId = body.conversationId ?? uuid();
let agentSession = this.agentService.getSession(conversationId);
if (!agentSession) {
agentSession = await this.agentService.createSession(conversationId);
}
let responseText = '';
const done = new Promise<void>((resolve) => {
const cleanup = this.agentService.onEvent(conversationId, (event: AgentSessionEvent) => {
if (event.type === 'message_update' && event.assistantMessageEvent.type === 'text_delta') {
responseText += event.assistantMessageEvent.delta;
}
if (event.type === 'agent_end') {
cleanup();
resolve();
}
});
setTimeout(() => {
cleanup();
resolve();
}, 120_000);
});
await this.agentService.prompt(conversationId, body.content);
await done;
return { conversationId, text: responseText };
}
}

View File

@@ -0,0 +1,139 @@
import { Logger } from '@nestjs/common';
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
OnGatewayConnection,
OnGatewayDisconnect,
type OnGatewayInit,
ConnectedSocket,
MessageBody,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import type { AgentSessionEvent } from '@mariozechner/pi-coding-agent';
import { AgentService } from '../agent/agent.service.js';
import { v4 as uuid } from 'uuid';
interface ChatMessage {
conversationId?: string;
content: string;
}
@WebSocketGateway({
cors: { origin: '*' },
namespace: '/chat',
})
export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server!: Server;
private readonly logger = new Logger(ChatGateway.name);
private readonly clientSessions = new Map<
string,
{ conversationId: string; cleanup: () => void }
>();
constructor(private readonly agentService: AgentService) {}
afterInit(): void {
this.logger.log('Chat WebSocket gateway initialized');
}
handleConnection(client: Socket): void {
this.logger.log(`Client connected: ${client.id}`);
}
handleDisconnect(client: Socket): void {
this.logger.log(`Client disconnected: ${client.id}`);
const session = this.clientSessions.get(client.id);
if (session) {
session.cleanup();
this.clientSessions.delete(client.id);
}
}
@SubscribeMessage('message')
async handleMessage(
@ConnectedSocket() client: Socket,
@MessageBody() data: ChatMessage,
): Promise<void> {
const conversationId = data.conversationId ?? uuid();
this.logger.log(`Message from ${client.id} in conversation ${conversationId}`);
// Ensure agent session exists for this conversation
let agentSession = this.agentService.getSession(conversationId);
if (!agentSession) {
agentSession = await this.agentService.createSession(conversationId);
}
// Clean up any previous event subscription for this client
const existing = this.clientSessions.get(client.id);
if (existing && existing.conversationId !== conversationId) {
existing.cleanup();
}
// Subscribe to agent events and relay to client
const cleanup = this.agentService.onEvent(conversationId, (event: AgentSessionEvent) => {
this.relayEvent(client, conversationId, event);
});
this.clientSessions.set(client.id, { conversationId, cleanup });
// Send acknowledgment
client.emit('message:ack', { conversationId, messageId: uuid() });
// Dispatch to agent
try {
await this.agentService.prompt(conversationId, data.content);
} catch (err) {
this.logger.error(`Agent prompt failed: ${err}`);
client.emit('error', { conversationId, error: String(err) });
}
}
private relayEvent(client: Socket, conversationId: string, event: AgentSessionEvent): void {
switch (event.type) {
case 'agent_start':
client.emit('agent:start', { conversationId });
break;
case 'agent_end':
client.emit('agent:end', { conversationId });
break;
case 'message_update': {
const assistantEvent = event.assistantMessageEvent;
if (assistantEvent.type === 'text_delta') {
client.emit('agent:text', {
conversationId,
text: assistantEvent.delta,
});
} else if (assistantEvent.type === 'thinking_delta') {
client.emit('agent:thinking', {
conversationId,
text: assistantEvent.delta,
});
}
break;
}
case 'tool_execution_start':
client.emit('agent:tool:start', {
conversationId,
toolCallId: event.toolCallId,
toolName: event.toolName,
});
break;
case 'tool_execution_end':
client.emit('agent:tool:end', {
conversationId,
toolCallId: event.toolCallId,
toolName: event.toolName,
isError: event.isError,
});
break;
}
}
}

View File

@@ -0,0 +1,9 @@
import { Module } from '@nestjs/common';
import { ChatGateway } from './chat.gateway.js';
import { ChatController } from './chat.controller.js';
@Module({
controllers: [ChatController],
providers: [ChatGateway],
})
export class ChatModule {}

View File

@@ -1,10 +1,11 @@
{
"name": "@mosaic/cli",
"version": "0.0.0",
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"bin": {
"mosaic": "dist/index.js"
"mosaic": "dist/cli.js"
},
"exports": {
".": {
@@ -14,11 +15,22 @@
},
"scripts": {
"build": "tsc",
"dev": "tsx src/cli.ts",
"lint": "eslint src",
"typecheck": "tsc --noEmit",
"test": "vitest run"
},
"dependencies": {
"ink": "^5.0.0",
"ink-text-input": "^6.0.0",
"ink-spinner": "^5.0.0",
"react": "^18.3.0",
"socket.io-client": "^4.8.0",
"commander": "^13.0.0"
},
"devDependencies": {
"@types/react": "^18.3.0",
"tsx": "^4.0.0",
"typescript": "^5.8.0",
"vitest": "^2.0.0"
}

28
packages/cli/src/cli.ts Normal file
View File

@@ -0,0 +1,28 @@
#!/usr/bin/env node
import { Command } from 'commander';
const program = new Command();
program.name('mosaic').description('Mosaic Stack CLI').version('0.0.0');
program
.command('tui')
.description('Launch interactive TUI connected to the gateway')
.option('-g, --gateway <url>', 'Gateway URL', 'http://localhost:4000')
.option('-c, --conversation <id>', 'Resume a conversation by ID')
.action(async (opts: { gateway: string; conversation?: string }) => {
// Dynamic import to avoid loading React/Ink for other commands
const { render } = await import('ink');
const React = await import('react');
const { TuiApp } = await import('./tui/app.js');
render(
React.createElement(TuiApp, {
gatewayUrl: opts.gateway,
conversationId: opts.conversation,
}),
);
});
program.parse();

View File

@@ -0,0 +1,144 @@
import React, { useState, useCallback, useEffect, useRef } from 'react';
import { Box, Text, useInput, useApp } from 'ink';
import TextInput from 'ink-text-input';
import Spinner from 'ink-spinner';
import { io, type Socket } from 'socket.io-client';
interface Message {
role: 'user' | 'assistant';
content: string;
}
interface TuiAppProps {
gatewayUrl: string;
conversationId?: string;
}
export function TuiApp({ gatewayUrl, conversationId: initialConversationId }: TuiAppProps) {
const { exit } = useApp();
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState('');
const [isStreaming, setIsStreaming] = useState(false);
const [connected, setConnected] = useState(false);
const [conversationId, setConversationId] = useState(initialConversationId);
const [currentStreamText, setCurrentStreamText] = useState('');
const socketRef = useRef<Socket | null>(null);
useEffect(() => {
const socket = io(`${gatewayUrl}/chat`, {
transports: ['websocket'],
});
socketRef.current = socket;
socket.on('connect', () => setConnected(true));
socket.on('disconnect', () => setConnected(false));
socket.on('message:ack', (data: { conversationId: string }) => {
setConversationId(data.conversationId);
});
socket.on('agent:start', () => {
setIsStreaming(true);
setCurrentStreamText('');
});
socket.on('agent:text', (data: { text: string }) => {
setCurrentStreamText((prev) => prev + data.text);
});
socket.on('agent:end', () => {
setCurrentStreamText((prev) => {
if (prev) {
setMessages((msgs) => [...msgs, { role: 'assistant', content: prev }]);
}
return '';
});
setIsStreaming(false);
});
socket.on('error', (data: { error: string }) => {
setMessages((msgs) => [...msgs, { role: 'assistant', content: `Error: ${data.error}` }]);
setIsStreaming(false);
});
return () => {
socket.disconnect();
};
}, [gatewayUrl]);
const handleSubmit = useCallback(
(value: string) => {
if (!value.trim() || isStreaming || !socketRef.current) return;
setMessages((msgs) => [...msgs, { role: 'user', content: value }]);
setInput('');
socketRef.current.emit('message', {
conversationId,
content: value,
});
},
[conversationId, isStreaming],
);
useInput((ch, key) => {
if (key.ctrl && ch === 'c') {
exit();
}
});
return (
<Box flexDirection="column" padding={1}>
<Box marginBottom={1}>
<Text bold color="blue">
Mosaic
</Text>
<Text> </Text>
<Text dimColor>{connected ? `connected` : 'connecting...'}</Text>
{conversationId && <Text dimColor> | {conversationId.slice(0, 8)}</Text>}
</Box>
<Box flexDirection="column" marginBottom={1}>
{messages.map((msg, i) => (
<Box key={i} marginBottom={1}>
<Text bold color={msg.role === 'user' ? 'green' : 'cyan'}>
{msg.role === 'user' ? '> ' : ' '}
</Text>
<Text wrap="wrap">{msg.content}</Text>
</Box>
))}
{isStreaming && currentStreamText && (
<Box marginBottom={1}>
<Text bold color="cyan">
{' '}
</Text>
<Text wrap="wrap">{currentStreamText}</Text>
</Box>
)}
{isStreaming && !currentStreamText && (
<Box>
<Text color="cyan">
<Spinner type="dots" />
</Text>
<Text dimColor> thinking...</Text>
</Box>
)}
</Box>
<Box>
<Text bold color="green">
{'> '}
</Text>
<TextInput
value={input}
onChange={setInput}
onSubmit={handleSubmit}
placeholder={isStreaming ? 'waiting...' : 'type a message'}
/>
</Box>
</Box>
);
}

View File

@@ -2,7 +2,8 @@
"extends": "../../tsconfig.base.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src"
"rootDir": "src",
"jsx": "react-jsx"
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]

View File

@@ -11,11 +11,17 @@
},
"scripts": {
"build": "tsc",
"dev": "tsx watch src/index.ts",
"lint": "eslint src",
"typecheck": "tsc --noEmit",
"test": "vitest run"
},
"dependencies": {
"discord.js": "^14.16.0",
"socket.io-client": "^4.8.0"
},
"devDependencies": {
"tsx": "^4.0.0",
"typescript": "^5.8.0",
"vitest": "^2.0.0"
}

View File

@@ -1 +1,153 @@
export const VERSION = '0.0.0';
import { Client, GatewayIntentBits, type Message as DiscordMessage } from 'discord.js';
import { io, type Socket } from 'socket.io-client';
export interface DiscordPluginConfig {
token: string;
gatewayUrl: string;
/** Which guild to bind to (single-guild only for v0.1.0) */
guildId?: string;
}
export class DiscordPlugin {
private client: Client;
private socket: Socket | null = null;
private config: DiscordPluginConfig;
/** Map Discord channel ID → Mosaic conversation ID */
private channelConversations = new Map<string, string>();
/** Track in-flight responses to avoid duplicate streaming */
private pendingResponses = new Map<string, string>();
constructor(config: DiscordPluginConfig) {
this.config = config;
this.client = new Client({
intents: [
GatewayIntentBits.Guilds,
GatewayIntentBits.GuildMessages,
GatewayIntentBits.MessageContent,
GatewayIntentBits.DirectMessages,
],
});
}
async start(): Promise<void> {
// Connect to gateway WebSocket
this.socket = io(`${this.config.gatewayUrl}/chat`, {
transports: ['websocket'],
});
this.socket.on('connect', () => {
console.log('[discord] Connected to gateway');
});
// Handle streaming text from gateway
this.socket.on('agent:text', (data: { conversationId: string; text: string }) => {
const pending = this.pendingResponses.get(data.conversationId);
if (pending !== undefined) {
this.pendingResponses.set(data.conversationId, pending + data.text);
}
});
// When agent finishes, send the accumulated response
this.socket.on('agent:end', (data: { conversationId: string }) => {
const text = this.pendingResponses.get(data.conversationId);
if (text) {
this.sendToDiscord(data.conversationId, text);
this.pendingResponses.delete(data.conversationId);
}
});
this.socket.on('agent:start', (data: { conversationId: string }) => {
this.pendingResponses.set(data.conversationId, '');
});
// Set up Discord message handler
this.client.on('messageCreate', (message) => this.handleDiscordMessage(message));
this.client.on('ready', () => {
console.log(`[discord] Bot logged in as ${this.client.user?.tag}`);
});
await this.client.login(this.config.token);
}
async stop(): Promise<void> {
this.socket?.disconnect();
await this.client.destroy();
}
private handleDiscordMessage(message: DiscordMessage): void {
// Ignore bot messages
if (message.author.bot) return;
// Check guild binding
if (this.config.guildId && message.guildId !== this.config.guildId) return;
// Respond to DMs always, or mentions in channels
const isDM = !message.guildId;
const isMention = message.mentions.has(this.client.user!);
if (!isDM && !isMention) return;
// Strip bot mention from message content
const content = message.content
.replace(new RegExp(`<@!?${this.client.user!.id}>`, 'g'), '')
.trim();
if (!content) return;
// Get or create conversation for this Discord channel
const channelId = message.channelId;
let conversationId = this.channelConversations.get(channelId);
if (!conversationId) {
conversationId = `discord-${channelId}`;
this.channelConversations.set(channelId, conversationId);
}
// Send to gateway
this.socket?.emit('message', {
conversationId,
content,
});
}
private sendToDiscord(conversationId: string, text: string): void {
// Find the Discord channel for this conversation
const channelId = Array.from(this.channelConversations.entries()).find(
([, convId]) => convId === conversationId,
)?.[0];
if (!channelId) return;
const channel = this.client.channels.cache.get(channelId);
if (!channel || !('send' in channel)) return;
// Chunk responses for Discord's 2000-char limit
const chunks = this.chunkText(text, 1900);
for (const chunk of chunks) {
(channel as { send: (content: string) => Promise<unknown> }).send(chunk);
}
}
private chunkText(text: string, maxLength: number): string[] {
if (text.length <= maxLength) return [text];
const chunks: string[] = [];
let remaining = text;
while (remaining.length > 0) {
if (remaining.length <= maxLength) {
chunks.push(remaining);
break;
}
// Try to break at a newline
let breakPoint = remaining.lastIndexOf('\n', maxLength);
if (breakPoint <= 0) breakPoint = maxLength;
chunks.push(remaining.slice(0, breakPoint));
remaining = remaining.slice(breakPoint).trimStart();
}
return chunks;
}
}

4350
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff