feat(fleet): Phase-2 observability — fleet ps + watch + send verify #579

Merged
jason.woltje merged 9 commits from feat/fleet-observability into main 2026-06-21 04:23:52 +00:00
2 changed files with 922 additions and 2 deletions
Showing only changes of commit cf304eebc3 - Show all commits

View File

@@ -5,14 +5,26 @@ import { Command } from 'commander';
import { afterEach, describe, expect, it, vi } from 'vitest';
import {
buildAgentSendCommand,
buildAgentWatchCommand,
buildAgentVerifyAcceptedCommand,
buildFleetServiceCommand,
buildSystemdShowCommand,
buildTmuxListPanesCommand,
detectDrift,
generateAgentEnv,
getDefaultOperatorSourceLabel,
getDefaultTenantAndHost,
getRosterAgent,
heartbeatPath,
isSendAccepted,
loadFleetRoster,
mergeAgentEnv,
parseHeartbeat,
parseSystemdShow,
parseTmuxListPanes,
registerFleetCommand,
resolveFleetPaths,
type AgentPsRow,
type CommandRunner,
} from './fleet.js';
import { registerAgentCommand } from './agent.js';
@@ -39,6 +51,7 @@ describe('registerFleetCommand', () => {
'init',
'install',
'install-systemd',
'ps',
'restart',
'start',
'status',
@@ -59,6 +72,7 @@ describe('registerFleetCommand', () => {
'send',
'status',
'tail',
'watch',
]);
});
});
@@ -736,3 +750,473 @@ describe('fleet command construction', () => {
expect(packageJson.files).toEqual(expect.arrayContaining(['dist', 'framework']));
});
});
// ---------------------------------------------------------------------------
// Phase-2 observability — unit tests (FR-1, FR-3, FR-5, FR-6)
// ---------------------------------------------------------------------------
// Pins the exact argv arrays (and the heartbeat file path) that the `fleet ps`
// helpers produce, so any change to flags, targets or ordering is caught here.
describe('fleet ps — command construction', () => {
it('builds exact systemd show command for an agent unit', () => {
// The unit name is the template instance `mosaic-agent@<agent>.service`.
expect(buildSystemdShowCommand('canary-pi')).toEqual([
'systemctl',
'--user',
'show',
'mosaic-agent@canary-pi.service',
'-p',
'ActiveState',
'-p',
'SubState',
'-p',
'UnitFileState',
]);
});
it('builds exact tmux list-panes command with the correct format string', () => {
// The target uses the `=<session>:0.0` form; the format string is what
// parseTmuxListPanes later splits on spaces.
expect(buildTmuxListPanesCommand('canary-pi', 'mosaic-factory')).toEqual([
'tmux',
'-L',
'mosaic-factory',
'list-panes',
'-t',
'=canary-pi:0.0',
'-F',
'#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}',
]);
});
it('uses DEFAULT_SOCKET_NAME when socket is omitted from list-panes', () => {
const cmd = buildTmuxListPanesCommand('canary-pi');
// Index 2 is the value following `-L`.
expect(cmd[2]).toBe('mosaic-factory');
});
it('derives heartbeat path under ~/.config/mosaic/fleet/run/', () => {
// The mosaic home is passed explicitly so the expected path is deterministic.
const home = '/home/test/.config/mosaic';
expect(heartbeatPath('canary-pi', home)).toBe(
'/home/test/.config/mosaic/fleet/run/canary-pi.hb',
);
});
});
// Exercises parseHeartbeat with an injected "now" so ages and health
// classification do not depend on the wall clock.
describe('fleet ps — heartbeat parsing', () => {
const NOW = 1_700_000_000_000; // fixed epoch ms for deterministic tests
it('parses a healthy heartbeat file', () => {
const ts = new Date(NOW - 10_000).toISOString(); // 10s ago — within 3×15s = 45s
const content = `ts=${ts}\npid=12345\nstatus=ok\n`;
const hb = parseHeartbeat(content, NOW);
expect(hb.health).toBe('healthy');
expect(hb.pid).toBe(12345);
expect(hb.status).toBe('ok');
expect(hb.ageMs).toBe(10_000);
});
it('reports stale when heartbeat is older than 3×interval', () => {
const ts = new Date(NOW - 60_000).toISOString(); // 60s ago > 45s threshold
const content = `ts=${ts}\npid=99\nstatus=busy\n`;
const hb = parseHeartbeat(content, NOW);
expect(hb.health).toBe('stale');
expect(hb.status).toBe('busy');
});
it('reports unknown when heartbeat file is missing (null input)', () => {
// null is how a caller signals that the heartbeat file could not be read.
const hb = parseHeartbeat(null, NOW);
expect(hb.health).toBe('unknown');
expect(hb.ts).toBeNull();
expect(hb.pid).toBeNull();
expect(hb.ageMs).toBeNull();
});
it('tolerates missing fields in heartbeat file', () => {
// Only a `ts` line is present and its value is not a parseable date, so the
// timestamp is dropped and health falls back to 'unknown'.
const hb = parseHeartbeat('ts=not-a-date\n', NOW);
expect(hb.health).toBe('unknown');
expect(hb.ts).toBeNull();
});
});
// Covers parseSystemdShow: `Key=Value` lines in, a fixed three-property object out.
describe('fleet ps — systemd show parsing', () => {
it('parses ActiveState, SubState, UnitFileState from systemctl show output', () => {
const output = 'ActiveState=active\nSubState=running\nUnitFileState=enabled\n';
expect(parseSystemdShow(output)).toEqual({
ActiveState: 'active',
SubState: 'running',
UnitFileState: 'enabled',
});
});
it('defaults missing keys to "unknown"', () => {
// Only ActiveState is supplied; the two absent properties must come back as 'unknown'.
const result = parseSystemdShow('ActiveState=inactive\n');
expect(result.SubState).toBe('unknown');
expect(result.UnitFileState).toBe('unknown');
});
});
// Covers parseTmuxListPanes against the `<pid> <command> <dead> <activity>` line
// format, with an injected "now" so idle time is deterministic.
describe('fleet ps — tmux list-panes parsing', () => {
const NOW_MS = 1_700_000_000_000;
it('parses alive pane with pid, command, and idle time', () => {
// The activity field is in whole epoch seconds, hence the division by 1000.
const activityEpoch = Math.floor((NOW_MS - 30_000) / 1000); // 30s ago
const output = `12345 claude 0 ${activityEpoch}\n`;
const result = parseTmuxListPanes(output, NOW_MS);
expect(result.pid).toBe(12345);
expect(result.command).toBe('claude');
expect(result.dead).toBe(false);
expect(result.idleSeconds).toBe(30);
});
it('reports dead pane when pane_dead=1', () => {
const output = `0 bash 1 0\n`;
const result = parseTmuxListPanes(output, NOW_MS);
expect(result.dead).toBe(true);
});
it('returns nulls for empty pane output', () => {
// Empty output is reported as a dead pane with every other field null.
const result = parseTmuxListPanes('', NOW_MS);
expect(result.pid).toBeNull();
expect(result.command).toBeNull();
expect(result.dead).toBe(true);
expect(result.idleSeconds).toBeNull();
});
});
// Covers detectDrift(rosterRuntime, paneCommand): drift is flagged only when the
// runtime is one of the known ones AND a pane command is present that differs.
describe('fleet ps — drift detection', () => {
it('flags drift when roster says pi but pane runs python3', () => {
expect(detectDrift('pi', 'python3')).toBe(true);
});
it('flags drift when roster says claude but pane runs dogfood-agent.py', () => {
expect(detectDrift('claude', 'dogfood-agent.py')).toBe(true);
});
it('does NOT flag drift when pane command matches the roster runtime', () => {
// One positive-match case per known runtime.
expect(detectDrift('claude', 'claude')).toBe(false);
expect(detectDrift('codex', 'codex')).toBe(false);
expect(detectDrift('pi', 'pi')).toBe(false);
expect(detectDrift('opencode', 'opencode')).toBe(false);
});
it('does NOT flag drift for unknown/custom runtimes (no canonical mapping)', () => {
expect(detectDrift('custom-runtime', 'anything')).toBe(false);
});
it('does NOT flag drift when pane command is null (pane dead)', () => {
expect(detectDrift('pi', null)).toBe(false);
});
});
// Smoke test for getDefaultTenantAndHost. The actual values depend on the machine
// running the tests, so only type and non-emptiness are asserted.
describe('fleet ps — tenant and host', () => {
it('returns tenant_id and host as non-empty strings', () => {
const { tenant_id, host } = getDefaultTenantAndHost();
expect(typeof tenant_id).toBe('string');
expect(tenant_id.length).toBeGreaterThan(0);
expect(typeof host).toBe('string');
expect(host.length).toBeGreaterThan(0);
});
});
// End-to-end test of `fleet ps --json` through commander, using a temp mosaic
// home with a one-agent roster and a fake CommandRunner instead of real
// systemctl/tmux processes.
describe('fleet ps — JSON output shape (FR-6)', () => {
it('produces --json records including tenant_id and host for each agent', async () => {
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
const rosterPath = join(home, 'fleet', 'roster.yaml');
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
rosterPath,
[
'version: 1',
'transport: tmux',
'agents:',
' - name: canary-pi',
' runtime: pi',
' class: canary',
].join('\n'),
);
const nowMs = Date.now();
const activityEpoch = Math.floor((nowMs - 20_000) / 1000);
// Fake runner: answers by sniffing the joined argv rather than spawning anything.
const runner: CommandRunner = async (command, args) => {
const fullArgs = [command, ...args].join(' ');
if (fullArgs.includes('systemctl') && fullArgs.includes('show')) {
// active + disabled is the combination that should raise bootEnableWarning.
return {
stdout: 'ActiveState=active\nSubState=running\nUnitFileState=disabled\n',
stderr: '',
exitCode: 0,
};
}
if (fullArgs.includes('list-panes')) {
// The pane runs python3 while the roster says pi, which should raise driftFlag.
return {
stdout: `12345 python3 0 ${activityEpoch}\n`,
stderr: '',
exitCode: 0,
};
}
return { stdout: '', stderr: '', exitCode: 0 };
};
// Capture console.log output; the original is restored in the finally block below.
const lines: string[] = [];
const origLog = console.log;
console.log = (msg: string) => {
lines.push(msg);
};
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner, mosaicHome: home });
try {
await program.parseAsync(['node', 'mosaic', 'fleet', 'ps', '--json']);
} finally {
console.log = origLog;
await rm(home, { recursive: true, force: true });
}
const json = JSON.parse(lines.join('')) as AgentPsRow[];
expect(Array.isArray(json)).toBe(true);
expect(json).toHaveLength(1);
const row = json[0]!;
// FR-6: tenant_id and host must be present
expect(typeof row.tenant_id).toBe('string');
expect(row.tenant_id.length).toBeGreaterThan(0);
expect(typeof row.host).toBe('string');
expect(row.host.length).toBeGreaterThan(0);
// drift: roster says pi, pane runs python3 → drift flag
expect(row.driftFlag).toBe(true);
// boot-enable warning: active + disabled
expect(row.bootEnableWarning).toBe(true);
// heartbeat missing → unknown
expect(row.heartbeat.health).toBe('unknown');
expect(row.name).toBe('canary-pi');
expect(row.runtime).toBe('pi');
expect(row.systemdActive).toBe('active');
expect(row.systemdEnabled).toBe('disabled');
});
});
// Verifies WHICH external commands `fleet ps` issues, and in what order, for a
// single-agent roster. The parsed output itself is not inspected here.
describe('fleet ps — command sequences issued', () => {
it('issues systemd show + tmux list-panes per agent', async () => {
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
const rosterPath = join(home, 'fleet', 'roster.yaml');
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
rosterPath,
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
// Recording runner: stores every argv. It returns the same systemd-style stdout
// for every call (including the tmux one); only the recorded calls are asserted.
const calls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
return {
stdout: 'ActiveState=inactive\nSubState=dead\nUnitFileState=enabled\n',
stderr: '',
exitCode: 0,
};
};
// suppress console.log for table output
const origLog = console.log;
console.log = () => {};
const program = new Command();
program.exitOverride();
registerFleetCommand(program, { runner, mosaicHome: home });
try {
await program.parseAsync(['node', 'mosaic', 'fleet', 'ps']);
// Exactly two calls, systemd first and then tmux, built by the same helpers the command uses.
expect(calls).toEqual([
buildSystemdShowCommand('coder0'),
buildTmuxListPanesCommand('coder0', 'mosaic-factory'),
]);
} finally {
console.log = origLog;
await rm(home, { recursive: true, force: true });
}
});
});
// Covers `agent watch`: the argv builder, its default socket, the command wiring
// through an injected runner, and the roster-membership guard.
describe('agent watch', () => {
it('builds exact read-only tmux attach command', () => {
expect(buildAgentWatchCommand('canary-pi', 'mosaic-factory')).toEqual([
'tmux',
'-L',
'mosaic-factory',
'attach',
'-r',
'-t',
'=canary-pi',
]);
});
it('uses DEFAULT_SOCKET_NAME when socket is omitted', () => {
const cmd = buildAgentWatchCommand('canary-pi');
// Index 2 is the value following `-L`; `-r` must always be present.
expect(cmd[2]).toBe('mosaic-factory');
expect(cmd).toContain('-r');
});
it('issues the read-only attach command through the injected runner', async () => {
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
join(home, 'fleet', 'roster.yaml'),
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
// Recording runner: captures argv instead of attaching to a real tmux session.
const calls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
return { stdout: '', stderr: '', exitCode: 0 };
};
const program = new Command();
program.exitOverride();
registerAgentCommand(program, { runner, mosaicHome: home });
try {
await program.parseAsync(['node', 'mosaic', 'agent', 'watch', 'coder0']);
expect(calls).toEqual([['tmux', '-L', 'mosaic-factory', 'attach', '-r', '-t', '=coder0']]);
} finally {
await rm(home, { recursive: true, force: true });
}
});
it('rejects watch for agents not in the roster', async () => {
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
join(home, 'fleet', 'roster.yaml'),
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
const runner = vi.fn<CommandRunner>(async () => ({ stdout: '', stderr: '', exitCode: 0 }));
const program = new Command();
program.exitOverride();
registerAgentCommand(program, { runner, mosaicHome: home });
try {
// 'typo' is not in the roster written above, so the command must fail...
await expect(
program.parseAsync(['node', 'mosaic', 'agent', 'watch', 'typo']),
).rejects.toThrow('Agent "typo" is not in the fleet roster');
// ...and must do so before any external command is run.
expect(runner).not.toHaveBeenCalled();
} finally {
await rm(home, { recursive: true, force: true });
}
});
});
// Covers `agent send --verify`: the capture-pane argv builder, the isSendAccepted
// heuristic, and that the extra capture call happens only when --verify is given.
describe('agent send --verify', () => {
it('builds exact verify capture-pane command', () => {
// The third argument (5) ends up as the `-S -5` start-line value.
expect(buildAgentVerifyAcceptedCommand('canary-pi', 'mosaic-factory', 5)).toEqual([
'tmux',
'-L',
'mosaic-factory',
'capture-pane',
'-t',
'=canary-pi:0.0',
'-p',
'-S',
'-5',
]);
});
it('isSendAccepted: returns true for normal response output', () => {
expect(isSendAccepted('Some response text\nAnother line\n')).toBe(true);
});
it('isSendAccepted: returns false when last line starts with "> " (draft pattern)', () => {
expect(isSendAccepted('> my unsent message')).toBe(false);
});
it('isSendAccepted: returns true for blank pane (treated as submitted)', () => {
// Both a truly empty capture and a whitespace-only capture count as blank.
expect(isSendAccepted('')).toBe(true);
expect(isSendAccepted(' \n \n')).toBe(true);
});
it('issues send then verify capture via injected runner when --verify is passed', async () => {
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
join(home, 'fleet', 'roster.yaml'),
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
const calls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
// For agent-send.sh: success; for capture-pane: return accepted output
return { stdout: 'Response from agent\n', stderr: '', exitCode: 0 };
};
const program = new Command();
program.exitOverride();
registerAgentCommand(program, { runner, mosaicHome: home });
try {
await program.parseAsync([
'node',
'mosaic',
'agent',
'send',
'coder0',
'--message',
'hello world',
'--verify',
]);
// First call should be agent-send.sh, second call should be capture-pane for verify
expect(calls).toHaveLength(2);
expect(calls[0]![0]).toContain('agent-send.sh');
const captureCall = calls[1]!;
expect(captureCall).toEqual(buildAgentVerifyAcceptedCommand('coder0', 'mosaic-factory', 5));
} finally {
await rm(home, { recursive: true, force: true });
}
// Explicit per-test timeout in milliseconds.
}, 10_000);
it('does NOT issue capture-pane verify when --verify is not passed', async () => {
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
join(home, 'fleet', 'roster.yaml'),
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
const calls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
return { stdout: '', stderr: '', exitCode: 0 };
};
const program = new Command();
program.exitOverride();
registerAgentCommand(program, { runner, mosaicHome: home });
try {
await program.parseAsync([
'node',
'mosaic',
'agent',
'send',
'coder0',
'--message',
'hello world',
]);
// Only 1 call: agent-send.sh (no capture-pane)
expect(calls).toHaveLength(1);
expect(calls[0]![0]).toContain('agent-send.sh');
} finally {
await rm(home, { recursive: true, force: true });
}
});
});

