Files
mosaic/packages/coord/src/runner.ts
2026-03-07 01:32:10 +00:00

489 lines
13 KiB
TypeScript

import { spawn, spawnSync } from 'node:child_process';
import { promises as fs } from 'node:fs';
import path from 'node:path';
import { loadMission, saveMission } from './mission.js';
import { parseTasksFile, updateTaskStatus } from './tasks-file.js';
import type {
Mission,
MissionMilestone,
MissionSession,
RunTaskOptions,
TaskRun,
} from './types.js';
const SESSION_LOCK_FILE = 'session.lock';
const NEXT_TASK_FILE = 'next-task.json';
interface SessionLockState {
session_id: string;
runtime: 'claude' | 'codex';
pid: number;
started_at: string;
project_path: string;
milestone_id?: string;
}
function orchestratorDirPath(mission: Mission): string {
if (path.isAbsolute(mission.orchestratorDir)) {
return mission.orchestratorDir;
}
return path.join(mission.projectPath, mission.orchestratorDir);
}
function sessionLockPath(mission: Mission): string {
return path.join(orchestratorDirPath(mission), SESSION_LOCK_FILE);
}
function nextTaskCapsulePath(mission: Mission): string {
return path.join(orchestratorDirPath(mission), NEXT_TASK_FILE);
}
function tasksFilePath(mission: Mission): string {
if (path.isAbsolute(mission.tasksFile)) {
return mission.tasksFile;
}
return path.join(mission.projectPath, mission.tasksFile);
}
function buildSessionId(mission: Mission): string {
return `sess-${String(mission.sessions.length + 1).padStart(3, '0')}`;
}
function isPidAlive(pid: number): boolean {
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
function currentMilestone(mission: Mission): MissionMilestone | undefined {
return mission.milestones.find((milestone) => milestone.status === 'in-progress')
?? mission.milestones.find((milestone) => milestone.status === 'pending');
}
async function readTasks(mission: Mission) {
const filePath = tasksFilePath(mission);
try {
const content = await fs.readFile(filePath, 'utf8');
return parseTasksFile(content);
} catch (error) {
if (
typeof error === 'object' &&
error !== null &&
'code' in error &&
(error as { code?: string }).code === 'ENOENT'
) {
return [];
}
throw error;
}
}
function currentBranch(projectPath: string): string | undefined {
const result = spawnSync('git', ['-C', projectPath, 'branch', '--show-current'], {
encoding: 'utf8',
});
if (result.status !== 0) {
return undefined;
}
const branch = result.stdout.trim();
return branch.length > 0 ? branch : undefined;
}
function percentage(done: number, total: number): number {
if (total === 0) {
return 0;
}
return Math.floor((done / total) * 100);
}
function formatDurationSeconds(totalSeconds: number): string {
if (totalSeconds < 60) {
return `${totalSeconds}s`;
}
if (totalSeconds < 3600) {
const minutes = Math.floor(totalSeconds / 60);
const seconds = totalSeconds % 60;
return `${minutes}m ${seconds}s`;
}
const hours = Math.floor(totalSeconds / 3600);
const minutes = Math.floor((totalSeconds % 3600) / 60);
return `${hours}h ${minutes}m`;
}
function buildContinuationPrompt(params: {
mission: Mission;
taskId: string;
runtime: 'claude' | 'codex';
tasksDone: number;
tasksTotal: number;
currentMilestone?: MissionMilestone;
previousSession?: MissionSession;
branch?: string;
}): string {
const {
mission,
taskId,
runtime,
tasksDone,
tasksTotal,
currentMilestone,
previousSession,
branch,
} = params;
const pct = percentage(tasksDone, tasksTotal);
const previousDuration =
previousSession?.durationSeconds !== undefined
? formatDurationSeconds(previousSession.durationSeconds)
: '—';
return [
'## Continuation Mission',
'',
`Continue **${mission.name}** from existing state.`,
'',
'## Setup',
'',
`- **Project:** ${mission.projectPath}`,
`- **State:** ${mission.tasksFile} (${tasksDone}/${tasksTotal} tasks complete)`,
`- **Manifest:** ${mission.manifestFile}`,
`- **Scratchpad:** ${mission.scratchpadFile}`,
'- **Protocol:** ~/.config/mosaic/guides/ORCHESTRATOR.md',
`- **Quality gates:** ${mission.qualityGates ?? '—'}`,
`- **Target runtime:** ${runtime}`,
'',
'## Resume Point',
'',
`- **Current milestone:** ${currentMilestone?.name ?? '—'} (${currentMilestone?.id ?? '—'})`,
`- **Next task:** ${taskId}`,
`- **Progress:** ${tasksDone}/${tasksTotal} (${pct}%)`,
`- **Branch:** ${branch ?? '—'}`,
'',
'## Previous Session Context',
'',
`- **Session:** ${previousSession?.sessionId ?? '—'} (${previousSession?.runtime ?? '—'}, ${previousDuration})`,
`- **Ended:** ${previousSession?.endedReason ?? '—'}`,
`- **Last completed task:** ${previousSession?.lastTaskId ?? '—'}`,
'',
'## Instructions',
'',
'1. Read `~/.config/mosaic/guides/ORCHESTRATOR.md` for full protocol',
`2. Read \`${mission.manifestFile}\` for mission scope and status`,
`3. Read \`${mission.scratchpadFile}\` for session history and decisions`,
`4. Read \`${mission.tasksFile}\` for current task state`,
'5. `git pull --rebase` to sync latest changes',
`6. Launch runtime with \`${runtime} -p\``,
`7. Continue execution from task **${taskId}**`,
'8. Follow Two-Phase Completion Protocol',
`9. You are the SOLE writer of \`${mission.tasksFile}\``,
].join('\n');
}
function resolveLaunchCommand(
runtime: 'claude' | 'codex',
prompt: string,
configuredCommand: string[] | undefined,
): string[] {
if (configuredCommand === undefined || configuredCommand.length === 0) {
return [runtime, '-p', prompt];
}
const withInterpolation = configuredCommand.map((value) =>
value === '{prompt}' ? prompt : value,
);
if (withInterpolation.includes(prompt)) {
return withInterpolation;
}
return [...withInterpolation, prompt];
}
async function writeAtomicJson(filePath: string, payload: unknown): Promise<void> {
const directory = path.dirname(filePath);
await fs.mkdir(directory, { recursive: true });
const tempPath = path.join(
directory,
`.${path.basename(filePath)}.tmp-${process.pid}-${Date.now()}-${Math.random()
.toString(16)
.slice(2)}`,
);
await fs.writeFile(tempPath, `${JSON.stringify(payload, null, 2)}\n`, 'utf8');
await fs.rename(tempPath, filePath);
}
async function readSessionLock(mission: Mission): Promise<SessionLockState | undefined> {
const filePath = sessionLockPath(mission);
try {
const raw = await fs.readFile(filePath, 'utf8');
const data = JSON.parse(raw) as Partial<SessionLockState>;
if (
typeof data.session_id !== 'string' ||
(data.runtime !== 'claude' && data.runtime !== 'codex') ||
typeof data.pid !== 'number' ||
typeof data.started_at !== 'string' ||
typeof data.project_path !== 'string'
) {
return undefined;
}
return {
session_id: data.session_id,
runtime: data.runtime,
pid: data.pid,
started_at: data.started_at,
project_path: data.project_path,
milestone_id: data.milestone_id,
};
} catch (error) {
if (
typeof error === 'object' &&
error !== null &&
'code' in error &&
(error as { code?: string }).code === 'ENOENT'
) {
return undefined;
}
throw error;
}
}
async function writeSessionLock(
mission: Mission,
lock: SessionLockState,
): Promise<void> {
await writeAtomicJson(sessionLockPath(mission), lock);
}
function markSessionCrashed(
mission: Mission,
sessionId: string,
endedAt: string,
): Mission {
const sessions = mission.sessions.map((session) => {
if (session.sessionId !== sessionId) {
return session;
}
if (session.endedAt !== undefined) {
return session;
}
const startedEpoch = Date.parse(session.startedAt);
const endedEpoch = Date.parse(endedAt);
const durationSeconds =
Number.isFinite(startedEpoch) && Number.isFinite(endedEpoch)
? Math.max(0, Math.floor((endedEpoch - startedEpoch) / 1000))
: undefined;
return {
...session,
endedAt,
endedReason: 'crashed' as const,
durationSeconds,
};
});
return {
...mission,
sessions,
};
}
export async function runTask(
mission: Mission,
taskId: string,
options: RunTaskOptions = {},
): Promise<TaskRun> {
const runtime = options.runtime ?? 'claude';
const mode = options.mode ?? 'interactive';
const freshMission = await loadMission(mission.projectPath);
const tasks = await readTasks(freshMission);
const matches = tasks.filter((task) => task.id === taskId);
if (matches.length === 0) {
throw new Error(`Task not found: ${taskId}`);
}
if (matches.length > 1) {
throw new Error(`Duplicate task IDs found: ${taskId}`);
}
const task = matches[0];
if (task.status === 'done' || task.status === 'cancelled') {
throw new Error(`Task ${taskId} cannot be run from status ${task.status}`);
}
const tasksTotal = tasks.length;
const tasksDone = tasks.filter((candidate) => candidate.status === 'done').length;
const selectedMilestone =
freshMission.milestones.find((milestone) => milestone.id === options.milestoneId)
?? freshMission.milestones.find((milestone) => milestone.id === task.milestone)
?? currentMilestone(freshMission);
const continuationPrompt = buildContinuationPrompt({
mission: freshMission,
taskId,
runtime,
tasksDone,
tasksTotal,
currentMilestone: selectedMilestone,
previousSession: freshMission.sessions.at(-1),
branch: currentBranch(freshMission.projectPath),
});
const launchCommand = resolveLaunchCommand(runtime, continuationPrompt, options.command);
const startedAt = new Date().toISOString();
const sessionId = buildSessionId(freshMission);
const lockFile = sessionLockPath(freshMission);
await writeAtomicJson(nextTaskCapsulePath(freshMission), {
generated_at: startedAt,
runtime,
mission_id: freshMission.id,
mission_name: freshMission.name,
project_path: freshMission.projectPath,
quality_gates: freshMission.qualityGates ?? '',
current_milestone: {
id: selectedMilestone?.id ?? '',
name: selectedMilestone?.name ?? '',
},
next_task: taskId,
progress: {
tasks_done: tasksDone,
tasks_total: tasksTotal,
pct: percentage(tasksDone, tasksTotal),
},
current_branch: currentBranch(freshMission.projectPath) ?? '',
});
if (mode === 'print-only') {
return {
missionId: freshMission.id,
taskId,
sessionId,
runtime,
launchCommand,
startedAt,
lockFile,
};
}
await updateTaskStatus(freshMission, taskId, 'in-progress');
await writeSessionLock(freshMission, {
session_id: sessionId,
runtime,
pid: 0,
started_at: startedAt,
project_path: freshMission.projectPath,
milestone_id: selectedMilestone?.id,
});
const child = spawn(launchCommand[0], launchCommand.slice(1), {
cwd: freshMission.projectPath,
env: {
...process.env,
...(options.env ?? {}),
},
stdio: 'inherit',
});
await new Promise<void>((resolve, reject) => {
child.once('spawn', () => {
resolve();
});
child.once('error', (error) => {
reject(error);
});
});
const pid = child.pid;
if (pid === undefined) {
throw new Error('Failed to start task runtime process (pid missing)');
}
await writeSessionLock(freshMission, {
session_id: sessionId,
runtime,
pid,
started_at: startedAt,
project_path: freshMission.projectPath,
milestone_id: selectedMilestone?.id,
});
const updatedMission: Mission = {
...freshMission,
status: 'active',
sessions: [
...freshMission.sessions,
{
sessionId,
runtime,
pid,
startedAt,
milestoneId: selectedMilestone?.id,
lastTaskId: taskId,
},
],
};
await saveMission(updatedMission);
return {
missionId: updatedMission.id,
taskId,
sessionId,
runtime,
launchCommand,
startedAt,
pid,
lockFile,
};
}
export async function resumeTask(
mission: Mission,
taskId: string,
options: Omit<RunTaskOptions, 'milestoneId'> = {},
): Promise<TaskRun> {
const freshMission = await loadMission(mission.projectPath);
const lock = await readSessionLock(freshMission);
if (lock !== undefined && lock.pid > 0 && isPidAlive(lock.pid)) {
throw new Error(
`Session ${lock.session_id} is still running (PID ${lock.pid}).`,
);
}
let nextMissionState = freshMission;
if (lock !== undefined) {
const endedAt = new Date().toISOString();
nextMissionState = markSessionCrashed(freshMission, lock.session_id, endedAt);
await saveMission(nextMissionState);
await fs.rm(sessionLockPath(nextMissionState), { force: true });
}
return runTask(nextMissionState, taskId, options);
}