From 5425f9268e9c246395658db0a54c775d1356712a Mon Sep 17 00:00:00 2001 From: "jason.woltje" Date: Sun, 5 Apr 2026 05:27:59 +0000 Subject: [PATCH] feat(queue): mosaic queue CLI surface (#404) --- packages/mosaic/package.json | 1 + packages/mosaic/src/cli.ts | 5 + packages/queue/package.json | 1 + packages/queue/src/cli.spec.ts | 62 +++++++++ packages/queue/src/cli.ts | 248 +++++++++++++++++++++++++++++++++ packages/queue/src/index.ts | 1 + pnpm-lock.yaml | 6 + 7 files changed, 324 insertions(+) create mode 100644 packages/queue/src/cli.spec.ts create mode 100644 packages/queue/src/cli.ts diff --git a/packages/mosaic/package.json b/packages/mosaic/package.json index f5a3def..f51eda3 100644 --- a/packages/mosaic/package.json +++ b/packages/mosaic/package.json @@ -33,6 +33,7 @@ "@mosaicstack/macp": "workspace:*", "@mosaicstack/prdy": "workspace:*", "@mosaicstack/quality-rails": "workspace:*", + "@mosaicstack/queue": "workspace:*", "@mosaicstack/types": "workspace:*", "@clack/prompts": "^0.9.1", "commander": "^13.0.0", diff --git a/packages/mosaic/src/cli.ts b/packages/mosaic/src/cli.ts index 4b7e0d7..3f79224 100644 --- a/packages/mosaic/src/cli.ts +++ b/packages/mosaic/src/cli.ts @@ -4,6 +4,7 @@ import { createRequire } from 'module'; import { Command } from 'commander'; import { registerBrainCommand } from '@mosaicstack/brain'; import { registerQualityRails } from '@mosaicstack/quality-rails'; +import { registerQueueCommand } from '@mosaicstack/queue'; import { registerAgentCommand } from './commands/agent.js'; import { registerMissionCommand } from './commands/mission.js'; // prdy is registered via launch.ts @@ -342,6 +343,10 @@ registerBrainCommand(program); registerQualityRails(program); +// ─── queue ─────────────────────────────────────────────────────────────── + +registerQueueCommand(program); + // ─── update ───────────────────────────────────────────────────────────── program diff --git a/packages/queue/package.json b/packages/queue/package.json index 2be52cc..2fe762f 100644 --- a/packages/queue/package.json +++ b/packages/queue/package.json @@ -22,6 +22,7 @@ }, "dependencies": { "@mosaicstack/types": "workspace:*", + "commander": "^13.0.0", "ioredis": "^5.10.0" }, "devDependencies": { diff --git a/packages/queue/src/cli.spec.ts b/packages/queue/src/cli.spec.ts new file mode 100644 index 0000000..92db4e9 --- /dev/null +++ b/packages/queue/src/cli.spec.ts @@ -0,0 +1,62 @@ +import { describe, it, expect } from 'vitest'; +import { Command } from 'commander'; +import { registerQueueCommand } from './cli.js'; + +describe('registerQueueCommand', () => { + function buildProgram(): Command { + const program = new Command('mosaic'); + registerQueueCommand(program); + return program; + } + + it('registers a "queue" subcommand', () => { + const program = buildProgram(); + const queueCmd = program.commands.find((c) => c.name() === 'queue'); + expect(queueCmd).toBeDefined(); + }); + + it('queue has list, stats, pause, resume, jobs, drain subcommands', () => { + const program = buildProgram(); + const queueCmd = program.commands.find((c) => c.name() === 'queue'); + expect(queueCmd).toBeDefined(); + + const names = queueCmd!.commands.map((c) => c.name()); + expect(names).toContain('list'); + expect(names).toContain('stats'); + expect(names).toContain('pause'); + expect(names).toContain('resume'); + expect(names).toContain('jobs'); + expect(names).toContain('drain'); + }); + + it('jobs subcommand has a "tail" subcommand', () => { + const program = buildProgram(); + const queueCmd = program.commands.find((c) => c.name() === 'queue'); + const jobsCmd = queueCmd!.commands.find((c) => c.name() === 'jobs'); + expect(jobsCmd).toBeDefined(); + + const tailCmd = jobsCmd!.commands.find((c) => c.name() === 'tail'); + expect(tailCmd).toBeDefined(); + }); + + it('drain has a --yes option', () => { + const program = buildProgram(); + const queueCmd = program.commands.find((c) => c.name() === 'queue'); + const drainCmd = queueCmd!.commands.find((c) => c.name() === 'drain'); + expect(drainCmd).toBeDefined(); + + const optionNames = drainCmd!.options.map((o) => o.long); + expect(optionNames).toContain('--yes'); + }); + + it('stats accepts an optional [name] argument', () => { + const program = buildProgram(); + const queueCmd = program.commands.find((c) => c.name() === 'queue'); + const statsCmd = queueCmd!.commands.find((c) => c.name() === 'stats'); + expect(statsCmd).toBeDefined(); + // Should not throw when called without argument + const args = statsCmd!.registeredArguments; + expect(args.length).toBe(1); + expect(args[0]!.required).toBe(false); + }); +}); diff --git a/packages/queue/src/cli.ts b/packages/queue/src/cli.ts new file mode 100644 index 0000000..983e077 --- /dev/null +++ b/packages/queue/src/cli.ts @@ -0,0 +1,248 @@ +import type { Command } from 'commander'; + +import { createLocalAdapter } from './adapters/local.js'; +import type { QueueConfig } from './types.js'; + +/** Resolve adapter type from env; defaults to 'local'. */ +function resolveAdapterType(): 'bullmq' | 'local' { + const t = process.env['QUEUE_ADAPTER'] ?? 'local'; + return t === 'bullmq' ? 'bullmq' : 'local'; +} + +function resolveConfig(): QueueConfig { + const type = resolveAdapterType(); + if (type === 'bullmq') { + return { type: 'bullmq', url: process.env['VALKEY_URL'] }; + } + return { type: 'local', dataDir: process.env['QUEUE_DATA_DIR'] }; +} + +const BULLMQ_ONLY_MSG = + 'not supported by local adapter — use the bullmq tier for this (set QUEUE_ADAPTER=bullmq)'; + +/** + * Register queue subcommands on an existing Commander program. + * Follows the same pattern as registerQualityRails in @mosaicstack/quality-rails. + */ +export function registerQueueCommand(parent: Command): void { + buildQueueCommand(parent.command('queue').description('Manage Mosaic job queues')); +} + +function buildQueueCommand(queue: Command): void { + // ─── list ────────────────────────────────────────────────────────────── + queue + .command('list') + .description('List all queues known to the configured adapter') + .action(async () => { + const config = resolveConfig(); + + if (config.type === 'local') { + const adapter = createLocalAdapter(config); + // Local adapter tracks queues in its internal Map; we expose them by + // listing JSON files in the data dir. + const { readdirSync } = await import('node:fs'); + const { existsSync } = await import('node:fs'); + const dataDir = config.dataDir ?? '.mosaic/queue'; + if (!existsSync(dataDir)) { + console.log('No queues found (data dir does not exist yet).'); + await adapter.close(); + return; + } + const files = readdirSync(dataDir).filter((f: string) => f.endsWith('.json')); + if (files.length === 0) { + console.log('No queues found.'); + } else { + console.log('Queues (local adapter):'); + for (const f of files) { + console.log(` - ${f.slice(0, -5)}`); + } + } + await adapter.close(); + return; + } + + // bullmq — not enough info to enumerate queues without a BullMQ Board + console.log(BULLMQ_ONLY_MSG); + process.exit(0); + }); + + // ─── stats ───────────────────────────────────────────────────────────── + queue + .command('stats [name]') + .description('Show stats for a queue (or all queues)') + .action(async (name?: string) => { + const config = resolveConfig(); + + if (config.type === 'local') { + const adapter = createLocalAdapter(config); + const { readdirSync } = await import('node:fs'); + const { existsSync } = await import('node:fs'); + const dataDir = config.dataDir ?? '.mosaic/queue'; + + let names: string[] = []; + if (name) { + names = [name]; + } else { + if (existsSync(dataDir)) { + names = readdirSync(dataDir) + .filter((f: string) => f.endsWith('.json')) + .map((f: string) => f.slice(0, -5)); + } + } + + if (names.length === 0) { + console.log('No queues found.'); + await adapter.close(); + return; + } + + for (const queueName of names) { + const len = await adapter.length(queueName); + console.log(`Queue: ${queueName}`); + console.log(` waiting: ${len}`); + console.log(` active: 0 (local adapter — no active tracking)`); + console.log(` completed: 0 (local adapter — no completed tracking)`); + console.log(` failed: 0 (local adapter — no failed tracking)`); + console.log(` delayed: 0 (local adapter — no delayed tracking)`); + } + await adapter.close(); + return; + } + + // bullmq + console.log(BULLMQ_ONLY_MSG); + process.exit(0); + }); + + // ─── pause ───────────────────────────────────────────────────────────── + queue + .command('pause ') + .description('Pause job processing for a queue') + .action(async (_name: string) => { + const config = resolveConfig(); + + if (config.type === 'local') { + console.log(BULLMQ_ONLY_MSG); + process.exit(0); + return; + } + + console.log(BULLMQ_ONLY_MSG); + process.exit(0); + }); + + // ─── resume ──────────────────────────────────────────────────────────── + queue + .command('resume ') + .description('Resume job processing for a queue') + .action(async (_name: string) => { + const config = resolveConfig(); + + if (config.type === 'local') { + console.log(BULLMQ_ONLY_MSG); + process.exit(0); + return; + } + + console.log(BULLMQ_ONLY_MSG); + process.exit(0); + }); + + // ─── jobs tail ───────────────────────────────────────────────────────── + const jobs = queue.command('jobs').description('Job-level operations'); + + jobs + .command('tail [name]') + .description('Stream new jobs as they arrive (poll-based)') + .option('--interval ', 'Poll interval in ms', '2000') + .action(async (name: string | undefined, opts: { interval: string }) => { + const config = resolveConfig(); + const pollMs = parseInt(opts.interval, 10); + + if (config.type === 'local') { + const adapter = createLocalAdapter(config); + const { existsSync, readdirSync } = await import('node:fs'); + const dataDir = config.dataDir ?? '.mosaic/queue'; + + let names: string[] = []; + if (name) { + names = [name]; + } else { + if (existsSync(dataDir)) { + names = readdirSync(dataDir) + .filter((f: string) => f.endsWith('.json')) + .map((f: string) => f.slice(0, -5)); + } + } + + if (names.length === 0) { + console.log('No queues to tail.'); + await adapter.close(); + return; + } + + console.log(`Tailing queues: ${names.join(', ')} (Ctrl-C to stop)`); + const lastLen = new Map(); + for (const qn of names) { + lastLen.set(qn, await adapter.length(qn)); + } + + const timer = setInterval(async () => { + for (const qn of names) { + const len = await adapter.length(qn); + const prev = lastLen.get(qn) ?? 0; + if (len > prev) { + console.log( + `[${new Date().toISOString()}] ${qn}: ${len - prev} new job(s) (total: ${len})`, + ); + } + lastLen.set(qn, len); + } + }, pollMs); + + process.on('SIGINT', async () => { + clearInterval(timer); + await adapter.close(); + process.exit(0); + }); + + return; + } + + // bullmq — use subscribe on the channel + console.log(BULLMQ_ONLY_MSG); + process.exit(0); + }); + + // ─── drain ───────────────────────────────────────────────────────────── + queue + .command('drain ') + .description('Drain all pending jobs from a queue') + .option('--yes', 'Skip confirmation prompt') + .action(async (name: string, opts: { yes?: boolean }) => { + if (!opts.yes) { + console.error( + `WARNING: This will remove all pending jobs from queue "${name}". Re-run with --yes to confirm.`, + ); + process.exit(1); + return; + } + + const config = resolveConfig(); + + if (config.type === 'local') { + const adapter = createLocalAdapter(config); + let removed = 0; + while ((await adapter.length(name)) > 0) { + await adapter.dequeue(name); + removed++; + } + console.log(`Drained ${removed} job(s) from queue "${name}".`); + await adapter.close(); + return; + } + + console.log(BULLMQ_ONLY_MSG); + process.exit(0); + }); +} diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index 0aa4089..98173b9 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -11,6 +11,7 @@ export { type QueueAdapter, type QueueConfig as QueueAdapterConfig } from './typ export { createQueueAdapter, registerQueueAdapter } from './factory.js'; export { createBullMQAdapter } from './adapters/bullmq.js'; export { createLocalAdapter } from './adapters/local.js'; +export { registerQueueCommand } from './cli.js'; import { registerQueueAdapter } from './factory.js'; import { createBullMQAdapter } from './adapters/bullmq.js'; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5fbbfb8..606620d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -475,6 +475,9 @@ importers: '@mosaicstack/quality-rails': specifier: workspace:* version: link:../quality-rails + '@mosaicstack/queue': + specifier: workspace:* + version: link:../queue '@mosaicstack/types': specifier: workspace:* version: link:../types @@ -571,6 +574,9 @@ importers: '@mosaicstack/types': specifier: workspace:* version: link:../types + commander: + specifier: ^13.0.0 + version: 13.1.0 ioredis: specifier: ^5.10.0 version: 5.10.0