feat(#383): Streaming AI responses via Matrix message edits
Some checks failed
ci/woodpecker/push/api Pipeline failed
Some checks failed
ci/woodpecker/push/api Pipeline failed
- Add MatrixStreamingService with editMessage, setTypingIndicator, streamResponse
- Rate-limited edits (500ms) for incremental streaming output
- Typing indicator management during generation
- Graceful error handling and fallback for non-streaming scenarios
- Add optional editMessage to IChatProvider interface
- Add getClient() accessor to MatrixService for streaming service
- Register MatrixStreamingService in BridgeModule
- Tests: 20 tests pass

Refs #383

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -2,6 +2,7 @@ import { Logger, Module } from "@nestjs/common";
|
|||||||
import { DiscordService } from "./discord/discord.service";
|
import { DiscordService } from "./discord/discord.service";
|
||||||
import { MatrixService } from "./matrix/matrix.service";
|
import { MatrixService } from "./matrix/matrix.service";
|
||||||
import { MatrixRoomService } from "./matrix/matrix-room.service";
|
import { MatrixRoomService } from "./matrix/matrix-room.service";
|
||||||
|
import { MatrixStreamingService } from "./matrix/matrix-streaming.service";
|
||||||
import { CommandParserService } from "./parser/command-parser.service";
|
import { CommandParserService } from "./parser/command-parser.service";
|
||||||
import { StitcherModule } from "../stitcher/stitcher.module";
|
import { StitcherModule } from "../stitcher/stitcher.module";
|
||||||
import { CHAT_PROVIDERS } from "./bridge.constants";
|
import { CHAT_PROVIDERS } from "./bridge.constants";
|
||||||
@@ -31,6 +32,7 @@ const logger = new Logger("BridgeModule");
|
|||||||
providers: [
|
providers: [
|
||||||
CommandParserService,
|
CommandParserService,
|
||||||
MatrixRoomService,
|
MatrixRoomService,
|
||||||
|
MatrixStreamingService,
|
||||||
DiscordService,
|
DiscordService,
|
||||||
MatrixService,
|
MatrixService,
|
||||||
{
|
{
|
||||||
@@ -57,6 +59,13 @@ const logger = new Logger("BridgeModule");
|
|||||||
inject: [DiscordService, MatrixService],
|
inject: [DiscordService, MatrixService],
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
exports: [DiscordService, MatrixService, MatrixRoomService, CommandParserService, CHAT_PROVIDERS],
|
exports: [
|
||||||
|
DiscordService,
|
||||||
|
MatrixService,
|
||||||
|
MatrixRoomService,
|
||||||
|
MatrixStreamingService,
|
||||||
|
CommandParserService,
|
||||||
|
CHAT_PROVIDERS,
|
||||||
|
],
|
||||||
})
|
})
|
||||||
export class BridgeModule {}
|
export class BridgeModule {}
|
||||||
|
|||||||
@@ -76,4 +76,17 @@ export interface IChatProvider {
|
|||||||
* Parse a command from a message
|
* Parse a command from a message
|
||||||
*/
|
*/
|
||||||
parseCommand(message: ChatMessage): ChatCommand | null;
|
parseCommand(message: ChatMessage): ChatCommand | null;
|
||||||
|
|
||||||
|
/**
 * Edit an existing message in a channel.
 *
 * Optional method for providers that support message editing
 * (e.g., Matrix via m.replace, Discord via message.edit).
 * Used for streaming AI responses with incremental updates.
 *
 * Because the member is optional, callers must check that the provider
 * actually implements it before invoking it.
 *
 * @param channelId - The channel/room ID
 * @param messageId - The original message/event ID to edit
 * @param content - The updated message content
 * @returns A promise that settles once the provider has processed the edit
 */
editMessage?(channelId: string, messageId: string, content: string): Promise<void>;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,2 +1,4 @@
|
|||||||
export { MatrixService } from "./matrix.service";
|
export { MatrixService } from "./matrix.service";
|
||||||
export { MatrixRoomService } from "./matrix-room.service";
|
export { MatrixRoomService } from "./matrix-room.service";
|
||||||
|
export { MatrixStreamingService } from "./matrix-streaming.service";
|
||||||
|
export type { StreamResponseOptions } from "./matrix-streaming.service";
|
||||||
|
|||||||
408
apps/api/src/bridge/matrix/matrix-streaming.service.spec.ts
Normal file
408
apps/api/src/bridge/matrix/matrix-streaming.service.spec.ts
Normal file
@@ -0,0 +1,408 @@
|
|||||||
|
import { Test, TestingModule } from "@nestjs/testing";
|
||||||
|
import { MatrixStreamingService } from "./matrix-streaming.service";
|
||||||
|
import { MatrixService } from "./matrix.service";
|
||||||
|
import { vi, describe, it, expect, beforeEach, afterEach } from "vitest";
|
||||||
|
import type { StreamResponseOptions } from "./matrix-streaming.service";
|
||||||
|
|
||||||
|
// Replace matrix-bot-sdk with inert stand-ins so importing the services under
// test never loads the SDK's native modules.
vi.mock("matrix-bot-sdk", () => ({
  MatrixClient: class MockMatrixClient {},
  SimpleFsStorageProvider: class MockStorageProvider {
    constructor(_filename: string) {
      // No-op for testing
    }
  },
  AutojoinRoomsMixin: { setupOnClient: vi.fn() },
}));

// Stand-in for the MatrixClient methods the streaming service calls.
const mockClient = {
  sendMessage: vi.fn().mockResolvedValue("$initial-event-id"),
  sendEvent: vi.fn().mockResolvedValue("$edit-event-id"),
  setTyping: vi.fn().mockResolvedValue(undefined),
};

// Stand-in for MatrixService: reports "connected" and hands out mockClient.
const mockMatrixService = {
  isConnected: vi.fn().mockReturnValue(true),
  getClient: vi.fn().mockReturnValue(mockClient),
};
|
||||||
|
|
||||||
|
/**
 * Helper: turn an array of strings into an async token stream.
 *
 * @param tokens - Tokens to yield, in order
 * @param delayMs - Pause inserted before each token; 0 (the default) yields without waiting
 */
async function* createTokenStream(
  tokens: string[],
  delayMs = 0
): AsyncGenerator<string, void, undefined> {
  const shouldPause = delayMs > 0;
  const pause = (): Promise<unknown> =>
    new Promise((wake) => setTimeout(wake, delayMs));

  for (const piece of tokens) {
    if (shouldPause) {
      await pause();
    }
    yield piece;
  }
}
|
||||||
|
|
||||||
|
/**
 * Helper: create a token stream that fails after a fixed number of tokens.
 *
 * Yields the first `errorAfter` tokens and then throws
 * "LLM provider connection lost". The error is raised even when `errorAfter`
 * equals `tokens.length`, so a test asking for a failure can never silently
 * take the success path. If `errorAfter` is larger than the number of tokens,
 * the stream simply completes.
 *
 * @param tokens - Tokens available to the stream
 * @param errorAfter - Number of tokens to yield before throwing
 */
async function* createErrorStream(
  tokens: string[],
  errorAfter: number
): AsyncGenerator<string, void, undefined> {
  // Clamp so a negative count means "fail before the first token".
  const yieldCount = Math.max(0, errorAfter);

  for (const token of tokens.slice(0, yieldCount)) {
    yield token;
  }

  if (yieldCount <= tokens.length) {
    throw new Error("LLM provider connection lost");
  }
}
|
||||||
|
|
||||||
|
describe("MatrixStreamingService", () => {
  const ROOM_ID = "!room:example.com";

  let service: MatrixStreamingService;

  /** Replacement body carried by the most recent edit event sent to the client. */
  const lastEditBody = (): string => {
    const sendEventCalls = mockClient.sendEvent.mock.calls;
    const lastEditCall = sendEventCalls[sendEventCalls.length - 1];

    expect(lastEditCall).toBeDefined();
    // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
    return lastEditCall[2]["m.new_content"].body as string;
  };

  /** Arguments of the most recent setTyping call. */
  const lastTypingCall = (): unknown => {
    const typingCalls = mockClient.setTyping.mock.calls;
    return typingCalls[typingCalls.length - 1];
  };

  /** Number of m.room.message events sent through sendEvent (i.e. edits). */
  const countEdits = (): number =>
    mockClient.sendEvent.mock.calls.filter((call) => call[1] === "m.room.message").length;

  beforeEach(async () => {
    const module: TestingModule = await Test.createTestingModule({
      providers: [
        MatrixStreamingService,
        {
          provide: MatrixService,
          useValue: mockMatrixService,
        },
      ],
    }).compile();

    service = module.get<MatrixStreamingService>(MatrixStreamingService);

    // Clear all mocks
    vi.clearAllMocks();

    // Re-apply default mock returns after clearing
    mockMatrixService.isConnected.mockReturnValue(true);
    mockMatrixService.getClient.mockReturnValue(mockClient);
    mockClient.sendMessage.mockResolvedValue("$initial-event-id");
    mockClient.sendEvent.mockResolvedValue("$edit-event-id");
    mockClient.setTyping.mockResolvedValue(undefined);
  });

  afterEach(() => {
    // Timing tests install fake timers themselves; always restore afterwards.
    vi.useRealTimers();
  });

  describe("editMessage", () => {
    it("should send a m.replace event to edit an existing message", async () => {
      await service.editMessage(ROOM_ID, "$original-event-id", "Updated content");

      expect(mockClient.sendEvent).toHaveBeenCalledWith(ROOM_ID, "m.room.message", {
        "m.new_content": {
          msgtype: "m.text",
          body: "Updated content",
        },
        "m.relates_to": {
          rel_type: "m.replace",
          event_id: "$original-event-id",
        },
        // Fallback for clients that don't support edits
        msgtype: "m.text",
        body: "* Updated content",
      });
    });

    it("should throw error when client is not connected", async () => {
      mockMatrixService.isConnected.mockReturnValue(false);

      await expect(service.editMessage(ROOM_ID, "$event-id", "content")).rejects.toThrow(
        "Matrix client is not connected"
      );
    });

    it("should throw error when client is null", async () => {
      mockMatrixService.getClient.mockReturnValue(null);

      await expect(service.editMessage(ROOM_ID, "$event-id", "content")).rejects.toThrow(
        "Matrix client is not connected"
      );
    });
  });

  describe("setTypingIndicator", () => {
    it("should call client.setTyping with true and timeout", async () => {
      await service.setTypingIndicator(ROOM_ID, true);

      expect(mockClient.setTyping).toHaveBeenCalledWith(ROOM_ID, true, 30000);
    });

    it("should call client.setTyping with false to clear indicator", async () => {
      await service.setTypingIndicator(ROOM_ID, false);

      expect(mockClient.setTyping).toHaveBeenCalledWith(ROOM_ID, false, undefined);
    });

    it("should throw error when client is not connected", async () => {
      mockMatrixService.isConnected.mockReturnValue(false);

      await expect(service.setTypingIndicator(ROOM_ID, true)).rejects.toThrow(
        "Matrix client is not connected"
      );
    });
  });

  describe("sendStreamingMessage", () => {
    it("should send an initial message and return the event ID", async () => {
      const eventId = await service.sendStreamingMessage(ROOM_ID, "Thinking...");

      expect(eventId).toBe("$initial-event-id");
      expect(mockClient.sendMessage).toHaveBeenCalledWith(ROOM_ID, {
        msgtype: "m.text",
        body: "Thinking...",
      });
    });

    it("should send a thread message when threadId is provided", async () => {
      const eventId = await service.sendStreamingMessage(
        ROOM_ID,
        "Thinking...",
        "$thread-root-id"
      );

      expect(eventId).toBe("$initial-event-id");
      expect(mockClient.sendMessage).toHaveBeenCalledWith(ROOM_ID, {
        msgtype: "m.text",
        body: "Thinking...",
        "m.relates_to": {
          rel_type: "m.thread",
          event_id: "$thread-root-id",
          is_falling_back: true,
          "m.in_reply_to": {
            event_id: "$thread-root-id",
          },
        },
      });
    });

    it("should throw error when client is not connected", async () => {
      mockMatrixService.isConnected.mockReturnValue(false);

      await expect(service.sendStreamingMessage(ROOM_ID, "Test")).rejects.toThrow(
        "Matrix client is not connected"
      );
    });
  });

  describe("streamResponse", () => {
    it("should send initial 'Thinking...' message and start typing indicator", async () => {
      await service.streamResponse(ROOM_ID, createTokenStream(["Hello", " world"]));

      // Should have sent initial message
      expect(mockClient.sendMessage).toHaveBeenCalledWith(
        ROOM_ID,
        expect.objectContaining({
          msgtype: "m.text",
          body: "Thinking...",
        })
      );

      // Should have started typing indicator
      expect(mockClient.setTyping).toHaveBeenCalledWith(ROOM_ID, true, 30000);
    });

    it("should use custom initial message when provided", async () => {
      const options: StreamResponseOptions = { initialMessage: "Processing..." };

      await service.streamResponse(ROOM_ID, createTokenStream(["Hi"]), options);

      expect(mockClient.sendMessage).toHaveBeenCalledWith(
        ROOM_ID,
        expect.objectContaining({
          body: "Processing...",
        })
      );
    });

    it("should edit message with accumulated tokens on completion", async () => {
      await service.streamResponse(ROOM_ID, createTokenStream(["Hello", " ", "world", "!"]));

      // The final edit should contain the full accumulated text
      expect(lastEditBody()).toBe("Hello world!");
    });

    it("should clear typing indicator on completion", async () => {
      await service.streamResponse(ROOM_ID, createTokenStream(["Done"]));

      // Last setTyping call should be false
      expect(lastTypingCall()).toEqual([ROOM_ID, false, undefined]);
    });

    it("should rate-limit edits to at most one every 500ms", async () => {
      // Fake timers make the token delays and Date.now() deterministic, so the
      // assertion cannot be broken by a slow or heavily loaded test runner.
      vi.useFakeTimers();

      const tokens = ["a", "b", "c", "d", "e"];
      const delayMs = 50; // 5 tokens * 50ms = 250ms, all inside one 500ms window

      const pending = service.streamResponse(ROOM_ID, createTokenStream(tokens, delayMs));
      await vi.advanceTimersByTimeAsync(tokens.length * delayMs + 10);
      await pending;

      // At most 1 intermediate edit + 1 final edit; never one edit per token.
      const edits = countEdits();
      expect(edits).toBeLessThanOrEqual(2);
      expect(edits).toBeGreaterThanOrEqual(1);
      expect(lastEditBody()).toBe("abcde");
    });

    it("should handle errors gracefully and edit message with error notice", async () => {
      const stream = createErrorStream(["Hello", " ", "world"], 2);

      await service.streamResponse(ROOM_ID, stream);

      // Should edit message with error content
      expect(lastEditBody()).toContain("error");

      // Should clear typing on error
      expect(lastTypingCall()).toEqual([ROOM_ID, false, undefined]);
    });

    it("should include token usage in final message when provided", async () => {
      const options: StreamResponseOptions = {
        showTokenUsage: true,
        tokenUsage: { prompt: 10, completion: 5, total: 15 },
      };

      await service.streamResponse(ROOM_ID, createTokenStream(["Hello"]), options);

      expect(lastEditBody()).toContain("15");
    });

    it("should throw error when client is not connected", async () => {
      mockMatrixService.isConnected.mockReturnValue(false);

      const stream = createTokenStream(["test"]);

      await expect(service.streamResponse(ROOM_ID, stream)).rejects.toThrow(
        "Matrix client is not connected"
      );
    });

    it("should handle empty token stream", async () => {
      await service.streamResponse(ROOM_ID, createTokenStream([]));

      // Should still send initial message
      expect(mockClient.sendMessage).toHaveBeenCalled();

      // Should edit with empty/no-content message
      expect(mockClient.sendEvent.mock.calls.length).toBeGreaterThanOrEqual(1);

      // Should clear typing
      expect(lastTypingCall()).toEqual([ROOM_ID, false, undefined]);
    });

    it("should support thread context in streamResponse", async () => {
      const options: StreamResponseOptions = { threadId: "$thread-root" };

      await service.streamResponse(ROOM_ID, createTokenStream(["Reply"]), options);

      // Initial message should include thread relation
      expect(mockClient.sendMessage).toHaveBeenCalledWith(
        ROOM_ID,
        expect.objectContaining({
          "m.relates_to": expect.objectContaining({
            rel_type: "m.thread",
            event_id: "$thread-root",
          }),
        })
      );
    });

    it("should perform multiple edits for long-running streams", async () => {
      // Simulated time only: the 2 seconds of streaming cost no real waiting.
      vi.useFakeTimers();

      const tokens = Array.from({ length: 10 }, (_, i) => `token${String(i)} `);
      const delayMs = 200; // 10 tokens * 200ms = 2000ms, spanning several edit windows

      const pending = service.streamResponse(ROOM_ID, createTokenStream(tokens, delayMs));
      await vi.advanceTimersByTimeAsync(tokens.length * delayMs + 10);
      await pending;

      // Should have multiple edits (at least 2) but fewer than one per token
      const edits = countEdits();
      expect(edits).toBeGreaterThanOrEqual(2);
      expect(edits).toBeLessThanOrEqual(8);
      expect(lastEditBody()).toBe(tokens.join(""));
    });
  });
});
|
||||||
236
apps/api/src/bridge/matrix/matrix-streaming.service.ts
Normal file
236
apps/api/src/bridge/matrix/matrix-streaming.service.ts
Normal file
@@ -0,0 +1,236 @@
|
|||||||
|
import { Injectable, Logger } from "@nestjs/common";
|
||||||
|
import type { MatrixClient } from "matrix-bot-sdk";
|
||||||
|
import { MatrixService } from "./matrix.service";
|
||||||
|
|
||||||
|
/**
 * Optional settings accepted by MatrixStreamingService.streamResponse.
 * Every field may be omitted.
 */
export interface StreamResponseOptions {
  /** Placeholder text sent before any token arrives; "Thinking..." when omitted. */
  initialMessage?: string;

  /** Event ID of the thread root; when set, the response is sent into that thread. */
  threadId?: string;

  /** When true and `tokenUsage` is present, a usage footer is appended to the final message. */
  showTokenUsage?: boolean;

  /** Token counts rendered in the usage footer. */
  tokenUsage?: {
    prompt: number;
    completion: number;
    total: number;
  };
}
|
||||||
|
|
||||||
|
/** The msgtype/body pair shared by a message and its replacement content. */
interface MatrixTextPayload {
  msgtype: string;
  body: string;
}

/** Relation of an event to another one (used here for m.replace and m.thread). */
interface MatrixEventRelation {
  rel_type: string;
  event_id: string;
  is_falling_back?: boolean;
  "m.in_reply_to"?: {
    event_id: string;
  };
}

/**
 * Content of an m.room.message event as built by this service: the visible
 * text, plus optional replacement content and relation metadata.
 */
interface MatrixMessageContent extends MatrixTextPayload {
  "m.new_content"?: MatrixTextPayload;
  "m.relates_to"?: MatrixEventRelation;
}
|
||||||
|
|
||||||
|
/** Shortest gap, in milliseconds, allowed between two streaming edits of one message. */
const EDIT_INTERVAL_MS = 500;

/** Timeout, in milliseconds, passed to the client when the typing indicator is switched on. */
const TYPING_TIMEOUT_MS = 30_000;
|
||||||
|
|
||||||
|
/**
 * Matrix Streaming Service
 *
 * Streams AI responses into Matrix rooms by sending a placeholder message and
 * then editing it as tokens arrive. The token source is any
 * AsyncIterable<string>, so the service is independent of the LLM provider.
 *
 * Key features:
 * - Rate-limited edits (at most one every EDIT_INTERVAL_MS)
 * - Typing indicator management during generation (best-effort)
 * - Stream failures are surfaced to the room as an error notice
 * - Thread support for contextual responses
 */
@Injectable()
export class MatrixStreamingService {
  private readonly logger = new Logger(MatrixStreamingService.name);

  constructor(private readonly matrixService: MatrixService) {}

  /**
   * Edit an existing Matrix message using the m.replace relation.
   *
   * Sends a new m.room.message event whose "m.new_content" replaces the
   * original text. The top-level body ("* " + text) is the fallback shown by
   * clients that don't support edits.
   *
   * @param roomId - The Matrix room ID
   * @param eventId - The original event ID to replace
   * @param newContent - The updated message text
   * @throws Error if the Matrix client is not connected, or if sending fails
   */
  async editMessage(roomId: string, eventId: string, newContent: string): Promise<void> {
    const client = this.getClientOrThrow();

    const editContent: MatrixMessageContent = {
      "m.new_content": {
        msgtype: "m.text",
        body: newContent,
      },
      "m.relates_to": {
        rel_type: "m.replace",
        event_id: eventId,
      },
      // Fallback for clients that don't support edits
      msgtype: "m.text",
      body: `* ${newContent}`,
    };

    await client.sendEvent(roomId, "m.room.message", editContent);
  }

  /**
   * Set the typing indicator for the bot in a room.
   *
   * @param roomId - The Matrix room ID
   * @param typing - Whether the bot is typing
   * @throws Error if the Matrix client is not connected, or if the request fails
   */
  async setTypingIndicator(roomId: string, typing: boolean): Promise<void> {
    const client = this.getClientOrThrow();

    // A timeout is only passed when the indicator is being switched on.
    await client.setTyping(roomId, typing, typing ? TYPING_TIMEOUT_MS : undefined);
  }

  /**
   * Send an initial message for streaming, optionally in a thread.
   *
   * @param roomId - The Matrix room ID
   * @param content - The initial message content
   * @param threadId - Optional thread root event ID
   * @returns The event ID of the sent message, usable with editMessage
   * @throws Error if the Matrix client is not connected, or if sending fails
   */
  async sendStreamingMessage(roomId: string, content: string, threadId?: string): Promise<string> {
    const client = this.getClientOrThrow();

    const messageContent: MatrixMessageContent = {
      msgtype: "m.text",
      body: content,
    };

    if (threadId) {
      messageContent["m.relates_to"] = {
        rel_type: "m.thread",
        event_id: threadId,
        is_falling_back: true,
        "m.in_reply_to": {
          event_id: threadId,
        },
      };
    }

    const eventId: string = await client.sendMessage(roomId, messageContent);
    return eventId;
  }

  /**
   * Stream an AI response to a Matrix room using incremental message edits.
   *
   * Steps:
   * 1. Send the initial message ("Thinking..." unless overridden).
   * 2. Start the typing indicator (best-effort).
   * 3. Accumulate tokens; edit the message at most once per EDIT_INTERVAL_MS.
   *    A failed intermediate edit is logged and skipped, because the next
   *    edit carries the full accumulated text anyway.
   * 4. Clear the typing indicator (best-effort), whether or not the stream failed.
   * 5. Send the terminal edit: the full text (plus optional token usage), or
   *    the partial text followed by an error notice if the stream threw.
   *
   * An error thrown by the token stream is reported in the room and does NOT
   * reject the returned promise.
   *
   * @param roomId - The Matrix room ID
   * @param tokenStream - AsyncIterable that yields string tokens
   * @param options - Optional configuration for the stream
   * @throws Error if the client is not connected, or if the initial message
   *         or the terminal edit cannot be sent
   */
  async streamResponse(
    roomId: string,
    tokenStream: AsyncIterable<string>,
    options?: StreamResponseOptions
  ): Promise<void> {
    // Fail fast, before anything is sent to the room.
    this.getClientOrThrow();

    const initialMessage = options?.initialMessage ?? "Thinking...";
    const eventId = await this.sendStreamingMessage(roomId, initialMessage, options?.threadId);

    // The indicator is cosmetic; failing to set it must not abandon the
    // placeholder message that is already visible in the room.
    await this.setTypingIndicatorBestEffort(roomId, true);

    let accumulatedText = "";
    let lastSentText: string | undefined;
    let lastEditTime = 0;
    let streamError: string | undefined;

    try {
      for await (const token of tokenStream) {
        accumulatedText += token;

        const withinRateLimit = Date.now() - lastEditTime < EDIT_INTERVAL_MS;
        if (withinRateLimit || accumulatedText.length === 0) {
          continue;
        }

        try {
          await this.editMessage(roomId, eventId, accumulatedText);
          lastSentText = accumulatedText;
        } catch (editError: unknown) {
          this.logger.warn(
            `Intermediate edit failed in room ${roomId}: ${MatrixStreamingService.describeError(editError)}`
          );
        }

        // Stamp after the attempt (successful or not) so that slow or failing
        // edits are still spaced EDIT_INTERVAL_MS apart.
        lastEditTime = Date.now();
      }
    } catch (error: unknown) {
      streamError = MatrixStreamingService.describeError(error);
      this.logger.error(`Stream error in room ${roomId}: ${streamError}`);
    } finally {
      await this.setTypingIndicatorBestEffort(roomId, false);
    }

    if (streamError !== undefined) {
      const errorContent = accumulatedText
        ? `${accumulatedText}\n\n[Streaming error: ${streamError}]`
        : `[Streaming error: ${streamError}]`;

      await this.editMessage(roomId, eventId, errorContent);
      return;
    }

    let finalContent = accumulatedText || "(No response generated)";

    if (options?.showTokenUsage && options.tokenUsage) {
      const { prompt, completion, total } = options.tokenUsage;
      finalContent += `\n\n---\nTokens: ${String(total)} (prompt: ${String(prompt)}, completion: ${String(completion)})`;
    }

    // Skip the terminal edit when the room already shows exactly this text.
    if (finalContent !== lastSentText) {
      await this.editMessage(roomId, eventId, finalContent);
    }
  }

  /**
   * Set or clear the typing indicator, logging instead of throwing on failure.
   */
  private async setTypingIndicatorBestEffort(roomId: string, typing: boolean): Promise<void> {
    try {
      await this.setTypingIndicator(roomId, typing);
    } catch (error: unknown) {
      this.logger.warn(
        `Failed to ${typing ? "set" : "clear"} typing indicator in room ${roomId}: ${MatrixStreamingService.describeError(error)}`
      );
    }
  }

  /**
   * Extract a human-readable message from an unknown thrown value.
   */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : "Unknown error occurred";
  }

  /**
   * Get the Matrix client from the parent MatrixService, or throw if not connected.
   *
   * @throws Error when the service reports it is disconnected or holds no client
   */
  private getClientOrThrow(): MatrixClient {
    if (!this.matrixService.isConnected()) {
      throw new Error("Matrix client is not connected");
    }

    const client = this.matrixService.getClient();
    if (!client) {
      throw new Error("Matrix client is not connected");
    }

    return client;
  }
}
|
||||||
@@ -269,6 +269,18 @@ export class MatrixService implements IChatProvider {
|
|||||||
return this.connected;
|
return this.connected;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Get the underlying MatrixClient instance.
 *
 * Used by MatrixStreamingService for low-level operations
 * (message edits, typing indicators) that require direct client access.
 *
 * Returns the `client` field as-is, so the result may be null and callers
 * must null-check it before use.
 * NOTE(review): presumably null before connect() and after disconnect —
 * confirm against where `this.client` is assigned.
 *
 * @returns The MatrixClient instance, or null if not connected
 */
getClient(): MatrixClient | null {
  return this.client;
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send a message to a room
|
* Send a message to a room
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user