fix(fleet): serialize restart-lock transitions to close concurrent-breaker race (review #680)
All checks were successful
ci/woodpecker/pr/ci Pipeline was successful
ci/woodpecker/push/ci Pipeline was successful

Stale/max-wait takeover was not safe against concurrent breakers: two
breakers could both judge the lock stale and both proceed, re-introducing
the tight-loop. POSIX/Node has no content- or inode-conditional unlink or
rename, so "judge stale, then replace" can never be atomic with pure path
ops.

Serialize ALL lock transitions (acquire, release, takeover) under one
short-lived registry mutex held only across a few fs ops, never across the
restart itself. This makes check-then-mutate atomic, so exactly one breaker
can take over a stale lock while the others wait and re-evaluate.

The mutex itself uses mtime-based staleness (open('wx') creates an empty
inode before the token is written; a content check would reap a lock that is
still being acquired). The mutex populates-or-cleans-up on write failure so a
half-created mutex never leaks.

Regression coverage at two widths: a 2-breaker barrier test (exactly one
takes over, the other waits) and the existing 3-breaker test (maxActive===1,
distinct tokens, final lock released).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Jarvis
2026-06-24 20:26:39 -05:00
parent 43ad813e0d
commit 786762587d
2 changed files with 321 additions and 66 deletions

View File

@@ -906,6 +906,139 @@ describe('fleet command construction', () => {
await rm(home, { recursive: true, force: true });
});
it('lets only one of several concurrent breakers proceed past a stale lock', async () => {
const home = await tempDir();
const lockPath = restartLockPath(home);
await mkdir(dirname(lockPath), { recursive: true });
// A stale lock left by a crashed owner: every concurrent re-entrant restart
// will judge it stale and try to break it at the same instant. Breaking must
// NOT grant ownership — only the atomic re-create may — so exactly one
// contender can ever hold the lock at a time. (The v2 fix wrote our own token
// during the break and read it back, so two breakers each saw their own token
// and BOTH proceeded; this guards that regression.)
await writeFile(
lockPath,
`4242\n${Date.now() - RESTART_LOCK_STALE_MS - 1_000}\nstale-owner-token\n`,
);
// Yielding sleep so a waiting contender lets the current owner finish and
// release before it re-contends, instead of spinning the microtask queue.
const sleepFn: SleepFn = async () => {
await new Promise((res) => setTimeout(res, 0));
};
let active = 0;
let maxActive = 0;
const tokens: string[] = [];
const tokenOf = async (): Promise<string> => {
const raw = await readFile(lockPath, 'utf8');
return raw.split('\n')[2]?.trim() ?? '';
};
// One "restart" = acquire the lock, do work in the critical section, release.
const restartOnce = async (): Promise<void> => {
const guard = await acquireRestartLock(home, sleepFn);
active += 1;
maxActive = Math.max(maxActive, active);
// Record the token we own while we hold it, then yield to interleave with
// any other contender that might (wrongly) believe it owns the lock too.
tokens.push(await tokenOf());
await new Promise((res) => setTimeout(res, 0));
active -= 1;
await guard.release();
};
try {
// Three breakers race the single stale lock simultaneously.
await Promise.all([restartOnce(), restartOnce(), restartOnce()]);
// Mutual exclusion held: never two owners at once despite concurrent breaks.
expect(maxActive).toBe(1);
// Each acquire owned with its own distinct token — no two ever shared it.
expect(new Set(tokens).size).toBe(3);
// The lock is fully released at the end.
await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('lets exactly one of two breakers take over a stale lock while the other waits', async () => {
const home = await tempDir();
const lockPath = restartLockPath(home);
await mkdir(dirname(lockPath), { recursive: true });
// A single stale lock both contenders will judge stale at the same instant.
// Every transition runs under the registry mutex, so only one may take the
// lock over; the other must observe a now-fresh owner and WAIT/re-evaluate
// rather than also taking over. (A content-blind clobber let both believe
// they owned it — this asserts the mutex-gated CAS takeover instead.)
await writeFile(
lockPath,
`4242\n${Date.now() - RESTART_LOCK_STALE_MS - 1_000}\nstale-owner-token\n`,
);
// Barrier the winner holds against until the loser has observed the lock
// fresh and waited at least once — forcing the exact interleaving where one
// proceeds while the other waits, deterministically rather than by timing.
let resolveLoserWaited: () => void = () => {};
const loserWaited = new Promise<void>((res) => {
resolveLoserWaited = res;
});
let sleeps = 0;
const sleepFn: SleepFn = async () => {
sleeps += 1;
resolveLoserWaited();
await new Promise((res) => setTimeout(res, 0));
};
let active = 0;
let maxActive = 0;
const tokens: string[] = [];
const tokenOf = async (): Promise<string> => {
const raw = await readFile(lockPath, 'utf8');
return raw.split('\n')[2]?.trim() ?? '';
};
let firstOwner = true;
const restartOnce = async (): Promise<void> => {
const guard = await acquireRestartLock(home, sleepFn);
active += 1;
maxActive = Math.max(maxActive, active);
tokens.push(await tokenOf());
if (firstOwner) {
// Winner: keep holding the lock until the loser has waited once, so the
// loser is guaranteed to see a FRESH owner (not the stale one) and back
// off — proving it could not also take over.
firstOwner = false;
await loserWaited;
} else {
await new Promise((res) => setTimeout(res, 0));
}
active -= 1;
await guard.release();
};
try {
// Exactly two breakers race the single stale lock.
await Promise.all([restartOnce(), restartOnce()]);
// Mutual exclusion: never two owners at once (if both took over the stale
// lock, this would be 2).
expect(maxActive).toBe(1);
// Both eventually owned, each with its own distinct token.
expect(new Set(tokens).size).toBe(2);
// The loser observed the winner's fresh lock and waited — it did NOT also
// take over the stale lock.
expect(sleeps).toBeGreaterThanOrEqual(1);
// The lock is fully released at the end.
await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('attempts every agent and the holder during fleet stop even when an agent stop fails', async () => {
const home = await tempDir();
const rosterPath = join(home, 'fleet', 'roster.yaml');