fix(fleet): report idle agents as available, reserve stuck for genuine blocks (#653)
All checks were successful
ci/woodpecker/push/publish Pipeline was successful
ci/woodpecker/push/ci Pipeline was successful

This commit was merged in pull request #653.
This commit is contained in:
2026-06-24 13:58:22 +00:00
parent 1020cfaf9b
commit 937077f6be
3 changed files with 123 additions and 76 deletions

View File

@@ -27,7 +27,6 @@ import {
enableFleetUnits,
FLEET_PROFILES,
HEARTBEAT_IDLE_THRESHOLD_SECONDS,
HEARTBEAT_STUCK_THRESHOLD_SECONDS,
generateAgentEnv,
getDefaultOperatorSourceLabel,
getDefaultTenantAndHost,
@@ -48,7 +47,6 @@ import {
resolvePresetFilename,
RUNTIME_ACCEPTABLE_COMMANDS,
serializeRosterToYaml,
stuckThresholdSeconds,
VERIFY_DEFAULT_TIMEOUT_MS,
VERIFY_POLL_INTERVAL_MS,
type AgentPsRow,
@@ -940,42 +938,33 @@ describe('fleet ps — heartbeat parsing', () => {
describe('fleet ps — readiness thresholds', () => {
const savedIdle = process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD;
const savedStuck = process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD;
afterEach(() => {
if (savedIdle === undefined) delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD;
else process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = savedIdle;
if (savedStuck === undefined) delete process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD;
else process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = savedStuck;
});
it('uses default readiness thresholds when env is unset', () => {
it('uses the default activity threshold when env is unset', () => {
delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD;
delete process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD;
expect(idleThresholdSeconds()).toBe(HEARTBEAT_IDLE_THRESHOLD_SECONDS);
expect(stuckThresholdSeconds()).toBe(HEARTBEAT_STUCK_THRESHOLD_SECONDS);
});
it('honors positive integer readiness thresholds from env', () => {
it('honors a positive integer activity threshold from env', () => {
process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '120';
process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = '480';
expect(idleThresholdSeconds()).toBe(120);
expect(stuckThresholdSeconds()).toBe(480);
});
it('falls back to defaults for invalid readiness thresholds', () => {
it('falls back to the default for invalid activity thresholds', () => {
process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '0';
process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = 'not-a-number';
expect(idleThresholdSeconds()).toBe(HEARTBEAT_IDLE_THRESHOLD_SECONDS);
expect(stuckThresholdSeconds()).toBe(HEARTBEAT_STUCK_THRESHOLD_SECONDS);
});
});
describe('fleet ps — readiness classification', () => {
const thresholds = { idleThresholdSeconds: 300, stuckThresholdSeconds: 900 };
const thresholds = { idleThresholdSeconds: 300 };
it('reports dead when the pane is not alive', () => {
expect(
@@ -1004,7 +993,7 @@ describe('fleet ps — readiness classification', () => {
).toBe('stale');
});
it('reports working when heartbeat status is busy, even past stuck threshold', () => {
it('reports working when heartbeat status is busy, even after the activity threshold', () => {
expect(
classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'busy', idleSeconds: 2_000 },
@@ -1013,7 +1002,7 @@ describe('fleet ps — readiness classification', () => {
).toBe('working');
});
it('reports working when pane idle seconds are unavailable', () => {
it('reports working when pane idle seconds are null', () => {
expect(
classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: null },
@@ -1022,25 +1011,31 @@ describe('fleet ps — readiness classification', () => {
).toBe('working');
});
it('reports stuck at the stuck threshold boundary', () => {
it('reports working when pane idle seconds are undefined', () => {
expect(
classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 900 },
thresholds,
),
).toBe('stuck');
classifyReadiness({ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok' }, thresholds),
).toBe('working');
});
it('reports idle at the idle threshold boundary', () => {
it('reports working when pane idle seconds are non-finite', () => {
expect(
classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: Number.NaN },
thresholds,
),
).toBe('working');
});
it('reports available at the activity threshold boundary', () => {
expect(
classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 300 },
thresholds,
),
).toBe('idle');
).toBe('available');
});
it('reports working below the idle threshold', () => {
it('reports working below the activity threshold', () => {
expect(
classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 299 },
@@ -1049,13 +1044,14 @@ describe('fleet ps — readiness classification', () => {
).toBe('working');
});
it('checks stuck before idle when thresholds are inverted', () => {
expect(
classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 350 },
{ idleThresholdSeconds: 900, stuckThresholdSeconds: 300 },
),
).toBe('stuck');
it('reports very long idle as available, not stuck', () => {
const readiness = classifyReadiness(
{ paneAlive: true, hbHealth: 'healthy', hbStatus: 'ok', idleSeconds: 100_000 },
thresholds,
);
expect(readiness).toBe('available');
expect(readiness).not.toBe('stuck');
});
});
@@ -1554,7 +1550,7 @@ describe('fleet ps — command sequences issued', () => {
});
describe('fleet ps — readiness table output', () => {
it('renders readiness in HB column and flags idle/stuck rows', async () => {
it('renders available in HB column without idle/stuck alarm flags', async () => {
const home = await mkdtemp(join(tmpdir(), 'mosaic-fleet-'));
const rosterPath = join(home, 'fleet', 'roster.yaml');
const runDir = join(home, 'fleet', 'run');
@@ -1565,36 +1561,34 @@ describe('fleet ps — readiness table output', () => {
'version: 1',
'transport: tmux',
'agents:',
' - name: idle-agent',
' - name: working-agent',
' runtime: pi',
' - name: stuck-agent',
' - name: available-agent',
' runtime: pi',
].join('\n'),
);
const nowMs = 1_700_000_000_000;
const idleActivityEpoch = Math.floor((nowMs - 10_000) / 1000);
const stuckActivityEpoch = Math.floor((nowMs - 40_000) / 1000);
const workingActivityEpoch = Math.floor((nowMs - 2_000) / 1000);
const availableActivityEpoch = Math.floor((nowMs - 40_000) / 1000);
const hbTs = new Date(nowMs - 1_000).toISOString();
await writeFile(join(runDir, 'idle-agent.hb'), `ts=${hbTs}\npid=111\nstatus=ok\n`);
await writeFile(join(runDir, 'stuck-agent.hb'), `ts=${hbTs}\npid=222\nstatus=ok\n`);
await writeFile(join(runDir, 'working-agent.hb'), `ts=${hbTs}\npid=111\nstatus=ok\n`);
await writeFile(join(runDir, 'available-agent.hb'), `ts=${hbTs}\npid=222\nstatus=ok\n`);
const savedIdle = process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD;
const savedStuck = process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD;
process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = '5';
process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = '30';
const dateNow = vi.spyOn(Date, 'now').mockReturnValue(nowMs);
const runner: CommandRunner = async (command, args) => {
const full = [command, ...args].join(' ');
if (full.includes('list-sessions')) {
return { stdout: 'idle-agent\nstuck-agent\n', stderr: '', exitCode: 0 };
return { stdout: 'working-agent\navailable-agent\n', stderr: '', exitCode: 0 };
}
if (full.includes('=idle-agent:0.0')) {
return { stdout: `111 pi 0 ${idleActivityEpoch}\n`, stderr: '', exitCode: 0 };
if (full.includes('=working-agent:0.0')) {
return { stdout: `111 pi 0 ${workingActivityEpoch}\n`, stderr: '', exitCode: 0 };
}
if (full.includes('=stuck-agent:0.0')) {
return { stdout: `222 pi 0 ${stuckActivityEpoch}\n`, stderr: '', exitCode: 0 };
if (full.includes('=available-agent:0.0')) {
return { stdout: `222 pi 0 ${availableActivityEpoch}\n`, stderr: '', exitCode: 0 };
}
if (full.includes('systemctl') && full.includes('show')) {
return {
@@ -1623,19 +1617,17 @@ describe('fleet ps — readiness table output', () => {
dateNow.mockRestore();
if (savedIdle === undefined) delete process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD;
else process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD = savedIdle;
if (savedStuck === undefined) delete process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD;
else process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD = savedStuck;
await rm(home, { recursive: true, force: true });
}
const idleLine = lines.find((line) => line.includes('idle-agent'));
const stuckLine = lines.find((line) => line.includes('stuck-agent'));
expect(idleLine).toBeDefined();
expect(idleLine).toContain('1s/idle');
expect(idleLine).toMatch(/\bIDLE\b/);
expect(stuckLine).toBeDefined();
expect(stuckLine).toContain('1s/stuck');
expect(stuckLine).toMatch(/\bSTUCK\b/);
const workingLine = lines.find((line) => line.includes('working-agent'));
const availableLine = lines.find((line) => line.includes('available-agent'));
expect(workingLine).toBeDefined();
expect(workingLine).toContain('1s/working');
expect(availableLine).toBeDefined();
expect(availableLine).toContain('1s/available');
expect(availableLine).not.toMatch(/\bIDLE\b/);
expect(availableLine).not.toMatch(/\bSTUCK\b/);
});
});

View File

@@ -395,7 +395,6 @@ export function buildAgentTailCommand(agentName: string, lines: number, socketNa
export const HEARTBEAT_INTERVAL_MS = 15_000;
export const HEARTBEAT_IDLE_THRESHOLD_SECONDS = 300;
export const HEARTBEAT_STUCK_THRESHOLD_SECONDS = 900;
/**
* Heartbeat interval in ms, honoring MOSAIC_HEARTBEAT_INTERVAL (seconds) so the
@@ -407,20 +406,14 @@ export function heartbeatIntervalMs(): number {
return Number.isFinite(sec) && sec > 0 ? sec * 1000 : HEARTBEAT_INTERVAL_MS;
}
/** Idle threshold in seconds, honoring MOSAIC_HEARTBEAT_IDLE_THRESHOLD. */
/**
 * Resolves the activity threshold, in seconds.
 *
 * Reads MOSAIC_HEARTBEAT_IDLE_THRESHOLD and parses it with Number.parseInt
 * (base 10). When the variable is unset, or the parsed value is not a finite
 * number greater than zero, HEARTBEAT_IDLE_THRESHOLD_SECONDS is used instead.
 *
 * @returns The threshold from the environment when valid, otherwise the default.
 */
export function idleThresholdSeconds(): number {
  const raw = process.env.MOSAIC_HEARTBEAT_IDLE_THRESHOLD ?? '';
  const parsed = Number.parseInt(raw, 10);
  // NaN (unparsable), Infinity (overflowing digit string), zero and negatives all fall back.
  if (!Number.isFinite(parsed) || parsed <= 0) {
    return HEARTBEAT_IDLE_THRESHOLD_SECONDS;
  }
  return parsed;
}
/**
 * Resolves the stuck threshold, in seconds.
 *
 * Reads MOSAIC_HEARTBEAT_STUCK_THRESHOLD and parses it with Number.parseInt
 * (base 10). When the variable is unset, or the parsed value is not a finite
 * number greater than zero, HEARTBEAT_STUCK_THRESHOLD_SECONDS is used instead.
 *
 * @returns The threshold from the environment when valid, otherwise the default.
 */
export function stuckThresholdSeconds(): number {
  const raw = process.env.MOSAIC_HEARTBEAT_STUCK_THRESHOLD ?? '';
  const parsed = Number.parseInt(raw, 10);
  // NaN (unparsable), Infinity (overflowing digit string), zero and negatives all fall back.
  if (!Number.isFinite(parsed) || parsed <= 0) {
    return HEARTBEAT_STUCK_THRESHOLD_SECONDS;
  }
  return parsed;
}
export const HEARTBEAT_HEALTHY_MULTIPLIER = 3;
export type ReadinessState = 'working' | 'idle' | 'stuck' | 'stale' | 'dead' | 'unknown';
export type ReadinessState = 'working' | 'available' | 'stuck' | 'stale' | 'dead' | 'unknown';
export interface ReadinessSignals {
paneAlive: boolean;
@@ -431,7 +424,6 @@ export interface ReadinessSignals {
/** Threshold values, in seconds, that classifyReadiness compares pane idle time against. */
export interface ReadinessThresholds {
/** Activity threshold: classifyReadiness tests `idleSeconds >= idleThresholdSeconds`. */
idleThresholdSeconds: number;
/**
 * Stuck threshold: classifyReadiness tests `idleSeconds >= stuckThresholdSeconds`.
 * NOTE(review): this change set drops the idle-time 'stuck' check — confirm whether
 * this field is still present in the revision being edited.
 */
stuckThresholdSeconds: number;
}
/**
@@ -456,12 +448,8 @@ export function classifyReadiness(
const idleThreshold = Number.isFinite(thresholds?.idleThresholdSeconds)
? Number(thresholds?.idleThresholdSeconds)
: idleThresholdSeconds();
const stuckThreshold = Number.isFinite(thresholds?.stuckThresholdSeconds)
? Number(thresholds?.stuckThresholdSeconds)
: stuckThresholdSeconds();
if (idleSeconds >= stuckThreshold) return 'stuck';
if (idleSeconds >= idleThreshold) return 'idle';
// Follow-up: 'stuck' is deferred until per-agent assignment awareness exists; the rule will then be:
// assigned task + idle past threshold => stuck.
if (idleSeconds >= idleThreshold) return 'available';
return 'working';
} catch {
return 'unknown';
@@ -1089,7 +1077,6 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
const rows: AgentPsRow[] = [];
const readinessThresholds = {
idleThresholdSeconds: idleThresholdSeconds(),
stuckThresholdSeconds: stuckThresholdSeconds(),
};
// Build the set of roster agent names for quick lookup when filtering socket sessions.
@@ -1264,8 +1251,6 @@ export function registerFleetCommand(program: Command, deps: FleetCommandDeps =
if (!row.managed) flags.push('UNMANAGED');
if (row.driftFlag) flags.push('DRIFT');
if (row.bootEnableWarning) flags.push('BOOT-ENABLE');
if (row.readiness === 'idle') flags.push('IDLE');
if (row.readiness === 'stuck') flags.push('STUCK');
console.log(
[