feat(#378): Install matrix-bot-sdk and create MatrixService skeleton
- Add matrix-bot-sdk dependency to @mosaic/api - Create MatrixService implementing IChatProvider interface - Support connect/disconnect, message sending, thread management - Parse @mosaic and !mosaic command prefixes - Delegate commands to StitcherService (same flow as Discord) - Add comprehensive unit tests with mocked MatrixClient (31 tests) - Add Matrix env vars to .env.example Refs #378 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
468
apps/api/src/bridge/matrix/matrix.service.ts
Normal file
468
apps/api/src/bridge/matrix/matrix.service.ts
Normal file
@@ -0,0 +1,468 @@
|
||||
import { Injectable, Logger } from "@nestjs/common";
|
||||
import { MatrixClient, SimpleFsStorageProvider, AutojoinRoomsMixin } from "matrix-bot-sdk";
|
||||
import { StitcherService } from "../../stitcher/stitcher.service";
|
||||
import { sanitizeForLogging } from "../../common/utils";
|
||||
import type {
|
||||
IChatProvider,
|
||||
ChatMessage,
|
||||
ChatCommand,
|
||||
ThreadCreateOptions,
|
||||
ThreadMessageOptions,
|
||||
} from "../interfaces";
|
||||
|
||||
/**
|
||||
* Matrix room message event content
|
||||
*/
|
||||
interface MatrixMessageContent {
|
||||
msgtype: string;
|
||||
body: string;
|
||||
"m.relates_to"?: MatrixRelatesTo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Matrix relationship metadata for threads (MSC3440)
|
||||
*/
|
||||
interface MatrixRelatesTo {
|
||||
rel_type: string;
|
||||
event_id: string;
|
||||
is_falling_back?: boolean;
|
||||
"m.in_reply_to"?: {
|
||||
event_id: string;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Matrix room event structure
|
||||
*/
|
||||
interface MatrixRoomEvent {
|
||||
event_id: string;
|
||||
sender: string;
|
||||
origin_server_ts: number;
|
||||
content: MatrixMessageContent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Matrix Service - Matrix chat platform integration
|
||||
*
|
||||
* Responsibilities:
|
||||
* - Connect to Matrix via access token
|
||||
* - Listen for commands in designated rooms
|
||||
* - Forward commands to stitcher
|
||||
* - Receive status updates from herald
|
||||
* - Post updates to threads (MSC3440)
|
||||
*/
|
||||
@Injectable()
|
||||
export class MatrixService implements IChatProvider {
|
||||
private readonly logger = new Logger(MatrixService.name);
|
||||
private client: MatrixClient | null = null;
|
||||
private connected = false;
|
||||
private readonly homeserverUrl: string;
|
||||
private readonly accessToken: string;
|
||||
private readonly botUserId: string;
|
||||
private readonly controlRoomId: string;
|
||||
private readonly workspaceId: string;
|
||||
|
||||
constructor(private readonly stitcherService: StitcherService) {
|
||||
this.homeserverUrl = process.env.MATRIX_HOMESERVER_URL ?? "";
|
||||
this.accessToken = process.env.MATRIX_ACCESS_TOKEN ?? "";
|
||||
this.botUserId = process.env.MATRIX_BOT_USER_ID ?? "";
|
||||
this.controlRoomId = process.env.MATRIX_CONTROL_ROOM_ID ?? "";
|
||||
this.workspaceId = process.env.MATRIX_WORKSPACE_ID ?? "";
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to Matrix homeserver
|
||||
*/
|
||||
async connect(): Promise<void> {
|
||||
if (!this.homeserverUrl) {
|
||||
throw new Error("MATRIX_HOMESERVER_URL is required");
|
||||
}
|
||||
|
||||
if (!this.accessToken) {
|
||||
throw new Error("MATRIX_ACCESS_TOKEN is required");
|
||||
}
|
||||
|
||||
if (!this.workspaceId) {
|
||||
throw new Error("MATRIX_WORKSPACE_ID is required");
|
||||
}
|
||||
|
||||
this.logger.log("Connecting to Matrix...");
|
||||
|
||||
const storage = new SimpleFsStorageProvider("matrix-bot-storage.json");
|
||||
this.client = new MatrixClient(this.homeserverUrl, this.accessToken, storage);
|
||||
|
||||
// Auto-join rooms when invited
|
||||
AutojoinRoomsMixin.setupOnClient(this.client);
|
||||
|
||||
// Setup event handlers
|
||||
this.setupEventHandlers();
|
||||
|
||||
// Start syncing
|
||||
await this.client.start();
|
||||
this.connected = true;
|
||||
this.logger.log(`Matrix bot connected as ${this.botUserId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup event handlers for Matrix client
|
||||
*/
|
||||
private setupEventHandlers(): void {
|
||||
if (!this.client) return;
|
||||
|
||||
this.client.on("room.message", (roomId: string, event: MatrixRoomEvent) => {
|
||||
// Ignore messages from the bot itself
|
||||
if (event.sender === this.botUserId) return;
|
||||
|
||||
// Check if message is in control room
|
||||
if (roomId !== this.controlRoomId) return;
|
||||
|
||||
// Only handle text messages
|
||||
if (event.content.msgtype !== "m.text") return;
|
||||
|
||||
// Parse message into ChatMessage format
|
||||
const chatMessage: ChatMessage = {
|
||||
id: event.event_id,
|
||||
channelId: roomId,
|
||||
authorId: event.sender,
|
||||
authorName: event.sender,
|
||||
content: event.content.body,
|
||||
timestamp: new Date(event.origin_server_ts),
|
||||
...(event.content["m.relates_to"]?.rel_type === "m.thread" && {
|
||||
threadId: event.content["m.relates_to"].event_id,
|
||||
}),
|
||||
};
|
||||
|
||||
// Parse command
|
||||
const command = this.parseCommand(chatMessage);
|
||||
if (command) {
|
||||
void this.handleCommand(command);
|
||||
}
|
||||
});
|
||||
|
||||
this.client.on("room.event", (_roomId: string, event: MatrixRoomEvent | null) => {
|
||||
// Handle errors emitted as events
|
||||
if (!event) {
|
||||
const error = new Error("Received null event from Matrix");
|
||||
const sanitizedError = sanitizeForLogging(error);
|
||||
this.logger.error("Matrix client error:", sanitizedError);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnect from Matrix
|
||||
*/
|
||||
disconnect(): Promise<void> {
|
||||
this.logger.log("Disconnecting from Matrix...");
|
||||
this.connected = false;
|
||||
if (this.client) {
|
||||
this.client.stop();
|
||||
}
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the provider is connected
|
||||
*/
|
||||
isConnected(): boolean {
|
||||
return this.connected;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message to a room
|
||||
*/
|
||||
async sendMessage(roomId: string, content: string): Promise<void> {
|
||||
if (!this.client) {
|
||||
throw new Error("Matrix client is not connected");
|
||||
}
|
||||
|
||||
const messageContent: MatrixMessageContent = {
|
||||
msgtype: "m.text",
|
||||
body: content,
|
||||
};
|
||||
|
||||
await this.client.sendMessage(roomId, messageContent);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a thread for job updates (MSC3440)
|
||||
*
|
||||
* Matrix threads are created by sending an initial message
|
||||
* and then replying with m.thread relation. The initial
|
||||
* message event ID becomes the thread root.
|
||||
*/
|
||||
async createThread(options: ThreadCreateOptions): Promise<string> {
|
||||
if (!this.client) {
|
||||
throw new Error("Matrix client is not connected");
|
||||
}
|
||||
|
||||
const { channelId, name, message } = options;
|
||||
|
||||
// Send the initial message that becomes the thread root
|
||||
const initialContent: MatrixMessageContent = {
|
||||
msgtype: "m.text",
|
||||
body: `[${name}] ${message}`,
|
||||
};
|
||||
|
||||
const eventId = await this.client.sendMessage(channelId, initialContent);
|
||||
|
||||
return eventId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a message to a thread (MSC3440)
|
||||
*
|
||||
* Uses m.thread relation to associate the message with the thread root event.
|
||||
*/
|
||||
async sendThreadMessage(options: ThreadMessageOptions): Promise<void> {
|
||||
if (!this.client) {
|
||||
throw new Error("Matrix client is not connected");
|
||||
}
|
||||
|
||||
const { threadId, content } = options;
|
||||
|
||||
// Extract roomId from the control room (threads are room-scoped)
|
||||
const roomId = this.controlRoomId;
|
||||
|
||||
const threadContent: MatrixMessageContent = {
|
||||
msgtype: "m.text",
|
||||
body: content,
|
||||
"m.relates_to": {
|
||||
rel_type: "m.thread",
|
||||
event_id: threadId,
|
||||
is_falling_back: true,
|
||||
"m.in_reply_to": {
|
||||
event_id: threadId,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
await this.client.sendMessage(roomId, threadContent);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse a command from a message
|
||||
*/
|
||||
parseCommand(message: ChatMessage): ChatCommand | null {
|
||||
const { content } = message;
|
||||
|
||||
// Check if message mentions @mosaic or uses !mosaic prefix
|
||||
const lowerContent = content.toLowerCase();
|
||||
if (!lowerContent.includes("@mosaic") && !lowerContent.includes("!mosaic")) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Extract command and arguments
|
||||
const parts = content.trim().split(/\s+/);
|
||||
const mosaicIndex = parts.findIndex(
|
||||
(part) => part.toLowerCase().includes("@mosaic") || part.toLowerCase().includes("!mosaic")
|
||||
);
|
||||
|
||||
if (mosaicIndex === -1 || mosaicIndex === parts.length - 1) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const commandPart = parts[mosaicIndex + 1];
|
||||
if (!commandPart) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const command = commandPart.toLowerCase();
|
||||
const args = parts.slice(mosaicIndex + 2);
|
||||
|
||||
// Valid commands
|
||||
const validCommands = ["fix", "status", "cancel", "verbose", "quiet", "help"];
|
||||
|
||||
if (!validCommands.includes(command)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return {
|
||||
command,
|
||||
args,
|
||||
message,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a parsed command
|
||||
*/
|
||||
async handleCommand(command: ChatCommand): Promise<void> {
|
||||
const { command: cmd, args, message } = command;
|
||||
|
||||
this.logger.log(
|
||||
`Handling command: ${cmd} with args: ${args.join(", ")} from ${message.authorName}`
|
||||
);
|
||||
|
||||
switch (cmd) {
|
||||
case "fix":
|
||||
await this.handleFixCommand(args, message);
|
||||
break;
|
||||
case "status":
|
||||
await this.handleStatusCommand(args, message);
|
||||
break;
|
||||
case "cancel":
|
||||
await this.handleCancelCommand(args, message);
|
||||
break;
|
||||
case "verbose":
|
||||
await this.handleVerboseCommand(args, message);
|
||||
break;
|
||||
case "quiet":
|
||||
await this.handleQuietCommand(args, message);
|
||||
break;
|
||||
case "help":
|
||||
await this.handleHelpCommand(args, message);
|
||||
break;
|
||||
default:
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
`Unknown command: ${cmd}. Type \`@mosaic help\` or \`!mosaic help\` for available commands.`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle fix command - Start a job for an issue
|
||||
*/
|
||||
private async handleFixCommand(args: string[], message: ChatMessage): Promise<void> {
|
||||
if (args.length === 0 || !args[0]) {
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
"Usage: `@mosaic fix <issue-number>` or `!mosaic fix <issue-number>`"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const issueNumber = parseInt(args[0], 10);
|
||||
|
||||
if (isNaN(issueNumber)) {
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
"Invalid issue number. Please provide a numeric issue number."
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Create thread for job updates
|
||||
const threadId = await this.createThread({
|
||||
channelId: message.channelId,
|
||||
name: `Job #${String(issueNumber)}`,
|
||||
message: `Starting job for issue #${String(issueNumber)}...`,
|
||||
});
|
||||
|
||||
// Dispatch job to stitcher
|
||||
const result = await this.stitcherService.dispatchJob({
|
||||
workspaceId: this.workspaceId,
|
||||
type: "code-task",
|
||||
priority: 10,
|
||||
metadata: {
|
||||
issueNumber,
|
||||
command: "fix",
|
||||
channelId: message.channelId,
|
||||
threadId: threadId,
|
||||
authorId: message.authorId,
|
||||
authorName: message.authorName,
|
||||
},
|
||||
});
|
||||
|
||||
// Send confirmation to thread
|
||||
await this.sendThreadMessage({
|
||||
threadId,
|
||||
content: `Job created: ${result.jobId}\nStatus: ${result.status}\nQueue: ${result.queueName}`,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle status command - Get job status
|
||||
*/
|
||||
private async handleStatusCommand(args: string[], message: ChatMessage): Promise<void> {
|
||||
if (args.length === 0 || !args[0]) {
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
"Usage: `@mosaic status <job-id>` or `!mosaic status <job-id>`"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const jobId = args[0];
|
||||
|
||||
// TODO: Implement job status retrieval from stitcher
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
`Status command not yet implemented for job: ${jobId}`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle cancel command - Cancel a running job
|
||||
*/
|
||||
private async handleCancelCommand(args: string[], message: ChatMessage): Promise<void> {
|
||||
if (args.length === 0 || !args[0]) {
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
"Usage: `@mosaic cancel <job-id>` or `!mosaic cancel <job-id>`"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const jobId = args[0];
|
||||
|
||||
// TODO: Implement job cancellation in stitcher
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
`Cancel command not yet implemented for job: ${jobId}`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle verbose command - Stream full logs to thread
|
||||
*/
|
||||
private async handleVerboseCommand(args: string[], message: ChatMessage): Promise<void> {
|
||||
if (args.length === 0 || !args[0]) {
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
"Usage: `@mosaic verbose <job-id>` or `!mosaic verbose <job-id>`"
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const jobId = args[0];
|
||||
|
||||
// TODO: Implement verbose logging
|
||||
await this.sendMessage(message.channelId, `Verbose mode not yet implemented for job: ${jobId}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle quiet command - Reduce notifications
|
||||
*/
|
||||
private async handleQuietCommand(_args: string[], message: ChatMessage): Promise<void> {
|
||||
// TODO: Implement quiet mode
|
||||
await this.sendMessage(
|
||||
message.channelId,
|
||||
"Quiet mode not yet implemented. Currently showing milestone updates only."
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle help command - Show available commands
|
||||
*/
|
||||
private async handleHelpCommand(_args: string[], message: ChatMessage): Promise<void> {
|
||||
const helpMessage = `
|
||||
**Available commands:**
|
||||
|
||||
\`@mosaic fix <issue>\` or \`!mosaic fix <issue>\` - Start job for issue
|
||||
\`@mosaic status <job>\` or \`!mosaic status <job>\` - Get job status
|
||||
\`@mosaic cancel <job>\` or \`!mosaic cancel <job>\` - Cancel running job
|
||||
\`@mosaic verbose <job>\` or \`!mosaic verbose <job>\` - Stream full logs to thread
|
||||
\`@mosaic quiet\` or \`!mosaic quiet\` - Reduce notifications
|
||||
\`@mosaic help\` or \`!mosaic help\` - Show this help message
|
||||
|
||||
**Noise Management:**
|
||||
- Main room: Low verbosity (milestones only)
|
||||
- Job threads: Medium verbosity (step completions)
|
||||
- DMs: Configurable per user
|
||||
`.trim();
|
||||
|
||||
await this.sendMessage(message.channelId, helpMessage);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user