import { spawn } from 'node:child_process'; import { randomUUID } from 'node:crypto'; import { access, mkdir, open, readFile, rename, rm, writeFile } from 'node:fs/promises'; import * as os from 'node:os'; import * as path from 'node:path'; import type { AcpRuntime, AcpRuntimeCapabilities, AcpRuntimeDoctorReport, AcpRuntimeEnsureInput, AcpRuntimeEvent, AcpRuntimeHandle, AcpRuntimeStatus, AcpRuntimeTurnInput, } from './acp-runtime-types.js'; export interface MacpRuntimeConfig { defaultModel: string; systemPrompt: string; timeoutMs: number; stateDir: string; repoRoot?: string; orchDir?: string; defaultDispatch?: string; defaultRuntime?: string; logger?: { info?: (message: string) => void; warn?: (message: string) => void; }; } type HandleState = { name: string; agent: string; runtime: string; cwd: string; model: string; systemPrompt: string; timeoutMs: number; }; type OrchestratorTask = { id: string; title: string; status: 'pending'; dispatch: string; runtime: string; worktree: string; brief_path: string; _brief_temp_path: string; timeout_seconds: number; metadata: Record; }; type QueueFile = { tasks: OrchestratorTask[]; }; type TaskGateResult = { command?: string; exit_code?: number; type?: string; }; type TaskResult = { task_id: string; status: string; summary?: string; error?: unknown; escalation_reason?: unknown; branch?: string | null; pr?: string | null; files_changed?: string[]; gate_results?: TaskGateResult[]; metadata?: Record; }; const MACP_CAPABILITIES: AcpRuntimeCapabilities = { controls: [], }; const DEFAULT_REPO_ROOT = '~/src/mosaic-stack'; const ORCHESTRATOR_RUN_PATH = '~/.config/mosaic/bin/mosaic-orchestrator-run'; const PI_RUNNER_PATH = path.join( os.homedir(), 'src', 'mosaic-stack', 'tools', 'macp', 'dispatcher', 'pi_runner.ts', ); function expandHome(rawPath: string): string { if (rawPath === '~') { return os.homedir(); } if (rawPath.startsWith('~/')) { return path.join(os.homedir(), rawPath.slice(2)); } return rawPath; } function resolveRepoRoot(config: MacpRuntimeConfig): string { return path.resolve(expandHome(config.repoRoot?.trim() || DEFAULT_REPO_ROOT)); } function resolveOrchDir(config: MacpRuntimeConfig): string { if (config.orchDir?.trim()) { return path.resolve(expandHome(config.orchDir)); } return path.join(resolveRepoRoot(config), '.mosaic', 'orchestrator'); } function resolveOrchestratorRunPath(): string { return path.resolve(expandHome(ORCHESTRATOR_RUN_PATH)); } function shellQuote(value: string): string { return `'${value.replace(/'/g, `'\\''`)}'`; } function sanitizeSegment(value: string): string { return ( value .trim() .toLowerCase() .replace(/[^a-z0-9]+/g, '-') .replace(/^-+|-+$/g, '') || 'task' ); } function encodeHandleState(state: HandleState): string { return JSON.stringify(state); } function decodeHandleState(handle: AcpRuntimeHandle): HandleState { const parsed = JSON.parse(handle.runtimeSessionName) as Partial; if ( typeof parsed.name !== 'string' || typeof parsed.agent !== 'string' || typeof parsed.runtime !== 'string' || typeof parsed.cwd !== 'string' || typeof parsed.model !== 'string' || typeof parsed.systemPrompt !== 'string' || typeof parsed.timeoutMs !== 'number' ) { throw new Error('Invalid MACP runtime handle state.'); } return parsed as HandleState; } function toSessionName(input: AcpRuntimeEnsureInput): string { return `${input.agent}-${input.sessionKey}`; } function createTaskId(sessionKey: string, requestId: string): string { return `${sanitizeSegment(sessionKey)}-${sanitizeSegment(requestId)}-${randomUUID().slice(0, 8)}`; } function createTaskTitle(prompt: string): string { const firstLine = prompt .split(/\r?\n/) .map((line) => line.trim()) .find((line) => line.length > 0); return (firstLine || 'MACP OpenClaw task').slice(0, 120); } function buildWorktreePath(repoRoot: string, taskId: string): string { const repoName = path.basename(repoRoot); return path.join(path.dirname(repoRoot), `${repoName}-worktrees`, `macp-oc-${taskId}`); } function nowIso(): string { return new Date().toISOString(); } function chunkText(text: string, chunkSize = 4000): string[] { const normalized = text.trim(); if (!normalized) { return []; } const chunks: string[] = []; for (let index = 0; index < normalized.length; index += chunkSize) { chunks.push(normalized.slice(index, index + chunkSize)); } return chunks; } function isRecord(value: unknown): value is Record { return typeof value === 'object' && value !== null; } function toMessage(value: unknown): string { if (typeof value === 'string') { return value; } if (value instanceof Error) { return value.message; } if (value === null || value === undefined) { return ''; } return JSON.stringify(value, null, 2); } function abortError(): Error { const error = new Error('MACP turn aborted.'); error.name = 'AbortError'; return error; } async function waitFor(ms: number, signal?: AbortSignal): Promise { if (signal?.aborted) { throw abortError(); } await new Promise((resolve, reject) => { const onAbort = () => { clearTimeout(timeout); signal?.removeEventListener('abort', onAbort); reject(abortError()); }; const timeout = setTimeout(() => { signal?.removeEventListener('abort', onAbort); resolve(); }, ms); signal?.addEventListener('abort', onAbort, { once: true }); }); } async function writeJsonAtomic(filePath: string, value: unknown): Promise { const tempPath = `${filePath}.${process.pid}.${Date.now()}.tmp`; await writeFile(tempPath, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'); await rename(tempPath, filePath); } async function loadQueue(tasksPath: string): Promise { try { const raw = JSON.parse(await readFile(tasksPath, 'utf-8')) as unknown; if (Array.isArray(raw)) { return { tasks: raw as OrchestratorTask[] }; } if (isRecord(raw) && Array.isArray(raw.tasks)) { return { tasks: raw.tasks as OrchestratorTask[] }; } throw new Error('tasks.json must contain a tasks array.'); } catch (error) { const code = (error as NodeJS.ErrnoException).code; if (code === 'ENOENT') { return { tasks: [] }; } throw error; } } async function withFileLock( lockPath: string, timeoutMs: number, action: () => Promise, ): Promise { const deadline = Date.now() + Math.max(5_000, Math.min(timeoutMs, 30_000)); while (true) { try { const handle = await open(lockPath, 'wx'); try { return await action(); } finally { await handle.close(); await rm(lockPath, { force: true }); } } catch (error) { const code = (error as NodeJS.ErrnoException).code; if (code !== 'EEXIST') { throw error; } if (Date.now() >= deadline) { throw new Error(`Timed out waiting for orchestrator queue lock: ${lockPath}`); } await waitFor(200); } } } async function appendTaskToQueue( task: OrchestratorTask, orchDir: string, timeoutMs: number, ): Promise { const tasksPath = path.join(orchDir, 'tasks.json'); const lockPath = `${tasksPath}.lock`; await mkdir(orchDir, { recursive: true }); await withFileLock(lockPath, timeoutMs, async () => { const queue = await loadQueue(tasksPath); queue.tasks.push(task); await writeJsonAtomic(tasksPath, queue); }); } async function readOrchestratorConfig(orchDir: string): Promise> { const configPath = path.join(orchDir, 'config.json'); const config = JSON.parse(await readFile(configPath, 'utf-8')) as unknown; if (!isRecord(config)) { throw new Error(`Invalid orchestrator config: ${configPath}`); } return config; } async function ensureOrchestratorReady(orchDir: string): Promise { const config = await readOrchestratorConfig(orchDir); if (config.enabled !== true) { throw new Error(`MACP orchestrator is disabled in ${path.join(orchDir, 'config.json')}.`); } } function triggerController(repoRoot: string): void { const child = spawn( 'bash', ['-lc', `cd ${shellQuote(repoRoot)} && ${shellQuote(resolveOrchestratorRunPath())} --once`], { detached: true, stdio: 'ignore', }, ); child.unref(); } async function pollForResult( resultPath: string, timeoutMs: number, signal?: AbortSignal, ): Promise { const deadline = Date.now() + Math.max(timeoutMs, 2_000); while (Date.now() <= deadline) { if (signal?.aborted) { throw abortError(); } try { const raw = JSON.parse(await readFile(resultPath, 'utf-8')) as unknown; if (!isRecord(raw) || typeof raw.task_id !== 'string' || typeof raw.status !== 'string') { throw new Error(`Invalid MACP result payload: ${resultPath}`); } return raw as TaskResult; } catch (error) { const code = (error as NodeJS.ErrnoException).code; if (code !== 'ENOENT' && !(error instanceof SyntaxError)) { throw error; } } await waitFor(2_000, signal); } throw new Error(`Timed out waiting for MACP result: ${resultPath}`); } async function resolveResultOutput(result: TaskResult, orchDir: string): Promise { const metadata = isRecord(result.metadata) ? result.metadata : {}; const outputCandidates = [metadata.result_output_path, metadata.output_path] .filter((value): value is string => typeof value === 'string' && value.trim().length > 0) .map((value) => (path.isAbsolute(value) ? value : path.resolve(orchDir, value))); for (const candidate of outputCandidates) { try { return (await readFile(candidate, 'utf-8')).trim(); } catch { // Fall back to formatted result details below. } } const lines: string[] = []; if (result.summary) { lines.push(result.summary); } if (result.error) { lines.push(`Error: ${toMessage(result.error)}`); } if (result.escalation_reason) { lines.push(`Escalation: ${toMessage(result.escalation_reason)}`); } if (result.branch) { lines.push(`Branch: ${result.branch}`); } if (result.pr) { lines.push(`PR: ${result.pr}`); } if (Array.isArray(result.files_changed) && result.files_changed.length > 0) { lines.push(`Files changed:\n${result.files_changed.map((file) => `- ${file}`).join('\n')}`); } if (Array.isArray(result.gate_results) && result.gate_results.length > 0) { lines.push( `Quality gates:\n${result.gate_results .map((gate) => `- [${gate.exit_code ?? 0}] ${gate.command ?? 'unknown command'}`) .join('\n')}`, ); } return lines.join('\n\n').trim() || JSON.stringify(result, null, 2); } export class MacpRuntime implements AcpRuntime { constructor(private readonly config: MacpRuntimeConfig) {} async ensureSession(input: AcpRuntimeEnsureInput): Promise { if (input.mode !== 'oneshot') { throw new Error(`macp runtime only supports oneshot sessions; received "${input.mode}".`); } const cwd = path.resolve(input.cwd ?? process.cwd()); const state: HandleState = { name: toSessionName(input), agent: input.agent, runtime: input.agent || this.config.defaultRuntime || 'codex', cwd, model: this.config.defaultModel, systemPrompt: this.config.systemPrompt, timeoutMs: this.config.timeoutMs, }; return { sessionKey: input.sessionKey, backend: 'macp', runtimeSessionName: encodeHandleState(state), cwd, backendSessionId: state.name, agentSessionId: state.name, }; } async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable { const state = decodeHandleState(input.handle); const repoRoot = resolveRepoRoot(this.config); const orchDir = resolveOrchDir(this.config); const taskId = createTaskId(input.handle.sessionKey, input.requestId); const briefDir = path.join(os.homedir(), '.mosaic', 'macp-oc'); const briefPath = path.join(briefDir, `${state.name}-${input.requestId}.md`); const resultPath = path.join(orchDir, 'results', `${taskId}.json`); try { await access(resolveOrchestratorRunPath()); await ensureOrchestratorReady(orchDir); await mkdir(briefDir, { recursive: true }); await mkdir(path.dirname(resultPath), { recursive: true }); await writeFile(briefPath, `${input.text.trimEnd()}\n`, 'utf-8'); const task: OrchestratorTask = { id: taskId, title: createTaskTitle(input.text), status: 'pending', dispatch: this.config.defaultDispatch || 'yolo', runtime: state.runtime || this.config.defaultRuntime || 'codex', worktree: buildWorktreePath(repoRoot, taskId), brief_path: briefPath, _brief_temp_path: briefPath, timeout_seconds: Math.max(1, Math.ceil(state.timeoutMs / 1_000)), metadata: { source: 'openclaw-macp-plugin', created_at: nowIso(), session_key: input.handle.sessionKey, request_id: input.requestId, agent_id: state.agent, cwd: state.cwd, }, }; this.config.logger?.info?.( `Queueing MACP orchestrator task ${taskId} (${task.runtime}/${task.dispatch}).`, ); yield { type: 'status', text: `Queued MACP task ${taskId}.`, tag: 'session_info_update', }; await appendTaskToQueue(task, orchDir, state.timeoutMs); triggerController(repoRoot); const result = await pollForResult(resultPath, state.timeoutMs, input.signal); const output = await resolveResultOutput(result, orchDir); for (const chunk of chunkText(output)) { yield { type: 'text_delta', text: chunk, stream: 'output', tag: 'agent_message_chunk', }; } yield { type: 'done', stopReason: result.status, }; } catch (error) { yield { type: 'error', message: error instanceof Error ? error.message : String(error), }; } finally { await rm(briefPath, { force: true }).catch(() => undefined); } } getCapabilities(): AcpRuntimeCapabilities { return MACP_CAPABILITIES; } async getStatus(input: { handle: AcpRuntimeHandle }): Promise { const state = decodeHandleState(input.handle); return { summary: 'macp controller oneshot runtime ready', backendSessionId: state.name, agentSessionId: state.name, details: { mode: 'oneshot', agent: state.agent, runtime: state.runtime, cwd: state.cwd, repoRoot: resolveRepoRoot(this.config), orchDir: resolveOrchDir(this.config), }, }; } async doctor(): Promise { try { const repoRoot = resolveRepoRoot(this.config); const orchDir = resolveOrchDir(this.config); const orchestratorRunPath = resolveOrchestratorRunPath(); await access(orchestratorRunPath); await access(repoRoot); await access(orchDir); await access(PI_RUNNER_PATH).catch(() => undefined); const orchestratorConfig = await readOrchestratorConfig(orchDir); if (orchestratorConfig.enabled !== true) { return { ok: false, code: 'MACP_ORCH_DISABLED', message: 'MACP orchestrator is disabled for the configured repo.', details: [path.join(orchDir, 'config.json')], }; } return { ok: true, message: 'MACP runtime is ready.', details: [orchestratorRunPath, repoRoot, orchDir], }; } catch (error) { return { ok: false, code: 'MACP_ORCH_MISSING', message: error instanceof Error ? error.message : String(error), installCommand: 'pnpm install --frozen-lockfile', }; } } async cancel(_input: { handle: AcpRuntimeHandle; reason?: string }): Promise { this.config.logger?.info?.('macp runtime cancel requested'); } async close(_input: { handle: AcpRuntimeHandle; reason: string }): Promise { this.config.logger?.info?.('macp runtime close requested'); } }