From 93cd31435bc868dc31cd21a9e9d353d477644427 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 15 Feb 2026 02:34:36 -0600 Subject: [PATCH] feat(#383): Streaming AI responses via Matrix message edits - Add MatrixStreamingService with editMessage, setTypingIndicator, streamResponse - Rate-limited edits (500ms) for incremental streaming output - Typing indicator management during generation - Graceful error handling and fallback for non-streaming scenarios - Add optional editMessage to IChatProvider interface - Add getClient() accessor to MatrixService for streaming service - Register MatrixStreamingService in BridgeModule - Tests: 20 tests pass Refs #383 Co-Authored-By: Claude Opus 4.6 --- apps/api/src/bridge/bridge.module.ts | 11 +- .../interfaces/chat-provider.interface.ts | 13 + apps/api/src/bridge/matrix/index.ts | 2 + .../matrix/matrix-streaming.service.spec.ts | 408 ++++++++++++++++++ .../bridge/matrix/matrix-streaming.service.ts | 236 ++++++++++ apps/api/src/bridge/matrix/matrix.service.ts | 12 + 6 files changed, 681 insertions(+), 1 deletion(-) create mode 100644 apps/api/src/bridge/matrix/matrix-streaming.service.spec.ts create mode 100644 apps/api/src/bridge/matrix/matrix-streaming.service.ts diff --git a/apps/api/src/bridge/bridge.module.ts b/apps/api/src/bridge/bridge.module.ts index 43ece04..d966b5c 100644 --- a/apps/api/src/bridge/bridge.module.ts +++ b/apps/api/src/bridge/bridge.module.ts @@ -2,6 +2,7 @@ import { Logger, Module } from "@nestjs/common"; import { DiscordService } from "./discord/discord.service"; import { MatrixService } from "./matrix/matrix.service"; import { MatrixRoomService } from "./matrix/matrix-room.service"; +import { MatrixStreamingService } from "./matrix/matrix-streaming.service"; import { CommandParserService } from "./parser/command-parser.service"; import { StitcherModule } from "../stitcher/stitcher.module"; import { CHAT_PROVIDERS } from "./bridge.constants"; @@ -31,6 +32,7 @@ const logger = new Logger("BridgeModule"); providers: [ CommandParserService, MatrixRoomService, + MatrixStreamingService, DiscordService, MatrixService, { @@ -57,6 +59,13 @@ const logger = new Logger("BridgeModule"); inject: [DiscordService, MatrixService], }, ], - exports: [DiscordService, MatrixService, MatrixRoomService, CommandParserService, CHAT_PROVIDERS], + exports: [ + DiscordService, + MatrixService, + MatrixRoomService, + MatrixStreamingService, + CommandParserService, + CHAT_PROVIDERS, + ], }) export class BridgeModule {} diff --git a/apps/api/src/bridge/interfaces/chat-provider.interface.ts b/apps/api/src/bridge/interfaces/chat-provider.interface.ts index 382ca82..2e8b5f9 100644 --- a/apps/api/src/bridge/interfaces/chat-provider.interface.ts +++ b/apps/api/src/bridge/interfaces/chat-provider.interface.ts @@ -76,4 +76,17 @@ export interface IChatProvider { * Parse a command from a message */ parseCommand(message: ChatMessage): ChatCommand | null; + + /** + * Edit an existing message in a channel. + * + * Optional method for providers that support message editing + * (e.g., Matrix via m.replace, Discord via message.edit). + * Used for streaming AI responses with incremental updates. + * + * @param channelId - The channel/room ID + * @param messageId - The original message/event ID to edit + * @param content - The updated message content + */ + editMessage?(channelId: string, messageId: string, content: string): Promise; } diff --git a/apps/api/src/bridge/matrix/index.ts b/apps/api/src/bridge/matrix/index.ts index 34c67f7..7a73857 100644 --- a/apps/api/src/bridge/matrix/index.ts +++ b/apps/api/src/bridge/matrix/index.ts @@ -1,2 +1,4 @@ export { MatrixService } from "./matrix.service"; export { MatrixRoomService } from "./matrix-room.service"; +export { MatrixStreamingService } from "./matrix-streaming.service"; +export type { StreamResponseOptions } from "./matrix-streaming.service"; diff --git a/apps/api/src/bridge/matrix/matrix-streaming.service.spec.ts b/apps/api/src/bridge/matrix/matrix-streaming.service.spec.ts new file mode 100644 index 0000000..e87f0e2 --- /dev/null +++ b/apps/api/src/bridge/matrix/matrix-streaming.service.spec.ts @@ -0,0 +1,408 @@ +import { Test, TestingModule } from "@nestjs/testing"; +import { MatrixStreamingService } from "./matrix-streaming.service"; +import { MatrixService } from "./matrix.service"; +import { vi, describe, it, expect, beforeEach, afterEach } from "vitest"; +import type { StreamResponseOptions } from "./matrix-streaming.service"; + +// Mock matrix-bot-sdk to prevent native module loading +vi.mock("matrix-bot-sdk", () => { + return { + MatrixClient: class MockMatrixClient {}, + SimpleFsStorageProvider: class MockStorageProvider { + constructor(_filename: string) { + // No-op for testing + } + }, + AutojoinRoomsMixin: { + setupOnClient: vi.fn(), + }, + }; +}); + +// Mock MatrixClient +const mockClient = { + sendMessage: vi.fn().mockResolvedValue("$initial-event-id"), + sendEvent: vi.fn().mockResolvedValue("$edit-event-id"), + setTyping: vi.fn().mockResolvedValue(undefined), +}; + +// Mock MatrixService +const mockMatrixService = { + isConnected: vi.fn().mockReturnValue(true), + getClient: vi.fn().mockReturnValue(mockClient), +}; + +/** + * Helper: create an async iterable from an array of strings with optional delays + */ +async function* createTokenStream( + tokens: string[], + delayMs = 0 +): AsyncGenerator { + for (const token of tokens) { + if (delayMs > 0) { + await new Promise((resolve) => setTimeout(resolve, delayMs)); + } + yield token; + } +} + +/** + * Helper: create a token stream that throws an error mid-stream + */ +async function* createErrorStream( + tokens: string[], + errorAfter: number +): AsyncGenerator { + let count = 0; + for (const token of tokens) { + if (count >= errorAfter) { + throw new Error("LLM provider connection lost"); + } + yield token; + count++; + } +} + +describe("MatrixStreamingService", () => { + let service: MatrixStreamingService; + + beforeEach(async () => { + vi.useFakeTimers({ shouldAdvanceTime: true }); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + MatrixStreamingService, + { + provide: MatrixService, + useValue: mockMatrixService, + }, + ], + }).compile(); + + service = module.get(MatrixStreamingService); + + // Clear all mocks + vi.clearAllMocks(); + + // Re-apply default mock returns after clearing + mockMatrixService.isConnected.mockReturnValue(true); + mockMatrixService.getClient.mockReturnValue(mockClient); + mockClient.sendMessage.mockResolvedValue("$initial-event-id"); + mockClient.sendEvent.mockResolvedValue("$edit-event-id"); + mockClient.setTyping.mockResolvedValue(undefined); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + describe("editMessage", () => { + it("should send a m.replace event to edit an existing message", async () => { + await service.editMessage("!room:example.com", "$original-event-id", "Updated content"); + + expect(mockClient.sendEvent).toHaveBeenCalledWith("!room:example.com", "m.room.message", { + "m.new_content": { + msgtype: "m.text", + body: "Updated content", + }, + "m.relates_to": { + rel_type: "m.replace", + event_id: "$original-event-id", + }, + // Fallback for clients that don't support edits + msgtype: "m.text", + body: "* Updated content", + }); + }); + + it("should throw error when client is not connected", async () => { + mockMatrixService.isConnected.mockReturnValue(false); + + await expect( + service.editMessage("!room:example.com", "$event-id", "content") + ).rejects.toThrow("Matrix client is not connected"); + }); + + it("should throw error when client is null", async () => { + mockMatrixService.getClient.mockReturnValue(null); + + await expect( + service.editMessage("!room:example.com", "$event-id", "content") + ).rejects.toThrow("Matrix client is not connected"); + }); + }); + + describe("setTypingIndicator", () => { + it("should call client.setTyping with true and timeout", async () => { + await service.setTypingIndicator("!room:example.com", true); + + expect(mockClient.setTyping).toHaveBeenCalledWith("!room:example.com", true, 30000); + }); + + it("should call client.setTyping with false to clear indicator", async () => { + await service.setTypingIndicator("!room:example.com", false); + + expect(mockClient.setTyping).toHaveBeenCalledWith("!room:example.com", false, undefined); + }); + + it("should throw error when client is not connected", async () => { + mockMatrixService.isConnected.mockReturnValue(false); + + await expect(service.setTypingIndicator("!room:example.com", true)).rejects.toThrow( + "Matrix client is not connected" + ); + }); + }); + + describe("sendStreamingMessage", () => { + it("should send an initial message and return the event ID", async () => { + const eventId = await service.sendStreamingMessage("!room:example.com", "Thinking..."); + + expect(eventId).toBe("$initial-event-id"); + expect(mockClient.sendMessage).toHaveBeenCalledWith("!room:example.com", { + msgtype: "m.text", + body: "Thinking...", + }); + }); + + it("should send a thread message when threadId is provided", async () => { + const eventId = await service.sendStreamingMessage( + "!room:example.com", + "Thinking...", + "$thread-root-id" + ); + + expect(eventId).toBe("$initial-event-id"); + expect(mockClient.sendMessage).toHaveBeenCalledWith("!room:example.com", { + msgtype: "m.text", + body: "Thinking...", + "m.relates_to": { + rel_type: "m.thread", + event_id: "$thread-root-id", + is_falling_back: true, + "m.in_reply_to": { + event_id: "$thread-root-id", + }, + }, + }); + }); + + it("should throw error when client is not connected", async () => { + mockMatrixService.isConnected.mockReturnValue(false); + + await expect(service.sendStreamingMessage("!room:example.com", "Test")).rejects.toThrow( + "Matrix client is not connected" + ); + }); + }); + + describe("streamResponse", () => { + it("should send initial 'Thinking...' message and start typing indicator", async () => { + vi.useRealTimers(); + + const tokens = ["Hello", " world"]; + const stream = createTokenStream(tokens); + + await service.streamResponse("!room:example.com", stream); + + // Should have sent initial message + expect(mockClient.sendMessage).toHaveBeenCalledWith( + "!room:example.com", + expect.objectContaining({ + msgtype: "m.text", + body: "Thinking...", + }) + ); + + // Should have started typing indicator + expect(mockClient.setTyping).toHaveBeenCalledWith("!room:example.com", true, 30000); + }); + + it("should use custom initial message when provided", async () => { + vi.useRealTimers(); + + const tokens = ["Hi"]; + const stream = createTokenStream(tokens); + + const options: StreamResponseOptions = { initialMessage: "Processing..." }; + await service.streamResponse("!room:example.com", stream, options); + + expect(mockClient.sendMessage).toHaveBeenCalledWith( + "!room:example.com", + expect.objectContaining({ + body: "Processing...", + }) + ); + }); + + it("should edit message with accumulated tokens on completion", async () => { + vi.useRealTimers(); + + const tokens = ["Hello", " ", "world", "!"]; + const stream = createTokenStream(tokens); + + await service.streamResponse("!room:example.com", stream); + + // The final edit should contain the full accumulated text + const sendEventCalls = mockClient.sendEvent.mock.calls; + const lastEditCall = sendEventCalls[sendEventCalls.length - 1]; + + expect(lastEditCall).toBeDefined(); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + expect(lastEditCall[2]["m.new_content"].body).toBe("Hello world!"); + }); + + it("should clear typing indicator on completion", async () => { + vi.useRealTimers(); + + const tokens = ["Done"]; + const stream = createTokenStream(tokens); + + await service.streamResponse("!room:example.com", stream); + + // Last setTyping call should be false + const typingCalls = mockClient.setTyping.mock.calls; + const lastTypingCall = typingCalls[typingCalls.length - 1]; + + expect(lastTypingCall).toEqual(["!room:example.com", false, undefined]); + }); + + it("should rate-limit edits to at most one every 500ms", async () => { + vi.useRealTimers(); + + // Send tokens with small delays - all within one 500ms window + const tokens = ["a", "b", "c", "d", "e"]; + const stream = createTokenStream(tokens, 50); // 50ms between tokens = 250ms total + + await service.streamResponse("!room:example.com", stream); + + // With 250ms total streaming time (5 tokens * 50ms), all tokens arrive + // within one 500ms window. We expect at most 1 intermediate edit + 1 final edit, + // or just the final edit. The key point is that there should NOT be 5 separate edits. + const editCalls = mockClient.sendEvent.mock.calls.filter( + (call) => call[1] === "m.room.message" + ); + + // Should have fewer edits than tokens (rate limiting in effect) + expect(editCalls.length).toBeLessThanOrEqual(2); + // Should have at least the final edit + expect(editCalls.length).toBeGreaterThanOrEqual(1); + }); + + it("should handle errors gracefully and edit message with error notice", async () => { + vi.useRealTimers(); + + const stream = createErrorStream(["Hello", " ", "world"], 2); + + await service.streamResponse("!room:example.com", stream); + + // Should edit message with error content + const sendEventCalls = mockClient.sendEvent.mock.calls; + const lastEditCall = sendEventCalls[sendEventCalls.length - 1]; + + expect(lastEditCall).toBeDefined(); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const finalBody = lastEditCall[2]["m.new_content"].body as string; + expect(finalBody).toContain("error"); + + // Should clear typing on error + const typingCalls = mockClient.setTyping.mock.calls; + const lastTypingCall = typingCalls[typingCalls.length - 1]; + expect(lastTypingCall).toEqual(["!room:example.com", false, undefined]); + }); + + it("should include token usage in final message when provided", async () => { + vi.useRealTimers(); + + const tokens = ["Hello"]; + const stream = createTokenStream(tokens); + + const options: StreamResponseOptions = { + showTokenUsage: true, + tokenUsage: { prompt: 10, completion: 5, total: 15 }, + }; + + await service.streamResponse("!room:example.com", stream, options); + + const sendEventCalls = mockClient.sendEvent.mock.calls; + const lastEditCall = sendEventCalls[sendEventCalls.length - 1]; + + expect(lastEditCall).toBeDefined(); + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const finalBody = lastEditCall[2]["m.new_content"].body as string; + expect(finalBody).toContain("15"); + }); + + it("should throw error when client is not connected", async () => { + mockMatrixService.isConnected.mockReturnValue(false); + + const stream = createTokenStream(["test"]); + + await expect(service.streamResponse("!room:example.com", stream)).rejects.toThrow( + "Matrix client is not connected" + ); + }); + + it("should handle empty token stream", async () => { + vi.useRealTimers(); + + const stream = createTokenStream([]); + + await service.streamResponse("!room:example.com", stream); + + // Should still send initial message + expect(mockClient.sendMessage).toHaveBeenCalled(); + + // Should edit with empty/no-content message + const sendEventCalls = mockClient.sendEvent.mock.calls; + expect(sendEventCalls.length).toBeGreaterThanOrEqual(1); + + // Should clear typing + const typingCalls = mockClient.setTyping.mock.calls; + const lastTypingCall = typingCalls[typingCalls.length - 1]; + expect(lastTypingCall).toEqual(["!room:example.com", false, undefined]); + }); + + it("should support thread context in streamResponse", async () => { + vi.useRealTimers(); + + const tokens = ["Reply"]; + const stream = createTokenStream(tokens); + + const options: StreamResponseOptions = { threadId: "$thread-root" }; + await service.streamResponse("!room:example.com", stream, options); + + // Initial message should include thread relation + expect(mockClient.sendMessage).toHaveBeenCalledWith( + "!room:example.com", + expect.objectContaining({ + "m.relates_to": expect.objectContaining({ + rel_type: "m.thread", + event_id: "$thread-root", + }), + }) + ); + }); + + it("should perform multiple edits for long-running streams", async () => { + vi.useRealTimers(); + + // Create tokens with 200ms delays - total ~2000ms, should get multiple edit windows + const tokens = Array.from({ length: 10 }, (_, i) => `token${String(i)} `); + const stream = createTokenStream(tokens, 200); + + await service.streamResponse("!room:example.com", stream); + + // With 10 tokens at 200ms each = 2000ms total, at 500ms intervals + // we expect roughly 3-4 intermediate edits + 1 final = 4-5 total + const editCalls = mockClient.sendEvent.mock.calls.filter( + (call) => call[1] === "m.room.message" + ); + + // Should have multiple edits (at least 2) but far fewer than 10 + expect(editCalls.length).toBeGreaterThanOrEqual(2); + expect(editCalls.length).toBeLessThanOrEqual(8); + }); + }); +}); diff --git a/apps/api/src/bridge/matrix/matrix-streaming.service.ts b/apps/api/src/bridge/matrix/matrix-streaming.service.ts new file mode 100644 index 0000000..f2ecdbd --- /dev/null +++ b/apps/api/src/bridge/matrix/matrix-streaming.service.ts @@ -0,0 +1,236 @@ +import { Injectable, Logger } from "@nestjs/common"; +import type { MatrixClient } from "matrix-bot-sdk"; +import { MatrixService } from "./matrix.service"; + +/** + * Options for the streamResponse method + */ +export interface StreamResponseOptions { + /** Custom initial message (defaults to "Thinking...") */ + initialMessage?: string; + /** Thread root event ID for threaded responses */ + threadId?: string; + /** Whether to show token usage in the final message */ + showTokenUsage?: boolean; + /** Token usage stats to display in the final message */ + tokenUsage?: { prompt: number; completion: number; total: number }; +} + +/** + * Matrix message content for m.room.message events + */ +interface MatrixMessageContent { + msgtype: string; + body: string; + "m.new_content"?: { + msgtype: string; + body: string; + }; + "m.relates_to"?: { + rel_type: string; + event_id: string; + is_falling_back?: boolean; + "m.in_reply_to"?: { + event_id: string; + }; + }; +} + +/** Minimum interval between message edits (milliseconds) */ +const EDIT_INTERVAL_MS = 500; + +/** Typing indicator timeout (milliseconds) */ +const TYPING_TIMEOUT_MS = 30000; + +/** + * Matrix Streaming Service + * + * Provides streaming AI response capabilities for Matrix rooms using + * incremental message edits. Tokens from an LLM are buffered and the + * response message is edited at rate-limited intervals, providing a + * smooth streaming experience without excessive API calls. + * + * Key features: + * - Rate-limited edits (max every 500ms) + * - Typing indicator management during generation + * - Graceful error handling with user-visible error notices + * - Thread support for contextual responses + * - LLM-agnostic design via AsyncIterable token stream + */ +@Injectable() +export class MatrixStreamingService { + private readonly logger = new Logger(MatrixStreamingService.name); + + constructor(private readonly matrixService: MatrixService) {} + + /** + * Edit an existing Matrix message using the m.replace relation. + * + * Sends a new event that replaces the content of an existing message. + * Includes fallback content for clients that don't support edits. + * + * @param roomId - The Matrix room ID + * @param eventId - The original event ID to replace + * @param newContent - The updated message text + */ + async editMessage(roomId: string, eventId: string, newContent: string): Promise { + const client = this.getClientOrThrow(); + + const editContent: MatrixMessageContent = { + "m.new_content": { + msgtype: "m.text", + body: newContent, + }, + "m.relates_to": { + rel_type: "m.replace", + event_id: eventId, + }, + // Fallback for clients that don't support edits + msgtype: "m.text", + body: `* ${newContent}`, + }; + + await client.sendEvent(roomId, "m.room.message", editContent); + } + + /** + * Set the typing indicator for the bot in a room. + * + * @param roomId - The Matrix room ID + * @param typing - Whether the bot is typing + */ + async setTypingIndicator(roomId: string, typing: boolean): Promise { + const client = this.getClientOrThrow(); + + await client.setTyping(roomId, typing, typing ? TYPING_TIMEOUT_MS : undefined); + } + + /** + * Send an initial message for streaming, optionally in a thread. + * + * Returns the event ID of the sent message, which can be used for + * subsequent edits via editMessage. + * + * @param roomId - The Matrix room ID + * @param content - The initial message content + * @param threadId - Optional thread root event ID + * @returns The event ID of the sent message + */ + async sendStreamingMessage(roomId: string, content: string, threadId?: string): Promise { + const client = this.getClientOrThrow(); + + const messageContent: MatrixMessageContent = { + msgtype: "m.text", + body: content, + }; + + if (threadId) { + messageContent["m.relates_to"] = { + rel_type: "m.thread", + event_id: threadId, + is_falling_back: true, + "m.in_reply_to": { + event_id: threadId, + }, + }; + } + + const eventId: string = await client.sendMessage(roomId, messageContent); + return eventId; + } + + /** + * Stream an AI response to a Matrix room using incremental message edits. + * + * This is the main streaming method. It: + * 1. Sends an initial "Thinking..." message + * 2. Starts the typing indicator + * 3. Buffers incoming tokens from the async iterable + * 4. Edits the message every 500ms with accumulated text + * 5. On completion: sends a final clean edit, clears typing + * 6. On error: edits message with error notice, clears typing + * + * @param roomId - The Matrix room ID + * @param tokenStream - AsyncIterable that yields string tokens + * @param options - Optional configuration for the stream + */ + async streamResponse( + roomId: string, + tokenStream: AsyncIterable, + options?: StreamResponseOptions + ): Promise { + // Validate connection before starting + this.getClientOrThrow(); + + const initialMessage = options?.initialMessage ?? "Thinking..."; + const threadId = options?.threadId; + + // Step 1: Send initial message + const eventId = await this.sendStreamingMessage(roomId, initialMessage, threadId); + + // Step 2: Start typing indicator + await this.setTypingIndicator(roomId, true); + + // Step 3: Buffer and stream tokens + let accumulatedText = ""; + let lastEditTime = 0; + let hasError = false; + + try { + for await (const token of tokenStream) { + accumulatedText += token; + + const now = Date.now(); + const elapsed = now - lastEditTime; + + if (elapsed >= EDIT_INTERVAL_MS && accumulatedText.length > 0) { + await this.editMessage(roomId, eventId, accumulatedText); + lastEditTime = now; + } + } + } catch (error: unknown) { + hasError = true; + const errorMessage = error instanceof Error ? error.message : "Unknown error occurred"; + + this.logger.error(`Stream error in room ${roomId}: ${errorMessage}`); + + // Edit message to show error + const errorContent = accumulatedText + ? `${accumulatedText}\n\n[Streaming error: ${errorMessage}]` + : `[Streaming error: ${errorMessage}]`; + + await this.editMessage(roomId, eventId, errorContent); + } finally { + // Step 4: Clear typing indicator + await this.setTypingIndicator(roomId, false); + } + + // Step 5: Final edit with clean output (if no error) + if (!hasError) { + let finalContent = accumulatedText || "(No response generated)"; + + if (options?.showTokenUsage && options.tokenUsage) { + const { prompt, completion, total } = options.tokenUsage; + finalContent += `\n\n---\nTokens: ${String(total)} (prompt: ${String(prompt)}, completion: ${String(completion)})`; + } + + await this.editMessage(roomId, eventId, finalContent); + } + } + + /** + * Get the Matrix client from the parent MatrixService, or throw if not connected. + */ + private getClientOrThrow(): MatrixClient { + if (!this.matrixService.isConnected()) { + throw new Error("Matrix client is not connected"); + } + + const client = this.matrixService.getClient(); + if (!client) { + throw new Error("Matrix client is not connected"); + } + + return client; + } +} diff --git a/apps/api/src/bridge/matrix/matrix.service.ts b/apps/api/src/bridge/matrix/matrix.service.ts index 2da5948..5674dc5 100644 --- a/apps/api/src/bridge/matrix/matrix.service.ts +++ b/apps/api/src/bridge/matrix/matrix.service.ts @@ -269,6 +269,18 @@ export class MatrixService implements IChatProvider { return this.connected; } + /** + * Get the underlying MatrixClient instance. + * + * Used by MatrixStreamingService for low-level operations + * (message edits, typing indicators) that require direct client access. + * + * @returns The MatrixClient instance, or null if not connected + */ + getClient(): MatrixClient | null { + return this.client; + } + /** * Send a message to a room */