feat(fleet): Phase-2 observability — fleet ps + watch + send verify (#579)
All checks were successful
ci/woodpecker/push/ci Pipeline was successful
ci/woodpecker/push/publish Pipeline was successful

This commit was merged in pull request #579.
This commit is contained in:
2026-06-21 04:23:51 +00:00
parent 5118be74cb
commit af2eede7a9
6 changed files with 2041 additions and 6 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,12 +1,19 @@
import { constants } from 'node:fs';
import { access, chmod, copyFile, mkdir, readFile, writeFile } from 'node:fs/promises';
import { homedir, hostname } from 'node:os';
import { homedir, hostname, userInfo } from 'node:os';
import { dirname, join, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
import { spawn } from 'node:child_process';
import type { Command } from 'commander';
import YAML from 'yaml';
/**
* A function that spawns a command with inherited stdio (TTY passthrough).
* Used for interactive commands like `tmux attach` that need a real terminal.
* Resolves with the process exit code.
*/
export type InteractiveRunner = (command: string, args: string[]) => Promise<number>;
export interface CommandResult {
stdout: string;
stderr: string;
@@ -15,8 +22,23 @@ export interface CommandResult {
export type CommandRunner = (command: string, args: string[]) => Promise<CommandResult>;
/**
* Injectable sleep helper used by the send --verify polling loop.
* Tests stub this to avoid real delays; production uses the default
* implementation backed by setTimeout.
*/
export type SleepFn = (ms: number) => Promise<void>;
export interface FleetCommandDeps {
runner?: CommandRunner;
/** Injectable interactive runner for commands needing inherited TTY (e.g., `tmux attach`). */
interactiveRunner?: InteractiveRunner;
/**
* Injectable sleep function for the send --verify polling loop.
* Defaults to a real setTimeout-based sleep. Tests stub this to avoid
* real delays; the default is used in production.
*/
sleepFn?: SleepFn;
mosaicHome?: string;
frameworkRoot?: string;
}
@@ -92,6 +114,18 @@ type FleetServiceAction = 'start' | 'stop' | 'restart' | 'status';
const DEFAULT_SOCKET_NAME = 'mosaic-factory';
const DEFAULT_HOLDER_SESSION = '_holder';
const DEFAULT_WORKING_DIRECTORY = '~/src';
/**
* Default poll interval (ms) between capture-pane checks in `send --verify`.
* Kept short enough to react quickly while not hammering tmux on busy hosts.
*/
export const VERIFY_POLL_INTERVAL_MS = 400;
/**
* Default total timeout (ms) for the `send --verify` polling loop.
* Configurable via `--verify-timeout <ms>` on `agent send`.
*/
export const VERIFY_DEFAULT_TIMEOUT_MS = 6_000;
const DEFAULT_RUNTIME_RESETS: Record<string, { resetCommand: string }> = {
claude: { resetCommand: '/clear' },
codex: { resetCommand: '/clear' },
@@ -236,6 +270,401 @@ export function buildAgentTailCommand(
];
}
// ---------------------------------------------------------------------------
// Fleet ps — phase 2 observability helpers
// ---------------------------------------------------------------------------
export const HEARTBEAT_INTERVAL_MS = 15_000;
export const HEARTBEAT_HEALTHY_MULTIPLIER = 3;
export interface HeartbeatInfo {
ts: Date | null;
pid: number | null;
status: 'ok' | 'busy' | null;
/** healthy | stale | unknown */
health: 'healthy' | 'stale' | 'unknown';
ageMs: number | null;
}
export interface AgentPsRow {
name: string;
tenant_id: string;
host: string;
runtime: string;
systemdActive: string;
systemdEnabled: string;
paneAlive: boolean;
panePid: number | null;
paneCommand: string | null;
idleSeconds: number | null;
heartbeat: HeartbeatInfo;
/** roster runtime !== actual pane command */
driftFlag: boolean;
/** active but UnitFileState=disabled */
bootEnableWarning: boolean;
}
/**
* Returns the systemd show command for an agent unit (active+enabled state).
* Returns: `systemctl --user show <unit> -p ActiveState -p SubState -p UnitFileState`
*/
export function buildSystemdShowCommand(agentName: string): string[] {
const unit = `mosaic-agent@${agentName}.service`;
return [
'systemctl',
'--user',
'show',
unit,
'-p',
'ActiveState',
'-p',
'SubState',
'-p',
'UnitFileState',
];
}
/**
* Returns the tmux list-panes command for an agent pane.
* Format: `#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}`
*/
export function buildTmuxListPanesCommand(
agentName: string,
socketName = DEFAULT_SOCKET_NAME,
): string[] {
return [
'tmux',
'-L',
socketName,
'list-panes',
'-t',
`=${agentName}:0.0`,
'-F',
'#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}',
];
}
/**
* Returns the heartbeat file path for an agent.
*/
export function heartbeatPath(agentName: string, mosaicHome = defaultMosaicHome()): string {
return join(mosaicHome, 'fleet', 'run', `${agentName}.hb`);
}
/**
* Parse a heartbeat file's contents into a HeartbeatInfo.
* File format (one key=value per line):
* ts=<iso8601>
* pid=<pid>
* status=<ok|busy>
*/
export function parseHeartbeat(content: string | null, nowMs = Date.now()): HeartbeatInfo {
if (content === null) {
return { ts: null, pid: null, status: null, health: 'unknown', ageMs: null };
}
const lines = content.split('\n');
let ts: Date | null = null;
let pid: number | null = null;
let status: 'ok' | 'busy' | null = null;
for (const line of lines) {
const [key, ...rest] = line.split('=');
const val = rest.join('=').trim();
if (key === 'ts' && val) {
const d = new Date(val);
if (!Number.isNaN(d.getTime())) ts = d;
} else if (key === 'pid' && val) {
const n = Number.parseInt(val, 10);
if (Number.isFinite(n)) pid = n;
} else if (key === 'status' && (val === 'ok' || val === 'busy')) {
status = val;
}
}
const thresholdMs = HEARTBEAT_INTERVAL_MS * HEARTBEAT_HEALTHY_MULTIPLIER;
let health: 'healthy' | 'stale' | 'unknown' = 'unknown';
let ageMs: number | null = null;
if (ts !== null) {
ageMs = nowMs - ts.getTime();
health = ageMs <= thresholdMs ? 'healthy' : 'stale';
}
return { ts, pid, status, health, ageMs };
}
/**
* Parse the output of `systemctl --user show ... -p ActiveState -p SubState -p UnitFileState`
* Returns an object with the three properties.
*/
export function parseSystemdShow(output: string): {
ActiveState: string;
SubState: string;
UnitFileState: string;
} {
const result: Record<string, string> = {};
for (const line of output.split('\n')) {
const eq = line.indexOf('=');
if (eq !== -1) {
result[line.slice(0, eq)] = line.slice(eq + 1).trim();
}
}
return {
ActiveState: result['ActiveState'] ?? 'unknown',
SubState: result['SubState'] ?? 'unknown',
UnitFileState: result['UnitFileState'] ?? 'unknown',
};
}
/**
* Parse the output of `tmux list-panes -F '#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}'`
* pane_activity is a Unix epoch timestamp (seconds).
*/
export function parseTmuxListPanes(
output: string,
nowMs = Date.now(),
): { pid: number | null; command: string | null; dead: boolean; idleSeconds: number | null } {
const line = output.trim().split('\n')[0];
if (!line) {
return { pid: null, command: null, dead: true, idleSeconds: null };
}
// format: <pid> <command> <dead(0|1)> <activity_epoch>
const parts = line.split(' ');
const pid = parts[0] ? (Number.isFinite(Number(parts[0])) ? Number(parts[0]) : null) : null;
const command = parts[1] ?? null;
const dead = parts[2] === '1';
const activityEpoch = parts[3] ? Number(parts[3]) : NaN;
const idleSeconds =
Number.isFinite(activityEpoch) && activityEpoch > 0
? Math.floor((nowMs - activityEpoch * 1000) / 1000)
: null;
return { pid, command, dead, idleSeconds };
}
/**
* Determine if there is a runtime drift: roster says one runtime but the pane
* is actually running something from a different runtime. We detect this by
* checking if the pane command doesn't match a known canonical command for the
* roster's declared runtime.
*
* Known canonical commands per runtime:
* claude → claude
* codex → codex
* opencode → opencode
* pi → pi
*
* If the pane is running something else (e.g., python3/dogfood-agent.py) for
* an agent whose roster runtime is "pi", that's a drift.
*/
export function detectDrift(rosterRuntime: string, paneCommand: string | null): boolean {
if (!paneCommand) return false;
const knownCommands: Record<string, string[]> = {
claude: ['claude'],
codex: ['codex'],
opencode: ['opencode'],
pi: ['pi'],
};
const expected = knownCommands[rosterRuntime];
if (!expected) return false;
return !expected.includes(paneCommand);
}
/**
* Returns the default tenant_id (OS username) and host (short hostname).
* These MUST appear in every --json record for multi-tenant/multi-host zero-foreclosure.
*/
export function getDefaultTenantAndHost(): { tenant_id: string; host: string } {
let tenant_id: string;
try {
tenant_id = userInfo().username;
} catch {
tenant_id = process.env['USER'] ?? process.env['LOGNAME'] ?? 'unknown';
}
const host = hostname().split('.')[0] || 'localhost';
return { tenant_id, host };
}
/**
* Builds the command to create a grouped viewer session targeting an agent session.
* A grouped session shares the same windows as the target but gets INDEPENDENT sizing,
* so attaching the viewer never resizes the agent's window.
*
* The viewer session name is derived from the agent name and a unique suffix (typically
* the caller's PID) so multiple concurrent watchers don't collide.
*
* Usage sequence:
* 1. Run buildAgentWatchCreateViewerCommand → create grouped session (via capturing runner).
* 2. Run buildAgentWatchAttachCommand → attach -r to the viewer session (via interactiveRunner).
* 3. Run buildAgentWatchKillViewerCommand → kill the viewer session on detach (via capturing runner).
*/
export function buildAgentWatchCreateViewerCommand(
agentName: string,
viewerSessionName: string,
socketName = DEFAULT_SOCKET_NAME,
): string[] {
return [
'tmux',
'-L',
socketName,
'new-session',
'-d',
'-t',
`=${agentName}`,
'-s',
viewerSessionName,
];
}
/**
* Builds the interactive attach command for a viewer session (read-only).
* Must be run via interactiveRunner (stdio: 'inherit').
*/
export function buildAgentWatchAttachCommand(
viewerSessionName: string,
socketName = DEFAULT_SOCKET_NAME,
): string[] {
return ['tmux', '-L', socketName, 'attach', '-r', '-t', viewerSessionName];
}
/**
* Builds the kill-session command to clean up a viewer session after detach.
* Keeps the agent session intact.
*/
export function buildAgentWatchKillViewerCommand(
viewerSessionName: string,
socketName = DEFAULT_SOCKET_NAME,
): string[] {
return ['tmux', '-L', socketName, 'kill-session', '-t', viewerSessionName];
}
/**
* Returns a unique viewer session name for a given agent.
* Uses process.pid so concurrent watchers produce distinct names.
*/
export function buildViewerSessionName(agentName: string): string {
return `${agentName}-watch-${process.pid}`;
}
/**
* @deprecated Use buildAgentWatchCreateViewerCommand + buildAgentWatchAttachCommand +
* buildAgentWatchKillViewerCommand instead. This bare attach targets the agent session
* directly and can resize it when the viewer terminal is smaller than the agent's window.
*
* Kept for backward compatibility only.
*/
export function buildAgentWatchCommand(
agentName: string,
socketName = DEFAULT_SOCKET_NAME,
): string[] {
return ['tmux', '-L', socketName, 'attach', '-r', '-t', `=${agentName}`];
}
/**
* Builds the capture-pane command used to verify that agent send was accepted
* (not left as an unsubmitted draft). Captures the last N lines and checks for
* the draft heuristic.
*/
export function buildAgentVerifyAcceptedCommand(
agentName: string,
socketName = DEFAULT_SOCKET_NAME,
lines = 5,
): string[] {
return [
'tmux',
'-L',
socketName,
'capture-pane',
'-t',
`=${agentName}:0.0`,
'-p',
'-S',
`-${lines}`,
];
}
/**
* Result of a send-verify check.
* - 'accepted': positive evidence that the message was accepted (response content present).
* - 'draft': last non-empty line matches the draft heuristic (unsubmitted input).
* - 'unverifiable': pane did not change after send (stale or blank) — we cannot determine
* acceptance; fails closed per FR-5.
*/
export type SendVerifyResult = 'accepted' | 'draft' | 'unverifiable';
/**
* Classify the result of a send-verify check by comparing BEFORE and AFTER pane snapshots.
*
* This is the primary classifier for `send --verify`. It addresses the stale-pane
* false-success problem: if the pane content did not change after the send, the new
* message never registered in the TUI (wedged pane, send dropped, etc.).
*
* Classification logic:
* 'unverifiable' — AFTER is blank/empty OR AFTER == BEFORE (no pane change after send).
* 'draft' — AFTER differs from BEFORE AND the last non-empty line of AFTER starts
* with the draft pattern ("> "); message was typed but not submitted.
* 'accepted' — AFTER differs from BEFORE AND AFTER does not end in a draft line;
* positive evidence that the TUI accepted the message.
*
* NOTE on blank AFTER: Full-screen TUIs (claude, codex, opencode, pi) render blank for
* `tmux capture-pane`. A blank AFTER is indistinguishable from a wedged pane, so it
* is always classified 'unverifiable' (fail-closed).
*
* NOTE on definitive acceptance: Phase-2 can only observe the pane surface — there is no
* runtime acknowledgement (heartbeat-ack) at this phase. The pane-change check is the best
* signal available against an opaque TUI. Definitive acceptance ultimately requires a
* runtime acknowledgement (Phase-3 heartbeat-ack).
*
* Draft heuristic: a last non-empty line (after stripping ANSI escapes) that starts
* with "> " is treated as an unsubmitted input line. This pattern is specific to
* pi/claude TUIs; draft detection for codex/opencode TUIs is best-effort only.
*
* FR-5 requires `send --verify` to return non-zero when delivery cannot be verified.
*
* @param before Pane snapshot captured BEFORE the send command.
* @param after Pane snapshot captured AFTER the send command (after the delay).
*/
export function classifySendResult(before: string, after: string): SendVerifyResult {
const afterLines = after.split('\n').filter((l) => l.trim().length > 0);
// Blank/empty AFTER => full-screen TUI rendered blank, or pane is wedged => unverifiable.
if (afterLines.length === 0) return 'unverifiable';
// No change => message didn't register in the TUI (stale/wedged pane) => unverifiable.
if (after === before) return 'unverifiable';
// AFTER differs from BEFORE — check whether the pane is now showing a draft line.
const lastLine = afterLines[afterLines.length - 1]!;
const stripped = lastLine.replace(/\x1b\[[0-9;]*m/g, '').trim();
// Heuristic: if stripped last line starts with "> " — that's the common draft pattern
// in pi/claude TUIs for showing user input before submission.
// NOTE: this heuristic is pi/claude-specific; draft detection for codex/opencode
// TUIs is best-effort only and may miss other unsubmitted-input indicators.
if (/^>\s/.test(stripped)) return 'draft';
return 'accepted';
}
/**
* Check whether a send was accepted (not left as draft), using only the AFTER snapshot.
*
* @deprecated Prefer classifySendResult(before, after) which guards against stale-pane
* false-successes. This single-snapshot variant cannot detect a wedged pane that still
* shows old non-empty content — it will incorrectly return 'accepted' in that case.
*
* Retained for unit-test compatibility with single-snapshot assertions.
*
* Returns:
* 'unverifiable' — blank/empty capture (full-screen TUIs render blank; we cannot tell).
* 'draft' — last non-empty line matches the draft heuristic.
* 'accepted' — non-blank and not a draft line (but may be stale — see above).
*/
export function isSendAccepted(capturedOutput: string): SendVerifyResult {
const lines = capturedOutput.split('\n').filter((l) => l.trim().length > 0);
// Blank/empty capture => full-screen TUI rendered blank => unverifiable.
// This is the known-unverifiable case; fail closed (not treated as success).
if (lines.length === 0) return 'unverifiable';
const lastLine = lines[lines.length - 1]!;
const stripped = lastLine.replace(/\x1b\[[0-9;]*m/g, '').trim();
// Heuristic: if stripped last line starts with "> " — that's the common draft pattern
// in pi/claude TUIs for showing user input before submission.
// NOTE: this heuristic is pi/claude-specific; draft detection for codex/opencode
// TUIs is best-effort only and may miss other unsubmitted-input indicators.
if (/^>\s/.test(stripped)) return 'draft';
return 'accepted';
}
export function registerFleetCommand(program: Command, deps: FleetCommandDeps = {}): Command {
const runner = deps.runner ?? runCommand;
const paths = resolveFleetPaths(deps.mosaicHome);
@@ -360,6 +789,113 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
console.log(`Verified fleet on tmux socket ${socketName}.`);
});
cmd
.command('ps')
.description('Show real-time status for all roster agents (systemd + tmux + heartbeat)')
.option('--json', 'Print JSON array')
.action(async (opts: { json?: boolean }) => {
const commandOpts = cmd.opts<{ mosaicHome: string; roster?: string }>();
const activePaths = resolveFleetPaths(commandOpts.mosaicHome);
const roster = await loadRosterForCommand(cmd);
const { tenant_id, host } = getDefaultTenantAndHost();
const nowMs = Date.now();
const rows: AgentPsRow[] = [];
for (const agent of roster.agents) {
// systemd show
const showResult = await runner(...splitCommand(buildSystemdShowCommand(agent.name)));
const sysInfo = parseSystemdShow(showResult.stdout);
// tmux list-panes
const panesResult = await runner(
...splitCommand(buildTmuxListPanesCommand(agent.name, roster.tmux.socketName)),
);
const paneInfo = parseTmuxListPanes(panesResult.stdout, nowMs);
// heartbeat
const hbFile = heartbeatPath(agent.name, activePaths.mosaicHome);
let hbContent: string | null = null;
try {
hbContent = await readFile(hbFile, 'utf8');
} catch {
hbContent = null;
}
const hb = parseHeartbeat(hbContent, nowMs);
// drift and boot-enable
const driftFlag = detectDrift(agent.runtime, paneInfo.command);
const bootEnableWarning =
sysInfo.ActiveState === 'active' && sysInfo.UnitFileState === 'disabled';
rows.push({
name: agent.name,
tenant_id,
host,
runtime: agent.runtime,
systemdActive: sysInfo.ActiveState,
systemdEnabled: sysInfo.UnitFileState,
paneAlive: !paneInfo.dead,
panePid: paneInfo.pid,
paneCommand: paneInfo.command,
idleSeconds: paneInfo.idleSeconds,
heartbeat: hb,
driftFlag,
bootEnableWarning,
});
}
if (opts.json) {
console.log(JSON.stringify(rows, null, 2));
return;
}
// Table output
const header = [
'NAME'.padEnd(18),
'TENANT'.padEnd(12),
'HOST'.padEnd(12),
'RUNTIME'.padEnd(10),
'SYSTEMD'.padEnd(16),
'PANE'.padEnd(8),
'PID'.padEnd(8),
'IDLE'.padEnd(8),
'HB'.padEnd(12),
'FLAGS',
].join(' ');
console.log(header);
console.log('-'.repeat(header.length));
for (const row of rows) {
const systemd = `${row.systemdActive}/${row.systemdEnabled}`;
const pane = row.paneAlive ? 'alive' : 'dead';
const pid = row.panePid !== null ? String(row.panePid) : '-';
const idle = row.idleSeconds !== null ? `${row.idleSeconds}s` : '-';
const hbAge =
row.heartbeat.ageMs !== null
? `${Math.round(row.heartbeat.ageMs / 1000)}s/${row.heartbeat.health}`
: `unknown`;
const flags: string[] = [];
if (row.driftFlag) flags.push('DRIFT');
if (row.bootEnableWarning) flags.push('BOOT-ENABLE');
console.log(
[
row.name.padEnd(18),
row.tenant_id.padEnd(12),
row.host.padEnd(12),
row.runtime.padEnd(10),
systemd.padEnd(16),
pane.padEnd(8),
pid.padEnd(8),
idle.padEnd(8),
hbAge.padEnd(12),
flags.join(','),
].join(' '),
);
}
});
return cmd;
}
@@ -368,6 +904,8 @@ export function registerFleetAgentCommands(
deps: FleetCommandDeps = {},
): void {
const runner = deps.runner ?? runCommand;
const iRunner = deps.interactiveRunner ?? spawnInteractive;
const sleepFn = deps.sleepFn ?? defaultSleep;
agentCommand
.command('roster')
@@ -417,21 +955,141 @@ export function registerFleetAgentCommands(
.requiredOption('--message <text>', 'Message text')
.option('--source-label <label>', 'Source label for the message preamble')
.option('--source <label>', 'Alias for --source-label')
.option(
'--verify',
'Verify message was accepted (not left as a draft); exit non-zero if unverifiable',
)
.option(
'--verify-timeout <ms>',
`Maximum time (ms) to poll for pane change when --verify is set (default: ${VERIFY_DEFAULT_TIMEOUT_MS})`,
String(VERIFY_DEFAULT_TIMEOUT_MS),
)
.action(
async (agent: string, opts: { message: string; sourceLabel?: string; source?: string }) => {
async (
agent: string,
opts: {
message: string;
sourceLabel?: string;
source?: string;
verify?: boolean;
verifyTimeout?: string;
},
) => {
const roster = await loadRosterFromAgentCommand(agentCommand, deps.mosaicHome);
getRosterAgent(roster, agent);
const paths = resolveFleetPaths(
resolveMosaicHomeFromCommand(agentCommand, deps.mosaicHome),
);
const sourceLabel = opts.sourceLabel ?? opts.source ?? getDefaultOperatorSourceLabel();
await runChecked(
runner,
buildAgentSendCommand(paths, agent, opts.message, roster.tmux.socketName, sourceLabel),
);
if (opts.verify) {
const parsedTimeout =
opts.verifyTimeout !== undefined ? Number.parseInt(opts.verifyTimeout, 10) : Number.NaN;
const timeoutMs = Number.isFinite(parsedTimeout)
? Math.max(0, parsedTimeout)
: VERIFY_DEFAULT_TIMEOUT_MS;
// Capture BEFORE snapshot so we can detect stale-pane false-successes.
// A wedged pane that still shows old non-empty content must not be reported
// as 'accepted' — we compare BEFORE vs AFTER to guard against that case.
const beforeResult = await runner(
...splitCommand(buildAgentVerifyAcceptedCommand(agent, roster.tmux.socketName)),
);
if (beforeResult.exitCode !== 0) {
throw new Error(
`send --verify: could not capture pane output before send (tmux exited ${beforeResult.exitCode}).`,
);
}
const beforeSnapshot = beforeResult.stdout;
await runChecked(
runner,
buildAgentSendCommand(paths, agent, opts.message, roster.tmux.socketName, sourceLabel),
);
// Bounded polling loop: poll capture-pane every VERIFY_POLL_INTERVAL_MS up to
// timeoutMs. Return immediately when the pane shows 'accepted' or 'draft';
// keep polling while 'unverifiable' (no pane change yet). Fail closed after
// timeout with the existing "no pane change after send" message.
const deadline = Date.now() + timeoutMs;
let verifyResult: SendVerifyResult = 'unverifiable';
while (true) {
await sleepFn(VERIFY_POLL_INTERVAL_MS);
const afterResult = await runner(
...splitCommand(buildAgentVerifyAcceptedCommand(agent, roster.tmux.socketName)),
);
if (afterResult.exitCode !== 0) {
throw new Error(
`send --verify: could not capture pane output to verify acceptance (tmux exited ${afterResult.exitCode}).`,
);
}
verifyResult = classifySendResult(beforeSnapshot, afterResult.stdout);
// Definitive result — stop polling immediately.
if (verifyResult === 'accepted' || verifyResult === 'draft') {
break;
}
// Still unverifiable — check if we have time left to poll again.
if (Date.now() >= deadline) {
break;
}
}
if (verifyResult === 'draft') {
process.exitCode = 1;
process.stderr.write(
`send --verify: message left as unsubmitted draft in agent "${agent}".\n`,
);
} else if (verifyResult === 'unverifiable') {
process.exitCode = 1;
process.stderr.write(
`send --verify: could not verify delivery (no pane change after send) for agent "${agent}".\n`,
);
}
} else {
await runChecked(
runner,
buildAgentSendCommand(paths, agent, opts.message, roster.tmux.socketName, sourceLabel),
);
}
},
);
agentCommand
.command('watch <agent>')
.description('Open a read-only view of a fleet agent tmux session (cannot send keystrokes)')
.action(async (agent: string) => {
const roster = await loadRosterFromAgentCommand(agentCommand, deps.mosaicHome);
getRosterAgent(roster, agent);
// Use a GROUPED VIEWER SESSION to prevent the observer from resizing the agent's
// window. A bare `tmux attach -r` against the agent session itself still lets the
// client shrink the session to its terminal size; a grouped session gets INDEPENDENT
// sizing so the agent's window is never affected by the viewer's terminal dimensions.
//
// Sequence:
// 1. Create a throwaway grouped session targeting the agent (capturing runner).
// 2. Attach -r (read-only) to the viewer session (interactiveRunner / TTY).
// 3. Kill the viewer session on detach so stale sessions don't accumulate.
const viewerName = buildViewerSessionName(agent);
const socketName = roster.tmux.socketName;
await runChecked(runner, buildAgentWatchCreateViewerCommand(agent, viewerName, socketName));
const [bin, args] = splitCommand(buildAgentWatchAttachCommand(viewerName, socketName));
const exitCode = await iRunner(bin, args);
// Best-effort cleanup of the viewer session regardless of how the user detached.
// Errors here are intentionally suppressed — the agent session is unaffected.
const killResult = await runner(
...splitCommand(buildAgentWatchKillViewerCommand(viewerName, socketName)),
);
void killResult; // result is intentionally ignored
if (exitCode !== 0) {
process.exitCode = exitCode;
}
});
agentCommand
.command('reset <agent>')
.description('Reset a local fleet agent by sending the runtime reset command')
@@ -864,6 +1522,32 @@ function resolveFrameworkRoot(): string {
return resolve(dirname(currentFile), '..', '..', 'framework');
}
/**
* Default InteractiveRunner implementation: spawns the command with inherited
* stdio so the terminal is passed through to the child process. This is required
* for commands like `tmux attach` that are full-screen interactive and cannot be
* captured through a pipe.
*/
function spawnInteractive(command: string, args: string[]): Promise<number> {
return new Promise((resolvePromise) => {
const child = spawn(command, args, { stdio: 'inherit' });
child.on('error', () => {
resolvePromise(127);
});
child.on('close', (code) => {
resolvePromise(code ?? 1);
});
});
}
/**
* Default SleepFn implementation backed by setTimeout.
* Tests inject a stub to avoid real delays in the send --verify polling loop.
*/
function defaultSleep(ms: number): Promise<void> {
return new Promise<void>((res) => setTimeout(res, ms));
}
async function canRead(path: string): Promise<boolean> {
try {
await access(path, constants.R_OK);