From 9b2db93afcb4f3ab66e030e1d4887c7f66ca1b78 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Fri, 6 Mar 2026 09:29:41 -0600 Subject: [PATCH] feat(queue): add commander CLI for core queue ops (MQ-005) --- packages/queue/package.json | 1 + packages/queue/src/cli.ts | 345 +++++++++++++++++++++++++++++++ packages/queue/src/index.ts | 6 + packages/queue/tests/cli.test.ts | 218 +++++++++++++++++++ pnpm-lock.yaml | 9 + 5 files changed, 579 insertions(+) create mode 100644 packages/queue/src/cli.ts create mode 100644 packages/queue/tests/cli.test.ts diff --git a/packages/queue/package.json b/packages/queue/package.json index 5aa258b..8942e0e 100644 --- a/packages/queue/package.json +++ b/packages/queue/package.json @@ -14,6 +14,7 @@ "test": "vitest run" }, "dependencies": { + "commander": "^14.0.3", "ioredis": "^5.10.0" } } diff --git a/packages/queue/src/cli.ts b/packages/queue/src/cli.ts new file mode 100644 index 0000000..a42bf25 --- /dev/null +++ b/packages/queue/src/cli.ts @@ -0,0 +1,345 @@ +import { + Command, + CommanderError, + InvalidArgumentError, + Option, +} from 'commander'; + +import { assertRedisHealthy, createRedisClient } from './redis-connection.js'; +import { + RedisTaskRepository, + type ClaimTaskInput, + type CompleteTaskInput, + type RedisTaskClient, +} from './task-repository.js'; +import { + TASK_LANES, + TASK_PRIORITIES, + TASK_STATUSES, + type CreateTaskInput, + type TaskLane, + type TaskListFilters, + type TaskPriority, + type TaskStatus, +} from './task.js'; + +export type QueueRepository = Pick< + RedisTaskRepository, + 'create' | 'list' | 'get' | 'claim' | 'release' | 'complete' +>; + +export interface QueueRepositorySession { + readonly repository: QueueRepository; + readonly close: () => Promise; +} + +export interface QueueCliDependencies { + readonly openSession: () => Promise; + readonly stdout: (line: string) => void; + readonly stderr: (line: string) => void; +} + +interface CreateCommandOptions { + readonly title: string; + readonly description?: string; + readonly priority?: TaskPriority; + readonly lane?: TaskLane; + readonly dependency?: string[]; +} + +interface ListCommandOptions { + readonly project?: string; + readonly mission?: string; + readonly status?: TaskStatus; +} + +interface ClaimCommandOptions { + readonly agent: string; + readonly ttl: number; +} + +interface ReleaseCommandOptions { + readonly agent?: string; +} + +interface CompleteCommandOptions { + readonly agent?: string; + readonly summary?: string; +} + +interface ClosableRedisTaskClient extends RedisTaskClient { + quit(): Promise; +} + +const DEFAULT_DEPENDENCIES: QueueCliDependencies = { + openSession: openRedisSession, + stdout: (line: string) => { + console.log(line); + }, + stderr: (line: string) => { + console.error(line); + }, +}; + +const PRIORITY_SET = new Set(TASK_PRIORITIES); +const LANE_SET = new Set(TASK_LANES); +const STATUS_SET = new Set(TASK_STATUSES); + +export function buildQueueCli( + dependencyOverrides: Partial = {}, +): Command { + const dependencies = resolveDependencies(dependencyOverrides); + const program = new Command(); + program + .name('mosaic') + .description('mosaic queue command line interface') + .exitOverride(); + + program.configureOutput({ + writeOut: (output: string) => dependencies.stdout(output.trimEnd()), + writeErr: (output: string) => dependencies.stderr(output.trimEnd()), + }); + + const queue = program.command('queue').description('Manage queue tasks'); + + queue + .command('create ') + .description('Create a queue task') + .requiredOption('--title ', 'Task title') + .option('--description <description>', 'Task description') + .addOption( + new Option('--priority <priority>', 'Task priority') + .choices(TASK_PRIORITIES) + .argParser(parsePriority), + ) + .addOption( + new Option('--lane <lane>', 'Task lane').choices(TASK_LANES).argParser(parseLane), + ) + .option('--dependency <taskIds...>', 'Task dependencies') + .action( + async ( + project: string, + mission: string, + taskId: string, + options: CreateCommandOptions, + ) => { + await withSession(dependencies, async (repository) => { + const payload: CreateTaskInput = { + project, + mission, + taskId, + title: options.title, + description: options.description, + priority: options.priority, + dependencies: options.dependency, + lane: options.lane, + }; + const task = await repository.create(payload); + dependencies.stdout(JSON.stringify(task, null, 2)); + }); + }, + ); + + queue + .command('list') + .description('List queue tasks') + .option('--project <project>', 'Filter by project') + .option('--mission <mission>', 'Filter by mission') + .addOption( + new Option('--status <status>', 'Filter by status') + .choices(TASK_STATUSES) + .argParser(parseStatus), + ) + .action(async (options: ListCommandOptions) => { + await withSession(dependencies, async (repository) => { + const filters: TaskListFilters = { + project: options.project, + mission: options.mission, + status: options.status, + }; + const tasks = await repository.list(filters); + dependencies.stdout(JSON.stringify(tasks, null, 2)); + }); + }); + + queue + .command('show <taskId>') + .description('Show a single queue task') + .action(async (taskId: string) => { + await withSession(dependencies, async (repository) => { + const task = await repository.get(taskId); + + if (task === null) { + throw new Error(`Task ${taskId} was not found.`); + } + + dependencies.stdout(JSON.stringify(task, null, 2)); + }); + }); + + queue + .command('claim <taskId>') + .description('Claim a pending task') + .requiredOption('--agent <agentId>', 'Agent identifier') + .requiredOption('--ttl <seconds>', 'Claim TTL in seconds', parsePositiveInteger) + .action(async (taskId: string, options: ClaimCommandOptions) => { + await withSession(dependencies, async (repository) => { + const claimInput: ClaimTaskInput = { + agentId: options.agent, + ttlSeconds: options.ttl, + }; + const task = await repository.claim(taskId, claimInput); + dependencies.stdout(JSON.stringify(task, null, 2)); + }); + }); + + queue + .command('release <taskId>') + .description('Release a claimed task back to pending') + .option('--agent <agentId>', 'Expected owner agent id') + .action(async (taskId: string, options: ReleaseCommandOptions) => { + await withSession(dependencies, async (repository) => { + const task = await repository.release(taskId, { + agentId: options.agent, + }); + dependencies.stdout(JSON.stringify(task, null, 2)); + }); + }); + + queue + .command('complete <taskId>') + .description('Complete a claimed task') + .option('--agent <agentId>', 'Expected owner agent id') + .option('--summary <summary>', 'Optional completion summary') + .action(async (taskId: string, options: CompleteCommandOptions) => { + await withSession(dependencies, async (repository) => { + const completeInput: CompleteTaskInput = { + agentId: options.agent, + summary: options.summary, + }; + const task = await repository.complete(taskId, completeInput); + dependencies.stdout(JSON.stringify(task, null, 2)); + }); + }); + + return program; +} + +export async function runQueueCli( + argv: string[] = process.argv, + dependencyOverrides: Partial<QueueCliDependencies> = {}, +): Promise<number> { + const dependencies = resolveDependencies(dependencyOverrides); + const program = buildQueueCli(dependencies); + + try { + await program.parseAsync(argv, { + from: 'node', + }); + return 0; + } catch (error) { + if (error instanceof CommanderError) { + if (error.code === 'commander.helpDisplayed') { + return 0; + } + + if (error.code.startsWith('commander.')) { + return error.exitCode; + } + } + + dependencies.stderr(formatError(error)); + return 1; + } +} + +async function openRedisSession(): Promise<QueueRepositorySession> { + const redisClient = createRedisClient<ClosableRedisTaskClient>(); + + try { + await assertRedisHealthy(redisClient); + + return { + repository: new RedisTaskRepository({ + client: redisClient, + }), + close: async () => { + await redisClient.quit(); + }, + }; + } catch (error) { + await redisClient.quit(); + throw error; + } +} + +async function withSession( + dependencies: QueueCliDependencies, + action: (repository: QueueRepository) => Promise<void>, +): Promise<void> { + const session = await dependencies.openSession(); + + try { + await action(session.repository); + } finally { + await session.close(); + } +} + +function resolveDependencies( + overrides: Partial<QueueCliDependencies>, +): QueueCliDependencies { + const openSession = overrides.openSession ?? DEFAULT_DEPENDENCIES.openSession; + const stdout = overrides.stdout ?? DEFAULT_DEPENDENCIES.stdout; + const stderr = overrides.stderr ?? DEFAULT_DEPENDENCIES.stderr; + + return { + openSession: () => openSession(), + stdout: (line: string) => stdout(line), + stderr: (line: string) => stderr(line), + }; +} + +function parsePositiveInteger(value: string): number { + const parsed = Number.parseInt(value, 10); + + if (!Number.isInteger(parsed) || parsed <= 0) { + throw new InvalidArgumentError(`Expected a positive integer, received "${value}"`); + } + + return parsed; +} + +function parsePriority(value: string): TaskPriority { + if (!PRIORITY_SET.has(value as TaskPriority)) { + throw new InvalidArgumentError( + `Expected one of ${TASK_PRIORITIES.join(', ')}, received "${value}"`, + ); + } + + return value as TaskPriority; +} + +function parseLane(value: string): TaskLane { + if (!LANE_SET.has(value as TaskLane)) { + throw new InvalidArgumentError( + `Expected one of ${TASK_LANES.join(', ')}, received "${value}"`, + ); + } + + return value as TaskLane; +} + +function parseStatus(value: string): TaskStatus { + if (!STATUS_SET.has(value as TaskStatus)) { + throw new InvalidArgumentError( + `Expected one of ${TASK_STATUSES.join(', ')}, received "${value}"`, + ); + } + + return value as TaskStatus; +} + +function formatError(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index 1a005ff..f991cac 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -45,3 +45,9 @@ export type { TaskStatus, TaskUpdateInput, } from './task.js'; +export { buildQueueCli, runQueueCli } from './cli.js'; +export type { + QueueCliDependencies, + QueueRepository, + QueueRepositorySession, +} from './cli.js'; diff --git a/packages/queue/tests/cli.test.ts b/packages/queue/tests/cli.test.ts new file mode 100644 index 0000000..7101eac --- /dev/null +++ b/packages/queue/tests/cli.test.ts @@ -0,0 +1,218 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { runQueueCli, type QueueCliDependencies, type QueueRepository } from '../src/cli.js'; + +function createRepositoryMock(): QueueRepository { + return { + create: vi.fn(() => + Promise.resolve({ + id: 'MQ-005', + project: 'queue', + mission: 'phase1', + taskId: 'MQ-005', + title: 'Build CLI', + status: 'pending', + priority: 'medium', + dependencies: [], + lane: 'any', + retryCount: 0, + createdAt: 1, + updatedAt: 1, + }), + ), + list: vi.fn(() => Promise.resolve([])), + get: vi.fn(() => Promise.resolve(null)), + claim: vi.fn(() => + Promise.resolve({ + id: 'MQ-005', + project: 'queue', + mission: 'phase1', + taskId: 'MQ-005', + title: 'Build CLI', + status: 'claimed', + priority: 'medium', + dependencies: [], + lane: 'any', + claimedBy: 'agent-a', + claimedAt: 2, + claimTTL: 60, + retryCount: 0, + createdAt: 1, + updatedAt: 2, + }), + ), + release: vi.fn(() => + Promise.resolve({ + id: 'MQ-005', + project: 'queue', + mission: 'phase1', + taskId: 'MQ-005', + title: 'Build CLI', + status: 'pending', + priority: 'medium', + dependencies: [], + lane: 'any', + retryCount: 0, + createdAt: 1, + updatedAt: 3, + }), + ), + complete: vi.fn(() => + Promise.resolve({ + id: 'MQ-005', + project: 'queue', + mission: 'phase1', + taskId: 'MQ-005', + title: 'Build CLI', + status: 'completed', + priority: 'medium', + dependencies: [], + lane: 'any', + completionSummary: 'done', + retryCount: 0, + createdAt: 1, + updatedAt: 4, + completedAt: 4, + }), + ), + }; +} + +function createDependencies( + repository: QueueRepository, +): QueueCliDependencies & { outputs: string[]; errors: string[] } { + const outputs: string[] = []; + const errors: string[] = []; + const close = vi.fn(() => Promise.resolve(undefined)); + + return { + openSession: () => + Promise.resolve({ + repository, + close, + }), + stdout: (line) => { + outputs.push(line); + }, + stderr: (line) => { + errors.push(line); + }, + outputs, + errors, + }; +} + +describe('runQueueCli', () => { + it('creates a task from command options', async () => { + const repository = createRepositoryMock(); + const dependencies = createDependencies(repository); + + const exitCode = await runQueueCli( + [ + 'node', + 'mosaic', + 'queue', + 'create', + 'queue', + 'phase1', + 'MQ-005', + '--title', + 'Build CLI', + '--priority', + 'high', + '--lane', + 'coding', + '--dependency', + 'MQ-002', + 'MQ-003', + ], + dependencies, + ); + + expect(exitCode).toBe(0); + expect(repository.create).toHaveBeenCalledWith({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-005', + title: 'Build CLI', + description: undefined, + priority: 'high', + dependencies: ['MQ-002', 'MQ-003'], + lane: 'coding', + }); + }); + + it('lists tasks with filters', async () => { + const repository = createRepositoryMock(); + const dependencies = createDependencies(repository); + + const exitCode = await runQueueCli( + [ + 'node', + 'mosaic', + 'queue', + 'list', + '--project', + 'queue', + '--mission', + 'phase1', + '--status', + 'pending', + ], + dependencies, + ); + + expect(exitCode).toBe(0); + expect(repository.list).toHaveBeenCalledWith({ + project: 'queue', + mission: 'phase1', + status: 'pending', + }); + }); + + it('claims and completes tasks with typed options', async () => { + const repository = createRepositoryMock(); + const dependencies = createDependencies(repository); + + const claimExitCode = await runQueueCli( + [ + 'node', + 'mosaic', + 'queue', + 'claim', + 'MQ-005', + '--agent', + 'agent-a', + '--ttl', + '60', + ], + dependencies, + ); + + const completeExitCode = await runQueueCli( + [ + 'node', + 'mosaic', + 'queue', + 'complete', + 'MQ-005', + '--agent', + 'agent-a', + '--summary', + 'done', + ], + dependencies, + ); + + expect(claimExitCode).toBe(0); + expect(completeExitCode).toBe(0); + expect(repository.claim).toHaveBeenCalledWith('MQ-005', { + agentId: 'agent-a', + ttlSeconds: 60, + }); + expect(repository.complete).toHaveBeenCalledWith('MQ-005', { + agentId: 'agent-a', + summary: 'done', + }); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index acb4068..7030b18 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -29,6 +29,9 @@ importers: packages/queue: dependencies: + commander: + specifier: ^14.0.3 + version: 14.0.3 ioredis: specifier: ^5.10.0 version: 5.10.0 @@ -548,6 +551,10 @@ packages: color-name@1.1.4: resolution: {integrity: sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==} + commander@14.0.3: + resolution: {integrity: sha512-H+y0Jo/T1RZ9qPP4Eh1pkcQcLRglraJaSLoyOtHxu6AapkjWVCy2Sit1QQ4x3Dng8qDlSsZEet7g5Pq06MvTgw==} + engines: {node: '>=20'} + concat-map@0.0.1: resolution: {integrity: sha512-/Srv4dswyQNBfohGpz9o6Yb3Gz3SrUDqBH5rTuhGR7ahtlbYKnVxw2bCFMRljaA7EXHaXZ8wsHdodFvbkhKmqg==} @@ -1447,6 +1454,8 @@ snapshots: color-name@1.1.4: {} + commander@14.0.3: {} + concat-map@0.0.1: {} cross-spawn@7.0.6: