feat(fleet): add local canary CLI (#563)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/push/publish Pipeline was successful

This commit was merged in pull request #563.
This commit is contained in:
2026-06-20 17:49:01 +00:00
parent 45e2c2aad8
commit 6dfd78f643
15 changed files with 1835 additions and 3 deletions

View File

@@ -13,6 +13,7 @@ import { registerStorageCommand } from '@mosaicstack/storage';
import { registerTelemetryCommand } from './commands/telemetry.js';
import { registerAgentCommand } from './commands/agent.js';
import { registerConfigCommand } from './commands/config.js';
import { registerFleetCommand } from './commands/fleet.js';
import { registerMissionCommand } from './commands/mission.js';
import { registerUninstallCommand } from './commands/uninstall.js';
// prdy is registered via launch.ts
@@ -57,7 +58,7 @@ Command Groups:
Runtime: tui, login, sessions
Gateway: gateway
Framework: agent, bootstrap, coord, doctor, init, launch, mission, prdy, seq, sync, upgrade, wizard, yolo
Framework: agent, bootstrap, coord, doctor, fleet, init, launch, mission, prdy, seq, sync, upgrade, wizard, yolo
Platform: update
Runtimes: claude, codex, opencode, pi
`,
@@ -345,6 +346,10 @@ registerFederationCommand(program);
registerAgentCommand(program);
// ─── fleet ─────────────────────────────────────────────────────────────
registerFleetCommand(program);
// ─── config ────────────────────────────────────────────────────────────
registerConfigCommand(program);

View File

@@ -1,4 +1,5 @@
import type { Command } from 'commander';
import { registerFleetAgentCommands, type FleetCommandDeps } from './fleet.js';
import { withAuth } from './with-auth.js';
import { selectItem } from './select-dialog.js';
import {
@@ -30,11 +31,13 @@ function showAgentDetail(a: AgentConfigInfo) {
console.log(` Created: ${new Date(a.createdAt).toLocaleString()}`);
}
export function registerAgentCommand(program: Command) {
export function registerAgentCommand(program: Command, fleetDeps: FleetCommandDeps = {}) {
const cmd = program
.command('agent')
.description('Manage agent configurations')
.description('Manage agent configurations and local fleet agents')
.option('-g, --gateway <url>', 'Gateway URL', 'http://localhost:14242')
.option('--mosaic-home <path>', 'Mosaic home directory')
.option('--roster <path>', 'Local fleet roster path')
.option('--list', 'List all agents')
.option('--new', 'Create a new agent')
.option('--show <idOrName>', 'Show agent details')
@@ -72,6 +75,8 @@ export function registerAgentCommand(program: Command) {
},
);
registerFleetAgentCommands(cmd, fleetDeps);
return cmd;
}

View File

@@ -0,0 +1,563 @@
import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { dirname, join, resolve } from 'node:path';
import { Command } from 'commander';
import { afterEach, describe, expect, it, vi } from 'vitest';
import {
buildAgentSendCommand,
buildFleetServiceCommand,
generateAgentEnv,
getRosterAgent,
loadFleetRoster,
registerFleetCommand,
resolveFleetPaths,
type CommandRunner,
} from './fleet.js';
import { registerAgentCommand } from './agent.js';
function buildProgram(): Command {
const program = new Command();
program.exitOverride();
registerFleetCommand(program);
registerAgentCommand(program);
return program;
}
async function tempDir(): Promise<string> {
return mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
}
describe('registerFleetCommand', () => {
it('registers local canary fleet subcommands', () => {
const program = buildProgram();
const fleet = program.commands.find((command) => command.name() === 'fleet');
expect(fleet).toBeDefined();
expect(fleet!.commands.map((command) => command.name()).sort()).toEqual([
'init',
'install',
'install-systemd',
'restart',
'start',
'status',
'stop',
'verify',
]);
});
it('adds fleet-backed agent subcommands without removing existing options', () => {
const program = buildProgram();
const agent = program.commands.find((command) => command.name() === 'agent');
expect(agent).toBeDefined();
expect(agent!.options.map((option) => option.long)).toContain('--list');
expect(agent!.commands.map((command) => command.name()).sort()).toEqual([
'reset',
'roster',
'send',
'status',
'tail',
]);
});
});
describe('fleet roster parsing', () => {
let cleanup: string | undefined;
afterEach(async () => {
if (cleanup) {
await rm(cleanup, { recursive: true, force: true });
cleanup = undefined;
}
});
it('defaults local canary rosters to the isolated mosaic-factory socket', async () => {
cleanup = await tempDir();
const rosterPath = join(cleanup, 'roster.yaml');
await writeFile(
rosterPath,
[
'version: 1',
'transport: tmux',
'agents:',
' - name: canary-pi',
' runtime: pi',
' class: canary',
].join('\n'),
);
const roster = await loadFleetRoster(rosterPath);
expect(roster.tmux.socketName).toBe('mosaic-factory');
expect(roster.tmux.holderSession).toBe('_holder');
expect(roster.agents).toHaveLength(1);
expect(getRosterAgent(roster, 'canary-pi').runtime).toBe('pi');
});
it('generates deterministic per-agent EnvironmentFile content', async () => {
cleanup = await tempDir();
const rosterPath = join(cleanup, 'roster.json');
await writeFile(
rosterPath,
JSON.stringify({
version: 1,
transport: 'tmux',
tmux: { socket_name: 'mosaic-factory' },
defaults: { working_directory: '/srv/mosaic' },
agents: [{ name: 'coder0', runtime: 'codex', class: 'implementer' }],
}),
);
const roster = await loadFleetRoster(rosterPath);
expect(generateAgentEnv(roster, getRosterAgent(roster, 'coder0'))).toBe(
[
'MOSAIC_AGENT_NAME=coder0',
'MOSAIC_AGENT_RUNTIME=codex',
'MOSAIC_AGENT_WORKDIR=/srv/mosaic',
'MOSAIC_TMUX_SOCKET=mosaic-factory',
'',
].join('\n'),
);
});
it('rejects unknown roster fields instead of silently defaulting', async () => {
cleanup = await tempDir();
const rosterPath = join(cleanup, 'roster.yaml');
await writeFile(
rosterPath,
[
'version: 1',
'transport: tmux',
'tmux:',
' socketNamee: prod-fleet',
'agents:',
' - name: canary-pi',
' runtime: pi',
].join('\n'),
);
await expect(loadFleetRoster(rosterPath)).rejects.toThrow(
'Fleet roster tmux has unknown field(s): socketNamee.',
);
});
it('rejects wrong-typed roster fields instead of silently defaulting', async () => {
cleanup = await tempDir();
const rosterPath = join(cleanup, 'roster.json');
await writeFile(
rosterPath,
JSON.stringify({
version: 1,
transport: 'tmux',
tmux: { socket_name: 123 },
defaults: { working_directory: '/srv/mosaic' },
agents: [{ name: 'canary-pi', runtime: 'pi' }],
}),
);
await expect(loadFleetRoster(rosterPath)).rejects.toThrow(
'Fleet roster tmux socket_name must be a string.',
);
});
it('rejects wrong-typed agent fields', async () => {
cleanup = await tempDir();
const rosterPath = join(cleanup, 'roster.json');
await writeFile(
rosterPath,
JSON.stringify({
version: 1,
transport: 'tmux',
agents: [{ name: 'canary-pi', runtime: 42 }],
}),
);
await expect(loadFleetRoster(rosterPath)).rejects.toThrow(
'Fleet roster agent "canary-pi" runtime must be a string.',
);
});
it('rejects duplicate agent names before install can overwrite env files', async () => {
cleanup = await tempDir();
const rosterPath = join(cleanup, 'roster.yaml');
await writeFile(
rosterPath,
[
'version: 1',
'transport: tmux',
'agents:',
' - name: canary-pi',
' runtime: pi',
' - name: canary-pi',
' runtime: codex',
].join('\n'),
);
await expect(loadFleetRoster(rosterPath)).rejects.toThrow(
'Fleet roster has duplicate agent name: canary-pi.',
);
});
it('ships generic minimal and local-canary examples without site-specific defaults', async () => {
const examplesDir = resolve(process.cwd(), 'framework', 'fleet', 'examples');
const minimal = await loadFleetRoster(join(examplesDir, 'minimal.yaml'));
const localCanaryText = await readFile(join(examplesDir, 'local-canary.yaml'), 'utf8');
const localCanary = await loadFleetRoster(join(examplesDir, 'local-canary.yaml'));
expect(minimal.agents.map((agent) => agent.name)).toEqual(['canary-pi']);
expect(localCanary.tmux.socketName).toBe('mosaic-factory');
expect(localCanary.agents.map((agent) => agent.name)).toEqual(['lead', 'coder0', 'reviewer0']);
expect(localCanaryText).not.toMatch(/usc|ultron|secrev/i);
});
});
describe('fleet command construction', () => {
it('builds exact systemd user commands for holder and agent operations', () => {
expect(buildFleetServiceCommand('status')).toEqual([
'systemctl',
'--user',
'status',
'mosaic-tmux-holder.service',
]);
expect(buildFleetServiceCommand('restart', 'coder0')).toEqual([
'systemctl',
'--user',
'restart',
'mosaic-agent@coder0.service',
]);
});
it('builds socket-scoped agent send commands', () => {
const paths = resolveFleetPaths('/home/test/.config/mosaic');
expect(buildAgentSendCommand(paths, 'coder0', 'hello', 'mosaic-factory')).toEqual([
'/home/test/.config/mosaic/tools/tmux/agent-send.sh',
'-L',
'mosaic-factory',
'-s',
'coder0',
'-m',
'hello',
]);
});
it('runs fleet status through injected runner without touching tmux in tests', async () => {
const calls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
return { stdout: 'ok\n', stderr: '', exitCode: 0 };
};
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner });
await program.parseAsync(['node', 'mosaic', 'fleet', 'status']);
expect(calls).toEqual([['systemctl', '--user', 'status', 'mosaic-tmux-holder.service']]);
});
it('writes init output to the explicit roster path', async () => {
const home = await tempDir();
const rosterPath = join(home, 'custom', 'roster.yaml');
const frameworkRoot = resolve(process.cwd(), 'framework');
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { frameworkRoot, mosaicHome: home });
try {
await program.parseAsync([
'node',
'mosaic',
'fleet',
'--roster',
rosterPath,
'init',
'--profile',
'minimal',
'--write',
]);
const content = await readFile(rosterPath, 'utf8');
expect(content).toContain('name: canary-pi');
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('refuses to overwrite an existing roster unless --force is provided', async () => {
const home = await tempDir();
const rosterPath = join(home, 'custom', 'roster.yaml');
await mkdir(dirname(rosterPath), { recursive: true });
await writeFile(rosterPath, 'site-owned: true\n');
const frameworkRoot = resolve(process.cwd(), 'framework');
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { frameworkRoot, mosaicHome: home });
try {
await expect(
program.parseAsync([
'node',
'mosaic',
'fleet',
'--roster',
rosterPath,
'init',
'--profile',
'minimal',
'--write',
]),
).rejects.toThrow('Fleet roster already exists');
expect(await readFile(rosterPath, 'utf8')).toBe('site-owned: true\n');
await program.parseAsync([
'node',
'mosaic',
'fleet',
'--roster',
rosterPath,
'init',
'--profile',
'minimal',
'--write',
'--force',
]);
expect(await readFile(rosterPath, 'utf8')).toContain('name: canary-pi');
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('rejects unknown init profiles instead of silently falling back', async () => {
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { frameworkRoot: resolve(process.cwd(), 'framework') });
await expect(
program.parseAsync(['node', 'mosaic', 'fleet', 'init', '--profile', 'typo']),
).rejects.toThrow('Unsupported fleet profile');
});
it('sets process exitCode when status runner fails', async () => {
const originalExitCode = process.exitCode;
const stderrSpy = vi.spyOn(process.stderr, 'write').mockImplementation(() => true);
const runner: CommandRunner = async () => ({ stdout: '', stderr: 'missing\n', exitCode: 3 });
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner });
try {
await program.parseAsync(['node', 'mosaic', 'fleet', 'status']);
expect(process.exitCode).toBe(3);
} finally {
process.exitCode = originalExitCode;
stderrSpy.mockRestore();
}
});
it('loads default fleet/roster.json when roster.yaml is absent', async () => {
const home = await tempDir();
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
join(home, 'fleet', 'roster.json'),
JSON.stringify({
version: 1,
transport: 'tmux',
agents: [{ name: 'json-canary', runtime: 'pi' }],
}),
);
const calls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
return { stdout: '', stderr: '', exitCode: 0 };
};
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner, mosaicHome: home });
try {
await program.parseAsync(['node', 'mosaic', 'fleet', 'status', 'json-canary']);
expect(calls).toEqual([
['systemctl', '--user', 'status', 'mosaic-agent@json-canary.service'],
]);
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('starts the holder before agents and stops agents before the holder', async () => {
const home = await tempDir();
const rosterPath = join(home, 'fleet', 'roster.yaml');
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
rosterPath,
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
const calls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
return { stdout: '', stderr: '', exitCode: 0 };
};
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner, mosaicHome: home });
try {
await program.parseAsync(['node', 'mosaic', 'fleet', 'start']);
await program.parseAsync(['node', 'mosaic', 'fleet', 'stop']);
expect(calls).toEqual([
['systemctl', '--user', 'start', 'mosaic-tmux-holder.service'],
['systemctl', '--user', 'start', 'mosaic-agent@coder0.service'],
['systemctl', '--user', 'stop', 'mosaic-agent@coder0.service'],
['systemctl', '--user', 'stop', 'mosaic-tmux-holder.service'],
]);
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('attempts every agent and the holder during fleet stop even when an agent stop fails', async () => {
const home = await tempDir();
const rosterPath = join(home, 'fleet', 'roster.yaml');
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
rosterPath,
[
'version: 1',
'transport: tmux',
'agents:',
' - name: coder0',
' runtime: codex',
' - name: reviewer0',
' runtime: pi',
].join('\n'),
);
const calls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
if (args.includes('mosaic-agent@coder0.service')) {
return { stdout: '', stderr: 'coder0 failed\n', exitCode: 1 };
}
return { stdout: '', stderr: '', exitCode: 0 };
};
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner, mosaicHome: home });
try {
await expect(program.parseAsync(['node', 'mosaic', 'fleet', 'stop'])).rejects.toThrow(
'Fleet stop completed with 1 failure(s)',
);
expect(calls).toEqual([
['systemctl', '--user', 'stop', 'mosaic-agent@coder0.service'],
['systemctl', '--user', 'stop', 'mosaic-agent@reviewer0.service'],
['systemctl', '--user', 'stop', 'mosaic-tmux-holder.service'],
]);
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('rejects install-systemd with a non-default Mosaic home because units use %h/.config/mosaic', async () => {
const home = await tempDir();
const program = new Command();
program.exitOverride();
registerFleetCommand(program, {
mosaicHome: home,
frameworkRoot: resolve(process.cwd(), 'framework'),
});
try {
await expect(
program.parseAsync(['node', 'mosaic', 'fleet', 'install-systemd']),
).rejects.toThrow('install-systemd only supports the default Mosaic home');
} finally {
await rm(home, { recursive: true, force: true });
}
});
it.each(['start', 'stop', 'restart', 'status'] as const)(
'rejects single-agent %s for agents outside the roster',
async (action) => {
const home = await tempDir();
const rosterPath = join(home, 'fleet', 'roster.yaml');
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
rosterPath,
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
const runner = vi.fn<CommandRunner>(async () => ({ stdout: '', stderr: '', exitCode: 0 }));
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner, mosaicHome: home });
try {
await expect(
program.parseAsync(['node', 'mosaic', 'fleet', action, 'typo']),
).rejects.toThrow('Agent "typo" is not in the fleet roster');
expect(runner).not.toHaveBeenCalled();
} finally {
await rm(home, { recursive: true, force: true });
}
},
);
it('loads default fleet/roster.json for agent commands when roster.yaml is absent', async () => {
const home = await tempDir();
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
join(home, 'fleet', 'roster.json'),
JSON.stringify({
version: 1,
transport: 'tmux',
agents: [{ name: 'json-agent', runtime: 'pi' }],
}),
);
const calls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
return { stdout: '', stderr: '', exitCode: 0 };
};
const program = new Command();
program.exitOverride();
registerAgentCommand(program, { runner, mosaicHome: home });
try {
await program.parseAsync(['node', 'mosaic', 'agent', 'status', 'json-agent']);
expect(calls).toEqual([
['tmux', '-L', 'mosaic-factory', 'has-session', '-t', '=json-agent:0.0'],
]);
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('rejects agent status typos before invoking the runner', async () => {
const home = await tempDir();
const rosterPath = join(home, 'fleet', 'roster.yaml');
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
rosterPath,
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
const runner = vi.fn<CommandRunner>(async () => ({ stdout: '', stderr: '', exitCode: 0 }));
const program = new Command();
program.exitOverride();
registerAgentCommand(program, { runner, mosaicHome: home });
try {
await expect(
program.parseAsync(['node', 'mosaic', 'agent', 'status', 'typo']),
).rejects.toThrow('Agent "typo" is not in the fleet roster');
expect(runner).not.toHaveBeenCalled();
} finally {
await rm(home, { recursive: true, force: true });
}
});
});

View File

@@ -0,0 +1,851 @@
import { constants } from 'node:fs';
import { access, copyFile, mkdir, readFile, writeFile } from 'node:fs/promises';
import { homedir } from 'node:os';
import { dirname, join, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
import { spawn } from 'node:child_process';
import type { Command } from 'commander';
import YAML from 'yaml';
export interface CommandResult {
stdout: string;
stderr: string;
exitCode: number;
}
export type CommandRunner = (command: string, args: string[]) => Promise<CommandResult>;
export interface FleetCommandDeps {
runner?: CommandRunner;
mosaicHome?: string;
frameworkRoot?: string;
}
interface RawFleetRoster {
version?: unknown;
transport?: unknown;
tmux?: {
socket_name?: unknown;
socketName?: unknown;
holder_session?: unknown;
holderSession?: unknown;
};
defaults?: {
working_directory?: unknown;
workingDirectory?: unknown;
};
runtimes?: Record<string, { reset_command?: unknown; resetCommand?: unknown }>;
agents?: Array<{
name?: unknown;
runtime?: unknown;
class?: unknown;
working_directory?: unknown;
workingDirectory?: unknown;
model_hint?: unknown;
modelHint?: unknown;
persistent_persona?: unknown;
persistentPersona?: unknown;
reset_between_tasks?: unknown;
resetBetweenTasks?: unknown;
kickstart_template?: unknown;
kickstartTemplate?: unknown;
}>;
}
export interface FleetAgent {
name: string;
runtime: string;
className: string;
workingDirectory?: string;
modelHint?: string;
persistentPersona?: boolean | string;
resetBetweenTasks?: boolean;
kickstartTemplate?: string;
}
export interface FleetRoster {
version: 1;
transport: 'tmux';
tmux: {
socketName: string;
holderSession: string;
};
defaults: {
workingDirectory: string;
};
runtimes: Record<string, { resetCommand: string }>;
agents: FleetAgent[];
}
export interface FleetPaths {
mosaicHome: string;
rosterPath: string;
toolsDir: string;
fleetToolsDir: string;
tmuxToolsDir: string;
systemdUserDir: string;
agentEnvDir: string;
}
type FleetServiceAction = 'start' | 'stop' | 'restart' | 'status';
const DEFAULT_SOCKET_NAME = 'mosaic-factory';
const DEFAULT_HOLDER_SESSION = '_holder';
const DEFAULT_WORKING_DIRECTORY = '~/src';
const DEFAULT_RUNTIME_RESETS: Record<string, { resetCommand: string }> = {
claude: { resetCommand: '/clear' },
codex: { resetCommand: '/clear' },
opencode: { resetCommand: '/clear' },
pi: { resetCommand: '/new' },
};
export function resolveFleetPaths(mosaicHome = defaultMosaicHome()): FleetPaths {
return {
mosaicHome,
rosterPath: join(mosaicHome, 'fleet', 'roster.yaml'),
toolsDir: join(mosaicHome, 'tools'),
fleetToolsDir: join(mosaicHome, 'tools', 'fleet'),
tmuxToolsDir: join(mosaicHome, 'tools', 'tmux'),
systemdUserDir: join(homedir(), '.config', 'systemd', 'user'),
agentEnvDir: join(mosaicHome, 'fleet', 'agents'),
};
}
function defaultMosaicHome(): string {
return join(homedir(), '.config', 'mosaic');
}
function assertDefaultMosaicHomeForSystemd(mosaicHome: string): void {
if (resolve(mosaicHome) !== resolve(defaultMosaicHome())) {
throw new Error(
`install-systemd only supports the default Mosaic home (${defaultMosaicHome()}) because the user systemd units use %h/.config/mosaic paths.`,
);
}
}
export async function loadFleetRoster(path: string): Promise<FleetRoster> {
const rawText = await readFile(path, 'utf8');
const parsed = parseRosterText(rawText, path);
return normalizeRoster(parsed);
}
export function getRosterAgent(roster: FleetRoster, name: string): FleetAgent {
const agent = roster.agents.find((candidate) => candidate.name === name);
if (!agent) {
throw new Error(`Agent "${name}" is not in the fleet roster.`);
}
return agent;
}
export function generateAgentEnv(roster: FleetRoster, agent: FleetAgent): string {
const workingDirectory = agent.workingDirectory ?? roster.defaults.workingDirectory;
return [
`MOSAIC_AGENT_NAME=${shellEnvValue(agent.name)}`,
`MOSAIC_AGENT_RUNTIME=${shellEnvValue(agent.runtime)}`,
`MOSAIC_AGENT_WORKDIR=${shellEnvValue(expandHome(workingDirectory))}`,
`MOSAIC_TMUX_SOCKET=${shellEnvValue(roster.tmux.socketName)}`,
'',
].join('\n');
}
export function buildFleetServiceCommand(action: FleetServiceAction, agentName?: string): string[] {
const service = agentName ? `mosaic-agent@${agentName}.service` : 'mosaic-tmux-holder.service';
return ['systemctl', '--user', action, service];
}
export function buildAgentSendCommand(
paths: FleetPaths,
agentName: string,
message: string,
socketName = DEFAULT_SOCKET_NAME,
): string[] {
return [
join(paths.tmuxToolsDir, 'agent-send.sh'),
'-L',
socketName,
'-s',
agentName,
'-m',
message,
];
}
export function buildAgentResetCommand(
paths: FleetPaths,
agentName: string,
resetCommand: string,
socketName = DEFAULT_SOCKET_NAME,
): string[] {
return [
join(paths.tmuxToolsDir, 'send-message.sh'),
'-L',
socketName,
'-t',
`=${agentName}`,
'-m',
resetCommand,
];
}
export function buildAgentTailCommand(
agentName: string,
lines: number,
socketName = DEFAULT_SOCKET_NAME,
): string[] {
return [
'tmux',
'-L',
socketName,
'capture-pane',
'-t',
`=${agentName}:0.0`,
'-p',
'-S',
`-${lines}`,
];
}
export function registerFleetCommand(program: Command, deps: FleetCommandDeps = {}): Command {
const runner = deps.runner ?? runCommand;
const paths = resolveFleetPaths(deps.mosaicHome);
const frameworkRoot = deps.frameworkRoot ?? resolveFrameworkRoot();
const cmd = program
.command('fleet')
.description('Manage the local Mosaic tmux fleet canary')
.option('--mosaic-home <path>', 'Mosaic home directory', paths.mosaicHome)
.option('--roster <path>', 'Fleet roster path');
cmd
.command('init')
.description('Initialize a local fleet roster')
.option('--profile <name>', 'Roster profile: minimal or local-canary', 'minimal')
.option('--write', 'Write the roster to Mosaic home')
.option('--force', 'Overwrite an existing roster when used with --write')
.action(async (opts: { profile: string; write?: boolean; force?: boolean }) => {
const commandOpts = cmd.opts<{ mosaicHome: string; roster?: string }>();
const activePaths = resolveFleetPaths(commandOpts.mosaicHome);
const profile = parseInitProfile(opts.profile);
const source = join(frameworkRoot, 'fleet', 'examples', `${profile}.yaml`);
const content = await readFile(source, 'utf8');
if (!opts.write) {
console.log(content.trimEnd());
return;
}
const destination = commandOpts.roster ?? activePaths.rosterPath;
if (!opts.force && (await canRead(destination))) {
throw new Error(
`Fleet roster already exists: ${destination}. Re-run with --force to overwrite.`,
);
}
await mkdir(dirname(destination), { recursive: true });
await writeFile(destination, content);
console.log(`Wrote fleet roster: ${destination}`);
});
cmd
.command('install')
.description('Install local fleet tools and user systemd units')
.action(async () => installFleet(cmd, frameworkRoot));
cmd
.command('install-systemd')
.description('Install local fleet tools and user systemd units')
.action(async () => installFleet(cmd, frameworkRoot));
for (const action of ['start', 'stop', 'restart'] as const) {
cmd
.command(`${action} [agent]`)
.description(`${action} the fleet holder or one agent`)
.action(async (agent?: string) => {
const roster = await loadRosterForCommand(cmd);
if (agent) {
getRosterAgent(roster, agent);
await runChecked(runner, buildFleetServiceCommand(action, agent));
return;
}
if (action === 'stop') {
await stopFleetBestEffort(
runner,
roster.agents.map((rosterAgent) => rosterAgent.name),
);
return;
}
await runChecked(runner, buildFleetServiceCommand(action));
for (const rosterAgent of roster.agents) {
await runChecked(runner, buildFleetServiceCommand(action, rosterAgent.name));
}
});
}
cmd
.command('status [agent]')
.description('Show fleet holder or agent systemd status')
.option('--json', 'Print JSON status')
.action(async (agent: string | undefined, opts: { json?: boolean }) => {
if (agent) {
const roster = await loadRosterForCommand(cmd);
getRosterAgent(roster, agent);
}
const result = await runner(...splitCommand(buildFleetServiceCommand('status', agent)));
if (opts.json) {
console.log(
JSON.stringify({
exitCode: result.exitCode,
stdout: result.stdout,
stderr: result.stderr,
}),
);
setExitCodeFromResult(result);
return;
}
writeCommandOutput(result);
});
cmd
.command('verify')
.description('Verify the local canary holder and roster sessions on the isolated socket')
.action(async () => {
const roster = await loadRosterForCommand(cmd);
const socketName = roster.tmux.socketName;
await runChecked(runner, [
'tmux',
'-L',
socketName,
'has-session',
'-t',
`=${roster.tmux.holderSession}:0.0`,
]);
for (const agent of roster.agents) {
await runChecked(runner, [
'tmux',
'-L',
socketName,
'has-session',
'-t',
`=${agent.name}:0.0`,
]);
}
console.log(`Verified fleet on tmux socket ${socketName}.`);
});
return cmd;
}
export function registerFleetAgentCommands(
agentCommand: Command,
deps: FleetCommandDeps = {},
): void {
const runner = deps.runner ?? runCommand;
agentCommand
.command('roster')
.description('List agents from the local fleet roster')
.option('--json', 'Print JSON')
.action(async (opts: { json?: boolean }) => {
const roster = await loadRosterFromAgentCommand(agentCommand, deps.mosaicHome);
if (opts.json) {
console.log(JSON.stringify(roster, null, 2));
return;
}
for (const agent of roster.agents) {
console.log(`${agent.name}\t${agent.runtime}\t${agent.className}`);
}
});
agentCommand
.command('status [agent]')
.description('Show tmux status for the local fleet or one agent')
.option('--json', 'Print JSON')
.action(async (agent: string | undefined, opts: { json?: boolean }) => {
const roster = await loadRosterFromAgentCommand(agentCommand, deps.mosaicHome);
if (agent) {
getRosterAgent(roster, agent);
}
const command = agent
? ['tmux', '-L', roster.tmux.socketName, 'has-session', '-t', `=${agent}:0.0`]
: ['tmux', '-L', roster.tmux.socketName, 'ls'];
const result = await runner(...splitCommand(command));
if (opts.json) {
console.log(
JSON.stringify({
exitCode: result.exitCode,
stdout: result.stdout,
stderr: result.stderr,
}),
);
setExitCodeFromResult(result);
return;
}
writeCommandOutput(result);
});
agentCommand
.command('send <agent>')
.description('Send a message to a local fleet agent')
.requiredOption('--message <text>', 'Message text')
.action(async (agent: string, opts: { message: string }) => {
const roster = await loadRosterFromAgentCommand(agentCommand, deps.mosaicHome);
getRosterAgent(roster, agent);
const paths = resolveFleetPaths(resolveMosaicHomeFromCommand(agentCommand, deps.mosaicHome));
await runChecked(
runner,
buildAgentSendCommand(paths, agent, opts.message, roster.tmux.socketName),
);
});
agentCommand
.command('reset <agent>')
.description('Reset a local fleet agent by sending the runtime reset command')
.option('--clear', 'Send /clear')
.option('--new', 'Send /new')
.action(async (agent: string, opts: { clear?: boolean; new?: boolean }) => {
const roster = await loadRosterFromAgentCommand(agentCommand, deps.mosaicHome);
const rosterAgent = getRosterAgent(roster, agent);
const paths = resolveFleetPaths(resolveMosaicHomeFromCommand(agentCommand, deps.mosaicHome));
const resetCommand = opts.clear
? '/clear'
: opts.new
? '/new'
: (roster.runtimes[rosterAgent.runtime]?.resetCommand ?? '/clear');
await runChecked(
runner,
buildAgentResetCommand(paths, agent, resetCommand, roster.tmux.socketName),
);
});
agentCommand
.command('tail <agent>')
.description('Print recent pane output for a local fleet agent')
.option('-n, --lines <number>', 'Number of pane history lines', '80')
.action(async (agent: string, opts: { lines: string }) => {
const roster = await loadRosterFromAgentCommand(agentCommand, deps.mosaicHome);
getRosterAgent(roster, agent);
const lines = Number.parseInt(opts.lines, 10);
const result = await runner(
...splitCommand(
buildAgentTailCommand(agent, Number.isFinite(lines) ? lines : 80, roster.tmux.socketName),
),
);
writeCommandOutput(result);
});
}
async function installFleet(cmd: Command, frameworkRoot: string): Promise<void> {
const activePaths = resolveFleetPaths(cmd.opts<{ mosaicHome: string }>().mosaicHome);
assertDefaultMosaicHomeForSystemd(activePaths.mosaicHome);
const roster = await loadRosterForCommand(cmd);
await mkdir(activePaths.fleetToolsDir, { recursive: true });
await mkdir(activePaths.tmuxToolsDir, { recursive: true });
await mkdir(activePaths.systemdUserDir, { recursive: true });
await mkdir(activePaths.agentEnvDir, { recursive: true });
await copyFile(
join(frameworkRoot, 'tools', 'fleet', 'start-agent-session.sh'),
join(activePaths.fleetToolsDir, 'start-agent-session.sh'),
);
await copyFile(
join(frameworkRoot, 'tools', 'tmux', 'send-message.sh'),
join(activePaths.tmuxToolsDir, 'send-message.sh'),
);
await copyFile(
join(frameworkRoot, 'tools', 'tmux', 'agent-send.sh'),
join(activePaths.tmuxToolsDir, 'agent-send.sh'),
);
await copyFile(
join(frameworkRoot, 'systemd', 'user', 'mosaic-tmux-holder.service'),
join(activePaths.systemdUserDir, 'mosaic-tmux-holder.service'),
);
await copyFile(
join(frameworkRoot, 'systemd', 'user', 'mosaic-agent@.service'),
join(activePaths.systemdUserDir, 'mosaic-agent@.service'),
);
for (const agent of roster.agents) {
await writeFile(
join(activePaths.agentEnvDir, `${agent.name}.env`),
generateAgentEnv(roster, agent),
);
}
console.log(`Installed fleet files for ${roster.agents.length} agent(s).`);
}
async function loadRosterForCommand(cmd: Command): Promise<FleetRoster> {
const opts = cmd.opts<{ mosaicHome: string; roster?: string }>();
return loadFleetRoster(await resolveRosterPath(opts.mosaicHome, opts.roster));
}
async function loadRosterFromAgentCommand(
command: Command,
mosaicHomeOverride?: string,
): Promise<FleetRoster> {
const opts = command.optsWithGlobals<{ mosaicHome?: string; roster?: string }>();
const mosaicHome = opts.mosaicHome ?? mosaicHomeOverride ?? defaultMosaicHome();
return loadFleetRoster(await resolveRosterPath(mosaicHome, opts.roster));
}
function resolveMosaicHomeFromCommand(command: Command, override?: string): string {
const opts = command.optsWithGlobals<{ mosaicHome?: string }>();
return opts.mosaicHome ?? override ?? defaultMosaicHome();
}
function parseRosterText(text: string, path: string): RawFleetRoster {
const trimmed = text.trim();
if (path.endsWith('.json')) {
return JSON.parse(trimmed) as RawFleetRoster;
}
return YAML.parse(trimmed) as RawFleetRoster;
}
function normalizeRoster(raw: RawFleetRoster): FleetRoster {
assertObject(raw, 'Fleet roster');
assertKnownKeys(raw, 'Fleet roster', [
'version',
'transport',
'tmux',
'defaults',
'runtimes',
'agents',
]);
if (raw.tmux !== undefined) {
assertObject(raw.tmux, 'Fleet roster tmux');
assertKnownKeys(raw.tmux, 'Fleet roster tmux', [
'socket_name',
'socketName',
'holder_session',
'holderSession',
]);
}
if (raw.defaults !== undefined) {
assertObject(raw.defaults, 'Fleet roster defaults');
assertKnownKeys(raw.defaults, 'Fleet roster defaults', [
'working_directory',
'workingDirectory',
]);
}
if (raw.runtimes !== undefined) {
assertObject(raw.runtimes, 'Fleet roster runtimes');
for (const [runtime, config] of Object.entries(raw.runtimes)) {
assertObject(config, `Fleet roster runtime "${runtime}"`);
assertKnownKeys(config, `Fleet roster runtime "${runtime}"`, [
'reset_command',
'resetCommand',
]);
}
}
if (raw.version !== 1) {
throw new Error('Fleet roster version must be 1.');
}
if (raw.transport !== 'tmux') {
throw new Error('Fleet roster transport must be "tmux".');
}
if (!Array.isArray(raw.agents) || raw.agents.length === 0) {
throw new Error('Fleet roster must define at least one agent.');
}
const agents = raw.agents.map(normalizeAgent);
assertUniqueAgentNames(agents);
return {
version: 1,
transport: 'tmux',
tmux: {
socketName: stringValue(
raw.tmux?.socket_name ?? raw.tmux?.socketName,
DEFAULT_SOCKET_NAME,
'Fleet roster tmux socket_name',
),
holderSession: stringValue(
raw.tmux?.holder_session ?? raw.tmux?.holderSession,
DEFAULT_HOLDER_SESSION,
'Fleet roster tmux holder_session',
),
},
defaults: {
workingDirectory: stringValue(
raw.defaults?.working_directory ?? raw.defaults?.workingDirectory,
DEFAULT_WORKING_DIRECTORY,
'Fleet roster defaults working_directory',
),
},
runtimes: normalizeRuntimes(raw.runtimes as RawFleetRoster['runtimes']),
agents,
};
}
function normalizeAgent(raw: NonNullable<RawFleetRoster['agents']>[number]): FleetAgent {
assertObject(raw, 'Fleet roster agent');
assertKnownKeys(raw, 'Fleet roster agent', [
'name',
'runtime',
'class',
'working_directory',
'workingDirectory',
'model_hint',
'modelHint',
'persistent_persona',
'persistentPersona',
'reset_between_tasks',
'resetBetweenTasks',
'kickstart_template',
'kickstartTemplate',
]);
const name = stringValue(raw.name, '', 'Fleet roster agent name');
const runtime = stringValue(
raw.runtime,
'',
`Fleet roster agent "${name || '<unknown>'}" runtime`,
);
if (!name || !/^[A-Za-z0-9_.-]+$/.test(name)) {
throw new Error(`Invalid fleet agent name: ${name || '<empty>'}`);
}
if (!runtime) {
throw new Error(`Fleet agent "${name}" must define a runtime.`);
}
return {
name,
runtime,
className: stringValue(raw.class, 'worker', `Fleet roster agent "${name}" class`),
workingDirectory: optionalString(
raw.working_directory ?? raw.workingDirectory,
`Fleet roster agent "${name}" working_directory`,
),
modelHint: optionalString(
raw.model_hint ?? raw.modelHint,
`Fleet roster agent "${name}" model_hint`,
),
persistentPersona: optionalBooleanOrString(
raw.persistent_persona ?? raw.persistentPersona,
`Fleet roster agent "${name}" persistent_persona`,
),
resetBetweenTasks: optionalBoolean(
raw.reset_between_tasks ?? raw.resetBetweenTasks,
`Fleet roster agent "${name}" reset_between_tasks`,
),
kickstartTemplate: optionalString(
raw.kickstart_template ?? raw.kickstartTemplate,
`Fleet roster agent "${name}" kickstart_template`,
),
};
}
function normalizeRuntimes(
raw: RawFleetRoster['runtimes'] | undefined,
): Record<string, { resetCommand: string }> {
const result: Record<string, { resetCommand: string }> = { ...DEFAULT_RUNTIME_RESETS };
for (const [runtime, config] of Object.entries(raw ?? {})) {
result[runtime] = {
resetCommand: stringValue(
config.reset_command ?? config.resetCommand,
'/clear',
`Fleet roster runtime "${runtime}" reset_command`,
),
};
}
return result;
}
function assertObject(value: unknown, label: string): asserts value is Record<string, unknown> {
if (!value || typeof value !== 'object' || Array.isArray(value)) {
throw new Error(`${label} must be an object.`);
}
}
function assertKnownKeys(
value: Record<string, unknown>,
label: string,
allowedKeys: readonly string[],
): void {
const allowed = new Set(allowedKeys);
const unknownKeys = Object.keys(value).filter((key) => !allowed.has(key));
if (unknownKeys.length > 0) {
throw new Error(`${label} has unknown field(s): ${unknownKeys.join(', ')}.`);
}
}
function assertUniqueAgentNames(agents: FleetAgent[]): void {
const seen = new Set<string>();
for (const agent of agents) {
if (seen.has(agent.name)) {
throw new Error(`Fleet roster has duplicate agent name: ${agent.name}.`);
}
seen.add(agent.name);
}
}
function stringValue(value: unknown, fallback = '', label = 'Value'): string {
if (value === undefined) {
return fallback;
}
if (typeof value !== 'string') {
throw new Error(`${label} must be a string.`);
}
return value;
}
function optionalString(value: unknown, label = 'Value'): string | undefined {
if (value === undefined) {
return undefined;
}
if (typeof value !== 'string') {
throw new Error(`${label} must be a string.`);
}
return value;
}
function optionalBoolean(value: unknown, label = 'Value'): boolean | undefined {
if (value === undefined) {
return undefined;
}
if (typeof value !== 'boolean') {
throw new Error(`${label} must be a boolean.`);
}
return value;
}
function optionalBooleanOrString(value: unknown, label = 'Value'): boolean | string | undefined {
if (value === undefined) {
return undefined;
}
if (typeof value !== 'boolean' && typeof value !== 'string') {
throw new Error(`${label} must be a boolean or string.`);
}
return value;
}
function expandHome(path: string): string {
return path === '~' || path.startsWith('~/') ? join(homedir(), path.slice(2)) : path;
}
function shellEnvValue(value: string): string {
if (/^[A-Za-z0-9_./:=@+-]+$/.test(value)) {
return value;
}
return `'${value.replaceAll("'", "'\"'\"'")}'`;
}
async function stopFleetBestEffort(runner: CommandRunner, agentNames: string[]): Promise<void> {
const failures: string[] = [];
for (const agentName of agentNames) {
const command = buildFleetServiceCommand('stop', agentName);
const result = await runner(...splitCommand(command));
writeSuccessfulCommandOutput(result);
if (result.exitCode !== 0) {
failures.push(result.stderr || result.stdout || `Command failed: ${command.join(' ')}`);
}
}
const holderCommand = buildFleetServiceCommand('stop');
const holderResult = await runner(...splitCommand(holderCommand));
writeSuccessfulCommandOutput(holderResult);
if (holderResult.exitCode !== 0) {
failures.push(
holderResult.stderr || holderResult.stdout || `Command failed: ${holderCommand.join(' ')}`,
);
}
if (failures.length > 0) {
throw new Error(
`Fleet stop completed with ${failures.length} failure(s): ${failures.join('; ')}`,
);
}
}
async function runChecked(runner: CommandRunner, command: string[]): Promise<void> {
const result = await runner(...splitCommand(command));
if (result.exitCode !== 0) {
throw new Error(result.stderr || result.stdout || `Command failed: ${command.join(' ')}`);
}
if (result.stdout) {
process.stdout.write(result.stdout);
}
}
function splitCommand(command: string[]): [string, string[]] {
const [bin, ...args] = command;
if (!bin) {
throw new Error('Cannot run an empty command.');
}
return [bin, args];
}
function parseInitProfile(profile: string): 'minimal' | 'local-canary' {
if (profile === 'minimal' || profile === 'local-canary') {
return profile;
}
throw new Error(`Unsupported fleet profile "${profile}". Use: minimal, local-canary.`);
}
function writeCommandOutput(result: CommandResult): void {
if (result.stdout) {
process.stdout.write(result.stdout);
} else if (result.stderr) {
process.stderr.write(result.stderr);
}
setExitCodeFromResult(result);
}
function writeSuccessfulCommandOutput(result: CommandResult): void {
if (result.exitCode !== 0) {
return;
}
if (result.stdout) {
process.stdout.write(result.stdout);
}
}
function setExitCodeFromResult(result: CommandResult): void {
if (result.exitCode !== 0) {
process.exitCode = result.exitCode;
}
}
function runCommand(command: string, args: string[]): Promise<CommandResult> {
return new Promise((resolvePromise) => {
const child = spawn(command, args, { stdio: ['ignore', 'pipe', 'pipe'] });
let stdout = '';
let stderr = '';
child.stdout.on('data', (chunk: Buffer) => {
stdout += chunk.toString('utf8');
});
child.stderr.on('data', (chunk: Buffer) => {
stderr += chunk.toString('utf8');
});
child.on('error', (error) => {
resolvePromise({ stdout, stderr: error.message, exitCode: 127 });
});
child.on('close', (code) => {
resolvePromise({ stdout, stderr, exitCode: code ?? 1 });
});
});
}
function resolveFrameworkRoot(): string {
const currentFile = fileURLToPath(import.meta.url);
return resolve(dirname(currentFile), '..', '..', 'framework');
}
async function canRead(path: string): Promise<boolean> {
try {
await access(path, constants.R_OK);
return true;
} catch {
return false;
}
}
export async function resolveRosterPath(
mosaicHome: string,
explicitPath?: string,
): Promise<string> {
if (explicitPath) {
return explicitPath;
}
const yamlPath = resolveFleetPaths(mosaicHome).rosterPath;
if (await canRead(yamlPath)) {
return yamlPath;
}
const jsonPath = join(mosaicHome, 'fleet', 'roster.json');
return jsonPath;
}