import {
  Inject,
  Injectable,
  Logger,
  Optional,
  type OnModuleInit,
  type OnModuleDestroy,
} from '@nestjs/common';
import { Queue, Worker, type Job, type ConnectionOptions } from 'bullmq';
import type { LogService } from '@mosaic/log';
import { LOG_SERVICE } from '../log/log.tokens.js';
import type { JobDto, JobStatus } from './queue-admin.dto.js';

// ---------------------------------------------------------------------------
// Typed job definitions
// ---------------------------------------------------------------------------

export interface SummarizationJobData {
  triggeredBy?: string;
}

export interface GCJobData {
  triggeredBy?: string;
}

export interface TierManagementJobData {
  triggeredBy?: string;
}

export type MosaicJobData = SummarizationJobData | GCJobData | TierManagementJobData;

// ---------------------------------------------------------------------------
// Queue health status
// ---------------------------------------------------------------------------

export interface QueueHealthStatus {
  queues: Record<
    string,
    {
      waiting: number;
      active: number;
      failed: number;
      completed: number;
      paused: boolean;
    }
  >;
  healthy: boolean;
}

// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

export const QUEUE_SUMMARIZATION = 'mosaic-summarization';
export const QUEUE_GC = 'mosaic-gc';
export const QUEUE_TIER_MANAGEMENT = 'mosaic-tier-management';

const DEFAULT_VALKEY_URL = 'redis://localhost:6380';

/** Host used when the configured URL has no hostname or cannot be parsed. */
const FALLBACK_HOST = '127.0.0.1';

/** Port used when the configured URL has no port or cannot be parsed. */
const FALLBACK_PORT = 6380;

/**
 * Parse a Redis URL string into a BullMQ-compatible ConnectionOptions object.
 *
 * BullMQ v5 does `Object.assign({ port: 6379, host: '127.0.0.1' }, opts)` in
 * its RedisConnection constructor. If opts is a URL string, Object.assign only
 * copies character-index properties and the defaults survive — so 6379 wins.
 * We must parse the URL ourselves and return a plain RedisOptions object.
 *
 * The URL is read from the `VALKEY_URL` environment variable and defaults to
 * `redis://localhost:6380`. Host, port, username, password and the database
 * index (first path segment) are extracted; a `rediss:` scheme enables TLS.
 *
 * @returns Connection options; local defaults when the URL cannot be parsed.
 */
function getConnection(): ConnectionOptions {
  const url = process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;
  try {
    const parsed = new URL(url);
    const opts: ConnectionOptions = {
      host: parsed.hostname || FALLBACK_HOST,
      port: parsed.port ? parseInt(parsed.port, 10) : FALLBACK_PORT,
    };
    // ConnectionOptions is a union (standalone | cluster), so the optional
    // standalone-only fields are written through a loosely typed view.
    const extras = opts as Record<string, unknown>;
    if (parsed.username) {
      extras['username'] = decodeURIComponent(parsed.username);
    }
    if (parsed.password) {
      extras['password'] = decodeURIComponent(parsed.password);
    }
    if (parsed.protocol === 'rediss:') {
      // An empty TLS options object is enough to make ioredis negotiate TLS.
      extras['tls'] = {};
    }
    if (parsed.pathname && parsed.pathname.length > 1) {
      const db = parseInt(parsed.pathname.slice(1), 10);
      if (!Number.isNaN(db)) {
        extras['db'] = db;
      }
    }
    return opts;
  } catch {
    // The URL (or one of its percent-encoded credentials) is malformed.
    // Deliberately do not echo the value: it may contain a password.
    Logger.warn(
      `VALKEY_URL could not be parsed; falling back to ${FALLBACK_HOST}:${FALLBACK_PORT}`,
      'QueueService',
    );
    return { host: FALLBACK_HOST, port: FALLBACK_PORT } as ConnectionOptions;
  }
}

// ---------------------------------------------------------------------------
// Job handler type
// ---------------------------------------------------------------------------

export type JobHandler<T = MosaicJobData> = (job: Job<T>) => Promise<void>;

/** System session ID used for job-event log entries (no real user session). */
const SYSTEM_SESSION_ID = 'system';

// ---------------------------------------------------------------------------
// QueueService
// ---------------------------------------------------------------------------

