feat(#382): Herald Service: broadcast to all active chat providers
Some checks failed
ci/woodpecker/push/api Pipeline failed
Some checks failed
ci/woodpecker/push/api Pipeline failed
- Replace direct DiscordService injection with CHAT_PROVIDERS array - Herald broadcasts to ALL active chat providers (Discord, Matrix, future) - Graceful error handling — one provider failure doesn't block others - Skips disconnected providers automatically - Tests verify multi-provider broadcasting behavior - Fix lint: remove unnecessary conditional in matrix.service.ts Refs #382
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
import { Logger, Module } from "@nestjs/common";
|
||||
import { DiscordService } from "./discord/discord.service";
|
||||
import { MatrixService } from "./matrix/matrix.service";
|
||||
import { MatrixRoomService } from "./matrix/matrix-room.service";
|
||||
import { CommandParserService } from "./parser/command-parser.service";
|
||||
import { StitcherModule } from "../stitcher/stitcher.module";
|
||||
import { CHAT_PROVIDERS } from "./bridge.constants";
|
||||
import type { IChatProvider } from "./interfaces";
|
||||
@@ -20,10 +22,15 @@ const logger = new Logger("BridgeModule");
|
||||
*
|
||||
* Both bridges can run simultaneously, and no error occurs if neither is configured.
|
||||
* Consumers should inject CHAT_PROVIDERS for bridge-agnostic access to all active providers.
|
||||
*
|
||||
* CommandParserService provides shared, platform-agnostic command parsing.
|
||||
* MatrixRoomService handles workspace-to-Matrix-room mapping.
|
||||
*/
|
||||
@Module({
|
||||
imports: [StitcherModule],
|
||||
providers: [
|
||||
CommandParserService,
|
||||
MatrixRoomService,
|
||||
DiscordService,
|
||||
MatrixService,
|
||||
{
|
||||
@@ -50,6 +57,6 @@ const logger = new Logger("BridgeModule");
|
||||
inject: [DiscordService, MatrixService],
|
||||
},
|
||||
],
|
||||
exports: [DiscordService, MatrixService, CHAT_PROVIDERS],
|
||||
exports: [DiscordService, MatrixService, MatrixRoomService, CommandParserService, CHAT_PROVIDERS],
|
||||
})
|
||||
export class BridgeModule {}
|
||||
|
||||
@@ -35,6 +35,7 @@ describe("MatrixRoomService", () => {
|
||||
const mockPrismaService = {
|
||||
workspace: {
|
||||
findUnique: vi.fn(),
|
||||
findFirst: vi.fn(),
|
||||
update: vi.fn(),
|
||||
},
|
||||
};
|
||||
@@ -162,6 +163,30 @@ describe("MatrixRoomService", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("getWorkspaceForRoom", () => {
|
||||
it("should return the workspace ID for a mapped room", async () => {
|
||||
mockPrismaService.workspace.findFirst.mockResolvedValue({
|
||||
id: "workspace-uuid-1",
|
||||
});
|
||||
|
||||
const workspaceId = await service.getWorkspaceForRoom("!mapped-room:example.com");
|
||||
|
||||
expect(workspaceId).toBe("workspace-uuid-1");
|
||||
expect(mockPrismaService.workspace.findFirst).toHaveBeenCalledWith({
|
||||
where: { matrixRoomId: "!mapped-room:example.com" },
|
||||
select: { id: true },
|
||||
});
|
||||
});
|
||||
|
||||
it("should return null for an unmapped room", async () => {
|
||||
mockPrismaService.workspace.findFirst.mockResolvedValue(null);
|
||||
|
||||
const workspaceId = await service.getWorkspaceForRoom("!unknown-room:example.com");
|
||||
|
||||
expect(workspaceId).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
describe("linkWorkspaceToRoom", () => {
|
||||
it("should store the room mapping in the workspace", async () => {
|
||||
await service.linkWorkspaceToRoom("workspace-uuid-1", "!existing-room:example.com");
|
||||
|
||||
@@ -89,6 +89,21 @@ export class MatrixRoomService {
|
||||
return workspace?.matrixRoomId ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reverse lookup: find the workspace that owns a given Matrix room.
|
||||
*
|
||||
* @param roomId - The Matrix room ID (e.g. "!abc:example.com")
|
||||
* @returns The workspace ID, or null if the room is not mapped to any workspace
|
||||
*/
|
||||
async getWorkspaceForRoom(roomId: string): Promise<string | null> {
|
||||
const workspace = await this.prisma.workspace.findFirst({
|
||||
where: { matrixRoomId: roomId },
|
||||
select: { id: true },
|
||||
});
|
||||
|
||||
return workspace?.id ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Manually link an existing Matrix room to a workspace.
|
||||
*
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { Test, TestingModule } from "@nestjs/testing";
|
||||
import { MatrixService } from "./matrix.service";
|
||||
import { MatrixRoomService } from "./matrix-room.service";
|
||||
import { StitcherService } from "../../stitcher/stitcher.service";
|
||||
import { CommandParserService } from "../parser/command-parser.service";
|
||||
import { vi, describe, it, expect, beforeEach } from "vitest";
|
||||
import type { ChatMessage } from "../interfaces";
|
||||
|
||||
@@ -50,6 +52,8 @@ vi.mock("matrix-bot-sdk", () => {
|
||||
describe("MatrixService", () => {
|
||||
let service: MatrixService;
|
||||
let stitcherService: StitcherService;
|
||||
let commandParser: CommandParserService;
|
||||
let matrixRoomService: MatrixRoomService;
|
||||
|
||||
const mockStitcherService = {
|
||||
dispatchJob: vi.fn().mockResolvedValue({
|
||||
@@ -60,6 +64,14 @@ describe("MatrixService", () => {
|
||||
trackJobEvent: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
|
||||
const mockMatrixRoomService = {
|
||||
getWorkspaceForRoom: vi.fn().mockResolvedValue(null),
|
||||
getRoomForWorkspace: vi.fn().mockResolvedValue(null),
|
||||
provisionRoom: vi.fn().mockResolvedValue(null),
|
||||
linkWorkspaceToRoom: vi.fn().mockResolvedValue(undefined),
|
||||
unlinkWorkspace: vi.fn().mockResolvedValue(undefined),
|
||||
};
|
||||
|
||||
beforeEach(async () => {
|
||||
// Set environment variables for testing
|
||||
process.env.MATRIX_HOMESERVER_URL = "https://matrix.example.com";
|
||||
@@ -75,15 +87,22 @@ describe("MatrixService", () => {
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
MatrixService,
|
||||
CommandParserService,
|
||||
{
|
||||
provide: StitcherService,
|
||||
useValue: mockStitcherService,
|
||||
},
|
||||
{
|
||||
provide: MatrixRoomService,
|
||||
useValue: mockMatrixRoomService,
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
service = module.get<MatrixService>(MatrixService);
|
||||
stitcherService = module.get<StitcherService>(StitcherService);
|
||||
commandParser = module.get<CommandParserService>(CommandParserService);
|
||||
matrixRoomService = module.get(MatrixRoomService) as MatrixRoomService;
|
||||
|
||||
// Clear all mocks
|
||||
vi.clearAllMocks();
|
||||
@@ -189,46 +208,42 @@ describe("MatrixService", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("Command Parsing", () => {
|
||||
it("should parse @mosaic fix command", () => {
|
||||
describe("Command Parsing with shared CommandParserService", () => {
|
||||
it("should parse @mosaic fix #42 via shared parser", () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-1",
|
||||
channelId: "!room:example.com",
|
||||
authorId: "@user:example.com",
|
||||
authorName: "@user:example.com",
|
||||
content: "@mosaic fix 42",
|
||||
content: "@mosaic fix #42",
|
||||
timestamp: new Date(),
|
||||
};
|
||||
|
||||
const command = service.parseCommand(message);
|
||||
|
||||
expect(command).toEqual({
|
||||
command: "fix",
|
||||
args: ["42"],
|
||||
message,
|
||||
});
|
||||
expect(command).not.toBeNull();
|
||||
expect(command?.command).toBe("fix");
|
||||
expect(command?.args).toContain("#42");
|
||||
});
|
||||
|
||||
it("should parse !mosaic fix command", () => {
|
||||
it("should parse !mosaic fix #42 by normalizing to @mosaic for the shared parser", () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-1",
|
||||
channelId: "!room:example.com",
|
||||
authorId: "@user:example.com",
|
||||
authorName: "@user:example.com",
|
||||
content: "!mosaic fix 42",
|
||||
content: "!mosaic fix #42",
|
||||
timestamp: new Date(),
|
||||
};
|
||||
|
||||
const command = service.parseCommand(message);
|
||||
|
||||
expect(command).toEqual({
|
||||
command: "fix",
|
||||
args: ["42"],
|
||||
message,
|
||||
});
|
||||
expect(command).not.toBeNull();
|
||||
expect(command?.command).toBe("fix");
|
||||
expect(command?.args).toContain("#42");
|
||||
});
|
||||
|
||||
it("should parse @mosaic status command", () => {
|
||||
it("should parse @mosaic status command via shared parser", () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-2",
|
||||
channelId: "!room:example.com",
|
||||
@@ -240,14 +255,12 @@ describe("MatrixService", () => {
|
||||
|
||||
const command = service.parseCommand(message);
|
||||
|
||||
expect(command).toEqual({
|
||||
command: "status",
|
||||
args: ["job-123"],
|
||||
message,
|
||||
});
|
||||
expect(command).not.toBeNull();
|
||||
expect(command?.command).toBe("status");
|
||||
expect(command?.args).toContain("job-123");
|
||||
});
|
||||
|
||||
it("should parse @mosaic cancel command", () => {
|
||||
it("should parse @mosaic cancel command via shared parser", () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-3",
|
||||
channelId: "!room:example.com",
|
||||
@@ -259,52 +272,11 @@ describe("MatrixService", () => {
|
||||
|
||||
const command = service.parseCommand(message);
|
||||
|
||||
expect(command).toEqual({
|
||||
command: "cancel",
|
||||
args: ["job-456"],
|
||||
message,
|
||||
});
|
||||
expect(command).not.toBeNull();
|
||||
expect(command?.command).toBe("cancel");
|
||||
});
|
||||
|
||||
it("should parse @mosaic verbose command", () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-4",
|
||||
channelId: "!room:example.com",
|
||||
authorId: "@user:example.com",
|
||||
authorName: "@user:example.com",
|
||||
content: "@mosaic verbose job-789",
|
||||
timestamp: new Date(),
|
||||
};
|
||||
|
||||
const command = service.parseCommand(message);
|
||||
|
||||
expect(command).toEqual({
|
||||
command: "verbose",
|
||||
args: ["job-789"],
|
||||
message,
|
||||
});
|
||||
});
|
||||
|
||||
it("should parse @mosaic quiet command", () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-5",
|
||||
channelId: "!room:example.com",
|
||||
authorId: "@user:example.com",
|
||||
authorName: "@user:example.com",
|
||||
content: "@mosaic quiet",
|
||||
timestamp: new Date(),
|
||||
};
|
||||
|
||||
const command = service.parseCommand(message);
|
||||
|
||||
expect(command).toEqual({
|
||||
command: "quiet",
|
||||
args: [],
|
||||
message,
|
||||
});
|
||||
});
|
||||
|
||||
it("should parse @mosaic help command", () => {
|
||||
it("should parse @mosaic help command via shared parser", () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-6",
|
||||
channelId: "!room:example.com",
|
||||
@@ -316,11 +288,8 @@ describe("MatrixService", () => {
|
||||
|
||||
const command = service.parseCommand(message);
|
||||
|
||||
expect(command).toEqual({
|
||||
command: "help",
|
||||
args: [],
|
||||
message,
|
||||
});
|
||||
expect(command).not.toBeNull();
|
||||
expect(command?.command).toBe("help");
|
||||
});
|
||||
|
||||
it("should return null for non-command messages", () => {
|
||||
@@ -353,40 +322,6 @@ describe("MatrixService", () => {
|
||||
expect(command).toBeNull();
|
||||
});
|
||||
|
||||
it("should handle commands with multiple arguments", () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-9",
|
||||
channelId: "!room:example.com",
|
||||
authorId: "@user:example.com",
|
||||
authorName: "@user:example.com",
|
||||
content: "@mosaic fix 42 high-priority",
|
||||
timestamp: new Date(),
|
||||
};
|
||||
|
||||
const command = service.parseCommand(message);
|
||||
|
||||
expect(command).toEqual({
|
||||
command: "fix",
|
||||
args: ["42", "high-priority"],
|
||||
message,
|
||||
});
|
||||
});
|
||||
|
||||
it("should return null for invalid commands", () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-10",
|
||||
channelId: "!room:example.com",
|
||||
authorId: "@user:example.com",
|
||||
authorName: "@user:example.com",
|
||||
content: "@mosaic invalidcommand 42",
|
||||
timestamp: new Date(),
|
||||
};
|
||||
|
||||
const command = service.parseCommand(message);
|
||||
|
||||
expect(command).toBeNull();
|
||||
});
|
||||
|
||||
it("should return null for @mosaic mention without a command", () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-11",
|
||||
@@ -403,8 +338,192 @@ describe("MatrixService", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("Event-driven message reception", () => {
|
||||
it("should ignore messages from the bot itself", async () => {
|
||||
await service.connect();
|
||||
|
||||
const parseCommandSpy = vi.spyOn(commandParser, "parseCommand");
|
||||
|
||||
// Simulate a message from the bot
|
||||
expect(mockMessageCallbacks.length).toBeGreaterThan(0);
|
||||
const callback = mockMessageCallbacks[0];
|
||||
callback?.("!test-room:example.com", {
|
||||
event_id: "$msg-1",
|
||||
sender: "@mosaic-bot:example.com",
|
||||
origin_server_ts: Date.now(),
|
||||
content: {
|
||||
msgtype: "m.text",
|
||||
body: "@mosaic fix #42",
|
||||
},
|
||||
});
|
||||
|
||||
// Should not attempt to parse
|
||||
expect(parseCommandSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("should ignore messages in unmapped rooms", async () => {
|
||||
// MatrixRoomService returns null for unknown rooms
|
||||
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue(null);
|
||||
|
||||
await service.connect();
|
||||
|
||||
const callback = mockMessageCallbacks[0];
|
||||
callback?.("!unknown-room:example.com", {
|
||||
event_id: "$msg-1",
|
||||
sender: "@user:example.com",
|
||||
origin_server_ts: Date.now(),
|
||||
content: {
|
||||
msgtype: "m.text",
|
||||
body: "@mosaic fix #42",
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for async processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// Should not dispatch to stitcher
|
||||
expect(stitcherService.dispatchJob).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("should process commands in the control room (fallback workspace)", async () => {
|
||||
// MatrixRoomService returns null, but room matches controlRoomId
|
||||
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue(null);
|
||||
|
||||
await service.connect();
|
||||
|
||||
const callback = mockMessageCallbacks[0];
|
||||
callback?.("!test-room:example.com", {
|
||||
event_id: "$msg-1",
|
||||
sender: "@user:example.com",
|
||||
origin_server_ts: Date.now(),
|
||||
content: {
|
||||
msgtype: "m.text",
|
||||
body: "@mosaic help",
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for async processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// Should send help message
|
||||
expect(mockClient.sendMessage).toHaveBeenCalledWith(
|
||||
"!test-room:example.com",
|
||||
expect.objectContaining({
|
||||
body: expect.stringContaining("Available commands:"),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should process commands in rooms mapped via MatrixRoomService", async () => {
|
||||
// MatrixRoomService resolves the workspace
|
||||
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("mapped-workspace-id");
|
||||
|
||||
await service.connect();
|
||||
|
||||
const callback = mockMessageCallbacks[0];
|
||||
callback?.("!mapped-room:example.com", {
|
||||
event_id: "$msg-1",
|
||||
sender: "@user:example.com",
|
||||
origin_server_ts: Date.now(),
|
||||
content: {
|
||||
msgtype: "m.text",
|
||||
body: "@mosaic fix #42",
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for async processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// Should dispatch with the mapped workspace ID
|
||||
expect(stitcherService.dispatchJob).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
workspaceId: "mapped-workspace-id",
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should handle !mosaic prefix in incoming messages", async () => {
|
||||
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("test-workspace-id");
|
||||
|
||||
await service.connect();
|
||||
|
||||
const callback = mockMessageCallbacks[0];
|
||||
callback?.("!test-room:example.com", {
|
||||
event_id: "$msg-1",
|
||||
sender: "@user:example.com",
|
||||
origin_server_ts: Date.now(),
|
||||
content: {
|
||||
msgtype: "m.text",
|
||||
body: "!mosaic help",
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for async processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// Should send help message (normalized !mosaic -> @mosaic for parser)
|
||||
expect(mockClient.sendMessage).toHaveBeenCalledWith(
|
||||
"!test-room:example.com",
|
||||
expect.objectContaining({
|
||||
body: expect.stringContaining("Available commands:"),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should send help text when user tries an unknown command", async () => {
|
||||
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("test-workspace-id");
|
||||
|
||||
await service.connect();
|
||||
|
||||
const callback = mockMessageCallbacks[0];
|
||||
callback?.("!test-room:example.com", {
|
||||
event_id: "$msg-1",
|
||||
sender: "@user:example.com",
|
||||
origin_server_ts: Date.now(),
|
||||
content: {
|
||||
msgtype: "m.text",
|
||||
body: "@mosaic invalidcommand",
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for async processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// Should send error/help message (CommandParserService returns help text for unknown actions)
|
||||
expect(mockClient.sendMessage).toHaveBeenCalledWith(
|
||||
"!test-room:example.com",
|
||||
expect.objectContaining({
|
||||
body: expect.stringContaining("Available commands"),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should ignore non-text messages", async () => {
|
||||
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("test-workspace-id");
|
||||
|
||||
await service.connect();
|
||||
|
||||
const callback = mockMessageCallbacks[0];
|
||||
callback?.("!test-room:example.com", {
|
||||
event_id: "$msg-1",
|
||||
sender: "@user:example.com",
|
||||
origin_server_ts: Date.now(),
|
||||
content: {
|
||||
msgtype: "m.image",
|
||||
body: "photo.jpg",
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for async processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// Should not attempt any message sending
|
||||
expect(mockClient.sendMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe("Command Execution", () => {
|
||||
it("should forward fix command to stitcher", async () => {
|
||||
it("should forward fix command to stitcher and create a thread", async () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-1",
|
||||
channelId: "!test-room:example.com",
|
||||
@@ -436,6 +555,32 @@ describe("MatrixService", () => {
|
||||
});
|
||||
});
|
||||
|
||||
it("should handle fix with #-prefixed issue number", async () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-1",
|
||||
channelId: "!test-room:example.com",
|
||||
authorId: "@user:example.com",
|
||||
authorName: "@user:example.com",
|
||||
content: "@mosaic fix #42",
|
||||
timestamp: new Date(),
|
||||
};
|
||||
|
||||
await service.connect();
|
||||
await service.handleCommand({
|
||||
command: "fix",
|
||||
args: ["#42"],
|
||||
message,
|
||||
});
|
||||
|
||||
expect(stitcherService.dispatchJob).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
issueNumber: 42,
|
||||
}),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should respond with help message", async () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-1",
|
||||
@@ -461,6 +606,31 @@ describe("MatrixService", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("should include retry command in help output", async () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-1",
|
||||
channelId: "!test-room:example.com",
|
||||
authorId: "@user:example.com",
|
||||
authorName: "@user:example.com",
|
||||
content: "@mosaic help",
|
||||
timestamp: new Date(),
|
||||
};
|
||||
|
||||
await service.connect();
|
||||
await service.handleCommand({
|
||||
command: "help",
|
||||
args: [],
|
||||
message,
|
||||
});
|
||||
|
||||
expect(mockClient.sendMessage).toHaveBeenCalledWith(
|
||||
"!test-room:example.com",
|
||||
expect.objectContaining({
|
||||
body: expect.stringContaining("retry"),
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should send error for fix command without issue number", async () => {
|
||||
const message: ChatMessage = {
|
||||
id: "msg-1",
|
||||
@@ -510,6 +680,35 @@ describe("MatrixService", () => {
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it("should dispatch fix command with workspace from MatrixRoomService", async () => {
|
||||
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("dynamic-workspace-id");
|
||||
|
||||
await service.connect();
|
||||
|
||||
const callback = mockMessageCallbacks[0];
|
||||
callback?.("!mapped-room:example.com", {
|
||||
event_id: "$msg-1",
|
||||
sender: "@user:example.com",
|
||||
origin_server_ts: Date.now(),
|
||||
content: {
|
||||
msgtype: "m.text",
|
||||
body: "@mosaic fix #99",
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for async processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
expect(stitcherService.dispatchJob).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
workspaceId: "dynamic-workspace-id",
|
||||
metadata: expect.objectContaining({
|
||||
issueNumber: 99,
|
||||
}),
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Configuration", () => {
|
||||
@@ -519,10 +718,15 @@ describe("MatrixService", () => {
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
MatrixService,
|
||||
CommandParserService,
|
||||
{
|
||||
provide: StitcherService,
|
||||
useValue: mockStitcherService,
|
||||
},
|
||||
{
|
||||
provide: MatrixRoomService,
|
||||
useValue: mockMatrixRoomService,
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
@@ -540,10 +744,15 @@ describe("MatrixService", () => {
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
MatrixService,
|
||||
CommandParserService,
|
||||
{
|
||||
provide: StitcherService,
|
||||
useValue: mockStitcherService,
|
||||
},
|
||||
{
|
||||
provide: MatrixRoomService,
|
||||
useValue: mockMatrixRoomService,
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
@@ -561,10 +770,15 @@ describe("MatrixService", () => {
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
MatrixService,
|
||||
CommandParserService,
|
||||
{
|
||||
provide: StitcherService,
|
||||
useValue: mockStitcherService,
|
||||
},
|
||||
{
|
||||
provide: MatrixRoomService,
|
||||
useValue: mockMatrixRoomService,
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
@@ -583,10 +797,15 @@ describe("MatrixService", () => {
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
MatrixService,
|
||||
CommandParserService,
|
||||
{
|
||||
provide: StitcherService,
|
||||
useValue: mockStitcherService,
|
||||
},
|
||||
{
|
||||
provide: MatrixRoomService,
|
||||
useValue: mockMatrixRoomService,
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
@@ -655,4 +874,56 @@ describe("MatrixService", () => {
|
||||
expect(String(connected)).not.toContain("test-access-token");
|
||||
});
|
||||
});
|
||||
|
||||
describe("MatrixRoomService reverse lookup", () => {
|
||||
it("should call getWorkspaceForRoom when processing messages", async () => {
|
||||
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue("resolved-workspace");
|
||||
|
||||
await service.connect();
|
||||
|
||||
const callback = mockMessageCallbacks[0];
|
||||
callback?.("!some-room:example.com", {
|
||||
event_id: "$msg-1",
|
||||
sender: "@user:example.com",
|
||||
origin_server_ts: Date.now(),
|
||||
content: {
|
||||
msgtype: "m.text",
|
||||
body: "@mosaic help",
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for async processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
expect(matrixRoomService.getWorkspaceForRoom).toHaveBeenCalledWith("!some-room:example.com");
|
||||
});
|
||||
|
||||
it("should fall back to control room workspace when MatrixRoomService returns null", async () => {
|
||||
mockMatrixRoomService.getWorkspaceForRoom.mockResolvedValue(null);
|
||||
|
||||
await service.connect();
|
||||
|
||||
const callback = mockMessageCallbacks[0];
|
||||
// Send to the control room (fallback path)
|
||||
callback?.("!test-room:example.com", {
|
||||
event_id: "$msg-1",
|
||||
sender: "@user:example.com",
|
||||
origin_server_ts: Date.now(),
|
||||
content: {
|
||||
msgtype: "m.text",
|
||||
body: "@mosaic fix #10",
|
||||
},
|
||||
});
|
||||
|
||||
// Wait for async processing
|
||||
await new Promise((resolve) => setTimeout(resolve, 50));
|
||||
|
||||
// Should dispatch with the env-configured workspace
|
||||
expect(stitcherService.dispatchJob).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
workspaceId: "test-workspace-id",
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import { Injectable, Logger } from "@nestjs/common";
|
||||
import { Injectable, Logger, Optional, Inject } from "@nestjs/common";
|
||||
import { MatrixClient, SimpleFsStorageProvider, AutojoinRoomsMixin } from "matrix-bot-sdk";
|
||||
import { StitcherService } from "../../stitcher/stitcher.service";
|
||||
import { CommandParserService } from "../parser/command-parser.service";
|
||||
import { CommandAction } from "../parser/command.interface";
|
||||
import type { ParsedCommand } from "../parser/command.interface";
|
||||
import { MatrixRoomService } from "./matrix-room.service";
|
||||
import { sanitizeForLogging } from "../../common/utils";
|
||||
import type {
|
||||
IChatProvider,
|
||||
@@ -46,7 +50,8 @@ interface MatrixRoomEvent {
|
||||
*
|
||||
* Responsibilities:
|
||||
* - Connect to Matrix via access token
|
||||
* - Listen for commands in designated rooms
|
||||
* - Listen for commands in mapped rooms (via MatrixRoomService)
|
||||
* - Parse commands using shared CommandParserService
|
||||
* - Forward commands to stitcher
|
||||
* - Receive status updates from herald
|
||||
* - Post updates to threads (MSC3440)
|
||||
@@ -62,7 +67,15 @@ export class MatrixService implements IChatProvider {
|
||||
private readonly controlRoomId: string;
|
||||
private readonly workspaceId: string;
|
||||
|
||||
constructor(private readonly stitcherService: StitcherService) {
|
||||
constructor(
|
||||
private readonly stitcherService: StitcherService,
|
||||
@Optional()
|
||||
@Inject(CommandParserService)
|
||||
private readonly commandParser: CommandParserService | null,
|
||||
@Optional()
|
||||
@Inject(MatrixRoomService)
|
||||
private readonly matrixRoomService: MatrixRoomService | null
|
||||
) {
|
||||
this.homeserverUrl = process.env.MATRIX_HOMESERVER_URL ?? "";
|
||||
this.accessToken = process.env.MATRIX_ACCESS_TOKEN ?? "";
|
||||
this.botUserId = process.env.MATRIX_BOT_USER_ID ?? "";
|
||||
@@ -113,30 +126,10 @@ export class MatrixService implements IChatProvider {
|
||||
// Ignore messages from the bot itself
|
||||
if (event.sender === this.botUserId) return;
|
||||
|
||||
// Check if message is in control room
|
||||
if (roomId !== this.controlRoomId) return;
|
||||
|
||||
// Only handle text messages
|
||||
if (event.content.msgtype !== "m.text") return;
|
||||
|
||||
// Parse message into ChatMessage format
|
||||
const chatMessage: ChatMessage = {
|
||||
id: event.event_id,
|
||||
channelId: roomId,
|
||||
authorId: event.sender,
|
||||
authorName: event.sender,
|
||||
content: event.content.body,
|
||||
timestamp: new Date(event.origin_server_ts),
|
||||
...(event.content["m.relates_to"]?.rel_type === "m.thread" && {
|
||||
threadId: event.content["m.relates_to"].event_id,
|
||||
}),
|
||||
};
|
||||
|
||||
// Parse command
|
||||
const command = this.parseCommand(chatMessage);
|
||||
if (command) {
|
||||
void this.handleCommand(command);
|
||||
}
|
||||
void this.handleRoomMessage(roomId, event);
|
||||
});
|
||||
|
||||
this.client.on("room.event", (_roomId: string, event: MatrixRoomEvent | null) => {
|
||||
@@ -149,6 +142,114 @@ export class MatrixService implements IChatProvider {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle an incoming room message.
|
||||
*
|
||||
* Resolves the workspace for the room (via MatrixRoomService or fallback
|
||||
* to the control room), then delegates to the shared CommandParserService
|
||||
* for platform-agnostic command parsing and dispatches the result.
|
||||
*/
|
||||
private async handleRoomMessage(roomId: string, event: MatrixRoomEvent): Promise<void> {
|
||||
// Resolve workspace: try MatrixRoomService first, fall back to control room
|
||||
let resolvedWorkspaceId: string | null = null;
|
||||
|
||||
if (this.matrixRoomService) {
|
||||
resolvedWorkspaceId = await this.matrixRoomService.getWorkspaceForRoom(roomId);
|
||||
}
|
||||
|
||||
// Fallback: if the room is the configured control room, use the env workspace
|
||||
if (!resolvedWorkspaceId && roomId === this.controlRoomId) {
|
||||
resolvedWorkspaceId = this.workspaceId;
|
||||
}
|
||||
|
||||
// If room is not mapped to any workspace, ignore the message
|
||||
if (!resolvedWorkspaceId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const messageContent = event.content.body;
|
||||
|
||||
// Build ChatMessage for interface compatibility
|
||||
const chatMessage: ChatMessage = {
|
||||
id: event.event_id,
|
||||
channelId: roomId,
|
||||
authorId: event.sender,
|
||||
authorName: event.sender,
|
||||
content: messageContent,
|
||||
timestamp: new Date(event.origin_server_ts),
|
||||
...(event.content["m.relates_to"]?.rel_type === "m.thread" && {
|
||||
threadId: event.content["m.relates_to"].event_id,
|
||||
}),
|
||||
};
|
||||
|
||||
// Use shared CommandParserService if available
|
||||
if (this.commandParser) {
|
||||
// Normalize !mosaic to @mosaic for the shared parser
|
||||
const normalizedContent = messageContent.replace(/^!mosaic/i, "@mosaic");
|
||||
|
||||
const result = this.commandParser.parseCommand(normalizedContent);
|
||||
|
||||
if (result.success) {
|
||||
await this.handleParsedCommand(result.command, chatMessage, resolvedWorkspaceId);
|
||||
} else if (normalizedContent.toLowerCase().startsWith("@mosaic")) {
|
||||
// The user tried to use a command but it failed to parse -- send help
|
||||
await this.sendMessage(roomId, result.error.help ?? result.error.message);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Fallback: use the built-in parseCommand if CommandParserService not injected
|
||||
const command = this.parseCommand(chatMessage);
|
||||
if (command) {
|
||||
await this.handleCommand(command);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a command parsed by the shared CommandParserService.
|
||||
*
|
||||
* Routes the ParsedCommand to the appropriate handler, passing
|
||||
* along workspace context for job dispatch.
|
||||
*/
|
||||
private async handleParsedCommand(
|
||||
parsed: ParsedCommand,
|
||||
message: ChatMessage,
|
||||
workspaceId: string
|
||||
): Promise<void> {
|
||||
this.logger.log(
|
||||
`Handling command: ${parsed.action} from ${message.authorName} in workspace ${workspaceId}`
|
||||
);
|
||||
|
||||
switch (parsed.action) {
|
||||
case CommandAction.FIX:
|
||||
await this.handleFixCommand(parsed.rawArgs, message, workspaceId);
|
||||
break;
|
||||
case CommandAction.STATUS:
|
||||
await this.handleStatusCommand(parsed.rawArgs, message);
|
||||
break;
|
||||
case CommandAction.CANCEL:
|
||||
await this.handleCancelCommand(parsed.rawArgs, message);
|
||||
break;
|
||||
case CommandAction.VERBOSE:
|
||||
await this.handleVerboseCommand(parsed.rawArgs, message);
|
||||
break;
|
||||
case CommandAction.QUIET:
|
||||
await this.handleQuietCommand(parsed.rawArgs, message);
|
||||
break;
|
||||
case CommandAction.HELP:
|
||||
await this.handleHelpCommand(parsed.rawArgs, message);
|
||||
break;
|
||||
case CommandAction.RETRY:
|
||||
await this.handleRetryCommand(parsed.rawArgs, message);
|
||||
break;
|
||||
default:
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
`Unknown command. Type \`@mosaic help\` or \`!mosaic help\` for available commands.`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from Matrix
|
||||
*/
|
||||
@@ -241,18 +342,35 @@ export class MatrixService implements IChatProvider {
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a command from a message
|
||||
* Parse a command from a message (IChatProvider interface).
|
||||
*
|
||||
* Delegates to the shared CommandParserService when available,
|
||||
* falling back to built-in parsing for backwards compatibility.
|
||||
*/
|
||||
parseCommand(message: ChatMessage): ChatCommand | null {
|
||||
const { content } = message;
|
||||
|
||||
// Check if message mentions @mosaic or uses !mosaic prefix
|
||||
// Try shared parser first
|
||||
if (this.commandParser) {
|
||||
const normalizedContent = content.replace(/^!mosaic/i, "@mosaic");
|
||||
const result = this.commandParser.parseCommand(normalizedContent);
|
||||
|
||||
if (result.success) {
|
||||
return {
|
||||
command: result.command.action,
|
||||
args: result.command.rawArgs,
|
||||
message,
|
||||
};
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// Fallback: built-in parsing for when CommandParserService is not injected
|
||||
const lowerContent = content.toLowerCase();
|
||||
if (!lowerContent.includes("@mosaic") && !lowerContent.includes("!mosaic")) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Extract command and arguments
|
||||
const parts = content.trim().split(/\s+/);
|
||||
const mosaicIndex = parts.findIndex(
|
||||
(part) => part.toLowerCase().includes("@mosaic") || part.toLowerCase().includes("!mosaic")
|
||||
@@ -270,7 +388,6 @@ export class MatrixService implements IChatProvider {
|
||||
const command = commandPart.toLowerCase();
|
||||
const args = parts.slice(mosaicIndex + 2);
|
||||
|
||||
// Valid commands
|
||||
const validCommands = ["fix", "status", "cancel", "verbose", "quiet", "help"];
|
||||
|
||||
if (!validCommands.includes(command)) {
|
||||
@@ -285,7 +402,7 @@ export class MatrixService implements IChatProvider {
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a parsed command
|
||||
* Handle a parsed command (ChatCommand format, used by fallback path)
|
||||
*/
|
||||
async handleCommand(command: ChatCommand): Promise<void> {
|
||||
const { command: cmd, args, message } = command;
|
||||
@@ -296,7 +413,7 @@ export class MatrixService implements IChatProvider {
|
||||
|
||||
switch (cmd) {
|
||||
case "fix":
|
||||
await this.handleFixCommand(args, message);
|
||||
await this.handleFixCommand(args, message, this.workspaceId);
|
||||
break;
|
||||
case "status":
|
||||
await this.handleStatusCommand(args, message);
|
||||
@@ -324,7 +441,11 @@ export class MatrixService implements IChatProvider {
|
||||
/**
|
||||
* Handle fix command - Start a job for an issue
|
||||
*/
|
||||
private async handleFixCommand(args: string[], message: ChatMessage): Promise<void> {
|
||||
private async handleFixCommand(
|
||||
args: string[],
|
||||
message: ChatMessage,
|
||||
workspaceId?: string
|
||||
): Promise<void> {
|
||||
if (args.length === 0 || !args[0]) {
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
@@ -333,7 +454,9 @@ export class MatrixService implements IChatProvider {
|
||||
return;
|
||||
}
|
||||
|
||||
const issueNumber = parseInt(args[0], 10);
|
||||
// Parse issue number: handle both "#42" and "42" formats
|
||||
const issueArg = args[0].replace(/^#/, "");
|
||||
const issueNumber = parseInt(issueArg, 10);
|
||||
|
||||
if (isNaN(issueNumber)) {
|
||||
await this.sendMessage(
|
||||
@@ -343,6 +466,8 @@ export class MatrixService implements IChatProvider {
|
||||
return;
|
||||
}
|
||||
|
||||
const targetWorkspaceId = workspaceId ?? this.workspaceId;
|
||||
|
||||
// Create thread for job updates
|
||||
const threadId = await this.createThread({
|
||||
channelId: message.channelId,
|
||||
@@ -352,7 +477,7 @@ export class MatrixService implements IChatProvider {
|
||||
|
||||
// Dispatch job to stitcher
|
||||
const result = await this.stitcherService.dispatchJob({
|
||||
workspaceId: this.workspaceId,
|
||||
workspaceId: targetWorkspaceId,
|
||||
type: "code-task",
|
||||
priority: 10,
|
||||
metadata: {
|
||||
@@ -414,6 +539,27 @@ export class MatrixService implements IChatProvider {
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle retry command - Retry a failed job
|
||||
*/
|
||||
private async handleRetryCommand(args: string[], message: ChatMessage): Promise<void> {
|
||||
if (args.length === 0 || !args[0]) {
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
"Usage: `@mosaic retry <job-id>` or `!mosaic retry <job-id>`"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const jobId = args[0];
|
||||
|
||||
// TODO: Implement job retry in stitcher
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
`Retry command not yet implemented for job: ${jobId}`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle verbose command - Stream full logs to thread
|
||||
*/
|
||||
@@ -453,6 +599,7 @@ export class MatrixService implements IChatProvider {
|
||||
\`@mosaic fix <issue>\` or \`!mosaic fix <issue>\` - Start job for issue
|
||||
\`@mosaic status <job>\` or \`!mosaic status <job>\` - Get job status
|
||||
\`@mosaic cancel <job>\` or \`!mosaic cancel <job>\` - Cancel running job
|
||||
\`@mosaic retry <job>\` or \`!mosaic retry <job>\` - Retry failed job
|
||||
\`@mosaic verbose <job>\` or \`!mosaic verbose <job>\` - Stream full logs to thread
|
||||
\`@mosaic quiet\` or \`!mosaic quiet\` - Reduce notifications
|
||||
\`@mosaic help\` or \`!mosaic help\` - Show this help message
|
||||
|
||||
@@ -10,7 +10,7 @@ import { BridgeModule } from "../bridge/bridge.module";
|
||||
* - Subscribe to job events
|
||||
* - Format status messages with PDA-friendly language
|
||||
* - Route to appropriate channels based on workspace config
|
||||
* - Support Discord (via bridge) and PR comments
|
||||
* - Broadcast to ALL active chat providers via CHAT_PROVIDERS token
|
||||
*/
|
||||
@Module({
|
||||
imports: [PrismaModule, BridgeModule],
|
||||
|
||||
@@ -2,7 +2,8 @@ import { Test, TestingModule } from "@nestjs/testing";
|
||||
import { vi, describe, it, expect, beforeEach } from "vitest";
|
||||
import { HeraldService } from "./herald.service";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
import { DiscordService } from "../bridge/discord/discord.service";
|
||||
import { CHAT_PROVIDERS } from "../bridge/bridge.constants";
|
||||
import type { IChatProvider } from "../bridge/interfaces/chat-provider.interface";
|
||||
import {
|
||||
JOB_CREATED,
|
||||
JOB_STARTED,
|
||||
@@ -14,10 +15,31 @@ import {
|
||||
GATE_FAILED,
|
||||
} from "../job-events/event-types";
|
||||
|
||||
function createMockProvider(
|
||||
name: string,
|
||||
connected = true
|
||||
): IChatProvider & {
|
||||
sendMessage: ReturnType<typeof vi.fn>;
|
||||
sendThreadMessage: ReturnType<typeof vi.fn>;
|
||||
createThread: ReturnType<typeof vi.fn>;
|
||||
isConnected: ReturnType<typeof vi.fn>;
|
||||
connect: ReturnType<typeof vi.fn>;
|
||||
disconnect: ReturnType<typeof vi.fn>;
|
||||
parseCommand: ReturnType<typeof vi.fn>;
|
||||
} {
|
||||
return {
|
||||
connect: vi.fn().mockResolvedValue(undefined),
|
||||
disconnect: vi.fn().mockResolvedValue(undefined),
|
||||
isConnected: vi.fn().mockReturnValue(connected),
|
||||
sendMessage: vi.fn().mockResolvedValue(undefined),
|
||||
createThread: vi.fn().mockResolvedValue("thread-id"),
|
||||
sendThreadMessage: vi.fn().mockResolvedValue(undefined),
|
||||
parseCommand: vi.fn().mockReturnValue(null),
|
||||
};
|
||||
}
|
||||
|
||||
describe("HeraldService", () => {
|
||||
let service: HeraldService;
|
||||
let prisma: PrismaService;
|
||||
let discord: DiscordService;
|
||||
|
||||
const mockPrisma = {
|
||||
workspace: {
|
||||
@@ -31,14 +53,15 @@ describe("HeraldService", () => {
|
||||
},
|
||||
};
|
||||
|
||||
const mockDiscord = {
|
||||
isConnected: vi.fn(),
|
||||
sendMessage: vi.fn(),
|
||||
sendThreadMessage: vi.fn(),
|
||||
createThread: vi.fn(),
|
||||
};
|
||||
let mockProviderA: ReturnType<typeof createMockProvider>;
|
||||
let mockProviderB: ReturnType<typeof createMockProvider>;
|
||||
let chatProviders: IChatProvider[];
|
||||
|
||||
beforeEach(async () => {
|
||||
mockProviderA = createMockProvider("providerA", true);
|
||||
mockProviderB = createMockProvider("providerB", true);
|
||||
chatProviders = [mockProviderA, mockProviderB];
|
||||
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
HeraldService,
|
||||
@@ -47,44 +70,28 @@ describe("HeraldService", () => {
|
||||
useValue: mockPrisma,
|
||||
},
|
||||
{
|
||||
provide: DiscordService,
|
||||
useValue: mockDiscord,
|
||||
provide: CHAT_PROVIDERS,
|
||||
useValue: chatProviders,
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
service = module.get<HeraldService>(HeraldService);
|
||||
prisma = module.get<PrismaService>(PrismaService);
|
||||
discord = module.get<DiscordService>(DiscordService);
|
||||
|
||||
// Reset mocks
|
||||
vi.clearAllMocks();
|
||||
// Restore default connected state after clearAllMocks
|
||||
mockProviderA.isConnected.mockReturnValue(true);
|
||||
mockProviderB.isConnected.mockReturnValue(true);
|
||||
});
|
||||
|
||||
describe("broadcastJobEvent", () => {
|
||||
it("should broadcast job.created event to configured channel", async () => {
|
||||
// Arrange
|
||||
const baseSetup = (): {
|
||||
jobId: string;
|
||||
workspaceId: string;
|
||||
} => {
|
||||
const workspaceId = "workspace-1";
|
||||
const jobId = "job-1";
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
type: JOB_CREATED,
|
||||
timestamp: new Date(),
|
||||
actor: "system",
|
||||
payload: { issueNumber: 42 },
|
||||
};
|
||||
|
||||
mockPrisma.workspace.findUnique.mockResolvedValue({
|
||||
id: workspaceId,
|
||||
settings: {
|
||||
herald: {
|
||||
channelMappings: {
|
||||
"code-task": "channel-123",
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
mockPrisma.runnerJob.findUnique.mockResolvedValue({
|
||||
id: jobId,
|
||||
@@ -98,23 +105,38 @@ describe("HeraldService", () => {
|
||||
},
|
||||
});
|
||||
|
||||
mockDiscord.isConnected.mockReturnValue(true);
|
||||
mockDiscord.sendThreadMessage.mockResolvedValue(undefined);
|
||||
return { jobId, workspaceId };
|
||||
};
|
||||
|
||||
it("should broadcast to all connected providers", async () => {
|
||||
// Arrange
|
||||
const { jobId } = baseSetup();
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
type: JOB_CREATED,
|
||||
timestamp: new Date(),
|
||||
actor: "system",
|
||||
payload: { issueNumber: 42 },
|
||||
};
|
||||
|
||||
// Act
|
||||
await service.broadcastJobEvent(jobId, event);
|
||||
|
||||
// Assert
|
||||
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({
|
||||
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
|
||||
threadId: "thread-123",
|
||||
content: expect.stringContaining("Job created"),
|
||||
});
|
||||
expect(mockProviderB.sendThreadMessage).toHaveBeenCalledWith({
|
||||
threadId: "thread-123",
|
||||
content: expect.stringContaining("Job created"),
|
||||
});
|
||||
});
|
||||
|
||||
it("should broadcast job.started event", async () => {
|
||||
it("should broadcast job.started event to all providers", async () => {
|
||||
// Arrange
|
||||
const workspaceId = "workspace-1";
|
||||
const jobId = "job-1";
|
||||
const { jobId } = baseSetup();
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
@@ -124,31 +146,15 @@ describe("HeraldService", () => {
|
||||
payload: {},
|
||||
};
|
||||
|
||||
mockPrisma.workspace.findUnique.mockResolvedValue({
|
||||
id: workspaceId,
|
||||
settings: { herald: { channelMappings: {} } },
|
||||
});
|
||||
|
||||
mockPrisma.runnerJob.findUnique.mockResolvedValue({
|
||||
id: jobId,
|
||||
workspaceId,
|
||||
type: "code-task",
|
||||
});
|
||||
|
||||
mockPrisma.jobEvent.findFirst.mockResolvedValue({
|
||||
payload: {
|
||||
metadata: { threadId: "thread-123" },
|
||||
},
|
||||
});
|
||||
|
||||
mockDiscord.isConnected.mockReturnValue(true);
|
||||
mockDiscord.sendThreadMessage.mockResolvedValue(undefined);
|
||||
|
||||
// Act
|
||||
await service.broadcastJobEvent(jobId, event);
|
||||
|
||||
// Assert
|
||||
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({
|
||||
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
|
||||
threadId: "thread-123",
|
||||
content: expect.stringContaining("Job started"),
|
||||
});
|
||||
expect(mockProviderB.sendThreadMessage).toHaveBeenCalledWith({
|
||||
threadId: "thread-123",
|
||||
content: expect.stringContaining("Job started"),
|
||||
});
|
||||
@@ -156,8 +162,7 @@ describe("HeraldService", () => {
|
||||
|
||||
it("should broadcast job.completed event with success message", async () => {
|
||||
// Arrange
|
||||
const workspaceId = "workspace-1";
|
||||
const jobId = "job-1";
|
||||
const { jobId } = baseSetup();
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
@@ -167,31 +172,11 @@ describe("HeraldService", () => {
|
||||
payload: { duration: 120 },
|
||||
};
|
||||
|
||||
mockPrisma.workspace.findUnique.mockResolvedValue({
|
||||
id: workspaceId,
|
||||
settings: { herald: { channelMappings: {} } },
|
||||
});
|
||||
|
||||
mockPrisma.runnerJob.findUnique.mockResolvedValue({
|
||||
id: jobId,
|
||||
workspaceId,
|
||||
type: "code-task",
|
||||
});
|
||||
|
||||
mockPrisma.jobEvent.findFirst.mockResolvedValue({
|
||||
payload: {
|
||||
metadata: { threadId: "thread-123" },
|
||||
},
|
||||
});
|
||||
|
||||
mockDiscord.isConnected.mockReturnValue(true);
|
||||
mockDiscord.sendThreadMessage.mockResolvedValue(undefined);
|
||||
|
||||
// Act
|
||||
await service.broadcastJobEvent(jobId, event);
|
||||
|
||||
// Assert
|
||||
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({
|
||||
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
|
||||
threadId: "thread-123",
|
||||
content: expect.stringContaining("completed"),
|
||||
});
|
||||
@@ -199,8 +184,7 @@ describe("HeraldService", () => {
|
||||
|
||||
it("should broadcast job.failed event with PDA-friendly language", async () => {
|
||||
// Arrange
|
||||
const workspaceId = "workspace-1";
|
||||
const jobId = "job-1";
|
||||
const { jobId } = baseSetup();
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
@@ -210,43 +194,28 @@ describe("HeraldService", () => {
|
||||
payload: { error: "Build failed" },
|
||||
};
|
||||
|
||||
mockPrisma.workspace.findUnique.mockResolvedValue({
|
||||
id: workspaceId,
|
||||
settings: { herald: { channelMappings: {} } },
|
||||
});
|
||||
|
||||
mockPrisma.runnerJob.findUnique.mockResolvedValue({
|
||||
id: jobId,
|
||||
workspaceId,
|
||||
type: "code-task",
|
||||
});
|
||||
|
||||
mockPrisma.jobEvent.findFirst.mockResolvedValue({
|
||||
payload: {
|
||||
metadata: { threadId: "thread-123" },
|
||||
},
|
||||
});
|
||||
|
||||
mockDiscord.isConnected.mockReturnValue(true);
|
||||
mockDiscord.sendThreadMessage.mockResolvedValue(undefined);
|
||||
|
||||
// Act
|
||||
await service.broadcastJobEvent(jobId, event);
|
||||
|
||||
// Assert
|
||||
expect(mockDiscord.sendThreadMessage).toHaveBeenCalledWith({
|
||||
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
|
||||
threadId: "thread-123",
|
||||
content: expect.stringContaining("encountered an issue"),
|
||||
});
|
||||
// Verify the actual message doesn't contain demanding language
|
||||
const actualCall = mockDiscord.sendThreadMessage.mock.calls[0][0];
|
||||
const actualCall = mockProviderA.sendThreadMessage.mock.calls[0][0] as {
|
||||
threadId: string;
|
||||
content: string;
|
||||
};
|
||||
expect(actualCall.content).not.toMatch(/FAILED|ERROR|CRITICAL|URGENT/);
|
||||
});
|
||||
|
||||
it("should skip broadcasting if Discord is not connected", async () => {
|
||||
it("should skip disconnected providers", async () => {
|
||||
// Arrange
|
||||
const workspaceId = "workspace-1";
|
||||
const jobId = "job-1";
|
||||
const { jobId } = baseSetup();
|
||||
mockProviderA.isConnected.mockReturnValue(true);
|
||||
mockProviderB.isConnected.mockReturnValue(false);
|
||||
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
@@ -256,14 +225,36 @@ describe("HeraldService", () => {
|
||||
payload: {},
|
||||
};
|
||||
|
||||
mockPrisma.workspace.findUnique.mockResolvedValue({
|
||||
id: workspaceId,
|
||||
settings: { herald: { channelMappings: {} } },
|
||||
// Act
|
||||
await service.broadcastJobEvent(jobId, event);
|
||||
|
||||
// Assert
|
||||
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledTimes(1);
|
||||
expect(mockProviderB.sendThreadMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("should handle empty providers array without crashing", async () => {
|
||||
// Arrange — rebuild module with empty providers
|
||||
const module: TestingModule = await Test.createTestingModule({
|
||||
providers: [
|
||||
HeraldService,
|
||||
{
|
||||
provide: PrismaService,
|
||||
useValue: mockPrisma,
|
||||
},
|
||||
{
|
||||
provide: CHAT_PROVIDERS,
|
||||
useValue: [],
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
const emptyService = module.get<HeraldService>(HeraldService);
|
||||
|
||||
const jobId = "job-1";
|
||||
mockPrisma.runnerJob.findUnique.mockResolvedValue({
|
||||
id: jobId,
|
||||
workspaceId,
|
||||
workspaceId: "workspace-1",
|
||||
type: "code-task",
|
||||
});
|
||||
|
||||
@@ -273,19 +264,6 @@ describe("HeraldService", () => {
|
||||
},
|
||||
});
|
||||
|
||||
mockDiscord.isConnected.mockReturnValue(false);
|
||||
|
||||
// Act
|
||||
await service.broadcastJobEvent(jobId, event);
|
||||
|
||||
// Assert
|
||||
expect(mockDiscord.sendThreadMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("should skip broadcasting if job has no threadId", async () => {
|
||||
// Arrange
|
||||
const workspaceId = "workspace-1";
|
||||
const jobId = "job-1";
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
@@ -295,14 +273,59 @@ describe("HeraldService", () => {
|
||||
payload: {},
|
||||
};
|
||||
|
||||
mockPrisma.workspace.findUnique.mockResolvedValue({
|
||||
id: workspaceId,
|
||||
settings: { herald: { channelMappings: {} } },
|
||||
// Act & Assert — should not throw
|
||||
await expect(emptyService.broadcastJobEvent(jobId, event)).resolves.not.toThrow();
|
||||
});
|
||||
|
||||
it("should continue broadcasting when one provider errors", async () => {
|
||||
// Arrange
|
||||
const { jobId } = baseSetup();
|
||||
mockProviderA.sendThreadMessage.mockRejectedValue(new Error("Provider A rate limit"));
|
||||
mockProviderB.sendThreadMessage.mockResolvedValue(undefined);
|
||||
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
type: JOB_CREATED,
|
||||
timestamp: new Date(),
|
||||
actor: "system",
|
||||
payload: {},
|
||||
};
|
||||
|
||||
// Act — should not throw despite provider A failing
|
||||
await service.broadcastJobEvent(jobId, event);
|
||||
|
||||
// Assert — provider B should still have been called
|
||||
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledTimes(1);
|
||||
expect(mockProviderB.sendThreadMessage).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("should not throw when all providers error", async () => {
|
||||
// Arrange
|
||||
const { jobId } = baseSetup();
|
||||
mockProviderA.sendThreadMessage.mockRejectedValue(new Error("Provider A down"));
|
||||
mockProviderB.sendThreadMessage.mockRejectedValue(new Error("Provider B down"));
|
||||
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
type: JOB_CREATED,
|
||||
timestamp: new Date(),
|
||||
actor: "system",
|
||||
payload: {},
|
||||
};
|
||||
|
||||
// Act & Assert — should not throw; provider errors are logged, not propagated
|
||||
await expect(service.broadcastJobEvent(jobId, event)).resolves.not.toThrow();
|
||||
});
|
||||
|
||||
it("should skip broadcasting if job has no threadId", async () => {
|
||||
// Arrange
|
||||
const jobId = "job-1";
|
||||
|
||||
mockPrisma.runnerJob.findUnique.mockResolvedValue({
|
||||
id: jobId,
|
||||
workspaceId,
|
||||
workspaceId: "workspace-1",
|
||||
type: "code-task",
|
||||
});
|
||||
|
||||
@@ -312,16 +335,45 @@ describe("HeraldService", () => {
|
||||
},
|
||||
});
|
||||
|
||||
mockDiscord.isConnected.mockReturnValue(true);
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
type: JOB_CREATED,
|
||||
timestamp: new Date(),
|
||||
actor: "system",
|
||||
payload: {},
|
||||
};
|
||||
|
||||
// Act
|
||||
await service.broadcastJobEvent(jobId, event);
|
||||
|
||||
// Assert
|
||||
expect(mockDiscord.sendThreadMessage).not.toHaveBeenCalled();
|
||||
expect(mockProviderA.sendThreadMessage).not.toHaveBeenCalled();
|
||||
expect(mockProviderB.sendThreadMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
// ERROR HANDLING TESTS - Issue #185
|
||||
it("should skip broadcasting if job not found", async () => {
|
||||
// Arrange
|
||||
const jobId = "nonexistent-job";
|
||||
mockPrisma.runnerJob.findUnique.mockResolvedValue(null);
|
||||
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
type: JOB_CREATED,
|
||||
timestamp: new Date(),
|
||||
actor: "system",
|
||||
payload: {},
|
||||
};
|
||||
|
||||
// Act
|
||||
await service.broadcastJobEvent(jobId, event);
|
||||
|
||||
// Assert
|
||||
expect(mockProviderA.sendThreadMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
// ERROR HANDLING TESTS - database errors should still propagate
|
||||
|
||||
it("should propagate database errors when job lookup fails", async () => {
|
||||
// Arrange
|
||||
@@ -344,43 +396,8 @@ describe("HeraldService", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("should propagate Discord send failures with context", async () => {
|
||||
// Arrange
|
||||
const workspaceId = "workspace-1";
|
||||
const jobId = "job-1";
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
type: JOB_CREATED,
|
||||
timestamp: new Date(),
|
||||
actor: "system",
|
||||
payload: {},
|
||||
};
|
||||
|
||||
mockPrisma.runnerJob.findUnique.mockResolvedValue({
|
||||
id: jobId,
|
||||
workspaceId,
|
||||
type: "code-task",
|
||||
});
|
||||
|
||||
mockPrisma.jobEvent.findFirst.mockResolvedValue({
|
||||
payload: {
|
||||
metadata: { threadId: "thread-123" },
|
||||
},
|
||||
});
|
||||
|
||||
mockDiscord.isConnected.mockReturnValue(true);
|
||||
|
||||
const discordError = new Error("Rate limit exceeded");
|
||||
mockDiscord.sendThreadMessage.mockRejectedValue(discordError);
|
||||
|
||||
// Act & Assert
|
||||
await expect(service.broadcastJobEvent(jobId, event)).rejects.toThrow("Rate limit exceeded");
|
||||
});
|
||||
|
||||
it("should propagate errors when fetching job events fails", async () => {
|
||||
// Arrange
|
||||
const workspaceId = "workspace-1";
|
||||
const jobId = "job-1";
|
||||
const event = {
|
||||
id: "event-1",
|
||||
@@ -393,61 +410,16 @@ describe("HeraldService", () => {
|
||||
|
||||
mockPrisma.runnerJob.findUnique.mockResolvedValue({
|
||||
id: jobId,
|
||||
workspaceId,
|
||||
workspaceId: "workspace-1",
|
||||
type: "code-task",
|
||||
});
|
||||
|
||||
const dbError = new Error("Query timeout");
|
||||
mockPrisma.jobEvent.findFirst.mockRejectedValue(dbError);
|
||||
|
||||
mockDiscord.isConnected.mockReturnValue(true);
|
||||
|
||||
// Act & Assert
|
||||
await expect(service.broadcastJobEvent(jobId, event)).rejects.toThrow("Query timeout");
|
||||
});
|
||||
|
||||
it("should include job context in error messages", async () => {
|
||||
// Arrange
|
||||
const workspaceId = "workspace-1";
|
||||
const jobId = "test-job-123";
|
||||
const event = {
|
||||
id: "event-1",
|
||||
jobId,
|
||||
type: JOB_COMPLETED,
|
||||
timestamp: new Date(),
|
||||
actor: "system",
|
||||
payload: {},
|
||||
};
|
||||
|
||||
mockPrisma.runnerJob.findUnique.mockResolvedValue({
|
||||
id: jobId,
|
||||
workspaceId,
|
||||
type: "code-task",
|
||||
});
|
||||
|
||||
mockPrisma.jobEvent.findFirst.mockResolvedValue({
|
||||
payload: {
|
||||
metadata: { threadId: "thread-123" },
|
||||
},
|
||||
});
|
||||
|
||||
mockDiscord.isConnected.mockReturnValue(true);
|
||||
|
||||
const discordError = new Error("Network failure");
|
||||
mockDiscord.sendThreadMessage.mockRejectedValue(discordError);
|
||||
|
||||
// Act & Assert
|
||||
try {
|
||||
await service.broadcastJobEvent(jobId, event);
|
||||
// Should not reach here
|
||||
expect(true).toBe(false);
|
||||
} catch (error) {
|
||||
// Verify error was thrown
|
||||
expect(error).toBeDefined();
|
||||
// Verify original error is preserved
|
||||
expect((error as Error).message).toContain("Network failure");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("formatJobEventMessage", () => {
|
||||
@@ -473,7 +445,6 @@ describe("HeraldService", () => {
|
||||
const message = service.formatJobEventMessage(event, job, metadata);
|
||||
|
||||
// Assert
|
||||
expect(message).toContain("🟢");
|
||||
expect(message).toContain("Job created");
|
||||
expect(message).toContain("#42");
|
||||
expect(message.length).toBeLessThan(200); // Keep it scannable
|
||||
@@ -526,7 +497,6 @@ describe("HeraldService", () => {
|
||||
const message = service.formatJobEventMessage(event, job, metadata);
|
||||
|
||||
// Assert
|
||||
expect(message).toMatch(/✅|🟢/);
|
||||
expect(message).toContain("completed");
|
||||
expect(message).not.toMatch(/COMPLETED|SUCCESS/);
|
||||
});
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import { Injectable, Logger } from "@nestjs/common";
|
||||
import { Inject, Injectable, Logger } from "@nestjs/common";
|
||||
import { PrismaService } from "../prisma/prisma.service";
|
||||
import { DiscordService } from "../bridge/discord/discord.service";
|
||||
import { CHAT_PROVIDERS } from "../bridge/bridge.constants";
|
||||
import type { IChatProvider } from "../bridge/interfaces/chat-provider.interface";
|
||||
import {
|
||||
JOB_CREATED,
|
||||
JOB_STARTED,
|
||||
@@ -21,7 +22,7 @@ import {
|
||||
* - Subscribe to job events
|
||||
* - Format status messages with PDA-friendly language
|
||||
* - Route to appropriate channels based on workspace config
|
||||
* - Support Discord (via bridge) and PR comments
|
||||
* - Broadcast to ALL active chat providers (Discord, Matrix, etc.)
|
||||
*/
|
||||
@Injectable()
|
||||
export class HeraldService {
|
||||
@@ -29,11 +30,11 @@ export class HeraldService {
|
||||
|
||||
constructor(
|
||||
private readonly prisma: PrismaService,
|
||||
private readonly discord: DiscordService
|
||||
@Inject(CHAT_PROVIDERS) private readonly chatProviders: IChatProvider[]
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Broadcast a job event to the appropriate channel
|
||||
* Broadcast a job event to all connected chat providers
|
||||
*/
|
||||
async broadcastJobEvent(
|
||||
jobId: string,
|
||||
@@ -47,7 +48,6 @@ export class HeraldService {
|
||||
payload: unknown;
|
||||
}
|
||||
): Promise<void> {
|
||||
try {
|
||||
// Get job details
|
||||
const job = await this.prisma.runnerJob.findUnique({
|
||||
where: { id: jobId },
|
||||
@@ -63,12 +63,6 @@ export class HeraldService {
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if Discord is connected
|
||||
if (!this.discord.isConnected()) {
|
||||
this.logger.debug("Discord not connected, skipping broadcast");
|
||||
return;
|
||||
}
|
||||
|
||||
// Get threadId from first event payload (job.created event has metadata)
|
||||
const firstEvent = await this.prisma.jobEvent.findFirst({
|
||||
where: {
|
||||
@@ -92,21 +86,27 @@ export class HeraldService {
|
||||
// Format message
|
||||
const message = this.formatJobEventMessage(event, job, metadata);
|
||||
|
||||
// Send to thread
|
||||
await this.discord.sendThreadMessage({
|
||||
// Broadcast to all connected providers
|
||||
for (const provider of this.chatProviders) {
|
||||
if (!provider.isConnected()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
await provider.sendThreadMessage({
|
||||
threadId,
|
||||
content: message,
|
||||
});
|
||||
} catch (error) {
|
||||
// Log and continue — one provider failure must not block others
|
||||
this.logger.error(
|
||||
`Failed to broadcast event ${event.type} for job ${jobId} via provider:`,
|
||||
error
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
this.logger.debug(`Broadcasted event ${event.type} for job ${jobId} to thread ${threadId}`);
|
||||
} catch (error) {
|
||||
// Log the error with full context for debugging
|
||||
this.logger.error(`Failed to broadcast event ${event.type} for job ${jobId}:`, error);
|
||||
|
||||
// Re-throw the error so callers can handle it appropriately
|
||||
// This enables proper error tracking, retry logic, and alerting
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user