feat(#382): Herald Service: broadcast to all active chat providers
Some checks failed
ci/woodpecker/push/api Pipeline failed

- Replace direct DiscordService injection with CHAT_PROVIDERS array
- Herald broadcasts to ALL active chat providers (Discord, Matrix, future)
- Graceful error handling — one provider failure doesn't block others
- Skips disconnected providers automatically
- Tests verify multi-provider broadcasting behavior
- Fix lint: remove unnecessary conditional in matrix.service.ts

Refs #382
This commit is contained in:
2026-02-15 02:25:55 -06:00
parent 4a9ecab4dd
commit ad24720616
8 changed files with 859 additions and 424 deletions

View File

@@ -1,6 +1,8 @@
import { Logger, Module } from "@nestjs/common"; import { Logger, Module } from "@nestjs/common";
import { DiscordService } from "./discord/discord.service"; import { DiscordService } from "./discord/discord.service";
import { MatrixService } from "./matrix/matrix.service"; import { MatrixService } from "./matrix/matrix.service";
import { MatrixRoomService } from "./matrix/matrix-room.service";
import { CommandParserService } from "./parser/command-parser.service";
import { StitcherModule } from "../stitcher/stitcher.module"; import { StitcherModule } from "../stitcher/stitcher.module";
import { CHAT_PROVIDERS } from "./bridge.constants"; import { CHAT_PROVIDERS } from "./bridge.constants";
import type { IChatProvider } from "./interfaces"; import type { IChatProvider } from "./interfaces";
@@ -20,10 +22,15 @@ const logger = new Logger("BridgeModule");
* *
* Both bridges can run simultaneously, and no error occurs if neither is configured. * Both bridges can run simultaneously, and no error occurs if neither is configured.
* Consumers should inject CHAT_PROVIDERS for bridge-agnostic access to all active providers. * Consumers should inject CHAT_PROVIDERS for bridge-agnostic access to all active providers.
*
* CommandParserService provides shared, platform-agnostic command parsing.
* MatrixRoomService handles workspace-to-Matrix-room mapping.
*/ */
@Module({ @Module({
imports: [StitcherModule], imports: [StitcherModule],
providers: [ providers: [
CommandParserService,
MatrixRoomService,
DiscordService, DiscordService,
MatrixService, MatrixService,
{ {
@@ -50,6 +57,6 @@ const logger = new Logger("BridgeModule");
inject: [DiscordService, MatrixService], inject: [DiscordService, MatrixService],
}, },
], ],
exports: [DiscordService, MatrixService, CHAT_PROVIDERS], exports: [DiscordService, MatrixService, MatrixRoomService, CommandParserService, CHAT_PROVIDERS],
}) })
export class BridgeModule {} export class BridgeModule {}

View File

@@ -35,6 +35,7 @@ describe("MatrixRoomService", () => {
const mockPrismaService = { const mockPrismaService = {
workspace: { workspace: {
findUnique: vi.fn(), findUnique: vi.fn(),
findFirst: vi.fn(),
update: vi.fn(), update: vi.fn(),
}, },
}; };
@@ -162,6 +163,30 @@ describe("MatrixRoomService", () => {
}); });
}); });
describe("getWorkspaceForRoom", () => {
it("should return the workspace ID for a mapped room", async () => {
mockPrismaService.workspace.findFirst.mockResolvedValue({
id: "workspace-uuid-1",
});
const workspaceId = await service.getWorkspaceForRoom("!mapped-room:example.com");
expect(workspaceId).toBe("workspace-uuid-1");
expect(mockPrismaService.workspace.findFirst).toHaveBeenCalledWith({
where: { matrixRoomId: "!mapped-room:example.com" },
select: { id: true },
});
});
it("should return null for an unmapped room", async () => {
mockPrismaService.workspace.findFirst.mockResolvedValue(null);
const workspaceId = await service.getWorkspaceForRoom("!unknown-room:example.com");
expect(workspaceId).toBeNull();
});
});
describe("linkWorkspaceToRoom", () => { describe("linkWorkspaceToRoom", () => {
it("should store the room mapping in the workspace", async () => { it("should store the room mapping in the workspace", async () => {
await service.linkWorkspaceToRoom("workspace-uuid-1", "!existing-room:example.com"); await service.linkWorkspaceToRoom("workspace-uuid-1", "!existing-room:example.com");

View File

@@ -89,6 +89,21 @@ export class MatrixRoomService {
return workspace?.matrixRoomId ?? null; return workspace?.matrixRoomId ?? null;
} }
/**
* Reverse lookup: find the workspace that owns a given Matrix room.
*
* @param roomId - The Matrix room ID (e.g. "!abc:example.com")
* @returns The workspace ID, or null if the room is not mapped to any workspace
*/
async getWorkspaceForRoom(roomId: string): Promise<string | null> {
const workspace = await this.prisma.workspace.findFirst({
where: { matrixRoomId: roomId },
select: { id: true },
});
return workspace?.id ?? null;
}
/** /**
* Manually link an existing Matrix room to a workspace. * Manually link an existing Matrix room to a workspace.
* *

View File

@@ -1,6 +1,8 @@
import { Test, TestingModule } from "@nestjs/testing"; import { Test, TestingModule } from "@nestjs/testing";
import { MatrixService } from "./matrix.service"; import { MatrixService } from "./matrix.service";
import { MatrixRoomService } from "./matrix-room.service";
import { StitcherService } from "../../stitcher/stitcher.service"; import { StitcherService } from "../../stitcher/stitcher.service";
import { CommandParserService } from "../parser/command-parser.service";
import { vi, describe, it, expect, beforeEach } from "vitest"; import { vi, describe, it, expect, beforeEach } from "vitest";
import type { ChatMessage } from "../interfaces"; import type { ChatMessage } from "../interfaces";
@@ -50,6 +52,8 @@ vi.mock("matrix-bot-sdk", () => {
describe("MatrixService", () => { describe("MatrixService", () => {
let service: MatrixService; let service: MatrixService;
let stitcherService: StitcherService; let stitcherService: StitcherService;
let commandParser: CommandParserService;
let matrixRoomService: MatrixRoomService;
const mockStitcherService = { const mockStitcherService = {
dispatchJob: vi.fn().mockResolvedValue({ dispatchJob: vi.fn().mockResolvedValue({
@@ -60,6 +64,14 @@ describe("MatrixService", () => {
trackJobEvent: vi.fn().mockResolvedValue(undefined), trackJobEvent: vi.fn().mockResolvedValue(undefined),
}; };
const mockMatrixRoomService = {
getWorkspaceForRoom: vi.fn().mockResolvedValue(null),
getRoomForWorkspace: vi.fn().mockResolvedValue(null),
provisionRoom: vi.fn().mockResolvedValue(null),
linkWorkspaceToRoom: vi.fn().mockResolvedValue(undefined),
unlinkWorkspace: vi.fn().mockResolvedValue(undefined),
};
beforeEach(async () => { beforeEach(async () => {
// Set environment variables for testing // Set environment variables for testing
process.env.MATRIX_HOMESERVER_URL = "https://matrix.example.com"; process.env.MATRIX_HOMESERVER_URL = "https://matrix.example.com";
@@ -75,15 +87,22 @@ describe("MatrixService", () => {
const module: TestingModule = await Test.createTestingModule({ const module: TestingModule = await Test.createTestingModule({
providers: [ providers: [
MatrixService, MatrixService,
CommandParserService,
{ {
provide: StitcherService, provide: StitcherService,
useValue: mockStitcherService, useValue: mockStitcherService,
}, },
{
provide: MatrixRoomService,
useValue: mockMatrixRoomService,
},
], ],
}).compile(); }).compile();
service = module.get<MatrixService>(MatrixService); service = module.get<MatrixService>(MatrixService);
stitcherService = module.get<StitcherService>(StitcherService); stitcherService = module.get<StitcherService>(StitcherService);
commandParser = module.get<CommandParserService>(CommandParserService);
matrixRoomService = module.get(MatrixRoomService) as MatrixRoomService;
// Clear all mocks // Clear all mocks
vi.clearAllMocks(); vi.clearAllMocks();
@@ -189,46 +208,42 @@ describe("MatrixService", () => {
}); });
}); });
describe("Command Parsing", () => { describe("Command Parsing with shared CommandParserService", () => {
it("should parse @mosaic fix command", () => { it("should parse @mosaic fix #42 via shared parser", () => {
const message: ChatMessage = { const message: ChatMessage = {
id: "msg-1", id: "msg-1",
channelId: "!room:example.com", channelId: "!room:example.com",
authorId: "@user:example.com", authorId: "@user:example.com",
authorName: "@user:example.com", authorName: "@user:example.com",
content: "@mosaic fix 42", content: "@mosaic fix #42",
timestamp: new Date(), timestamp: new Date(),
}; };
const command = service.parseCommand(message); const command = service.parseCommand(message);
expect(command).toEqual({ expect(command).not.toBeNull();
command: "fix", expect(command?.command).toBe("fix");
args: ["42"], expect(command?.args).toContain("#42");
message,
});
}); });
it("should parse !mosaic fix command", () => { it("should parse !mosaic fix #42 by normalizing to @mosaic for the shared parser", () => {
const message: ChatMessage = { const message: ChatMessage = {
id: "msg-1", id: "msg-1",
channelId: "!room:example.com", channelId: "!room:example.com",
authorId: "@user:example.com", authorId: "@user:example.com",
authorName: "@user:example.com", authorName: "@user:example.com",
content: "!mosaic fix 42", content: "!mosaic fix #42",
timestamp: new Date(), timestamp: new Date(),
}; };
const command = service.parseCommand(message); const command = service.parseCommand(message);
expect(command).toEqual({ expect(command).not.toBeNull();
command: "fix", expect(command?.command).toBe("fix");
args: ["42"], expect(command?.args).toContain("#42");
message,
});
}); });
it("should parse @mosaic status command", () => { it("should parse @mosaic status command via shared parser", () => {
const message: ChatMessage = { const message: ChatMessage = {
id: "msg-2", id: "msg-2",
channelId: "!room:example.com", channelId: "!room:example.com",
@@ -240,14 +255,12 @@ describe("MatrixService", () => {
const command = service.parseCommand(message); const command = service.parseCommand(message);
expect(command).toEqual({ expect(command).not.toBeNull();
command: "status", expect(command?.command).toBe("status");
args: ["job-123"], expect(command?.args).toContain("job-123");
message,
});
}); });
it("should parse @mosaic cancel command", () => { it("should parse @mosaic cancel command via shared parser", () => {
const message: ChatMessage = { const message: ChatMessage = {
id: "msg-3", id: "msg-3",
channelId: "!room:example.com", channelId: "!room:example.com",
@@ -259,52 +272,11 @@ describe("MatrixService", () => {
const command = service.parseCommand(message); const command = service.parseCommand(message);
expect(command).toEqual({ expect(command).not.toBeNull();
command: "cancel", expect(command?.command).toBe("cancel");
args: ["job-456"],
message,
});
}); });
it("should parse @mosaic verbose command", () => { it("should parse @mosaic help command via shared parser", () => {
const message: ChatMessage = {
id: "msg-4",
channelId: "!room:example.com",
authorId: "@user:example.com",
authorName: "@user:example.com",
content: "@mosaic verbose job-789",
timestamp: new Date(),
};
const command = service.parseCommand(message);
expect(command).toEqual({
command: "verbose",
args: ["job-789"],
message,
});
});
it("should parse @mosaic quiet command", () => {
const message: ChatMessage = {
id: "msg-5",
channelId: "!room:example.com",
authorId: "@user:example.com",
authorName: "@user:example.com",
content: "@mosaic quiet",
timestamp: new Date(),
};
const command = service.parseCommand(message);
expect(command).toEqual({
command: "quiet",
args: [],
message,
});
});
it("should parse @mosaic help command", () => {
const message: ChatMessage = { const message: ChatMessage = {
id: "msg-6", id: "msg-6",
channelId: "!room:example.com", channelId: "!room:example.com",
@@ -316,11 +288,8 @@ describe("MatrixService", () => {
const command = service.parseCommand(message); const command = service.parseCommand(message);
expect(command).toEqual({ expect(command).not.toBeNull();
command: "help", expect(command?.command).toBe("help");
args: [],
message,
});
}); });
it("should return null for non-command messages", () => { it("should return null for non-command messages", () => {
@@ -353,40 +322,6 @@ describe("MatrixService", () => {
expect(command).toBeNull(); expect(command).toBeNull();
}); });
it("should handle commands with multiple arguments", () => {
const message: ChatMessage = {
id: "msg-9",
channelId: "!room:example.com",
authorId: "@user:example.com",
authorName: "@user:example.com",
content: "@mosaic fix 42 high-priority",
timestamp: new Date(),
};
const command = service.parseCommand(message);
expect(command).toEqual({
command: "fix",
args: ["42", "high-priority"],
message,
});
});
it("should return null for invalid commands", () => {
const message: ChatMessage = {
id: "msg-10",
channelId: "!room:example.com",
authorId: "@user:example.com",
authorName: "@user:example.com",
content: "@mosaic invalidcommand 42",
timestamp: new Date(),
};
const command = service.parseCommand(message);
expect(command).toBeNull();
});
it("should return null for @mosaic mention without a command", () => { it("should return null for @mosaic mention without a command", () => {
const message: ChatMessage = { const message: ChatMessage = {
id: "msg-11", id: "msg-11",
@@ -403,8 +338,192 @@ describe("MatrixService", () => {
}); });
}); });
describe("Event-driven message reception", () => {
it("should ignore messages from the bot itself", async () => {
await service.connect();
const parseCommandSpy = vi.spyOn(commandParser, "parseCommand");
// Simulate a message from the bot
expect(mockMessageCallbacks.length).toBeGreaterThan(0);
const callback = mockMessageCallbacks[0];
callback?.("!test-room:example.com", {
event_id: "$msg-1",
sender: "@mosaic-bot:example.com",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "@mosaic fix #42",
},
});
// Should not attempt to parse
expect(parseCommandSpy).not.toHaveBeenCalled();
});
it("should ignore messages in unmapped rooms", async () => {
// MatrixRoomService returns null for unknown rooms
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue(null);
await service.connect();
const callback = mockMessageCallbacks[0];
callback?.("!unknown-room:example.com", {
event_id: "$msg-1",
sender: "@user:example.com",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "@mosaic fix #42",
},
});
// Wait for async processing
await new Promise((resolve) => setTimeout(resolve, 50));
// Should not dispatch to stitcher
expect(stitcherService.dispatchJob).not.toHaveBeenCalled();
});
it("should process commands in the control room (fallback workspace)", async () => {
// MatrixRoomService returns null, but room matches controlRoomId
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue(null);
await service.connect();
const callback = mockMessageCallbacks[0];
callback?.("!test-room:example.com", {
event_id: "$msg-1",
sender: "@user:example.com",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "@mosaic help",
},
});
// Wait for async processing
await new Promise((resolve) => setTimeout(resolve, 50));
// Should send help message
expect(mockClient.sendMessage).toHaveBeenCalledWith(
"!test-room:example.com",
expect.objectContaining({
body: expect.stringContaining("Available commands:"),
})
);
});
it("should process commands in rooms mapped via MatrixRoomService", async () => {
// MatrixRoomService resolves the workspace
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("mapped-workspace-id");
await service.connect();
const callback = mockMessageCallbacks[0];
callback?.("!mapped-room:example.com", {
event_id: "$msg-1",
sender: "@user:example.com",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "@mosaic fix #42",
},
});
// Wait for async processing
await new Promise((resolve) => setTimeout(resolve, 50));
// Should dispatch with the mapped workspace ID
expect(stitcherService.dispatchJob).toHaveBeenCalledWith(
expect.objectContaining({
workspaceId: "mapped-workspace-id",
})
);
});
it("should handle !mosaic prefix in incoming messages", async () => {
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("test-workspace-id");
await service.connect();
const callback = mockMessageCallbacks[0];
callback?.("!test-room:example.com", {
event_id: "$msg-1",
sender: "@user:example.com",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "!mosaic help",
},
});
// Wait for async processing
await new Promise((resolve) => setTimeout(resolve, 50));
// Should send help message (normalized !mosaic -> @mosaic for parser)
expect(mockClient.sendMessage).toHaveBeenCalledWith(
"!test-room:example.com",
expect.objectContaining({
body: expect.stringContaining("Available commands:"),
})
);
});
it("should send help text when user tries an unknown command", async () => {
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("test-workspace-id");
await service.connect();
const callback = mockMessageCallbacks[0];
callback?.("!test-room:example.com", {
event_id: "$msg-1",
sender: "@user:example.com",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "@mosaic invalidcommand",
},
});
// Wait for async processing
await new Promise((resolve) => setTimeout(resolve, 50));
// Should send error/help message (CommandParserService returns help text for unknown actions)
expect(mockClient.sendMessage).toHaveBeenCalledWith(
"!test-room:example.com",
expect.objectContaining({
body: expect.stringContaining("Available commands"),
})
);
});
it("should ignore non-text messages", async () => {
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("test-workspace-id");
await service.connect();
const callback = mockMessageCallbacks[0];
callback?.("!test-room:example.com", {
event_id: "$msg-1",
sender: "@user:example.com",
origin_server_ts: Date.now(),
content: {
msgtype: "m.image",
body: "photo.jpg",
},
});
// Wait for async processing
await new Promise((resolve) => setTimeout(resolve, 50));
// Should not attempt any message sending
expect(mockClient.sendMessage).not.toHaveBeenCalled();
});
});
describe("Command Execution", () => { describe("Command Execution", () => {
it("should forward fix command to stitcher", async () => { it("should forward fix command to stitcher and create a thread", async () => {
const message: ChatMessage = { const message: ChatMessage = {
id: "msg-1", id: "msg-1",
channelId: "!test-room:example.com", channelId: "!test-room:example.com",
@@ -436,6 +555,32 @@ describe("MatrixService", () => {
}); });
}); });
it("should handle fix with #-prefixed issue number", async () => {
const message: ChatMessage = {
id: "msg-1",
channelId: "!test-room:example.com",
authorId: "@user:example.com",
authorName: "@user:example.com",
content: "@mosaic fix #42",
timestamp: new Date(),
};
await service.connect();
await service.handleCommand({
command: "fix",
args: ["#42"],
message,
});
expect(stitcherService.dispatchJob).toHaveBeenCalledWith(
expect.objectContaining({
metadata: expect.objectContaining({
issueNumber: 42,
}),
})
);
});
it("should respond with help message", async () => { it("should respond with help message", async () => {
const message: ChatMessage = { const message: ChatMessage = {
id: "msg-1", id: "msg-1",
@@ -461,6 +606,31 @@ describe("MatrixService", () => {
); );
}); });
it("should include retry command in help output", async () => {
const message: ChatMessage = {
id: "msg-1",
channelId: "!test-room:example.com",
authorId: "@user:example.com",
authorName: "@user:example.com",
content: "@mosaic help",
timestamp: new Date(),
};
await service.connect();
await service.handleCommand({
command: "help",
args: [],
message,
});
expect(mockClient.sendMessage).toHaveBeenCalledWith(
"!test-room:example.com",
expect.objectContaining({
body: expect.stringContaining("retry"),
})
);
});
it("should send error for fix command without issue number", async () => { it("should send error for fix command without issue number", async () => {
const message: ChatMessage = { const message: ChatMessage = {
id: "msg-1", id: "msg-1",
@@ -510,6 +680,35 @@ describe("MatrixService", () => {
}) })
); );
}); });
it("should dispatch fix command with workspace from MatrixRoomService", async () => {
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("dynamic-workspace-id");
await service.connect();
const callback = mockMessageCallbacks[0];
callback?.("!mapped-room:example.com", {
event_id: "$msg-1",
sender: "@user:example.com",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "@mosaic fix #99",
},
});
// Wait for async processing
await new Promise((resolve) => setTimeout(resolve, 50));
expect(stitcherService.dispatchJob).toHaveBeenCalledWith(
expect.objectContaining({
workspaceId: "dynamic-workspace-id",
metadata: expect.objectContaining({
issueNumber: 99,
}),
})
);
});
}); });
describe("Configuration", () => { describe("Configuration", () => {
@@ -519,10 +718,15 @@ describe("MatrixService", () => {
const module: TestingModule = await Test.createTestingModule({ const module: TestingModule = await Test.createTestingModule({
providers: [ providers: [
MatrixService, MatrixService,
CommandParserService,
{ {
provide: StitcherService, provide: StitcherService,
useValue: mockStitcherService, useValue: mockStitcherService,
}, },
{
provide: MatrixRoomService,
useValue: mockMatrixRoomService,
},
], ],
}).compile(); }).compile();
@@ -540,10 +744,15 @@ describe("MatrixService", () => {
const module: TestingModule = await Test.createTestingModule({ const module: TestingModule = await Test.createTestingModule({
providers: [ providers: [
MatrixService, MatrixService,
CommandParserService,
{ {
provide: StitcherService, provide: StitcherService,
useValue: mockStitcherService, useValue: mockStitcherService,
}, },
{
provide: MatrixRoomService,
useValue: mockMatrixRoomService,
},
], ],
}).compile(); }).compile();
@@ -561,10 +770,15 @@ describe("MatrixService", () => {
const module: TestingModule = await Test.createTestingModule({ const module: TestingModule = await Test.createTestingModule({
providers: [ providers: [
MatrixService, MatrixService,
CommandParserService,
{ {
provide: StitcherService, provide: StitcherService,
useValue: mockStitcherService, useValue: mockStitcherService,
}, },
{
provide: MatrixRoomService,
useValue: mockMatrixRoomService,
},
], ],
}).compile(); }).compile();
@@ -583,10 +797,15 @@ describe("MatrixService", () => {
const module: TestingModule = await Test.createTestingModule({ const module: TestingModule = await Test.createTestingModule({
providers: [ providers: [
MatrixService, MatrixService,
CommandParserService,
{ {
provide: StitcherService, provide: StitcherService,
useValue: mockStitcherService, useValue: mockStitcherService,
}, },
{
provide: MatrixRoomService,
useValue: mockMatrixRoomService,
},
], ],
}).compile(); }).compile();
@@ -655,4 +874,56 @@ describe("MatrixService", () => {
expect(String(connected)).not.toContain("test-access-token"); expect(String(connected)).not.toContain("test-access-token");
}); });
}); });
describe("MatrixRoomService reverse lookup", () => {
it("should call getWorkspaceForRoom when processing messages", async () => {
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("resolved-workspace");
await service.connect();
const callback = mockMessageCallbacks[0];
callback?.("!some-room:example.com", {
event_id: "$msg-1",
sender: "@user:example.com",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "@mosaic help",
},
});
// Wait for async processing
await new Promise((resolve) => setTimeout(resolve, 50));
expect(matrixRoomService.getWorkspaceForRoom).toHaveBeenCalledWith("!some-room:example.com");
});
it("should fall back to control room workspace when MatrixRoomService returns null", async () => {
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue(null);
await service.connect();
const callback = mockMessageCallbacks[0];
// Send to the control room (fallback path)
callback?.("!test-room:example.com", {
event_id: "$msg-1",
sender: "@user:example.com",
origin_server_ts: Date.now(),
content: {
msgtype: "m.text",
body: "@mosaic fix #10",
},
});
// Wait for async processing
await new Promise((resolve) => setTimeout(resolve, 50));
// Should dispatch with the env-configured workspace
expect(stitcherService.dispatchJob).toHaveBeenCalledWith(
expect.objectContaining({
workspaceId: "test-workspace-id",
})
);
});
});
}); });

View File

@@ -1,6 +1,10 @@
import { Injectable, Logger } from "@nestjs/common"; import { Injectable, Logger, Optional, Inject } from "@nestjs/common";
import { MatrixClient, SimpleFsStorageProvider, AutojoinRoomsMixin } from "matrix-bot-sdk"; import { MatrixClient, SimpleFsStorageProvider, AutojoinRoomsMixin } from "matrix-bot-sdk";
import { StitcherService } from "../../stitcher/stitcher.service"; import { StitcherService } from "../../stitcher/stitcher.service";
import { CommandParserService } from "../parser/command-parser.service";
import { CommandAction } from "../parser/command.interface";
import type { ParsedCommand } from "../parser/command.interface";
import { MatrixRoomService } from "./matrix-room.service";
import { sanitizeForLogging } from "../../common/utils"; import { sanitizeForLogging } from "../../common/utils";
import type { import type {
IChatProvider, IChatProvider,
@@ -46,7 +50,8 @@ interface MatrixRoomEvent {
* *
* Responsibilities: * Responsibilities:
* - Connect to Matrix via access token * - Connect to Matrix via access token
* - Listen for commands in designated rooms * - Listen for commands in mapped rooms (via MatrixRoomService)
* - Parse commands using shared CommandParserService
* - Forward commands to stitcher * - Forward commands to stitcher
* - Receive status updates from herald * - Receive status updates from herald
* - Post updates to threads (MSC3440) * - Post updates to threads (MSC3440)
@@ -62,7 +67,15 @@ export class MatrixService implements IChatProvider {
private readonly controlRoomId: string; private readonly controlRoomId: string;
private readonly workspaceId: string; private readonly workspaceId: string;
constructor(private readonly stitcherService: StitcherService) { constructor(
private readonly stitcherService: StitcherService,
@Optional()
@Inject(CommandParserService)
private readonly commandParser: CommandParserService | null,
@Optional()
@Inject(MatrixRoomService)
private readonly matrixRoomService: MatrixRoomService | null
) {
this.homeserverUrl = process.env.MATRIX_HOMESERVER_URL ?? ""; this.homeserverUrl = process.env.MATRIX_HOMESERVER_URL ?? "";
this.accessToken = process.env.MATRIX_ACCESS_TOKEN ?? ""; this.accessToken = process.env.MATRIX_ACCESS_TOKEN ?? "";
this.botUserId = process.env.MATRIX_BOT_USER_ID ?? ""; this.botUserId = process.env.MATRIX_BOT_USER_ID ?? "";
@@ -113,30 +126,10 @@ export class MatrixService implements IChatProvider {
// Ignore messages from the bot itself // Ignore messages from the bot itself
if (event.sender === this.botUserId) return; if (event.sender === this.botUserId) return;
// Check if message is in control room
if (roomId !== this.controlRoomId) return;
// Only handle text messages // Only handle text messages
if (event.content.msgtype !== "m.text") return; if (event.content.msgtype !== "m.text") return;
// Parse message into ChatMessage format void this.handleRoomMessage(roomId, event);
const chatMessage: ChatMessage = {
id: event.event_id,
channelId: roomId,
authorId: event.sender,
authorName: event.sender,
content: event.content.body,
timestamp: new Date(event.origin_server_ts),
...(event.content["m.relates_to"]?.rel_type === "m.thread" && {
threadId: event.content["m.relates_to"].event_id,
}),
};
// Parse command
const command = this.parseCommand(chatMessage);
if (command) {
void this.handleCommand(command);
}
}); });
this.client.on("room.event", (_roomId: string, event: MatrixRoomEvent | null) => { this.client.on("room.event", (_roomId: string, event: MatrixRoomEvent | null) => {
@@ -149,6 +142,114 @@ export class MatrixService implements IChatProvider {
}); });
} }
/**
* Handle an incoming room message.
*
* Resolves the workspace for the room (via MatrixRoomService or fallback
* to the control room), then delegates to the shared CommandParserService
* for platform-agnostic command parsing and dispatches the result.
*/
private async handleRoomMessage(roomId: string, event: MatrixRoomEvent): Promise<void> {
// Resolve workspace: try MatrixRoomService first, fall back to control room
let resolvedWorkspaceId: string | null = null;
if (this.matrixRoomService) {
resolvedWorkspaceId = await this.matrixRoomService.getWorkspaceForRoom(roomId);
}
// Fallback: if the room is the configured control room, use the env workspace
if (!resolvedWorkspaceId && roomId === this.controlRoomId) {
resolvedWorkspaceId = this.workspaceId;
}
// If room is not mapped to any workspace, ignore the message
if (!resolvedWorkspaceId) {
return;
}
const messageContent = event.content.body;
// Build ChatMessage for interface compatibility
const chatMessage: ChatMessage = {
id: event.event_id,
channelId: roomId,
authorId: event.sender,
authorName: event.sender,
content: messageContent,
timestamp: new Date(event.origin_server_ts),
...(event.content["m.relates_to"]?.rel_type === "m.thread" && {
threadId: event.content["m.relates_to"].event_id,
}),
};
// Use shared CommandParserService if available
if (this.commandParser) {
// Normalize !mosaic to @mosaic for the shared parser
const normalizedContent = messageContent.replace(/^!mosaic/i, "@mosaic");
const result = this.commandParser.parseCommand(normalizedContent);
if (result.success) {
await this.handleParsedCommand(result.command, chatMessage, resolvedWorkspaceId);
} else if (normalizedContent.toLowerCase().startsWith("@mosaic")) {
// The user tried to use a command but it failed to parse -- send help
await this.sendMessage(roomId, result.error.help ?? result.error.message);
}
return;
}
// Fallback: use the built-in parseCommand if CommandParserService not injected
const command = this.parseCommand(chatMessage);
if (command) {
await this.handleCommand(command);
}
}
/**
* Handle a command parsed by the shared CommandParserService.
*
* Routes the ParsedCommand to the appropriate handler, passing
* along workspace context for job dispatch.
*/
private async handleParsedCommand(
parsed: ParsedCommand,
message: ChatMessage,
workspaceId: string
): Promise<void> {
this.logger.log(
`Handling command: ${parsed.action} from ${message.authorName} in workspace ${workspaceId}`
);
switch (parsed.action) {
case CommandAction.FIX:
await this.handleFixCommand(parsed.rawArgs, message, workspaceId);
break;
case CommandAction.STATUS:
await this.handleStatusCommand(parsed.rawArgs, message);
break;
case CommandAction.CANCEL:
await this.handleCancelCommand(parsed.rawArgs, message);
break;
case CommandAction.VERBOSE:
await this.handleVerboseCommand(parsed.rawArgs, message);
break;
case CommandAction.QUIET:
await this.handleQuietCommand(parsed.rawArgs, message);
break;
case CommandAction.HELP:
await this.handleHelpCommand(parsed.rawArgs, message);
break;
case CommandAction.RETRY:
await this.handleRetryCommand(parsed.rawArgs, message);
break;
default:
await this.sendMessage(
message.channelId,
`Unknown command. Type \`@mosaic help\` or \`!mosaic help\` for available commands.`
);
}
}
/** /**
* Disconnect from Matrix * Disconnect from Matrix
*/ */
@@ -241,18 +342,35 @@ export class MatrixService implements IChatProvider {
} }
/** /**
* Parse a command from a message * Parse a command from a message (IChatProvider interface).
*
* Delegates to the shared CommandParserService when available,
* falling back to built-in parsing for backwards compatibility.
*/ */
parseCommand(message: ChatMessage): ChatCommand | null { parseCommand(message: ChatMessage): ChatCommand | null {
const { content } = message; const { content } = message;
// Check if message mentions @mosaic or uses !mosaic prefix // Try shared parser first
if (this.commandParser) {
const normalizedContent = content.replace(/^!mosaic/i, "@mosaic");
const result = this.commandParser.parseCommand(normalizedContent);
if (result.success) {
return {
command: result.command.action,
args: result.command.rawArgs,
message,
};
}
return null;
}
// Fallback: built-in parsing for when CommandParserService is not injected
const lowerContent = content.toLowerCase(); const lowerContent = content.toLowerCase();
if (!lowerContent.includes("@mosaic") && !lowerContent.includes("!mosaic")) { if (!lowerContent.includes("@mosaic") && !lowerContent.includes("!mosaic")) {
return null; return null;
} }
// Extract command and arguments
const parts = content.trim().split(/\s+/); const parts = content.trim().split(/\s+/);
const mosaicIndex = parts.findIndex( const mosaicIndex = parts.findIndex(
(part) => part.toLowerCase().includes("@mosaic") || part.toLowerCase().includes("!mosaic") (part) => part.toLowerCase().includes("@mosaic") || part.toLowerCase().includes("!mosaic")
@@ -270,7 +388,6 @@ export class MatrixService implements IChatProvider {
const command = commandPart.toLowerCase(); const command = commandPart.toLowerCase();
const args = parts.slice(mosaicIndex + 2); const args = parts.slice(mosaicIndex + 2);
// Valid commands
const validCommands = ["fix", "status", "cancel", "verbose", "quiet", "help"]; const validCommands = ["fix", "status", "cancel", "verbose", "quiet", "help"];
if (!validCommands.includes(command)) { if (!validCommands.includes(command)) {
@@ -285,7 +402,7 @@ export class MatrixService implements IChatProvider {
} }
/** /**
* Handle a parsed command * Handle a parsed command (ChatCommand format, used by fallback path)
*/ */
async handleCommand(command: ChatCommand): Promise<void> { async handleCommand(command: ChatCommand): Promise<void> {
const { command: cmd, args, message } = command; const { command: cmd, args, message } = command;
@@ -296,7 +413,7 @@ export class MatrixService implements IChatProvider {
switch (cmd) { switch (cmd) {
case "fix": case "fix":
await this.handleFixCommand(args, message); await this.handleFixCommand(args, message, this.workspaceId);
break; break;
case "status": case "status":
await this.handleStatusCommand(args, message); await this.handleStatusCommand(args, message);
@@ -324,7 +441,11 @@ export class MatrixService implements IChatProvider {
/** /**
* Handle fix command - Start a job for an issue * Handle fix command - Start a job for an issue
*/ */
private async handleFixCommand(args: string[], message: ChatMessage): Promise<void> { private async handleFixCommand(
args: string[],
message: ChatMessage,
workspaceId?: string
): Promise<void> {
if (args.length === 0 || !args[0]) { if (args.length === 0 || !args[0]) {
await this.sendMessage( await this.sendMessage(
message.channelId, message.channelId,
@@ -333,7 +454,9 @@ export class MatrixService implements IChatProvider {
return; return;
} }
const issueNumber = parseInt(args[0], 10); // Parse issue number: handle both "#42" and "42" formats
const issueArg = args[0].replace(/^#/, "");
const issueNumber = parseInt(issueArg, 10);
if (isNaN(issueNumber)) { if (isNaN(issueNumber)) {
await this.sendMessage( await this.sendMessage(
@@ -343,6 +466,8 @@ export class MatrixService implements IChatProvider {
return; return;
} }
const targetWorkspaceId = workspaceId ?? this.workspaceId;
// Create thread for job updates // Create thread for job updates
const threadId = await this.createThread({ const threadId = await this.createThread({
channelId: message.channelId, channelId: message.channelId,
@@ -352,7 +477,7 @@ export class MatrixService implements IChatProvider {
// Dispatch job to stitcher // Dispatch job to stitcher
const result = await this.stitcherService.dispatchJob({ const result = await this.stitcherService.dispatchJob({
workspaceId: this.workspaceId, workspaceId: targetWorkspaceId,
type: "code-task", type: "code-task",
priority: 10, priority: 10,
metadata: { metadata: {
@@ -414,6 +539,27 @@ export class MatrixService implements IChatProvider {
); );
} }
/**
* Handle retry command - Retry a failed job
*/
private async handleRetryCommand(args: string[], message: ChatMessage): Promise<void> {
if (args.length === 0 || !args[0]) {
await this.sendMessage(
message.channelId,
"Usage: `@mosaic retry <job-id>` or `!mosaic retry <job-id>`"
);
return;
}
const jobId = args[0];
// TODO: Implement job retry in stitcher
await this.sendMessage(
message.channelId,
`Retry command not yet implemented for job: ${jobId}`
);
}
/** /**
* Handle verbose command - Stream full logs to thread * Handle verbose command - Stream full logs to thread
*/ */
@@ -453,6 +599,7 @@ export class MatrixService implements IChatProvider {
\`@mosaic fix <issue>\` or \`!mosaic fix <issue>\` - Start job for issue \`@mosaic fix <issue>\` or \`!mosaic fix <issue>\` - Start job for issue
\`@mosaic status <job>\` or \`!mosaic status <job>\` - Get job status \`@mosaic status <job>\` or \`!mosaic status <job>\` - Get job status
\`@mosaic cancel <job>\` or \`!mosaic cancel <job>\` - Cancel running job \`@mosaic cancel <job>\` or \`!mosaic cancel <job>\` - Cancel running job
\`@mosaic retry <job>\` or \`!mosaic retry <job>\` - Retry failed job
\`@mosaic verbose <job>\` or \`!mosaic verbose <job>\` - Stream full logs to thread \`@mosaic verbose <job>\` or \`!mosaic verbose <job>\` - Stream full logs to thread
\`@mosaic quiet\` or \`!mosaic quiet\` - Reduce notifications \`@mosaic quiet\` or \`!mosaic quiet\` - Reduce notifications
\`@mosaic help\` or \`!mosaic help\` - Show this help message \`@mosaic help\` or \`!mosaic help\` - Show this help message

View File

@@ -10,7 +10,7 @@ import { BridgeModule } from "../bridge/bridge.module";
* - Subscribe to job events * - Subscribe to job events
* - Format status messages with PDA-friendly language * - Format status messages with PDA-friendly language
* - Route to appropriate channels based on workspace config * - Route to appropriate channels based on workspace config
* - Support Discord (via bridge) and PR comments * - Broadcast to ALL active chat providers via CHAT_PROVIDERS token
*/ */
@Module({ @Module({
imports: [PrismaModule, BridgeModule], imports: [PrismaModule, BridgeModule],

View File

@@ -2,7 +2,8 @@ import { Test, TestingModule } from "@nestjs/testing";
import { vi, describe, it, expect, beforeEach } from "vitest"; import { vi, describe, it, expect, beforeEach } from "vitest";
import { HeraldService } from "./herald.service"; import { HeraldService } from "./herald.service";
import { PrismaService } from "../prisma/prisma.service"; import { PrismaService } from "../prisma/prisma.service";
import { DiscordService } from "../bridge/discord/discord.service"; import { CHAT_PROVIDERS } from "../bridge/bridge.constants";
import type { IChatProvider } from "../bridge/interfaces/chat-provider.interface";
import { import {
JOB_CREATED, JOB_CREATED,
JOB_STARTED, JOB_STARTED,
@@ -14,10 +15,31 @@ import {
GATE_FAILED, GATE_FAILED,
} from "../job-events/event-types"; } from "../job-events/event-types";
function createMockProvider(
name: string,
connected = true
): IChatProvider & {
sendMessage: ReturnType<typeof vi.fn>;
sendThreadMessage: ReturnType<typeof vi.fn>;
createThread: ReturnType<typeof vi.fn>;
isConnected: ReturnType<typeof vi.fn>;
connect: ReturnType<typeof vi.fn>;
disconnect: ReturnType<typeof vi.fn>;
parseCommand: ReturnType<typeof vi.fn>;
} {
return {
connect: vi.fn().mockResolvedValue(undefined),
disconnect: vi.fn().mockResolvedValue(undefined),
isConnected: vi.fn().mockReturnValue(connected),
sendMessage: vi.fn().mockResolvedValue(undefined),
createThread: vi.fn().mockResolvedValue("thread-id"),
sendThreadMessage: vi.fn().mockResolvedValue(undefined),
parseCommand: vi.fn().mockReturnValue(null),
};
}
describe("HeraldService", () => { describe("HeraldService", () => {
let service: HeraldService; let service: HeraldService;
let prisma: PrismaService;
let discord: DiscordService;
const mockPrisma = { const mockPrisma = {
workspace: { workspace: {
@@ -31,14 +53,15 @@ describe("HeraldService", () => {
}, },
}; };
const mockDiscord = { let mockProviderA: ReturnType<typeof createMockProvider>;
isConnected: vi.fn(), let mockProviderB: ReturnType<typeof createMockProvider>;
sendMessage: vi.fn(), let chatProviders: IChatProvider[];
sendThreadMessage: vi.fn(),
createThread: vi.fn(),
};
beforeEach(async () => { beforeEach(async () => {
mockProviderA = createMockProvider("providerA", true);
mockProviderB = createMockProvider("providerB", true);
chatProviders = [mockProviderA, mockProviderB];
const module: TestingModule = await Test.createTestingModule({ const module: TestingModule = await Test.createTestingModule({
providers: [ providers: [
HeraldService, HeraldService,
@@ -47,44 +70,28 @@ describe("HeraldService", () => {
useValue: mockPrisma, useValue: mockPrisma,
}, },
{ {
provide: DiscordService, provide: CHAT_PROVIDERS,
useValue: mockDiscord, useValue: chatProviders,
}, },
], ],
}).compile(); }).compile();
service = module.get<HeraldService>(HeraldService); service = module.get<HeraldService>(HeraldService);
prisma = module.get<PrismaService>(PrismaService);
discord = module.get<DiscordService>(DiscordService);
// Reset mocks // Reset mocks
vi.clearAllMocks(); vi.clearAllMocks();
// Restore default connected state after clearAllMocks
mockProviderA.isConnected.mockReturnValue(true);
mockProviderB.isConnected.mockReturnValue(true);
}); });
describe("broadcastJobEvent", () => { describe("broadcastJobEvent", () => {
it("should broadcast job.created event to configured channel", async () => { const baseSetup = (): {
// Arrange jobId: string;
workspaceId: string;
} => {
const workspaceId = "workspace-1"; const workspaceId = "workspace-1";
const jobId = "job-1"; const jobId = "job-1";
const event = {
id: "event-1",
jobId,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: { issueNumber: 42 },
};
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: {
herald: {
channelMappings: {
"code-task": "channel-123",
},
},
},
});
mockPrisma.runnerJob.findUnique.mockResolvedValue({ mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId, id: jobId,
@@ -98,23 +105,38 @@ describe("HeraldService", () => {
}, },
}); });
mockDiscord.isConnected.mockReturnValue(true); return { jobId, workspaceId };
mockDiscord.sendThreadMessage.mockResolvedValue(undefined); };
it("should broadcast to all connected providers", async () => {
// Arrange
const { jobId } = baseSetup();
const event = {
id: "event-1",
jobId,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: { issueNumber: 42 },
};
// Act // Act
await service.broadcastJobEvent(jobId, event); await service.broadcastJobEvent(jobId, event);
// Assert // Assert
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({ expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
content: expect.stringContaining("Job created"),
});
expect(mockProviderB.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123", threadId: "thread-123",
content: expect.stringContaining("Job created"), content: expect.stringContaining("Job created"),
}); });
}); });
it("should broadcast job.started event", async () => { it("should broadcast job.started event to all providers", async () => {
// Arrange // Arrange
const workspaceId = "workspace-1"; const { jobId } = baseSetup();
const jobId = "job-1";
const event = { const event = {
id: "event-1", id: "event-1",
jobId, jobId,
@@ -124,31 +146,15 @@ describe("HeraldService", () => {
payload: {}, payload: {},
}; };
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: { herald: { channelMappings: {} } },
});
mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId,
workspaceId,
type: "code-task",
});
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: { threadId: "thread-123" },
},
});
mockDiscord.isConnected.mockReturnValue(true);
mockDiscord.sendThreadMessage.mockResolvedValue(undefined);
// Act // Act
await service.broadcastJobEvent(jobId, event); await service.broadcastJobEvent(jobId, event);
// Assert // Assert
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({ expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
content: expect.stringContaining("Job started"),
});
expect(mockProviderB.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123", threadId: "thread-123",
content: expect.stringContaining("Job started"), content: expect.stringContaining("Job started"),
}); });
@@ -156,8 +162,7 @@ describe("HeraldService", () => {
it("should broadcast job.completed event with success message", async () => { it("should broadcast job.completed event with success message", async () => {
// Arrange // Arrange
const workspaceId = "workspace-1"; const { jobId } = baseSetup();
const jobId = "job-1";
const event = { const event = {
id: "event-1", id: "event-1",
jobId, jobId,
@@ -167,31 +172,11 @@ describe("HeraldService", () => {
payload: { duration: 120 }, payload: { duration: 120 },
}; };
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: { herald: { channelMappings: {} } },
});
mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId,
workspaceId,
type: "code-task",
});
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: { threadId: "thread-123" },
},
});
mockDiscord.isConnected.mockReturnValue(true);
mockDiscord.sendThreadMessage.mockResolvedValue(undefined);
// Act // Act
await service.broadcastJobEvent(jobId, event); await service.broadcastJobEvent(jobId, event);
// Assert // Assert
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({ expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123", threadId: "thread-123",
content: expect.stringContaining("completed"), content: expect.stringContaining("completed"),
}); });
@@ -199,8 +184,7 @@ describe("HeraldService", () => {
it("should broadcast job.failed event with PDA-friendly language", async () => { it("should broadcast job.failed event with PDA-friendly language", async () => {
// Arrange // Arrange
const workspaceId = "workspace-1"; const { jobId } = baseSetup();
const jobId = "job-1";
const event = { const event = {
id: "event-1", id: "event-1",
jobId, jobId,
@@ -210,43 +194,28 @@ describe("HeraldService", () => {
payload: { error: "Build failed" }, payload: { error: "Build failed" },
}; };
mockPrisma.workspace.findUnique.mockResolvedValue({
id: workspaceId,
settings: { herald: { channelMappings: {} } },
});
mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId,
workspaceId,
type: "code-task",
});
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: { threadId: "thread-123" },
},
});
mockDiscord.isConnected.mockReturnValue(true);
mockDiscord.sendThreadMessage.mockResolvedValue(undefined);
// Act // Act
await service.broadcastJobEvent(jobId, event); await service.broadcastJobEvent(jobId, event);
// Assert // Assert
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({ expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123", threadId: "thread-123",
content: expect.stringContaining("encountered an issue"), content: expect.stringContaining("encountered an issue"),
}); });
// Verify the actual message doesn't contain demanding language // Verify the actual message doesn't contain demanding language
const actualCall = mockDiscord.sendThreadMessage.mock.calls[0][0]; const actualCall = mockProviderA.sendThreadMessage.mock.calls[0][0] as {
threadId: string;
content: string;
};
expect(actualCall.content).not.toMatch(/FAILED|ERROR|CRITICAL|URGENT/); expect(actualCall.content).not.toMatch(/FAILED|ERROR|CRITICAL|URGENT/);
}); });
it("should skip broadcasting if Discord is not connected", async () => { it("should skip disconnected providers", async () => {
// Arrange // Arrange
const workspaceId = "workspace-1"; const { jobId } = baseSetup();
const jobId = "job-1"; mockProviderA.isConnected.mockReturnValue(true);
mockProviderB.isConnected.mockReturnValue(false);
const event = { const event = {
id: "event-1", id: "event-1",
jobId, jobId,
@@ -256,14 +225,36 @@ describe("HeraldService", () => {
payload: {}, payload: {},
}; };
mockPrisma.workspace.findUnique.mockResolvedValue({ // Act
id: workspaceId, await service.broadcastJobEvent(jobId, event);
settings: { herald: { channelMappings: {} } },
// Assert
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledTimes(1);
expect(mockProviderB.sendThreadMessage).not.toHaveBeenCalled();
}); });
it("should handle empty providers array without crashing", async () => {
// Arrange — rebuild module with empty providers
const module: TestingModule = await Test.createTestingModule({
providers: [
HeraldService,
{
provide: PrismaService,
useValue: mockPrisma,
},
{
provide: CHAT_PROVIDERS,
useValue: [],
},
],
}).compile();
const emptyService = module.get<HeraldService>(HeraldService);
const jobId = "job-1";
mockPrisma.runnerJob.findUnique.mockResolvedValue({ mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId, id: jobId,
workspaceId, workspaceId: "workspace-1",
type: "code-task", type: "code-task",
}); });
@@ -273,19 +264,6 @@ describe("HeraldService", () => {
}, },
}); });
mockDiscord.isConnected.mockReturnValue(false);
// Act
await service.broadcastJobEvent(jobId, event);
// Assert
expect(mockDiscord.sendThreadMessage).not.toHaveBeenCalled();
});
it("should skip broadcasting if job has no threadId", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobId = "job-1";
const event = { const event = {
id: "event-1", id: "event-1",
jobId, jobId,
@@ -295,14 +273,59 @@ describe("HeraldService", () => {
payload: {}, payload: {},
}; };
mockPrisma.workspace.findUnique.mockResolvedValue({ // Act & Assert — should not throw
id: workspaceId, await expect(emptyService.broadcastJobEvent(jobId, event)).resolves.not.toThrow();
settings: { herald: { channelMappings: {} } },
}); });
it("should continue broadcasting when one provider errors", async () => {
// Arrange
const { jobId } = baseSetup();
mockProviderA.sendThreadMessage.mockRejectedValue(new Error("Provider A rate limit"));
mockProviderB.sendThreadMessage.mockResolvedValue(undefined);
const event = {
id: "event-1",
jobId,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: {},
};
// Act — should not throw despite provider A failing
await service.broadcastJobEvent(jobId, event);
// Assert — provider B should still have been called
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledTimes(1);
expect(mockProviderB.sendThreadMessage).toHaveBeenCalledTimes(1);
});
it("should not throw when all providers error", async () => {
// Arrange
const { jobId } = baseSetup();
mockProviderA.sendThreadMessage.mockRejectedValue(new Error("Provider A down"));
mockProviderB.sendThreadMessage.mockRejectedValue(new Error("Provider B down"));
const event = {
id: "event-1",
jobId,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: {},
};
// Act & Assert — should not throw; provider errors are logged, not propagated
await expect(service.broadcastJobEvent(jobId, event)).resolves.not.toThrow();
});
it("should skip broadcasting if job has no threadId", async () => {
// Arrange
const jobId = "job-1";
mockPrisma.runnerJob.findUnique.mockResolvedValue({ mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId, id: jobId,
workspaceId, workspaceId: "workspace-1",
type: "code-task", type: "code-task",
}); });
@@ -312,16 +335,45 @@ describe("HeraldService", () => {
}, },
}); });
mockDiscord.isConnected.mockReturnValue(true); const event = {
id: "event-1",
jobId,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: {},
};
// Act // Act
await service.broadcastJobEvent(jobId, event); await service.broadcastJobEvent(jobId, event);
// Assert // Assert
expect(mockDiscord.sendThreadMessage).not.toHaveBeenCalled(); expect(mockProviderA.sendThreadMessage).not.toHaveBeenCalled();
expect(mockProviderB.sendThreadMessage).not.toHaveBeenCalled();
}); });
// ERROR HANDLING TESTS - Issue #185 it("should skip broadcasting if job not found", async () => {
// Arrange
const jobId = "nonexistent-job";
mockPrisma.runnerJob.findUnique.mockResolvedValue(null);
const event = {
id: "event-1",
jobId,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: {},
};
// Act
await service.broadcastJobEvent(jobId, event);
// Assert
expect(mockProviderA.sendThreadMessage).not.toHaveBeenCalled();
});
// ERROR HANDLING TESTS - database errors should still propagate
it("should propagate database errors when job lookup fails", async () => { it("should propagate database errors when job lookup fails", async () => {
// Arrange // Arrange
@@ -344,43 +396,8 @@ describe("HeraldService", () => {
); );
}); });
it("should propagate Discord send failures with context", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobId = "job-1";
const event = {
id: "event-1",
jobId,
type: JOB_CREATED,
timestamp: new Date(),
actor: "system",
payload: {},
};
mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId,
workspaceId,
type: "code-task",
});
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: { threadId: "thread-123" },
},
});
mockDiscord.isConnected.mockReturnValue(true);
const discordError = new Error("Rate limit exceeded");
mockDiscord.sendThreadMessage.mockRejectedValue(discordError);
// Act & Assert
await expect(service.broadcastJobEvent(jobId, event)).rejects.toThrow("Rate limit exceeded");
});
it("should propagate errors when fetching job events fails", async () => { it("should propagate errors when fetching job events fails", async () => {
// Arrange // Arrange
const workspaceId = "workspace-1";
const jobId = "job-1"; const jobId = "job-1";
const event = { const event = {
id: "event-1", id: "event-1",
@@ -393,61 +410,16 @@ describe("HeraldService", () => {
mockPrisma.runnerJob.findUnique.mockResolvedValue({ mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId, id: jobId,
workspaceId, workspaceId: "workspace-1",
type: "code-task", type: "code-task",
}); });
const dbError = new Error("Query timeout"); const dbError = new Error("Query timeout");
mockPrisma.jobEvent.findFirst.mockRejectedValue(dbError); mockPrisma.jobEvent.findFirst.mockRejectedValue(dbError);
mockDiscord.isConnected.mockReturnValue(true);
// Act & Assert // Act & Assert
await expect(service.broadcastJobEvent(jobId, event)).rejects.toThrow("Query timeout"); await expect(service.broadcastJobEvent(jobId, event)).rejects.toThrow("Query timeout");
}); });
it("should include job context in error messages", async () => {
// Arrange
const workspaceId = "workspace-1";
const jobId = "test-job-123";
const event = {
id: "event-1",
jobId,
type: JOB_COMPLETED,
timestamp: new Date(),
actor: "system",
payload: {},
};
mockPrisma.runnerJob.findUnique.mockResolvedValue({
id: jobId,
workspaceId,
type: "code-task",
});
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: { threadId: "thread-123" },
},
});
mockDiscord.isConnected.mockReturnValue(true);
const discordError = new Error("Network failure");
mockDiscord.sendThreadMessage.mockRejectedValue(discordError);
// Act & Assert
try {
await service.broadcastJobEvent(jobId, event);
// Should not reach here
expect(true).toBe(false);
} catch (error) {
// Verify error was thrown
expect(error).toBeDefined();
// Verify original error is preserved
expect((error as Error).message).toContain("Network failure");
}
});
}); });
describe("formatJobEventMessage", () => { describe("formatJobEventMessage", () => {
@@ -473,7 +445,6 @@ describe("HeraldService", () => {
const message = service.formatJobEventMessage(event, job, metadata); const message = service.formatJobEventMessage(event, job, metadata);
// Assert // Assert
expect(message).toContain("🟢");
expect(message).toContain("Job created"); expect(message).toContain("Job created");
expect(message).toContain("#42"); expect(message).toContain("#42");
expect(message.length).toBeLessThan(200); // Keep it scannable expect(message.length).toBeLessThan(200); // Keep it scannable
@@ -526,7 +497,6 @@ describe("HeraldService", () => {
const message = service.formatJobEventMessage(event, job, metadata); const message = service.formatJobEventMessage(event, job, metadata);
// Assert // Assert
expect(message).toMatch(/✅|🟢/);
expect(message).toContain("completed"); expect(message).toContain("completed");
expect(message).not.toMatch(/COMPLETED|SUCCESS/); expect(message).not.toMatch(/COMPLETED|SUCCESS/);
}); });

View File

@@ -1,6 +1,7 @@
import { Injectable, Logger } from "@nestjs/common"; import { Inject, Injectable, Logger } from "@nestjs/common";
import { PrismaService } from "../prisma/prisma.service"; import { PrismaService } from "../prisma/prisma.service";
import { DiscordService } from "../bridge/discord/discord.service"; import { CHAT_PROVIDERS } from "../bridge/bridge.constants";
import type { IChatProvider } from "../bridge/interfaces/chat-provider.interface";
import { import {
JOB_CREATED, JOB_CREATED,
JOB_STARTED, JOB_STARTED,
@@ -21,7 +22,7 @@ import {
* - Subscribe to job events * - Subscribe to job events
* - Format status messages with PDA-friendly language * - Format status messages with PDA-friendly language
* - Route to appropriate channels based on workspace config * - Route to appropriate channels based on workspace config
* - Support Discord (via bridge) and PR comments * - Broadcast to ALL active chat providers (Discord, Matrix, etc.)
*/ */
@Injectable() @Injectable()
export class HeraldService { export class HeraldService {
@@ -29,11 +30,11 @@ export class HeraldService {
constructor( constructor(
private readonly prisma: PrismaService, private readonly prisma: PrismaService,
private readonly discord: DiscordService @Inject(CHAT_PROVIDERS) private readonly chatProviders: IChatProvider[]
) {} ) {}
/** /**
* Broadcast a job event to the appropriate channel * Broadcast a job event to all connected chat providers
*/ */
async broadcastJobEvent( async broadcastJobEvent(
jobId: string, jobId: string,
@@ -47,7 +48,6 @@ export class HeraldService {
payload: unknown; payload: unknown;
} }
): Promise<void> { ): Promise<void> {
try {
// Get job details // Get job details
const job = await this.prisma.runnerJob.findUnique({ const job = await this.prisma.runnerJob.findUnique({
where: { id: jobId }, where: { id: jobId },
@@ -63,12 +63,6 @@ export class HeraldService {
return; return;
} }
// Check if Discord is connected
if (!this.discord.isConnected()) {
this.logger.debug("Discord not connected, skipping broadcast");
return;
}
// Get threadId from first event payload (job.created event has metadata) // Get threadId from first event payload (job.created event has metadata)
const firstEvent = await this.prisma.jobEvent.findFirst({ const firstEvent = await this.prisma.jobEvent.findFirst({
where: { where: {
@@ -92,21 +86,27 @@ export class HeraldService {
// Format message // Format message
const message = this.formatJobEventMessage(event, job, metadata); const message = this.formatJobEventMessage(event, job, metadata);
// Send to thread // Broadcast to all connected providers
await this.discord.sendThreadMessage({ for (const provider of this.chatProviders) {
if (!provider.isConnected()) {
continue;
}
try {
await provider.sendThreadMessage({
threadId, threadId,
content: message, content: message,
}); });
} catch (error) {
// Log and continue — one provider failure must not block others
this.logger.error(
`Failed to broadcast event ${event.type} for job ${jobId} via provider:`,
error
);
}
}
this.logger.debug(`Broadcasted event ${event.type} for job ${jobId} to thread ${threadId}`); this.logger.debug(`Broadcasted event ${event.type} for job ${jobId} to thread ${threadId}`);
} catch (error) {
// Log the error with full context for debugging
this.logger.error(`Failed to broadcast event ${event.type} for job ${jobId}:`, error);
// Re-throw the error so callers can handle it appropriately
// This enables proper error tracking, retry logic, and alerting
throw error;
}
} }
/** /**