diff --git a/packages/mosaic/src/commands/fleet.spec.ts b/packages/mosaic/src/commands/fleet.spec.ts
index b1bf5ea..9545170 100644
--- a/packages/mosaic/src/commands/fleet.spec.ts
+++ b/packages/mosaic/src/commands/fleet.spec.ts
@@ -45,6 +45,8 @@ import {
   removeAgentFromRoster,
   resolveFleetPaths,
   resolvePresetFilename,
+  restartLockPath,
+  RESTART_LOCK_STALE_MS,
   RUNTIME_ACCEPTABLE_COMMANDS,
   serializeRosterToYaml,
   VERIFY_DEFAULT_TIMEOUT_MS,
@@ -678,6 +680,179 @@ describe('fleet command construction', () => {
     }
   });
 
+  it('waits for an in-flight restart to clear before relaunching (re-entry guard)', async () => {
+    const home = await tempDir();
+    const rosterPath = join(home, 'fleet', 'roster.yaml');
+    await mkdir(join(home, 'fleet'), { recursive: true });
+    await writeFile(
+      rosterPath,
+      ['version: 1', 'transport: tmux', 'agents:', '  - name: coder0', '    runtime: codex'].join(
+        '\n',
+      ),
+    );
+
+    // Simulate another `mosaic fleet restart` process mid-teardown: a fresh lock
+    // (recent timestamp, so it is NOT treated as stale) already held.
+    const lockPath = restartLockPath(home);
+    await mkdir(dirname(lockPath), { recursive: true });
+    await writeFile(lockPath, `4242\n${Date.now()}\n`);
+
+    const events: string[] = [];
+    const runner: CommandRunner = async (command, args) => {
+      events.push(`run:${args[args.length - 1]}`);
+      return { stdout: '', stderr: '', exitCode: 0 };
+    };
+    // The injected sleep stands in for time passing while we wait; the in-flight
+    // restart "finishes" (releases its lock) after the first poll.
+    let sleeps = 0;
+    const sleepFn: SleepFn = async () => {
+      sleeps += 1;
+      events.push(`sleep:${sleeps}`);
+      await rm(lockPath, { force: true });
+    };
+
+    const program = new Command();
+    program.exitOverride();
+    registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
+
+    try {
+      await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
+
+      // It must have waited at least once before issuing any systemctl restart.
+      expect(sleeps).toBeGreaterThan(0);
+      const firstSleep = events.findIndex((e) => e.startsWith('sleep:'));
+      const firstRun = events.findIndex((e) => e.startsWith('run:'));
+      expect(firstSleep).toBeGreaterThanOrEqual(0);
+      expect(firstRun).toBeGreaterThan(firstSleep);
+
+      // And it still performs the full restart once the lock clears.
+      expect(events).toContain('run:mosaic-tmux-holder.service');
+      expect(events).toContain('run:mosaic-agent@coder0.service');
+
+      // The lock is released after the restart completes.
+      await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
+    } finally {
+      await rm(home, { recursive: true, force: true });
+    }
+  });
+
+  it('breaks a stale restart lock and proceeds without waiting', async () => {
+    const home = await tempDir();
+    const rosterPath = join(home, 'fleet', 'roster.yaml');
+    await mkdir(join(home, 'fleet'), { recursive: true });
+    await writeFile(
+      rosterPath,
+      ['version: 1', 'transport: tmux', 'agents:', '  - name: coder0', '    runtime: codex'].join(
+        '\n',
+      ),
+    );
+
+    // A lock left behind by a crashed owner: timestamp older than the stale window.
+    const lockPath = restartLockPath(home);
+    await mkdir(dirname(lockPath), { recursive: true });
+    await writeFile(lockPath, `4242\n${Date.now() - RESTART_LOCK_STALE_MS - 1_000}\n`);
+
+    const calls: string[][] = [];
+    const runner: CommandRunner = async (command, args) => {
+      calls.push([command, ...args]);
+      return { stdout: '', stderr: '', exitCode: 0 };
+    };
+    const sleepFn = vi.fn(async () => {});
+
+    const program = new Command();
+    program.exitOverride();
+    registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
+
+    try {
+      await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
+
+      // Stale lock is broken immediately — no waiting.
+      expect(sleepFn).not.toHaveBeenCalled();
+      expect(calls).toEqual([
+        ['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
+        ['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
+      ]);
+      // The stale lock is gone once the restart completes.
+      await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
+    } finally {
+      await rm(home, { recursive: true, force: true });
+    }
+  });
+
+  it('releases the restart lock so a subsequent restart is not blocked', async () => {
+    const home = await tempDir();
+    const rosterPath = join(home, 'fleet', 'roster.yaml');
+    await mkdir(join(home, 'fleet'), { recursive: true });
+    await writeFile(
+      rosterPath,
+      ['version: 1', 'transport: tmux', 'agents:', '  - name: coder0', '    runtime: codex'].join(
+        '\n',
+      ),
+    );
+
+    const calls: string[][] = [];
+    const runner: CommandRunner = async (command, args) => {
+      calls.push([command, ...args]);
+      return { stdout: '', stderr: '', exitCode: 0 };
+    };
+    const sleepFn = vi.fn(async () => {});
+
+    const program = new Command();
+    program.exitOverride();
+    registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
+
+    try {
+      await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
+      await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
+
+      // Two sequential restarts both run fully and neither has to wait.
+      expect(sleepFn).not.toHaveBeenCalled();
+      expect(calls).toEqual([
+        ['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
+        ['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
+        ['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
+        ['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
+      ]);
+    } finally {
+      await rm(home, { recursive: true, force: true });
+    }
+  });
+
+  it('does not remove a restart lock that another process has taken over', async () => {
+    const home = await tempDir();
+    const rosterPath = join(home, 'fleet', 'roster.yaml');
+    await mkdir(join(home, 'fleet'), { recursive: true });
+    await writeFile(
+      rosterPath,
+      ['version: 1', 'transport: tmux', 'agents:', '  - name: coder0', '    runtime: codex'].join(
+        '\n',
+      ),
+    );
+
+    // Simulate a waiter that timed out, broke our lock mid-restart and now holds
+    // its own: the runner swaps the lock contents while the restart is running.
+    const lockPath = restartLockPath(home);
+    const foreignLock = `0\n${Date.now()}\n`;
+    const runner: CommandRunner = async () => {
+      await writeFile(lockPath, foreignLock);
+      return { stdout: '', stderr: '', exitCode: 0 };
+    };
+    const sleepFn = vi.fn(async () => {});
+
+    const program = new Command();
+    program.exitOverride();
+    registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
+
+    try {
+      await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
+
+      // Releasing must leave the other owner's lock in place.
+      await expect(readFile(lockPath, 'utf8')).resolves.toBe(foreignLock);
+    } finally {
+      await rm(home, { recursive: true, force: true });
+    }
+  });
+
   it('attempts every agent and the holder during fleet stop even when an agent stop fails', async () => {
     const home = await tempDir();
     const rosterPath = join(home, 'fleet', 'roster.yaml');
diff --git a/packages/mosaic/src/commands/fleet.ts b/packages/mosaic/src/commands/fleet.ts
index 3149114..d343640 100644
--- a/packages/mosaic/src/commands/fleet.ts
+++ b/packages/mosaic/src/commands/fleet.ts
@@ -1,5 +1,15 @@
 import { constants } from 'node:fs';
-import { access, chmod, copyFile, mkdir, readFile, unlink, writeFile } from 'node:fs/promises';
+import {
+  access,
+  chmod,
+  copyFile,
+  mkdir,
+  open,
+  readFile,
+  stat,
+  unlink,
+  writeFile,
+} from 'node:fs/promises';
 import { homedir, hostname, userInfo } from 'node:os';
 import { dirname, join, resolve } from 'node:path';
 import { fileURLToPath } from 'node:url';
@@ -533,6 +543,135 @@ export function buildFleetServiceCommand(action: FleetServiceAction, agentName?:
   return ['systemctl', '--user', action, service];
 }
 
+/** Poll interval (ms) while waiting for an in-flight restart's lock to clear. */
+export const RESTART_LOCK_POLL_INTERVAL_MS = 250;
+/**
+ * Maximum time (ms) a re-entrant restart waits for the in-flight restart to
+ * finish before it breaks the lock and proceeds anyway. A bound is required so
+ * a crashed holder of the lock can never deadlock the fleet permanently.
+ */
+export const RESTART_LOCK_MAX_WAIT_MS = 30_000;
+/**
+ * Age (ms) past which a restart lock is treated as stale (its owner died
+ * without releasing it) and is broken immediately rather than waited on.
+ */
+export const RESTART_LOCK_STALE_MS = 60_000;
+
+/**
+ * Resolves the path of the cross-process restart lock for a given Mosaic home.
+ * Kept strictly under `<mosaicHome>/fleet/run` (not the heartbeat env override)
+ * so the lock is scoped to the same fleet the restart acts on.
+ */
+export function restartLockPath(mosaicHome: string): string {
+  return join(mosaicHome, 'fleet', 'run', 'restart.lock');
+}
+
+/**
+ * A held restart lock. `release()` removes the lock file only while it still
+ * carries this holder's token, and is safe to call more than once.
+ */
+interface RestartGuard {
+  release(): Promise<void>;
+}
+
+/**
+ * Returns true when an existing lock file is stale, i.e. older than
+ * RESTART_LOCK_STALE_MS. The age comes from the timestamp on the lock's second
+ * line; when that is missing or unparseable the file's mtime is used instead, so
+ * a lock whose owner is still between creating and writing it is not mistaken
+ * for a leftover. An unreadable lock counts as stale. A vanished lock (ENOENT)
+ * is not stale — the next acquire attempt will simply succeed.
+ */
+async function isRestartLockStale(lockPath: string, now: number): Promise<boolean> {
+  try {
+    const raw = await readFile(lockPath, 'utf8');
+    const stamp = Number.parseInt((raw.split('\n')[1] ?? '').trim(), 10);
+    if (Number.isFinite(stamp)) {
+      return now - stamp >= RESTART_LOCK_STALE_MS;
+    }
+    // No usable timestamp: either a corrupt leftover or a lock that is being
+    // written right now. Only the file's age can tell the two apart.
+    const { mtimeMs } = await stat(lockPath);
+    return now - mtimeMs >= RESTART_LOCK_STALE_MS;
+  } catch (err) {
+    return (err as NodeJS.ErrnoException).code !== 'ENOENT';
+  }
+}
+
+/**
+ * Acquire the fleet restart lock, serializing concurrent `mosaic fleet restart`
+ * invocations across processes. Each restart tears the tmux holder (and the
+ * agent sessions inside it) down and back up; without this guard a re-entrant
+ * restart relaunches agents against a half-torn-down holder, which fails and
+ * tight-loops. A re-entrant caller waits for the in-flight restart to release
+ * the lock (clean shutdown settled) before proceeding, breaks a stale lock left
+ * by a crashed owner, and after RESTART_LOCK_MAX_WAIT_MS breaks the lock to
+ * avoid a permanent deadlock.
+ *
+ * The returned guard only removes the lock while it still holds the token
+ * written here, so a holder whose lock was broken by a waiter cannot delete the
+ * lock of whoever acquired it next.
+ */
+async function acquireRestartLock(mosaicHome: string, sleepFn: SleepFn): Promise<RestartGuard> {
+  const lockPath = restartLockPath(mosaicHome);
+  await mkdir(dirname(lockPath), { recursive: true });
+  // Unconditional removal; only used to break a stale or timed-out lock.
+  const breakLock = async (): Promise<void> => {
+    try {
+      await unlink(lockPath);
+    } catch {
+      // Already gone (released or broken by someone else) — fine.
+    }
+  };
+  let deadline = Date.now() + RESTART_LOCK_MAX_WAIT_MS;
+  for (;;) {
+    try {
+      const handle = await open(lockPath, 'wx');
+      const token = `${process.pid}\n${Date.now()}\n`;
+      try {
+        await handle.writeFile(token);
+      } finally {
+        // Close even when the write fails so the descriptor is never leaked.
+        await handle.close();
+      }
+      const release = async (): Promise<void> => {
+        try {
+          // Remove the lock only if it is still ours; after a break the path may
+          // already belong to another restart.
+          if ((await readFile(lockPath, 'utf8')) === token) {
+            await unlink(lockPath);
+          }
+        } catch {
+          // Already gone or unreadable — nothing of ours left to release.
+        }
+      };
+      return { release };
+    } catch (err) {
+      if ((err as NodeJS.ErrnoException).code !== 'EEXIST') {
+        throw err;
+      }
+      // A restart is already in flight (or its lock was left behind).
+      if (await isRestartLockStale(lockPath, Date.now())) {
+        process.stderr.write('Breaking stale fleet restart lock and proceeding.\n');
+        await breakLock();
+        continue;
+      }
+      if (Date.now() >= deadline) {
+        process.stderr.write(
+          `Timed out after ${RESTART_LOCK_MAX_WAIT_MS}ms waiting for the in-flight fleet ` +
+            'restart; breaking the lock and proceeding.\n',
+        );
+        await breakLock();
+        // Start a fresh wait budget in case another waiter wins the race for the
+        // lock, so that holder is not broken on the very next iteration.
+        deadline = Date.now() + RESTART_LOCK_MAX_WAIT_MS;
+        continue;
+      }
+      await sleepFn(RESTART_LOCK_POLL_INTERVAL_MS);
+    }
+  }
+}
+
 /**
  * Returns the systemctl --user enable command for a given unit.
  * Used by the install auto-enable step to persist units across reboots.
@@ -1172,6 +1311,7 @@ export function isSendAccepted(capturedOutput: string): SendVerifyResult {
 
 export function registerFleetCommand(program: Command, deps: FleetCommandDeps = {}): Command {
   const runner = deps.runner ?? runCommand;
+  const sleepFn = deps.sleepFn ?? defaultSleep;
   const paths = resolveFleetPaths(deps.mosaicHome);
   const frameworkRoot = deps.frameworkRoot ?? resolveFrameworkRoot();
 
@@ -1285,9 +1425,22 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
       .command(`${action} [agent]`)
       .description(`${action} the fleet holder or one agent`)
       .action(async (agent?: string) => {
+        const commandOpts = cmd.opts<{ mosaicHome: string; roster?: string }>();
+        const activePaths = resolveFleetPaths(commandOpts.mosaicHome);
         const roster = await loadRosterForCommand(cmd);
         if (agent) {
           getRosterAgent(roster, agent);
+          // Single-agent restart is guarded too: it can race a full restart that
+          // is tearing the shared holder down.
+          if (action === 'restart') {
+            const guard = await acquireRestartLock(activePaths.mosaicHome, sleepFn);
+            try {
+              await runChecked(runner, buildFleetServiceCommand(action, agent));
+            } finally {
+              await guard.release();
+            }
+            return;
+          }
           await runChecked(runner, buildFleetServiceCommand(action, agent));
           return;
         }
@@ -1298,6 +1451,21 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
           );
           return;
         }
+        if (action === 'restart') {
+          // Serialize the holder+agents teardown/relaunch behind the restart lock
+          // so a re-entrant restart waits for clean shutdown before relaunching,
+          // instead of racing a half-torn-down holder into a tight loop.
+          const guard = await acquireRestartLock(activePaths.mosaicHome, sleepFn);
+          try {
+            await runChecked(runner, buildFleetServiceCommand(action));
+            for (const rosterAgent of roster.agents) {
+              await runChecked(runner, buildFleetServiceCommand(action, rosterAgent.name));
+            }
+          } finally {
+            await guard.release();
+          }
+          return;
+        }
         await runChecked(runner, buildFleetServiceCommand(action));
         for (const rosterAgent of roster.agents) {
           await runChecked(runner, buildFleetServiceCommand(action, rosterAgent.name));