Files
stack/apps/gateway/src/queue/queue.service.ts
2026-03-23 01:21:03 +00:00

387 lines
13 KiB
TypeScript

import {
Inject,
Injectable,
Logger,
Optional,
type OnModuleInit,
type OnModuleDestroy,
} from '@nestjs/common';
import { Queue, Worker, type Job, type ConnectionOptions } from 'bullmq';
import type { LogService } from '@mosaic/log';
import { LOG_SERVICE } from '../log/log.tokens.js';
import type { JobDto, JobStatus } from './queue-admin.dto.js';
// ---------------------------------------------------------------------------
// Typed job definitions
// ---------------------------------------------------------------------------
/**
 * Payload carried by summarization jobs.
 * NOTE(review): the pairing with a specific queue is inferred from the name — confirm against producers.
 */
export interface SummarizationJobData {
/** Optional free-form string; presumably identifies who or what enqueued the job — TODO confirm. */
triggeredBy?: string;
}
/**
 * Payload carried by GC jobs.
 * NOTE(review): "GC" presumably means garbage collection of stored data — confirm.
 */
export interface GCJobData {
/** Optional free-form string; presumably identifies who or what enqueued the job — TODO confirm. */
triggeredBy?: string;
}
/**
 * Payload carried by tier-management jobs.
 */
export interface TierManagementJobData {
/** Optional free-form string; presumably identifies who or what enqueued the job — TODO confirm. */
triggeredBy?: string;
}
/**
 * Union of every job payload shape this module knows about. It is the default
 * type argument for `JobHandler` and the element type of the queue/worker maps
 * held by `QueueService`.
 */
export type MosaicJobData = SummarizationJobData | GCJobData | TierManagementJobData;
// ---------------------------------------------------------------------------
// Queue health status
// ---------------------------------------------------------------------------
/**
 * Snapshot returned by `QueueService.getHealthStatus()`.
 */
export interface QueueHealthStatus {
/**
 * Per-queue statistics keyed by queue name. The counts come from the queue's
 * `getWaitingCount` / `getActiveCount` / `getFailedCount` / `getCompletedCount`
 * calls and `paused` from `isPaused()`. When fetching fails for a queue, its
 * entry is filled with zeros and `paused: false`.
 */
queues: Record<
string,
{
waiting: number;
active: number;
failed: number;
completed: number;
paused: boolean;
}
>;
/** `false` when statistics could not be fetched for at least one queue; `true` otherwise. */
healthy: boolean;
}
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
// Queue names used by this module.
// NOTE(review): recent BullMQ releases are reported to reject queue names that
// contain ":" — verify against the installed BullMQ version before upgrading.
/** Name of the summarization queue. */
export const QUEUE_SUMMARIZATION = 'mosaic:summarization';
/** Name of the GC queue. */
export const QUEUE_GC = 'mosaic:gc';
/** Name of the tier-management queue. */
export const QUEUE_TIER_MANAGEMENT = 'mosaic:tier-management';
/** Connection URL used when the `VALKEY_URL` environment variable is not set. */
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';

/**
 * Build the BullMQ connection options from `VALKEY_URL` (or the local default).
 *
 * BullMQ treats `connection` as an options object (or an ioredis instance); a
 * bare URL string cast to `ConnectionOptions` is spread character-by-character
 * over BullMQ's defaults, so the configured host and port would be ignored.
 * The URL is therefore decomposed into explicit fields here.
 *
 * Supported form: `redis[s]://[username][:password]@host[:port][/db]`. A value
 * without a scheme (e.g. `localhost:6380`) is treated as `redis://…`.
 *
 * @returns Connection options with `host` and `port`, plus `username`,
 *   `password`, `db` and `tls` when the URL provides them.
 * @throws Error when the value cannot be parsed as a URL or has no host.
 */
