feat(queue): add redis task CRUD repository (MQ-003)
This commit is contained in:
@@ -12,3 +12,24 @@ export type {
|
||||
RedisHealthCheck,
|
||||
RedisPingClient,
|
||||
} from './redis-connection.js';
|
||||
export {
|
||||
RedisTaskRepository,
|
||||
TaskAlreadyExistsError,
|
||||
TaskNotFoundError,
|
||||
TaskSerializationError,
|
||||
} from './task-repository.js';
|
||||
export type { RedisTaskClient, RedisTaskRepositoryOptions } from './task-repository.js';
|
||||
export {
|
||||
TASK_LANES,
|
||||
TASK_PRIORITIES,
|
||||
TASK_STATUSES,
|
||||
} from './task.js';
|
||||
export type {
|
||||
CreateTaskInput,
|
||||
Task,
|
||||
TaskLane,
|
||||
TaskListFilters,
|
||||
TaskPriority,
|
||||
TaskStatus,
|
||||
TaskUpdateInput,
|
||||
} from './task.js';
|
||||
|
||||
271
packages/queue/src/task-repository.ts
Normal file
271
packages/queue/src/task-repository.ts
Normal file
@@ -0,0 +1,271 @@
|
||||
import {
|
||||
TASK_LANES,
|
||||
TASK_PRIORITIES,
|
||||
TASK_STATUSES,
|
||||
type CreateTaskInput,
|
||||
type Task,
|
||||
type TaskLane,
|
||||
type TaskListFilters,
|
||||
type TaskPriority,
|
||||
type TaskStatus,
|
||||
type TaskUpdateInput,
|
||||
} from './task.js';
|
||||
|
||||
const STATUS_SET = new Set<TaskStatus>(TASK_STATUSES);
|
||||
const PRIORITY_SET = new Set<TaskPriority>(TASK_PRIORITIES);
|
||||
const LANE_SET = new Set<TaskLane>(TASK_LANES);
|
||||
|
||||
const DEFAULT_KEY_PREFIX = 'mosaic:queue';
|
||||
|
||||
interface RepositoryKeys {
|
||||
readonly taskIds: string;
|
||||
task(taskId: string): string;
|
||||
}
|
||||
|
||||
export interface RedisTaskClient {
|
||||
get(key: string): Promise<string | null>;
|
||||
set(key: string, value: string, mode?: 'NX' | 'XX'): Promise<'OK' | null>;
|
||||
smembers(key: string): Promise<string[]>;
|
||||
sadd(key: string, member: string): Promise<number>;
|
||||
}
|
||||
|
||||
export interface RedisTaskRepositoryOptions {
|
||||
readonly client: RedisTaskClient;
|
||||
readonly keyPrefix?: string;
|
||||
readonly now?: () => number;
|
||||
}
|
||||
|
||||
export class TaskAlreadyExistsError extends Error {
|
||||
public constructor(taskId: string) {
|
||||
super(`Task ${taskId} already exists.`);
|
||||
this.name = 'TaskAlreadyExistsError';
|
||||
}
|
||||
}
|
||||
|
||||
export class TaskNotFoundError extends Error {
|
||||
public constructor(taskId: string) {
|
||||
super(`Task ${taskId} was not found.`);
|
||||
this.name = 'TaskNotFoundError';
|
||||
}
|
||||
}
|
||||
|
||||
export class TaskSerializationError extends Error {
|
||||
public constructor(taskId: string, message: string) {
|
||||
super(`Unable to deserialize task ${taskId}: ${message}`);
|
||||
this.name = 'TaskSerializationError';
|
||||
}
|
||||
}
|
||||
|
||||
export class RedisTaskRepository {
|
||||
private readonly client: RedisTaskClient;
|
||||
private readonly keys: RepositoryKeys;
|
||||
private readonly now: () => number;
|
||||
|
||||
public constructor(options: RedisTaskRepositoryOptions) {
|
||||
this.client = options.client;
|
||||
this.keys = buildRepositoryKeys(options.keyPrefix ?? DEFAULT_KEY_PREFIX);
|
||||
this.now = options.now ?? Date.now;
|
||||
}
|
||||
|
||||
public async create(input: CreateTaskInput): Promise<Task> {
|
||||
const timestamp = this.now();
|
||||
|
||||
const task: Task = {
|
||||
id: input.taskId,
|
||||
project: input.project,
|
||||
mission: input.mission,
|
||||
taskId: input.taskId,
|
||||
title: input.title,
|
||||
description: input.description,
|
||||
status: 'pending',
|
||||
priority: input.priority ?? 'medium',
|
||||
dependencies: [...(input.dependencies ?? [])],
|
||||
lane: input.lane ?? 'any',
|
||||
retryCount: 0,
|
||||
metadata: input.metadata,
|
||||
createdAt: timestamp,
|
||||
updatedAt: timestamp,
|
||||
};
|
||||
|
||||
const saveResult = await this.client.set(
|
||||
this.keys.task(task.taskId),
|
||||
JSON.stringify(task),
|
||||
'NX',
|
||||
);
|
||||
|
||||
if (saveResult !== 'OK') {
|
||||
throw new TaskAlreadyExistsError(task.taskId);
|
||||
}
|
||||
|
||||
await this.client.sadd(this.keys.taskIds, task.taskId);
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
public async get(taskId: string): Promise<Task | null> {
|
||||
const raw = await this.client.get(this.keys.task(taskId));
|
||||
|
||||
if (raw === null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return deserializeTask(taskId, raw);
|
||||
}
|
||||
|
||||
public async list(filters: TaskListFilters = {}): Promise<Task[]> {
|
||||
const taskIds = await this.client.smembers(this.keys.taskIds);
|
||||
const records = await Promise.all(taskIds.map(async (taskId) => this.get(taskId)));
|
||||
const tasks = records.filter((task): task is Task => task !== null);
|
||||
|
||||
return tasks
|
||||
.filter((task) =>
|
||||
matchesFilters(task, {
|
||||
project: filters.project,
|
||||
mission: filters.mission,
|
||||
status: filters.status,
|
||||
}),
|
||||
)
|
||||
.sort((left, right) => left.createdAt - right.createdAt);
|
||||
}
|
||||
|
||||
public async update(taskId: string, patch: TaskUpdateInput): Promise<Task> {
|
||||
const existing = await this.get(taskId);
|
||||
|
||||
if (existing === null) {
|
||||
throw new TaskNotFoundError(taskId);
|
||||
}
|
||||
|
||||
const updated: Task = {
|
||||
...existing,
|
||||
...patch,
|
||||
dependencies:
|
||||
patch.dependencies === undefined ? existing.dependencies : [...patch.dependencies],
|
||||
updatedAt: this.now(),
|
||||
};
|
||||
|
||||
const saveResult = await this.client.set(
|
||||
this.keys.task(taskId),
|
||||
JSON.stringify(updated),
|
||||
'XX',
|
||||
);
|
||||
|
||||
if (saveResult !== 'OK') {
|
||||
throw new TaskNotFoundError(taskId);
|
||||
}
|
||||
|
||||
await this.client.sadd(this.keys.taskIds, taskId);
|
||||
|
||||
return updated;
|
||||
}
|
||||
}
|
||||
|
||||
function matchesFilters(task: Task, filters: TaskListFilters): boolean {
|
||||
if (filters.project !== undefined && task.project !== filters.project) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (filters.mission !== undefined && task.mission !== filters.mission) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (filters.status !== undefined && task.status !== filters.status) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
function deserializeTask(taskId: string, raw: string): Task {
|
||||
let parsed: unknown;
|
||||
|
||||
try {
|
||||
parsed = JSON.parse(raw);
|
||||
} catch (error) {
|
||||
throw new TaskSerializationError(
|
||||
taskId,
|
||||
error instanceof Error ? error.message : 'invalid JSON',
|
||||
);
|
||||
}
|
||||
|
||||
if (!isRecord(parsed)) {
|
||||
throw new TaskSerializationError(taskId, 'task payload is not an object');
|
||||
}
|
||||
|
||||
const requiredStringKeys = ['id', 'project', 'mission', 'taskId', 'title'] as const;
|
||||
const requiredNumberKeys = ['retryCount', 'createdAt', 'updatedAt'] as const;
|
||||
|
||||
for (const key of requiredStringKeys) {
|
||||
if (typeof parsed[key] !== 'string') {
|
||||
throw new TaskSerializationError(taskId, `missing string field "${key}"`);
|
||||
}
|
||||
}
|
||||
|
||||
for (const key of requiredNumberKeys) {
|
||||
if (typeof parsed[key] !== 'number') {
|
||||
throw new TaskSerializationError(taskId, `missing numeric field "${key}"`);
|
||||
}
|
||||
}
|
||||
|
||||
if (!STATUS_SET.has(parsed.status as TaskStatus)) {
|
||||
throw new TaskSerializationError(taskId, 'invalid status value');
|
||||
}
|
||||
|
||||
if (!PRIORITY_SET.has(parsed.priority as TaskPriority)) {
|
||||
throw new TaskSerializationError(taskId, 'invalid priority value');
|
||||
}
|
||||
|
||||
if (!LANE_SET.has(parsed.lane as TaskLane)) {
|
||||
throw new TaskSerializationError(taskId, 'invalid lane value');
|
||||
}
|
||||
|
||||
if (!Array.isArray(parsed.dependencies)) {
|
||||
throw new TaskSerializationError(taskId, 'dependencies must be an array');
|
||||
}
|
||||
|
||||
if (!parsed.dependencies.every((dependency): dependency is string => typeof dependency === 'string')) {
|
||||
throw new TaskSerializationError(taskId, 'dependencies must contain only strings');
|
||||
}
|
||||
|
||||
return {
|
||||
id: parsed.id,
|
||||
project: parsed.project,
|
||||
mission: parsed.mission,
|
||||
taskId: parsed.taskId,
|
||||
title: parsed.title,
|
||||
status: parsed.status,
|
||||
priority: parsed.priority,
|
||||
dependencies: parsed.dependencies,
|
||||
lane: parsed.lane,
|
||||
retryCount: parsed.retryCount,
|
||||
createdAt: parsed.createdAt,
|
||||
updatedAt: parsed.updatedAt,
|
||||
...(typeof parsed.description === 'string'
|
||||
? { description: parsed.description }
|
||||
: {}),
|
||||
...(typeof parsed.claimedBy === 'string' ? { claimedBy: parsed.claimedBy } : {}),
|
||||
...(typeof parsed.claimedAt === 'number' ? { claimedAt: parsed.claimedAt } : {}),
|
||||
...(typeof parsed.claimTTL === 'number' ? { claimTTL: parsed.claimTTL } : {}),
|
||||
...(typeof parsed.completedAt === 'number'
|
||||
? { completedAt: parsed.completedAt }
|
||||
: {}),
|
||||
...(typeof parsed.failedAt === 'number' ? { failedAt: parsed.failedAt } : {}),
|
||||
...(typeof parsed.failureReason === 'string'
|
||||
? { failureReason: parsed.failureReason }
|
||||
: {}),
|
||||
...(typeof parsed.completionSummary === 'string'
|
||||
? { completionSummary: parsed.completionSummary }
|
||||
: {}),
|
||||
...(isRecord(parsed.metadata) ? { metadata: parsed.metadata } : {}),
|
||||
};
|
||||
}
|
||||
|
||||
function buildRepositoryKeys(keyPrefix: string): RepositoryKeys {
|
||||
return {
|
||||
taskIds: `${keyPrefix}:task-ids`,
|
||||
task: (taskId: string) => `${keyPrefix}:task:${taskId}`,
|
||||
};
|
||||
}
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return typeof value === 'object' && value !== null && !Array.isArray(value);
|
||||
}
|
||||
76
packages/queue/src/task.ts
Normal file
76
packages/queue/src/task.ts
Normal file
@@ -0,0 +1,76 @@
|
||||
export const TASK_STATUSES = [
|
||||
'pending',
|
||||
'claimed',
|
||||
'in-progress',
|
||||
'completed',
|
||||
'failed',
|
||||
'blocked',
|
||||
] as const;
|
||||
|
||||
export const TASK_PRIORITIES = ['critical', 'high', 'medium', 'low'] as const;
|
||||
|
||||
export const TASK_LANES = ['planning', 'coding', 'any'] as const;
|
||||
|
||||
export type TaskStatus = (typeof TASK_STATUSES)[number];
|
||||
export type TaskPriority = (typeof TASK_PRIORITIES)[number];
|
||||
export type TaskLane = (typeof TASK_LANES)[number];
|
||||
|
||||
export interface Task {
|
||||
readonly id: string;
|
||||
readonly project: string;
|
||||
readonly mission: string;
|
||||
readonly taskId: string;
|
||||
readonly title: string;
|
||||
readonly description?: string;
|
||||
readonly status: TaskStatus;
|
||||
readonly priority: TaskPriority;
|
||||
readonly dependencies: readonly string[];
|
||||
readonly lane: TaskLane;
|
||||
readonly claimedBy?: string;
|
||||
readonly claimedAt?: number;
|
||||
readonly claimTTL?: number;
|
||||
readonly completedAt?: number;
|
||||
readonly failedAt?: number;
|
||||
readonly failureReason?: string;
|
||||
readonly completionSummary?: string;
|
||||
readonly retryCount: number;
|
||||
readonly metadata?: Record<string, unknown>;
|
||||
readonly createdAt: number;
|
||||
readonly updatedAt: number;
|
||||
}
|
||||
|
||||
export interface CreateTaskInput {
|
||||
readonly project: string;
|
||||
readonly mission: string;
|
||||
readonly taskId: string;
|
||||
readonly title: string;
|
||||
readonly description?: string;
|
||||
readonly priority?: TaskPriority;
|
||||
readonly dependencies?: readonly string[];
|
||||
readonly lane?: TaskLane;
|
||||
readonly metadata?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface TaskListFilters {
|
||||
readonly project?: string;
|
||||
readonly mission?: string;
|
||||
readonly status?: TaskStatus;
|
||||
}
|
||||
|
||||
export interface TaskUpdateInput {
|
||||
readonly title?: string;
|
||||
readonly description?: string;
|
||||
readonly status?: TaskStatus;
|
||||
readonly priority?: TaskPriority;
|
||||
readonly dependencies?: readonly string[];
|
||||
readonly lane?: TaskLane;
|
||||
readonly claimedBy?: string;
|
||||
readonly claimedAt?: number;
|
||||
readonly claimTTL?: number;
|
||||
readonly completedAt?: number;
|
||||
readonly failedAt?: number;
|
||||
readonly failureReason?: string;
|
||||
readonly completionSummary?: string;
|
||||
readonly retryCount?: number;
|
||||
readonly metadata?: Record<string, unknown>;
|
||||
}
|
||||
171
packages/queue/tests/task-repository.test.ts
Normal file
171
packages/queue/tests/task-repository.test.ts
Normal file
@@ -0,0 +1,171 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import {
|
||||
RedisTaskRepository,
|
||||
TaskAlreadyExistsError,
|
||||
type RedisTaskClient,
|
||||
} from '../src/task-repository.js';
|
||||
|
||||
class InMemoryRedisClient implements RedisTaskClient {
|
||||
private readonly kv = new Map<string, string>();
|
||||
private readonly sets = new Map<string, Set<string>>();
|
||||
|
||||
public get(key: string): Promise<string | null> {
|
||||
return Promise.resolve(this.kv.get(key) ?? null);
|
||||
}
|
||||
|
||||
public set(
|
||||
key: string,
|
||||
value: string,
|
||||
mode?: 'NX' | 'XX',
|
||||
): Promise<'OK' | null> {
|
||||
const exists = this.kv.has(key);
|
||||
|
||||
if (mode === 'NX' && exists) {
|
||||
return Promise.resolve(null);
|
||||
}
|
||||
|
||||
if (mode === 'XX' && !exists) {
|
||||
return Promise.resolve(null);
|
||||
}
|
||||
|
||||
this.kv.set(key, value);
|
||||
return Promise.resolve('OK');
|
||||
}
|
||||
|
||||
public smembers(key: string): Promise<string[]> {
|
||||
return Promise.resolve([...(this.sets.get(key) ?? new Set<string>())]);
|
||||
}
|
||||
|
||||
public sadd(key: string, member: string): Promise<number> {
|
||||
const values = this.sets.get(key) ?? new Set<string>();
|
||||
const beforeSize = values.size;
|
||||
|
||||
values.add(member);
|
||||
this.sets.set(key, values);
|
||||
|
||||
return Promise.resolve(values.size === beforeSize ? 0 : 1);
|
||||
}
|
||||
}
|
||||
|
||||
describe('RedisTaskRepository CRUD', () => {
|
||||
it('creates and fetches a task with defaults', async () => {
|
||||
const repository = new RedisTaskRepository({
|
||||
client: new InMemoryRedisClient(),
|
||||
now: () => 1_700_000_000_000,
|
||||
});
|
||||
|
||||
const created = await repository.create({
|
||||
project: 'queue',
|
||||
mission: 'phase1',
|
||||
taskId: 'MQ-003',
|
||||
title: 'Implement task CRUD',
|
||||
});
|
||||
|
||||
const fetched = await repository.get('MQ-003');
|
||||
|
||||
expect(created.id).toBe('MQ-003');
|
||||
expect(created.status).toBe('pending');
|
||||
expect(created.priority).toBe('medium');
|
||||
expect(created.lane).toBe('any');
|
||||
expect(created.dependencies).toEqual([]);
|
||||
expect(created.createdAt).toBe(1_700_000_000_000);
|
||||
expect(fetched).toEqual(created);
|
||||
});
|
||||
|
||||
it('throws when creating a duplicate task id', async () => {
|
||||
const repository = new RedisTaskRepository({
|
||||
client: new InMemoryRedisClient(),
|
||||
});
|
||||
|
||||
await repository.create({
|
||||
project: 'queue',
|
||||
mission: 'phase1',
|
||||
taskId: 'MQ-003',
|
||||
title: 'First task',
|
||||
});
|
||||
|
||||
await expect(
|
||||
repository.create({
|
||||
project: 'queue',
|
||||
mission: 'phase1',
|
||||
taskId: 'MQ-003',
|
||||
title: 'Duplicate',
|
||||
}),
|
||||
).rejects.toBeInstanceOf(TaskAlreadyExistsError);
|
||||
});
|
||||
|
||||
it('lists tasks and filters by project, mission, and status', async () => {
|
||||
const repository = new RedisTaskRepository({
|
||||
client: new InMemoryRedisClient(),
|
||||
});
|
||||
|
||||
await repository.create({
|
||||
project: 'queue',
|
||||
mission: 'phase1',
|
||||
taskId: 'MQ-003A',
|
||||
title: 'Pending task',
|
||||
});
|
||||
|
||||
await repository.create({
|
||||
project: 'queue',
|
||||
mission: 'phase2',
|
||||
taskId: 'MQ-003B',
|
||||
title: 'Claimed task',
|
||||
});
|
||||
|
||||
await repository.update('MQ-003B', {
|
||||
status: 'claimed',
|
||||
});
|
||||
|
||||
const byProject = await repository.list({
|
||||
project: 'queue',
|
||||
});
|
||||
const byMission = await repository.list({
|
||||
mission: 'phase2',
|
||||
});
|
||||
const byStatus = await repository.list({
|
||||
status: 'claimed',
|
||||
});
|
||||
|
||||
expect(byProject).toHaveLength(2);
|
||||
expect(byMission.map((task) => task.taskId)).toEqual(['MQ-003B']);
|
||||
expect(byStatus.map((task) => task.taskId)).toEqual(['MQ-003B']);
|
||||
});
|
||||
|
||||
it('updates mutable fields and preserves immutable fields', async () => {
|
||||
const repository = new RedisTaskRepository({
|
||||
client: new InMemoryRedisClient(),
|
||||
now: () => 1_700_000_000_001,
|
||||
});
|
||||
|
||||
await repository.create({
|
||||
project: 'queue',
|
||||
mission: 'phase1',
|
||||
taskId: 'MQ-003',
|
||||
title: 'Original title',
|
||||
description: 'Original description',
|
||||
});
|
||||
|
||||
const updated = await repository.update('MQ-003', {
|
||||
title: 'Updated title',
|
||||
description: 'Updated description',
|
||||
priority: 'high',
|
||||
lane: 'coding',
|
||||
dependencies: ['MQ-002'],
|
||||
metadata: {
|
||||
source: 'unit-test',
|
||||
},
|
||||
});
|
||||
|
||||
expect(updated.title).toBe('Updated title');
|
||||
expect(updated.description).toBe('Updated description');
|
||||
expect(updated.priority).toBe('high');
|
||||
expect(updated.lane).toBe('coding');
|
||||
expect(updated.dependencies).toEqual(['MQ-002']);
|
||||
expect(updated.metadata).toEqual({ source: 'unit-test' });
|
||||
expect(updated.project).toBe('queue');
|
||||
expect(updated.taskId).toBe('MQ-003');
|
||||
expect(updated.updatedAt).toBe(1_700_000_000_001);
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user