Files
stack/plugins/macp/src/macp-runtime.ts
Mos (Agent) d3fdc4ff54
Some checks failed
ci/woodpecker/push/ci Pipeline failed
ci/woodpecker/pr/ci Pipeline failed
fix: update MACP plugin paths from /home/jarvis to dynamic resolution
- plugins/macp/src/index.ts: updated OC SDK imports to local paths
- plugins/macp/src/macp-runtime.ts: DEFAULT_REPO_ROOT → mosaic-stack-new, PI_RUNNER_PATH updated
- plugins/macp/openclaw.plugin.json: default repoRoot description updated
- Removed stale tsconfig.tsbuildinfo with old path references
2026-03-30 19:46:52 +00:00

563 lines
16 KiB
TypeScript

import { spawn } from 'node:child_process';
import { randomUUID } from 'node:crypto';
import { access, mkdir, open, readFile, rename, rm, writeFile } from 'node:fs/promises';
import * as os from 'node:os';
import * as path from 'node:path';
import type {
AcpRuntime,
AcpRuntimeCapabilities,
AcpRuntimeDoctorReport,
AcpRuntimeEnsureInput,
AcpRuntimeEvent,
AcpRuntimeHandle,
AcpRuntimeStatus,
AcpRuntimeTurnInput,
} from '/home/woltjejason/.npm-global/lib/node_modules/openclaw/dist/plugin-sdk/acp-runtime.js';
/**
 * Configuration accepted by the `MacpRuntime` constructor.
 */
export interface MacpRuntimeConfig {
// Copied into each session's handle state as `model` by `ensureSession`.
defaultModel: string;
// Copied into each session's handle state as `systemPrompt` by `ensureSession`.
systemPrompt: string;
// Per-turn budget in milliseconds; drives the queue-lock wait, the result polling
// deadline and the task's `timeout_seconds`.
timeoutMs: number;
// NOTE(review): not read anywhere in the visible source — confirm where it is consumed.
stateDir: string;
// Repository root; a leading `~` is expanded. Falls back to DEFAULT_REPO_ROOT when blank.
repoRoot?: string;
// Orchestrator directory; defaults to `<repoRoot>/.mosaic/orchestrator` when blank.
orchDir?: string;
// Dispatch mode written into queued tasks; `runTurn` falls back to 'yolo'.
defaultDispatch?: string;
// Runtime used when the session's agent name is empty; the final fallback is 'codex'.
defaultRuntime?: string;
// Optional logging sink; only `info` is called in the visible source.
logger?: {
info?: (message: string) => void;
warn?: (message: string) => void;
};
}
/**
 * Per-session state that `encodeHandleState` serializes to JSON and stores in the
 * handle's `runtimeSessionName`; `decodeHandleState` parses and validates it again.
 */
type HandleState = {
// Session name; `ensureSession` builds it as `<agent>-<sessionKey>`.
name: string;
// Agent identifier taken from the ensure-session input.
agent: string;
// Runtime name written into queued tasks (agent, else configured default, else 'codex').
runtime: string;
// Absolute working directory resolved in `ensureSession`.
cwd: string;
// Copied from `config.defaultModel`; not read back in the visible source.
model: string;
// Copied from `config.systemPrompt`; not read back in the visible source.
systemPrompt: string;
// Copied from `config.timeoutMs`; used for lock, polling and task timeouts in `runTurn`.
timeoutMs: number;
};
/**
 * Shape of a task entry that `runTurn` builds and `appendTaskToQueue` appends to
 * the orchestrator's `tasks.json`.
 */
type OrchestratorTask = {
// Unique id produced by `createTaskId`; also names the result file `results/<id>.json`.
id: string;
// First non-empty line of the prompt, capped at 120 characters.
title: string;
// Tasks are always created in the 'pending' state here.
status: 'pending';
// Dispatch mode (`config.defaultDispatch`, else 'yolo').
dispatch: string;
// Runtime name taken from the session state.
runtime: string;
// Worktree path computed by `buildWorktreePath`.
worktree: string;
// Path of the brief file holding the prompt text.
brief_path: string;
// Set to the same path as `brief_path` by `runTurn`.
// NOTE(review): presumably marks the brief as temporary for the orchestrator — confirm.
_brief_temp_path: string;
// Turn timeout rounded up to whole seconds, at least 1.
timeout_seconds: number;
// Free-form metadata; `runTurn` records source, creation time, session/request ids, agent and cwd.
metadata: Record<string, unknown>;
};
/**
 * In-memory form of `tasks.json` as returned by `loadQueue` and written back by
 * `appendTaskToQueue`.
 */
