Files
stack/packages/mosaic/framework/runtime/pi/mosaic-extension.ts
Jason Woltje 56e5c35678 wip(fleet): F3-m2 native Pi heartbeat + sidecar reconciliation
WIP — not for merge yet. Implements the core of the custom Pi harness (R14/R15):
- runtime/pi/mosaic-extension.ts: native heartbeat — writes the same .hb contract
  (ts/pid/status[/model]) on a MOSAIC_HEARTBEAT_INTERVAL timer; turn_start/turn_end
  flip status busy/ok; model self-report via ctx.model; touches a .hb.native
  precedence marker. Also FIXES a latent bug: session_end -> session_shutdown (the
  old handler never fired) + corrects the import scope to @earendil-works/pi-coding-agent.
- start-agent-session.sh: sidecar DEFERS when the .hb.native marker is fresh
  (< 2x interval), else writes the fallback — native precedence, sidecar fallback,
  same contract so fleet ps is agnostic (per Lead's design). Generated script
  validated (bash -n) + deferral/fallback behavior tested.

REMAINING before PR: surface model in `fleet ps` (parseHeartbeat + row); vitest for
the native-HB writer; "proper tool usage" (registerTool) piece; rebase onto #599's
%q sidecar (overlap on the printf line).

Refs #588

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-21 20:23:56 -05:00

346 lines
12 KiB
TypeScript

/**
 * mosaic-extension.ts — Pi Extension for Mosaic Framework
 *
 * Integrates the Mosaic agent framework into Pi sessions launched via `mosaic pi`.
 * Handles:
 * 1. Session start — run repo hooks, detect active mission, display status
 * 2. Session shutdown — run repo hooks, clean up session lock
 * 3. Mission context — inject active mission state into conversation
 * 4. Memory routing — remind agent to use ~/.config/mosaic/memory/
 * 5. Native heartbeat — when MOSAIC_AGENT_NAME is set, write the fleet `.hb`
 *    file (ts/pid/status[/model]) and the `.hb.native` precedence marker
 */
import type { ExtensionAPI, ExtensionContext } from '@earendil-works/pi-coding-agent';
import {
  existsSync,
  readFileSync,
  writeFileSync,
  unlinkSync,
  mkdirSync,
  renameSync,
  readdirSync,
  statSync,
} from 'node:fs';
import { join, basename } from 'node:path';
import { homedir } from 'node:os';
import { execSync, spawnSync } from 'node:child_process';
// ---------------------------------------------------------------------------
// Config
// ---------------------------------------------------------------------------
// Root of the Mosaic config tree: the MOSAIC_HOME environment variable when it
// is set, otherwise `.config/mosaic` under the current user's home directory.
const MOSAIC_HOME = process.env['MOSAIC_HOME'] ?? join(homedir(), '.config', 'mosaic');
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
// Native heartbeat (fleet R14/R15)
// ---------------------------------------------------------------------------
// When this agent runs under the Mosaic fleet (MOSAIC_AGENT_NAME set), the
// extension writes its OWN heartbeat in the same .hb contract `fleet ps` reads
// (ts/pid/status[/model]) and touches a `.hb.native` precedence marker so the
// shell sidecar defers. Native HB knows the real turn state (busy/ok), so it is
// more accurate than the pane-PID-only sidecar fallback.
/** Fleet agent name from MOSAIC_AGENT_NAME; an empty string when the variable is unset. */
const HB_AGENT_NAME: string = process.env['MOSAIC_AGENT_NAME'] ?? '';

/** Directory that receives the heartbeat file and the native-precedence marker. */
const HB_RUN_DIR: string =
  process.env['MOSAIC_HEARTBEAT_RUN_DIR'] ?? join(MOSAIC_HOME, 'fleet', 'run');

/**
 * Heartbeat period in milliseconds. MOSAIC_HEARTBEAT_INTERVAL is read as a
 * base-10 integer number of seconds; anything unparsable, non-finite or not
 * strictly positive falls back to 15 seconds.
 */
const HB_INTERVAL_MS: number = (() => {
  const rawSeconds = process.env['MOSAIC_HEARTBEAT_INTERVAL'] ?? '';
  const seconds = Number.parseInt(rawSeconds, 10);
  if (!Number.isFinite(seconds) || seconds <= 0) {
    return 15_000;
  }
  return seconds * 1000;
})();
/**
 * Reports whether native heartbeats are active for this process, which is the
 * case exactly when an agent name was provided (HB_AGENT_NAME is non-empty).
 */
function nativeHbEnabled(): boolean {
  return HB_AGENT_NAME !== '';
}
/**
 * Extracts a model identifier from the extension context for self-reporting.
 *
 * @param ctx - Extension context whose `model` value is inspected.
 * @returns `model.id` when present, otherwise `model.name`, otherwise `null`
 *   (also `null` when the context carries no model at all).
 */
function readModelId(ctx: ExtensionContext): string | null {
  const model = ctx.model as unknown as { id?: string; name?: string } | undefined;
  if (model == null) {
    return null;
  }
  if (model.id != null) {
    return model.id;
  }
  if (model.name != null) {
    return model.name;
  }
  return null;
}
/**
 * Writes this agent's heartbeat file and refreshes the native-precedence marker.
 *
 * The heartbeat is a `key=value` file (`ts`, `pid`, `status`, and `model` when
 * known) at `<HB_RUN_DIR>/<HB_AGENT_NAME>.hb`. It is written to a per-process
 * temp file and renamed into place so a reader never sees a partial file. The
 * `.hb.native` marker is then rewritten with the current timestamp.
 *
 * The whole operation is best-effort: it is a no-op when native heartbeats are
 * disabled, and any I/O failure is swallowed so the Pi session is unaffected.
 *
 * @param status - Turn state to report (`'busy'` during a turn, `'ok'` when idle).
 * @param model - Model identifier to report, or `null` to omit the `model` line.
 */
function writeNativeHeartbeat(status: 'ok' | 'busy', model: string | null): void {
  if (!nativeHbEnabled()) return;
  const hb = join(HB_RUN_DIR, `${HB_AGENT_NAME}.hb`);
  const tmp = `${hb}.tmp.${process.pid}`;
  try {
    mkdirSync(HB_RUN_DIR, { recursive: true });
    const lines = [`ts=${nowIso()}`, `pid=${process.pid}`, `status=${status}`];
    // The file format is one key=value per line, so a line break inside the
    // model id would inject a bogus extra line; collapse any to a space.
    const modelValue = model ? model.replace(/[\r\n]+/g, ' ') : '';
    if (modelValue) lines.push(`model=${modelValue}`);
    writeFileSync(tmp, lines.join('\n') + '\n');
    renameSync(tmp, hb); // atomic replace — readers never see a partial file
    // Precedence marker: tells the shell sidecar that native HB is authoritative.
    writeFileSync(join(HB_RUN_DIR, `${HB_AGENT_NAME}.hb.native`), nowIso() + '\n');
  } catch {
    // Best-effort: never let heartbeat I/O disrupt the Pi session. If the
    // failure happened between the temp write and the rename, do not leave the
    // temp file behind in the run directory.
    try {
      if (existsSync(tmp)) unlinkSync(tmp);
    } catch {
      /* ignore */
    }
  }
}
/**
 * Deletes the `.hb.native` precedence marker so the shell sidecar can take
 * over once native heartbeats stop. Does nothing when native heartbeats are
 * disabled; filesystem errors are ignored.
 */
function clearNativeMarker(): void {
  if (nativeHbEnabled()) {
    try {
      const markerPath = join(HB_RUN_DIR, `${HB_AGENT_NAME}.hb.native`);
      const markerPresent = existsSync(markerPath);
      if (markerPresent) {
        unlinkSync(markerPath);
      }
    } catch {
      /* ignore */
    }
  }
}
/**
 * Reads a file as UTF-8 text without ever throwing.
 *
 * @param filePath - Path of the file to read.
 * @returns The file contents, or `null` if the read fails for any reason
 *   (missing file, directory, permissions, ...).
 */
function safeRead(filePath: string): string | null {
  let contents: string | null = null;
  try {
    contents = readFileSync(filePath, { encoding: 'utf-8' });
  } catch {
    contents = null;
  }
  return contents;
}
/**
 * Reads a file and parses it as a JSON object without ever throwing.
 *
 * The parsed value is validated before it is returned: only a non-null,
 * non-array object satisfies the declared `Record<string, unknown>` type.
 * Valid JSON of any other shape (array, number, string, boolean, `null`) is
 * treated the same as unreadable or malformed input.
 *
 * @param filePath - Path of the JSON file to read.
 * @returns The parsed object, or `null` when the file cannot be read, is
 *   empty, is not valid JSON, or does not contain a JSON object.
 */
function safeJsonRead(filePath: string): Record<string, unknown> | null {
  try {
    // A single try covers both the read and the parse: either failing means
    // "no usable data" for the caller.
    const parsed: unknown = JSON.parse(readFileSync(filePath, 'utf-8'));
    if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
      return null;
    }
    return parsed as Record<string, unknown>;
  } catch {
    return null;
  }
}
/**
 * Returns the current time as an ISO-8601 UTC timestamp with second
 * precision, i.e. `toISOString()` with the `.mmm` milliseconds part removed
 * (for example `2026-06-21T20:23:56Z`).
 */
