diff --git a/packages/mosaic/src/commands/fleet.spec.ts b/packages/mosaic/src/commands/fleet.spec.ts index 9545170..28bf7b0 100644 --- a/packages/mosaic/src/commands/fleet.spec.ts +++ b/packages/mosaic/src/commands/fleet.spec.ts @@ -4,6 +4,7 @@ import { dirname, join, resolve } from 'node:path'; import { Command } from 'commander'; import { afterEach, describe, expect, it, vi } from 'vitest'; import { + acquireRestartLock, addAgentToRoster, buildAgentSendCommand, buildAgentWatchAttachCommand, @@ -818,6 +819,93 @@ describe('fleet command construction', () => { } }); + it('guards the single-agent restart path behind the in-flight restart lock', async () => { + const home = await tempDir(); + const rosterPath = join(home, 'fleet', 'roster.yaml'); + await mkdir(join(home, 'fleet'), { recursive: true }); + await writeFile( + rosterPath, + ['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join( + '\n', + ), + ); + + // A full restart is mid-flight (lock held); a single-agent restart re-enters. + const lockPath = restartLockPath(home); + await mkdir(dirname(lockPath), { recursive: true }); + await writeFile(lockPath, `4242\n${Date.now()}\n`); + + const events: string[] = []; + const runner: CommandRunner = async (command, args) => { + events.push(`run:${args[args.length - 1]}`); + return { stdout: '', stderr: '', exitCode: 0 }; + }; + let sleeps = 0; + const sleepFn: SleepFn = async () => { + sleeps += 1; + events.push(`sleep:${sleeps}`); + await rm(lockPath, { force: true }); + }; + + const program = new Command(); + program.exitOverride(); + registerFleetCommand(program, { runner, sleepFn, mosaicHome: home }); + + try { + await program.parseAsync(['node', 'mosaic', 'fleet', 'restart', 'coder0']); + + // The single-agent restart waits for the in-flight restart before acting. + expect(sleeps).toBeGreaterThan(0); + const firstSleep = events.findIndex((e) => e.startsWith('sleep:')); + const firstRun = events.findIndex((e) => e.startsWith('run:')); + expect(firstSleep).toBeGreaterThanOrEqual(0); + expect(firstRun).toBeGreaterThan(firstSleep); + // Only the named agent is restarted; the holder is untouched. + expect(events).toContain('run:mosaic-agent@coder0.service'); + expect(events).not.toContain('run:mosaic-tmux-holder.service'); + } finally { + await rm(home, { recursive: true, force: true }); + } + }); + + it('does not let a timed-out owner drop a lock another restart broke and re-owned', async () => { + const home = await tempDir(); + const runDir = join(home, 'fleet', 'run'); + await mkdir(runDir, { recursive: true }); + const lockPath = restartLockPath(home); + const tokenOf = async (): Promise => { + const raw = await readFile(lockPath, 'utf8'); + return raw.split('\n')[2]?.trim() ?? ''; + }; + const sleepFn = vi.fn(async () => {}); + + // R1 acquires the lock and begins a restart that then hangs. + const r1 = await acquireRestartLock(home, sleepFn); + const tokenR1 = await tokenOf(); + expect(tokenR1).not.toBe(''); + + // The hung R1 leaves a stale lock: rewrite its timestamp into the past while + // preserving R1's token — exactly the on-disk state a stuck owner leaves. + await writeFile(lockPath, `4242\n${Date.now() - RESTART_LOCK_STALE_MS - 1_000}\n${tokenR1}\n`); + + // R2 re-enters, sees the stale lock, and atomically takes ownership. + const r2 = await acquireRestartLock(home, sleepFn); + const tokenR2 = await tokenOf(); + expect(tokenR2).not.toBe(tokenR1); + expect(sleepFn).not.toHaveBeenCalled(); + + // R1 finally finishes and releases. It must NOT delete R2's lock — otherwise + // a third restart (R3) could acquire and interleave with R2 still running. + await r1.release(); + expect(await tokenOf()).toBe(tokenR2); + + // R2 releases cleanly and the lock is gone. + await r2.release(); + await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' }); + + await rm(home, { recursive: true, force: true }); + }); + it('attempts every agent and the holder during fleet stop even when an agent stop fails', async () => { const home = await tempDir(); const rosterPath = join(home, 'fleet', 'roster.yaml'); diff --git a/packages/mosaic/src/commands/fleet.ts b/packages/mosaic/src/commands/fleet.ts index d343640..08260a6 100644 --- a/packages/mosaic/src/commands/fleet.ts +++ b/packages/mosaic/src/commands/fleet.ts @@ -6,9 +6,11 @@ import { mkdir, open, readFile, + rename, unlink, writeFile, } from 'node:fs/promises'; +import { randomUUID } from 'node:crypto'; import { homedir, hostname, userInfo } from 'node:os'; import { dirname, join, resolve } from 'node:path'; import { fileURLToPath } from 'node:url'; @@ -565,11 +567,32 @@ export function restartLockPath(mosaicHome: string): string { return join(mosaicHome, 'fleet', 'run', 'restart.lock'); } -/** A held restart lock; `release()` removes the lock file (idempotent). */ +/** A held restart lock; `release()` removes the lock file iff we still own it. */ interface RestartGuard { release(): Promise; } +/** Lock-file contents: pid (informational), timestamp, and a unique owner token. */ +function formatRestartLockContent(token: string): string { + return `${process.pid}\n${Date.now()}\n${token}\n`; +} + +/** + * Reads the owner token (line 3) from a lock file, or null if the file is + * missing/unreadable/tokenless. The token is what makes release and break + * ownership-safe: a process only ever acts on a lock whose token matches its own. + */ +async function readRestartLockToken(lockPath: string): Promise { + let raw: string; + try { + raw = await readFile(lockPath, 'utf8'); + } catch { + return null; + } + const token = raw.split('\n')[2]?.trim(); + return token ? token : null; +} + /** * Returns true when an existing lock file is stale: older than * RESTART_LOCK_STALE_MS, or unreadable/unparseable (a corrupt or partially @@ -594,6 +617,31 @@ async function isRestartLockStale(lockPath: string, now: number): Promise= RESTART_LOCK_STALE_MS; } +/** + * Atomically take over an existing (stale or timed-out) lock WITHOUT blind + * unlinking it: write our own token to a temp file and `rename()` it over the + * lock. rename is atomic, so it replaces the prior owner's content in one step + * rather than the unsafe unlink-then-recreate (which a third restart could slip + * between). Returns true only if our token is the one on disk afterwards — if a + * concurrent breaker raced and won, we read back their token and return false so + * the caller keeps waiting instead of assuming ownership. + */ +async function breakAndOwnRestartLock( + lockPath: string, + token: string, + content: string, +): Promise { + const tmpPath = `${lockPath}.${token}`; + await writeFile(tmpPath, content); + try { + await rename(tmpPath, lockPath); + } catch (err) { + await unlink(tmpPath).catch(() => {}); + throw err; + } + return (await readRestartLockToken(lockPath)) === token; +} + /** * Acquire the fleet restart lock, serializing concurrent `mosaic fleet restart` * invocations across processes. Each restart tears the tmux holder (and the @@ -603,22 +651,38 @@ async function isRestartLockStale(lockPath: string, now: number): Promise { +export async function acquireRestartLock( + mosaicHome: string, + sleepFn: SleepFn, +): Promise { + const token = randomUUID(); const lockPath = restartLockPath(mosaicHome); await mkdir(dirname(lockPath), { recursive: true }); const release = async (): Promise => { + // Ownership-safe: only remove the lock if it is still ours. If another + // caller broke and re-owned it (after a stale/timeout break), the token no + // longer matches and we must leave their lock intact. + if ((await readRestartLockToken(lockPath)) !== token) { + return; + } try { await unlink(lockPath); } catch { - // Already gone (broken as stale by another waiter, or never written) — fine. + // Raced away between the token check and unlink — nothing more to do. } }; const deadline = Date.now() + RESTART_LOCK_MAX_WAIT_MS; for (;;) { try { const handle = await open(lockPath, 'wx'); - await handle.writeFile(`${process.pid}\n${Date.now()}\n`); + await handle.writeFile(formatRestartLockContent(token)); await handle.close(); return { release }; } catch (err) { @@ -626,17 +690,20 @@ async function acquireRestartLock(mosaicHome: string, sleepFn: SleepFn): Promise throw err; } // A restart is already in flight (or its lock was left behind). - if (await isRestartLockStale(lockPath, Date.now())) { - process.stderr.write('Breaking stale fleet restart lock and proceeding.\n'); - await release(); - continue; - } - if (Date.now() >= deadline) { - process.stderr.write( - `Timed out after ${RESTART_LOCK_MAX_WAIT_MS}ms waiting for the in-flight fleet ` + - 'restart; breaking the lock and proceeding.\n', - ); - await release(); + const stale = await isRestartLockStale(lockPath, Date.now()); + const timedOut = Date.now() >= deadline; + if (stale || timedOut) { + if (await breakAndOwnRestartLock(lockPath, token, formatRestartLockContent(token))) { + process.stderr.write( + stale + ? 'Breaking stale fleet restart lock and proceeding.\n' + : `Timed out after ${RESTART_LOCK_MAX_WAIT_MS}ms waiting for the in-flight fleet ` + + 'restart; breaking the lock and proceeding.\n', + ); + return { release }; + } + // A concurrent breaker won the takeover; back off and re-evaluate. + await sleepFn(RESTART_LOCK_POLL_INTERVAL_MS); continue; } await sleepFn(RESTART_LOCK_POLL_INTERVAL_MS);