Files
mosaic/packages/queue/tests/task-atomic.test.ts
2026-03-06 22:44:05 +00:00

460 lines
12 KiB
TypeScript

import { describe, expect, it } from 'vitest';
import {
RedisTaskRepository,
TaskAlreadyExistsError,
TaskOwnershipError,
TaskTransitionError,
type RedisTaskClient,
type RedisTaskTransaction,
} from '../src/task-repository.js';
type QueuedOperation =
| {
readonly type: 'set';
readonly key: string;
readonly value: string;
readonly mode?: 'NX' | 'XX';
}
| {
readonly type: 'sadd';
readonly key: string;
readonly member: string;
};
class InMemoryRedisBackend {
public readonly kv = new Map<string, string>();
public readonly sets = new Map<string, Set<string>>();
public readonly revisions = new Map<string, number>();
public getRevision(key: string): number {
return this.revisions.get(key) ?? 0;
}
public bumpRevision(key: string): void {
this.revisions.set(key, this.getRevision(key) + 1);
}
}
class InMemoryRedisTransaction implements RedisTaskTransaction {
private readonly operations: QueuedOperation[] = [];
public constructor(
private readonly backend: InMemoryRedisBackend,
private readonly watchedRevisions: ReadonlyMap<string, number>,
) {}
public set(key: string, value: string, mode?: 'NX' | 'XX'): RedisTaskTransaction {
this.operations.push({
type: 'set',
key,
value,
mode,
});
return this;
}
public sadd(key: string, member: string): RedisTaskTransaction {
this.operations.push({
type: 'sadd',
key,
member,
});
return this;
}
public exec(): Promise<readonly (readonly [Error | null, unknown])[] | null> {
for (const [key, revision] of this.watchedRevisions.entries()) {
if (this.backend.getRevision(key) !== revision) {
return Promise.resolve(null);
}
}
const results: (readonly [Error | null, unknown])[] = [];
for (const operation of this.operations) {
if (operation.type === 'set') {
const exists = this.backend.kv.has(operation.key);
if (operation.mode === 'NX' && exists) {
results.push([null, null]);
continue;
}
if (operation.mode === 'XX' && !exists) {
results.push([null, null]);
continue;
}
this.backend.kv.set(operation.key, operation.value);
this.backend.bumpRevision(operation.key);
results.push([null, 'OK']);
continue;
}
const set = this.backend.sets.get(operation.key) ?? new Set<string>();
const before = set.size;
set.add(operation.member);
this.backend.sets.set(operation.key, set);
this.backend.bumpRevision(operation.key);
results.push([null, set.size === before ? 0 : 1]);
}
return Promise.resolve(results);
}
}
class InMemoryAtomicRedisClient implements RedisTaskClient {
private watchedRevisions = new Map<string, number>();
public constructor(private readonly backend: InMemoryRedisBackend) {}
public get(key: string): Promise<string | null> {
return Promise.resolve(this.backend.kv.get(key) ?? null);
}
public mget(...keys: string[]): Promise<(string | null)[]> {
return Promise.resolve(keys.map((key) => this.backend.kv.get(key) ?? null));
}
public set(
key: string,
value: string,
mode?: 'NX' | 'XX',
): Promise<'OK' | null> {
const exists = this.backend.kv.has(key);
if (mode === 'NX' && exists) {
return Promise.resolve(null);
}
if (mode === 'XX' && !exists) {
return Promise.resolve(null);
}
this.backend.kv.set(key, value);
this.backend.bumpRevision(key);
return Promise.resolve('OK');
}
public smembers(key: string): Promise<string[]> {
return Promise.resolve([...(this.backend.sets.get(key) ?? new Set<string>())]);
}
public sadd(key: string, member: string): Promise<number> {
const values = this.backend.sets.get(key) ?? new Set<string>();
const before = values.size;
values.add(member);
this.backend.sets.set(key, values);
this.backend.bumpRevision(key);
return Promise.resolve(values.size === before ? 0 : 1);
}
public watch(...keys: string[]): Promise<'OK'> {
this.watchedRevisions = new Map(
keys.map((key) => [key, this.backend.getRevision(key)]),
);
return Promise.resolve('OK');
}
public unwatch(): Promise<'OK'> {
this.watchedRevisions.clear();
return Promise.resolve('OK');
}
public multi(): RedisTaskTransaction {
const watchedSnapshot = new Map(this.watchedRevisions);
this.watchedRevisions.clear();
return new InMemoryRedisTransaction(this.backend, watchedSnapshot);
}
}
class StrictAtomicRedisClient extends InMemoryAtomicRedisClient {
public override set(
key: string,
value: string,
mode?: 'NX' | 'XX',
): Promise<'OK' | null> {
void key;
void value;
void mode;
throw new Error('Direct set() is not allowed in strict atomic tests.');
}
public override sadd(key: string, member: string): Promise<number> {
void key;
void member;
throw new Error('Direct sadd() is not allowed in strict atomic tests.');
}
}
function createRepositoryPair(now: () => number): [RedisTaskRepository, RedisTaskRepository] {
const backend = new InMemoryRedisBackend();
return [
new RedisTaskRepository({
client: new InMemoryAtomicRedisClient(backend),
now,
}),
new RedisTaskRepository({
client: new InMemoryAtomicRedisClient(backend),
now,
}),
];
}
function createStrictRepositoryPair(
now: () => number,
): [RedisTaskRepository, RedisTaskRepository] {
const backend = new InMemoryRedisBackend();
return [
new RedisTaskRepository({
client: new StrictAtomicRedisClient(backend),
now,
}),
new RedisTaskRepository({
client: new StrictAtomicRedisClient(backend),
now,
}),
];
}
describe('RedisTaskRepository atomic transitions', () => {
it('creates atomically under concurrent create race', async () => {
const [repositoryA, repositoryB] = createStrictRepositoryPair(
() => 1_700_000_000_000,
);
const [createA, createB] = await Promise.allSettled([
repositoryA.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-CREATE',
title: 'create race',
}),
repositoryB.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-CREATE',
title: 'create race duplicate',
}),
]);
const fulfilled = [createA, createB].filter(
(result) => result.status === 'fulfilled',
);
const rejected = [createA, createB].filter(
(result) => result.status === 'rejected',
);
expect(fulfilled).toHaveLength(1);
expect(rejected).toHaveLength(1);
expect(rejected[0]?.reason).toBeInstanceOf(TaskAlreadyExistsError);
});
it('claims a pending task once and blocks concurrent double-claim', async () => {
let timestamp = 1_700_000_000_000;
const now = (): number => timestamp;
const [repositoryA, repositoryB] = createRepositoryPair(now);
await repositoryA.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004',
title: 'Atomic claim',
});
const [claimA, claimB] = await Promise.allSettled([
repositoryA.claim('MQ-004', { agentId: 'agent-a', ttlSeconds: 60 }),
repositoryB.claim('MQ-004', { agentId: 'agent-b', ttlSeconds: 60 }),
]);
const fulfilled = [claimA, claimB].filter((result) => result.status === 'fulfilled');
const rejected = [claimA, claimB].filter((result) => result.status === 'rejected');
expect(fulfilled).toHaveLength(1);
expect(rejected).toHaveLength(1);
});
it('allows claim takeover after TTL expiry', async () => {
let timestamp = 1_700_000_000_000;
const now = (): number => timestamp;
const [repositoryA, repositoryB] = createRepositoryPair(now);
await repositoryA.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-EXP',
title: 'TTL expiry',
});
await repositoryA.claim('MQ-004-EXP', {
agentId: 'agent-a',
ttlSeconds: 1,
});
timestamp += 2_000;
const takeover = await repositoryB.claim('MQ-004-EXP', {
agentId: 'agent-b',
ttlSeconds: 60,
});
expect(takeover.claimedBy).toBe('agent-b');
});
it('releases a claimed task back to pending', async () => {
const [repository] = createRepositoryPair(() => 1_700_000_000_000);
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-REL',
title: 'Release test',
});
await repository.claim('MQ-004-REL', {
agentId: 'agent-a',
ttlSeconds: 60,
});
const released = await repository.release('MQ-004-REL', {
agentId: 'agent-a',
});
expect(released.status).toBe('pending');
expect(released.claimedBy).toBeUndefined();
expect(released.claimedAt).toBeUndefined();
});
it('heartbeats, completes, and fails with valid transitions', async () => {
let timestamp = 1_700_000_000_000;
const now = (): number => timestamp;
const [repository] = createRepositoryPair(now);
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-HCF',
title: 'Transition test',
});
await repository.claim('MQ-004-HCF', {
agentId: 'agent-a',
ttlSeconds: 60,
});
timestamp += 1_000;
const heartbeat = await repository.heartbeat('MQ-004-HCF', {
agentId: 'agent-a',
ttlSeconds: 120,
});
expect(heartbeat.claimTTL).toBe(120);
expect(heartbeat.claimedAt).toBe(1_700_000_001_000);
const completed = await repository.complete('MQ-004-HCF', {
agentId: 'agent-a',
summary: 'done',
});
expect(completed.status).toBe('completed');
expect(completed.completionSummary).toBe('done');
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-FAIL',
title: 'Failure test',
});
await repository.claim('MQ-004-FAIL', {
agentId: 'agent-a',
ttlSeconds: 60,
});
const failed = await repository.fail('MQ-004-FAIL', {
agentId: 'agent-a',
reason: 'boom',
});
expect(failed.status).toBe('failed');
expect(failed.failureReason).toBe('boom');
expect(failed.retryCount).toBe(1);
});
it('rejects invalid transitions', async () => {
const [repository] = createRepositoryPair(() => 1_700_000_000_000);
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-INV',
title: 'Invalid transitions',
});
await expect(
repository.complete('MQ-004-INV', {
agentId: 'agent-a',
}),
).rejects.toBeInstanceOf(TaskTransitionError);
});
it('enforces claim ownership for release and complete', async () => {
const [repository] = createRepositoryPair(() => 1_700_000_000_000);
await repository.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-OWN',
title: 'Ownership checks',
});
await repository.claim('MQ-004-OWN', {
agentId: 'agent-a',
ttlSeconds: 60,
});
await expect(
repository.release('MQ-004-OWN', {
agentId: 'agent-b',
}),
).rejects.toBeInstanceOf(TaskOwnershipError);
await expect(
repository.complete('MQ-004-OWN', {
agentId: 'agent-b',
}),
).rejects.toBeInstanceOf(TaskOwnershipError);
});
it('merges concurrent non-conflicting update patches atomically', async () => {
const [repositoryA, repositoryB] = createRepositoryPair(() => 1_700_000_000_000);
await repositoryA.create({
project: 'queue',
mission: 'phase1',
taskId: 'MQ-004-UPD',
title: 'Original title',
description: 'Original description',
});
await Promise.all([
repositoryA.update('MQ-004-UPD', {
title: 'Updated title',
}),
repositoryB.update('MQ-004-UPD', {
description: 'Updated description',
}),
]);
const latest = await repositoryA.get('MQ-004-UPD');
expect(latest).not.toBeNull();
expect(latest?.title).toBe('Updated title');
expect(latest?.description).toBe('Updated description');
});
});