From a235aebf20d893dfc0087ce504fc02c9c49fcf98 Mon Sep 17 00:00:00 2001 From: Jason Woltje Date: Fri, 6 Mar 2026 09:19:53 -0600 Subject: [PATCH] feat(queue): add redis task CRUD repository (MQ-003) --- packages/queue/src/index.ts | 21 ++ packages/queue/src/task-repository.ts | 271 +++++++++++++++++++ packages/queue/src/task.ts | 76 ++++++ packages/queue/tests/task-repository.test.ts | 171 ++++++++++++ 4 files changed, 539 insertions(+) create mode 100644 packages/queue/src/task-repository.ts create mode 100644 packages/queue/src/task.ts create mode 100644 packages/queue/tests/task-repository.test.ts diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index 74a9354..9998ac0 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -12,3 +12,24 @@ export type { RedisHealthCheck, RedisPingClient, } from './redis-connection.js'; +export { + RedisTaskRepository, + TaskAlreadyExistsError, + TaskNotFoundError, + TaskSerializationError, +} from './task-repository.js'; +export type { RedisTaskClient, RedisTaskRepositoryOptions } from './task-repository.js'; +export { + TASK_LANES, + TASK_PRIORITIES, + TASK_STATUSES, +} from './task.js'; +export type { + CreateTaskInput, + Task, + TaskLane, + TaskListFilters, + TaskPriority, + TaskStatus, + TaskUpdateInput, +} from './task.js'; diff --git a/packages/queue/src/task-repository.ts b/packages/queue/src/task-repository.ts new file mode 100644 index 0000000..b31f8b8 --- /dev/null +++ b/packages/queue/src/task-repository.ts @@ -0,0 +1,271 @@ +import { + TASK_LANES, + TASK_PRIORITIES, + TASK_STATUSES, + type CreateTaskInput, + type Task, + type TaskLane, + type TaskListFilters, + type TaskPriority, + type TaskStatus, + type TaskUpdateInput, +} from './task.js'; + +const STATUS_SET = new Set(TASK_STATUSES); +const PRIORITY_SET = new Set(TASK_PRIORITIES); +const LANE_SET = new Set(TASK_LANES); + +const DEFAULT_KEY_PREFIX = 'mosaic:queue'; + +interface RepositoryKeys { + readonly taskIds: string; + task(taskId: string): string; +} + +export interface RedisTaskClient { + get(key: string): Promise; + set(key: string, value: string, mode?: 'NX' | 'XX'): Promise<'OK' | null>; + smembers(key: string): Promise; + sadd(key: string, member: string): Promise; +} + +export interface RedisTaskRepositoryOptions { + readonly client: RedisTaskClient; + readonly keyPrefix?: string; + readonly now?: () => number; +} + +export class TaskAlreadyExistsError extends Error { + public constructor(taskId: string) { + super(`Task ${taskId} already exists.`); + this.name = 'TaskAlreadyExistsError'; + } +} + +export class TaskNotFoundError extends Error { + public constructor(taskId: string) { + super(`Task ${taskId} was not found.`); + this.name = 'TaskNotFoundError'; + } +} + +export class TaskSerializationError extends Error { + public constructor(taskId: string, message: string) { + super(`Unable to deserialize task ${taskId}: ${message}`); + this.name = 'TaskSerializationError'; + } +} + +export class RedisTaskRepository { + private readonly client: RedisTaskClient; + private readonly keys: RepositoryKeys; + private readonly now: () => number; + + public constructor(options: RedisTaskRepositoryOptions) { + this.client = options.client; + this.keys = buildRepositoryKeys(options.keyPrefix ?? DEFAULT_KEY_PREFIX); + this.now = options.now ?? Date.now; + } + + public async create(input: CreateTaskInput): Promise { + const timestamp = this.now(); + + const task: Task = { + id: input.taskId, + project: input.project, + mission: input.mission, + taskId: input.taskId, + title: input.title, + description: input.description, + status: 'pending', + priority: input.priority ?? 'medium', + dependencies: [...(input.dependencies ?? [])], + lane: input.lane ?? 'any', + retryCount: 0, + metadata: input.metadata, + createdAt: timestamp, + updatedAt: timestamp, + }; + + const saveResult = await this.client.set( + this.keys.task(task.taskId), + JSON.stringify(task), + 'NX', + ); + + if (saveResult !== 'OK') { + throw new TaskAlreadyExistsError(task.taskId); + } + + await this.client.sadd(this.keys.taskIds, task.taskId); + + return task; + } + + public async get(taskId: string): Promise { + const raw = await this.client.get(this.keys.task(taskId)); + + if (raw === null) { + return null; + } + + return deserializeTask(taskId, raw); + } + + public async list(filters: TaskListFilters = {}): Promise { + const taskIds = await this.client.smembers(this.keys.taskIds); + const records = await Promise.all(taskIds.map(async (taskId) => this.get(taskId))); + const tasks = records.filter((task): task is Task => task !== null); + + return tasks + .filter((task) => + matchesFilters(task, { + project: filters.project, + mission: filters.mission, + status: filters.status, + }), + ) + .sort((left, right) => left.createdAt - right.createdAt); + } + + public async update(taskId: string, patch: TaskUpdateInput): Promise { + const existing = await this.get(taskId); + + if (existing === null) { + throw new TaskNotFoundError(taskId); + } + + const updated: Task = { + ...existing, + ...patch, + dependencies: + patch.dependencies === undefined ? existing.dependencies : [...patch.dependencies], + updatedAt: this.now(), + }; + + const saveResult = await this.client.set( + this.keys.task(taskId), + JSON.stringify(updated), + 'XX', + ); + + if (saveResult !== 'OK') { + throw new TaskNotFoundError(taskId); + } + + await this.client.sadd(this.keys.taskIds, taskId); + + return updated; + } +} + +function matchesFilters(task: Task, filters: TaskListFilters): boolean { + if (filters.project !== undefined && task.project !== filters.project) { + return false; + } + + if (filters.mission !== undefined && task.mission !== filters.mission) { + return false; + } + + if (filters.status !== undefined && task.status !== filters.status) { + return false; + } + + return true; +} + +function deserializeTask(taskId: string, raw: string): Task { + let parsed: unknown; + + try { + parsed = JSON.parse(raw); + } catch (error) { + throw new TaskSerializationError( + taskId, + error instanceof Error ? error.message : 'invalid JSON', + ); + } + + if (!isRecord(parsed)) { + throw new TaskSerializationError(taskId, 'task payload is not an object'); + } + + const requiredStringKeys = ['id', 'project', 'mission', 'taskId', 'title'] as const; + const requiredNumberKeys = ['retryCount', 'createdAt', 'updatedAt'] as const; + + for (const key of requiredStringKeys) { + if (typeof parsed[key] !== 'string') { + throw new TaskSerializationError(taskId, `missing string field "${key}"`); + } + } + + for (const key of requiredNumberKeys) { + if (typeof parsed[key] !== 'number') { + throw new TaskSerializationError(taskId, `missing numeric field "${key}"`); + } + } + + if (!STATUS_SET.has(parsed.status as TaskStatus)) { + throw new TaskSerializationError(taskId, 'invalid status value'); + } + + if (!PRIORITY_SET.has(parsed.priority as TaskPriority)) { + throw new TaskSerializationError(taskId, 'invalid priority value'); + } + + if (!LANE_SET.has(parsed.lane as TaskLane)) { + throw new TaskSerializationError(taskId, 'invalid lane value'); + } + + if (!Array.isArray(parsed.dependencies)) { + throw new TaskSerializationError(taskId, 'dependencies must be an array'); + } + + if (!parsed.dependencies.every((dependency): dependency is string => typeof dependency === 'string')) { + throw new TaskSerializationError(taskId, 'dependencies must contain only strings'); + } + + return { + id: parsed.id, + project: parsed.project, + mission: parsed.mission, + taskId: parsed.taskId, + title: parsed.title, + status: parsed.status, + priority: parsed.priority, + dependencies: parsed.dependencies, + lane: parsed.lane, + retryCount: parsed.retryCount, + createdAt: parsed.createdAt, + updatedAt: parsed.updatedAt, + ...(typeof parsed.description === 'string' + ? { description: parsed.description } + : {}), + ...(typeof parsed.claimedBy === 'string' ? { claimedBy: parsed.claimedBy } : {}), + ...(typeof parsed.claimedAt === 'number' ? { claimedAt: parsed.claimedAt } : {}), + ...(typeof parsed.claimTTL === 'number' ? { claimTTL: parsed.claimTTL } : {}), + ...(typeof parsed.completedAt === 'number' + ? { completedAt: parsed.completedAt } + : {}), + ...(typeof parsed.failedAt === 'number' ? { failedAt: parsed.failedAt } : {}), + ...(typeof parsed.failureReason === 'string' + ? { failureReason: parsed.failureReason } + : {}), + ...(typeof parsed.completionSummary === 'string' + ? { completionSummary: parsed.completionSummary } + : {}), + ...(isRecord(parsed.metadata) ? { metadata: parsed.metadata } : {}), + }; +} + +function buildRepositoryKeys(keyPrefix: string): RepositoryKeys { + return { + taskIds: `${keyPrefix}:task-ids`, + task: (taskId: string) => `${keyPrefix}:task:${taskId}`, + }; +} + +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null && !Array.isArray(value); +} diff --git a/packages/queue/src/task.ts b/packages/queue/src/task.ts new file mode 100644 index 0000000..df7f6f7 --- /dev/null +++ b/packages/queue/src/task.ts @@ -0,0 +1,76 @@ +export const TASK_STATUSES = [ + 'pending', + 'claimed', + 'in-progress', + 'completed', + 'failed', + 'blocked', +] as const; + +export const TASK_PRIORITIES = ['critical', 'high', 'medium', 'low'] as const; + +export const TASK_LANES = ['planning', 'coding', 'any'] as const; + +export type TaskStatus = (typeof TASK_STATUSES)[number]; +export type TaskPriority = (typeof TASK_PRIORITIES)[number]; +export type TaskLane = (typeof TASK_LANES)[number]; + +export interface Task { + readonly id: string; + readonly project: string; + readonly mission: string; + readonly taskId: string; + readonly title: string; + readonly description?: string; + readonly status: TaskStatus; + readonly priority: TaskPriority; + readonly dependencies: readonly string[]; + readonly lane: TaskLane; + readonly claimedBy?: string; + readonly claimedAt?: number; + readonly claimTTL?: number; + readonly completedAt?: number; + readonly failedAt?: number; + readonly failureReason?: string; + readonly completionSummary?: string; + readonly retryCount: number; + readonly metadata?: Record; + readonly createdAt: number; + readonly updatedAt: number; +} + +export interface CreateTaskInput { + readonly project: string; + readonly mission: string; + readonly taskId: string; + readonly title: string; + readonly description?: string; + readonly priority?: TaskPriority; + readonly dependencies?: readonly string[]; + readonly lane?: TaskLane; + readonly metadata?: Record; +} + +export interface TaskListFilters { + readonly project?: string; + readonly mission?: string; + readonly status?: TaskStatus; +} + +export interface TaskUpdateInput { + readonly title?: string; + readonly description?: string; + readonly status?: TaskStatus; + readonly priority?: TaskPriority; + readonly dependencies?: readonly string[]; + readonly lane?: TaskLane; + readonly claimedBy?: string; + readonly claimedAt?: number; + readonly claimTTL?: number; + readonly completedAt?: number; + readonly failedAt?: number; + readonly failureReason?: string; + readonly completionSummary?: string; + readonly retryCount?: number; + readonly metadata?: Record; +} diff --git a/packages/queue/tests/task-repository.test.ts b/packages/queue/tests/task-repository.test.ts new file mode 100644 index 0000000..180b7df --- /dev/null +++ b/packages/queue/tests/task-repository.test.ts @@ -0,0 +1,171 @@ +import { describe, expect, it } from 'vitest'; + +import { + RedisTaskRepository, + TaskAlreadyExistsError, + type RedisTaskClient, +} from '../src/task-repository.js'; + +class InMemoryRedisClient implements RedisTaskClient { + private readonly kv = new Map(); + private readonly sets = new Map>(); + + public get(key: string): Promise { + return Promise.resolve(this.kv.get(key) ?? null); + } + + public set( + key: string, + value: string, + mode?: 'NX' | 'XX', + ): Promise<'OK' | null> { + const exists = this.kv.has(key); + + if (mode === 'NX' && exists) { + return Promise.resolve(null); + } + + if (mode === 'XX' && !exists) { + return Promise.resolve(null); + } + + this.kv.set(key, value); + return Promise.resolve('OK'); + } + + public smembers(key: string): Promise { + return Promise.resolve([...(this.sets.get(key) ?? new Set())]); + } + + public sadd(key: string, member: string): Promise { + const values = this.sets.get(key) ?? new Set(); + const beforeSize = values.size; + + values.add(member); + this.sets.set(key, values); + + return Promise.resolve(values.size === beforeSize ? 0 : 1); + } +} + +describe('RedisTaskRepository CRUD', () => { + it('creates and fetches a task with defaults', async () => { + const repository = new RedisTaskRepository({ + client: new InMemoryRedisClient(), + now: () => 1_700_000_000_000, + }); + + const created = await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-003', + title: 'Implement task CRUD', + }); + + const fetched = await repository.get('MQ-003'); + + expect(created.id).toBe('MQ-003'); + expect(created.status).toBe('pending'); + expect(created.priority).toBe('medium'); + expect(created.lane).toBe('any'); + expect(created.dependencies).toEqual([]); + expect(created.createdAt).toBe(1_700_000_000_000); + expect(fetched).toEqual(created); + }); + + it('throws when creating a duplicate task id', async () => { + const repository = new RedisTaskRepository({ + client: new InMemoryRedisClient(), + }); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-003', + title: 'First task', + }); + + await expect( + repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-003', + title: 'Duplicate', + }), + ).rejects.toBeInstanceOf(TaskAlreadyExistsError); + }); + + it('lists tasks and filters by project, mission, and status', async () => { + const repository = new RedisTaskRepository({ + client: new InMemoryRedisClient(), + }); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-003A', + title: 'Pending task', + }); + + await repository.create({ + project: 'queue', + mission: 'phase2', + taskId: 'MQ-003B', + title: 'Claimed task', + }); + + await repository.update('MQ-003B', { + status: 'claimed', + }); + + const byProject = await repository.list({ + project: 'queue', + }); + const byMission = await repository.list({ + mission: 'phase2', + }); + const byStatus = await repository.list({ + status: 'claimed', + }); + + expect(byProject).toHaveLength(2); + expect(byMission.map((task) => task.taskId)).toEqual(['MQ-003B']); + expect(byStatus.map((task) => task.taskId)).toEqual(['MQ-003B']); + }); + + it('updates mutable fields and preserves immutable fields', async () => { + const repository = new RedisTaskRepository({ + client: new InMemoryRedisClient(), + now: () => 1_700_000_000_001, + }); + + await repository.create({ + project: 'queue', + mission: 'phase1', + taskId: 'MQ-003', + title: 'Original title', + description: 'Original description', + }); + + const updated = await repository.update('MQ-003', { + title: 'Updated title', + description: 'Updated description', + priority: 'high', + lane: 'coding', + dependencies: ['MQ-002'], + metadata: { + source: 'unit-test', + }, + }); + + expect(updated.title).toBe('Updated title'); + expect(updated.description).toBe('Updated description'); + expect(updated.priority).toBe('high'); + expect(updated.lane).toBe('coding'); + expect(updated.dependencies).toEqual(['MQ-002']); + expect(updated.metadata).toEqual({ source: 'unit-test' }); + expect(updated.project).toBe('queue'); + expect(updated.taskId).toBe('MQ-003'); + expect(updated.updatedAt).toBe(1_700_000_000_001); + }); +});