function nowIso(): string {
  const withMillis = new Date().toISOString();
  const secondsOnly = withMillis.replace(/\.\d{3}Z$/, 'Z');
  return secondsOnly;
}
// ---------------------------------------------------------------------------
// Mission detection
// ---------------------------------------------------------------------------
/**
* Summary of a mission read from `.mosaic/orchestrator/mission.json`, as
* produced by `detectMission`.
*/
interface ActiveMission {
/** Mission display name; `detectMission` uses 'unnamed' when the file has none. */
name: string;
/** Value of the file's `mission_id` field; empty string when absent. */
id: string;
/** Mission status; `detectMission` only returns missions that are 'active' or 'paused'. */
status: string;
/** Number of entries in the file's `milestones` array (0 when it is not an array). */
milestonesTotal: number;
/** Number of milestone entries whose `status` is 'completed'. */
milestonesCompleted: number;
}
/**
 * Looks for a live mission in the given project directory.
 *
 * Reads `<cwd>/.mosaic/orchestrator/mission.json` and summarises it. A mission
 * counts as live only when its status is 'active' or 'paused'.
 *
 * @param cwd - Project root to inspect.
 * @returns The mission summary, or `null` when the file is missing/unparsable
 *   or the mission is in any other status.
 */
function detectMission(cwd: string): ActiveMission | null {
  const mission = safeJsonRead(join(cwd, '.mosaic', 'orchestrator', 'mission.json'));
  if (!mission) {
    return null;
  }

  const status = String(mission.status ?? 'inactive');
  const isLive = status === 'active' || status === 'paused';
  if (!isLive) {
    return null;
  }

  const milestones: unknown[] = Array.isArray(mission.milestones) ? mission.milestones : [];
  let completedCount = 0;
  for (const entry of milestones) {
    // Only object entries carrying status === 'completed' count as done.
    if (
      typeof entry === 'object' &&
      entry !== null &&
      (entry as Record<string, unknown>).status === 'completed'
    ) {
      completedCount += 1;
    }
  }

  return {
    name: String(mission.name ?? 'unnamed'),
    id: String(mission.mission_id ?? ''),
    status,
    milestonesTotal: milestones.length,
    milestonesCompleted: completedCount,
  };
}
// ---------------------------------------------------------------------------
// Session lock management
// ---------------------------------------------------------------------------
/**
 * Builds the path of the orchestrator session lock for a project.
 *
 * @param cwd - Project root.
 * @returns `<cwd>/.mosaic/orchestrator/session.lock`.
 */
