fix(backlog): claim locks one ready row (LIMIT 1) to prevent claimer starvation
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -101,6 +101,52 @@ describe('BacklogService', () => {
|
||||
expect(new Set(wonIds).size).toBe(5); // all distinct — no double-claim
|
||||
});
|
||||
|
||||
it('N concurrent claimers on N ready cards => every claimer wins a distinct card (no starvation)', async () => {
  // Guards the benefit of each claim locking exactly ONE ready row
  // (`FOR UPDATE SKIP LOCKED LIMIT 1`): when there are as many ready cards as
  // claimers, nobody may come back empty-handed. Under the previous
  // "lock the whole ready set" strategy a single claimer could hold every row,
  // so the others got null although free cards existed.
  const claimerCount = 6;
  const slots = Array.from({ length: claimerCount }, (_, idx) => idx);

  // Seed one ready card per claimer, created one at a time in index order;
  // each card's priority equals its index.
  for (const idx of slots) {
    await svc.create({ id: `n-${idx}`, title: `card ${idx}`, priority: idx });
  }

  // Kick off every claim before awaiting any of them, each through its own
  // service instance built over the shared db handle.
  const pendingClaims = slots.map((idx) =>
    new BacklogService(handle.db).claim({ owner: `w-${idx}` }),
  );
  const outcomes = await Promise.all(pendingClaims);

  const winners = outcomes.filter(
    (card): card is NonNullable<typeof card> => card !== null,
  );
  const winningIds = winners.map((card) => card.id);

  // Nobody starved: every claimer got a card.
  expect(winners).toHaveLength(claimerCount);
  // No card was handed out twice.
  expect(new Set(winningIds).size).toBe(claimerCount);
  // The ready pool is fully drained.
  expect(await svc.list({ status: 'ready' })).toHaveLength(0);
});
|
||||
|
||||
it('sequential claims drain ready cards in priority order and never null while ready remain', async () => {
  // PGlite-stable fallback for the same property, free of any dependence on
  // real parallelism or wall-clock timing: every claim must yield the next
  // highest-priority card, each one distinct, and must never spuriously
  // return null while ready cards are still available.
  const cardCount = 4;
  const indices = Array.from({ length: cardCount }, (_, idx) => idx);

  // Seed the cards one at a time; priority equals the index.
  for (const idx of indices) {
    await svc.create({ id: `s-${idx}`, title: `card ${idx}`, priority: idx });
  }

  // Claim one card per iteration, recording the ids in the order received.
  const claimedIds: string[] = [];
  for (const idx of indices) {
    const card = await svc.claim({ owner: `w-${idx}` });
    expect(card).not.toBeNull();
    claimedIds.push(card!.id);
  }

  // Highest priority comes out first, and no id repeats.
  expect(claimedIds).toEqual(['s-3', 's-2', 's-1', 's-0']);
  expect(new Set(claimedIds).size).toBe(cardCount);

  // With the ready pool exhausted, a further claim yields null.
  expect(await svc.claim({ owner: 'late' })).toBeNull();
});
|
||||
|
||||
it('claim picks the highest-priority ready card', async () => {
|
||||
await svc.create({ id: 'low', title: 'low', priority: 1 });
|
||||
await svc.create({ id: 'high', title: 'high', priority: 9 });
|
||||
|
||||
@@ -193,14 +193,16 @@ export class BacklogService {
|
||||
/**
|
||||
* Atomically claim a card.
|
||||
*
|
||||
* Strategy: inside ONE transaction we lock the candidate row(s) with
|
||||
* `FOR UPDATE SKIP LOCKED`. A concurrent claimer that already holds the lock
|
||||
* on a row has that row skipped for us, so two claimers can never both win the
|
||||
* same card. The locked row is then flipped to `claimed` and returned.
|
||||
* Strategy: inside ONE transaction we lock the candidate row with
|
||||
* `FOR UPDATE SKIP LOCKED LIMIT 1`. A concurrent claimer that already holds
|
||||
* the lock on a row has that row skipped for us, so two claimers can never
|
||||
* both win the same card — and, crucially, each claimer locks exactly ONE
|
||||
* row, so concurrent claimers fan out across distinct ready cards instead of
|
||||
* one claimer locking the whole ready set and starving the rest.
|
||||
*
|
||||
* Candidate selection (when no explicit `id`):
|
||||
* - status = 'ready'
|
||||
* - all deps satisfied (deps ⊆ done set)
|
||||
* - all deps satisfied (every id in depends_on is currently 'done')
|
||||
* - ordered by priority DESC, created_at ASC
|
||||
*
|
||||
* Returns the claimed card, or null if nothing is claimable.
|
||||
@@ -209,54 +211,76 @@ export class BacklogService {
|
||||
const ttl = opts.ttlSeconds ?? DEFAULT_CLAIM_TTL_SECONDS;
|
||||
|
||||
return this.db.transaction(async (tx) => {
|
||||
// Compute the set of satisfied dependencies up front. A card is claimable
|
||||
// only if every id in depends_on is currently 'done'.
|
||||
const doneRows = await tx
|
||||
.select({ id: backlog.id })
|
||||
.from(backlog)
|
||||
.where(eq(backlog.status, 'done'));
|
||||
const doneIds = new Set(doneRows.map((r) => r.id));
|
||||
|
||||
// Lock candidate ready rows, skipping any already locked by a concurrent
|
||||
// claimer. We over-fetch (no LIMIT 1) so the deps filter below can fall
|
||||
// through to the next-best card rather than returning null spuriously.
|
||||
let lockedRows: RawRow[];
|
||||
// Specific-id path: lock that one ready row (if free) and apply the
|
||||
// deps-satisfied gate in JS, exactly as before.
|
||||
if (opts.id) {
|
||||
const doneRows = await tx
|
||||
.select({ id: backlog.id })
|
||||
.from(backlog)
|
||||
.where(eq(backlog.status, 'done'));
|
||||
const doneIds = new Set(doneRows.map((r) => r.id));
|
||||
|
||||
const result = await tx.execute(
|
||||
sql`SELECT * FROM ${backlog}
|
||||
WHERE ${backlog.id} = ${opts.id} AND ${backlog.status} = 'ready'
|
||||
FOR UPDATE SKIP LOCKED`,
|
||||
);
|
||||
lockedRows = rowsOf(result);
|
||||
} else {
|
||||
const result = await tx.execute(
|
||||
sql`SELECT * FROM ${backlog}
|
||||
WHERE ${backlog.status} = 'ready'
|
||||
ORDER BY ${backlog.priority} DESC, ${backlog.createdAt} ASC
|
||||
FOR UPDATE SKIP LOCKED`,
|
||||
const candidate = rowsOf(result).find((row) =>
|
||||
normalizeDeps(row.depends_on).every((dep) => doneIds.has(dep)),
|
||||
);
|
||||
lockedRows = rowsOf(result);
|
||||
if (!candidate) return null;
|
||||
|
||||
const updated = await tx
|
||||
.update(backlog)
|
||||
.set({
|
||||
status: 'claimed',
|
||||
claimOwner: opts.owner,
|
||||
claimTtlSeconds: ttl,
|
||||
claimedAt: new Date(),
|
||||
attempts: sql`${backlog.attempts} + 1`,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(backlog.id, candidate.id))
|
||||
.returning();
|
||||
|
||||
return toCard(updated[0]!);
|
||||
}
|
||||
|
||||
const candidate = lockedRows.find((row) =>
|
||||
normalizeDeps(row.depends_on).every((dep) => doneIds.has(dep)),
|
||||
// No-id path: claim the single highest-priority, deps-satisfied ready
|
||||
// card. We lock exactly ONE row in the inner SELECT (`FOR UPDATE SKIP
|
||||
// LOCKED LIMIT 1`) so concurrent claimers grab distinct cards rather than
|
||||
// one claimer locking every ready row and forcing the others to null.
|
||||
//
|
||||
// The deps-satisfied gate is pushed into SQL so `LIMIT 1` lands on the
|
||||
// next genuinely-eligible card: a card is eligible iff none of its
|
||||
// depends_on ids is absent from the set of 'done' card ids.
|
||||
const updated = await tx.execute(
|
||||
sql`UPDATE ${backlog}
|
||||
SET status = 'claimed',
|
||||
claim_owner = ${opts.owner},
|
||||
claim_ttl_seconds = ${ttl},
|
||||
claimed_at = now(),
|
||||
attempts = ${backlog.attempts} + 1,
|
||||
updated_at = now()
|
||||
WHERE ${backlog.id} = (
|
||||
SELECT b.id FROM ${backlog} AS b
|
||||
WHERE b.status = 'ready'
|
||||
AND NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM jsonb_array_elements_text(b.depends_on) AS dep
|
||||
WHERE dep NOT IN (
|
||||
SELECT d.id FROM ${backlog} AS d WHERE d.status = 'done'
|
||||
)
|
||||
)
|
||||
ORDER BY b.priority DESC, b.created_at ASC
|
||||
FOR UPDATE SKIP LOCKED
|
||||
LIMIT 1
|
||||
)
|
||||
RETURNING *`,
|
||||
);
|
||||
if (!candidate) return null;
|
||||
|
||||
const updated = await tx
|
||||
.update(backlog)
|
||||
.set({
|
||||
status: 'claimed',
|
||||
claimOwner: opts.owner,
|
||||
claimTtlSeconds: ttl,
|
||||
claimedAt: new Date(),
|
||||
attempts: sql`${backlog.attempts} + 1`,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(backlog.id, candidate.id))
|
||||
.returning();
|
||||
|
||||
return toCard(updated[0]!);
|
||||
const row = rowsOf(updated)[0];
|
||||
return row ? toCard(rawToRow(row)) : null;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -391,6 +415,33 @@ function rowsOf(result: unknown): RawRow[] {
|
||||
return [];
|
||||
}
|
||||
|
||||
/**
 * Convert a raw `RETURNING *` result row into the drizzle `Row` shape that
 * `toCard` consumes.
 *
 * Raw rows carry snake_case column names, and depending on the driver their
 * timestamps / JSON may arrive string-encoded. This helper renames each column
 * to its camelCase property (mirroring the column ↔ property mapping in
 * `schema.ts`) and coerces values along the way:
 *  - numeric columns go through `Number(...)`;
 *  - timestamps are kept when already a `Date`, otherwise passed to `new Date(...)`;
 *  - `depends_on` is normalized via `normalizeDeps`;
 *  - absent (null/undefined) optional columns become `null`.
 *
 * @param raw - A single row as returned by a raw SQL statement.
 * @returns The equivalent `Row` object.
 */
function rawToRow(raw: RawRow): Row {
  const cols = raw as unknown as Record<string, unknown>;

  // Keep real Date instances untouched; anything else is handed to the Date
  // constructor as if it were a string.
  const asDate = (value: unknown): Date => {
    if (value instanceof Date) return value;
    return new Date(value as string);
  };

  // Collapse null/undefined to null for optional text columns.
  const asNullableText = (value: unknown): string | null =>
    (value ?? null) as string | null;

  const rawTtl = cols.claim_ttl_seconds;
  const rawClaimedAt = cols.claimed_at;

  return {
    id: cols.id as string,
    title: cols.title as string,
    body: asNullableText(cols.body),
    phase: asNullableText(cols.phase),
    priority: Number(cols.priority),
    status: cols.status as BacklogStatus,
    dependsOn: normalizeDeps(cols.depends_on),
    claimOwner: asNullableText(cols.claim_owner),
    claimTtlSeconds: rawTtl == null ? null : Number(rawTtl),
    claimedAt: rawClaimedAt == null ? null : asDate(rawClaimedAt),
    attempts: Number(cols.attempts),
    idempotencyKey: asNullableText(cols.idempotency_key),
    acceptance: cols.acceptance ?? null,
    createdAt: asDate(cols.created_at),
    updatedAt: asDate(cols.updated_at),
  };
}
|
||||
|
||||
/** A raw SQL row returns snake_case `depends_on`; normalize to string[]. */
|
||||
function normalizeDeps(value: unknown): string[] {
|
||||
if (Array.isArray(value)) return value as string[];
|
||||
|
||||
Reference in New Issue
Block a user