View File

@@ -1,6 +1,6 @@
import { constants } from 'node:fs';
import { access, chmod, copyFile, mkdir, readFile, writeFile } from 'node:fs/promises';
import { homedir, hostname } from 'node:os';
import { homedir, hostname, userInfo } from 'node:os';
import { dirname, join, resolve } from 'node:path';
import { fileURLToPath } from 'node:url';
import { spawn } from 'node:child_process';
@@ -236,6 +236,297 @@ export function buildAgentTailCommand(
];
}
// ---------------------------------------------------------------------------
// Fleet ps — phase 2 observability helpers
// ---------------------------------------------------------------------------
/**
 * Nominal heartbeat interval in milliseconds. parseHeartbeat multiplies it by
 * HEARTBEAT_HEALTHY_MULTIPLIER to obtain the staleness threshold.
 */
export const HEARTBEAT_INTERVAL_MS = 15_000;
/**
 * Number of heartbeat intervals a heartbeat may age before parseHeartbeat
 * classifies it as 'stale' instead of 'healthy'.
 */
export const HEARTBEAT_HEALTHY_MULTIPLIER = 3;
/**
 * Result of parsing one agent heartbeat file (see parseHeartbeat).
 * Any field that could not be read from the file is null.
 */
export interface HeartbeatInfo {
/** Timestamp from the `ts=` line, or null if absent or unparseable. */
ts: Date | null;
/** Process id from the `pid=` line, or null if absent or unparseable. */
pid: number | null;
/** Value of the `status=` line when it is exactly 'ok' or 'busy'; otherwise null. */
status: 'ok' | 'busy' | null;
/**
 * 'healthy' when the heartbeat age is within the staleness threshold, 'stale'
 * when it is older, 'unknown' when no valid timestamp was available.
 */
health: 'healthy' | 'stale' | 'unknown';
/** Milliseconds between `ts` and the reference "now"; null when `ts` is null. */
ageMs: number | null;
}
/**
 * One row of `fleet ps` output: the combined systemd, tmux and heartbeat view
 * of a single roster agent. Serialized as-is for `--json`.
 */
