feat(web): wire WebSocket chat with streaming and conversation switching (#120)
- Fix socket singleton lifecycle: add destroySocket() and auto-reset on disconnect so page remounts always get a fresh authenticated connection - Add explicit WebSocket transport preference to avoid polling fallback - Filter socket events by conversationId to prevent cross-conversation bleed when switching between conversations mid-stream - Use activeIdRef pattern so event handlers see current activeId without stale closures or listener re-registration on every render - On agent:end, append accumulated text as assistant message to local state (matching TUI pattern; Pi session is in-memory so DB reload would miss it) - Preserve REST persist of user messages for conversation history on reload - Clear streaming state on conversation switch - StreamingMessage: show animated typing indicator before first token arrives Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,7 +2,7 @@
|
||||
|
||||
import { useCallback, useEffect, useRef, useState } from 'react';
|
||||
import { api } from '@/lib/api';
|
||||
import { getSocket } from '@/lib/socket';
|
||||
import { destroySocket, getSocket } from '@/lib/socket';
|
||||
import type { Conversation, Message } from '@/lib/types';
|
||||
import { ConversationList } from '@/components/chat/conversation-list';
|
||||
import { MessageBubble } from '@/components/chat/message-bubble';
|
||||
@@ -17,6 +17,15 @@ export default function ChatPage(): React.ReactElement {
|
||||
const [isStreaming, setIsStreaming] = useState(false);
|
||||
const messagesEndRef = useRef<HTMLDivElement>(null);
|
||||
|
||||
// Track the active conversation ID in a ref so socket event handlers always
|
||||
// see the current value without needing to be re-registered.
|
||||
const activeIdRef = useRef<string | null>(null);
|
||||
activeIdRef.current = activeId;
|
||||
|
||||
// Accumulate streamed text in a ref so agent:end can read the full content
|
||||
// without stale-closure issues.
|
||||
const streamingTextRef = useRef('');
|
||||
|
||||
// Load conversations on mount
|
||||
useEffect(() => {
|
||||
api<Conversation[]>('/api/conversations')
|
||||
@@ -30,6 +39,10 @@ export default function ChatPage(): React.ReactElement {
|
||||
setMessages([]);
|
||||
return;
|
||||
}
|
||||
// Clear streaming state when switching conversations
|
||||
setIsStreaming(false);
|
||||
setStreamingText('');
|
||||
streamingTextRef.current = '';
|
||||
api<Message[]>(`/api/conversations/${activeId}/messages`)
|
||||
.then(setMessages)
|
||||
.catch(() => {});
|
||||
@@ -40,50 +53,81 @@ export default function ChatPage(): React.ReactElement {
|
||||
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
|
||||
}, [messages, streamingText]);
|
||||
|
||||
// Socket.io setup
|
||||
// Socket.io setup — connect once for the page lifetime
|
||||
useEffect(() => {
|
||||
const socket = getSocket();
|
||||
socket.connect();
|
||||
|
||||
socket.on('agent:text', (data: { conversationId: string; text: string }) => {
|
||||
setStreamingText((prev) => prev + data.text);
|
||||
});
|
||||
|
||||
socket.on('agent:start', () => {
|
||||
function onAgentStart(data: { conversationId: string }): void {
|
||||
// Only update state if the event belongs to the currently viewed conversation
|
||||
if (activeIdRef.current !== data.conversationId) return;
|
||||
setIsStreaming(true);
|
||||
setStreamingText('');
|
||||
});
|
||||
streamingTextRef.current = '';
|
||||
}
|
||||
|
||||
socket.on('agent:end', (data: { conversationId: string }) => {
|
||||
function onAgentText(data: { conversationId: string; text: string }): void {
|
||||
if (activeIdRef.current !== data.conversationId) return;
|
||||
streamingTextRef.current += data.text;
|
||||
setStreamingText((prev) => prev + data.text);
|
||||
}
|
||||
|
||||
function onAgentEnd(data: { conversationId: string }): void {
|
||||
if (activeIdRef.current !== data.conversationId) return;
|
||||
const finalText = streamingTextRef.current;
|
||||
setIsStreaming(false);
|
||||
setStreamingText('');
|
||||
// Reload messages to get the final persisted version
|
||||
api<Message[]>(`/api/conversations/${data.conversationId}/messages`)
|
||||
.then(setMessages)
|
||||
.catch(() => {});
|
||||
});
|
||||
streamingTextRef.current = '';
|
||||
// Append the completed assistant message to the local message list.
|
||||
// The Pi agent session is in-memory so the assistant response is not
|
||||
// persisted to the DB — we build the local UI state instead.
|
||||
if (finalText) {
|
||||
setMessages((prev) => [
|
||||
...prev,
|
||||
{
|
||||
id: `assistant-${Date.now()}`,
|
||||
conversationId: data.conversationId,
|
||||
role: 'assistant' as const,
|
||||
content: finalText,
|
||||
createdAt: new Date().toISOString(),
|
||||
},
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
socket.on('error', (data: { error: string }) => {
|
||||
function onError(data: { error: string; conversationId?: string }): void {
|
||||
setIsStreaming(false);
|
||||
setStreamingText('');
|
||||
streamingTextRef.current = '';
|
||||
setMessages((prev) => [
|
||||
...prev,
|
||||
{
|
||||
id: `error-${Date.now()}`,
|
||||
conversationId: '',
|
||||
role: 'system',
|
||||
conversationId: data.conversationId ?? '',
|
||||
role: 'system' as const,
|
||||
content: `Error: ${data.error}`,
|
||||
createdAt: new Date().toISOString(),
|
||||
},
|
||||
]);
|
||||
});
|
||||
}
|
||||
|
||||
socket.on('agent:start', onAgentStart);
|
||||
socket.on('agent:text', onAgentText);
|
||||
socket.on('agent:end', onAgentEnd);
|
||||
socket.on('error', onError);
|
||||
|
||||
// Connect if not already connected
|
||||
if (!socket.connected) {
|
||||
socket.connect();
|
||||
}
|
||||
|
||||
return () => {
|
||||
socket.off('agent:text');
|
||||
socket.off('agent:start');
|
||||
socket.off('agent:end');
|
||||
socket.off('error');
|
||||
socket.disconnect();
|
||||
socket.off('agent:start', onAgentStart);
|
||||
socket.off('agent:text', onAgentText);
|
||||
socket.off('agent:end', onAgentEnd);
|
||||
socket.off('error', onError);
|
||||
// Fully tear down the socket when the chat page unmounts so we get a
|
||||
// fresh authenticated connection next time the page is visited.
|
||||
destroySocket();
|
||||
};
|
||||
}, []);
|
||||
|
||||
@@ -112,24 +156,34 @@ export default function ChatPage(): React.ReactElement {
|
||||
convId = conv.id;
|
||||
}
|
||||
|
||||
// Optimistic user message
|
||||
const userMsg: Message = {
|
||||
id: `temp-${Date.now()}`,
|
||||
// Optimistic user message in local UI state
|
||||
setMessages((prev) => [
|
||||
...prev,
|
||||
{
|
||||
id: `user-${Date.now()}`,
|
||||
conversationId: convId,
|
||||
role: 'user',
|
||||
role: 'user' as const,
|
||||
content,
|
||||
createdAt: new Date().toISOString(),
|
||||
};
|
||||
setMessages((prev) => [...prev, userMsg]);
|
||||
},
|
||||
]);
|
||||
|
||||
// Persist user message
|
||||
await api<Message>(`/api/conversations/${convId}/messages`, {
|
||||
// Persist the user message to the DB so conversation history is
|
||||
// available when the page is reloaded or a new session starts.
|
||||
api<Message>(`/api/conversations/${convId}/messages`, {
|
||||
method: 'POST',
|
||||
body: { role: 'user', content },
|
||||
}).catch(() => {
|
||||
// Non-fatal: the agent can still process the message even if
|
||||
// REST persistence fails.
|
||||
});
|
||||
|
||||
// Send to WebSocket for streaming response
|
||||
// Send to WebSocket — gateway creates/resumes the agent session and
|
||||
// streams the response back via agent:start / agent:text / agent:end.
|
||||
const socket = getSocket();
|
||||
if (!socket.connected) {
|
||||
socket.connect();
|
||||
}
|
||||
socket.emit('message', { conversationId: convId, content });
|
||||
},
|
||||
[activeId],
|
||||
|
||||
@@ -5,16 +5,22 @@ interface StreamingMessageProps {
|
||||
text: string;
|
||||
}
|
||||
|
||||
export function StreamingMessage({ text }: StreamingMessageProps): React.ReactElement | null {
|
||||
if (!text) return null;
|
||||
|
||||
export function StreamingMessage({ text }: StreamingMessageProps): React.ReactElement {
|
||||
return (
|
||||
<div className="flex justify-start">
|
||||
<div className="max-w-[75%] rounded-xl border border-surface-border bg-surface-elevated px-4 py-3 text-sm text-text-primary">
|
||||
{text ? (
|
||||
<div className="whitespace-pre-wrap break-words">{text}</div>
|
||||
<div className="mt-1 flex items-center gap-1 text-xs text-text-muted">
|
||||
) : (
|
||||
<div className="flex items-center gap-2 text-text-muted">
|
||||
<span className="inline-block h-2 w-2 animate-pulse rounded-full bg-blue-500" />
|
||||
Thinking...
|
||||
<span className="inline-block h-2 w-2 animate-pulse rounded-full bg-blue-500 [animation-delay:0.2s]" />
|
||||
<span className="inline-block h-2 w-2 animate-pulse rounded-full bg-blue-500 [animation-delay:0.4s]" />
|
||||
</div>
|
||||
)}
|
||||
<div className="mt-1 flex items-center gap-1 text-xs text-text-muted">
|
||||
<span className="inline-block h-1.5 w-1.5 animate-pulse rounded-full bg-blue-500" />
|
||||
{text ? 'Responding...' : 'Thinking...'}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -9,7 +9,24 @@ export function getSocket(): Socket {
|
||||
socket = io(`${GATEWAY_URL}/chat`, {
|
||||
withCredentials: true,
|
||||
autoConnect: false,
|
||||
transports: ['websocket', 'polling'],
|
||||
});
|
||||
|
||||
// Reset singleton reference when socket is fully closed so the next
|
||||
// getSocket() call creates a fresh instance instead of returning a
|
||||
// closed/dead socket.
|
||||
socket.on('disconnect', () => {
|
||||
socket = null;
|
||||
});
|
||||
}
|
||||
return socket;
|
||||
}
|
||||
|
||||
/** Tear down the singleton socket and reset the reference. */
|
||||
export function destroySocket(): void {
|
||||
if (socket) {
|
||||
socket.offAny();
|
||||
socket.disconnect();
|
||||
socket = null;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user