feat: MACP Phase 1 — Core Protocol Implementation (#9)

This commit was merged in pull request #9.
This commit is contained in:
2026-03-28 01:39:26 +00:00
parent 24496cea01
commit 28392914a7
19 changed files with 1724 additions and 53 deletions

View File

@@ -13,6 +13,7 @@ mechanical quality gates.
## Components
- `protocol/` - JSON schemas for task/event payloads
- `dispatcher/` - MACP dispatch helpers for worktrees, command generation, results, and cleanup
- `controller/mosaic_orchestrator.py` - deterministic controller loop
- `adapters/` - runtime adapter guidance
@@ -83,3 +84,29 @@ Task injection message format (room text):
```text
!mosaic-task {"id":"TASK-123","title":"Fix bug","command":"echo run","quality_gates":["pnpm lint"]}
```
## MACP Notes
MACP-aware tasks add dispatch metadata on top of the existing queue model:
- `dispatch`: `exec`, `yolo`, or `acp`
- `type`: task category used for orchestration intent
- `worktree` / `branch`: task-specific git execution context
- `brief_path`: markdown brief consumed by runtime-backed dispatchers
- `result_path`: structured result JSON written under `.mosaic/orchestrator/`
Controller behavior remains backward compatible:
- Tasks without `dispatch` continue through the legacy shell execution path.
- Tasks with `dispatch` use the MACP dispatcher and can emit `task.gated` and `task.escalated`.
- `acp` dispatch is fail-safe in Phase 1: it escalates with `ACP dispatch requires OpenClaw integration (Phase 2)` instead of reporting success.
- `yolo` dispatch stages the brief in a temporary file so the brief body does not appear in process arguments.
Manual queue operations are exposed through:
```bash
mosaic macp submit ...
mosaic macp status
mosaic macp drain
mosaic macp history --task-id TASK-001
```

View File

@@ -14,6 +14,12 @@ import time
import uuid
from typing import Any
DISPATCHER_DIR = pathlib.Path(__file__).resolve().parent.parent / "dispatcher"
if str(DISPATCHER_DIR) not in sys.path:
sys.path.insert(0, str(DISPATCHER_DIR))
import macp_dispatcher
def now_iso() -> str:
    """Current UTC wall-clock time as a timezone-aware ISO-8601 string."""
    utc_now = dt.datetime.now(tz=dt.timezone.utc)
    return utc_now.isoformat()
@@ -115,6 +121,29 @@ def is_completed_status(status: str) -> bool:
return status in {"completed", "done"}
def is_macp_task(task: dict[str, Any]) -> bool:
    """A task is MACP-aware when it carries a ``dispatch`` key (any value, even empty)."""
    has_dispatch = "dispatch" in task
    return has_dispatch
def normalize_gate_result(gate: Any) -> dict[str, Any]:
    """Coerce a quality-gate spec (bare command string or dict) into
    ``{"command": ..., "type": ...}`` form; unknown shapes become empty
    mechanical gates."""
    if isinstance(gate, dict):
        command = str(gate.get("command") or "")
        gate_type = str(gate.get("type") or "mechanical")
        return {"command": command, "type": gate_type}
    if isinstance(gate, str):
        return {"command": gate, "type": "mechanical"}
    return {"command": "", "type": "mechanical"}
def detect_worker_escalation(output: str) -> str | None:
    """Scan worker output for an explicit ``MACP_ESCALATE:`` marker.

    Returns the trimmed text after the first marker found (or a generic
    reason when the marker carries no text); None when no line contains it.
    """
    for line in output.splitlines():
        _, sep, remainder = line.partition("MACP_ESCALATE:")
        if sep:
            return remainder.strip() or "Worker requested escalation"
    return None
def pick_next_task(tasks: list[dict[str, Any]]) -> dict[str, Any] | None:
status_by_id = {str(t.get("id", "")): str(t.get("status", "")) for t in tasks}
for task in tasks:
@@ -164,27 +193,58 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
task_file = orch_dir / f"task-{task_id}.json"
save_json(task_file, task)
cmd = str(task.get("command", "")).strip()
if not cmd:
template = str(config.get("worker", {}).get("command_template", "")).strip()
if template:
cmd = render_command_template(template, task, task_file)
if not cmd:
task["status"] = "failed"
task["failed_at"] = now_iso()
task["error"] = "No task command or worker command_template configured."
save_json(tasks_path, {"tasks": task_items})
emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"])
state["running_task_id"] = None
state["updated_at"] = now_iso()
save_json(state_path, state)
return True
timeout_sec = int(task.get("timeout_seconds") or config.get("worker", {}).get("timeout_seconds") or 7200)
rc, _, timed_out = run_shell(cmd, repo_root, log_path, timeout_sec)
timed_out = False
output = ""
rc = 0
if is_macp_task(task):
try:
rc, output = macp_dispatcher.dispatch_task(task, repo_root, orch_dir, config)
timed_out = bool(task.get("_timed_out"))
except Exception as exc:
rc = 1
task["error"] = str(exc)
else:
cmd = str(task.get("command", "")).strip()
if not cmd:
template = str(config.get("worker", {}).get("command_template", "")).strip()
if template:
cmd = render_command_template(template, task, task_file)
if not cmd:
task["status"] = "failed"
task["failed_at"] = now_iso()
task["error"] = "No task command or worker command_template configured."
save_json(tasks_path, {"tasks": task_items})
emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"])
state["running_task_id"] = None
state["updated_at"] = now_iso()
save_json(state_path, state)
return True
rc, output, timed_out = run_shell(cmd, repo_root, log_path, timeout_sec)
if rc != 0:
task["error"] = f"Worker command timed out after {timeout_sec}s" if timed_out else f"Worker command failed with exit code {rc}"
if is_macp_task(task) and str(task.get("status") or "") == "escalated":
task["failed_at"] = str(task.get("failed_at") or now_iso())
emit_event(
events_path,
"task.escalated",
task_id,
"escalated",
"controller",
str(task.get("escalation_reason") or task.get("error") or "Task requires human intervention."),
)
save_json(tasks_path, {"tasks": task_items})
state["running_task_id"] = None
state["updated_at"] = now_iso()
save_json(state_path, state)
macp_dispatcher.collect_result(task, rc, [], orch_dir)
if bool(config.get("macp", {}).get("cleanup_worktrees", True)):
macp_dispatcher.cleanup_worktree(task, config)
return True
if not task.get("error"):
task["error"] = f"Worker command timed out after {timeout_sec}s" if timed_out else f"Worker command failed with exit code {rc}"
if attempt < max_attempts:
task["status"] = "pending"
task["last_failed_at"] = now_iso()
@@ -204,34 +264,61 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
state["running_task_id"] = None
state["updated_at"] = now_iso()
save_json(state_path, state)
save_json(
results_dir / f"{task_id}.json",
{"task_id": task_id, "status": task["status"], "exit_code": rc, "attempt": attempt, "max_attempts": max_attempts},
)
if is_macp_task(task):
if task["status"] == "failed":
macp_dispatcher.collect_result(task, rc, [], orch_dir)
if bool(config.get("macp", {}).get("cleanup_worktrees", True)):
macp_dispatcher.cleanup_worktree(task, config)
else:
save_json(
results_dir / f"{task_id}.json",
{"task_id": task_id, "status": task["status"], "exit_code": rc, "attempt": attempt, "max_attempts": max_attempts},
)
return True
escalation_reason = detect_worker_escalation(output) if is_macp_task(task) else None
if escalation_reason:
task["status"] = "escalated"
task["failed_at"] = now_iso()
task["escalation_reason"] = escalation_reason
emit_event(events_path, "task.escalated", task_id, "escalated", "controller", escalation_reason)
save_json(tasks_path, {"tasks": task_items})
state["running_task_id"] = None
state["updated_at"] = now_iso()
save_json(state_path, state)
macp_dispatcher.collect_result(task, rc, [], orch_dir)
if bool(config.get("macp", {}).get("cleanup_worktrees", True)):
macp_dispatcher.cleanup_worktree(task, config)
return True
task["status"] = "gated"
save_json(tasks_path, {"tasks": task_items})
emit_event(events_path, "task.gated", task_id, "gated", "controller", "Worker completed; quality gates starting")
gates = task.get("quality_gates") or config.get("quality_gates") or []
all_passed = True
gate_results: list[dict[str, Any]] = []
for gate in gates:
gate_cmd = str(gate).strip()
gate_entry = normalize_gate_result(gate)
gate_cmd = gate_entry["command"]
if not gate_cmd:
continue
emit_event(events_path, "rail.check.started", task_id, "running", "quality-gate", f"Running gate: {gate_cmd}")
gate_rc, _, gate_timed_out = run_shell(gate_cmd, repo_root, log_path, timeout_sec)
gate_cwd = pathlib.Path(os.path.expanduser(str(task.get("worktree") or repo_root))).resolve() if is_macp_task(task) else repo_root
emit_event(events_path, "rail.check.started", task_id, "gated", "quality-gate", f"Running gate: {gate_cmd}")
gate_rc, _, gate_timed_out = run_shell(gate_cmd, gate_cwd, log_path, timeout_sec)
if gate_rc == 0:
emit_event(events_path, "rail.check.passed", task_id, "running", "quality-gate", f"Gate passed: {gate_cmd}")
emit_event(events_path, "rail.check.passed", task_id, "gated", "quality-gate", f"Gate passed: {gate_cmd}")
else:
all_passed = False
emit_event(
events_path,
"rail.check.failed",
task_id,
"failed",
"gated",
"quality-gate",
f"Gate timed out after {timeout_sec}s: {gate_cmd}" if gate_timed_out else f"Gate failed ({gate_rc}): {gate_cmd}",
)
gate_results.append({"command": gate_cmd, "exit_code": gate_rc})
gate_results.append({"command": gate_cmd, "exit_code": gate_rc, "type": gate_entry["type"]})
if all_passed:
task["status"] = "completed"
@@ -251,24 +338,34 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
f"{task['error']}; retry {attempt + 1}/{max_attempts}",
)
else:
task["status"] = "failed"
task["status"] = "escalated" if is_macp_task(task) else "failed"
task["failed_at"] = now_iso()
emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"])
if is_macp_task(task):
task["escalation_reason"] = "Quality gates failed after max retries"
emit_event(events_path, "task.escalated", task_id, "escalated", "controller", task["escalation_reason"])
else:
emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"])
save_json(tasks_path, {"tasks": task_items})
state["running_task_id"] = None
state["updated_at"] = now_iso()
save_json(state_path, state)
save_json(
results_dir / f"{task_id}.json",
{
"task_id": task_id,
"status": task["status"],
"completed_at": task.get("completed_at"),
"failed_at": task.get("failed_at"),
"gate_results": gate_results,
},
)
if is_macp_task(task):
if task["status"] in {"completed", "failed", "escalated"}:
macp_dispatcher.collect_result(task, rc, gate_results, orch_dir)
if bool(config.get("macp", {}).get("cleanup_worktrees", True)):
macp_dispatcher.cleanup_worktree(task, config)
else:
save_json(
results_dir / f"{task_id}.json",
{
"task_id": task_id,
"status": task["status"],
"completed_at": task.get("completed_at"),
"failed_at": task.get("failed_at"),
"gate_results": gate_results,
},
)
return True
@@ -276,10 +373,14 @@ def queue_state(orch_dir: pathlib.Path) -> dict[str, int]:
tasks = load_json(orch_dir / "tasks.json", {"tasks": []})
task_items = tasks.get("tasks", [])
if not isinstance(task_items, list):
return {"pending": 0, "running": 0, "runnable": 0}
return {"pending": 0, "running": 0, "gated": 0, "completed": 0, "failed": 0, "escalated": 0, "runnable": 0}
pending = 0
running = 0
gated = 0
completed = 0
failed = 0
escalated = 0
runnable = 0
status_by_id = {str(t.get("id", "")): str(t.get("status", "")) for t in task_items}
for task in task_items:
@@ -291,7 +392,23 @@ def queue_state(orch_dir: pathlib.Path) -> dict[str, int]:
runnable += 1
if status == "running":
running += 1
return {"pending": pending, "running": running, "runnable": runnable}
if status == "gated":
gated += 1
if status == "completed":
completed += 1
if status == "failed":
failed += 1
if status == "escalated":
escalated += 1
return {
"pending": pending,
"running": running,
"gated": gated,
"completed": completed,
"failed": failed,
"escalated": escalated,
"runnable": runnable,
}
def main() -> int:

