import { describe, expect, it } from 'vitest'; import { RedisTaskRepository, TaskAlreadyExistsError, TaskOwnershipError, TaskTransitionError, type RedisTaskClient, type RedisTaskTransaction, } from '../src/task-repository.js'; type QueuedOperation = | { readonly type: 'set'; readonly key: string; readonly value: string; readonly mode?: 'NX' | 'XX'; } | { readonly type: 'sadd'; readonly key: string; readonly member: string; }; class InMemoryRedisBackend { public readonly kv = new Map(); public readonly sets = new Map>(); public readonly revisions = new Map(); public getRevision(key: string): number { return this.revisions.get(key) ?? 0; } public bumpRevision(key: string): void { this.revisions.set(key, this.getRevision(key) + 1); } } class InMemoryRedisTransaction implements RedisTaskTransaction { private readonly operations: QueuedOperation[] = []; public constructor( private readonly backend: InMemoryRedisBackend, private readonly watchedRevisions: ReadonlyMap, ) {} public set(key: string, value: string, mode?: 'NX' | 'XX'): RedisTaskTransaction { this.operations.push({ type: 'set', key, value, mode, }); return this; } public sadd(key: string, member: string): RedisTaskTransaction { this.operations.push({ type: 'sadd', key, member, }); return this; } public exec(): Promise { for (const [key, revision] of this.watchedRevisions.entries()) { if (this.backend.getRevision(key) !== revision) { return Promise.resolve(null); } } const results: (readonly [Error | null, unknown])[] = []; for (const operation of this.operations) { if (operation.type === 'set') { const exists = this.backend.kv.has(operation.key); if (operation.mode === 'NX' && exists) { results.push([null, null]); continue; } if (operation.mode === 'XX' && !exists) { results.push([null, null]); continue; } this.backend.kv.set(operation.key, operation.value); this.backend.bumpRevision(operation.key); results.push([null, 'OK']); continue; } const set = this.backend.sets.get(operation.key) ?? new Set(); const before = set.size; set.add(operation.member); this.backend.sets.set(operation.key, set); this.backend.bumpRevision(operation.key); results.push([null, set.size === before ? 0 : 1]); } return Promise.resolve(results); } } class InMemoryAtomicRedisClient implements RedisTaskClient { private watchedRevisions = new Map(); public constructor(private readonly backend: InMemoryRedisBackend) {} public get(key: string): Promise { return Promise.resolve(this.backend.kv.get(key) ?? null); } public mget(...keys: string[]): Promise<(string | null)[]> { return Promise.resolve(keys.map((key) => this.backend.kv.get(key) ?? null)); } public set( key: string, value: string, mode?: 'NX' | 'XX', ): Promise<'OK' | null> { const exists = this.backend.kv.has(key); if (mode === 'NX' && exists) { return Promise.resolve(null); } if (mode === 'XX' && !exists) { return Promise.resolve(null); } this.backend.kv.set(key, value); this.backend.bumpRevision(key); return Promise.resolve('OK'); } public smembers(key: string): Promise { return Promise.resolve([...(this.backend.sets.get(key) ?? new Set())]); } public sadd(key: string, member: string): Promise { const values = this.backend.sets.get(key) ?? new Set(); const before = values.size; values.add(member); this.backend.sets.set(key, values); this.backend.bumpRevision(key); return Promise.resolve(values.size === before ? 0 : 1); } public watch(...keys: string[]): Promise<'OK'> { this.watchedRevisions = new Map( keys.map((key) => [key, this.backend.getRevision(key)]), ); return Promise.resolve('OK'); } public unwatch(): Promise<'OK'> { this.watchedRevisions.clear(); return Promise.resolve('OK'); } public multi(): RedisTaskTransaction { const watchedSnapshot = new Map(this.watchedRevisions); this.watchedRevisions.clear(); return new InMemoryRedisTransaction(this.backend, watchedSnapshot); } } class StrictAtomicRedisClient extends InMemoryAtomicRedisClient { public override set( key: string, value: string, mode?: 'NX' | 'XX', ): Promise<'OK' | null> { void key; void value; void mode; throw new Error('Direct set() is not allowed in strict atomic tests.'); } public override sadd(key: string, member: string): Promise { void key; void member; throw new Error('Direct sadd() is not allowed in strict atomic tests.'); } } function createRepositoryPair(now: () => number): [RedisTaskRepository, RedisTaskRepository] { const backend = new InMemoryRedisBackend(); return [ new RedisTaskRepository({ client: new InMemoryAtomicRedisClient(backend), now, }), new RedisTaskRepository({ client: new InMemoryAtomicRedisClient(backend), now, }), ]; } function createStrictRepositoryPair( now: () => number, ): [RedisTaskRepository, RedisTaskRepository] { const backend = new InMemoryRedisBackend(); return [ new RedisTaskRepository({ client: new StrictAtomicRedisClient(backend), now, }), new RedisTaskRepository({ client: new StrictAtomicRedisClient(backend), now, }), ]; } describe('RedisTaskRepository atomic transitions', () => { it('creates atomically under concurrent create race', async () => { const [repositoryA, repositoryB] = createStrictRepositoryPair( () => 1_700_000_000_000, ); const [createA, createB] = await Promise.allSettled([ repositoryA.create({ project: 'queue', mission: 'phase1', taskId: 'MQ-004-CREATE', title: 'create race', }), repositoryB.create({ project: 'queue', mission: 'phase1', taskId: 'MQ-004-CREATE', title: 'create race duplicate', }), ]); const fulfilled = [createA, createB].filter( (result) => result.status === 'fulfilled', ); const rejected = [createA, createB].filter( (result) => result.status === 'rejected', ); expect(fulfilled).toHaveLength(1); expect(rejected).toHaveLength(1); expect(rejected[0]?.reason).toBeInstanceOf(TaskAlreadyExistsError); }); it('claims a pending task once and blocks concurrent double-claim', async () => { let timestamp = 1_700_000_000_000; const now = (): number => timestamp; const [repositoryA, repositoryB] = createRepositoryPair(now); await repositoryA.create({ project: 'queue', mission: 'phase1', taskId: 'MQ-004', title: 'Atomic claim', }); const [claimA, claimB] = await Promise.allSettled([ repositoryA.claim('MQ-004', { agentId: 'agent-a', ttlSeconds: 60 }), repositoryB.claim('MQ-004', { agentId: 'agent-b', ttlSeconds: 60 }), ]); const fulfilled = [claimA, claimB].filter((result) => result.status === 'fulfilled'); const rejected = [claimA, claimB].filter((result) => result.status === 'rejected'); expect(fulfilled).toHaveLength(1); expect(rejected).toHaveLength(1); }); it('allows claim takeover after TTL expiry', async () => { let timestamp = 1_700_000_000_000; const now = (): number => timestamp; const [repositoryA, repositoryB] = createRepositoryPair(now); await repositoryA.create({ project: 'queue', mission: 'phase1', taskId: 'MQ-004-EXP', title: 'TTL expiry', }); await repositoryA.claim('MQ-004-EXP', { agentId: 'agent-a', ttlSeconds: 1, }); timestamp += 2_000; const takeover = await repositoryB.claim('MQ-004-EXP', { agentId: 'agent-b', ttlSeconds: 60, }); expect(takeover.claimedBy).toBe('agent-b'); }); it('releases a claimed task back to pending', async () => { const [repository] = createRepositoryPair(() => 1_700_000_000_000); await repository.create({ project: 'queue', mission: 'phase1', taskId: 'MQ-004-REL', title: 'Release test', }); await repository.claim('MQ-004-REL', { agentId: 'agent-a', ttlSeconds: 60, }); const released = await repository.release('MQ-004-REL', { agentId: 'agent-a', }); expect(released.status).toBe('pending'); expect(released.claimedBy).toBeUndefined(); expect(released.claimedAt).toBeUndefined(); }); it('heartbeats, completes, and fails with valid transitions', async () => { let timestamp = 1_700_000_000_000; const now = (): number => timestamp; const [repository] = createRepositoryPair(now); await repository.create({ project: 'queue', mission: 'phase1', taskId: 'MQ-004-HCF', title: 'Transition test', }); await repository.claim('MQ-004-HCF', { agentId: 'agent-a', ttlSeconds: 60, }); timestamp += 1_000; const heartbeat = await repository.heartbeat('MQ-004-HCF', { agentId: 'agent-a', ttlSeconds: 120, }); expect(heartbeat.claimTTL).toBe(120); expect(heartbeat.claimedAt).toBe(1_700_000_001_000); const completed = await repository.complete('MQ-004-HCF', { agentId: 'agent-a', summary: 'done', }); expect(completed.status).toBe('completed'); expect(completed.completionSummary).toBe('done'); await repository.create({ project: 'queue', mission: 'phase1', taskId: 'MQ-004-FAIL', title: 'Failure test', }); await repository.claim('MQ-004-FAIL', { agentId: 'agent-a', ttlSeconds: 60, }); const failed = await repository.fail('MQ-004-FAIL', { agentId: 'agent-a', reason: 'boom', }); expect(failed.status).toBe('failed'); expect(failed.failureReason).toBe('boom'); expect(failed.retryCount).toBe(1); }); it('rejects invalid transitions', async () => { const [repository] = createRepositoryPair(() => 1_700_000_000_000); await repository.create({ project: 'queue', mission: 'phase1', taskId: 'MQ-004-INV', title: 'Invalid transitions', }); await expect( repository.complete('MQ-004-INV', { agentId: 'agent-a', }), ).rejects.toBeInstanceOf(TaskTransitionError); }); it('enforces claim ownership for release and complete', async () => { const [repository] = createRepositoryPair(() => 1_700_000_000_000); await repository.create({ project: 'queue', mission: 'phase1', taskId: 'MQ-004-OWN', title: 'Ownership checks', }); await repository.claim('MQ-004-OWN', { agentId: 'agent-a', ttlSeconds: 60, }); await expect( repository.release('MQ-004-OWN', { agentId: 'agent-b', }), ).rejects.toBeInstanceOf(TaskOwnershipError); await expect( repository.complete('MQ-004-OWN', { agentId: 'agent-b', }), ).rejects.toBeInstanceOf(TaskOwnershipError); }); it('merges concurrent non-conflicting update patches atomically', async () => { const [repositoryA, repositoryB] = createRepositoryPair(() => 1_700_000_000_000); await repositoryA.create({ project: 'queue', mission: 'phase1', taskId: 'MQ-004-UPD', title: 'Original title', description: 'Original description', }); await Promise.all([ repositoryA.update('MQ-004-UPD', { title: 'Updated title', }), repositoryB.update('MQ-004-UPD', { description: 'Updated description', }), ]); const latest = await repositoryA.get('MQ-004-UPD'); expect(latest).not.toBeNull(); expect(latest?.title).toBe('Updated title'); expect(latest?.description).toBe('Updated description'); }); });