import { Injectable, Logger } from "@nestjs/common";
import type { MatrixClient } from "matrix-bot-sdk";
import { MatrixService } from "./matrix.service";

/**
 * Options for the streamResponse method
 */
export interface StreamResponseOptions {
  /** Custom initial message (defaults to "Thinking...") */
  initialMessage?: string;
  /** Thread root event ID for threaded responses */
  threadId?: string;
  /** Whether to show token usage in the final message */
  showTokenUsage?: boolean;
  /** Token usage stats to display in the final message */
  tokenUsage?: { prompt: number; completion: number; total: number };
}

/**
 * Matrix message content for m.room.message events
 */
interface MatrixMessageContent {
  msgtype: string;
  body: string;
  /** Replacement content, present only on edit (m.replace) events */
  "m.new_content"?: {
    msgtype: string;
    body: string;
  };
  /** Relation to another event (edit target or thread root) */
  "m.relates_to"?: {
    rel_type: string;
    event_id: string;
    is_falling_back?: boolean;
    "m.in_reply_to"?: {
      event_id: string;
    };
  };
}

/** Minimum interval between message edits (milliseconds) */
const EDIT_INTERVAL_MS = 500;

/** Typing indicator timeout (milliseconds) */
const TYPING_TIMEOUT_MS = 30000;

/**
 * Extract a printable message from an unknown thrown value.
 *
 * @param error - The caught value
 * @param fallback - Text to use when the value is not an Error instance
 * @returns `error.message` for Error instances, otherwise `fallback`
 */
function describeError(error: unknown, fallback: string): string {
  return error instanceof Error ? error.message : fallback;
}

/**
 * Matrix Streaming Service
 *
 * Provides streaming AI response capabilities for Matrix rooms using
 * incremental message edits. Tokens from an LLM are buffered and the
 * response message is edited at rate-limited intervals, providing a
 * smooth streaming experience without excessive API calls.
 *
 * Key features:
 * - Rate-limited edits (max every 500ms)
 * - Typing indicator management during generation
 * - Graceful error handling with user-visible error notices
 * - Thread support for contextual responses
 * - LLM-agnostic design via AsyncIterable token stream
 */
@Injectable()
export class MatrixStreamingService {
  private readonly logger = new Logger(MatrixStreamingService.name);

  constructor(private readonly matrixService: MatrixService) {}

  /**
   * Edit an existing Matrix message using the m.replace relation.
   *
   * Sends a new event that replaces the content of an existing message.
   * Includes fallback content for clients that don't support edits.
   *
   * @param roomId - The Matrix room ID
   * @param eventId - The original event ID to replace
   * @param newContent - The updated message text
   * @throws Error if the Matrix client is not connected
   */
  async editMessage(roomId: string, eventId: string, newContent: string): Promise<void> {
    const client = this.getClientOrThrow();

    const editContent: MatrixMessageContent = {
      "m.new_content": {
        msgtype: "m.text",
        body: newContent,
      },
      "m.relates_to": {
        rel_type: "m.replace",
        event_id: eventId,
      },
      // Fallback for clients that don't support edits
      msgtype: "m.text",
      body: `* ${newContent}`,
    };

    await client.sendEvent(roomId, "m.room.message", editContent);
  }

  /**
   * Set the typing indicator for the bot in a room.
   *
   * The timeout (TYPING_TIMEOUT_MS) is only passed when turning the
   * indicator on; when turning it off the timeout argument is undefined.
   *
   * @param roomId - The Matrix room ID
   * @param typing - Whether the bot is typing
   * @throws Error if the Matrix client is not connected
   */
  async setTypingIndicator(roomId: string, typing: boolean): Promise<void> {
    const client = this.getClientOrThrow();

    await client.setTyping(roomId, typing, typing ? TYPING_TIMEOUT_MS : undefined);
  }

  /**
   * Send an initial message for streaming, optionally in a thread.
   *
   * Returns the event ID of the sent message, which can be used for
   * subsequent edits via editMessage.
   *
   * @param roomId - The Matrix room ID
   * @param content - The initial message content
   * @param threadId - Optional thread root event ID
   * @returns The event ID of the sent message
   * @throws Error if the Matrix client is not connected
   */
  async sendStreamingMessage(roomId: string, content: string, threadId?: string): Promise<string> {
    const client = this.getClientOrThrow();

    const messageContent: MatrixMessageContent = {
      msgtype: "m.text",
      body: content,
    };

    if (threadId) {
      // Thread relation; the in-reply-to entry points at the thread root
      // and is flagged as a fallback via is_falling_back.
      messageContent["m.relates_to"] = {
        rel_type: "m.thread",
        event_id: threadId,
        is_falling_back: true,
        "m.in_reply_to": {
          event_id: threadId,
        },
      };
    }

    const eventId: string = await client.sendMessage(roomId, messageContent);
    return eventId;
  }

