feat: M12-MatrixBridge — Matrix/Element chat bridge integration #408

Merged
jason.woltje merged 18 commits from feature/m12-matrix-bridge into develop 2026-02-15 18:22:17 +00:00
13 changed files with 179 additions and 76 deletions
Showing only changes of commit 8d19ac1f4b - Show all commits

View File

@@ -46,6 +46,16 @@ const logger = new Logger("BridgeModule");
}
if (process.env.MATRIX_ACCESS_TOKEN) {
const missingVars = [
"MATRIX_HOMESERVER_URL",
"MATRIX_BOT_USER_ID",
"MATRIX_WORKSPACE_ID",
].filter((v) => !process.env[v]);
if (missingVars.length > 0) {
logger.warn(
`Matrix bridge enabled but missing: ${missingVars.join(", ")}. connect() will fail.`
);
}
providers.push(matrix);
logger.log("Matrix bridge enabled (MATRIX_ACCESS_TOKEN detected)");
}

View File

@@ -187,6 +187,7 @@ describe("DiscordService", () => {
await service.connect();
await service.sendThreadMessage({
threadId: "thread-123",
channelId: "test-channel-id",
content: "Step completed",
});

View File

@@ -305,6 +305,7 @@ export class DiscordService implements IChatProvider {
// Send confirmation to thread
await this.sendThreadMessage({
threadId,
channelId: message.channelId,
content: `Job created: ${result.jobId}\nStatus: ${result.status}\nQueue: ${result.queueName}`,
});
}

View File

@@ -28,6 +28,7 @@ export interface ThreadCreateOptions {
export interface ThreadMessageOptions {
threadId: string;
channelId: string;
content: string;
}

View File

@@ -486,9 +486,9 @@ describe("Matrix Bridge Integration Tests", () => {
})
);
// Confirmation message sent as thread reply
// Confirmation message sent as thread reply (uses channelId from message, not hardcoded controlRoomId)
const confirmationCall = sendCalls[1];
expect(confirmationCall?.[0]).toBe("!control-room:example.com");
expect(confirmationCall?.[0]).toBe("!room:example.com");
expect(confirmationCall?.[1]).toEqual(
expect.objectContaining({
body: expect.stringContaining("Job created: job-integ-001"),
@@ -519,7 +519,7 @@ describe("Matrix Bridge Integration Tests", () => {
setMatrixEnv();
// Create a connected mock MatrixService that tracks sendThreadMessage calls
const threadMessages: Array<{ threadId: string; content: string }> = [];
const threadMessages: Array<{ threadId: string; channelId: string; content: string }> = [];
const mockMatrixProvider: IChatProvider = {
connect: vi.fn().mockResolvedValue(undefined),
disconnect: vi.fn().mockResolvedValue(undefined),
@@ -527,7 +527,7 @@ describe("Matrix Bridge Integration Tests", () => {
sendMessage: vi.fn().mockResolvedValue(undefined),
createThread: vi.fn().mockResolvedValue("$thread-id"),
sendThreadMessage: vi.fn().mockImplementation(async (options) => {
threadMessages.push(options as { threadId: string; content: string });
threadMessages.push(options as { threadId: string; channelId: string; content: string });
}),
parseCommand: vi.fn().mockReturnValue(null),
};
@@ -545,6 +545,7 @@ describe("Matrix Bridge Integration Tests", () => {
payload: {
metadata: {
threadId: "$thread-herald-root",
channelId: "!herald-room:example.com",
issueNumber: 55,
},
},
@@ -617,6 +618,7 @@ describe("Matrix Bridge Integration Tests", () => {
payload: {
metadata: {
threadId: "$thread-skip",
channelId: "!skip-room:example.com",
issueNumber: 1,
},
},
@@ -689,6 +691,7 @@ describe("Matrix Bridge Integration Tests", () => {
payload: {
metadata: {
threadId: "$thread-err",
channelId: "!err-room:example.com",
issueNumber: 77,
},
},

View File

@@ -24,12 +24,13 @@ describe("MatrixRoomService", () => {
const mockCreateRoom = vi.fn().mockResolvedValue("!new-room:example.com");
const mockMatrixClient = {
createRoom: mockCreateRoom,
};
const mockMatrixService = {
isConnected: vi.fn().mockReturnValue(true),
// Private field accessed by MatrixRoomService.getMatrixClient()
client: {
createRoom: mockCreateRoom,
},
getClient: vi.fn().mockReturnValue(mockMatrixClient),
};
const mockPrismaService = {

View File

@@ -64,10 +64,17 @@ export class MatrixRoomService {
const roomId = await client.createRoom(roomOptions);
// Store the room mapping
await this.prisma.workspace.update({
where: { id: workspaceId },
data: { matrixRoomId: roomId },
});
try {
await this.prisma.workspace.update({
where: { id: workspaceId },
data: { matrixRoomId: roomId },
});
} catch (dbError: unknown) {
this.logger.error(
`Failed to store room mapping for workspace ${workspaceId}, room ${roomId} may be orphaned: ${dbError instanceof Error ? dbError.message : "unknown"}`
);
throw dbError;
}
this.logger.log(`Matrix room ${roomId} provisioned and linked to workspace ${workspaceId}`);
@@ -134,19 +141,11 @@ export class MatrixRoomService {
}
/**
* Access the underlying MatrixClient from the MatrixService.
*
* The MatrixService stores the client as a private field, so we
* access it via a known private property name. This is intentional
* to avoid exposing the client publicly on the service interface.
* Access the underlying MatrixClient from the MatrixService
* via the public getClient() accessor.
*/
private getMatrixClient(): MatrixClient | null {
if (!this.matrixService) return null;
// Access the private client field from MatrixService.
// MatrixService stores `client` as a private property; we use a type assertion
// to access it since exposing it publicly is not appropriate for the service API.
const service = this.matrixService as unknown as { client: MatrixClient | null };
return service.client;
return this.matrixService.getClient();
}
}

View File

@@ -195,14 +195,26 @@ export class MatrixStreamingService {
this.logger.error(`Stream error in room ${roomId}: ${errorMessage}`);
// Edit message to show error
const errorContent = accumulatedText
? `${accumulatedText}\n\n[Streaming error: ${errorMessage}]`
: `[Streaming error: ${errorMessage}]`;
try {
const errorContent = accumulatedText
? `${accumulatedText}\n\n[Streaming error: ${errorMessage}]`
: `[Streaming error: ${errorMessage}]`;
await this.editMessage(roomId, eventId, errorContent);
await this.editMessage(roomId, eventId, errorContent);
} catch (editError: unknown) {
this.logger.warn(
`Failed to edit error message in ${roomId}: ${editError instanceof Error ? editError.message : "unknown"}`
);
}
} finally {
// Step 4: Clear typing indicator
await this.setTypingIndicator(roomId, false);
try {
await this.setTypingIndicator(roomId, false);
} catch (typingError: unknown) {
this.logger.warn(
`Failed to clear typing indicator in ${roomId}: ${typingError instanceof Error ? typingError.message : "unknown"}`
);
}
}
// Step 5: Final edit with clean output (if no error)

View File

@@ -171,6 +171,7 @@ describe("MatrixService", () => {
await service.connect();
await service.sendThreadMessage({
threadId: "$root-event-id",
channelId: "!test-room:example.com",
content: "Step completed",
});
@@ -188,6 +189,28 @@ describe("MatrixService", () => {
});
});
it("should fall back to controlRoomId when channelId is empty", async () => {
await service.connect();
await service.sendThreadMessage({
threadId: "$root-event-id",
channelId: "",
content: "Fallback message",
});
expect(mockClient.sendMessage).toHaveBeenCalledWith("!test-room:example.com", {
msgtype: "m.text",
body: "Fallback message",
"m.relates_to": {
rel_type: "m.thread",
event_id: "$root-event-id",
is_falling_back: true,
"m.in_reply_to": {
event_id: "$root-event-id",
},
},
});
});
it("should throw error when creating thread without connection", async () => {
await expect(
service.createThread({
@@ -202,6 +225,7 @@ describe("MatrixService", () => {
await expect(
service.sendThreadMessage({
threadId: "$event-id",
channelId: "!room:example.com",
content: "Test",
})
).rejects.toThrow("Matrix client is not connected");
@@ -764,6 +788,32 @@ describe("MatrixService", () => {
process.env.MATRIX_ACCESS_TOKEN = "test-access-token";
});
it("should throw error if MATRIX_BOT_USER_ID is not set", async () => {
delete process.env.MATRIX_BOT_USER_ID;
const module: TestingModule = await Test.createTestingModule({
providers: [
MatrixService,
CommandParserService,
{
provide: StitcherService,
useValue: mockStitcherService,
},
{
provide: MatrixRoomService,
useValue: mockMatrixRoomService,
},
],
}).compile();
const newService = module.get<MatrixService>(MatrixService);
await expect(newService.connect()).rejects.toThrow("MATRIX_BOT_USER_ID is required");
// Restore for other tests
process.env.MATRIX_BOT_USER_ID = "@mosaic-bot:example.com";
});
it("should throw error if MATRIX_WORKSPACE_ID is not set", async () => {
delete process.env.MATRIX_WORKSPACE_ID;

View File

@@ -99,6 +99,10 @@ export class MatrixService implements IChatProvider {
throw new Error("MATRIX_WORKSPACE_ID is required");
}
if (!this.botUserId) {
throw new Error("MATRIX_BOT_USER_ID is required");
}
this.logger.log("Connecting to Matrix...");
const storage = new SimpleFsStorageProvider("matrix-bot-storage.json");
@@ -129,7 +133,12 @@ export class MatrixService implements IChatProvider {
// Only handle text messages
if (event.content.msgtype !== "m.text") return;
void this.handleRoomMessage(roomId, event);
this.handleRoomMessage(roomId, event).catch((error: unknown) => {
this.logger.error(
`Error handling room message in ${roomId}:`,
error instanceof Error ? error.message : error
);
});
});
this.client.on("room.event", (_roomId: string, event: MatrixRoomEvent | null) => {
@@ -332,10 +341,10 @@ export class MatrixService implements IChatProvider {
throw new Error("Matrix client is not connected");
}
const { threadId, content } = options;
const { threadId, channelId, content } = options;
// Extract roomId from the control room (threads are room-scoped)
const roomId = this.controlRoomId;
// Use the channelId from options (threads are room-scoped), fall back to control room
const roomId = channelId || this.controlRoomId;
const threadContent: MatrixMessageContent = {
msgtype: "m.text",
@@ -488,25 +497,38 @@ export class MatrixService implements IChatProvider {
});
// Dispatch job to stitcher
const result = await this.stitcherService.dispatchJob({
workspaceId: targetWorkspaceId,
type: "code-task",
priority: 10,
metadata: {
issueNumber,
command: "fix",
channelId: message.channelId,
threadId: threadId,
authorId: message.authorId,
authorName: message.authorName,
},
});
try {
const result = await this.stitcherService.dispatchJob({
workspaceId: targetWorkspaceId,
type: "code-task",
priority: 10,
metadata: {
issueNumber,
command: "fix",
channelId: message.channelId,
threadId: threadId,
authorId: message.authorId,
authorName: message.authorName,
},
});
// Send confirmation to thread
await this.sendThreadMessage({
threadId,
content: `Job created: ${result.jobId}\nStatus: ${result.status}\nQueue: ${result.queueName}`,
});
// Send confirmation to thread
await this.sendThreadMessage({
threadId,
channelId: message.channelId,
content: `Job created: ${result.jobId}\nStatus: ${result.status}\nQueue: ${result.queueName}`,
});
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : "Unknown error";
this.logger.error(
`Failed to dispatch job for issue #${String(issueNumber)}: ${errorMessage}`
);
await this.sendThreadMessage({
threadId,
channelId: message.channelId,
content: `Failed to start job: ${errorMessage}`,
});
}
}
/**

View File

@@ -101,7 +101,7 @@ describe("HeraldService", () => {
mockPrisma.jobEvent.findFirst.mockResolvedValue({
payload: {
metadata: { issueNumber: 42, threadId: "thread-123" },
metadata: { issueNumber: 42, threadId: "thread-123", channelId: "channel-abc" },
},
});
@@ -126,10 +126,12 @@ describe("HeraldService", () => {
// Assert
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
channelId: "channel-abc",
content: expect.stringContaining("Job created"),
});
expect(mockProviderB.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
channelId: "channel-abc",
content: expect.stringContaining("Job created"),
});
});
@@ -152,10 +154,12 @@ describe("HeraldService", () => {
// Assert
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
channelId: "channel-abc",
content: expect.stringContaining("Job started"),
});
expect(mockProviderB.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
channelId: "channel-abc",
content: expect.stringContaining("Job started"),
});
});
@@ -178,6 +182,7 @@ describe("HeraldService", () => {
// Assert
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
channelId: "channel-abc",
content: expect.stringContaining("completed"),
});
});
@@ -200,11 +205,13 @@ describe("HeraldService", () => {
// Assert
expect(mockProviderA.sendThreadMessage).toHaveBeenCalledWith({
threadId: "thread-123",
channelId: "channel-abc",
content: expect.stringContaining("encountered an issue"),
});
// Verify the actual message doesn't contain demanding language
const actualCall = mockProviderA.sendThreadMessage.mock.calls[0][0] as {
threadId: string;
channelId: string;
content: string;
};
expect(actualCall.content).not.toMatch(/FAILED|ERROR|CRITICAL|URGENT/);

View File

@@ -77,6 +77,7 @@ export class HeraldService {
const firstEventPayload = firstEvent?.payload as Record<string, unknown> | undefined;
const metadata = firstEventPayload?.metadata as Record<string, unknown> | undefined;
const threadId = metadata?.threadId as string | undefined;
const channelId = metadata?.channelId as string | undefined;
if (!threadId) {
this.logger.debug(`Job ${jobId} has no threadId, skipping broadcast`);
@@ -95,13 +96,15 @@ export class HeraldService {
try {
await provider.sendThreadMessage({
threadId,
channelId: channelId ?? "",
content: message,
});
} catch (error) {
} catch (error: unknown) {
// Log and continue — one provider failure must not block others
const providerName = provider.constructor.name;
this.logger.error(
`Failed to broadcast event ${event.type} for job ${jobId} via provider:`,
error
`Failed to broadcast event ${event.type} for job ${jobId} via ${providerName}:`,
error instanceof Error ? error.message : error
);
}
}

View File

@@ -112,14 +112,11 @@ echo ""
echo "Step 2: Obtaining admin access token..."
ADMIN_LOGIN_RESPONSE=$(curl -sS -X POST "${SYNAPSE_URL}/_matrix/client/v3/login" \
-H "Content-Type: application/json" \
-d "{
\"type\": \"m.login.password\",
\"identifier\": {
\"type\": \"m.id.user\",
\"user\": \"${ADMIN_USERNAME}\"
},
\"password\": \"${ADMIN_PASSWORD}\"
}" 2>/dev/null)
-d "$(jq -n \
--arg user "$ADMIN_USERNAME" \
--arg pw "$ADMIN_PASSWORD" \
'{type: "m.login.password", identifier: {type: "m.id.user", user: $user}, password: $pw}')" \
2>/dev/null)
ADMIN_TOKEN=$(echo "${ADMIN_LOGIN_RESPONSE}" | python3 -c "import sys,json; print(json.load(sys.stdin).get('access_token',''))" 2>/dev/null || true)
@@ -140,12 +137,11 @@ echo "Step 3: Registering bot account '${BOT_USERNAME}'..."
BOT_REGISTER_RESPONSE=$(curl -sS -X PUT "${SYNAPSE_URL}/_synapse/admin/v2/users/@${BOT_USERNAME}:localhost" \
-H "Authorization: Bearer ${ADMIN_TOKEN}" \
-H "Content-Type: application/json" \
-d "{
\"password\": \"${BOT_PASSWORD}\",
\"displayname\": \"${BOT_DISPLAY_NAME}\",
\"admin\": false,
\"deactivated\": false
}" 2>/dev/null)
-d "$(jq -n \
--arg pw "$BOT_PASSWORD" \
--arg dn "$BOT_DISPLAY_NAME" \
'{password: $pw, displayname: $dn, admin: false, deactivated: false}')" \
2>/dev/null)
BOT_EXISTS=$(echo "${BOT_REGISTER_RESPONSE}" | python3 -c "import sys,json; d=json.load(sys.stdin); print('yes' if d.get('name') else 'no')" 2>/dev/null || echo "no")
@@ -162,14 +158,11 @@ echo ""
echo "Step 4: Obtaining bot access token..."
BOT_LOGIN_RESPONSE=$(curl -sS -X POST "${SYNAPSE_URL}/_matrix/client/v3/login" \
-H "Content-Type: application/json" \
-d "{
\"type\": \"m.login.password\",
\"identifier\": {
\"type\": \"m.id.user\",
\"user\": \"${BOT_USERNAME}\"
},
\"password\": \"${BOT_PASSWORD}\"
}" 2>/dev/null)
-d "$(jq -n \
--arg user "$BOT_USERNAME" \
--arg pw "$BOT_PASSWORD" \
'{type: "m.login.password", identifier: {type: "m.id.user", user: $user}, password: $pw}')" \
2>/dev/null)
BOT_TOKEN=$(echo "${BOT_LOGIN_RESPONSE}" | python3 -c "import sys,json; print(json.load(sys.stdin).get('access_token',''))" 2>/dev/null || true)