460 lines
12 KiB
TypeScript
460 lines
12 KiB
TypeScript
import { describe, expect, it } from 'vitest';
|
|
|
|
import {
|
|
RedisTaskRepository,
|
|
TaskAlreadyExistsError,
|
|
TaskOwnershipError,
|
|
TaskTransitionError,
|
|
type RedisTaskClient,
|
|
type RedisTaskTransaction,
|
|
} from '../src/task-repository.js';
|
|
|
|
type QueuedOperation =
|
|
| {
|
|
readonly type: 'set';
|
|
readonly key: string;
|
|
readonly value: string;
|
|
readonly mode?: 'NX' | 'XX';
|
|
}
|
|
| {
|
|
readonly type: 'sadd';
|
|
readonly key: string;
|
|
readonly member: string;
|
|
};
|
|
|
|
class InMemoryRedisBackend {
|
|
public readonly kv = new Map<string, string>();
|
|
public readonly sets = new Map<string, Set<string>>();
|
|
public readonly revisions = new Map<string, number>();
|
|
|
|
public getRevision(key: string): number {
|
|
return this.revisions.get(key) ?? 0;
|
|
}
|
|
|
|
public bumpRevision(key: string): void {
|
|
this.revisions.set(key, this.getRevision(key) + 1);
|
|
}
|
|
}
|
|
|
|
class InMemoryRedisTransaction implements RedisTaskTransaction {
|
|
private readonly operations: QueuedOperation[] = [];
|
|
|
|
public constructor(
|
|
private readonly backend: InMemoryRedisBackend,
|
|
private readonly watchedRevisions: ReadonlyMap<string, number>,
|
|
) {}
|
|
|
|
public set(key: string, value: string, mode?: 'NX' | 'XX'): RedisTaskTransaction {
|
|
this.operations.push({
|
|
type: 'set',
|
|
key,
|
|
value,
|
|
mode,
|
|
});
|
|
return this;
|
|
}
|
|
|
|
public sadd(key: string, member: string): RedisTaskTransaction {
|
|
this.operations.push({
|
|
type: 'sadd',
|
|
key,
|
|
member,
|
|
});
|
|
return this;
|
|
}
|
|
|
|
public exec(): Promise<readonly (readonly [Error | null, unknown])[] | null> {
|
|
for (const [key, revision] of this.watchedRevisions.entries()) {
|
|
if (this.backend.getRevision(key) !== revision) {
|
|
return Promise.resolve(null);
|
|
}
|
|
}
|
|
|
|
const results: (readonly [Error | null, unknown])[] = [];
|
|
|
|
for (const operation of this.operations) {
|
|
if (operation.type === 'set') {
|
|
const exists = this.backend.kv.has(operation.key);
|
|
if (operation.mode === 'NX' && exists) {
|
|
results.push([null, null]);
|
|
continue;
|
|
}
|
|
|
|
if (operation.mode === 'XX' && !exists) {
|
|
results.push([null, null]);
|
|
continue;
|
|
}
|
|
|
|
this.backend.kv.set(operation.key, operation.value);
|
|
this.backend.bumpRevision(operation.key);
|
|
results.push([null, 'OK']);
|
|
continue;
|
|
}
|
|
|
|
const set = this.backend.sets.get(operation.key) ?? new Set<string>();
|
|
const before = set.size;
|
|
|
|
set.add(operation.member);
|
|
this.backend.sets.set(operation.key, set);
|
|
this.backend.bumpRevision(operation.key);
|
|
results.push([null, set.size === before ? 0 : 1]);
|
|
}
|
|
|
|
return Promise.resolve(results);
|
|
}
|
|
}
|
|
|
|
class InMemoryAtomicRedisClient implements RedisTaskClient {
|
|
private watchedRevisions = new Map<string, number>();
|
|
|
|
public constructor(private readonly backend: InMemoryRedisBackend) {}
|
|
|
|
public get(key: string): Promise<string | null> {
|
|
return Promise.resolve(this.backend.kv.get(key) ?? null);
|
|
}
|
|
|
|
public mget(...keys: string[]): Promise<(string | null)[]> {
|
|
return Promise.resolve(keys.map((key) => this.backend.kv.get(key) ?? null));
|
|
}
|
|
|
|
public set(
|
|
key: string,
|
|
value: string,
|
|
mode?: 'NX' | 'XX',
|
|
): Promise<'OK' | null> {
|
|
const exists = this.backend.kv.has(key);
|
|
|
|
if (mode === 'NX' && exists) {
|
|
return Promise.resolve(null);
|
|
}
|
|
|
|
if (mode === 'XX' && !exists) {
|
|
return Promise.resolve(null);
|
|
}
|
|
|
|
this.backend.kv.set(key, value);
|
|
this.backend.bumpRevision(key);
|
|
|
|
return Promise.resolve('OK');
|
|
}
|
|
|
|
public smembers(key: string): Promise<string[]> {
|
|
return Promise.resolve([...(this.backend.sets.get(key) ?? new Set<string>())]);
|
|
}
|
|
|
|
public sadd(key: string, member: string): Promise<number> {
|
|
const values = this.backend.sets.get(key) ?? new Set<string>();
|
|
const before = values.size;
|
|
|
|
values.add(member);
|
|
this.backend.sets.set(key, values);
|
|
this.backend.bumpRevision(key);
|
|
|
|
return Promise.resolve(values.size === before ? 0 : 1);
|
|
}
|
|
|
|
public watch(...keys: string[]): Promise<'OK'> {
|
|
this.watchedRevisions = new Map(
|
|
keys.map((key) => [key, this.backend.getRevision(key)]),
|
|
);
|
|
return Promise.resolve('OK');
|
|
}
|
|
|
|
public unwatch(): Promise<'OK'> {
|
|
this.watchedRevisions.clear();
|
|
return Promise.resolve('OK');
|
|
}
|
|
|
|
public multi(): RedisTaskTransaction {
|
|
const watchedSnapshot = new Map(this.watchedRevisions);
|
|
this.watchedRevisions.clear();
|
|
return new InMemoryRedisTransaction(this.backend, watchedSnapshot);
|
|
}
|
|
}
|
|
|
|
class StrictAtomicRedisClient extends InMemoryAtomicRedisClient {
|
|
public override set(
|
|
key: string,
|
|
value: string,
|
|
mode?: 'NX' | 'XX',
|
|
): Promise<'OK' | null> {
|
|
void key;
|
|
void value;
|
|
void mode;
|
|
throw new Error('Direct set() is not allowed in strict atomic tests.');
|
|
}
|
|
|
|
public override sadd(key: string, member: string): Promise<number> {
|
|
void key;
|
|
void member;
|
|
throw new Error('Direct sadd() is not allowed in strict atomic tests.');
|
|
}
|
|
}
|
|
|
|
function createRepositoryPair(now: () => number): [RedisTaskRepository, RedisTaskRepository] {
|
|
const backend = new InMemoryRedisBackend();
|
|
|
|
return [
|
|
new RedisTaskRepository({
|
|
client: new InMemoryAtomicRedisClient(backend),
|
|
now,
|
|
}),
|
|
new RedisTaskRepository({
|
|
client: new InMemoryAtomicRedisClient(backend),
|
|
now,
|
|
}),
|
|
];
|
|
}
|
|
|
|
function createStrictRepositoryPair(
|
|
now: () => number,
|
|
): [RedisTaskRepository, RedisTaskRepository] {
|
|
const backend = new InMemoryRedisBackend();
|
|
|
|
return [
|
|
new RedisTaskRepository({
|
|
client: new StrictAtomicRedisClient(backend),
|
|
now,
|
|
}),
|
|
new RedisTaskRepository({
|
|
client: new StrictAtomicRedisClient(backend),
|
|
now,
|
|
}),
|
|
];
|
|
}
|
|
|
|
describe('RedisTaskRepository atomic transitions', () => {
|
|
it('creates atomically under concurrent create race', async () => {
|
|
const [repositoryA, repositoryB] = createStrictRepositoryPair(
|
|
() => 1_700_000_000_000,
|
|
);
|
|
|
|
const [createA, createB] = await Promise.allSettled([
|
|
repositoryA.create({
|
|
project: 'queue',
|
|
mission: 'phase1',
|
|
taskId: 'MQ-004-CREATE',
|
|
title: 'create race',
|
|
}),
|
|
repositoryB.create({
|
|
project: 'queue',
|
|
mission: 'phase1',
|
|
taskId: 'MQ-004-CREATE',
|
|
title: 'create race duplicate',
|
|
}),
|
|
]);
|
|
|
|
const fulfilled = [createA, createB].filter(
|
|
(result) => result.status === 'fulfilled',
|
|
);
|
|
const rejected = [createA, createB].filter(
|
|
(result) => result.status === 'rejected',
|
|
);
|
|
|
|
expect(fulfilled).toHaveLength(1);
|
|
expect(rejected).toHaveLength(1);
|
|
expect(rejected[0]?.reason).toBeInstanceOf(TaskAlreadyExistsError);
|
|
});
|
|
|
|
it('claims a pending task once and blocks concurrent double-claim', async () => {
|
|
let timestamp = 1_700_000_000_000;
|
|
const now = (): number => timestamp;
|
|
const [repositoryA, repositoryB] = createRepositoryPair(now);
|
|
|
|
await repositoryA.create({
|
|
project: 'queue',
|
|
mission: 'phase1',
|
|
taskId: 'MQ-004',
|
|
title: 'Atomic claim',
|
|
});
|
|
|
|
const [claimA, claimB] = await Promise.allSettled([
|
|
repositoryA.claim('MQ-004', { agentId: 'agent-a', ttlSeconds: 60 }),
|
|
repositoryB.claim('MQ-004', { agentId: 'agent-b', ttlSeconds: 60 }),
|
|
]);
|
|
|
|
const fulfilled = [claimA, claimB].filter((result) => result.status === 'fulfilled');
|
|
const rejected = [claimA, claimB].filter((result) => result.status === 'rejected');
|
|
|
|
expect(fulfilled).toHaveLength(1);
|
|
expect(rejected).toHaveLength(1);
|
|
});
|
|
|
|
it('allows claim takeover after TTL expiry', async () => {
|
|
let timestamp = 1_700_000_000_000;
|
|
const now = (): number => timestamp;
|
|
const [repositoryA, repositoryB] = createRepositoryPair(now);
|
|
|
|
await repositoryA.create({
|
|
project: 'queue',
|
|
mission: 'phase1',
|
|
taskId: 'MQ-004-EXP',
|
|
title: 'TTL expiry',
|
|
});
|
|
|
|
await repositoryA.claim('MQ-004-EXP', {
|
|
agentId: 'agent-a',
|
|
ttlSeconds: 1,
|
|
});
|
|
|
|
timestamp += 2_000;
|
|
|
|
const takeover = await repositoryB.claim('MQ-004-EXP', {
|
|
agentId: 'agent-b',
|
|
ttlSeconds: 60,
|
|
});
|
|
|
|
expect(takeover.claimedBy).toBe('agent-b');
|
|
});
|
|
|
|
it('releases a claimed task back to pending', async () => {
|
|
const [repository] = createRepositoryPair(() => 1_700_000_000_000);
|
|
|
|
await repository.create({
|
|
project: 'queue',
|
|
mission: 'phase1',
|
|
taskId: 'MQ-004-REL',
|
|
title: 'Release test',
|
|
});
|
|
|
|
await repository.claim('MQ-004-REL', {
|
|
agentId: 'agent-a',
|
|
ttlSeconds: 60,
|
|
});
|
|
|
|
const released = await repository.release('MQ-004-REL', {
|
|
agentId: 'agent-a',
|
|
});
|
|
|
|
expect(released.status).toBe('pending');
|
|
expect(released.claimedBy).toBeUndefined();
|
|
expect(released.claimedAt).toBeUndefined();
|
|
});
|
|
|
|
it('heartbeats, completes, and fails with valid transitions', async () => {
|
|
let timestamp = 1_700_000_000_000;
|
|
const now = (): number => timestamp;
|
|
const [repository] = createRepositoryPair(now);
|
|
|
|
await repository.create({
|
|
project: 'queue',
|
|
mission: 'phase1',
|
|
taskId: 'MQ-004-HCF',
|
|
title: 'Transition test',
|
|
});
|
|
|
|
await repository.claim('MQ-004-HCF', {
|
|
agentId: 'agent-a',
|
|
ttlSeconds: 60,
|
|
});
|
|
|
|
timestamp += 1_000;
|
|
const heartbeat = await repository.heartbeat('MQ-004-HCF', {
|
|
agentId: 'agent-a',
|
|
ttlSeconds: 120,
|
|
});
|
|
expect(heartbeat.claimTTL).toBe(120);
|
|
expect(heartbeat.claimedAt).toBe(1_700_000_001_000);
|
|
|
|
const completed = await repository.complete('MQ-004-HCF', {
|
|
agentId: 'agent-a',
|
|
summary: 'done',
|
|
});
|
|
expect(completed.status).toBe('completed');
|
|
expect(completed.completionSummary).toBe('done');
|
|
|
|
await repository.create({
|
|
project: 'queue',
|
|
mission: 'phase1',
|
|
taskId: 'MQ-004-FAIL',
|
|
title: 'Failure test',
|
|
});
|
|
|
|
await repository.claim('MQ-004-FAIL', {
|
|
agentId: 'agent-a',
|
|
ttlSeconds: 60,
|
|
});
|
|
|
|
const failed = await repository.fail('MQ-004-FAIL', {
|
|
agentId: 'agent-a',
|
|
reason: 'boom',
|
|
});
|
|
|
|
expect(failed.status).toBe('failed');
|
|
expect(failed.failureReason).toBe('boom');
|
|
expect(failed.retryCount).toBe(1);
|
|
});
|
|
|
|
it('rejects invalid transitions', async () => {
|
|
const [repository] = createRepositoryPair(() => 1_700_000_000_000);
|
|
|
|
await repository.create({
|
|
project: 'queue',
|
|
mission: 'phase1',
|
|
taskId: 'MQ-004-INV',
|
|
title: 'Invalid transitions',
|
|
});
|
|
|
|
await expect(
|
|
repository.complete('MQ-004-INV', {
|
|
agentId: 'agent-a',
|
|
}),
|
|
).rejects.toBeInstanceOf(TaskTransitionError);
|
|
});
|
|
|
|
it('enforces claim ownership for release and complete', async () => {
|
|
const [repository] = createRepositoryPair(() => 1_700_000_000_000);
|
|
|
|
await repository.create({
|
|
project: 'queue',
|
|
mission: 'phase1',
|
|
taskId: 'MQ-004-OWN',
|
|
title: 'Ownership checks',
|
|
});
|
|
|
|
await repository.claim('MQ-004-OWN', {
|
|
agentId: 'agent-a',
|
|
ttlSeconds: 60,
|
|
});
|
|
|
|
await expect(
|
|
repository.release('MQ-004-OWN', {
|
|
agentId: 'agent-b',
|
|
}),
|
|
).rejects.toBeInstanceOf(TaskOwnershipError);
|
|
|
|
await expect(
|
|
repository.complete('MQ-004-OWN', {
|
|
agentId: 'agent-b',
|
|
}),
|
|
).rejects.toBeInstanceOf(TaskOwnershipError);
|
|
});
|
|
|
|
it('merges concurrent non-conflicting update patches atomically', async () => {
|
|
const [repositoryA, repositoryB] = createRepositoryPair(() => 1_700_000_000_000);
|
|
|
|
await repositoryA.create({
|
|
project: 'queue',
|
|
mission: 'phase1',
|
|
taskId: 'MQ-004-UPD',
|
|
title: 'Original title',
|
|
description: 'Original description',
|
|
});
|
|
|
|
await Promise.all([
|
|
repositoryA.update('MQ-004-UPD', {
|
|
title: 'Updated title',
|
|
}),
|
|
repositoryB.update('MQ-004-UPD', {
|
|
description: 'Updated description',
|
|
}),
|
|
]);
|
|
|
|
const latest = await repositoryA.get('MQ-004-UPD');
|
|
|
|
expect(latest).not.toBeNull();
|
|
expect(latest?.title).toBe('Updated title');
|
|
expect(latest?.description).toBe('Updated description');
|
|
});
|
|
});
|