feat(wave1): @mosaic/types populated + @mosaic/queue migrated to use it (#1)

Co-authored-by: Jason Woltje <jason@diversecanvas.com>
Co-committed-by: Jason Woltje <jason@diversecanvas.com>
This commit was merged in pull request #1.
This commit is contained in:
2026-03-06 22:44:05 +00:00
committed by jason.woltje
parent 5103406c93
commit 8a2fb6c1ec
24 changed files with 4946 additions and 13 deletions

344
packages/queue/src/cli.ts Normal file
View File

@@ -0,0 +1,344 @@
import {
Command,
CommanderError,
InvalidArgumentError,
Option,
} from 'commander';
import { assertRedisHealthy, createRedisClient } from './redis-connection.js';
import {
RedisTaskRepository,
type ClaimTaskInput,
type CompleteTaskInput,
type RedisTaskClient,
} from './task-repository.js';
import { TASK_LANES, TASK_PRIORITIES, TASK_STATUSES } from './task.js';
import type {
CreateTaskInput,
TaskLane,
TaskListFilters,
TaskPriority,
TaskStatus,
} from '@mosaic/types';
export type QueueRepository = Pick<
RedisTaskRepository,
'create' | 'list' | 'get' | 'claim' | 'release' | 'complete'
>;
export interface QueueRepositorySession {
readonly repository: QueueRepository;
readonly close: () => Promise<void>;
}
export interface QueueCliDependencies {
readonly openSession: () => Promise<QueueRepositorySession>;
readonly stdout: (line: string) => void;
readonly stderr: (line: string) => void;
}
interface CreateCommandOptions {
readonly title: string;
readonly description?: string;
readonly priority?: TaskPriority;
readonly lane?: TaskLane;
readonly dependency?: string[];
}
interface ListCommandOptions {
readonly project?: string;
readonly mission?: string;
readonly status?: TaskStatus;
}
interface ClaimCommandOptions {
readonly agent: string;
readonly ttl: number;
}
interface ReleaseCommandOptions {
readonly agent?: string;
}
interface CompleteCommandOptions {
readonly agent?: string;
readonly summary?: string;
}
interface ClosableRedisTaskClient extends RedisTaskClient {
ping(): Promise<string>;
quit(): Promise<string>;
}
const DEFAULT_DEPENDENCIES: QueueCliDependencies = {
openSession: openRedisSession,
stdout: (line: string) => {
console.log(line);
},
stderr: (line: string) => {
console.error(line);
},
};
const PRIORITY_SET = new Set<TaskPriority>(TASK_PRIORITIES);
const LANE_SET = new Set<TaskLane>(TASK_LANES);
const STATUS_SET = new Set<TaskStatus>(TASK_STATUSES);
export function buildQueueCli(
dependencyOverrides: Partial<QueueCliDependencies> = {},
): Command {
const dependencies = resolveDependencies(dependencyOverrides);
const program = new Command();
program
.name('mosaic')
.description('mosaic queue command line interface')
.exitOverride();
program.configureOutput({
writeOut: (output: string) => dependencies.stdout(output.trimEnd()),
writeErr: (output: string) => dependencies.stderr(output.trimEnd()),
});
const queue = program.command('queue').description('Manage queue tasks');
queue
.command('create <project> <mission> <taskId>')
.description('Create a queue task')
.requiredOption('--title <title>', 'Task title')
.option('--description <description>', 'Task description')
.addOption(
new Option('--priority <priority>', 'Task priority')
.choices(TASK_PRIORITIES)
.argParser(parsePriority),
)
.addOption(
new Option('--lane <lane>', 'Task lane').choices(TASK_LANES).argParser(parseLane),
)
.option('--dependency <taskIds...>', 'Task dependencies')
.action(
async (
project: string,
mission: string,
taskId: string,
options: CreateCommandOptions,
) => {
await withSession(dependencies, async (repository) => {
const payload: CreateTaskInput = {
project,
mission,
taskId,
title: options.title,
description: options.description,
priority: options.priority,
dependencies: options.dependency,
lane: options.lane,
};
const task = await repository.create(payload);
dependencies.stdout(JSON.stringify(task, null, 2));
});
},
);
queue
.command('list')
.description('List queue tasks')
.option('--project <project>', 'Filter by project')
.option('--mission <mission>', 'Filter by mission')
.addOption(
new Option('--status <status>', 'Filter by status')
.choices(TASK_STATUSES)
.argParser(parseStatus),
)
.action(async (options: ListCommandOptions) => {
await withSession(dependencies, async (repository) => {
const filters: TaskListFilters = {
project: options.project,
mission: options.mission,
status: options.status,
};
const tasks = await repository.list(filters);
dependencies.stdout(JSON.stringify(tasks, null, 2));
});
});
queue
.command('show <taskId>')
.description('Show a single queue task')
.action(async (taskId: string) => {
await withSession(dependencies, async (repository) => {
const task = await repository.get(taskId);
if (task === null) {
throw new Error(`Task ${taskId} was not found.`);
}
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
queue
.command('claim <taskId>')
.description('Claim a pending task')
.requiredOption('--agent <agentId>', 'Agent identifier')
.requiredOption('--ttl <seconds>', 'Claim TTL in seconds', parsePositiveInteger)
.action(async (taskId: string, options: ClaimCommandOptions) => {
await withSession(dependencies, async (repository) => {
const claimInput: ClaimTaskInput = {
agentId: options.agent,
ttlSeconds: options.ttl,
};
const task = await repository.claim(taskId, claimInput);
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
queue
.command('release <taskId>')
.description('Release a claimed task back to pending')
.option('--agent <agentId>', 'Expected owner agent id')
.action(async (taskId: string, options: ReleaseCommandOptions) => {
await withSession(dependencies, async (repository) => {
const task = await repository.release(taskId, {
agentId: options.agent,
});
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
queue
.command('complete <taskId>')
.description('Complete a claimed task')
.option('--agent <agentId>', 'Expected owner agent id')
.option('--summary <summary>', 'Optional completion summary')
.action(async (taskId: string, options: CompleteCommandOptions) => {
await withSession(dependencies, async (repository) => {
const completeInput: CompleteTaskInput = {
agentId: options.agent,
summary: options.summary,
};
const task = await repository.complete(taskId, completeInput);
dependencies.stdout(JSON.stringify(task, null, 2));
});
});
return program;
}
export async function runQueueCli(
argv: string[] = process.argv,
dependencyOverrides: Partial<QueueCliDependencies> = {},
): Promise<number> {
const dependencies = resolveDependencies(dependencyOverrides);
const program = buildQueueCli(dependencies);
try {
await program.parseAsync(argv, {
from: 'node',
});
return 0;
} catch (error) {
if (error instanceof CommanderError) {
if (error.code === 'commander.helpDisplayed') {
return 0;
}
if (error.code.startsWith('commander.')) {
return error.exitCode;
}
}
dependencies.stderr(formatError(error));
return 1;
}
}
async function openRedisSession(): Promise<QueueRepositorySession> {
const redisClient = createRedisClient<ClosableRedisTaskClient>();
try {
await assertRedisHealthy(redisClient);
return {
repository: new RedisTaskRepository({
client: redisClient,
}),
close: async () => {
await redisClient.quit();
},
};
} catch (error) {
await redisClient.quit();
throw error;
}
}
async function withSession(
dependencies: QueueCliDependencies,
action: (repository: QueueRepository) => Promise<void>,
): Promise<void> {
const session = await dependencies.openSession();
try {
await action(session.repository);
} finally {
await session.close();
}
}
function resolveDependencies(
overrides: Partial<QueueCliDependencies>,
): QueueCliDependencies {
const openSession = overrides.openSession ?? DEFAULT_DEPENDENCIES.openSession;
const stdout = overrides.stdout ?? DEFAULT_DEPENDENCIES.stdout;
const stderr = overrides.stderr ?? DEFAULT_DEPENDENCIES.stderr;
return {
openSession: () => openSession(),
stdout: (line: string) => stdout(line),
stderr: (line: string) => stderr(line),
};
}
function parsePositiveInteger(value: string): number {
const parsed = Number.parseInt(value, 10);
if (!Number.isInteger(parsed) || parsed <= 0) {
throw new InvalidArgumentError(`Expected a positive integer, received "${value}"`);
}
return parsed;
}
function parsePriority(value: string): TaskPriority {
if (!PRIORITY_SET.has(value as TaskPriority)) {
throw new InvalidArgumentError(
`Expected one of ${TASK_PRIORITIES.join(', ')}, received "${value}"`,
);
}
return value as TaskPriority;
}
function parseLane(value: string): TaskLane {
if (!LANE_SET.has(value as TaskLane)) {
throw new InvalidArgumentError(
`Expected one of ${TASK_LANES.join(', ')}, received "${value}"`,
);
}
return value as TaskLane;
}
function parseStatus(value: string): TaskStatus {
if (!STATUS_SET.has(value as TaskStatus)) {
throw new InvalidArgumentError(
`Expected one of ${TASK_STATUSES.join(', ')}, received "${value}"`,
);
}
return value as TaskStatus;
}
function formatError(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}

View File

@@ -0,0 +1,69 @@
export const packageVersion = '0.1.0';
export {
assertRedisHealthy,
createRedisClient,
resolveRedisUrl,
runRedisHealthCheck,
} from './redis-connection.js';
export type {
CreateRedisClientOptions,
RedisClientConstructor,
RedisHealthCheck,
RedisPingClient,
} from './redis-connection.js';
export {
RedisTaskRepository,
TaskAlreadyExistsError,
TaskAtomicConflictError,
TaskNotFoundError,
TaskOwnershipError,
TaskSerializationError,
TaskTransitionError,
} from './task-repository.js';
export type {
ClaimTaskInput,
CompleteTaskInput,
FailTaskInput,
HeartbeatTaskInput,
RedisTaskClient,
RedisTaskRepositoryOptions,
RedisTaskTransaction,
ReleaseTaskInput,
} from './task-repository.js';
export { TASK_LANES, TASK_PRIORITIES, TASK_STATUSES } from './task.js';
export type {
CreateTaskInput,
Task,
TaskLane,
TaskListFilters,
TaskPriority,
TaskStatus,
TaskUpdateInput,
} from '@mosaic/types';
export { buildQueueCli, runQueueCli } from './cli.js';
export type {
QueueCliDependencies,
QueueRepository,
QueueRepositorySession,
} from './cli.js';
export {
QUEUE_MCP_TOOL_DEFINITIONS,
buildQueueMcpServer,
startQueueMcpServer,
} from './mcp-server.js';
export type {
QueueMcpDependencies,
QueueMcpRepository,
QueueMcpSession,
} from './mcp-server.js';
export {
queueClaimToolInputSchema,
queueCompleteToolInputSchema,
queueFailToolInputSchema,
queueGetToolInputSchema,
queueHeartbeatToolInputSchema,
queueListToolInputSchema,
queueReleaseToolInputSchema,
queueStatusToolInputSchema,
} from './mcp-tool-schemas.js';

View File

@@ -0,0 +1,310 @@
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import type { CallToolResult, Implementation } from '@modelcontextprotocol/sdk/types.js';
import type { z } from 'zod';
import {
assertRedisHealthy,
createRedisClient,
runRedisHealthCheck,
type RedisHealthCheck,
} from './redis-connection.js';
import {
queueClaimToolInputSchema,
queueCompleteToolInputSchema,
queueFailToolInputSchema,
queueGetToolInputSchema,
queueHeartbeatToolInputSchema,
queueListToolInputSchema,
queueReleaseToolInputSchema,
queueStatusToolInputSchema,
} from './mcp-tool-schemas.js';
import {
RedisTaskRepository,
type RedisTaskClient,
} from './task-repository.js';
import { TASK_STATUSES } from './task.js';
import type { Task, TaskStatus } from '@mosaic/types';
export type QueueMcpRepository = Pick<
RedisTaskRepository,
'list' | 'get' | 'claim' | 'heartbeat' | 'release' | 'complete' | 'fail'
>;
export interface QueueMcpSession {
readonly repository: QueueMcpRepository;
readonly checkHealth: () => Promise<RedisHealthCheck>;
readonly close: () => Promise<void>;
}
export interface QueueMcpDependencies {
readonly openSession: () => Promise<QueueMcpSession>;
readonly serverInfo: Implementation;
}
type ToolSchema = z.ZodTypeAny;
interface QueueMcpToolDefinition<TArgs extends z.ZodTypeAny> {
readonly name: string;
readonly description: string;
readonly inputSchema: TArgs;
readonly execute: (
session: QueueMcpSession,
input: z.output<TArgs>,
) => Promise<unknown>;
}
interface ClosableRedisTaskClient extends RedisTaskClient {
ping(): Promise<string>;
quit(): Promise<string>;
}
const DEFAULT_SERVER_INFO: Implementation = {
name: 'mosaic-queue',
version: '0.1.0',
};
const DEFAULT_DEPENDENCIES: QueueMcpDependencies = {
openSession: openRedisMcpSession,
serverInfo: DEFAULT_SERVER_INFO,
};
export const QUEUE_MCP_TOOL_DEFINITIONS = [
{
name: 'queue_list',
description: 'List queue tasks with optional project/mission/status filters',
inputSchema: queueListToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueListToolInputSchema>) => {
const tasks = await session.repository.list(input);
return {
tasks,
};
},
},
{
name: 'queue_get',
description: 'Get a single queue task by taskId',
inputSchema: queueGetToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueGetToolInputSchema>) => {
const task = await session.repository.get(input.taskId);
return {
task,
};
},
},
{
name: 'queue_claim',
description: 'Atomically claim a task for an agent',
inputSchema: queueClaimToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueClaimToolInputSchema>) => {
const task = await session.repository.claim(input.taskId, {
agentId: input.agentId,
ttlSeconds: input.ttlSeconds,
});
return {
task,
};
},
},
{
name: 'queue_heartbeat',
description: 'Refresh claim ownership TTL for a task',
inputSchema: queueHeartbeatToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueHeartbeatToolInputSchema>) => {
const task = await session.repository.heartbeat(input.taskId, {
agentId: input.agentId,
ttlSeconds: input.ttlSeconds,
});
return {
task,
};
},
},
{
name: 'queue_release',
description: 'Release a claimed task back to pending',
inputSchema: queueReleaseToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueReleaseToolInputSchema>) => {
const task = await session.repository.release(input.taskId, {
agentId: input.agentId,
});
return {
task,
};
},
},
{
name: 'queue_complete',
description: 'Mark a claimed task as completed',
inputSchema: queueCompleteToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueCompleteToolInputSchema>) => {
const task = await session.repository.complete(input.taskId, {
agentId: input.agentId,
summary: input.summary,
});
return {
task,
};
},
},
{
name: 'queue_fail',
description: 'Mark a claimed task as failed with a reason',
inputSchema: queueFailToolInputSchema,
execute: async (session: QueueMcpSession, input: z.output<typeof queueFailToolInputSchema>) => {
const task = await session.repository.fail(input.taskId, {
agentId: input.agentId,
reason: input.reason,
});
return {
task,
};
},
},
{
name: 'queue_status',
description: 'Return queue health and task status counters',
inputSchema: queueStatusToolInputSchema,
execute: async (session: QueueMcpSession) => {
const tasks = await session.repository.list({});
const health = await session.checkHealth();
const counts = countStatuses(tasks);
return {
health,
counts,
total: tasks.length,
};
},
},
] as const;
export function buildQueueMcpServer(
dependencyOverrides: Partial<QueueMcpDependencies> = {},
): McpServer {
const dependencies = resolveDependencies(dependencyOverrides);
const server = new McpServer(dependencies.serverInfo);
for (const definition of QUEUE_MCP_TOOL_DEFINITIONS) {
server.registerTool(
definition.name,
{
description: definition.description,
inputSchema: definition.inputSchema,
},
async (args: unknown) => {
return withSession(dependencies, async (session) => {
try {
const parsedArgs = definition.inputSchema.parse(args);
const response = await definition.execute(session, parsedArgs as never);
return toToolResult(response);
} catch (error) {
return toToolErrorResult(error);
}
});
},
);
}
return server;
}
export async function startQueueMcpServer(
dependencyOverrides: Partial<QueueMcpDependencies> = {},
): Promise<McpServer> {
const server = buildQueueMcpServer(dependencyOverrides);
const transport = new StdioServerTransport();
await server.connect(transport);
return server;
}
function resolveDependencies(
overrides: Partial<QueueMcpDependencies>,
): QueueMcpDependencies {
const openSession = overrides.openSession ?? DEFAULT_DEPENDENCIES.openSession;
const serverInfo = overrides.serverInfo ?? DEFAULT_DEPENDENCIES.serverInfo;
return {
openSession: () => openSession(),
serverInfo,
};
}
async function withSession(
dependencies: QueueMcpDependencies,
handler: (session: QueueMcpSession) => Promise<CallToolResult>,
): Promise<CallToolResult> {
const session = await dependencies.openSession();
try {
return await handler(session);
} finally {
await session.close();
}
}
async function openRedisMcpSession(): Promise<QueueMcpSession> {
const redisClient = createRedisClient<ClosableRedisTaskClient>();
try {
await assertRedisHealthy(redisClient);
return {
repository: new RedisTaskRepository({
client: redisClient,
}),
checkHealth: async () => runRedisHealthCheck(redisClient),
close: async () => {
await redisClient.quit();
},
};
} catch (error) {
await redisClient.quit();
throw error;
}
}
function toToolResult(payload: unknown): CallToolResult {
return {
content: [
{
type: 'text',
text: JSON.stringify(payload, null, 2),
},
],
};
}
function toToolErrorResult(error: unknown): CallToolResult {
return {
isError: true,
content: [
{
type: 'text',
text: formatError(error),
},
],
};
}
function formatError(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
function countStatuses(tasks: readonly Task[]): Record<TaskStatus, number> {
const counts = Object.fromEntries(TASK_STATUSES.map((status) => [status, 0])) as Record<
TaskStatus,
number
>;
for (const task of tasks) {
counts[task.status] += 1;
}
return counts;
}

View File

@@ -0,0 +1,44 @@
import { z } from 'zod';
import { TASK_STATUSES } from './task.js';
export const queueListToolInputSchema = z.object({
project: z.string().min(1).optional(),
mission: z.string().min(1).optional(),
status: z.enum(TASK_STATUSES).optional(),
});
export const queueGetToolInputSchema = z.object({
taskId: z.string().min(1),
});
export const queueClaimToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1),
ttlSeconds: z.number().int().positive(),
});
export const queueHeartbeatToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
ttlSeconds: z.number().int().positive().optional(),
});
export const queueReleaseToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
});
export const queueCompleteToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
summary: z.string().min(1).optional(),
});
export const queueFailToolInputSchema = z.object({
taskId: z.string().min(1),
agentId: z.string().min(1).optional(),
reason: z.string().min(1),
});
export const queueStatusToolInputSchema = z.object({});

