fix(fleet): guard mosaic fleet restart against tight-loop re-entry race (#680)
All checks were successful
ci/woodpecker/push/publish Pipeline was successful
ci/woodpecker/push/ci Pipeline was successful

This commit was merged in pull request #680.
This commit is contained in:
2026-06-25 01:44:48 +00:00
parent adb153428b
commit 67135d3822
2 changed files with 691 additions and 1 deletions

View File

@@ -1,5 +1,16 @@
import { constants } from 'node:fs';
import { access, chmod, copyFile, mkdir, readFile, unlink, writeFile } from 'node:fs/promises';
import {
access,
chmod,
copyFile,
mkdir,
open,
readFile,
stat,
unlink,
writeFile,
} from 'node:fs/promises';
import { randomUUID } from 'node:crypto';
import { homedir, hostname, userInfo } from 'node:os';
import { dirname, join, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
@@ -533,6 +544,295 @@ export function buildFleetServiceCommand(action: FleetServiceAction, agentName?:
return ['systemctl', '--user', action, service];
}
/** Poll interval (ms) while waiting for an in-flight restart's lock to clear. */
export const RESTART_LOCK_POLL_INTERVAL_MS = 250;
/**
* Maximum time (ms) a re-entrant restart waits for the in-flight restart to
* finish before it breaks the lock and proceeds anyway. A bound is required so
* a crashed holder of the lock can never deadlock the fleet permanently.
*/
export const RESTART_LOCK_MAX_WAIT_MS = 30_000;
/**
* Age (ms) past which a restart lock is treated as stale (its owner died
* without releasing it) and is broken immediately rather than waited on.
*/
export const RESTART_LOCK_STALE_MS = 60_000;
/**
* Resolves the path of the cross-process restart lock for a given Mosaic home.
* Kept strictly under `<mosaicHome>/fleet/run` (not the heartbeat env override)
* so the lock is scoped to the same fleet the restart acts on.
*/
export function restartLockPath(mosaicHome: string): string {
return join(mosaicHome, 'fleet', 'run', 'restart.lock');
}
/** A held restart lock; `release()` removes the lock file iff we still own it. */
interface RestartGuard {
release(): Promise<void>;
}
/** Lock-file contents: pid (informational), timestamp, and a unique owner token. */
function formatRestartLockContent(token: string): string {
return `${process.pid}\n${Date.now()}\n${token}\n`;
}
/**
* Reads the owner token (line 3) from a lock file, or null if the file is
* missing/unreadable/tokenless. The token is what makes release and break
* ownership-safe: a process only ever acts on a lock whose token matches its own.
*/
async function readRestartLockToken(lockPath: string): Promise<string | null> {
let raw: string;
try {
raw = await readFile(lockPath, 'utf8');
} catch {
return null;
}
const token = raw.split('\n')[2]?.trim();
return token ? token : null;
}
/**
* Returns true when a lock's contents are stale: older than RESTART_LOCK_STALE_MS,
* or unparseable (a corrupt or partially written lock left by a crashed owner).
*/
function isRestartLockContentStale(raw: string, now: number): boolean {
const stampLine = raw.split('\n')[1] ?? '';
const stamp = Number.parseInt(stampLine.trim(), 10);
if (!Number.isFinite(stamp)) {
return true;
}
return now - stamp >= RESTART_LOCK_STALE_MS;
}
/**
* Path of the short-lived registry mutex that guards EVERY transition of the
* restart lock (acquire, release, takeover). Held only across a few filesystem
* ops — never across the restart itself — so contention clears in microseconds.
*/
function restartMutexPath(lockPath: string): string {
return `${lockPath}.mutex`;
}
/** Brief back-off between registry-mutex acquisition attempts (held microseconds). */
const RESTART_MUTEX_RETRY_MS = 20;
/**
* Staleness for the internal mutex / reclaim locks, judged by the file's mtime
* rather than its CONTENT. `open(path, 'wx')` creates the inode (with a fresh
* mtime) before any token/timestamp is written into it, so a content-based check
* would momentarily see that empty file as corrupt-and-stale and could reap a
* lock another contender is still acquiring. mtime is set atomically at creation,
* so a just-created lock always reads as live; only a lock whose holder died and
* stopped touching it ages past the threshold. These locks are never held across
* the restart itself (only a couple of filesystem ops), so any mtime this old can
* belong only to a dead holder.
*/
async function isRestartLockPathStale(path: string, now: number): Promise<boolean> {
try {
const info = await stat(path);
return now - info.mtimeMs >= RESTART_LOCK_STALE_MS;
} catch (err) {
if ((err as NodeJS.ErrnoException).code === 'ENOENT') {
return false; // Gone, not stale — the caller will re-contend.
}
return false; // Can't stat — treat as live and back off rather than reap.
}
}
/** Path of the reclaim lock that serializes reaping of a crashed-holder mutex. */
function restartReclaimPath(mutexPath: string): string {
return `${mutexPath}.reclaim`;
}
/**
* Reap a registry mutex left behind by a process that CRASHED mid-transition —
* one whose file has aged past RESTART_LOCK_STALE_MS. Because the mutex is held
* only for a couple of filesystem ops (no sleeps, never across the restart), a
* mutex this old can only belong to a dead holder.
*
* The reap removes the dead mutex but never CREATES/holds it — acquisition stays
* the single `open('wx')` create in {@link acquireRestartMutex}, so exactly one
* contender wins ownership no matter how the reap and acquires interleave. The
* removal is made conditional by a dedicated reclaim lock: while it is held the
* dead mutex is stable (its dead holder will never touch it, and no other
* reclaimer can race), so re-reading it and removing it only if it is STILL stale
* is a true compare — a live holder's fresh mutex is never removed. This closes
* the reclaim race a content-blind rename-and-restore left open (a third
* contender slipping into the gap while a fresh mutex was moved aside).
*/
async function reclaimStaleRestartMutex(mutexPath: string): Promise<void> {
const reclaimPath = restartReclaimPath(mutexPath);
let handle: Awaited<ReturnType<typeof open>>;
try {
handle = await open(reclaimPath, 'wx');
} catch (err) {
if ((err as NodeJS.ErrnoException).code !== 'EEXIST') {
throw err;
}
// Someone is already reclaiming. If their reclaim lock is itself stale by
// mtime, its holder crashed mid-reap (the lock spans only a stat + unlink,
// microseconds) — clear it so a later pass can retry. Otherwise a live
// reclaimer has it; back off. Either way we do not reap the mutex this pass.
if (await isRestartLockPathStale(reclaimPath, Date.now())) {
await unlink(reclaimPath).catch(() => {});
}
return;
}
try {
// Re-check the mutex UNDER the reclaim lock and remove it only if it is STILL
// stale by mtime. A live holder's mutex is fresh and is left untouched; a dead
// holder's mutex is stable here (its holder is gone and no other reclaimer can
// race us), so this re-check is authoritative.
if (await isRestartLockPathStale(mutexPath, Date.now())) {
await unlink(mutexPath).catch(() => {});
}
} finally {
await handle.close();
await unlink(reclaimPath).catch(() => {});
}
}
/**
* Acquire the registry mutex, BLOCKING (with brief back-offs) until held, and
* return a token-gated release. This is the single point of mutual exclusion for
* the restart lock: acquire, release, and stale/timeout takeover all run under it,
* so "read the lock, then mutate it" is atomic — no acquirer, releaser, or breaker
* can ever interleave with another. A mutex left by a crashed holder is reclaimed
* once it ages past the stale threshold.
*/
async function acquireRestartMutex(
mutexPath: string,
token: string,
): Promise<RestartGuard['release']> {
for (;;) {
let handle: Awaited<ReturnType<typeof open>>;
try {
handle = await open(mutexPath, 'wx');
} catch (err) {
if ((err as NodeJS.ErrnoException).code !== 'EEXIST') {
throw err;
}
// Staleness is judged by mtime, not content, so a mutex that exists but has
// not yet had its token written (the open-before-write window) reads as live
// and is never wrongly reaped.
if (!(await isRestartLockPathStale(mutexPath, Date.now()))) {
// A live holder has it — it will be gone in microseconds. Back off briefly.
await new Promise((resolve) => setTimeout(resolve, RESTART_MUTEX_RETRY_MS));
continue;
}
await reclaimStaleRestartMutex(mutexPath);
continue;
}
// We created the mutex. Populate it with our token; if writing fails, clean up
// our own file so we never leak an empty mutex that a peer would have to reap.
try {
await handle.writeFile(formatRestartLockContent(token));
await handle.close();
} catch (err) {
await handle.close().catch(() => {});
await unlink(mutexPath).catch(() => {});
throw err;
}
return async (): Promise<void> => {
if ((await readRestartLockToken(mutexPath)) !== token) return;
await unlink(mutexPath).catch(() => {});
};
}
}
/**
* Acquire the fleet restart lock, serializing concurrent `mosaic fleet restart`
* invocations across processes. Each restart tears the tmux holder (and the
* agent sessions inside it) down and back up; without this guard a re-entrant
* restart relaunches agents against a half-torn-down holder, which fails and
* tight-loops. A re-entrant caller waits for the in-flight restart to release
* the lock (clean shutdown settled) before proceeding, breaks a stale lock left
* by a crashed owner, and after RESTART_LOCK_MAX_WAIT_MS breaks the lock to
* avoid a permanent deadlock.
*
* Correctness rests on a single invariant: EVERY transition of the lock — taking
* a free lock, taking over a stale/timed-out one, and releasing — happens under
* the registry mutex. Because the check ("is the lock free / stale / fresh?") and
* the mutation that follows it both run while the mutex is held, they are atomic:
* no other acquirer, releaser, or breaker can slip in between. That is what makes
* takeover a true compare-and-swap rather than a content-blind clobber — a normal
* `open('wx')` acquirer cannot create a fresh lock in a gap, and the original
* owner's `release()` (also mutex-gated and token-checked) cannot drop a lock a
* breaker already took over. So no interleaving lets two restarts both own the
* lock and run concurrently.
*/
export async function acquireRestartLock(
mosaicHome: string,
sleepFn: SleepFn,
): Promise<RestartGuard> {
const token = randomUUID();
const lockPath = restartLockPath(mosaicHome);
const mutexPath = restartMutexPath(lockPath);
await mkdir(dirname(lockPath), { recursive: true });
const release = async (): Promise<void> => {
// Mutex-gated and token-gated: only remove the lock if it is still ours. If
// another caller took it over (after a stale/timeout break) the token no
// longer matches and we leave their lock intact.
const releaseMutex = await acquireRestartMutex(mutexPath, token);
try {
if ((await readRestartLockToken(lockPath)) === token) {
await unlink(lockPath).catch(() => {});
}
} finally {
await releaseMutex();
}
};
const deadline = Date.now() + RESTART_LOCK_MAX_WAIT_MS;
for (;;) {
let owned = false;
const releaseMutex = await acquireRestartMutex(mutexPath, token);
try {
// Read and (if appropriate) mutate the lock atomically under the mutex.
let current: string | null = null;
let absent = false;
try {
current = await readFile(lockPath, 'utf8');
} catch (readErr) {
if ((readErr as NodeJS.ErrnoException).code === 'ENOENT') {
absent = true;
} else {
current = null; // Unreadable/corrupt: treat as stale.
}
}
const now = Date.now();
if (absent) {
// Lock is free — take it.
await writeFile(lockPath, formatRestartLockContent(token));
owned = true;
} else {
const stale = current === null || isRestartLockContentStale(current, now);
const timedOut = now >= deadline;
if (stale || timedOut) {
process.stderr.write(
stale
? 'Breaking stale fleet restart lock.\n'
: `Timed out after ${RESTART_LOCK_MAX_WAIT_MS}ms waiting for the in-flight fleet ` +
'restart; breaking the lock.\n',
);
// Takeover is just an overwrite — safe because we hold the mutex, so no
// acquirer or releaser can touch the lock between our read and this write.
await writeFile(lockPath, formatRestartLockContent(token));
owned = true;
}
// else: a fresh restart owns it — wait below and re-evaluate.
}
} finally {
await releaseMutex();
}
if (owned) {
return { release };
}
await sleepFn(RESTART_LOCK_POLL_INTERVAL_MS);
}
}
/**
* Returns the systemctl --user enable command for a given unit.
* Used by the install auto-enable step to persist units across reboots.
@@ -1172,6 +1472,7 @@ export function isSendAccepted(capturedOutput: string): SendVerifyResult {
export function registerFleetCommand(program: Command, deps: FleetCommandDeps = {}): Command {
const runner = deps.runner ?? runCommand;
const sleepFn = deps.sleepFn ?? defaultSleep;
const paths = resolveFleetPaths(deps.mosaicHome);
const frameworkRoot = deps.frameworkRoot ?? resolveFrameworkRoot();
@@ -1285,9 +1586,22 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
.command(`${action} [agent]`)
.description(`${action} the fleet holder or one agent`)
.action(async (agent?: string) => {
const commandOpts = cmd.opts<{ mosaicHome: string; roster?: string }>();
const activePaths = resolveFleetPaths(commandOpts.mosaicHome);
const roster = await loadRosterForCommand(cmd);
if (agent) {
getRosterAgent(roster, agent);
// Single-agent restart is guarded too: it can race a full restart that
// is tearing the shared holder down.
if (action === 'restart') {
const guard = await acquireRestartLock(activePaths.mosaicHome, sleepFn);
try {
await runChecked(runner, buildFleetServiceCommand(action, agent));
} finally {
await guard.release();
}
return;
}
await runChecked(runner, buildFleetServiceCommand(action, agent));
return;
}
@@ -1298,6 +1612,21 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
);
return;
}
if (action === 'restart') {
// Serialize the holder+agents teardown/relaunch behind the restart lock
// so a re-entrant restart waits for clean shutdown before relaunching,
// instead of racing a half-torn-down holder into a tight loop.
const guard = await acquireRestartLock(activePaths.mosaicHome, sleepFn);
try {
await runChecked(runner, buildFleetServiceCommand(action));
for (const rosterAgent of roster.agents) {
await runChecked(runner, buildFleetServiceCommand(action, rosterAgent.name));
}
} finally {
await guard.release();
}
return;
}
await runChecked(runner, buildFleetServiceCommand(action));
for (const rosterAgent of roster.agents) {
await runChecked(runner, buildFleetServiceCommand(action, rosterAgent.name));