feat(fleet): orchestrator-mutable fleet — fleet add/remove (F5/R9) (#596)
Some checks are pending
ci/woodpecker/push/ci Pipeline is pending
ci/woodpecker/push/publish Pipeline is pending

This commit was merged in pull request #596.
This commit is contained in:
2026-06-21 23:26:21 +00:00
parent 60a309d5a4
commit 67df06f1c4
2 changed files with 677 additions and 1 deletions

View File

@@ -1,5 +1,5 @@
import { constants } from 'node:fs';
import { access, chmod, copyFile, mkdir, readFile, writeFile } from 'node:fs/promises';
import { access, chmod, copyFile, mkdir, readFile, unlink, writeFile } from 'node:fs/promises';
import { homedir, hostname, userInfo } from 'node:os';
import { dirname, join, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
@@ -1153,6 +1153,112 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
}
});
cmd
.command('add <name>')
.description('Add a new agent to the fleet roster and optionally start it')
.requiredOption('--runtime <runtime>', `Agent runtime (${VALID_FLEET_RUNTIMES.join(', ')})`)
.requiredOption('--class <class>', 'Agent class (e.g. worker, orchestrator, canary)')
.option('--model <hint>', 'Model hint for the agent')
.option('--working-dir <path>', 'Working directory for the agent')
.option('--no-start', 'Skip starting the agent after adding')
.action(
async (
name: string,
opts: {
runtime: string;
class: string;
model?: string;
workingDir?: string;
start: boolean;
},
) => {
if (!VALID_FLEET_RUNTIMES.includes(opts.runtime)) {
throw new Error(
`Invalid runtime "${opts.runtime}". Valid runtimes: ${VALID_FLEET_RUNTIMES.join(', ')}.`,
);
}
const commandOpts = cmd.opts<{ mosaicHome: string; roster?: string }>();
const activePaths = resolveFleetPaths(commandOpts.mosaicHome);
const rosterPath = await resolveRosterPath(commandOpts.mosaicHome, commandOpts.roster);
const roster = await loadFleetRoster(rosterPath);
const newAgent: FleetAgent = {
name,
runtime: opts.runtime,
className: opts.class,
...(opts.workingDir !== undefined && { workingDirectory: opts.workingDir }),
...(opts.model !== undefined && { modelHint: opts.model }),
};
const updatedRoster = addAgentToRoster(roster, newAgent);
await writeFile(rosterPath, serializeRosterToYaml(updatedRoster));
const envPath = join(activePaths.agentEnvDir, `${name}.env`);
const existingEnv = (await canRead(envPath)) ? await readFile(envPath, 'utf8') : undefined;
await mkdir(activePaths.agentEnvDir, { recursive: true });
await writeFile(
envPath,
mergeAgentEnv(generateAgentEnv(updatedRoster, newAgent), existingEnv),
);
console.log(`Added ${name} (${opts.runtime}/${opts.class}) to the fleet.`);
if (opts.start !== false) {
await runChecked(runner, buildFleetServiceCommand('start', name));
console.log(`Started mosaic-agent@${name}.service.`);
} else {
console.log(`Agent queued (--no-start); run: mosaic fleet start ${name}`);
}
},
);
cmd
.command('remove <name>')
.description('Remove an agent from the fleet roster')
.option('--keep-files', 'Skip deleting env and heartbeat files')
.action(async (name: string, opts: { keepFiles?: boolean }) => {
const commandOpts = cmd.opts<{ mosaicHome: string; roster?: string }>();
const activePaths = resolveFleetPaths(commandOpts.mosaicHome);
const rosterPath = await resolveRosterPath(commandOpts.mosaicHome, commandOpts.roster);
const roster = await loadFleetRoster(rosterPath);
// Guard: throws if removing leaves 0 orchestrators or agent not in roster
const updatedRoster = removeAgentFromRoster(roster, name);
// Stop agent (non-fatal)
try {
const stopResult = await runner(...splitCommand(buildFleetServiceCommand('stop', name)));
if (stopResult.exitCode !== 0) {
process.stderr.write(
`Warning: could not stop mosaic-agent@${name}.service: ${stopResult.stderr || stopResult.stdout || 'non-zero exit'}\n`,
);
}
} catch (err) {
process.stderr.write(
`Warning: stop command failed for ${name}: ${err instanceof Error ? err.message : String(err)}\n`,
);
}
// Write updated roster
await writeFile(rosterPath, serializeRosterToYaml(updatedRoster));
// Delete env and heartbeat files (best-effort, non-fatal)
if (!opts.keepFiles) {
try {
await unlink(join(activePaths.agentEnvDir, `${name}.env`));
} catch {
// best-effort
}
try {
await unlink(heartbeatPath(name, activePaths.mosaicHome));
} catch {
// best-effort
}
}
console.log(`Removed ${name} from the fleet.`);
});
return cmd;
}
@@ -1769,6 +1875,105 @@ export function countOrchestrators(roster: FleetRoster): number {
return roster.agents.filter((a) => a.className === 'orchestrator').length;
}
/** Valid runtime identifiers for fleet agents. */
export const VALID_FLEET_RUNTIMES: readonly string[] = [
'pi',
'claude',
'codex',
'opencode',
'dogfood',
];
/**
* Add a new agent to a fleet roster (immutable — returns a new FleetRoster).
* Throws on invalid name, duplicate name.
*/
export function addAgentToRoster(roster: FleetRoster, agent: FleetAgent): FleetRoster {
if (!agent.name || !/^[A-Za-z0-9_.-]+$/.test(agent.name)) {
throw new Error(`Invalid fleet agent name: ${agent.name || '<empty>'}`);
}
if (roster.agents.some((a) => a.name === agent.name)) {
throw new Error(`Agent "${agent.name}" already exists in the fleet roster.`);
}
return {
...roster,
agents: [...roster.agents, agent],
};
}
/**
* Remove an agent from a fleet roster (immutable — returns a new FleetRoster).
* Throws if the agent is not found, or if removal would leave zero orchestrators.
*/
export function removeAgentFromRoster(roster: FleetRoster, name: string): FleetRoster {
const agent = roster.agents.find((a) => a.name === name);
if (!agent) {
throw new Error(`Agent "${name}" is not in the fleet roster.`);
}
const remaining = roster.agents.filter((a) => a.name !== name);
const remainingOrchCount = remaining.filter((a) => a.className === 'orchestrator').length;
if (remainingOrchCount === 0 && agent.className === 'orchestrator') {
throw new Error(
`Cannot remove agent "${name}": it is the sole orchestrator. Add another orchestrator first (R5).`,
);
}
return {
...roster,
agents: remaining,
};
}
/**
* Serialize a FleetRoster to YAML text (snake_case keys).
* The output is parseable by loadFleetRoster.
*/
export function serializeRosterToYaml(roster: FleetRoster): string {
const agents = roster.agents.map((agent) => {
const raw: Record<string, unknown> = {
name: agent.name,
runtime: agent.runtime,
class: agent.className,
};
if (agent.workingDirectory !== undefined) {
raw['working_directory'] = agent.workingDirectory;
}
if (agent.modelHint !== undefined) {
raw['model_hint'] = agent.modelHint;
}
if (agent.persistentPersona !== undefined) {
raw['persistent_persona'] = agent.persistentPersona;
}
if (agent.resetBetweenTasks !== undefined) {
raw['reset_between_tasks'] = agent.resetBetweenTasks;
}
if (agent.kickstartTemplate !== undefined) {
raw['kickstart_template'] = agent.kickstartTemplate;
}
return raw;
});
const runtimes: Record<string, { reset_command: string }> = {};
for (const [runtime, config] of Object.entries(roster.runtimes)) {
runtimes[runtime] = { reset_command: config.resetCommand };
}
const raw: Record<string, unknown> = {
version: roster.version,
transport: roster.transport,
tmux: {
socket_name: roster.tmux.socketName,
holder_session: roster.tmux.holderSession,
},
defaults: {
working_directory: roster.defaults.workingDirectory,
},
runtimes,
agents,
};
return YAML.stringify(raw);
}
/**
* Prompt interactively for a fleet profile via stdin readline.
* AI-free: no LLM calls — pure readline menu.