Files
stack/apps/api/src/bridge/matrix/matrix.service.ts
Jason Woltje 8d19ac1f4b
Some checks failed
ci/woodpecker/push/infra Pipeline was successful
ci/woodpecker/push/api Pipeline failed
fix(#377): remediate code review and security findings
- Fix sendThreadMessage room mismatch: use channelId from options instead of hardcoded controlRoomId
- Add .catch() to fire-and-forget handleRoomMessage to prevent silent error swallowing
- Wrap dispatchJob in try-catch for user-visible error reporting in handleFixCommand
- Add MATRIX_BOT_USER_ID validation in connect() to prevent infinite message loops
- Fix streamResponse error masking: wrap finally/catch side-effects in try-catch
- Replace unsafe type assertion with public getClient() in MatrixRoomService
- Add orphaned room warning in provisionRoom on DB failure
- Add provider identity to Herald error logs
- Add channelId to ThreadMessageOptions interface and all callers
- Add missing env var warnings in BridgeModule factory
- Fix JSON injection in setup-bot.sh: use jq for safe JSON construction

Fixes #377

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-15 03:00:53 -06:00

650 lines
19 KiB
TypeScript

import { Injectable, Logger, Optional, Inject } from "@nestjs/common";
import { MatrixClient, SimpleFsStorageProvider, AutojoinRoomsMixin } from "matrix-bot-sdk";
import { StitcherService } from "../../stitcher/stitcher.service";
import { CommandParserService } from "../parser/command-parser.service";
import { CommandAction } from "../parser/command.interface";
import type { ParsedCommand } from "../parser/command.interface";
import { MatrixRoomService } from "./matrix-room.service";
import { sanitizeForLogging } from "../../common/utils";
import type {
IChatProvider,
ChatMessage,
ChatCommand,
ThreadCreateOptions,
ThreadMessageOptions,
} from "../interfaces";
/**
 * Content payload of a Matrix room message event, both as read from
 * incoming events and as sent by MatrixService.
 */
interface MatrixMessageContent {
/** Message type; MatrixService only handles and sends "m.text". */
msgtype: string;
/** Message text; incoming bodies are scanned for mosaic commands. */
body: string;
/** Relation to another event; MatrixService sets it when posting into a thread. */
"m.relates_to"?: MatrixRelatesTo;
}
/**
 * Matrix relationship metadata for threads (MSC3440)
 */
interface MatrixRelatesTo {
/** Relation type; "m.thread" is the only value this file checks for or sends. */
rel_type: string;
/** Event the relation points at; for "m.thread" this is the thread root event ID. */
event_id: string;
/**
 * Sent as true by MatrixService together with "m.in_reply_to".
 * NOTE(review): presumably marks the reply as a fallback for clients without
 * thread support — confirm against the Matrix threading spec.
 */
is_falling_back?: boolean;
/** Reply target; MatrixService points it at the thread root event. */
"m.in_reply_to"?: {
event_id: string;
};
}
/**
 * Matrix room event structure, limited to the fields MatrixService reads.
 */
interface MatrixRoomEvent {
/** Event ID; used as the ChatMessage id. */
event_id: string;
/** Sender's user ID; compared with the bot's own ID and used as author id and name. */
sender: string;
/** Event timestamp; passed directly to `new Date(...)` to build the message timestamp. */
origin_server_ts: number;
/** Message payload (type, body and optional thread relation). */
content: MatrixMessageContent;
}
/**
 * Matrix Service - Matrix chat platform integration
 *
 * Responsibilities:
 * - Connect to Matrix via access token
 * - Listen for commands in mapped rooms (via MatrixRoomService)
 * - Parse commands using shared CommandParserService
 * - Forward commands to stitcher
 * - Receive status updates from herald
 * - Post updates to threads (MSC3440)
 *
 * Configuration is read once, in the constructor, from the MATRIX_HOMESERVER_URL,
 * MATRIX_ACCESS_TOKEN, MATRIX_BOT_USER_ID, MATRIX_CONTROL_ROOM_ID and
 * MATRIX_WORKSPACE_ID environment variables. Missing values default to an
 * empty string; the required ones are validated in connect().
 */
@Injectable()
export class MatrixService implements IChatProvider {
  /**
   * Commands understood by the built-in fallback parser. Must stay in sync
   * with the switch in handleCommand().
   */
  private static readonly FALLBACK_COMMANDS: readonly string[] = [
    "fix",
    "status",
    "cancel",
    "retry",
    "verbose",
    "quiet",
    "help",
  ];

  private readonly logger = new Logger(MatrixService.name);
  /** Active client; null whenever the service is not connected (or connecting). */
  private client: MatrixClient | null = null;
  private connected = false;
  private readonly homeserverUrl: string;
  private readonly accessToken: string;
  private readonly botUserId: string;
  private readonly controlRoomId: string;
  private readonly workspaceId: string;

  /**
   * @param stitcherService Receives the jobs dispatched by the fix command.
   * @param commandParser Optional shared parser; when absent the built-in
   *   fallback parser in parseCommand() is used.
   * @param matrixRoomService Optional room-to-workspace lookup; when absent
   *   only the configured control room is served.
   */
  constructor(
    private readonly stitcherService: StitcherService,
    @Optional()
    @Inject(CommandParserService)
    private readonly commandParser: CommandParserService | null,
    @Optional()
    @Inject(MatrixRoomService)
    private readonly matrixRoomService: MatrixRoomService | null
  ) {
    this.homeserverUrl = process.env.MATRIX_HOMESERVER_URL ?? "";
    this.accessToken = process.env.MATRIX_ACCESS_TOKEN ?? "";
    this.botUserId = process.env.MATRIX_BOT_USER_ID ?? "";
    this.controlRoomId = process.env.MATRIX_CONTROL_ROOM_ID ?? "";
    this.workspaceId = process.env.MATRIX_WORKSPACE_ID ?? "";
  }

  /**
   * Connect to Matrix homeserver
   *
   * Calling this while a client already exists is a no-op, so a second sync
   * loop (which would handle every message twice) is never started.
   *
   * @throws Error when a required MATRIX_* setting is missing, or whatever
   *   the client's start() rejects with.
   */
  async connect(): Promise<void> {
    if (!this.homeserverUrl) {
      throw new Error("MATRIX_HOMESERVER_URL is required");
    }
    if (!this.accessToken) {
      throw new Error("MATRIX_ACCESS_TOKEN is required");
    }
    if (!this.workspaceId) {
      throw new Error("MATRIX_WORKSPACE_ID is required");
    }
    // Without the bot's own ID its messages cannot be filtered out, which
    // would let the bot answer itself in a loop.
    if (!this.botUserId) {
      throw new Error("MATRIX_BOT_USER_ID is required");
    }

    if (this.client) {
      this.logger.warn("Matrix client already connected or connecting; ignoring connect()");
      return;
    }

    this.logger.log("Connecting to Matrix...");
    const storage = new SimpleFsStorageProvider("matrix-bot-storage.json");
    const client = new MatrixClient(this.homeserverUrl, this.accessToken, storage);
    this.client = client;

    // Auto-join rooms when invited
    AutojoinRoomsMixin.setupOnClient(client);
    // Setup event handlers
    this.setupEventHandlers();

    // Start syncing
    try {
      await client.start();
    } catch (error: unknown) {
      // Do not keep a half-initialised client around: the send methods use
      // `client === null` as their "not connected" signal.
      this.client = null;
      throw error;
    }
    this.connected = true;
    this.logger.log(`Matrix bot connected as ${this.botUserId}`);
  }

  /**
   * Setup event handlers for Matrix client
   *
   * Registers a "room.message" listener that forwards text messages from
   * other users to handleRoomMessage(), and a "room.event" listener that
   * logs null events.
   */
  private setupEventHandlers(): void {
    if (!this.client) return;

    this.client.on("room.message", (roomId: string, event: MatrixRoomEvent | null) => {
      if (!event) return;
      // Ignore messages from the bot itself
      if (event.sender === this.botUserId) return;
      // Event content is supplied by other users, so its shape is checked at
      // runtime instead of being trusted: only text messages with a string
      // body are processed.
      const content: Partial<MatrixMessageContent> | undefined = event.content;
      if (content?.msgtype !== "m.text" || typeof content.body !== "string") return;

      // Fire-and-forget: the listener cannot be awaited, so failures are logged here.
      this.handleRoomMessage(roomId, event).catch((error: unknown) => {
        this.logger.error(
          `Error handling room message in ${roomId}:`,
          error instanceof Error ? error.message : error
        );
      });
    });

    this.client.on("room.event", (_roomId: string, event: MatrixRoomEvent | null) => {
      // Handle errors emitted as events
      if (!event) {
        const error = new Error("Received null event from Matrix");
        const sanitizedError = sanitizeForLogging(error);
        this.logger.error("Matrix client error:", sanitizedError);
      }
    });
  }

  /**
   * Rewrite a leading "!mosaic" prefix to "@mosaic" so the shared parser
   * only has to understand one spelling.
   */
  private normalizeCommandPrefix(content: string): string {
    return content.replace(/^!mosaic/i, "@mosaic");
  }

  /**
   * Handle an incoming room message.
   *
   * Resolves the workspace for the room (via MatrixRoomService or fallback
   * to the control room), then delegates to the shared CommandParserService
   * for platform-agnostic command parsing and dispatches the result. When no
   * shared parser is injected, the built-in parser is used and the resolved
   * workspace is still passed on to the command handler.
   */
  private async handleRoomMessage(roomId: string, event: MatrixRoomEvent): Promise<void> {
    // Resolve workspace: try MatrixRoomService first, fall back to control room
    let resolvedWorkspaceId: string | null = null;
    if (this.matrixRoomService) {
      resolvedWorkspaceId = await this.matrixRoomService.getWorkspaceForRoom(roomId);
    }
    // Fallback: if the room is the configured control room, use the env workspace
    if (!resolvedWorkspaceId && roomId === this.controlRoomId) {
      resolvedWorkspaceId = this.workspaceId;
    }
    // If room is not mapped to any workspace, ignore the message
    if (!resolvedWorkspaceId) {
      return;
    }

    const messageContent = event.content.body;
    const relation = event.content["m.relates_to"];

    // Build ChatMessage for interface compatibility
    const chatMessage: ChatMessage = {
      id: event.event_id,
      channelId: roomId,
      authorId: event.sender,
      authorName: event.sender,
      content: messageContent,
      timestamp: new Date(event.origin_server_ts),
      // Only thread relations carry a thread ID
      ...(relation?.rel_type === "m.thread" ? { threadId: relation.event_id } : {}),
    };

    // Use shared CommandParserService if available
    if (this.commandParser) {
      const normalizedContent = this.normalizeCommandPrefix(messageContent);
      const result = this.commandParser.parseCommand(normalizedContent);
      if (result.success) {
        await this.handleParsedCommand(result.command, chatMessage, resolvedWorkspaceId);
      } else if (normalizedContent.toLowerCase().startsWith("@mosaic")) {
        // The user tried to use a command but it failed to parse -- send help
        await this.sendMessage(roomId, result.error.help ?? result.error.message);
      }
      return;
    }

    // Fallback: use the built-in parseCommand if CommandParserService not injected.
    // The resolved workspace is forwarded so jobs from a mapped room are not
    // dispatched to the env-configured workspace by mistake.
    const command = this.parseCommand(chatMessage);
    if (command) {
      await this.handleCommand(command, resolvedWorkspaceId);
    }
  }

  /**
   * Handle a command parsed by the shared CommandParserService.
   *
   * Routes the ParsedCommand to the appropriate handler, passing
   * along workspace context for job dispatch.
   */
  private async handleParsedCommand(
    parsed: ParsedCommand,
    message: ChatMessage,
    workspaceId: string
  ): Promise<void> {
    this.logger.log(
      `Handling command: ${parsed.action} from ${message.authorName} in workspace ${workspaceId}`
    );
    switch (parsed.action) {
      case CommandAction.FIX:
        await this.handleFixCommand(parsed.rawArgs, message, workspaceId);
        break;
      case CommandAction.STATUS:
        await this.handleStatusCommand(parsed.rawArgs, message);
        break;
      case CommandAction.CANCEL:
        await this.handleCancelCommand(parsed.rawArgs, message);
        break;
      case CommandAction.VERBOSE:
        await this.handleVerboseCommand(parsed.rawArgs, message);
        break;
      case CommandAction.QUIET:
        await this.handleQuietCommand(parsed.rawArgs, message);
        break;
      case CommandAction.HELP:
        await this.handleHelpCommand(parsed.rawArgs, message);
        break;
      case CommandAction.RETRY:
        await this.handleRetryCommand(parsed.rawArgs, message);
        break;
      default:
        await this.sendMessage(
          message.channelId,
          `Unknown command. Type \`@mosaic help\` or \`!mosaic help\` for available commands.`
        );
    }
  }

  /**
   * Disconnect from Matrix
   *
   * Stops the client and drops the reference to it, so getClient() returns
   * null and the send methods report "not connected" afterwards.
   */
  disconnect(): Promise<void> {
    this.logger.log("Disconnecting from Matrix...");
    this.connected = false;
    const client = this.client;
    this.client = null;
    if (client) {
      client.stop();
    }
    return Promise.resolve();
  }

  /**
   * Check if the provider is connected
   */
  isConnected(): boolean {
    return this.connected;
  }

  /**
   * Get the underlying MatrixClient instance.
   *
   * Used by MatrixStreamingService for low-level operations
   * (message edits, typing indicators) that require direct client access.
   *
   * @returns The MatrixClient instance, or null if not connected
   */
  getClient(): MatrixClient | null {
    return this.client;
  }

  /**
   * Send a message to a room
   *
   * @throws Error when no client is available.
   */
  async sendMessage(roomId: string, content: string): Promise<void> {
    if (!this.client) {
      throw new Error("Matrix client is not connected");
    }
    const messageContent: MatrixMessageContent = {
      msgtype: "m.text",
      body: content,
    };
    await this.client.sendMessage(roomId, messageContent);
  }

  /**
   * Create a thread for job updates (MSC3440)
   *
   * Matrix threads are created by sending an initial message
   * and then replying with m.thread relation. The initial
   * message event ID becomes the thread root.
   *
   * @returns The event ID of the initial message, to be used as thread ID.
   * @throws Error when no client is available.
   */
  async createThread(options: ThreadCreateOptions): Promise<string> {
    if (!this.client) {
      throw new Error("Matrix client is not connected");
    }
    const { channelId, name, message } = options;
    // Send the initial message that becomes the thread root
    const initialContent: MatrixMessageContent = {
      msgtype: "m.text",
      body: `[${name}] ${message}`,
    };
    const eventId = await this.client.sendMessage(channelId, initialContent);
    return eventId;
  }

  /**
   * Send a message to a thread (MSC3440)
   *
   * Uses m.thread relation to associate the message with the thread root event.
   *
   * @throws Error when no client is available, or when neither the options
   *   nor the configuration provide a room to send to.
   */
  async sendThreadMessage(options: ThreadMessageOptions): Promise<void> {
    if (!this.client) {
      throw new Error("Matrix client is not connected");
    }
    const { threadId, channelId, content } = options;
    // Use the channelId from options (threads are room-scoped), fall back to
    // control room. `||` is deliberate: an empty channelId must fall back too.
    const roomId = channelId || this.controlRoomId;
    if (!roomId) {
      throw new Error("No room available for thread message: channelId and control room are empty");
    }
    const threadContent: MatrixMessageContent = {
      msgtype: "m.text",
      body: content,
      "m.relates_to": {
        rel_type: "m.thread",
        event_id: threadId,
        is_falling_back: true,
        "m.in_reply_to": {
          event_id: threadId,
        },
      },
    };
    await this.client.sendMessage(roomId, threadContent);
  }

  /**
   * Parse a command from a message (IChatProvider interface).
   *
   * Delegates to the shared CommandParserService when available,
   * falling back to built-in parsing for backwards compatibility.
   *
   * @returns The parsed command, or null when the message is not a
   *   recognised mosaic command.
   */
  parseCommand(message: ChatMessage): ChatCommand | null {
    const { content } = message;

    // Try shared parser first
    if (this.commandParser) {
      const result = this.commandParser.parseCommand(this.normalizeCommandPrefix(content));
      if (!result.success) {
        return null;
      }
      return {
        command: result.command.action,
        args: result.command.rawArgs,
        message,
      };
    }

    // Fallback: built-in parsing for when CommandParserService is not injected.
    // The word following the first token that mentions the bot is the command;
    // everything after it is passed on as arguments.
    const mentionsBot = (text: string): boolean => {
      const lower = text.toLowerCase();
      return lower.includes("@mosaic") || lower.includes("!mosaic");
    };
    if (!mentionsBot(content)) {
      return null;
    }
    const parts = content.trim().split(/\s+/);
    const mosaicIndex = parts.findIndex(mentionsBot);
    if (mosaicIndex === -1) {
      return null;
    }
    const commandPart = parts[mosaicIndex + 1];
    if (!commandPart) {
      // The mention was the last word: there is no command to run
      return null;
    }
    const command = commandPart.toLowerCase();
    if (!MatrixService.FALLBACK_COMMANDS.includes(command)) {
      return null;
    }
    return {
      command,
      args: parts.slice(mosaicIndex + 2),
      message,
    };
  }

  /**
   * Handle a parsed command (ChatCommand format, used by fallback path)
   *
   * @param command The command to execute.
   * @param workspaceId Workspace to dispatch jobs to; defaults to the
   *   env-configured workspace when omitted.
   */
  async handleCommand(command: ChatCommand, workspaceId?: string): Promise<void> {
    const { command: cmd, args, message } = command;
    this.logger.log(
      `Handling command: ${cmd} with args: ${args.join(", ")} from ${message.authorName}`
    );
    switch (cmd) {
      case "fix":
        await this.handleFixCommand(args, message, workspaceId ?? this.workspaceId);
        break;
      case "status":
        await this.handleStatusCommand(args, message);
        break;
      case "cancel":
        await this.handleCancelCommand(args, message);
        break;
      case "retry":
        await this.handleRetryCommand(args, message);
        break;
      case "verbose":
        await this.handleVerboseCommand(args, message);
        break;
      case "quiet":
        await this.handleQuietCommand(args, message);
        break;
      case "help":
        await this.handleHelpCommand(args, message);
        break;
      default:
        await this.sendMessage(
          message.channelId,
          `Unknown command: ${cmd}. Type \`@mosaic help\` or \`!mosaic help\` for available commands.`
        );
    }
  }

  /**
   * Handle fix command - Start a job for an issue
   *
   * Creates a thread in the originating room, dispatches a "code-task" job
   * to the stitcher and reports the outcome (success or failure) in that
   * thread.
   *
   * @param args Command arguments; the first one is the issue number,
   *   with or without a leading "#".
   * @param message The message that carried the command.
   * @param workspaceId Target workspace; defaults to the env-configured one.
   */
  private async handleFixCommand(
    args: string[],
    message: ChatMessage,
    workspaceId?: string
  ): Promise<void> {
    const [rawIssue] = args;
    if (!rawIssue) {
      await this.sendMessage(
        message.channelId,
        "Usage: `@mosaic fix <issue-number>` or `!mosaic fix <issue-number>`"
      );
      return;
    }

    // Parse issue number: handle both "#42" and "42" formats. Only plain
    // digits are accepted: parseInt() alone would also accept "12abc" (as 12)
    // or negative numbers.
    const issueArg = rawIssue.replace(/^#/, "");
    const issueNumber = /^\d+$/.test(issueArg) ? Number(issueArg) : Number.NaN;
    if (!Number.isSafeInteger(issueNumber) || issueNumber < 1) {
      await this.sendMessage(
        message.channelId,
        "Invalid issue number. Please provide a numeric issue number."
      );
      return;
    }

    const targetWorkspaceId = workspaceId ?? this.workspaceId;

    // Create thread for job updates
    const threadId = await this.createThread({
      channelId: message.channelId,
      name: `Job #${String(issueNumber)}`,
      message: `Starting job for issue #${String(issueNumber)}...`,
    });

    // Dispatch job to stitcher
    try {
      const result = await this.stitcherService.dispatchJob({
        workspaceId: targetWorkspaceId,
        type: "code-task",
        priority: 10,
        metadata: {
          issueNumber,
          command: "fix",
          channelId: message.channelId,
          threadId: threadId,
          authorId: message.authorId,
          authorName: message.authorName,
        },
      });
      // Send confirmation to thread
      await this.sendThreadMessage({
        threadId,
        channelId: message.channelId,
        content: `Job created: ${result.jobId}\nStatus: ${result.status}\nQueue: ${result.queueName}`,
      });
    } catch (error: unknown) {
      // Make the failure visible to the user instead of only logging it
      const errorMessage = error instanceof Error ? error.message : "Unknown error";
      this.logger.error(
        `Failed to dispatch job for issue #${String(issueNumber)}: ${errorMessage}`
      );
      await this.sendThreadMessage({
        threadId,
        channelId: message.channelId,
        content: `Failed to start job: ${errorMessage}`,
      });
    }
  }

  /**
   * Extract the job ID (first argument) for a job-scoped command. When it is
   * missing, posts the usage hint for `commandName` and returns null.
   */
  private async requireJobId(
    args: string[],
    message: ChatMessage,
    commandName: string
  ): Promise<string | null> {
    const [jobId] = args;
    if (jobId) {
      return jobId;
    }
    await this.sendMessage(
      message.channelId,
      `Usage: \`@mosaic ${commandName} <job-id>\` or \`!mosaic ${commandName} <job-id>\``
    );
    return null;
  }

  /**
   * Handle status command - Get job status
   */
  private async handleStatusCommand(args: string[], message: ChatMessage): Promise<void> {
    const jobId = await this.requireJobId(args, message, "status");
    if (jobId === null) return;
    // TODO: Implement job status retrieval from stitcher
    await this.sendMessage(
      message.channelId,
      `Status command not yet implemented for job: ${jobId}`
    );
  }

  /**
   * Handle cancel command - Cancel a running job
   */
  private async handleCancelCommand(args: string[], message: ChatMessage): Promise<void> {
    const jobId = await this.requireJobId(args, message, "cancel");
    if (jobId === null) return;
    // TODO: Implement job cancellation in stitcher
    await this.sendMessage(
      message.channelId,
      `Cancel command not yet implemented for job: ${jobId}`
    );
  }

  /**
   * Handle retry command - Retry a failed job
   */
  private async handleRetryCommand(args: string[], message: ChatMessage): Promise<void> {
    const jobId = await this.requireJobId(args, message, "retry");
    if (jobId === null) return;
    // TODO: Implement job retry in stitcher
    await this.sendMessage(
      message.channelId,
      `Retry command not yet implemented for job: ${jobId}`
    );
  }

  /**
   * Handle verbose command - Stream full logs to thread
   */
  private async handleVerboseCommand(args: string[], message: ChatMessage): Promise<void> {
    const jobId = await this.requireJobId(args, message, "verbose");
    if (jobId === null) return;
    // TODO: Implement verbose logging
    await this.sendMessage(message.channelId, `Verbose mode not yet implemented for job: ${jobId}`);
  }

  /**
   * Handle quiet command - Reduce notifications
   */
  private async handleQuietCommand(_args: string[], message: ChatMessage): Promise<void> {
    // TODO: Implement quiet mode
    await this.sendMessage(
      message.channelId,
      "Quiet mode not yet implemented. Currently showing milestone updates only."
    );
  }

  /**
   * Handle help command - Show available commands
   */
  private async handleHelpCommand(_args: string[], message: ChatMessage): Promise<void> {
    // Built line by line so the message text does not depend on how this
    // source file is indented.
    const helpMessage = [
      "**Available commands:**",
      "`@mosaic fix <issue>` or `!mosaic fix <issue>` - Start job for issue",
      "`@mosaic status <job>` or `!mosaic status <job>` - Get job status",
      "`@mosaic cancel <job>` or `!mosaic cancel <job>` - Cancel running job",
      "`@mosaic retry <job>` or `!mosaic retry <job>` - Retry failed job",
      "`@mosaic verbose <job>` or `!mosaic verbose <job>` - Stream full logs to thread",
      "`@mosaic quiet` or `!mosaic quiet` - Reduce notifications",
      "`@mosaic help` or `!mosaic help` - Show this help message",
      "**Noise Management:**",
      "- Main room: Low verbosity (milestones only)",
      "- Job threads: Medium verbosity (step completions)",
      "- DMs: Configurable per user",
    ].join("\n");
    await this.sendMessage(message.channelId, helpMessage);
  }
}