type QueueFile = {
// Queued tasks in file order; new tasks are pushed onto the end.
tasks: OrchestratorTask[];
};
/**
 * One entry of `TaskResult.gate_results`; `resolveResultOutput` renders each entry
 * as `- [exit_code] command`.
 */
type TaskGateResult = {
// Command text; rendered as 'unknown command' when absent.
command?: string;
// Exit code; rendered as 0 when absent.
exit_code?: number;
// NOTE(review): not read in the visible source — confirm meaning with the orchestrator.
type?: string;
};
/**
 * Result payload that `pollForResult` reads from `results/<taskId>.json`.
 * Only `task_id` and `status` are validated (both must be strings); every other
 * field is optional and is formatted by `resolveResultOutput` when present.
 */
type TaskResult = {
task_id: string;
// Forwarded as the `stopReason` of the final 'done' event.
status: string;
summary?: string;
// Rendered through `toMessage` with an "Error: " prefix.
error?: unknown;
// Rendered through `toMessage` with an "Escalation: " prefix.
escalation_reason?: unknown;
branch?: string | null;
pr?: string | null;
files_changed?: string[];
gate_results?: TaskGateResult[];
// May carry `result_output_path` / `output_path`, which are read in preference to the fields above.
metadata?: Record<string, unknown>;
};
/** Capabilities object returned by `MacpRuntime.getCapabilities`; it lists no controls. */
const MACP_CAPABILITIES: AcpRuntimeCapabilities = {
controls: [],
};
/** Repository root used when `repoRoot` is not configured; the leading `~` is expanded on use. */
const DEFAULT_REPO_ROOT = '~/src/mosaic-stack-new';
/** Orchestrator launcher script; the leading `~` is expanded on use. */
const ORCHESTRATOR_RUN_PATH = '~/.config/mosaic/bin/mosaic-orchestrator-run';
/**
 * Absolute path of the Pi runner inside the default repository checkout.
 *
 * Derived from the current user's home directory instead of a hard-coded
 * `/home/<user>` prefix so the path is valid on any machine or account.
 */
const PI_RUNNER_PATH = path.join(
  os.homedir(),
  'src',
  'mosaic-stack-new',
  'tools',
  'macp',
  'dispatcher',
  'pi_runner.ts',
);
/**
 * Replaces a leading `~` or `~/` with the current user's home directory.
 *
 * @param rawPath Path that may start with a tilde.
 * @returns The expanded path, or `rawPath` unchanged when it has no such prefix
 *   (forms like `~user/...` are left alone).
 */
function expandHome(rawPath: string): string {
  const tildePrefix = '~/';
  if (rawPath.startsWith(tildePrefix)) {
    const remainder = rawPath.slice(tildePrefix.length);
    return path.join(os.homedir(), remainder);
  }
  return rawPath === '~' ? os.homedir() : rawPath;
}
/**
 * Determines the absolute repository root for this runtime.
 *
 * @param config Runtime configuration; a blank or missing `repoRoot` selects
 *   `DEFAULT_REPO_ROOT`.
 * @returns Absolute path with a leading `~` expanded.
 */
function resolveRepoRoot(config: MacpRuntimeConfig): string {
  const configured = config.repoRoot?.trim();
  const root = configured || DEFAULT_REPO_ROOT;
  return path.resolve(expandHome(root));
}
/**
 * Determines the absolute orchestrator directory.
 *
 * @param config Runtime configuration. When `orchDir` is set (non-blank) it is
 *   trimmed, home-expanded and resolved; otherwise the directory defaults to
 *   `<repoRoot>/.mosaic/orchestrator`.
 * @returns Absolute path of the orchestrator directory.
 */
function resolveOrchDir(config: MacpRuntimeConfig): string {
  // Expand the trimmed value, not the raw one: surrounding whitespace would
  // otherwise hide a leading `~` from expandHome and end up in the path.
  const configured = config.orchDir?.trim();
  if (configured) {
    return path.resolve(expandHome(configured));
  }
  return path.join(resolveRepoRoot(config), '.mosaic', 'orchestrator');
}
/**
 * Returns the absolute path of the orchestrator launcher script, with the
 * leading `~` of `ORCHESTRATOR_RUN_PATH` expanded.
 */
function resolveOrchestratorRunPath(): string {
  const expanded = expandHome(ORCHESTRATOR_RUN_PATH);
  return path.resolve(expanded);
}
/**
 * Wraps a value in single quotes for use as one shell word.
 *
 * @param value Raw text to quote.
 * @returns The text enclosed in single quotes, with every embedded single
 *   quote rewritten as `'\''` (close quote, escaped quote, reopen quote).
 */
