feat(queue): add commander CLI for core queue ops (MQ-005)

This commit is contained in:
2026-03-06 09:29:41 -06:00
parent 5049d04977
commit 9b2db93afc
5 changed files with 579 additions and 0 deletions

View File

@@ -14,6 +14,7 @@
"test": "vitest run"
},
"dependencies": {
"commander": "^14.0.3",
"ioredis": "^5.10.0"
}
}

345
packages/queue/src/cli.ts Normal file
View File

@@ -0,0 +1,345 @@
import {
Command,
CommanderError,
InvalidArgumentError,
Option,
} from 'commander';
import { assertRedisHealthy, createRedisClient } from './redis-connection.js';
import {
RedisTaskRepository,
type ClaimTaskInput,
type CompleteTaskInput,
type RedisTaskClient,
} from './task-repository.js';
import {
TASK_LANES,
TASK_PRIORITIES,
TASK_STATUSES,
type CreateTaskInput,
type TaskLane,
type TaskListFilters,
type TaskPriority,
type TaskStatus,
} from './task.js';
export type QueueRepository = Pick<
RedisTaskRepository,
'create' | 'list' | 'get' | 'claim' | 'release' | 'complete'
>;
export interface QueueRepositorySession {
readonly repository: QueueRepository;
readonly close: () => Promise<void>;
}
export interface QueueCliDependencies {
readonly openSession: () => Promise<QueueRepositorySession>;
readonly stdout: (line: string) => void;
readonly stderr: (line: string) => void;
}
interface CreateCommandOptions {
readonly title: string;
readonly description?: string;
readonly priority?: TaskPriority;
readonly lane?: TaskLane;
readonly dependency?: string[];
}
interface ListCommandOptions {
readonly project?: string;
readonly mission?: string;
readonly status?: TaskStatus;
}
interface ClaimCommandOptions {
readonly agent: string;
readonly ttl: number;
}
interface ReleaseCommandOptions {
readonly agent?: string;
}
interface CompleteCommandOptions {
readonly agent?: string;
readonly summary?: string;
}
interface ClosableRedisTaskClient extends RedisTaskClient {
quit(): Promise<string>;
}
const DEFAULT_DEPENDENCIES: QueueCliDependencies = {
openSession: openRedisSession,
stdout: (line: string) => {
console.log(line);
},
stderr: (line: string) => {
console.error(line);
},
};
const PRIORITY_SET = new Set<TaskPriority>(TASK_PRIORITIES);
const LANE_SET = new Set<TaskLane>(TASK_LANES);
const STATUS_SET = new Set<TaskStatus>(TASK_STATUSES);
export function buildQueueCli(
dependencyOverrides: Partial<QueueCliDependencies> = {},
): Command {
const dependencies = resolveDependencies(dependencyOverrides);
const program = new Command();
program
.name('mosaic')
.description('mosaic queue command line interface')
.exitOverride();
program.configureOutput({
writeOut: (output: string) => dependencies.stdout(output.trimEnd()),
writeErr: (output: string) => dependencies.stderr(output.trimEnd()),
});
const queue = program.command('queue').description('Manage queue tasks');
queue
.command('create <project> <mission> <taskId>')
.description('Create a queue task')
.requiredOption('--title <title>', 'Task title')
.option('--description <description>', 'Task description')
.addOption(
new Option('--priority <priority>', 'Task priority')
.choices(TASK_PRIORITIES)
.argParser(parsePriority),
)
.addOption(
new Option('--lane <lane>', 'Task lane').choices(TASK_LANES).argParser(parseLane),
)
.option('--dependency <taskIds...>', 'Task dependencies')
.action(
async (
project: string,
mission: string,
taskId: string,
options: CreateCommandOptions,
) => {
await withSession(dependencies, async (repository) => {
const payload: CreateTaskInput = {
project,
mission,
taskId,
title: options.title,
description: options.description,
priority: options.priority,
dependencies: options.dependency,
lane: options.lane,
};
const task = await repository.create(payload);
dependencies.stdout(JSON.stringify(task, null, 2));
});
},
);
queue
.command('list')
.description('List queue tasks')
.option('--project <project>', 'Filter by project')
.option('--mission <mission>', 'Filter by mission')
.addOption(
new Option('--status <status>', 'Filter by status')
.choices(TASK_STATUSES)
.argParser(parseStatus),
)
.action(async (options: ListCommandOptions) => {
await withSession(dependencies, async (repository) => {
const filters: TaskListFilters = {
project: options.project,
mission: options.mission,
status: options.status,
};
const tasks = await repository.list(filters);
dependencies.stdout(JSON.stringify(tasks, null, 2));
});
});
queue
.command('show <taskId>')
.description('Show a single queue task')
.action(async (taskId: string) => {
await withSession(dependencies, async (repository) => {
const task = await repository.get(taskId);
if (task === null) {
throw new Error(`Task ${taskId} was not found.`);
}
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
queue
.command('claim <taskId>')
.description('Claim a pending task')
.requiredOption('--agent <agentId>', 'Agent identifier')
.requiredOption('--ttl <seconds>', 'Claim TTL in seconds', parsePositiveInteger)
.action(async (taskId: string, options: ClaimCommandOptions) => {
await withSession(dependencies, async (repository) => {
const claimInput: ClaimTaskInput = {
agentId: options.agent,
ttlSeconds: options.ttl,
};
const task = await repository.claim(taskId, claimInput);
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
queue
.command('release <taskId>')
.description('Release a claimed task back to pending')
.option('--agent <agentId>', 'Expected owner agent id')
.action(async (taskId: string, options: ReleaseCommandOptions) => {
await withSession(dependencies, async (repository) => {
const task = await repository.release(taskId, {
agentId: options.agent,
});
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
queue
.command('complete <taskId>')
.description('Complete a claimed task')
.option('--agent <agentId>', 'Expected owner agent id')
.option('--summary <summary>', 'Optional completion summary')
.action(async (taskId: string, options: CompleteCommandOptions) => {
await withSession(dependencies, async (repository) => {
const completeInput: CompleteTaskInput = {
agentId: options.agent,
summary: options.summary,
};
const task = await repository.complete(taskId, completeInput);
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
return program;
}
export async function runQueueCli(
argv: string[] = process.argv,
dependencyOverrides: Partial<QueueCliDependencies> = {},
): Promise<number> {
const dependencies = resolveDependencies(dependencyOverrides);
const program = buildQueueCli(dependencies);
try {
await program.parseAsync(argv, {
from: 'node',
});
return 0;
} catch (error) {
if (error instanceof CommanderError) {
if (error.code === 'commander.helpDisplayed') {
return 0;
}
if (error.code.startsWith('commander.')) {
return error.exitCode;
}
}
dependencies.stderr(formatError(error));
return 1;
}
}
async function openRedisSession(): Promise<QueueRepositorySession> {
const redisClient = createRedisClient<ClosableRedisTaskClient>();
try {
await assertRedisHealthy(redisClient);
return {
repository: new RedisTaskRepository({
client: redisClient,
}),
close: async () => {
await redisClient.quit();
},
};
} catch (error) {
await redisClient.quit();
throw error;
}
}
async function withSession(
dependencies: QueueCliDependencies,
action: (repository: QueueRepository) => Promise<void>,
): Promise<void> {
const session = await dependencies.openSession();
try {
await action(session.repository);
} finally {
await session.close();
}
}
function resolveDependencies(
overrides: Partial<QueueCliDependencies>,
): QueueCliDependencies {
const openSession = overrides.openSession ?? DEFAULT_DEPENDENCIES.openSession;
const stdout = overrides.stdout ?? DEFAULT_DEPENDENCIES.stdout;
const stderr = overrides.stderr ?? DEFAULT_DEPENDENCIES.stderr;
return {
openSession: () => openSession(),
stdout: (line: string) => stdout(line),
stderr: (line: string) => stderr(line),
};
}
function parsePositiveInteger(value: string): number {
const parsed = Number.parseInt(value, 10);
if (!Number.isInteger(parsed) || parsed <= 0) {
throw new InvalidArgumentError(`Expected a positive integer, received "${value}"`);
}
return parsed;
}
function parsePriority(value: string): TaskPriority {
if (!PRIORITY_SET.has(value as TaskPriority)) {
throw new InvalidArgumentError(
`Expected one of ${TASK_PRIORITIES.join(', ')}, received "${value}"`,
);
}
return value as TaskPriority;
}
function parseLane(value: string): TaskLane {
if (!LANE_SET.has(value as TaskLane)) {
throw new InvalidArgumentError(
`Expected one of ${TASK_LANES.join(', ')}, received "${value}"`,
);
}
return value as TaskLane;
}
function parseStatus(value: string): TaskStatus {
if (!STATUS_SET.has(value as TaskStatus)) {
throw new InvalidArgumentError(
`Expected one of ${TASK_STATUSES.join(', ')}, received "${value}"`,
);
}
return value as TaskStatus;
}
function formatError(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}

View File

@@ -45,3 +45,9 @@ export type {
TaskStatus,
TaskUpdateInput,
} from './task.js';
export { buildQueueCli, runQueueCli } from './cli.js';
export type {
QueueCliDependencies,
QueueRepository,
QueueRepositorySession,
} from './cli.js';

View File

@@ -0,0 +1,218 @@
import { describe, expect, it, vi } from 'vitest';
import { runQueueCli, type QueueCliDependencies, type QueueRepository } from '../src/cli.js';
function createRepositoryMock(): QueueRepository {
return {
create: vi.fn(() =>
Promise.resolve({
id: 'MQ-005',
project: 'queue',
mission: 'phase1',
taskId: 'MQ-005',
title: 'Build CLI',
status: 'pending',
priority: 'medium',
dependencies: [],
lane: 'any',
retryCount: 0,
createdAt: 1,
updatedAt: 1,
}),
),
list: vi.fn(() => Promise.resolve([])),
get: vi.fn(() => Promise.resolve(null)),
claim: vi.fn(() =>
Promise.resolve({
id: 'MQ-005',
project: 'queue',
mission: 'phase1',
taskId: 'MQ-005',
title: 'Build CLI',
status: 'claimed',
priority: 'medium',
dependencies: [],
lane: 'any',
claimedBy: 'agent-a',
claimedAt: 2,
claimTTL: 60,
retryCount: 0,
createdAt: 1,
updatedAt: 2,
}),
),
release: vi.fn(() =>
Promise.resolve({
id: 'MQ-005',
project: 'queue',
mission: 'phase1',
taskId: 'MQ-005',
title: 'Build CLI',
status: 'pending',
priority: 'medium',
dependencies: [],
lane: 'any',
retryCount: 0,
createdAt: 1,
updatedAt: 3,
}),
),
complete: vi.fn(() =>
Promise.resolve({
id: 'MQ-005',
project: 'queue',
mission: 'phase1',
taskId: 'MQ-005',
title: 'Build CLI',
status: 'completed',
priority: 'medium',
dependencies: [],
lane: 'any',
completionSummary: 'done',
retryCount: 0,
createdAt: 1,
updatedAt: 4,
completedAt: 4,
}),
),
};
}
function createDependencies(
repository: QueueRepository,
): QueueCliDependencies & { outputs: string[]; errors: string[] } {
const outputs: string[] = [];
const errors: string[] = [];
const close = vi.fn(() => Promise.resolve(undefined));
return {
openSession: () =>
Promise.resolve({
repository,
close,
}),
stdout: (line) => {
outputs.push(line);
},
stderr: (line) => {
errors.push(line);
},
outputs,
errors,
};
}
describe('runQueueCli', () => {
it('creates a task from command options', async () => {
const repository = createRepositoryMock();
const dependencies = createDependencies(repository);
const exitCode = await runQueueCli(
[
'node',
'mosaic',
'queue',
'create',
'queue',
'phase1',
'MQ-005',
'--title',
'Build CLI',
'--priority',
'high',
'--lane',
'coding',
'--dependency',
'MQ-002',
'MQ-003',
],
dependencies,
);
expect(exitCode).toBe(0);
expect(repository.create).toHaveBeenCalledWith({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-005',
title: 'Build CLI',
description: undefined,
priority: 'high',
dependencies: ['MQ-002', 'MQ-003'],
lane: 'coding',
});
});
it('lists tasks with filters', async () => {
const repository = createRepositoryMock();
const dependencies = createDependencies(repository);
const exitCode = await runQueueCli(
[
'node',
'mosaic',
'queue',
'list',
'--project',
'queue',
'--mission',
'phase1',
'--status',
'pending',
],
dependencies,
);
expect(exitCode).toBe(0);
expect(repository.list).toHaveBeenCalledWith({
project: 'queue',
mission: 'phase1',
status: 'pending',
});
});
it('claims and completes tasks with typed options', async () => {
const repository = createRepositoryMock();
const dependencies = createDependencies(repository);
const claimExitCode = await runQueueCli(
[
'node',
'mosaic',
'queue',
'claim',
'MQ-005',
'--agent',
'agent-a',
'--ttl',
'60',
],
dependencies,
);
const completeExitCode = await runQueueCli(
[
'node',
'mosaic',
'queue',
'complete',
'MQ-005',
'--agent',
'agent-a',
'--summary',
'done',
],
dependencies,
);
expect(claimExitCode).toBe(0);
expect(completeExitCode).toBe(0);
expect(repository.claim).toHaveBeenCalledWith('MQ-005', {
agentId: 'agent-a',
ttlSeconds: 60,
});
expect(repository.complete).toHaveBeenCalledWith('MQ-005', {
agentId: 'agent-a',
summary: 'done',
});
});
});