fix: remediate 10 review findings in communication spine
- Fix createSession race condition with in-flight promise map - Fix listener leak: always cleanup previous subscription per client - Fix REST timeout returning HTTP 200 — now rejects with 504 - Fix fire-and-forget Discord sends — await with error handling - Fix non-null assertion on client.user in Discord plugin - Fix TUI disconnect mid-stream deadlock (reset streaming state) - Add connect_error handler to TUI and Discord plugin - Add connected guard on TUI message submit - Add relayEvent guard for disconnected sockets - Sanitize error messages sent to WebSocket clients - Add error logging/context to AgentService create/prompt/destroy Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -17,18 +17,39 @@ export interface AgentSession {
|
|||||||
export class AgentService implements OnModuleDestroy {
|
export class AgentService implements OnModuleDestroy {
|
||||||
private readonly logger = new Logger(AgentService.name);
|
private readonly logger = new Logger(AgentService.name);
|
||||||
private readonly sessions = new Map<string, AgentSession>();
|
private readonly sessions = new Map<string, AgentSession>();
|
||||||
|
private readonly creating = new Map<string, Promise<AgentSession>>();
|
||||||
|
|
||||||
async createSession(sessionId: string): Promise<AgentSession> {
|
async createSession(sessionId: string): Promise<AgentSession> {
|
||||||
if (this.sessions.has(sessionId)) {
|
const existing = this.sessions.get(sessionId);
|
||||||
return this.sessions.get(sessionId)!;
|
if (existing) return existing;
|
||||||
}
|
|
||||||
|
|
||||||
|
const inflight = this.creating.get(sessionId);
|
||||||
|
if (inflight) return inflight;
|
||||||
|
|
||||||
|
const promise = this.doCreateSession(sessionId).finally(() => {
|
||||||
|
this.creating.delete(sessionId);
|
||||||
|
});
|
||||||
|
this.creating.set(sessionId, promise);
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async doCreateSession(sessionId: string): Promise<AgentSession> {
|
||||||
this.logger.log(`Creating agent session: ${sessionId}`);
|
this.logger.log(`Creating agent session: ${sessionId}`);
|
||||||
|
|
||||||
const { session: piSession } = await createAgentSession({
|
let piSession: PiAgentSession;
|
||||||
sessionManager: SessionManager.inMemory(),
|
try {
|
||||||
tools: [],
|
const result = await createAgentSession({
|
||||||
});
|
sessionManager: SessionManager.inMemory(),
|
||||||
|
tools: [],
|
||||||
|
});
|
||||||
|
piSession = result.session;
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.error(
|
||||||
|
`Failed to create Pi SDK session for ${sessionId}`,
|
||||||
|
err instanceof Error ? err.stack : String(err),
|
||||||
|
);
|
||||||
|
throw new Error(`Agent session creation failed for ${sessionId}: ${String(err)}`);
|
||||||
|
}
|
||||||
|
|
||||||
const listeners = new Set<(event: AgentSessionEvent) => void>();
|
const listeners = new Set<(event: AgentSessionEvent) => void>();
|
||||||
|
|
||||||
@@ -64,7 +85,15 @@ export class AgentService implements OnModuleDestroy {
|
|||||||
if (!session) {
|
if (!session) {
|
||||||
throw new Error(`No agent session found: ${sessionId}`);
|
throw new Error(`No agent session found: ${sessionId}`);
|
||||||
}
|
}
|
||||||
await session.piSession.prompt(message);
|
try {
|
||||||
|
await session.piSession.prompt(message);
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.error(
|
||||||
|
`Pi SDK prompt failed for session=${sessionId}, messageLength=${message.length}`,
|
||||||
|
err instanceof Error ? err.stack : String(err),
|
||||||
|
);
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
onEvent(sessionId: string, listener: (event: AgentSessionEvent) => void): () => void {
|
onEvent(sessionId: string, listener: (event: AgentSessionEvent) => void): () => void {
|
||||||
@@ -78,17 +107,25 @@ export class AgentService implements OnModuleDestroy {
|
|||||||
|
|
||||||
async destroySession(sessionId: string): Promise<void> {
|
async destroySession(sessionId: string): Promise<void> {
|
||||||
const session = this.sessions.get(sessionId);
|
const session = this.sessions.get(sessionId);
|
||||||
if (session) {
|
if (!session) return;
|
||||||
this.logger.log(`Destroying agent session ${sessionId}`);
|
this.logger.log(`Destroying agent session ${sessionId}`);
|
||||||
|
try {
|
||||||
session.unsubscribe();
|
session.unsubscribe();
|
||||||
session.listeners.clear();
|
} catch (err) {
|
||||||
this.sessions.delete(sessionId);
|
this.logger.error(`Failed to unsubscribe Pi session ${sessionId}`, String(err));
|
||||||
}
|
}
|
||||||
|
session.listeners.clear();
|
||||||
|
this.sessions.delete(sessionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
async onModuleDestroy(): Promise<void> {
|
async onModuleDestroy(): Promise<void> {
|
||||||
this.logger.log('Shutting down all agent sessions');
|
this.logger.log('Shutting down all agent sessions');
|
||||||
const stops = Array.from(this.sessions.keys()).map((id) => this.destroySession(id));
|
const stops = Array.from(this.sessions.keys()).map((id) => this.destroySession(id));
|
||||||
await Promise.allSettled(stops);
|
const results = await Promise.allSettled(stops);
|
||||||
|
for (const result of results) {
|
||||||
|
if (result.status === 'rejected') {
|
||||||
|
this.logger.error('Session shutdown failure', String(result.reason));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { Controller, Post, Body, Logger } from '@nestjs/common';
|
import { Controller, Post, Body, Logger, HttpException, HttpStatus } from '@nestjs/common';
|
||||||
import type { AgentSessionEvent } from '@mariozechner/pi-coding-agent';
|
import type { AgentSessionEvent } from '@mariozechner/pi-coding-agent';
|
||||||
import { AgentService } from '../agent/agent.service.js';
|
import { AgentService } from '../agent/agent.service.js';
|
||||||
import { v4 as uuid } from 'uuid';
|
import { v4 as uuid } from 'uuid';
|
||||||
@@ -23,32 +23,52 @@ export class ChatController {
|
|||||||
async chat(@Body() body: ChatRequest): Promise<ChatResponse> {
|
async chat(@Body() body: ChatRequest): Promise<ChatResponse> {
|
||||||
const conversationId = body.conversationId ?? uuid();
|
const conversationId = body.conversationId ?? uuid();
|
||||||
|
|
||||||
let agentSession = this.agentService.getSession(conversationId);
|
try {
|
||||||
if (!agentSession) {
|
let agentSession = this.agentService.getSession(conversationId);
|
||||||
agentSession = await this.agentService.createSession(conversationId);
|
if (!agentSession) {
|
||||||
|
agentSession = await this.agentService.createSession(conversationId);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.error(
|
||||||
|
`Session creation failed for conversation=${conversationId}`,
|
||||||
|
err instanceof Error ? err.stack : String(err),
|
||||||
|
);
|
||||||
|
throw new HttpException('Agent session unavailable', HttpStatus.SERVICE_UNAVAILABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
let responseText = '';
|
let responseText = '';
|
||||||
|
|
||||||
const done = new Promise<void>((resolve) => {
|
const done = new Promise<void>((resolve, reject) => {
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
cleanup();
|
||||||
|
this.logger.error(`Agent response timed out after 120s for conversation=${conversationId}`);
|
||||||
|
reject(new Error('Agent response timed out'));
|
||||||
|
}, 120_000);
|
||||||
|
|
||||||
const cleanup = this.agentService.onEvent(conversationId, (event: AgentSessionEvent) => {
|
const cleanup = this.agentService.onEvent(conversationId, (event: AgentSessionEvent) => {
|
||||||
if (event.type === 'message_update' && event.assistantMessageEvent.type === 'text_delta') {
|
if (event.type === 'message_update' && event.assistantMessageEvent.type === 'text_delta') {
|
||||||
responseText += event.assistantMessageEvent.delta;
|
responseText += event.assistantMessageEvent.delta;
|
||||||
}
|
}
|
||||||
if (event.type === 'agent_end') {
|
if (event.type === 'agent_end') {
|
||||||
|
clearTimeout(timer);
|
||||||
cleanup();
|
cleanup();
|
||||||
resolve();
|
resolve();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
setTimeout(() => {
|
|
||||||
cleanup();
|
|
||||||
resolve();
|
|
||||||
}, 120_000);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
await this.agentService.prompt(conversationId, body.content);
|
try {
|
||||||
await done;
|
await this.agentService.prompt(conversationId, body.content);
|
||||||
|
await done;
|
||||||
|
} catch (err) {
|
||||||
|
if (err instanceof HttpException) throw err;
|
||||||
|
const message = err instanceof Error ? err.message : String(err);
|
||||||
|
if (message.includes('timed out')) {
|
||||||
|
throw new HttpException('Agent response timed out', HttpStatus.GATEWAY_TIMEOUT);
|
||||||
|
}
|
||||||
|
this.logger.error(`Chat prompt failed for conversation=${conversationId}`, String(err));
|
||||||
|
throw new HttpException('Agent processing failed', HttpStatus.INTERNAL_SERVER_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
return { conversationId, text: responseText };
|
return { conversationId, text: responseText };
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -62,14 +62,26 @@ export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewa
|
|||||||
this.logger.log(`Message from ${client.id} in conversation ${conversationId}`);
|
this.logger.log(`Message from ${client.id} in conversation ${conversationId}`);
|
||||||
|
|
||||||
// Ensure agent session exists for this conversation
|
// Ensure agent session exists for this conversation
|
||||||
let agentSession = this.agentService.getSession(conversationId);
|
try {
|
||||||
if (!agentSession) {
|
let agentSession = this.agentService.getSession(conversationId);
|
||||||
agentSession = await this.agentService.createSession(conversationId);
|
if (!agentSession) {
|
||||||
|
agentSession = await this.agentService.createSession(conversationId);
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
this.logger.error(
|
||||||
|
`Session creation failed for client=${client.id}, conversation=${conversationId}`,
|
||||||
|
err instanceof Error ? err.stack : String(err),
|
||||||
|
);
|
||||||
|
client.emit('error', {
|
||||||
|
conversationId,
|
||||||
|
error: 'Failed to start agent session. Please try again.',
|
||||||
|
});
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up any previous event subscription for this client
|
// Always clean up previous listener to prevent leak
|
||||||
const existing = this.clientSessions.get(client.id);
|
const existing = this.clientSessions.get(client.id);
|
||||||
if (existing && existing.conversationId !== conversationId) {
|
if (existing) {
|
||||||
existing.cleanup();
|
existing.cleanup();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -87,12 +99,25 @@ export class ChatGateway implements OnGatewayInit, OnGatewayConnection, OnGatewa
|
|||||||
try {
|
try {
|
||||||
await this.agentService.prompt(conversationId, data.content);
|
await this.agentService.prompt(conversationId, data.content);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.logger.error(`Agent prompt failed: ${err}`);
|
this.logger.error(
|
||||||
client.emit('error', { conversationId, error: String(err) });
|
`Agent prompt failed for client=${client.id}, conversation=${conversationId}`,
|
||||||
|
err instanceof Error ? err.stack : String(err),
|
||||||
|
);
|
||||||
|
client.emit('error', {
|
||||||
|
conversationId,
|
||||||
|
error: 'The agent failed to process your message. Please try again.',
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private relayEvent(client: Socket, conversationId: string, event: AgentSessionEvent): void {
|
private relayEvent(client: Socket, conversationId: string, event: AgentSessionEvent): void {
|
||||||
|
if (!client.connected) {
|
||||||
|
this.logger.warn(
|
||||||
|
`Dropping event ${event.type} for disconnected client=${client.id}, conversation=${conversationId}`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
switch (event.type) {
|
switch (event.type) {
|
||||||
case 'agent_start':
|
case 'agent_start':
|
||||||
client.emit('agent:start', { conversationId });
|
client.emit('agent:start', { conversationId });
|
||||||
|
|||||||
@@ -32,7 +32,20 @@ export function TuiApp({ gatewayUrl, conversationId: initialConversationId }: Tu
|
|||||||
socketRef.current = socket;
|
socketRef.current = socket;
|
||||||
|
|
||||||
socket.on('connect', () => setConnected(true));
|
socket.on('connect', () => setConnected(true));
|
||||||
socket.on('disconnect', () => setConnected(false));
|
socket.on('disconnect', () => {
|
||||||
|
setConnected(false);
|
||||||
|
setIsStreaming(false);
|
||||||
|
setCurrentStreamText('');
|
||||||
|
});
|
||||||
|
socket.on('connect_error', (err: Error) => {
|
||||||
|
setMessages((msgs) => [
|
||||||
|
...msgs,
|
||||||
|
{
|
||||||
|
role: 'assistant',
|
||||||
|
content: `Connection failed: ${err.message}. Check that the gateway is running at ${gatewayUrl}.`,
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
socket.on('message:ack', (data: { conversationId: string }) => {
|
socket.on('message:ack', (data: { conversationId: string }) => {
|
||||||
setConversationId(data.conversationId);
|
setConversationId(data.conversationId);
|
||||||
@@ -69,7 +82,14 @@ export function TuiApp({ gatewayUrl, conversationId: initialConversationId }: Tu
|
|||||||
|
|
||||||
const handleSubmit = useCallback(
|
const handleSubmit = useCallback(
|
||||||
(value: string) => {
|
(value: string) => {
|
||||||
if (!value.trim() || isStreaming || !socketRef.current) return;
|
if (!value.trim() || isStreaming) return;
|
||||||
|
if (!socketRef.current?.connected) {
|
||||||
|
setMessages((msgs) => [
|
||||||
|
...msgs,
|
||||||
|
{ role: 'assistant', content: 'Not connected to gateway. Message not sent.' },
|
||||||
|
]);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
setMessages((msgs) => [...msgs, { role: 'user', content: value }]);
|
setMessages((msgs) => [...msgs, { role: 'user', content: value }]);
|
||||||
setInput('');
|
setInput('');
|
||||||
|
|||||||
@@ -39,6 +39,15 @@ export class DiscordPlugin {
|
|||||||
console.log('[discord] Connected to gateway');
|
console.log('[discord] Connected to gateway');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this.socket.on('disconnect', (reason: string) => {
|
||||||
|
console.error(`[discord] Disconnected from gateway: ${reason}`);
|
||||||
|
this.pendingResponses.clear();
|
||||||
|
});
|
||||||
|
|
||||||
|
this.socket.on('connect_error', (err: Error) => {
|
||||||
|
console.error(`[discord] Gateway connection error: ${err.message}`);
|
||||||
|
});
|
||||||
|
|
||||||
// Handle streaming text from gateway
|
// Handle streaming text from gateway
|
||||||
this.socket.on('agent:text', (data: { conversationId: string; text: string }) => {
|
this.socket.on('agent:text', (data: { conversationId: string; text: string }) => {
|
||||||
const pending = this.pendingResponses.get(data.conversationId);
|
const pending = this.pendingResponses.get(data.conversationId);
|
||||||
@@ -51,8 +60,10 @@ export class DiscordPlugin {
|
|||||||
this.socket.on('agent:end', (data: { conversationId: string }) => {
|
this.socket.on('agent:end', (data: { conversationId: string }) => {
|
||||||
const text = this.pendingResponses.get(data.conversationId);
|
const text = this.pendingResponses.get(data.conversationId);
|
||||||
if (text) {
|
if (text) {
|
||||||
this.sendToDiscord(data.conversationId, text);
|
|
||||||
this.pendingResponses.delete(data.conversationId);
|
this.pendingResponses.delete(data.conversationId);
|
||||||
|
this.sendToDiscord(data.conversationId, text).catch((err) => {
|
||||||
|
console.error(`[discord] Error sending response for ${data.conversationId}:`, err);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -79,18 +90,21 @@ export class DiscordPlugin {
|
|||||||
// Ignore bot messages
|
// Ignore bot messages
|
||||||
if (message.author.bot) return;
|
if (message.author.bot) return;
|
||||||
|
|
||||||
|
// Not ready yet
|
||||||
|
if (!this.client.user) return;
|
||||||
|
|
||||||
// Check guild binding
|
// Check guild binding
|
||||||
if (this.config.guildId && message.guildId !== this.config.guildId) return;
|
if (this.config.guildId && message.guildId !== this.config.guildId) return;
|
||||||
|
|
||||||
// Respond to DMs always, or mentions in channels
|
// Respond to DMs always, or mentions in channels
|
||||||
const isDM = !message.guildId;
|
const isDM = !message.guildId;
|
||||||
const isMention = message.mentions.has(this.client.user!);
|
const isMention = message.mentions.has(this.client.user);
|
||||||
|
|
||||||
if (!isDM && !isMention) return;
|
if (!isDM && !isMention) return;
|
||||||
|
|
||||||
// Strip bot mention from message content
|
// Strip bot mention from message content
|
||||||
const content = message.content
|
const content = message.content
|
||||||
.replace(new RegExp(`<@!?${this.client.user!.id}>`, 'g'), '')
|
.replace(new RegExp(`<@!?${this.client.user.id}>`, 'g'), '')
|
||||||
.trim();
|
.trim();
|
||||||
|
|
||||||
if (!content) return;
|
if (!content) return;
|
||||||
@@ -104,27 +118,45 @@ export class DiscordPlugin {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send to gateway
|
// Send to gateway
|
||||||
this.socket?.emit('message', {
|
if (!this.socket?.connected) {
|
||||||
|
console.error(
|
||||||
|
`[discord] Cannot forward message: not connected to gateway. channel=${channelId}`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.socket.emit('message', {
|
||||||
conversationId,
|
conversationId,
|
||||||
content,
|
content,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private sendToDiscord(conversationId: string, text: string): void {
|
private async sendToDiscord(conversationId: string, text: string): Promise<void> {
|
||||||
// Find the Discord channel for this conversation
|
// Find the Discord channel for this conversation
|
||||||
const channelId = Array.from(this.channelConversations.entries()).find(
|
const channelId = Array.from(this.channelConversations.entries()).find(
|
||||||
([, convId]) => convId === conversationId,
|
([, convId]) => convId === conversationId,
|
||||||
)?.[0];
|
)?.[0];
|
||||||
|
|
||||||
if (!channelId) return;
|
if (!channelId) {
|
||||||
|
console.error(`[discord] No channel found for conversation ${conversationId}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const channel = this.client.channels.cache.get(channelId);
|
const channel = this.client.channels.cache.get(channelId);
|
||||||
if (!channel || !('send' in channel)) return;
|
if (!channel || !('send' in channel)) {
|
||||||
|
console.error(
|
||||||
|
`[discord] Channel ${channelId} not sendable for conversation ${conversationId}`,
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Chunk responses for Discord's 2000-char limit
|
// Chunk responses for Discord's 2000-char limit
|
||||||
const chunks = this.chunkText(text, 1900);
|
const chunks = this.chunkText(text, 1900);
|
||||||
for (const chunk of chunks) {
|
for (const chunk of chunks) {
|
||||||
(channel as { send: (content: string) => Promise<unknown> }).send(chunk);
|
try {
|
||||||
|
await (channel as { send: (content: string) => Promise<unknown> }).send(chunk);
|
||||||
|
} catch (err) {
|
||||||
|
console.error(`[discord] Failed to send message to channel ${channelId}:`, err);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user