fix(fleet): guard mosaic fleet restart against tight-loop re-entry race
`mosaic fleet restart` runs as a fresh process each invocation, issuing `systemctl --user restart` for the tmux holder and then each agent. The agent sessions live inside the holder's tmux, so restarting the holder tears them down. With no mutual exclusion, a second restart entering while the first is still mid-teardown (upgrade `--relaunch`, a watchdog, or a hurried operator) interleaves: agents relaunch against a half-torn-down holder, fail, and tight-loop. Add a cross-process teardown-settle guard: a lock file under `<mosaicHome>/fleet/run/restart.lock` acquired with O_CREAT|O_EXCL. A re-entrant restart waits (bounded, injectable sleep) for the in-flight restart to release the lock before relaunching, breaks a stale lock left by a crashed owner, and after a max wait breaks the lock to avoid a permanent deadlock. Both full-fleet and single-agent restart paths are guarded; start/stop/status are unchanged. Regression test reproduces the race: with an in-flight lock held, the restart must wait before issuing any systemctl command — fails on the unguarded code path, passes with the guard. Adds stale-lock-break and lock-release coverage. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -45,6 +45,8 @@ import {
|
||||
removeAgentFromRoster,
|
||||
resolveFleetPaths,
|
||||
resolvePresetFilename,
|
||||
restartLockPath,
|
||||
RESTART_LOCK_STALE_MS,
|
||||
RUNTIME_ACCEPTABLE_COMMANDS,
|
||||
serializeRosterToYaml,
|
||||
VERIFY_DEFAULT_TIMEOUT_MS,
|
||||
@@ -678,6 +680,144 @@ describe('fleet command construction', () => {
|
||||
}
|
||||
});
|
||||
|
||||
it('waits for an in-flight restart to clear before relaunching (re-entry guard)', async () => {
|
||||
const home = await tempDir();
|
||||
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
||||
await mkdir(join(home, 'fleet'), { recursive: true });
|
||||
await writeFile(
|
||||
rosterPath,
|
||||
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
|
||||
'\n',
|
||||
),
|
||||
);
|
||||
|
||||
// Simulate another `mosaic fleet restart` process mid-teardown: a fresh lock
|
||||
// (recent timestamp, so it is NOT treated as stale) already held.
|
||||
const lockPath = restartLockPath(home);
|
||||
await mkdir(dirname(lockPath), { recursive: true });
|
||||
await writeFile(lockPath, `4242\n${Date.now()}\n`);
|
||||
|
||||
const events: string[] = [];
|
||||
const runner: CommandRunner = async (command, args) => {
|
||||
events.push(`run:${args[args.length - 1]}`);
|
||||
return { stdout: '', stderr: '', exitCode: 0 };
|
||||
};
|
||||
// The injected sleep stands in for time passing while we wait; the in-flight
|
||||
// restart "finishes" (releases its lock) after the first poll.
|
||||
let sleeps = 0;
|
||||
const sleepFn: SleepFn = async () => {
|
||||
sleeps += 1;
|
||||
events.push(`sleep:${sleeps}`);
|
||||
await rm(lockPath, { force: true });
|
||||
};
|
||||
|
||||
const program = new Command();
|
||||
program.exitOverride();
|
||||
registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
|
||||
|
||||
try {
|
||||
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
|
||||
|
||||
// It must have waited at least once before issuing any systemctl restart.
|
||||
expect(sleeps).toBeGreaterThan(0);
|
||||
const firstSleep = events.findIndex((e) => e.startsWith('sleep:'));
|
||||
const firstRun = events.findIndex((e) => e.startsWith('run:'));
|
||||
expect(firstSleep).toBeGreaterThanOrEqual(0);
|
||||
expect(firstRun).toBeGreaterThan(firstSleep);
|
||||
|
||||
// And it still performs the full restart once the lock clears.
|
||||
expect(events).toContain('run:mosaic-tmux-holder.service');
|
||||
expect(events).toContain('run:mosaic-agent@coder0.service');
|
||||
|
||||
// The lock is released after the restart completes.
|
||||
await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
|
||||
} finally {
|
||||
await rm(home, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it('breaks a stale restart lock and proceeds without waiting', async () => {
|
||||
const home = await tempDir();
|
||||
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
||||
await mkdir(join(home, 'fleet'), { recursive: true });
|
||||
await writeFile(
|
||||
rosterPath,
|
||||
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
|
||||
'\n',
|
||||
),
|
||||
);
|
||||
|
||||
// A lock left behind by a crashed owner: timestamp older than the stale window.
|
||||
const lockPath = restartLockPath(home);
|
||||
await mkdir(dirname(lockPath), { recursive: true });
|
||||
await writeFile(lockPath, `4242\n${Date.now() - RESTART_LOCK_STALE_MS - 1_000}\n`);
|
||||
|
||||
const calls: string[][] = [];
|
||||
const runner: CommandRunner = async (command, args) => {
|
||||
calls.push([command, ...args]);
|
||||
return { stdout: '', stderr: '', exitCode: 0 };
|
||||
};
|
||||
const sleepFn = vi.fn<SleepFn>(async () => {});
|
||||
|
||||
const program = new Command();
|
||||
program.exitOverride();
|
||||
registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
|
||||
|
||||
try {
|
||||
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
|
||||
|
||||
// Stale lock is broken immediately — no waiting.
|
||||
expect(sleepFn).not.toHaveBeenCalled();
|
||||
expect(calls).toEqual([
|
||||
['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
|
||||
['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
|
||||
]);
|
||||
// The stale lock is gone once the restart completes.
|
||||
await expect(readFile(lockPath, 'utf8')).rejects.toMatchObject({ code: 'ENOENT' });
|
||||
} finally {
|
||||
await rm(home, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it('releases the restart lock so a subsequent restart is not blocked', async () => {
|
||||
const home = await tempDir();
|
||||
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
||||
await mkdir(join(home, 'fleet'), { recursive: true });
|
||||
await writeFile(
|
||||
rosterPath,
|
||||
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
|
||||
'\n',
|
||||
),
|
||||
);
|
||||
|
||||
const calls: string[][] = [];
|
||||
const runner: CommandRunner = async (command, args) => {
|
||||
calls.push([command, ...args]);
|
||||
return { stdout: '', stderr: '', exitCode: 0 };
|
||||
};
|
||||
const sleepFn = vi.fn<SleepFn>(async () => {});
|
||||
|
||||
const program = new Command();
|
||||
program.exitOverride();
|
||||
registerFleetCommand(program, { runner, sleepFn, mosaicHome: home });
|
||||
|
||||
try {
|
||||
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
|
||||
await program.parseAsync(['node', 'mosaic', 'fleet', 'restart']);
|
||||
|
||||
// Two sequential restarts both run fully and neither has to wait.
|
||||
expect(sleepFn).not.toHaveBeenCalled();
|
||||
expect(calls).toEqual([
|
||||
['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
|
||||
['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
|
||||
['systemctl', '--user', 'restart', 'mosaic-tmux-holder.service'],
|
||||
['systemctl', '--user', 'restart', 'mosaic-agent@coder0.service'],
|
||||
]);
|
||||
} finally {
|
||||
await rm(home, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
it('attempts every agent and the holder during fleet stop even when an agent stop fails', async () => {
|
||||
const home = await tempDir();
|
||||
const rosterPath = join(home, 'fleet', 'roster.yaml');
|
||||
|
||||
Reference in New Issue
Block a user