View File

@@ -0,0 +1,95 @@
import Redis, { type Redis as RedisClient, type RedisOptions } from 'ioredis';
const ERR_MISSING_REDIS_URL =
'Missing required Valkey/Redis connection URL. Set VALKEY_URL or REDIS_URL.';
export interface RedisHealthCheck {
readonly checkedAt: number;
readonly latencyMs: number;
readonly ok: boolean;
readonly response?: string;
readonly error?: string;
}
export interface RedisPingClient {
ping(): Promise<string>;
}
export type RedisClientConstructor<TClient> = new (
url: string,
options?: RedisOptions,
) => TClient;
export interface CreateRedisClientOptions<TClient> {
readonly env?: NodeJS.ProcessEnv;
readonly redisConstructor?: RedisClientConstructor<TClient>;
readonly redisOptions?: RedisOptions;
}
export function resolveRedisUrl(env: NodeJS.ProcessEnv = process.env): string {
const resolvedUrl = env.VALKEY_URL ?? env.REDIS_URL;
if (typeof resolvedUrl !== 'string' || resolvedUrl.trim().length === 0) {
throw new Error(ERR_MISSING_REDIS_URL);
}
return resolvedUrl;
}
export function createRedisClient<TClient = RedisClient>(
options: CreateRedisClientOptions<TClient> = {},
): TClient {
const redisUrl = resolveRedisUrl(options.env);
const RedisCtor =
options.redisConstructor ??
(Redis as unknown as RedisClientConstructor<TClient>);
return new RedisCtor(redisUrl, {
maxRetriesPerRequest: null,
...options.redisOptions,
});
}
export async function runRedisHealthCheck(
client: RedisPingClient,
): Promise<RedisHealthCheck> {
const startedAt = process.hrtime.bigint();
try {
const response = await client.ping();
const elapsedMs = Number((process.hrtime.bigint() - startedAt) / 1_000_000n);
return {
checkedAt: Date.now(),
latencyMs: elapsedMs,
ok: true,
response,
};
} catch (error) {
const elapsedMs = Number((process.hrtime.bigint() - startedAt) / 1_000_000n);
const message =
error instanceof Error ? error.message : 'Unknown redis health check error';
return {
checkedAt: Date.now(),
latencyMs: elapsedMs,
ok: false,
error: message,
};
}
}
export async function assertRedisHealthy(
client: RedisPingClient,
): Promise<RedisHealthCheck> {
const health = await runRedisHealthCheck(client);
if (!health.ok) {
throw new Error(
`Redis health check failed after ${health.latencyMs}ms: ${health.error ?? 'unknown error'}`,
);
}
return health;
}

