From a9340adad727c6a25b2cf71569270d7eccd015cd Mon Sep 17 00:00:00 2001 From: Jarvis Date: Mon, 30 Mar 2026 10:33:32 -0500 Subject: [PATCH] fix(oc-plugin): replace Pi-direct with MACP controller bridge in runTurn --- docs/scratchpads/macp-oc-bridge-20260330.md | 16 + plugins/macp/README.md | 25 +- plugins/macp/openclaw.plugin.json | 24 +- plugins/macp/src/index.ts | 35 +- plugins/macp/src/macp-runtime.ts | 559 +++++++++++++++----- 5 files changed, 510 insertions(+), 149 deletions(-) create mode 100644 docs/scratchpads/macp-oc-bridge-20260330.md diff --git a/docs/scratchpads/macp-oc-bridge-20260330.md b/docs/scratchpads/macp-oc-bridge-20260330.md new file mode 100644 index 0000000..9b56c28 --- /dev/null +++ b/docs/scratchpads/macp-oc-bridge-20260330.md @@ -0,0 +1,16 @@ +# Scratchpad: MACP OC Bridge (2026-03-30) + +- Objective: Replace the OpenClaw MACP plugin's Pi-direct `runTurn` with the MACP controller queue bridge. +- Scope: `plugins/macp/src/macp-runtime.ts`, `plugins/macp/src/index.ts`, `plugins/macp/openclaw.plugin.json`, `plugins/macp/README.md`. +- Plan: + 1. Read controller/dispatcher/plugin docs and confirm queue/result contract. + 2. Queue tasks through `.mosaic/orchestrator/tasks.json` using a brief file and controller trigger. + 3. Poll result JSON, stream output back to ACP, then validate with typecheck/format checks. +- Risks: + - The repo orchestrator must be enabled in `.mosaic/orchestrator/config.json`. + - Result JSON does not always embed worker output, so the runtime falls back to metadata-linked output files or a formatted result summary. +- Verification: + - `npx tsc --noEmit --target es2022 --module nodenext --moduleResolution nodenext --skipLibCheck plugins/macp/src/macp-runtime.ts plugins/macp/src/index.ts` + - `pnpm prettier --write "plugins/macp/**/*.{ts,json,md}"` + - `pnpm format:check` + - `npx tsc --noEmit -p plugins/macp/tsconfig.json` still fails in this branch because `plugins/macp/tsconfig.json` extends a missing `packages/config/typescript/library.json` file and also pulls in pre-existing external OpenClaw type noise. diff --git a/plugins/macp/README.md b/plugins/macp/README.md index 2dd4669..0911eb7 100644 --- a/plugins/macp/README.md +++ b/plugins/macp/README.md @@ -2,14 +2,15 @@ This plugin registers a new OpenClaw ACP runtime backend named `macp`. -When OpenClaw calls `sessions_spawn(runtime: "macp", agentId: "pi")`, the plugin runs a Pi agent turn in-process using the MACP Pi runner logic and streams Pi output back as ACP runtime events. +When OpenClaw calls `sessions_spawn(runtime: "macp")`, the plugin now writes the prompt to a brief file, queues a MACP controller task in `.mosaic/orchestrator/tasks.json`, triggers `mosaic-orchestrator-run --once`, polls `.mosaic/orchestrator/results/.json`, and streams the resulting output back as ACP runtime events. ## Current behavior -- Supports `agentId: "pi"` only - Supports ACP `mode: "oneshot"` only +- Accepts any `agentId` and maps it to the queued MACP task `runtime` +- Defaults queued tasks to `dispatch: "yolo"` and `runtime: "codex"` when no override is provided - Rejects persistent ACP sessions -- Reuses the Pi default tools from `tools/macp/dispatcher/pi_runner.ts` +- Keeps `src/pi-bridge.ts` for future `dispatch: "pi"` support ## Install in OpenClaw @@ -32,15 +33,29 @@ Add the plugin entry to your OpenClaw config: "defaultModel": "openai/gpt-5-mini", "systemPrompt": "You are Pi running via MACP.", "timeoutMs": 300000, - "logDir": "~/.openclaw/state/macp" + "logDir": "~/.openclaw/state/macp", + "repoRoot": "~/src/mosaic-mono-v1", + "orchDir": "~/src/mosaic-mono-v1/.mosaic/orchestrator", + "defaultDispatch": "yolo", + "defaultRuntime": "codex" } } ] } ``` +## Runtime flow + +1. OpenClaw ensures a oneshot `macp` session and preserves the requested `agentId`. +2. `runTurn` writes the turn prompt to `~/.mosaic/macp-oc/-.md`. +3. The plugin appends a pending MACP task to the configured orchestrator queue. +4. The plugin triggers `~/.config/mosaic/bin/mosaic-orchestrator-run --once` in the configured repo root. +5. The plugin polls for `.mosaic/orchestrator/results/.json` and streams the result back to OpenClaw. + ## Verification ```bash -pnpm exec tsc --noEmit -p plugins/macp/tsconfig.json +pnpm --filter @mosaic/oc-macp-plugin typecheck || npx tsc --noEmit -p plugins/macp/tsconfig.json +pnpm prettier --write "plugins/macp/**/*.{ts,json,md}" +pnpm format:check ``` diff --git a/plugins/macp/openclaw.plugin.json b/plugins/macp/openclaw.plugin.json index 30e9379..c95fb55 100644 --- a/plugins/macp/openclaw.plugin.json +++ b/plugins/macp/openclaw.plugin.json @@ -1,18 +1,18 @@ { "id": "macp", "name": "MACP Runtime", - "description": "Registers the macp ACP runtime backend and dispatches Pi turns through the MACP Pi runner.", + "description": "Registers the macp ACP runtime backend and routes turns through the MACP controller queue.", "configSchema": { "type": "object", "additionalProperties": false, "properties": { "defaultModel": { "type": "string", - "description": "Default Pi model in provider/model format. Defaults to openai/gpt-5-mini." + "description": "Default Pi model in provider/model format. Retained for Pi bridge compatibility. Defaults to openai/gpt-5-mini." }, "systemPrompt": { "type": "string", - "description": "Optional system prompt prepended to each Pi turn." + "description": "Optional system prompt retained for Pi bridge compatibility." }, "timeoutMs": { "type": "number", @@ -21,7 +21,23 @@ }, "logDir": { "type": "string", - "description": "Directory for Pi runtime logs. Defaults to the plugin state dir." + "description": "Directory for plugin state/log files. Defaults to the plugin state dir." + }, + "repoRoot": { + "type": "string", + "description": "Repository root containing .mosaic/orchestrator. Defaults to ~/src/mosaic-mono-v1." + }, + "orchDir": { + "type": "string", + "description": "Override for the orchestrator directory. Defaults to /.mosaic/orchestrator." + }, + "defaultDispatch": { + "type": "string", + "description": "Dispatch type written into queued MACP tasks. Defaults to yolo." + }, + "defaultRuntime": { + "type": "string", + "description": "Fallback runtime when agentId is unavailable. Defaults to codex." } } } diff --git a/plugins/macp/src/index.ts b/plugins/macp/src/index.ts index 0d34d4d..650464e 100644 --- a/plugins/macp/src/index.ts +++ b/plugins/macp/src/index.ts @@ -1,4 +1,5 @@ -import path from 'node:path'; +import * as os from 'node:os'; +import * as path from 'node:path'; import { registerAcpRuntimeBackend, @@ -17,10 +18,27 @@ type PluginConfig = { systemPrompt?: string; timeoutMs?: number; logDir?: string; + repoRoot?: string; + orchDir?: string; + defaultDispatch?: string; + defaultRuntime?: string; }; +function expandHome(rawPath: string): string { + if (rawPath === '~') { + return os.homedir(); + } + if (rawPath.startsWith('~/')) { + return path.join(os.homedir(), rawPath.slice(2)); + } + return rawPath; +} + function resolveConfig(pluginConfig?: Record, stateDir?: string) { const config = (pluginConfig ?? {}) as PluginConfig; + const repoRoot = config.repoRoot?.trim() + ? path.resolve(expandHome(config.repoRoot)) + : path.resolve('/home/jarvis/src/mosaic-mono-v1'); return { defaultModel: config.defaultModel?.trim() || 'openai/gpt-5-mini', systemPrompt: config.systemPrompt ?? '', @@ -30,7 +48,15 @@ function resolveConfig(pluginConfig?: Record, stateDir?: string config.timeoutMs > 0 ? config.timeoutMs : 300_000, - stateDir: config.logDir?.trim() ? path.resolve(config.logDir) : (stateDir ?? process.cwd()), + stateDir: config.logDir?.trim() + ? path.resolve(expandHome(config.logDir)) + : (stateDir ?? process.cwd()), + repoRoot, + orchDir: config.orchDir?.trim() + ? path.resolve(expandHome(config.orchDir)) + : path.join(repoRoot, '.mosaic', 'orchestrator'), + defaultDispatch: config.defaultDispatch?.trim() || 'yolo', + defaultRuntime: config.defaultRuntime?.trim() || 'codex', }; } @@ -51,7 +77,7 @@ function createMacpRuntimeService(pluginConfig?: Record): OpenC healthy: () => runtime !== null, }); ctx.logger.info( - `macp runtime backend registered (defaultModel: ${resolved.defaultModel}, timeoutMs: ${resolved.timeoutMs})`, + `macp runtime backend registered (defaultRuntime: ${resolved.defaultRuntime}, defaultDispatch: ${resolved.defaultDispatch}, timeoutMs: ${resolved.timeoutMs})`, ); }, async stop(_ctx: OpenClawPluginServiceContext) { @@ -64,7 +90,8 @@ function createMacpRuntimeService(pluginConfig?: Record): OpenC const plugin = { id: 'macp', name: 'MACP Runtime', - description: 'ACP runtime backend that dispatches Pi oneshot sessions through MACP.', + description: + 'ACP runtime backend that dispatches OpenClaw oneshot sessions through the MACP controller queue.', register(api: OpenClawPluginApi) { api.registerService(createMacpRuntimeService(api.pluginConfig)); }, diff --git a/plugins/macp/src/macp-runtime.ts b/plugins/macp/src/macp-runtime.ts index fc28525..3c10035 100644 --- a/plugins/macp/src/macp-runtime.ts +++ b/plugins/macp/src/macp-runtime.ts @@ -1,5 +1,8 @@ -import path from 'node:path'; -import { access } from 'node:fs/promises'; +import { spawn } from 'node:child_process'; +import { randomUUID } from 'node:crypto'; +import { access, mkdir, open, readFile, rename, rm, writeFile } from 'node:fs/promises'; +import * as os from 'node:os'; +import * as path from 'node:path'; import type { AcpRuntime, @@ -12,13 +15,15 @@ import type { AcpRuntimeTurnInput, } from '/home/jarvis/.npm-global/lib/node_modules/openclaw/dist/plugin-sdk/acp-runtime.js'; -import { formatAssistantEvent, runPiTurn } from './pi-bridge.js'; - export interface MacpRuntimeConfig { defaultModel: string; systemPrompt: string; timeoutMs: number; stateDir: string; + repoRoot?: string; + orchDir?: string; + defaultDispatch?: string; + defaultRuntime?: string; logger?: { info?: (message: string) => void; warn?: (message: string) => void; @@ -28,18 +33,96 @@ export interface MacpRuntimeConfig { type HandleState = { name: string; agent: string; + runtime: string; cwd: string; model: string; systemPrompt: string; timeoutMs: number; }; +type OrchestratorTask = { + id: string; + title: string; + status: 'pending'; + dispatch: string; + runtime: string; + worktree: string; + brief_path: string; + _brief_temp_path: string; + timeout_seconds: number; + metadata: Record; +}; + +type QueueFile = { + tasks: OrchestratorTask[]; +}; + +type TaskGateResult = { + command?: string; + exit_code?: number; + type?: string; +}; + +type TaskResult = { + task_id: string; + status: string; + summary?: string; + error?: unknown; + escalation_reason?: unknown; + branch?: string | null; + pr?: string | null; + files_changed?: string[]; + gate_results?: TaskGateResult[]; + metadata?: Record; +}; + const MACP_CAPABILITIES: AcpRuntimeCapabilities = { controls: [], }; +const DEFAULT_REPO_ROOT = '~/src/mosaic-mono-v1'; +const ORCHESTRATOR_RUN_PATH = '~/.config/mosaic/bin/mosaic-orchestrator-run'; const PI_RUNNER_PATH = '/home/jarvis/src/mosaic-mono-v1/tools/macp/dispatcher/pi_runner.ts'; +function expandHome(rawPath: string): string { + if (rawPath === '~') { + return os.homedir(); + } + if (rawPath.startsWith('~/')) { + return path.join(os.homedir(), rawPath.slice(2)); + } + return rawPath; +} + +function resolveRepoRoot(config: MacpRuntimeConfig): string { + return path.resolve(expandHome(config.repoRoot?.trim() || DEFAULT_REPO_ROOT)); +} + +function resolveOrchDir(config: MacpRuntimeConfig): string { + if (config.orchDir?.trim()) { + return path.resolve(expandHome(config.orchDir)); + } + return path.join(resolveRepoRoot(config), '.mosaic', 'orchestrator'); +} + +function resolveOrchestratorRunPath(): string { + return path.resolve(expandHome(ORCHESTRATOR_RUN_PATH)); +} + +function shellQuote(value: string): string { + return `'${value.replace(/'/g, `'\\''`)}'`; +} + +function sanitizeSegment(value: string): string { + return ( + value + .trim() + .toLowerCase() + .replace(/[^a-z0-9]+/g, '-') + .replace(/^-+|-+$/g, '') || 'task' + ); +} + function encodeHandleState(state: HandleState): string { return JSON.stringify(state); } @@ -49,6 +132,7 @@ function decodeHandleState(handle: AcpRuntimeHandle): HandleState { if ( typeof parsed.name !== 'string' || typeof parsed.agent !== 'string' || + typeof parsed.runtime !== 'string' || typeof parsed.cwd !== 'string' || typeof parsed.model !== 'string' || typeof parsed.systemPrompt !== 'string' || @@ -63,13 +147,260 @@ function toSessionName(input: AcpRuntimeEnsureInput): string { return `${input.agent}-${input.sessionKey}`; } +function createTaskId(sessionKey: string, requestId: string): string { + return `${sanitizeSegment(sessionKey)}-${sanitizeSegment(requestId)}-${randomUUID().slice(0, 8)}`; +} + +function createTaskTitle(prompt: string): string { + const firstLine = prompt + .split(/\r?\n/) + .map((line) => line.trim()) + .find((line) => line.length > 0); + return (firstLine || 'MACP OpenClaw task').slice(0, 120); +} + +function buildWorktreePath(repoRoot: string, taskId: string): string { + const repoName = path.basename(repoRoot); + return path.join(path.dirname(repoRoot), `${repoName}-worktrees`, `macp-oc-${taskId}`); +} + +function nowIso(): string { + return new Date().toISOString(); +} + +function chunkText(text: string, chunkSize = 4000): string[] { + const normalized = text.trim(); + if (!normalized) { + return []; + } + + const chunks: string[] = []; + for (let index = 0; index < normalized.length; index += chunkSize) { + chunks.push(normalized.slice(index, index + chunkSize)); + } + return chunks; +} + +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null; +} + +function toMessage(value: unknown): string { + if (typeof value === 'string') { + return value; + } + if (value instanceof Error) { + return value.message; + } + if (value === null || value === undefined) { + return ''; + } + return JSON.stringify(value, null, 2); +} + +function abortError(): Error { + const error = new Error('MACP turn aborted.'); + error.name = 'AbortError'; + return error; +} + +async function waitFor(ms: number, signal?: AbortSignal): Promise { + if (signal?.aborted) { + throw abortError(); + } + + await new Promise((resolve, reject) => { + const onAbort = () => { + clearTimeout(timeout); + signal?.removeEventListener('abort', onAbort); + reject(abortError()); + }; + + const timeout = setTimeout(() => { + signal?.removeEventListener('abort', onAbort); + resolve(); + }, ms); + + signal?.addEventListener('abort', onAbort, { once: true }); + }); +} + +async function writeJsonAtomic(filePath: string, value: unknown): Promise { + const tempPath = `${filePath}.${process.pid}.${Date.now()}.tmp`; + await writeFile(tempPath, `${JSON.stringify(value, null, 2)}\n`, 'utf-8'); + await rename(tempPath, filePath); +} + +async function loadQueue(tasksPath: string): Promise { + try { + const raw = JSON.parse(await readFile(tasksPath, 'utf-8')) as unknown; + if (Array.isArray(raw)) { + return { tasks: raw as OrchestratorTask[] }; + } + if (isRecord(raw) && Array.isArray(raw.tasks)) { + return { tasks: raw.tasks as OrchestratorTask[] }; + } + throw new Error('tasks.json must contain a tasks array.'); + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code === 'ENOENT') { + return { tasks: [] }; + } + throw error; + } +} + +async function withFileLock( + lockPath: string, + timeoutMs: number, + action: () => Promise, +): Promise { + const deadline = Date.now() + Math.max(5_000, Math.min(timeoutMs, 30_000)); + + while (true) { + try { + const handle = await open(lockPath, 'wx'); + try { + return await action(); + } finally { + await handle.close(); + await rm(lockPath, { force: true }); + } + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code !== 'EEXIST') { + throw error; + } + if (Date.now() >= deadline) { + throw new Error(`Timed out waiting for orchestrator queue lock: ${lockPath}`); + } + await waitFor(200); + } + } +} + +async function appendTaskToQueue( + task: OrchestratorTask, + orchDir: string, + timeoutMs: number, +): Promise { + const tasksPath = path.join(orchDir, 'tasks.json'); + const lockPath = `${tasksPath}.lock`; + await mkdir(orchDir, { recursive: true }); + await withFileLock(lockPath, timeoutMs, async () => { + const queue = await loadQueue(tasksPath); + queue.tasks.push(task); + await writeJsonAtomic(tasksPath, queue); + }); +} + +async function readOrchestratorConfig(orchDir: string): Promise> { + const configPath = path.join(orchDir, 'config.json'); + const config = JSON.parse(await readFile(configPath, 'utf-8')) as unknown; + if (!isRecord(config)) { + throw new Error(`Invalid orchestrator config: ${configPath}`); + } + return config; +} + +async function ensureOrchestratorReady(orchDir: string): Promise { + const config = await readOrchestratorConfig(orchDir); + if (config.enabled !== true) { + throw new Error(`MACP orchestrator is disabled in ${path.join(orchDir, 'config.json')}.`); + } +} + +function triggerController(repoRoot: string): void { + const child = spawn( + 'bash', + ['-lc', `cd ${shellQuote(repoRoot)} && ${shellQuote(resolveOrchestratorRunPath())} --once`], + { + detached: true, + stdio: 'ignore', + }, + ); + child.unref(); +} + +async function pollForResult( + resultPath: string, + timeoutMs: number, + signal?: AbortSignal, +): Promise { + const deadline = Date.now() + Math.max(timeoutMs, 2_000); + + while (Date.now() <= deadline) { + if (signal?.aborted) { + throw abortError(); + } + + try { + const raw = JSON.parse(await readFile(resultPath, 'utf-8')) as unknown; + if (!isRecord(raw) || typeof raw.task_id !== 'string' || typeof raw.status !== 'string') { + throw new Error(`Invalid MACP result payload: ${resultPath}`); + } + return raw as TaskResult; + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code !== 'ENOENT' && !(error instanceof SyntaxError)) { + throw error; + } + } + + await waitFor(2_000, signal); + } + + throw new Error(`Timed out waiting for MACP result: ${resultPath}`); +} + +async function resolveResultOutput(result: TaskResult, orchDir: string): Promise { + const metadata = isRecord(result.metadata) ? result.metadata : {}; + const outputCandidates = [metadata.result_output_path, metadata.output_path] + .filter((value): value is string => typeof value === 'string' && value.trim().length > 0) + .map((value) => (path.isAbsolute(value) ? value : path.resolve(orchDir, value))); + + for (const candidate of outputCandidates) { + try { + return (await readFile(candidate, 'utf-8')).trim(); + } catch { + // Fall back to formatted result details below. + } + } + + const lines: string[] = []; + if (result.summary) { + lines.push(result.summary); + } + if (result.error) { + lines.push(`Error: ${toMessage(result.error)}`); + } + if (result.escalation_reason) { + lines.push(`Escalation: ${toMessage(result.escalation_reason)}`); + } + if (result.branch) { + lines.push(`Branch: ${result.branch}`); + } + if (result.pr) { + lines.push(`PR: ${result.pr}`); + } + if (Array.isArray(result.files_changed) && result.files_changed.length > 0) { + lines.push(`Files changed:\n${result.files_changed.map((file) => `- ${file}`).join('\n')}`); + } + if (Array.isArray(result.gate_results) && result.gate_results.length > 0) { + lines.push( + `Quality gates:\n${result.gate_results + .map((gate) => `- [${gate.exit_code ?? 0}] ${gate.command ?? 'unknown command'}`) + .join('\n')}`, + ); + } + + return lines.join('\n\n').trim() || JSON.stringify(result, null, 2); +} + export class MacpRuntime implements AcpRuntime { constructor(private readonly config: MacpRuntimeConfig) {} async ensureSession(input: AcpRuntimeEnsureInput): Promise { - if (input.agent !== 'pi') { - throw new Error(`macp runtime only supports agentId "pi"; received "${input.agent}".`); - } if (input.mode !== 'oneshot') { throw new Error(`macp runtime only supports oneshot sessions; received "${input.mode}".`); } @@ -78,6 +409,7 @@ export class MacpRuntime implements AcpRuntime { const state: HandleState = { name: toSessionName(input), agent: input.agent, + runtime: input.agent || this.config.defaultRuntime || 'codex', cwd, model: this.config.defaultModel, systemPrompt: this.config.systemPrompt, @@ -96,120 +428,73 @@ export class MacpRuntime implements AcpRuntime { async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable { const state = decodeHandleState(input.handle); - const logPath = path.join( - this.config.stateDir, - 'macp', - `${state.name}-${Date.now()}-${input.requestId}.json`, - ); - - const streamQueue: AcpRuntimeEvent[] = []; - const waiters: Array<() => void> = []; - let finished = false; - - const push = (event: AcpRuntimeEvent) => { - streamQueue.push(event); - const waiter = waiters.shift(); - waiter?.(); - }; - - const resultPromise = runPiTurn({ - model: state.model, - systemPrompt: state.systemPrompt, - prompt: input.text, - workDir: state.cwd, - timeoutMs: state.timeoutMs, - logPath, - ...(input.signal ? { signal: input.signal } : {}), - onEvent: async (event) => { - switch (event.type) { - case 'message_update': { - const formatted = formatAssistantEvent(event.assistantMessageEvent); - if (formatted?.text) { - push({ - type: 'text_delta', - text: formatted.text, - ...(formatted.stream ? { stream: formatted.stream } : {}), - ...(formatted.tag ? { tag: formatted.tag } : {}), - }); - } - break; - } - case 'tool_execution_start': - push({ - type: 'tool_call', - text: JSON.stringify(event.args ?? {}), - tag: 'tool_call', - toolCallId: event.toolCallId, - title: event.toolName, - status: 'running', - }); - break; - case 'tool_execution_update': - push({ - type: 'tool_call', - text: JSON.stringify(event.partialResult ?? {}), - tag: 'tool_call_update', - toolCallId: event.toolCallId, - title: event.toolName, - status: 'running', - }); - break; - case 'tool_execution_end': - push({ - type: 'tool_call', - text: JSON.stringify(event.result ?? {}), - tag: 'tool_call_update', - toolCallId: event.toolCallId, - title: event.toolName, - status: event.isError ? 'error' : 'completed', - }); - break; - case 'turn_end': - push({ - type: 'status', - text: 'Pi turn completed.', - tag: 'usage_update', - }); - break; - } - }, - }) - .then((result) => { - if (result.output) { - push({ - type: 'status', - text: result.output, - tag: 'session_info_update', - }); - } - push({ - type: 'done', - stopReason: result.stopReason, - }); - }) - .catch((error) => { - push({ - type: 'error', - message: error instanceof Error ? error.message : String(error), - }); - }) - .finally(() => { - finished = true; - while (waiters.length > 0) { - waiters.shift()?.(); - } - }); + const repoRoot = resolveRepoRoot(this.config); + const orchDir = resolveOrchDir(this.config); + const taskId = createTaskId(input.handle.sessionKey, input.requestId); + const briefDir = path.join(os.homedir(), '.mosaic', 'macp-oc'); + const briefPath = path.join(briefDir, `${state.name}-${input.requestId}.md`); + const resultPath = path.join(orchDir, 'results', `${taskId}.json`); try { - while (!finished || streamQueue.length > 0) { - if (streamQueue.length === 0) { - await new Promise((resolve) => waiters.push(resolve)); - continue; - } - yield streamQueue.shift() as AcpRuntimeEvent; + await access(resolveOrchestratorRunPath()); + await ensureOrchestratorReady(orchDir); + await mkdir(briefDir, { recursive: true }); + await mkdir(path.dirname(resultPath), { recursive: true }); + await writeFile(briefPath, `${input.text.trimEnd()}\n`, 'utf-8'); + + const task: OrchestratorTask = { + id: taskId, + title: createTaskTitle(input.text), + status: 'pending', + dispatch: this.config.defaultDispatch || 'yolo', + runtime: state.runtime || this.config.defaultRuntime || 'codex', + worktree: buildWorktreePath(repoRoot, taskId), + brief_path: briefPath, + _brief_temp_path: briefPath, + timeout_seconds: Math.max(1, Math.ceil(state.timeoutMs / 1_000)), + metadata: { + source: 'openclaw-macp-plugin', + created_at: nowIso(), + session_key: input.handle.sessionKey, + request_id: input.requestId, + agent_id: state.agent, + cwd: state.cwd, + }, + }; + + this.config.logger?.info?.( + `Queueing MACP orchestrator task ${taskId} (${task.runtime}/${task.dispatch}).`, + ); + yield { + type: 'status', + text: `Queued MACP task ${taskId}.`, + tag: 'session_info_update', + }; + + await appendTaskToQueue(task, orchDir, state.timeoutMs); + triggerController(repoRoot); + + const result = await pollForResult(resultPath, state.timeoutMs, input.signal); + const output = await resolveResultOutput(result, orchDir); + for (const chunk of chunkText(output)) { + yield { + type: 'text_delta', + text: chunk, + stream: 'output', + tag: 'agent_message_chunk', + }; } + yield { + type: 'done', + stopReason: result.status, + }; + } catch (error) { + yield { + type: 'error', + message: error instanceof Error ? error.message : String(error), + }; } finally { - await resultPromise; + await rm(briefPath, { force: true }).catch(() => undefined); } } @@ -220,47 +505,49 @@ export class MacpRuntime implements AcpRuntime { async getStatus(input: { handle: AcpRuntimeHandle }): Promise { const state = decodeHandleState(input.handle); return { - summary: 'macp Pi oneshot runtime ready', + summary: 'macp controller oneshot runtime ready', backendSessionId: state.name, agentSessionId: state.name, details: { mode: 'oneshot', - model: state.model, + agent: state.agent, + runtime: state.runtime, cwd: state.cwd, + repoRoot: resolveRepoRoot(this.config), + orchDir: resolveOrchDir(this.config), }, }; } async doctor(): Promise { try { - await access(PI_RUNNER_PATH); - const importErrors: string[] = []; - await import('@mariozechner/pi-agent-core').catch((error) => { - importErrors.push(error instanceof Error ? error.message : String(error)); - }); - await import('@mariozechner/pi-ai').catch((error) => { - importErrors.push(error instanceof Error ? error.message : String(error)); - }); - if (importErrors.length > 0) { + const repoRoot = resolveRepoRoot(this.config); + const orchDir = resolveOrchDir(this.config); + const orchestratorRunPath = resolveOrchestratorRunPath(); + await access(orchestratorRunPath); + await access(repoRoot); + await access(orchDir); + await access(PI_RUNNER_PATH).catch(() => undefined); + const orchestratorConfig = await readOrchestratorConfig(orchDir); + if (orchestratorConfig.enabled !== true) { return { ok: false, - code: 'MACP_DEPS_MISSING', - message: 'MACP runtime dependencies are not available.', - installCommand: 'pnpm install --frozen-lockfile', - details: importErrors, + code: 'MACP_ORCH_DISABLED', + message: 'MACP orchestrator is disabled for the configured repo.', + details: [path.join(orchDir, 'config.json')], }; } return { ok: true, message: 'MACP runtime is ready.', - details: [PI_RUNNER_PATH], + details: [orchestratorRunPath, repoRoot, orchDir], }; } catch (error) { return { ok: false, - code: 'MACP_PI_RUNNER_MISSING', - message: 'Pi runner was not found.', - details: [error instanceof Error ? error.message : String(error)], + code: 'MACP_ORCH_MISSING', + message: error instanceof Error ? error.message : String(error), + installCommand: 'pnpm install --frozen-lockfile', }; } }