function sessionLockPath(cwd: string): string {
  const relativeSegments = ['.mosaic', 'orchestrator', 'session.lock'];
  return join(cwd, ...relativeSegments);
}
/**
 * Records this Pi session in the project's orchestrator session lock.
 *
 * The lock is a pretty-printed JSON document holding a generated session id,
 * the runtime name, this process's pid, a start timestamp and the project
 * path. Nothing is written unless `<cwd>/.mosaic/orchestrator` already exists,
 * and a failed write is silently ignored.
 *
 * @param cwd - Project root whose lock should be written.
 */
function writeSessionLock(cwd: string): void {
  // Only projects that already have an orchestrator directory get a lock.
  if (!existsSync(join(cwd, '.mosaic', 'orchestrator'))) {
    return;
  }

  const pid = process.pid;
  // ':' and '.' are replaced so the timestamp can be embedded in the id.
  const idStamp = new Date().toISOString().replace(/[:.]/g, '-');
  const lock = {
    session_id: `pi-${idStamp}-${pid}`,
    runtime: 'pi',
    pid,
    started_at: nowIso(),
    project_path: cwd,
    milestone_id: '',
  };

  try {
    const serialized = JSON.stringify(lock, null, 2);
    writeFileSync(sessionLockPath(cwd), `${serialized}\n`, 'utf-8');
  } catch {
    // Non-fatal — orchestrator dir may not be writable
  }
}
/**
 * Removes the project's orchestrator session lock if it exists.
 * Any filesystem error is ignored.
 *
 * @param cwd - Project root whose lock should be removed.
 */