export interface AgentPsRow {
/** Agent name as listed in the roster. */
name: string;
/** Tenant identifier, taken from getDefaultTenantAndHost. */
tenant_id: string;
/** Host identifier, taken from getDefaultTenantAndHost. */
host: string;
/** Runtime declared for the agent in the roster. */
runtime: string;
/** systemd `ActiveState` of the agent unit ('unknown' if not reported). */
systemdActive: string;
/** systemd `UnitFileState` of the agent unit ('unknown' if not reported). */
systemdEnabled: string;
/** True when tmux did not report the pane as dead. */
paneAlive: boolean;
/** Pid reported by tmux for the pane, or null if unavailable. */
panePid: number | null;
/** Command tmux reports as currently running in the pane, or null if unavailable. */
paneCommand: string | null;
/** Seconds since the pane's last activity, or null if unavailable. */
idleSeconds: number | null;
/** Parsed heartbeat file for the agent. */
heartbeat: HeartbeatInfo;
/** True when detectDrift reports that the pane command does not match the roster runtime. */
driftFlag: boolean;
/** True when the unit is active but its UnitFileState is 'disabled'. */
bootEnableWarning: boolean;
}
/**
 * Build the argv that queries systemd for one agent's unit state.
 *
 * The unit is the user-scoped template instance `mosaic-agent@<agentName>.service`.
 * Exactly three properties are requested, in this order: ActiveState, SubState,
 * UnitFileState.
 *
 * @param agentName - Agent name; interpolated into the unit name unchanged.
 * @returns `systemctl --user show <unit> -p ActiveState -p SubState -p UnitFileState` as an argv array.
 */
