feat(web): implement SSE chat streaming with real-time token rendering (#516)
Some checks failed
ci/woodpecker/push/web Pipeline failed

Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #516.
This commit is contained in:
2026-02-26 02:39:43 +00:00
committed by jason.woltje
parent 6290fc3d53
commit 7de0e734b0
8 changed files with 797 additions and 297 deletions

View File

@@ -14,6 +14,7 @@ import type { ChatResponse } from "@/lib/api/chat";
// Mock the API modules - use importOriginal to preserve types/enums
vi.mock("@/lib/api/chat", () => ({
sendChatMessage: vi.fn(),
streamChatMessage: vi.fn(),
}));
vi.mock("@/lib/api/ideas", async (importOriginal) => {
@@ -30,6 +31,9 @@ vi.mock("@/lib/api/ideas", async (importOriginal) => {
const mockSendChatMessage = chatApi.sendChatMessage as MockedFunction<
typeof chatApi.sendChatMessage
>;
const mockStreamChatMessage = chatApi.streamChatMessage as MockedFunction<
typeof chatApi.streamChatMessage
>;
const mockCreateConversation = ideasApi.createConversation as MockedFunction<
typeof ideasApi.createConversation
>;
@@ -70,9 +74,62 @@ function createMockIdea(id: string, title: string, content: string): Idea {
} as Idea;
}
/**
* Configure streamChatMessage to immediately fail,
* triggering the fallback to sendChatMessage.
*/
function makeStreamFail(): void {
mockStreamChatMessage.mockImplementation(
(
_request,
_onChunk,
_onComplete,
onError: (err: Error) => void,
_signal?: AbortSignal
): void => {
// Call synchronously so the Promise rejects immediately
onError(new Error("Streaming not available"));
}
);
}
/**
* Configure streamChatMessage to succeed with given tokens.
* Uses a ref-style object to share cancellation state across the async boundary.
*/
function makeStreamSucceed(tokens: string[]): void {
mockStreamChatMessage.mockImplementation(
(
_request,
onChunk: (chunk: string) => void,
onComplete: () => void,
_onError: (err: Error) => void,
signal?: AbortSignal
): void => {
const state = { cancelled: false };
signal?.addEventListener("abort", () => {
state.cancelled = true;
});
const run = async (): Promise<void> => {
for (const token of tokens) {
if (state.cancelled) return;
await Promise.resolve();
onChunk(token);
}
if (!state.cancelled) {
onComplete();
}
};
void run();
}
);
}
describe("useChat", () => {
beforeEach(() => {
vi.clearAllMocks();
// Default: streaming fails so tests exercise the fallback path
makeStreamFail();
});
afterEach(() => {
@@ -87,13 +144,19 @@ describe("useChat", () => {
expect(result.current.messages[0]?.role).toBe("assistant");
expect(result.current.messages[0]?.id).toBe("welcome");
expect(result.current.isLoading).toBe(false);
expect(result.current.isStreaming).toBe(false);
expect(result.current.error).toBeNull();
expect(result.current.conversationId).toBeNull();
});
it("should expose abortStream function", () => {
const { result } = renderHook(() => useChat());
expect(typeof result.current.abortStream).toBe("function");
});
});
describe("sendMessage", () => {
it("should add user message and assistant response", async () => {
describe("sendMessage (fallback path when streaming fails)", () => {
it("should add user message and assistant response via fallback", async () => {
mockSendChatMessage.mockResolvedValueOnce(createMockChatResponse("Hello there!"));
mockCreateConversation.mockResolvedValueOnce(createMockIdea("conv-1", "Test", ""));
@@ -119,47 +182,13 @@ describe("useChat", () => {
});
expect(mockSendChatMessage).not.toHaveBeenCalled();
expect(mockStreamChatMessage).not.toHaveBeenCalled();
expect(result.current.messages).toHaveLength(1); // only welcome
});
it("should not send while loading", async () => {
let resolveFirst: ((value: ChatResponse) => void) | undefined;
const firstPromise = new Promise<ChatResponse>((resolve) => {
resolveFirst = resolve;
});
mockSendChatMessage.mockReturnValueOnce(firstPromise);
const { result } = renderHook(() => useChat());
// Start first message
act(() => {
void result.current.sendMessage("First");
});
expect(result.current.isLoading).toBe(true);
// Try to send second while loading
await act(async () => {
await result.current.sendMessage("Second");
});
// Should only have one call
expect(mockSendChatMessage).toHaveBeenCalledTimes(1);
// Cleanup - resolve the pending promise
mockCreateConversation.mockResolvedValueOnce(createMockIdea("conv-1", "Test", ""));
await act(async () => {
if (resolveFirst) {
resolveFirst(createMockChatResponse("Response"));
}
// Allow promise to settle
await Promise.resolve();
});
});
it("should handle API errors gracefully", async () => {
vi.spyOn(console, "error").mockImplementation(() => undefined);
vi.spyOn(console, "warn").mockImplementation(() => undefined);
mockSendChatMessage.mockRejectedValueOnce(new Error("API Error"));
const onError = vi.fn();
@@ -171,46 +200,178 @@ describe("useChat", () => {
expect(result.current.error).toBe("Unable to send message. Please try again.");
expect(onError).toHaveBeenCalledWith(expect.any(Error));
// Should have welcome + user + error message
expect(result.current.messages).toHaveLength(3);
expect(result.current.messages[2]?.content).toBe("Something went wrong. Please try again.");
});
});
describe("streaming path", () => {
it("should stream tokens into assistant message", async () => {
const tokens = ["Hello", " world", "!"];
makeStreamSucceed(tokens);
mockCreateConversation.mockResolvedValueOnce(createMockIdea("conv-1", "Test", ""));
const { result } = renderHook(() => useChat());
await act(async () => {
await result.current.sendMessage("Hi");
});
expect(result.current.messages).toHaveLength(3);
expect(result.current.messages[2]?.role).toBe("assistant");
expect(result.current.messages[2]?.content).toBe("Hello world!");
});
it("should set isStreaming true during streaming then false when done", async () => {
let capturedOnChunk: ((chunk: string) => void) | undefined;
let capturedOnComplete: (() => void) | undefined;
mockStreamChatMessage.mockImplementation(
(
_request,
onChunk: (chunk: string) => void,
onComplete: () => void,
_onError: (err: Error) => void
): void => {
capturedOnChunk = onChunk;
capturedOnComplete = onComplete;
}
);
mockCreateConversation.mockResolvedValueOnce(createMockIdea("conv-1", "Test", ""));
const { result } = renderHook(() => useChat());
let sendDone = false;
act(() => {
void result.current.sendMessage("Hello").then(() => {
sendDone = true;
});
});
// Send first token (triggers streaming state)
await act(async () => {
capturedOnChunk?.("Hello");
await Promise.resolve();
});
expect(result.current.isStreaming).toBe(true);
// Complete the stream
await act(async () => {
capturedOnComplete?.();
await Promise.resolve();
await Promise.resolve();
await Promise.resolve();
});
expect(result.current.isStreaming).toBe(false);
expect(sendDone).toBe(true);
});
it("should keep partial content on abort", async () => {
let capturedOnChunk: ((chunk: string) => void) | undefined;
mockStreamChatMessage.mockImplementation(
(
_request,
onChunk: (chunk: string) => void,
_onComplete: () => void,
_onError: (err: Error) => void,
signal?: AbortSignal
): void => {
capturedOnChunk = onChunk;
if (signal) {
signal.addEventListener("abort", () => {
// Stream aborted
});
}
}
);
const { result } = renderHook(() => useChat());
act(() => {
void result.current.sendMessage("Hello");
});
await act(async () => {
capturedOnChunk?.("Partial");
capturedOnChunk?.(" content");
await Promise.resolve();
});
await act(async () => {
result.current.abortStream();
await Promise.resolve();
});
expect(result.current.isStreaming).toBe(false);
const assistantMsg = result.current.messages.find(
(m) => m.role === "assistant" && m.id !== "welcome"
);
expect(assistantMsg?.content).toBe("Partial content");
});
it("should not send while streaming", async () => {
let capturedOnChunk: ((chunk: string) => void) | undefined;
mockStreamChatMessage.mockImplementation(
(
_request,
onChunk: (chunk: string) => void,
_onComplete: () => void,
_onError: (err: Error) => void
): void => {
capturedOnChunk = onChunk;
}
);
const { result } = renderHook(() => useChat());
act(() => {
void result.current.sendMessage("First");
});
await act(async () => {
capturedOnChunk?.("token");
await Promise.resolve();
});
expect(result.current.isStreaming).toBe(true);
await act(async () => {
await result.current.sendMessage("Second");
});
// Only one stream call
expect(mockStreamChatMessage).toHaveBeenCalledTimes(1);
});
});
describe("rapid sends - stale closure prevention", () => {
it("should not lose messages on rapid sequential sends", async () => {
// This test verifies that functional state updates prevent message loss
// when multiple messages are sent in quick succession
let callCount = 0;
mockSendChatMessage.mockImplementation(async (): Promise<ChatResponse> => {
callCount++;
// Small delay to simulate network
await Promise.resolve();
return createMockChatResponse(`Response ${String(callCount)}`);
});
// Use streaming success path for deterministic behavior
makeStreamSucceed(["Response 1"]);
mockCreateConversation.mockResolvedValue(createMockIdea("conv-1", "Test", ""));
const { result } = renderHook(() => useChat());
// Send first message
await act(async () => {
await result.current.sendMessage("Message 1");
});
// Verify first message cycle complete
expect(result.current.messages).toHaveLength(3); // welcome + user1 + assistant1
// Send second message
makeStreamSucceed(["Response 2"]);
await act(async () => {
await result.current.sendMessage("Message 2");
});
// Verify all messages are present (no data loss)
expect(result.current.messages).toHaveLength(5); // welcome + user1 + assistant1 + user2 + assistant2
// Verify message order and content
const userMessages = result.current.messages.filter((m) => m.role === "user");
expect(userMessages).toHaveLength(2);
expect(userMessages[0]?.content).toBe("Message 1");
@@ -218,69 +379,56 @@ describe("useChat", () => {
});
it("should use functional updates for all message state changes", async () => {
// This test verifies that the implementation uses functional updates
// by checking that messages accumulate correctly
mockSendChatMessage.mockResolvedValue(createMockChatResponse("Response"));
mockCreateConversation.mockResolvedValue(createMockIdea("conv-1", "Test", ""));
const { result } = renderHook(() => useChat());
// Track message count after each operation
const messageCounts: number[] = [];
makeStreamSucceed(["R1"]);
await act(async () => {
await result.current.sendMessage("Test 1");
});
messageCounts.push(result.current.messages.length);
makeStreamSucceed(["R2"]);
await act(async () => {
await result.current.sendMessage("Test 2");
});
messageCounts.push(result.current.messages.length);
makeStreamSucceed(["R3"]);
await act(async () => {
await result.current.sendMessage("Test 3");
});
messageCounts.push(result.current.messages.length);
// Should accumulate: 3, 5, 7 (welcome + pairs of user/assistant)
expect(messageCounts).toEqual([3, 5, 7]);
// Verify final state has all messages
expect(result.current.messages).toHaveLength(7);
const userMessages = result.current.messages.filter((m) => m.role === "user");
expect(userMessages).toHaveLength(3);
});
it("should maintain correct message order with ref-based state tracking", async () => {
// This test verifies that messagesRef is properly synchronized
const responses = ["First response", "Second response", "Third response"];
let responseIndex = 0;
mockSendChatMessage.mockImplementation((): Promise<ChatResponse> => {
const response = responses[responseIndex++];
return Promise.resolve(createMockChatResponse(response ?? ""));
});
mockCreateConversation.mockResolvedValue(createMockIdea("conv-1", "Test", ""));
const { result } = renderHook(() => useChat());
makeStreamSucceed(["First response"]);
await act(async () => {
await result.current.sendMessage("Query 1");
});
makeStreamSucceed(["Second response"]);
await act(async () => {
await result.current.sendMessage("Query 2");
});
makeStreamSucceed(["Third response"]);
await act(async () => {
await result.current.sendMessage("Query 3");
});
// Verify messages are in correct order
const messages = result.current.messages;
expect(messages[0]?.id).toBe("welcome");
expect(messages[1]?.content).toBe("Query 1");
@@ -337,14 +485,12 @@ describe("useChat", () => {
await result.current.loadConversation("conv-bad");
});
// Should fall back to welcome message
expect(result.current.messages).toHaveLength(1);
expect(result.current.messages[0]?.id).toBe("welcome");
});
it("should fall back to welcome message when stored data has wrong shape", async () => {
vi.spyOn(console, "warn").mockImplementation(() => undefined);
// Valid JSON but wrong shape (object instead of array, missing required fields)
mockGetIdea.mockResolvedValueOnce(
createMockIdea("conv-bad", "Wrong Shape", JSON.stringify({ not: "an array" }))
);
@@ -408,7 +554,6 @@ describe("useChat", () => {
const { result } = renderHook(() => useChat());
// Should resolve without throwing - errors are handled internally
await act(async () => {
await expect(result.current.loadConversation("conv-err")).resolves.toBeUndefined();
});
@@ -419,19 +564,17 @@ describe("useChat", () => {
describe("startNewConversation", () => {
it("should reset to initial state", async () => {
mockSendChatMessage.mockResolvedValueOnce(createMockChatResponse("Response"));
makeStreamSucceed(["Response"]);
mockCreateConversation.mockResolvedValueOnce(createMockIdea("conv-1", "Test", ""));
const { result } = renderHook(() => useChat());
// Send a message to have some state
await act(async () => {
await result.current.sendMessage("Hello");
});
expect(result.current.messages.length).toBeGreaterThan(1);
// Start new conversation
act(() => {
result.current.startNewConversation();
});
@@ -446,6 +589,7 @@ describe("useChat", () => {
describe("clearError", () => {
it("should clear error state", async () => {
vi.spyOn(console, "error").mockImplementation(() => undefined);
vi.spyOn(console, "warn").mockImplementation(() => undefined);
mockSendChatMessage.mockRejectedValueOnce(new Error("Test error"));
const { result } = renderHook(() => useChat());
@@ -467,6 +611,7 @@ describe("useChat", () => {
describe("error context logging", () => {
it("should log comprehensive error context when sendMessage fails", async () => {
const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => undefined);
vi.spyOn(console, "warn").mockImplementation(() => undefined);
mockSendChatMessage.mockRejectedValueOnce(new Error("LLM timeout"));
const { result } = renderHook(() => useChat({ model: "llama3.2" }));
@@ -489,6 +634,7 @@ describe("useChat", () => {
it("should truncate long message previews to 50 characters", async () => {
const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => undefined);
vi.spyOn(console, "warn").mockImplementation(() => undefined);
mockSendChatMessage.mockRejectedValueOnce(new Error("Failed"));
const longMessage = "A".repeat(100);
@@ -509,9 +655,10 @@ describe("useChat", () => {
it("should include message count in error context", async () => {
const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => undefined);
vi.spyOn(console, "warn").mockImplementation(() => undefined);
// First successful message
mockSendChatMessage.mockResolvedValueOnce(createMockChatResponse("OK"));
// First successful message via streaming
makeStreamSucceed(["OK"]);
mockCreateConversation.mockResolvedValueOnce(createMockIdea("conv-1", "Test", ""));
const { result } = renderHook(() => useChat());
@@ -520,14 +667,14 @@ describe("useChat", () => {
await result.current.sendMessage("First");
});
// Second message fails
// Second message: streaming fails, fallback fails
makeStreamFail();
mockSendChatMessage.mockRejectedValueOnce(new Error("Fail"));
await act(async () => {
await result.current.sendMessage("Second");
});
// messageCount should reflect messages including the new user message
expect(consoleSpy).toHaveBeenCalledWith(
"Failed to send chat message",
expect.objectContaining({
@@ -540,6 +687,7 @@ describe("useChat", () => {
describe("LLM vs persistence error separation", () => {
it("should show LLM error and add error message to chat when API fails", async () => {
vi.spyOn(console, "error").mockImplementation(() => undefined);
vi.spyOn(console, "warn").mockImplementation(() => undefined);
mockSendChatMessage.mockRejectedValueOnce(new Error("Model not available"));
const { result } = renderHook(() => useChat());
@@ -549,13 +697,29 @@ describe("useChat", () => {
});
expect(result.current.error).toBe("Unable to send message. Please try again.");
// Should have welcome + user + error message
expect(result.current.messages).toHaveLength(3);
expect(result.current.messages[2]?.content).toBe("Something went wrong. Please try again.");
});
it("should keep assistant message visible when save fails", async () => {
it("should keep assistant message visible when save fails (streaming path)", async () => {
vi.spyOn(console, "error").mockImplementation(() => undefined);
makeStreamSucceed(["Great answer!"]);
mockCreateConversation.mockRejectedValueOnce(new Error("Database connection lost"));
const { result } = renderHook(() => useChat());
await act(async () => {
await result.current.sendMessage("Hello");
});
expect(result.current.messages).toHaveLength(3); // welcome + user + assistant
expect(result.current.messages[2]?.content).toBe("Great answer!");
expect(result.current.error).toContain("Message sent but failed to save");
});
it("should keep assistant message visible when save fails (fallback path)", async () => {
vi.spyOn(console, "error").mockImplementation(() => undefined);
vi.spyOn(console, "warn").mockImplementation(() => undefined);
mockSendChatMessage.mockResolvedValueOnce(createMockChatResponse("Great answer!"));
mockCreateConversation.mockRejectedValueOnce(new Error("Database connection lost"));
@@ -565,16 +729,14 @@ describe("useChat", () => {
await result.current.sendMessage("Hello");
});
// Assistant message should still be visible
expect(result.current.messages).toHaveLength(3); // welcome + user + assistant
expect(result.current.messages).toHaveLength(3);
expect(result.current.messages[2]?.content).toBe("Great answer!");
// Error should indicate persistence failure
expect(result.current.error).toContain("Message sent but failed to save");
});
it("should log with PERSISTENCE_ERROR type when save fails", async () => {
const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => undefined);
vi.spyOn(console, "warn").mockImplementation(() => undefined);
mockSendChatMessage.mockResolvedValueOnce(createMockChatResponse("Response"));
mockCreateConversation.mockRejectedValueOnce(new Error("DB error"));
@@ -591,7 +753,6 @@ describe("useChat", () => {
})
);
// Should NOT have logged as LLM_ERROR
const llmErrorCalls = consoleSpy.mock.calls.filter((call) => {
const ctx: unknown = call[1];
return (
@@ -606,8 +767,9 @@ describe("useChat", () => {
it("should use different user-facing messages for LLM vs save errors", async () => {
vi.spyOn(console, "error").mockImplementation(() => undefined);
vi.spyOn(console, "warn").mockImplementation(() => undefined);
// Test LLM error message
// LLM error path (streaming fails + fallback fails)
mockSendChatMessage.mockRejectedValueOnce(new Error("Timeout"));
const { result: result1 } = renderHook(() => useChat());
@@ -617,8 +779,8 @@ describe("useChat", () => {
const llmError = result1.current.error;
// Test save error message
mockSendChatMessage.mockResolvedValueOnce(createMockChatResponse("OK"));
// Save error path (streaming succeeds, save fails)
makeStreamSucceed(["OK"]);
mockCreateConversation.mockRejectedValueOnce(new Error("DB down"));
const { result: result2 } = renderHook(() => useChat());
@@ -628,7 +790,6 @@ describe("useChat", () => {
const saveError = result2.current.error;
// They should be different
expect(llmError).toBe("Unable to send message. Please try again.");
expect(saveError).toContain("Message sent but failed to save");
expect(llmError).not.toEqual(saveError);
@@ -636,6 +797,7 @@ describe("useChat", () => {
it("should handle non-Error throws from LLM API", async () => {
vi.spyOn(console, "error").mockImplementation(() => undefined);
vi.spyOn(console, "warn").mockImplementation(() => undefined);
mockSendChatMessage.mockRejectedValueOnce("string error");
const onError = vi.fn();
@@ -652,7 +814,8 @@ describe("useChat", () => {
it("should handle non-Error throws from persistence layer", async () => {
vi.spyOn(console, "error").mockImplementation(() => undefined);
mockSendChatMessage.mockResolvedValueOnce(createMockChatResponse("OK"));
vi.spyOn(console, "warn").mockImplementation(() => undefined);
makeStreamSucceed(["OK"]);
mockCreateConversation.mockRejectedValueOnce("DB string error");
const onError = vi.fn();
@@ -662,7 +825,6 @@ describe("useChat", () => {
await result.current.sendMessage("Hello");
});
// Assistant message should still be visible
expect(result.current.messages[2]?.content).toBe("OK");
expect(result.current.error).toBe("Message sent but failed to save. Please try again.");
expect(onError).toHaveBeenCalledWith(expect.any(Error));
@@ -670,8 +832,9 @@ describe("useChat", () => {
it("should handle updateConversation failure for existing conversations", async () => {
vi.spyOn(console, "error").mockImplementation(() => undefined);
vi.spyOn(console, "warn").mockImplementation(() => undefined);
// First message succeeds and creates conversation
// First message via fallback
mockSendChatMessage.mockResolvedValueOnce(createMockChatResponse("First response"));
mockCreateConversation.mockResolvedValueOnce(createMockIdea("conv-1", "Test", ""));
@@ -683,7 +846,8 @@ describe("useChat", () => {
expect(result.current.conversationId).toBe("conv-1");
// Second message succeeds but updateConversation fails
// Second message via fallback, updateConversation fails
makeStreamFail();
mockSendChatMessage.mockResolvedValueOnce(createMockChatResponse("Second response"));
mockUpdateConversation.mockRejectedValueOnce(new Error("Connection reset"));
@@ -691,8 +855,10 @@ describe("useChat", () => {
await result.current.sendMessage("Second");
});
// Assistant message should still be visible
expect(result.current.messages[4]?.content).toBe("Second response");
const assistantMessages = result.current.messages.filter(
(m) => m.role === "assistant" && m.id !== "welcome"
);
expect(assistantMessages[assistantMessages.length - 1]?.content).toBe("Second response");
expect(result.current.error).toBe("Message sent but failed to save. Please try again.");
});
});