fix(oc-plugin): replace Pi-direct with MACP controller bridge in runTurn
This commit is contained in:
@@ -2,14 +2,15 @@
|
||||
|
||||
This plugin registers a new OpenClaw ACP runtime backend named `macp`.
|
||||
|
||||
When OpenClaw calls `sessions_spawn(runtime: "macp", agentId: "pi")`, the plugin runs a Pi agent turn in-process using the MACP Pi runner logic and streams Pi output back as ACP runtime events.
|
||||
When OpenClaw calls `sessions_spawn(runtime: "macp")`, the plugin now writes the prompt to a brief file, queues a MACP controller task in `.mosaic/orchestrator/tasks.json`, triggers `mosaic-orchestrator-run --once`, polls `.mosaic/orchestrator/results/<task-id>.json`, and streams the resulting output back as ACP runtime events.
|
||||
|
||||
## Current behavior
|
||||
|
||||
- Supports `agentId: "pi"` only
|
||||
- Supports ACP `mode: "oneshot"` only
|
||||
- Accepts any `agentId` and maps it to the queued MACP task `runtime`
|
||||
- Defaults queued tasks to `dispatch: "yolo"` and `runtime: "codex"` when no override is provided
|
||||
- Rejects persistent ACP sessions
|
||||
- Reuses the Pi default tools from `tools/macp/dispatcher/pi_runner.ts`
|
||||
- Keeps `src/pi-bridge.ts` for future `dispatch: "pi"` support
|
||||
|
||||
## Install in OpenClaw
|
||||
|
||||
@@ -32,15 +33,29 @@ Add the plugin entry to your OpenClaw config:
|
||||
"defaultModel": "openai/gpt-5-mini",
|
||||
"systemPrompt": "You are Pi running via MACP.",
|
||||
"timeoutMs": 300000,
|
||||
"logDir": "~/.openclaw/state/macp"
|
||||
"logDir": "~/.openclaw/state/macp",
|
||||
"repoRoot": "~/src/mosaic-mono-v1",
|
||||
"orchDir": "~/src/mosaic-mono-v1/.mosaic/orchestrator",
|
||||
"defaultDispatch": "yolo",
|
||||
"defaultRuntime": "codex"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
## Runtime flow
|
||||
|
||||
1. OpenClaw ensures a oneshot `macp` session and preserves the requested `agentId`.
|
||||
2. `runTurn` writes the turn prompt to `~/.mosaic/macp-oc/<session>-<request>.md`.
|
||||
3. The plugin appends a pending MACP task to the configured orchestrator queue.
|
||||
4. The plugin triggers `~/.config/mosaic/bin/mosaic-orchestrator-run --once` in the configured repo root.
|
||||
5. The plugin polls for `.mosaic/orchestrator/results/<task-id>.json` and streams the result back to OpenClaw.
|
||||
|
||||
## Verification
|
||||
|
||||
```bash
|
||||
pnpm exec tsc --noEmit -p plugins/macp/tsconfig.json
|
||||
pnpm --filter @mosaic/oc-macp-plugin typecheck || npx tsc --noEmit -p plugins/macp/tsconfig.json
|
||||
pnpm prettier --write "plugins/macp/**/*.{ts,json,md}"
|
||||
pnpm format:check
|
||||
```
|
||||
|
||||
@@ -1,18 +1,18 @@
|
||||
{
|
||||
"id": "macp",
|
||||
"name": "MACP Runtime",
|
||||
"description": "Registers the macp ACP runtime backend and dispatches Pi turns through the MACP Pi runner.",
|
||||
"description": "Registers the macp ACP runtime backend and routes turns through the MACP controller queue.",
|
||||
"configSchema": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"defaultModel": {
|
||||
"type": "string",
|
||||
"description": "Default Pi model in provider/model format. Defaults to openai/gpt-5-mini."
|
||||
"description": "Default Pi model in provider/model format. Retained for Pi bridge compatibility. Defaults to openai/gpt-5-mini."
|
||||
},
|
||||
"systemPrompt": {
|
||||
"type": "string",
|
||||
"description": "Optional system prompt prepended to each Pi turn."
|
||||
"description": "Optional system prompt retained for Pi bridge compatibility."
|
||||
},
|
||||
"timeoutMs": {
|
||||
"type": "number",
|
||||
@@ -21,7 +21,23 @@
|
||||
},
|
||||
"logDir": {
|
||||
"type": "string",
|
||||
"description": "Directory for Pi runtime logs. Defaults to the plugin state dir."
|
||||
"description": "Directory for plugin state/log files. Defaults to the plugin state dir."
|
||||
},
|
||||
"repoRoot": {
|
||||
"type": "string",
|
||||
"description": "Repository root containing .mosaic/orchestrator. Defaults to ~/src/mosaic-mono-v1."
|
||||
},
|
||||
"orchDir": {
|
||||
"type": "string",
|
||||
"description": "Override for the orchestrator directory. Defaults to <repoRoot>/.mosaic/orchestrator."
|
||||
},
|
||||
"defaultDispatch": {
|
||||
"type": "string",
|
||||
"description": "Dispatch type written into queued MACP tasks. Defaults to yolo."
|
||||
},
|
||||
"defaultRuntime": {
|
||||
"type": "string",
|
||||
"description": "Fallback runtime when agentId is unavailable. Defaults to codex."
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import path from 'node:path';
|
||||
import * as os from 'node:os';
|
||||
import * as path from 'node:path';
|
||||
|
||||
import {
|
||||
registerAcpRuntimeBackend,
|
||||
@@ -17,10 +18,27 @@ type PluginConfig = {
|
||||
systemPrompt?: string;
|
||||
timeoutMs?: number;
|
||||
logDir?: string;
|
||||
repoRoot?: string;
|
||||
orchDir?: string;
|
||||
defaultDispatch?: string;
|
||||
defaultRuntime?: string;
|
||||
};
|
||||
|
||||
function expandHome(rawPath: string): string {
|
||||
if (rawPath === '~') {
|
||||
return os.homedir();
|
||||
}
|
||||
if (rawPath.startsWith('~/')) {
|
||||
return path.join(os.homedir(), rawPath.slice(2));
|
||||
}
|
||||
return rawPath;
|
||||
}
|
||||
|
||||
function resolveConfig(pluginConfig?: Record<string, unknown>, stateDir?: string) {
|
||||
const config = (pluginConfig ?? {}) as PluginConfig;
|
||||
const repoRoot = config.repoRoot?.trim()
|
||||
? path.resolve(expandHome(config.repoRoot))
|
||||
: path.resolve('/home/jarvis/src/mosaic-mono-v1');
|
||||
return {
|
||||
defaultModel: config.defaultModel?.trim() || 'openai/gpt-5-mini',
|
||||
systemPrompt: config.systemPrompt ?? '',
|
||||
@@ -30,7 +48,15 @@ function resolveConfig(pluginConfig?: Record<string, unknown>, stateDir?: string
|
||||
config.timeoutMs > 0
|
||||
? config.timeoutMs
|
||||
: 300_000,
|
||||
stateDir: config.logDir?.trim() ? path.resolve(config.logDir) : (stateDir ?? process.cwd()),
|
||||
stateDir: config.logDir?.trim()
|
||||
? path.resolve(expandHome(config.logDir))
|
||||
: (stateDir ?? process.cwd()),
|
||||
repoRoot,
|
||||
orchDir: config.orchDir?.trim()
|
||||
? path.resolve(expandHome(config.orchDir))
|
||||
: path.join(repoRoot, '.mosaic', 'orchestrator'),
|
||||
defaultDispatch: config.defaultDispatch?.trim() || 'yolo',
|
||||
defaultRuntime: config.defaultRuntime?.trim() || 'codex',
|
||||
};
|
||||
}
|
||||
|
||||
@@ -51,7 +77,7 @@ function createMacpRuntimeService(pluginConfig?: Record<string, unknown>): OpenC
|
||||
healthy: () => runtime !== null,
|
||||
});
|
||||
ctx.logger.info(
|
||||
`macp runtime backend registered (defaultModel: ${resolved.defaultModel}, timeoutMs: ${resolved.timeoutMs})`,
|
||||
`macp runtime backend registered (defaultRuntime: ${resolved.defaultRuntime}, defaultDispatch: ${resolved.defaultDispatch}, timeoutMs: ${resolved.timeoutMs})`,
|
||||
);
|
||||
},
|
||||
async stop(_ctx: OpenClawPluginServiceContext) {
|
||||
@@ -64,7 +90,8 @@ function createMacpRuntimeService(pluginConfig?: Record<string, unknown>): OpenC
|
||||
const plugin = {
|
||||
id: 'macp',
|
||||
name: 'MACP Runtime',
|
||||
description: 'ACP runtime backend that dispatches Pi oneshot sessions through MACP.',
|
||||
description:
|
||||
'ACP runtime backend that dispatches OpenClaw oneshot sessions through the MACP controller queue.',
|
||||
register(api: OpenClawPluginApi) {
|
||||
api.registerService(createMacpRuntimeService(api.pluginConfig));
|
||||
},
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
import path from 'node:path';
|
||||
import { access } from 'node:fs/promises';
|
||||
import { spawn } from 'node:child_process';
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { access, mkdir, open, readFile, rename, rm, writeFile } from 'node:fs/promises';
|
||||
import * as os from 'node:os';
|
||||
import * as path from 'node:path';
|
||||
|
||||
import type {
|
||||
AcpRuntime,
|
||||
@@ -12,13 +15,15 @@ import type {
|
||||
AcpRuntimeTurnInput,
|
||||
} from '/home/jarvis/.npm-global/lib/node_modules/openclaw/dist/plugin-sdk/acp-runtime.js';
|
||||
|
||||
import { formatAssistantEvent, runPiTurn } from './pi-bridge.js';
|
||||
|
||||
export interface MacpRuntimeConfig {
|
||||
defaultModel: string;
|
||||
systemPrompt: string;
|
||||
timeoutMs: number;
|
||||
stateDir: string;
|
||||
repoRoot?: string;
|
||||
orchDir?: string;
|
||||
defaultDispatch?: string;
|
||||
defaultRuntime?: string;
|
||||
logger?: {
|
||||
info?: (message: string) => void;
|
||||
warn?: (message: string) => void;
|
||||
@@ -28,18 +33,96 @@ export interface MacpRuntimeConfig {
|
||||
type HandleState = {
|
||||
name: string;
|
||||
agent: string;
|
||||
runtime: string;
|
||||
cwd: string;
|
||||
model: string;
|
||||
systemPrompt: string;
|
||||
timeoutMs: number;
|
||||
};
|
||||
|
||||
type OrchestratorTask = {
|
||||
id: string;
|
||||
title: string;
|
||||
status: 'pending';
|
||||
dispatch: string;
|
||||
runtime: string;
|
||||
worktree: string;
|
||||
brief_path: string;
|
||||
_brief_temp_path: string;
|
||||
timeout_seconds: number;
|
||||
metadata: Record<string, unknown>;
|
||||
};
|
||||
|
||||
type QueueFile = {
|
||||
tasks: OrchestratorTask[];
|
||||
};
|
||||
|
||||
type TaskGateResult = {
|
||||
command?: string;
|
||||
exit_code?: number;
|
||||
type?: string;
|
||||
};
|
||||
|
||||
type TaskResult = {
|
||||
task_id: string;
|
||||
status: string;
|
||||
summary?: string;
|
||||
error?: unknown;
|
||||
escalation_reason?: unknown;
|
||||
branch?: string | null;
|
||||
pr?: string | null;
|
||||
files_changed?: string[];
|
||||
gate_results?: TaskGateResult[];
|
||||
metadata?: Record<string, unknown>;
|
||||
};
|
||||
|
||||
const MACP_CAPABILITIES: AcpRuntimeCapabilities = {
|
||||
controls: [],
|
||||
};
|
||||
|
||||
const DEFAULT_REPO_ROOT = '~/src/mosaic-mono-v1';
|
||||
const ORCHESTRATOR_RUN_PATH = '~/.config/mosaic/bin/mosaic-orchestrator-run';
|
||||
const PI_RUNNER_PATH = '/home/jarvis/src/mosaic-mono-v1/tools/macp/dispatcher/pi_runner.ts';
|
||||
|
||||
function expandHome(rawPath: string): string {
|
||||
if (rawPath === '~') {
|
||||
return os.homedir();
|
||||
}
|
||||
if (rawPath.startsWith('~/')) {
|
||||
return path.join(os.homedir(), rawPath.slice(2));
|
||||
}
|
||||
return rawPath;
|
||||
}
|
||||
|
||||
function resolveRepoRoot(config: MacpRuntimeConfig): string {
|
||||
return path.resolve(expandHome(config.repoRoot?.trim() || DEFAULT_REPO_ROOT));
|
||||
}
|
||||
|
||||
function resolveOrchDir(config: MacpRuntimeConfig): string {
|
||||
if (config.orchDir?.trim()) {
|
||||
return path.resolve(expandHome(config.orchDir));
|
||||
}
|
||||
return path.join(resolveRepoRoot(config), '.mosaic', 'orchestrator');
|
||||
}
|
||||
|
||||
function resolveOrchestratorRunPath(): string {
|
||||
return path.resolve(expandHome(ORCHESTRATOR_RUN_PATH));
|
||||
}
|
||||
|
||||
function shellQuote(value: string): string {
|
||||
return `'${value.replace(/'/g, `'\\''`)}'`;
|
||||
}
|
||||
|
||||
function sanitizeSegment(value: string): string {
|
||||
return (
|
||||
value
|
||||
.trim()
|
||||
.toLowerCase()
|
||||
.replace(/[^a-z0-9]+/g, '-')
|
||||
.replace(/^-+|-+$/g, '') || 'task'
|
||||
);
|
||||
}
|
||||
|
||||
function encodeHandleState(state: HandleState): string {
|
||||
return JSON.stringify(state);
|
||||
}
|
||||
@@ -49,6 +132,7 @@ function decodeHandleState(handle: AcpRuntimeHandle): HandleState {
|
||||
if (
|
||||
typeof parsed.name !== 'string' ||
|
||||
typeof parsed.agent !== 'string' ||
|
||||
typeof parsed.runtime !== 'string' ||
|
||||
typeof parsed.cwd !== 'string' ||
|
||||
typeof parsed.model !== 'string' ||
|
||||
typeof parsed.systemPrompt !== 'string' ||
|
||||
@@ -63,13 +147,260 @@ function toSessionName(input: AcpRuntimeEnsureInput): string {
|
||||
return `${input.agent}-${input.sessionKey}`;
|
||||
}
|
||||
|
||||
function createTaskId(sessionKey: string, requestId: string): string {
|
||||
return `${sanitizeSegment(sessionKey)}-${sanitizeSegment(requestId)}-${randomUUID().slice(0, 8)}`;
|
||||
}
|
||||
|
||||
function createTaskTitle(prompt: string): string {
|
||||
const firstLine = prompt
|
||||
.split(/\r?\n/)
|
||||
.map((line) => line.trim())
|
||||
.find((line) => line.length > 0);
|
||||
return (firstLine || 'MACP OpenClaw task').slice(0, 120);
|
||||
}
|
||||
|
||||
function buildWorktreePath(repoRoot: string, taskId: string): string {
|
||||
const repoName = path.basename(repoRoot);
|
||||
return path.join(path.dirname(repoRoot), `${repoName}-worktrees`, `macp-oc-${taskId}`);
|
||||
}
|
||||
|
||||
function nowIso(): string {
|
||||
return new Date().toISOString();
|
||||
}
|
||||
|
||||
function chunkText(text: string, chunkSize = 4000): string[] {
|
||||
const normalized = text.trim();
|
||||
if (!normalized) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const chunks: string[] = [];
|
||||
for (let index = 0; index < normalized.length; index += chunkSize) {
|
||||
chunks.push(normalized.slice(index, index + chunkSize));
|
||||
}
|
||||
return chunks;
|
||||
}
|
||||
|
||||
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||
return typeof value === 'object' && value !== null;
|
||||
}
|
||||
|
||||
function toMessage(value: unknown): string {
|
||||
if (typeof value === 'string') {
|
||||
return value;
|
||||
}
|
||||
if (value instanceof Error) {
|
||||
return value.message;
|
||||
}
|
||||
if (value === null || value === undefined) {
|
||||
return '';
|
||||
}
|
||||
return JSON.stringify(value, null, 2);
|
||||
}
|
||||
|
||||
function abortError(): Error {
|
||||
const error = new Error('MACP turn aborted.');
|
||||
error.name = 'AbortError';
|
||||
return error;
|
||||
}
|
||||
|
||||
async function waitFor(ms: number, signal?: AbortSignal): Promise<void> {
|
||||
if (signal?.aborted) {
|
||||
throw abortError();
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const onAbort = () => {
|
||||
clearTimeout(timeout);
|
||||
signal?.removeEventListener('abort', onAbort);
|
||||
reject(abortError());
|
||||
};
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
signal?.removeEventListener('abort', onAbort);
|
||||
resolve();
|
||||
}, ms);
|
||||
|
||||
signal?.addEventListener('abort', onAbort, { once: true });
|
||||
});
|
||||
}
|
||||
|
||||
async function writeJsonAtomic(filePath: string, value: unknown): Promise<void> {
|
||||
const tempPath = `${filePath}.${process.pid}.${Date.now()}.tmp`;
|
||||
await writeFile(tempPath, `${JSON.stringify(value, null, 2)}\n`, 'utf-8');
|
||||
await rename(tempPath, filePath);
|
||||
}
|
||||
|
||||
async function loadQueue(tasksPath: string): Promise<QueueFile> {
|
||||
try {
|
||||
const raw = JSON.parse(await readFile(tasksPath, 'utf-8')) as unknown;
|
||||
if (Array.isArray(raw)) {
|
||||
return { tasks: raw as OrchestratorTask[] };
|
||||
}
|
||||
if (isRecord(raw) && Array.isArray(raw.tasks)) {
|
||||
return { tasks: raw.tasks as OrchestratorTask[] };
|
||||
}
|
||||
throw new Error('tasks.json must contain a tasks array.');
|
||||
} catch (error) {
|
||||
const code = (error as NodeJS.ErrnoException).code;
|
||||
if (code === 'ENOENT') {
|
||||
return { tasks: [] };
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function withFileLock<T>(
|
||||
lockPath: string,
|
||||
timeoutMs: number,
|
||||
action: () => Promise<T>,
|
||||
): Promise<T> {
|
||||
const deadline = Date.now() + Math.max(5_000, Math.min(timeoutMs, 30_000));
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
const handle = await open(lockPath, 'wx');
|
||||
try {
|
||||
return await action();
|
||||
} finally {
|
||||
await handle.close();
|
||||
await rm(lockPath, { force: true });
|
||||
}
|
||||
} catch (error) {
|
||||
const code = (error as NodeJS.ErrnoException).code;
|
||||
if (code !== 'EEXIST') {
|
||||
throw error;
|
||||
}
|
||||
if (Date.now() >= deadline) {
|
||||
throw new Error(`Timed out waiting for orchestrator queue lock: ${lockPath}`);
|
||||
}
|
||||
await waitFor(200);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function appendTaskToQueue(
|
||||
task: OrchestratorTask,
|
||||
orchDir: string,
|
||||
timeoutMs: number,
|
||||
): Promise<void> {
|
||||
const tasksPath = path.join(orchDir, 'tasks.json');
|
||||
const lockPath = `${tasksPath}.lock`;
|
||||
await mkdir(orchDir, { recursive: true });
|
||||
await withFileLock(lockPath, timeoutMs, async () => {
|
||||
const queue = await loadQueue(tasksPath);
|
||||
queue.tasks.push(task);
|
||||
await writeJsonAtomic(tasksPath, queue);
|
||||
});
|
||||
}
|
||||
|
||||
async function readOrchestratorConfig(orchDir: string): Promise<Record<string, unknown>> {
|
||||
const configPath = path.join(orchDir, 'config.json');
|
||||
const config = JSON.parse(await readFile(configPath, 'utf-8')) as unknown;
|
||||
if (!isRecord(config)) {
|
||||
throw new Error(`Invalid orchestrator config: ${configPath}`);
|
||||
}
|
||||
return config;
|
||||
}
|
||||
|
||||
async function ensureOrchestratorReady(orchDir: string): Promise<void> {
|
||||
const config = await readOrchestratorConfig(orchDir);
|
||||
if (config.enabled !== true) {
|
||||
throw new Error(`MACP orchestrator is disabled in ${path.join(orchDir, 'config.json')}.`);
|
||||
}
|
||||
}
|
||||
|
||||
function triggerController(repoRoot: string): void {
|
||||
const child = spawn(
|
||||
'bash',
|
||||
['-lc', `cd ${shellQuote(repoRoot)} && ${shellQuote(resolveOrchestratorRunPath())} --once`],
|
||||
{
|
||||
detached: true,
|
||||
stdio: 'ignore',
|
||||
},
|
||||
);
|
||||
child.unref();
|
||||
}
|
||||
|
||||
async function pollForResult(
|
||||
resultPath: string,
|
||||
timeoutMs: number,
|
||||
signal?: AbortSignal,
|
||||
): Promise<TaskResult> {
|
||||
const deadline = Date.now() + Math.max(timeoutMs, 2_000);
|
||||
|
||||
while (Date.now() <= deadline) {
|
||||
if (signal?.aborted) {
|
||||
throw abortError();
|
||||
}
|
||||
|
||||
try {
|
||||
const raw = JSON.parse(await readFile(resultPath, 'utf-8')) as unknown;
|
||||
if (!isRecord(raw) || typeof raw.task_id !== 'string' || typeof raw.status !== 'string') {
|
||||
throw new Error(`Invalid MACP result payload: ${resultPath}`);
|
||||
}
|
||||
return raw as TaskResult;
|
||||
} catch (error) {
|
||||
const code = (error as NodeJS.ErrnoException).code;
|
||||
if (code !== 'ENOENT' && !(error instanceof SyntaxError)) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
await waitFor(2_000, signal);
|
||||
}
|
||||
|
||||
throw new Error(`Timed out waiting for MACP result: ${resultPath}`);
|
||||
}
|
||||
|
||||
async function resolveResultOutput(result: TaskResult, orchDir: string): Promise<string> {
|
||||
const metadata = isRecord(result.metadata) ? result.metadata : {};
|
||||
const outputCandidates = [metadata.result_output_path, metadata.output_path]
|
||||
.filter((value): value is string => typeof value === 'string' && value.trim().length > 0)
|
||||
.map((value) => (path.isAbsolute(value) ? value : path.resolve(orchDir, value)));
|
||||
|
||||
for (const candidate of outputCandidates) {
|
||||
try {
|
||||
return (await readFile(candidate, 'utf-8')).trim();
|
||||
} catch {
|
||||
// Fall back to formatted result details below.
|
||||
}
|
||||
}
|
||||
|
||||
const lines: string[] = [];
|
||||
if (result.summary) {
|
||||
lines.push(result.summary);
|
||||
}
|
||||
if (result.error) {
|
||||
lines.push(`Error: ${toMessage(result.error)}`);
|
||||
}
|
||||
if (result.escalation_reason) {
|
||||
lines.push(`Escalation: ${toMessage(result.escalation_reason)}`);
|
||||
}
|
||||
if (result.branch) {
|
||||
lines.push(`Branch: ${result.branch}`);
|
||||
}
|
||||
if (result.pr) {
|
||||
lines.push(`PR: ${result.pr}`);
|
||||
}
|
||||
if (Array.isArray(result.files_changed) && result.files_changed.length > 0) {
|
||||
lines.push(`Files changed:\n${result.files_changed.map((file) => `- ${file}`).join('\n')}`);
|
||||
}
|
||||
if (Array.isArray(result.gate_results) && result.gate_results.length > 0) {
|
||||
lines.push(
|
||||
`Quality gates:\n${result.gate_results
|
||||
.map((gate) => `- [${gate.exit_code ?? 0}] ${gate.command ?? 'unknown command'}`)
|
||||
.join('\n')}`,
|
||||
);
|
||||
}
|
||||
|
||||
return lines.join('\n\n').trim() || JSON.stringify(result, null, 2);
|
||||
}
|
||||
|
||||
export class MacpRuntime implements AcpRuntime {
|
||||
constructor(private readonly config: MacpRuntimeConfig) {}
|
||||
|
||||
async ensureSession(input: AcpRuntimeEnsureInput): Promise<AcpRuntimeHandle> {
|
||||
if (input.agent !== 'pi') {
|
||||
throw new Error(`macp runtime only supports agentId "pi"; received "${input.agent}".`);
|
||||
}
|
||||
if (input.mode !== 'oneshot') {
|
||||
throw new Error(`macp runtime only supports oneshot sessions; received "${input.mode}".`);
|
||||
}
|
||||
@@ -78,6 +409,7 @@ export class MacpRuntime implements AcpRuntime {
|
||||
const state: HandleState = {
|
||||
name: toSessionName(input),
|
||||
agent: input.agent,
|
||||
runtime: input.agent || this.config.defaultRuntime || 'codex',
|
||||
cwd,
|
||||
model: this.config.defaultModel,
|
||||
systemPrompt: this.config.systemPrompt,
|
||||
@@ -96,120 +428,73 @@ export class MacpRuntime implements AcpRuntime {
|
||||
|
||||
async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable<AcpRuntimeEvent> {
|
||||
const state = decodeHandleState(input.handle);
|
||||
const logPath = path.join(
|
||||
this.config.stateDir,
|
||||
'macp',
|
||||
`${state.name}-${Date.now()}-${input.requestId}.json`,
|
||||
);
|
||||
|
||||
const streamQueue: AcpRuntimeEvent[] = [];
|
||||
const waiters: Array<() => void> = [];
|
||||
let finished = false;
|
||||
|
||||
const push = (event: AcpRuntimeEvent) => {
|
||||
streamQueue.push(event);
|
||||
const waiter = waiters.shift();
|
||||
waiter?.();
|
||||
};
|
||||
|
||||
const resultPromise = runPiTurn({
|
||||
model: state.model,
|
||||
systemPrompt: state.systemPrompt,
|
||||
prompt: input.text,
|
||||
workDir: state.cwd,
|
||||
timeoutMs: state.timeoutMs,
|
||||
logPath,
|
||||
...(input.signal ? { signal: input.signal } : {}),
|
||||
onEvent: async (event) => {
|
||||
switch (event.type) {
|
||||
case 'message_update': {
|
||||
const formatted = formatAssistantEvent(event.assistantMessageEvent);
|
||||
if (formatted?.text) {
|
||||
push({
|
||||
type: 'text_delta',
|
||||
text: formatted.text,
|
||||
...(formatted.stream ? { stream: formatted.stream } : {}),
|
||||
...(formatted.tag ? { tag: formatted.tag } : {}),
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'tool_execution_start':
|
||||
push({
|
||||
type: 'tool_call',
|
||||
text: JSON.stringify(event.args ?? {}),
|
||||
tag: 'tool_call',
|
||||
toolCallId: event.toolCallId,
|
||||
title: event.toolName,
|
||||
status: 'running',
|
||||
});
|
||||
break;
|
||||
case 'tool_execution_update':
|
||||
push({
|
||||
type: 'tool_call',
|
||||
text: JSON.stringify(event.partialResult ?? {}),
|
||||
tag: 'tool_call_update',
|
||||
toolCallId: event.toolCallId,
|
||||
title: event.toolName,
|
||||
status: 'running',
|
||||
});
|
||||
break;
|
||||
case 'tool_execution_end':
|
||||
push({
|
||||
type: 'tool_call',
|
||||
text: JSON.stringify(event.result ?? {}),
|
||||
tag: 'tool_call_update',
|
||||
toolCallId: event.toolCallId,
|
||||
title: event.toolName,
|
||||
status: event.isError ? 'error' : 'completed',
|
||||
});
|
||||
break;
|
||||
case 'turn_end':
|
||||
push({
|
||||
type: 'status',
|
||||
text: 'Pi turn completed.',
|
||||
tag: 'usage_update',
|
||||
});
|
||||
break;
|
||||
}
|
||||
},
|
||||
})
|
||||
.then((result) => {
|
||||
if (result.output) {
|
||||
push({
|
||||
type: 'status',
|
||||
text: result.output,
|
||||
tag: 'session_info_update',
|
||||
});
|
||||
}
|
||||
push({
|
||||
type: 'done',
|
||||
stopReason: result.stopReason,
|
||||
});
|
||||
})
|
||||
.catch((error) => {
|
||||
push({
|
||||
type: 'error',
|
||||
message: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
})
|
||||
.finally(() => {
|
||||
finished = true;
|
||||
while (waiters.length > 0) {
|
||||
waiters.shift()?.();
|
||||
}
|
||||
});
|
||||
const repoRoot = resolveRepoRoot(this.config);
|
||||
const orchDir = resolveOrchDir(this.config);
|
||||
const taskId = createTaskId(input.handle.sessionKey, input.requestId);
|
||||
const briefDir = path.join(os.homedir(), '.mosaic', 'macp-oc');
|
||||
const briefPath = path.join(briefDir, `${state.name}-${input.requestId}.md`);
|
||||
const resultPath = path.join(orchDir, 'results', `${taskId}.json`);
|
||||
|
||||
try {
|
||||
while (!finished || streamQueue.length > 0) {
|
||||
if (streamQueue.length === 0) {
|
||||
await new Promise<void>((resolve) => waiters.push(resolve));
|
||||
continue;
|
||||
}
|
||||
yield streamQueue.shift() as AcpRuntimeEvent;
|
||||
await access(resolveOrchestratorRunPath());
|
||||
await ensureOrchestratorReady(orchDir);
|
||||
await mkdir(briefDir, { recursive: true });
|
||||
await mkdir(path.dirname(resultPath), { recursive: true });
|
||||
await writeFile(briefPath, `${input.text.trimEnd()}\n`, 'utf-8');
|
||||
|
||||
const task: OrchestratorTask = {
|
||||
id: taskId,
|
||||
title: createTaskTitle(input.text),
|
||||
status: 'pending',
|
||||
dispatch: this.config.defaultDispatch || 'yolo',
|
||||
runtime: state.runtime || this.config.defaultRuntime || 'codex',
|
||||
worktree: buildWorktreePath(repoRoot, taskId),
|
||||
brief_path: briefPath,
|
||||
_brief_temp_path: briefPath,
|
||||
timeout_seconds: Math.max(1, Math.ceil(state.timeoutMs / 1_000)),
|
||||
metadata: {
|
||||
source: 'openclaw-macp-plugin',
|
||||
created_at: nowIso(),
|
||||
session_key: input.handle.sessionKey,
|
||||
request_id: input.requestId,
|
||||
agent_id: state.agent,
|
||||
cwd: state.cwd,
|
||||
},
|
||||
};
|
||||
|
||||
this.config.logger?.info?.(
|
||||
`Queueing MACP orchestrator task ${taskId} (${task.runtime}/${task.dispatch}).`,
|
||||
);
|
||||
yield {
|
||||
type: 'status',
|
||||
text: `Queued MACP task ${taskId}.`,
|
||||
tag: 'session_info_update',
|
||||
};
|
||||
|
||||
await appendTaskToQueue(task, orchDir, state.timeoutMs);
|
||||
triggerController(repoRoot);
|
||||
|
||||
const result = await pollForResult(resultPath, state.timeoutMs, input.signal);
|
||||
const output = await resolveResultOutput(result, orchDir);
|
||||
for (const chunk of chunkText(output)) {
|
||||
yield {
|
||||
type: 'text_delta',
|
||||
text: chunk,
|
||||
stream: 'output',
|
||||
tag: 'agent_message_chunk',
|
||||
};
|
||||
}
|
||||
yield {
|
||||
type: 'done',
|
||||
stopReason: result.status,
|
||||
};
|
||||
} catch (error) {
|
||||
yield {
|
||||
type: 'error',
|
||||
message: error instanceof Error ? error.message : String(error),
|
||||
};
|
||||
} finally {
|
||||
await resultPromise;
|
||||
await rm(briefPath, { force: true }).catch(() => undefined);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -220,47 +505,49 @@ export class MacpRuntime implements AcpRuntime {
|
||||
async getStatus(input: { handle: AcpRuntimeHandle }): Promise<AcpRuntimeStatus> {
|
||||
const state = decodeHandleState(input.handle);
|
||||
return {
|
||||
summary: 'macp Pi oneshot runtime ready',
|
||||
summary: 'macp controller oneshot runtime ready',
|
||||
backendSessionId: state.name,
|
||||
agentSessionId: state.name,
|
||||
details: {
|
||||
mode: 'oneshot',
|
||||
model: state.model,
|
||||
agent: state.agent,
|
||||
runtime: state.runtime,
|
||||
cwd: state.cwd,
|
||||
repoRoot: resolveRepoRoot(this.config),
|
||||
orchDir: resolveOrchDir(this.config),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async doctor(): Promise<AcpRuntimeDoctorReport> {
|
||||
try {
|
||||
await access(PI_RUNNER_PATH);
|
||||
const importErrors: string[] = [];
|
||||
await import('@mariozechner/pi-agent-core').catch((error) => {
|
||||
importErrors.push(error instanceof Error ? error.message : String(error));
|
||||
});
|
||||
await import('@mariozechner/pi-ai').catch((error) => {
|
||||
importErrors.push(error instanceof Error ? error.message : String(error));
|
||||
});
|
||||
if (importErrors.length > 0) {
|
||||
const repoRoot = resolveRepoRoot(this.config);
|
||||
const orchDir = resolveOrchDir(this.config);
|
||||
const orchestratorRunPath = resolveOrchestratorRunPath();
|
||||
await access(orchestratorRunPath);
|
||||
await access(repoRoot);
|
||||
await access(orchDir);
|
||||
await access(PI_RUNNER_PATH).catch(() => undefined);
|
||||
const orchestratorConfig = await readOrchestratorConfig(orchDir);
|
||||
if (orchestratorConfig.enabled !== true) {
|
||||
return {
|
||||
ok: false,
|
||||
code: 'MACP_DEPS_MISSING',
|
||||
message: 'MACP runtime dependencies are not available.',
|
||||
installCommand: 'pnpm install --frozen-lockfile',
|
||||
details: importErrors,
|
||||
code: 'MACP_ORCH_DISABLED',
|
||||
message: 'MACP orchestrator is disabled for the configured repo.',
|
||||
details: [path.join(orchDir, 'config.json')],
|
||||
};
|
||||
}
|
||||
return {
|
||||
ok: true,
|
||||
message: 'MACP runtime is ready.',
|
||||
details: [PI_RUNNER_PATH],
|
||||
details: [orchestratorRunPath, repoRoot, orchDir],
|
||||
};
|
||||
} catch (error) {
|
||||
return {
|
||||
ok: false,
|
||||
code: 'MACP_PI_RUNNER_MISSING',
|
||||
message: 'Pi runner was not found.',
|
||||
details: [error instanceof Error ? error.message : String(error)],
|
||||
code: 'MACP_ORCH_MISSING',
|
||||
message: error instanceof Error ? error.message : String(error),
|
||||
installCommand: 'pnpm install --frozen-lockfile',
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user