export function buildSystemdShowCommand(agentName: string): string[] {
  const argv = ['systemctl', '--user', 'show', `mosaic-agent@${agentName}.service`];
  for (const property of ['ActiveState', 'SubState', 'UnitFileState']) {
    // Each requested property gets its own `-p` flag.
    argv.push('-p', property);
  }
  return argv;
}
/**
 * Build the argv for `systemctl --user is-active` against one agent's unit.
 *
 * @param agentName - Agent name; interpolated into `mosaic-agent@<agentName>.service`.
 * @returns The argv array, with the unit name as the last element.
 */
export function buildSystemdIsActiveCommand(agentName: string): string[] {
  const argv: string[] = ['systemctl', '--user', 'is-active'];
  argv.push(`mosaic-agent@${agentName}.service`);
  return argv;
}
/**
 * Build the argv for `tmux list-panes` against an agent's pane.
 *
 * The target is `=<agentName>:0.0`. Each pane is printed as one line of four
 * space-separated fields:
 * `#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}`,
 * which is the layout parseTmuxListPanes expects.
 *
 * @param agentName - Agent name; used as the tmux session name in the target.
 * @param socketName - Value passed to `tmux -L`; defaults to DEFAULT_SOCKET_NAME.
 * @returns The argv array.
 */
export function buildTmuxListPanesCommand(
  agentName: string,
  socketName = DEFAULT_SOCKET_NAME,
): string[] {
  const paneTarget = `=${agentName}:0.0`;
  const lineFormat = '#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}';
  return ['tmux', '-L', socketName, 'list-panes', '-t', paneTarget, '-F', lineFormat];
}
/**
 * Compute where an agent's heartbeat file lives: `<mosaicHome>/fleet/run/<agentName>.hb`.
 *
 * @param agentName - Agent name; becomes the file's base name.
 * @param mosaicHome - Mosaic home directory; defaults to defaultMosaicHome().
 * @returns The joined path. Nothing is read from or written to disk.
 */