function shellQuote(value: string): string {
  const escaped = value.split("'").join("'\\''");
  return "'" + escaped + "'";
}
/**
 * Normalizes arbitrary text into a lowercase, dash-separated identifier segment.
 *
 * @param value Text to normalize.
 * @returns The trimmed, lowercased text with every run of characters outside
 *   `a-z0-9` collapsed to a single `-` and leading/trailing dashes removed;
 *   `'task'` when nothing is left.
 */
function sanitizeSegment(value: string): string {
  const lowered = value.trim().toLowerCase();
  const dashed = lowered.replace(/[^a-z0-9]+/g, '-');
  const stripped = dashed.replace(/^-+|-+$/g, '');
  return stripped.length > 0 ? stripped : 'task';
}
/**
 * Serializes session state to the JSON string stored in a runtime handle.
 *
 * @param state Session state to encode.
 * @returns Compact JSON text of `state`.
 */
function encodeHandleState(state: HandleState): string {
  const serialized = JSON.stringify(state);
  return serialized;
}
/**
 * Parses and validates the session state stored in a handle's
 * `runtimeSessionName`.
 *
 * @param handle Runtime handle whose `runtimeSessionName` holds JSON-encoded state.
 * @returns The decoded state once every field has the expected primitive type.
 * @throws Error 'Invalid MACP runtime handle state.' when the payload is not
 *   valid JSON, is not an object, or has a missing or mistyped field.
 */
function decodeHandleState(handle: AcpRuntimeHandle): HandleState {
  const invalidState = (): Error => new Error('Invalid MACP runtime handle state.');

  let parsed: unknown;
  try {
    parsed = JSON.parse(handle.runtimeSessionName);
  } catch {
    // Malformed JSON is reported the same way as any other bad handle.
    throw invalidState();
  }

  // JSON `null` passes `typeof === 'object'`; reject it before reading fields
  // so callers never see a raw TypeError.
  if (typeof parsed !== 'object' || parsed === null) {
    throw invalidState();
  }

  const candidate = parsed as Partial<HandleState>;
  if (
    typeof candidate.name !== 'string' ||
    typeof candidate.agent !== 'string' ||
    typeof candidate.runtime !== 'string' ||
    typeof candidate.cwd !== 'string' ||
    typeof candidate.model !== 'string' ||
    typeof candidate.systemPrompt !== 'string' ||
    typeof candidate.timeoutMs !== 'number'
  ) {
    throw invalidState();
  }
  return candidate as HandleState;
}
/**
 * Builds the session name used for backend and agent session ids.
 *
 * @param input Ensure-session input.
 * @returns `<agent>-<sessionKey>`.
 */
function toSessionName(input: AcpRuntimeEnsureInput): string {
  const { agent, sessionKey } = input;
  return `${agent}-${sessionKey}`;
}
/**
 * Creates a unique task id for one turn.
 *
 * @param sessionKey Session key; sanitized before use.
 * @param requestId Request id; sanitized before use.
 * @returns `<session>-<request>-<suffix>`, where the suffix is the first eight
 *   characters of a fresh random UUID.
 */
function createTaskId(sessionKey: string, requestId: string): string {
  const sessionPart = sanitizeSegment(sessionKey);
  const requestPart = sanitizeSegment(requestId);
  const suffix = randomUUID().slice(0, 8);
  return [sessionPart, requestPart, suffix].join('-');
}
/**
 * Derives a short task title from a prompt.
 *
 * @param prompt Full prompt text.
 * @returns The first non-blank line (trimmed), or `'MACP OpenClaw task'` when
 *   every line is blank, cut to at most 120 characters.
 */
function createTaskTitle(prompt: string): string {
  let title = 'MACP OpenClaw task';
  for (const rawLine of prompt.split(/\r?\n/)) {
    const line = rawLine.trim();
    if (line.length > 0) {
      title = line;
      break;
    }
  }
  return title.slice(0, 120);
}
/**
 * Computes the worktree directory for a task, placed next to the repository.
 *
 * @param repoRoot Repository root path.
 * @param taskId Task identifier.
 * @returns `<parent of repoRoot>/<repo name>-worktrees/macp-oc-<taskId>`.
 */
function buildWorktreePath(repoRoot: string, taskId: string): string {
  const parentDir = path.dirname(repoRoot);
  const worktreesDir = `${path.basename(repoRoot)}-worktrees`;
  return path.join(parentDir, worktreesDir, `macp-oc-${taskId}`);
}
/**
 * Returns the current time as an ISO 8601 UTC timestamp string.
 */
