From d88755585205034dd59918ed71b68bfcf9904397 Mon Sep 17 00:00:00 2001 From: "jason.woltje" Date: Wed, 24 Jun 2026 05:55:47 +0000 Subject: [PATCH] feat(fleet): classify agent readiness in fleet ps (#649) --- docs/scratchpads/h1-heartbeat-readiness.md | 66 +++++++ packages/mosaic/src/commands/fleet.spec.ts | 216 ++++++++++++++++++++- packages/mosaic/src/commands/fleet.ts | 99 +++++++++- 3 files changed, 377 insertions(+), 4 deletions(-) create mode 100644 docs/scratchpads/h1-heartbeat-readiness.md diff --git a/docs/scratchpads/h1-heartbeat-readiness.md b/docs/scratchpads/h1-heartbeat-readiness.md new file mode 100644 index 0000000..2810f77 --- /dev/null +++ b/docs/scratchpads/h1-heartbeat-readiness.md @@ -0,0 +1,66 @@ +# H1 — heartbeat readiness detection + +## Objective + +Add runtime-agnostic readiness classification to `mosaic fleet ps` so an agent can be reported as working/idle/stuck/stale/dead/unknown instead of treating pane liveness as progress. + +## Scope + +- `packages/mosaic/src/commands/fleet.ts` + - exported readiness state/types/default thresholds/helpers/classifier + - `AgentPsRow.readiness` additive JSON field + - table HB column and IDLE/STUCK flags +- `packages/mosaic/src/commands/fleet.spec.ts` + - pure classifier branch/boundary coverage + - threshold helper coverage + - legitimate render/JSON assertion updates for new HB text + +## Acceptance Criteria + +- Branches covered: dead, unknown, stale, busy working, null-idle working, stuck boundary, idle boundary, working below idle. +- Threshold env helpers default to 300s/900s and honor positive integer env values. +- `fleet ps` rows populate `readiness` for roster and unmanaged socket sessions. +- Table HB text becomes `<age>s/<readiness>` when heartbeat age exists; remains `unknown` when absent. +- Flags include `IDLE`/`STUCK` for matching readiness. +- Local gates green: `pnpm typecheck`, `pnpm lint`, `pnpm format:check`, fleet vitest. 
+- Pre-push queue guard passes; PR opened off `origin/main`; no merge by worker. + +## Constraints / Assumptions + +- Source branch: `origin/main` @ `e3adc6a`. +- No scope creep beyond readiness detection. +- `docs/TASKS.md` and `docs/fleet/TASKS.md` are orchestrator-owned; worker will not modify them. +- PRD alignment source: `docs/fleet/PRD.md` Phase 2 observability; this is a refinement of heartbeat observability, preserving existing unknown/stale behavior. + +## Plan + +1. Install dependencies with requested PNPM environment. +2. Add readiness types/helpers/classifier near heartbeat constants. +3. Add `readiness` to `AgentPsRow` and populate both row paths. +4. Update table render and flags. +5. Add unit tests and update affected ps render/JSON assertions. +6. Run build precheck + required gates. +7. Run automated independent review, remediate findings. +8. Queue guard, push, open PR. + +## Progress + +- 2026-06-24: Branch created from `origin/main` @ `e3adc6a`. +- 2026-06-24: Implemented readiness thresholds/classifier, JSON row field, HB column label, and IDLE/STUCK flags. +- 2026-06-24: Added classifier branch/boundary tests, threshold helper tests, JSON shape assertions, and readiness table rendering assertions. + +## Verification Evidence + +- `pnpm install --store-dir "$HOME/.pnpm-store"` — pass. +- `npx turbo build --filter=@mosaicstack/mosaic^...` — pass, 12/12 tasks successful. +- `pnpm typecheck` — pass, 41/41 tasks successful. +- `pnpm lint` — pass, 23/23 tasks successful. +- `pnpm format:check` — pass, all matched files use Prettier style. +- `pnpm --filter @mosaicstack/mosaic exec vitest run src/commands/fleet.spec.ts` — pass, 171 tests. +- `pnpm --filter @mosaicstack/mosaic test` — pass, 39 files / 547 tests; `fleet.spec.ts` 171 tests. +- `~/.config/mosaic/tools/codex/codex-code-review.sh --uncommitted` — approve, 0 findings (reviewed supplied diff; sandbox file-inspection limitation noted by tool). + +## Risks / Blockers + +- No current blocker. 
+- Review tool could not inspect repo files directly due sandbox wrapper limitation, but it reviewed the supplied diff and approved with no findings. diff --git a/packages/mosaic/src/commands/fleet.spec.ts b/packages/mosaic/src/commands/fleet.spec.ts index a5be83b..cf0056e 100644 --- a/packages/mosaic/src/commands/fleet.spec.ts +++ b/packages/mosaic/src/commands/fleet.spec.ts @@ -19,17 +19,21 @@ import { buildSystemdShowCommand, buildTmuxListPanesCommand, buildTmuxListSessionsCommand, + classifyReadiness, classifySendResult, countOrchestrators, countEnhancers, detectDrift, enableFleetUnits, FLEET_PROFILES, + HEARTBEAT_IDLE_THRESHOLD_SECONDS, + HEARTBEAT_STUCK_THRESHOLD_SECONDS, generateAgentEnv, getDefaultOperatorSourceLabel, getDefaultTenantAndHost, getRosterAgent, heartbeatPath, + idleThresholdSeconds, isSendAccepted, loadFleetRoster, mergeAgentEnv, @@ -44,6 +48,7 @@ import { resolvePresetFilename, RUNTIME_ACCEPTABLE_COMMANDS, serializeRosterToYaml, + stuckThresholdSeconds, VERIFY_DEFAULT_TIMEOUT_MS, VERIFY_POLL_INTERVAL_MS, type AgentPsRow, @@ -933,6 +938,127 @@ describe('fleet ps — heartbeat parsing', () => { }); }); +describe('fleet ps — readiness thresholds', () => { + const savedIdle = process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD; + const savedStuck = process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD; + + afterEach(() => { + if (savedIdle === undefined) delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD; + else process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = savedIdle; + if (savedStuck === undefined) delete process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD; + else process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = savedStuck; + }); + + it('uses default readiness thresholds when env is unset', () => { + delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD; + delete process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD; + + expect(idleThresholdSeconds()).toBe(HEARTBEAT_IDLE_THRESHOLD_SECONDS); + expect(stuckThresholdSeconds()).toBe(HEARTBEAT_STUCK_THRESHOLD_SECONDS); + }); + + it('honors 
positive integer readiness thresholds from env', () => { + process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '120'; + process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = '480'; + + expect(idleThresholdSeconds()).toBe(120); + expect(stuckThresholdSeconds()).toBe(480); + }); + + it('falls back to defaults for invalid readiness thresholds', () => { + process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '0'; + process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = 'not-a-number'; + + expect(idleThresholdSeconds()).toBe(HEARTBEAT_IDLE_THRESHOLD_SECONDS); + expect(stuckThresholdSeconds()).toBe(HEARTBEAT_STUCK_THRESHOLD_SECONDS); + }); +}); + +describe('fleet ps — readiness classification', () => { + const thresholds = { idleThresholdSeconds: 300, stuckThresholdSeconds: 900 }; + + it('reports dead when the pane is not alive', () => { + expect( + classifyReadiness( + { paneAlive: false, hbHealth: 'healthy', hbStatus: 'busy', idleSeconds: 0 }, + thresholds, + ), + ).toBe('dead'); + }); + + it('reports unknown when heartbeat health is unknown', () => { + expect( + classifyReadiness( + { paneAlive: true, hbHealth: 'unknown', hbStatus: null, idleSeconds: 0 }, + thresholds, + ), + ).toBe('unknown'); + }); + + it('reports stale when heartbeat health is stale', () => { + expect( + classifyReadiness( + { paneAlive: true, hbHealth: 'stale', hbStatus: 'busy', idleSeconds: 1_000 }, + thresholds, + ), + ).toBe('stale'); + }); + + it('reports working when heartbeat status is busy, even past stuck threshold', () => { + expect( + classifyReadiness( + { paneAlive: true, hbHealth: 'healthy', hbStatus: 'busy', idleSeconds: 2_000 }, + thresholds, + ), + ).toBe('working'); + }); + + it('reports working when pane idle seconds are unavailable', () => { + expect( + classifyReadiness( + { paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: null }, + thresholds, + ), + ).toBe('working'); + }); + + it('reports stuck at the stuck threshold boundary', () => { + expect( + classifyReadiness( + { paneAlive: 
true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 900 }, + thresholds, + ), + ).toBe('stuck'); + }); + + it('reports idle at the idle threshold boundary', () => { + expect( + classifyReadiness( + { paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 300 }, + thresholds, + ), + ).toBe('idle'); + }); + + it('reports working below the idle threshold', () => { + expect( + classifyReadiness( + { paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 299 }, + thresholds, + ), + ).toBe('working'); + }); + + it('checks stuck before idle when thresholds are inverted', () => { + expect( + classifyReadiness( + { paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 350 }, + { idleThresholdSeconds: 900, stuckThresholdSeconds: 300 }, + ), + ).toBe('stuck'); + }); +}); + describe('fleet ps — systemd show parsing', () => { it('parses ActiveState, SubState, UnitFileState from systemctl show output', () => { const output = 'ActiveState=active\nSubState=running\nUnitFileState=enabled\n'; @@ -1324,8 +1450,9 @@ describe('fleet ps — JSON output shape (FR-6)', () => { // boot-enable warning: active + disabled expect(row.bootEnableWarning).toBe(true); - // heartbeat missing → unknown + // heartbeat missing → unknown readiness preserves existing display semantics expect(row.heartbeat.health).toBe('unknown'); + expect(row.readiness).toBe('unknown'); expect(row.name).toBe('canary-pi'); expect(row.runtime).toBe('pi'); @@ -1387,6 +1514,92 @@ describe('fleet ps — command sequences issued', () => { }); }); +describe('fleet ps — readiness table output', () => { + it('renders readiness in HB column and flags idle/stuck rows', async () => { + const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); + const rosterPath = join(home, 'fleet', 'roster.yaml'); + const runDir = join(home, 'fleet', 'run'); + await mkdir(runDir, { recursive: true }); + await writeFile( + rosterPath, + [ + 'version: 1', + 'transport: tmux', + 'agents:', + ' - name: 
idle-agent', + ' runtime: pi', + ' - name: stuck-agent', + ' runtime: pi', + ].join('\n'), + ); + + const nowMs = 1_700_000_000_000; + const idleActivityEpoch = Math.floor((nowMs - 10_000) / 1000); + const stuckActivityEpoch = Math.floor((nowMs - 40_000) / 1000); + const hbTs = new Date(nowMs - 1_000).toISOString(); + await writeFile(join(runDir, 'idle-agent.hb'), `ts=${hbTs}\npid=111\nstatus=ok\n`); + await writeFile(join(runDir, 'stuck-agent.hb'), `ts=${hbTs}\npid=222\nstatus=ok\n`); + + const savedIdle = process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD; + const savedStuck = process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD; + process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '5'; + process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = '30'; + + const dateNow = vi.spyOn(Date, 'now').mockReturnValue(nowMs); + const runner: CommandRunner = async (command, args) => { + const full = [command, ...args].join(' '); + if (full.includes('list-sessions')) { + return { stdout: 'idle-agent\nstuck-agent\n', stderr: '', exitCode: 0 }; + } + if (full.includes('=idle-agent:0.0')) { + return { stdout: `111 pi 0 ${idleActivityEpoch}\n`, stderr: '', exitCode: 0 }; + } + if (full.includes('=stuck-agent:0.0')) { + return { stdout: `222 pi 0 ${stuckActivityEpoch}\n`, stderr: '', exitCode: 0 }; + } + if (full.includes('systemctl') && full.includes('show')) { + return { + stdout: 'ActiveState=active\nSubState=running\nUnitFileState=enabled\n', + stderr: '', + exitCode: 0, + }; + } + return { stdout: '', stderr: '', exitCode: 0 }; + }; + + const lines: string[] = []; + const origLog = console.log; + console.log = (msg: string) => { + lines.push(msg); + }; + + const program = new Command(); + program.exitOverride(); + registerFleetCommand(program, { runner, mosaicHome: home }); + + try { + await program.parseAsync(['node', 'mosaic', 'fleet', 'ps']); + } finally { + console.log = origLog; + dateNow.mockRestore(); + if (savedIdle === undefined) delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD; + else 
process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = savedIdle; + if (savedStuck === undefined) delete process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD; + else process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = savedStuck; + await rm(home, { recursive: true, force: true }); + } + + const idleLine = lines.find((line) => line.includes('idle-agent')); + const stuckLine = lines.find((line) => line.includes('stuck-agent')); + expect(idleLine).toBeDefined(); + expect(idleLine).toContain('1s/idle'); + expect(idleLine).toMatch(/\bIDLE\b/); + expect(stuckLine).toBeDefined(); + expect(stuckLine).toContain('1s/stuck'); + expect(stuckLine).toMatch(/\bSTUCK\b/); + }); +}); + describe('buildTmuxListSessionsCommand', () => { it('builds exact list-sessions command with session_name format', () => { expect(buildTmuxListSessionsCommand('mosaic-fleet')).toEqual([ @@ -1514,6 +1727,7 @@ describe('fleet ps — unmanaged socket sessions', () => { // driftFlag must be false for unmanaged (no roster runtime to compare) expect(unmanagedRow.driftFlag).toBe(false); + expect(unmanagedRow.readiness).toBe('unknown'); }); it('shows UNMANAGED flag in table output for unmanaged sessions', async () => { diff --git a/packages/mosaic/src/commands/fleet.ts b/packages/mosaic/src/commands/fleet.ts index a37d680..2cecf0e 100644 --- a/packages/mosaic/src/commands/fleet.ts +++ b/packages/mosaic/src/commands/fleet.ts @@ -394,6 +394,8 @@ export function buildAgentTailCommand(agentName: string, lines: number, socketNa // --------------------------------------------------------------------------- export const HEARTBEAT_INTERVAL_MS = 15_000; +export const HEARTBEAT_IDLE_THRESHOLD_SECONDS = 300; +export const HEARTBEAT_STUCK_THRESHOLD_SECONDS = 900; /** * Heartbeat interval in ms, honoring MOSAIC_HEARTBEAT_INTERVAL (seconds) so the @@ -404,8 +406,68 @@ export function heartbeatIntervalMs(): number { const sec = Number.parseInt(process.env.MOSAIC_HEARTBEAT_INTERVAL ?? '', 10); return Number.isFinite(sec) && sec > 0 ? 
sec * 1000 : HEARTBEAT_INTERVAL_MS; } + +/** Idle threshold in seconds, honoring MOSAIC_HEARTBEAT_IDLE_THRESHOLD. */ +export function idleThresholdSeconds(): number { + const sec = Number.parseInt(process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD ?? '', 10); + return Number.isFinite(sec) && sec > 0 ? sec : HEARTBEAT_IDLE_THRESHOLD_SECONDS; +} + +/** Stuck threshold in seconds, honoring MOSAIC_HEARTBEAT_STUCK_THRESHOLD. */ +export function stuckThresholdSeconds(): number { + const sec = Number.parseInt(process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD ?? '', 10); + return Number.isFinite(sec) && sec > 0 ? sec : HEARTBEAT_STUCK_THRESHOLD_SECONDS; +} export const HEARTBEAT_HEALTHY_MULTIPLIER = 3; +export type ReadinessState = 'working' | 'idle' | 'stuck' | 'stale' | 'dead' | 'unknown'; + +export interface ReadinessSignals { + paneAlive: boolean; + hbHealth: 'healthy' | 'stale' | 'unknown'; + hbStatus: 'ok' | 'busy' | null; + idleSeconds: number | null; +} + +export interface ReadinessThresholds { + idleThresholdSeconds: number; + stuckThresholdSeconds: number; +} + +/** + * Classify whether an agent is progressing based on already-parsed heartbeat/tmux signals. + * Best-effort and runtime-agnostic: it never probes, never throws, and preserves existing + * unknown/stale behavior when heartbeat data is absent or old. + */ +export function classifyReadiness( + signals: Partial<ReadinessSignals> | null | undefined, + thresholds: Partial<ReadinessThresholds> | null | undefined = {}, +): ReadinessState { + try { + if (signals?.paneAlive !== true) return 'dead'; + if (signals.hbHealth === 'unknown' || signals.hbHealth === undefined) return 'unknown'; + if (signals.hbHealth === 'stale') return 'stale'; + if (signals.hbStatus === 'busy') return 'working'; + if (signals.idleSeconds === null || signals.idleSeconds === undefined) return 'working'; + + const idleSeconds = Number.isFinite(signals.idleSeconds) ? 
signals.idleSeconds : null; + if (idleSeconds === null) return 'working'; + + const idleThreshold = Number.isFinite(thresholds?.idleThresholdSeconds) + ? Number(thresholds?.idleThresholdSeconds) + : idleThresholdSeconds(); + const stuckThreshold = Number.isFinite(thresholds?.stuckThresholdSeconds) + ? Number(thresholds?.stuckThresholdSeconds) + : stuckThresholdSeconds(); + + if (idleSeconds >= stuckThreshold) return 'stuck'; + if (idleSeconds >= idleThreshold) return 'idle'; + return 'working'; + } catch { + return 'unknown'; + } +} + export interface HeartbeatInfo { ts: Date | null; pid: number | null; @@ -429,6 +491,7 @@ export interface AgentPsRow { paneCommand: string | null; idleSeconds: number | null; heartbeat: HeartbeatInfo; + readiness: ReadinessState; /** roster runtime !== actual pane command */ driftFlag: boolean; /** active but UnitFileState=disabled */ @@ -1022,6 +1085,10 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps = const nowMs = Date.now(); const rows: AgentPsRow[] = []; + const readinessThresholds = { + idleThresholdSeconds: idleThresholdSeconds(), + stuckThresholdSeconds: stuckThresholdSeconds(), + }; // Build the set of roster agent names for quick lookup when filtering socket sessions. 
const rosterAgentNames = new Set(roster.agents.map((a) => a.name)); @@ -1052,6 +1119,17 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps = const bootEnableWarning = sysInfo.ActiveState === 'active' && sysInfo.UnitFileState === 'disabled'; + const paneAlive = !paneInfo.dead; + const readiness = classifyReadiness( + { + paneAlive, + hbHealth: hb.health, + hbStatus: hb.status, + idleSeconds: paneInfo.idleSeconds, + }, + readinessThresholds, + ); + rows.push({ name: agent.name, tenant_id, @@ -1059,11 +1137,12 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps = runtime: agent.runtime, systemdActive: sysInfo.ActiveState, systemdEnabled: sysInfo.UnitFileState, - paneAlive: !paneInfo.dead, + paneAlive, panePid: paneInfo.pid, paneCommand: paneInfo.command, idleSeconds: paneInfo.idleSeconds, heartbeat: hb, + readiness, driftFlag, bootEnableWarning, managed: true, @@ -1110,6 +1189,17 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps = const bootEnableWarning = sysInfo.ActiveState === 'active' && sysInfo.UnitFileState === 'disabled'; + const paneAlive = !paneInfo.dead; + const readiness = classifyReadiness( + { + paneAlive, + hbHealth: hb.health, + hbStatus: hb.status, + idleSeconds: paneInfo.idleSeconds, + }, + readinessThresholds, + ); + rows.push({ name: sessionName, tenant_id, @@ -1118,11 +1208,12 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps = runtime: 'unknown', systemdActive: sysInfo.ActiveState, systemdEnabled: sysInfo.UnitFileState, - paneAlive: !paneInfo.dead, + paneAlive, panePid: paneInfo.pid, paneCommand: paneInfo.command, idleSeconds: paneInfo.idleSeconds, heartbeat: hb, + readiness, // No roster runtime to compare — drift is not meaningful for unmanaged sessions driftFlag: false, bootEnableWarning, @@ -1164,13 +1255,15 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps = const idle = row.idleSeconds !== 
null ? `${row.idleSeconds}s` : '-'; const hbAge = row.heartbeat.ageMs !== null - ? `${Math.round(row.heartbeat.ageMs / 1000)}s/${row.heartbeat.health}` + ? `${Math.round(row.heartbeat.ageMs / 1000)}s/${row.readiness}` : `unknown`; const model = row.heartbeat.model ?? '-'; const flags: string[] = []; if (!row.managed) flags.push('UNMANAGED'); if (row.driftFlag) flags.push('DRIFT'); if (row.bootEnableWarning) flags.push('BOOT-ENABLE'); + if (row.readiness === 'idle') flags.push('IDLE'); + if (row.readiness === 'stuck') flags.push('STUCK'); console.log( [