From ad2472061650666e6c246cae00fb0af91eba10ba Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Sun, 15 Feb 2026 02:25:55 -0600 Subject: [PATCH] feat(#382): Herald Service: broadcast to all active chat providers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace direct DiscordService injection with CHAT_PROVIDERS array - Herald broadcasts to ALL active chat providers (Discord, Matrix, future) - Graceful error handling — one provider failure doesn't block others - Skips disconnected providers automatically - Tests verify multi-provider broadcasting behavior - Fix lint: remove unnecessary conditional in matrix.service.ts Refs #382 --- apps/api/src/bridge/bridge.module.ts | 9 +- .../bridge/matrix/matrix-room.service.spec.ts | 25 + .../src/bridge/matrix/matrix-room.service.ts | 15 + .../src/bridge/matrix/matrix.service.spec.ts | 483 ++++++++++++++---- apps/api/src/bridge/matrix/matrix.service.ts | 213 ++++++-- apps/api/src/herald/herald.module.ts | 2 +- apps/api/src/herald/herald.service.spec.ts | 410 +++++++-------- apps/api/src/herald/herald.service.ts | 126 ++--- 8 files changed, 859 insertions(+), 424 deletions(-) diff --git a/apps/api/src/bridge/bridge.module.ts b/apps/api/src/bridge/bridge.module.ts index e7e5781..43ece04 100644 --- a/apps/api/src/bridge/bridge.module.ts +++ b/apps/api/src/bridge/bridge.module.ts @@ -1,6 +1,8 @@ import { Logger, Module } from "@nestjs/common"; import { DiscordService } from "./discord/discord.service"; import { MatrixService } from "./matrix/matrix.service"; +import { MatrixRoomService } from "./matrix/matrix-room.service"; +import { CommandParserService } from "./parser/command-parser.service"; import { StitcherModule } from "../stitcher/stitcher.module"; import { CHAT_PROVIDERS } from "./bridge.constants"; import type { IChatProvider } from "./interfaces"; @@ -20,10 +22,15 @@ const logger = new Logger("BridgeModule"); * * Both bridges can run simultaneously, and no error occurs if neither is configured. * Consumers should inject CHAT_PROVIDERS for bridge-agnostic access to all active providers. + * + * CommandParserService provides shared, platform-agnostic command parsing. + * MatrixRoomService handles workspace-to-Matrix-room mapping. */ @Module({ imports: [StitcherModule], providers: [ + CommandParserService, + MatrixRoomService, DiscordService, MatrixService, { @@ -50,6 +57,6 @@ const logger = new Logger("BridgeModule"); inject: [DiscordService, MatrixService], }, ], - exports: [DiscordService, MatrixService, CHAT_PROVIDERS], + exports: [DiscordService, MatrixService, MatrixRoomService, CommandParserService, CHAT_PROVIDERS], }) export class BridgeModule {} diff --git a/apps/api/src/bridge/matrix/matrix-room.service.spec.ts b/apps/api/src/bridge/matrix/matrix-room.service.spec.ts index 2ae342c..dc73a1c 100644 --- a/apps/api/src/bridge/matrix/matrix-room.service.spec.ts +++ b/apps/api/src/bridge/matrix/matrix-room.service.spec.ts @@ -35,6 +35,7 @@ describe("MatrixRoomService", () => { const mockPrismaService = { workspace: { findUnique: vi.fn(), + findFirst: vi.fn(), update: vi.fn(), }, }; @@ -162,6 +163,30 @@ describe("MatrixRoomService", () => { }); }); + describe("getWorkspaceForRoom", () => { + it("should return the workspace ID for a mapped room", async () => { + mockPrismaService.workspace.findFirst.mockResolvedValue({ + id: "workspace-uuid-1", + }); + + const workspaceId = await service.getWorkspaceForRoom("!mapped-room:example.com"); + + expect(workspaceId).toBe("workspace-uuid-1"); + expect(mockPrismaService.workspace.findFirst).toHaveBeenCalledWith({ + where: { matrixRoomId: "!mapped-room:example.com" }, + select: { id: true }, + }); + }); + + it("should return null for an unmapped room", async () => { + mockPrismaService.workspace.findFirst.mockResolvedValue(null); + + const workspaceId = await service.getWorkspaceForRoom("!unknown-room:example.com"); + + expect(workspaceId).toBeNull(); + }); + }); + describe("linkWorkspaceToRoom", () => { it("should store the room mapping in the workspace", async () => { await service.linkWorkspaceToRoom("workspace-uuid-1", "!existing-room:example.com"); diff --git a/apps/api/src/bridge/matrix/matrix-room.service.ts b/apps/api/src/bridge/matrix/matrix-room.service.ts index f1189d8..93611b8 100644 --- a/apps/api/src/bridge/matrix/matrix-room.service.ts +++ b/apps/api/src/bridge/matrix/matrix-room.service.ts @@ -89,6 +89,21 @@ export class MatrixRoomService { return workspace?.matrixRoomId ?? null; } + /** + * Reverse lookup: find the workspace that owns a given Matrix room. + * + * @param roomId - The Matrix room ID (e.g. "!abc:example.com") + * @returns The workspace ID, or null if the room is not mapped to any workspace + */ + async getWorkspaceForRoom(roomId: string): Promise { + const workspace = await this.prisma.workspace.findFirst({ + where: { matrixRoomId: roomId }, + select: { id: true }, + }); + + return workspace?.id ?? null; + } + /** * Manually link an existing Matrix room to a workspace. * diff --git a/apps/api/src/bridge/matrix/matrix.service.spec.ts b/apps/api/src/bridge/matrix/matrix.service.spec.ts index cfbcb97..45cae2a 100644 --- a/apps/api/src/bridge/matrix/matrix.service.spec.ts +++ b/apps/api/src/bridge/matrix/matrix.service.spec.ts @@ -1,6 +1,8 @@ import { Test, TestingModule } from "@nestjs/testing"; import { MatrixService } from "./matrix.service"; +import { MatrixRoomService } from "./matrix-room.service"; import { StitcherService } from "../../stitcher/stitcher.service"; +import { CommandParserService } from "../parser/command-parser.service"; import { vi, describe, it, expect, beforeEach } from "vitest"; import type { ChatMessage } from "../interfaces"; @@ -50,6 +52,8 @@ vi.mock("matrix-bot-sdk", () => { describe("MatrixService", () => { let service: MatrixService; let stitcherService: StitcherService; + let commandParser: CommandParserService; + let matrixRoomService: MatrixRoomService; const mockStitcherService = { dispatchJob: vi.fn().mockResolvedValue({ @@ -60,6 +64,14 @@ describe("MatrixService", () => { trackJobEvent: vi.fn().mockResolvedValue(undefined), }; + const mockMatrixRoomService = { + getWorkspaceForRoom: vi.fn().mockResolvedValue(null), + getRoomForWorkspace: vi.fn().mockResolvedValue(null), + provisionRoom: vi.fn().mockResolvedValue(null), + linkWorkspaceToRoom: vi.fn().mockResolvedValue(undefined), + unlinkWorkspace: vi.fn().mockResolvedValue(undefined), + }; + beforeEach(async () => { // Set environment variables for testing process.env.MATRIX_HOMESERVER_URL = "https://matrix.example.com"; @@ -75,15 +87,22 @@ describe("MatrixService", () => { const module: TestingModule = await Test.createTestingModule({ providers: [ MatrixService, + CommandParserService, { provide: StitcherService, useValue: mockStitcherService, }, + { + provide: MatrixRoomService, + useValue: mockMatrixRoomService, + }, ], }).compile(); service = module.get(MatrixService); stitcherService = module.get(StitcherService); + commandParser = module.get(CommandParserService); + matrixRoomService = module.get(MatrixRoomService) as MatrixRoomService; // Clear all mocks vi.clearAllMocks(); @@ -189,46 +208,42 @@ describe("MatrixService", () => { }); }); - describe("Command Parsing", () => { - it("should parse @mosaic fix command", () => { + describe("Command Parsing with shared CommandParserService", () => { + it("should parse @mosaic fix #42 via shared parser", () => { const message: ChatMessage = { id: "msg-1", channelId: "!room:example.com", authorId: "@user:example.com", authorName: "@user:example.com", - content: "@mosaic fix 42", + content: "@mosaic fix #42", timestamp: new Date(), }; const command = service.parseCommand(message); - expect(command).toEqual({ - command: "fix", - args: ["42"], - message, - }); + expect(command).not.toBeNull(); + expect(command?.command).toBe("fix"); + expect(command?.args).toContain("#42"); }); - it("should parse !mosaic fix command", () => { + it("should parse !mosaic fix #42 by normalizing to @mosaic for the shared parser", () => { const message: ChatMessage = { id: "msg-1", channelId: "!room:example.com", authorId: "@user:example.com", authorName: "@user:example.com", - content: "!mosaic fix 42", + content: "!mosaic fix #42", timestamp: new Date(), }; const command = service.parseCommand(message); - expect(command).toEqual({ - command: "fix", - args: ["42"], - message, - }); + expect(command).not.toBeNull(); + expect(command?.command).toBe("fix"); + expect(command?.args).toContain("#42"); }); - it("should parse @mosaic status command", () => { + it("should parse @mosaic status command via shared parser", () => { const message: ChatMessage = { id: "msg-2", channelId: "!room:example.com", @@ -240,14 +255,12 @@ describe("MatrixService", () => { const command = service.parseCommand(message); - expect(command).toEqual({ - command: "status", - args: ["job-123"], - message, - }); + expect(command).not.toBeNull(); + expect(command?.command).toBe("status"); + expect(command?.args).toContain("job-123"); }); - it("should parse @mosaic cancel command", () => { + it("should parse @mosaic cancel command via shared parser", () => { const message: ChatMessage = { id: "msg-3", channelId: "!room:example.com", @@ -259,52 +272,11 @@ describe("MatrixService", () => { const command = service.parseCommand(message); - expect(command).toEqual({ - command: "cancel", - args: ["job-456"], - message, - }); + expect(command).not.toBeNull(); + expect(command?.command).toBe("cancel"); }); - it("should parse @mosaic verbose command", () => { - const message: ChatMessage = { - id: "msg-4", - channelId: "!room:example.com", - authorId: "@user:example.com", - authorName: "@user:example.com", - content: "@mosaic verbose job-789", - timestamp: new Date(), - }; - - const command = service.parseCommand(message); - - expect(command).toEqual({ - command: "verbose", - args: ["job-789"], - message, - }); - }); - - it("should parse @mosaic quiet command", () => { - const message: ChatMessage = { - id: "msg-5", - channelId: "!room:example.com", - authorId: "@user:example.com", - authorName: "@user:example.com", - content: "@mosaic quiet", - timestamp: new Date(), - }; - - const command = service.parseCommand(message); - - expect(command).toEqual({ - command: "quiet", - args: [], - message, - }); - }); - - it("should parse @mosaic help command", () => { + it("should parse @mosaic help command via shared parser", () => { const message: ChatMessage = { id: "msg-6", channelId: "!room:example.com", @@ -316,11 +288,8 @@ describe("MatrixService", () => { const command = service.parseCommand(message); - expect(command).toEqual({ - command: "help", - args: [], - message, - }); + expect(command).not.toBeNull(); + expect(command?.command).toBe("help"); }); it("should return null for non-command messages", () => { @@ -353,40 +322,6 @@ describe("MatrixService", () => { expect(command).toBeNull(); }); - it("should handle commands with multiple arguments", () => { - const message: ChatMessage = { - id: "msg-9", - channelId: "!room:example.com", - authorId: "@user:example.com", - authorName: "@user:example.com", - content: "@mosaic fix 42 high-priority", - timestamp: new Date(), - }; - - const command = service.parseCommand(message); - - expect(command).toEqual({ - command: "fix", - args: ["42", "high-priority"], - message, - }); - }); - - it("should return null for invalid commands", () => { - const message: ChatMessage = { - id: "msg-10", - channelId: "!room:example.com", - authorId: "@user:example.com", - authorName: "@user:example.com", - content: "@mosaic invalidcommand 42", - timestamp: new Date(), - }; - - const command = service.parseCommand(message); - - expect(command).toBeNull(); - }); - it("should return null for @mosaic mention without a command", () => { const message: ChatMessage = { id: "msg-11", @@ -403,8 +338,192 @@ describe("MatrixService", () => { }); }); + describe("Event-driven message reception", () => { + it("should ignore messages from the bot itself", async () => { + await service.connect(); + + const parseCommandSpy = vi.spyOn(commandParser, "parseCommand"); + + // Simulate a message from the bot + expect(mockMessageCallbacks.length).toBeGreaterThan(0); + const callback = mockMessageCallbacks[0]; + callback?.("!test-room:example.com", { + event_id: "$msg-1", + sender: "@mosaic-bot:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic fix #42", + }, + }); + + // Should not attempt to parse + expect(parseCommandSpy).not.toHaveBeenCalled(); + }); + + it("should ignore messages in unmapped rooms", async () => { + // MatrixRoomService returns null for unknown rooms + mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue(null); + + await service.connect(); + + const callback = mockMessageCallbacks[0]; + callback?.("!unknown-room:example.com", { + event_id: "$msg-1", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic fix #42", + }, + }); + + // Wait for async processing + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Should not dispatch to stitcher + expect(stitcherService.dispatchJob).not.toHaveBeenCalled(); + }); + + it("should process commands in the control room (fallback workspace)", async () => { + // MatrixRoomService returns null, but room matches controlRoomId + mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue(null); + + await service.connect(); + + const callback = mockMessageCallbacks[0]; + callback?.("!test-room:example.com", { + event_id: "$msg-1", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic help", + }, + }); + + // Wait for async processing + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Should send help message + expect(mockClient.sendMessage).toHaveBeenCalledWith( + "!test-room:example.com", + expect.objectContaining({ + body: expect.stringContaining("Available commands:"), + }) + ); + }); + + it("should process commands in rooms mapped via MatrixRoomService", async () => { + // MatrixRoomService resolves the workspace + mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("mapped-workspace-id"); + + await service.connect(); + + const callback = mockMessageCallbacks[0]; + callback?.("!mapped-room:example.com", { + event_id: "$msg-1", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic fix #42", + }, + }); + + // Wait for async processing + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Should dispatch with the mapped workspace ID + expect(stitcherService.dispatchJob).toHaveBeenCalledWith( + expect.objectContaining({ + workspaceId: "mapped-workspace-id", + }) + ); + }); + + it("should handle !mosaic prefix in incoming messages", async () => { + mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("test-workspace-id"); + + await service.connect(); + + const callback = mockMessageCallbacks[0]; + callback?.("!test-room:example.com", { + event_id: "$msg-1", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "!mosaic help", + }, + }); + + // Wait for async processing + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Should send help message (normalized !mosaic -> @mosaic for parser) + expect(mockClient.sendMessage).toHaveBeenCalledWith( + "!test-room:example.com", + expect.objectContaining({ + body: expect.stringContaining("Available commands:"), + }) + ); + }); + + it("should send help text when user tries an unknown command", async () => { + mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("test-workspace-id"); + + await service.connect(); + + const callback = mockMessageCallbacks[0]; + callback?.("!test-room:example.com", { + event_id: "$msg-1", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic invalidcommand", + }, + }); + + // Wait for async processing + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Should send error/help message (CommandParserService returns help text for unknown actions) + expect(mockClient.sendMessage).toHaveBeenCalledWith( + "!test-room:example.com", + expect.objectContaining({ + body: expect.stringContaining("Available commands"), + }) + ); + }); + + it("should ignore non-text messages", async () => { + mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("test-workspace-id"); + + await service.connect(); + + const callback = mockMessageCallbacks[0]; + callback?.("!test-room:example.com", { + event_id: "$msg-1", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.image", + body: "photo.jpg", + }, + }); + + // Wait for async processing + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Should not attempt any message sending + expect(mockClient.sendMessage).not.toHaveBeenCalled(); + }); + }); + describe("Command Execution", () => { - it("should forward fix command to stitcher", async () => { + it("should forward fix command to stitcher and create a thread", async () => { const message: ChatMessage = { id: "msg-1", channelId: "!test-room:example.com", @@ -436,6 +555,32 @@ describe("MatrixService", () => { }); }); + it("should handle fix with #-prefixed issue number", async () => { + const message: ChatMessage = { + id: "msg-1", + channelId: "!test-room:example.com", + authorId: "@user:example.com", + authorName: "@user:example.com", + content: "@mosaic fix #42", + timestamp: new Date(), + }; + + await service.connect(); + await service.handleCommand({ + command: "fix", + args: ["#42"], + message, + }); + + expect(stitcherService.dispatchJob).toHaveBeenCalledWith( + expect.objectContaining({ + metadata: expect.objectContaining({ + issueNumber: 42, + }), + }) + ); + }); + it("should respond with help message", async () => { const message: ChatMessage = { id: "msg-1", @@ -461,6 +606,31 @@ describe("MatrixService", () => { ); }); + it("should include retry command in help output", async () => { + const message: ChatMessage = { + id: "msg-1", + channelId: "!test-room:example.com", + authorId: "@user:example.com", + authorName: "@user:example.com", + content: "@mosaic help", + timestamp: new Date(), + }; + + await service.connect(); + await service.handleCommand({ + command: "help", + args: [], + message, + }); + + expect(mockClient.sendMessage).toHaveBeenCalledWith( + "!test-room:example.com", + expect.objectContaining({ + body: expect.stringContaining("retry"), + }) + ); + }); + it("should send error for fix command without issue number", async () => { const message: ChatMessage = { id: "msg-1", @@ -510,6 +680,35 @@ describe("MatrixService", () => { }) ); }); + + it("should dispatch fix command with workspace from MatrixRoomService", async () => { + mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("dynamic-workspace-id"); + + await service.connect(); + + const callback = mockMessageCallbacks[0]; + callback?.("!mapped-room:example.com", { + event_id: "$msg-1", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic fix #99", + }, + }); + + // Wait for async processing + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(stitcherService.dispatchJob).toHaveBeenCalledWith( + expect.objectContaining({ + workspaceId: "dynamic-workspace-id", + metadata: expect.objectContaining({ + issueNumber: 99, + }), + }) + ); + }); }); describe("Configuration", () => { @@ -519,10 +718,15 @@ describe("MatrixService", () => { const module: TestingModule = await Test.createTestingModule({ providers: [ MatrixService, + CommandParserService, { provide: StitcherService, useValue: mockStitcherService, }, + { + provide: MatrixRoomService, + useValue: mockMatrixRoomService, + }, ], }).compile(); @@ -540,10 +744,15 @@ describe("MatrixService", () => { const module: TestingModule = await Test.createTestingModule({ providers: [ MatrixService, + CommandParserService, { provide: StitcherService, useValue: mockStitcherService, }, + { + provide: MatrixRoomService, + useValue: mockMatrixRoomService, + }, ], }).compile(); @@ -561,10 +770,15 @@ describe("MatrixService", () => { const module: TestingModule = await Test.createTestingModule({ providers: [ MatrixService, + CommandParserService, { provide: StitcherService, useValue: mockStitcherService, }, + { + provide: MatrixRoomService, + useValue: mockMatrixRoomService, + }, ], }).compile(); @@ -583,10 +797,15 @@ describe("MatrixService", () => { const module: TestingModule = await Test.createTestingModule({ providers: [ MatrixService, + CommandParserService, { provide: StitcherService, useValue: mockStitcherService, }, + { + provide: MatrixRoomService, + useValue: mockMatrixRoomService, + }, ], }).compile(); @@ -655,4 +874,56 @@ describe("MatrixService", () => { expect(String(connected)).not.toContain("test-access-token"); }); }); + + describe("MatrixRoomService reverse lookup", () => { + it("should call getWorkspaceForRoom when processing messages", async () => { + mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("resolved-workspace"); + + await service.connect(); + + const callback = mockMessageCallbacks[0]; + callback?.("!some-room:example.com", { + event_id: "$msg-1", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic help", + }, + }); + + // Wait for async processing + await new Promise((resolve) => setTimeout(resolve, 50)); + + expect(matrixRoomService.getWorkspaceForRoom).toHaveBeenCalledWith("!some-room:example.com"); + }); + + it("should fall back to control room workspace when MatrixRoomService returns null", async () => { + mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue(null); + + await service.connect(); + + const callback = mockMessageCallbacks[0]; + // Send to the control room (fallback path) + callback?.("!test-room:example.com", { + event_id: "$msg-1", + sender: "@user:example.com", + origin_server_ts: Date.now(), + content: { + msgtype: "m.text", + body: "@mosaic fix #10", + }, + }); + + // Wait for async processing + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Should dispatch with the env-configured workspace + expect(stitcherService.dispatchJob).toHaveBeenCalledWith( + expect.objectContaining({ + workspaceId: "test-workspace-id", + }) + ); + }); + }); }); diff --git a/apps/api/src/bridge/matrix/matrix.service.ts b/apps/api/src/bridge/matrix/matrix.service.ts index 4cf4f57..2da5948 100644 --- a/apps/api/src/bridge/matrix/matrix.service.ts +++ b/apps/api/src/bridge/matrix/matrix.service.ts @@ -1,6 +1,10 @@ -import { Injectable, Logger } from "@nestjs/common"; +import { Injectable, Logger, Optional, Inject } from "@nestjs/common"; import { MatrixClient, SimpleFsStorageProvider, AutojoinRoomsMixin } from "matrix-bot-sdk"; import { StitcherService } from "../../stitcher/stitcher.service"; +import { CommandParserService } from "../parser/command-parser.service"; +import { CommandAction } from "../parser/command.interface"; +import type { ParsedCommand } from "../parser/command.interface"; +import { MatrixRoomService } from "./matrix-room.service"; import { sanitizeForLogging } from "../../common/utils"; import type { IChatProvider, @@ -46,7 +50,8 @@ interface MatrixRoomEvent { * * Responsibilities: * - Connect to Matrix via access token - * - Listen for commands in designated rooms + * - Listen for commands in mapped rooms (via MatrixRoomService) + * - Parse commands using shared CommandParserService * - Forward commands to stitcher * - Receive status updates from herald * - Post updates to threads (MSC3440) @@ -62,7 +67,15 @@ export class MatrixService implements IChatProvider { private readonly controlRoomId: string; private readonly workspaceId: string; - constructor(private readonly stitcherService: StitcherService) { + constructor( + private readonly stitcherService: StitcherService, + @Optional() + @Inject(CommandParserService) + private readonly commandParser: CommandParserService | null, + @Optional() + @Inject(MatrixRoomService) + private readonly matrixRoomService: MatrixRoomService | null + ) { this.homeserverUrl = process.env.MATRIX_HOMESERVER_URL ?? ""; this.accessToken = process.env.MATRIX_ACCESS_TOKEN ?? ""; this.botUserId = process.env.MATRIX_BOT_USER_ID ?? ""; @@ -113,30 +126,10 @@ export class MatrixService implements IChatProvider { // Ignore messages from the bot itself if (event.sender === this.botUserId) return; - // Check if message is in control room - if (roomId !== this.controlRoomId) return; - // Only handle text messages if (event.content.msgtype !== "m.text") return; - // Parse message into ChatMessage format - const chatMessage: ChatMessage = { - id: event.event_id, - channelId: roomId, - authorId: event.sender, - authorName: event.sender, - content: event.content.body, - timestamp: new Date(event.origin_server_ts), - ...(event.content["m.relates_to"]?.rel_type === "m.thread" && { - threadId: event.content["m.relates_to"].event_id, - }), - }; - - // Parse command - const command = this.parseCommand(chatMessage); - if (command) { - void this.handleCommand(command); - } + void this.handleRoomMessage(roomId, event); }); this.client.on("room.event", (_roomId: string, event: MatrixRoomEvent | null) => { @@ -149,6 +142,114 @@ export class MatrixService implements IChatProvider { }); } + /** + * Handle an incoming room message. + * + * Resolves the workspace for the room (via MatrixRoomService or fallback + * to the control room), then delegates to the shared CommandParserService + * for platform-agnostic command parsing and dispatches the result. + */ + private async handleRoomMessage(roomId: string, event: MatrixRoomEvent): Promise { + // Resolve workspace: try MatrixRoomService first, fall back to control room + let resolvedWorkspaceId: string | null = null; + + if (this.matrixRoomService) { + resolvedWorkspaceId = await this.matrixRoomService.getWorkspaceForRoom(roomId); + } + + // Fallback: if the room is the configured control room, use the env workspace + if (!resolvedWorkspaceId && roomId === this.controlRoomId) { + resolvedWorkspaceId = this.workspaceId; + } + + // If room is not mapped to any workspace, ignore the message + if (!resolvedWorkspaceId) { + return; + } + + const messageContent = event.content.body; + + // Build ChatMessage for interface compatibility + const chatMessage: ChatMessage = { + id: event.event_id, + channelId: roomId, + authorId: event.sender, + authorName: event.sender, + content: messageContent, + timestamp: new Date(event.origin_server_ts), + ...(event.content["m.relates_to"]?.rel_type === "m.thread" && { + threadId: event.content["m.relates_to"].event_id, + }), + }; + + // Use shared CommandParserService if available + if (this.commandParser) { + // Normalize !mosaic to @mosaic for the shared parser + const normalizedContent = messageContent.replace(/^!mosaic/i, "@mosaic"); + + const result = this.commandParser.parseCommand(normalizedContent); + + if (result.success) { + await this.handleParsedCommand(result.command, chatMessage, resolvedWorkspaceId); + } else if (normalizedContent.toLowerCase().startsWith("@mosaic")) { + // The user tried to use a command but it failed to parse -- send help + await this.sendMessage(roomId, result.error.help ?? result.error.message); + } + return; + } + + // Fallback: use the built-in parseCommand if CommandParserService not injected + const command = this.parseCommand(chatMessage); + if (command) { + await this.handleCommand(command); + } + } + + /** + * Handle a command parsed by the shared CommandParserService. + * + * Routes the ParsedCommand to the appropriate handler, passing + * along workspace context for job dispatch. + */ + private async handleParsedCommand( + parsed: ParsedCommand, + message: ChatMessage, + workspaceId: string + ): Promise { + this.logger.log( + `Handling command: ${parsed.action} from ${message.authorName} in workspace ${workspaceId}` + ); + + switch (parsed.action) { + case CommandAction.FIX: + await this.handleFixCommand(parsed.rawArgs, message, workspaceId); + break; + case CommandAction.STATUS: + await this.handleStatusCommand(parsed.rawArgs, message); + break; + case CommandAction.CANCEL: + await this.handleCancelCommand(parsed.rawArgs, message); + break; + case CommandAction.VERBOSE: + await this.handleVerboseCommand(parsed.rawArgs, message); + break; + case CommandAction.QUIET: + await this.handleQuietCommand(parsed.rawArgs, message); + break; + case CommandAction.HELP: + await this.handleHelpCommand(parsed.rawArgs, message); + break; + case CommandAction.RETRY: + await this.handleRetryCommand(parsed.rawArgs, message); + break; + default: + await this.sendMessage( + message.channelId, + `Unknown command. Type \`@mosaic help\` or \`!mosaic help\` for available commands.` + ); + } + } + /** * Disconnect from Matrix */ @@ -241,18 +342,35 @@ export class MatrixService implements IChatProvider { } /** - * Parse a command from a message + * Parse a command from a message (IChatProvider interface). + * + * Delegates to the shared CommandParserService when available, + * falling back to built-in parsing for backwards compatibility. */ parseCommand(message: ChatMessage): ChatCommand | null { const { content } = message; - // Check if message mentions @mosaic or uses !mosaic prefix + // Try shared parser first + if (this.commandParser) { + const normalizedContent = content.replace(/^!mosaic/i, "@mosaic"); + const result = this.commandParser.parseCommand(normalizedContent); + + if (result.success) { + return { + command: result.command.action, + args: result.command.rawArgs, + message, + }; + } + return null; + } + + // Fallback: built-in parsing for when CommandParserService is not injected const lowerContent = content.toLowerCase(); if (!lowerContent.includes("@mosaic") && !lowerContent.includes("!mosaic")) { return null; } - // Extract command and arguments const parts = content.trim().split(/\s+/); const mosaicIndex = parts.findIndex( (part) => part.toLowerCase().includes("@mosaic") || part.toLowerCase().includes("!mosaic") @@ -270,7 +388,6 @@ export class MatrixService implements IChatProvider { const command = commandPart.toLowerCase(); const args = parts.slice(mosaicIndex + 2); - // Valid commands const validCommands = ["fix", "status", "cancel", "verbose", "quiet", "help"]; if (!validCommands.includes(command)) { @@ -285,7 +402,7 @@ export class MatrixService implements IChatProvider { } /** - * Handle a parsed command + * Handle a parsed command (ChatCommand format, used by fallback path) */ async handleCommand(command: ChatCommand): Promise { const { command: cmd, args, message } = command; @@ -296,7 +413,7 @@ export class MatrixService implements IChatProvider { switch (cmd) { case "fix": - await this.handleFixCommand(args, message); + await this.handleFixCommand(args, message, this.workspaceId); break; case "status": await this.handleStatusCommand(args, message); @@ -324,7 +441,11 @@ export class MatrixService implements IChatProvider { /** * Handle fix command - Start a job for an issue */ - private async handleFixCommand(args: string[], message: ChatMessage): Promise { + private async handleFixCommand( + args: string[], + message: ChatMessage, + workspaceId?: string + ): Promise { if (args.length === 0 || !args[0]) { await this.sendMessage( message.channelId, @@ -333,7 +454,9 @@ export class MatrixService implements IChatProvider { return; } - const issueNumber = parseInt(args[0], 10); + // Parse issue number: handle both "#42" and "42" formats + const issueArg = args[0].replace(/^#/, ""); + const issueNumber = parseInt(issueArg, 10); if (isNaN(issueNumber)) { await this.sendMessage( @@ -343,6 +466,8 @@ export class MatrixService implements IChatProvider { return; } + const targetWorkspaceId = workspaceId ?? this.workspaceId; + // Create thread for job updates const threadId = await this.createThread({ channelId: message.channelId, @@ -352,7 +477,7 @@ export class MatrixService implements IChatProvider { // Dispatch job to stitcher const result = await this.stitcherService.dispatchJob({ - workspaceId: this.workspaceId, + workspaceId: targetWorkspaceId, type: "code-task", priority: 10, metadata: { @@ -414,6 +539,27 @@ export class MatrixService implements IChatProvider { ); } + /** + * Handle retry command - Retry a failed job + */ + private async handleRetryCommand(args: string[], message: ChatMessage): Promise { + if (args.length === 0 || !args[0]) { + await this.sendMessage( + message.channelId, + "Usage: `@mosaic retry ` or `!mosaic retry `" + ); + return; + } + + const jobId = args[0]; + + // TODO: Implement job retry in stitcher + await this.sendMessage( + message.channelId, + `Retry command not yet implemented for job: ${jobId}` + ); + } + /** * Handle verbose command - Stream full logs to thread */ @@ -453,6 +599,7 @@ export class MatrixService implements IChatProvider { \`@mosaic fix \` or \`!mosaic fix \` - Start job for issue \`@mosaic status \` or \`!mosaic status \` - Get job status \`@mosaic cancel \` or \`!mosaic cancel \` - Cancel running job +\`@mosaic retry \` or \`!mosaic retry \` - Retry failed job \`@mosaic verbose \` or \`!mosaic verbose \` - Stream full logs to thread \`@mosaic quiet\` or \`!mosaic quiet\` - Reduce notifications \`@mosaic help\` or \`!mosaic help\` - Show this help message diff --git a/apps/api/src/herald/herald.module.ts b/apps/api/src/herald/herald.module.ts index cc46e89..474ac6e 100644 --- a/apps/api/src/herald/herald.module.ts +++ b/apps/api/src/herald/herald.module.ts @@ -10,7 +10,7 @@ import { BridgeModule } from "../bridge/bridge.module"; * - Subscribe to job events * - Format status messages with PDA-friendly language * - Route to appropriate channels based on workspace config - * - Support Discord (via bridge) and PR comments + * - Broadcast to ALL active chat providers via CHAT_PROVIDERS token */ @Module({ imports: [PrismaModule, BridgeModule], diff --git a/apps/api/src/herald/herald.service.spec.ts b/apps/api/src/herald/herald.service.spec.ts index d2eec1a..0799756 100644 --- a/apps/api/src/herald/herald.service.spec.ts +++ b/apps/api/src/herald/herald.service.spec.ts @@ -2,7 +2,8 @@ import { Test, TestingModule } from "@nestjs/testing"; import { vi, describe, it, expect, beforeEach } from "vitest"; import { HeraldService } from "./herald.service"; import { PrismaService } from "../prisma/prisma.service"; -import { DiscordService } from "../bridge/discord/discord.service"; +import { CHAT_PROVIDERS } from "../bridge/bridge.constants"; +import type { IChatProvider } from "../bridge/interfaces/chat-provider.interface"; import { JOB_CREATED, JOB_STARTED, @@ -14,10 +15,31 @@ import { GATE_FAILED, } from "../job-events/event-types"; +function createMockProvider( + name: string, + connected = true +): IChatProvider & { + sendMessage: ReturnType; + sendThreadMessage: ReturnType; + createThread: ReturnType; + isConnected: ReturnType; + connect: ReturnType; + disconnect: ReturnType; + parseCommand: ReturnType; +} { + return { + connect: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + isConnected: vi.fn().mockReturnValue(connected), + sendMessage: vi.fn().mockResolvedValue(undefined), + createThread: vi.fn().mockResolvedValue("thread-id"), + sendThreadMessage: vi.fn().mockResolvedValue(undefined), + parseCommand: vi.fn().mockReturnValue(null), + }; +} + describe("HeraldService", () => { let service: HeraldService; - let prisma: PrismaService; - let discord: DiscordService; const mockPrisma = { workspace: { @@ -31,14 +53,15 @@ describe("HeraldService", () => { }, }; - const mockDiscord = { - isConnected: vi.fn(), - sendMessage: vi.fn(), - sendThreadMessage: vi.fn(), - createThread: vi.fn(), - }; + let mockProviderA: ReturnType; + let mockProviderB: ReturnType; + let chatProviders: IChatProvider[]; beforeEach(async () => { + mockProviderA = createMockProvider("providerA", true); + mockProviderB = createMockProvider("providerB", true); + chatProviders = [mockProviderA, mockProviderB]; + const module: TestingModule = await Test.createTestingModule({ providers: [ HeraldService, @@ -47,44 +70,28 @@ describe("HeraldService", () => { useValue: mockPrisma, }, { - provide: DiscordService, - useValue: mockDiscord, + provide: CHAT_PROVIDERS, + useValue: chatProviders, }, ], }).compile(); service = module.get(HeraldService); - prisma = module.get(PrismaService); - discord = module.get(DiscordService); // Reset mocks vi.clearAllMocks(); + // Restore default connected state after clearAllMocks + mockProviderA.isConnected.mockReturnValue(true); + mockProviderB.isConnected.mockReturnValue(true); }); describe("broadcastJobEvent", () => { - it("should broadcast job.created event to configured channel", async () => { - // Arrange + const baseSetup = (): { + jobId: string; + workspaceId: string; + } => { const workspaceId = "workspace-1"; const jobId = "job-1"; - const event = { - id: "event-1", - jobId, - type: JOB_CREATED, - timestamp: new Date(), - actor: "system", - payload: { issueNumber: 42 }, - }; - - mockPrisma.workspace.findUnique.mockResolvedValue({ - id: workspaceId, - settings: { - herald: { - channelMappings: { - "code-task": "channel-123", - }, - }, - }, - }); mockPrisma.runnerJob.findUnique.mockResolvedValue({ id: jobId, @@ -98,23 +105,38 @@ describe("HeraldService", () => { }, }); - mockDiscord.isConnected.mockReturnValue(true); - mockDiscord.sendThreadMessage.mockResolvedValue(undefined); + return { jobId, workspaceId }; + }; + + it("should broadcast to all connected providers", async () => { + // Arrange + const { jobId } = baseSetup(); + const event = { + id: "event-1", + jobId, + type: JOB_CREATED, + timestamp: new Date(), + actor: "system", + payload: { issueNumber: 42 }, + }; // Act await service.broadcastJobEvent(jobId, event); // Assert - expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({ + expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({ + threadId: "thread-123", + content: expect.stringContaining("Job created"), + }); + expect(mockProviderB.sendThreadMessage).toHaveBeenCalledWith({ threadId: "thread-123", content: expect.stringContaining("Job created"), }); }); - it("should broadcast job.started event", async () => { + it("should broadcast job.started event to all providers", async () => { // Arrange - const workspaceId = "workspace-1"; - const jobId = "job-1"; + const { jobId } = baseSetup(); const event = { id: "event-1", jobId, @@ -124,31 +146,15 @@ describe("HeraldService", () => { payload: {}, }; - mockPrisma.workspace.findUnique.mockResolvedValue({ - id: workspaceId, - settings: { herald: { channelMappings: {} } }, - }); - - mockPrisma.runnerJob.findUnique.mockResolvedValue({ - id: jobId, - workspaceId, - type: "code-task", - }); - - mockPrisma.jobEvent.findFirst.mockResolvedValue({ - payload: { - metadata: { threadId: "thread-123" }, - }, - }); - - mockDiscord.isConnected.mockReturnValue(true); - mockDiscord.sendThreadMessage.mockResolvedValue(undefined); - // Act await service.broadcastJobEvent(jobId, event); // Assert - expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({ + expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({ + threadId: "thread-123", + content: expect.stringContaining("Job started"), + }); + expect(mockProviderB.sendThreadMessage).toHaveBeenCalledWith({ threadId: "thread-123", content: expect.stringContaining("Job started"), }); @@ -156,8 +162,7 @@ describe("HeraldService", () => { it("should broadcast job.completed event with success message", async () => { // Arrange - const workspaceId = "workspace-1"; - const jobId = "job-1"; + const { jobId } = baseSetup(); const event = { id: "event-1", jobId, @@ -167,31 +172,11 @@ describe("HeraldService", () => { payload: { duration: 120 }, }; - mockPrisma.workspace.findUnique.mockResolvedValue({ - id: workspaceId, - settings: { herald: { channelMappings: {} } }, - }); - - mockPrisma.runnerJob.findUnique.mockResolvedValue({ - id: jobId, - workspaceId, - type: "code-task", - }); - - mockPrisma.jobEvent.findFirst.mockResolvedValue({ - payload: { - metadata: { threadId: "thread-123" }, - }, - }); - - mockDiscord.isConnected.mockReturnValue(true); - mockDiscord.sendThreadMessage.mockResolvedValue(undefined); - // Act await service.broadcastJobEvent(jobId, event); // Assert - expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({ + expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({ threadId: "thread-123", content: expect.stringContaining("completed"), }); @@ -199,8 +184,7 @@ describe("HeraldService", () => { it("should broadcast job.failed event with PDA-friendly language", async () => { // Arrange - const workspaceId = "workspace-1"; - const jobId = "job-1"; + const { jobId } = baseSetup(); const event = { id: "event-1", jobId, @@ -210,43 +194,28 @@ describe("HeraldService", () => { payload: { error: "Build failed" }, }; - mockPrisma.workspace.findUnique.mockResolvedValue({ - id: workspaceId, - settings: { herald: { channelMappings: {} } }, - }); - - mockPrisma.runnerJob.findUnique.mockResolvedValue({ - id: jobId, - workspaceId, - type: "code-task", - }); - - mockPrisma.jobEvent.findFirst.mockResolvedValue({ - payload: { - metadata: { threadId: "thread-123" }, - }, - }); - - mockDiscord.isConnected.mockReturnValue(true); - mockDiscord.sendThreadMessage.mockResolvedValue(undefined); - // Act await service.broadcastJobEvent(jobId, event); // Assert - expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({ + expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({ threadId: "thread-123", content: expect.stringContaining("encountered an issue"), }); // Verify the actual message doesn't contain demanding language - const actualCall = mockDiscord.sendThreadMessage.mock.calls[0][0]; + const actualCall = mockProviderA.sendThreadMessage.mock.calls[0][0] as { + threadId: string; + content: string; + }; expect(actualCall.content).not.toMatch(/FAILED|ERROR|CRITICAL|URGENT/); }); - it("should skip broadcasting if Discord is not connected", async () => { + it("should skip disconnected providers", async () => { // Arrange - const workspaceId = "workspace-1"; - const jobId = "job-1"; + const { jobId } = baseSetup(); + mockProviderA.isConnected.mockReturnValue(true); + mockProviderB.isConnected.mockReturnValue(false); + const event = { id: "event-1", jobId, @@ -256,14 +225,36 @@ describe("HeraldService", () => { payload: {}, }; - mockPrisma.workspace.findUnique.mockResolvedValue({ - id: workspaceId, - settings: { herald: { channelMappings: {} } }, - }); + // Act + await service.broadcastJobEvent(jobId, event); + // Assert + expect(mockProviderA.sendThreadMessage).toHaveBeenCalledTimes(1); + expect(mockProviderB.sendThreadMessage).not.toHaveBeenCalled(); + }); + + it("should handle empty providers array without crashing", async () => { + // Arrange — rebuild module with empty providers + const module: TestingModule = await Test.createTestingModule({ + providers: [ + HeraldService, + { + provide: PrismaService, + useValue: mockPrisma, + }, + { + provide: CHAT_PROVIDERS, + useValue: [], + }, + ], + }).compile(); + + const emptyService = module.get(HeraldService); + + const jobId = "job-1"; mockPrisma.runnerJob.findUnique.mockResolvedValue({ id: jobId, - workspaceId, + workspaceId: "workspace-1", type: "code-task", }); @@ -273,19 +264,6 @@ describe("HeraldService", () => { }, }); - mockDiscord.isConnected.mockReturnValue(false); - - // Act - await service.broadcastJobEvent(jobId, event); - - // Assert - expect(mockDiscord.sendThreadMessage).not.toHaveBeenCalled(); - }); - - it("should skip broadcasting if job has no threadId", async () => { - // Arrange - const workspaceId = "workspace-1"; - const jobId = "job-1"; const event = { id: "event-1", jobId, @@ -295,14 +273,59 @@ describe("HeraldService", () => { payload: {}, }; - mockPrisma.workspace.findUnique.mockResolvedValue({ - id: workspaceId, - settings: { herald: { channelMappings: {} } }, - }); + // Act & Assert — should not throw + await expect(emptyService.broadcastJobEvent(jobId, event)).resolves.not.toThrow(); + }); + + it("should continue broadcasting when one provider errors", async () => { + // Arrange + const { jobId } = baseSetup(); + mockProviderA.sendThreadMessage.mockRejectedValue(new Error("Provider A rate limit")); + mockProviderB.sendThreadMessage.mockResolvedValue(undefined); + + const event = { + id: "event-1", + jobId, + type: JOB_CREATED, + timestamp: new Date(), + actor: "system", + payload: {}, + }; + + // Act — should not throw despite provider A failing + await service.broadcastJobEvent(jobId, event); + + // Assert — provider B should still have been called + expect(mockProviderA.sendThreadMessage).toHaveBeenCalledTimes(1); + expect(mockProviderB.sendThreadMessage).toHaveBeenCalledTimes(1); + }); + + it("should not throw when all providers error", async () => { + // Arrange + const { jobId } = baseSetup(); + mockProviderA.sendThreadMessage.mockRejectedValue(new Error("Provider A down")); + mockProviderB.sendThreadMessage.mockRejectedValue(new Error("Provider B down")); + + const event = { + id: "event-1", + jobId, + type: JOB_CREATED, + timestamp: new Date(), + actor: "system", + payload: {}, + }; + + // Act & Assert — should not throw; provider errors are logged, not propagated + await expect(service.broadcastJobEvent(jobId, event)).resolves.not.toThrow(); + }); + + it("should skip broadcasting if job has no threadId", async () => { + // Arrange + const jobId = "job-1"; mockPrisma.runnerJob.findUnique.mockResolvedValue({ id: jobId, - workspaceId, + workspaceId: "workspace-1", type: "code-task", }); @@ -312,16 +335,45 @@ describe("HeraldService", () => { }, }); - mockDiscord.isConnected.mockReturnValue(true); + const event = { + id: "event-1", + jobId, + type: JOB_CREATED, + timestamp: new Date(), + actor: "system", + payload: {}, + }; // Act await service.broadcastJobEvent(jobId, event); // Assert - expect(mockDiscord.sendThreadMessage).not.toHaveBeenCalled(); + expect(mockProviderA.sendThreadMessage).not.toHaveBeenCalled(); + expect(mockProviderB.sendThreadMessage).not.toHaveBeenCalled(); }); - // ERROR HANDLING TESTS - Issue #185 + it("should skip broadcasting if job not found", async () => { + // Arrange + const jobId = "nonexistent-job"; + mockPrisma.runnerJob.findUnique.mockResolvedValue(null); + + const event = { + id: "event-1", + jobId, + type: JOB_CREATED, + timestamp: new Date(), + actor: "system", + payload: {}, + }; + + // Act + await service.broadcastJobEvent(jobId, event); + + // Assert + expect(mockProviderA.sendThreadMessage).not.toHaveBeenCalled(); + }); + + // ERROR HANDLING TESTS - database errors should still propagate it("should propagate database errors when job lookup fails", async () => { // Arrange @@ -344,43 +396,8 @@ describe("HeraldService", () => { ); }); - it("should propagate Discord send failures with context", async () => { - // Arrange - const workspaceId = "workspace-1"; - const jobId = "job-1"; - const event = { - id: "event-1", - jobId, - type: JOB_CREATED, - timestamp: new Date(), - actor: "system", - payload: {}, - }; - - mockPrisma.runnerJob.findUnique.mockResolvedValue({ - id: jobId, - workspaceId, - type: "code-task", - }); - - mockPrisma.jobEvent.findFirst.mockResolvedValue({ - payload: { - metadata: { threadId: "thread-123" }, - }, - }); - - mockDiscord.isConnected.mockReturnValue(true); - - const discordError = new Error("Rate limit exceeded"); - mockDiscord.sendThreadMessage.mockRejectedValue(discordError); - - // Act & Assert - await expect(service.broadcastJobEvent(jobId, event)).rejects.toThrow("Rate limit exceeded"); - }); - it("should propagate errors when fetching job events fails", async () => { // Arrange - const workspaceId = "workspace-1"; const jobId = "job-1"; const event = { id: "event-1", @@ -393,61 +410,16 @@ describe("HeraldService", () => { mockPrisma.runnerJob.findUnique.mockResolvedValue({ id: jobId, - workspaceId, + workspaceId: "workspace-1", type: "code-task", }); const dbError = new Error("Query timeout"); mockPrisma.jobEvent.findFirst.mockRejectedValue(dbError); - mockDiscord.isConnected.mockReturnValue(true); - // Act & Assert await expect(service.broadcastJobEvent(jobId, event)).rejects.toThrow("Query timeout"); }); - - it("should include job context in error messages", async () => { - // Arrange - const workspaceId = "workspace-1"; - const jobId = "test-job-123"; - const event = { - id: "event-1", - jobId, - type: JOB_COMPLETED, - timestamp: new Date(), - actor: "system", - payload: {}, - }; - - mockPrisma.runnerJob.findUnique.mockResolvedValue({ - id: jobId, - workspaceId, - type: "code-task", - }); - - mockPrisma.jobEvent.findFirst.mockResolvedValue({ - payload: { - metadata: { threadId: "thread-123" }, - }, - }); - - mockDiscord.isConnected.mockReturnValue(true); - - const discordError = new Error("Network failure"); - mockDiscord.sendThreadMessage.mockRejectedValue(discordError); - - // Act & Assert - try { - await service.broadcastJobEvent(jobId, event); - // Should not reach here - expect(true).toBe(false); - } catch (error) { - // Verify error was thrown - expect(error).toBeDefined(); - // Verify original error is preserved - expect((error as Error).message).toContain("Network failure"); - } - }); }); describe("formatJobEventMessage", () => { @@ -473,7 +445,6 @@ describe("HeraldService", () => { const message = service.formatJobEventMessage(event, job, metadata); // Assert - expect(message).toContain("🟢"); expect(message).toContain("Job created"); expect(message).toContain("#42"); expect(message.length).toBeLessThan(200); // Keep it scannable @@ -526,7 +497,6 @@ describe("HeraldService", () => { const message = service.formatJobEventMessage(event, job, metadata); // Assert - expect(message).toMatch(/✅|🟢/); expect(message).toContain("completed"); expect(message).not.toMatch(/COMPLETED|SUCCESS/); }); diff --git a/apps/api/src/herald/herald.service.ts b/apps/api/src/herald/herald.service.ts index 9b02a29..bc05824 100644 --- a/apps/api/src/herald/herald.service.ts +++ b/apps/api/src/herald/herald.service.ts @@ -1,6 +1,7 @@ -import { Injectable, Logger } from "@nestjs/common"; +import { Inject, Injectable, Logger } from "@nestjs/common"; import { PrismaService } from "../prisma/prisma.service"; -import { DiscordService } from "../bridge/discord/discord.service"; +import { CHAT_PROVIDERS } from "../bridge/bridge.constants"; +import type { IChatProvider } from "../bridge/interfaces/chat-provider.interface"; import { JOB_CREATED, JOB_STARTED, @@ -21,7 +22,7 @@ import { * - Subscribe to job events * - Format status messages with PDA-friendly language * - Route to appropriate channels based on workspace config - * - Support Discord (via bridge) and PR comments + * - Broadcast to ALL active chat providers (Discord, Matrix, etc.) */ @Injectable() export class HeraldService { @@ -29,11 +30,11 @@ export class HeraldService { constructor( private readonly prisma: PrismaService, - private readonly discord: DiscordService + @Inject(CHAT_PROVIDERS) private readonly chatProviders: IChatProvider[] ) {} /** - * Broadcast a job event to the appropriate channel + * Broadcast a job event to all connected chat providers */ async broadcastJobEvent( jobId: string, @@ -47,66 +48,65 @@ export class HeraldService { payload: unknown; } ): Promise { - try { - // Get job details - const job = await this.prisma.runnerJob.findUnique({ - where: { id: jobId }, - select: { - id: true, - workspaceId: true, - type: true, - }, - }); + // Get job details + const job = await this.prisma.runnerJob.findUnique({ + where: { id: jobId }, + select: { + id: true, + workspaceId: true, + type: true, + }, + }); - if (!job) { - this.logger.warn(`Job ${jobId} not found, skipping broadcast`); - return; - } - - // Check if Discord is connected - if (!this.discord.isConnected()) { - this.logger.debug("Discord not connected, skipping broadcast"); - return; - } - - // Get threadId from first event payload (job.created event has metadata) - const firstEvent = await this.prisma.jobEvent.findFirst({ - where: { - jobId, - type: JOB_CREATED, - }, - select: { - payload: true, - }, - }); - - const firstEventPayload = firstEvent?.payload as Record | undefined; - const metadata = firstEventPayload?.metadata as Record | undefined; - const threadId = metadata?.threadId as string | undefined; - - if (!threadId) { - this.logger.debug(`Job ${jobId} has no threadId, skipping broadcast`); - return; - } - - // Format message - const message = this.formatJobEventMessage(event, job, metadata); - - // Send to thread - await this.discord.sendThreadMessage({ - threadId, - content: message, - }); - - this.logger.debug(`Broadcasted event ${event.type} for job ${jobId} to thread ${threadId}`); - } catch (error) { - // Log the error with full context for debugging - this.logger.error(`Failed to broadcast event ${event.type} for job ${jobId}:`, error); - - // Re-throw the error so callers can handle it appropriately - // This enables proper error tracking, retry logic, and alerting - throw error; + if (!job) { + this.logger.warn(`Job ${jobId} not found, skipping broadcast`); + return; } + + // Get threadId from first event payload (job.created event has metadata) + const firstEvent = await this.prisma.jobEvent.findFirst({ + where: { + jobId, + type: JOB_CREATED, + }, + select: { + payload: true, + }, + }); + + const firstEventPayload = firstEvent?.payload as Record | undefined; + const metadata = firstEventPayload?.metadata as Record | undefined; + const threadId = metadata?.threadId as string | undefined; + + if (!threadId) { + this.logger.debug(`Job ${jobId} has no threadId, skipping broadcast`); + return; + } + + // Format message + const message = this.formatJobEventMessage(event, job, metadata); + + // Broadcast to all connected providers + for (const provider of this.chatProviders) { + if (!provider.isConnected()) { + continue; + } + + try { + await provider.sendThreadMessage({ + threadId, + content: message, + }); + } catch (error) { + // Log and continue — one provider failure must not block others + this.logger.error( + `Failed to broadcast event ${event.type} for job ${jobId} via provider:`, + error + ); + } + } + + this.logger.debug(`Broadcasted event ${event.type} for job ${jobId} to thread ${threadId}`); } /**