diff --git a/packages/db/src/backlog.spec.ts b/packages/db/src/backlog.spec.ts index fe7152d..d02a0df 100644 --- a/packages/db/src/backlog.spec.ts +++ b/packages/db/src/backlog.spec.ts @@ -101,6 +101,52 @@ describe('BacklogService', () => { expect(new Set(wonIds).size).toBe(5); // all distinct — no double-claim }); + it('N concurrent claimers on N ready cards => every claimer wins a distinct card (no starvation)', async () => { + // This is the direct benefit of locking exactly ONE ready row per claim + // (`FOR UPDATE SKIP LOCKED LIMIT 1`): with as many ready cards as + // claimers, NONE should starve. The old "lock the whole ready set" + // behaviour let one claimer lock every row, forcing the rest to null even + // though cards were free. + const N = 6; + for (let i = 0; i < N; i++) { + await svc.create({ id: `n-${i}`, title: `card ${i}`, priority: i }); + } + const results = await Promise.all( + Array.from({ length: N }, (_, i) => + new BacklogService(handle.db).claim({ owner: `w-${i}` }), + ), + ); + const won = results.filter((c): c is NonNullable => c !== null); + // No claimer starved: all N won. + expect(won).toHaveLength(N); + // Each won a distinct card. + expect(new Set(won.map((c) => c.id)).size).toBe(N); + // Every ready card was consumed. + expect(await svc.list({ status: 'ready' })).toHaveLength(0); + }); + + it('sequential claims drain ready cards in priority order and never null while ready remain', async () => { + // PGlite-stable fallback assertion of the same property without relying on + // true parallelism or wall-clock timing: each claim returns the next + // highest-priority distinct card and never spuriously returns null while + // ready cards remain. + const N = 4; + for (let i = 0; i < N; i++) { + await svc.create({ id: `s-${i}`, title: `card ${i}`, priority: i }); + } + const order: string[] = []; + for (let i = 0; i < N; i++) { + const claimed = await svc.claim({ owner: `w-${i}` }); + expect(claimed).not.toBeNull(); + order.push(claimed!.id); + } + // Highest priority first, all distinct. + expect(order).toEqual(['s-3', 's-2', 's-1', 's-0']); + expect(new Set(order).size).toBe(N); + // Now nothing ready remains => null. + expect(await svc.claim({ owner: 'late' })).toBeNull(); + }); + it('claim picks the highest-priority ready card', async () => { await svc.create({ id: 'low', title: 'low', priority: 1 }); await svc.create({ id: 'high', title: 'high', priority: 9 }); diff --git a/packages/db/src/backlog.ts b/packages/db/src/backlog.ts index e987e62..922c259 100644 --- a/packages/db/src/backlog.ts +++ b/packages/db/src/backlog.ts @@ -193,14 +193,16 @@ export class BacklogService { /** * Atomically claim a card. * - * Strategy: inside ONE transaction we lock the candidate row(s) with - * `FOR UPDATE SKIP LOCKED`. A concurrent claimer that already holds the lock - * on a row has that row skipped for us, so two claimers can never both win the - * same card. The locked row is then flipped to `claimed` and returned. + * Strategy: inside ONE transaction we lock the candidate row with + * `FOR UPDATE SKIP LOCKED LIMIT 1`. A concurrent claimer that already holds + * the lock on a row has that row skipped for us, so two claimers can never + * both win the same card — and, crucially, each claimer locks exactly ONE + * row, so concurrent claimers fan out across distinct ready cards instead of + * one claimer locking the whole ready set and starving the rest. * * Candidate selection (when no explicit `id`): * - status = 'ready' - * - all deps satisfied (deps ⊆ done set) + * - all deps satisfied (every id in depends_on is currently 'done') * - ordered by priority DESC, created_at ASC * * Returns the claimed card, or null if nothing is claimable. @@ -209,54 +211,76 @@ export class BacklogService { const ttl = opts.ttlSeconds ?? DEFAULT_CLAIM_TTL_SECONDS; return this.db.transaction(async (tx) => { - // Compute the set of satisfied dependencies up front. A card is claimable - // only if every id in depends_on is currently 'done'. - const doneRows = await tx - .select({ id: backlog.id }) - .from(backlog) - .where(eq(backlog.status, 'done')); - const doneIds = new Set(doneRows.map((r) => r.id)); - - // Lock candidate ready rows, skipping any already locked by a concurrent - // claimer. We over-fetch (no LIMIT 1) so the deps filter below can fall - // through to the next-best card rather than returning null spuriously. - let lockedRows: RawRow[]; + // Specific-id path: lock that one ready row (if free) and apply the + // deps-satisfied gate in JS, exactly as before. if (opts.id) { + const doneRows = await tx + .select({ id: backlog.id }) + .from(backlog) + .where(eq(backlog.status, 'done')); + const doneIds = new Set(doneRows.map((r) => r.id)); + const result = await tx.execute( sql`SELECT * FROM ${backlog} WHERE ${backlog.id} = ${opts.id} AND ${backlog.status} = 'ready' FOR UPDATE SKIP LOCKED`, ); - lockedRows = rowsOf(result); - } else { - const result = await tx.execute( - sql`SELECT * FROM ${backlog} - WHERE ${backlog.status} = 'ready' - ORDER BY ${backlog.priority} DESC, ${backlog.createdAt} ASC - FOR UPDATE SKIP LOCKED`, + const candidate = rowsOf(result).find((row) => + normalizeDeps(row.depends_on).every((dep) => doneIds.has(dep)), ); - lockedRows = rowsOf(result); + if (!candidate) return null; + + const updated = await tx + .update(backlog) + .set({ + status: 'claimed', + claimOwner: opts.owner, + claimTtlSeconds: ttl, + claimedAt: new Date(), + attempts: sql`${backlog.attempts} + 1`, + updatedAt: new Date(), + }) + .where(eq(backlog.id, candidate.id)) + .returning(); + + return toCard(updated[0]!); } - const candidate = lockedRows.find((row) => - normalizeDeps(row.depends_on).every((dep) => doneIds.has(dep)), + // No-id path: claim the single highest-priority, deps-satisfied ready + // card. We lock exactly ONE row in the inner SELECT (`FOR UPDATE SKIP + // LOCKED LIMIT 1`) so concurrent claimers grab distinct cards rather than + // one claimer locking every ready row and forcing the others to null. + // + // The deps-satisfied gate is pushed into SQL so `LIMIT 1` lands on the + // next genuinely-eligible card: a card is eligible iff none of its + // depends_on ids is absent from the set of 'done' card ids. + const updated = await tx.execute( + sql`UPDATE ${backlog} + SET status = 'claimed', + claim_owner = ${opts.owner}, + claim_ttl_seconds = ${ttl}, + claimed_at = now(), + attempts = ${backlog.attempts} + 1, + updated_at = now() + WHERE ${backlog.id} = ( + SELECT b.id FROM ${backlog} AS b + WHERE b.status = 'ready' + AND NOT EXISTS ( + SELECT 1 + FROM jsonb_array_elements_text(b.depends_on) AS dep + WHERE dep NOT IN ( + SELECT d.id FROM ${backlog} AS d WHERE d.status = 'done' + ) + ) + ORDER BY b.priority DESC, b.created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) + RETURNING *`, ); - if (!candidate) return null; - const updated = await tx - .update(backlog) - .set({ - status: 'claimed', - claimOwner: opts.owner, - claimTtlSeconds: ttl, - claimedAt: new Date(), - attempts: sql`${backlog.attempts} + 1`, - updatedAt: new Date(), - }) - .where(eq(backlog.id, candidate.id)) - .returning(); - - return toCard(updated[0]!); + const row = rowsOf(updated)[0]; + return row ? toCard(rawToRow(row)) : null; }); } @@ -391,6 +415,33 @@ function rowsOf(result: unknown): RawRow[] { return []; } +/** + * Map a raw `RETURNING *` row (snake_case columns, possibly string-encoded + * timestamps/JSON depending on the driver) onto the drizzle `Row` shape that + * `toCard` consumes. Mirrors the column ↔ property mapping in `schema.ts`. + */ +function rawToRow(raw: RawRow): Row { + const r = raw as unknown as Record; + const toDate = (v: unknown): Date => (v instanceof Date ? v : new Date(v as string)); + return { + id: r.id as string, + title: r.title as string, + body: (r.body ?? null) as string | null, + phase: (r.phase ?? null) as string | null, + priority: Number(r.priority), + status: r.status as BacklogStatus, + dependsOn: normalizeDeps(r.depends_on), + claimOwner: (r.claim_owner ?? null) as string | null, + claimTtlSeconds: r.claim_ttl_seconds == null ? null : Number(r.claim_ttl_seconds), + claimedAt: r.claimed_at == null ? null : toDate(r.claimed_at), + attempts: Number(r.attempts), + idempotencyKey: (r.idempotency_key ?? null) as string | null, + acceptance: r.acceptance ?? null, + createdAt: toDate(r.created_at), + updatedAt: toDate(r.updated_at), + }; +} + /** A raw SQL row returns snake_case `depends_on`; normalize to string[]. */ function normalizeDeps(value: unknown): string[] { if (Array.isArray(value)) return value as string[];