export function heartbeatPath(agentName: string, mosaicHome = defaultMosaicHome()): string {
  const runDir = join(mosaicHome, 'fleet', 'run');
  const fileName = `${agentName}.hb`;
  return join(runDir, fileName);
}
/**
 * Parse the contents of a heartbeat file into a HeartbeatInfo.
 *
 * File format (one `key=value` per line; unknown keys and lines without `=`
 * are ignored; whitespace around keys and values is tolerated):
 *   ts=<iso8601>
 *   pid=<pid>
 *   status=<ok|busy>
 *
 * Field handling:
 * - `ts` is kept only if `new Date(value)` yields a valid date.
 * - `pid` is kept only if the value consists solely of decimal digits, so
 *   `12abc` or `-5` are rejected rather than silently truncated or accepted.
 * - `status` is kept only if it is exactly `ok` or `busy`.
 * An invalid line never clears a value set by an earlier valid line.
 *
 * Health is derived from `ts` alone: 'unknown' without a valid timestamp,
 * 'healthy' when its age is at most
 * HEARTBEAT_INTERVAL_MS * HEARTBEAT_HEALTHY_MULTIPLIER, otherwise 'stale'.
 *
 * @param content - Raw file contents, or null when the file could not be read.
 * @param nowMs - Reference time in epoch milliseconds; defaults to Date.now().
 * @returns The parsed heartbeat; unreadable fields are null.
 */