function cleanSessionLock(cwd: string): void {
  try {
    const target = sessionLockPath(cwd);
    if (!existsSync(target)) {
      return;
    }
    unlinkSync(target);
  } catch {
    // Non-fatal
  }
}
// ---------------------------------------------------------------------------
// Repo hooks
// ---------------------------------------------------------------------------
/**
 * Runs an optional per-repository hook script, if the repository provides one.
 *
 * The script is `<cwd>/scripts/agent/<hookName>.sh`. It is executed with bash
 * from the project root, with its output captured rather than shown, a 30
 * second timeout, and `MOSAIC_RUNTIME=pi` added to the environment. A missing
 * script is a no-op, and the hook's outcome is ignored.
 *
 * @param cwd - Project root; also the hook's working directory.
 * @param hookName - Hook file name without the `.sh` extension.
 */
function runRepoHook(cwd: string, hookName: string): void {
  const hookScript = join(cwd, 'scripts', 'agent', `${hookName}.sh`);
  if (existsSync(hookScript)) {
    try {
      const hookEnv = { ...process.env, MOSAIC_RUNTIME: 'pi' };
      spawnSync('bash', [hookScript], {
        cwd,
        stdio: 'pipe',
        timeout: 30_000,
        env: hookEnv,
      });
    } catch {
      // Non-fatal
    }
  }
}
// ---------------------------------------------------------------------------
// Build mission summary for notifications
// ---------------------------------------------------------------------------
/**
 * Finds the most recently modified scratchpad in a directory.
 *
 * Only regular files whose names end in `.md` and do not start with a dot are
 * considered (the same set a shell `*.md` glob would match). Entries that
 * cannot be stat'ed are skipped.
 *
 * @param dir - Directory to scan (not recursive).
 * @returns The file name (no directory part) of the newest match, or `null`.
 */
function latestScratchpadName(dir: string): string | null {
  let newestName: string | null = null;
  let newestMtime = Number.NEGATIVE_INFINITY;
  for (const entry of readdirSync(dir)) {
    if (entry.startsWith('.') || !entry.endsWith('.md')) continue;
    try {
      const info = statSync(join(dir, entry));
      if (info.isFile() && info.mtimeMs > newestMtime) {
        newestMtime = info.mtimeMs;
        newestName = entry;
      }
    } catch {
      // Entry vanished or is unreadable — ignore it.
    }
  }
  return newestName;
}

