- Fix sendThreadMessage room mismatch: use channelId from options instead of hardcoded controlRoomId
- Add .catch() to fire-and-forget handleRoomMessage to prevent silent error swallowing
- Wrap dispatchJob in try-catch for user-visible error reporting in handleFixCommand
- Add MATRIX_BOT_USER_ID validation in connect() to prevent infinite message loops
- Fix streamResponse error masking: wrap finally/catch side-effects in try-catch
- Replace unsafe type assertion with public getClient() in MatrixRoomService
- Add orphaned room warning in provisionRoom on DB failure
- Add provider identity to Herald error logs
- Add channelId to ThreadMessageOptions interface and all callers
- Add missing env var warnings in BridgeModule factory
- Fix JSON injection in setup-bot.sh: use jq for safe JSON construction

Fixes #377

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
249 lines
7.6 KiB
TypeScript
249 lines
7.6 KiB
TypeScript
import { Injectable, Logger } from "@nestjs/common";
|
|
import type { MatrixClient } from "matrix-bot-sdk";
|
|
import { MatrixService } from "./matrix.service";
|
|
|
|
/**
 * Caller-supplied settings accepted by streamResponse.
 *
 * Every field is optional; omitting the whole object is valid.
 */
export interface StreamResponseOptions {
  /** Placeholder text posted before any tokens arrive ("Thinking..." when omitted). */
  initialMessage?: string;

  /** Event ID of the thread root when the response should be posted inside a thread. */
  threadId?: string;

  /** When true, and tokenUsage is also supplied, a usage footer is appended to the final message. */
  showTokenUsage?: boolean;

  /** Prompt, completion and total token counts rendered in the usage footer. */
  tokenUsage?: Record<"prompt" | "completion" | "total", number>;
}
|
|
|
|
/**
 * Shape of the content object this file sends for m.room.message events.
 *
 * Covers the three variants built in this file: a plain message, a
 * threaded message, and an edit of an earlier message.
 */
interface MatrixMessageContent {
  /** Message type, e.g. "m.text". */
  msgtype: string;

  /** Plain-text body of the message. */
  body: string;

  /** Relation to another event (thread membership or replacement). */
  "m.relates_to"?: {
    rel_type: string;
    event_id: string;
    is_falling_back?: boolean;
    "m.in_reply_to"?: { event_id: string };
  };

  /** Replacement content carried by an edit event. */
  "m.new_content"?: { msgtype: string; body: string };
}
|
|
|
|
/** Shortest allowed gap, in milliseconds, between two successive edits of a streamed message. */
const EDIT_INTERVAL_MS = 500;

/** Timeout, in milliseconds, passed along when the typing indicator is switched on. */
const TYPING_TIMEOUT_MS = 30_000;
|
|
|
|
/**
 * Matrix Streaming Service
 *
 * Provides streaming AI response capabilities for Matrix rooms using
 * incremental message edits. Tokens from an LLM are buffered and the
 * response message is edited at rate-limited intervals, providing a
 * smooth streaming experience without excessive API calls.
 *
 * Key features:
 * - Rate-limited edits (at most one per EDIT_INTERVAL_MS)
 * - Typing indicator management during generation (best-effort: a typing
 *   failure is logged and never aborts the response)
 * - Graceful error handling with user-visible error notices
 * - Thread support for contextual responses
 * - LLM-agnostic design via AsyncIterable<string> token stream
 */
@Injectable()
export class MatrixStreamingService {
  private readonly logger = new Logger(MatrixStreamingService.name);

  constructor(private readonly matrixService: MatrixService) {}

  /**
   * Edit an existing Matrix message using the m.replace relation.
   *
   * Sends a new m.room.message event whose "m.new_content" holds the
   * replacement text. The top-level body repeats the text prefixed with
   * "* " as a fallback for clients that don't support edits.
   *
   * @param roomId - The Matrix room ID
   * @param eventId - The original event ID to replace
   * @param newContent - The updated message text
   * @throws Error if the Matrix client is not connected
   */
  async editMessage(roomId: string, eventId: string, newContent: string): Promise<void> {
    const client = this.getClientOrThrow();

    const editContent: MatrixMessageContent = {
      "m.new_content": {
        msgtype: "m.text",
        body: newContent,
      },
      "m.relates_to": {
        rel_type: "m.replace",
        event_id: eventId,
      },
      // Fallback for clients that don't support edits
      msgtype: "m.text",
      body: `* ${newContent}`,
    };

    await client.sendEvent(roomId, "m.room.message", editContent);
  }

  /**
   * Set the typing indicator for the bot in a room.
   *
   * When switching the indicator on, TYPING_TIMEOUT_MS is passed as the
   * timeout; when switching it off, no timeout is passed.
   *
   * @param roomId - The Matrix room ID
   * @param typing - Whether the bot is typing
   * @throws Error if the Matrix client is not connected
   */
  async setTypingIndicator(roomId: string, typing: boolean): Promise<void> {
    const client = this.getClientOrThrow();

    await client.setTyping(roomId, typing, typing ? TYPING_TIMEOUT_MS : undefined);
  }

  /**
   * Send an initial message for streaming, optionally in a thread.
   *
   * Returns the event ID of the sent message, which can be used for
   * subsequent edits via editMessage.
   *
   * @param roomId - The Matrix room ID
   * @param content - The initial message content
   * @param threadId - Optional thread root event ID
   * @returns The event ID of the sent message
   * @throws Error if the Matrix client is not connected
   */
  async sendStreamingMessage(roomId: string, content: string, threadId?: string): Promise<string> {
    const client = this.getClientOrThrow();

    const messageContent: MatrixMessageContent = {
      msgtype: "m.text",
      body: content,
    };

    if (threadId) {
      // Thread relation with a reply fallback pointing at the thread root.
      messageContent["m.relates_to"] = {
        rel_type: "m.thread",
        event_id: threadId,
        is_falling_back: true,
        "m.in_reply_to": {
          event_id: threadId,
        },
      };
    }

    const eventId: string = await client.sendMessage(roomId, messageContent);
    return eventId;
  }