export function parseHeartbeat(content: string | null, nowMs = Date.now()): HeartbeatInfo {
  if (content === null) {
    return { ts: null, pid: null, status: null, health: 'unknown', ageMs: null };
  }

  let ts: Date | null = null;
  let pid: number | null = null;
  let status: 'ok' | 'busy' | null = null;

  for (const line of content.split(/\r?\n/)) {
    const eq = line.indexOf('=');
    if (eq === -1) continue; // not a key=value line
    // Split on the FIRST '=' only, so values may themselves contain '='.
    const key = line.slice(0, eq).trim();
    const val = line.slice(eq + 1).trim();

    if (key === 'ts' && val) {
      const parsed = new Date(val);
      if (!Number.isNaN(parsed.getTime())) ts = parsed;
    } else if (key === 'pid' && /^\d+$/.test(val)) {
      // Digits-only check first: parseInt alone would accept "12abc" as 12.
      const n = Number(val);
      if (Number.isSafeInteger(n)) pid = n;
    } else if (key === 'status' && (val === 'ok' || val === 'busy')) {
      status = val;
    }
  }

  const thresholdMs = HEARTBEAT_INTERVAL_MS * HEARTBEAT_HEALTHY_MULTIPLIER;
  let health: 'healthy' | 'stale' | 'unknown' = 'unknown';
  let ageMs: number | null = null;
  if (ts !== null) {
    ageMs = nowMs - ts.getTime();
    health = ageMs <= thresholdMs ? 'healthy' : 'stale';
  }
  return { ts, pid, status, health, ageMs };
}
/**
 * Parse the `Key=Value` lines printed by
 * `systemctl --user show ... -p ActiveState -p SubState -p UnitFileState`.
 *
 * Each line is split on its first `=`; lines without `=` are ignored and, if a
 * key repeats, the last occurrence wins. Keys and values are trimmed.
 *
 * A property is reported as `'unknown'` both when its line is missing AND when
 * it is present with an empty value (e.g. `UnitFileState=`), so callers never
 * see an empty string for a state.
 *
 * @param output - Raw stdout of the `systemctl show` call.
 * @returns The three state properties, each defaulting to `'unknown'`.
 */
export function parseSystemdShow(output: string): {
  ActiveState: string;
  SubState: string;
  UnitFileState: string;
} {
  const props = new Map<string, string>();
  for (const line of output.split('\n')) {
    const eq = line.indexOf('=');
    if (eq === -1) continue;
    props.set(line.slice(0, eq).trim(), line.slice(eq + 1).trim());
  }
  // `||` (not `??`) on purpose: an empty value is treated the same as a missing one.
  const stateOf = (key: string): string => props.get(key) || 'unknown';
  return {
    ActiveState: stateOf('ActiveState'),
    SubState: stateOf('SubState'),
    UnitFileState: stateOf('UnitFileState'),
  };
}
/**
 * Parse the first line of
 * `tmux list-panes -F '#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}'`.
 *
 * Line layout: `<pid> <command> <dead(0|1)> <activity_epoch_seconds>`.
 * When the line has at least four space-separated tokens, the pid is taken
 * from the front and `dead`/`activity` from the END of the line; everything in
 * between is the command. A command containing spaces therefore cannot shift
 * the trailing fields. Shorter lines fall back to plain positional parsing.
 *
 * `idleSeconds` is clamped at 0: pane activity can be newer than `nowMs` when
 * the caller captured "now" before running tmux, and a negative idle time is
 * meaningless.
 *
 * @param output - Raw stdout of the `tmux list-panes` call.
 * @param nowMs - Reference time in epoch milliseconds; defaults to Date.now().
 * @returns Parsed pane info. Empty output yields all-null fields with `dead: true`.
 *          `idleSeconds` is null when the activity field is missing, non-numeric or not positive.
 */
export function parseTmuxListPanes(
  output: string,
  nowMs = Date.now(),
): { pid: number | null; command: string | null; dead: boolean; idleSeconds: number | null } {
  const line = output.trim().split('\n')[0];
  if (!line) {
    return { pid: null, command: null, dead: true, idleSeconds: null };
  }

  const tokens = line.split(' ');
  const count = tokens.length;
  const complete = count >= 4;

  const pidToken = tokens[0];
  // Anchor the two trailing fields to the end of the line when possible.
  const deadToken = complete ? tokens[count - 2] : tokens[2];
  const activityToken = complete ? tokens[count - 1] : tokens[3];
  const command = complete ? tokens.slice(1, count - 2).join(' ') : (tokens[1] ?? null);

  const pidValue = pidToken ? Number(pidToken) : NaN;
  const pid = Number.isFinite(pidValue) ? pidValue : null;
  const dead = deadToken === '1';

  const activityEpoch = activityToken ? Number(activityToken) : NaN;
  const idleSeconds =
    Number.isFinite(activityEpoch) && activityEpoch > 0
      ? Math.max(0, Math.floor((nowMs - activityEpoch * 1000) / 1000))
      : null;

  return { pid, command, dead, idleSeconds };
}
/**
 * Decide whether an agent's pane is running something other than what its
 * roster runtime implies ("runtime drift").
 *
 * Known canonical pane commands per runtime:
 *   claude   → claude
 *   codex    → codex
 *   opencode → opencode
 *   pi       → pi
 *
 * Example: roster runtime "pi" with pane command "python3" is drift.
 *
 * No drift is reported when the pane command is null or empty (nothing to
 * compare against), or when the roster runtime has no canonical mapping. The
 * lookup uses a Map so that runtime names which happen to match
 * Object.prototype members (e.g. "constructor", "toString") are treated as
 * unknown runtimes rather than causing a TypeError.
 *
 * @param rosterRuntime - Runtime declared for the agent in the roster.
 * @param paneCommand - Command currently running in the pane, or null.
 * @returns true only when the runtime is known and the pane command is not one of its canonical commands.
 */
