feat(web): wire WebSocket chat with streaming and conversation switching (#120)

- Fix socket singleton lifecycle: add destroySocket() and auto-reset on disconnect
  so page remounts always get a fresh authenticated connection
- Add explicit WebSocket transport preference to avoid polling fallback
- Filter socket events by conversationId to prevent cross-conversation bleed
  when switching between conversations mid-stream
- Use activeIdRef pattern so event handlers see current activeId without
  stale closures or listener re-registration on every render
- On agent:end, append accumulated text as assistant message to local state
  (matching TUI pattern; Pi session is in-memory so DB reload would miss it)
- Preserve REST persist of user messages for conversation history on reload
- Clear streaming state on conversation switch
- StreamingMessage: show animated typing indicator before first token arrives

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-15 12:55:38 -05:00
parent ea800e3f14
commit a8dc498794
3 changed files with 119 additions and 42 deletions

View File

@@ -2,7 +2,7 @@
import { useCallback, useEffect, useRef, useState } from 'react';
import { api } from '@/lib/api';
import { getSocket } from '@/lib/socket';
import { destroySocket, getSocket } from '@/lib/socket';
import type { Conversation, Message } from '@/lib/types';
import { ConversationList } from '@/components/chat/conversation-list';
import { MessageBubble } from '@/components/chat/message-bubble';
@@ -17,6 +17,15 @@ export default function ChatPage(): React.ReactElement {
const [isStreaming, setIsStreaming] = useState(false);
const messagesEndRef = useRef<HTMLDivElement>(null);
// Track the active conversation ID in a ref so socket event handlers always
// see the current value without needing to be re-registered.
const activeIdRef = useRef<string | null>(null);
activeIdRef.current = activeId;
// Accumulate streamed text in a ref so agent:end can read the full content
// without stale-closure issues.
const streamingTextRef = useRef('');
// Load conversations on mount
useEffect(() => {
api<Conversation[]>('/api/conversations')
@@ -30,6 +39,10 @@ export default function ChatPage(): React.ReactElement {
setMessages([]);
return;
}
// Clear streaming state when switching conversations
setIsStreaming(false);
setStreamingText('');
streamingTextRef.current = '';
api<Message[]>(`/api/conversations/${activeId}/messages`)
.then(setMessages)
.catch(() => {});
@@ -40,50 +53,81 @@ export default function ChatPage(): React.ReactElement {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
}, [messages, streamingText]);
// Socket.io setup
// Socket.io setup — connect once for the page lifetime
useEffect(() => {
const socket = getSocket();
socket.connect();
socket.on('agent:text', (data: { conversationId: string; text: string }) => {
setStreamingText((prev) => prev + data.text);
});
socket.on('agent:start', () => {
function onAgentStart(data: { conversationId: string }): void {
// Only update state if the event belongs to the currently viewed conversation
if (activeIdRef.current !== data.conversationId) return;
setIsStreaming(true);
setStreamingText('');
});
streamingTextRef.current = '';
}
socket.on('agent:end', (data: { conversationId: string }) => {
function onAgentText(data: { conversationId: string; text: string }): void {
if (activeIdRef.current !== data.conversationId) return;
streamingTextRef.current += data.text;
setStreamingText((prev) => prev + data.text);
}
function onAgentEnd(data: { conversationId: string }): void {
if (activeIdRef.current !== data.conversationId) return;
const finalText = streamingTextRef.current;
setIsStreaming(false);
setStreamingText('');
// Reload messages to get the final persisted version
api<Message[]>(`/api/conversations/${data.conversationId}/messages`)
.then(setMessages)
.catch(() => {});
});
streamingTextRef.current = '';
// Append the completed assistant message to the local message list.
// The Pi agent session is in-memory so the assistant response is not
// persisted to the DB — we build the local UI state instead.
if (finalText) {
setMessages((prev) => [
...prev,
{
id: `assistant-${Date.now()}`,
conversationId: data.conversationId,
role: 'assistant' as const,
content: finalText,
createdAt: new Date().toISOString(),
},
]);
}
}
socket.on('error', (data: { error: string }) => {
function onError(data: { error: string; conversationId?: string }): void {
setIsStreaming(false);
setStreamingText('');
streamingTextRef.current = '';
setMessages((prev) => [
...prev,
{
id: `error-${Date.now()}`,
conversationId: '',
role: 'system',
conversationId: data.conversationId ?? '',
role: 'system' as const,
content: `Error: ${data.error}`,
createdAt: new Date().toISOString(),
},
]);
});
}
socket.on('agent:start', onAgentStart);
socket.on('agent:text', onAgentText);
socket.on('agent:end', onAgentEnd);
socket.on('error', onError);
// Connect if not already connected
if (!socket.connected) {
socket.connect();
}
return () => {
socket.off('agent:text');
socket.off('agent:start');
socket.off('agent:end');
socket.off('error');
socket.disconnect();
socket.off('agent:start', onAgentStart);
socket.off('agent:text', onAgentText);
socket.off('agent:end', onAgentEnd);
socket.off('error', onError);
// Fully tear down the socket when the chat page unmounts so we get a
// fresh authenticated connection next time the page is visited.
destroySocket();
};
}, []);
@@ -112,24 +156,34 @@ export default function ChatPage(): React.ReactElement {
convId = conv.id;
}
// Optimistic user message
const userMsg: Message = {
id: `temp-${Date.now()}`,
// Optimistic user message in local UI state
setMessages((prev) => [
...prev,
{
id: `user-${Date.now()}`,
conversationId: convId,
role: 'user',
role: 'user' as const,
content,
createdAt: new Date().toISOString(),
};
setMessages((prev) => [...prev, userMsg]);
},
]);
// Persist user message
await api<Message>(`/api/conversations/${convId}/messages`, {
// Persist the user message to the DB so conversation history is
// available when the page is reloaded or a new session starts.
api<Message>(`/api/conversations/${convId}/messages`, {
method: 'POST',
body: { role: 'user', content },
}).catch(() => {
// Non-fatal: the agent can still process the message even if
// REST persistence fails.
});
// Send to WebSocket for streaming response
// Send to WebSocket — gateway creates/resumes the agent session and
// streams the response back via agent:start / agent:text / agent:end.
const socket = getSocket();
if (!socket.connected) {
socket.connect();
}
socket.emit('message', { conversationId: convId, content });
},
[activeId],

View File

@@ -5,16 +5,22 @@ interface StreamingMessageProps {
text: string;
}
export function StreamingMessage({ text }: StreamingMessageProps): React.ReactElement | null {
if (!text) return null;
export function StreamingMessage({ text }: StreamingMessageProps): React.ReactElement {
return (
<div className="flex justify-start">
<div className="max-w-[75%] rounded-xl border border-surface-border bg-surface-elevated px-4 py-3 text-sm text-text-primary">
{text ? (
<div className="whitespace-pre-wrap break-words">{text}</div>
<div className="mt-1 flex items-center gap-1 text-xs text-text-muted">
) : (
<div className="flex items-center gap-2 text-text-muted">
<span className="inline-block h-2 w-2 animate-pulse rounded-full bg-blue-500" />
Thinking...
<span className="inline-block h-2 w-2 animate-pulse rounded-full bg-blue-500 [animation-delay:0.2s]" />
<span className="inline-block h-2 w-2 animate-pulse rounded-full bg-blue-500 [animation-delay:0.4s]" />
</div>
)}
<div className="mt-1 flex items-center gap-1 text-xs text-text-muted">
<span className="inline-block h-1.5 w-1.5 animate-pulse rounded-full bg-blue-500" />
{text ? 'Responding...' : 'Thinking...'}
</div>
</div>
</div>

View File

@@ -9,7 +9,24 @@ export function getSocket(): Socket {
socket = io(`${GATEWAY_URL}/chat`, {
withCredentials: true,
autoConnect: false,
transports: ['websocket', 'polling'],
});
// Reset singleton reference when socket is fully closed so the next
// getSocket() call creates a fresh instance instead of returning a
// closed/dead socket.
socket.on('disconnect', () => {
socket = null;
});
}
return socket;
}
/** Tear down the singleton socket and reset the reference. */
export function destroySocket(): void {
if (socket) {
socket.offAny();
socket.disconnect();
socket = null;
}
}