feat(fleet): Phase-2 observability — fleet ps + watch + send verify #579

Merged
jason.woltje merged 9 commits from feat/fleet-observability into main 2026-06-21 04:23:52 +00:00
3 changed files with 251 additions and 58 deletions
Showing only changes of commit aec560162b - Show all commits

View File

@@ -75,6 +75,19 @@ observability and no safe way to watch a session.
- Situational: run against the live `mosaic-factory` fleet; capture `fleet ps` output,
a kill-and-detect cycle, a read-only `watch`, and a `send --verify` pass/fail pair.
## Known limitations
- **Verify heuristic is best-effort:** `agent send --verify` uses a `>` -prefix draft
heuristic that is specific to pi/claude TUIs. Draft detection for codex and opencode
TUIs is best-effort only; those runtimes may not use the same input-line indicator.
- **Blank capture fails closed:** Full-screen TUIs (claude, codex, opencode, pi) render
blank for `tmux capture-pane`. When the captured output is empty, `send --verify`
returns non-zero with an "unverifiable" message rather than silently succeeding. This
is an intentional fail-closed design (FR-5).
- **`agent watch` requires TTY passthrough:** `tmux attach` is interactive and must be
run with inherited stdio. It cannot be captured through a pipe. Tests inject a fake
`interactiveRunner`; the real implementation spawns with `stdio: 'inherit'`.
## Surfaces & parity (MVP-X1)
CLI lands this phase. TUI surface follows in the `packages/mosaic` wizard; webUI in

View File

