#!/usr/bin/env python3 """MACP dispatcher library for orchestrator-matrix tasks.""" from __future__ import annotations import datetime as dt import json import os import pathlib import re import shlex import subprocess import tempfile from typing import Any def now_iso() -> str: return dt.datetime.now(dt.timezone.utc).isoformat() def save_json(path: pathlib.Path, data: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") with tmp.open("w", encoding="utf-8") as handle: json.dump(data, handle, indent=2) handle.write("\n") tmp.replace(path) def _slugify(value: str) -> str: lowered = value.strip().lower() slug = re.sub(r"[^a-z0-9]+", "-", lowered) return slug.strip("-") or "task" def _run_command(command: str, cwd: pathlib.Path, log_path: pathlib.Path, timeout_sec: int) -> tuple[int, str, bool]: log_path.parent.mkdir(parents=True, exist_ok=True) with log_path.open("a", encoding="utf-8") as log: log.write(f"\n[{now_iso()}] COMMAND: {command}\n") log.flush() proc = subprocess.Popen( ["bash", "-lc", command], cwd=str(cwd), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", ) timed_out = False try: output, _ = proc.communicate(timeout=max(1, timeout_sec)) exit_code = proc.returncode except subprocess.TimeoutExpired: timed_out = True proc.kill() output, _ = proc.communicate() exit_code = 124 log.write(f"[{now_iso()}] TIMEOUT: exceeded {timeout_sec}s\n") if output: log.write(output) log.write(f"[{now_iso()}] EXIT: {exit_code}\n") return exit_code, output or "", timed_out def _git_capture(args: list[str], cwd: pathlib.Path) -> str: proc = subprocess.run(args, cwd=str(cwd), check=True, capture_output=True, text=True, encoding="utf-8") return proc.stdout.strip() def _resolve_dispatch(task: dict[str, Any]) -> str: return str(task.get("dispatch") or "exec").strip().lower() or "exec" def _resolve_runtime(task: dict[str, Any]) -> str: return str(task.get("runtime") or "codex").strip() or "codex" def _resolve_branch(task: dict[str, Any]) -> str: branch = str(task.get("branch") or "").strip() if branch: return branch task_id_slug = _slugify(str(task.get("id") or "task")) title_slug = _slugify(str(task.get("title") or task.get("id") or "task")) if title_slug == task_id_slug: branch = f"feat/{task_id_slug}" else: branch = f"feat/{task_id_slug}-{title_slug}" task["branch"] = branch return branch def _resolve_worktree_path(task: dict[str, Any], repo_root: pathlib.Path) -> pathlib.Path: explicit = str(task.get("worktree") or "").strip() if explicit: path = pathlib.Path(os.path.expanduser(explicit)).resolve() else: base = pathlib.Path(os.path.expanduser(f"~/src/{repo_root.name}-worktrees")).resolve() path = base / _slugify(str(task.get("id") or task.get("title") or "task")) task["worktree"] = str(path) return path def _default_worktree_path(task: dict[str, Any], repo_root: pathlib.Path, base_template: str) -> pathlib.Path: base = pathlib.Path(os.path.expanduser(base_template.format(repo=repo_root.name))).resolve() return base / _slugify(str(task.get("id") or task.get("title") or "task")) def _resolve_start_point(repo_root: pathlib.Path) -> str: candidates = ["origin/main", "main", "HEAD"] for candidate in candidates: proc = subprocess.run( ["git", "-C", str(repo_root), "rev-parse", "--verify", f"{candidate}^{{commit}}"], capture_output=True, text=True, encoding="utf-8", ) if proc.returncode == 0: return candidate raise RuntimeError(f"Unable to resolve worktree start point for {repo_root}") def _read_brief(task: dict[str, Any], repo_root: pathlib.Path) -> str: brief_path_raw = str(task.get("brief_path") or "").strip() if not brief_path_raw: raise ValueError("MACP yolo dispatch requires brief_path") brief_path = (repo_root / brief_path_raw).resolve() return brief_path.read_text(encoding="utf-8").strip() def _stage_yolo_brief_file(task: dict[str, Any], repo_root: pathlib.Path, orch_dir: pathlib.Path) -> pathlib.Path: brief_dir = (orch_dir / "tmp").resolve() brief_dir.mkdir(parents=True, exist_ok=True) task_id = _slugify(str(task.get("id") or "task")) fd, raw_path = tempfile.mkstemp(prefix=f"brief-{task_id}-", suffix=".tmp", dir=str(brief_dir), text=True) with os.fdopen(fd, "w", encoding="utf-8") as handle: handle.write(_read_brief(task, repo_root)) handle.write("\n") os.chmod(raw_path, 0o600) path = pathlib.Path(raw_path).resolve() task["_brief_temp_path"] = str(path) return path def _resolve_result_path(task: dict[str, Any], orch_dir: pathlib.Path) -> pathlib.Path: result_path_raw = str(task.get("result_path") or "").strip() if result_path_raw: result_path = (orch_dir / result_path_raw).resolve() else: result_path = (orch_dir / "results" / f"{task.get('id', 'task')}.json").resolve() task["result_path"] = str(result_path.relative_to(orch_dir)) return result_path def _resolve_worktree_base(config: dict[str, Any], repo_name: str) -> pathlib.Path: macp_config = dict(config.get("macp") or {}) base_template = str(macp_config.get("worktree_base") or "~/src/{repo}-worktrees") return pathlib.Path(os.path.expanduser(base_template.format(repo=repo_name))).resolve() def _changed_files(task: dict[str, Any]) -> list[str]: worktree_raw = str(task.get("worktree") or "").strip() if not worktree_raw: return [] worktree = pathlib.Path(os.path.expanduser(worktree_raw)).resolve() if not worktree.exists(): return [] proc = subprocess.run( ["git", "-C", str(worktree), "status", "--porcelain"], capture_output=True, text=True, encoding="utf-8", ) if proc.returncode != 0: return [] changed: list[str] = [] for line in proc.stdout.splitlines(): path_text = line[3:].strip() if not path_text: continue if " -> " in path_text: _, path_text = path_text.split(" -> ", 1) changed.append(path_text) return changed def _resolve_repo_root_from_worktree(worktree: pathlib.Path) -> pathlib.Path | None: try: common_dir_raw = _git_capture(["git", "-C", str(worktree), "rev-parse", "--git-common-dir"], worktree) except Exception: return None common_dir = pathlib.Path(common_dir_raw) if not common_dir.is_absolute(): common_dir = (worktree / common_dir).resolve() return common_dir.parent if common_dir.name == ".git" else common_dir def _is_safe_worktree_path(worktree_path: pathlib.Path, config: dict[str, Any]) -> bool: repo_root = _resolve_repo_root_from_worktree(worktree_path) if repo_root is None: return False expected_base = _resolve_worktree_base(config, repo_root.name) try: worktree_path.resolve().relative_to(expected_base) return True except ValueError: return False def setup_worktree(task: dict[str, Any], repo_root: pathlib.Path) -> pathlib.Path: """Create git worktree for task. Returns worktree path.""" worktree_path = _resolve_worktree_path(task, repo_root) branch = _resolve_branch(task) if worktree_path.exists() and (worktree_path / ".git").exists(): return worktree_path worktree_path.parent.mkdir(parents=True, exist_ok=True) start_point = _resolve_start_point(repo_root) subprocess.run( ["git", "-C", str(repo_root), "worktree", "add", "-B", branch, str(worktree_path), start_point], check=True, capture_output=True, text=True, encoding="utf-8", ) task["worktree"] = str(worktree_path) return worktree_path def build_dispatch_command(task: dict[str, Any], repo_root: pathlib.Path) -> str: """Generate execution command based on task dispatch type and runtime.""" dispatch = _resolve_dispatch(task) runtime = _resolve_runtime(task) worktree = pathlib.Path(os.path.expanduser(str(task.get("worktree") or repo_root))).resolve() if dispatch == "exec": command = str(task.get("command") or "").strip() if not command: raise ValueError("MACP exec dispatch requires command") return command if dispatch == "acp": raise RuntimeError("ACP dispatch requires OpenClaw integration (Phase 2)") if dispatch == "yolo": brief_file = pathlib.Path(str(task.get("_brief_temp_path") or "")).resolve() if not str(task.get("_brief_temp_path") or "").strip(): raise ValueError("MACP yolo dispatch requires a staged brief file") inner = ( 'export PATH="$HOME/.config/mosaic/bin:$PATH"; ' f"cd {shlex.quote(str(worktree))}; " f'mosaic yolo {shlex.quote(runtime)} "$(cat {shlex.quote(str(brief_file))})"' ) return f"script -qec {shlex.quote(inner)} /dev/null" raise ValueError(f"Unsupported MACP dispatch type: {dispatch}") def collect_result(task: dict[str, Any], exit_code: int, gate_results: list[dict[str, Any]], orch_dir: pathlib.Path) -> dict[str, Any]: """Build standardized result JSON after worker completion.""" raw_status = str(task.get("status") or "") if raw_status in {"completed", "failed", "escalated"}: status = raw_status else: status = "completed" if exit_code == 0 else "failed" completed_at = str(task.get("completed_at") or task.get("failed_at") or now_iso()) failed_at = task.get("failed_at") if status == "failed" and not failed_at: failed_at = now_iso() normalized_gates: list[dict[str, Any]] = [] for gate in gate_results: normalized_gates.append( { "command": str(gate.get("command") or ""), "exit_code": int(gate.get("exit_code") or 0), "type": str(gate.get("type") or "mechanical"), } ) summary_map = { "completed": "Task completed and quality gates passed.", "failed": "Task failed before completion.", "escalated": "Task requires human intervention.", } result = { "task_id": str(task.get("id") or ""), "status": status, "completed_at": completed_at, "failed_at": failed_at, "exit_code": exit_code, "attempt": int(task.get("attempts") or 0), "max_attempts": int(task.get("max_attempts") or 1), "runtime": _resolve_runtime(task), "dispatch": _resolve_dispatch(task), "worktree": str(task.get("worktree")) if task.get("worktree") else None, "branch": str(task.get("branch")) if task.get("branch") else None, "pr": str(task.get("pr")) if task.get("pr") else None, "summary": str(task.get("summary") or summary_map.get(status, "Task processed.")), "files_changed": _changed_files(task), "gate_results": normalized_gates, "error": task.get("error"), "escalation_reason": task.get("escalation_reason"), "metadata": dict(task.get("metadata") or {}), } save_json(_resolve_result_path(task, orch_dir), result) return result def cleanup_worktree(task: dict[str, Any], config: dict[str, Any]) -> None: """Remove git worktree after task is done.""" worktree_raw = str(task.get("worktree") or "").strip() if not worktree_raw: return worktree = pathlib.Path(os.path.expanduser(worktree_raw)).resolve() if not worktree.exists(): return repo_root = _resolve_repo_root_from_worktree(worktree) if repo_root is None or repo_root == worktree: return if not _is_safe_worktree_path(worktree, config): print(f"[macp_dispatcher] refusing to clean unsafe worktree path: {worktree}", flush=True) return subprocess.run( ["git", "-C", str(repo_root), "worktree", "remove", "--force", str(worktree)], check=True, capture_output=True, text=True, encoding="utf-8", ) subprocess.run( ["git", "-C", str(repo_root), "worktree", "prune"], check=False, capture_output=True, text=True, encoding="utf-8", ) def dispatch_task(task: dict[str, Any], repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dict[str, Any]) -> tuple[int, str]: """Full dispatch lifecycle: setup -> execute. Returns (exit_code, output).""" macp_config = dict(config.get("macp") or {}) worker_config = dict(config.get("worker") or {}) task["dispatch"] = _resolve_dispatch({"dispatch": task.get("dispatch") or macp_config.get("default_dispatch") or "exec"}) task["runtime"] = _resolve_runtime({"runtime": task.get("runtime") or macp_config.get("default_runtime") or worker_config.get("runtime") or "codex"}) if not str(task.get("worktree") or "").strip(): task["worktree"] = str( _default_worktree_path( task, repo_root, str(macp_config.get("worktree_base") or "~/src/{repo}-worktrees"), ) ) if not str(task.get("result_path") or "").strip(): result_dir = str(macp_config.get("result_dir") or ".mosaic/orchestrator/results").strip() if result_dir.startswith(".mosaic/orchestrator/"): result_dir = result_dir[len(".mosaic/orchestrator/") :] task["result_path"] = f"{result_dir.rstrip('/')}/{task.get('id', 'task')}.json" if task["dispatch"] == "acp": task["status"] = "escalated" task["failed_at"] = now_iso() task["escalation_reason"] = "ACP dispatch requires OpenClaw integration (Phase 2)" task["error"] = task["escalation_reason"] task["_timed_out"] = False return 1, task["escalation_reason"] worktree = setup_worktree(task, repo_root) log_path = orch_dir / "logs" / f"{task.get('id', 'task')}.log" timeout_sec = int(task.get("timeout_seconds") or worker_config.get("timeout_seconds") or 7200) if task["dispatch"] == "yolo": _stage_yolo_brief_file(task, repo_root, orch_dir) try: command = build_dispatch_command(task, repo_root) exit_code, output, timed_out = _run_command(command, worktree, log_path, timeout_sec) task["_timed_out"] = timed_out if timed_out: task["error"] = f"Worker command timed out after {timeout_sec}s" elif exit_code != 0 and not task.get("error"): task["error"] = f"Worker command failed with exit code {exit_code}" return exit_code, output finally: brief_temp_path = str(task.pop("_brief_temp_path", "") or "").strip() if brief_temp_path: try: pathlib.Path(brief_temp_path).unlink(missing_ok=True) except OSError: pass