View File

@@ -35,9 +35,9 @@ def split_pipe_row(line: str) -> list[str]:
return [c.strip() for c in row.split("|")]
def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]:
def parse_tasks_markdown(path: pathlib.Path) -> tuple[set[str], list[dict[str, str]]]:
if not path.exists():
return []
return set(), []
lines = path.read_text(encoding="utf-8").splitlines()
header_idx = -1
@@ -51,7 +51,7 @@ def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]:
headers = cells
break
if header_idx < 0:
return []
return set(), []
rows: list[dict[str, str]] = []
for line in lines[header_idx + 2 :]:
@@ -67,7 +67,7 @@ def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]:
if not task_id or task_id.lower() == "id":
continue
rows.append(row)
return rows
return set(headers), rows
def map_status(raw: str) -> str:
@@ -78,9 +78,11 @@ def map_status(raw: str) -> str:
"pending": "pending",
"in-progress": "pending",
"needs-qa": "pending",
"gated": "gated",
"done": "completed",
"completed": "completed",
"failed": "failed",
"escalated": "escalated",
}
return mapping.get(value, "pending")
@@ -91,7 +93,9 @@ def parse_depends(raw: str) -> list[str]:
def build_task(
row: dict[str, str],
headers: set[str],
existing: dict[str, Any],
macp_defaults: dict[str, str],
runtime_default: str,
source_path: str,
) -> dict[str, Any]:
@@ -100,6 +104,9 @@ def build_task(
issue = row.get("issue", "").strip()
repo = row.get("repo", "").strip()
branch = row.get("branch", "").strip()
task_type = row.get("type", "").strip()
dispatch = row.get("dispatch", "").strip()
runtime = row.get("runtime", "").strip()
depends_on = parse_depends(row.get("depends_on", ""))
task = dict(existing)
@@ -108,9 +115,25 @@ def build_task(
task["description"] = description
task["status"] = map_status(row.get("status", "pending"))
task["depends_on"] = depends_on
task["runtime"] = str(task.get("runtime") or runtime_default or "codex")
task["issue"] = issue or str(task.get("issue") or "")
task["command"] = str(task.get("command") or "")
task["quality_gates"] = task.get("quality_gates") or []
if "type" in headers:
task["type"] = task_type or str(task.get("type") or macp_defaults.get("type") or "coding")
else:
task.pop("type", None)
if "dispatch" in headers:
task["dispatch"] = dispatch or str(task.get("dispatch") or macp_defaults.get("dispatch") or "")
else:
task.pop("dispatch", None)
if "runtime" in headers:
task["runtime"] = runtime or str(task.get("runtime") or macp_defaults.get("runtime") or runtime_default or "codex")
else:
task.pop("runtime", None)
if "branch" in headers:
task["branch"] = branch or str(task.get("branch") or macp_defaults.get("branch") or "")
else:
task.pop("branch", None)
metadata = dict(task.get("metadata") or {})
metadata.update(
{
@@ -147,9 +170,16 @@ def main() -> int:
tasks_path = (repo / args.tasks_json).resolve()
config_path = repo / ".mosaic" / "orchestrator" / "config.json"
config = load_json(config_path, {})
runtime_default = str(config.get("worker", {}).get("runtime") or "codex")
macp_config = dict(config.get("macp") or {})
runtime_default = str(config.get("worker", {}).get("runtime") or macp_config.get("default_runtime") or "codex")
macp_defaults = {
"type": "coding",
"dispatch": str(macp_config.get("default_dispatch") or ""),
"runtime": str(macp_config.get("default_runtime") or runtime_default or "codex"),
"branch": "",
}
rows = parse_tasks_markdown(docs_path)
headers, rows = parse_tasks_markdown(docs_path)
try:
source_path = str(docs_path.relative_to(repo))
except ValueError:
@@ -170,7 +200,9 @@ def main() -> int:
out_tasks.append(
build_task(
row,
headers,
existing_by_id.get(task_id, {}),
macp_defaults,
runtime_default,
source_path,
)

View File

@@ -0,0 +1,15 @@
"""MACP dispatcher helpers for orchestrator-matrix."""

from .macp_dispatcher import (
    build_dispatch_command,
    cleanup_worktree,
    collect_result,
    dispatch_task,
    setup_worktree,
)

# Public surface of the dispatcher package.
__all__ = [
    "build_dispatch_command",
    "cleanup_worktree",
    "collect_result",
    "dispatch_task",
    "setup_worktree",
]

View File

@@ -0,0 +1,401 @@
#!/usr/bin/env python3
"""MACP dispatcher library for orchestrator-matrix tasks."""
from __future__ import annotations
import datetime as dt
import json
import os
import pathlib
import re
import shlex
import subprocess
import tempfile
from typing import Any
def now_iso() -> str:
    """Return the current UTC timestamp in ISO-8601 form (timezone-aware)."""
    return dt.datetime.now(tz=dt.timezone.utc).isoformat()
def save_json(path: pathlib.Path, data: Any) -> None:
    """Atomically write *data* as pretty-printed JSON (2-space indent) to *path*.

    Writes to a sibling ``.tmp`` file first and renames it over the target so
    readers never observe a half-written file. Parent directories are created.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    staging = path.with_suffix(path.suffix + ".tmp")
    serialized = json.dumps(data, indent=2) + "\n"
    staging.write_text(serialized, encoding="utf-8")
    staging.replace(path)
def _slugify(value: str) -> str:
lowered = value.strip().lower()
slug = re.sub(r"[^a-z0-9]+", "-", lowered)
return slug.strip("-") or "task"
def _run_command(command: str, cwd: pathlib.Path, log_path: pathlib.Path, timeout_sec: int) -> tuple[int, str, bool]:
    """Run *command* under ``bash -lc`` in *cwd*, appending a transcript to *log_path*.

    Returns ``(exit_code, combined_output, timed_out)``. A timed-out process is
    killed and reported with exit code 124 (the coreutils ``timeout`` convention).
    stderr is merged into stdout so the log is a single ordered stream.
    """
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf-8") as log:
        log.write(f"\n[{now_iso()}] COMMAND: {command}\n")
        log.flush()
        # Login shell (-l) so the worker sees the user's normal environment.
        child = subprocess.Popen(
            ["bash", "-lc", command],
            cwd=str(cwd),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
        )
        timed_out = False
        try:
            captured, _ = child.communicate(timeout=max(1, timeout_sec))
            exit_code = child.returncode
        except subprocess.TimeoutExpired:
            timed_out = True
            child.kill()
            captured, _ = child.communicate()
            exit_code = 124
            log.write(f"[{now_iso()}] TIMEOUT: exceeded {timeout_sec}s\n")
        if captured:
            log.write(captured)
        log.write(f"[{now_iso()}] EXIT: {exit_code}\n")
        return exit_code, captured or "", timed_out
def _git_capture(args: list[str], cwd: pathlib.Path) -> str:
proc = subprocess.run(args, cwd=str(cwd), check=True, capture_output=True, text=True, encoding="utf-8")
return proc.stdout.strip()
def _resolve_dispatch(task: dict[str, Any]) -> str:
return str(task.get("dispatch") or "exec").strip().lower() or "exec"
def _resolve_runtime(task: dict[str, Any]) -> str:
return str(task.get("runtime") or "codex").strip() or "codex"
def _resolve_branch(task: dict[str, Any]) -> str:
    """Return the task's git branch, deriving and caching a ``feat/`` name when unset.

    Derived names combine the id and title slugs; when both slugify to the
    same value, only the id slug is used.
    """
    explicit = str(task.get("branch") or "").strip()
    if explicit:
        return explicit
    id_slug = _slugify(str(task.get("id") or "task"))
    title_slug = _slugify(str(task.get("title") or task.get("id") or "task"))
    derived = f"feat/{id_slug}" if title_slug == id_slug else f"feat/{id_slug}-{title_slug}"
    task["branch"] = derived
    return derived
def _resolve_worktree_path(task: dict[str, Any], repo_root: pathlib.Path) -> pathlib.Path:
    """Resolve (and cache on the task) the worktree directory for *task*.

    Honors an explicit ``worktree`` value (tilde-expanded); otherwise defaults
    to ``~/src/<repo>-worktrees/<task-slug>``.
    """
    explicit = str(task.get("worktree") or "").strip()
    if explicit:
        resolved = pathlib.Path(os.path.expanduser(explicit)).resolve()
    else:
        default_base = pathlib.Path(os.path.expanduser(f"~/src/{repo_root.name}-worktrees")).resolve()
        resolved = default_base / _slugify(str(task.get("id") or task.get("title") or "task"))
    task["worktree"] = str(resolved)
    return resolved
def _default_worktree_path(task: dict[str, Any], repo_root: pathlib.Path, base_template: str) -> pathlib.Path:
    """Default worktree location: *base_template* (with ``{repo}`` filled) / task slug."""
    base_dir = pathlib.Path(os.path.expanduser(base_template.format(repo=repo_root.name))).resolve()
    slug = _slugify(str(task.get("id") or task.get("title") or "task"))
    return base_dir / slug
def _resolve_start_point(repo_root: pathlib.Path) -> str:
candidates = ["origin/main", "main", "HEAD"]
for candidate in candidates:
proc = subprocess.run(
["git", "-C", str(repo_root), "rev-parse", "--verify", f"{candidate}^{{commit}}"],
capture_output=True,
text=True,
encoding="utf-8",
)
if proc.returncode == 0:
return candidate
raise RuntimeError(f"Unable to resolve worktree start point for {repo_root}")
def _read_brief(task: dict[str, Any], repo_root: pathlib.Path) -> str:
brief_path_raw = str(task.get("brief_path") or "").strip()
if not brief_path_raw:
raise ValueError("MACP yolo dispatch requires brief_path")
brief_path = (repo_root / brief_path_raw).resolve()
return brief_path.read_text(encoding="utf-8").strip()
def _stage_yolo_brief_file(task: dict[str, Any], repo_root: pathlib.Path, orch_dir: pathlib.Path) -> pathlib.Path:
    """Copy the task brief into a private temp file under ``<orch_dir>/tmp``.

    Keeps the brief body out of process argument lists; the staged path is
    cached on the task as ``_brief_temp_path`` and chmod'ed to 0o600.
    """
    staging_dir = (orch_dir / "tmp").resolve()
    staging_dir.mkdir(parents=True, exist_ok=True)
    slug = _slugify(str(task.get("id") or "task"))
    fd, tmp_name = tempfile.mkstemp(prefix=f"brief-{slug}-", suffix=".tmp", dir=str(staging_dir), text=True)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(_read_brief(task, repo_root))
        handle.write("\n")
    os.chmod(tmp_name, 0o600)
    staged = pathlib.Path(tmp_name).resolve()
    task["_brief_temp_path"] = str(staged)
    return staged
def _resolve_result_path(task: dict[str, Any], orch_dir: pathlib.Path) -> pathlib.Path:
result_path_raw = str(task.get("result_path") or "").strip()
if result_path_raw:
result_path = (orch_dir / result_path_raw).resolve()
else:
result_path = (orch_dir / "results" / f"{task.get('id', 'task')}.json").resolve()
task["result_path"] = str(result_path.relative_to(orch_dir))
return result_path
def _resolve_worktree_base(config: dict[str, Any], repo_name: str) -> pathlib.Path:
macp_config = dict(config.get("macp") or {})
base_template = str(macp_config.get("worktree_base") or "~/src/{repo}-worktrees")
return pathlib.Path(os.path.expanduser(base_template.format(repo=repo_name))).resolve()
def _changed_files(task: dict[str, Any]) -> list[str]:
worktree_raw = str(task.get("worktree") or "").strip()
if not worktree_raw:
return []
worktree = pathlib.Path(os.path.expanduser(worktree_raw)).resolve()
if not worktree.exists():
return []
proc = subprocess.run(
["git", "-C", str(worktree), "status", "--porcelain"],
capture_output=True,
text=True,
encoding="utf-8",
)
if proc.returncode != 0:
return []
changed: list[str] = []
for line in proc.stdout.splitlines():
path_text = line[3:].strip()
if not path_text:
continue
if " -> " in path_text:
_, path_text = path_text.split(" -> ", 1)
changed.append(path_text)
return changed
def _resolve_repo_root_from_worktree(worktree: pathlib.Path) -> pathlib.Path | None:
    """Main repository root for *worktree* via ``git rev-parse --git-common-dir``.

    Returns None when *worktree* is not inside a git checkout (or git fails).
    """
    try:
        common_raw = _git_capture(["git", "-C", str(worktree), "rev-parse", "--git-common-dir"], worktree)
    except Exception:
        return None
    common = pathlib.Path(common_raw)
    if not common.is_absolute():
        common = (worktree / common).resolve()
    # The main checkout's common dir is "<root>/.git"; its parent is the root.
    return common.parent if common.name == ".git" else common
def _is_safe_worktree_path(worktree_path: pathlib.Path, config: dict[str, Any]) -> bool:
    """True only when *worktree_path* sits under the configured worktree base.

    Guards cleanup from removing directories outside the managed area.
    """
    main_root = _resolve_repo_root_from_worktree(worktree_path)
    if main_root is None:
        return False
    allowed_base = _resolve_worktree_base(config, main_root.name)
    try:
        worktree_path.resolve().relative_to(allowed_base)
    except ValueError:
        return False
    return True
def setup_worktree(task: dict[str, Any], repo_root: pathlib.Path) -> pathlib.Path:
    """Create (or reuse) the git worktree for *task*; returns the worktree path.

    An existing directory that already contains a ``.git`` entry is reused
    as-is; otherwise a fresh worktree is added on the task branch from the
    repo's preferred start point (origin/main, main, or HEAD).
    """
    target = _resolve_worktree_path(task, repo_root)
    branch_name = _resolve_branch(task)
    if target.exists() and (target / ".git").exists():
        return target
    target.parent.mkdir(parents=True, exist_ok=True)
    base_ref = _resolve_start_point(repo_root)
    subprocess.run(
        ["git", "-C", str(repo_root), "worktree", "add", "-B", branch_name, str(target), base_ref],
        check=True,
        capture_output=True,
        text=True,
        encoding="utf-8",
    )
    task["worktree"] = str(target)
    return target
def build_dispatch_command(task: dict[str, Any], repo_root: pathlib.Path) -> str:
    """Generate the shell command for the task's dispatch type and runtime.

    ``exec``  -> the task's own command, verbatim (ValueError when missing).
    ``acp``   -> unsupported in Phase 1; raises RuntimeError.
    ``yolo``  -> runs ``mosaic yolo <runtime>`` fed the staged brief file,
                 wrapped in ``script`` so the runtime sees a PTY.
    """
    dispatch = _resolve_dispatch(task)
    runtime = _resolve_runtime(task)
    worktree = pathlib.Path(os.path.expanduser(str(task.get("worktree") or repo_root))).resolve()
    if dispatch == "exec":
        exec_command = str(task.get("command") or "").strip()
        if not exec_command:
            raise ValueError("MACP exec dispatch requires command")
        return exec_command
    if dispatch == "acp":
        raise RuntimeError("ACP dispatch requires OpenClaw integration (Phase 2)")
    if dispatch == "yolo":
        staged = str(task.get("_brief_temp_path") or "")
        brief_file = pathlib.Path(staged).resolve()
        if not staged.strip():
            raise ValueError("MACP yolo dispatch requires a staged brief file")
        # Brief body is read from the file at exec time via $(cat ...), so the
        # staged path — not the brief text — is what appears in the command.
        inner = (
            'export PATH="$HOME/.config/mosaic/bin:$PATH"; '
            f"cd {shlex.quote(str(worktree))}; "
            f'mosaic yolo {shlex.quote(runtime)} "$(cat {shlex.quote(str(brief_file))})"'
        )
        return f"script -qec {shlex.quote(inner)} /dev/null"
    raise ValueError(f"Unsupported MACP dispatch type: {dispatch}")
def collect_result(task: dict[str, Any], exit_code: int, gate_results: list[dict[str, Any]], orch_dir: pathlib.Path) -> dict[str, Any]:
    """Build, persist, and return the standardized MACP result JSON for *task*.

    The task's own terminal status wins when already set; otherwise the exit
    code decides completed vs failed. The result is written to the task's
    resolved result path under *orch_dir*.
    """
    status = str(task.get("status") or "")
    if status not in {"completed", "failed", "escalated"}:
        status = "completed" if exit_code == 0 else "failed"
    completed_at = str(task.get("completed_at") or task.get("failed_at") or now_iso())
    failed_at = task.get("failed_at")
    if status == "failed" and not failed_at:
        failed_at = now_iso()
    normalized_gates = [
        {
            "command": str(entry.get("command") or ""),
            "exit_code": int(entry.get("exit_code") or 0),
            "type": str(entry.get("type") or "mechanical"),
        }
        for entry in gate_results
    ]
    default_summaries = {
        "completed": "Task completed and quality gates passed.",
        "failed": "Task failed before completion.",
        "escalated": "Task requires human intervention.",
    }
    result = {
        "task_id": str(task.get("id") or ""),
        "status": status,
        "completed_at": completed_at,
        "failed_at": failed_at,
        "exit_code": exit_code,
        "attempt": int(task.get("attempts") or 0),
        "max_attempts": int(task.get("max_attempts") or 1),
        "runtime": _resolve_runtime(task),
        "dispatch": _resolve_dispatch(task),
        "worktree": str(task.get("worktree")) if task.get("worktree") else None,
        "branch": str(task.get("branch")) if task.get("branch") else None,
        "pr": str(task.get("pr")) if task.get("pr") else None,
        "summary": str(task.get("summary") or default_summaries.get(status, "Task processed.")),
        "files_changed": _changed_files(task),
        "gate_results": normalized_gates,
        "error": task.get("error"),
        "escalation_reason": task.get("escalation_reason"),
        "metadata": dict(task.get("metadata") or {}),
    }
    save_json(_resolve_result_path(task, orch_dir), result)
    return result
def cleanup_worktree(task: dict[str, Any], config: dict[str, Any]) -> None:
    """Remove the task's git worktree (then prune stale entries), with guards.

    Silently skips when no worktree is recorded, the directory is gone, or
    the main repo cannot be determined; refuses (with a message) any path
    outside the configured worktree base.
    """
    recorded = str(task.get("worktree") or "").strip()
    if not recorded:
        return
    worktree_dir = pathlib.Path(os.path.expanduser(recorded)).resolve()
    if not worktree_dir.exists():
        return
    main_root = _resolve_repo_root_from_worktree(worktree_dir)
    if main_root is None or main_root == worktree_dir:
        return
    if not _is_safe_worktree_path(worktree_dir, config):
        print(f"[macp_dispatcher] refusing to clean unsafe worktree path: {worktree_dir}", flush=True)
        return
    subprocess.run(
        ["git", "-C", str(main_root), "worktree", "remove", "--force", str(worktree_dir)],
        check=True,
        capture_output=True,
        text=True,
        encoding="utf-8",
    )
    subprocess.run(
        ["git", "-C", str(main_root), "worktree", "prune"],
        check=False,
        capture_output=True,
        text=True,
        encoding="utf-8",
    )
def dispatch_task(task: dict[str, Any], repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dict[str, Any]) -> tuple[int, str]:
    """Full dispatch lifecycle: defaults -> worktree setup -> execute.

    Returns ``(exit_code, output)``. ACP tasks short-circuit to an escalated
    state in Phase 1. Timeout/error details are recorded on the task; the
    staged yolo brief temp file is always deleted afterwards.
    """
    macp_config = dict(config.get("macp") or {})
    worker_config = dict(config.get("worker") or {})
    # Fill dispatch/runtime from config defaults, normalized via the resolvers.
    task["dispatch"] = _resolve_dispatch({"dispatch": task.get("dispatch") or macp_config.get("default_dispatch") or "exec"})
    task["runtime"] = _resolve_runtime({"runtime": task.get("runtime") or macp_config.get("default_runtime") or worker_config.get("runtime") or "codex"})
    if not str(task.get("worktree") or "").strip():
        base_template = str(macp_config.get("worktree_base") or "~/src/{repo}-worktrees")
        task["worktree"] = str(_default_worktree_path(task, repo_root, base_template))
    if not str(task.get("result_path") or "").strip():
        result_dir = str(macp_config.get("result_dir") or ".mosaic/orchestrator/results").strip()
        # result_path is stored relative to .mosaic/orchestrator/, so drop that prefix.
        if result_dir.startswith(".mosaic/orchestrator/"):
            result_dir = result_dir[len(".mosaic/orchestrator/") :]
        task["result_path"] = f"{result_dir.rstrip('/')}/{task.get('id', 'task')}.json"
    if task["dispatch"] == "acp":
        # Fail-safe in Phase 1: escalate instead of pretending to run.
        task["status"] = "escalated"
        task["failed_at"] = now_iso()
        task["escalation_reason"] = "ACP dispatch requires OpenClaw integration (Phase 2)"
        task["error"] = task["escalation_reason"]
        task["_timed_out"] = False
        return 1, task["escalation_reason"]
    worktree = setup_worktree(task, repo_root)
    log_path = orch_dir / "logs" / f"{task.get('id', 'task')}.log"
    timeout_sec = int(task.get("timeout_seconds") or worker_config.get("timeout_seconds") or 7200)
    if task["dispatch"] == "yolo":
        _stage_yolo_brief_file(task, repo_root, orch_dir)
    try:
        command = build_dispatch_command(task, repo_root)
        exit_code, output, timed_out = _run_command(command, worktree, log_path, timeout_sec)
        task["_timed_out"] = timed_out
        if timed_out:
            task["error"] = f"Worker command timed out after {timeout_sec}s"
        elif exit_code != 0 and not task.get("error"):
            task["error"] = f"Worker command failed with exit code {exit_code}"
        return exit_code, output
    finally:
        staged_brief = str(task.pop("_brief_temp_path", "") or "").strip()
        if staged_brief:
            try:
                pathlib.Path(staged_brief).unlink(missing_ok=True)
            except OSError:
                pass

View File

@@ -22,8 +22,11 @@
"task.assigned",
"task.started",
"task.progress",
"task.gated",
"task.completed",
"task.failed",
"task.escalated",
"task.retry.scheduled",
"rail.check.started",
"rail.check.passed",
"rail.check.failed"
@@ -37,8 +40,10 @@
"enum": [
"pending",
"running",
"gated",
"completed",
"failed"
"failed",
"escalated"
]
},
"timestamp": {
@@ -50,7 +55,8 @@
"enum": [
"controller",
"worker",
"quality-gate"
"quality-gate",
"dispatcher"
]
},
"message": {

View File

@@ -0,0 +1,119 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$id": "https://mosaicstack.dev/schemas/orchestrator/result.schema.json",
"title": "MACP Task Result",
"type": "object",
"required": [
"task_id",
"status",
"completed_at"
],
"properties": {
"task_id": {
"type": "string"
},
"status": {
"type": "string",
"enum": [
"completed",
"failed",
"escalated"
]
},
"completed_at": {
"type": "string",
"format": "date-time"
},
"failed_at": {
"type": [
"string",
"null"
],
"format": "date-time"
},
"exit_code": {
"type": [
"integer",
"null"
]
},
"attempt": {
"type": "integer"
},
"max_attempts": {
"type": "integer"
},
"runtime": {
"type": "string"
},
"dispatch": {
"type": "string"
},
"worktree": {
"type": [
"string",
"null"
]
},
"branch": {
"type": [
"string",
"null"
]
},
"pr": {
"type": [
"string",
"null"
]
},
"summary": {
"type": "string",
"description": "Human-readable summary of what the worker did"
},
"files_changed": {
"type": "array",
"items": {
"type": "string"
}
},
"gate_results": {
"type": "array",
"items": {
"type": "object",
"properties": {
"command": {
"type": "string"
},
"exit_code": {
"type": "integer"
},
"type": {
"type": "string",
"enum": [
"mechanical",
"ai-review",
"ci-pipeline"
]
}
}
}
},
"error": {
"type": [
"string",
"null"
]
},
"escalation_reason": {
"type": [
"string",
"null"
]
},
"metadata": {
"type": "object"
}
},
"additionalProperties": true
}

View File

@@ -23,14 +23,85 @@
"enum": [
"pending",
"running",
"gated",
"completed",
"failed"
"failed",
"escalated"
]
},
"type": {
"type": "string",
"enum": [
"coding",
"deploy",
"research",
"review",
"documentation",
"infrastructure"
],
"description": "Task type - determines dispatch strategy and gate requirements"
},
"dispatch": {
"type": "string",
"enum": [
"yolo",
"acp",
"exec"
],
"description": "Execution backend: yolo=mosaic yolo (full system), acp=OpenClaw sessions_spawn (sandboxed), exec=direct shell"
},
"runtime": {
"type": "string",
"description": "Preferred worker runtime, e.g. codex, claude, opencode"
},
"worktree": {
"type": "string",
"description": "Path to git worktree for this task, e.g. ~/src/repo-worktrees/task-042"
},
"branch": {
"type": "string",
"description": "Git branch name for this task"
},
"brief_path": {
"type": "string",
"description": "Path to markdown task brief relative to repo root"
},
"result_path": {
"type": "string",
"description": "Path to JSON result file relative to .mosaic/orchestrator/"
},
"issue": {
"type": "string",
"description": "Issue reference (e.g. #42)"
},
"pr": {
"type": [
"string",
"null"
],
"description": "PR number/URL once opened"
},
"depends_on": {
"type": "array",
"items": {
"type": "string"
},
"description": "List of task IDs this task depends on"
},
"max_attempts": {
"type": "integer",
"minimum": 1,
"default": 1
},
"attempts": {
"type": "integer",
"minimum": 0,
"default": 0
},
"timeout_seconds": {
"type": "integer",
"description": "Override default timeout for this task"
},
"command": {
"type": "string",
"description": "Worker command to execute for this task"