  /**
   * Stream an AI response to a Matrix room using incremental message edits.
   *
   * This is the main streaming method. It:
   * 1. Sends an initial "Thinking..." message (or options.initialMessage)
   * 2. Starts the typing indicator (best-effort)
   * 3. Buffers incoming tokens from the async iterable
   * 4. Edits the message at most once per EDIT_INTERVAL_MS with the
   *    accumulated text, renewing the typing indicator along the way
   * 5. On completion: clears typing, then sends a final clean edit
   * 6. On error: edits message with error notice, clears typing
   *
   * Errors raised while consuming the stream (including failed intermediate
   * edits) are logged and reported in the message; they are not rethrown.
   * A failure of the final edit in the success path does propagate.
   *
   * @param roomId - The Matrix room ID
   * @param tokenStream - AsyncIterable that yields string tokens
   * @param options - Optional configuration for the stream
   * @throws Error if the Matrix client is not connected, or if sending the
   *   initial message or the final edit fails
   */
  async streamResponse(
    roomId: string,
    tokenStream: AsyncIterable<string>,
    options?: StreamResponseOptions
  ): Promise<void> {
    // Validate connection before starting
    this.getClientOrThrow();

    const initialMessage = options?.initialMessage ?? "Thinking...";
    const threadId = options?.threadId;

    // Step 1: Send initial message
    const eventId = await this.sendStreamingMessage(roomId, initialMessage, threadId);

    // Step 2: Start typing indicator. This is cosmetic, so a failure here
    // must not abort the response and strand the placeholder message that
    // was just posted.
    await this.trySetTypingIndicator(roomId, true);

    // The indicator is requested with a TYPING_TIMEOUT_MS timeout, so it is
    // renewed at half that interval to keep it visible on long generations.
    // Renewal is only checked when a token arrives.
    const typingRefreshMs = TYPING_TIMEOUT_MS / 2;
    let lastTypingTime = Date.now();

    // Step 3: Buffer and stream tokens
    let accumulatedText = "";
    let lastEditTime = 0; // 0 => the very first token triggers an immediate edit
    let hasError = false;

    try {
      for await (const token of tokenStream) {
        accumulatedText += token;

        const now = Date.now();

        if (now - lastTypingTime >= typingRefreshMs) {
          lastTypingTime = now;
          await this.trySetTypingIndicator(roomId, true);
        }

        const elapsed = now - lastEditTime;

        if (elapsed >= EDIT_INTERVAL_MS && accumulatedText.length > 0) {
          await this.editMessage(roomId, eventId, accumulatedText);
          lastEditTime = now;
        }
      }
    } catch (error: unknown) {
      hasError = true;
      const errorMessage = error instanceof Error ? error.message : "Unknown error occurred";

      this.logger.error(`Stream error in room ${roomId}: ${errorMessage}`);

      // Edit message to show error; keep whatever text was already produced.
      try {
        const errorContent = accumulatedText
          ? `${accumulatedText}\n\n[Streaming error: ${errorMessage}]`
          : `[Streaming error: ${errorMessage}]`;

        await this.editMessage(roomId, eventId, errorContent);
      } catch (editError: unknown) {
        // Logged only, so a failed notice does not mask the original stream error.
        this.logger.warn(
          `Failed to edit error message in ${roomId}: ${editError instanceof Error ? editError.message : "unknown"}`
        );
      }
    } finally {
      // Step 4: Clear typing indicator (best-effort)
      await this.trySetTypingIndicator(roomId, false);
    }

    // Step 5: Final edit with clean output (if no error)
    if (!hasError) {
      let finalContent = accumulatedText || "(No response generated)";

      if (options?.showTokenUsage && options.tokenUsage) {
        const { prompt, completion, total } = options.tokenUsage;
        finalContent += `\n\n---\nTokens: ${String(total)} (prompt: ${String(prompt)}, completion: ${String(completion)})`;
      }

      await this.editMessage(roomId, eventId, finalContent);
    }
  }

  /**
   * Best-effort wrapper around setTypingIndicator.
   *
   * Any failure (including a disconnected client) is logged as a warning
   * and swallowed, so typing-indicator problems never interrupt a response.
   *
   * @param roomId - The Matrix room ID
   * @param typing - Whether the bot is typing
   */
  private async trySetTypingIndicator(roomId: string, typing: boolean): Promise<void> {
    try {
      await this.setTypingIndicator(roomId, typing);
    } catch (typingError: unknown) {
      const action = typing ? "set" : "clear";
      this.logger.warn(
        `Failed to ${action} typing indicator in ${roomId}: ${typingError instanceof Error ? typingError.message : "unknown"}`
      );
    }
  }

  /**
   * Get the Matrix client from the parent MatrixService, or throw if not connected.
   *
   * @returns The connected Matrix client
   * @throws Error if the service reports it is not connected or returns no client
   */
  private getClientOrThrow(): MatrixClient {
    if (!this.matrixService.isConnected()) {
      throw new Error("Matrix client is not connected");
    }

    const client = this.matrixService.getClient();
    if (!client) {
      throw new Error("Matrix client is not connected");
    }

    return client;
  }
}
|