export function detectDrift(rosterRuntime: string, paneCommand: string | null): boolean {
  if (!paneCommand) return false;
  const knownCommands: ReadonlyMap<string, readonly string[]> = new Map([
    ['claude', ['claude']],
    ['codex', ['codex']],
    ['opencode', ['opencode']],
    ['pi', ['pi']],
  ]);
  const expected = knownCommands.get(rosterRuntime);
  if (expected === undefined) return false;
  return !expected.includes(paneCommand);
}
/**
 * Resolve the default tenant_id (OS username) and host (short hostname) that
 * are attached to every `fleet ps` record.
 *
 * tenant_id resolution order, taking the first NON-EMPTY value:
 *   1. `os.userInfo().username` (skipped if the call throws),
 *   2. the `USER` environment variable,
 *   3. the `LOGNAME` environment variable,
 *   4. the literal `'unknown'`.
 *
 * host is the part of `os.hostname()` before the first `.`, or `'localhost'`
 * if that part is empty.
 *
 * @returns Both values; each is guaranteed to be a non-empty string.
 */
export function getDefaultTenantAndHost(): { tenant_id: string; host: string } {
  let username = '';
  try {
    username = userInfo().username;
  } catch {
    // userInfo() can throw when the current uid has no account entry;
    // fall through to the environment-variable fallbacks below.
  }
  // `||` (not `??`) so that empty strings also fall through to the next candidate.
  const tenant_id = username || process.env['USER'] || process.env['LOGNAME'] || 'unknown';
  const host = hostname().split('.')[0] || 'localhost';
  return { tenant_id, host };
}
/**
 * Build the argv for `agent watch`: a tmux attach with the `-r` (read-only)
 * flag, targeting the session `=<agentName>`.
 *
 * @param agentName - Agent name; used as the tmux session name in the target.
 * @param socketName - Value passed to `tmux -L`; defaults to DEFAULT_SOCKET_NAME.
 * @returns The argv array.
 */
export function buildAgentWatchCommand(
  agentName: string,
  socketName = DEFAULT_SOCKET_NAME,
): string[] {
  const sessionTarget = `=${agentName}`;
  const argv = ['tmux', '-L', socketName];
  argv.push('attach', '-r', '-t', sessionTarget);
  return argv;
}
/**
 * Build the `tmux capture-pane` argv used by `agent send --verify` to inspect
 * the agent's pane after a send. The captured text is meant to be fed to
 * isSendAccepted.
 *
 * The pane target is `=<agentName>:0.0`; `-p` and `-S -<lines>` are passed to
 * capture-pane.
 *
 * @param agentName - Agent name; used as the tmux session name in the target.
 * @param socketName - Value passed to `tmux -L`; defaults to DEFAULT_SOCKET_NAME.
 * @param lines - Number interpolated into the `-S -<lines>` argument; defaults to 5.
 * @returns The argv array.
 */
export function buildAgentVerifyAcceptedCommand(
  agentName: string,
  socketName = DEFAULT_SOCKET_NAME,
  lines = 5,
): string[] {
  const paneTarget = `=${agentName}:0.0`;
  const startLine = `-${lines}`;
  return ['tmux', '-L', socketName, 'capture-pane', '-t', paneTarget, '-p', '-S', startLine];
}
/**
 * Heuristically decide whether a message sent to an agent was submitted, or is
 * still sitting in the input line as an unsent draft.
 *
 * Only the LAST non-blank line of the capture is examined. After removing ANSI
 * colour (SGR) sequences and trimming, that line is considered a draft when it
 * starts with `>` followed by whitespace, i.e. it looks like `> <text>`.
 * Anything else, including a completely blank capture or a bare `>` prompt, is
 * treated as accepted.
 *
 * This is deliberately permissive: it only reports a draft on clear evidence,
 * so a `true` result is not proof of delivery.
 *
 * @param capturedOutput - Text captured from the agent's pane.
 * @returns false if the last non-blank line matches the draft pattern, true otherwise.
 */