/**
 * Builds the multi-line mission summary shown in notifications.
 *
 * The summary always contains the mission name, its status and milestone
 * progress, and a closing reminder line. It additionally contains:
 * - task counts, when `<cwd>/docs/TASKS.md` is readable: "total" is the number
 *   of markdown table rows minus one header row, "done" is the number of
 *   `| done |` cells (case-insensitive);
 * - the newest scratchpad file name, when `<cwd>/docs/scratchpads` contains a
 *   `.md` file.
 *
 * The scratchpad lookup uses the filesystem API directly instead of a shell
 * pipeline, so characters in `cwd` that are special to a shell (quotes, `$`,
 * backticks) cannot alter what is executed.
 *
 * @param cwd - Project root.
 * @param mission - Mission summary to render.
 * @returns The summary text, lines joined with `\n`.
 */
function buildMissionSummary(cwd: string, mission: ActiveMission): string {
  const lines: string[] = [
    `Mission: ${mission.name}`,
    `Status: ${mission.status} | Milestones: ${mission.milestonesCompleted}/${mission.milestonesTotal}`,
  ];
  // Task counts
  const tasksFile = join(cwd, 'docs', 'TASKS.md');
  const tasksContent = safeRead(tasksFile);
  if (tasksContent) {
    const tableRows = tasksContent
      .split('\n')
      .filter((l) => l.startsWith('|') && !l.includes('---'));
    const total = Math.max(0, tableRows.length - 1); // minus header
    const done = (tasksContent.match(/\|\s*done\s*\|/gi) ?? []).length;
    lines.push(`Tasks: ${done} done / ${total} total`);
  }
  // Latest scratchpad
  try {
    const spDir = join(cwd, 'docs', 'scratchpads');
    if (existsSync(spDir)) {
      const latest = latestScratchpadName(spDir);
      if (latest) lines.push(`Scratchpad: ${latest}`);
    }
  } catch {
    // Non-fatal
  }
  lines.push('', 'Read ORCHESTRATOR-PROTOCOL.md + TASKS.md before proceeding.');
  return lines.join('\n');
}
// ---------------------------------------------------------------------------
// Extension registration
// ---------------------------------------------------------------------------
/**
 * Extension entry point: wires the Mosaic integration into a Pi session.
 *
 * Registers handlers for:
 * - `session_start`: run the repo `session-start` hook, detect a mission
 *   (writing the session lock and notifying its summary when one is live), and
 *   start the native heartbeat when it is enabled;
 * - `turn_start` / `turn_end`: flip the heartbeat status between busy and ok;
 * - `session_shutdown`: stop the heartbeat, clear the native marker, run the
 *   repo `session-end` hook and remove the session lock;
 * and the `/mosaic-status` and `/mosaic-memory` commands.
 *
 * @param pi - Extension API used to subscribe to events and register commands.
 */