function nowIso(): string {
  const now = new Date();
  return now.toISOString();
}
/**
 * Splits text into consecutive chunks for streaming.
 *
 * @param text Text to split; surrounding whitespace is trimmed first.
 * @param chunkSize Maximum chunk length in UTF-16 code units (default 4000).
 *   Fractional values are rounded down.
 * @returns The chunks in order, or an empty array when the trimmed text is
 *   empty. A chunk is shortened by one unit (or, for a size of 1, lengthened by
 *   one) when the cut would otherwise fall inside a surrogate pair.
 * @throws RangeError when `chunkSize` is NaN or smaller than 1; such a value
 *   previously made the loop run forever.
 */
function chunkText(text: string, chunkSize = 4000): string[] {
  if (Number.isNaN(chunkSize) || chunkSize < 1) {
    throw new RangeError(`chunkSize must be at least 1; received ${chunkSize}.`);
  }
  const size = Math.floor(chunkSize);
  const normalized = text.trim();
  const total = normalized.length;

  const isHighSurrogate = (code: number): boolean => code >= 0xd800 && code <= 0xdbff;
  const isLowSurrogate = (code: number): boolean => code >= 0xdc00 && code <= 0xdfff;

  const chunks: string[] = [];
  let start = 0;
  while (start < total) {
    let end = Math.min(start + size, total);
    if (
      end < total &&
      isHighSurrogate(normalized.charCodeAt(end - 1)) &&
      isLowSurrogate(normalized.charCodeAt(end))
    ) {
      // Never cut between the two halves of an astral character: back off by
      // one unit, or extend by one when the chunk would otherwise be empty.
      end = end - 1 > start ? end - 1 : end + 1;
    }
    chunks.push(normalized.slice(start, end));
    start = end;
  }
  return chunks;
}
/**
 * Type guard for non-null object values.
 *
 * @param value Value to test.
 * @returns `true` for any non-null value whose `typeof` is `'object'`
 *   (arrays included), `false` otherwise.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  if (value === null) {
    return false;
  }
  return typeof value === 'object';
}
/**
 * Converts an arbitrary value into display text.
 *
 * @param value Value to describe.
 * @returns The string itself, an Error's `message`, `''` for `null`/`undefined`,
 *   otherwise pretty-printed JSON. Values JSON cannot represent (functions,
 *   symbols, BigInt, circular structures) fall back to their string form, so
 *   the function always returns a string and never throws.
 */
function toMessage(value: unknown): string {
  if (typeof value === 'string') {
    return value;
  }
  if (value instanceof Error) {
    return value.message;
  }
  if (value === null || value === undefined) {
    return '';
  }
  try {
    // At runtime JSON.stringify yields undefined for functions and symbols,
    // even though its declared return type is string.
    const serialized: string | undefined = JSON.stringify(value, null, 2);
    if (typeof serialized === 'string') {
      return serialized;
    }
  } catch {
    // Circular references and BigInt values make JSON.stringify throw.
  }
  try {
    return String(value);
  } catch {
    // Objects without a usable toString (e.g. null-prototype objects).
    return Object.prototype.toString.call(value);
  }
}
/**
 * Creates the error used to signal that a turn was aborted.
 *
 * @returns A new Error with message `'MACP turn aborted.'` and name `'AbortError'`.
 */
function abortError(): Error {
  return Object.assign(new Error('MACP turn aborted.'), { name: 'AbortError' });
}
/**
 * Sleeps for the given time, ending early when the signal aborts.
 *
 * @param ms Delay in milliseconds.
 * @param signal Optional abort signal.
 * @returns A promise that resolves after `ms` milliseconds.
 * @throws The error from `abortError()` when the signal is already aborted or
 *   aborts during the wait.
 */
async function waitFor(ms: number, signal?: AbortSignal): Promise<void> {
  if (signal?.aborted) {
    throw abortError();
  }
  await new Promise<void>((resolve, reject) => {
    let timer: ReturnType<typeof setTimeout> | undefined;

    // Abort path: cancel the pending timer, detach, and reject.
    function handleAbort(): void {
      clearTimeout(timer);
      signal?.removeEventListener('abort', handleAbort);
      reject(abortError());
    }

    // Normal path: detach the abort listener before resolving.
    timer = setTimeout(() => {
      signal?.removeEventListener('abort', handleAbort);
      resolve();
    }, ms);

    signal?.addEventListener('abort', handleAbort, { once: true });
  });
}
/**
 * Writes a value as pretty-printed JSON by writing a sibling temp file and
 * renaming it over the target, so readers never see a partially written file.
 *
 * @param filePath Destination file.
 * @param value Value to serialize (2-space indent, trailing newline).
 * @throws Any error from writing or renaming; the temp file is removed on a
 *   best-effort basis before the error is rethrown.
 */
