- Updated all package.json name fields and dependency references
- Updated all TypeScript/JavaScript imports
- Updated .woodpecker/publish.yml filters and registry paths
- Updated tools/install.sh scope default
- Updated .npmrc registry paths (worktree + host)
- Enhanced update-checker.ts with checkForAllUpdates() multi-package support
- Updated CLI update command to show table of all packages
- Added KNOWN_PACKAGES, formatAllPackagesTable, getInstallAllCommand
- Marked checkForUpdate() with @deprecated JSDoc

Closes #391
413 lines
14 KiB
TypeScript
413 lines
14 KiB
TypeScript
import {
|
|
Inject,
|
|
Injectable,
|
|
Logger,
|
|
Optional,
|
|
type OnModuleInit,
|
|
type OnModuleDestroy,
|
|
} from '@nestjs/common';
|
|
import { Queue, Worker, type Job, type ConnectionOptions } from 'bullmq';
|
|
import type { LogService } from '@mosaicstack/log';
|
|
import { LOG_SERVICE } from '../log/log.tokens.js';
|
|
import type { JobDto, JobStatus } from './queue-admin.dto.js';
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Typed job definitions
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Payload carried by jobs on the summarization queue.
 */
export interface SummarizationJobData {
  /** Optional tag naming whoever enqueued the job (presumably a user id or a scheduler name — confirm with callers). */
  triggeredBy?: string;
}
|
|
|
|
/**
 * Payload carried by jobs on the GC queue.
 */
export interface GCJobData {
  /** Optional tag naming whoever enqueued the job (presumably a user id or a scheduler name — confirm with callers). */
  triggeredBy?: string;
}
|
|
|
|
/**
 * Payload carried by jobs on the tier-management queue.
 */
export interface TierManagementJobData {
  /** Optional tag naming whoever enqueued the job (presumably a user id or a scheduler name — confirm with callers). */
  triggeredBy?: string;
}
|
|
|
|
/** Union of every job payload shape this module knows how to enqueue and process. */
export type MosaicJobData =
  | SummarizationJobData
  | GCJobData
  | TierManagementJobData;
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Queue health status
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Snapshot of per-queue job counters plus an overall health flag.
 */