export default function register(pi: ExtensionAPI) {
  let sessionCwd = process.cwd();
  let hbStatus: 'ok' | 'busy' = 'ok';
  let hbModel: string | null = null;
  let hbTimer: ReturnType<typeof setInterval> | null = null;

  /** Cancels the periodic heartbeat timer, if one is running. */
  const stopHeartbeatTimer = (): void => {
    if (hbTimer) {
      clearInterval(hbTimer);
      hbTimer = null;
    }
  };

  // ── Session Start ─────────────────────────────────────────────────────
  pi.on('session_start', async (_event, ctx) => {
    sessionCwd = process.cwd();
    // Run repo session-start hook
    runRepoHook(sessionCwd, 'session-start');
    // Detect active mission
    const mission = detectMission(sessionCwd);
    if (mission) {
      // Write session lock for orchestrator awareness
      writeSessionLock(sessionCwd);
      const summary = buildMissionSummary(sessionCwd, mission);
      ctx.ui.notify(`🎯 Active Mosaic Mission\n${summary}`, 'info');
    } else {
      ctx.ui.notify('Mosaic framework loaded', 'info');
    }
    // Native heartbeat: write immediately, then on an interval. Idle = 'ok';
    // turn_start/turn_end flip the status so `fleet ps` reflects real activity.
    if (nativeHbEnabled()) {
      // Defensive: should this handler run more than once for the same
      // extension instance, drop the previous interval first so timers never
      // accumulate, and start the new session from the idle state.
      stopHeartbeatTimer();
      hbStatus = 'ok';
      hbModel = readModelId(ctx);
      writeNativeHeartbeat(hbStatus, hbModel);
      hbTimer = setInterval(() => writeNativeHeartbeat(hbStatus, hbModel), HB_INTERVAL_MS);
      // Do not let the heartbeat timer alone keep the process alive.
      if (typeof hbTimer.unref === 'function') hbTimer.unref();
    }
  });

  // ── Turn lifecycle → accurate busy/ok heartbeat ───────────────────────
  pi.on('turn_start', async (_event, ctx) => {
    hbStatus = 'busy';
    // Keep the last known model when the context does not report one.
    hbModel = readModelId(ctx) ?? hbModel;
    writeNativeHeartbeat('busy', hbModel);
  });
  pi.on('turn_end', async (_event, ctx) => {
    hbStatus = 'ok';
    hbModel = readModelId(ctx) ?? hbModel;
    writeNativeHeartbeat('ok', hbModel);
  });

  // ── Session Shutdown ──────────────────────────────────────────────────
  // (The pi API event is 'session_shutdown'; the prior 'session_end' handler
  // never fired — fixed here so repo hooks + lock cleanup actually run.)
  pi.on('session_shutdown', async (_event, _ctx) => {
    stopHeartbeatTimer();
    clearNativeMarker();
    // Run repo session-end hook
    runRepoHook(sessionCwd, 'session-end');
    // Clean up session lock
    cleanSessionLock(sessionCwd);
  });

  // ── Register /mosaic-status command ───────────────────────────────────
  pi.registerCommand('mosaic-status', {
    description: 'Show Mosaic mission status for the current project',
    handler: async (_args, ctx) => {
      const mission = detectMission(sessionCwd);
      if (!mission) {
        ctx.ui.notify('No active Mosaic mission in this project.', 'info');
        return;
      }
      const summary = buildMissionSummary(sessionCwd, mission);
      ctx.ui.notify(`🎯 Mission Status\n${summary}`, 'info');
    },
  });

  // ── Register /mosaic-memory command ───────────────────────────────────
  pi.registerCommand('mosaic-memory', {
    description: 'Show Mosaic memory directory path and contents',
    handler: async (_args, ctx) => {
      const memDir = join(MOSAIC_HOME, 'memory');
      if (!existsSync(memDir)) {
        ctx.ui.notify(`Memory directory: ${memDir} (empty)`, 'info');
        return;
      }
      // List the directory with an argument vector instead of a shell command
      // line, so a MOSAIC_HOME containing quotes, `$` or backticks is passed to
      // `ls` verbatim. stderr is captured (not shown); any failure — spawn
      // error, timeout or non-zero exit — falls back to the path-only message.
      let listing: string | null = null;
      try {
        const ls = spawnSync('ls', ['-la', memDir], { encoding: 'utf-8', timeout: 5000 });
        if (!ls.error && ls.status === 0) {
          listing = ls.stdout.trim();
        }
      } catch {
        listing = null;
      }
      if (listing === null) {
        ctx.ui.notify(`Memory directory: ${memDir}`, 'info');
      } else {
        ctx.ui.notify(`Memory directory: ${memDir}\n${listing}`, 'info');
      }
    },
  });
}