fix: address review findings — backward compat, ACP safety, result timing, security

- Fix 1: tasks_md_sync only sets MACP fields when columns exist in table headers
- Fix 2: ACP dispatch now escalates instead of falsely completing
- Fix 3: Removed premature collect_result() from dispatch_task()
- Fix 4: Yolo brief staged via temp file (0600) instead of process args
- Fix 5: cleanup_worktree validates path against configured worktree base
This commit is contained in:
Jarvis
2026-03-27 19:48:52 -05:00
parent f8d7ed1d80
commit e5eac889ec
9 changed files with 231 additions and 61 deletions

View File

@@ -99,6 +99,8 @@ Controller behavior remains backward compatible:
- Tasks without `dispatch` continue through the legacy shell execution path.
- Tasks with `dispatch` use the MACP dispatcher and can emit `task.gated` and `task.escalated`.
- `acp` dispatch is fail-safe in Phase 1: it escalates with `ACP dispatch requires OpenClaw integration (Phase 2)` instead of reporting success.
- `yolo` dispatch stages the brief in a temporary file so the brief body does not appear in process arguments.
Manual queue operations are exposed through:

View File

@@ -225,6 +225,24 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
rc, output, timed_out = run_shell(cmd, repo_root, log_path, timeout_sec)
if rc != 0:
if is_macp_task(task) and str(task.get("status") or "") == "escalated":
task["failed_at"] = str(task.get("failed_at") or now_iso())
emit_event(
events_path,
"task.escalated",
task_id,
"escalated",
"controller",
str(task.get("escalation_reason") or task.get("error") or "Task requires human intervention."),
)
save_json(tasks_path, {"tasks": task_items})
state["running_task_id"] = None
state["updated_at"] = now_iso()
save_json(state_path, state)
macp_dispatcher.collect_result(task, rc, [], orch_dir)
if bool(config.get("macp", {}).get("cleanup_worktrees", True)):
macp_dispatcher.cleanup_worktree(task, config)
return True
if not task.get("error"):
task["error"] = f"Worker command timed out after {timeout_sec}s" if timed_out else f"Worker command failed with exit code {rc}"
if attempt < max_attempts:
@@ -247,9 +265,10 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
state["updated_at"] = now_iso()
save_json(state_path, state)
if is_macp_task(task):
macp_dispatcher.collect_result(task, rc, [], orch_dir)
if task["status"] == "failed" and bool(config.get("macp", {}).get("cleanup_worktrees", True)):
macp_dispatcher.cleanup_worktree(task)
if task["status"] == "failed":
macp_dispatcher.collect_result(task, rc, [], orch_dir)
if bool(config.get("macp", {}).get("cleanup_worktrees", True)):
macp_dispatcher.cleanup_worktree(task, config)
else:
save_json(
results_dir / f"{task_id}.json",
@@ -269,7 +288,7 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
save_json(state_path, state)
macp_dispatcher.collect_result(task, rc, [], orch_dir)
if bool(config.get("macp", {}).get("cleanup_worktrees", True)):
macp_dispatcher.cleanup_worktree(task)
macp_dispatcher.cleanup_worktree(task, config)
return True
task["status"] = "gated"
@@ -332,9 +351,10 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic
state["updated_at"] = now_iso()
save_json(state_path, state)
if is_macp_task(task):
macp_dispatcher.collect_result(task, rc, gate_results, orch_dir)
if task["status"] in {"completed", "escalated"} and bool(config.get("macp", {}).get("cleanup_worktrees", True)):
macp_dispatcher.cleanup_worktree(task)
if task["status"] in {"completed", "failed", "escalated"}:
macp_dispatcher.collect_result(task, rc, gate_results, orch_dir)
if bool(config.get("macp", {}).get("cleanup_worktrees", True)):
macp_dispatcher.cleanup_worktree(task, config)
else:
save_json(
results_dir / f"{task_id}.json",

View File

@@ -35,9 +35,9 @@ def split_pipe_row(line: str) -> list[str]:
return [c.strip() for c in row.split("|")]
def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]:
def parse_tasks_markdown(path: pathlib.Path) -> tuple[set[str], list[dict[str, str]]]:
if not path.exists():
return []
return set(), []
lines = path.read_text(encoding="utf-8").splitlines()
header_idx = -1
@@ -51,7 +51,7 @@ def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]:
headers = cells
break
if header_idx < 0:
return []
return set(), []
rows: list[dict[str, str]] = []
for line in lines[header_idx + 2 :]:
@@ -67,7 +67,7 @@ def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]:
if not task_id or task_id.lower() == "id":
continue
rows.append(row)
return rows
return set(headers), rows
def map_status(raw: str) -> str:
@@ -93,6 +93,7 @@ def parse_depends(raw: str) -> list[str]:
def build_task(
row: dict[str, str],
headers: set[str],
existing: dict[str, Any],
macp_defaults: dict[str, str],
runtime_default: str,
@@ -114,13 +115,25 @@ def build_task(
task["description"] = description
task["status"] = map_status(row.get("status", "pending"))
task["depends_on"] = depends_on
task["type"] = task_type or str(task.get("type") or macp_defaults.get("type") or "coding")
task["dispatch"] = dispatch or str(task.get("dispatch") or macp_defaults.get("dispatch") or "")
task["runtime"] = runtime or str(task.get("runtime") or macp_defaults.get("runtime") or runtime_default or "codex")
task["branch"] = branch or str(task.get("branch") or macp_defaults.get("branch") or "")
task["issue"] = issue or str(task.get("issue") or "")
task["command"] = str(task.get("command") or "")
task["quality_gates"] = task.get("quality_gates") or []
if "type" in headers:
task["type"] = task_type or str(task.get("type") or macp_defaults.get("type") or "coding")
else:
task.pop("type", None)
if "dispatch" in headers:
task["dispatch"] = dispatch or str(task.get("dispatch") or macp_defaults.get("dispatch") or "")
else:
task.pop("dispatch", None)
if "runtime" in headers:
task["runtime"] = runtime or str(task.get("runtime") or macp_defaults.get("runtime") or runtime_default or "codex")
else:
task.pop("runtime", None)
if "branch" in headers:
task["branch"] = branch or str(task.get("branch") or macp_defaults.get("branch") or "")
else:
task.pop("branch", None)
metadata = dict(task.get("metadata") or {})
metadata.update(
{
@@ -166,7 +179,7 @@ def main() -> int:
"branch": "",
}
rows = parse_tasks_markdown(docs_path)
headers, rows = parse_tasks_markdown(docs_path)
try:
source_path = str(docs_path.relative_to(repo))
except ValueError:
@@ -187,6 +200,7 @@ def main() -> int:
out_tasks.append(
build_task(
row,
headers,
existing_by_id.get(task_id, {}),
macp_defaults,
runtime_default,

View File

@@ -10,6 +10,7 @@ import pathlib
import re
import shlex
import subprocess
import tempfile
from typing import Any
@@ -128,6 +129,20 @@ def _read_brief(task: dict[str, Any], repo_root: pathlib.Path) -> str:
return brief_path.read_text(encoding="utf-8").strip()
def _stage_yolo_brief_file(task: dict[str, Any], repo_root: pathlib.Path, orch_dir: pathlib.Path) -> pathlib.Path:
brief_dir = (orch_dir / "tmp").resolve()
brief_dir.mkdir(parents=True, exist_ok=True)
task_id = _slugify(str(task.get("id") or "task"))
fd, raw_path = tempfile.mkstemp(prefix=f"brief-{task_id}-", suffix=".tmp", dir=str(brief_dir), text=True)
with os.fdopen(fd, "w", encoding="utf-8") as handle:
handle.write(_read_brief(task, repo_root))
handle.write("\n")
os.chmod(raw_path, 0o600)
path = pathlib.Path(raw_path).resolve()
task["_brief_temp_path"] = str(path)
return path
def _resolve_result_path(task: dict[str, Any], orch_dir: pathlib.Path) -> pathlib.Path:
result_path_raw = str(task.get("result_path") or "").strip()
if result_path_raw:
@@ -138,6 +153,12 @@ def _resolve_result_path(task: dict[str, Any], orch_dir: pathlib.Path) -> pathli
return result_path
def _resolve_worktree_base(config: dict[str, Any], repo_name: str) -> pathlib.Path:
macp_config = dict(config.get("macp") or {})
base_template = str(macp_config.get("worktree_base") or "~/src/{repo}-worktrees")
return pathlib.Path(os.path.expanduser(base_template.format(repo=repo_name))).resolve()
def _changed_files(task: dict[str, Any]) -> list[str]:
worktree_raw = str(task.get("worktree") or "").strip()
if not worktree_raw:
@@ -167,6 +188,29 @@ def _changed_files(task: dict[str, Any]) -> list[str]:
return changed
def _resolve_repo_root_from_worktree(worktree: pathlib.Path) -> pathlib.Path | None:
try:
common_dir_raw = _git_capture(["git", "-C", str(worktree), "rev-parse", "--git-common-dir"], worktree)
except Exception:
return None
common_dir = pathlib.Path(common_dir_raw)
if not common_dir.is_absolute():
common_dir = (worktree / common_dir).resolve()
return common_dir.parent if common_dir.name == ".git" else common_dir
def _is_safe_worktree_path(worktree_path: pathlib.Path, config: dict[str, Any]) -> bool:
repo_root = _resolve_repo_root_from_worktree(worktree_path)
if repo_root is None:
return False
expected_base = _resolve_worktree_base(config, repo_root.name)
try:
worktree_path.resolve().relative_to(expected_base)
return True
except ValueError:
return False
def setup_worktree(task: dict[str, Any], repo_root: pathlib.Path) -> pathlib.Path:
"""Create git worktree for task. Returns worktree path."""
@@ -202,26 +246,16 @@ def build_dispatch_command(task: dict[str, Any], repo_root: pathlib.Path) -> str
return command
if dispatch == "acp":
payload = {
"task_id": str(task.get("id") or ""),
"title": str(task.get("title") or ""),
"runtime": runtime,
"dispatch": dispatch,
"brief_path": str(task.get("brief_path") or ""),
"worktree": str(task.get("worktree") or ""),
"branch": str(task.get("branch") or ""),
"attempt": int(task.get("attempts") or 0),
"max_attempts": int(task.get("max_attempts") or 1),
}
python_code = "import json,sys; print(json.dumps(json.loads(sys.argv[1]), indent=2))"
return f"python3 -c {shlex.quote(python_code)} {shlex.quote(json.dumps(payload))}"
raise RuntimeError("ACP dispatch requires OpenClaw integration (Phase 2)")
if dispatch == "yolo":
brief = _read_brief(task, repo_root)
brief_file = pathlib.Path(str(task.get("_brief_temp_path") or "")).resolve()
if not str(task.get("_brief_temp_path") or "").strip():
raise ValueError("MACP yolo dispatch requires a staged brief file")
inner = (
'export PATH="$HOME/.config/mosaic/bin:$PATH"; '
f"cd {shlex.quote(str(worktree))}; "
f"mosaic yolo {shlex.quote(runtime)} {shlex.quote(brief)}"
f'mosaic yolo {shlex.quote(runtime)} "$(cat {shlex.quote(str(brief_file))})"'
)
return f"script -qec {shlex.quote(inner)} /dev/null"
@@ -280,7 +314,7 @@ def collect_result(task: dict[str, Any], exit_code: int, gate_results: list[dict
return result
def cleanup_worktree(task: dict[str, Any]) -> None:
def cleanup_worktree(task: dict[str, Any], config: dict[str, Any]) -> None:
"""Remove git worktree after task is done."""
worktree_raw = str(task.get("worktree") or "").strip()
@@ -291,12 +325,12 @@ def cleanup_worktree(task: dict[str, Any]) -> None:
if not worktree.exists():
return
common_dir_raw = _git_capture(["git", "-C", str(worktree), "rev-parse", "--git-common-dir"], worktree)
common_dir = pathlib.Path(common_dir_raw)
if not common_dir.is_absolute():
common_dir = (worktree / common_dir).resolve()
repo_root = common_dir.parent if common_dir.name == ".git" else common_dir
if repo_root == worktree:
repo_root = _resolve_repo_root_from_worktree(worktree)
if repo_root is None or repo_root == worktree:
return
if not _is_safe_worktree_path(worktree, config):
print(f"[macp_dispatcher] refusing to clean unsafe worktree path: {worktree}", flush=True)
return
subprocess.run(
@@ -316,7 +350,7 @@ def cleanup_worktree(task: dict[str, Any]) -> None:
def dispatch_task(task: dict[str, Any], repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dict[str, Any]) -> tuple[int, str]:
"""Full dispatch lifecycle: setup -> execute -> collect -> cleanup. Returns (exit_code, output)."""
"""Full dispatch lifecycle: setup -> execute. Returns (exit_code, output)."""
macp_config = dict(config.get("macp") or {})
worker_config = dict(config.get("worker") or {})
@@ -336,22 +370,32 @@ def dispatch_task(task: dict[str, Any], repo_root: pathlib.Path, orch_dir: pathl
result_dir = result_dir[len(".mosaic/orchestrator/") :]
task["result_path"] = f"{result_dir.rstrip('/')}/{task.get('id', 'task')}.json"
if task["dispatch"] == "acp":
task["status"] = "escalated"
task["failed_at"] = now_iso()
task["escalation_reason"] = "ACP dispatch requires OpenClaw integration (Phase 2)"
task["error"] = task["escalation_reason"]
task["_timed_out"] = False
return 1, task["escalation_reason"]
worktree = setup_worktree(task, repo_root)
log_path = orch_dir / "logs" / f"{task.get('id', 'task')}.log"
timeout_sec = int(task.get("timeout_seconds") or worker_config.get("timeout_seconds") or 7200)
command = build_dispatch_command(task, repo_root)
exit_code, output, timed_out = _run_command(command, worktree, log_path, timeout_sec)
task["_timed_out"] = timed_out
if timed_out:
task["error"] = f"Worker command timed out after {timeout_sec}s"
elif exit_code != 0 and not task.get("error"):
task["error"] = f"Worker command failed with exit code {exit_code}"
collect_result(task, exit_code, [], orch_dir)
attempts = int(task.get("attempts") or 0)
max_attempts = int(task.get("max_attempts") or 1)
if exit_code != 0 and attempts >= max_attempts and bool(macp_config.get("cleanup_worktrees", True)):
cleanup_worktree(task)
return exit_code, output
if task["dispatch"] == "yolo":
_stage_yolo_brief_file(task, repo_root, orch_dir)
try:
command = build_dispatch_command(task, repo_root)
exit_code, output, timed_out = _run_command(command, worktree, log_path, timeout_sec)
task["_timed_out"] = timed_out
if timed_out:
task["error"] = f"Worker command timed out after {timeout_sec}s"
elif exit_code != 0 and not task.get("error"):
task["error"] = f"Worker command failed with exit code {exit_code}"
return exit_code, output
finally:
brief_temp_path = str(task.pop("_brief_temp_path", "") or "").strip()
if brief_temp_path:
try:
pathlib.Path(brief_temp_path).unlink(missing_ok=True)
except OSError:
pass