diff --git a/docs/fleet/PRD.md b/docs/fleet/PRD.md index f250395..8a3cb04 100644 --- a/docs/fleet/PRD.md +++ b/docs/fleet/PRD.md @@ -80,14 +80,17 @@ observability and no safe way to watch a session. - **Verify heuristic is best-effort:** `agent send --verify` uses a `>` -prefix draft heuristic that is specific to pi/claude TUIs. Draft detection for codex and opencode TUIs is best-effort only; those runtimes may not use the same input-line indicator. -- **Pane-change check is the best Phase-2 signal:** `agent send --verify` compares a - BEFORE snapshot (captured immediately before the send) to an AFTER snapshot (captured - after the send delay). A pane that changed and does not end in a draft line is reported - as 'accepted'. A pane that did not change — including a wedged pane showing stale - non-empty content — is reported 'unverifiable' (exit 1, "no pane change after send"). - Definitive acceptance ultimately requires a runtime acknowledgement (Phase-3 - heartbeat-ack); the pane-change check is the best signal available against an opaque - TUI for Phase-2. +- **Pane-change check is the best Phase-2 signal; verify now polls up to a bounded + timeout:** `agent send --verify` captures a BEFORE snapshot, sends the message, then + polls `capture-pane` every ~400 ms up to a configurable total timeout (default ~6 s, + controlled by `--verify-timeout `). On each poll it runs classifySendResult: if + the pane shows 'accepted' or 'draft' the loop exits immediately; while the result is + 'unverifiable' (no pane change yet) it keeps polling. After the timeout with no + definitive result, it fails closed: exit 1 with "no pane change after send". This + eliminates false 'unverifiable' failures for slow/loaded TUIs that were previously + caused by the old fixed 300 ms single-capture. Definitive acceptance ultimately + requires a runtime acknowledgement (Phase-3 heartbeat-ack); the bounded pane-change + poll is the best signal available against an opaque TUI for Phase-2. - **Blank AFTER capture fails closed:** Full-screen TUIs (claude, codex, opencode, pi) render blank for `tmux capture-pane`. When the AFTER snapshot is empty, `send --verify` returns non-zero with an "unverifiable" message rather than silently succeeding. This diff --git a/packages/mosaic/src/commands/fleet.spec.ts b/packages/mosaic/src/commands/fleet.spec.ts index 09261c7..5b852cf 100644 --- a/packages/mosaic/src/commands/fleet.spec.ts +++ b/packages/mosaic/src/commands/fleet.spec.ts @@ -28,9 +28,12 @@ import { parseTmuxListPanes, registerFleetCommand, resolveFleetPaths, + VERIFY_DEFAULT_TIMEOUT_MS, + VERIFY_POLL_INTERVAL_MS, type AgentPsRow, type CommandRunner, type InteractiveRunner, + type SleepFn, } from './fleet.js'; import { registerAgentCommand } from './agent.js'; @@ -1235,7 +1238,7 @@ describe('agent send --verify', () => { }); }); - it('issues BEFORE-capture then send then AFTER-capture (3 calls) when --verify is passed', async () => { + it('issues BEFORE-capture then send then AFTER-capture (3 calls) when --verify is passed and pane changes on first poll', async () => { const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); await mkdir(join(home, 'fleet'), { recursive: true }); await writeFile( @@ -1245,6 +1248,9 @@ describe('agent send --verify', () => { ), ); + // no-op sleep so the test doesn't take VERIFY_DEFAULT_TIMEOUT_MS + const sleepFn: SleepFn = async () => {}; + let callIndex = 0; const calls: string[][] = []; const runner: CommandRunner = async (command, args) => { @@ -1253,14 +1259,14 @@ describe('agent send --verify', () => { if ([command, ...args].join(' ').includes('agent-send.sh')) { return { stdout: '', stderr: '', exitCode: 0 }; } - // BEFORE capture: return old content; AFTER capture: return new content + // BEFORE capture (idx 0): return old content; first AFTER capture (idx 2): return new content const stdout = idx === 0 ? 'Old pane content\n' : 'New response from agent\n'; return { stdout, stderr: '', exitCode: 0 }; }; const program = new Command(); program.exitOverride(); - registerAgentCommand(program, { runner, mosaicHome: home }); + registerAgentCommand(program, { runner, sleepFn, mosaicHome: home }); try { await program.parseAsync([ @@ -1274,7 +1280,7 @@ describe('agent send --verify', () => { '--verify', ]); - // 3 calls: BEFORE-capture, send, AFTER-capture + // 3 calls: BEFORE-capture, send, AFTER-capture (pane changed on first poll → accepted immediately) expect(calls).toHaveLength(3); expect(calls[0]).toEqual(buildAgentVerifyAcceptedCommand('coder0', 'mosaic-factory', 5)); expect(calls[1]![0]).toContain('agent-send.sh'); @@ -1322,7 +1328,7 @@ describe('agent send --verify', () => { } }); - it('send --verify: AFTER==BEFORE (stale/wedged pane) sets process.exitCode=1 (unverifiable)', async () => { + it('send --verify: AFTER==BEFORE (stale/wedged pane) sets process.exitCode=1 (unverifiable) after timeout', async () => { const originalExitCode = process.exitCode; const stderrMessages: string[] = []; const stderrSpy = vi.spyOn(process.stderr, 'write').mockImplementation((msg) => { @@ -1339,6 +1345,12 @@ describe('agent send --verify', () => { ), ); + // Count sleep calls to verify polling happens; use no-op sleep for speed. + let sleepCalls = 0; + const sleepFn: SleepFn = async () => { + sleepCalls++; + }; + const runner: CommandRunner = async (command, args) => { const full = [command, ...args].join(' '); if (full.includes('agent-send.sh')) return { stdout: '', stderr: '', exitCode: 0 }; @@ -1348,9 +1360,12 @@ describe('agent send --verify', () => { const program = new Command(); program.exitOverride(); - registerAgentCommand(program, { runner, mosaicHome: home }); + registerAgentCommand(program, { runner, sleepFn, mosaicHome: home }); try { + // Use a short verify-timeout (one poll interval worth) so the loop exits quickly. + // With a no-op sleep, Date.now() won't advance, so we only get 1 poll before + // deadline is exceeded. Use --verify-timeout=0 to force single-poll timeout. await program.parseAsync([ 'node', 'mosaic', @@ -1360,10 +1375,14 @@ describe('agent send --verify', () => { '--message', 'hello', '--verify', + '--verify-timeout', + '0', ]); expect(process.exitCode).toBe(1); // Must mention "no pane change" to distinguish from blank-capture case expect(stderrMessages.join('')).toMatch(/no pane change after send/i); + // At least one poll should have happened + expect(sleepCalls).toBeGreaterThanOrEqual(1); } finally { process.exitCode = originalExitCode; stderrSpy.mockRestore(); @@ -1371,7 +1390,7 @@ describe('agent send --verify', () => { } }, 10_000); - it('send --verify: blank AFTER capture sets process.exitCode=1 (unverifiable, fails closed)', async () => { + it('send --verify: blank AFTER capture sets process.exitCode=1 (unverifiable, fails closed) after timeout', async () => { const originalExitCode = process.exitCode; const stderrMessages: string[] = []; const stderrSpy = vi.spyOn(process.stderr, 'write').mockImplementation((msg) => { @@ -1388,6 +1407,8 @@ describe('agent send --verify', () => { ), ); + const sleepFn: SleepFn = async () => {}; + let captureCallCount = 0; const runner: CommandRunner = async (command, args) => { const full = [command, ...args].join(' '); @@ -1400,7 +1421,7 @@ describe('agent send --verify', () => { const program = new Command(); program.exitOverride(); - registerAgentCommand(program, { runner, mosaicHome: home }); + registerAgentCommand(program, { runner, sleepFn, mosaicHome: home }); try { await program.parseAsync([ @@ -1412,6 +1433,8 @@ describe('agent send --verify', () => { '--message', 'hello', '--verify', + '--verify-timeout', + '0', ]); expect(process.exitCode).toBe(1); expect(stderrMessages.join('')).toMatch(/could not verify delivery/i); @@ -1422,7 +1445,7 @@ describe('agent send --verify', () => { } }, 10_000); - it('send --verify: AFTER differs from BEFORE with draft line sets process.exitCode=1', async () => { + it('send --verify: AFTER differs from BEFORE with draft line sets process.exitCode=1 (returns immediately on first poll)', async () => { const originalExitCode = process.exitCode; const stderrMessages: string[] = []; const stderrSpy = vi.spyOn(process.stderr, 'write').mockImplementation((msg) => { @@ -1439,6 +1462,11 @@ describe('agent send --verify', () => { ), ); + let sleepCalls = 0; + const sleepFn: SleepFn = async () => { + sleepCalls++; + }; + let captureCallCount = 0; const runner: CommandRunner = async (command, args) => { const full = [command, ...args].join(' '); @@ -1451,7 +1479,7 @@ describe('agent send --verify', () => { const program = new Command(); program.exitOverride(); - registerAgentCommand(program, { runner, mosaicHome: home }); + registerAgentCommand(program, { runner, sleepFn, mosaicHome: home }); try { await program.parseAsync([ @@ -1466,6 +1494,8 @@ describe('agent send --verify', () => { ]); expect(process.exitCode).toBe(1); expect(stderrMessages.join('')).toMatch(/unsubmitted draft/i); + // Draft is returned on the first poll — only one sleep call expected + expect(sleepCalls).toBe(1); } finally { process.exitCode = originalExitCode; stderrSpy.mockRestore(); @@ -1473,7 +1503,7 @@ describe('agent send --verify', () => { } }, 10_000); - it('send --verify: AFTER differs from BEFORE with real response content sets exitCode=0 (accepted)', async () => { + it('send --verify: AFTER differs from BEFORE with real response content sets exitCode=0 (accepted on first poll)', async () => { const originalExitCode = process.exitCode; const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); @@ -1485,6 +1515,8 @@ describe('agent send --verify', () => { ), ); + const sleepFn: SleepFn = async () => {}; + let captureCallCount = 0; const runner: CommandRunner = async (command, args) => { const full = [command, ...args].join(' '); @@ -1500,7 +1532,7 @@ describe('agent send --verify', () => { const program = new Command(); program.exitOverride(); - registerAgentCommand(program, { runner, mosaicHome: home }); + registerAgentCommand(program, { runner, sleepFn, mosaicHome: home }); try { await program.parseAsync([ @@ -1520,4 +1552,199 @@ describe('agent send --verify', () => { await rm(home, { recursive: true, force: true }); } }, 10_000); + + // --------------------------------------------------------------------------- + // Bounded-polling tests (FR-5 enhancement) + // --------------------------------------------------------------------------- + + it('send --verify: accepted on 2nd poll (pane slow to respond) => exit 0', async () => { + // Simulates a slow/loaded TUI that only updates on the 2nd poll. + const originalExitCode = process.exitCode; + + const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); + await mkdir(join(home, 'fleet'), { recursive: true }); + await writeFile( + join(home, 'fleet', 'roster.yaml'), + ['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join( + '\n', + ), + ); + + let sleepCalls = 0; + const sleepFn: SleepFn = async (ms) => { + sleepCalls++; + expect(ms).toBe(VERIFY_POLL_INTERVAL_MS); + }; + + let captureCallCount = 0; + const runner: CommandRunner = async (command, args) => { + const full = [command, ...args].join(' '); + if (full.includes('agent-send.sh')) return { stdout: '', stderr: '', exitCode: 0 }; + captureCallCount++; + if (captureCallCount === 1) { + // BEFORE: old content + return { stdout: 'Old pane content\n', stderr: '', exitCode: 0 }; + } else if (captureCallCount === 2) { + // 1st AFTER poll: still unchanged (slow TUI) => unverifiable + return { stdout: 'Old pane content\n', stderr: '', exitCode: 0 }; + } else { + // 2nd AFTER poll: pane changed => accepted + return { stdout: 'Old pane content\nAgent accepted task.\n', stderr: '', exitCode: 0 }; + } + }; + + const program = new Command(); + program.exitOverride(); + registerAgentCommand(program, { runner, sleepFn, mosaicHome: home }); + + try { + // Give enough timeout for at least 2 polls (2 × VERIFY_POLL_INTERVAL_MS). + // With no-op sleep, Date.now() will advance between polls so we use a generous timeout. + await program.parseAsync([ + 'node', + 'mosaic', + 'agent', + 'send', + 'coder0', + '--message', + 'hello', + '--verify', + '--verify-timeout', + String(VERIFY_POLL_INTERVAL_MS * 3), + ]); + // Accepted on 2nd poll — exitCode should remain unchanged + expect(process.exitCode).toBe(originalExitCode); + // At least 2 sleep calls (one per poll until accepted) + expect(sleepCalls).toBeGreaterThanOrEqual(2); + } finally { + process.exitCode = originalExitCode; + await rm(home, { recursive: true, force: true }); + } + }, 10_000); + + it('send --verify: accepted on 3rd poll => exit 0', async () => { + const originalExitCode = process.exitCode; + + const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); + await mkdir(join(home, 'fleet'), { recursive: true }); + await writeFile( + join(home, 'fleet', 'roster.yaml'), + ['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join( + '\n', + ), + ); + + let sleepCalls = 0; + // Real-time advancing sleep needed so deadline check works correctly. + // Use a tiny delay (1ms) so the test runs fast but Date.now() still advances. + const sleepFn: SleepFn = async () => { + sleepCalls++; + await new Promise((r) => setTimeout(r, 1)); + }; + + let captureCallCount = 0; + const runner: CommandRunner = async (command, args) => { + const full = [command, ...args].join(' '); + if (full.includes('agent-send.sh')) return { stdout: '', stderr: '', exitCode: 0 }; + captureCallCount++; + if (captureCallCount === 1) { + return { stdout: 'Old content\n', stderr: '', exitCode: 0 }; + } else if (captureCallCount <= 3) { + // Polls 1 and 2: unchanged + return { stdout: 'Old content\n', stderr: '', exitCode: 0 }; + } else { + // Poll 3: accepted + return { stdout: 'Old content\nDone!\n', stderr: '', exitCode: 0 }; + } + }; + + const program = new Command(); + program.exitOverride(); + registerAgentCommand(program, { runner, sleepFn, mosaicHome: home }); + + try { + // Long enough timeout to allow at least 3 polls with 1ms sleeps + await program.parseAsync([ + 'node', + 'mosaic', + 'agent', + 'send', + 'coder0', + '--message', + 'hello', + '--verify', + '--verify-timeout', + '500', + ]); + expect(process.exitCode).toBe(originalExitCode); + expect(sleepCalls).toBeGreaterThanOrEqual(3); + } finally { + process.exitCode = originalExitCode; + await rm(home, { recursive: true, force: true }); + } + }, 10_000); + + it('send --verify: pane stays unchanged until timeout => exit 1 (unverifiable)', async () => { + const originalExitCode = process.exitCode; + const stderrMessages: string[] = []; + const stderrSpy = vi.spyOn(process.stderr, 'write').mockImplementation((msg) => { + stderrMessages.push(String(msg)); + return true; + }); + + const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-')); + await mkdir(join(home, 'fleet'), { recursive: true }); + await writeFile( + join(home, 'fleet', 'roster.yaml'), + ['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join( + '\n', + ), + ); + + let sleepCalls = 0; + const sleepFn: SleepFn = async () => { + sleepCalls++; + }; + + const runner: CommandRunner = async (command, args) => { + const full = [command, ...args].join(' '); + if (full.includes('agent-send.sh')) return { stdout: '', stderr: '', exitCode: 0 }; + // Always the same content — pane never changes + return { stdout: 'Unchanged pane content\n', stderr: '', exitCode: 0 }; + }; + + const program = new Command(); + program.exitOverride(); + registerAgentCommand(program, { runner, sleepFn, mosaicHome: home }); + + try { + // timeout=0 means deadline is immediately exceeded after the first poll + await program.parseAsync([ + 'node', + 'mosaic', + 'agent', + 'send', + 'coder0', + '--message', + 'hello', + '--verify', + '--verify-timeout', + '0', + ]); + expect(process.exitCode).toBe(1); + expect(stderrMessages.join('')).toMatch(/no pane change after send/i); + expect(sleepCalls).toBeGreaterThanOrEqual(1); + } finally { + process.exitCode = originalExitCode; + stderrSpy.mockRestore(); + await rm(home, { recursive: true, force: true }); + } + }, 10_000); + + it('send --verify: VERIFY_POLL_INTERVAL_MS and VERIFY_DEFAULT_TIMEOUT_MS are exported constants', () => { + expect(typeof VERIFY_POLL_INTERVAL_MS).toBe('number'); + expect(VERIFY_POLL_INTERVAL_MS).toBe(400); + expect(typeof VERIFY_DEFAULT_TIMEOUT_MS).toBe('number'); + expect(VERIFY_DEFAULT_TIMEOUT_MS).toBe(6_000); + }); }); diff --git a/packages/mosaic/src/commands/fleet.ts b/packages/mosaic/src/commands/fleet.ts index 630a638..4e76035 100644 --- a/packages/mosaic/src/commands/fleet.ts +++ b/packages/mosaic/src/commands/fleet.ts @@ -22,10 +22,23 @@ export interface CommandResult { export type CommandRunner = (command: string, args: string[]) => Promise; +/** + * Injectable sleep helper used by the send --verify polling loop. + * Tests stub this to avoid real delays; production uses the default + * implementation backed by setTimeout. + */ +export type SleepFn = (ms: number) => Promise; + export interface FleetCommandDeps { runner?: CommandRunner; /** Injectable interactive runner for commands needing inherited TTY (e.g., `tmux attach`). */ interactiveRunner?: InteractiveRunner; + /** + * Injectable sleep function for the send --verify polling loop. + * Defaults to a real setTimeout-based sleep. Tests stub this to avoid + * real delays; the default is used in production. + */ + sleepFn?: SleepFn; mosaicHome?: string; frameworkRoot?: string; } @@ -101,6 +114,18 @@ type FleetServiceAction = 'start' | 'stop' | 'restart' | 'status'; const DEFAULT_SOCKET_NAME = 'mosaic-factory'; const DEFAULT_HOLDER_SESSION = '_holder'; const DEFAULT_WORKING_DIRECTORY = '~/src'; + +/** + * Default poll interval (ms) between capture-pane checks in `send --verify`. + * Kept short enough to react quickly while not hammering tmux on busy hosts. + */ +export const VERIFY_POLL_INTERVAL_MS = 400; + +/** + * Default total timeout (ms) for the `send --verify` polling loop. + * Configurable via `--verify-timeout ` on `agent send`. + */ +export const VERIFY_DEFAULT_TIMEOUT_MS = 6_000; const DEFAULT_RUNTIME_RESETS: Record = { claude: { resetCommand: '/clear' }, codex: { resetCommand: '/clear' }, @@ -880,6 +905,7 @@ export function registerFleetAgentCommands( ): void { const runner = deps.runner ?? runCommand; const iRunner = deps.interactiveRunner ?? spawnInteractive; + const sleepFn = deps.sleepFn ?? defaultSleep; agentCommand .command('roster') @@ -933,10 +959,21 @@ export function registerFleetAgentCommands( '--verify', 'Verify message was accepted (not left as a draft); exit non-zero if unverifiable', ) + .option( + '--verify-timeout ', + `Maximum time (ms) to poll for pane change when --verify is set (default: ${VERIFY_DEFAULT_TIMEOUT_MS})`, + String(VERIFY_DEFAULT_TIMEOUT_MS), + ) .action( async ( agent: string, - opts: { message: string; sourceLabel?: string; source?: string; verify?: boolean }, + opts: { + message: string; + sourceLabel?: string; + source?: string; + verify?: boolean; + verifyTimeout?: string; + }, ) => { const roster = await loadRosterFromAgentCommand(agentCommand, deps.mosaicHome); getRosterAgent(roster, agent); @@ -945,6 +982,12 @@ export function registerFleetAgentCommands( ); const sourceLabel = opts.sourceLabel ?? opts.source ?? getDefaultOperatorSourceLabel(); if (opts.verify) { + const parsedTimeout = + opts.verifyTimeout !== undefined ? Number.parseInt(opts.verifyTimeout, 10) : Number.NaN; + const timeoutMs = Number.isFinite(parsedTimeout) + ? Math.max(0, parsedTimeout) + : VERIFY_DEFAULT_TIMEOUT_MS; + // Capture BEFORE snapshot so we can detect stale-pane false-successes. // A wedged pane that still shows old non-empty content must not be reported // as 'accepted' — we compare BEFORE vs AFTER to guard against that case. @@ -963,18 +1006,34 @@ export function registerFleetAgentCommands( buildAgentSendCommand(paths, agent, opts.message, roster.tmux.socketName, sourceLabel), ); - // Brief pause to allow the TUI to process the send before capturing the AFTER snapshot. - await new Promise((res) => setTimeout(res, 300)); - const afterResult = await runner( - ...splitCommand(buildAgentVerifyAcceptedCommand(agent, roster.tmux.socketName)), - ); - if (afterResult.exitCode !== 0) { - throw new Error( - `send --verify: could not capture pane output to verify acceptance (tmux exited ${afterResult.exitCode}).`, + // Bounded polling loop: poll capture-pane every VERIFY_POLL_INTERVAL_MS up to + // timeoutMs. Return immediately when the pane shows 'accepted' or 'draft'; + // keep polling while 'unverifiable' (no pane change yet). Fail closed after + // timeout with the existing "no pane change after send" message. + const deadline = Date.now() + timeoutMs; + let verifyResult: SendVerifyResult = 'unverifiable'; + + while (true) { + await sleepFn(VERIFY_POLL_INTERVAL_MS); + const afterResult = await runner( + ...splitCommand(buildAgentVerifyAcceptedCommand(agent, roster.tmux.socketName)), ); + if (afterResult.exitCode !== 0) { + throw new Error( + `send --verify: could not capture pane output to verify acceptance (tmux exited ${afterResult.exitCode}).`, + ); + } + verifyResult = classifySendResult(beforeSnapshot, afterResult.stdout); + // Definitive result — stop polling immediately. + if (verifyResult === 'accepted' || verifyResult === 'draft') { + break; + } + // Still unverifiable — check if we have time left to poll again. + if (Date.now() >= deadline) { + break; + } } - // Classify using BEFORE/AFTER comparison to guard against stale-pane false-successes. - const verifyResult = classifySendResult(beforeSnapshot, afterResult.stdout); + if (verifyResult === 'draft') { process.exitCode = 1; process.stderr.write( @@ -1481,6 +1540,14 @@ function spawnInteractive(command: string, args: string[]): Promise { }); } +/** + * Default SleepFn implementation backed by setTimeout. + * Tests inject a stub to avoid real delays in the send --verify polling loop. + */ +function defaultSleep(ms: number): Promise { + return new Promise((res) => setTimeout(res, ms)); +} + async function canRead(path: string): Promise { try { await access(path, constants.R_OK);