fix(fleet): make restart-lock release/break ownership-safe (review #680)
Addresses the reviewer's blocker (comment 15915): release() unconditionally unlinked restart.lock, so after a stale/max-wait break an OLD owner could delete a NEWER owner's lock, letting a third restart interleave and defeating the guard. - Each acquire writes a unique owner token (randomUUID) into the lock file. - release() only unlinks while that token is still on disk; once another caller has broken and re-owned the lock, the timed-out original owner's release() is a no-op and leaves the new owner's lock intact. - Breaking a stale/timed-out lock now takes ownership atomically via write-temp + rename (atomic replace) instead of a blind unlink-then-recreate; a breaker that loses a concurrent takeover reads back a foreign token and keeps waiting rather than assuming ownership. Regression test (does not let a timed-out owner drop a lock another restart broke and re-owned) reproduces the three-restart interleave: R1 hangs (stale), R2 breaks + re-owns, R1.release() must NOT drop R2's lock. Fails on the old blind-unlink path (ENOENT), passes now. Also adds explicit single-agent restart-path guard coverage (review should-fix). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -4,6 +4,7 @@ import { dirname, join, resolve } from 'node:path';
|
|||||||
import { Command } from 'commander';
|
import { Command } from 'commander';
|
||||||
import { afterEach, describe, expect, it, vi } from 'vitest';
|
import { afterEach, describe, expect, it, vi } from 'vitest';
|
||||||
import {
|
import {
|
||||||
|
acquireRestartLock,
|
||||||
addAgentToRoster,
|
addAgentToRoster,
|
||||||
buildAgentSendCommand,
|
buildAgentSendCommand,
|
||||||
buildAgentWatchAttachCommand,
|
buildAgentWatchAttachCommand,
|
||||||
@@ -818,6 +819,93 @@ describe('fleet command construction', () => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('guards the single-agent restart path behind the in-flight restart lock', async () => {
|
||||||
|
const home = await tempDir();
|
||||||
|
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
||||||
|
await mkdir(join(home, 'fleet'), { recursive: true });
|
||||||
|
await writeFile(
|
||||||
|
rosterPath,
|
||||||
|
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
|
||||||
|
'\n',
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
// A full restart is mid-flight (lock held); a single-agent restart re-enters.
|
||||||
|
const lockPath = restartLockPath(home);
|
||||||
|
await mkdir(dirname(lockPath), { recursive: true });
|
||||||
|
await writeFile(lockPath, `4242\n${Date.now()}\n`);
|
||||||
|
|
||||||
|
const events: string[] = [];
|
||||||
|
const runner: CommandRunner = async (command, args) => {
|
||||||
|
events.push(`run:${args[args.length - 1]}`);
|
||||||
|
return { stdout: '', stderr: '', exitCode: 0 };
|
||||||
|
};
|
||||||
|
let sleeps = 0;
|
||||||
|
const sleepFn: SleepFn = async () => {
|
||||||
|
sleeps += 1;
|
||||||
|
events.push(`sleep:${sleeps}`);
|
||||||
|
await rm(lockPath, { force: true });
|
||||||
|
};
|
||||||
|
|
||||||
|
const program = new Command();
|
||||||
|
program.exitOverride();
|
||||||
|
registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
|
||||||
|
|
||||||
|
try {
|
||||||
|
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart', 'coder0']);
|
||||||
|
|
||||||
|
// The single-agent restart waits for the in-flight restart before acting.
|
||||||
|
expect(sleeps).toBeGreaterThan(0);
|
||||||
|
const firstSleep = events.findIndex((e) => e.startsWith('sleep:'));
|
||||||
|
const firstRun = events.findIndex((e) => e.startsWith('run:'));
|
||||||
|
expect(firstSleep).toBeGreaterThanOrEqual(0);
|
||||||
|
expect(firstRun).toBeGreaterThan(firstSleep);
|
||||||
|
// Only the named agent is restarted; the holder is untouched.
|
||||||
|
expect(events).toContain('run:mosaic-agent@coder0.service');
|
||||||
|
expect(events).not.toContain('run:mosaic-tmux-holder.service');
|
||||||
|
} finally {
|
||||||
|
await rm(home, { recursive: true, force: true });
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('does not let a timed-out owner drop a lock another restart broke and re-owned', async () => {
|
||||||
|
const home = await tempDir();
|
||||||
|
const runDir = join(home, 'fleet', 'run');
|
||||||
|
await mkdir(runDir, { recursive: true });
|
||||||
|
const lockPath = restartLockPath(home);
|
||||||
|
const tokenOf = async (): Promise<string> => {
|
||||||
|
const raw = await readFile(lockPath, 'utf8');
|
||||||
|
return raw.split('\n')[2]?.trim() ?? '';
|
||||||
|
};
|
||||||
|
const sleepFn = vi.fn<SleepFn>(async () => {});
|
||||||
|
|
||||||
|
// R1 acquires the lock and begins a restart that then hangs.
|
||||||
|
const r1 = await acquireRestartLock(home, sleepFn);
|
||||||
|
const tokenR1 = await tokenOf();
|
||||||
|
expect(tokenR1).not.toBe('');
|
||||||
|
|
||||||
|
// The hung R1 leaves a stale lock: rewrite its timestamp into the past while
|
||||||
|
// preserving R1's token — exactly the on-disk state a stuck owner leaves.
|
||||||
|
await writeFile(lockPath, `4242\n${Date.now() - RESTART_LOCK_STALE_MS - 1_000}\n${tokenR1}\n`);
|
||||||
|
|
||||||
|
// R2 re-enters, sees the stale lock, and atomically takes ownership.
|
||||||
|
const r2 = await acquireRestartLock(home, sleepFn);
|
||||||
|
const tokenR2 = await tokenOf();
|
||||||
|
expect(tokenR2).not.toBe(tokenR1);
|
||||||
|
expect(sleepFn).not.toHaveBeenCalled();
|
||||||
|
|
||||||
|
// R1 finally finishes and releases. It must NOT delete R2's lock — otherwise
|
||||||
|
// a third restart (R3) could acquire and interleave with R2 still running.
|
||||||
|
await r1.release();
|
||||||
|
expect(await tokenOf()).toBe(tokenR2);
|
||||||
|
|
||||||
|
// R2 releases cleanly and the lock is gone.
|
||||||
|
await r2.release();
|
||||||
|
await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
|
||||||
|
|
||||||
|
await rm(home, { recursive: true, force: true });
|
||||||
|
});
|
||||||
|
|
||||||
it('attempts every agent and the holder during fleet stop even when an agent stop fails', async () => {
|
it('attempts every agent and the holder during fleet stop even when an agent stop fails', async () => {
|
||||||
const home = await tempDir();
|
const home = await tempDir();
|
||||||
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
||||||
|
|||||||
@@ -6,9 +6,11 @@ import {
|
|||||||
mkdir,
|
mkdir,
|
||||||
open,
|
open,
|
||||||
readFile,
|
readFile,
|
||||||
|
rename,
|
||||||
unlink,
|
unlink,
|
||||||
writeFile,
|
writeFile,
|
||||||
} from 'node:fs/promises';
|
} from 'node:fs/promises';
|
||||||
|
import { randomUUID } from 'node:crypto';
|
||||||
import { homedir, hostname, userInfo } from 'node:os';
|
import { homedir, hostname, userInfo } from 'node:os';
|
||||||
import { dirname, join, resolve } from 'node:path';
|
import { dirname, join, resolve } from 'node:path';
|
||||||
import { fileURLToPath } from 'node:url';
|
import { fileURLToPath } from 'node:url';
|
||||||
@@ -565,11 +567,32 @@ export function restartLockPath(mosaicHome: string): string {
|
|||||||
return join(mosaicHome, 'fleet', 'run', 'restart.lock');
|
return join(mosaicHome, 'fleet', 'run', 'restart.lock');
|
||||||
}
|
}
|
||||||
|
|
||||||
/** A held restart lock; `release()` removes the lock file (idempotent). */
|
/** A held restart lock; `release()` removes the lock file iff we still own it. */
|
||||||
interface RestartGuard {
|
interface RestartGuard {
|
||||||
release(): Promise<void>;
|
release(): Promise<void>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Lock-file contents: pid (informational), timestamp, and a unique owner token. */
|
||||||
|
function formatRestartLockContent(token: string): string {
|
||||||
|
return `${process.pid}\n${Date.now()}\n${token}\n`;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reads the owner token (line 3) from a lock file, or null if the file is
|
||||||
|
* missing/unreadable/tokenless. The token is what makes release and break
|
||||||
|
* ownership-safe: a process only ever acts on a lock whose token matches its own.
|
||||||
|
*/
|
||||||
|
async function readRestartLockToken(lockPath: string): Promise<string | null> {
|
||||||
|
let raw: string;
|
||||||
|
try {
|
||||||
|
raw = await readFile(lockPath, 'utf8');
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
const token = raw.split('\n')[2]?.trim();
|
||||||
|
return token ? token : null;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns true when an existing lock file is stale: older than
|
* Returns true when an existing lock file is stale: older than
|
||||||
* RESTART_LOCK_STALE_MS, or unreadable/unparseable (a corrupt or partially
|
* RESTART_LOCK_STALE_MS, or unreadable/unparseable (a corrupt or partially
|
||||||
@@ -594,6 +617,31 @@ async function isRestartLockStale(lockPath: string, now: number): Promise<boolea
|
|||||||
return now - stamp >= RESTART_LOCK_STALE_MS;
|
return now - stamp >= RESTART_LOCK_STALE_MS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Atomically take over an existing (stale or timed-out) lock WITHOUT blind
|
||||||
|
* unlinking it: write our own token to a temp file and `rename()` it over the
|
||||||
|
* lock. rename is atomic, so it replaces the prior owner's content in one step
|
||||||
|
* rather than the unsafe unlink-then-recreate (which a third restart could slip
|
||||||
|
* between). Returns true only if our token is the one on disk afterwards — if a
|
||||||
|
* concurrent breaker raced and won, we read back their token and return false so
|
||||||
|
* the caller keeps waiting instead of assuming ownership.
|
||||||
|
*/
|
||||||
|
async function breakAndOwnRestartLock(
|
||||||
|
lockPath: string,
|
||||||
|
token: string,
|
||||||
|
content: string,
|
||||||
|
): Promise<boolean> {
|
||||||
|
const tmpPath = `${lockPath}.${token}`;
|
||||||
|
await writeFile(tmpPath, content);
|
||||||
|
try {
|
||||||
|
await rename(tmpPath, lockPath);
|
||||||
|
} catch (err) {
|
||||||
|
await unlink(tmpPath).catch(() => {});
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
return (await readRestartLockToken(lockPath)) === token;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Acquire the fleet restart lock, serializing concurrent `mosaic fleet restart`
|
* Acquire the fleet restart lock, serializing concurrent `mosaic fleet restart`
|
||||||
* invocations across processes. Each restart tears the tmux holder (and the
|
* invocations across processes. Each restart tears the tmux holder (and the
|
||||||
@@ -603,22 +651,38 @@ async function isRestartLockStale(lockPath: string, now: number): Promise<boolea
|
|||||||
* the lock (clean shutdown settled) before proceeding, breaks a stale lock left
|
* the lock (clean shutdown settled) before proceeding, breaks a stale lock left
|
||||||
* by a crashed owner, and after RESTART_LOCK_MAX_WAIT_MS breaks the lock to
|
* by a crashed owner, and after RESTART_LOCK_MAX_WAIT_MS breaks the lock to
|
||||||
* avoid a permanent deadlock.
|
* avoid a permanent deadlock.
|
||||||
|
*
|
||||||
|
* Ownership is tracked by a unique per-acquire token written into the lock.
|
||||||
|
* `release()` only unlinks the lock while our token is still on disk, and a
|
||||||
|
* break takes ownership atomically — so once another caller has broken and
|
||||||
|
* re-owned the lock, neither the timed-out original owner's `release()` nor a
|
||||||
|
* stale `break` can drop the new owner's lock and let a third restart interleave.
|
||||||
*/
|
*/
|
||||||
async function acquireRestartLock(mosaicHome: string, sleepFn: SleepFn): Promise<RestartGuard> {
|
export async function acquireRestartLock(
|
||||||
|
mosaicHome: string,
|
||||||
|
sleepFn: SleepFn,
|
||||||
|
): Promise<RestartGuard> {
|
||||||
|
const token = randomUUID();
|
||||||
const lockPath = restartLockPath(mosaicHome);
|
const lockPath = restartLockPath(mosaicHome);
|
||||||
await mkdir(dirname(lockPath), { recursive: true });
|
await mkdir(dirname(lockPath), { recursive: true });
|
||||||
const release = async (): Promise<void> => {
|
const release = async (): Promise<void> => {
|
||||||
|
// Ownership-safe: only remove the lock if it is still ours. If another
|
||||||
|
// caller broke and re-owned it (after a stale/timeout break), the token no
|
||||||
|
// longer matches and we must leave their lock intact.
|
||||||
|
if ((await readRestartLockToken(lockPath)) !== token) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
await unlink(lockPath);
|
await unlink(lockPath);
|
||||||
} catch {
|
} catch {
|
||||||
// Already gone (broken as stale by another waiter, or never written) — fine.
|
// Raced away between the token check and unlink — nothing more to do.
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
const deadline = Date.now() + RESTART_LOCK_MAX_WAIT_MS;
|
const deadline = Date.now() + RESTART_LOCK_MAX_WAIT_MS;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
try {
|
try {
|
||||||
const handle = await open(lockPath, 'wx');
|
const handle = await open(lockPath, 'wx');
|
||||||
await handle.writeFile(`${process.pid}\n${Date.now()}\n`);
|
await handle.writeFile(formatRestartLockContent(token));
|
||||||
await handle.close();
|
await handle.close();
|
||||||
return { release };
|
return { release };
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
@@ -626,17 +690,20 @@ async function acquireRestartLock(mosaicHome: string, sleepFn: SleepFn): Promise
|
|||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
// A restart is already in flight (or its lock was left behind).
|
// A restart is already in flight (or its lock was left behind).
|
||||||
if (await isRestartLockStale(lockPath, Date.now())) {
|
const stale = await isRestartLockStale(lockPath, Date.now());
|
||||||
process.stderr.write('Breaking stale fleet restart lock and proceeding.\n');
|
const timedOut = Date.now() >= deadline;
|
||||||
await release();
|
if (stale || timedOut) {
|
||||||
continue;
|
if (await breakAndOwnRestartLock(lockPath, token, formatRestartLockContent(token))) {
|
||||||
}
|
|
||||||
if (Date.now() >= deadline) {
|
|
||||||
process.stderr.write(
|
process.stderr.write(
|
||||||
`Timed out after ${RESTART_LOCK_MAX_WAIT_MS}ms waiting for the in-flight fleet ` +
|
stale
|
||||||
|
? 'Breaking stale fleet restart lock and proceeding.\n'
|
||||||
|
: `Timed out after ${RESTART_LOCK_MAX_WAIT_MS}ms waiting for the in-flight fleet ` +
|
||||||
'restart; breaking the lock and proceeding.\n',
|
'restart; breaking the lock and proceeding.\n',
|
||||||
);
|
);
|
||||||
await release();
|
return { release };
|
||||||
|
}
|
||||||
|
// A concurrent breaker won the takeover; back off and re-evaluate.
|
||||||
|
await sleepFn(RESTART_LOCK_POLL_INTERVAL_MS);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
await sleepFn(RESTART_LOCK_POLL_INTERVAL_MS);
|
await sleepFn(RESTART_LOCK_POLL_INTERVAL_MS);
|
||||||
|
|||||||
Reference in New Issue
Block a user