From cf304eebc3cc790d08a8990efac489bc390132d3 Mon Sep 17 00:00:00 2001 From: Jarvis Date: Sat, 20 Jun 2026 21:51:19 -0500 Subject: [PATCH] =?UTF-8?q?feat(fleet):=20phase-2=20observability=20?= =?UTF-8?q?=E2=80=94=20fleet=20ps=20+=20watch=20+=20send=20--verify?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FR-1 fleet ps: joins systemd show (ActiveState/SubState/UnitFileState), tmux list-panes (pid/command/dead/activity), and file-based heartbeat (~/.config/mosaic/fleet/run/.hb) into one table per roster agent. Flags DRIFT (roster runtime ≠ actual pane command) and BOOT-ENABLE (active but UnitFileState=disabled). --json output includes tenant_id and host on every record (FR-6 zero-foreclosure for multi-tenant/host). FR-3 agent watch: read-only tmux attach (-r flag) so the operator can observe any session without injecting keystrokes or resizing the window. Registered as a new verb alongside tail/send/reset in registerFleetAgentCommands. FR-5 agent send --verify: after keystroke injection, captures the last 5 pane lines and checks for draft heuristic (last non-empty line starts with '> '). Exits non-zero and writes to stderr if the message appears unsubmitted. Default send behavior is unchanged when --verify is omitted. New pure exported helpers (all unit-testable without real tmux/systemd): buildSystemdShowCommand, buildTmuxListPanesCommand, buildAgentWatchCommand, buildAgentVerifyAcceptedCommand, parseHeartbeat, parseSystemdShow, parseTmuxListPanes, detectDrift, getDefaultTenantAndHost, isSendAccepted, heartbeatPath. Added 31 new spec cases (62 total) covering exact command construction, JSON shape, heartbeat parsing, drift detection, and verify flow. Co-Authored-By: Claude Opus 4.8 (1M context) Claude-Session: https://claude.ai/code/session_01RMoEx7hfdFGjUiCHuN1RRi --- packages/mosaic/src/commands/fleet.spec.ts | 484 +++++++++++++++++++++ packages/mosaic/src/commands/fleet.ts | 440 ++++++++++++++++++- 2 files changed, 922 insertions(+), 2 deletions(-) diff --git a/packages/mosaic/src/commands/fleet.spec.ts b/packages/mosaic/src/commands/fleet.spec.ts index 28f348b..4efe809 100644 --- a/packages/mosaic/src/commands/fleet.spec.ts +++ b/packages/mosaic/src/commands/fleet.spec.ts @@ -5,14 +5,26 @@ import { Command } from 'commander'; import { afterEach, describe, expect, it, vi } from 'vitest'; import { buildAgentSendCommand, + buildAgentWatchCommand, + buildAgentVerifyAcceptedCommand, buildFleetServiceCommand, + buildSystemdShowCommand, + buildTmuxListPanesCommand, + detectDrift, generateAgentEnv, getDefaultOperatorSourceLabel, + getDefaultTenantAndHost, getRosterAgent, + heartbeatPath, + isSendAccepted, loadFleetRoster, mergeAgentEnv, + parseHeartbeat, + parseSystemdShow, + parseTmuxListPanes, registerFleetCommand, resolveFleetPaths, + type AgentPsRow, type CommandRunner, } from './fleet.js'; import { registerAgentCommand } from './agent.js'; @@ -39,6 +51,7 @@ describe('registerFleetCommand', () => { 'init', 'install', 'install-systemd', + 'ps', 'restart', 'start', 'status', @@ -59,6 +72,7 @@ describe('registerFleetCommand', () => { 'send', 'status', 'tail', + 'watch', ]); }); }); @@ -736,3 +750,473 @@ describe('fleet command construction', () => { expect(packageJson.files).toEqual(expect.arrayContaining(['dist', 'framework'])); }); }); + +// --------------------------------------------------------------------------- +// Phase-2 observability — unit tests (FR-1, FR-3, FR-5, FR-6) +// --------------------------------------------------------------------------- + +describe('fleet ps — command construction', () => { + it('builds exact systemd show command for an agent unit', () => { + expect(buildSystemdShowCommand('canary-pi')).toEqual([ + 'systemctl', + '--user', + 'show', + 'mosaic-agent@canary-pi.service', + '-p', + 'ActiveState', + '-p', + 'SubState', + '-p', + 'UnitFileState', + ]); + }); + + it('builds exact tmux list-panes command with the correct format string', () => { + expect(buildTmuxListPanesCommand('canary-pi', 'mosaic-factory')).toEqual([ + 'tmux', + '-L', + 'mosaic-factory', + 'list-panes', + '-t', + '=canary-pi:0.0', + '-F', + '#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}', + ]); + }); + + it('uses DEFAULT_SOCKET_NAME when socket is omitted from list-panes', () => { + const cmd = buildTmuxListPanesCommand('canary-pi'); + expect(cmd[2]).toBe('mosaic-factory'); + }); + + it('derives heartbeat path under ~/.config/mosaic/fleet/run/', () => { + const home = '/home/test/.config/mosaic'; + expect(heartbeatPath('canary-pi', home)).toBe( + '/home/test/.config/mosaic/fleet/run/canary-pi.hb', + ); + }); +}); + +describe('fleet ps — heartbeat parsing', () => { + const NOW = 1_700_000_000_000; // fixed epoch ms for deterministic tests + + it('parses a healthy heartbeat file', () => { + const ts = new Date(NOW - 10_000).toISOString(); // 10s ago — within 3×15s = 45s + const content = `ts=${ts}\npid=12345\nstatus=ok\n`; + const hb = parseHeartbeat(content, NOW); + expect(hb.health).toBe('healthy'); + expect(hb.pid).toBe(12345); + expect(hb.status).toBe('ok'); + expect(hb.ageMs).toBe(10_000); + }); + + it('reports stale when heartbeat is older than 3×interval', () => { + const ts = new Date(NOW - 60_000).toISOString(); // 60s ago > 45s threshold + const content = `ts=${ts}\npid=99\nstatus=busy\n`; + const hb = parseHeartbeat(content, NOW); + expect(hb.health).toBe('stale'); + expect(hb.status).toBe('busy'); + }); + + it('reports unknown when heartbeat file is missing (null input)', () => { + const hb = parseHeartbeat(null, NOW); + expect(hb.health).toBe('unknown'); + expect(hb.ts).toBeNull(); + expect(hb.pid).toBeNull(); + expect(hb.ageMs).toBeNull(); + }); + + it('tolerates missing fields in heartbeat file', () => { + const hb = parseHeartbeat('ts=not-a-date\n', NOW); + expect(hb.health).toBe('unknown'); + expect(hb.ts).toBeNull(); + }); +}); + +describe('fleet ps — systemd show parsing', () => { + it('parses ActiveState, SubState, UnitFileState from systemctl show output', () => { + const output = 'ActiveState=active\nSubState=running\nUnitFileState=enabled\n'; + expect(parseSystemdShow(output)).toEqual({ + ActiveState: 'active', + SubState: 'running', + UnitFileState: 'enabled', + }); + }); + + it('defaults missing keys to "unknown"', () => { + const result = parseSystemdShow('ActiveState=inactive\n'); + expect(result.SubState).toBe('unknown'); + expect(result.UnitFileState).toBe('unknown'); + }); +}); + +describe('fleet ps — tmux list-panes parsing', () => { + const NOW_MS = 1_700_000_000_000; + + it('parses alive pane with pid, command, and idle time', () => { + const activityEpoch = Math.floor((NOW_MS - 30_000) / 1000); // 30s ago + const output = `12345 claude 0 ${activityEpoch}\n`; + const result = parseTmuxListPanes(output, NOW_MS); + expect(result.pid).toBe(12345); + expect(result.command).toBe('claude'); + expect(result.dead).toBe(false); + expect(result.idleSeconds).toBe(30); + }); + + it('reports dead pane when pane_dead=1', () => { + const output = `0 bash 1 0\n`; + const result = parseTmuxListPanes(output, NOW_MS); + expect(result.dead).toBe(true); + }); + + it('returns nulls for empty pane output', () => { + const result = parseTmuxListPanes('', NOW_MS); + expect(result.pid).toBeNull(); + expect(result.command).toBeNull(); + expect(result.dead).toBe(true); + expect(result.idleSeconds).toBeNull(); + }); +}); + +describe('fleet ps — drift detection', () => { + it('flags drift when roster says pi but pane runs python3', () => { + expect(detectDrift('pi', 'python3')).toBe(true); + }); + + it('flags drift when roster says claude but pane runs dogfood-agent.py', () => { + expect(detectDrift('claude', 'dogfood-agent.py')).toBe(true); + }); + + it('does NOT flag drift when pane command matches the roster runtime', () => { + expect(detectDrift('claude', 'claude')).toBe(false); + expect(detectDrift('codex', 'codex')).toBe(false); + expect(detectDrift('pi', 'pi')).toBe(false); + expect(detectDrift('opencode', 'opencode')).toBe(false); + }); + + it('does NOT flag drift for unknown/custom runtimes (no canonical mapping)', () => { + expect(detectDrift('custom-runtime', 'anything')).toBe(false); + }); + + it('does NOT flag drift when pane command is null (pane dead)', () => { + expect(detectDrift('pi', null)).toBe(false); + }); +}); + +describe('fleet ps — tenant and host', () => { + it('returns tenant_id and host as non-empty strings', () => { + const { tenant_id, host } = getDefaultTenantAndHost(); + expect(typeof tenant_id).toBe('string'); + expect(tenant_id.length).toBeGreaterThan(0); + expect(typeof host).toBe('string'); + expect(host.length).toBeGreaterThan(0); + }); +}); + +describe('fleet ps — JSON output shape (FR-6)', () => { + it('produces --json records including tenant_id and host for each agent', async () => { + const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); + const rosterPath = join(home, 'fleet', 'roster.yaml'); + await mkdir(join(home, 'fleet'), { recursive: true }); + await writeFile( + rosterPath, + [ + 'version: 1', + 'transport: tmux', + 'agents:', + ' - name: canary-pi', + ' runtime: pi', + ' class: canary', + ].join('\n'), + ); + + const nowMs = Date.now(); + const activityEpoch = Math.floor((nowMs - 20_000) / 1000); + + const runner: CommandRunner = async (command, args) => { + const fullArgs = [command, ...args].join(' '); + if (fullArgs.includes('systemctl') && fullArgs.includes('show')) { + return { + stdout: 'ActiveState=active\nSubState=running\nUnitFileState=disabled\n', + stderr: '', + exitCode: 0, + }; + } + if (fullArgs.includes('list-panes')) { + return { + stdout: `12345 python3 0 ${activityEpoch}\n`, + stderr: '', + exitCode: 0, + }; + } + return { stdout: '', stderr: '', exitCode: 0 }; + }; + + const lines: string[] = []; + const origLog = console.log; + console.log = (msg: string) => { + lines.push(msg); + }; + + const program = new Command(); + program.exitOverride(); + registerFleetCommand(program, { runner, mosaicHome: home }); + + try { + await program.parseAsync(['node', 'mosaic', 'fleet', 'ps', '--json']); + } finally { + console.log = origLog; + await rm(home, { recursive: true, force: true }); + } + + const json = JSON.parse(lines.join('')) as AgentPsRow[]; + expect(Array.isArray(json)).toBe(true); + expect(json).toHaveLength(1); + + const row = json[0]!; + // FR-6: tenant_id and host must be present + expect(typeof row.tenant_id).toBe('string'); + expect(row.tenant_id.length).toBeGreaterThan(0); + expect(typeof row.host).toBe('string'); + expect(row.host.length).toBeGreaterThan(0); + + // drift: roster says pi, pane runs python3 → drift flag + expect(row.driftFlag).toBe(true); + // boot-enable warning: active + disabled + expect(row.bootEnableWarning).toBe(true); + + // heartbeat missing → unknown + expect(row.heartbeat.health).toBe('unknown'); + + expect(row.name).toBe('canary-pi'); + expect(row.runtime).toBe('pi'); + expect(row.systemdActive).toBe('active'); + expect(row.systemdEnabled).toBe('disabled'); + }); +}); + +describe('fleet ps — command sequences issued', () => { + it('issues systemd show + tmux list-panes per agent', async () => { + const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); + const rosterPath = join(home, 'fleet', 'roster.yaml'); + await mkdir(join(home, 'fleet'), { recursive: true }); + await writeFile( + rosterPath, + ['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join( + '\n', + ), + ); + + const calls: string[][] = []; + const runner: CommandRunner = async (command, args) => { + calls.push([command, ...args]); + return { + stdout: 'ActiveState=inactive\nSubState=dead\nUnitFileState=enabled\n', + stderr: '', + exitCode: 0, + }; + }; + + // suppress console.log for table output + const origLog = console.log; + console.log = () => {}; + + const program = new Command(); + program.exitOverride(); + registerFleetCommand(program, { runner, mosaicHome: home }); + + try { + await program.parseAsync(['node', 'mosaic', 'fleet', 'ps']); + expect(calls).toEqual([ + buildSystemdShowCommand('coder0'), + buildTmuxListPanesCommand('coder0', 'mosaic-factory'), + ]); + } finally { + console.log = origLog; + await rm(home, { recursive: true, force: true }); + } + }); +}); + +describe('agent watch', () => { + it('builds exact read-only tmux attach command', () => { + expect(buildAgentWatchCommand('canary-pi', 'mosaic-factory')).toEqual([ + 'tmux', + '-L', + 'mosaic-factory', + 'attach', + '-r', + '-t', + '=canary-pi', + ]); + }); + + it('uses DEFAULT_SOCKET_NAME when socket is omitted', () => { + const cmd = buildAgentWatchCommand('canary-pi'); + expect(cmd[2]).toBe('mosaic-factory'); + expect(cmd).toContain('-r'); + }); + + it('issues the read-only attach command through the injected runner', async () => { + const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); + await mkdir(join(home, 'fleet'), { recursive: true }); + await writeFile( + join(home, 'fleet', 'roster.yaml'), + ['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join( + '\n', + ), + ); + + const calls: string[][] = []; + const runner: CommandRunner = async (command, args) => { + calls.push([command, ...args]); + return { stdout: '', stderr: '', exitCode: 0 }; + }; + + const program = new Command(); + program.exitOverride(); + registerAgentCommand(program, { runner, mosaicHome: home }); + + try { + await program.parseAsync(['node', 'mosaic', 'agent', 'watch', 'coder0']); + expect(calls).toEqual([['tmux', '-L', 'mosaic-factory', 'attach', '-r', '-t', '=coder0']]); + } finally { + await rm(home, { recursive: true, force: true }); + } + }); + + it('rejects watch for agents not in the roster', async () => { + const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); + await mkdir(join(home, 'fleet'), { recursive: true }); + await writeFile( + join(home, 'fleet', 'roster.yaml'), + ['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join( + '\n', + ), + ); + + const runner = vi.fn(async () => ({ stdout: '', stderr: '', exitCode: 0 })); + const program = new Command(); + program.exitOverride(); + registerAgentCommand(program, { runner, mosaicHome: home }); + + try { + await expect( + program.parseAsync(['node', 'mosaic', 'agent', 'watch', 'typo']), + ).rejects.toThrow('Agent "typo" is not in the fleet roster'); + expect(runner).not.toHaveBeenCalled(); + } finally { + await rm(home, { recursive: true, force: true }); + } + }); +}); + +describe('agent send --verify', () => { + it('builds exact verify capture-pane command', () => { + expect(buildAgentVerifyAcceptedCommand('canary-pi', 'mosaic-factory', 5)).toEqual([ + 'tmux', + '-L', + 'mosaic-factory', + 'capture-pane', + '-t', + '=canary-pi:0.0', + '-p', + '-S', + '-5', + ]); + }); + + it('isSendAccepted: returns true for normal response output', () => { + expect(isSendAccepted('Some response text\nAnother line\n')).toBe(true); + }); + + it('isSendAccepted: returns false when last line starts with "> " (draft pattern)', () => { + expect(isSendAccepted('> my unsent message')).toBe(false); + }); + + it('isSendAccepted: returns true for blank pane (treated as submitted)', () => { + expect(isSendAccepted('')).toBe(true); + expect(isSendAccepted(' \n \n')).toBe(true); + }); + + it('issues send then verify capture via injected runner when --verify is passed', async () => { + const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); + await mkdir(join(home, 'fleet'), { recursive: true }); + await writeFile( + join(home, 'fleet', 'roster.yaml'), + ['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join( + '\n', + ), + ); + + const calls: string[][] = []; + const runner: CommandRunner = async (command, args) => { + calls.push([command, ...args]); + // For agent-send.sh: success; for capture-pane: return accepted output + return { stdout: 'Response from agent\n', stderr: '', exitCode: 0 }; + }; + + const program = new Command(); + program.exitOverride(); + registerAgentCommand(program, { runner, mosaicHome: home }); + + try { + await program.parseAsync([ + 'node', + 'mosaic', + 'agent', + 'send', + 'coder0', + '--message', + 'hello world', + '--verify', + ]); + + // First call should be agent-send.sh, second call should be capture-pane for verify + expect(calls).toHaveLength(2); + expect(calls[0]![0]).toContain('agent-send.sh'); + const captureCall = calls[1]!; + expect(captureCall).toEqual(buildAgentVerifyAcceptedCommand('coder0', 'mosaic-factory', 5)); + } finally { + await rm(home, { recursive: true, force: true }); + } + }, 10_000); + + it('does NOT issue capture-pane verify when --verify is not passed', async () => { + const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); + await mkdir(join(home, 'fleet'), { recursive: true }); + await writeFile( + join(home, 'fleet', 'roster.yaml'), + ['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join( + '\n', + ), + ); + + const calls: string[][] = []; + const runner: CommandRunner = async (command, args) => { + calls.push([command, ...args]); + return { stdout: '', stderr: '', exitCode: 0 }; + }; + + const program = new Command(); + program.exitOverride(); + registerAgentCommand(program, { runner, mosaicHome: home }); + + try { + await program.parseAsync([ + 'node', + 'mosaic', + 'agent', + 'send', + 'coder0', + '--message', + 'hello world', + ]); + // Only 1 call: agent-send.sh (no capture-pane) + expect(calls).toHaveLength(1); + expect(calls[0]![0]).toContain('agent-send.sh'); + } finally { + await rm(home, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/mosaic/src/commands/fleet.ts b/packages/mosaic/src/commands/fleet.ts index 51ade3f..947dae8 100644 --- a/packages/mosaic/src/commands/fleet.ts +++ b/packages/mosaic/src/commands/fleet.ts @@ -1,6 +1,6 @@ import { constants } from 'node:fs'; import { access, chmod, copyFile, mkdir, readFile, writeFile } from 'node:fs/promises'; -import { homedir, hostname } from 'node:os'; +import { homedir, hostname, userInfo } from 'node:os'; import { dirname, join, resolve } from 'node:path'; import { fileURLToPath } from 'node:url'; import { spawn } from 'node:child_process'; @@ -236,6 +236,297 @@ export function buildAgentTailCommand( ]; } +// --------------------------------------------------------------------------- +// Fleet ps — phase 2 observability helpers +// --------------------------------------------------------------------------- + +export const HEARTBEAT_INTERVAL_MS = 15_000; +export const HEARTBEAT_HEALTHY_MULTIPLIER = 3; + +export interface HeartbeatInfo { + ts: Date | null; + pid: number | null; + status: 'ok' | 'busy' | null; + /** healthy | stale | unknown */ + health: 'healthy' | 'stale' | 'unknown'; + ageMs: number | null; +} + +export interface AgentPsRow { + name: string; + tenant_id: string; + host: string; + runtime: string; + systemdActive: string; + systemdEnabled: string; + paneAlive: boolean; + panePid: number | null; + paneCommand: string | null; + idleSeconds: number | null; + heartbeat: HeartbeatInfo; + /** roster runtime !== actual pane command */ + driftFlag: boolean; + /** active but UnitFileState=disabled */ + bootEnableWarning: boolean; +} + +/** + * Returns the systemd show command for an agent unit (active+enabled state). + * Returns: `systemctl --user show -p ActiveState -p SubState -p UnitFileState` + */ +export function buildSystemdShowCommand(agentName: string): string[] { + const unit = `mosaic-agent@${agentName}.service`; + return [ + 'systemctl', + '--user', + 'show', + unit, + '-p', + 'ActiveState', + '-p', + 'SubState', + '-p', + 'UnitFileState', + ]; +} + +/** + * Returns the systemd is-active command for an agent unit. + */ +export function buildSystemdIsActiveCommand(agentName: string): string[] { + const unit = `mosaic-agent@${agentName}.service`; + return ['systemctl', '--user', 'is-active', unit]; +} + +/** + * Returns the tmux list-panes command for an agent pane. + * Format: `#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}` + */ +export function buildTmuxListPanesCommand( + agentName: string, + socketName = DEFAULT_SOCKET_NAME, +): string[] { + return [ + 'tmux', + '-L', + socketName, + 'list-panes', + '-t', + `=${agentName}:0.0`, + '-F', + '#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}', + ]; +} + +/** + * Returns the heartbeat file path for an agent. + */ +export function heartbeatPath(agentName: string, mosaicHome = defaultMosaicHome()): string { + return join(mosaicHome, 'fleet', 'run', `${agentName}.hb`); +} + +/** + * Parse a heartbeat file's contents into a HeartbeatInfo. + * File format (one key=value per line): + * ts= + * pid= + * status= + */ +export function parseHeartbeat(content: string | null, nowMs = Date.now()): HeartbeatInfo { + if (content === null) { + return { ts: null, pid: null, status: null, health: 'unknown', ageMs: null }; + } + const lines = content.split('\n'); + let ts: Date | null = null; + let pid: number | null = null; + let status: 'ok' | 'busy' | null = null; + for (const line of lines) { + const [key, ...rest] = line.split('='); + const val = rest.join('=').trim(); + if (key === 'ts' && val) { + const d = new Date(val); + if (!Number.isNaN(d.getTime())) ts = d; + } else if (key === 'pid' && val) { + const n = Number.parseInt(val, 10); + if (Number.isFinite(n)) pid = n; + } else if (key === 'status' && (val === 'ok' || val === 'busy')) { + status = val; + } + } + const thresholdMs = HEARTBEAT_INTERVAL_MS * HEARTBEAT_HEALTHY_MULTIPLIER; + let health: 'healthy' | 'stale' | 'unknown' = 'unknown'; + let ageMs: number | null = null; + if (ts !== null) { + ageMs = nowMs - ts.getTime(); + health = ageMs <= thresholdMs ? 'healthy' : 'stale'; + } + return { ts, pid, status, health, ageMs }; +} + +/** + * Parse the output of `systemctl --user show ... -p ActiveState -p SubState -p UnitFileState` + * Returns an object with the three properties. + */ +export function parseSystemdShow(output: string): { + ActiveState: string; + SubState: string; + UnitFileState: string; +} { + const result: Record = {}; + for (const line of output.split('\n')) { + const eq = line.indexOf('='); + if (eq !== -1) { + result[line.slice(0, eq)] = line.slice(eq + 1).trim(); + } + } + return { + ActiveState: result['ActiveState'] ?? 'unknown', + SubState: result['SubState'] ?? 'unknown', + UnitFileState: result['UnitFileState'] ?? 'unknown', + }; +} + +/** + * Parse the output of `tmux list-panes -F '#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}'` + * pane_activity is a Unix epoch timestamp (seconds). + */ +export function parseTmuxListPanes( + output: string, + nowMs = Date.now(), +): { pid: number | null; command: string | null; dead: boolean; idleSeconds: number | null } { + const line = output.trim().split('\n')[0]; + if (!line) { + return { pid: null, command: null, dead: true, idleSeconds: null }; + } + // format: + const parts = line.split(' '); + const pid = parts[0] ? (Number.isFinite(Number(parts[0])) ? Number(parts[0]) : null) : null; + const command = parts[1] ?? null; + const dead = parts[2] === '1'; + const activityEpoch = parts[3] ? Number(parts[3]) : NaN; + const idleSeconds = + Number.isFinite(activityEpoch) && activityEpoch > 0 + ? Math.floor((nowMs - activityEpoch * 1000) / 1000) + : null; + return { pid, command, dead, idleSeconds }; +} + +/** + * Determine if there is a runtime drift: roster says one runtime but the pane + * is actually running something from a different runtime. We detect this by + * checking if the pane command doesn't match a known canonical command for the + * roster's declared runtime. + * + * Known canonical commands per runtime: + * claude → claude + * codex → codex + * opencode → opencode + * pi → pi + * + * If the pane is running something else (e.g., python3/dogfood-agent.py) for + * an agent whose roster runtime is "pi", that's a drift. + */ +export function detectDrift(rosterRuntime: string, paneCommand: string | null): boolean { + if (!paneCommand) return false; + const knownCommands: Record = { + claude: ['claude'], + codex: ['codex'], + opencode: ['opencode'], + pi: ['pi'], + }; + const expected = knownCommands[rosterRuntime]; + if (!expected) return false; + return !expected.includes(paneCommand); +} + +/** + * Returns the default tenant_id (OS username) and host (short hostname). + * These MUST appear in every --json record for multi-tenant/multi-host zero-foreclosure. + */ +export function getDefaultTenantAndHost(): { tenant_id: string; host: string } { + let tenant_id: string; + try { + tenant_id = userInfo().username; + } catch { + tenant_id = process.env['USER'] ?? process.env['LOGNAME'] ?? 'unknown'; + } + const host = hostname().split('.')[0] || 'localhost'; + return { tenant_id, host }; +} + +/** + * Builds the `agent watch` command: read-only tmux attach. + * Uses `-r` flag to prevent keystrokes and `=` exact-match session target. + */ +export function buildAgentWatchCommand( + agentName: string, + socketName = DEFAULT_SOCKET_NAME, +): string[] { + return ['tmux', '-L', socketName, 'attach', '-r', '-t', `=${agentName}`]; +} + +/** + * Builds the capture-pane command used to verify that agent send was accepted + * (not left as an unsubmitted draft). Captures the last N lines and checks for + * the draft heuristic. + */ +export function buildAgentVerifyAcceptedCommand( + agentName: string, + socketName = DEFAULT_SOCKET_NAME, + lines = 5, +): string[] { + return [ + 'tmux', + '-L', + socketName, + 'capture-pane', + '-t', + `=${agentName}:0.0`, + '-p', + '-S', + `-${lines}`, + ]; +} + +/** + * Check whether a send was accepted (not left as draft). + * A message is considered NOT accepted (draft) if the captured pane output + * still shows the message text at the bottom prompt without a newline/submission. + * We look for the common TUI pattern: the text appears at the last line but + * hasn't been cleared (which would happen after submission). + * + * Heuristic: if pane capture is non-empty and does NOT contain a leading `>` + * or prompt indicator on the LAST non-empty line, the send is considered accepted. + * This mirrors the send-message.sh draft check: if the last line looks like an + * unsubmitted input line, it's a draft. + * + * Returns true if accepted (submitted), false if still a draft/unverifiable. + */ +export function isSendAccepted(capturedOutput: string): boolean { + const lines = capturedOutput.split('\n').filter((l) => l.trim().length > 0); + if (lines.length === 0) return true; // blank pane — treat as submitted + const lastLine = lines[lines.length - 1]!; + // Heuristic: if last non-empty line is a bare user-typed draft line with no + // AI response yet (starts with or contains typical draft markers), flag as draft. + // Typical draft patterns: line ends with the sent message text with no ">" prefix, + // or the line is identical to a prompt that hasn't been cleared. + // We use a conservative heuristic: a line that is ONLY whitespace/prompt characters + // with no response indicator is suspicious, but since we can't reliably detect + // every TUI's draft state, we check for a specific pattern: + // if last line has trailing `█` (cursor block) or is blank after stripping ANSI, + // treat as submitted. Otherwise if we see the exact sent text repeated, it's draft. + // For robustness, we accept as submitted unless we see clear draft evidence. + // Real implementation: check if last meaningful line is just the input prompt. + const stripped = lastLine.replace(/\x1b\[[0-9;]*m/g, '').trim(); + // If the pane shows a line that looks like a pending input (ends with cursor or is empty), + // that means the message was submitted and the pane is waiting for response. + // A draft line typically looks like: "> " without a response. + // For simplicity: if stripped last line starts with "> " — that's a common draft pattern + // in pi/claude TUIs for showing user input before submission. + if (/^>\s/.test(stripped)) return false; + return true; +} + export function registerFleetCommand(program: Command, deps: FleetCommandDeps = {}): Command { const runner = deps.runner ?? runCommand; const paths = resolveFleetPaths(deps.mosaicHome); @@ -360,6 +651,113 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps = console.log(`Verified fleet on tmux socket ${socketName}.`); }); + cmd + .command('ps') + .description('Show real-time status for all roster agents (systemd + tmux + heartbeat)') + .option('--json', 'Print JSON array') + .action(async (opts: { json?: boolean }) => { + const commandOpts = cmd.opts<{ mosaicHome: string; roster?: string }>(); + const activePaths = resolveFleetPaths(commandOpts.mosaicHome); + const roster = await loadRosterForCommand(cmd); + const { tenant_id, host } = getDefaultTenantAndHost(); + const nowMs = Date.now(); + + const rows: AgentPsRow[] = []; + + for (const agent of roster.agents) { + // systemd show + const showResult = await runner(...splitCommand(buildSystemdShowCommand(agent.name))); + const sysInfo = parseSystemdShow(showResult.stdout); + + // tmux list-panes + const panesResult = await runner( + ...splitCommand(buildTmuxListPanesCommand(agent.name, roster.tmux.socketName)), + ); + const paneInfo = parseTmuxListPanes(panesResult.stdout, nowMs); + + // heartbeat + const hbFile = heartbeatPath(agent.name, activePaths.mosaicHome); + let hbContent: string | null = null; + try { + hbContent = await readFile(hbFile, 'utf8'); + } catch { + hbContent = null; + } + const hb = parseHeartbeat(hbContent, nowMs); + + // drift and boot-enable + const driftFlag = detectDrift(agent.runtime, paneInfo.command); + const bootEnableWarning = + sysInfo.ActiveState === 'active' && sysInfo.UnitFileState === 'disabled'; + + rows.push({ + name: agent.name, + tenant_id, + host, + runtime: agent.runtime, + systemdActive: sysInfo.ActiveState, + systemdEnabled: sysInfo.UnitFileState, + paneAlive: !paneInfo.dead, + panePid: paneInfo.pid, + paneCommand: paneInfo.command, + idleSeconds: paneInfo.idleSeconds, + heartbeat: hb, + driftFlag, + bootEnableWarning, + }); + } + + if (opts.json) { + console.log(JSON.stringify(rows, null, 2)); + return; + } + + // Table output + const header = [ + 'NAME'.padEnd(18), + 'TENANT'.padEnd(12), + 'HOST'.padEnd(12), + 'RUNTIME'.padEnd(10), + 'SYSTEMD'.padEnd(16), + 'PANE'.padEnd(8), + 'PID'.padEnd(8), + 'IDLE'.padEnd(8), + 'HB'.padEnd(12), + 'FLAGS', + ].join(' '); + console.log(header); + console.log('-'.repeat(header.length)); + + for (const row of rows) { + const systemd = `${row.systemdActive}/${row.systemdEnabled}`; + const pane = row.paneAlive ? 'alive' : 'dead'; + const pid = row.panePid !== null ? String(row.panePid) : '-'; + const idle = row.idleSeconds !== null ? `${row.idleSeconds}s` : '-'; + const hbAge = + row.heartbeat.ageMs !== null + ? `${Math.round(row.heartbeat.ageMs / 1000)}s/${row.heartbeat.health}` + : `unknown`; + const flags: string[] = []; + if (row.driftFlag) flags.push('DRIFT'); + if (row.bootEnableWarning) flags.push('BOOT-ENABLE'); + + console.log( + [ + row.name.padEnd(18), + row.tenant_id.padEnd(12), + row.host.padEnd(12), + row.runtime.padEnd(10), + systemd.padEnd(16), + pane.padEnd(8), + pid.padEnd(8), + idle.padEnd(8), + hbAge.padEnd(12), + flags.join(','), + ].join(' '), + ); + } + }); + return cmd; } @@ -417,8 +815,15 @@ export function registerFleetAgentCommands( .requiredOption('--message ', 'Message text') .option('--source-label