function getConnection(): ConnectionOptions {
  const rawUrl = process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;
  const candidate = rawUrl.includes('://') ? rawUrl : `redis://${rawUrl}`;

  let parsed: URL;
  try {
    parsed = new URL(candidate);
  } catch {
    // The raw value is deliberately left out of the message: it may embed a password.
    throw new Error('VALKEY_URL is not a valid URL (expected redis://[user:pass@]host[:port][/db])');
  }

  // WHATWG URL keeps the brackets around IPv6 literals; socket APIs want them stripped.
  const host = parsed.hostname.replace(/^\[(.*)\]$/, '$1');
  if (host === '') {
    throw new Error('VALKEY_URL must include a host');
  }

  // Credentials are percent-encoded inside URLs; fall back to the raw text if decoding fails.
  const decode = (value: string): string => {
    try {
      return decodeURIComponent(value);
    } catch {
      return value;
    }
  };

  const dbMatch = /^\/(\d+)$/.exec(parsed.pathname);

  return {
    host,
    port: parsed.port === '' ? 6379 : Number(parsed.port),
    ...(parsed.username !== '' ? { username: decode(parsed.username) } : {}),
    ...(parsed.password !== '' ? { password: decode(parsed.password) } : {}),
    ...(dbMatch ? { db: Number(dbMatch[1]) } : {}),
    // `rediss://` denotes a TLS connection; an empty object enables TLS with default settings.
    ...(parsed.protocol === 'rediss:' ? { tls: {} } : {}),
  };
}
// ---------------------------------------------------------------------------
// Job handler type
// ---------------------------------------------------------------------------
/**
 * Signature of the function passed to `QueueService.registerWorker`. It
 * receives the BullMQ job and resolves with no value. The worker's processor
 * awaits it, so a rejection propagates to BullMQ.
 */
export type JobHandler<T = MosaicJobData> = (job: Job<T>) => Promise<void>;
/** System session ID used for job-event log entries (no real user session). */
const SYSTEM_SESSION_ID = 'system';
// ---------------------------------------------------------------------------
// QueueService
// ---------------------------------------------------------------------------
@Injectable()
export class QueueService implements OnModuleInit, OnModuleDestroy {
  /** Attempts granted to jobs enqueued through `addRepeatableJob`. */
  private static readonly MAX_ATTEMPTS = 5;
  /** Delay before the first retry, in milliseconds. */
  private static readonly BACKOFF_BASE_MS = 5000;
  /** Upper bound for any retry delay, in milliseconds. */
  private static readonly BACKOFF_CAP_MS = 60_000;

  private readonly logger = new Logger(QueueService.name);
  private readonly connection: ConnectionOptions;
  private readonly queues = new Map<string, Queue<MosaicJobData>>();
  // Several workers may be registered for one queue; all of them must be
  // tracked so shutdown closes every one.
  private readonly workers = new Map<string, Worker<MosaicJobData>[]>();

  /**
   * @param logService Optional log sink for job lifecycle events. When it is
   *   not provided, lifecycle events are only written to the Nest logger.
   */
  constructor(
    @Optional()
    @Inject(LOG_SERVICE)
    private readonly logService: LogService | null,
  ) {
    this.connection = getConnection();
  }

  onModuleInit(): void {
    this.logger.log('QueueService initialised (BullMQ)');
  }

  /** Closes every worker and queue created through this service. */
  async onModuleDestroy(): Promise<void> {
    await this.closeAll();
  }

  // -------------------------------------------------------------------------
  // Queue helpers
  // -------------------------------------------------------------------------

  /**
   * Get or create a BullMQ Queue for the given queue name.
   *
   * Queues are cached by name, so repeated calls return the same instance.
   */
  getQueue<T extends MosaicJobData = MosaicJobData>(name: string): Queue<T> {
    let queue = this.queues.get(name) as Queue<T> | undefined;
    if (!queue) {
      queue = new Queue<T>(name, { connection: this.connection });
      this.queues.set(name, queue as unknown as Queue<MosaicJobData>);
    }
    return queue;
  }