export function isSendAccepted(capturedOutput: string): boolean {
  // Find the last line that has any non-whitespace content.
  let lastVisible: string | undefined;
  for (const row of capturedOutput.split('\n')) {
    if (row.trim().length > 0) lastVisible = row;
  }
  // Blank pane: nothing looks like a pending draft, so treat it as submitted.
  if (lastVisible === undefined) return true;

  // Strip colour codes so a styled prompt still matches the pattern below.
  const plain = lastVisible.replace(/\x1b\[[0-9;]*m/g, '').trim();
  const looksLikeDraft = /^>\s/.test(plain);
  return !looksLikeDraft;
}
export function registerFleetCommand(program: Command, deps: FleetCommandDeps = {}): Command {
const runner = deps.runner ?? runCommand;
const paths = resolveFleetPaths(deps.mosaicHome);
@@ -360,6 +651,113 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
console.log(`Verified fleet on tmux socket ${socketName}.`);
});
cmd
.command('ps')
.description('Show real-time status for all roster agents (systemd + tmux + heartbeat)')
.option('--json', 'Print JSON array')
.action(async (opts: { json?: boolean }) => {
const commandOpts = cmd.opts<{ mosaicHome: string; roster?: string }>();
const activePaths = resolveFleetPaths(commandOpts.mosaicHome);
const roster = await loadRosterForCommand(cmd);
const { tenant_id, host } = getDefaultTenantAndHost();
const nowMs = Date.now();
const rows: AgentPsRow[] = [];
for (const agent of roster.agents) {
// systemd show
const showResult = await runner(...splitCommand(buildSystemdShowCommand(agent.name)));
const sysInfo = parseSystemdShow(showResult.stdout);
// tmux list-panes
const panesResult = await runner(
...splitCommand(buildTmuxListPanesCommand(agent.name, roster.tmux.socketName)),
);
const paneInfo = parseTmuxListPanes(panesResult.stdout, nowMs);
// heartbeat
const hbFile = heartbeatPath(agent.name, activePaths.mosaicHome);
let hbContent: string | null = null;
try {
hbContent = await readFile(hbFile, 'utf8');
} catch {
hbContent = null;
}
const hb = parseHeartbeat(hbContent, nowMs);
// drift and boot-enable
const driftFlag = detectDrift(agent.runtime, paneInfo.command);
const bootEnableWarning =
sysInfo.ActiveState === 'active' && sysInfo.UnitFileState === 'disabled';
rows.push({
name: agent.name,
tenant_id,
host,
runtime: agent.runtime,
systemdActive: sysInfo.ActiveState,
systemdEnabled: sysInfo.UnitFileState,
paneAlive: !paneInfo.dead,
panePid: paneInfo.pid,
paneCommand: paneInfo.command,
idleSeconds: paneInfo.idleSeconds,
heartbeat: hb,
driftFlag,
bootEnableWarning,
});
}
if (opts.json) {
console.log(JSON.stringify(rows, null, 2));
return;
}
// Table output
const header = [
'NAME'.padEnd(18),
'TENANT'.padEnd(12),
'HOST'.padEnd(12),
'RUNTIME'.padEnd(10),
'SYSTEMD'.padEnd(16),
'PANE'.padEnd(8),
'PID'.padEnd(8),
'IDLE'.padEnd(8),
'HB'.padEnd(12),
'FLAGS',
].join(' ');
console.log(header);
console.log('-'.repeat(header.length));
for (const row of rows) {
const systemd = `${row.systemdActive}/${row.systemdEnabled}`;
const pane = row.paneAlive ? 'alive' : 'dead';
const pid = row.panePid !== null ? String(row.panePid) : '-';
const idle = row.idleSeconds !== null ? `${row.idleSeconds}s` : '-';
const hbAge =
row.heartbeat.ageMs !== null
? `${Math.round(row.heartbeat.ageMs / 1000)}s/${row.heartbeat.health}`
: `unknown`;
const flags: string[] = [];
if (row.driftFlag) flags.push('DRIFT');
if (row.bootEnableWarning) flags.push('BOOT-ENABLE');
console.log(
[
row.name.padEnd(18),
row.tenant_id.padEnd(12),
row.host.padEnd(12),
row.runtime.padEnd(10),
systemd.padEnd(16),
pane.padEnd(8),
pid.padEnd(8),
idle.padEnd(8),
hbAge.padEnd(12),
flags.join(','),
].join(' '),
);
}
});
return cmd;
}
@@ -417,8 +815,15 @@ export function registerFleetAgentCommands(
.requiredOption('--message <text>', 'Message text')
.option('--source-label <label>', 'Source label for the message preamble')
.option('--source <label>', 'Alias for --source-label')
.option(
'--verify',
'Verify message was accepted (not left as a draft); exit non-zero if unverifiable',
)
.action(
async (agent: string, opts: { message: string; sourceLabel?: string; source?: string }) => {
async (
agent: string,
opts: { message: string; sourceLabel?: string; source?: string; verify?: boolean },
) => {
const roster = await loadRosterFromAgentCommand(agentCommand, deps.mosaicHome);
getRosterAgent(roster, agent);
const paths = resolveFleetPaths(
@@ -429,9 +834,40 @@ export function registerFleetAgentCommands(
runner,
buildAgentSendCommand(paths, agent, opts.message, roster.tmux.socketName, sourceLabel),
);
if (opts.verify) {
// Brief pause to allow the TUI to process the send before capturing
await new Promise<void>((res) => setTimeout(res, 300));
const captureResult = await runner(
...splitCommand(buildAgentVerifyAcceptedCommand(agent, roster.tmux.socketName)),
);
if (captureResult.exitCode !== 0) {
throw new Error(
`send --verify: could not capture pane output to verify acceptance (tmux exited ${captureResult.exitCode}).`,
);
}
const accepted = isSendAccepted(captureResult.stdout);
if (!accepted) {
process.exitCode = 1;
process.stderr.write(
`send --verify: message appears to be left as an unsubmitted draft in agent "${agent}".\n`,
);
}
}
},
);
agentCommand
.command('watch <agent>')
.description('Open a read-only view of a fleet agent tmux session (cannot send keystrokes)')
.action(async (agent: string) => {
const roster = await loadRosterFromAgentCommand(agentCommand, deps.mosaicHome);
getRosterAgent(roster, agent);
const result = await runner(
...splitCommand(buildAgentWatchCommand(agent, roster.tmux.socketName)),
);
writeCommandOutput(result);
});
agentCommand
.command('reset <agent>')
.description('Reset a local fleet agent by sending the runtime reset command')