- Fix sendThreadMessage room mismatch: use channelId from options instead of hardcoded controlRoomId - Add .catch() to fire-and-forget handleRoomMessage to prevent silent error swallowing - Wrap dispatchJob in try-catch for user-visible error reporting in handleFixCommand - Add MATRIX_BOT_USER_ID validation in connect() to prevent infinite message loops - Fix streamResponse error masking: wrap finally/catch side-effects in try-catch - Replace unsafe type assertion with public getClient() in MatrixRoomService - Add orphaned room warning in provisionRoom on DB failure - Add provider identity to Herald error logs - Add channelId to ThreadMessageOptions interface and all callers - Add missing env var warnings in BridgeModule factory - Fix JSON injection in setup-bot.sh: use jq for safe JSON construction Fixes #377 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
650 lines
19 KiB
TypeScript
650 lines
19 KiB
TypeScript
import { Injectable, Logger, Optional, Inject } from "@nestjs/common";
|
|
import { MatrixClient, SimpleFsStorageProvider, AutojoinRoomsMixin } from "matrix-bot-sdk";
|
|
import { StitcherService } from "../../stitcher/stitcher.service";
|
|
import { CommandParserService } from "../parser/command-parser.service";
|
|
import { CommandAction } from "../parser/command.interface";
|
|
import type { ParsedCommand } from "../parser/command.interface";
|
|
import { MatrixRoomService } from "./matrix-room.service";
|
|
import { sanitizeForLogging } from "../../common/utils";
|
|
import type {
|
|
IChatProvider,
|
|
ChatMessage,
|
|
ChatCommand,
|
|
ThreadCreateOptions,
|
|
ThreadMessageOptions,
|
|
} from "../interfaces";
|
|
|
|
/**
|
|
* Matrix room message event content
|
|
*/
|
|
interface MatrixMessageContent {
|
|
msgtype: string;
|
|
body: string;
|
|
"m.relates_to"?: MatrixRelatesTo;
|
|
}
|
|
|
|
/**
|
|
* Matrix relationship metadata for threads (MSC3440)
|
|
*/
|
|
interface MatrixRelatesTo {
|
|
rel_type: string;
|
|
event_id: string;
|
|
is_falling_back?: boolean;
|
|
"m.in_reply_to"?: {
|
|
event_id: string;
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Matrix room event structure
|
|
*/
|
|
interface MatrixRoomEvent {
|
|
event_id: string;
|
|
sender: string;
|
|
origin_server_ts: number;
|
|
content: MatrixMessageContent;
|
|
}
|
|
|
|
/**
|
|
* Matrix Service - Matrix chat platform integration
|
|
*
|
|
* Responsibilities:
|
|
* - Connect to Matrix via access token
|
|
* - Listen for commands in mapped rooms (via MatrixRoomService)
|
|
* - Parse commands using shared CommandParserService
|
|
* - Forward commands to stitcher
|
|
* - Receive status updates from herald
|
|
* - Post updates to threads (MSC3440)
|
|
*/
|
|
@Injectable()
|
|
export class MatrixService implements IChatProvider {
|
|
private readonly logger = new Logger(MatrixService.name);
|
|
private client: MatrixClient | null = null;
|
|
private connected = false;
|
|
private readonly homeserverUrl: string;
|
|
private readonly accessToken: string;
|
|
private readonly botUserId: string;
|
|
private readonly controlRoomId: string;
|
|
private readonly workspaceId: string;
|
|
|
|
constructor(
|
|
private readonly stitcherService: StitcherService,
|
|
@Optional()
|
|
@Inject(CommandParserService)
|
|
private readonly commandParser: CommandParserService | null,
|
|
@Optional()
|
|
@Inject(MatrixRoomService)
|
|
private readonly matrixRoomService: MatrixRoomService | null
|
|
) {
|
|
this.homeserverUrl = process.env.MATRIX_HOMESERVER_URL ?? "";
|
|
this.accessToken = process.env.MATRIX_ACCESS_TOKEN ?? "";
|
|
this.botUserId = process.env.MATRIX_BOT_USER_ID ?? "";
|
|
this.controlRoomId = process.env.MATRIX_CONTROL_ROOM_ID ?? "";
|
|
this.workspaceId = process.env.MATRIX_WORKSPACE_ID ?? "";
|
|
}
|
|
|
|
/**
|
|
* Connect to Matrix homeserver
|
|
*/
|
|
async connect(): Promise<void> {
|
|
if (!this.homeserverUrl) {
|
|
throw new Error("MATRIX_HOMESERVER_URL is required");
|
|
}
|
|
|
|
if (!this.accessToken) {
|
|
throw new Error("MATRIX_ACCESS_TOKEN is required");
|
|
}
|
|
|
|
if (!this.workspaceId) {
|
|
throw new Error("MATRIX_WORKSPACE_ID is required");
|
|
}
|
|
|
|
if (!this.botUserId) {
|
|
throw new Error("MATRIX_BOT_USER_ID is required");
|
|
}
|
|
|
|
this.logger.log("Connecting to Matrix...");
|
|
|
|
const storage = new SimpleFsStorageProvider("matrix-bot-storage.json");
|
|
this.client = new MatrixClient(this.homeserverUrl, this.accessToken, storage);
|
|
|
|
// Auto-join rooms when invited
|
|
AutojoinRoomsMixin.setupOnClient(this.client);
|
|
|
|
// Setup event handlers
|
|
this.setupEventHandlers();
|
|
|
|
// Start syncing
|
|
await this.client.start();
|
|
this.connected = true;
|
|
this.logger.log(`Matrix bot connected as ${this.botUserId}`);
|
|
}
|
|
|
|
/**
|
|
* Setup event handlers for Matrix client
|
|
*/
|
|
private setupEventHandlers(): void {
|
|
if (!this.client) return;
|
|
|
|
this.client.on("room.message", (roomId: string, event: MatrixRoomEvent) => {
|
|
// Ignore messages from the bot itself
|
|
if (event.sender === this.botUserId) return;
|
|
|
|
// Only handle text messages
|
|
if (event.content.msgtype !== "m.text") return;
|
|
|
|
this.handleRoomMessage(roomId, event).catch((error: unknown) => {
|
|
this.logger.error(
|
|
`Error handling room message in ${roomId}:`,
|
|
error instanceof Error ? error.message : error
|
|
);
|
|
});
|
|
});
|
|
|
|
this.client.on("room.event", (_roomId: string, event: MatrixRoomEvent | null) => {
|
|
// Handle errors emitted as events
|
|
if (!event) {
|
|
const error = new Error("Received null event from Matrix");
|
|
const sanitizedError = sanitizeForLogging(error);
|
|
this.logger.error("Matrix client error:", sanitizedError);
|
|
}
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Handle an incoming room message.
|
|
*
|
|
* Resolves the workspace for the room (via MatrixRoomService or fallback
|
|
* to the control room), then delegates to the shared CommandParserService
|
|
* for platform-agnostic command parsing and dispatches the result.
|
|
*/
|
|
private async handleRoomMessage(roomId: string, event: MatrixRoomEvent): Promise<void> {
|
|
// Resolve workspace: try MatrixRoomService first, fall back to control room
|
|
let resolvedWorkspaceId: string | null = null;
|
|
|
|
if (this.matrixRoomService) {
|
|
resolvedWorkspaceId = await this.matrixRoomService.getWorkspaceForRoom(roomId);
|
|
}
|
|
|
|
// Fallback: if the room is the configured control room, use the env workspace
|
|
if (!resolvedWorkspaceId && roomId === this.controlRoomId) {
|
|
resolvedWorkspaceId = this.workspaceId;
|
|
}
|
|
|
|
// If room is not mapped to any workspace, ignore the message
|
|
if (!resolvedWorkspaceId) {
|
|
return;
|
|
}
|
|
|
|
const messageContent = event.content.body;
|
|
|
|
// Build ChatMessage for interface compatibility
|
|
const chatMessage: ChatMessage = {
|
|
id: event.event_id,
|
|
channelId: roomId,
|
|
authorId: event.sender,
|
|
authorName: event.sender,
|
|
content: messageContent,
|
|
timestamp: new Date(event.origin_server_ts),
|
|
...(event.content["m.relates_to"]?.rel_type === "m.thread" && {
|
|
threadId: event.content["m.relates_to"].event_id,
|
|
}),
|
|
};
|
|
|
|
// Use shared CommandParserService if available
|
|
if (this.commandParser) {
|
|
// Normalize !mosaic to @mosaic for the shared parser
|
|
const normalizedContent = messageContent.replace(/^!mosaic/i, "@mosaic");
|
|
|
|
const result = this.commandParser.parseCommand(normalizedContent);
|
|
|
|
if (result.success) {
|
|
await this.handleParsedCommand(result.command, chatMessage, resolvedWorkspaceId);
|
|
} else if (normalizedContent.toLowerCase().startsWith("@mosaic")) {
|
|
// The user tried to use a command but it failed to parse -- send help
|
|
await this.sendMessage(roomId, result.error.help ?? result.error.message);
|
|
}
|
|
return;
|
|
}
|
|
|
|
// Fallback: use the built-in parseCommand if CommandParserService not injected
|
|
const command = this.parseCommand(chatMessage);
|
|
if (command) {
|
|
await this.handleCommand(command);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handle a command parsed by the shared CommandParserService.
|
|
*
|
|
* Routes the ParsedCommand to the appropriate handler, passing
|
|
* along workspace context for job dispatch.
|
|
*/
|
|
private async handleParsedCommand(
|
|
parsed: ParsedCommand,
|
|
message: ChatMessage,
|
|
workspaceId: string
|
|
): Promise<void> {
|
|
this.logger.log(
|
|
`Handling command: ${parsed.action} from ${message.authorName} in workspace ${workspaceId}`
|
|
);
|
|
|
|
switch (parsed.action) {
|
|
case CommandAction.FIX:
|
|
await this.handleFixCommand(parsed.rawArgs, message, workspaceId);
|
|
break;
|
|
case CommandAction.STATUS:
|
|
await this.handleStatusCommand(parsed.rawArgs, message);
|
|
break;
|
|
case CommandAction.CANCEL:
|
|
await this.handleCancelCommand(parsed.rawArgs, message);
|
|
break;
|
|
case CommandAction.VERBOSE:
|
|
await this.handleVerboseCommand(parsed.rawArgs, message);
|
|
break;
|
|
case CommandAction.QUIET:
|
|
await this.handleQuietCommand(parsed.rawArgs, message);
|
|
break;
|
|
case CommandAction.HELP:
|
|
await this.handleHelpCommand(parsed.rawArgs, message);
|
|
break;
|
|
case CommandAction.RETRY:
|
|
await this.handleRetryCommand(parsed.rawArgs, message);
|
|
break;
|
|
default:
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
`Unknown command. Type \`@mosaic help\` or \`!mosaic help\` for available commands.`
|
|
);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Disconnect from Matrix
|
|
*/
|
|
disconnect(): Promise<void> {
|
|
this.logger.log("Disconnecting from Matrix...");
|
|
this.connected = false;
|
|
if (this.client) {
|
|
this.client.stop();
|
|
}
|
|
return Promise.resolve();
|
|
}
|
|
|
|
/**
|
|
* Check if the provider is connected
|
|
*/
|
|
isConnected(): boolean {
|
|
return this.connected;
|
|
}
|
|
|
|
/**
|
|
* Get the underlying MatrixClient instance.
|
|
*
|
|
* Used by MatrixStreamingService for low-level operations
|
|
* (message edits, typing indicators) that require direct client access.
|
|
*
|
|
* @returns The MatrixClient instance, or null if not connected
|
|
*/
|
|
getClient(): MatrixClient | null {
|
|
return this.client;
|
|
}
|
|
|
|
/**
|
|
* Send a message to a room
|
|
*/
|
|
async sendMessage(roomId: string, content: string): Promise<void> {
|
|
if (!this.client) {
|
|
throw new Error("Matrix client is not connected");
|
|
}
|
|
|
|
const messageContent: MatrixMessageContent = {
|
|
msgtype: "m.text",
|
|
body: content,
|
|
};
|
|
|
|
await this.client.sendMessage(roomId, messageContent);
|
|
}
|
|
|
|
/**
|
|
* Create a thread for job updates (MSC3440)
|
|
*
|
|
* Matrix threads are created by sending an initial message
|
|
* and then replying with m.thread relation. The initial
|
|
* message event ID becomes the thread root.
|
|
*/
|
|
async createThread(options: ThreadCreateOptions): Promise<string> {
|
|
if (!this.client) {
|
|
throw new Error("Matrix client is not connected");
|
|
}
|
|
|
|
const { channelId, name, message } = options;
|
|
|
|
// Send the initial message that becomes the thread root
|
|
const initialContent: MatrixMessageContent = {
|
|
msgtype: "m.text",
|
|
body: `[${name}] ${message}`,
|
|
};
|
|
|
|
const eventId = await this.client.sendMessage(channelId, initialContent);
|
|
|
|
return eventId;
|
|
}
|
|
|
|
/**
|
|
* Send a message to a thread (MSC3440)
|
|
*
|
|
* Uses m.thread relation to associate the message with the thread root event.
|
|
*/
|
|
async sendThreadMessage(options: ThreadMessageOptions): Promise<void> {
|
|
if (!this.client) {
|
|
throw new Error("Matrix client is not connected");
|
|
}
|
|
|
|
const { threadId, channelId, content } = options;
|
|
|
|
// Use the channelId from options (threads are room-scoped), fall back to control room
|
|
const roomId = channelId || this.controlRoomId;
|
|
|
|
const threadContent: MatrixMessageContent = {
|
|
msgtype: "m.text",
|
|
body: content,
|
|
"m.relates_to": {
|
|
rel_type: "m.thread",
|
|
event_id: threadId,
|
|
is_falling_back: true,
|
|
"m.in_reply_to": {
|
|
event_id: threadId,
|
|
},
|
|
},
|
|
};
|
|
|
|
await this.client.sendMessage(roomId, threadContent);
|
|
}
|
|
|
|
/**
|
|
* Parse a command from a message (IChatProvider interface).
|
|
*
|
|
* Delegates to the shared CommandParserService when available,
|
|
* falling back to built-in parsing for backwards compatibility.
|
|
*/
|
|
parseCommand(message: ChatMessage): ChatCommand | null {
|
|
const { content } = message;
|
|
|
|
// Try shared parser first
|
|
if (this.commandParser) {
|
|
const normalizedContent = content.replace(/^!mosaic/i, "@mosaic");
|
|
const result = this.commandParser.parseCommand(normalizedContent);
|
|
|
|
if (result.success) {
|
|
return {
|
|
command: result.command.action,
|
|
args: result.command.rawArgs,
|
|
message,
|
|
};
|
|
}
|
|
return null;
|
|
}
|
|
|
|
// Fallback: built-in parsing for when CommandParserService is not injected
|
|
const lowerContent = content.toLowerCase();
|
|
if (!lowerContent.includes("@mosaic") && !lowerContent.includes("!mosaic")) {
|
|
return null;
|
|
}
|
|
|
|
const parts = content.trim().split(/\s+/);
|
|
const mosaicIndex = parts.findIndex(
|
|
(part) => part.toLowerCase().includes("@mosaic") || part.toLowerCase().includes("!mosaic")
|
|
);
|
|
|
|
if (mosaicIndex === -1 || mosaicIndex === parts.length - 1) {
|
|
return null;
|
|
}
|
|
|
|
const commandPart = parts[mosaicIndex + 1];
|
|
if (!commandPart) {
|
|
return null;
|
|
}
|
|
|
|
const command = commandPart.toLowerCase();
|
|
const args = parts.slice(mosaicIndex + 2);
|
|
|
|
const validCommands = ["fix", "status", "cancel", "verbose", "quiet", "help"];
|
|
|
|
if (!validCommands.includes(command)) {
|
|
return null;
|
|
}
|
|
|
|
return {
|
|
command,
|
|
args,
|
|
message,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Handle a parsed command (ChatCommand format, used by fallback path)
|
|
*/
|
|
async handleCommand(command: ChatCommand): Promise<void> {
|
|
const { command: cmd, args, message } = command;
|
|
|
|
this.logger.log(
|
|
`Handling command: ${cmd} with args: ${args.join(", ")} from ${message.authorName}`
|
|
);
|
|
|
|
switch (cmd) {
|
|
case "fix":
|
|
await this.handleFixCommand(args, message, this.workspaceId);
|
|
break;
|
|
case "status":
|
|
await this.handleStatusCommand(args, message);
|
|
break;
|
|
case "cancel":
|
|
await this.handleCancelCommand(args, message);
|
|
break;
|
|
case "verbose":
|
|
await this.handleVerboseCommand(args, message);
|
|
break;
|
|
case "quiet":
|
|
await this.handleQuietCommand(args, message);
|
|
break;
|
|
case "help":
|
|
await this.handleHelpCommand(args, message);
|
|
break;
|
|
default:
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
`Unknown command: ${cmd}. Type \`@mosaic help\` or \`!mosaic help\` for available commands.`
|
|
);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handle fix command - Start a job for an issue
|
|
*/
|
|
private async handleFixCommand(
|
|
args: string[],
|
|
message: ChatMessage,
|
|
workspaceId?: string
|
|
): Promise<void> {
|
|
if (args.length === 0 || !args[0]) {
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
"Usage: `@mosaic fix <issue-number>` or `!mosaic fix <issue-number>`"
|
|
);
|
|
return;
|
|
}
|
|
|
|
// Parse issue number: handle both "#42" and "42" formats
|
|
const issueArg = args[0].replace(/^#/, "");
|
|
const issueNumber = parseInt(issueArg, 10);
|
|
|
|
if (isNaN(issueNumber)) {
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
"Invalid issue number. Please provide a numeric issue number."
|
|
);
|
|
return;
|
|
}
|
|
|
|
const targetWorkspaceId = workspaceId ?? this.workspaceId;
|
|
|
|
// Create thread for job updates
|
|
const threadId = await this.createThread({
|
|
channelId: message.channelId,
|
|
name: `Job #${String(issueNumber)}`,
|
|
message: `Starting job for issue #${String(issueNumber)}...`,
|
|
});
|
|
|
|
// Dispatch job to stitcher
|
|
try {
|
|
const result = await this.stitcherService.dispatchJob({
|
|
workspaceId: targetWorkspaceId,
|
|
type: "code-task",
|
|
priority: 10,
|
|
metadata: {
|
|
issueNumber,
|
|
command: "fix",
|
|
channelId: message.channelId,
|
|
threadId: threadId,
|
|
authorId: message.authorId,
|
|
authorName: message.authorName,
|
|
},
|
|
});
|
|
|
|
// Send confirmation to thread
|
|
await this.sendThreadMessage({
|
|
threadId,
|
|
channelId: message.channelId,
|
|
content: `Job created: ${result.jobId}\nStatus: ${result.status}\nQueue: ${result.queueName}`,
|
|
});
|
|
} catch (error: unknown) {
|
|
const errorMessage = error instanceof Error ? error.message : "Unknown error";
|
|
this.logger.error(
|
|
`Failed to dispatch job for issue #${String(issueNumber)}: ${errorMessage}`
|
|
);
|
|
await this.sendThreadMessage({
|
|
threadId,
|
|
channelId: message.channelId,
|
|
content: `Failed to start job: ${errorMessage}`,
|
|
});
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Handle status command - Get job status
|
|
*/
|
|
private async handleStatusCommand(args: string[], message: ChatMessage): Promise<void> {
|
|
if (args.length === 0 || !args[0]) {
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
"Usage: `@mosaic status <job-id>` or `!mosaic status <job-id>`"
|
|
);
|
|
return;
|
|
}
|
|
|
|
const jobId = args[0];
|
|
|
|
// TODO: Implement job status retrieval from stitcher
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
`Status command not yet implemented for job: ${jobId}`
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Handle cancel command - Cancel a running job
|
|
*/
|
|
private async handleCancelCommand(args: string[], message: ChatMessage): Promise<void> {
|
|
if (args.length === 0 || !args[0]) {
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
"Usage: `@mosaic cancel <job-id>` or `!mosaic cancel <job-id>`"
|
|
);
|
|
return;
|
|
}
|
|
|
|
const jobId = args[0];
|
|
|
|
// TODO: Implement job cancellation in stitcher
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
`Cancel command not yet implemented for job: ${jobId}`
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Handle retry command - Retry a failed job
|
|
*/
|
|
private async handleRetryCommand(args: string[], message: ChatMessage): Promise<void> {
|
|
if (args.length === 0 || !args[0]) {
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
"Usage: `@mosaic retry <job-id>` or `!mosaic retry <job-id>`"
|
|
);
|
|
return;
|
|
}
|
|
|
|
const jobId = args[0];
|
|
|
|
// TODO: Implement job retry in stitcher
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
`Retry command not yet implemented for job: ${jobId}`
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Handle verbose command - Stream full logs to thread
|
|
*/
|
|
private async handleVerboseCommand(args: string[], message: ChatMessage): Promise<void> {
|
|
if (args.length === 0 || !args[0]) {
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
"Usage: `@mosaic verbose <job-id>` or `!mosaic verbose <job-id>`"
|
|
);
|
|
return;
|
|
}
|
|
|
|
const jobId = args[0];
|
|
|
|
// TODO: Implement verbose logging
|
|
await this.sendMessage(message.channelId, `Verbose mode not yet implemented for job: ${jobId}`);
|
|
}
|
|
|
|
/**
|
|
* Handle quiet command - Reduce notifications
|
|
*/
|
|
private async handleQuietCommand(_args: string[], message: ChatMessage): Promise<void> {
|
|
// TODO: Implement quiet mode
|
|
await this.sendMessage(
|
|
message.channelId,
|
|
"Quiet mode not yet implemented. Currently showing milestone updates only."
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Handle help command - Show available commands
|
|
*/
|
|
private async handleHelpCommand(_args: string[], message: ChatMessage): Promise<void> {
|
|
const helpMessage = `
|
|
**Available commands:**
|
|
|
|
\`@mosaic fix <issue>\` or \`!mosaic fix <issue>\` - Start job for issue
|
|
\`@mosaic status <job>\` or \`!mosaic status <job>\` - Get job status
|
|
\`@mosaic cancel <job>\` or \`!mosaic cancel <job>\` - Cancel running job
|
|
\`@mosaic retry <job>\` or \`!mosaic retry <job>\` - Retry failed job
|
|
\`@mosaic verbose <job>\` or \`!mosaic verbose <job>\` - Stream full logs to thread
|
|
\`@mosaic quiet\` or \`!mosaic quiet\` - Reduce notifications
|
|
\`@mosaic help\` or \`!mosaic help\` - Show this help message
|
|
|
|
**Noise Management:**
|
|
- Main room: Low verbosity (milestones only)
|
|
- Job threads: Medium verbosity (step completions)
|
|
- DMs: Configurable per user
|
|
`.trim();
|
|
|
|
await this.sendMessage(message.channelId, helpMessage);
|
|
}
|
|
}
|