Compare commits

...

2 Commits

Author SHA1 Message Date
Jarvis
43ad813e0d fix(fleet): make restart-lock release/break ownership-safe (review #680)
Some checks failed
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/pr/ci Pipeline was canceled
Addresses the reviewer's blocker (comment 15915): release() unconditionally
unlinked restart.lock, so after a stale/max-wait break an OLD owner could
delete a NEWER owner's lock, letting a third restart interleave and defeating
the guard.

- Each acquire writes a unique owner token (randomUUID) into the lock file.
- release() only unlinks while that token is still on disk; once another caller
  has broken and re-owned the lock, the timed-out original owner's release() is
  a no-op and leaves the new owner's lock intact.
- Breaking a stale/timed-out lock now takes ownership atomically via
  write-temp + rename (atomic replace) instead of a blind unlink-then-recreate;
  a breaker that loses a concurrent takeover reads back a foreign token and
  keeps waiting rather than assuming ownership.

Regression test (does not let a timed-out owner drop a lock another restart
broke and re-owned) reproduces the three-restart interleave: R1 hangs (stale),
R2 breaks + re-owns, R1.release() must NOT drop R2's lock. Fails on the old
blind-unlink path (ENOENT), passes now. Also adds explicit single-agent
restart-path guard coverage (review should-fix).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-24 19:18:38 -05:00
Jarvis
9c2e4f0b2d fix(fleet): guard mosaic fleet restart against tight-loop re-entry race
`mosaic fleet restart` runs as a fresh process each invocation, issuing
`systemctl --user restart` for the tmux holder and then each agent. The
agent sessions live inside the holder's tmux, so restarting the holder
tears them down. With no mutual exclusion, a second restart entering
while the first is still mid-teardown (upgrade `--relaunch`, a watchdog,
or a hurried operator) interleaves: agents relaunch against a
half-torn-down holder, fail, and tight-loop.

Add a cross-process teardown-settle guard: a lock file under
`<mosaicHome>/fleet/run/restart.lock` acquired with O_CREAT|O_EXCL.
A re-entrant restart waits (bounded, injectable sleep) for the in-flight
restart to release the lock before relaunching, breaks a stale lock left
by a crashed owner, and after a max wait breaks the lock to avoid a
permanent deadlock. Both full-fleet and single-agent restart paths are
guarded; start/stop/status are unchanged.

Regression test reproduces the race: with an in-flight lock held, the
restart must wait before issuing any systemctl command — fails on the
unguarded code path, passes with the guard. Adds stale-lock-break and
lock-release coverage.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-24 19:18:38 -05:00
2 changed files with 436 additions and 1 deletions

View File

@@ -4,6 +4,7 @@ import { dirname, join, resolve } from 'node:path';
import { Command } from 'commander';
import { afterEach, describe, expect, it, vi } from 'vitest';
import {
acquireRestartLock,
addAgentToRoster,
buildAgentSendCommand,
buildAgentWatchAttachCommand,
@@ -45,6 +46,8 @@ import {
removeAgentFromRoster,
resolveFleetPaths,
resolvePresetFilename,
restartLockPath,
RESTART_LOCK_STALE_MS,
RUNTIME_ACCEPTABLE_COMMANDS,
serializeRosterToYaml,
VERIFY_DEFAULT_TIMEOUT_MS,
@@ -678,6 +681,231 @@ describe('fleet command construction', () => {
}
});
it('waits for an in-flight restart to clear before relaunching (re-entry guard)', async () => {
const home = await tempDir();
const rosterPath = join(home, 'fleet', 'roster.yaml');
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
rosterPath,
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
// Simulate another `mosaic fleet restart` process mid-teardown: a fresh lock
// (recent timestamp, so it is NOT treated as stale) already held.
const lockPath = restartLockPath(home);
await mkdir(dirname(lockPath), { recursive: true });
await writeFile(lockPath, `4242\n${Date.now()}\n`);
const events: string[] = [];
const runner: CommandRunner = async (command, args) => {
events.push(`run:${args[args.length - 1]}`);
return { stdout: '', stderr: '', exitCode: 0 };
};
// The injected sleep stands in for time passing while we wait; the in-flight
// restart "finishes" (releases its lock) after the first poll.
let sleeps = 0;
const sleepFn: SleepFn = async () => {
sleeps += 1;
events.push(`sleep:${sleeps}`);
await rm(lockPath, { force: true });
};
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
try {
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
// It must have waited at least once before issuing any systemctl restart.
expect(sleeps).toBeGreaterThan(0);
const firstSleep = events.findIndex((e) => e.startsWith('sleep:'));
const firstRun = events.findIndex((e) => e.startsWith('run:'));
expect(firstSleep).toBeGreaterThanOrEqual(0);
expect(firstRun).toBeGreaterThan(firstSleep);
// And it still performs the full restart once the lock clears.
expect(events).toContain('run:mosaic-tmux-holder.service');
expect(events).toContain('run:mosaic-agent@coder0.service');
// The lock is released after the restart completes.
await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('breaks a stale restart lock and proceeds without waiting', async () => {
const home = await tempDir();
const rosterPath = join(home, 'fleet', 'roster.yaml');
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
rosterPath,
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
// A lock left behind by a crashed owner: timestamp older than the stale window.
const lockPath = restartLockPath(home);
await mkdir(dirname(lockPath), { recursive: true });
await writeFile(lockPath, `4242\n${Date.now() - RESTART_LOCK_STALE_MS - 1_000}\n`);
const calls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
return { stdout: '', stderr: '', exitCode: 0 };
};
const sleepFn = vi.fn<SleepFn>(async () => {});
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
try {
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
// Stale lock is broken immediately — no waiting.
expect(sleepFn).not.toHaveBeenCalled();
expect(calls).toEqual([
['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
]);
// The stale lock is gone once the restart completes.
await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('releases the restart lock so a subsequent restart is not blocked', async () => {
const home = await tempDir();
const rosterPath = join(home, 'fleet', 'roster.yaml');
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
rosterPath,
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
const calls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
return { stdout: '', stderr: '', exitCode: 0 };
};
const sleepFn = vi.fn<SleepFn>(async () => {});
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
try {
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
// Two sequential restarts both run fully and neither has to wait.
expect(sleepFn).not.toHaveBeenCalled();
expect(calls).toEqual([
['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
]);
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('guards the single-agent restart path behind the in-flight restart lock', async () => {
const home = await tempDir();
const rosterPath = join(home, 'fleet', 'roster.yaml');
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
rosterPath,
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
// A full restart is mid-flight (lock held); a single-agent restart re-enters.
const lockPath = restartLockPath(home);
await mkdir(dirname(lockPath), { recursive: true });
await writeFile(lockPath, `4242\n${Date.now()}\n`);
const events: string[] = [];
const runner: CommandRunner = async (command, args) => {
events.push(`run:${args[args.length - 1]}`);
return { stdout: '', stderr: '', exitCode: 0 };
};
let sleeps = 0;
const sleepFn: SleepFn = async () => {
sleeps += 1;
events.push(`sleep:${sleeps}`);
await rm(lockPath, { force: true });
};
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
try {
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart', 'coder0']);
// The single-agent restart waits for the in-flight restart before acting.
expect(sleeps).toBeGreaterThan(0);
const firstSleep = events.findIndex((e) => e.startsWith('sleep:'));
const firstRun = events.findIndex((e) => e.startsWith('run:'));
expect(firstSleep).toBeGreaterThanOrEqual(0);
expect(firstRun).toBeGreaterThan(firstSleep);
// Only the named agent is restarted; the holder is untouched.
expect(events).toContain('run:mosaic-agent@coder0.service');
expect(events).not.toContain('run:mosaic-tmux-holder.service');
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('does not let a timed-out owner drop a lock another restart broke and re-owned', async () => {
const home = await tempDir();
const runDir = join(home, 'fleet', 'run');
await mkdir(runDir, { recursive: true });
const lockPath = restartLockPath(home);
const tokenOf = async (): Promise<string> => {
const raw = await readFile(lockPath, 'utf8');
return raw.split('\n')[2]?.trim() ?? '';
};
const sleepFn = vi.fn<SleepFn>(async () => {});
// R1 acquires the lock and begins a restart that then hangs.
const r1 = await acquireRestartLock(home, sleepFn);
const tokenR1 = await tokenOf();
expect(tokenR1).not.toBe('');
// The hung R1 leaves a stale lock: rewrite its timestamp into the past while
// preserving R1's token — exactly the on-disk state a stuck owner leaves.
await writeFile(lockPath, `4242\n${Date.now() - RESTART_LOCK_STALE_MS - 1_000}\n${tokenR1}\n`);
// R2 re-enters, sees the stale lock, and atomically takes ownership.
const r2 = await acquireRestartLock(home, sleepFn);
const tokenR2 = await tokenOf();
expect(tokenR2).not.toBe(tokenR1);
expect(sleepFn).not.toHaveBeenCalled();
// R1 finally finishes and releases. It must NOT delete R2's lock — otherwise
// a third restart (R3) could acquire and interleave with R2 still running.
await r1.release();
expect(await tokenOf()).toBe(tokenR2);
// R2 releases cleanly and the lock is gone.
await r2.release();
await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
await rm(home, { recursive: true, force: true });
});
it('attempts every agent and the holder during fleet stop even when an agent stop fails', async () => {
const home = await tempDir();
const rosterPath = join(home, 'fleet', 'roster.yaml');

View File

@@ -1,5 +1,16 @@
import { constants } from 'node:fs';
import { access, chmod, copyFile, mkdir, readFile, unlink, writeFile } from 'node:fs/promises';
import {
access,
chmod,
copyFile,
mkdir,
open,
readFile,
rename,
unlink,
writeFile,
} from 'node:fs/promises';
import { randomUUID } from 'node:crypto';
import { homedir, hostname, userInfo } from 'node:os';
import { dirname, join, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
@@ -533,6 +544,173 @@ export function buildFleetServiceCommand(action: FleetServiceAction, agentName?:
return ['systemctl', '--user', action, service];
}
/** Poll interval (ms) while waiting for an in-flight restart's lock to clear. */
export const RESTART_LOCK_POLL_INTERVAL_MS = 250;
/**
* Maximum time (ms) a re-entrant restart waits for the in-flight restart to
* finish before it breaks the lock and proceeds anyway. A bound is required so
* a crashed holder of the lock can never deadlock the fleet permanently.
*/
export const RESTART_LOCK_MAX_WAIT_MS = 30_000;
/**
* Age (ms) past which a restart lock is treated as stale (its owner died
* without releasing it) and is broken immediately rather than waited on.
*/
export const RESTART_LOCK_STALE_MS = 60_000;
/**
* Resolves the path of the cross-process restart lock for a given Mosaic home.
* Kept strictly under `<mosaicHome>/fleet/run` (not the heartbeat env override)
* so the lock is scoped to the same fleet the restart acts on.
*/
export function restartLockPath(mosaicHome: string): string {
return join(mosaicHome, 'fleet', 'run', 'restart.lock');
}
/** A held restart lock; `release()` removes the lock file iff we still own it. */
interface RestartGuard {
release(): Promise<void>;
}
/** Lock-file contents: pid (informational), timestamp, and a unique owner token. */
function formatRestartLockContent(token: string): string {
return `${process.pid}\n${Date.now()}\n${token}\n`;
}
/**
* Reads the owner token (line 3) from a lock file, or null if the file is
* missing/unreadable/tokenless. The token is what makes release and break
* ownership-safe: a process only ever acts on a lock whose token matches its own.
*/
async function readRestartLockToken(lockPath: string): Promise<string | null> {
let raw: string;
try {
raw = await readFile(lockPath, 'utf8');
} catch {
return null;
}
const token = raw.split('\n')[2]?.trim();
return token ? token : null;
}
/**
* Returns true when an existing lock file is stale: older than
* RESTART_LOCK_STALE_MS, or unreadable/unparseable (a corrupt or partially
* written lock left by a crashed owner). A vanished lock (ENOENT) is not stale —
* the next acquire attempt will simply succeed.
*/
async function isRestartLockStale(lockPath: string, now: number): Promise<boolean> {
let raw: string;
try {
raw = await readFile(lockPath, 'utf8');
} catch (err) {
if ((err as NodeJS.ErrnoException).code === 'ENOENT') {
return false;
}
return true;
}
const stampLine = raw.split('\n')[1] ?? '';
const stamp = Number.parseInt(stampLine.trim(), 10);
if (!Number.isFinite(stamp)) {
return true;
}
return now - stamp >= RESTART_LOCK_STALE_MS;
}
/**
* Atomically take over an existing (stale or timed-out) lock WITHOUT blind
* unlinking it: write our own token to a temp file and `rename()` it over the
* lock. rename is atomic, so it replaces the prior owner's content in one step
* rather than the unsafe unlink-then-recreate (which a third restart could slip
* between). Returns true only if our token is the one on disk afterwards — if a
* concurrent breaker raced and won, we read back their token and return false so
* the caller keeps waiting instead of assuming ownership.
*/
async function breakAndOwnRestartLock(
lockPath: string,
token: string,
content: string,
): Promise<boolean> {
const tmpPath = `${lockPath}.${token}`;
await writeFile(tmpPath, content);
try {
await rename(tmpPath, lockPath);
} catch (err) {
await unlink(tmpPath).catch(() => {});
throw err;
}
return (await readRestartLockToken(lockPath)) === token;
}
/**
* Acquire the fleet restart lock, serializing concurrent `mosaic fleet restart`
* invocations across processes. Each restart tears the tmux holder (and the
* agent sessions inside it) down and back up; without this guard a re-entrant
* restart relaunches agents against a half-torn-down holder, which fails and
* tight-loops. A re-entrant caller waits for the in-flight restart to release
* the lock (clean shutdown settled) before proceeding, breaks a stale lock left
* by a crashed owner, and after RESTART_LOCK_MAX_WAIT_MS breaks the lock to
* avoid a permanent deadlock.
*
* Ownership is tracked by a unique per-acquire token written into the lock.
* `release()` only unlinks the lock while our token is still on disk, and a
* break takes ownership atomically — so once another caller has broken and
* re-owned the lock, neither the timed-out original owner's `release()` nor a
* stale `break` can drop the new owner's lock and let a third restart interleave.
*/
export async function acquireRestartLock(
mosaicHome: string,
sleepFn: SleepFn,
): Promise<RestartGuard> {
const token = randomUUID();
const lockPath = restartLockPath(mosaicHome);
await mkdir(dirname(lockPath), { recursive: true });
const release = async (): Promise<void> => {
// Ownership-safe: only remove the lock if it is still ours. If another
// caller broke and re-owned it (after a stale/timeout break), the token no
// longer matches and we must leave their lock intact.
if ((await readRestartLockToken(lockPath)) !== token) {
return;
}
try {
await unlink(lockPath);
} catch {
// Raced away between the token check and unlink — nothing more to do.
}
};
const deadline = Date.now() + RESTART_LOCK_MAX_WAIT_MS;
for (;;) {
try {
const handle = await open(lockPath, 'wx');
await handle.writeFile(formatRestartLockContent(token));
await handle.close();
return { release };
} catch (err) {
if ((err as NodeJS.ErrnoException).code !== 'EEXIST') {
throw err;
}
// A restart is already in flight (or its lock was left behind).
const stale = await isRestartLockStale(lockPath, Date.now());
const timedOut = Date.now() >= deadline;
if (stale || timedOut) {
if (await breakAndOwnRestartLock(lockPath, token, formatRestartLockContent(token))) {
process.stderr.write(
stale
? 'Breaking stale fleet restart lock and proceeding.\n'
: `Timed out after ${RESTART_LOCK_MAX_WAIT_MS}ms waiting for the in-flight fleet ` +
'restart; breaking the lock and proceeding.\n',
);
return { release };
}
// A concurrent breaker won the takeover; back off and re-evaluate.
await sleepFn(RESTART_LOCK_POLL_INTERVAL_MS);
continue;
}
await sleepFn(RESTART_LOCK_POLL_INTERVAL_MS);
}
}
}
/**
* Returns the systemctl --user enable command for a given unit.
* Used by the install auto-enable step to persist units across reboots.
@@ -1172,6 +1350,7 @@ export function isSendAccepted(capturedOutput: string): SendVerifyResult {
export function registerFleetCommand(program: Command, deps: FleetCommandDeps = {}): Command {
const runner = deps.runner ?? runCommand;
const sleepFn = deps.sleepFn ?? defaultSleep;
const paths = resolveFleetPaths(deps.mosaicHome);
const frameworkRoot = deps.frameworkRoot ?? resolveFrameworkRoot();
@@ -1285,9 +1464,22 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
.command(`${action} [agent]`)
.description(`${action} the fleet holder or one agent`)
.action(async (agent?: string) => {
const commandOpts = cmd.opts<{ mosaicHome: string; roster?: string }>();
const activePaths = resolveFleetPaths(commandOpts.mosaicHome);
const roster = await loadRosterForCommand(cmd);
if (agent) {
getRosterAgent(roster, agent);
// Single-agent restart is guarded too: it can race a full restart that
// is tearing the shared holder down.
if (action === 'restart') {
const guard = await acquireRestartLock(activePaths.mosaicHome, sleepFn);
try {
await runChecked(runner, buildFleetServiceCommand(action, agent));
} finally {
await guard.release();
}
return;
}
await runChecked(runner, buildFleetServiceCommand(action, agent));
return;
}
@@ -1298,6 +1490,21 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
);
return;
}
if (action === 'restart') {
// Serialize the holder+agents teardown/relaunch behind the restart lock
// so a re-entrant restart waits for clean shutdown before relaunching,
// instead of racing a half-torn-down holder into a tight loop.
const guard = await acquireRestartLock(activePaths.mosaicHome, sleepFn);
try {
await runChecked(runner, buildFleetServiceCommand(action));
for (const rosterAgent of roster.agents) {
await runChecked(runner, buildFleetServiceCommand(action, rosterAgent.name));
}
} finally {
await guard.release();
}
return;
}
await runChecked(runner, buildFleetServiceCommand(action));
for (const rosterAgent of roster.agents) {
await runChecked(runner, buildFleetServiceCommand(action, rosterAgent.name));