async function writeJsonAtomic(filePath: string, value: unknown): Promise<void> {
  // The random suffix keeps two writes in the same process and millisecond
  // from sharing a temp file.
  const tempPath = `${filePath}.${process.pid}.${Date.now()}.${randomUUID().slice(0, 8)}.tmp`;
  try {
    await writeFile(tempPath, `${JSON.stringify(value, null, 2)}\n`, 'utf-8');
    await rename(tempPath, filePath);
  } catch (error) {
    // Do not leave orphaned temp files next to the target.
    await rm(tempPath, { force: true }).catch(() => undefined);
    throw error;
  }
}
/**
 * Loads the orchestrator task queue from disk.
 *
 * @param tasksPath Path of `tasks.json`.
 * @returns The queue; an empty queue when the file does not exist. Both a bare
 *   JSON array and an object with a `tasks` array are accepted.
 * @throws Error when the file parses but holds neither form; read errors other
 *   than ENOENT and JSON syntax errors propagate unchanged.
 */
async function loadQueue(tasksPath: string): Promise<QueueFile> {
  let text: string;
  try {
    text = await readFile(tasksPath, 'utf-8');
  } catch (error) {
    // A missing queue file simply means nothing has been queued yet.
    if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
      return { tasks: [] };
    }
    throw error;
  }

  const raw = JSON.parse(text) as unknown;
  if (Array.isArray(raw)) {
    return { tasks: raw as OrchestratorTask[] };
  }
  if (isRecord(raw) && Array.isArray(raw.tasks)) {
    return { tasks: raw.tasks as OrchestratorTask[] };
  }
  throw new Error('tasks.json must contain a tasks array.');
}
/**
 * Tries once to create the lock file exclusively.
 *
 * @returns The open handle on success, or `undefined` when the lock file
 *   already exists (EEXIST). Any other error propagates.
 */
async function tryAcquireLockFile(lockPath: string) {
  try {
    return await open(lockPath, 'wx');
  } catch (error) {
    if ((error as NodeJS.ErrnoException).code === 'EEXIST') {
      return undefined;
    }
    throw error;
  }
}

/**
 * Runs an action while holding an exclusive lock file.
 *
 * The lock is taken by creating `lockPath` with the `wx` flag, retrying every
 * 200 ms while another holder exists. The wait is bounded by `timeoutMs`
 * clamped to the range 5–30 seconds.
 *
 * @param lockPath Path of the lock file.
 * @param timeoutMs Requested maximum wait for the lock, in milliseconds.
 * @param action Work to perform while the lock is held; it runs at most once.
 * @returns Whatever `action` resolves to.
 * @throws Error when the lock cannot be acquired before the deadline; errors
 *   thrown by `action` propagate unchanged.
 */
async function withFileLock<T>(
  lockPath: string,
  timeoutMs: number,
  action: () => Promise<T>,
): Promise<T> {
  const deadline = Date.now() + Math.max(5_000, Math.min(timeoutMs, 30_000));
  while (true) {
    // Only lock acquisition is retried. Errors from `action` — including an
    // EEXIST of its own — must never be mistaken for lock contention.
    const handle = await tryAcquireLockFile(lockPath);
    if (handle) {
      try {
        return await action();
      } finally {
        try {
          await handle.close();
        } finally {
          // Remove the lock even when closing the handle fails.
          await rm(lockPath, { force: true });
        }
      }
    }
    if (Date.now() >= deadline) {
      throw new Error(`Timed out waiting for orchestrator queue lock: ${lockPath}`);
    }
    await waitFor(200);
  }
}
/**
 * Appends a task to the orchestrator queue under the queue's file lock.
 *
 * @param task Task to enqueue.
 * @param orchDir Orchestrator directory; created if missing.
 * @param timeoutMs Lock wait budget passed to `withFileLock`.
 */
async function appendTaskToQueue(
  task: OrchestratorTask,
  orchDir: string,
  timeoutMs: number,
): Promise<void> {
  const tasksPath = path.join(orchDir, 'tasks.json');
  await mkdir(orchDir, { recursive: true });

  // Read-modify-write of tasks.json; runs only while the lock is held.
  const enqueue = async (): Promise<void> => {
    const queue = await loadQueue(tasksPath);
    queue.tasks.push(task);
    await writeJsonAtomic(tasksPath, queue);
  };

  await withFileLock(`${tasksPath}.lock`, timeoutMs, enqueue);
}
/**
 * Reads and parses `<orchDir>/config.json`.
 *
 * @param orchDir Orchestrator directory.
 * @returns The parsed configuration object.
 * @throws Error when the parsed JSON is not an object; read and JSON syntax
 *   errors propagate unchanged.
 */