  /**
   * Add a BullMQ repeatable job (cron-style).
   *
   * The job is enqueued with `attempts` and a `custom` backoff so that the
   * backoff strategy installed by `registerWorker` actually applies to it;
   * without these options a failed run is never retried.
   *
   * NOTE(review): a deterministic `jobId` is passed along, but how BullMQ uses
   * it for repeatable jobs depends on the installed version — confirm that
   * re-registering the same job does not create duplicates.
   *
   * @param queueName Queue to add the job to (created on first use).
   * @param jobName BullMQ job name.
   * @param data Job payload.
   * @param cronExpression Cron pattern passed as `repeat.pattern`.
   */
  async addRepeatableJob<T extends MosaicJobData>(
    queueName: string,
    jobName: string,
    data: T,
    cronExpression: string,
  ): Promise<void> {
    const queue = this.getQueue<T>(queueName);
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    await (queue as Queue<any>).add(jobName, data, {
      repeat: { pattern: cronExpression },
      jobId: `${queueName}:${jobName}:repeatable`,
      attempts: QueueService.MAX_ATTEMPTS,
      // Selects the worker-side `settings.backoffStrategy`.
      backoff: { type: 'custom' },
    });
    this.logger.log(
      `Repeatable job "${jobName}" registered on "${queueName}" (cron: ${cronExpression})`,
    );
  }

  /**
   * Register a Worker for the given queue name with error handling and
   * exponential backoff.
   *
   * The backoff strategy (5s base, doubling, capped at 60s) is used for jobs
   * enqueued with `backoff: { type: 'custom' }` and more than one attempt.
   * Calling this again for the same queue adds another worker; every
   * registered worker is closed on module destroy.
   *
   * @param queueName Queue the worker consumes from.
   * @param handler Function run for each job; a rejection marks the job failed.
   * @returns The created BullMQ worker.
   */
  registerWorker<T extends MosaicJobData>(queueName: string, handler: JobHandler<T>): Worker<T> {
    const worker = new Worker<T>(
      queueName,
      async (job) => {
        this.logger.debug(`Processing job "${job.name}" (id=${job.id}) on queue "${queueName}"`);
        await this.logJobEvent(
          queueName,
          job.name,
          job.id ?? 'unknown',
          'started',
          job.attemptsMade + 1,
        );
        await handler(job);
      },
      {
        connection: this.connection,
        settings: {
          // Exponential backoff: 5s, 10s, 20s, 40s, then capped at 60s.
          backoffStrategy: (attemptsMade: number) => {
            return Math.min(
              QueueService.BACKOFF_BASE_MS * Math.pow(2, attemptsMade - 1),
              QueueService.BACKOFF_CAP_MS,
            );
          },
        },
      },
    );

    worker.on('completed', (job) => {
      this.logger.log(`Job "${job.name}" (id=${job.id}) completed on queue "${queueName}"`);
      this.logJobEvent(
        queueName,
        job.name,
        job.id ?? 'unknown',
        'completed',
        job.attemptsMade,
      ).catch((err) => this.logger.warn(`Failed to write completed job log: ${String(err)}`));
    });

    worker.on('failed', (job, err) => {
      const errMsg = err instanceof Error ? err.message : String(err);
      this.logger.error(
        `Job "${job?.name ?? 'unknown'}" (id=${job?.id ?? 'unknown'}) failed on queue "${queueName}": ${errMsg}`,
      );
      this.logJobEvent(
        queueName,
        job?.name ?? 'unknown',
        job?.id ?? 'unknown',
        'failed',
        job?.attemptsMade ?? 0,
        errMsg,
      ).catch((e) => this.logger.warn(`Failed to write failed job log: ${String(e)}`));
    });

    // Without an 'error' listener, a worker-level error (e.g. a dropped
    // connection) would surface as an unhandled EventEmitter error.
    worker.on('error', (err) => {
      const errMsg = err instanceof Error ? err.message : String(err);
      this.logger.error(`Worker error on queue "${queueName}": ${errMsg}`);
    });

    const tracked = worker as unknown as Worker<MosaicJobData>;
    const registered = this.workers.get(queueName);
    if (registered) {
      registered.push(tracked);
    } else {
      this.workers.set(queueName, [tracked]);
    }
    return worker;
  }