  /**
   * Stream an AI response to a Matrix room using incremental message edits.
   *
   * This is the main streaming method. It:
   * 1. Sends an initial "Thinking..." message
   * 2. Starts the typing indicator (best-effort)
   * 3. Buffers incoming tokens from the async iterable
   * 4. Edits the message every 500ms with accumulated text
   * 5. On completion: sends a final clean edit, clears typing
   * 6. On error: edits message with error notice, clears typing
   *
   * Errors raised while consuming the stream (including failed intermediate
   * edits) are logged and shown in the message, not rethrown. Errors from
   * the connection check, the initial send, and the final edit propagate to
   * the caller.
   *
   * The typing indicator is requested once with a TYPING_TIMEOUT_MS timeout
   * and is not refreshed here.
   * NOTE(review): for streams longer than that timeout the indicator
   * presumably lapses before completion — confirm against server behaviour.
   *
   * @param roomId - The Matrix room ID
   * @param tokenStream - AsyncIterable that yields string tokens
   * @param options - Optional configuration for the stream
   * @throws Error if the Matrix client is not connected
   */
  async streamResponse(
    roomId: string,
    tokenStream: AsyncIterable<string>,
    options?: StreamResponseOptions
  ): Promise<void> {
    // Validate connection before starting
    this.getClientOrThrow();

    const initialMessage = options?.initialMessage ?? "Thinking...";
    const threadId = options?.threadId;

    // Step 1: Send initial message
    const eventId = await this.sendStreamingMessage(roomId, initialMessage, threadId);

    // Step 2: Start typing indicator.
    // Best-effort, like clearing it below: the placeholder message already
    // exists, so a failure here must not abandon it with the stream unread.
    try {
      await this.setTypingIndicator(roomId, true);
    } catch (typingError: unknown) {
      this.logger.warn(
        `Failed to set typing indicator in ${roomId}: ${describeError(typingError, "unknown")}`
      );
    }

    // Step 3: Buffer and stream tokens
    let accumulatedText = "";
    // Starts at 0 so the first non-empty token is flushed immediately.
    let lastEditTime = 0;
    let hasError = false;

    try {
      for await (const token of tokenStream) {
        accumulatedText += token;

        const now = Date.now();
        const elapsed = now - lastEditTime;

        if (elapsed >= EDIT_INTERVAL_MS && accumulatedText.length > 0) {
          await this.editMessage(roomId, eventId, accumulatedText);
          lastEditTime = now;
        }
      }
    } catch (error: unknown) {
      hasError = true;
      const errorMessage = describeError(error, "Unknown error occurred");

      this.logger.error(`Stream error in room ${roomId}: ${errorMessage}`);

      // Edit message to show error, keeping any text received so far
      try {
        const errorContent = accumulatedText
          ? `${accumulatedText}\n\n[Streaming error: ${errorMessage}]`
          : `[Streaming error: ${errorMessage}]`;

        await this.editMessage(roomId, eventId, errorContent);
      } catch (editError: unknown) {
        this.logger.warn(
          `Failed to edit error message in ${roomId}: ${describeError(editError, "unknown")}`
        );
      }
    } finally {
      // Step 4: Clear typing indicator
      try {
        await this.setTypingIndicator(roomId, false);
      } catch (typingError: unknown) {
        this.logger.warn(
          `Failed to clear typing indicator in ${roomId}: ${describeError(typingError, "unknown")}`
        );
      }
    }

    // Step 5: Final edit with clean output (if no error)
    if (!hasError) {
      let finalContent = accumulatedText || "(No response generated)";

      if (options?.showTokenUsage && options.tokenUsage) {
        const { prompt, completion, total } = options.tokenUsage;
        finalContent += `\n\n---\nTokens: ${String(total)} (prompt: ${String(prompt)}, completion: ${String(completion)})`;
      }

      await this.editMessage(roomId, eventId, finalContent);
    }
  }

  /**
   * Get the Matrix client from the parent MatrixService, or throw if not connected.
   *
   * @returns The client returned by MatrixService.getClient()
   * @throws Error if the service reports it is not connected or returns no client
   */
  private getClientOrThrow(): MatrixClient {
    if (!this.matrixService.isConnected()) {
      throw new Error("Matrix client is not connected");
    }

    const client = this.matrixService.getClient();
    if (!client) {
      throw new Error("Matrix client is not connected");
    }

    return client;
  }
}