async function readOrchestratorConfig(orchDir: string): Promise<Record<string, unknown>> {
  const configPath = path.join(orchDir, 'config.json');
  const text = await readFile(configPath, 'utf-8');
  const parsed: unknown = JSON.parse(text);
  if (isRecord(parsed)) {
    return parsed;
  }
  throw new Error(`Invalid orchestrator config: ${configPath}`);
}
/**
 * Verifies that the orchestrator configuration enables the orchestrator.
 *
 * @param orchDir Orchestrator directory containing `config.json`.
 * @throws Error when `enabled` is not exactly `true`; errors from reading the
 *   configuration propagate unchanged.
 */
async function ensureOrchestratorReady(orchDir: string): Promise<void> {
  const { enabled } = await readOrchestratorConfig(orchDir);
  if (enabled === true) {
    return;
  }
  throw new Error(`MACP orchestrator is disabled in ${path.join(orchDir, 'config.json')}.`);
}
/**
 * Starts one orchestrator pass in the background without waiting for it.
 *
 * Spawns a detached login shell that changes into `repoRoot` and runs the
 * orchestrator launcher with `--once`; the child's stdio is ignored and the
 * child is unref'ed so it does not keep this process alive.
 *
 * @param repoRoot Repository root to run the orchestrator in.
 * @param onError Optional callback invoked when the child process emits
 *   `'error'` (for example when it cannot be spawned). Without a listener that
 *   event would surface as an uncaught exception, so one is always attached.
 */
function triggerController(repoRoot: string, onError?: (error: Error) => void): void {
  const command = `cd ${shellQuote(repoRoot)} && ${shellQuote(resolveOrchestratorRunPath())} --once`;
  const child = spawn('bash', ['-lc', command], {
    detached: true,
    stdio: 'ignore',
  });
  child.once('error', (error) => {
    onError?.(error);
  });
  child.unref();
}
/**
 * Polls for a task result file until it holds a valid payload.
 *
 * The file is checked every 2 seconds until the deadline (`timeoutMs`, at
 * least 2 seconds). A missing file or a JSON syntax error counts as "not ready
 * yet" and triggers another poll.
 *
 * @param resultPath Path of the result JSON file.
 * @param timeoutMs Maximum time to wait, in milliseconds.
 * @param signal Optional abort signal checked before each read and during each wait.
 * @returns The parsed result once `task_id` and `status` are both strings.
 * @throws Error when the payload parses but is invalid, when the deadline
 *   passes, or the abort error when `signal` aborts; other read errors propagate.
 */
async function pollForResult(
  resultPath: string,
  timeoutMs: number,
  signal?: AbortSignal,
): Promise<TaskResult> {
  const deadline = Date.now() + Math.max(timeoutMs, 2_000);

  // One read attempt: resolves to undefined while the result is not available.
  const readIfReady = async (): Promise<TaskResult | undefined> => {
    try {
      const raw = JSON.parse(await readFile(resultPath, 'utf-8')) as unknown;
      if (isRecord(raw) && typeof raw.task_id === 'string' && typeof raw.status === 'string') {
        return raw as TaskResult;
      }
      throw new Error(`Invalid MACP result payload: ${resultPath}`);
    } catch (error) {
      const missing = (error as NodeJS.ErrnoException).code === 'ENOENT';
      if (missing || error instanceof SyntaxError) {
        return undefined;
      }
      throw error;
    }
  };

  while (Date.now() <= deadline) {
    if (signal?.aborted) {
      throw abortError();
    }
    const result = await readIfReady();
    if (result !== undefined) {
      return result;
    }
    await waitFor(2_000, signal);
  }
  throw new Error(`Timed out waiting for MACP result: ${resultPath}`);
}
/**
 * Produces the text shown to the user for a finished task.
 *
 * The first readable file named by `metadata.result_output_path` or
 * `metadata.output_path` (relative paths resolve against `orchDir`) wins and
 * its trimmed contents are returned. Otherwise a report is assembled from the
 * result's summary, error, escalation reason, branch, PR, changed files and
 * quality gates, falling back to the raw result as JSON when all are absent.
 *
 * @param result Task result payload.
 * @param orchDir Orchestrator directory used to resolve relative output paths.
 * @returns The output text.
 */
