Merge pull request 'fix(oc-plugin): MACP OC bridge — route through controller queue instead of Pi-direct' (#330) from fix/macp-oc-bridge into main
Some checks failed
ci/woodpecker/push/ci Pipeline failed
Some checks failed
ci/woodpecker/push/ci Pipeline failed
This commit was merged in pull request #330.
This commit is contained in:
16
docs/scratchpads/macp-oc-bridge-20260330.md
Normal file
16
docs/scratchpads/macp-oc-bridge-20260330.md
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
# Scratchpad: MACP OC Bridge (2026-03-30)
|
||||||
|
|
||||||
|
- Objective: Replace the OpenClaw MACP plugin's Pi-direct `runTurn` with the MACP controller queue bridge.
|
||||||
|
- Scope: `plugins/macp/src/macp-runtime.ts`, `plugins/macp/src/index.ts`, `plugins/macp/openclaw.plugin.json`, `plugins/macp/README.md`.
|
||||||
|
- Plan:
|
||||||
|
1. Read controller/dispatcher/plugin docs and confirm queue/result contract.
|
||||||
|
2. Queue tasks through `.mosaic/orchestrator/tasks.json` using a brief file and controller trigger.
|
||||||
|
3. Poll result JSON, stream output back to ACP, then validate with typecheck/format checks.
|
||||||
|
- Risks:
|
||||||
|
- The repo orchestrator must be enabled in `.mosaic/orchestrator/config.json`.
|
||||||
|
- Result JSON does not always embed worker output, so the runtime falls back to metadata-linked output files or a formatted result summary.
|
||||||
|
- Verification:
|
||||||
|
- `npx tsc --noEmit --target es2022 --module nodenext --moduleResolution nodenext --skipLibCheck plugins/macp/src/macp-runtime.ts plugins/macp/src/index.ts`
|
||||||
|
- `pnpm prettier --write "plugins/macp/**/*.{ts,json,md}"`
|
||||||
|
- `pnpm format:check`
|
||||||
|
- `npx tsc --noEmit -p plugins/macp/tsconfig.json` still fails in this branch because `plugins/macp/tsconfig.json` extends a missing `packages/config/typescript/library.json` file and also pulls in pre-existing external OpenClaw type noise.
|
||||||
@@ -2,14 +2,15 @@
|
|||||||
|
|
||||||
This plugin registers a new OpenClaw ACP runtime backend named `macp`.
|
This plugin registers a new OpenClaw ACP runtime backend named `macp`.
|
||||||
|
|
||||||
When OpenClaw calls `sessions_spawn(runtime: "macp", agentId: "pi")`, the plugin runs a Pi agent turn in-process using the MACP Pi runner logic and streams Pi output back as ACP runtime events.
|
When OpenClaw calls `sessions_spawn(runtime: "macp")`, the plugin now writes the prompt to a brief file, queues a MACP controller task in `.mosaic/orchestrator/tasks.json`, triggers `mosaic-orchestrator-run --once`, polls `.mosaic/orchestrator/results/<task-id>.json`, and streams the resulting output back as ACP runtime events.
|
||||||
|
|
||||||
## Current behavior
|
## Current behavior
|
||||||
|
|
||||||
- Supports `agentId: "pi"` only
|
|
||||||
- Supports ACP `mode: "oneshot"` only
|
- Supports ACP `mode: "oneshot"` only
|
||||||
|
- Accepts any `agentId` and maps it to the queued MACP task `runtime`
|
||||||
|
- Defaults queued tasks to `dispatch: "yolo"` and `runtime: "codex"` when no override is provided
|
||||||
- Rejects persistent ACP sessions
|
- Rejects persistent ACP sessions
|
||||||
- Reuses the Pi default tools from `tools/macp/dispatcher/pi_runner.ts`
|
- Keeps `src/pi-bridge.ts` for future `dispatch: "pi"` support
|
||||||
|
|
||||||
## Install in OpenClaw
|
## Install in OpenClaw
|
||||||
|
|
||||||
@@ -32,15 +33,29 @@ Add the plugin entry to your OpenClaw config:
|
|||||||
"defaultModel": "openai/gpt-5-mini",
|
"defaultModel": "openai/gpt-5-mini",
|
||||||
"systemPrompt": "You are Pi running via MACP.",
|
"systemPrompt": "You are Pi running via MACP.",
|
||||||
"timeoutMs": 300000,
|
"timeoutMs": 300000,
|
||||||
"logDir": "~/.openclaw/state/macp"
|
"logDir": "~/.openclaw/state/macp",
|
||||||
|
"repoRoot": "~/src/mosaic-mono-v1",
|
||||||
|
"orchDir": "~/src/mosaic-mono-v1/.mosaic/orchestrator",
|
||||||
|
"defaultDispatch": "yolo",
|
||||||
|
"defaultRuntime": "codex"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## Runtime flow
|
||||||
|
|
||||||
|
1. OpenClaw ensures a oneshot `macp` session and preserves the requested `agentId`.
|
||||||
|
2. `runTurn` writes the turn prompt to `~/.mosaic/macp-oc/<session>-<request>.md`.
|
||||||
|
3. The plugin appends a pending MACP task to the configured orchestrator queue.
|
||||||
|
4. The plugin triggers `~/.config/mosaic/bin/mosaic-orchestrator-run --once` in the configured repo root.
|
||||||
|
5. The plugin polls for `.mosaic/orchestrator/results/<task-id>.json` and streams the result back to OpenClaw.
|
||||||
|
|
||||||
## Verification
|
## Verification
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
pnpm exec tsc --noEmit -p plugins/macp/tsconfig.json
|
pnpm --filter @mosaic/oc-macp-plugin typecheck || npx tsc --noEmit -p plugins/macp/tsconfig.json
|
||||||
|
pnpm prettier --write "plugins/macp/**/*.{ts,json,md}"
|
||||||
|
pnpm format:check
|
||||||
```
|
```
|
||||||
|
|||||||
@@ -1,18 +1,18 @@
|
|||||||
{
|
{
|
||||||
"id": "macp",
|
"id": "macp",
|
||||||
"name": "MACP Runtime",
|
"name": "MACP Runtime",
|
||||||
"description": "Registers the macp ACP runtime backend and dispatches Pi turns through the MACP Pi runner.",
|
"description": "Registers the macp ACP runtime backend and routes turns through the MACP controller queue.",
|
||||||
"configSchema": {
|
"configSchema": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"additionalProperties": false,
|
"additionalProperties": false,
|
||||||
"properties": {
|
"properties": {
|
||||||
"defaultModel": {
|
"defaultModel": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"description": "Default Pi model in provider/model format. Defaults to openai/gpt-5-mini."
|
"description": "Default Pi model in provider/model format. Retained for Pi bridge compatibility. Defaults to openai/gpt-5-mini."
|
||||||
},
|
},
|
||||||
"systemPrompt": {
|
"systemPrompt": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"description": "Optional system prompt prepended to each Pi turn."
|
"description": "Optional system prompt retained for Pi bridge compatibility."
|
||||||
},
|
},
|
||||||
"timeoutMs": {
|
"timeoutMs": {
|
||||||
"type": "number",
|
"type": "number",
|
||||||
@@ -21,7 +21,23 @@
|
|||||||
},
|
},
|
||||||
"logDir": {
|
"logDir": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"description": "Directory for Pi runtime logs. Defaults to the plugin state dir."
|
"description": "Directory for plugin state/log files. Defaults to the plugin state dir."
|
||||||
|
},
|
||||||
|
"repoRoot": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Repository root containing .mosaic/orchestrator. Defaults to ~/src/mosaic-mono-v1."
|
||||||
|
},
|
||||||
|
"orchDir": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Override for the orchestrator directory. Defaults to <repoRoot>/.mosaic/orchestrator."
|
||||||
|
},
|
||||||
|
"defaultDispatch": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Dispatch type written into queued MACP tasks. Defaults to yolo."
|
||||||
|
},
|
||||||
|
"defaultRuntime": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "Fallback runtime when agentId is unavailable. Defaults to codex."
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import path from 'node:path';
|
import * as os from 'node:os';
|
||||||
|
import * as path from 'node:path';
|
||||||
|
|
||||||
import {
|
import {
|
||||||
registerAcpRuntimeBackend,
|
registerAcpRuntimeBackend,
|
||||||
@@ -17,10 +18,27 @@ type PluginConfig = {
|
|||||||
systemPrompt?: string;
|
systemPrompt?: string;
|
||||||
timeoutMs?: number;
|
timeoutMs?: number;
|
||||||
logDir?: string;
|
logDir?: string;
|
||||||
|
repoRoot?: string;
|
||||||
|
orchDir?: string;
|
||||||
|
defaultDispatch?: string;
|
||||||
|
defaultRuntime?: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
function expandHome(rawPath: string): string {
|
||||||
|
if (rawPath === '~') {
|
||||||
|
return os.homedir();
|
||||||
|
}
|
||||||
|
if (rawPath.startsWith('~/')) {
|
||||||
|
return path.join(os.homedir(), rawPath.slice(2));
|
||||||
|
}
|
||||||
|
return rawPath;
|
||||||
|
}
|
||||||
|
|
||||||
function resolveConfig(pluginConfig?: Record<string, unknown>, stateDir?: string) {
|
function resolveConfig(pluginConfig?: Record<string, unknown>, stateDir?: string) {
|
||||||
const config = (pluginConfig ?? {}) as PluginConfig;
|
const config = (pluginConfig ?? {}) as PluginConfig;
|
||||||
|
const repoRoot = config.repoRoot?.trim()
|
||||||
|
? path.resolve(expandHome(config.repoRoot))
|
||||||
|
: path.resolve('/home/jarvis/src/mosaic-mono-v1');
|
||||||
return {
|
return {
|
||||||
defaultModel: config.defaultModel?.trim() || 'openai/gpt-5-mini',
|
defaultModel: config.defaultModel?.trim() || 'openai/gpt-5-mini',
|
||||||
systemPrompt: config.systemPrompt ?? '',
|
systemPrompt: config.systemPrompt ?? '',
|
||||||
@@ -30,7 +48,15 @@ function resolveConfig(pluginConfig?: Record<string, unknown>, stateDir?: string
|
|||||||
config.timeoutMs > 0
|
config.timeoutMs > 0
|
||||||
? config.timeoutMs
|
? config.timeoutMs
|
||||||
: 300_000,
|
: 300_000,
|
||||||
stateDir: config.logDir?.trim() ? path.resolve(config.logDir) : (stateDir ?? process.cwd()),
|
stateDir: config.logDir?.trim()
|
||||||
|
? path.resolve(expandHome(config.logDir))
|
||||||
|
: (stateDir ?? process.cwd()),
|
||||||
|
repoRoot,
|
||||||
|
orchDir: config.orchDir?.trim()
|
||||||
|
? path.resolve(expandHome(config.orchDir))
|
||||||
|
: path.join(repoRoot, '.mosaic', 'orchestrator'),
|
||||||
|
defaultDispatch: config.defaultDispatch?.trim() || 'yolo',
|
||||||
|
defaultRuntime: config.defaultRuntime?.trim() || 'codex',
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,7 +77,7 @@ function createMacpRuntimeService(pluginConfig?: Record<string, unknown>): OpenC
|
|||||||
healthy: () => runtime !== null,
|
healthy: () => runtime !== null,
|
||||||
});
|
});
|
||||||
ctx.logger.info(
|
ctx.logger.info(
|
||||||
`macp runtime backend registered (defaultModel: ${resolved.defaultModel}, timeoutMs: ${resolved.timeoutMs})`,
|
`macp runtime backend registered (defaultRuntime: ${resolved.defaultRuntime}, defaultDispatch: ${resolved.defaultDispatch}, timeoutMs: ${resolved.timeoutMs})`,
|
||||||
);
|
);
|
||||||
},
|
},
|
||||||
async stop(_ctx: OpenClawPluginServiceContext) {
|
async stop(_ctx: OpenClawPluginServiceContext) {
|
||||||
@@ -64,7 +90,8 @@ function createMacpRuntimeService(pluginConfig?: Record<string, unknown>): OpenC
|
|||||||
const plugin = {
|
const plugin = {
|
||||||
id: 'macp',
|
id: 'macp',
|
||||||
name: 'MACP Runtime',
|
name: 'MACP Runtime',
|
||||||
description: 'ACP runtime backend that dispatches Pi oneshot sessions through MACP.',
|
description:
|
||||||
|
'ACP runtime backend that dispatches OpenClaw oneshot sessions through the MACP controller queue.',
|
||||||
register(api: OpenClawPluginApi) {
|
register(api: OpenClawPluginApi) {
|
||||||
api.registerService(createMacpRuntimeService(api.pluginConfig));
|
api.registerService(createMacpRuntimeService(api.pluginConfig));
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
import path from 'node:path';
|
import { spawn } from 'node:child_process';
|
||||||
import { access } from 'node:fs/promises';
|
import { randomUUID } from 'node:crypto';
|
||||||
|
import { access, mkdir, open, readFile, rename, rm, writeFile } from 'node:fs/promises';
|
||||||
|
import * as os from 'node:os';
|
||||||
|
import * as path from 'node:path';
|
||||||
|
|
||||||
import type {
|
import type {
|
||||||
AcpRuntime,
|
AcpRuntime,
|
||||||
@@ -12,13 +15,15 @@ import type {
|
|||||||
AcpRuntimeTurnInput,
|
AcpRuntimeTurnInput,
|
||||||
} from '/home/jarvis/.npm-global/lib/node_modules/openclaw/dist/plugin-sdk/acp-runtime.js';
|
} from '/home/jarvis/.npm-global/lib/node_modules/openclaw/dist/plugin-sdk/acp-runtime.js';
|
||||||
|
|
||||||
import { formatAssistantEvent, runPiTurn } from './pi-bridge.js';
|
|
||||||
|
|
||||||
export interface MacpRuntimeConfig {
|
export interface MacpRuntimeConfig {
|
||||||
defaultModel: string;
|
defaultModel: string;
|
||||||
systemPrompt: string;
|
systemPrompt: string;
|
||||||
timeoutMs: number;
|
timeoutMs: number;
|
||||||
stateDir: string;
|
stateDir: string;
|
||||||
|
repoRoot?: string;
|
||||||
|
orchDir?: string;
|
||||||
|
defaultDispatch?: string;
|
||||||
|
defaultRuntime?: string;
|
||||||
logger?: {
|
logger?: {
|
||||||
info?: (message: string) => void;
|
info?: (message: string) => void;
|
||||||
warn?: (message: string) => void;
|
warn?: (message: string) => void;
|
||||||
@@ -28,18 +33,96 @@ export interface MacpRuntimeConfig {
|
|||||||
type HandleState = {
|
type HandleState = {
|
||||||
name: string;
|
name: string;
|
||||||
agent: string;
|
agent: string;
|
||||||
|
runtime: string;
|
||||||
cwd: string;
|
cwd: string;
|
||||||
model: string;
|
model: string;
|
||||||
systemPrompt: string;
|
systemPrompt: string;
|
||||||
timeoutMs: number;
|
timeoutMs: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
type OrchestratorTask = {
|
||||||
|
id: string;
|
||||||
|
title: string;
|
||||||
|
status: 'pending';
|
||||||
|
dispatch: string;
|
||||||
|
runtime: string;
|
||||||
|
worktree: string;
|
||||||
|
brief_path: string;
|
||||||
|
_brief_temp_path: string;
|
||||||
|
timeout_seconds: number;
|
||||||
|
metadata: Record<string, unknown>;
|
||||||
|
};
|
||||||
|
|
||||||
|
type QueueFile = {
|
||||||
|
tasks: OrchestratorTask[];
|
||||||
|
};
|
||||||
|
|
||||||
|
type TaskGateResult = {
|
||||||
|
command?: string;
|
||||||
|
exit_code?: number;
|
||||||
|
type?: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
type TaskResult = {
|
||||||
|
task_id: string;
|
||||||
|
status: string;
|
||||||
|
summary?: string;
|
||||||
|
error?: unknown;
|
||||||
|
escalation_reason?: unknown;
|
||||||
|
branch?: string | null;
|
||||||
|
pr?: string | null;
|
||||||
|
files_changed?: string[];
|
||||||
|
gate_results?: TaskGateResult[];
|
||||||
|
metadata?: Record<string, unknown>;
|
||||||
|
};
|
||||||
|
|
||||||
const MACP_CAPABILITIES: AcpRuntimeCapabilities = {
|
const MACP_CAPABILITIES: AcpRuntimeCapabilities = {
|
||||||
controls: [],
|
controls: [],
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const DEFAULT_REPO_ROOT = '~/src/mosaic-mono-v1';
|
||||||
|
const ORCHESTRATOR_RUN_PATH = '~/.config/mosaic/bin/mosaic-orchestrator-run';
|
||||||
const PI_RUNNER_PATH = '/home/jarvis/src/mosaic-mono-v1/tools/macp/dispatcher/pi_runner.ts';
|
const PI_RUNNER_PATH = '/home/jarvis/src/mosaic-mono-v1/tools/macp/dispatcher/pi_runner.ts';
|
||||||
|
|
||||||
|
function expandHome(rawPath: string): string {
|
||||||
|
if (rawPath === '~') {
|
||||||
|
return os.homedir();
|
||||||
|
}
|
||||||
|
if (rawPath.startsWith('~/')) {
|
||||||
|
return path.join(os.homedir(), rawPath.slice(2));
|
||||||
|
}
|
||||||
|
return rawPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
function resolveRepoRoot(config: MacpRuntimeConfig): string {
|
||||||
|
return path.resolve(expandHome(config.repoRoot?.trim() || DEFAULT_REPO_ROOT));
|
||||||
|
}
|
||||||
|
|
||||||
|
function resolveOrchDir(config: MacpRuntimeConfig): string {
|
||||||
|
if (config.orchDir?.trim()) {
|
||||||
|
return path.resolve(expandHome(config.orchDir));
|
||||||
|
}
|
||||||
|
return path.join(resolveRepoRoot(config), '.mosaic', 'orchestrator');
|
||||||
|
}
|
||||||
|
|
||||||
|
function resolveOrchestratorRunPath(): string {
|
||||||
|
return path.resolve(expandHome(ORCHESTRATOR_RUN_PATH));
|
||||||
|
}
|
||||||
|
|
||||||
|
function shellQuote(value: string): string {
|
||||||
|
return `'${value.replace(/'/g, `'\\''`)}'`;
|
||||||
|
}
|
||||||
|
|
||||||
|
function sanitizeSegment(value: string): string {
|
||||||
|
return (
|
||||||
|
value
|
||||||
|
.trim()
|
||||||
|
.toLowerCase()
|
||||||
|
.replace(/[^a-z0-9]+/g, '-')
|
||||||
|
.replace(/^-+|-+$/g, '') || 'task'
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
function encodeHandleState(state: HandleState): string {
|
function encodeHandleState(state: HandleState): string {
|
||||||
return JSON.stringify(state);
|
return JSON.stringify(state);
|
||||||
}
|
}
|
||||||
@@ -49,6 +132,7 @@ function decodeHandleState(handle: AcpRuntimeHandle): HandleState {
|
|||||||
if (
|
if (
|
||||||
typeof parsed.name !== 'string' ||
|
typeof parsed.name !== 'string' ||
|
||||||
typeof parsed.agent !== 'string' ||
|
typeof parsed.agent !== 'string' ||
|
||||||
|
typeof parsed.runtime !== 'string' ||
|
||||||
typeof parsed.cwd !== 'string' ||
|
typeof parsed.cwd !== 'string' ||
|
||||||
typeof parsed.model !== 'string' ||
|
typeof parsed.model !== 'string' ||
|
||||||
typeof parsed.systemPrompt !== 'string' ||
|
typeof parsed.systemPrompt !== 'string' ||
|
||||||
@@ -63,13 +147,260 @@ function toSessionName(input: AcpRuntimeEnsureInput): string {
|
|||||||
return `${input.agent}-${input.sessionKey}`;
|
return `${input.agent}-${input.sessionKey}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function createTaskId(sessionKey: string, requestId: string): string {
|
||||||
|
return `${sanitizeSegment(sessionKey)}-${sanitizeSegment(requestId)}-${randomUUID().slice(0, 8)}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
function createTaskTitle(prompt: string): string {
|
||||||
|
const firstLine = prompt
|
||||||
|
.split(/\r?\n/)
|
||||||
|
.map((line) => line.trim())
|
||||||
|
.find((line) => line.length > 0);
|
||||||
|
return (firstLine || 'MACP OpenClaw task').slice(0, 120);
|
||||||
|
}
|
||||||
|
|
||||||
|
function buildWorktreePath(repoRoot: string, taskId: string): string {
|
||||||
|
const repoName = path.basename(repoRoot);
|
||||||
|
return path.join(path.dirname(repoRoot), `${repoName}-worktrees`, `macp-oc-${taskId}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
function nowIso(): string {
|
||||||
|
return new Date().toISOString();
|
||||||
|
}
|
||||||
|
|
||||||
|
function chunkText(text: string, chunkSize = 4000): string[] {
|
||||||
|
const normalized = text.trim();
|
||||||
|
if (!normalized) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
const chunks: string[] = [];
|
||||||
|
for (let index = 0; index < normalized.length; index += chunkSize) {
|
||||||
|
chunks.push(normalized.slice(index, index + chunkSize));
|
||||||
|
}
|
||||||
|
return chunks;
|
||||||
|
}
|
||||||
|
|
||||||
|
function isRecord(value: unknown): value is Record<string, unknown> {
|
||||||
|
return typeof value === 'object' && value !== null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function toMessage(value: unknown): string {
|
||||||
|
if (typeof value === 'string') {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
if (value instanceof Error) {
|
||||||
|
return value.message;
|
||||||
|
}
|
||||||
|
if (value === null || value === undefined) {
|
||||||
|
return '';
|
||||||
|
}
|
||||||
|
return JSON.stringify(value, null, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
function abortError(): Error {
|
||||||
|
const error = new Error('MACP turn aborted.');
|
||||||
|
error.name = 'AbortError';
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function waitFor(ms: number, signal?: AbortSignal): Promise<void> {
|
||||||
|
if (signal?.aborted) {
|
||||||
|
throw abortError();
|
||||||
|
}
|
||||||
|
|
||||||
|
await new Promise<void>((resolve, reject) => {
|
||||||
|
const onAbort = () => {
|
||||||
|
clearTimeout(timeout);
|
||||||
|
signal?.removeEventListener('abort', onAbort);
|
||||||
|
reject(abortError());
|
||||||
|
};
|
||||||
|
|
||||||
|
const timeout = setTimeout(() => {
|
||||||
|
signal?.removeEventListener('abort', onAbort);
|
||||||
|
resolve();
|
||||||
|
}, ms);
|
||||||
|
|
||||||
|
signal?.addEventListener('abort', onAbort, { once: true });
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function writeJsonAtomic(filePath: string, value: unknown): Promise<void> {
|
||||||
|
const tempPath = `${filePath}.${process.pid}.${Date.now()}.tmp`;
|
||||||
|
await writeFile(tempPath, `${JSON.stringify(value, null, 2)}\n`, 'utf-8');
|
||||||
|
await rename(tempPath, filePath);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function loadQueue(tasksPath: string): Promise<QueueFile> {
|
||||||
|
try {
|
||||||
|
const raw = JSON.parse(await readFile(tasksPath, 'utf-8')) as unknown;
|
||||||
|
if (Array.isArray(raw)) {
|
||||||
|
return { tasks: raw as OrchestratorTask[] };
|
||||||
|
}
|
||||||
|
if (isRecord(raw) && Array.isArray(raw.tasks)) {
|
||||||
|
return { tasks: raw.tasks as OrchestratorTask[] };
|
||||||
|
}
|
||||||
|
throw new Error('tasks.json must contain a tasks array.');
|
||||||
|
} catch (error) {
|
||||||
|
const code = (error as NodeJS.ErrnoException).code;
|
||||||
|
if (code === 'ENOENT') {
|
||||||
|
return { tasks: [] };
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function withFileLock<T>(
|
||||||
|
lockPath: string,
|
||||||
|
timeoutMs: number,
|
||||||
|
action: () => Promise<T>,
|
||||||
|
): Promise<T> {
|
||||||
|
const deadline = Date.now() + Math.max(5_000, Math.min(timeoutMs, 30_000));
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
const handle = await open(lockPath, 'wx');
|
||||||
|
try {
|
||||||
|
return await action();
|
||||||
|
} finally {
|
||||||
|
await handle.close();
|
||||||
|
await rm(lockPath, { force: true });
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
const code = (error as NodeJS.ErrnoException).code;
|
||||||
|
if (code !== 'EEXIST') {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
if (Date.now() >= deadline) {
|
||||||
|
throw new Error(`Timed out waiting for orchestrator queue lock: ${lockPath}`);
|
||||||
|
}
|
||||||
|
await waitFor(200);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async function appendTaskToQueue(
|
||||||
|
task: OrchestratorTask,
|
||||||
|
orchDir: string,
|
||||||
|
timeoutMs: number,
|
||||||
|
): Promise<void> {
|
||||||
|
const tasksPath = path.join(orchDir, 'tasks.json');
|
||||||
|
const lockPath = `${tasksPath}.lock`;
|
||||||
|
await mkdir(orchDir, { recursive: true });
|
||||||
|
await withFileLock(lockPath, timeoutMs, async () => {
|
||||||
|
const queue = await loadQueue(tasksPath);
|
||||||
|
queue.tasks.push(task);
|
||||||
|
await writeJsonAtomic(tasksPath, queue);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function readOrchestratorConfig(orchDir: string): Promise<Record<string, unknown>> {
|
||||||
|
const configPath = path.join(orchDir, 'config.json');
|
||||||
|
const config = JSON.parse(await readFile(configPath, 'utf-8')) as unknown;
|
||||||
|
if (!isRecord(config)) {
|
||||||
|
throw new Error(`Invalid orchestrator config: ${configPath}`);
|
||||||
|
}
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function ensureOrchestratorReady(orchDir: string): Promise<void> {
|
||||||
|
const config = await readOrchestratorConfig(orchDir);
|
||||||
|
if (config.enabled !== true) {
|
||||||
|
throw new Error(`MACP orchestrator is disabled in ${path.join(orchDir, 'config.json')}.`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function triggerController(repoRoot: string): void {
|
||||||
|
const child = spawn(
|
||||||
|
'bash',
|
||||||
|
['-lc', `cd ${shellQuote(repoRoot)} && ${shellQuote(resolveOrchestratorRunPath())} --once`],
|
||||||
|
{
|
||||||
|
detached: true,
|
||||||
|
stdio: 'ignore',
|
||||||
|
},
|
||||||
|
);
|
||||||
|
child.unref();
|
||||||
|
}
|
||||||
|
|
||||||
|
async function pollForResult(
|
||||||
|
resultPath: string,
|
||||||
|
timeoutMs: number,
|
||||||
|
signal?: AbortSignal,
|
||||||
|
): Promise<TaskResult> {
|
||||||
|
const deadline = Date.now() + Math.max(timeoutMs, 2_000);
|
||||||
|
|
||||||
|
while (Date.now() <= deadline) {
|
||||||
|
if (signal?.aborted) {
|
||||||
|
throw abortError();
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const raw = JSON.parse(await readFile(resultPath, 'utf-8')) as unknown;
|
||||||
|
if (!isRecord(raw) || typeof raw.task_id !== 'string' || typeof raw.status !== 'string') {
|
||||||
|
throw new Error(`Invalid MACP result payload: ${resultPath}`);
|
||||||
|
}
|
||||||
|
return raw as TaskResult;
|
||||||
|
} catch (error) {
|
||||||
|
const code = (error as NodeJS.ErrnoException).code;
|
||||||
|
if (code !== 'ENOENT' && !(error instanceof SyntaxError)) {
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await waitFor(2_000, signal);
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new Error(`Timed out waiting for MACP result: ${resultPath}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
async function resolveResultOutput(result: TaskResult, orchDir: string): Promise<string> {
|
||||||
|
const metadata = isRecord(result.metadata) ? result.metadata : {};
|
||||||
|
const outputCandidates = [metadata.result_output_path, metadata.output_path]
|
||||||
|
.filter((value): value is string => typeof value === 'string' && value.trim().length > 0)
|
||||||
|
.map((value) => (path.isAbsolute(value) ? value : path.resolve(orchDir, value)));
|
||||||
|
|
||||||
|
for (const candidate of outputCandidates) {
|
||||||
|
try {
|
||||||
|
return (await readFile(candidate, 'utf-8')).trim();
|
||||||
|
} catch {
|
||||||
|
// Fall back to formatted result details below.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const lines: string[] = [];
|
||||||
|
if (result.summary) {
|
||||||
|
lines.push(result.summary);
|
||||||
|
}
|
||||||
|
if (result.error) {
|
||||||
|
lines.push(`Error: ${toMessage(result.error)}`);
|
||||||
|
}
|
||||||
|
if (result.escalation_reason) {
|
||||||
|
lines.push(`Escalation: ${toMessage(result.escalation_reason)}`);
|
||||||
|
}
|
||||||
|
if (result.branch) {
|
||||||
|
lines.push(`Branch: ${result.branch}`);
|
||||||
|
}
|
||||||
|
if (result.pr) {
|
||||||
|
lines.push(`PR: ${result.pr}`);
|
||||||
|
}
|
||||||
|
if (Array.isArray(result.files_changed) && result.files_changed.length > 0) {
|
||||||
|
lines.push(`Files changed:\n${result.files_changed.map((file) => `- ${file}`).join('\n')}`);
|
||||||
|
}
|
||||||
|
if (Array.isArray(result.gate_results) && result.gate_results.length > 0) {
|
||||||
|
lines.push(
|
||||||
|
`Quality gates:\n${result.gate_results
|
||||||
|
.map((gate) => `- [${gate.exit_code ?? 0}] ${gate.command ?? 'unknown command'}`)
|
||||||
|
.join('\n')}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return lines.join('\n\n').trim() || JSON.stringify(result, null, 2);
|
||||||
|
}
|
||||||
|
|
||||||
export class MacpRuntime implements AcpRuntime {
|
export class MacpRuntime implements AcpRuntime {
|
||||||
constructor(private readonly config: MacpRuntimeConfig) {}
|
constructor(private readonly config: MacpRuntimeConfig) {}
|
||||||
|
|
||||||
async ensureSession(input: AcpRuntimeEnsureInput): Promise<AcpRuntimeHandle> {
|
async ensureSession(input: AcpRuntimeEnsureInput): Promise<AcpRuntimeHandle> {
|
||||||
if (input.agent !== 'pi') {
|
|
||||||
throw new Error(`macp runtime only supports agentId "pi"; received "${input.agent}".`);
|
|
||||||
}
|
|
||||||
if (input.mode !== 'oneshot') {
|
if (input.mode !== 'oneshot') {
|
||||||
throw new Error(`macp runtime only supports oneshot sessions; received "${input.mode}".`);
|
throw new Error(`macp runtime only supports oneshot sessions; received "${input.mode}".`);
|
||||||
}
|
}
|
||||||
@@ -78,6 +409,7 @@ export class MacpRuntime implements AcpRuntime {
|
|||||||
const state: HandleState = {
|
const state: HandleState = {
|
||||||
name: toSessionName(input),
|
name: toSessionName(input),
|
||||||
agent: input.agent,
|
agent: input.agent,
|
||||||
|
runtime: input.agent || this.config.defaultRuntime || 'codex',
|
||||||
cwd,
|
cwd,
|
||||||
model: this.config.defaultModel,
|
model: this.config.defaultModel,
|
||||||
systemPrompt: this.config.systemPrompt,
|
systemPrompt: this.config.systemPrompt,
|
||||||
@@ -96,120 +428,73 @@ export class MacpRuntime implements AcpRuntime {
|
|||||||
|
|
||||||
async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable<AcpRuntimeEvent> {
|
async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable<AcpRuntimeEvent> {
|
||||||
const state = decodeHandleState(input.handle);
|
const state = decodeHandleState(input.handle);
|
||||||
const logPath = path.join(
|
const repoRoot = resolveRepoRoot(this.config);
|
||||||
this.config.stateDir,
|
const orchDir = resolveOrchDir(this.config);
|
||||||
'macp',
|
const taskId = createTaskId(input.handle.sessionKey, input.requestId);
|
||||||
`${state.name}-${Date.now()}-${input.requestId}.json`,
|
const briefDir = path.join(os.homedir(), '.mosaic', 'macp-oc');
|
||||||
);
|
const briefPath = path.join(briefDir, `${state.name}-${input.requestId}.md`);
|
||||||
|
const resultPath = path.join(orchDir, 'results', `${taskId}.json`);
|
||||||
const streamQueue: AcpRuntimeEvent[] = [];
|
|
||||||
const waiters: Array<() => void> = [];
|
|
||||||
let finished = false;
|
|
||||||
|
|
||||||
const push = (event: AcpRuntimeEvent) => {
|
|
||||||
streamQueue.push(event);
|
|
||||||
const waiter = waiters.shift();
|
|
||||||
waiter?.();
|
|
||||||
};
|
|
||||||
|
|
||||||
const resultPromise = runPiTurn({
|
|
||||||
model: state.model,
|
|
||||||
systemPrompt: state.systemPrompt,
|
|
||||||
prompt: input.text,
|
|
||||||
workDir: state.cwd,
|
|
||||||
timeoutMs: state.timeoutMs,
|
|
||||||
logPath,
|
|
||||||
...(input.signal ? { signal: input.signal } : {}),
|
|
||||||
onEvent: async (event) => {
|
|
||||||
switch (event.type) {
|
|
||||||
case 'message_update': {
|
|
||||||
const formatted = formatAssistantEvent(event.assistantMessageEvent);
|
|
||||||
if (formatted?.text) {
|
|
||||||
push({
|
|
||||||
type: 'text_delta',
|
|
||||||
text: formatted.text,
|
|
||||||
...(formatted.stream ? { stream: formatted.stream } : {}),
|
|
||||||
...(formatted.tag ? { tag: formatted.tag } : {}),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case 'tool_execution_start':
|
|
||||||
push({
|
|
||||||
type: 'tool_call',
|
|
||||||
text: JSON.stringify(event.args ?? {}),
|
|
||||||
tag: 'tool_call',
|
|
||||||
toolCallId: event.toolCallId,
|
|
||||||
title: event.toolName,
|
|
||||||
status: 'running',
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
case 'tool_execution_update':
|
|
||||||
push({
|
|
||||||
type: 'tool_call',
|
|
||||||
text: JSON.stringify(event.partialResult ?? {}),
|
|
||||||
tag: 'tool_call_update',
|
|
||||||
toolCallId: event.toolCallId,
|
|
||||||
title: event.toolName,
|
|
||||||
status: 'running',
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
case 'tool_execution_end':
|
|
||||||
push({
|
|
||||||
type: 'tool_call',
|
|
||||||
text: JSON.stringify(event.result ?? {}),
|
|
||||||
tag: 'tool_call_update',
|
|
||||||
toolCallId: event.toolCallId,
|
|
||||||
title: event.toolName,
|
|
||||||
status: event.isError ? 'error' : 'completed',
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
case 'turn_end':
|
|
||||||
push({
|
|
||||||
type: 'status',
|
|
||||||
text: 'Pi turn completed.',
|
|
||||||
tag: 'usage_update',
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
.then((result) => {
|
|
||||||
if (result.output) {
|
|
||||||
push({
|
|
||||||
type: 'status',
|
|
||||||
text: result.output,
|
|
||||||
tag: 'session_info_update',
|
|
||||||
});
|
|
||||||
}
|
|
||||||
push({
|
|
||||||
type: 'done',
|
|
||||||
stopReason: result.stopReason,
|
|
||||||
});
|
|
||||||
})
|
|
||||||
.catch((error) => {
|
|
||||||
push({
|
|
||||||
type: 'error',
|
|
||||||
message: error instanceof Error ? error.message : String(error),
|
|
||||||
});
|
|
||||||
})
|
|
||||||
.finally(() => {
|
|
||||||
finished = true;
|
|
||||||
while (waiters.length > 0) {
|
|
||||||
waiters.shift()?.();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (!finished || streamQueue.length > 0) {
|
await access(resolveOrchestratorRunPath());
|
||||||
if (streamQueue.length === 0) {
|
await ensureOrchestratorReady(orchDir);
|
||||||
await new Promise<void>((resolve) => waiters.push(resolve));
|
await mkdir(briefDir, { recursive: true });
|
||||||
continue;
|
await mkdir(path.dirname(resultPath), { recursive: true });
|
||||||
}
|
await writeFile(briefPath, `${input.text.trimEnd()}\n`, 'utf-8');
|
||||||
yield streamQueue.shift() as AcpRuntimeEvent;
|
|
||||||
|
const task: OrchestratorTask = {
|
||||||
|
id: taskId,
|
||||||
|
title: createTaskTitle(input.text),
|
||||||
|
status: 'pending',
|
||||||
|
dispatch: this.config.defaultDispatch || 'yolo',
|
||||||
|
runtime: state.runtime || this.config.defaultRuntime || 'codex',
|
||||||
|
worktree: buildWorktreePath(repoRoot, taskId),
|
||||||
|
brief_path: briefPath,
|
||||||
|
_brief_temp_path: briefPath,
|
||||||
|
timeout_seconds: Math.max(1, Math.ceil(state.timeoutMs / 1_000)),
|
||||||
|
metadata: {
|
||||||
|
source: 'openclaw-macp-plugin',
|
||||||
|
created_at: nowIso(),
|
||||||
|
session_key: input.handle.sessionKey,
|
||||||
|
request_id: input.requestId,
|
||||||
|
agent_id: state.agent,
|
||||||
|
cwd: state.cwd,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
this.config.logger?.info?.(
|
||||||
|
`Queueing MACP orchestrator task ${taskId} (${task.runtime}/${task.dispatch}).`,
|
||||||
|
);
|
||||||
|
yield {
|
||||||
|
type: 'status',
|
||||||
|
text: `Queued MACP task ${taskId}.`,
|
||||||
|
tag: 'session_info_update',
|
||||||
|
};
|
||||||
|
|
||||||
|
await appendTaskToQueue(task, orchDir, state.timeoutMs);
|
||||||
|
triggerController(repoRoot);
|
||||||
|
|
||||||
|
const result = await pollForResult(resultPath, state.timeoutMs, input.signal);
|
||||||
|
const output = await resolveResultOutput(result, orchDir);
|
||||||
|
for (const chunk of chunkText(output)) {
|
||||||
|
yield {
|
||||||
|
type: 'text_delta',
|
||||||
|
text: chunk,
|
||||||
|
stream: 'output',
|
||||||
|
tag: 'agent_message_chunk',
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
yield {
|
||||||
|
type: 'done',
|
||||||
|
stopReason: result.status,
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
yield {
|
||||||
|
type: 'error',
|
||||||
|
message: error instanceof Error ? error.message : String(error),
|
||||||
|
};
|
||||||
} finally {
|
} finally {
|
||||||
await resultPromise;
|
await rm(briefPath, { force: true }).catch(() => undefined);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -220,47 +505,49 @@ export class MacpRuntime implements AcpRuntime {
|
|||||||
async getStatus(input: { handle: AcpRuntimeHandle }): Promise<AcpRuntimeStatus> {
|
async getStatus(input: { handle: AcpRuntimeHandle }): Promise<AcpRuntimeStatus> {
|
||||||
const state = decodeHandleState(input.handle);
|
const state = decodeHandleState(input.handle);
|
||||||
return {
|
return {
|
||||||
summary: 'macp Pi oneshot runtime ready',
|
summary: 'macp controller oneshot runtime ready',
|
||||||
backendSessionId: state.name,
|
backendSessionId: state.name,
|
||||||
agentSessionId: state.name,
|
agentSessionId: state.name,
|
||||||
details: {
|
details: {
|
||||||
mode: 'oneshot',
|
mode: 'oneshot',
|
||||||
model: state.model,
|
agent: state.agent,
|
||||||
|
runtime: state.runtime,
|
||||||
cwd: state.cwd,
|
cwd: state.cwd,
|
||||||
|
repoRoot: resolveRepoRoot(this.config),
|
||||||
|
orchDir: resolveOrchDir(this.config),
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
async doctor(): Promise<AcpRuntimeDoctorReport> {
|
async doctor(): Promise<AcpRuntimeDoctorReport> {
|
||||||
try {
|
try {
|
||||||
await access(PI_RUNNER_PATH);
|
const repoRoot = resolveRepoRoot(this.config);
|
||||||
const importErrors: string[] = [];
|
const orchDir = resolveOrchDir(this.config);
|
||||||
await import('@mariozechner/pi-agent-core').catch((error) => {
|
const orchestratorRunPath = resolveOrchestratorRunPath();
|
||||||
importErrors.push(error instanceof Error ? error.message : String(error));
|
await access(orchestratorRunPath);
|
||||||
});
|
await access(repoRoot);
|
||||||
await import('@mariozechner/pi-ai').catch((error) => {
|
await access(orchDir);
|
||||||
importErrors.push(error instanceof Error ? error.message : String(error));
|
await access(PI_RUNNER_PATH).catch(() => undefined);
|
||||||
});
|
const orchestratorConfig = await readOrchestratorConfig(orchDir);
|
||||||
if (importErrors.length > 0) {
|
if (orchestratorConfig.enabled !== true) {
|
||||||
return {
|
return {
|
||||||
ok: false,
|
ok: false,
|
||||||
code: 'MACP_DEPS_MISSING',
|
code: 'MACP_ORCH_DISABLED',
|
||||||
message: 'MACP runtime dependencies are not available.',
|
message: 'MACP orchestrator is disabled for the configured repo.',
|
||||||
installCommand: 'pnpm install --frozen-lockfile',
|
details: [path.join(orchDir, 'config.json')],
|
||||||
details: importErrors,
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
return {
|
return {
|
||||||
ok: true,
|
ok: true,
|
||||||
message: 'MACP runtime is ready.',
|
message: 'MACP runtime is ready.',
|
||||||
details: [PI_RUNNER_PATH],
|
details: [orchestratorRunPath, repoRoot, orchDir],
|
||||||
};
|
};
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
return {
|
return {
|
||||||
ok: false,
|
ok: false,
|
||||||
code: 'MACP_PI_RUNNER_MISSING',
|
code: 'MACP_ORCH_MISSING',
|
||||||
message: 'Pi runner was not found.',
|
message: error instanceof Error ? error.message : String(error),
|
||||||
details: [error instanceof Error ? error.message : String(error)],
|
installCommand: 'pnpm install --frozen-lockfile',
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user