import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from "@nestjs/common";
import { Queue, QueueOptions } from "bullmq";
import { QUEUE_NAMES, QueueName } from "./queues";

/**
 * Health status interface for BullMQ
 */
export interface BullMqHealthStatus {
  /** Result of {@link BullMqService.healthCheck} at the time of the call. */
  connected: boolean;
  /** Value of `queue.count()` per queue name, or -1 when the count could not be read. */
  queues: Record<string, number>;
}

/**
 * Connection settings extracted from VALKEY_URL and handed to BullMQ as the
 * `connection` option. Optional fields are only present when the URL carries them.
 */
interface ValkeyConnectionOptions {
  host: string;
  port: number;
  username?: string;
  password?: string;
  db?: number;
  tls?: { rejectUnauthorized?: boolean };
}

/**
 * BullMqService - Job queue service using BullMQ with Valkey backend
 *
 * This service provides job queue operations for the Mosaic Component Architecture:
 * - Main queue for general purpose jobs
 * - Runner queue for read-only operations
 * - Weaver queue for write operations
 * - Inspector queue for validation operations
 *
 * Reads its connection settings from the VALKEY_URL environment variable.
 */
@Injectable()
export class BullMqService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(BullMqService.name);
  private readonly queues = new Map<string, Queue>();

  /**
   * Create the four queues, waiting for each one to be ready before creating the next.
   *
   * Falls back to `redis://localhost:6379` when VALKEY_URL is not set.
   */
  async onModuleInit(): Promise<void> {
    const valkeyUrl = process.env.VALKEY_URL ?? "redis://localhost:6379";

    // Never log the raw URL: it may embed a password.
    this.logger.log(`Initializing BullMQ with Valkey at ${this.redactUrl(valkeyUrl)}`);

    // Parse Redis URL for connection options
    const connectionOptions = this.parseRedisUrl(valkeyUrl);

    const queueOptions: QueueOptions = {
      connection: connectionOptions,
      defaultJobOptions: {
        attempts: 3,
        backoff: {
          type: "exponential",
          delay: 1000,
        },
        removeOnComplete: {
          age: 3600, // Keep completed jobs for 1 hour
          count: 1000, // Keep last 1000 completed jobs
        },
        removeOnFail: {
          age: 86400, // Keep failed jobs for 24 hours
        },
      },
    };

    // Create all queues
    await this.createQueue(QUEUE_NAMES.MAIN, queueOptions);
    await this.createQueue(QUEUE_NAMES.RUNNER, queueOptions);
    await this.createQueue(QUEUE_NAMES.WEAVER, queueOptions);
    await this.createQueue(QUEUE_NAMES.INSPECTOR, queueOptions);

    this.logger.log(`BullMQ initialized with ${this.queues.size.toString()} queues`);
  }

  /**
   * Close every registered queue and empty the registry.
   *
   * A failure to close one queue is logged and does not prevent the remaining
   * queues from being closed; the registry is cleared in all cases.
   */
  async onModuleDestroy(): Promise<void> {
    this.logger.log("Closing BullMQ queues");

    for (const [name, queue] of this.queues.entries()) {
      try {
        await queue.close();
        this.logger.log(`Queue closed: ${name}`);
      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : String(error);
        this.logger.error(`Failed to close queue ${name}:`, errorMessage);
      }
    }

    this.queues.clear();
  }

  /**
   * Create a queue with the given name and options
   *
   * The queue is registered only after `waitUntilReady()` has resolved.
   */
  private async createQueue(name: QueueName, options: QueueOptions): Promise<Queue> {
    const queue = new Queue(name, options);

    // Wait for queue to be ready
    await queue.waitUntilReady();

    this.queues.set(name, queue);
    this.logger.log(`Queue created: ${name}`);

    return queue;
  }

  /**
   * Get a queue by name
   *
   * @returns The registered queue, or `null` when no queue has that name.
   */
  getQueue(name: QueueName): Queue | null {
    return this.queues.get(name) ?? null;
  }

  /**
   * Get all queues
   *
   * @returns The live internal registry (not a copy), keyed by queue name.
   */
  getQueues(): Map<string, Queue> {
    return this.queues;
  }

  /**
   * Add a job to a queue
   *
   * @param queueName - Name of a queue registered by this service.
   * @param jobName - Name passed to `queue.add`.
   * @param data - Payload passed to `queue.add`.
   * @param options - Optional per-job settings forwarded to `queue.add`.
   * @returns The job returned by `queue.add`.
   * @throws Error when no queue is registered under `queueName`.
   */
  async addJob(
    queueName: QueueName,
    jobName: string,
    data: unknown,
    options?: {
      priority?: number;
      delay?: number;
      attempts?: number;
    }
  ): Promise<Awaited<ReturnType<Queue["add"]>>> {
    const queue = this.queues.get(queueName);

    if (!queue) {
      throw new Error(`Queue not found: ${queueName}`);
    }

    const job = await queue.add(jobName, data, options);
    this.logger.log(`Job added to ${queueName}: ${jobName} (id: ${job.id ?? "unknown"})`);

    return job;
  }

  /**
   * Health check - verify all queues are connected
   *
   * Pings the client of every registered queue in turn.
   *
   * @returns `true` when every ping succeeds; `false` (after logging) on the first failure.
   */
  async healthCheck(): Promise<boolean> {
    try {
      for (const queue of this.queues.values()) {
        // Check if queue client is connected
        const client = await queue.client;
        await client.ping();
      }
      return true;
    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);
      this.logger.error("BullMQ health check failed:", errorMessage);
      return false;
    }
  }

  /**
   * Get health status with queue counts
   *
   * A queue whose count cannot be read is reported as -1 rather than failing the call.
   */
  async getHealthStatus(): Promise<BullMqHealthStatus> {
    const connected = await this.healthCheck();
    const queues: Record<string, number> = {};

    for (const [name, queue] of this.queues.entries()) {
      try {
        const count = await queue.count();
        queues[name] = count;
      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : String(error);
        this.logger.error(`Failed to get count for queue ${name}:`, errorMessage);
        queues[name] = -1;
      }
    }

    return { connected, queues };
  }

  /**
   * Parse Redis URL into connection options
   *
   * Besides host and port (default 6379), this carries over the parts of the
   * URL that would otherwise be lost: username, password, a numeric database
   * index given as the path (`/2`), and TLS when the scheme is `rediss:`.
   *
   * @returns The parsed options, or `localhost:6379` when the URL cannot be parsed.
   */
  private parseRedisUrl(url: string): ValkeyConnectionOptions {
    try {
      const parsed = new URL(url);
      const options: ValkeyConnectionOptions = {
        host: parsed.hostname,
        port: parseInt(parsed.port || "6379", 10),
      };

      // URL exposes userinfo percent-encoded; the server expects the raw values.
      if (parsed.username !== "") {
        options.username = this.decodeUrlComponent(parsed.username);
      }
      if (parsed.password !== "") {
        options.password = this.decodeUrlComponent(parsed.password);
      }

      // Only a purely numeric path selects a database; anything else is ignored.
      const dbMatch = /^\/(\d+)$/.exec(parsed.pathname);
      if (dbMatch?.[1] !== undefined) {
        options.db = parseInt(dbMatch[1], 10);
      }

      if (parsed.protocol === "rediss:") {
        options.tls = {};
      }

      return options;
    } catch {
      this.logger.warn(`Failed to parse Redis URL: ${this.redactUrl(url)}, using defaults`);
      return {
        host: "localhost",
        port: 6379,
      };
    }
  }

  /**
   * Percent-decode a URL component, returning the input unchanged when it
   * contains a malformed escape sequence.
   */
  private decodeUrlComponent(value: string): string {
    try {
      return decodeURIComponent(value);
    } catch {
      return value;
    }
  }

  /**
   * Return `url` with any userinfo (username/password) replaced by `***` so it
   * is safe to log. URLs without credentials are returned unchanged.
   */
  private redactUrl(url: string): string {
    try {
      const parsed = new URL(url);
      if (parsed.username === "" && parsed.password === "") {
        return url;
      }
      parsed.username = "***";
      parsed.password = "";
      return parsed.toString();
    } catch {
      // Unparseable input: blank out everything between "//" and the last "@".
      return url.replace(/\/\/\S*@/, "//***@");
    }
  }
}