async function resolveResultOutput(result: TaskResult, orchDir: string): Promise<string> {
  const metadata = isRecord(result.metadata) ? result.metadata : {};

  const candidatePaths: string[] = [];
  for (const value of [metadata.result_output_path, metadata.output_path]) {
    if (typeof value === 'string' && value.trim().length > 0) {
      candidatePaths.push(path.isAbsolute(value) ? value : path.resolve(orchDir, value));
    }
  }
  for (const candidatePath of candidatePaths) {
    try {
      const contents = await readFile(candidatePath, 'utf-8');
      return contents.trim();
    } catch {
      // Unreadable output file: try the next candidate, then the formatted report.
    }
  }

  const {
    summary,
    error,
    escalation_reason: escalation,
    branch,
    pr,
    files_changed: files,
    gate_results: gates,
  } = result;

  const sections = [
    summary ? summary : undefined,
    error ? `Error: ${toMessage(error)}` : undefined,
    escalation ? `Escalation: ${toMessage(escalation)}` : undefined,
    branch ? `Branch: ${branch}` : undefined,
    pr ? `PR: ${pr}` : undefined,
    Array.isArray(files) && files.length > 0
      ? `Files changed:\n${files.map((file) => `- ${file}`).join('\n')}`
      : undefined,
    Array.isArray(gates) && gates.length > 0
      ? `Quality gates:\n${gates
          .map((gate) => `- [${gate.exit_code ?? 0}] ${gate.command ?? 'unknown command'}`)
          .join('\n')}`
      : undefined,
  ].filter((section): section is string => section !== undefined);

  const report = sections.join('\n\n').trim();
  return report || JSON.stringify(result, null, 2);
}
/**
 * ACP runtime backend that executes each turn as a task on the MACP
 * orchestrator queue: the prompt is written to a brief file, a task is appended
 * to `tasks.json`, one orchestrator pass is triggered, and the result file is
 * polled and streamed back as runtime events.
 *
 * Only `oneshot` sessions are supported.
 */
export class MacpRuntime implements AcpRuntime {
  constructor(private readonly config: MacpRuntimeConfig) {}

  /**
   * Creates a handle for a oneshot session. Session state is encoded as JSON
   * into the handle's `runtimeSessionName`; nothing is persisted here.
   *
   * @throws Error when `input.mode` is not `'oneshot'`.
   */
  async ensureSession(input: AcpRuntimeEnsureInput): Promise<AcpRuntimeHandle> {
    if (input.mode !== 'oneshot') {
      throw new Error(`macp runtime only supports oneshot sessions; received "${input.mode}".`);
    }
    const cwd = path.resolve(input.cwd ?? process.cwd());
    const state: HandleState = {
      name: toSessionName(input),
      agent: input.agent,
      runtime: input.agent || this.config.defaultRuntime || 'codex',
      cwd,
      model: this.config.defaultModel,
      systemPrompt: this.config.systemPrompt,
      timeoutMs: this.config.timeoutMs,
    };
    return {
      sessionKey: input.sessionKey,
      backend: 'macp',
      runtimeSessionName: encodeHandleState(state),
      cwd,
      backendSessionId: state.name,
      agentSessionId: state.name,
    };
  }

  /**
   * Runs one turn: queues an orchestrator task for `input.text`, waits for its
   * result and yields it as events.
   *
   * Yields a `status` event before queueing, then `text_delta` chunks of the
   * output followed by `done`. Failures inside the turn (including an abort)
   * are yielded as a single `error` event. The brief file is removed when the
   * turn ends, whatever the outcome.
   *
   * @throws Error from `decodeHandleState` when the handle state is invalid;
   *   this happens before the turn's own error handling starts.
   */
  async *runTurn(input: AcpRuntimeTurnInput): AsyncIterable<AcpRuntimeEvent> {
    const state = decodeHandleState(input.handle);
    const repoRoot = resolveRepoRoot(this.config);
    const orchDir = resolveOrchDir(this.config);
    const taskId = createTaskId(input.handle.sessionKey, input.requestId);
    const briefDir = path.join(os.homedir(), '.mosaic', 'macp-oc');
    // Name the brief after the task id: it is already sanitized and unique,
    // whereas the raw session name and request id may contain path separators
    // or other characters that are unsafe in a file name.
    const briefPath = path.join(briefDir, `${taskId}.md`);
    const resultPath = path.join(orchDir, 'results', `${taskId}.json`);
    try {
      // Do not queue work for a turn that was cancelled before it started.
      if (input.signal?.aborted) {
        throw abortError();
      }
      await access(resolveOrchestratorRunPath());
      await ensureOrchestratorReady(orchDir);
      await mkdir(briefDir, { recursive: true });
      await mkdir(path.dirname(resultPath), { recursive: true });
      await writeFile(briefPath, `${input.text.trimEnd()}\n`, 'utf-8');
      const task: OrchestratorTask = {
        id: taskId,
        title: createTaskTitle(input.text),
        status: 'pending',
        dispatch: this.config.defaultDispatch || 'yolo',
        runtime: state.runtime || this.config.defaultRuntime || 'codex',
        worktree: buildWorktreePath(repoRoot, taskId),
        brief_path: briefPath,
        _brief_temp_path: briefPath,
        timeout_seconds: Math.max(1, Math.ceil(state.timeoutMs / 1_000)),
        metadata: {
          source: 'openclaw-macp-plugin',
          created_at: nowIso(),
          session_key: input.handle.sessionKey,
          request_id: input.requestId,
          agent_id: state.agent,
          cwd: state.cwd,
        },
      };
      this.config.logger?.info?.(
        `Queueing MACP orchestrator task ${taskId} (${task.runtime}/${task.dispatch}).`,
      );
      yield {
        type: 'status',
        text: `Queued MACP task ${taskId}.`,
        tag: 'session_info_update',
      };
      await appendTaskToQueue(task, orchDir, state.timeoutMs);
      triggerController(repoRoot);
      const result = await pollForResult(resultPath, state.timeoutMs, input.signal);
      const output = await resolveResultOutput(result, orchDir);
      for (const chunk of chunkText(output)) {
        yield {
          type: 'text_delta',
          text: chunk,
          stream: 'output',
          tag: 'agent_message_chunk',
        };
      }
      yield {
        type: 'done',
        stopReason: result.status,
      };
    } catch (error) {
      yield {
        type: 'error',
        message: error instanceof Error ? error.message : String(error),
      };
    } finally {
      // Best-effort cleanup; a failure to remove the brief is ignored.
      await rm(briefPath, { force: true }).catch(() => undefined);
    }
  }

