import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { sql } from 'drizzle-orm'; import { createPgliteDb } from './client-pglite.js'; import { runPgliteMigrations } from './migrate.js'; import type { DbHandle } from './client.js'; import { BacklogService } from './backlog.js'; import { backlog } from './schema.js'; // Helper: backdate a claim's claimed_at by 1 hour so it is past any short TTL. function sqlBackdate(id: string) { return sql`UPDATE ${backlog} SET claimed_at = now() - interval '1 hour' WHERE ${backlog.id} = ${id}`; } /** * Real Postgres semantics, no external server: embedded in-memory PGlite. * The migration path creates the `backlog` table (and every other table) so the * service runs against the actual generated schema, including the row locks the * atomic-claim path depends on. */ async function freshService(): Promise<{ handle: DbHandle; svc: BacklogService }> { const handle = createPgliteDb('memory://'); await runPgliteMigrations(handle); return { handle, svc: new BacklogService(handle.db) }; } describe('BacklogService', () => { let handle: DbHandle; let svc: BacklogService; beforeEach(async () => { ({ handle, svc } = await freshService()); }); afterEach(async () => { await handle.close(); }); it('create then list returns the card', async () => { await svc.create({ id: 'c1', title: 'First card', phase: 'M1', priority: 5 }); const all = await svc.list(); expect(all).toHaveLength(1); expect(all[0]).toMatchObject({ id: 'c1', title: 'First card', phase: 'M1', status: 'ready' }); }); it('idempotency_key dedups create', async () => { const a = await svc.create({ id: 'c1', title: 'one', idempotencyKey: 'k-1' }); const b = await svc.create({ id: 'c2', title: 'two', idempotencyKey: 'k-1' }); expect(b.id).toBe(a.id); const all = await svc.list(); expect(all).toHaveLength(1); }); it('list filters by status and phase', async () => { await svc.create({ id: 'c1', title: 'a', phase: 'M1' }); await svc.create({ id: 'c2', title: 'b', phase: 'M2' }); await svc.block('c2'); expect(await svc.list({ phase: 'M1' })).toHaveLength(1); expect(await svc.list({ status: 'blocked' })).toHaveLength(1); expect((await svc.list({ status: 'blocked' }))[0]!.id).toBe('c2'); }); describe('atomic claim', () => { it('two concurrent claimers on one card => exactly one wins', async () => { await svc.create({ id: 'only', title: 'the one', priority: 10 }); // Two independent claimers race for the single ready card on the same db. // The atomic claim path (`FOR UPDATE SKIP LOCKED` inside a transaction) // guarantees the loser's locked row is skipped, so it can never also flip // the card to claimed — it gets the next candidate (none) and returns null. const svcA = new BacklogService(handle.db); const svcB = new BacklogService(handle.db); const [a, b] = await Promise.all([ svcA.claim({ owner: 'worker-A' }), svcB.claim({ owner: 'worker-B' }), ]); const winners = [a, b].filter((c) => c !== null); expect(winners).toHaveLength(1); expect(winners[0]!.id).toBe('only'); expect(winners[0]!.status).toBe('claimed'); expect(['worker-A', 'worker-B']).toContain(winners[0]!.claimOwner); const card = await svc.get('only'); expect(card!.status).toBe('claimed'); expect(card!.attempts).toBe(1); }); it('many concurrent claimers on N cards => no card is double-claimed', async () => { // 5 ready cards, 8 concurrent claimers. Exactly 5 win, all distinct. for (let i = 0; i < 5; i++) { await svc.create({ id: `card-${i}`, title: `card ${i}`, priority: i }); } const claimers = Array.from({ length: 8 }, (_, i) => new BacklogService(handle.db).claim({ owner: `w-${i}` }), ); const results = await Promise.all(claimers); const won = results.filter((c): c is NonNullable => c !== null); const wonIds = won.map((c) => c.id); expect(won).toHaveLength(5); expect(new Set(wonIds).size).toBe(5); // all distinct — no double-claim }); it('N concurrent claimers on N ready cards => every claimer wins a distinct card (no starvation)', async () => { // This is the direct benefit of locking exactly ONE ready row per claim // (`FOR UPDATE SKIP LOCKED LIMIT 1`): with as many ready cards as // claimers, NONE should starve. The old "lock the whole ready set" // behaviour let one claimer lock every row, forcing the rest to null even // though cards were free. const N = 6; for (let i = 0; i < N; i++) { await svc.create({ id: `n-${i}`, title: `card ${i}`, priority: i }); } const results = await Promise.all( Array.from({ length: N }, (_, i) => new BacklogService(handle.db).claim({ owner: `w-${i}` }), ), ); const won = results.filter((c): c is NonNullable => c !== null); // No claimer starved: all N won. expect(won).toHaveLength(N); // Each won a distinct card. expect(new Set(won.map((c) => c.id)).size).toBe(N); // Every ready card was consumed. expect(await svc.list({ status: 'ready' })).toHaveLength(0); }); it('sequential claims drain ready cards in priority order and never null while ready remain', async () => { // PGlite-stable fallback assertion of the same property without relying on // true parallelism or wall-clock timing: each claim returns the next // highest-priority distinct card and never spuriously returns null while // ready cards remain. const N = 4; for (let i = 0; i < N; i++) { await svc.create({ id: `s-${i}`, title: `card ${i}`, priority: i }); } const order: string[] = []; for (let i = 0; i < N; i++) { const claimed = await svc.claim({ owner: `w-${i}` }); expect(claimed).not.toBeNull(); order.push(claimed!.id); } // Highest priority first, all distinct. expect(order).toEqual(['s-3', 's-2', 's-1', 's-0']); expect(new Set(order).size).toBe(N); // Now nothing ready remains => null. expect(await svc.claim({ owner: 'late' })).toBeNull(); }); it('claim picks the highest-priority ready card', async () => { await svc.create({ id: 'low', title: 'low', priority: 1 }); await svc.create({ id: 'high', title: 'high', priority: 9 }); const claimed = await svc.claim({ owner: 'w' }); expect(claimed!.id).toBe('high'); }); it('claim of a specific --id', async () => { await svc.create({ id: 'a', title: 'a', priority: 9 }); await svc.create({ id: 'b', title: 'b', priority: 1 }); const claimed = await svc.claim({ owner: 'w', id: 'b' }); expect(claimed!.id).toBe('b'); }); it('claim returns null when nothing is ready', async () => { const claimed = await svc.claim({ owner: 'w' }); expect(claimed).toBeNull(); }); }); describe('deps DAG gate', () => { it('card with an unfinished dep is not claimable and not ready', async () => { await svc.create({ id: 'dep', title: 'dependency' }); await svc.create({ id: 'main', title: 'depends on dep', dependsOn: ['dep'] }); // `main` should NOT be claimable while `dep` is not done — `dep` wins. const first = await svc.claim({ owner: 'w' }); expect(first!.id).toBe('dep'); // With dep claimed (not done), main still cannot be claimed. const second = await svc.claim({ owner: 'w' }); expect(second).toBeNull(); // ready-only list excludes main while its dep is unfinished. const ready = await svc.list({ readyOnly: true }); expect(ready.map((c) => c.id)).not.toContain('main'); // Once dep is done, main becomes ready and claimable. await svc.complete('dep'); const readyAfter = await svc.list({ readyOnly: true }); expect(readyAfter.map((c) => c.id)).toContain('main'); const third = await svc.claim({ owner: 'w' }); expect(third!.id).toBe('main'); }); it('link adds a depends_on edge', async () => { await svc.create({ id: 'a', title: 'a' }); await svc.create({ id: 'b', title: 'b' }); const linked = await svc.link('a', 'b'); expect(linked.dependsOn).toEqual(['b']); // a is now gated on b const claimed = await svc.claim({ owner: 'w' }); expect(claimed!.id).toBe('b'); }); }); describe('reclaim TTL', () => { it('reclaim returns expired claims to ready', async () => { await svc.create({ id: 'c1', title: 'c1' }); const claimed = await svc.claim({ owner: 'w', ttlSeconds: 60 }); expect(claimed!.status).toBe('claimed'); // Backdate the claim so it is well past its TTL. await handle.db.execute(sqlBackdate('c1')); const result = await svc.reclaim(); expect(result.reclaimed).toEqual(['c1']); const card = await svc.get('c1'); expect(card!.status).toBe('ready'); expect(card!.claimOwner).toBeNull(); expect(card!.claimedAt).toBeNull(); }); it('reclaim does not touch a fresh (unexpired) claim', async () => { await svc.create({ id: 'c1', title: 'c1' }); await svc.claim({ owner: 'w', ttlSeconds: 3600 }); const result = await svc.reclaim(); expect(result.reclaimed).toEqual([]); expect((await svc.get('c1'))!.status).toBe('claimed'); }); it('reclaim --id releases a specific claim regardless of expiry', async () => { await svc.create({ id: 'c1', title: 'c1' }); await svc.claim({ owner: 'w', ttlSeconds: 3600 }); const result = await svc.reclaim({ id: 'c1' }); expect(result.reclaimed).toEqual(['c1']); expect((await svc.get('c1'))!.status).toBe('ready'); }); }); describe('stats', () => { it('computes counts, oldest-ready age, and expired-claim count', async () => { await svc.create({ id: 'r1', title: 'r1' }); await svc.create({ id: 'r2', title: 'r2' }); await svc.create({ id: 'b1', title: 'b1' }); await svc.block('b1'); await svc.create({ id: 'd1', title: 'd1' }); await svc.complete('d1'); await svc.create({ id: 'cl1', title: 'cl1' }); await svc.claim({ owner: 'w', id: 'cl1', ttlSeconds: 60 }); await handle.db.execute(sqlBackdate('cl1')); const stats = await svc.stats(); expect(stats.counts.ready).toBe(2); expect(stats.counts.blocked).toBe(1); expect(stats.counts.done).toBe(1); expect(stats.counts.claimed).toBe(1); expect(stats.total).toBe(5); expect(stats.expiredClaimCount).toBe(1); expect(stats.oldestReadyAgeSeconds).not.toBeNull(); expect(stats.oldestReadyAgeSeconds!).toBeGreaterThanOrEqual(0); }); }); });