@Injectable()
export class QueueService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(QueueService.name);
  private readonly connection: ConnectionOptions;
  private readonly queues = new Map<string, Queue<MosaicJobData>>();
  /**
   * Every worker created by {@link registerWorker}, grouped by queue name.
   * A list is kept per queue so that registering several workers for the same
   * queue does not orphan the earlier ones at shutdown.
   */
  private readonly workers = new Map<string, Worker<MosaicJobData>[]>();

  constructor(
    @Optional()
    @Inject(LOG_SERVICE)
    private readonly logService: LogService | null,
  ) {
    this.connection = getConnection();
  }

  onModuleInit(): void {
    this.logger.log('QueueService initialised (BullMQ)');
  }

  async onModuleDestroy(): Promise<void> {
    await this.closeAll();
  }

  // -------------------------------------------------------------------------
  // Queue helpers
  // -------------------------------------------------------------------------

  /**
   * Get or create a BullMQ Queue for the given queue name.
   *
   * Queues are cached by name, so repeated calls return the same instance.
   */
  getQueue<T extends MosaicJobData = MosaicJobData>(name: string): Queue<T> {
    let queue = this.queues.get(name) as Queue<T> | undefined;
    if (!queue) {
      queue = new Queue<T>(name, { connection: this.connection });
      this.queues.set(name, queue as unknown as Queue<MosaicJobData>);
    }
    return queue;
  }

  /**
   * Add a BullMQ repeatable job (cron-style).
   * Uses `jobId` as a deterministic key so duplicate registrations are idempotent.
   *
   * @param queueName Queue to register the job on (created if missing).
   * @param jobName Name of the job.
   * @param data Payload attached to every run.
   * @param cronExpression Cron pattern passed to BullMQ as `repeat.pattern`.
   */
  async addRepeatableJob<T extends MosaicJobData>(
    queueName: string,
    jobName: string,
    data: T,
    cronExpression: string,
  ): Promise<void> {
    const queue = this.getQueue<T>(queueName);
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    await (queue as Queue<any>).add(jobName, data, {
      repeat: { pattern: cronExpression },
      jobId: `${queueName}:${jobName}:repeatable`,
    });
    this.logger.log(
      `Repeatable job "${jobName}" registered on "${queueName}" (cron: ${cronExpression})`,
    );
  }

  /**
   * Register a Worker for the given queue name with error handling and
   * exponential backoff.
   *
   * The worker logs a "started" event before invoking `handler`, and
   * "completed" / "failed" events from the corresponding worker events.
   *
   * @param queueName Queue the worker consumes from.
   * @param handler Function invoked for every job.
   * @returns The newly created worker (also tracked for shutdown).
   */
  registerWorker<T extends MosaicJobData>(queueName: string, handler: JobHandler<T>): Worker<T> {
    const worker = new Worker<T>(
      queueName,
      async (job) => {
        this.logger.debug(`Processing job "${job.name}" (id=${job.id}) on queue "${queueName}"`);
        await this.logJobEvent(
          queueName,
          job.name,
          job.id ?? 'unknown',
          'started',
          job.attemptsMade + 1,
        );
        await handler(job);
      },
      {
        connection: this.connection,
        // Custom backoff: 5s base, doubling per attempt, capped at 60s.
        // NOTE(review): BullMQ only consults this strategy for jobs enqueued
        // with `backoff: { type: 'custom' }`; the number of attempts is set
        // per job via `attempts`, not here.
        settings: {
          backoffStrategy: (attemptsMade: number) => {
            return Math.min(5000 * Math.pow(2, attemptsMade - 1), 60_000);
          },
        },
      },
    );

    worker.on('completed', (job) => {
      this.logger.log(`Job "${job.name}" (id=${job.id}) completed on queue "${queueName}"`);
      this.logJobEvent(
        queueName,
        job.name,
        job.id ?? 'unknown',
        'completed',
        job.attemptsMade,
      ).catch((err) => this.logger.warn(`Failed to write completed job log: ${String(err)}`));
    });

    worker.on('failed', (job, err) => {
      const errMsg = err instanceof Error ? err.message : String(err);
      this.logger.error(
        `Job "${job?.name ?? 'unknown'}" (id=${job?.id ?? 'unknown'}) failed on queue "${queueName}": ${errMsg}`,
      );
      this.logJobEvent(
        queueName,
        job?.name ?? 'unknown',
        job?.id ?? 'unknown',
        'failed',
        job?.attemptsMade ?? 0,
        errMsg,
      ).catch((e) => this.logger.warn(`Failed to write failed job log: ${String(e)}`));
    });

    // Without an 'error' listener an emitted error becomes an unhandled
    // EventEmitter exception (e.g. on a connection drop).
    worker.on('error', (err) => {
      const errMsg = err instanceof Error ? err.message : String(err);
      this.logger.error(`Worker error on queue "${queueName}": ${errMsg}`);
    });

    const registered = this.workers.get(queueName) ?? [];
    registered.push(worker as unknown as Worker<MosaicJobData>);
    this.workers.set(queueName, registered);
    return worker;
  }

  /**
   * Return queue health statistics for all managed queues.
   *
   * A queue whose counters cannot be fetched is reported with zeroed counters
   * and marks the overall result as unhealthy.
   */
  async getHealthStatus(): Promise<QueueHealthStatus> {
    const queues: QueueHealthStatus['queues'] = {};
    let healthy = true;

    for (const [name, queue] of this.queues) {
      try {
        const [waiting, active, failed, completed, paused] = await Promise.all([
          queue.getWaitingCount(),
          queue.getActiveCount(),
          queue.getFailedCount(),
          queue.getCompletedCount(),
          queue.isPaused(),
        ]);
        queues[name] = { waiting, active, failed, completed, paused };
      } catch (err) {
        this.logger.error(`Failed to fetch health for queue "${name}": ${String(err)}`);
        healthy = false;
        queues[name] = { waiting: 0, active: 0, failed: 0, completed: 0, paused: false };
      }
    }

    return { queues, healthy };
  }

  // -------------------------------------------------------------------------
  // Admin API helpers (M6-006)
  // -------------------------------------------------------------------------

  /**
   * List jobs across all managed queues, optionally filtered by status.
   * BullMQ jobs are fetched by state type from each queue.
   *
   * A failure while reading one queue is logged and does not abort the
   * listing of the remaining queues.
   *
   * @param status When given, only jobs in this state are returned.
   */
  async listJobs(status?: JobStatus): Promise<JobDto[]> {
    const jobs: JobDto[] = [];
    const states: JobStatus[] = status
      ? [status]
      : ['active', 'completed', 'failed', 'waiting', 'delayed'];

    for (const [queueName, queue] of this.queues) {
      try {
        for (const state of states) {
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          const raw = await (queue as Queue<any>).getJobs([state as any]);
          for (const j of raw) {
            jobs.push(this.toJobDto(queueName, j, state));
          }
        }
      } catch (err) {
        this.logger.warn(`Failed to list jobs for queue "${queueName}": ${String(err)}`);
      }
    }

    return jobs;
  }

  /**
   * Retry a specific failed job by its composite ID.
   * The caller passes "<queueName>__<jobId>" as the composite ID because BullMQ
   * job IDs are not globally unique — they are scoped to their queue.
   *
   * The ID is split at the LAST "__", so queue names may contain "__" but
   * job IDs may not.
   *
   * @returns `ok: false` with an explanatory message when the ID is malformed,
   *   the queue or job is unknown, or the job is not in the failed state.
   */
  async retryJob(compositeId: string): Promise<{ ok: boolean; message: string }> {
    const sep = compositeId.lastIndexOf('__');
    if (sep === -1) {
      return { ok: false, message: 'Invalid job id format. Expected "<queue>__<jobId>".' };
    }
    const queueName = compositeId.slice(0, sep);
    const jobId = compositeId.slice(sep + 2);

    const queue = this.queues.get(queueName);
    if (!queue) {
      return { ok: false, message: `Queue "${queueName}" not found.` };
    }

    const job = await queue.getJob(jobId);
    if (!job) {
      return { ok: false, message: `Job "${jobId}" not found in queue "${queueName}".` };
    }

    const state = await job.getState();
    if (state !== 'failed') {
      return { ok: false, message: `Job "${jobId}" is not in failed state (current: ${state}).` };
    }

    await job.retry('failed');
    await this.logJobEvent(queueName, job.name, jobId, 'retried', (job.attemptsMade ?? 0) + 1);
    return { ok: true, message: `Job "${jobId}" on queue "${queueName}" queued for retry.` };
  }

  /**
   * Pause a queue by name.
   *
   * Only queues already created through {@link getQueue} are known.
   */
  async pauseQueue(name: string): Promise<{ ok: boolean; message: string }> {
    const queue = this.queues.get(name);
    if (!queue) return { ok: false, message: `Queue "${name}" not found.` };
    await queue.pause();
    this.logger.log(`Queue paused: ${name}`);
    return { ok: true, message: `Queue "${name}" paused.` };
  }

  /**
   * Resume a paused queue by name.
   *
   * Only queues already created through {@link getQueue} are known.
   */
  async resumeQueue(name: string): Promise<{ ok: boolean; message: string }> {
    const queue = this.queues.get(name);
    if (!queue) return { ok: false, message: `Queue "${name}" not found.` };
    await queue.resume();
    this.logger.log(`Queue resumed: ${name}`);
    return { ok: true, message: `Queue "${name}" resumed.` };
  }

  /**
   * Convert a BullMQ job into the admin DTO.
   *
   * The DTO id is the composite "<queueName>__<jobId>" accepted by
   * {@link retryJob}; timestamps are emitted as ISO strings when present.
   */
  private toJobDto(queueName: string, job: Job<MosaicJobData>, status: JobStatus): JobDto {
    return {
      id: `${queueName}__${job.id ?? 'unknown'}`,
      name: job.name,
      queue: queueName,
      status,
      attempts: job.attemptsMade,
      maxAttempts: job.opts?.attempts ?? 1,
      createdAt: job.timestamp ? new Date(job.timestamp).toISOString() : undefined,
      processedAt: job.processedOn ? new Date(job.processedOn).toISOString() : undefined,
      finishedAt: job.finishedOn ? new Date(job.finishedOn).toISOString() : undefined,
      failedReason: job.failedReason,
      data: (job.data as Record<string, unknown>) ?? {},
    };
  }

  // -------------------------------------------------------------------------
  // Job event logging (M6-007)
  // -------------------------------------------------------------------------

  /**
   * Write a log entry for a BullMQ job lifecycle event through the injected
   * LogService. Does nothing when no LogService was injected, and never
   * throws: ingestion failures are downgraded to a warning.
   */
  private async logJobEvent(
    queueName: string,
    jobName: string,
    jobId: string,
    event: 'started' | 'completed' | 'retried' | 'failed',
    attempts: number,
    errorMessage?: string,
  ): Promise<void> {
    if (!this.logService) return;

    const level = event === 'failed' ? ('error' as const) : ('info' as const);
    const content =
      event === 'failed'
        ? `Job "${jobName}" (${jobId}) on queue "${queueName}" failed: ${errorMessage ?? 'unknown error'}`
        : `Job "${jobName}" (${jobId}) on queue "${queueName}" ${event} (attempt ${attempts})`;

    try {
      await this.logService.logs.ingest({
        sessionId: SYSTEM_SESSION_ID,
        userId: 'system',
        level,
        category: 'general',
        content,
        metadata: {
          jobId,
          jobName,
          queue: queueName,
          event,
          attempts,
          ...(errorMessage ? { errorMessage } : {}),
        },
      });
    } catch (err) {
      // Log errors must never crash job execution
      this.logger.warn(`Failed to write job event log for job ${jobId}: ${String(err)}`);
    }
  }

  // -------------------------------------------------------------------------
  // Lifecycle
  // -------------------------------------------------------------------------

  /**
   * Close every tracked worker and queue in parallel, then forget them.
   * Individual close failures are logged and do not prevent the others from
   * closing.
   */
  private async closeAll(): Promise<void> {
    const closes: Promise<unknown>[] = [];

    for (const registered of this.workers.values()) {
      for (const w of registered) {
        closes.push(
          w.close().catch((err) => this.logger.error(`Worker close error: ${String(err)}`)),
        );
      }
    }
    for (const q of this.queues.values()) {
      closes.push(q.close().catch((err) => this.logger.error(`Queue close error: ${String(err)}`)));
    }

    await Promise.all(closes);
    this.workers.clear();
    this.queues.clear();
    this.logger.log('QueueService shut down');
  }
}