From 727b3defc9f0c06f664f28e31fb02074c561eaf7 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Fri, 6 Mar 2026 13:18:22 -0600 Subject: [PATCH] feat(queue): stage queue migration package --- packages/queue/README.md | 14 + packages/queue/bin/mosaic-queue-mcp.ts | 11 + packages/queue/bin/mosaic-queue.ts | 6 + packages/queue/package.json | 40 ++ packages/queue/src/cli.ts | 343 ++++++++++ packages/queue/src/index.ts | 69 ++ packages/queue/src/mcp-server.ts | 309 +++++++++ packages/queue/src/mcp-tool-schemas.ts | 44 ++ packages/queue/src/redis-connection.ts | 95 +++ packages/queue/src/task-repository.ts | 632 ++++++++++++++++++ packages/queue/src/task.ts | 12 + packages/queue/tests/cli.test.ts | 218 ++++++ packages/queue/tests/mcp-server.test.ts | 50 ++ packages/queue/tests/mcp-tool-schemas.test.ts | 90 +++ packages/queue/tests/redis-connection.test.ts | 76 +++ packages/queue/tests/smoke.test.ts | 9 + packages/queue/tests/task-atomic.test.ts | 459 +++++++++++++ packages/queue/tests/task-repository.test.ts | 332 +++++++++ packages/queue/tsconfig.build.json | 10 + packages/queue/tsconfig.json | 8 + packages/queue/vitest.config.ts | 8 + 21 files changed, 2835 insertions(+) create mode 100644 packages/queue/README.md create mode 100644 packages/queue/bin/mosaic-queue-mcp.ts create mode 100644 packages/queue/bin/mosaic-queue.ts create mode 100644 packages/queue/package.json create mode 100644 packages/queue/src/cli.ts create mode 100644 packages/queue/src/index.ts create mode 100644 packages/queue/src/mcp-server.ts create mode 100644 packages/queue/src/mcp-tool-schemas.ts create mode 100644 packages/queue/src/redis-connection.ts create mode 100644 packages/queue/src/task-repository.ts create mode 100644 packages/queue/src/task.ts create mode 100644 packages/queue/tests/cli.test.ts create mode 100644 packages/queue/tests/mcp-server.test.ts create mode 100644 packages/queue/tests/mcp-tool-schemas.test.ts create mode 100644 packages/queue/tests/redis-connection.test.ts create mode 100644 packages/queue/tests/smoke.test.ts create mode 100644 packages/queue/tests/task-atomic.test.ts create mode 100644 packages/queue/tests/task-repository.test.ts create mode 100644 packages/queue/tsconfig.build.json create mode 100644 packages/queue/tsconfig.json create mode 100644 packages/queue/vitest.config.ts diff --git a/packages/queue/README.md b/packages/queue/README.md new file mode 100644 index 0000000..fcdc888 --- /dev/null +++ b/packages/queue/README.md @@ -0,0 +1,14 @@ +# @mosaic/queue + +Valkey-backed task queue package for Mosaic monorepo migration. + +## Exports + +- Queue repository logic and Redis connection helpers +- CLI runner (`mosaic-queue`) +- MCP server runner (`mosaic-queue-mcp`) +- Runtime task constants (`TASK_STATUSES`, `TASK_PRIORITIES`, `TASK_LANES`) + +## Note + +Task type definitions are expected to come from `@mosaic/types`. diff --git a/packages/queue/bin/mosaic-queue-mcp.ts b/packages/queue/bin/mosaic-queue-mcp.ts new file mode 100644 index 0000000..6e45fc8 --- /dev/null +++ b/packages/queue/bin/mosaic-queue-mcp.ts @@ -0,0 +1,11 @@ +#!/usr/bin/env node + +import { startQueueMcpServer } from '../src/mcp-server.js'; + +try { + await startQueueMcpServer(); +} catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.error(message); + process.exitCode = 1; +} diff --git a/packages/queue/bin/mosaic-queue.ts b/packages/queue/bin/mosaic-queue.ts new file mode 100644 index 0000000..e52b1fb --- /dev/null +++ b/packages/queue/bin/mosaic-queue.ts @@ -0,0 +1,6 @@ +#!/usr/bin/env node + +import { runQueueCli } from '../src/cli.js'; + +const exitCode = await runQueueCli(process.argv); +process.exitCode = exitCode; diff --git a/packages/queue/package.json b/packages/queue/package.json new file mode 100644 index 0000000..e3013c1 --- /dev/null +++ b/packages/queue/package.json @@ -0,0 +1,40 @@ +{ + "name": "@mosaic/queue", + "version": "0.1.0", + "description": "Valkey-backed task queue exposed via CLI and MCP", + "license": "MIT", + "type": "module", + "main": "dist/src/index.js", + "types": "dist/src/index.d.ts", + "exports": { + ".": { + "types": "./dist/src/index.d.ts", + "import": "./dist/src/index.js" + } + }, + "bin": { + "mosaic-queue": "dist/bin/mosaic-queue.js", + "mosaic-queue-mcp": "dist/bin/mosaic-queue-mcp.js" + }, + "files": [ + "dist" + ], + "scripts": { + "lint": "eslint \"src/**/*.ts\" \"tests/**/*.ts\" \"bin/**/*.ts\" \"vitest.config.ts\"", + "build": "tsc -p tsconfig.build.json", + "test": "vitest run", + "prepublishOnly": "pnpm lint && pnpm test && pnpm build" + }, + "engines": { + "node": ">=20.0.0" + }, + "publishConfig": { + "access": "public" + }, + "dependencies": { + "@modelcontextprotocol/sdk": "^1.27.1", + "commander": "^14.0.3", + "ioredis": "^5.10.0", + "zod": "^4.3.6" + } +} diff --git a/packages/queue/src/cli.ts b/packages/queue/src/cli.ts new file mode 100644 index 0000000..b8671af --- /dev/null +++ b/packages/queue/src/cli.ts @@ -0,0 +1,343 @@ +import { + Command, + CommanderError, + InvalidArgumentError, + Option, +} from 'commander'; + +import { assertRedisHealthy, createRedisClient } from './redis-connection.js'; +import { + RedisTaskRepository, + type ClaimTaskInput, + type CompleteTaskInput, + type RedisTaskClient, +} from './task-repository.js'; +import { TASK_LANES, TASK_PRIORITIES, TASK_STATUSES } from './task.js'; +import type { + CreateTaskInput, + TaskLane, + TaskListFilters, + TaskPriority, + TaskStatus, +} from '@mosaic/types'; + +export type QueueRepository = Pick< + RedisTaskRepository, + 'create' | 'list' | 'get' | 'claim' | 'release' | 'complete' +>; + +export interface QueueRepositorySession { + readonly repository: QueueRepository; + readonly close: () => Promise; +} + +export interface QueueCliDependencies { + readonly openSession: () => Promise; + readonly stdout: (line: string) => void; + readonly stderr: (line: string) => void; +} + +interface CreateCommandOptions { + readonly title: string; + readonly description?: string; + readonly priority?: TaskPriority; + readonly lane?: TaskLane; + readonly dependency?: string[]; +} + +interface ListCommandOptions { + readonly project?: string; + readonly mission?: string; + readonly status?: TaskStatus; +} + +interface ClaimCommandOptions { + readonly agent: string; + readonly ttl: number; +} + +interface ReleaseCommandOptions { + readonly agent?: string; +} + +interface CompleteCommandOptions { + readonly agent?: string; + readonly summary?: string; +} + +interface ClosableRedisTaskClient extends RedisTaskClient { + quit(): Promise; +} + +const DEFAULT_DEPENDENCIES: QueueCliDependencies = { + openSession: openRedisSession, + stdout: (line: string) => { + console.log(line); + }, + stderr: (line: string) => { + console.error(line); + }, +}; + +const PRIORITY_SET = new Set(TASK_PRIORITIES); +const LANE_SET = new Set(TASK_LANES); +const STATUS_SET = new Set(TASK_STATUSES); + +export function buildQueueCli( + dependencyOverrides: Partial = {}, +): Command { + const dependencies = resolveDependencies(dependencyOverrides); + const program = new Command(); + program + .name('mosaic') + .description('mosaic queue command line interface') + .exitOverride(); + + program.configureOutput({ + writeOut: (output: string) => dependencies.stdout(output.trimEnd()), + writeErr: (output: string) => dependencies.stderr(output.trimEnd()), + }); + + const queue = program.command('queue').description('Manage queue tasks'); + + queue + .command('create ') + .description('Create a queue task') + .requiredOption('--title ', 'Task title') + .option('--description <description>', 'Task description') + .addOption( + new Option('--priority <priority>', 'Task priority') + .choices(TASK_PRIORITIES) + .argParser(parsePriority), + ) + .addOption( + new Option('--lane <lane>', 'Task lane').choices(TASK_LANES).argParser(parseLane), + ) + .option('--dependency <taskIds...>', 'Task dependencies') + .action( + async ( + project: string, + mission: string, + taskId: string, + options: CreateCommandOptions, + ) => { + await withSession(dependencies, async (repository) => { + const payload: CreateTaskInput = { + project, + mission, + taskId, + title: options.title, + description: options.description, + priority: options.priority, + dependencies: options.dependency, + lane: options.lane, + }; + const task = await repository.create(payload); + dependencies.stdout(JSON.stringify(task, null, 2)); + }); + }, + ); + + queue + .command('list') + .description('List queue tasks') + .option('--project <project>', 'Filter by project') + .option('--mission <mission>', 'Filter by mission') + .addOption( + new Option('--status <status>', 'Filter by status') + .choices(TASK_STATUSES) + .argParser(parseStatus), + ) + .action(async (options: ListCommandOptions) => { + await withSession(dependencies, async (repository) => { + const filters: TaskListFilters = { + project: options.project, + mission: options.mission, + status: options.status, + }; + const tasks = await repository.list(filters); + dependencies.stdout(JSON.stringify(tasks, null, 2)); + }); + }); + + queue + .command('show <taskId>') + .description('Show a single queue task') + .action(async (taskId: string) => { + await withSession(dependencies, async (repository) => { + const task = await repository.get(taskId); + + if (task === null) { + throw new Error(`Task ${taskId} was not found.`); + } + + dependencies.stdout(JSON.stringify(task, null, 2)); + }); + }); + + queue + .command('claim <taskId>') + .description('Claim a pending task') + .requiredOption('--agent <agentId>', 'Agent identifier') + .requiredOption('--ttl <seconds>', 'Claim TTL in seconds', parsePositiveInteger) + .action(async (taskId: string, options: ClaimCommandOptions) => { + await withSession(dependencies, async (repository) => { + const claimInput: ClaimTaskInput = { + agentId: options.agent, + ttlSeconds: options.ttl, + }; + const task = await repository.claim(taskId, claimInput); + dependencies.stdout(JSON.stringify(task, null, 2)); + }); + }); + + queue + .command('release <taskId>') + .description('Release a claimed task back to pending') + .option('--agent <agentId>', 'Expected owner agent id') + .action(async (taskId: string, options: ReleaseCommandOptions) => { + await withSession(dependencies, async (repository) => { + const task = await repository.release(taskId, { + agentId: options.agent, + }); + dependencies.stdout(JSON.stringify(task, null, 2)); + }); + }); + + queue + .command('complete <taskId>') + .description('Complete a claimed task') + .option('--agent <agentId>', 'Expected owner agent id') + .option('--summary <summary>', 'Optional completion summary') + .action(async (taskId: string, options: CompleteCommandOptions) => { + await withSession(dependencies, async (repository) => { + const completeInput: CompleteTaskInput = { + agentId: options.agent, + summary: options.summary, + }; + const task = await repository.complete(taskId, completeInput); + dependencies.stdout(JSON.stringify(task, null, 2)); + }); + }); + + return program; +} + +export async function runQueueCli( + argv: string[] = process.argv, + dependencyOverrides: Partial<QueueCliDependencies> = {}, +): Promise<number> { + const dependencies = resolveDependencies(dependencyOverrides); + const program = buildQueueCli(dependencies); + + try { + await program.parseAsync(argv, { + from: 'node', + }); + return 0; + } catch (error) { + if (error instanceof CommanderError) { + if (error.code === 'commander.helpDisplayed') { + return 0; + } + + if (error.code.startsWith('commander.')) { + return error.exitCode; + } + } + + dependencies.stderr(formatError(error)); + return 1; + } +} + +async function openRedisSession(): Promise<QueueRepositorySession> { + const redisClient = createRedisClient<ClosableRedisTaskClient>(); + + try { + await assertRedisHealthy(redisClient); + + return { + repository: new RedisTaskRepository({ + client: redisClient, + }), + close: async () => { + await redisClient.quit(); + }, + }; + } catch (error) { + await redisClient.quit(); + throw error; + } +} + +async function withSession( + dependencies: QueueCliDependencies, + action: (repository: QueueRepository) => Promise<void>, +): Promise<void> { + const session = await dependencies.openSession(); + + try { + await action(session.repository); + } finally { + await session.close(); + } +} + +function resolveDependencies( + overrides: Partial<QueueCliDependencies>, +): QueueCliDependencies { + const openSession = overrides.openSession ?? DEFAULT_DEPENDENCIES.openSession; + const stdout = overrides.stdout ?? DEFAULT_DEPENDENCIES.stdout; + const stderr = overrides.stderr ?? DEFAULT_DEPENDENCIES.stderr; + + return { + openSession: () => openSession(), + stdout: (line: string) => stdout(line), + stderr: (line: string) => stderr(line), + }; +} + +function parsePositiveInteger(value: string): number { + const parsed = Number.parseInt(value, 10); + + if (!Number.isInteger(parsed) || parsed <= 0) { + throw new InvalidArgumentError(`Expected a positive integer, received "${value}"`); + } + + return parsed; +} + +function parsePriority(value: string): TaskPriority { + if (!PRIORITY_SET.has(value as TaskPriority)) { + throw new InvalidArgumentError( + `Expected one of ${TASK_PRIORITIES.join(', ')}, received "${value}"`, + ); + } + + return value as TaskPriority; +} + +function parseLane(value: string): TaskLane { + if (!LANE_SET.has(value as TaskLane)) { + throw new InvalidArgumentError( + `Expected one of ${TASK_LANES.join(', ')}, received "${value}"`, + ); + } + + return value as TaskLane; +} + +function parseStatus(value: string): TaskStatus { + if (!STATUS_SET.has(value as TaskStatus)) { + throw new InvalidArgumentError( + `Expected one of ${TASK_STATUSES.join(', ')}, received "${value}"`, + ); + } + + return value as TaskStatus; +} + +function formatError(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts new file mode 100644 index 0000000..4618995 --- /dev/null +++ b/packages/queue/src/index.ts @@ -0,0 +1,69 @@ +export const packageVersion = '0.1.0'; + +export { + assertRedisHealthy, + createRedisClient, + resolveRedisUrl, + runRedisHealthCheck, +} from './redis-connection.js'; +export type { + CreateRedisClientOptions, + RedisClientConstructor, + RedisHealthCheck, + RedisPingClient, +} from './redis-connection.js'; +export { + RedisTaskRepository, + TaskAlreadyExistsError, + TaskAtomicConflictError, + TaskNotFoundError, + TaskOwnershipError, + TaskSerializationError, + TaskTransitionError, +} from './task-repository.js'; +export type { + ClaimTaskInput, + CompleteTaskInput, + FailTaskInput, + HeartbeatTaskInput, + RedisTaskClient, + RedisTaskRepositoryOptions, + RedisTaskTransaction, + ReleaseTaskInput, +} from './task-repository.js'; +export { TASK_LANES, TASK_PRIORITIES, TASK_STATUSES } from './task.js'; +export type { + CreateTaskInput, + Task, + TaskLane, + TaskListFilters, + TaskPriority, + TaskStatus, + TaskUpdateInput, +} from '@mosaic/types'; +export { buildQueueCli, runQueueCli } from './cli.js'; +export type { + QueueCliDependencies, + QueueRepository, + QueueRepositorySession, +} from './cli.js'; +export { + QUEUE_MCP_TOOL_DEFINITIONS, + buildQueueMcpServer, + startQueueMcpServer, +} from './mcp-server.js'; +export type { + QueueMcpDependencies, + QueueMcpRepository, + QueueMcpSession, +} from './mcp-server.js'; +export { + queueClaimToolInputSchema, + queueCompleteToolInputSchema, + queueFailToolInputSchema, + queueGetToolInputSchema, + queueHeartbeatToolInputSchema, + queueListToolInputSchema, + queueReleaseToolInputSchema, + queueStatusToolInputSchema, +} from './mcp-tool-schemas.js'; diff --git a/packages/queue/src/mcp-server.ts b/packages/queue/src/mcp-server.ts new file mode 100644 index 0000000..967a00a --- /dev/null +++ b/packages/queue/src/mcp-server.ts @@ -0,0 +1,309 @@ +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js'; +import type { CallToolResult, Implementation } from '@modelcontextprotocol/sdk/types.js'; +import type { z } from 'zod'; + +import { + assertRedisHealthy, + createRedisClient, + runRedisHealthCheck, + type RedisHealthCheck, +} from './redis-connection.js'; +import { + queueClaimToolInputSchema, + queueCompleteToolInputSchema, + queueFailToolInputSchema, + queueGetToolInputSchema, + queueHeartbeatToolInputSchema, + queueListToolInputSchema, + queueReleaseToolInputSchema, + queueStatusToolInputSchema, +} from './mcp-tool-schemas.js'; +import { + RedisTaskRepository, + type RedisTaskClient, +} from './task-repository.js'; +import { TASK_STATUSES } from './task.js'; +import type { Task, TaskStatus } from '@mosaic/types'; + +export type QueueMcpRepository = Pick< + RedisTaskRepository, + 'list' | 'get' | 'claim' | 'heartbeat' | 'release' | 'complete' | 'fail' +>; + +export interface QueueMcpSession { + readonly repository: QueueMcpRepository; + readonly checkHealth: () => Promise<RedisHealthCheck>; + readonly close: () => Promise<void>; +} + +export interface QueueMcpDependencies { + readonly openSession: () => Promise<QueueMcpSession>; + readonly serverInfo: Implementation; +} + +type ToolSchema = z.ZodObject<Record<string, z.ZodTypeAny>>; + +interface QueueMcpToolDefinition<TArgs extends z.ZodTypeAny> { + readonly name: string; + readonly description: string; + readonly inputSchema: TArgs; + readonly execute: ( + session: QueueMcpSession, + input: z.output<TArgs>, + ) => Promise<unknown>; +} + +interface ClosableRedisTaskClient extends RedisTaskClient { + quit(): Promise<string>; +} + +const DEFAULT_SERVER_INFO: Implementation = { + name: 'mosaic-queue', + version: '0.1.0', +}; + +const DEFAULT_DEPENDENCIES: QueueMcpDependencies = { + openSession: openRedisMcpSession, + serverInfo: DEFAULT_SERVER_INFO, +}; + +export const QUEUE_MCP_TOOL_DEFINITIONS = [ + { + name: 'queue_list', + description: 'List queue tasks with optional project/mission/status filters', + inputSchema: queueListToolInputSchema, + execute: async (session, input) => { + const tasks = await session.repository.list(input); + return { + tasks, + }; + }, + }, + { + name: 'queue_get', + description: 'Get a single queue task by taskId', + inputSchema: queueGetToolInputSchema, + execute: async (session, input) => { + const task = await session.repository.get(input.taskId); + return { + task, + }; + }, + }, + { + name: 'queue_claim', + description: 'Atomically claim a task for an agent', + inputSchema: queueClaimToolInputSchema, + execute: async (session, input) => { + const task = await session.repository.claim(input.taskId, { + agentId: input.agentId, + ttlSeconds: input.ttlSeconds, + }); + + return { + task, + }; + }, + }, + { + name: 'queue_heartbeat', + description: 'Refresh claim ownership TTL for a task', + inputSchema: queueHeartbeatToolInputSchema, + execute: async (session, input) => { + const task = await session.repository.heartbeat(input.taskId, { + agentId: input.agentId, + ttlSeconds: input.ttlSeconds, + }); + + return { + task, + }; + }, + }, + { + name: 'queue_release', + description: 'Release a claimed task back to pending', + inputSchema: queueReleaseToolInputSchema, + execute: async (session, input) => { + const task = await session.repository.release(input.taskId, { + agentId: input.agentId, + }); + + return { + task, + }; + }, + }, + { + name: 'queue_complete', + description: 'Mark a claimed task as completed', + inputSchema: queueCompleteToolInputSchema, + execute: async (session, input) => { + const task = await session.repository.complete(input.taskId, { + agentId: input.agentId, + summary: input.summary, + }); + + return { + task, + }; + }, + }, + { + name: 'queue_fail', + description: 'Mark a claimed task as failed with a reason', + inputSchema: queueFailToolInputSchema, + execute: async (session, input) => { + const task = await session.repository.fail(input.taskId, { + agentId: input.agentId, + reason: input.reason, + }); + + return { + task, + }; + }, + }, + { + name: 'queue_status', + description: 'Return queue health and task status counters', + inputSchema: queueStatusToolInputSchema, + execute: async (session) => { + const tasks = await session.repository.list({}); + const health = await session.checkHealth(); + const counts = countStatuses(tasks); + + return { + health, + counts, + total: tasks.length, + }; + }, + }, +] as const satisfies readonly QueueMcpToolDefinition<ToolSchema>[]; + +export function buildQueueMcpServer( + dependencyOverrides: Partial<QueueMcpDependencies> = {}, +): McpServer { + const dependencies = resolveDependencies(dependencyOverrides); + const server = new McpServer(dependencies.serverInfo); + + for (const definition of QUEUE_MCP_TOOL_DEFINITIONS) { + server.registerTool( + definition.name, + { + description: definition.description, + inputSchema: definition.inputSchema, + }, + async (args) => { + return withSession(dependencies, async (session) => { + try { + const parsedArgs = definition.inputSchema.parse(args); + const response = await definition.execute(session, parsedArgs); + return toToolResult(response); + } catch (error) { + return toToolErrorResult(error); + } + }); + }, + ); + } + + return server; +} + +export async function startQueueMcpServer( + dependencyOverrides: Partial<QueueMcpDependencies> = {}, +): Promise<McpServer> { + const server = buildQueueMcpServer(dependencyOverrides); + const transport = new StdioServerTransport(); + await server.connect(transport); + return server; +} + +function resolveDependencies( + overrides: Partial<QueueMcpDependencies>, +): QueueMcpDependencies { + const openSession = overrides.openSession ?? DEFAULT_DEPENDENCIES.openSession; + const serverInfo = overrides.serverInfo ?? DEFAULT_DEPENDENCIES.serverInfo; + + return { + openSession: () => openSession(), + serverInfo, + }; +} + +async function withSession( + dependencies: QueueMcpDependencies, + handler: (session: QueueMcpSession) => Promise<CallToolResult>, +): Promise<CallToolResult> { + const session = await dependencies.openSession(); + + try { + return await handler(session); + } finally { + await session.close(); + } +} + +async function openRedisMcpSession(): Promise<QueueMcpSession> { + const redisClient = createRedisClient<ClosableRedisTaskClient>(); + + try { + await assertRedisHealthy(redisClient); + + return { + repository: new RedisTaskRepository({ + client: redisClient, + }), + checkHealth: async () => runRedisHealthCheck(redisClient), + close: async () => { + await redisClient.quit(); + }, + }; + } catch (error) { + await redisClient.quit(); + throw error; + } +} + +function toToolResult(payload: unknown): CallToolResult { + return { + content: [ + { + type: 'text', + text: JSON.stringify(payload, null, 2), + }, + ], + }; +} + +function toToolErrorResult(error: unknown): CallToolResult { + return { + isError: true, + content: [ + { + type: 'text', + text: formatError(error), + }, + ], + }; +} + +function formatError(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + +function countStatuses(tasks: readonly Task[]): Record<TaskStatus, number> { + const counts = Object.fromEntries(TASK_STATUSES.map((status) => [status, 0])) as Record< + TaskStatus, + number + >; + + for (const task of tasks) { + counts[task.status] += 1; + } + + return counts; +} diff --git a/packages/queue/src/mcp-tool-schemas.ts b/packages/queue/src/mcp-tool-schemas.ts new file mode 100644 index 0000000..bd0c4f9 --- /dev/null +++ b/packages/queue/src/mcp-tool-schemas.ts @@ -0,0 +1,44 @@ +import { z } from 'zod'; + +import { TASK_STATUSES } from './task.js'; + +export const queueListToolInputSchema = z.object({ + project: z.string().min(1).optional(), + mission: z.string().min(1).optional(), + status: z.enum(TASK_STATUSES).optional(), +}); + +export const queueGetToolInputSchema = z.object({ + taskId: z.string().min(1), +}); + +export const queueClaimToolInputSchema = z.object({ + taskId: z.string().min(1), + agentId: z.string().min(1), + ttlSeconds: z.number().int().positive(), +}); + +export const queueHeartbeatToolInputSchema = z.object({ + taskId: z.string().min(1), + agentId: z.string().min(1).optional(), + ttlSeconds: z.number().int().positive().optional(), +}); + +export const queueReleaseToolInputSchema = z.object({ + taskId: z.string().min(1), + agentId: z.string().min(1).optional(), +}); + +export const queueCompleteToolInputSchema = z.object({ + taskId: z.string().min(1), + agentId: z.string().min(1).optional(), + summary: z.string().min(1).optional(), +}); + +export const queueFailToolInputSchema = z.object({ + taskId: z.string().min(1), + agentId: z.string().min(1).optional(), + reason: z.string().min(1), +}); + +export const queueStatusToolInputSchema = z.object({}); diff --git a/packages/queue/src/redis-connection.ts b/packages/queue/src/redis-connection.ts new file mode 100644 index 0000000..b7612b6 --- /dev/null +++ b/packages/queue/src/redis-connection.ts @@ -0,0 +1,95 @@ +import Redis, { type RedisOptions } from 'ioredis'; + +const ERR_MISSING_REDIS_URL = + 'Missing required Valkey/Redis connection URL. Set VALKEY_URL or REDIS_URL.'; + +export interface RedisHealthCheck { + readonly checkedAt: number; + readonly latencyMs: number; + readonly ok: boolean; + readonly response?: string; + readonly error?: string; +} + +export interface RedisPingClient { + ping(): Promise<string>; +} + +export type RedisClientConstructor<TClient> = new ( + url: string, + options?: RedisOptions, +) => TClient; + +export interface CreateRedisClientOptions<TClient> { + readonly env?: NodeJS.ProcessEnv; + readonly redisConstructor?: RedisClientConstructor<TClient>; + readonly redisOptions?: RedisOptions; +} + +export function resolveRedisUrl(env: NodeJS.ProcessEnv = process.env): string { + const resolvedUrl = env.VALKEY_URL ?? env.REDIS_URL; + + if (typeof resolvedUrl !== 'string' || resolvedUrl.trim().length === 0) { + throw new Error(ERR_MISSING_REDIS_URL); + } + + return resolvedUrl; +} + +export function createRedisClient<TClient = Redis>( + options: CreateRedisClientOptions<TClient> = {}, +): TClient { + const redisUrl = resolveRedisUrl(options.env); + + const RedisCtor = + options.redisConstructor ?? + (Redis as unknown as RedisClientConstructor<TClient>); + + return new RedisCtor(redisUrl, { + maxRetriesPerRequest: null, + ...options.redisOptions, + }); +} + +export async function runRedisHealthCheck( + client: RedisPingClient, +): Promise<RedisHealthCheck> { + const startedAt = process.hrtime.bigint(); + + try { + const response = await client.ping(); + const elapsedMs = Number((process.hrtime.bigint() - startedAt) / 1_000_000n); + + return { + checkedAt: Date.now(), + latencyMs: elapsedMs, + ok: true, + response, + }; + } catch (error) { + const elapsedMs = Number((process.hrtime.bigint() - startedAt) / 1_000_000n); + const message = + error instanceof Error ? error.message : 'Unknown redis health check error'; + + return { + checkedAt: Date.now(), + latencyMs: elapsedMs, + ok: false, + error: message, + }; + } +} + +export async function assertRedisHealthy( + client: RedisPingClient, +): Promise<RedisHealthCheck> { + const health = await runRedisHealthCheck(client); + + if (!health.ok) { + throw new Error( + `Redis health check failed after ${health.latencyMs}ms: ${health.error ?? 'unknown error'}`, + ); + } + + return health; +} diff --git a/packages/queue/src/task-repository.ts b/packages/queue/src/task-repository.ts new file mode 100644 index 0000000..3b08959 --- /dev/null +++ b/packages/queue/src/task-repository.ts @@ -0,0 +1,632 @@ +import { TASK_LANES, TASK_PRIORITIES, TASK_STATUSES } from './task.js'; +import type { + CreateTaskInput, + Task, + TaskLane, + TaskListFilters, + TaskPriority, + TaskStatus, + TaskUpdateInput, +} from '@mosaic/types'; + +const STATUS_SET = new Set<TaskStatus>(TASK_STATUSES); +const PRIORITY_SET = new Set<TaskPriority>(TASK_PRIORITIES); +const LANE_SET = new Set<TaskLane>(TASK_LANES); + +const DEFAULT_KEY_PREFIX = 'mosaic:queue'; +const MAX_ATOMIC_RETRIES = 8; +const UPDATE_ALLOWED_STATUS_TRANSITIONS: Readonly<Record<TaskStatus, readonly TaskStatus[]>> = { + pending: ['blocked'], + blocked: ['pending'], + claimed: ['in-progress'], + 'in-progress': ['claimed'], + completed: [], + failed: [], +}; + +interface RepositoryKeys { + readonly taskIds: string; + task(taskId: string): string; +} + +export interface RedisTaskClient { + get(key: string): Promise<string | null>; + mget(...keys: string[]): Promise<(string | null)[]>; + set(key: string, value: string, mode?: 'NX' | 'XX'): Promise<'OK' | null>; + smembers(key: string): Promise<string[]>; + sadd(key: string, member: string): Promise<number>; + watch(...keys: string[]): Promise<'OK'>; + unwatch(): Promise<'OK'>; + multi(): RedisTaskTransaction; +} + +export interface RedisTaskTransaction { + set(key: string, value: string, mode?: 'NX' | 'XX'): RedisTaskTransaction; + sadd(key: string, member: string): RedisTaskTransaction; + exec(): Promise<readonly (readonly [Error | null, unknown])[] | null>; +} + +export interface RedisTaskRepositoryOptions { + readonly client: RedisTaskClient; + readonly keyPrefix?: string; + readonly now?: () => number; +} + +export interface ClaimTaskInput { + readonly agentId: string; + readonly ttlSeconds: number; +} + +export interface ReleaseTaskInput { + readonly agentId?: string; +} + +export interface HeartbeatTaskInput { + readonly agentId?: string; + readonly ttlSeconds?: number; +} + +export interface CompleteTaskInput { + readonly agentId?: string; + readonly summary?: string; +} + +export interface FailTaskInput { + readonly agentId?: string; + readonly reason: string; +} + +export class TaskAlreadyExistsError extends Error { + public constructor(taskId: string) { + super(`Task ${taskId} already exists.`); + this.name = 'TaskAlreadyExistsError'; + } +} + +export class TaskNotFoundError extends Error { + public constructor(taskId: string) { + super(`Task ${taskId} was not found.`); + this.name = 'TaskNotFoundError'; + } +} + +export class TaskSerializationError extends Error { + public constructor(taskId: string, message: string) { + super(`Unable to deserialize task ${taskId}: ${message}`); + this.name = 'TaskSerializationError'; + } +} + +export class TaskTransitionError extends Error { + public constructor(taskId: string, status: TaskStatus, action: string) { + super(`Task ${taskId} cannot transition from ${status} via ${action}.`); + this.name = 'TaskTransitionError'; + } +} + +export class TaskOwnershipError extends Error { + public constructor(taskId: string, expectedAgentId: string, actualAgentId: string) { + super( + `Task ${taskId} is owned by ${actualAgentId}, not ${expectedAgentId}.`, + ); + this.name = 'TaskOwnershipError'; + } +} + +export class TaskAtomicConflictError extends Error { + public constructor(taskId: string) { + super(`Task ${taskId} could not be updated atomically after multiple retries.`); + this.name = 'TaskAtomicConflictError'; + } +} + +export class RedisTaskRepository { + private readonly client: RedisTaskClient; + private readonly keys: RepositoryKeys; + private readonly now: () => number; + + public constructor(options: RedisTaskRepositoryOptions) { + this.client = options.client; + this.keys = buildRepositoryKeys(options.keyPrefix ?? DEFAULT_KEY_PREFIX); + this.now = options.now ?? Date.now; + } + + public async create(input: CreateTaskInput): Promise<Task> { + const timestamp = this.now(); + + const task: Task = { + id: input.taskId, + project: input.project, + mission: input.mission, + taskId: input.taskId, + title: input.title, + description: input.description, + status: 'pending', + priority: input.priority ?? 'medium', + dependencies: [...(input.dependencies ?? [])], + lane: input.lane ?? 'any', + retryCount: 0, + metadata: input.metadata, + createdAt: timestamp, + updatedAt: timestamp, + }; + + const taskKey = this.keys.task(task.taskId); + const serializedTask = JSON.stringify(task); + + for (let attempt = 0; attempt < MAX_ATOMIC_RETRIES; attempt += 1) { + await this.client.watch(taskKey); + + try { + const transaction = this.client.multi(); + transaction.set(taskKey, serializedTask, 'NX'); + transaction.sadd(this.keys.taskIds, task.taskId); + const execResult = await transaction.exec(); + + if (execResult === null) { + continue; + } + + const setResult = execResult[0]; + + if (setResult === undefined) { + throw new TaskAtomicConflictError(task.taskId); + } + + const [setError, setReply] = setResult; + + if (setError !== null) { + throw setError; + } + + if (setReply !== 'OK') { + throw new TaskAlreadyExistsError(task.taskId); + } + + const saddResult = execResult[1]; + if (saddResult !== undefined && saddResult[0] !== null) { + throw saddResult[0]; + } + + return task; + } finally { + await this.client.unwatch(); + } + } + + throw new TaskAlreadyExistsError(task.taskId); + } + + public async get(taskId: string): Promise<Task | null> { + const raw = await this.client.get(this.keys.task(taskId)); + + if (raw === null) { + return null; + } + + return deserializeTask(taskId, raw); + } + + public async list(filters: TaskListFilters = {}): Promise<Task[]> { + const taskIds = await this.client.smembers(this.keys.taskIds); + + if (taskIds.length === 0) { + return []; + } + + const taskKeys = taskIds.map((taskId) => this.keys.task(taskId)); + const records = await this.client.mget(...taskKeys); + const tasks: Task[] = []; + + for (const [index, rawTask] of records.entries()) { + if (rawTask === null || rawTask === undefined) { + continue; + } + + const taskId = taskIds[index]; + + if (taskId === undefined) { + continue; + } + + tasks.push(deserializeTask(taskId, rawTask)); + } + + return tasks + .filter((task) => + matchesFilters(task, { + project: filters.project, + mission: filters.mission, + status: filters.status, + }), + ) + .sort((left, right) => left.createdAt - right.createdAt); + } + + public async update(taskId: string, patch: TaskUpdateInput): Promise<Task> { + return this.mutateTaskAtomically(taskId, (existing, now) => { + assertUpdatePatchIsAllowed(taskId, existing, patch); + + return { + ...existing, + ...patch, + dependencies: + patch.dependencies === undefined ? existing.dependencies : [...patch.dependencies], + updatedAt: now, + }; + }); + } + + public async claim(taskId: string, input: ClaimTaskInput): Promise<Task> { + if (input.ttlSeconds <= 0) { + throw new Error(`Task ${taskId} claim ttl must be greater than 0 seconds.`); + } + + return this.mutateTaskAtomically(taskId, (existing, now) => { + if (!canClaimTask(existing, now)) { + throw new TaskTransitionError(taskId, existing.status, 'claim'); + } + + const base = withoutCompletionAndFailureFields(withoutClaimFields(existing)); + + return { + ...base, + status: 'claimed', + claimedBy: input.agentId, + claimedAt: now, + claimTTL: input.ttlSeconds, + updatedAt: now, + }; + }); + } + + public async release(taskId: string, input: ReleaseTaskInput = {}): Promise<Task> { + return this.mutateTaskAtomically(taskId, (existing, now) => { + if (!isClaimedLikeStatus(existing.status)) { + throw new TaskTransitionError(taskId, existing.status, 'release'); + } + + assertTaskOwnership(taskId, existing, input.agentId); + + const base = withoutClaimFields(existing); + + return { + ...base, + status: 'pending', + updatedAt: now, + }; + }); + } + + public async heartbeat( + taskId: string, + input: HeartbeatTaskInput = {}, + ): Promise<Task> { + return this.mutateTaskAtomically(taskId, (existing, now) => { + if (!isClaimedLikeStatus(existing.status)) { + throw new TaskTransitionError(taskId, existing.status, 'heartbeat'); + } + + if (isClaimExpired(existing, now)) { + throw new TaskTransitionError(taskId, existing.status, 'heartbeat'); + } + + assertTaskOwnership(taskId, existing, input.agentId); + + const ttl = input.ttlSeconds ?? existing.claimTTL; + + if (ttl === undefined || ttl <= 0) { + throw new TaskTransitionError(taskId, existing.status, 'heartbeat'); + } + + return { + ...existing, + claimedAt: now, + claimTTL: ttl, + updatedAt: now, + }; + }); + } + + public async complete( + taskId: string, + input: CompleteTaskInput = {}, + ): Promise<Task> { + return this.mutateTaskAtomically(taskId, (existing, now) => { + if (!isClaimedLikeStatus(existing.status)) { + throw new TaskTransitionError(taskId, existing.status, 'complete'); + } + + assertTaskOwnership(taskId, existing, input.agentId); + + const base = withoutCompletionAndFailureFields(withoutClaimFields(existing)); + + return { + ...base, + status: 'completed', + completedAt: now, + ...(input.summary === undefined ? {} : { completionSummary: input.summary }), + updatedAt: now, + }; + }); + } + + public async fail(taskId: string, input: FailTaskInput): Promise<Task> { + return this.mutateTaskAtomically(taskId, (existing, now) => { + if (!isClaimedLikeStatus(existing.status)) { + throw new TaskTransitionError(taskId, existing.status, 'fail'); + } + + assertTaskOwnership(taskId, existing, input.agentId); + + const base = withoutCompletionAndFailureFields(withoutClaimFields(existing)); + + return { + ...base, + status: 'failed', + failedAt: now, + failureReason: input.reason, + retryCount: existing.retryCount + 1, + updatedAt: now, + }; + }); + } + + private async mutateTaskAtomically( + taskId: string, + mutation: (existing: Task, now: number) => Task, + ): Promise<Task> { + const taskKey = this.keys.task(taskId); + + for (let attempt = 0; attempt < MAX_ATOMIC_RETRIES; attempt += 1) { + await this.client.watch(taskKey); + + try { + const raw = await this.client.get(taskKey); + + if (raw === null) { + throw new TaskNotFoundError(taskId); + } + + const existing = deserializeTask(taskId, raw); + const updated = mutation(existing, this.now()); + + const transaction = this.client.multi(); + transaction.set(taskKey, JSON.stringify(updated), 'XX'); + transaction.sadd(this.keys.taskIds, taskId); + const execResult = await transaction.exec(); + + if (execResult === null) { + continue; + } + + const setResult = execResult[0]; + if (setResult === undefined) { + throw new TaskAtomicConflictError(taskId); + } + + const [setError, setReply] = setResult; + + if (setError !== null) { + throw setError; + } + + if (setReply !== 'OK') { + throw new TaskNotFoundError(taskId); + } + + const saddResult = execResult[1]; + if (saddResult !== undefined && saddResult[0] !== null) { + throw saddResult[0]; + } + + return updated; + } finally { + await this.client.unwatch(); + } + } + + throw new TaskAtomicConflictError(taskId); + } +} + +function matchesFilters(task: Task, filters: TaskListFilters): boolean { + if (filters.project !== undefined && task.project !== filters.project) { + return false; + } + + if (filters.mission !== undefined && task.mission !== filters.mission) { + return false; + } + + if (filters.status !== undefined && task.status !== filters.status) { + return false; + } + + return true; +} + +function assertUpdatePatchIsAllowed(taskId: string, task: Task, patch: TaskUpdateInput): void { + if (patch.status !== undefined && !canTransitionStatusViaUpdate(task.status, patch.status)) { + throw new TaskTransitionError(taskId, task.status, 'update'); + } + + if ( + patch.claimedBy !== undefined || + patch.claimedAt !== undefined || + patch.claimTTL !== undefined || + patch.completedAt !== undefined || + patch.failedAt !== undefined || + patch.failureReason !== undefined || + patch.completionSummary !== undefined || + patch.retryCount !== undefined + ) { + throw new TaskTransitionError(taskId, task.status, 'update'); + } +} + +function canTransitionStatusViaUpdate(from: TaskStatus, to: TaskStatus): boolean { + if (from === to) { + return true; + } + + return UPDATE_ALLOWED_STATUS_TRANSITIONS[from].includes(to); +} + +function canClaimTask(task: Task, now: number): boolean { + if (task.status === 'pending') { + return true; + } + + if (!isClaimedLikeStatus(task.status)) { + return false; + } + + return isClaimExpired(task, now); +} + +function isClaimedLikeStatus(status: TaskStatus): boolean { + return status === 'claimed' || status === 'in-progress'; +} + +function isClaimExpired(task: Task, now: number): boolean { + if (task.claimedAt === undefined || task.claimTTL === undefined) { + return false; + } + + return task.claimedAt + task.claimTTL * 1000 <= now; +} + +function assertTaskOwnership( + taskId: string, + task: Task, + expectedAgentId: string | undefined, +): void { + if (expectedAgentId === undefined || task.claimedBy === undefined) { + return; + } + + if (task.claimedBy !== expectedAgentId) { + throw new TaskOwnershipError(taskId, expectedAgentId, task.claimedBy); + } +} + +type TaskWithoutClaimFields = Omit<Task, 'claimedBy' | 'claimedAt' | 'claimTTL'>; +type TaskWithoutCompletionAndFailureFields = Omit< + Task, + 'completedAt' | 'failedAt' | 'failureReason' | 'completionSummary' +>; + +function withoutClaimFields(task: Task): TaskWithoutClaimFields { + const draft: Partial<Task> = { ...task }; + delete draft.claimedBy; + delete draft.claimedAt; + delete draft.claimTTL; + return draft as TaskWithoutClaimFields; +} + +function withoutCompletionAndFailureFields( + task: TaskWithoutClaimFields, +): TaskWithoutCompletionAndFailureFields { + const draft: Partial<TaskWithoutClaimFields> = { ...task }; + delete draft.completedAt; + delete draft.failedAt; + delete draft.failureReason; + delete draft.completionSummary; + return draft as TaskWithoutCompletionAndFailureFields; +} + +function deserializeTask(taskId: string, raw: string): Task { + let parsed: unknown; + + try { + parsed = JSON.parse(raw); + } catch (error) { + throw new TaskSerializationError( + taskId, + error instanceof Error ? error.message : 'invalid JSON', + ); + } + + if (!isRecord(parsed)) { + throw new TaskSerializationError(taskId, 'task payload is not an object'); + } + + const requiredStringKeys = ['id', 'project', 'mission', 'taskId', 'title'] as const; + const requiredNumberKeys = ['retryCount', 'createdAt', 'updatedAt'] as const; + + for (const key of requiredStringKeys) { + if (typeof parsed[key] !== 'string') { + throw new TaskSerializationError(taskId, `missing string field "${key}"`); + } + } + + for (const key of requiredNumberKeys) { + if (typeof parsed[key] !== 'number') { + throw new TaskSerializationError(taskId, `missing numeric field "${key}"`); + } + } + + if (!STATUS_SET.has(parsed.status as TaskStatus)) { + throw new TaskSerializationError(taskId, 'invalid status value'); + } + + if (!PRIORITY_SET.has(parsed.priority as TaskPriority)) { + throw new TaskSerializationError(taskId, 'invalid priority value'); + } + + if (!LANE_SET.has(parsed.lane as TaskLane)) { + throw new TaskSerializationError(taskId, 'invalid lane value'); + } + + if (!Array.isArray(parsed.dependencies)) { + throw new TaskSerializationError(taskId, 'dependencies must be an array'); + } + + if (!parsed.dependencies.every((dependency): dependency is string => typeof dependency === 'string')) { + throw new TaskSerializationError(taskId, 'dependencies must contain only strings'); + } + + return { + id: parsed.id, + project: parsed.project, + mission: parsed.mission, + taskId: parsed.taskId, + title: parsed.title, + status: parsed.status, + priority: parsed.priority, + dependencies: parsed.dependencies, + lane: parsed.lane, + retryCount: parsed.retryCount, + createdAt: parsed.createdAt, + updatedAt: parsed.updatedAt, + ...(typeof parsed.description === 'string' + ? { description: parsed.description } + : {}), + ...(typeof parsed.claimedBy === 'string' ? { claimedBy: parsed.claimedBy } : {}), + ...(typeof parsed.claimedAt === 'number' ? { claimedAt: parsed.claimedAt } : {}), + ...(typeof parsed.claimTTL === 'number' ? { claimTTL: parsed.claimTTL } : {}), + ...(typeof parsed.completedAt === 'number' + ? { completedAt: parsed.completedAt } + : {}), + ...(typeof parsed.failedAt === 'number' ? { failedAt: parsed.failedAt } : {}), + ...(typeof parsed.failureReason === 'string' + ? { failureReason: parsed.failureReason } + : {}), + ...(typeof parsed.completionSummary === 'string' + ? { completionSummary: parsed.completionSummary } + : {}), + ...(isRecord(parsed.metadata) ? { metadata: parsed.metadata } : {}), + }; +} + +function buildRepositoryKeys(keyPrefix: string): RepositoryKeys { + return { + taskIds: `${keyPrefix}:task-ids`, + task: (taskId: string) => `${keyPrefix}:task:${taskId}`, + }; +} + +function isRecord(value: unknown): value is Record<string, unknown> { + return typeof value === 'object' && value !== null && !Array.isArray(value); +} diff --git a/packages/queue/src/task.ts b/packages/queue/src/task.ts new file mode 100644 index 0000000..5d2662a --- /dev/null +++ b/packages/queue/src/task.ts @@ -0,0 +1,12 @@ +export const TASK_STATUSES = [ + 'pending', + 'claimed', + 'in-progress', + 'completed', + 'failed', + 'blocked', +] as const; + +export const TASK_PRIORITIES = ['critical', 'high', 'medium', 'low'] as const; + +export const TASK_LANES = ['planning', 'coding', 'any'] as const; diff --git a/packages/queue/tests/cli.test.ts b/packages/queue/tests/cli.test.ts new file mode 100644 index 0000000..7101eac --- /dev/null +++ b/packages/queue/tests/cli.test.ts @@ -0,0 +1,218 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { runQueueCli, type QueueCliDependencies, type QueueRepository } from '../src/cli.js'; + +function createRepositoryMock(): QueueRepository { + return { + create: vi.fn(() => + Promise.resolve({ + id: 'MQ-005', + project: 'queue', + mission: 'phase1', + taskId: 'MQ-005', + title: 'Build CLI', + status: 'pending', + priority: 'medium', + dependencies: [], + lane: 'any', + retryCount: 0, + createdAt: 1, + updatedAt: 1, + }), + ), + list: vi.fn(() => Promise.resolve([])), + get: vi.fn(() => Promise.resolve(null)), + claim: vi.fn(() => + Promise.resolve({ + id: 'MQ-005', + project: 'queue', + mission: 'phase1', + taskId: 'MQ-005', + title: 'Build CLI', + status: 'claimed', + priority: 'medium', + dependencies: [], + lane: 'any', + claimedBy: 'agent-a', + claimedAt: 2, + claimTTL: 60, + retryCount: 0, + createdAt: 1, + updatedAt: 2, + }), + ), + release: vi.fn(() => + Promise.resolve({ + id: 'MQ-005', + project: 'queue', + mission: 'phase1', + taskId: 'MQ-005', + title: 'Build CLI', + status: 'pending', + priority: 'medium', + dependencies: [], + lane: 'any', + retryCount: 0, + createdAt: 1, + updatedAt: 3, + }), + ), + complete: vi.fn(() => + Promise.resolve({ + id: 'MQ-005', + project: 'queue', + mission: 'phase1', + taskId: 'MQ-005', + title: 'Build CLI', + status: 'completed', + priority: 'medium', + dependencies: [], + lane: 'any', + completionSummary: 'done', + retryCount: 0, + createdAt: 1, + updatedAt: 4, + completedAt: 4, + }), + ), + }; +} + +function createDependencies( + repository: QueueRepository, +): QueueCliDependencies & { outputs: string[]; errors: string[] } { + const outputs: string[] = []; + const errors: string[] = []; + const close = vi.fn(() => Promise.resolve(undefined)); + + return { + openSession: () => + Promise.resolve({ + repository, + close, + }), + stdout: (line) => { + outputs.push(line); + }, + stderr: (line) => { + errors.push(line); + }, + outputs, + errors, + }; +} + +describe('runQueueCli', () => { + it('creates a task from command options', async () => { + const repository = createRepositoryMock(); + const dependencies = createDependencies(repository); + + const exitCode = await runQueueCli( + [ + 'node', + 'mosaic', + 'queue', + 'create', + 'queue', + 'phase1', + 'MQ-005', + '--title', + 'Build CLI', + '--priority', + 'high', + '--lane', + 'coding', + '--dependency', + 'MQ-002', + 'MQ-003', + ], + dependencies, + ); + + expect(exitCode).toBe(0); + expect(repository.create).toHaveBeenCalledWith({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-005', + title: 'Build CLI', + description: undefined, + priority: 'high', + dependencies: ['MQ-002', 'MQ-003'], + lane: 'coding', + }); + }); + + it('lists tasks with filters', async () => { + const repository = createRepositoryMock(); + const dependencies = createDependencies(repository); + + const exitCode = await runQueueCli( + [ + 'node', + 'mosaic', + 'queue', + 'list', + '--project', + 'queue', + '--mission', + 'phase1', + '--status', + 'pending', + ], + dependencies, + ); + + expect(exitCode).toBe(0); + expect(repository.list).toHaveBeenCalledWith({ + project: 'queue', + mission: 'phase1', + status: 'pending', + }); + }); + + it('claims and completes tasks with typed options', async () => { + const repository = createRepositoryMock(); + const dependencies = createDependencies(repository); + + const claimExitCode = await runQueueCli( + [ + 'node', + 'mosaic', + 'queue', + 'claim', + 'MQ-005', + '--agent', + 'agent-a', + '--ttl', + '60', + ], + dependencies, + ); + + const completeExitCode = await runQueueCli( + [ + 'node', + 'mosaic', + 'queue', + 'complete', + 'MQ-005', + '--agent', + 'agent-a', + '--summary', + 'done', + ], + dependencies, + ); + + expect(claimExitCode).toBe(0); + expect(completeExitCode).toBe(0); + expect(repository.claim).toHaveBeenCalledWith('MQ-005', { + agentId: 'agent-a', + ttlSeconds: 60, + }); + expect(repository.complete).toHaveBeenCalledWith('MQ-005', { + agentId: 'agent-a', + summary: 'done', + }); + }); +}); diff --git a/packages/queue/tests/mcp-server.test.ts b/packages/queue/tests/mcp-server.test.ts new file mode 100644 index 0000000..4d92867 --- /dev/null +++ b/packages/queue/tests/mcp-server.test.ts @@ -0,0 +1,50 @@ +import { describe, expect, it } from 'vitest'; + +import { + QUEUE_MCP_TOOL_DEFINITIONS, + buildQueueMcpServer, +} from '../src/mcp-server.js'; + +describe('queue MCP server', () => { + it('declares all required phase-1 tools', () => { + const toolNames = QUEUE_MCP_TOOL_DEFINITIONS.map((tool) => tool.name).sort(); + + expect(toolNames).toEqual([ + 'queue_claim', + 'queue_complete', + 'queue_fail', + 'queue_get', + 'queue_heartbeat', + 'queue_list', + 'queue_release', + 'queue_status', + ]); + }); + + it('builds an MCP server instance', () => { + const server = buildQueueMcpServer({ + openSession: () => + Promise.resolve({ + repository: { + list: () => Promise.resolve([]), + get: () => Promise.resolve(null), + claim: () => Promise.reject(new Error('not implemented')), + heartbeat: () => Promise.reject(new Error('not implemented')), + release: () => Promise.reject(new Error('not implemented')), + complete: () => Promise.reject(new Error('not implemented')), + fail: () => Promise.reject(new Error('not implemented')), + }, + checkHealth: () => + Promise.resolve({ + checkedAt: 1, + latencyMs: 0, + ok: true, + response: 'PONG', + }), + close: () => Promise.resolve(), + }), + }); + + expect(server).toBeDefined(); + }); +}); diff --git a/packages/queue/tests/mcp-tool-schemas.test.ts b/packages/queue/tests/mcp-tool-schemas.test.ts new file mode 100644 index 0000000..0593cf9 --- /dev/null +++ b/packages/queue/tests/mcp-tool-schemas.test.ts @@ -0,0 +1,90 @@ +import { describe, expect, it } from 'vitest'; + +import { + queueClaimToolInputSchema, + queueCompleteToolInputSchema, + queueFailToolInputSchema, + queueGetToolInputSchema, + queueHeartbeatToolInputSchema, + queueListToolInputSchema, + queueReleaseToolInputSchema, + queueStatusToolInputSchema, +} from '../src/mcp-tool-schemas.js'; + +describe('MCP tool schemas', () => { + it('validates queue_list filters', () => { + const parsed = queueListToolInputSchema.parse({ + project: 'queue', + mission: 'phase1', + status: 'pending', + }); + + expect(parsed).toEqual({ + project: 'queue', + mission: 'phase1', + status: 'pending', + }); + }); + + it('requires a taskId for queue_get', () => { + expect(() => queueGetToolInputSchema.parse({})).toThrowError(); + }); + + it('requires positive ttlSeconds for queue_claim', () => { + expect(() => + queueClaimToolInputSchema.parse({ + taskId: 'MQ-007', + agentId: 'agent-a', + ttlSeconds: 0, + }), + ).toThrowError(); + }); + + it('accepts optional fields for queue_heartbeat and queue_release', () => { + const heartbeat = queueHeartbeatToolInputSchema.parse({ + taskId: 'MQ-007', + ttlSeconds: 30, + }); + + const release = queueReleaseToolInputSchema.parse({ + taskId: 'MQ-007', + }); + + expect(heartbeat).toEqual({ + taskId: 'MQ-007', + ttlSeconds: 30, + }); + expect(release).toEqual({ + taskId: 'MQ-007', + }); + }); + + it('validates queue_complete and queue_fail payloads', () => { + const complete = queueCompleteToolInputSchema.parse({ + taskId: 'MQ-007', + agentId: 'agent-a', + summary: 'done', + }); + + const fail = queueFailToolInputSchema.parse({ + taskId: 'MQ-007', + reason: 'boom', + }); + + expect(complete).toEqual({ + taskId: 'MQ-007', + agentId: 'agent-a', + summary: 'done', + }); + expect(fail).toEqual({ + taskId: 'MQ-007', + reason: 'boom', + }); + }); + + it('accepts an empty payload for queue_status', () => { + const parsed = queueStatusToolInputSchema.parse({}); + + expect(parsed).toEqual({}); + }); +}); diff --git a/packages/queue/tests/redis-connection.test.ts b/packages/queue/tests/redis-connection.test.ts new file mode 100644 index 0000000..3a4b072 --- /dev/null +++ b/packages/queue/tests/redis-connection.test.ts @@ -0,0 +1,76 @@ +import { describe, expect, it } from 'vitest'; + +import { + createRedisClient, + resolveRedisUrl, + runRedisHealthCheck, +} from '../src/redis-connection.js'; + +describe('resolveRedisUrl', () => { + it('prefers VALKEY_URL when both env vars are present', () => { + const url = resolveRedisUrl({ + VALKEY_URL: 'redis://valkey.local:6379', + REDIS_URL: 'redis://redis.local:6379', + }); + + expect(url).toBe('redis://valkey.local:6379'); + }); + + it('falls back to REDIS_URL when VALKEY_URL is missing', () => { + const url = resolveRedisUrl({ + REDIS_URL: 'redis://redis.local:6379', + }); + + expect(url).toBe('redis://redis.local:6379'); + }); + + it('throws loudly when no redis environment variable exists', () => { + expect(() => resolveRedisUrl({})).toThrowError( + /Missing required Valkey\/Redis connection URL/i, + ); + }); +}); + +describe('createRedisClient', () => { + it('uses env URL for client creation with no hardcoded defaults', () => { + class FakeRedis { + public readonly url: string; + + public constructor(url: string) { + this.url = url; + } + } + + const client = createRedisClient({ + env: { + VALKEY_URL: 'redis://queue.local:6379', + }, + redisConstructor: FakeRedis, + }); + + expect(client.url).toBe('redis://queue.local:6379'); + }); +}); + +describe('runRedisHealthCheck', () => { + it('returns healthy status when ping succeeds', async () => { + const health = await runRedisHealthCheck({ + ping: () => Promise.resolve('PONG'), + }); + + expect(health.ok).toBe(true); + expect(health.response).toBe('PONG'); + expect(health.latencyMs).toBeTypeOf('number'); + expect(health.latencyMs).toBeGreaterThanOrEqual(0); + }); + + it('returns unhealthy status when ping fails', async () => { + const health = await runRedisHealthCheck({ + ping: () => Promise.reject(new Error('connection refused')), + }); + + expect(health.ok).toBe(false); + expect(health.error).toMatch(/connection refused/i); + expect(health.latencyMs).toBeTypeOf('number'); + }); +}); diff --git a/packages/queue/tests/smoke.test.ts b/packages/queue/tests/smoke.test.ts new file mode 100644 index 0000000..578bebd --- /dev/null +++ b/packages/queue/tests/smoke.test.ts @@ -0,0 +1,9 @@ +import { describe, expect, it } from 'vitest'; + +import { packageVersion } from '../src/index.js'; + +describe('package bootstrap', () => { + it('exposes package version constant', () => { + expect(packageVersion).toBe('0.1.0'); + }); +}); diff --git a/packages/queue/tests/task-atomic.test.ts b/packages/queue/tests/task-atomic.test.ts new file mode 100644 index 0000000..2646f6d --- /dev/null +++ b/packages/queue/tests/task-atomic.test.ts @@ -0,0 +1,459 @@ +import { describe, expect, it } from 'vitest'; + +import { + RedisTaskRepository, + TaskAlreadyExistsError, + TaskOwnershipError, + TaskTransitionError, + type RedisTaskClient, + type RedisTaskTransaction, +} from '../src/task-repository.js'; + +type QueuedOperation = + | { + readonly type: 'set'; + readonly key: string; + readonly value: string; + readonly mode?: 'NX' | 'XX'; + } + | { + readonly type: 'sadd'; + readonly key: string; + readonly member: string; + }; + +class InMemoryRedisBackend { + public readonly kv = new Map<string, string>(); + public readonly sets = new Map<string, Set<string>>(); + public readonly revisions = new Map<string, number>(); + + public getRevision(key: string): number { + return this.revisions.get(key) ?? 0; + } + + public bumpRevision(key: string): void { + this.revisions.set(key, this.getRevision(key) + 1); + } +} + +class InMemoryRedisTransaction implements RedisTaskTransaction { + private readonly operations: QueuedOperation[] = []; + + public constructor( + private readonly backend: InMemoryRedisBackend, + private readonly watchedRevisions: ReadonlyMap<string, number>, + ) {} + + public set(key: string, value: string, mode?: 'NX' | 'XX'): RedisTaskTransaction { + this.operations.push({ + type: 'set', + key, + value, + mode, + }); + return this; + } + + public sadd(key: string, member: string): RedisTaskTransaction { + this.operations.push({ + type: 'sadd', + key, + member, + }); + return this; + } + + public exec(): Promise<readonly (readonly [Error | null, unknown])[] | null> { + for (const [key, revision] of this.watchedRevisions.entries()) { + if (this.backend.getRevision(key) !== revision) { + return Promise.resolve(null); + } + } + + const results: (readonly [Error | null, unknown])[] = []; + + for (const operation of this.operations) { + if (operation.type === 'set') { + const exists = this.backend.kv.has(operation.key); + if (operation.mode === 'NX' && exists) { + results.push([null, null]); + continue; + } + + if (operation.mode === 'XX' && !exists) { + results.push([null, null]); + continue; + } + + this.backend.kv.set(operation.key, operation.value); + this.backend.bumpRevision(operation.key); + results.push([null, 'OK']); + continue; + } + + const set = this.backend.sets.get(operation.key) ?? new Set<string>(); + const before = set.size; + + set.add(operation.member); + this.backend.sets.set(operation.key, set); + this.backend.bumpRevision(operation.key); + results.push([null, set.size === before ? 0 : 1]); + } + + return Promise.resolve(results); + } +} + +class InMemoryAtomicRedisClient implements RedisTaskClient { + private watchedRevisions = new Map<string, number>(); + + public constructor(private readonly backend: InMemoryRedisBackend) {} + + public get(key: string): Promise<string | null> { + return Promise.resolve(this.backend.kv.get(key) ?? null); + } + + public mget(...keys: string[]): Promise<(string | null)[]> { + return Promise.resolve(keys.map((key) => this.backend.kv.get(key) ?? null)); + } + + public set( + key: string, + value: string, + mode?: 'NX' | 'XX', + ): Promise<'OK' | null> { + const exists = this.backend.kv.has(key); + + if (mode === 'NX' && exists) { + return Promise.resolve(null); + } + + if (mode === 'XX' && !exists) { + return Promise.resolve(null); + } + + this.backend.kv.set(key, value); + this.backend.bumpRevision(key); + + return Promise.resolve('OK'); + } + + public smembers(key: string): Promise<string[]> { + return Promise.resolve([...(this.backend.sets.get(key) ?? new Set<string>())]); + } + + public sadd(key: string, member: string): Promise<number> { + const values = this.backend.sets.get(key) ?? new Set<string>(); + const before = values.size; + + values.add(member); + this.backend.sets.set(key, values); + this.backend.bumpRevision(key); + + return Promise.resolve(values.size === before ? 0 : 1); + } + + public watch(...keys: string[]): Promise<'OK'> { + this.watchedRevisions = new Map( + keys.map((key) => [key, this.backend.getRevision(key)]), + ); + return Promise.resolve('OK'); + } + + public unwatch(): Promise<'OK'> { + this.watchedRevisions.clear(); + return Promise.resolve('OK'); + } + + public multi(): RedisTaskTransaction { + const watchedSnapshot = new Map(this.watchedRevisions); + this.watchedRevisions.clear(); + return new InMemoryRedisTransaction(this.backend, watchedSnapshot); + } +} + +class StrictAtomicRedisClient extends InMemoryAtomicRedisClient { + public override set( + key: string, + value: string, + mode?: 'NX' | 'XX', + ): Promise<'OK' | null> { + void key; + void value; + void mode; + throw new Error('Direct set() is not allowed in strict atomic tests.'); + } + + public override sadd(key: string, member: string): Promise<number> { + void key; + void member; + throw new Error('Direct sadd() is not allowed in strict atomic tests.'); + } +} + +function createRepositoryPair(now: () => number): [RedisTaskRepository, RedisTaskRepository] { + const backend = new InMemoryRedisBackend(); + + return [ + new RedisTaskRepository({ + client: new InMemoryAtomicRedisClient(backend), + now, + }), + new RedisTaskRepository({ + client: new InMemoryAtomicRedisClient(backend), + now, + }), + ]; +} + +function createStrictRepositoryPair( + now: () => number, +): [RedisTaskRepository, RedisTaskRepository] { + const backend = new InMemoryRedisBackend(); + + return [ + new RedisTaskRepository({ + client: new StrictAtomicRedisClient(backend), + now, + }), + new RedisTaskRepository({ + client: new StrictAtomicRedisClient(backend), + now, + }), + ]; +} + +describe('RedisTaskRepository atomic transitions', () => { + it('creates atomically under concurrent create race', async () => { + const [repositoryA, repositoryB] = createStrictRepositoryPair( + () => 1_700_000_000_000, + ); + + const [createA, createB] = await Promise.allSettled([ + repositoryA.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-004-CREATE', + title: 'create race', + }), + repositoryB.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-004-CREATE', + title: 'create race duplicate', + }), + ]); + + const fulfilled = [createA, createB].filter( + (result) => result.status === 'fulfilled', + ); + const rejected = [createA, createB].filter( + (result) => result.status === 'rejected', + ); + + expect(fulfilled).toHaveLength(1); + expect(rejected).toHaveLength(1); + expect(rejected[0]?.reason).toBeInstanceOf(TaskAlreadyExistsError); + }); + + it('claims a pending task once and blocks concurrent double-claim', async () => { + let timestamp = 1_700_000_000_000; + const now = (): number => timestamp; + const [repositoryA, repositoryB] = createRepositoryPair(now); + + await repositoryA.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-004', + title: 'Atomic claim', + }); + + const [claimA, claimB] = await Promise.allSettled([ + repositoryA.claim('MQ-004', { agentId: 'agent-a', ttlSeconds: 60 }), + repositoryB.claim('MQ-004', { agentId: 'agent-b', ttlSeconds: 60 }), + ]); + + const fulfilled = [claimA, claimB].filter((result) => result.status === 'fulfilled'); + const rejected = [claimA, claimB].filter((result) => result.status === 'rejected'); + + expect(fulfilled).toHaveLength(1); + expect(rejected).toHaveLength(1); + }); + + it('allows claim takeover after TTL expiry', async () => { + let timestamp = 1_700_000_000_000; + const now = (): number => timestamp; + const [repositoryA, repositoryB] = createRepositoryPair(now); + + await repositoryA.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-004-EXP', + title: 'TTL expiry', + }); + + await repositoryA.claim('MQ-004-EXP', { + agentId: 'agent-a', + ttlSeconds: 1, + }); + + timestamp += 2_000; + + const takeover = await repositoryB.claim('MQ-004-EXP', { + agentId: 'agent-b', + ttlSeconds: 60, + }); + + expect(takeover.claimedBy).toBe('agent-b'); + }); + + it('releases a claimed task back to pending', async () => { + const [repository] = createRepositoryPair(() => 1_700_000_000_000); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-004-REL', + title: 'Release test', + }); + + await repository.claim('MQ-004-REL', { + agentId: 'agent-a', + ttlSeconds: 60, + }); + + const released = await repository.release('MQ-004-REL', { + agentId: 'agent-a', + }); + + expect(released.status).toBe('pending'); + expect(released.claimedBy).toBeUndefined(); + expect(released.claimedAt).toBeUndefined(); + }); + + it('heartbeats, completes, and fails with valid transitions', async () => { + let timestamp = 1_700_000_000_000; + const now = (): number => timestamp; + const [repository] = createRepositoryPair(now); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-004-HCF', + title: 'Transition test', + }); + + await repository.claim('MQ-004-HCF', { + agentId: 'agent-a', + ttlSeconds: 60, + }); + + timestamp += 1_000; + const heartbeat = await repository.heartbeat('MQ-004-HCF', { + agentId: 'agent-a', + ttlSeconds: 120, + }); + expect(heartbeat.claimTTL).toBe(120); + expect(heartbeat.claimedAt).toBe(1_700_000_001_000); + + const completed = await repository.complete('MQ-004-HCF', { + agentId: 'agent-a', + summary: 'done', + }); + expect(completed.status).toBe('completed'); + expect(completed.completionSummary).toBe('done'); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-004-FAIL', + title: 'Failure test', + }); + + await repository.claim('MQ-004-FAIL', { + agentId: 'agent-a', + ttlSeconds: 60, + }); + + const failed = await repository.fail('MQ-004-FAIL', { + agentId: 'agent-a', + reason: 'boom', + }); + + expect(failed.status).toBe('failed'); + expect(failed.failureReason).toBe('boom'); + expect(failed.retryCount).toBe(1); + }); + + it('rejects invalid transitions', async () => { + const [repository] = createRepositoryPair(() => 1_700_000_000_000); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-004-INV', + title: 'Invalid transitions', + }); + + await expect( + repository.complete('MQ-004-INV', { + agentId: 'agent-a', + }), + ).rejects.toBeInstanceOf(TaskTransitionError); + }); + + it('enforces claim ownership for release and complete', async () => { + const [repository] = createRepositoryPair(() => 1_700_000_000_000); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-004-OWN', + title: 'Ownership checks', + }); + + await repository.claim('MQ-004-OWN', { + agentId: 'agent-a', + ttlSeconds: 60, + }); + + await expect( + repository.release('MQ-004-OWN', { + agentId: 'agent-b', + }), + ).rejects.toBeInstanceOf(TaskOwnershipError); + + await expect( + repository.complete('MQ-004-OWN', { + agentId: 'agent-b', + }), + ).rejects.toBeInstanceOf(TaskOwnershipError); + }); + + it('merges concurrent non-conflicting update patches atomically', async () => { + const [repositoryA, repositoryB] = createRepositoryPair(() => 1_700_000_000_000); + + await repositoryA.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-004-UPD', + title: 'Original title', + description: 'Original description', + }); + + await Promise.all([ + repositoryA.update('MQ-004-UPD', { + title: 'Updated title', + }), + repositoryB.update('MQ-004-UPD', { + description: 'Updated description', + }), + ]); + + const latest = await repositoryA.get('MQ-004-UPD'); + + expect(latest).not.toBeNull(); + expect(latest?.title).toBe('Updated title'); + expect(latest?.description).toBe('Updated description'); + }); +}); diff --git a/packages/queue/tests/task-repository.test.ts b/packages/queue/tests/task-repository.test.ts new file mode 100644 index 0000000..d0232fc --- /dev/null +++ b/packages/queue/tests/task-repository.test.ts @@ -0,0 +1,332 @@ +import { describe, expect, it } from 'vitest'; + +import { + RedisTaskRepository, + TaskAlreadyExistsError, + TaskTransitionError, + type RedisTaskClient, + type RedisTaskTransaction, +} from '../src/task-repository.js'; + +class NoopRedisTransaction implements RedisTaskTransaction { + private readonly operations: ( + | { + readonly type: 'set'; + readonly key: string; + readonly value: string; + readonly mode?: 'NX' | 'XX'; + } + | { + readonly type: 'sadd'; + readonly key: string; + readonly member: string; + } + )[] = []; + + public constructor( + private readonly kv: Map<string, string>, + private readonly sets: Map<string, Set<string>>, + ) {} + + public set(key: string, value: string, mode?: 'NX' | 'XX'): RedisTaskTransaction { + this.operations.push({ + type: 'set', + key, + value, + mode, + }); + return this; + } + + public sadd(key: string, member: string): RedisTaskTransaction { + this.operations.push({ + type: 'sadd', + key, + member, + }); + return this; + } + + public exec(): Promise<readonly (readonly [Error | null, unknown])[] | null> { + const results: (readonly [Error | null, unknown])[] = []; + + for (const operation of this.operations) { + if (operation.type === 'set') { + const exists = this.kv.has(operation.key); + + if (operation.mode === 'NX' && exists) { + results.push([null, null]); + continue; + } + + if (operation.mode === 'XX' && !exists) { + results.push([null, null]); + continue; + } + + this.kv.set(operation.key, operation.value); + results.push([null, 'OK']); + continue; + } + + const values = this.sets.get(operation.key) ?? new Set<string>(); + const beforeSize = values.size; + + values.add(operation.member); + this.sets.set(operation.key, values); + results.push([null, values.size === beforeSize ? 0 : 1]); + } + + return Promise.resolve(results); + } +} + +class InMemoryRedisClient implements RedisTaskClient { + private readonly kv = new Map<string, string>(); + private readonly sets = new Map<string, Set<string>>(); + + public get(key: string): Promise<string | null> { + return Promise.resolve(this.kv.get(key) ?? null); + } + + public mget(...keys: string[]): Promise<(string | null)[]> { + return Promise.resolve(keys.map((key) => this.kv.get(key) ?? null)); + } + + public set( + key: string, + value: string, + mode?: 'NX' | 'XX', + ): Promise<'OK' | null> { + const exists = this.kv.has(key); + + if (mode === 'NX' && exists) { + return Promise.resolve(null); + } + + if (mode === 'XX' && !exists) { + return Promise.resolve(null); + } + + this.kv.set(key, value); + return Promise.resolve('OK'); + } + + public smembers(key: string): Promise<string[]> { + return Promise.resolve([...(this.sets.get(key) ?? new Set<string>())]); + } + + public sadd(key: string, member: string): Promise<number> { + const values = this.sets.get(key) ?? new Set<string>(); + const beforeSize = values.size; + + values.add(member); + this.sets.set(key, values); + + return Promise.resolve(values.size === beforeSize ? 0 : 1); + } + + public watch(): Promise<'OK'> { + return Promise.resolve('OK'); + } + + public unwatch(): Promise<'OK'> { + return Promise.resolve('OK'); + } + + public multi(): RedisTaskTransaction { + return new NoopRedisTransaction(this.kv, this.sets); + } +} + +class MgetTrackingRedisClient extends InMemoryRedisClient { + public getCalls = 0; + public mgetCalls = 0; + public lastMgetKeys: string[] = []; + + public override get(key: string): Promise<string | null> { + this.getCalls += 1; + return super.get(key); + } + + public override mget(...keys: string[]): Promise<(string | null)[]> { + this.mgetCalls += 1; + this.lastMgetKeys = [...keys]; + return super.mget(...keys); + } +} + +describe('RedisTaskRepository CRUD', () => { + it('creates and fetches a task with defaults', async () => { + const repository = new RedisTaskRepository({ + client: new InMemoryRedisClient(), + now: () => 1_700_000_000_000, + }); + + const created = await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-003', + title: 'Implement task CRUD', + }); + + const fetched = await repository.get('MQ-003'); + + expect(created.id).toBe('MQ-003'); + expect(created.status).toBe('pending'); + expect(created.priority).toBe('medium'); + expect(created.lane).toBe('any'); + expect(created.dependencies).toEqual([]); + expect(created.createdAt).toBe(1_700_000_000_000); + expect(fetched).toEqual(created); + }); + + it('throws when creating a duplicate task id', async () => { + const repository = new RedisTaskRepository({ + client: new InMemoryRedisClient(), + }); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-003', + title: 'First task', + }); + + await expect( + repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-003', + title: 'Duplicate', + }), + ).rejects.toBeInstanceOf(TaskAlreadyExistsError); + }); + + it('lists tasks and filters by project, mission, and status', async () => { + const repository = new RedisTaskRepository({ + client: new InMemoryRedisClient(), + }); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-003A', + title: 'Pending task', + }); + + await repository.create({ + project: 'queue', + mission: 'phase2', + taskId: 'MQ-003B', + title: 'Claimed task', + }); + + await repository.claim('MQ-003B', { + agentId: 'agent-a', + ttlSeconds: 60, + }); + + const byProject = await repository.list({ + project: 'queue', + }); + const byMission = await repository.list({ + mission: 'phase2', + }); + const byStatus = await repository.list({ + status: 'claimed', + }); + + expect(byProject).toHaveLength(2); + expect(byMission.map((task) => task.taskId)).toEqual(['MQ-003B']); + expect(byStatus.map((task) => task.taskId)).toEqual(['MQ-003B']); + }); + + it('lists 3+ tasks with a single mget call', async () => { + const client = new MgetTrackingRedisClient(); + const repository = new RedisTaskRepository({ + client, + }); + + await repository.create({ + project: 'queue', + mission: 'phase-list', + taskId: 'MQ-MGET-001', + title: 'Task one', + }); + await repository.create({ + project: 'queue', + mission: 'phase-list', + taskId: 'MQ-MGET-002', + title: 'Task two', + }); + await repository.create({ + project: 'queue', + mission: 'phase-list', + taskId: 'MQ-MGET-003', + title: 'Task three', + }); + + const tasks = await repository.list(); + + expect(tasks).toHaveLength(3); + expect(client.mgetCalls).toBe(1); + expect(client.getCalls).toBe(0); + expect(client.lastMgetKeys).toHaveLength(3); + }); + + it('updates mutable fields and preserves immutable fields', async () => { + const repository = new RedisTaskRepository({ + client: new InMemoryRedisClient(), + now: () => 1_700_000_000_001, + }); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-003', + title: 'Original title', + description: 'Original description', + }); + + const updated = await repository.update('MQ-003', { + title: 'Updated title', + description: 'Updated description', + priority: 'high', + lane: 'coding', + dependencies: ['MQ-002'], + metadata: { + source: 'unit-test', + }, + }); + + expect(updated.title).toBe('Updated title'); + expect(updated.description).toBe('Updated description'); + expect(updated.priority).toBe('high'); + expect(updated.lane).toBe('coding'); + expect(updated.dependencies).toEqual(['MQ-002']); + expect(updated.metadata).toEqual({ source: 'unit-test' }); + expect(updated.project).toBe('queue'); + expect(updated.taskId).toBe('MQ-003'); + expect(updated.updatedAt).toBe(1_700_000_000_001); + }); + + it('rejects status transitions through update()', async () => { + const repository = new RedisTaskRepository({ + client: new InMemoryRedisClient(), + }); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-003-TRANSITION', + title: 'Transition guard', + }); + + await expect( + repository.update('MQ-003-TRANSITION', { + status: 'completed', + }), + ).rejects.toBeInstanceOf(TaskTransitionError); + }); +}); diff --git a/packages/queue/tsconfig.build.json b/packages/queue/tsconfig.build.json new file mode 100644 index 0000000..8cfeb06 --- /dev/null +++ b/packages/queue/tsconfig.build.json @@ -0,0 +1,10 @@ +{ + "extends": "./tsconfig.json", + "compilerOptions": { + "noEmit": false, + "rootDir": ".", + "outDir": "dist" + }, + "include": ["src/**/*.ts", "bin/**/*.ts"], + "exclude": ["tests/**/*"] +} diff --git a/packages/queue/tsconfig.json b/packages/queue/tsconfig.json new file mode 100644 index 0000000..30ab1ef --- /dev/null +++ b/packages/queue/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "noEmit": true, + "rootDir": "." + }, + "include": ["src/**/*.ts", "tests/**/*.ts", "bin/**/*.ts", "vitest.config.ts"] +} diff --git a/packages/queue/vitest.config.ts b/packages/queue/vitest.config.ts new file mode 100644 index 0000000..8363e16 --- /dev/null +++ b/packages/queue/vitest.config.ts @@ -0,0 +1,8 @@ +import { defineConfig } from 'vitest/config'; + +export default defineConfig({ + test: { + environment: 'node', + include: ['tests/**/*.test.ts'], + }, +});