@@ -26,6 +26,7 @@ import {
resolveFleetPaths,
type AgentPsRow,
type CommandRunner,
type InteractiveRunner,
} from './fleet.js';
import { registerAgentCommand } from './agent.js';
@@ -1057,7 +1058,7 @@ describe('agent watch', () => {
expect(cmd).toContain('-r');
});
it('issues the read-only attach command through the injected runner', async () => {
it('dispatches the read-only attach command through the interactiveRunner, NOT the capturing runner', async () => {
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
@@ -1067,19 +1068,29 @@ describe('agent watch', () => {
),
);
const calls: string[][] = [];
const capturingCalls: string[][] = [];
const runner: CommandRunner = async (command, args) => {
calls.push([command, ...args]);
capturingCalls.push([command, ...args]);
return { stdout: '', stderr: '', exitCode: 0 };
};
const interactiveCalls: string[][] = [];
const interactiveRunner: InteractiveRunner = async (command, args) => {
interactiveCalls.push([command, ...args]);
return 0;
};
const program = new Command();
program.exitOverride();
registerAgentCommand(program, { runner, mosaicHome: home });
registerAgentCommand(program, { runner, interactiveRunner, mosaicHome: home });
try {
await program.parseAsync(['node', 'mosaic', 'agent', 'watch', 'coder0']);
expect(calls).toEqual([['tmux', '-L', 'mosaic-factory', 'attach', '-r', '-t', '=coder0']]);
// Must go through interactiveRunner, not the capturing runner
expect(capturingCalls).toHaveLength(0);
expect(interactiveCalls).toEqual([
['tmux', '-L', 'mosaic-factory', 'attach', '-r', '-t', '=coder0'],
]);
} finally {
await rm(home, { recursive: true, force: true });
}
@@ -1126,17 +1137,17 @@ describe('agent send --verify', () => {
]);
});
it('isSendAccepted: returns true for normal response output', () => {
expect(isSendAccepted('Some response text\nAnother line\n')).toBe(true);
it('isSendAccepted: returns "accepted" for normal response output', () => {
expect(isSendAccepted('Some response text\nAnother line\n')).toBe('accepted');
});
it('isSendAccepted: returns false when last line starts with "> " (draft pattern)', () => {
expect(isSendAccepted('> my unsent message')).toBe(false);
it('isSendAccepted: returns "draft" when last line starts with "> " (draft pattern)', () => {
expect(isSendAccepted('> my unsent message')).toBe('draft');
});
it('isSendAccepted: returns true for blank pane (treated as submitted)', () => {
expect(isSendAccepted('')).toBe(true);
expect(isSendAccepted(' \n \n')).toBe(true);
it('isSendAccepted: returns "unverifiable" for blank/empty pane (full-screen TUI case)', () => {
expect(isSendAccepted('')).toBe('unverifiable');
expect(isSendAccepted(' \n \n')).toBe('unverifiable');
});
it('issues send then verify capture via injected runner when --verify is passed', async () => {
@@ -1219,4 +1230,142 @@ describe('agent send --verify', () => {
await rm(home, { recursive: true, force: true });
}
});
it('send --verify: blank capture sets process.exitCode=1 (unverifiable, fails closed)', async () => {
const originalExitCode = process.exitCode;
const stderrMessages: string[] = [];
const stderrSpy = vi.spyOn(process.stderr, 'write').mockImplementation((msg) => {
stderrMessages.push(String(msg));
return true;
});
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
join(home, 'fleet', 'roster.yaml'),
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
const runner: CommandRunner = async (command, args) => {
const full = [command, ...args].join(' ');
if (full.includes('agent-send.sh')) return { stdout: '', stderr: '', exitCode: 0 };
// capture-pane returns blank (full-screen TUI)
return { stdout: '', stderr: '', exitCode: 0 };
};
const program = new Command();
program.exitOverride();
registerAgentCommand(program, { runner, mosaicHome: home });
try {
await program.parseAsync([
'node',
'mosaic',
'agent',
'send',
'coder0',
'--message',
'hello',
'--verify',
]);
expect(process.exitCode).toBe(1);
expect(stderrMessages.join('')).toMatch(/could not verify delivery.*blank/i);
} finally {
process.exitCode = originalExitCode;
stderrSpy.mockRestore();
await rm(home, { recursive: true, force: true });
}
}, 10_000);
it('send --verify: draft line sets process.exitCode=1 with distinct wording', async () => {
const originalExitCode = process.exitCode;
const stderrMessages: string[] = [];
const stderrSpy = vi.spyOn(process.stderr, 'write').mockImplementation((msg) => {
stderrMessages.push(String(msg));
return true;
});
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
join(home, 'fleet', 'roster.yaml'),
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
const runner: CommandRunner = async (command, args) => {
const full = [command, ...args].join(' ');
if (full.includes('agent-send.sh')) return { stdout: '', stderr: '', exitCode: 0 };
// capture-pane returns a draft line ("> unsent message")
return { stdout: '> unsent message\n', stderr: '', exitCode: 0 };
};
const program = new Command();
program.exitOverride();
registerAgentCommand(program, { runner, mosaicHome: home });
try {
await program.parseAsync([
'node',
'mosaic',
'agent',
'send',
'coder0',
'--message',
'hello',
'--verify',
]);
expect(process.exitCode).toBe(1);
expect(stderrMessages.join('')).toMatch(/unsubmitted draft/i);
} finally {
process.exitCode = originalExitCode;
stderrSpy.mockRestore();
await rm(home, { recursive: true, force: true });
}
}, 10_000);
it('send --verify: real response content sets exitCode=0 (accepted)', async () => {
const originalExitCode = process.exitCode;
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
await mkdir(join(home, 'fleet'), { recursive: true });
await writeFile(
join(home, 'fleet', 'roster.yaml'),
['version: 1', 'transport: tmux', 'agents:', ' - name: coder0', ' runtime: codex'].join(
'\n',
),
);
const runner: CommandRunner = async (command, args) => {
const full = [command, ...args].join(' ');
if (full.includes('agent-send.sh')) return { stdout: '', stderr: '', exitCode: 0 };
// capture-pane returns real response content
return { stdout: 'Agent response: task completed.\n', stderr: '', exitCode: 0 };
};
const program = new Command();
program.exitOverride();
registerAgentCommand(program, { runner, mosaicHome: home });
try {
await program.parseAsync([
'node',
'mosaic',
'agent',
'send',
'coder0',
'--message',
'hello',
'--verify',
]);
// exitCode should remain unchanged (not set to 1)
expect(process.exitCode).toBe(originalExitCode);
} finally {
process.exitCode = originalExitCode;
await rm(home, { recursive: true, force: true });
}
}, 10_000);
});

View File

@@ -7,6 +7,13 @@ import { spawn } from 'node:child_process';
import type { Command } from 'commander';
import YAML from 'yaml';
/**
* A function that spawns a command with inherited stdio (TTY passthrough).
* Used for interactive commands like `tmux attach` that need a real terminal.
* Resolves with the process exit code.
*/
export type InteractiveRunner = (command: string, args: string[]) => Promise<number>;
export interface CommandResult {
stdout: string;
stderr: string;
@@ -17,6 +24,8 @@ export type CommandRunner = (command: string, args: string[]) => Promise<Command
export interface FleetCommandDeps {
runner?: CommandRunner;
/** Injectable interactive runner for commands needing inherited TTY (e.g., `tmux attach`). */
interactiveRunner?: InteractiveRunner;
mosaicHome?: string;
frameworkRoot?: string;
}
@@ -290,14 +299,6 @@ export function buildSystemdShowCommand(agentName: string): string[] {
];
}
/**
* Returns the systemd is-active command for an agent unit.
*/
export function buildSystemdIsActiveCommand(agentName: string): string[] {
const unit = `mosaic-agent@${agentName}.service`;
return ['systemctl', '--user', 'is-active', unit];
}
/**
* Returns the tmux list-panes command for an agent pane.
* Format: `#{pane_pid} #{pane_current_command} #{pane_dead} #{pane_activity}`
@@ -489,42 +490,44 @@ export function buildAgentVerifyAcceptedCommand(
}
/**
* Check whether a send was accepted (not left as draft).
* A message is considered NOT accepted (draft) if the captured pane output
* still shows the message text at the bottom prompt without a newline/submission.
* We look for the common TUI pattern: the text appears at the last line but
* hasn't been cleared (which would happen after submission).
*
* Heuristic: if pane capture is non-empty and does NOT contain a leading `>`
* or prompt indicator on the LAST non-empty line, the send is considered accepted.
* This mirrors the send-message.sh draft check: if the last line looks like an
* unsubmitted input line, it's a draft.
*
* Returns true if accepted (submitted), false if still a draft/unverifiable.
* Result of a send-verify check.
* - 'accepted': positive evidence that the message was accepted (response content present).
* - 'draft': last non-empty line matches the draft heuristic (unsubmitted input).
* - 'unverifiable': blank/empty capture — full-screen TUIs (claude, codex, opencode, pi)
* render blank for capture-pane, so we cannot determine acceptance; fails closed per FR-5.
*/
export function isSendAccepted(capturedOutput: string): boolean {
export type SendVerifyResult = 'accepted' | 'draft' | 'unverifiable';
/**
* Check whether a send was accepted (not left as draft).
*
* Returns:
* 'unverifiable' — blank/empty capture (full-screen TUIs render blank; we cannot tell).
* 'draft' — last non-empty line matches the draft heuristic.
* 'accepted' — positive evidence of response content; not blank and not draft.
*
* Draft heuristic: a last non-empty line (after stripping ANSI escapes) that starts
* with "> " is treated as an unsubmitted input line. This pattern is specific to
* pi/claude TUIs and may miss drafts in codex/opencode TUIs — draft detection for
* those runtimes is best-effort only.
*
* FR-5 requires `send --verify` to return non-zero when delivery cannot be verified.
* Blank capture (full-screen TUI case) is the known-unverifiable case; it is treated
* as FAILURE (not success) so the caller fails closed rather than silently succeeding.
*/
export function isSendAccepted(capturedOutput: string): SendVerifyResult {
const lines = capturedOutput.split('\n').filter((l) => l.trim().length > 0);
if (lines.length === 0) return true; // blank pane — treat as submitted
// Blank/empty capture => full-screen TUI rendered blank => unverifiable.
// This is the known-unverifiable case; fail closed (not treated as success).
if (lines.length === 0) return 'unverifiable';
const lastLine = lines[lines.length - 1]!;
// Heuristic: if last non-empty line is a bare user-typed draft line with no
// AI response yet (starts with or contains typical draft markers), flag as draft.
// Typical draft patterns: line ends with the sent message text with no ">" prefix,
// or the line is identical to a prompt that hasn't been cleared.
// We use a conservative heuristic: a line that is ONLY whitespace/prompt characters
// with no response indicator is suspicious, but since we can't reliably detect
// every TUI's draft state, we check for a specific pattern:
// if last line has trailing `█` (cursor block) or is blank after stripping ANSI,
// treat as submitted. Otherwise if we see the exact sent text repeated, it's draft.
// For robustness, we accept as submitted unless we see clear draft evidence.
// Real implementation: check if last meaningful line is just the input prompt.
const stripped = lastLine.replace(/\x1b\[[0-9;]*m/g, '').trim();
// If the pane shows a line that looks like a pending input (ends with cursor or is empty),
// that means the message was submitted and the pane is waiting for response.
// A draft line typically looks like: "> <message text>" without a response.
// For simplicity: if stripped last line starts with "> " — that's a common draft pattern
// Heuristic: if stripped last line starts with "> " — that's the common draft pattern
// in pi/claude TUIs for showing user input before submission.
if (/^>\s/.test(stripped)) return false;
return true;
// NOTE: this heuristic is pi/claude-specific; draft detection for codex/opencode
// TUIs is best-effort only and may miss other unsubmitted-input indicators.
if (/^>\s/.test(stripped)) return 'draft';
return 'accepted';
}
export function registerFleetCommand(program: Command, deps: FleetCommandDeps = {}): Command {
@@ -766,6 +769,7 @@ export function registerFleetAgentCommands(
deps: FleetCommandDeps = {},
): void {
const runner = deps.runner ?? runCommand;
const iRunner = deps.interactiveRunner ?? spawnInteractive;
agentCommand
.command('roster')
@@ -845,11 +849,16 @@ export function registerFleetAgentCommands(
`send --verify: could not capture pane output to verify acceptance (tmux exited ${captureResult.exitCode}).`,
);
}
const accepted = isSendAccepted(captureResult.stdout);
if (!accepted) {
const verifyResult = isSendAccepted(captureResult.stdout);
if (verifyResult === 'draft') {
process.exitCode = 1;
process.stderr.write(
`send --verify: message appears to be left as an unsubmitted draft in agent "${agent}".\n`,
`send --verify: message left as unsubmitted draft in agent "${agent}".\n`,
);
} else if (verifyResult === 'unverifiable') {
process.exitCode = 1;
process.stderr.write(
`send --verify: could not verify delivery (blank/no response captured) for agent "${agent}".\n`,
);
}
}
@@ -862,10 +871,14 @@ export function registerFleetAgentCommands(
.action(async (agent: string) => {
const roster = await loadRosterFromAgentCommand(agentCommand, deps.mosaicHome);
getRosterAgent(roster, agent);
const result = await runner(
...splitCommand(buildAgentWatchCommand(agent, roster.tmux.socketName)),
);
writeCommandOutput(result);
// `tmux attach` is interactive and requires inherited TTY/stdin/stdout.
// Route through the interactiveRunner (stdio: 'inherit') instead of the
// capturing runner, which would hang or fail for full-screen TUI commands.
const [bin, args] = splitCommand(buildAgentWatchCommand(agent, roster.tmux.socketName));
const exitCode = await iRunner(bin, args);
if (exitCode !== 0) {
process.exitCode = exitCode;
}
});
agentCommand
@@ -1300,6 +1313,24 @@ function resolveFrameworkRoot(): string {
return resolve(dirname(currentFile), '..', '..', 'framework');
}
/**
* Default InteractiveRunner implementation: spawns the command with inherited
* stdio so the terminal is passed through to the child process. This is required
* for commands like `tmux attach` that are full-screen interactive and cannot be
* captured through a pipe.
*/
function spawnInteractive(command: string, args: string[]): Promise<number> {
return new Promise((resolvePromise) => {
const child = spawn(command, args, { stdio: 'inherit' });
child.on('error', () => {
resolvePromise(127);
});
child.on('close', (code) => {
resolvePromise(code ?? 1);
});
});
}
async function canRead(path: string): Promise<boolean> {
try {
await access(path, constants.R_OK);