  /**
   * Return queue health statistics for all managed queues.
   *
   * A queue whose statistics cannot be fetched is reported with zeroed counts
   * and flips the overall `healthy` flag to `false`.
   */
  async getHealthStatus(): Promise<QueueHealthStatus> {
    const queues: QueueHealthStatus['queues'] = {};
    let healthy = true;
    for (const [name, queue] of this.queues) {
      try {
        const [waiting, active, failed, completed, paused] = await Promise.all([
          queue.getWaitingCount(),
          queue.getActiveCount(),
          queue.getFailedCount(),
          queue.getCompletedCount(),
          queue.isPaused(),
        ]);
        queues[name] = { waiting, active, failed, completed, paused };
      } catch (err) {
        this.logger.error(`Failed to fetch health for queue "${name}": ${String(err)}`);
        healthy = false;
        queues[name] = { waiting: 0, active: 0, failed: 0, completed: 0, paused: false };
      }
    }
    return { queues, healthy };
  }

  // -------------------------------------------------------------------------
  // Admin API helpers (M6-006)
  // -------------------------------------------------------------------------

  /**
   * List jobs across all managed queues, optionally filtered by status.
   * BullMQ jobs are fetched by state type from each queue.
   *
   * A failure while listing one queue is logged and that queue's remaining
   * states are skipped; other queues are still listed.
   *
   * @param status When given, only jobs in this state are returned; otherwise
   *   active, completed, failed, waiting and delayed jobs are returned.
   */
  async listJobs(status?: JobStatus): Promise<JobDto[]> {
    const jobs: JobDto[] = [];
    const states: JobStatus[] = status
      ? [status]
      : ['active', 'completed', 'failed', 'waiting', 'delayed'];
    for (const [queueName, queue] of this.queues) {
      try {
        for (const state of states) {
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          const raw = await (queue as Queue<any>).getJobs([state as any]);
          for (const j of raw) {
            // A job removed between the id lookup and the data fetch can come
            // back as an empty slot; skip it instead of failing the whole queue.
            if (!j) continue;
            jobs.push(this.toJobDto(queueName, j, state));
          }
        }
      } catch (err) {
        this.logger.warn(`Failed to list jobs for queue "${queueName}": ${String(err)}`);
      }
    }
    return jobs;
  }

  /**
   * Retry a specific failed job.
   *
   * The caller passes "<queueName>__<jobId>" as the composite ID (the same
   * format `listJobs` emits) because BullMQ job IDs are not globally unique —
   * they are scoped to their queue.
   *
   * @returns `ok: false` with an explanatory message when the ID is malformed,
   *   the queue or job is unknown, or the job is not in the failed state.
   */
  async retryJob(compositeId: string): Promise<{ ok: boolean; message: string }> {
    const parts = this.splitCompositeId(compositeId);
    if (!parts) {
      return { ok: false, message: 'Invalid job id format. Expected "<queue>__<jobId>".' };
    }
    const { queueName, jobId } = parts;
    const queue = this.queues.get(queueName);
    if (!queue) {
      return { ok: false, message: `Queue "${queueName}" not found.` };
    }
    const job = await queue.getJob(jobId);
    if (!job) {
      return { ok: false, message: `Job "${jobId}" not found in queue "${queueName}".` };
    }
    const state = await job.getState();
    if (state !== 'failed') {
      return { ok: false, message: `Job "${jobId}" is not in failed state (current: ${state}).` };
    }
    await job.retry('failed');
    await this.logJobEvent(queueName, job.name, jobId, 'retried', (job.attemptsMade ?? 0) + 1);
    return { ok: true, message: `Job "${jobId}" on queue "${queueName}" queued for retry.` };
  }

  /**
   * Pause a queue by name.
   */
  async pauseQueue(name: string): Promise<{ ok: boolean; message: string }> {
    const queue = this.queues.get(name);
    if (!queue) return { ok: false, message: `Queue "${name}" not found.` };
    await queue.pause();
    this.logger.log(`Queue paused: ${name}`);
    return { ok: true, message: `Queue "${name}" paused.` };
  }

  /**
   * Resume a paused queue by name.
   */
  async resumeQueue(name: string): Promise<{ ok: boolean; message: string }> {
    const queue = this.queues.get(name);
    if (!queue) return { ok: false, message: `Queue "${name}" not found.` };
    await queue.resume();
    this.logger.log(`Queue resumed: ${name}`);
    return { ok: true, message: `Queue "${name}" resumed.` };
  }

