From 11d468cf7b8376478b0a204c878318d99b2aff08 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Thu, 12 Mar 2026 20:31:39 -0500 Subject: [PATCH] fix: remediate 10 review findings in communication spine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix createSession race condition with in-flight promise map - Fix listener leak: always cleanup previous subscription per client - Fix REST timeout returning HTTP 200 — now rejects with 504 - Fix fire-and-forget Discord sends — await with error handling - Fix non-null assertion on client.user in Discord plugin - Fix TUI disconnect mid-stream deadlock (reset streaming state) - Add connect_error handler to TUI and Discord plugin - Add connected guard on TUI message submit - Add relayEvent guard for disconnected sockets - Sanitize error messages sent to WebSocket clients - Add error logging/context to AgentService create/prompt/destroy Co-Authored-By: Claude Opus 4.6 --- apps/gateway/src/agent/agent.service.ts | 63 +++++++++++++++++++----- apps/gateway/src/chat/chat.controller.ts | 44 ++++++++++++----- apps/gateway/src/chat/chat.gateway.ts | 39 ++++++++++++--- packages/cli/src/tui/app.tsx | 24 ++++++++- plugins/discord/src/index.ts | 48 +++++++++++++++--- 5 files changed, 176 insertions(+), 42 deletions(-) diff --git a/apps/gateway/src/agent/agent.service.ts b/apps/gateway/src/agent/agent.service.ts index f8be42f..9da4dc1 100644 --- a/apps/gateway/src/agent/agent.service.ts +++ b/apps/gateway/src/agent/agent.service.ts @@ -17,18 +17,39 @@ export interface AgentSession { export class AgentService implements OnModuleDestroy { private readonly logger = new Logger(AgentService.name); private readonly sessions = new Map(); + private readonly creating = new Map>(); async createSession(sessionId: string): Promise { - if (this.sessions.has(sessionId)) { - return this.sessions.get(sessionId)!; - } + const existing = this.sessions.get(sessionId); + if (existing) return existing; + const inflight = this.creating.get(sessionId); + if (inflight) return inflight; + + const promise = this.doCreateSession(sessionId).finally(() => { + this.creating.delete(sessionId); + }); + this.creating.set(sessionId, promise); + return promise; + } + + private async doCreateSession(sessionId: string): Promise { this.logger.log(`Creating agent session: ${sessionId}`); - const { session: piSession } = await createAgentSession({ - sessionManager: SessionManager.inMemory(), - tools: [], - }); + let piSession: PiAgentSession; + try { + const result = await createAgentSession({ + sessionManager: SessionManager.inMemory(), + tools: [], + }); + piSession = result.session; + } catch (err) { + this.logger.error( + `Failed to create Pi SDK session for ${sessionId}`, + err instanceof Error ? err.stack : String(err), + ); + throw new Error(`Agent session creation failed for ${sessionId}: ${String(err)}`); + } const listeners = new Set<(event: AgentSessionEvent) => void>(); @@ -64,7 +85,15 @@ export class AgentService implements OnModuleDestroy { if (!session) { throw new Error(`No agent session found: ${sessionId}`); } - await session.piSession.prompt(message); + try { + await session.piSession.prompt(message); + } catch (err) { + this.logger.error( + `Pi SDK prompt failed for session=${sessionId}, messageLength=${message.length}`, + err instanceof Error ? err.stack : String(err), + ); + throw err; + } } onEvent(sessionId: string, listener: (event: AgentSessionEvent) => void): () => void { @@ -78,17 +107,25 @@ export class AgentService implements OnModuleDestroy { async destroySession(sessionId: string): Promise { const session = this.sessions.get(sessionId); - if (session) { - this.logger.log(`Destroying agent session ${sessionId}`); + if (!session) return; + this.logger.log(`Destroying agent session ${sessionId}`); + try { session.unsubscribe(); - session.listeners.clear(); - this.sessions.delete(sessionId); + } catch (err) { + this.logger.error(`Failed to unsubscribe Pi session ${sessionId}`, String(err)); } + session.listeners.clear(); + this.sessions.delete(sessionId); } async onModuleDestroy(): Promise { this.logger.log('Shutting down all agent sessions'); const stops = Array.from(this.sessions.keys()).map((id) => this.destroySession(id)); - await Promise.allSettled(stops); + const results = await Promise.allSettled(stops); + for (const result of results) { + if (result.status === 'rejected') { + this.logger.error('Session shutdown failure', String(result.reason)); + } + } } } diff --git a/apps/gateway/src/chat/chat.controller.ts b/apps/gateway/src/chat/chat.controller.ts index 5cf7526..108a165 100644 --- a/apps/gateway/src/chat/chat.controller.ts +++ b/apps/gateway/src/chat/chat.controller.ts @@ -1,4 +1,4 @@ -import { Controller, Post, Body, Logger } from '@nestjs/common'; +import { Controller, Post, Body, Logger, HttpException, HttpStatus } from '@nestjs/common'; import type { AgentSessionEvent } from '@mariozechner/pi-coding-agent'; import { AgentService } from '../agent/agent.service.js'; import { v4 as uuid } from 'uuid'; @@ -23,32 +23,52 @@ export class ChatController { async chat(@Body() body: ChatRequest): Promise { const conversationId = body.conversationId ?? uuid(); - let agentSession = this.agentService.getSession(conversationId); - if (!agentSession) { - agentSession = await this.agentService.createSession(conversationId); + try { + let agentSession = this.agentService.getSession(conversationId); + if (!agentSession) { + agentSession = await this.agentService.createSession(conversationId); + } + } catch (err) { + this.logger.error( + `Session creation failed for conversation=${conversationId}`, + err instanceof Error ? err.stack : String(err), + ); + throw new HttpException('Agent session unavailable', HttpStatus.SERVICE_UNAVAILABLE); } let responseText = ''; - const done = new Promise((resolve) => { + const done = new Promise((resolve, reject) => { + const timer = setTimeout(() => { + cleanup(); + this.logger.error(`Agent response timed out after 120s for conversation=${conversationId}`); + reject(new Error('Agent response timed out')); + }, 120_000); + const cleanup = this.agentService.onEvent(conversationId, (event: AgentSessionEvent) => { if (event.type === 'message_update' && event.assistantMessageEvent.type === 'text_delta') { responseText += event.assistantMessageEvent.delta; } if (event.type === 'agent_end') { + clearTimeout(timer); cleanup(); resolve(); } }); - - setTimeout(() => { - cleanup(); - resolve(); - }, 120_000); }); - await this.agentService.prompt(conversationId, body.content); - await done; + try { + await this.agentService.prompt(conversationId, body.content); + await done; + } catch (err) { + if (err instanceof HttpException) throw err; + const message = err instanceof Error ? err.message : String(err); + if (message.includes('timed out')) { + throw new HttpException('Agent response timed out', HttpStatus.GATEWAY_TIMEOUT); + } + this.logger.error(`Chat prompt failed for conversation=${conversationId}`, String(err)); + throw new HttpException('Agent processing failed', HttpStatus.INTERNAL_SERVER_ERROR); + } return { conversationId, text: responseText }; } diff --git a/apps/gateway/src/chat/chat.gateway.ts b/apps/gateway/src/chat/chat.gateway.ts index d81b949..f217191 100644 --- a/apps/gateway/src/chat/chat.gateway.ts +++ b/apps/gateway/src/chat/chat.gateway.ts @@ -62,14 +62,26 @@ export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewa this.logger.log(`Message from ${client.id} in conversation ${conversationId}`); // Ensure agent session exists for this conversation - let agentSession = this.agentService.getSession(conversationId); - if (!agentSession) { - agentSession = await this.agentService.createSession(conversationId); + try { + let agentSession = this.agentService.getSession(conversationId); + if (!agentSession) { + agentSession = await this.agentService.createSession(conversationId); + } + } catch (err) { + this.logger.error( + `Session creation failed for client=${client.id}, conversation=${conversationId}`, + err instanceof Error ? err.stack : String(err), + ); + client.emit('error', { + conversationId, + error: 'Failed to start agent session. Please try again.', + }); + return; } - // Clean up any previous event subscription for this client + // Always clean up previous listener to prevent leak const existing = this.clientSessions.get(client.id); - if (existing && existing.conversationId !== conversationId) { + if (existing) { existing.cleanup(); } @@ -87,12 +99,25 @@ export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewa try { await this.agentService.prompt(conversationId, data.content); } catch (err) { - this.logger.error(`Agent prompt failed: ${err}`); - client.emit('error', { conversationId, error: String(err) }); + this.logger.error( + `Agent prompt failed for client=${client.id}, conversation=${conversationId}`, + err instanceof Error ? err.stack : String(err), + ); + client.emit('error', { + conversationId, + error: 'The agent failed to process your message. Please try again.', + }); } } private relayEvent(client: Socket, conversationId: string, event: AgentSessionEvent): void { + if (!client.connected) { + this.logger.warn( + `Dropping event ${event.type} for disconnected client=${client.id}, conversation=${conversationId}`, + ); + return; + } + switch (event.type) { case 'agent_start': client.emit('agent:start', { conversationId }); diff --git a/packages/cli/src/tui/app.tsx b/packages/cli/src/tui/app.tsx index d4d1cbd..831d4a8 100644 --- a/packages/cli/src/tui/app.tsx +++ b/packages/cli/src/tui/app.tsx @@ -32,7 +32,20 @@ export function TuiApp({ gatewayUrl, conversationId: initialConversationId }: Tu socketRef.current = socket; socket.on('connect', () => setConnected(true)); - socket.on('disconnect', () => setConnected(false)); + socket.on('disconnect', () => { + setConnected(false); + setIsStreaming(false); + setCurrentStreamText(''); + }); + socket.on('connect_error', (err: Error) => { + setMessages((msgs) => [ + ...msgs, + { + role: 'assistant', + content: `Connection failed: ${err.message}. Check that the gateway is running at ${gatewayUrl}.`, + }, + ]); + }); socket.on('message:ack', (data: { conversationId: string }) => { setConversationId(data.conversationId); @@ -69,7 +82,14 @@ export function TuiApp({ gatewayUrl, conversationId: initialConversationId }: Tu const handleSubmit = useCallback( (value: string) => { - if (!value.trim() || isStreaming || !socketRef.current) return; + if (!value.trim() || isStreaming) return; + if (!socketRef.current?.connected) { + setMessages((msgs) => [ + ...msgs, + { role: 'assistant', content: 'Not connected to gateway. Message not sent.' }, + ]); + return; + } setMessages((msgs) => [...msgs, { role: 'user', content: value }]); setInput(''); diff --git a/plugins/discord/src/index.ts b/plugins/discord/src/index.ts index 34ac63c..7407045 100644 --- a/plugins/discord/src/index.ts +++ b/plugins/discord/src/index.ts @@ -39,6 +39,15 @@ export class DiscordPlugin { console.log('[discord] Connected to gateway'); }); + this.socket.on('disconnect', (reason: string) => { + console.error(`[discord] Disconnected from gateway: ${reason}`); + this.pendingResponses.clear(); + }); + + this.socket.on('connect_error', (err: Error) => { + console.error(`[discord] Gateway connection error: ${err.message}`); + }); + // Handle streaming text from gateway this.socket.on('agent:text', (data: { conversationId: string; text: string }) => { const pending = this.pendingResponses.get(data.conversationId); @@ -51,8 +60,10 @@ export class DiscordPlugin { this.socket.on('agent:end', (data: { conversationId: string }) => { const text = this.pendingResponses.get(data.conversationId); if (text) { - this.sendToDiscord(data.conversationId, text); this.pendingResponses.delete(data.conversationId); + this.sendToDiscord(data.conversationId, text).catch((err) => { + console.error(`[discord] Error sending response for ${data.conversationId}:`, err); + }); } }); @@ -79,18 +90,21 @@ export class DiscordPlugin { // Ignore bot messages if (message.author.bot) return; + // Not ready yet + if (!this.client.user) return; + // Check guild binding if (this.config.guildId && message.guildId !== this.config.guildId) return; // Respond to DMs always, or mentions in channels const isDM = !message.guildId; - const isMention = message.mentions.has(this.client.user!); + const isMention = message.mentions.has(this.client.user); if (!isDM && !isMention) return; // Strip bot mention from message content const content = message.content - .replace(new RegExp(`<@!?${this.client.user!.id}>`, 'g'), '') + .replace(new RegExp(`<@!?${this.client.user.id}>`, 'g'), '') .trim(); if (!content) return; @@ -104,27 +118,45 @@ export class DiscordPlugin { } // Send to gateway - this.socket?.emit('message', { + if (!this.socket?.connected) { + console.error( + `[discord] Cannot forward message: not connected to gateway. channel=${channelId}`, + ); + return; + } + this.socket.emit('message', { conversationId, content, }); } - private sendToDiscord(conversationId: string, text: string): void { + private async sendToDiscord(conversationId: string, text: string): Promise { // Find the Discord channel for this conversation const channelId = Array.from(this.channelConversations.entries()).find( ([, convId]) => convId === conversationId, )?.[0]; - if (!channelId) return; + if (!channelId) { + console.error(`[discord] No channel found for conversation ${conversationId}`); + return; + } const channel = this.client.channels.cache.get(channelId); - if (!channel || !('send' in channel)) return; + if (!channel || !('send' in channel)) { + console.error( + `[discord] Channel ${channelId} not sendable for conversation ${conversationId}`, + ); + return; + } // Chunk responses for Discord's 2000-char limit const chunks = this.chunkText(text, 1900); for (const chunk of chunks) { - (channel as { send: (content: string) => Promise }).send(chunk); + try { + await (channel as { send: (content: string) => Promise }).send(chunk); + } catch (err) { + console.error(`[discord] Failed to send message to channel ${channelId}:`, err); + } } }