View File

@@ -0,0 +1,638 @@
import { TASK_LANES, TASK_PRIORITIES, TASK_STATUSES } from './task.js';
import type {
CreateTaskInput,
Task,
TaskLane,
TaskListFilters,
TaskPriority,
TaskStatus,
TaskUpdateInput,
} from '@mosaic/types';
const STATUS_SET = new Set<TaskStatus>(TASK_STATUSES);
const PRIORITY_SET = new Set<TaskPriority>(TASK_PRIORITIES);
const LANE_SET = new Set<TaskLane>(TASK_LANES);
const DEFAULT_KEY_PREFIX = 'mosaic:queue';
const MAX_ATOMIC_RETRIES = 8;
const UPDATE_ALLOWED_STATUS_TRANSITIONS: Readonly<Record<TaskStatus, readonly TaskStatus[]>> = {
pending: ['blocked'],
blocked: ['pending'],
claimed: ['in-progress'],
'in-progress': ['claimed'],
completed: [],
failed: [],
};
interface RepositoryKeys {
readonly taskIds: string;
task(taskId: string): string;
}
export interface RedisTaskClient {
get(key: string): Promise<string | null>;
mget(...keys: string[]): Promise<(string | null)[]>;
set(key: string, value: string, mode?: 'NX' | 'XX'): Promise<'OK' | null>;
smembers(key: string): Promise<string[]>;
sadd(key: string, member: string): Promise<number>;
watch(...keys: string[]): Promise<'OK'>;
unwatch(): Promise<'OK'>;
multi(): RedisTaskTransaction;
}
export interface RedisTaskTransaction {
set(key: string, value: string, mode?: 'NX' | 'XX'): RedisTaskTransaction;
sadd(key: string, member: string): RedisTaskTransaction;
exec(): Promise<readonly (readonly [Error | null, unknown])[] | null>;
}
export interface RedisTaskRepositoryOptions {
readonly client: RedisTaskClient;
readonly keyPrefix?: string;
readonly now?: () => number;
}
export interface ClaimTaskInput {
readonly agentId: string;
readonly ttlSeconds: number;
}
export interface ReleaseTaskInput {
readonly agentId?: string;
}
export interface HeartbeatTaskInput {
readonly agentId?: string;
readonly ttlSeconds?: number;
}
export interface CompleteTaskInput {
readonly agentId?: string;
readonly summary?: string;
}
export interface FailTaskInput {
readonly agentId?: string;
readonly reason: string;
}
export class TaskAlreadyExistsError extends Error {
public constructor(taskId: string) {
super(`Task ${taskId} already exists.`);
this.name = 'TaskAlreadyExistsError';
}
}
export class TaskNotFoundError extends Error {
public constructor(taskId: string) {
super(`Task ${taskId} was not found.`);
this.name = 'TaskNotFoundError';
}
}
export class TaskSerializationError extends Error {
public constructor(taskId: string, message: string) {
super(`Unable to deserialize task ${taskId}: ${message}`);
this.name = 'TaskSerializationError';
}
}
export class TaskTransitionError extends Error {
public constructor(taskId: string, status: TaskStatus, action: string) {
super(`Task ${taskId} cannot transition from ${status} via ${action}.`);
this.name = 'TaskTransitionError';
}
}
export class TaskOwnershipError extends Error {
public constructor(taskId: string, expectedAgentId: string, actualAgentId: string) {
super(
`Task ${taskId} is owned by ${actualAgentId}, not ${expectedAgentId}.`,
);
this.name = 'TaskOwnershipError';
}
}
export class TaskAtomicConflictError extends Error {
public constructor(taskId: string) {
super(`Task ${taskId} could not be updated atomically after multiple retries.`);
this.name = 'TaskAtomicConflictError';
}
}
export class RedisTaskRepository {
private readonly client: RedisTaskClient;
private readonly keys: RepositoryKeys;
private readonly now: () => number;
public constructor(options: RedisTaskRepositoryOptions) {
this.client = options.client;
this.keys = buildRepositoryKeys(options.keyPrefix ?? DEFAULT_KEY_PREFIX);
this.now = options.now ?? Date.now;
}
public async create(input: CreateTaskInput): Promise<Task> {
const timestamp = this.now();
const task: Task = {
id: input.taskId,
project: input.project,
mission: input.mission,
taskId: input.taskId,
title: input.title,
description: input.description,
status: 'pending',
priority: input.priority ?? 'medium',
dependencies: [...(input.dependencies ?? [])],
lane: input.lane ?? 'any',
retryCount: 0,
metadata: input.metadata,
createdAt: timestamp,
updatedAt: timestamp,
};
const taskKey = this.keys.task(task.taskId);
const serializedTask = JSON.stringify(task);
for (let attempt = 0; attempt < MAX_ATOMIC_RETRIES; attempt += 1) {
await this.client.watch(taskKey);
try {
const transaction = this.client.multi();
transaction.set(taskKey, serializedTask, 'NX');
transaction.sadd(this.keys.taskIds, task.taskId);
const execResult = await transaction.exec();
if (execResult === null) {
continue;
}
const setResult = execResult[0];
if (setResult === undefined) {
throw new TaskAtomicConflictError(task.taskId);
}
const [setError, setReply] = setResult;
if (setError !== null) {
throw setError;
}
if (setReply !== 'OK') {
throw new TaskAlreadyExistsError(task.taskId);
}
const saddResult = execResult[1];
if (saddResult !== undefined && saddResult[0] !== null) {
throw saddResult[0];
}
return task;
} finally {
await this.client.unwatch();
}
}
throw new TaskAlreadyExistsError(task.taskId);
}
public async get(taskId: string): Promise<Task | null> {
const raw = await this.client.get(this.keys.task(taskId));
if (raw === null) {
return null;
}
return deserializeTask(taskId, raw);
}
public async list(filters: TaskListFilters = {}): Promise<Task[]> {
const taskIds = await this.client.smembers(this.keys.taskIds);
if (taskIds.length === 0) {
return [];
}
const taskKeys = taskIds.map((taskId) => this.keys.task(taskId));
const records = await this.client.mget(...taskKeys);
const tasks: Task[] = [];
for (const [index, rawTask] of records.entries()) {
if (rawTask === null || rawTask === undefined) {
continue;
}
const taskId = taskIds[index];
if (taskId === undefined) {
continue;
}
tasks.push(deserializeTask(taskId, rawTask));
}
return tasks
.filter((task) =>
matchesFilters(task, {
project: filters.project,
mission: filters.mission,
status: filters.status,
}),
)
.sort((left, right) => left.createdAt - right.createdAt);
}
public async update(taskId: string, patch: TaskUpdateInput): Promise<Task> {
return this.mutateTaskAtomically(taskId, (existing, now) => {
assertUpdatePatchIsAllowed(taskId, existing, patch);
return {
...existing,
...patch,
dependencies:
patch.dependencies === undefined ? existing.dependencies : [...patch.dependencies],
updatedAt: now,
};
});
}
public async claim(taskId: string, input: ClaimTaskInput): Promise<Task> {
if (input.ttlSeconds <= 0) {
throw new Error(`Task ${taskId} claim ttl must be greater than 0 seconds.`);
}
return this.mutateTaskAtomically(taskId, (existing, now) => {
if (!canClaimTask(existing, now)) {
throw new TaskTransitionError(taskId, existing.status, 'claim');
}
const base = withoutCompletionAndFailureFields(withoutClaimFields(existing));
return {
...base,
status: 'claimed',
claimedBy: input.agentId,
claimedAt: now,
claimTTL: input.ttlSeconds,
updatedAt: now,
};
});
}
public async release(taskId: string, input: ReleaseTaskInput = {}): Promise<Task> {
return this.mutateTaskAtomically(taskId, (existing, now) => {
if (!isClaimedLikeStatus(existing.status)) {
throw new TaskTransitionError(taskId, existing.status, 'release');
}
assertTaskOwnership(taskId, existing, input.agentId);
const base = withoutClaimFields(existing);
return {
...base,
status: 'pending',
updatedAt: now,
};
});
}
public async heartbeat(
taskId: string,
input: HeartbeatTaskInput = {},
): Promise<Task> {
return this.mutateTaskAtomically(taskId, (existing, now) => {
if (!isClaimedLikeStatus(existing.status)) {
throw new TaskTransitionError(taskId, existing.status, 'heartbeat');
}
if (isClaimExpired(existing, now)) {
throw new TaskTransitionError(taskId, existing.status, 'heartbeat');
}
assertTaskOwnership(taskId, existing, input.agentId);
const ttl = input.ttlSeconds ?? existing.claimTTL;
if (ttl === undefined || ttl <= 0) {
throw new TaskTransitionError(taskId, existing.status, 'heartbeat');
}
return {
...existing,
claimedAt: now,
claimTTL: ttl,
updatedAt: now,
};
});
}
public async complete(
taskId: string,
input: CompleteTaskInput = {},
): Promise<Task> {
return this.mutateTaskAtomically(taskId, (existing, now) => {
if (!isClaimedLikeStatus(existing.status)) {
throw new TaskTransitionError(taskId, existing.status, 'complete');
}
assertTaskOwnership(taskId, existing, input.agentId);
const base = withoutCompletionAndFailureFields(withoutClaimFields(existing));
return {
...base,
status: 'completed',
completedAt: now,
...(input.summary === undefined ? {} : { completionSummary: input.summary }),
updatedAt: now,
};
});
}
public async fail(taskId: string, input: FailTaskInput): Promise<Task> {
return this.mutateTaskAtomically(taskId, (existing, now) => {
if (!isClaimedLikeStatus(existing.status)) {
throw new TaskTransitionError(taskId, existing.status, 'fail');
}
assertTaskOwnership(taskId, existing, input.agentId);
const base = withoutCompletionAndFailureFields(withoutClaimFields(existing));
return {
...base,
status: 'failed',
failedAt: now,
failureReason: input.reason,
retryCount: existing.retryCount + 1,
updatedAt: now,
};
});
}
private async mutateTaskAtomically(
taskId: string,
mutation: (existing: Task, now: number) => Task,
): Promise<Task> {
const taskKey = this.keys.task(taskId);
for (let attempt = 0; attempt < MAX_ATOMIC_RETRIES; attempt += 1) {
await this.client.watch(taskKey);
try {
const raw = await this.client.get(taskKey);
if (raw === null) {
throw new TaskNotFoundError(taskId);
}
const existing = deserializeTask(taskId, raw);
const updated = mutation(existing, this.now());
const transaction = this.client.multi();
transaction.set(taskKey, JSON.stringify(updated), 'XX');
transaction.sadd(this.keys.taskIds, taskId);
const execResult = await transaction.exec();
if (execResult === null) {
continue;
}
const setResult = execResult[0];
if (setResult === undefined) {
throw new TaskAtomicConflictError(taskId);
}
const [setError, setReply] = setResult;
if (setError !== null) {
throw setError;
}
if (setReply !== 'OK') {
throw new TaskNotFoundError(taskId);
}
const saddResult = execResult[1];
if (saddResult !== undefined && saddResult[0] !== null) {
throw saddResult[0];
}
return updated;
} finally {
await this.client.unwatch();
}
}
throw new TaskAtomicConflictError(taskId);
}
}
function matchesFilters(task: Task, filters: TaskListFilters): boolean {
if (filters.project !== undefined && task.project !== filters.project) {
return false;
}
if (filters.mission !== undefined && task.mission !== filters.mission) {
return false;
}
if (filters.status !== undefined && task.status !== filters.status) {
return false;
}
return true;
}
function assertUpdatePatchIsAllowed(taskId: string, task: Task, patch: TaskUpdateInput): void {
if (patch.status !== undefined && !canTransitionStatusViaUpdate(task.status, patch.status)) {
throw new TaskTransitionError(taskId, task.status, 'update');
}
if (
patch.claimedBy !== undefined ||
patch.claimedAt !== undefined ||
patch.claimTTL !== undefined ||
patch.completedAt !== undefined ||
patch.failedAt !== undefined ||
patch.failureReason !== undefined ||
patch.completionSummary !== undefined ||
patch.retryCount !== undefined
) {
throw new TaskTransitionError(taskId, task.status, 'update');
}
}
function canTransitionStatusViaUpdate(from: TaskStatus, to: TaskStatus): boolean {
if (from === to) {
return true;
}
return UPDATE_ALLOWED_STATUS_TRANSITIONS[from].includes(to);
}
function canClaimTask(task: Task, now: number): boolean {
if (task.status === 'pending') {
return true;
}
if (!isClaimedLikeStatus(task.status)) {
return false;
}
return isClaimExpired(task, now);
}
function isClaimedLikeStatus(status: TaskStatus): boolean {
return status === 'claimed' || status === 'in-progress';
}
function isClaimExpired(task: Task, now: number): boolean {
if (task.claimedAt === undefined || task.claimTTL === undefined) {
return false;
}
return task.claimedAt + task.claimTTL * 1000 <= now;
}
function assertTaskOwnership(
taskId: string,
task: Task,
expectedAgentId: string | undefined,
): void {
if (expectedAgentId === undefined || task.claimedBy === undefined) {
return;
}
if (task.claimedBy !== expectedAgentId) {
throw new TaskOwnershipError(taskId, expectedAgentId, task.claimedBy);
}
}
type TaskWithoutClaimFields = Omit<Task, 'claimedBy' | 'claimedAt' | 'claimTTL'>;
type TaskWithoutCompletionAndFailureFields = Omit<
Task,
'completedAt' | 'failedAt' | 'failureReason' | 'completionSummary'
>;
function withoutClaimFields(task: Task): TaskWithoutClaimFields {
const {
claimedBy: _claimedBy,
claimedAt: _claimedAt,
claimTTL: _claimTTL,
...taskWithoutClaimFields
} = task;
return taskWithoutClaimFields;
}
function withoutCompletionAndFailureFields(
task: TaskWithoutClaimFields,
): TaskWithoutCompletionAndFailureFields {
const {
completedAt: _completedAt,
failedAt: _failedAt,
failureReason: _failureReason,
completionSummary: _completionSummary,
...taskWithoutCompletionAndFailureFields
} = task;
return taskWithoutCompletionAndFailureFields;
}
function deserializeTask(taskId: string, raw: string): Task {
let parsed: unknown;
try {
parsed = JSON.parse(raw);
} catch (error) {
throw new TaskSerializationError(
taskId,
error instanceof Error ? error.message : 'invalid JSON',
);
}
if (!isRecord(parsed)) {
throw new TaskSerializationError(taskId, 'task payload is not an object');
}
const requiredStringKeys = ['id', 'project', 'mission', 'taskId', 'title'] as const;
const requiredNumberKeys = ['retryCount', 'createdAt', 'updatedAt'] as const;
for (const key of requiredStringKeys) {
if (typeof parsed[key] !== 'string') {
throw new TaskSerializationError(taskId, `missing string field "${key}"`);
}
}
for (const key of requiredNumberKeys) {
if (typeof parsed[key] !== 'number') {
throw new TaskSerializationError(taskId, `missing numeric field "${key}"`);
}
}
if (!STATUS_SET.has(parsed.status as TaskStatus)) {
throw new TaskSerializationError(taskId, 'invalid status value');
}
if (!PRIORITY_SET.has(parsed.priority as TaskPriority)) {
throw new TaskSerializationError(taskId, 'invalid priority value');
}
if (!LANE_SET.has(parsed.lane as TaskLane)) {
throw new TaskSerializationError(taskId, 'invalid lane value');
}
if (!Array.isArray(parsed.dependencies)) {
throw new TaskSerializationError(taskId, 'dependencies must be an array');
}
if (!parsed.dependencies.every((dependency): dependency is string => typeof dependency === 'string')) {
throw new TaskSerializationError(taskId, 'dependencies must contain only strings');
}
return {
id: parsed.id as string,
project: parsed.project as string,
mission: parsed.mission as string,
taskId: parsed.taskId as string,
title: parsed.title as string,
status: parsed.status as TaskStatus,
priority: parsed.priority as TaskPriority,
dependencies: parsed.dependencies as string[],
lane: parsed.lane as TaskLane,
retryCount: parsed.retryCount as number,
createdAt: parsed.createdAt as number,
updatedAt: parsed.updatedAt as number,
...(typeof parsed.description === 'string'
? { description: parsed.description }
: {}),
...(typeof parsed.claimedBy === 'string' ? { claimedBy: parsed.claimedBy } : {}),
...(typeof parsed.claimedAt === 'number' ? { claimedAt: parsed.claimedAt } : {}),
...(typeof parsed.claimTTL === 'number' ? { claimTTL: parsed.claimTTL } : {}),
...(typeof parsed.completedAt === 'number'
? { completedAt: parsed.completedAt }
: {}),
...(typeof parsed.failedAt === 'number' ? { failedAt: parsed.failedAt } : {}),
...(typeof parsed.failureReason === 'string'
? { failureReason: parsed.failureReason }
: {}),
...(typeof parsed.completionSummary === 'string'
? { completionSummary: parsed.completionSummary }
: {}),
...(isRecord(parsed.metadata) ? { metadata: parsed.metadata } : {}),
};
}
function buildRepositoryKeys(keyPrefix: string): RepositoryKeys {
return {
taskIds: `${keyPrefix}:task-ids`,
task: (taskId: string) => `${keyPrefix}:task:${taskId}`,
};
}
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === 'object' && value !== null && !Array.isArray(value);
}

View File

@@ -0,0 +1,12 @@
export const TASK_STATUSES = [
'pending',
'claimed',
'in-progress',
'completed',
'failed',
'blocked',
] as const;
export const TASK_PRIORITIES = ['critical', 'high', 'medium', 'low'] as const;
export const TASK_LANES = ['planning', 'coding', 'any'] as const;