import type { Command } from 'commander'; import { createLocalAdapter } from './adapters/local.js'; import type { QueueConfig } from './types.js'; /** Resolve adapter type from env; defaults to 'local'. */ function resolveAdapterType(): 'bullmq' | 'local' { const t = process.env['QUEUE_ADAPTER'] ?? 'local'; return t === 'bullmq' ? 'bullmq' : 'local'; } function resolveConfig(): QueueConfig { const type = resolveAdapterType(); if (type === 'bullmq') { return { type: 'bullmq', url: process.env['VALKEY_URL'] }; } return { type: 'local', dataDir: process.env['QUEUE_DATA_DIR'] }; } const BULLMQ_ONLY_MSG = 'not supported by local adapter — use the bullmq tier for this (set QUEUE_ADAPTER=bullmq)'; /** * Register queue subcommands on an existing Commander program. * Follows the same pattern as registerQualityRails in @mosaicstack/quality-rails. */ export function registerQueueCommand(parent: Command): void { buildQueueCommand(parent.command('queue').description('Manage Mosaic job queues')); } function buildQueueCommand(queue: Command): void { // ─── list ────────────────────────────────────────────────────────────── queue .command('list') .description('List all queues known to the configured adapter') .action(async () => { const config = resolveConfig(); if (config.type === 'local') { const adapter = createLocalAdapter(config); // Local adapter tracks queues in its internal Map; we expose them by // listing JSON files in the data dir. const { readdirSync } = await import('node:fs'); const { existsSync } = await import('node:fs'); const dataDir = config.dataDir ?? '.mosaic/queue'; if (!existsSync(dataDir)) { console.log('No queues found (data dir does not exist yet).'); await adapter.close(); return; } const files = readdirSync(dataDir).filter((f: string) => f.endsWith('.json')); if (files.length === 0) { console.log('No queues found.'); } else { console.log('Queues (local adapter):'); for (const f of files) { console.log(` - ${f.slice(0, -5)}`); } } await adapter.close(); return; } // bullmq — not enough info to enumerate queues without a BullMQ Board console.log(BULLMQ_ONLY_MSG); process.exit(0); }); // ─── stats ───────────────────────────────────────────────────────────── queue .command('stats [name]') .description('Show stats for a queue (or all queues)') .action(async (name?: string) => { const config = resolveConfig(); if (config.type === 'local') { const adapter = createLocalAdapter(config); const { readdirSync } = await import('node:fs'); const { existsSync } = await import('node:fs'); const dataDir = config.dataDir ?? '.mosaic/queue'; let names: string[] = []; if (name) { names = [name]; } else { if (existsSync(dataDir)) { names = readdirSync(dataDir) .filter((f: string) => f.endsWith('.json')) .map((f: string) => f.slice(0, -5)); } } if (names.length === 0) { console.log('No queues found.'); await adapter.close(); return; } for (const queueName of names) { const len = await adapter.length(queueName); console.log(`Queue: ${queueName}`); console.log(` waiting: ${len}`); console.log(` active: 0 (local adapter — no active tracking)`); console.log(` completed: 0 (local adapter — no completed tracking)`); console.log(` failed: 0 (local adapter — no failed tracking)`); console.log(` delayed: 0 (local adapter — no delayed tracking)`); } await adapter.close(); return; } // bullmq console.log(BULLMQ_ONLY_MSG); process.exit(0); }); // ─── pause ───────────────────────────────────────────────────────────── queue .command('pause ') .description('Pause job processing for a queue') .action(async (_name: string) => { const config = resolveConfig(); if (config.type === 'local') { console.log(BULLMQ_ONLY_MSG); process.exit(0); return; } console.log(BULLMQ_ONLY_MSG); process.exit(0); }); // ─── resume ──────────────────────────────────────────────────────────── queue .command('resume ') .description('Resume job processing for a queue') .action(async (_name: string) => { const config = resolveConfig(); if (config.type === 'local') { console.log(BULLMQ_ONLY_MSG); process.exit(0); return; } console.log(BULLMQ_ONLY_MSG); process.exit(0); }); // ─── jobs tail ───────────────────────────────────────────────────────── const jobs = queue.command('jobs').description('Job-level operations'); jobs .command('tail [name]') .description('Stream new jobs as they arrive (poll-based)') .option('--interval ', 'Poll interval in ms', '2000') .action(async (name: string | undefined, opts: { interval: string }) => { const config = resolveConfig(); const pollMs = parseInt(opts.interval, 10); if (config.type === 'local') { const adapter = createLocalAdapter(config); const { existsSync, readdirSync } = await import('node:fs'); const dataDir = config.dataDir ?? '.mosaic/queue'; let names: string[] = []; if (name) { names = [name]; } else { if (existsSync(dataDir)) { names = readdirSync(dataDir) .filter((f: string) => f.endsWith('.json')) .map((f: string) => f.slice(0, -5)); } } if (names.length === 0) { console.log('No queues to tail.'); await adapter.close(); return; } console.log(`Tailing queues: ${names.join(', ')} (Ctrl-C to stop)`); const lastLen = new Map(); for (const qn of names) { lastLen.set(qn, await adapter.length(qn)); } const timer = setInterval(async () => { for (const qn of names) { const len = await adapter.length(qn); const prev = lastLen.get(qn) ?? 0; if (len > prev) { console.log( `[${new Date().toISOString()}] ${qn}: ${len - prev} new job(s) (total: ${len})`, ); } lastLen.set(qn, len); } }, pollMs); process.on('SIGINT', async () => { clearInterval(timer); await adapter.close(); process.exit(0); }); return; } // bullmq — use subscribe on the channel console.log(BULLMQ_ONLY_MSG); process.exit(0); }); // ─── drain ───────────────────────────────────────────────────────────── queue .command('drain ') .description('Drain all pending jobs from a queue') .option('--yes', 'Skip confirmation prompt') .action(async (name: string, opts: { yes?: boolean }) => { if (!opts.yes) { console.error( `WARNING: This will remove all pending jobs from queue "${name}". Re-run with --yes to confirm.`, ); process.exit(1); return; } const config = resolveConfig(); if (config.type === 'local') { const adapter = createLocalAdapter(config); let removed = 0; while ((await adapter.length(name)) > 0) { await adapter.dequeue(name); removed++; } console.log(`Drained ${removed} job(s) from queue "${name}".`); await adapter.close(); return; } console.log(BULLMQ_ONLY_MSG); process.exit(0); }); }