  /**
   * Split "<queueName>__<jobId>" into its parts.
   *
   * A managed queue whose name prefixes the ID wins (longest name first), so a
   * job ID that itself contains "__" still resolves to the right queue. If no
   * managed queue matches, the last "__" is used as the separator.
   *
   * @returns The parts, or `null` when the ID contains no "__" at all.
   */
  private splitCompositeId(compositeId: string): { queueName: string; jobId: string } | null {
    let matched: string | undefined;
    for (const name of this.queues.keys()) {
      const isPrefix = compositeId.startsWith(`${name}__`);
      if (isPrefix && (matched === undefined || name.length > matched.length)) {
        matched = name;
      }
    }
    if (matched !== undefined) {
      return { queueName: matched, jobId: compositeId.slice(matched.length + 2) };
    }
    const sep = compositeId.lastIndexOf('__');
    if (sep === -1) return null;
    return { queueName: compositeId.slice(0, sep), jobId: compositeId.slice(sep + 2) };
  }

  /** Convert a BullMQ job into the admin DTO; timestamps become ISO strings. */
  private toJobDto(queueName: string, job: Job<MosaicJobData>, status: JobStatus): JobDto {
    return {
      id: `${queueName}__${job.id ?? 'unknown'}`,
      name: job.name,
      queue: queueName,
      status,
      attempts: job.attemptsMade,
      maxAttempts: job.opts?.attempts ?? 1,
      createdAt: job.timestamp ? new Date(job.timestamp).toISOString() : undefined,
      processedAt: job.processedOn ? new Date(job.processedOn).toISOString() : undefined,
      finishedAt: job.finishedOn ? new Date(job.finishedOn).toISOString() : undefined,
      failedReason: job.failedReason,
      data: (job.data as Record<string, unknown>) ?? {},
    };
  }

  // -------------------------------------------------------------------------
  // Job event logging (M6-007)
  // -------------------------------------------------------------------------

  /**
   * Write a log entry for a BullMQ job lifecycle event through the injected
   * log service. Does nothing when no log service was injected, and never
   * throws: ingest failures are downgraded to a warning.
   */
  private async logJobEvent(
    queueName: string,
    jobName: string,
    jobId: string,
    event: 'started' | 'completed' | 'retried' | 'failed',
    attempts: number,
    errorMessage?: string,
  ): Promise<void> {
    if (!this.logService) return;
    const level = event === 'failed' ? ('error' as const) : ('info' as const);
    const content =
      event === 'failed'
        ? `Job "${jobName}" (${jobId}) on queue "${queueName}" failed: ${errorMessage ?? 'unknown error'}`
        : `Job "${jobName}" (${jobId}) on queue "${queueName}" ${event} (attempt ${attempts})`;
    try {
      await this.logService.logs.ingest({
        sessionId: SYSTEM_SESSION_ID,
        userId: 'system',
        level,
        category: 'general',
        content,
        metadata: {
          jobId,
          jobName,
          queue: queueName,
          event,
          attempts,
          ...(errorMessage ? { errorMessage } : {}),
        },
      });
    } catch (err) {
      // Log errors must never crash job execution
      this.logger.warn(`Failed to write job event log for job ${jobId}: ${String(err)}`);
    }
  }

  // -------------------------------------------------------------------------
  // Lifecycle
  // -------------------------------------------------------------------------

  /**
   * Close every tracked worker and queue, then forget them. Individual close
   * failures are logged and do not prevent the remaining closes.
   */
  private async closeAll(): Promise<void> {
    const workerCloses = Array.from(this.workers.values())
      .flat()
      .map((w) => w.close().catch((err) => this.logger.error(`Worker close error: ${err}`)));
    const queueCloses = Array.from(this.queues.values()).map((q) =>
      q.close().catch((err) => this.logger.error(`Queue close error: ${err}`)),
    );
    await Promise.all([...workerCloses, ...queueCloses]);
    this.workers.clear();
    this.queues.clear();
    this.logger.log('QueueService shut down');
  }
}