From a8dc498794b308a8554c22a503f28df5a398cb03 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 15 Mar 2026 12:55:38 -0500 Subject: [PATCH] feat(web): wire WebSocket chat with streaming and conversation switching (#120) - Fix socket singleton lifecycle: add destroySocket() and auto-reset on disconnect so page remounts always get a fresh authenticated connection - Add explicit WebSocket transport preference to avoid polling fallback - Filter socket events by conversationId to prevent cross-conversation bleed when switching between conversations mid-stream - Use activeIdRef pattern so event handlers see current activeId without stale closures or listener re-registration on every render - On agent:end, append accumulated text as assistant message to local state (matching TUI pattern; Pi session is in-memory so DB reload would miss it) - Preserve REST persist of user messages for conversation history on reload - Clear streaming state on conversation switch - StreamingMessage: show animated typing indicator before first token arrives Co-Authored-By: Claude Sonnet 4.6 --- apps/web/src/app/(dashboard)/chat/page.tsx | 126 +++++++++++++----- .../src/components/chat/streaming-message.tsx | 18 ++- apps/web/src/lib/socket.ts | 17 +++ 3 files changed, 119 insertions(+), 42 deletions(-) diff --git a/apps/web/src/app/(dashboard)/chat/page.tsx b/apps/web/src/app/(dashboard)/chat/page.tsx index a27a6a7..fa56994 100644 --- a/apps/web/src/app/(dashboard)/chat/page.tsx +++ b/apps/web/src/app/(dashboard)/chat/page.tsx @@ -2,7 +2,7 @@ import { useCallback, useEffect, useRef, useState } from 'react'; import { api } from '@/lib/api'; -import { getSocket } from '@/lib/socket'; +import { destroySocket, getSocket } from '@/lib/socket'; import type { Conversation, Message } from '@/lib/types'; import { ConversationList } from '@/components/chat/conversation-list'; import { MessageBubble } from '@/components/chat/message-bubble'; @@ -17,6 +17,15 @@ export default function ChatPage(): React.ReactElement { const [isStreaming, setIsStreaming] = useState(false); const messagesEndRef = useRef(null); + // Track the active conversation ID in a ref so socket event handlers always + // see the current value without needing to be re-registered. + const activeIdRef = useRef(null); + activeIdRef.current = activeId; + + // Accumulate streamed text in a ref so agent:end can read the full content + // without stale-closure issues. + const streamingTextRef = useRef(''); + // Load conversations on mount useEffect(() => { api('/api/conversations') @@ -30,6 +39,10 @@ export default function ChatPage(): React.ReactElement { setMessages([]); return; } + // Clear streaming state when switching conversations + setIsStreaming(false); + setStreamingText(''); + streamingTextRef.current = ''; api(`/api/conversations/${activeId}/messages`) .then(setMessages) .catch(() => {}); @@ -40,50 +53,81 @@ export default function ChatPage(): React.ReactElement { messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }); }, [messages, streamingText]); - // Socket.io setup + // Socket.io setup — connect once for the page lifetime useEffect(() => { const socket = getSocket(); - socket.connect(); - socket.on('agent:text', (data: { conversationId: string; text: string }) => { - setStreamingText((prev) => prev + data.text); - }); - - socket.on('agent:start', () => { + function onAgentStart(data: { conversationId: string }): void { + // Only update state if the event belongs to the currently viewed conversation + if (activeIdRef.current !== data.conversationId) return; setIsStreaming(true); setStreamingText(''); - }); + streamingTextRef.current = ''; + } - socket.on('agent:end', (data: { conversationId: string }) => { + function onAgentText(data: { conversationId: string; text: string }): void { + if (activeIdRef.current !== data.conversationId) return; + streamingTextRef.current += data.text; + setStreamingText((prev) => prev + data.text); + } + + function onAgentEnd(data: { conversationId: string }): void { + if (activeIdRef.current !== data.conversationId) return; + const finalText = streamingTextRef.current; setIsStreaming(false); setStreamingText(''); - // Reload messages to get the final persisted version - api(`/api/conversations/${data.conversationId}/messages`) - .then(setMessages) - .catch(() => {}); - }); + streamingTextRef.current = ''; + // Append the completed assistant message to the local message list. + // The Pi agent session is in-memory so the assistant response is not + // persisted to the DB — we build the local UI state instead. + if (finalText) { + setMessages((prev) => [ + ...prev, + { + id: `assistant-${Date.now()}`, + conversationId: data.conversationId, + role: 'assistant' as const, + content: finalText, + createdAt: new Date().toISOString(), + }, + ]); + } + } - socket.on('error', (data: { error: string }) => { + function onError(data: { error: string; conversationId?: string }): void { setIsStreaming(false); setStreamingText(''); + streamingTextRef.current = ''; setMessages((prev) => [ ...prev, { id: `error-${Date.now()}`, - conversationId: '', - role: 'system', + conversationId: data.conversationId ?? '', + role: 'system' as const, content: `Error: ${data.error}`, createdAt: new Date().toISOString(), }, ]); - }); + } + + socket.on('agent:start', onAgentStart); + socket.on('agent:text', onAgentText); + socket.on('agent:end', onAgentEnd); + socket.on('error', onError); + + // Connect if not already connected + if (!socket.connected) { + socket.connect(); + } return () => { - socket.off('agent:text'); - socket.off('agent:start'); - socket.off('agent:end'); - socket.off('error'); - socket.disconnect(); + socket.off('agent:start', onAgentStart); + socket.off('agent:text', onAgentText); + socket.off('agent:end', onAgentEnd); + socket.off('error', onError); + // Fully tear down the socket when the chat page unmounts so we get a + // fresh authenticated connection next time the page is visited. + destroySocket(); }; }, []); @@ -112,24 +156,34 @@ export default function ChatPage(): React.ReactElement { convId = conv.id; } - // Optimistic user message - const userMsg: Message = { - id: `temp-${Date.now()}`, - conversationId: convId, - role: 'user', - content, - createdAt: new Date().toISOString(), - }; - setMessages((prev) => [...prev, userMsg]); + // Optimistic user message in local UI state + setMessages((prev) => [ + ...prev, + { + id: `user-${Date.now()}`, + conversationId: convId, + role: 'user' as const, + content, + createdAt: new Date().toISOString(), + }, + ]); - // Persist user message - await api(`/api/conversations/${convId}/messages`, { + // Persist the user message to the DB so conversation history is + // available when the page is reloaded or a new session starts. + api(`/api/conversations/${convId}/messages`, { method: 'POST', body: { role: 'user', content }, + }).catch(() => { + // Non-fatal: the agent can still process the message even if + // REST persistence fails. }); - // Send to WebSocket for streaming response + // Send to WebSocket — gateway creates/resumes the agent session and + // streams the response back via agent:start / agent:text / agent:end. const socket = getSocket(); + if (!socket.connected) { + socket.connect(); + } socket.emit('message', { conversationId: convId, content }); }, [activeId], diff --git a/apps/web/src/components/chat/streaming-message.tsx b/apps/web/src/components/chat/streaming-message.tsx index 1f93cc0..d9cc2c7 100644 --- a/apps/web/src/components/chat/streaming-message.tsx +++ b/apps/web/src/components/chat/streaming-message.tsx @@ -5,16 +5,22 @@ interface StreamingMessageProps { text: string; } -export function StreamingMessage({ text }: StreamingMessageProps): React.ReactElement | null { - if (!text) return null; - +export function StreamingMessage({ text }: StreamingMessageProps): React.ReactElement { return (
-
{text}
+ {text ? ( +
{text}
+ ) : ( +
+ + + +
+ )}
- - Thinking... + + {text ? 'Responding...' : 'Thinking...'}
diff --git a/apps/web/src/lib/socket.ts b/apps/web/src/lib/socket.ts index 38540c3..11d16bd 100644 --- a/apps/web/src/lib/socket.ts +++ b/apps/web/src/lib/socket.ts @@ -9,7 +9,24 @@ export function getSocket(): Socket { socket = io(`${GATEWAY_URL}/chat`, { withCredentials: true, autoConnect: false, + transports: ['websocket', 'polling'], + }); + + // Reset singleton reference when socket is fully closed so the next + // getSocket() call creates a fresh instance instead of returning a + // closed/dead socket. + socket.on('disconnect', () => { + socket = null; }); } return socket; } + +/** Tear down the singleton socket and reset the reference. */ +export function destroySocket(): void { + if (socket) { + socket.offAny(); + socket.disconnect(); + socket = null; + } +}