export interface QueueHealthStatus {
  /** Counters for each queue, keyed by queue name. */
  queues: Record<
    string,
    {
      /** Number of jobs in the waiting state. */
      waiting: number;
      /** Number of jobs in the active state. */
      active: number;
      /** Number of jobs in the failed state. */
      failed: number;
      /** Number of jobs in the completed state. */
      completed: number;
      /** Whether the queue is currently paused. */
      paused: boolean;
    }
  >;
  /** Overall health flag for the snapshot as a whole. */
  healthy: boolean;
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Constants
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/** BullMQ queue name for summarization jobs. */
export const QUEUE_SUMMARIZATION = 'mosaic-summarization';

/** BullMQ queue name for GC jobs. */
export const QUEUE_GC = 'mosaic-gc';

/** BullMQ queue name for tier-management jobs. */
export const QUEUE_TIER_MANAGEMENT = 'mosaic-tier-management';
|
|
|
|
/** Connection URL used when the `VALKEY_URL` environment variable is not set. */
const DEFAULT_VALKEY_URL = 'redis://localhost:6380';

/**
 * Parse a Redis URL string into a BullMQ-compatible ConnectionOptions object.
 *
 * BullMQ v5 does `Object.assign({ port: 6379, host: '127.0.0.1' }, opts)` in
 * its RedisConnection constructor. If opts is a URL string, Object.assign only
 * copies character-index properties and the defaults survive — so 6379 wins.
 * We must parse the URL ourselves and return a plain RedisOptions object.
 *
 * The URL is read from `process.env.VALKEY_URL`, falling back to
 * `DEFAULT_VALKEY_URL`. Recognised parts:
 * - host (IPv6 literals are unwrapped from their `[...]` brackets), default `127.0.0.1`
 * - port, default `6380`
 * - username / password (percent-decoded when they are validly encoded)
 * - a numeric path segment as the `db` index
 * - the `rediss:` scheme, which enables TLS via `tls: {}`
 *
 * @returns Plain connection options. If the value is not a parseable URL, the
 *   local default `{ host: '127.0.0.1', port: 6380 }` is returned.
 */
function getConnection(): ConnectionOptions {
  const url = process.env['VALKEY_URL'] ?? DEFAULT_VALKEY_URL;

  let parsed: URL;
  try {
    parsed = new URL(url);
  } catch {
    // Not a parseable URL: fall back to the local default endpoint.
    return { host: '127.0.0.1', port: 6380 } as ConnectionOptions;
  }

  // Credentials with a malformed percent-escape must not discard the rest of
  // the URL, so a failed decode falls back to the raw value.
  const decode = (value: string): string => {
    try {
      return decodeURIComponent(value);
    } catch {
      return value;
    }
  };

  // WHATWG URL keeps the brackets around IPv6 literals ("[::1]"); socket APIs
  // expect the bare address.
  const host = parsed.hostname.replace(/^\[(.*)\]$/, '$1');

  const opts: Record<string, unknown> = {
    host: host || '127.0.0.1',
    port: parsed.port ? parseInt(parsed.port, 10) : 6380,
  };

  if (parsed.username) {
    opts['username'] = decode(parsed.username);
  }
  if (parsed.password) {
    opts['password'] = decode(parsed.password);
  }
  if (parsed.protocol === 'rediss:') {
    // An empty TLS options object turns TLS on with default settings.
    opts['tls'] = {};
  }
  if (parsed.pathname.length > 1) {
    const db = parseInt(parsed.pathname.slice(1), 10);
    if (!Number.isNaN(db)) {
      opts['db'] = db;
    }
  }

  return opts as ConnectionOptions;
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Job handler type
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Signature of the processing callback run for each job.
 *
 * The handler receives the BullMQ job and returns a promise that resolves
 * with no value.
 */
export type JobHandler<T = MosaicJobData> = (
  job: Job<T>,
) => Promise<void>;
|
|
|
|
/**
 * Session ID attached to job-event log entries.
 * These entries do not belong to a real user session, so a fixed
 * placeholder is used instead.
 */
const SYSTEM_SESSION_ID = 'system';
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// QueueService
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * NestJS service that owns the BullMQ queues and workers used by this module.
 *
 * It lazily creates queues, registers repeatable jobs and workers, exposes
 * health/admin helpers (list, retry, pause, resume), writes job lifecycle
 * events to the optional log service, and closes every queue and worker when
 * the module is destroyed.
 */
@Injectable()
export class QueueService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(QueueService.name);
  private readonly connection: ConnectionOptions;
  private readonly queues = new Map<string, Queue<MosaicJobData>>();
  /**
   * Every registered worker, grouped by queue name. A list is kept per queue
   * so that registering a second worker for the same queue does not drop the
   * first one from shutdown handling.
   */
  private readonly workers = new Map<string, Worker<MosaicJobData>[]>();

  /**
   * @param logService Optional log sink for job lifecycle events; when it is
   *   absent, job-event logging is skipped.
   */
  constructor(
    @Optional()
    @Inject(LOG_SERVICE)
    private readonly logService: LogService | null,
  ) {
    this.connection = getConnection();
  }

  /** Logs that the service is ready. */
  onModuleInit(): void {
    this.logger.log('QueueService initialised (BullMQ)');
  }

  /** Closes all workers and queues when the module is torn down. */
  async onModuleDestroy(): Promise<void> {
    await this.closeAll();
  }

  // -------------------------------------------------------------------------
  // Queue helpers
  // -------------------------------------------------------------------------

  /**
   * Get or create a BullMQ Queue for the given queue name.
   *
   * @param name Queue name; one Queue instance is cached per name.
   * @returns The cached queue, or a newly created one using the shared connection.
   */
  getQueue<T extends MosaicJobData = MosaicJobData>(name: string): Queue<T> {
    let queue = this.queues.get(name) as Queue<T> | undefined;
    if (!queue) {
      queue = new Queue<T>(name, { connection: this.connection });
      this.queues.set(name, queue as unknown as Queue<MosaicJobData>);
    }
    return queue;
  }

  /**
   * Add a BullMQ repeatable job (cron-style).
   * Uses `jobId` as a deterministic key so duplicate registrations are idempotent.
   *
   * @param queueName Queue to add the job to (created on demand).
   * @param jobName Name of the job.
   * @param data Job payload.
   * @param cronExpression Cron pattern passed as the repeat `pattern`.
   */
  async addRepeatableJob<T extends MosaicJobData>(
    queueName: string,
    jobName: string,
    data: T,
    cronExpression: string,
  ): Promise<void> {
    const queue = this.getQueue<T>(queueName);
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    await (queue as Queue<any>).add(jobName, data, {
      repeat: { pattern: cronExpression },
      jobId: `${queueName}:${jobName}:repeatable`,
    });
    this.logger.log(
      `Repeatable job "${jobName}" registered on "${queueName}" (cron: ${cronExpression})`,
    );
  }

  /**
   * Register a Worker for the given queue name with lifecycle logging and a
   * custom exponential backoff strategy.
   *
   * The worker logs a `started` event before invoking `handler`, and
   * `completed` / `failed` events afterwards. Connection-level worker errors
   * are logged through an `error` listener so they never surface as unhandled
   * `error` events.
   *
   * @param queueName Queue the worker consumes from.
   * @param handler Callback invoked for every job.
   * @returns The created worker. It is also tracked for shutdown.
   */
  registerWorker<T extends MosaicJobData>(queueName: string, handler: JobHandler<T>): Worker<T> {
    const worker = new Worker<T>(
      queueName,
      async (job) => {
        this.logger.debug(`Processing job "${job.name}" (id=${job.id}) on queue "${queueName}"`);
        await this.logJobEvent(
          queueName,
          job.name,
          job.id ?? 'unknown',
          'started',
          job.attemptsMade + 1,
        );
        await handler(job);
      },
      {
        connection: this.connection,
        settings: {
          // Exponential backoff delay: 5s base, doubling per attempt, capped
          // at 60s. This only computes the delay; the number of attempts is
          // not configured here.
          // NOTE(review): per BullMQ docs a custom strategy applies to jobs
          // added with `backoff: { type: 'custom' }` — confirm producers set it.
          backoffStrategy: (attemptsMade: number) => {
            return Math.min(5000 * Math.pow(2, attemptsMade - 1), 60_000);
          },
        },
      },
    );

    worker.on('completed', (job) => {
      this.logger.log(`Job "${job.name}" (id=${job.id}) completed on queue "${queueName}"`);
      this.logJobEvent(
        queueName,
        job.name,
        job.id ?? 'unknown',
        'completed',
        job.attemptsMade,
      ).catch((err) => this.logger.warn(`Failed to write completed job log: ${String(err)}`));
    });

    worker.on('failed', (job, err) => {
      const errMsg = err instanceof Error ? err.message : String(err);
      this.logger.error(
        `Job "${job?.name ?? 'unknown'}" (id=${job?.id ?? 'unknown'}) failed on queue "${queueName}": ${errMsg}`,
      );
      this.logJobEvent(
        queueName,
        job?.name ?? 'unknown',
        job?.id ?? 'unknown',
        'failed',
        job?.attemptsMade ?? 0,
        errMsg,
      ).catch((e) => this.logger.warn(`Failed to write failed job log: ${String(e)}`));
    });

    // Without an 'error' listener, worker-level errors would be emitted as
    // unhandled 'error' events.
    worker.on('error', (err) => {
      const errMsg = err instanceof Error ? err.message : String(err);
      this.logger.error(`Worker error on queue "${queueName}": ${errMsg}`);
    });

    // Append rather than overwrite, so an earlier worker for the same queue
    // is still closed on shutdown.
    const registered = this.workers.get(queueName) ?? [];
    registered.push(worker as unknown as Worker<MosaicJobData>);
    this.workers.set(queueName, registered);
    return worker;
  }

  /**
   * Return queue health statistics for all managed queues.
   *
   * @returns Per-queue counters. `healthy` is false if the counters of any
   *   queue could not be fetched; that queue is reported with zeroed counters.
   */
  async getHealthStatus(): Promise<QueueHealthStatus> {
    const queues: QueueHealthStatus['queues'] = {};
    let healthy = true;

    for (const [name, queue] of this.queues) {
      try {
        const [waiting, active, failed, completed, paused] = await Promise.all([
          queue.getWaitingCount(),
          queue.getActiveCount(),
          queue.getFailedCount(),
          queue.getCompletedCount(),
          queue.isPaused(),
        ]);
        queues[name] = { waiting, active, failed, completed, paused };
      } catch (err) {
        this.logger.error(`Failed to fetch health for queue "${name}": ${String(err)}`);
        healthy = false;
        queues[name] = { waiting: 0, active: 0, failed: 0, completed: 0, paused: false };
      }
    }

    return { queues, healthy };
  }

  // -------------------------------------------------------------------------
  // Admin API helpers (M6-006)
  // -------------------------------------------------------------------------

  /**
   * List jobs across all managed queues, optionally filtered by status.
   * BullMQ jobs are fetched by state type from each queue.
   *
   * @param status When given, only jobs in this state are returned; otherwise
   *   active, completed, failed, waiting and delayed jobs are returned.
   * @returns Job DTOs. A queue whose lookup fails is logged and skipped from
   *   the point of failure.
   */
  async listJobs(status?: JobStatus): Promise<JobDto[]> {
    const jobs: JobDto[] = [];
    const states: JobStatus[] = status
      ? [status]
      : ['active', 'completed', 'failed', 'waiting', 'delayed'];

    for (const [queueName, queue] of this.queues) {
      try {
        for (const state of states) {
          // eslint-disable-next-line @typescript-eslint/no-explicit-any
          const raw = await (queue as Queue<any>).getJobs([state as any]);
          for (const j of raw) {
            jobs.push(this.toJobDto(queueName, j, state));
          }
        }
      } catch (err) {
        this.logger.warn(`Failed to list jobs for queue "${queueName}": ${String(err)}`);
      }
    }

    return jobs;
  }

  /**
   * Retry a specific failed job identified by a composite ID.
   *
   * The caller passes "<queueName>__<jobId>" as the composite ID because BullMQ
   * job IDs are not globally unique — they are scoped to their queue.
   *
   * @param compositeId ID in the form "<queueName>__<jobId>".
   * @returns `ok: true` when the job was queued for retry; otherwise
   *   `ok: false` with a message (bad format, unknown queue, unknown job, or
   *   job not in the failed state).
   */
  async retryJob(compositeId: string): Promise<{ ok: boolean; message: string }> {
    const parts = this.splitCompositeId(compositeId);
    if (!parts) {
      return { ok: false, message: 'Invalid job id format. Expected "<queue>__<jobId>".' };
    }
    const { queueName, jobId } = parts;

    const queue = this.queues.get(queueName);
    if (!queue) {
      return { ok: false, message: `Queue "${queueName}" not found.` };
    }

    const job = await queue.getJob(jobId);
    if (!job) {
      return { ok: false, message: `Job "${jobId}" not found in queue "${queueName}".` };
    }

    const state = await job.getState();
    if (state !== 'failed') {
      return { ok: false, message: `Job "${jobId}" is not in failed state (current: ${state}).` };
    }

    await job.retry('failed');
    await this.logJobEvent(queueName, job.name, jobId, 'retried', (job.attemptsMade ?? 0) + 1);
    return { ok: true, message: `Job "${jobId}" on queue "${queueName}" queued for retry.` };
  }

  /**
   * Split a "<queueName>__<jobId>" composite ID into its parts.
   *
   * A managed queue name is preferred as the prefix (longest match wins), so
   * job IDs that themselves contain "__" are still resolved correctly. When no
   * managed queue matches, the ID is split at the last "__" so the caller can
   * report the unknown queue name.
   *
   * @returns The parts, or `null` when the ID contains no "__" separator.
   */
  private splitCompositeId(compositeId: string): { queueName: string; jobId: string } | null {
    let matched: string | undefined;
    for (const name of this.queues.keys()) {
      const isPrefix = compositeId.startsWith(`${name}__`);
      if (isPrefix && (matched === undefined || name.length > matched.length)) {
        matched = name;
      }
    }
    if (matched !== undefined) {
      return { queueName: matched, jobId: compositeId.slice(matched.length + 2) };
    }

    const sep = compositeId.lastIndexOf('__');
    if (sep === -1) {
      return null;
    }
    return { queueName: compositeId.slice(0, sep), jobId: compositeId.slice(sep + 2) };
  }

  /**
   * Pause a queue by name.
   *
   * @returns `ok: false` when the queue is not managed by this service.
   */
  async pauseQueue(name: string): Promise<{ ok: boolean; message: string }> {
    const queue = this.queues.get(name);
    if (!queue) return { ok: false, message: `Queue "${name}" not found.` };
    await queue.pause();
    this.logger.log(`Queue paused: ${name}`);
    return { ok: true, message: `Queue "${name}" paused.` };
  }

  /**
   * Resume a paused queue by name.
   *
   * @returns `ok: false` when the queue is not managed by this service.
   */
  async resumeQueue(name: string): Promise<{ ok: boolean; message: string }> {
    const queue = this.queues.get(name);
    if (!queue) return { ok: false, message: `Queue "${name}" not found.` };
    await queue.resume();
    this.logger.log(`Queue resumed: ${name}`);
    return { ok: true, message: `Queue "${name}" resumed.` };
  }

  /**
   * Convert a BullMQ job into the admin DTO.
   * The DTO id is the composite "<queueName>__<jobId>" accepted by `retryJob`.
   */
  private toJobDto(queueName: string, job: Job<MosaicJobData>, status: JobStatus): JobDto {
    return {
      id: `${queueName}__${job.id ?? 'unknown'}`,
      name: job.name,
      queue: queueName,
      status,
      attempts: job.attemptsMade,
      maxAttempts: job.opts?.attempts ?? 1,
      createdAt: job.timestamp ? new Date(job.timestamp).toISOString() : undefined,
      processedAt: job.processedOn ? new Date(job.processedOn).toISOString() : undefined,
      finishedAt: job.finishedOn ? new Date(job.finishedOn).toISOString() : undefined,
      failedReason: job.failedReason,
      data: (job.data as Record<string, unknown>) ?? {},
    };
  }

  // -------------------------------------------------------------------------
  // Job event logging (M6-007)
  // -------------------------------------------------------------------------

  /**
   * Write a log entry for a BullMQ job lifecycle event via the log service.
   *
   * Does nothing when no log service was injected. Ingest failures are caught
   * and logged as warnings, so this method does not reject because of them.
   *
   * @param errorMessage Only used for `failed` events.
   */
  private async logJobEvent(
    queueName: string,
    jobName: string,
    jobId: string,
    event: 'started' | 'completed' | 'retried' | 'failed',
    attempts: number,
    errorMessage?: string,
  ): Promise<void> {
    if (!this.logService) return;

    const level = event === 'failed' ? ('error' as const) : ('info' as const);
    const content =
      event === 'failed'
        ? `Job "${jobName}" (${jobId}) on queue "${queueName}" failed: ${errorMessage ?? 'unknown error'}`
        : `Job "${jobName}" (${jobId}) on queue "${queueName}" ${event} (attempt ${attempts})`;

    try {
      await this.logService.logs.ingest({
        sessionId: SYSTEM_SESSION_ID,
        userId: 'system',
        level,
        category: 'general',
        content,
        metadata: {
          jobId,
          jobName,
          queue: queueName,
          event,
          attempts,
          ...(errorMessage ? { errorMessage } : {}),
        },
      });
    } catch (err) {
      // Log errors must never crash job execution
      this.logger.warn(`Failed to write job event log for job ${jobId}: ${String(err)}`);
    }
  }

  // -------------------------------------------------------------------------
  // Lifecycle
  // -------------------------------------------------------------------------

  /**
   * Close every tracked worker and queue concurrently, then forget them.
   * Individual close failures are logged and do not abort the shutdown.
   */
  private async closeAll(): Promise<void> {
    const workerCloses = Array.from(this.workers.values())
      .flat()
      .map((w) => w.close().catch((err) => this.logger.error(`Worker close error: ${err}`)));
    const queueCloses = Array.from(this.queues.values()).map((q) =>
      q.close().catch((err) => this.logger.error(`Queue close error: ${err}`)),
    );
    await Promise.all([...workerCloses, ...queueCloses]);
    this.workers.clear();
    this.queues.clear();
    this.logger.log('QueueService shut down');
  }
}
|