diff --git a/apps/api/src/bridge/matrix/matrix-bridge.integration.spec.ts b/apps/api/src/bridge/matrix/matrix-bridge.integration.spec.ts new file mode 100644 index 0000000..539ee51 --- /dev/null +++ b/apps/api/src/bridge/matrix/matrix-bridge.integration.spec.ts @@ -0,0 +1,1062 @@ +/** + * Matrix Bridge Integration Tests + * + * These tests verify cross-service interactions in the Matrix bridge subsystem. + * They use the NestJS Test module with mocked external dependencies (Prisma, + * matrix-bot-sdk, discord.js) but test ACTUAL service-to-service wiring. + * + * Scenarios covered: + * 1. BridgeModule DI: CHAT_PROVIDERS includes MatrixService when MATRIX_ACCESS_TOKEN is set + * 2. BridgeModule without Matrix: Matrix excluded when MATRIX_ACCESS_TOKEN unset + * 3. Command flow: room.message -> MatrixService -> CommandParserService -> StitcherService + * 4. Herald broadcast: HeraldService broadcasts to MatrixService as a CHAT_PROVIDERS entry + * 5. Room-workspace mapping: MatrixRoomService resolves workspace for MatrixService.handleRoomMessage + * 6. Streaming flow: MatrixStreamingService.streamResponse via MatrixService's client + * 7. Multi-provider coexistence: Both Discord and Matrix in CHAT_PROVIDERS + */ + +import { Test, TestingModule } from "@nestjs/testing"; +import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; +import { BridgeModule } from "../bridge.module"; +import { CHAT_PROVIDERS } from "../bridge.constants"; +import { MatrixService } from "./matrix.service"; +import { MatrixRoomService } from "./matrix-room.service"; +import { MatrixStreamingService } from "./matrix-streaming.service"; +import { CommandParserService } from "../parser/command-parser.service"; +import { DiscordService } from "../discord/discord.service"; +import { StitcherService } from "../../stitcher/stitcher.service"; +import { HeraldService } from "../../herald/herald.service"; +import { PrismaService } from "../../prisma/prisma.service"; +import { BullMqService } from "../../bullmq/bullmq.service"; +import type { IChatProvider } from "../interfaces"; +import { JOB_CREATED, JOB_STARTED } from "../../job-events/event-types"; + +// --------------------------------------------------------------------------- +// Mock discord.js +// --------------------------------------------------------------------------- +const mockDiscordReadyCallbacks: Array<() => void> = []; +const mockDiscordClient = { + login: vi.fn().mockImplementation(async () => { + mockDiscordReadyCallbacks.forEach((cb) => cb()); + return Promise.resolve(); + }), + destroy: vi.fn().mockResolvedValue(undefined), + on: vi.fn(), + once: vi.fn().mockImplementation((event: string, callback: () => void) => { + if (event === "ready") { + mockDiscordReadyCallbacks.push(callback); + } + }), + user: { tag: "TestBot#1234" }, + channels: { fetch: vi.fn() }, + guilds: { fetch: vi.fn() }, +}; + +vi.mock("discord.js", () => ({ + Client: class MockClient { + login = mockDiscordClient.login; + destroy = mockDiscordClient.destroy; + on = mockDiscordClient.on; + once = mockDiscordClient.once; + user = mockDiscordClient.user; + channels = mockDiscordClient.channels; + guilds = mockDiscordClient.guilds; + }, + Events: { + ClientReady: "ready", + MessageCreate: "messageCreate", + Error: "error", + }, + GatewayIntentBits: { + Guilds: 1 << 0, + GuildMessages: 1 << 9, + MessageContent: 1 << 15, + }, +})); + +// --------------------------------------------------------------------------- +// Mock matrix-bot-sdk +// --------------------------------------------------------------------------- +const mockMatrixMessageCallbacks: Array<(roomId: string, event: Record) => void> = + []; +const mockMatrixEventCallbacks: Array<(roomId: string, event: Record) => void> = + []; + +const mockMatrixClient = { + start: vi.fn().mockResolvedValue(undefined), + stop: vi.fn(), + on: vi + .fn() + .mockImplementation( + (event: string, callback: (roomId: string, evt: Record) => void) => { + if (event === "room.message") { + mockMatrixMessageCallbacks.push(callback); + } + if (event === "room.event") { + mockMatrixEventCallbacks.push(callback); + } + } + ), + sendMessage: vi.fn().mockResolvedValue("$mock-event-id"), + sendEvent: vi.fn().mockResolvedValue("$mock-edit-event-id"), + setTyping: vi.fn().mockResolvedValue(undefined), + createRoom: vi.fn().mockResolvedValue("!new-room:example.com"), +}; + +vi.mock("matrix-bot-sdk", () => ({ + MatrixClient: class MockMatrixClient { + start = mockMatrixClient.start; + stop = mockMatrixClient.stop; + on = mockMatrixClient.on; + sendMessage = mockMatrixClient.sendMessage; + sendEvent = mockMatrixClient.sendEvent; + setTyping = mockMatrixClient.setTyping; + createRoom = mockMatrixClient.createRoom; + }, + SimpleFsStorageProvider: class MockStorage { + constructor(_path: string) { + // no-op + } + }, + AutojoinRoomsMixin: { + setupOnClient: vi.fn(), + }, +})); + +// --------------------------------------------------------------------------- +// Saved environment variables +// --------------------------------------------------------------------------- +interface SavedEnvVars { + DISCORD_BOT_TOKEN?: string; + DISCORD_GUILD_ID?: string; + DISCORD_CONTROL_CHANNEL_ID?: string; + DISCORD_WORKSPACE_ID?: string; + MATRIX_ACCESS_TOKEN?: string; + MATRIX_HOMESERVER_URL?: string; + MATRIX_BOT_USER_ID?: string; + MATRIX_CONTROL_ROOM_ID?: string; + MATRIX_WORKSPACE_ID?: string; + ENCRYPTION_KEY?: string; +} + +const ENV_KEYS: (keyof SavedEnvVars)[] = [ + "DISCORD_BOT_TOKEN", + "DISCORD_GUILD_ID", + "DISCORD_CONTROL_CHANNEL_ID", + "DISCORD_WORKSPACE_ID", + "MATRIX_ACCESS_TOKEN", + "MATRIX_HOMESERVER_URL", + "MATRIX_BOT_USER_ID", + "MATRIX_CONTROL_ROOM_ID", + "MATRIX_WORKSPACE_ID", + "ENCRYPTION_KEY", +]; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function saveAndClearEnv(): SavedEnvVars { + const saved: SavedEnvVars = {}; + for (const key of ENV_KEYS) { + saved[key] = process.env[key]; + delete process.env[key]; + } + return saved; +} + +function restoreEnv(saved: SavedEnvVars): void { + for (const [key, value] of Object.entries(saved)) { + if (value === undefined) { + delete process.env[key]; + } else { + process.env[key] = value; + } + } +} + +function setMatrixEnv(): void { + process.env.MATRIX_ACCESS_TOKEN = "test-matrix-token"; + process.env.MATRIX_HOMESERVER_URL = "https://matrix.example.com"; + process.env.MATRIX_BOT_USER_ID = "@bot:example.com"; + process.env.MATRIX_CONTROL_ROOM_ID = "!control-room:example.com"; + process.env.MATRIX_WORKSPACE_ID = "ws-integration-test"; +} + +function setDiscordEnv(): void { + process.env.DISCORD_BOT_TOKEN = "test-discord-token"; + process.env.DISCORD_GUILD_ID = "test-guild-id"; + process.env.DISCORD_CONTROL_CHANNEL_ID = "test-channel-id"; + process.env.DISCORD_WORKSPACE_ID = "ws-discord-test"; +} + +function setEncryptionKey(): void { + process.env.ENCRYPTION_KEY = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"; +} + +/** + * Compile the full BridgeModule with only external deps mocked + */ +async function compileBridgeModule(): Promise { + return Test.createTestingModule({ + imports: [BridgeModule], + }) + .overrideProvider(PrismaService) + .useValue({}) + .overrideProvider(BullMqService) + .useValue({}) + .compile(); +} + +/** + * Create an async iterable from an array of string tokens + */ +async function* createTokenStream(tokens: string[]): AsyncGenerator { + for (const token of tokens) { + yield token; + } +} + +// =========================================================================== +// Integration Tests +// =========================================================================== + +describe("Matrix Bridge Integration Tests", () => { + let savedEnv: SavedEnvVars; + + beforeEach(() => { + savedEnv = saveAndClearEnv(); + setEncryptionKey(); + + // Clear callback arrays + mockMatrixMessageCallbacks.length = 0; + mockMatrixEventCallbacks.length = 0; + mockDiscordReadyCallbacks.length = 0; + + vi.clearAllMocks(); + }); + + afterEach(() => { + restoreEnv(savedEnv); + }); + + // ========================================================================= + // Scenario 1: BridgeModule DI with Matrix enabled + // ========================================================================= + describe("BridgeModule DI: Matrix enabled", () => { + it("should include MatrixService in CHAT_PROVIDERS when MATRIX_ACCESS_TOKEN is set", async () => { + setMatrixEnv(); + const module = await compileBridgeModule(); + + const providers = module.get(CHAT_PROVIDERS); + + expect(providers).toBeDefined(); + expect(providers.length).toBeGreaterThanOrEqual(1); + + const matrixProvider = providers.find((p) => p instanceof MatrixService); + expect(matrixProvider).toBeDefined(); + expect(matrixProvider).toBeInstanceOf(MatrixService); + }); + + it("should export MatrixService, MatrixRoomService, MatrixStreamingService, and CommandParserService", async () => { + setMatrixEnv(); + const module = await compileBridgeModule(); + + expect(module.get(MatrixService)).toBeInstanceOf(MatrixService); + expect(module.get(MatrixRoomService)).toBeInstanceOf(MatrixRoomService); + expect(module.get(MatrixStreamingService)).toBeInstanceOf(MatrixStreamingService); + expect(module.get(CommandParserService)).toBeInstanceOf(CommandParserService); + }); + + it("should provide StitcherService to MatrixService via StitcherModule import", async () => { + setMatrixEnv(); + const module = await compileBridgeModule(); + + const stitcher = module.get(StitcherService); + expect(stitcher).toBeDefined(); + expect(stitcher).toBeInstanceOf(StitcherService); + }); + }); + + // ========================================================================= + // Scenario 2: BridgeModule without Matrix + // ========================================================================= + describe("BridgeModule DI: Matrix disabled", () => { + it("should NOT include MatrixService in CHAT_PROVIDERS when MATRIX_ACCESS_TOKEN is unset", async () => { + // No Matrix env vars set - only encryption key + const module = await compileBridgeModule(); + + const providers = module.get(CHAT_PROVIDERS); + + expect(providers).toBeDefined(); + const matrixProvider = providers.find((p) => p instanceof MatrixService); + expect(matrixProvider).toBeUndefined(); + }); + + it("should still register MatrixService as a provider even when not in CHAT_PROVIDERS", async () => { + // MatrixService is always registered (for optional injection), just not in CHAT_PROVIDERS + const module = await compileBridgeModule(); + + const matrixService = module.get(MatrixService); + expect(matrixService).toBeDefined(); + expect(matrixService).toBeInstanceOf(MatrixService); + }); + + it("should produce empty CHAT_PROVIDERS when neither bridge is configured", async () => { + const module = await compileBridgeModule(); + + const providers = module.get(CHAT_PROVIDERS); + + expect(providers).toEqual([]); + }); + }); + + // ========================================================================= + // Scenario 3: Command flow - message -> parser -> stitcher + // ========================================================================= + describe("Command flow: message -> CommandParserService -> StitcherService", () => { + let matrixService: MatrixService; + let stitcherService: StitcherService; + let commandParser: CommandParserService; + + const mockStitcher = { + dispatchJob: vi.fn().mockResolvedValue({ + jobId: "job-integ-001", + queueName: "main", + status: "PENDING", + }), + trackJobEvent: vi.fn().mockResolvedValue(undefined), + }; + + const mockRoomService = { + getWorkspaceForRoom: vi.fn().mockResolvedValue(null), + getRoomForWorkspace: vi.fn().mockResolvedValue(null), + provisionRoom: vi.fn().mockResolvedValue(null), + linkWorkspaceToRoom: vi.fn().mockResolvedValue(undefined), + unlinkWorkspace: vi.fn().mockResolvedValue(undefined), + }; + + beforeEach(async () => { + setMatrixEnv(); + + const module = await Test.createTestingModule({ + providers: [ + MatrixService, + CommandParserService, + { + provide: StitcherService, + useValue: mockStitcher, + }, + { + provide: MatrixRoomService, + useValue: mockRoomService, + }, + ], + }).compile(); + + matrixService = module.get(MatrixService); + stitcherService = module.get(StitcherService); + commandParser = module.get(CommandParserService); + }); + + it("should parse @mosaic fix #42 through CommandParserService and dispatch to StitcherService", async () => { + // MatrixRoomService returns a workspace for the room + mockRoomService.getWorkspaceForRoom.mockResolvedValue("ws-mapped-123"); + + await matrixService.connect(); + + // Simulate incoming Matrix message event + const callback = mockMatrixMessageCallbacks[0]; + expect(callback).toBeDefined(); + + callback?.("!some-room:example.com", { + event_id: "$ev-fix-42", + sender: "@alice:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic fix #42", + }, + }); + + // Wait for async processing + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Verify StitcherService.dispatchJob was called with correct workspace + expect(stitcherService.dispatchJob).toHaveBeenCalledWith( + expect.objectContaining({ + workspaceId: "ws-mapped-123", + type: "code-task", + priority: 10, + metadata: expect.objectContaining({ + issueNumber: 42, + command: "fix", + authorId: "@alice:example.com", + }), + }) + ); + }); + + it("should normalize !mosaic prefix through CommandParserService and dispatch correctly", async () => { + mockRoomService.getWorkspaceForRoom.mockResolvedValue("ws-bang-prefix"); + + await matrixService.connect(); + + const callback = mockMatrixMessageCallbacks[0]; + callback?.("!room:example.com", { + event_id: "$ev-bang-fix", + sender: "@bob:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "!mosaic fix #99", + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(stitcherService.dispatchJob).toHaveBeenCalledWith( + expect.objectContaining({ + workspaceId: "ws-bang-prefix", + metadata: expect.objectContaining({ + issueNumber: 99, + }), + }) + ); + }); + + it("should send help text when CommandParserService fails to parse an invalid command", async () => { + mockRoomService.getWorkspaceForRoom.mockResolvedValue("ws-test"); + + await matrixService.connect(); + + const callback = mockMatrixMessageCallbacks[0]; + callback?.("!room:example.com", { + event_id: "$ev-bad-cmd", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic invalidcmd", + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Should NOT dispatch to stitcher + expect(stitcherService.dispatchJob).not.toHaveBeenCalled(); + + // Should send help text back to the room + expect(mockMatrixClient.sendMessage).toHaveBeenCalledWith( + "!room:example.com", + expect.objectContaining({ + body: expect.stringContaining("Available commands"), + }) + ); + }); + + it("should create a thread and send confirmation after dispatching a fix command", async () => { + mockRoomService.getWorkspaceForRoom.mockResolvedValue("ws-thread-test"); + + await matrixService.connect(); + + const callback = mockMatrixMessageCallbacks[0]; + callback?.("!room:example.com", { + event_id: "$ev-fix-thread", + sender: "@alice:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic fix #10", + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // First sendMessage creates the thread root + const sendCalls = mockMatrixClient.sendMessage.mock.calls; + expect(sendCalls.length).toBeGreaterThanOrEqual(2); + + // Thread root message + const threadRootCall = sendCalls[0]; + expect(threadRootCall?.[0]).toBe("!room:example.com"); + expect(threadRootCall?.[1]).toEqual( + expect.objectContaining({ + body: expect.stringContaining("Job #10"), + }) + ); + + // Confirmation message sent as thread reply + const confirmationCall = sendCalls[1]; + expect(confirmationCall?.[0]).toBe("!control-room:example.com"); + expect(confirmationCall?.[1]).toEqual( + expect.objectContaining({ + body: expect.stringContaining("Job created: job-integ-001"), + "m.relates_to": expect.objectContaining({ + rel_type: "m.thread", + }), + }) + ); + }); + + it("should verify CommandParserService is the real service (not a mock)", () => { + // This confirms the integration test wires up the actual CommandParserService + const result = commandParser.parseCommand("@mosaic fix #42"); + + expect(result.success).toBe(true); + if (result.success) { + expect(result.command.action).toBe("fix"); + expect(result.command.issue?.number).toBe(42); + } + }); + }); + + // ========================================================================= + // Scenario 4: Herald broadcast to MatrixService via CHAT_PROVIDERS + // ========================================================================= + describe("Herald broadcast via CHAT_PROVIDERS", () => { + it("should broadcast to MatrixService when it is connected", async () => { + setMatrixEnv(); + + // Create a connected mock MatrixService that tracks sendThreadMessage calls + const threadMessages: Array<{ threadId: string; content: string }> = []; + const mockMatrixProvider: IChatProvider = { + connect: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + isConnected: vi.fn().mockReturnValue(true), + sendMessage: vi.fn().mockResolvedValue(undefined), + createThread: vi.fn().mockResolvedValue("$thread-id"), + sendThreadMessage: vi.fn().mockImplementation(async (options) => { + threadMessages.push(options as { threadId: string; content: string }); + }), + parseCommand: vi.fn().mockReturnValue(null), + }; + + const mockPrisma = { + runnerJob: { + findUnique: vi.fn().mockResolvedValue({ + id: "job-herald-001", + workspaceId: "ws-herald-test", + type: "code-task", + }), + }, + jobEvent: { + findFirst: vi.fn().mockResolvedValue({ + payload: { + metadata: { + threadId: "$thread-herald-root", + issueNumber: 55, + }, + }, + }), + }, + }; + + const module = await Test.createTestingModule({ + providers: [ + HeraldService, + { + provide: PrismaService, + useValue: mockPrisma, + }, + { + provide: CHAT_PROVIDERS, + useValue: [mockMatrixProvider], + }, + ], + }).compile(); + + const herald = module.get(HeraldService); + + await herald.broadcastJobEvent("job-herald-001", { + id: "evt-001", + jobId: "job-herald-001", + type: JOB_STARTED, + timestamp: new Date(), + actor: "stitcher", + payload: {}, + }); + + // Verify Herald sent the message via the MatrixService (CHAT_PROVIDERS) + expect(threadMessages).toHaveLength(1); + expect(threadMessages[0]?.threadId).toBe("$thread-herald-root"); + expect(threadMessages[0]?.content).toContain("#55"); + }); + + it("should skip disconnected providers and continue to next", async () => { + const disconnectedProvider: IChatProvider = { + connect: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + isConnected: vi.fn().mockReturnValue(false), + sendMessage: vi.fn().mockResolvedValue(undefined), + createThread: vi.fn().mockResolvedValue("$t"), + sendThreadMessage: vi.fn().mockResolvedValue(undefined), + parseCommand: vi.fn().mockReturnValue(null), + }; + + const connectedProvider: IChatProvider = { + connect: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + isConnected: vi.fn().mockReturnValue(true), + sendMessage: vi.fn().mockResolvedValue(undefined), + createThread: vi.fn().mockResolvedValue("$t"), + sendThreadMessage: vi.fn().mockResolvedValue(undefined), + parseCommand: vi.fn().mockReturnValue(null), + }; + + const mockPrisma = { + runnerJob: { + findUnique: vi.fn().mockResolvedValue({ + id: "job-skip-001", + workspaceId: "ws-skip", + type: "code-task", + }), + }, + jobEvent: { + findFirst: vi.fn().mockResolvedValue({ + payload: { + metadata: { + threadId: "$thread-skip", + issueNumber: 1, + }, + }, + }), + }, + }; + + const module = await Test.createTestingModule({ + providers: [ + HeraldService, + { + provide: PrismaService, + useValue: mockPrisma, + }, + { + provide: CHAT_PROVIDERS, + useValue: [disconnectedProvider, connectedProvider], + }, + ], + }).compile(); + + const herald = module.get(HeraldService); + + await herald.broadcastJobEvent("job-skip-001", { + id: "evt-002", + jobId: "job-skip-001", + type: JOB_CREATED, + timestamp: new Date(), + actor: "stitcher", + payload: {}, + }); + + // Disconnected provider should NOT have received message + expect(disconnectedProvider.sendThreadMessage).not.toHaveBeenCalled(); + // Connected provider SHOULD have received message + expect(connectedProvider.sendThreadMessage).toHaveBeenCalledTimes(1); + }); + + it("should continue broadcasting to other providers if one throws an error", async () => { + const failingProvider: IChatProvider = { + connect: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + isConnected: vi.fn().mockReturnValue(true), + sendMessage: vi.fn().mockResolvedValue(undefined), + createThread: vi.fn().mockResolvedValue("$t"), + sendThreadMessage: vi.fn().mockRejectedValue(new Error("Network failure")), + parseCommand: vi.fn().mockReturnValue(null), + }; + + const healthyProvider: IChatProvider = { + connect: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + isConnected: vi.fn().mockReturnValue(true), + sendMessage: vi.fn().mockResolvedValue(undefined), + createThread: vi.fn().mockResolvedValue("$t"), + sendThreadMessage: vi.fn().mockResolvedValue(undefined), + parseCommand: vi.fn().mockReturnValue(null), + }; + + const mockPrisma = { + runnerJob: { + findUnique: vi.fn().mockResolvedValue({ + id: "job-err-001", + workspaceId: "ws-err", + type: "code-task", + }), + }, + jobEvent: { + findFirst: vi.fn().mockResolvedValue({ + payload: { + metadata: { + threadId: "$thread-err", + issueNumber: 77, + }, + }, + }), + }, + }; + + const module = await Test.createTestingModule({ + providers: [ + HeraldService, + { + provide: PrismaService, + useValue: mockPrisma, + }, + { + provide: CHAT_PROVIDERS, + useValue: [failingProvider, healthyProvider], + }, + ], + }).compile(); + + const herald = module.get(HeraldService); + + // Should not throw even though first provider fails + await expect( + herald.broadcastJobEvent("job-err-001", { + id: "evt-003", + jobId: "job-err-001", + type: JOB_STARTED, + timestamp: new Date(), + actor: "stitcher", + payload: {}, + }) + ).resolves.toBeUndefined(); + + // Both providers should have been attempted + expect(failingProvider.sendThreadMessage).toHaveBeenCalledTimes(1); + expect(healthyProvider.sendThreadMessage).toHaveBeenCalledTimes(1); + }); + }); + + // ========================================================================= + // Scenario 5: Room-workspace mapping integration + // ========================================================================= + describe("Room-workspace mapping: MatrixRoomService -> MatrixService", () => { + let matrixService: MatrixService; + + const mockStitcher = { + dispatchJob: vi.fn().mockResolvedValue({ + jobId: "job-room-001", + queueName: "main", + status: "PENDING", + }), + trackJobEvent: vi.fn().mockResolvedValue(undefined), + }; + + const mockPrisma = { + workspace: { + findFirst: vi.fn(), + findUnique: vi.fn(), + update: vi.fn(), + }, + }; + + beforeEach(async () => { + setMatrixEnv(); + + const module = await Test.createTestingModule({ + providers: [ + MatrixService, + CommandParserService, + MatrixRoomService, + { + provide: StitcherService, + useValue: mockStitcher, + }, + { + provide: PrismaService, + useValue: mockPrisma, + }, + ], + }).compile(); + + matrixService = module.get(MatrixService); + }); + + it("should resolve workspace from MatrixRoomService's Prisma lookup and dispatch command", async () => { + // Mock Prisma: room maps to workspace + mockPrisma.workspace.findFirst.mockResolvedValue({ id: "ws-prisma-resolved" }); + + await matrixService.connect(); + + const callback = mockMatrixMessageCallbacks[0]; + callback?.("!mapped-room:example.com", { + event_id: "$ev-room-map", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic fix #77", + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // MatrixRoomService should have queried Prisma with the room ID + expect(mockPrisma.workspace.findFirst).toHaveBeenCalledWith({ + where: { matrixRoomId: "!mapped-room:example.com" }, + select: { id: true }, + }); + + // StitcherService should have been called with the resolved workspace + expect(mockStitcher.dispatchJob).toHaveBeenCalledWith( + expect.objectContaining({ + workspaceId: "ws-prisma-resolved", + }) + ); + }); + + it("should fall back to control room workspace when room is not mapped in Prisma", async () => { + // Prisma returns no workspace for arbitrary rooms + mockPrisma.workspace.findFirst.mockResolvedValue(null); + + await matrixService.connect(); + + const callback = mockMatrixMessageCallbacks[0]; + // Send to the control room (which is !control-room:example.com from setMatrixEnv) + callback?.("!control-room:example.com", { + event_id: "$ev-control-fallback", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic fix #5", + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + // Should use the env-configured workspace ID as fallback + expect(mockStitcher.dispatchJob).toHaveBeenCalledWith( + expect.objectContaining({ + workspaceId: "ws-integration-test", + }) + ); + }); + + it("should ignore messages in unmapped rooms that are not the control room", async () => { + mockPrisma.workspace.findFirst.mockResolvedValue(null); + + await matrixService.connect(); + + const callback = mockMatrixMessageCallbacks[0]; + callback?.("!unknown-room:example.com", { + event_id: "$ev-unmapped", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic fix #1", + }, + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(mockStitcher.dispatchJob).not.toHaveBeenCalled(); + }); + }); + + // ========================================================================= + // Scenario 6: Streaming flow - MatrixStreamingService via MatrixService's client + // ========================================================================= + describe("Streaming flow: MatrixStreamingService via MatrixService client", () => { + let streamingService: MatrixStreamingService; + let matrixService: MatrixService; + + const mockStitcher = { + dispatchJob: vi.fn().mockResolvedValue({ + jobId: "job-stream-001", + queueName: "main", + status: "PENDING", + }), + }; + + const mockRoomService = { + getWorkspaceForRoom: vi.fn().mockResolvedValue(null), + getRoomForWorkspace: vi.fn().mockResolvedValue(null), + provisionRoom: vi.fn().mockResolvedValue(null), + linkWorkspaceToRoom: vi.fn().mockResolvedValue(undefined), + unlinkWorkspace: vi.fn().mockResolvedValue(undefined), + }; + + beforeEach(async () => { + setMatrixEnv(); + + const module = await Test.createTestingModule({ + providers: [ + MatrixService, + MatrixStreamingService, + CommandParserService, + { + provide: StitcherService, + useValue: mockStitcher, + }, + { + provide: MatrixRoomService, + useValue: mockRoomService, + }, + ], + }).compile(); + + matrixService = module.get(MatrixService); + streamingService = module.get(MatrixStreamingService); + }); + + it("should use the real MatrixService's client for streaming operations", async () => { + // Connect MatrixService so the client is available + await matrixService.connect(); + + // Verify the client is available via getClient + const client = matrixService.getClient(); + expect(client).not.toBeNull(); + + // Verify MatrixStreamingService can use the client + expect(matrixService.isConnected()).toBe(true); + }); + + it("should stream response through MatrixStreamingService using MatrixService connection", async () => { + await matrixService.connect(); + + const tokens = ["Hello", " ", "world"]; + const stream = createTokenStream(tokens); + + await streamingService.streamResponse("!room:example.com", stream); + + // Verify initial message was sent via the client + expect(mockMatrixClient.sendMessage).toHaveBeenCalledWith( + "!room:example.com", + expect.objectContaining({ + msgtype: "m.text", + body: "Thinking...", + }) + ); + + // Verify typing indicator was managed + expect(mockMatrixClient.setTyping).toHaveBeenCalledWith("!room:example.com", true, 30000); + // Last setTyping call should clear the indicator + const typingCalls = mockMatrixClient.setTyping.mock.calls; + const lastTypingCall = typingCalls[typingCalls.length - 1]; + expect(lastTypingCall).toEqual(["!room:example.com", false, undefined]); + + // Verify the final edit contains accumulated text + const editCalls = mockMatrixClient.sendEvent.mock.calls; + expect(editCalls.length).toBeGreaterThanOrEqual(1); + const lastEditCall = editCalls[editCalls.length - 1]; + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + expect(lastEditCall[2]["m.new_content"].body).toBe("Hello world"); + }); + + it("should throw when streaming without a connected MatrixService", async () => { + // Do NOT connect MatrixService + const stream = createTokenStream(["test"]); + + await expect(streamingService.streamResponse("!room:example.com", stream)).rejects.toThrow( + "Matrix client is not connected" + ); + }); + + it("should support threaded streaming via MatrixStreamingService", async () => { + await matrixService.connect(); + + const tokens = ["Threaded", " ", "reply"]; + const stream = createTokenStream(tokens); + + await streamingService.streamResponse("!room:example.com", stream, { + threadId: "$thread-root-event", + }); + + // Initial message should include thread relation + expect(mockMatrixClient.sendMessage).toHaveBeenCalledWith( + "!room:example.com", + expect.objectContaining({ + "m.relates_to": expect.objectContaining({ + rel_type: "m.thread", + event_id: "$thread-root-event", + }), + }) + ); + }); + }); + + // ========================================================================= + // Scenario 7: Multi-provider coexistence + // ========================================================================= + describe("Multi-provider coexistence: Discord + Matrix", () => { + it("should include both DiscordService and MatrixService in CHAT_PROVIDERS when both tokens are set", async () => { + setDiscordEnv(); + setMatrixEnv(); + + const module = await compileBridgeModule(); + + const providers = module.get(CHAT_PROVIDERS); + + expect(providers).toHaveLength(2); + + const discordProvider = providers.find((p) => p instanceof DiscordService); + const matrixProvider = providers.find((p) => p instanceof MatrixService); + + expect(discordProvider).toBeInstanceOf(DiscordService); + expect(matrixProvider).toBeInstanceOf(MatrixService); + }); + + it("should maintain correct provider order: Discord first, then Matrix", async () => { + setDiscordEnv(); + setMatrixEnv(); + + const module = await compileBridgeModule(); + + const providers = module.get(CHAT_PROVIDERS); + + // The factory pushes Discord first, then Matrix (based on BridgeModule order) + expect(providers[0]).toBeInstanceOf(DiscordService); + expect(providers[1]).toBeInstanceOf(MatrixService); + }); + + it("should share the same CommandParserService and StitcherService across both providers", async () => { + setDiscordEnv(); + setMatrixEnv(); + + const module = await compileBridgeModule(); + + const discordService = module.get(DiscordService); + const matrixService = module.get(MatrixService); + const stitcher = module.get(StitcherService); + const parser = module.get(CommandParserService); + + // Both services exist and are distinct instances + expect(discordService).toBeDefined(); + expect(matrixService).toBeDefined(); + expect(discordService).not.toBe(matrixService); + + // Shared singletons + expect(stitcher).toBeDefined(); + expect(parser).toBeDefined(); + }); + + it("should include only DiscordService when MATRIX_ACCESS_TOKEN is unset", async () => { + setDiscordEnv(); + // No Matrix env vars + + const module = await compileBridgeModule(); + + const providers = module.get(CHAT_PROVIDERS); + + expect(providers).toHaveLength(1); + expect(providers[0]).toBeInstanceOf(DiscordService); + }); + + it("should include only MatrixService when DISCORD_BOT_TOKEN is unset", async () => { + setMatrixEnv(); + // No Discord env vars + + const module = await compileBridgeModule(); + + const providers = module.get(CHAT_PROVIDERS); + + expect(providers).toHaveLength(1); + expect(providers[0]).toBeInstanceOf(MatrixService); + }); + }); +});