  /** Returns the static capability set of this runtime (no controls). */
  getCapabilities(): AcpRuntimeCapabilities {
    return MACP_CAPABILITIES;
  }

  /**
   * Reports session details decoded from the handle together with the resolved
   * repository and orchestrator directories.
   *
   * @throws Error from `decodeHandleState` when the handle state is invalid.
   */
  async getStatus(input: { handle: AcpRuntimeHandle }): Promise<AcpRuntimeStatus> {
    const state = decodeHandleState(input.handle);
    return {
      summary: 'macp controller oneshot runtime ready',
      backendSessionId: state.name,
      agentSessionId: state.name,
      details: {
        mode: 'oneshot',
        agent: state.agent,
        runtime: state.runtime,
        cwd: state.cwd,
        repoRoot: resolveRepoRoot(this.config),
        orchDir: resolveOrchDir(this.config),
      },
    };
  }

  /**
   * Checks that the orchestrator launcher, repository root and orchestrator
   * directory are accessible and that the orchestrator is enabled.
   *
   * @returns `ok: true` when every check passes; `MACP_ORCH_DISABLED` when the
   *   configuration does not enable the orchestrator; `MACP_ORCH_MISSING` with
   *   the underlying error message for any other failure.
   */
  async doctor(): Promise<AcpRuntimeDoctorReport> {
    try {
      const repoRoot = resolveRepoRoot(this.config);
      const orchDir = resolveOrchDir(this.config);
      const orchestratorRunPath = resolveOrchestratorRunPath();
      await access(orchestratorRunPath);
      await access(repoRoot);
      await access(orchDir);
      // The Pi runner is probed but its absence is deliberately ignored, so it
      // never fails the report.
      await access(PI_RUNNER_PATH).catch(() => undefined);
      const orchestratorConfig = await readOrchestratorConfig(orchDir);
      if (orchestratorConfig.enabled !== true) {
        return {
          ok: false,
          code: 'MACP_ORCH_DISABLED',
          message: 'MACP orchestrator is disabled for the configured repo.',
          details: [path.join(orchDir, 'config.json')],
        };
      }
      return {
        ok: true,
        message: 'MACP runtime is ready.',
        details: [orchestratorRunPath, repoRoot, orchDir],
      };
    } catch (error) {
      return {
        ok: false,
        code: 'MACP_ORCH_MISSING',
        message: error instanceof Error ? error.message : String(error),
        installCommand: 'pnpm install --frozen-lockfile',
      };
    }
  }

  /** Logs the cancel request; no queued or running task is interrupted here. */
  async cancel(_input: { handle: AcpRuntimeHandle; reason?: string }): Promise<void> {
    this.config.logger?.info?.('macp runtime cancel requested');
  }

  /** Logs the close request; there are no per-session resources to release here. */
  async close(_input: { handle: AcpRuntimeHandle; reason: string }): Promise<void> {
    this.config.logger?.info?.('macp runtime close requested');
  }
}