diff --git a/bin/mosaic b/bin/mosaic index 9276bf5..3cd450b 100755 --- a/bin/mosaic +++ b/bin/mosaic @@ -15,6 +15,7 @@ set -euo pipefail # mosaic init [args...] Generate SOUL.md interactively # mosaic doctor [args...] Health audit # mosaic sync [args...] Sync skills +# mosaic macp [args...] Manual MACP queue operations # mosaic seq [subcommand] sequential-thinking MCP management (check/fix/start) # mosaic bootstrap Bootstrap a repo # mosaic upgrade release Upgrade installed Mosaic release @@ -41,6 +42,7 @@ Management: init [args...] Generate SOUL.md (agent identity contract) doctor [args...] Audit runtime state and detect drift sync [args...] Sync skills from canonical source + macp [args...] Manual MACP queue operations seq [subcommand] sequential-thinking MCP management: check [--runtime ] [--strict] fix [--runtime ] @@ -487,6 +489,11 @@ run_sync() { exec "$MOSAIC_HOME/bin/mosaic-sync-skills" "$@" } +run_macp() { + check_mosaic_home + exec "$MOSAIC_HOME/bin/mosaic-macp" "$@" +} + run_seq() { check_mosaic_home local checker="$MOSAIC_HOME/bin/mosaic-ensure-sequential-thinking" @@ -774,6 +781,7 @@ case "$command" in init) run_init "$@" ;; doctor) run_doctor "$@" ;; sync) run_sync "$@" ;; + macp) run_macp "$@" ;; seq) run_seq "$@" ;; bootstrap) run_bootstrap "$@" ;; prdy) run_prdy "$@" ;; diff --git a/bin/mosaic-macp b/bin/mosaic-macp new file mode 100755 index 0000000..413ab55 --- /dev/null +++ b/bin/mosaic-macp @@ -0,0 +1,212 @@ +#!/usr/bin/env bash +set -euo pipefail + +repo_root="$(pwd)" +orch_dir="$repo_root/.mosaic/orchestrator" +config_path="$orch_dir/config.json" +tasks_path="$orch_dir/tasks.json" +events_path="$orch_dir/events.ndjson" + +usage() { + cat <<'USAGE' +mosaic macp — manual MACP queue operations + +Usage: + mosaic macp submit --task-id TASK-001 --title "..." [--type coding] [--dispatch yolo|acp|exec] [--runtime codex] [--brief docs/tasks/TASK-001.md] [--command "..."] [--branch feat/...] 
+ mosaic macp status [--task-id TASK-001] + mosaic macp drain + mosaic macp history [--task-id TASK-001] +USAGE +} + +require_repo() { + if [[ ! -f "$config_path" ]]; then + echo "[mosaic-macp] missing orchestrator config: $config_path" >&2 + exit 1 + fi +} + +submit_task() { + require_repo + + local task_id="" + local title="" + local task_type="coding" + local dispatch="" + local runtime="" + local brief="" + local command="" + local branch="" + + while [[ $# -gt 0 ]]; do + case "$1" in + --task-id) task_id="$2"; shift 2 ;; + --title) title="$2"; shift 2 ;; + --type) task_type="$2"; shift 2 ;; + --dispatch) dispatch="$2"; shift 2 ;; + --runtime) runtime="$2"; shift 2 ;; + --brief) brief="$2"; shift 2 ;; + --command) command="$2"; shift 2 ;; + --branch) branch="$2"; shift 2 ;; + -h|--help) usage; exit 0 ;; + *) echo "[mosaic-macp] unknown submit option: $1" >&2; exit 1 ;; + esac + done + + if [[ -z "$task_id" || -z "$title" ]]; then + echo "[mosaic-macp] submit requires --task-id and --title" >&2 + exit 1 + fi + + python3 - "$config_path" "$tasks_path" "$task_id" "$title" "$task_type" "$dispatch" "$runtime" "$brief" "$command" "$branch" <<'PY' +import json +import pathlib +import sys + +config_path = pathlib.Path(sys.argv[1]) +tasks_path = pathlib.Path(sys.argv[2]) +task_id, title, task_type, dispatch, runtime, brief, command, branch = sys.argv[3:] + +config = json.loads(config_path.read_text(encoding="utf-8")) +macp = dict(config.get("macp") or {}) +worker = dict(config.get("worker") or {}) +payload = {"tasks": []} +if tasks_path.exists(): + payload = json.loads(tasks_path.read_text(encoding="utf-8")) +if not isinstance(payload.get("tasks"), list): + payload = {"tasks": []} + +resolved_dispatch = dispatch or ("yolo" if brief and not command else str(macp.get("default_dispatch") or "exec")) +resolved_runtime = runtime or str(macp.get("default_runtime") or worker.get("runtime") or "codex") +if resolved_dispatch == "exec" and not command: + raise 
SystemExit("[mosaic-macp] exec dispatch requires --command") + +task = { + "id": task_id, + "title": title, + "description": title, + "status": "pending", + "type": task_type or "coding", + "dispatch": resolved_dispatch, + "runtime": resolved_runtime, + "branch": branch or "", + "brief_path": brief or "", + "command": command or "", + "quality_gates": config.get("quality_gates") or [], + "metadata": {"source": "bin/mosaic-macp"}, +} + +updated = [] +replaced = False +for existing in payload["tasks"]: + if str(existing.get("id")) == task_id: + merged = dict(existing) + merged.update({k: v for k, v in task.items() if v not in ("", [])}) + updated.append(merged) + replaced = True + else: + updated.append(existing) +if not replaced: + updated.append(task) +payload["tasks"] = updated +tasks_path.parent.mkdir(parents=True, exist_ok=True) +tasks_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") +print(f"[mosaic-macp] queued {task_id} dispatch={resolved_dispatch} runtime={resolved_runtime}") +PY +} + +status_tasks() { + require_repo + local task_id="" + while [[ $# -gt 0 ]]; do + case "$1" in + --task-id) task_id="$2"; shift 2 ;; + -h|--help) usage; exit 0 ;; + *) echo "[mosaic-macp] unknown status option: $1" >&2; exit 1 ;; + esac + done + + python3 - "$tasks_path" "$task_id" <<'PY' +import json +import pathlib +import sys + +tasks_path = pathlib.Path(sys.argv[1]) +task_id = sys.argv[2] +payload = {"tasks": []} +if tasks_path.exists(): + payload = json.loads(tasks_path.read_text(encoding="utf-8")) +tasks = payload.get("tasks", []) +counts = {key: 0 for key in ["pending", "running", "gated", "completed", "failed", "escalated"]} +for task in tasks: + status = str(task.get("status") or "pending") + counts[status] = counts.get(status, 0) + 1 + +if task_id: + for task in tasks: + if str(task.get("id")) == task_id: + print(json.dumps(task, indent=2)) + break + else: + raise SystemExit(f"[mosaic-macp] task not found: {task_id}") +else: + print("Queue 
state:") + for key in ["pending", "running", "gated", "completed", "failed", "escalated"]: + print(f" {key}: {counts.get(key, 0)}") +PY +} + +drain_tasks() { + require_repo + local mosaic_home="${MOSAIC_HOME:-$HOME/.config/mosaic}" + exec "$mosaic_home/bin/mosaic-orchestrator-run" --repo "$repo_root" --until-drained +} + +history_tasks() { + require_repo + local task_id="" + while [[ $# -gt 0 ]]; do + case "$1" in + --task-id) task_id="$2"; shift 2 ;; + -h|--help) usage; exit 0 ;; + *) echo "[mosaic-macp] unknown history option: $1" >&2; exit 1 ;; + esac + done + + python3 - "$events_path" "$task_id" <<'PY' +import json +import pathlib +import sys + +events_path = pathlib.Path(sys.argv[1]) +task_id = sys.argv[2] +if not events_path.exists(): + raise SystemExit(f"[mosaic-macp] events file not found: {events_path}") + +for line in events_path.read_text(encoding="utf-8").splitlines(): + if not line.strip(): + continue + event = json.loads(line) + if task_id and str(event.get("task_id")) != task_id: + continue + print(json.dumps(event, indent=2)) +PY +} + +subcommand="${1:-help}" +if [[ $# -gt 0 ]]; then + shift +fi + +case "$subcommand" in + submit) submit_task "$@" ;; + status) status_tasks "$@" ;; + drain) drain_tasks "$@" ;; + history) history_tasks "$@" ;; + help|-h|--help|"") usage ;; + *) + echo "[mosaic-macp] unknown subcommand: $subcommand" >&2 + usage + exit 1 + ;; +esac diff --git a/docs/DEVELOPER-GUIDE/README.md b/docs/DEVELOPER-GUIDE/README.md new file mode 100644 index 0000000..148140e --- /dev/null +++ b/docs/DEVELOPER-GUIDE/README.md @@ -0,0 +1,5 @@ +# Developer Guide + +## Orchestrator Matrix + +- [MACP Phase 1](./orchestrator-matrix/macp-phase1.md) diff --git a/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase1.md b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase1.md new file mode 100644 index 0000000..5e2cb5c --- /dev/null +++ b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase1.md @@ -0,0 +1,31 @@ +# MACP Phase 1 + +MACP Phase 1 extends 
`tools/orchestrator-matrix/` without replacing the existing deterministic controller model. + +## What Changed + +1. Task and event schemas now describe MACP dispatch metadata, new lifecycle statuses, and dispatcher-originated events. +2. `tools/orchestrator-matrix/dispatcher/macp_dispatcher.py` manages task worktrees, dispatch command generation, result files, and cleanup. +3. `mosaic_orchestrator.py` routes MACP-aware tasks through the dispatcher while leaving legacy non-`dispatch` tasks on the original shell path. +4. `bin/mosaic-macp` adds manual submit, status, drain, and history operations for `.mosaic/orchestrator/`. + +## Dispatch Modes + +1. `exec`: runs the task's `command` directly inside the task worktree. +2. `yolo`: launches `mosaic yolo ` via a PTY wrapper and stages the brief in a temporary file so the brief body is not exposed in process arguments. +3. `acp`: escalates immediately with `ACP dispatch requires OpenClaw integration (Phase 2)` until real ACP/OpenClaw spawning exists. + +## Result Contract + +MACP writes task result JSON under `.mosaic/orchestrator/results/` by default. Result files capture: + +1. Task status and timing +2. Attempt counters +3. Runtime and dispatch metadata +4. Changed files seen in the task worktree +5. Quality-gate command results +6. Error or escalation details + +## Compatibility + +Legacy tasks that omit `dispatch` still behave like the original matrix controller. `tasks_md_sync.py` only injects MACP fields when the corresponding markdown headers exist, which keeps existing `tasks.json` workflows functional while allowing orchestrators to opt into MACP incrementally. 
diff --git a/docs/PRD.md b/docs/PRD.md new file mode 100644 index 0000000..be490b8 --- /dev/null +++ b/docs/PRD.md @@ -0,0 +1,98 @@ +# PRD: MACP Phase 1 Core Protocol Implementation + +## Metadata + +- Owner: Jarvis +- Date: 2026-03-27 +- Status: in-progress +- Best-Guess Mode: true + +## Problem Statement + +The current orchestrator-matrix rail can queue shell-based worker tasks, but it does not yet expose a standardized protocol for dispatch selection, worktree-aware execution, structured results, or manual MACP queue operations. MACP Phase 1 extends the existing rail so orchestrators can delegate to multiple runtimes through a consistent task model while preserving current behavior for legacy tasks. + +## Objectives + +1. Extend the existing orchestrator-matrix protocol and controller to support MACP-aware task dispatch and status tracking. +2. Add a dispatcher layer that manages worktree lifecycle, runtime command generation, and standardized results. +3. Provide a CLI entrypoint for manual MACP submission, status inspection, queue draining, and history review. + +## Scope + +### In Scope + +1. Extend the orchestrator task and event schemas and add a result schema. +2. Add a Python dispatcher module under `tools/orchestrator-matrix/dispatcher/`. +3. Update the controller to use the dispatcher for MACP-aware tasks while preserving legacy execution paths. +4. Update orchestrator config templates, task markdown sync logic, and CLI routing/scripts for MACP commands. +5. Add verification for backward compatibility, schema validity, imports, and basic MACP execution flow. + +### Out of Scope + +1. Rewriting the orchestrator controller architecture. +2. Changing Matrix transport behavior beyond schema compatibility. +3. Implementing real OpenClaw `sessions_spawn` execution beyond producing the config payload/command for callers. +4. Adding non-stdlib Python dependencies or npm-based tooling. + +## User/Stakeholder Requirements + +1. 
MACP must evolve the current orchestrator-matrix implementation rather than replace it. +2. Legacy task queues without `dispatch` fields must continue to run exactly as before. +3. MACP-aware tasks must support dispatch modes `yolo`, `acp`, and `exec`. +4. Results must be written in a structured JSON format suitable for audit and orchestration follow-up. +5. A manual `mosaic macp` CLI must expose submit, status, drain, and history flows. + +## Functional Requirements + +1. Task schema must include MACP dispatch, worktree, result, retry, branch, brief, issue/PR, and dependency fields. +2. Event schema must recognize `task.gated`, `task.escalated`, and `task.retry.scheduled`, plus a `dispatcher` source. +3. Dispatcher functions must set up worktrees, build commands, execute tasks, collect results, and clean up worktrees. +4. Controller `run_single_task()` must route MACP-aware tasks through the dispatcher and emit the correct lifecycle events/status transitions. +5. `tasks_md_sync.py` must map optional MACP table columns only when those headers are present in `docs/TASKS.md`; absent MACP headers must not inject MACP fields into legacy tasks. +6. `bin/mosaic` must route `mosaic macp ...` to a new `bin/mosaic-macp` script. + +## Non-Functional Requirements + +1. Security: no secrets embedded in generated commands, config, or results. +2. Performance: controller remains deterministic and synchronous with no async or thread-based orchestration. +3. Reliability: worktree creation/cleanup failures must be surfaced predictably and produce structured task failure/escalation states. +4. Observability: lifecycle events, logs, and result JSON must clearly show task outcome, attempts, gates, and errors. + +## Acceptance Criteria + +1. Existing legacy tasks without `dispatch` still run through the old shell path with unchanged behavior. +2. MACP-aware `exec` tasks run through the dispatcher and produce result JSON with gate outcomes. +3. 
New schemas validate task/event/result payload expectations for MACP fields and statuses. +4. `mosaic macp submit`, `status`, and `history` work from a bootstrapped repo state, and `drain` delegates to the existing orchestrator runner. +5. Python imports for the updated controller, dispatcher, and sync code complete without errors on Python 3.10+. + +## Constraints and Dependencies + +1. Python implementation must use stdlib only and support Python 3.10+. +2. Shell tooling must remain bash-based and fit the existing Mosaic CLI style. +3. Dispatch fallback rules must use `exec` when `dispatch` is absent and config/default runtime when `runtime` is absent. +4. Worktree convention must derive from the repository name and task metadata unless explicitly overridden by task fields. + +## Risks and Open Questions + +1. Risk: yolo command execution requires a PTY, so the dispatcher needs a safe wrapper that still behaves under `subprocess`. +2. Risk: worktree cleanup could remove a path unexpectedly if task metadata is malformed. +3. Risk: old queue consumers may assume only the original task statuses and event types. +4. Open Question: whether `task.gated` should be emitted by the dispatcher or controller once worker execution ends and quality gates begin. + +## Testing and Verification Expectations + +1. Baseline checks: Python import validation, targeted script execution checks, JSON syntax/schema validation, and any repo-local validation applicable to changed code paths. +2. Situational testing: legacy orchestrator run with old-style tasks, MACP `exec` flow including result file generation, CLI submit/status/history behavior, and worktree lifecycle validation. +3. Evidence format: command-level results captured in the scratchpad and summarized in the final delivery report. + +## Milestone / Delivery Intent + +1. Target milestone/version: 0.0.x bootstrap enhancement +2. 
Definition of done: code merged to `main`, CI terminal green, issue `#8` closed, and verification evidence recorded against all acceptance criteria. + +## Assumptions + +1. ASSUMPTION: A single issue can track the full Phase 1 implementation because the user requested one bounded feature delivery rather than separate independent tickets. +2. ASSUMPTION: For `acp` dispatch in Phase 1, the controller must escalate the task immediately with a clear reason instead of pretending work ran before OpenClaw integration exists. +3. ASSUMPTION: `task.gated` should be emitted by the controller as the transition into quality-gate execution, which keeps gate-state ownership in one place alongside the existing gate loop. diff --git a/docs/SITEMAP.md b/docs/SITEMAP.md new file mode 100644 index 0000000..c4e8b1c --- /dev/null +++ b/docs/SITEMAP.md @@ -0,0 +1,7 @@ +# Sitemap + +- [PRD](./PRD.md) +- [Tasks](./TASKS.md) +- [Developer Guide](./DEVELOPER-GUIDE/README.md) +- [Task Briefs](./tasks/MACP-PHASE1-brief.md) +- [Scratchpads](./scratchpads/macp-phase1.md) diff --git a/docs/TASKS.md b/docs/TASKS.md new file mode 100644 index 0000000..2578549 --- /dev/null +++ b/docs/TASKS.md @@ -0,0 +1,17 @@ +# TASKS + +Canonical tracking for active work. Keep this file current. + +## Rules + +1. Update status as work progresses. +2. Link every non-trivial task to a provider issue (`#123`) or internal ref (`TASKS:T1`) if no provider is available. +3. Keep one row per active task. +4. Do not set `status=done` for source-code work until PR is merged, CI/pipeline is terminal green, and linked issue/ref is closed. +5. If merge/CI/issue closure fails, set `status=blocked` and record the exact failed wrapper command in `notes`. 
+ +## Tasks + +| id | status | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes | +|---|---|---|---|---|---|---|---|---|---|---|---|---|---| +| MACP-PHASE1 | in-progress | Implement MACP Phase 1 across orchestrator schemas, dispatcher, controller, CLI, config, and task sync while preserving legacy queue behavior. | #8 | bootstrap | feat/macp-phase1 | | | Jarvis | 2026-03-27T23:00:00Z | | medium | in-progress | Review-fix pass started 2026-03-28T00:38:01Z to address backward-compatibility, ACP safety, result timing, and worktree/brief security findings on top of the blocked PR-create state. Prior blocker remains: `~/.config/mosaic/tools/git/pr-create.sh -t 'feat: implement MACP phase 1 core protocol' -b ... -B main -H feat/macp-phase1` failed with `Remote repository required: Specify ID via --repo or execute from a local git repo.` | diff --git a/docs/scratchpads/macp-phase1.md b/docs/scratchpads/macp-phase1.md new file mode 100644 index 0000000..2f52b19 --- /dev/null +++ b/docs/scratchpads/macp-phase1.md @@ -0,0 +1,109 @@ +# MACP Phase 1 Scratchpad + +## Objective + +Implement MACP Phase 1 in `mosaic-bootstrap` by extending the orchestrator-matrix protocol, adding a dispatcher layer, wiring the controller, updating sync/config/CLI behavior, and preserving backward compatibility for legacy tasks. + +## Tracking + +- Task ID: `MACP-PHASE1` +- Issue: `#8` +- Branch: `feat/macp-phase1` +- Repo: `bootstrap` + +## Budget + +- User hard token budget: not specified +- Working assumption: complete in one implementation pass with focused verification and no unnecessary parallelization +- Budget response plan: keep scope limited to the requested file map and verification set + +## Plan + +1. Extend task and event schemas and add the MACP result schema. +2. Build `dispatcher/macp_dispatcher.py` with isolated helpers for worktree, command, result, and cleanup logic. +3. 
Wire the controller to detect MACP-aware tasks, emit gated/escalated events, and preserve the legacy path for tasks without `dispatch`. +4. Add MACP defaults to config, add `mosaic macp` routing and shell script support, and extend `tasks_md_sync.py`. +5. Run backward-compatibility and MACP verification, then review/remediate before commit and PR flow. + +## Assumptions + +1. `dispatch` presence is the MACP-aware signal for the controller, matching the user brief. +2. `docs/tasks/MACP-PHASE1-brief.md` is an input artifact for this delivery and should remain available for reference. +3. A minimal developer-facing documentation update tied to the orchestrator README/CLI behavior will satisfy the documentation gate for this change surface. + +## Risks + +1. Worktree cleanup must be constrained to task-owned paths only. +2. Gate-state transitions need to remain compatible with current event consumers. +3. CLI submit behavior must avoid corrupting existing `tasks.json` structure. + +## Verification Plan + +| Acceptance Criterion | Verification Method | Evidence | +|---|---|---| +| AC-1 legacy tasks still work | Run controller against old-style tasks without `dispatch` | Passed: temp repo run completed legacy task and wrote completed result JSON | +| AC-2 MACP `exec` flow works | Run controller against MACP task queue and inspect result JSON/events/worktree cleanup | Passed: temp repo run produced `task.gated` and `task.completed`, wrote structured result JSON with `files_changed`, and removed the task worktree | +| AC-3 schemas are valid | Validate schema JSON loads and sample payloads parse/import cleanly | Passed: `jsonschema.validate(...)` succeeded for task, event, and result schema samples | +| AC-4 CLI works | Run `mosaic macp submit/status/history` against a test repo state | Passed: `bin/mosaic-macp` submit/status/history succeeded and `bin/mosaic --help` exposed `macp` | +| AC-5 imports are clean | Run `python3 -c "import ..."` for updated modules | Passed: 
importlib-based module loads and `py_compile` succeeded | + +## Progress Log + +- 2026-03-27: Loaded global/project/runtime contracts and required delivery guides. +- 2026-03-27: Created provider issue `#8`. +- 2026-03-27: Drafted PRD, task tracking, and this scratchpad before implementation. +- 2026-03-27: Implemented MACP schemas, dispatcher lifecycle, controller integration, CLI support, sync updates, and developer docs. +- 2026-03-27: Added explicit worker escalation handling via the `MACP_ESCALATE:` stdout marker. +- 2026-03-27: Committed and pushed branch `feat/macp-phase1` (`7ef49a3`, `fd6274f`). +- 2026-03-27: Blocked in PR workflow when `~/.config/mosaic/tools/git/pr-create.sh` failed to resolve the remote repository from this worktree. +- 2026-03-28: Resumed from blocked state for a review-fix pass covering 5 findings in `docs/tasks/MACP-PHASE1-fixes.md`. + +## Review Fix Pass + +### Scope + +1. Restore legacy `tasks_md_sync.py` behavior so rows without MACP headers do not become MACP tasks. +2. Make ACP dispatch fail-safe via escalation instead of a no-op success path. +3. Move MACP result writes to the controller after quality gates determine the final task status. +4. Remove brief text from yolo command arguments by switching to file-based brief handoff. +5. Restrict worktree cleanup to validated paths under the configured worktree base. + +### TDD / Test-First Decision + +1. This is a bug-fix and security-hardening pass, so targeted reproducer verification is required. +2. Repo appears to use focused script-level verification rather than a Python test suite for this surface, so reproducer checks will be command-driven and recorded as evidence. 
+ +### Planned Verification Additions + +| Finding | Verification | +|---|---| +| Legacy task reclassification | Sync `docs/TASKS.md` without MACP headers into `tasks.json` and confirm `dispatch` is absent so controller stays on `run_shell()` | +| ACP no-op success | Run controller/dispatcher with `dispatch=acp` and confirm `status=escalated`, exit path is non-zero, and `task.escalated` is emitted | +| Premature result write | Inspect result JSON after final controller state only; confirm gate results are present and no dispatcher pre-write remains | +| Brief exposure | Build yolo command and confirm the brief body is absent from the command text | +| Unsafe cleanup | Call cleanup against a path outside configured base and confirm it is refused | + +## Tests Run + +- `python3 -m py_compile tools/orchestrator-matrix/controller/mosaic_orchestrator.py tools/orchestrator-matrix/controller/tasks_md_sync.py tools/orchestrator-matrix/dispatcher/macp_dispatcher.py` +- importlib module-load check for updated Python files +- `jsonschema.validate(...)` for task/event/result samples +- Temp repo legacy controller run with old-style `tasks.json` +- Temp repo MACP `exec` controller run with worktree/result/cleanup verification +- Temp repo CLI checks using `bin/mosaic-macp submit`, `status`, and `history` +- Temp repo `tasks_md_sync.py --apply` run with MACP columns +- `git diff --check` +- `bash -n bin/mosaic bin/mosaic-macp` + +## Review + +- Automated review attempted with `~/.config/mosaic/tools/codex/codex-code-review.sh --uncommitted` +- Initial review attempt failed because generated `__pycache__` files introduced non-UTF-8 diff content; generated caches were removed +- Manual independent review completed on the dispatcher/controller/CLI delta +- Remediation applied: final MACP results are now written before worktree cleanup, and explicit worker escalation is handled instead of being ignored + +## Residual Risks + +- The repo-local `bin/mosaic` route was verified through 
help output; full runtime routing depends on installation into `$MOSAIC_HOME/bin`. +- Explicit worker escalation currently uses a stdout marker convention rather than a richer structured worker-to-controller handoff. +- Delivery is blocked on repository automation because PR creation failed before merge/CI/issue-closure stages could run. diff --git a/docs/tasks/MACP-PHASE1-brief.md b/docs/tasks/MACP-PHASE1-brief.md new file mode 100644 index 0000000..d39cb22 --- /dev/null +++ b/docs/tasks/MACP-PHASE1-brief.md @@ -0,0 +1,324 @@ +# MACP Phase 1 — Core Protocol Implementation + +**Branch:** `feat/macp-phase1` +**Repo:** `mosaic-bootstrap` (worktree at `~/src/mosaic-bootstrap-worktrees/macp-phase1`) + +--- + +## Objective + +Extend the existing orchestrator-matrix into **MACP (Mosaic Agent Coordination Protocol)** — a standardized protocol that lets any orchestrator delegate work to any agent runtime, track progress via quality gates, and collect structured results. + +This is an **evolution** of the existing code, not a rewrite. Extend existing schemas, enhance the controller, and add the dispatcher layer. + +--- + +## Task 1: Extend Task Schema (`tools/orchestrator-matrix/protocol/task.schema.json`) + +Add these new fields to the existing schema: + +```json +{ + "type": { + "type": "string", + "enum": ["coding", "deploy", "research", "review", "documentation", "infrastructure"], + "description": "Task type — determines dispatch strategy and gate requirements" + }, + "dispatch": { + "type": "string", + "enum": ["yolo", "acp", "exec"], + "description": "Execution backend: yolo=mosaic yolo (full system), acp=OpenClaw sessions_spawn (sandboxed), exec=direct shell" + }, + "worktree": { + "type": "string", + "description": "Path to git worktree for this task, e.g. 
~/src/repo-worktrees/task-042" + }, + "branch": { + "type": "string", + "description": "Git branch name for this task" + }, + "brief_path": { + "type": "string", + "description": "Path to markdown task brief relative to repo root" + }, + "result_path": { + "type": "string", + "description": "Path to JSON result file relative to .mosaic/orchestrator/" + }, + "issue": { + "type": "string", + "description": "Issue reference (e.g. #42)" + }, + "pr": { + "type": ["string", "null"], + "description": "PR number/URL once opened" + }, + "depends_on": { + "type": "array", + "items": { "type": "string" }, + "description": "List of task IDs this task depends on" + }, + "max_attempts": { + "type": "integer", + "minimum": 1, + "default": 1 + }, + "attempts": { + "type": "integer", + "minimum": 0, + "default": 0 + }, + "timeout_seconds": { + "type": "integer", + "description": "Override default timeout for this task" + } +} +``` + +Extend the `status` enum to include: `"pending"`, `"running"`, `"gated"`, `"completed"`, `"failed"`, `"escalated"` + +Keep all existing fields. Keep `additionalProperties: true`. + +--- + +## Task 2: Extend Event Schema (`tools/orchestrator-matrix/protocol/event.schema.json`) + +Add new event types to the enum: +- `task.gated` — Worker done coding, quality gates now running +- `task.escalated` — Requires human intervention +- `task.retry.scheduled` — Task will be retried (already emitted by controller, just not in schema) + +Add new source type: `"dispatcher"` + +Keep all existing event types and fields. 
+ +--- + +## Task 3: Create Result Schema (`tools/orchestrator-matrix/protocol/result.schema.json`) + +New file — standardized worker completion report: + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://mosaicstack.dev/schemas/orchestrator/result.schema.json", + "title": "MACP Task Result", + "type": "object", + "required": ["task_id", "status", "completed_at"], + "properties": { + "task_id": { "type": "string" }, + "status": { "type": "string", "enum": ["completed", "failed", "escalated"] }, + "completed_at": { "type": "string", "format": "date-time" }, + "failed_at": { "type": ["string", "null"], "format": "date-time" }, + "exit_code": { "type": ["integer", "null"] }, + "attempt": { "type": "integer" }, + "max_attempts": { "type": "integer" }, + "runtime": { "type": "string" }, + "dispatch": { "type": "string" }, + "worktree": { "type": ["string", "null"] }, + "branch": { "type": ["string", "null"] }, + "pr": { "type": ["string", "null"] }, + "summary": { "type": "string", "description": "Human-readable summary of what the worker did" }, + "files_changed": { "type": "array", "items": { "type": "string" } }, + "gate_results": { + "type": "array", + "items": { + "type": "object", + "properties": { + "command": { "type": "string" }, + "exit_code": { "type": "integer" }, + "type": { "type": "string", "enum": ["mechanical", "ai-review", "ci-pipeline"] } + } + } + }, + "error": { "type": ["string", "null"] }, + "escalation_reason": { "type": ["string", "null"] }, + "metadata": { "type": "object" } + } +} +``` + +--- + +## Task 4: Build MACP Dispatcher (`tools/orchestrator-matrix/dispatcher/macp_dispatcher.py`) + +New Python module — the core of MACP. This translates task specs into execution commands and manages the full lifecycle. + +### Dispatcher Responsibilities: +1. **Worktree setup** — `git worktree add` before dispatch (if task has `worktree` field) +2. 
**Command generation** — Build the right command based on `dispatch` type: + - `yolo`: `mosaic yolo codex|claude|opencode ""` (reads from `brief_path`) + - `acp`: Generates a command that would be used with OpenClaw sessions_spawn (just outputs the config JSON for the caller) + - `exec`: Direct shell command from task `command` field +3. **Result collection** — After worker exits, write structured result JSON to `results/.json` +4. **Worktree cleanup** — After task completion (success or failure after max retries), remove the worktree + +### Key Functions: + +```python +def setup_worktree(task: dict, repo_root: Path) -> Path: + """Create git worktree for task. Returns worktree path.""" + +def build_dispatch_command(task: dict, repo_root: Path) -> str: + """Generate execution command based on task dispatch type and runtime.""" + +def collect_result(task: dict, exit_code: int, gate_results: list, orch_dir: Path) -> dict: + """Build standardized result JSON after worker completion.""" + +def cleanup_worktree(task: dict) -> None: + """Remove git worktree after task is done.""" + +def dispatch_task(task: dict, repo_root: Path, orch_dir: Path, config: dict) -> tuple[int, str]: + """Full dispatch lifecycle: setup → execute → collect → cleanup. Returns (exit_code, output).""" +``` + +### Worktree Convention: +- Base path: `~/src/-worktrees/` +- Worktree name: task ID slugified (e.g., `task-042-auth`) +- Branch: `feat/-` (or use task `branch` field if specified) +- Created from: `origin/main` + +### `mosaic yolo` Command Pattern: +The dispatcher needs to invoke `mosaic yolo` via PTY (it requires a terminal). The command pattern: +```bash +export PATH="$HOME/.config/mosaic/bin:$PATH" +cd +mosaic yolo codex "" +``` + +For the dispatcher, since it's called from the controller which uses `subprocess.Popen`, the command should be wrapped appropriately. 
Use `script -qec` or similar for PTY simulation if needed, but prefer passing the brief via a temp file and using the worker script pattern. + +### Important Constraints: +- The dispatcher is a **library module** imported by the controller — NOT a standalone script +- It should be testable (pure functions where possible, side effects isolated) +- Error handling: if worktree creation fails, task goes to `failed` immediately +- If `dispatch` is not set on a task, fall back to `exec` (backward compatible) +- If `runtime` is not set, fall back to config's `worker.runtime` + +--- + +## Task 5: Wire Controller to Use Dispatcher (`tools/orchestrator-matrix/controller/mosaic_orchestrator.py`) + +Modify the existing controller to use the new dispatcher: + +1. **Import the dispatcher module** +2. **In `run_single_task()`**: + - If task has `dispatch` field (or is MACP-aware), call `dispatcher.dispatch_task()` instead of raw `run_shell()` + - If task does NOT have `dispatch` field, keep existing `run_shell()` behavior (backward compatibility) +3. **Handle new statuses**: `gated` and `escalated` + - `gated`: Set when worker completes but gates are running (transition state between running and completed/failed) + - `escalated`: Set when task needs human intervention (gate failures after max retries, explicit worker escalation) +4. 
**Emit new event types**: `task.gated` and `task.escalated` + +### Backward Compatibility is Critical: +- Existing tasks.json files with no `dispatch` field MUST continue to work exactly as before +- The controller should detect MACP-aware tasks by checking for the `dispatch` field +- All existing tests/workflows must remain functional + +--- + +## Task 6: Update Config Template (`templates/repo/.mosaic/orchestrator/config.json`) + +Add MACP dispatcher configuration: + +```json +{ + "enabled": false, + "transport": "matrix", + "macp": { + "worktree_base": "~/src/{repo}-worktrees", + "default_dispatch": "exec", + "default_runtime": "codex", + "cleanup_worktrees": true, + "brief_dir": "docs/tasks", + "result_dir": ".mosaic/orchestrator/results" + }, + "worker": { ... existing ... }, + "quality_gates": [ ... existing ... ] +} +``` + +--- + +## Task 7: Build `mosaic macp` CLI (`bin/mosaic-macp`) + +Shell script that provides manual MACP interaction: + +```bash +mosaic macp submit --task-id TASK-001 --title "..." --type coding --runtime codex --brief docs/tasks/TASK-001.md +mosaic macp status [--task-id TASK-001] +mosaic macp drain # Run all pending tasks sequentially +mosaic macp history [--task-id TASK-001] # Show events for a task +``` + +Implementation: +- `submit`: Adds a task to `tasks.json` with MACP fields populated +- `status`: Pretty-prints task queue state (pending/running/gated/completed/failed/escalated counts) +- `drain`: Calls `mosaic-orchestrator-run --until-drained` +- `history`: Reads `events.ndjson` and filters by task ID + +Register in the main `bin/mosaic` dispatcher so `mosaic macp ...` works. + +--- + +## Task 8: Update `tasks_md_sync.py` + +Extend the sync script to handle MACP-specific columns in `docs/TASKS.md`: + +Optional new columns: `type`, `dispatch`, `runtime`, `branch` + +If these columns exist in the markdown table, map them to the task JSON. If they don't exist, use defaults from config. 
+ +--- + +## Verification + +After all tasks are complete: + +1. **Existing behavior preserved**: Run `python3 tools/orchestrator-matrix/controller/mosaic_orchestrator.py --repo /tmp/test-repo --once` with an old-style tasks.json — must work identically +2. **New MACP flow works**: Create a tasks.json with `dispatch: "exec"` tasks, run controller, verify worktree creation, execution, result collection, and cleanup +3. **Schema validation**: All JSON files should validate against their schemas +4. **CLI works**: `mosaic macp status` and `mosaic macp submit` produce correct output +5. **No broken imports**: All Python files should pass `python3 -c "import ..."` + +--- + +## File Map (what goes where) + +``` +tools/orchestrator-matrix/ +├── protocol/ +│ ├── task.schema.json ← MODIFY (extend) +│ ├── event.schema.json ← MODIFY (extend) +│ └── result.schema.json ← NEW +├── dispatcher/ +│ ├── __init__.py ← NEW +│ └── macp_dispatcher.py ← NEW +├── controller/ +│ ├── mosaic_orchestrator.py ← MODIFY (wire dispatcher) +│ └── tasks_md_sync.py ← MODIFY (new columns) +├── transport/ +│ └── matrix_transport.py ← UNCHANGED +└── adapters/ + └── README.md ← UNCHANGED + +templates/repo/.mosaic/orchestrator/ +├── config.json ← MODIFY (add macp section) +├── tasks.json ← UNCHANGED +└── ... 
+ +bin/ +├── mosaic ← MODIFY (add macp subcommand routing) +└── mosaic-macp ← NEW +``` + +--- + +## Ground Rules + +- This is the `mosaic-bootstrap` repo — a CLI framework, NOT a web app +- No npm/pnpm — this is bash + Python (stdlib only, no pip dependencies) +- All Python must work with Python 3.10+ +- Keep the deterministic controller pattern — no async, no threads, no external services +- Commit messages: `feat: <description>` (conventional commits) +- Push to `feat/macp-phase1` branch when done diff --git a/docs/tasks/MACP-PHASE1-fixes.md b/docs/tasks/MACP-PHASE1-fixes.md new file mode 100644 index 0000000..782360f --- /dev/null +++ b/docs/tasks/MACP-PHASE1-fixes.md @@ -0,0 +1,64 @@ +# MACP Phase 1 — Review Fixes + +**Branch:** `feat/macp-phase1` (amend/fix commits on top of existing) +**Repo worktree:** `~/src/mosaic-bootstrap-worktrees/macp-phase1` + +These are 5 findings from code review and security review. Fix all of them. + +--- + +## Fix 1 (BLOCKER): Legacy task reclassification in tasks_md_sync.py + +**File:** `tools/orchestrator-matrix/controller/tasks_md_sync.py` +**Problem:** `build_task()` now always sets `task["dispatch"]` from `macp_defaults` even when the `docs/TASKS.md` table has no `dispatch` column. The controller's `is_macp_task()` check is simply `"dispatch" in task`, so ALL synced tasks get reclassified as MACP tasks and routed through the dispatcher instead of the legacy `run_shell()` path. This breaks backward compatibility. + +**Fix:** Only populate `dispatch`, `type`, `runtime`, and `branch` from MACP defaults when the markdown table row **explicitly contains that column**. If the column is absent from the table header, do NOT inject MACP fields. Check whether the parsed headers include these column names before adding them.
+ +--- + +## Fix 2 (BLOCKER): ACP dispatch is a no-op that falsely reports success + +**File:** `tools/orchestrator-matrix/dispatcher/macp_dispatcher.py` +**Problem:** For `dispatch == "acp"`, `build_dispatch_command()` returns a `python3 -c ...` command that only prints JSON but doesn't actually spawn an ACP session. The controller then marks the task completed even though no work ran. + +**Fix:** For `acp` dispatch, do NOT generate a command that runs and exits 0. Instead, set the task status to `"escalated"` with `escalation_reason = "ACP dispatch requires OpenClaw integration (Phase 2)"` and return exit code 1. This makes it fail-safe — ACP tasks won't silently succeed. The controller should emit a `task.escalated` event for these. + +--- + +## Fix 3 (SHOULD-FIX): Premature result write before quality gates + +**File:** `tools/orchestrator-matrix/dispatcher/macp_dispatcher.py` +**Problem:** `dispatch_task()` calls `collect_result(task, exit_code, [], orch_dir)` immediately after the worker exits, before quality gates run. If the controller crashes between this write and the final overwrite, a false "completed" result with empty gate_results sits on disk. + +**Fix:** Remove the `collect_result()` call from `dispatch_task()`. Instead, have `dispatch_task()` return `(exit_code, output)` only. The controller in `mosaic_orchestrator.py` should call `collect_result()` AFTER quality gates have run, when the final status is known. Make sure the controller passes gate_results to collect_result. + +--- + +## Fix 4 (SECURITY MEDIUM): Brief contents exposed in process arguments + +**File:** `tools/orchestrator-matrix/dispatcher/macp_dispatcher.py` +**Problem:** For `yolo` dispatch, `build_dispatch_command()` puts the full task brief text into the shell command as an argument: `mosaic yolo codex "<full brief text>"`. This is visible in `ps` output, shell logs, and crash reports.
+ +**Fix:** Write the brief contents to a temporary file with restrictive permissions (0600), and pass the file path to the worker instead. The command should read: `mosaic yolo <runtime> "$(cat /path/to/brief-TASKID.tmp)"` or better, restructure so the worker script reads from a file path argument. Use the existing `orchestrator-worker.sh` pattern which already reads from a task file. Clean up the temp file after dispatch completes. + +--- + +## Fix 5 (SECURITY MEDIUM): Unrestricted worktree cleanup path + +**File:** `tools/orchestrator-matrix/dispatcher/macp_dispatcher.py` +**Problem:** `cleanup_worktree()` trusts `task['worktree']` without validating it belongs to the expected worktree base directory. A tampered task could cause deletion of unrelated worktrees. + +**Fix:** Before running `git worktree remove`, validate that the resolved worktree path starts with the configured `worktree_base` (from `config.macp.worktree_base`, with `{repo}` expanded). If the path doesn't match the expected base, log a warning and refuse to clean up. Add a helper `_is_safe_worktree_path(worktree_path, config)` for this validation. + +--- + +## Verification + +After fixes: +1. `python3 -c "from tools.orchestrator_matrix.dispatcher import macp_dispatcher"` should not error (fix import path if needed, or just verify `python3 tools/orchestrator-matrix/dispatcher/macp_dispatcher.py` has no syntax errors) +2. Legacy tasks.json with no `dispatch` field must still work with the controller's `run_shell()` path +3. ACP dispatch should NOT mark tasks completed — should escalate +4. No brief text should appear in generated shell commands for yolo dispatch +5.
`cleanup_worktree()` should refuse paths outside the configured base + +Commit with: `fix: address review findings — backward compat, ACP safety, result timing, security` diff --git a/templates/repo/.mosaic/orchestrator/config.json b/templates/repo/.mosaic/orchestrator/config.json index d43fd05..e6ae0e6 100644 --- a/templates/repo/.mosaic/orchestrator/config.json +++ b/templates/repo/.mosaic/orchestrator/config.json @@ -1,6 +1,14 @@ { "enabled": false, "transport": "matrix", + "macp": { + "worktree_base": "~/src/{repo}-worktrees", + "default_dispatch": "exec", + "default_runtime": "codex", + "cleanup_worktrees": true, + "brief_dir": "docs/tasks", + "result_dir": ".mosaic/orchestrator/results" + }, "matrix": { "control_room_id": "", "workspace_id": "", diff --git a/tools/orchestrator-matrix/README.md b/tools/orchestrator-matrix/README.md index 68698bc..0461c12 100644 --- a/tools/orchestrator-matrix/README.md +++ b/tools/orchestrator-matrix/README.md @@ -13,6 +13,7 @@ mechanical quality gates. ## Components - `protocol/` - JSON schemas for task/event payloads +- `dispatcher/` - MACP dispatch helpers for worktrees, command generation, results, and cleanup - `controller/mosaic_orchestrator.py` - deterministic controller loop - `adapters/` - runtime adapter guidance @@ -83,3 +84,29 @@ Task injection message format (room text): ```text !mosaic-task {"id":"TASK-123","title":"Fix bug","command":"echo run","quality_gates":["pnpm lint"]} ``` + +## MACP Notes + +MACP-aware tasks add dispatch metadata on top of the existing queue model: + +- `dispatch`: `exec`, `yolo`, or `acp` +- `type`: task category used for orchestration intent +- `worktree` / `branch`: task-specific git execution context +- `brief_path`: markdown brief consumed by runtime-backed dispatchers +- `result_path`: structured result JSON written under `.mosaic/orchestrator/` + +Controller behavior remains backward compatible: + +- Tasks without `dispatch` continue through the legacy shell execution path. 
+- Tasks with `dispatch` use the MACP dispatcher and can emit `task.gated` and `task.escalated`. +- `acp` dispatch is fail-safe in Phase 1: it escalates with `ACP dispatch requires OpenClaw integration (Phase 2)` instead of reporting success. +- `yolo` dispatch stages the brief in a temporary file so the brief body does not appear in process arguments. + +Manual queue operations are exposed through: + +```bash +mosaic macp submit ... +mosaic macp status +mosaic macp drain +mosaic macp history --task-id TASK-001 +``` diff --git a/tools/orchestrator-matrix/controller/mosaic_orchestrator.py b/tools/orchestrator-matrix/controller/mosaic_orchestrator.py index f7d798c..003a204 100755 --- a/tools/orchestrator-matrix/controller/mosaic_orchestrator.py +++ b/tools/orchestrator-matrix/controller/mosaic_orchestrator.py @@ -14,6 +14,12 @@ import time import uuid from typing import Any +DISPATCHER_DIR = pathlib.Path(__file__).resolve().parent.parent / "dispatcher" +if str(DISPATCHER_DIR) not in sys.path: + sys.path.insert(0, str(DISPATCHER_DIR)) + +import macp_dispatcher + def now_iso() -> str: return dt.datetime.now(dt.timezone.utc).isoformat() @@ -115,6 +121,29 @@ def is_completed_status(status: str) -> bool: return status in {"completed", "done"} +def is_macp_task(task: dict[str, Any]) -> bool: + return "dispatch" in task + + +def normalize_gate_result(gate: Any) -> dict[str, Any]: + if isinstance(gate, str): + return {"command": gate, "type": "mechanical"} + if isinstance(gate, dict): + return { + "command": str(gate.get("command") or ""), + "type": str(gate.get("type") or "mechanical"), + } + return {"command": "", "type": "mechanical"} + + +def detect_worker_escalation(output: str) -> str | None: + marker = "MACP_ESCALATE:" + for line in output.splitlines(): + if marker in line: + return line.split(marker, 1)[1].strip() or "Worker requested escalation" + return None + + def pick_next_task(tasks: list[dict[str, Any]]) -> dict[str, Any] | None: status_by_id = 
{str(t.get("id", "")): str(t.get("status", "")) for t in tasks} for task in tasks: @@ -164,27 +193,58 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic task_file = orch_dir / f"task-{task_id}.json" save_json(task_file, task) - cmd = str(task.get("command", "")).strip() - if not cmd: - template = str(config.get("worker", {}).get("command_template", "")).strip() - if template: - cmd = render_command_template(template, task, task_file) - - if not cmd: - task["status"] = "failed" - task["failed_at"] = now_iso() - task["error"] = "No task command or worker command_template configured." - save_json(tasks_path, {"tasks": task_items}) - emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"]) - state["running_task_id"] = None - state["updated_at"] = now_iso() - save_json(state_path, state) - return True - timeout_sec = int(task.get("timeout_seconds") or config.get("worker", {}).get("timeout_seconds") or 7200) - rc, _, timed_out = run_shell(cmd, repo_root, log_path, timeout_sec) + timed_out = False + output = "" + rc = 0 + if is_macp_task(task): + try: + rc, output = macp_dispatcher.dispatch_task(task, repo_root, orch_dir, config) + timed_out = bool(task.get("_timed_out")) + except Exception as exc: + rc = 1 + task["error"] = str(exc) + else: + cmd = str(task.get("command", "")).strip() + if not cmd: + template = str(config.get("worker", {}).get("command_template", "")).strip() + if template: + cmd = render_command_template(template, task, task_file) + + if not cmd: + task["status"] = "failed" + task["failed_at"] = now_iso() + task["error"] = "No task command or worker command_template configured." 
+ save_json(tasks_path, {"tasks": task_items}) + emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"]) + state["running_task_id"] = None + state["updated_at"] = now_iso() + save_json(state_path, state) + return True + + rc, output, timed_out = run_shell(cmd, repo_root, log_path, timeout_sec) + if rc != 0: - task["error"] = f"Worker command timed out after {timeout_sec}s" if timed_out else f"Worker command failed with exit code {rc}" + if is_macp_task(task) and str(task.get("status") or "") == "escalated": + task["failed_at"] = str(task.get("failed_at") or now_iso()) + emit_event( + events_path, + "task.escalated", + task_id, + "escalated", + "controller", + str(task.get("escalation_reason") or task.get("error") or "Task requires human intervention."), + ) + save_json(tasks_path, {"tasks": task_items}) + state["running_task_id"] = None + state["updated_at"] = now_iso() + save_json(state_path, state) + macp_dispatcher.collect_result(task, rc, [], orch_dir) + if bool(config.get("macp", {}).get("cleanup_worktrees", True)): + macp_dispatcher.cleanup_worktree(task, config) + return True + if not task.get("error"): + task["error"] = f"Worker command timed out after {timeout_sec}s" if timed_out else f"Worker command failed with exit code {rc}" if attempt < max_attempts: task["status"] = "pending" task["last_failed_at"] = now_iso() @@ -204,34 +264,61 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic state["running_task_id"] = None state["updated_at"] = now_iso() save_json(state_path, state) - save_json( - results_dir / f"{task_id}.json", - {"task_id": task_id, "status": task["status"], "exit_code": rc, "attempt": attempt, "max_attempts": max_attempts}, - ) + if is_macp_task(task): + if task["status"] == "failed": + macp_dispatcher.collect_result(task, rc, [], orch_dir) + if bool(config.get("macp", {}).get("cleanup_worktrees", True)): + macp_dispatcher.cleanup_worktree(task, config) + else: + save_json( + 
results_dir / f"{task_id}.json", + {"task_id": task_id, "status": task["status"], "exit_code": rc, "attempt": attempt, "max_attempts": max_attempts}, + ) return True + escalation_reason = detect_worker_escalation(output) if is_macp_task(task) else None + if escalation_reason: + task["status"] = "escalated" + task["failed_at"] = now_iso() + task["escalation_reason"] = escalation_reason + emit_event(events_path, "task.escalated", task_id, "escalated", "controller", escalation_reason) + save_json(tasks_path, {"tasks": task_items}) + state["running_task_id"] = None + state["updated_at"] = now_iso() + save_json(state_path, state) + macp_dispatcher.collect_result(task, rc, [], orch_dir) + if bool(config.get("macp", {}).get("cleanup_worktrees", True)): + macp_dispatcher.cleanup_worktree(task, config) + return True + + task["status"] = "gated" + save_json(tasks_path, {"tasks": task_items}) + emit_event(events_path, "task.gated", task_id, "gated", "controller", "Worker completed; quality gates starting") + gates = task.get("quality_gates") or config.get("quality_gates") or [] all_passed = True gate_results: list[dict[str, Any]] = [] for gate in gates: - gate_cmd = str(gate).strip() + gate_entry = normalize_gate_result(gate) + gate_cmd = gate_entry["command"] if not gate_cmd: continue - emit_event(events_path, "rail.check.started", task_id, "running", "quality-gate", f"Running gate: {gate_cmd}") - gate_rc, _, gate_timed_out = run_shell(gate_cmd, repo_root, log_path, timeout_sec) + gate_cwd = pathlib.Path(os.path.expanduser(str(task.get("worktree") or repo_root))).resolve() if is_macp_task(task) else repo_root + emit_event(events_path, "rail.check.started", task_id, "gated", "quality-gate", f"Running gate: {gate_cmd}") + gate_rc, _, gate_timed_out = run_shell(gate_cmd, gate_cwd, log_path, timeout_sec) if gate_rc == 0: - emit_event(events_path, "rail.check.passed", task_id, "running", "quality-gate", f"Gate passed: {gate_cmd}") + emit_event(events_path, "rail.check.passed", 
task_id, "gated", "quality-gate", f"Gate passed: {gate_cmd}") else: all_passed = False emit_event( events_path, "rail.check.failed", task_id, - "failed", + "gated", "quality-gate", f"Gate timed out after {timeout_sec}s: {gate_cmd}" if gate_timed_out else f"Gate failed ({gate_rc}): {gate_cmd}", ) - gate_results.append({"command": gate_cmd, "exit_code": gate_rc}) + gate_results.append({"command": gate_cmd, "exit_code": gate_rc, "type": gate_entry["type"]}) if all_passed: task["status"] = "completed" @@ -251,24 +338,34 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic f"{task['error']}; retry {attempt + 1}/{max_attempts}", ) else: - task["status"] = "failed" + task["status"] = "escalated" if is_macp_task(task) else "failed" task["failed_at"] = now_iso() - emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"]) + if is_macp_task(task): + task["escalation_reason"] = "Quality gates failed after max retries" + emit_event(events_path, "task.escalated", task_id, "escalated", "controller", task["escalation_reason"]) + else: + emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"]) save_json(tasks_path, {"tasks": task_items}) state["running_task_id"] = None state["updated_at"] = now_iso() save_json(state_path, state) - save_json( - results_dir / f"{task_id}.json", - { - "task_id": task_id, - "status": task["status"], - "completed_at": task.get("completed_at"), - "failed_at": task.get("failed_at"), - "gate_results": gate_results, - }, - ) + if is_macp_task(task): + if task["status"] in {"completed", "failed", "escalated"}: + macp_dispatcher.collect_result(task, rc, gate_results, orch_dir) + if bool(config.get("macp", {}).get("cleanup_worktrees", True)): + macp_dispatcher.cleanup_worktree(task, config) + else: + save_json( + results_dir / f"{task_id}.json", + { + "task_id": task_id, + "status": task["status"], + "completed_at": task.get("completed_at"), + "failed_at": 
task.get("failed_at"), + "gate_results": gate_results, + }, + ) return True @@ -276,10 +373,14 @@ def queue_state(orch_dir: pathlib.Path) -> dict[str, int]: tasks = load_json(orch_dir / "tasks.json", {"tasks": []}) task_items = tasks.get("tasks", []) if not isinstance(task_items, list): - return {"pending": 0, "running": 0, "runnable": 0} + return {"pending": 0, "running": 0, "gated": 0, "completed": 0, "failed": 0, "escalated": 0, "runnable": 0} pending = 0 running = 0 + gated = 0 + completed = 0 + failed = 0 + escalated = 0 runnable = 0 status_by_id = {str(t.get("id", "")): str(t.get("status", "")) for t in task_items} for task in task_items: @@ -291,7 +392,23 @@ def queue_state(orch_dir: pathlib.Path) -> dict[str, int]: runnable += 1 if status == "running": running += 1 - return {"pending": pending, "running": running, "runnable": runnable} + if status == "gated": + gated += 1 + if status == "completed": + completed += 1 + if status == "failed": + failed += 1 + if status == "escalated": + escalated += 1 + return { + "pending": pending, + "running": running, + "gated": gated, + "completed": completed, + "failed": failed, + "escalated": escalated, + "runnable": runnable, + } def main() -> int: diff --git a/tools/orchestrator-matrix/controller/tasks_md_sync.py b/tools/orchestrator-matrix/controller/tasks_md_sync.py index 4bc0812..04092dc 100644 --- a/tools/orchestrator-matrix/controller/tasks_md_sync.py +++ b/tools/orchestrator-matrix/controller/tasks_md_sync.py @@ -35,9 +35,9 @@ def split_pipe_row(line: str) -> list[str]: return [c.strip() for c in row.split("|")] -def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]: +def parse_tasks_markdown(path: pathlib.Path) -> tuple[set[str], list[dict[str, str]]]: if not path.exists(): - return [] + return set(), [] lines = path.read_text(encoding="utf-8").splitlines() header_idx = -1 @@ -51,7 +51,7 @@ def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]: headers = cells break if header_idx < 
0: - return [] + return set(), [] rows: list[dict[str, str]] = [] for line in lines[header_idx + 2 :]: @@ -67,7 +67,7 @@ def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]: if not task_id or task_id.lower() == "id": continue rows.append(row) - return rows + return set(headers), rows def map_status(raw: str) -> str: @@ -78,9 +78,11 @@ def map_status(raw: str) -> str: "pending": "pending", "in-progress": "pending", "needs-qa": "pending", + "gated": "gated", "done": "completed", "completed": "completed", "failed": "failed", + "escalated": "escalated", } return mapping.get(value, "pending") @@ -91,7 +93,9 @@ def parse_depends(raw: str) -> list[str]: def build_task( row: dict[str, str], + headers: set[str], existing: dict[str, Any], + macp_defaults: dict[str, str], runtime_default: str, source_path: str, ) -> dict[str, Any]: @@ -100,6 +104,9 @@ def build_task( issue = row.get("issue", "").strip() repo = row.get("repo", "").strip() branch = row.get("branch", "").strip() + task_type = row.get("type", "").strip() + dispatch = row.get("dispatch", "").strip() + runtime = row.get("runtime", "").strip() depends_on = parse_depends(row.get("depends_on", "")) task = dict(existing) @@ -108,9 +115,25 @@ def build_task( task["description"] = description task["status"] = map_status(row.get("status", "pending")) task["depends_on"] = depends_on - task["runtime"] = str(task.get("runtime") or runtime_default or "codex") + task["issue"] = issue or str(task.get("issue") or "") task["command"] = str(task.get("command") or "") task["quality_gates"] = task.get("quality_gates") or [] + if "type" in headers: + task["type"] = task_type or str(task.get("type") or macp_defaults.get("type") or "coding") + else: + task.pop("type", None) + if "dispatch" in headers: + task["dispatch"] = dispatch or str(task.get("dispatch") or macp_defaults.get("dispatch") or "") + else: + task.pop("dispatch", None) + if "runtime" in headers: + task["runtime"] = runtime or str(task.get("runtime") or 
macp_defaults.get("runtime") or runtime_default or "codex") + else: + task.pop("runtime", None) + if "branch" in headers: + task["branch"] = branch or str(task.get("branch") or macp_defaults.get("branch") or "") + else: + task.pop("branch", None) metadata = dict(task.get("metadata") or {}) metadata.update( { @@ -147,9 +170,16 @@ def main() -> int: tasks_path = (repo / args.tasks_json).resolve() config_path = repo / ".mosaic" / "orchestrator" / "config.json" config = load_json(config_path, {}) - runtime_default = str(config.get("worker", {}).get("runtime") or "codex") + macp_config = dict(config.get("macp") or {}) + runtime_default = str(config.get("worker", {}).get("runtime") or macp_config.get("default_runtime") or "codex") + macp_defaults = { + "type": "coding", + "dispatch": str(macp_config.get("default_dispatch") or ""), + "runtime": str(macp_config.get("default_runtime") or runtime_default or "codex"), + "branch": "", + } - rows = parse_tasks_markdown(docs_path) + headers, rows = parse_tasks_markdown(docs_path) try: source_path = str(docs_path.relative_to(repo)) except ValueError: @@ -170,7 +200,9 @@ def main() -> int: out_tasks.append( build_task( row, + headers, existing_by_id.get(task_id, {}), + macp_defaults, runtime_default, source_path, ) diff --git a/tools/orchestrator-matrix/dispatcher/__init__.py b/tools/orchestrator-matrix/dispatcher/__init__.py new file mode 100644 index 0000000..3e9676d --- /dev/null +++ b/tools/orchestrator-matrix/dispatcher/__init__.py @@ -0,0 +1,15 @@ +"""MACP dispatcher helpers for orchestrator-matrix.""" + +from .macp_dispatcher import build_dispatch_command +from .macp_dispatcher import cleanup_worktree +from .macp_dispatcher import collect_result +from .macp_dispatcher import dispatch_task +from .macp_dispatcher import setup_worktree + +__all__ = [ + "build_dispatch_command", + "cleanup_worktree", + "collect_result", + "dispatch_task", + "setup_worktree", +] diff --git 
a/tools/orchestrator-matrix/dispatcher/macp_dispatcher.py b/tools/orchestrator-matrix/dispatcher/macp_dispatcher.py new file mode 100644 index 0000000..997cbf1 --- /dev/null +++ b/tools/orchestrator-matrix/dispatcher/macp_dispatcher.py @@ -0,0 +1,401 @@ +#!/usr/bin/env python3 +"""MACP dispatcher library for orchestrator-matrix tasks.""" + +from __future__ import annotations + +import datetime as dt +import json +import os +import pathlib +import re +import shlex +import subprocess +import tempfile +from typing import Any + + +def now_iso() -> str: + return dt.datetime.now(dt.timezone.utc).isoformat() + + +def save_json(path: pathlib.Path, data: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + with tmp.open("w", encoding="utf-8") as handle: + json.dump(data, handle, indent=2) + handle.write("\n") + tmp.replace(path) + + +def _slugify(value: str) -> str: + lowered = value.strip().lower() + slug = re.sub(r"[^a-z0-9]+", "-", lowered) + return slug.strip("-") or "task" + + +def _run_command(command: str, cwd: pathlib.Path, log_path: pathlib.Path, timeout_sec: int) -> tuple[int, str, bool]: + log_path.parent.mkdir(parents=True, exist_ok=True) + with log_path.open("a", encoding="utf-8") as log: + log.write(f"\n[{now_iso()}] COMMAND: {command}\n") + log.flush() + proc = subprocess.Popen( + ["bash", "-lc", command], + cwd=str(cwd), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + encoding="utf-8", + ) + timed_out = False + try: + output, _ = proc.communicate(timeout=max(1, timeout_sec)) + exit_code = proc.returncode + except subprocess.TimeoutExpired: + timed_out = True + proc.kill() + output, _ = proc.communicate() + exit_code = 124 + log.write(f"[{now_iso()}] TIMEOUT: exceeded {timeout_sec}s\n") + + if output: + log.write(output) + log.write(f"[{now_iso()}] EXIT: {exit_code}\n") + return exit_code, output or "", timed_out + + +def _git_capture(args: list[str], cwd: pathlib.Path) -> str: + 
proc = subprocess.run(args, cwd=str(cwd), check=True, capture_output=True, text=True, encoding="utf-8") + return proc.stdout.strip() + + +def _resolve_dispatch(task: dict[str, Any]) -> str: + return str(task.get("dispatch") or "exec").strip().lower() or "exec" + + +def _resolve_runtime(task: dict[str, Any]) -> str: + return str(task.get("runtime") or "codex").strip() or "codex" + + +def _resolve_branch(task: dict[str, Any]) -> str: + branch = str(task.get("branch") or "").strip() + if branch: + return branch + + task_id_slug = _slugify(str(task.get("id") or "task")) + title_slug = _slugify(str(task.get("title") or task.get("id") or "task")) + if title_slug == task_id_slug: + branch = f"feat/{task_id_slug}" + else: + branch = f"feat/{task_id_slug}-{title_slug}" + task["branch"] = branch + return branch + + +def _resolve_worktree_path(task: dict[str, Any], repo_root: pathlib.Path) -> pathlib.Path: + explicit = str(task.get("worktree") or "").strip() + if explicit: + path = pathlib.Path(os.path.expanduser(explicit)).resolve() + else: + base = pathlib.Path(os.path.expanduser(f"~/src/{repo_root.name}-worktrees")).resolve() + path = base / _slugify(str(task.get("id") or task.get("title") or "task")) + task["worktree"] = str(path) + return path + + +def _default_worktree_path(task: dict[str, Any], repo_root: pathlib.Path, base_template: str) -> pathlib.Path: + base = pathlib.Path(os.path.expanduser(base_template.format(repo=repo_root.name))).resolve() + return base / _slugify(str(task.get("id") or task.get("title") or "task")) + + +def _resolve_start_point(repo_root: pathlib.Path) -> str: + candidates = ["origin/main", "main", "HEAD"] + for candidate in candidates: + proc = subprocess.run( + ["git", "-C", str(repo_root), "rev-parse", "--verify", f"{candidate}^{{commit}}"], + capture_output=True, + text=True, + encoding="utf-8", + ) + if proc.returncode == 0: + return candidate + raise RuntimeError(f"Unable to resolve worktree start point for {repo_root}") + + +def 
_read_brief(task: dict[str, Any], repo_root: pathlib.Path) -> str: + brief_path_raw = str(task.get("brief_path") or "").strip() + if not brief_path_raw: + raise ValueError("MACP yolo dispatch requires brief_path") + brief_path = (repo_root / brief_path_raw).resolve() + return brief_path.read_text(encoding="utf-8").strip() + + +def _stage_yolo_brief_file(task: dict[str, Any], repo_root: pathlib.Path, orch_dir: pathlib.Path) -> pathlib.Path: + brief_dir = (orch_dir / "tmp").resolve() + brief_dir.mkdir(parents=True, exist_ok=True) + task_id = _slugify(str(task.get("id") or "task")) + fd, raw_path = tempfile.mkstemp(prefix=f"brief-{task_id}-", suffix=".tmp", dir=str(brief_dir), text=True) + with os.fdopen(fd, "w", encoding="utf-8") as handle: + handle.write(_read_brief(task, repo_root)) + handle.write("\n") + os.chmod(raw_path, 0o600) + path = pathlib.Path(raw_path).resolve() + task["_brief_temp_path"] = str(path) + return path + + +def _resolve_result_path(task: dict[str, Any], orch_dir: pathlib.Path) -> pathlib.Path: + result_path_raw = str(task.get("result_path") or "").strip() + if result_path_raw: + result_path = (orch_dir / result_path_raw).resolve() + else: + result_path = (orch_dir / "results" / f"{task.get('id', 'task')}.json").resolve() + task["result_path"] = str(result_path.relative_to(orch_dir)) + return result_path + + +def _resolve_worktree_base(config: dict[str, Any], repo_name: str) -> pathlib.Path: + macp_config = dict(config.get("macp") or {}) + base_template = str(macp_config.get("worktree_base") or "~/src/{repo}-worktrees") + return pathlib.Path(os.path.expanduser(base_template.format(repo=repo_name))).resolve() + + +def _changed_files(task: dict[str, Any]) -> list[str]: + worktree_raw = str(task.get("worktree") or "").strip() + if not worktree_raw: + return [] + + worktree = pathlib.Path(os.path.expanduser(worktree_raw)).resolve() + if not worktree.exists(): + return [] + + proc = subprocess.run( + ["git", "-C", str(worktree), "status", 
"--porcelain"], + capture_output=True, + text=True, + encoding="utf-8", + ) + if proc.returncode != 0: + return [] + + changed: list[str] = [] + for line in proc.stdout.splitlines(): + path_text = line[3:].strip() + if not path_text: + continue + if " -> " in path_text: + _, path_text = path_text.split(" -> ", 1) + changed.append(path_text) + return changed + + +def _resolve_repo_root_from_worktree(worktree: pathlib.Path) -> pathlib.Path | None: + try: + common_dir_raw = _git_capture(["git", "-C", str(worktree), "rev-parse", "--git-common-dir"], worktree) + except Exception: + return None + common_dir = pathlib.Path(common_dir_raw) + if not common_dir.is_absolute(): + common_dir = (worktree / common_dir).resolve() + return common_dir.parent if common_dir.name == ".git" else common_dir + + +def _is_safe_worktree_path(worktree_path: pathlib.Path, config: dict[str, Any]) -> bool: + repo_root = _resolve_repo_root_from_worktree(worktree_path) + if repo_root is None: + return False + expected_base = _resolve_worktree_base(config, repo_root.name) + try: + worktree_path.resolve().relative_to(expected_base) + return True + except ValueError: + return False + + +def setup_worktree(task: dict[str, Any], repo_root: pathlib.Path) -> pathlib.Path: + """Create git worktree for task. 
Returns worktree path.""" + + worktree_path = _resolve_worktree_path(task, repo_root) + branch = _resolve_branch(task) + if worktree_path.exists() and (worktree_path / ".git").exists(): + return worktree_path + + worktree_path.parent.mkdir(parents=True, exist_ok=True) + start_point = _resolve_start_point(repo_root) + subprocess.run( + ["git", "-C", str(repo_root), "worktree", "add", "-B", branch, str(worktree_path), start_point], + check=True, + capture_output=True, + text=True, + encoding="utf-8", + ) + task["worktree"] = str(worktree_path) + return worktree_path + + +def build_dispatch_command(task: dict[str, Any], repo_root: pathlib.Path) -> str: + """Generate execution command based on task dispatch type and runtime.""" + + dispatch = _resolve_dispatch(task) + runtime = _resolve_runtime(task) + worktree = pathlib.Path(os.path.expanduser(str(task.get("worktree") or repo_root))).resolve() + + if dispatch == "exec": + command = str(task.get("command") or "").strip() + if not command: + raise ValueError("MACP exec dispatch requires command") + return command + + if dispatch == "acp": + raise RuntimeError("ACP dispatch requires OpenClaw integration (Phase 2)") + + if dispatch == "yolo": + brief_file = pathlib.Path(str(task.get("_brief_temp_path") or "")).resolve() + if not str(task.get("_brief_temp_path") or "").strip(): + raise ValueError("MACP yolo dispatch requires a staged brief file") + inner = ( + 'export PATH="$HOME/.config/mosaic/bin:$PATH"; ' + f"cd {shlex.quote(str(worktree))}; " + f'mosaic yolo {shlex.quote(runtime)} "$(cat {shlex.quote(str(brief_file))})"' + ) + return f"script -qec {shlex.quote(inner)} /dev/null" + + raise ValueError(f"Unsupported MACP dispatch type: {dispatch}") + + +def collect_result(task: dict[str, Any], exit_code: int, gate_results: list[dict[str, Any]], orch_dir: pathlib.Path) -> dict[str, Any]: + """Build standardized result JSON after worker completion.""" + + raw_status = str(task.get("status") or "") + if raw_status in 
{"completed", "failed", "escalated"}: + status = raw_status + else: + status = "completed" if exit_code == 0 else "failed" + completed_at = str(task.get("completed_at") or task.get("failed_at") or now_iso()) + failed_at = task.get("failed_at") + if status == "failed" and not failed_at: + failed_at = now_iso() + + normalized_gates: list[dict[str, Any]] = [] + for gate in gate_results: + normalized_gates.append( + { + "command": str(gate.get("command") or ""), + "exit_code": int(gate.get("exit_code") or 0), + "type": str(gate.get("type") or "mechanical"), + } + ) + + summary_map = { + "completed": "Task completed and quality gates passed.", + "failed": "Task failed before completion.", + "escalated": "Task requires human intervention.", + } + result = { + "task_id": str(task.get("id") or ""), + "status": status, + "completed_at": completed_at, + "failed_at": failed_at, + "exit_code": exit_code, + "attempt": int(task.get("attempts") or 0), + "max_attempts": int(task.get("max_attempts") or 1), + "runtime": _resolve_runtime(task), + "dispatch": _resolve_dispatch(task), + "worktree": str(task.get("worktree")) if task.get("worktree") else None, + "branch": str(task.get("branch")) if task.get("branch") else None, + "pr": str(task.get("pr")) if task.get("pr") else None, + "summary": str(task.get("summary") or summary_map.get(status, "Task processed.")), + "files_changed": _changed_files(task), + "gate_results": normalized_gates, + "error": task.get("error"), + "escalation_reason": task.get("escalation_reason"), + "metadata": dict(task.get("metadata") or {}), + } + save_json(_resolve_result_path(task, orch_dir), result) + return result + + +def cleanup_worktree(task: dict[str, Any], config: dict[str, Any]) -> None: + """Remove git worktree after task is done.""" + + worktree_raw = str(task.get("worktree") or "").strip() + if not worktree_raw: + return + + worktree = pathlib.Path(os.path.expanduser(worktree_raw)).resolve() + if not worktree.exists(): + return + + repo_root = 
_resolve_repo_root_from_worktree(worktree) + if repo_root is None or repo_root == worktree: + return + + if not _is_safe_worktree_path(worktree, config): + print(f"[macp_dispatcher] refusing to clean unsafe worktree path: {worktree}", flush=True) + return + + subprocess.run( + ["git", "-C", str(repo_root), "worktree", "remove", "--force", str(worktree)], + check=True, + capture_output=True, + text=True, + encoding="utf-8", + ) + subprocess.run( + ["git", "-C", str(repo_root), "worktree", "prune"], + check=False, + capture_output=True, + text=True, + encoding="utf-8", + ) + + +def dispatch_task(task: dict[str, Any], repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dict[str, Any]) -> tuple[int, str]: + """Full dispatch lifecycle: setup -> execute. Returns (exit_code, output).""" + + macp_config = dict(config.get("macp") or {}) + worker_config = dict(config.get("worker") or {}) + task["dispatch"] = _resolve_dispatch({"dispatch": task.get("dispatch") or macp_config.get("default_dispatch") or "exec"}) + task["runtime"] = _resolve_runtime({"runtime": task.get("runtime") or macp_config.get("default_runtime") or worker_config.get("runtime") or "codex"}) + if not str(task.get("worktree") or "").strip(): + task["worktree"] = str( + _default_worktree_path( + task, + repo_root, + str(macp_config.get("worktree_base") or "~/src/{repo}-worktrees"), + ) + ) + if not str(task.get("result_path") or "").strip(): + result_dir = str(macp_config.get("result_dir") or ".mosaic/orchestrator/results").strip() + if result_dir.startswith(".mosaic/orchestrator/"): + result_dir = result_dir[len(".mosaic/orchestrator/") :] + task["result_path"] = f"{result_dir.rstrip('/')}/{task.get('id', 'task')}.json" + + if task["dispatch"] == "acp": + task["status"] = "escalated" + task["failed_at"] = now_iso() + task["escalation_reason"] = "ACP dispatch requires OpenClaw integration (Phase 2)" + task["error"] = task["escalation_reason"] + task["_timed_out"] = False + return 1, 
task["escalation_reason"] + + worktree = setup_worktree(task, repo_root) + log_path = orch_dir / "logs" / f"{task.get('id', 'task')}.log" + timeout_sec = int(task.get("timeout_seconds") or worker_config.get("timeout_seconds") or 7200) + if task["dispatch"] == "yolo": + _stage_yolo_brief_file(task, repo_root, orch_dir) + try: + command = build_dispatch_command(task, repo_root) + exit_code, output, timed_out = _run_command(command, worktree, log_path, timeout_sec) + task["_timed_out"] = timed_out + if timed_out: + task["error"] = f"Worker command timed out after {timeout_sec}s" + elif exit_code != 0 and not task.get("error"): + task["error"] = f"Worker command failed with exit code {exit_code}" + return exit_code, output + finally: + brief_temp_path = str(task.pop("_brief_temp_path", "") or "").strip() + if brief_temp_path: + try: + pathlib.Path(brief_temp_path).unlink(missing_ok=True) + except OSError: + pass diff --git a/tools/orchestrator-matrix/protocol/event.schema.json b/tools/orchestrator-matrix/protocol/event.schema.json index 1d4d1d2..f493e42 100644 --- a/tools/orchestrator-matrix/protocol/event.schema.json +++ b/tools/orchestrator-matrix/protocol/event.schema.json @@ -22,8 +22,11 @@ "task.assigned", "task.started", "task.progress", + "task.gated", "task.completed", "task.failed", + "task.escalated", + "task.retry.scheduled", "rail.check.started", "rail.check.passed", "rail.check.failed" @@ -37,8 +40,10 @@ "enum": [ "pending", "running", + "gated", "completed", - "failed" + "failed", + "escalated" ] }, "timestamp": { @@ -50,7 +55,8 @@ "enum": [ "controller", "worker", - "quality-gate" + "quality-gate", + "dispatcher" ] }, "message": { diff --git a/tools/orchestrator-matrix/protocol/result.schema.json b/tools/orchestrator-matrix/protocol/result.schema.json new file mode 100644 index 0000000..a6a4786 --- /dev/null +++ b/tools/orchestrator-matrix/protocol/result.schema.json @@ -0,0 +1,119 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + 
"$id": "https://mosaicstack.dev/schemas/orchestrator/result.schema.json", + "title": "MACP Task Result", + "type": "object", + "required": [ + "task_id", + "status", + "completed_at" + ], + "properties": { + "task_id": { + "type": "string" + }, + "status": { + "type": "string", + "enum": [ + "completed", + "failed", + "escalated" + ] + }, + "completed_at": { + "type": "string", + "format": "date-time" + }, + "failed_at": { + "type": [ + "string", + "null" + ], + "format": "date-time" + }, + "exit_code": { + "type": [ + "integer", + "null" + ] + }, + "attempt": { + "type": "integer" + }, + "max_attempts": { + "type": "integer" + }, + "runtime": { + "type": "string" + }, + "dispatch": { + "type": "string" + }, + "worktree": { + "type": [ + "string", + "null" + ] + }, + "branch": { + "type": [ + "string", + "null" + ] + }, + "pr": { + "type": [ + "string", + "null" + ] + }, + "summary": { + "type": "string", + "description": "Human-readable summary of what the worker did" + }, + "files_changed": { + "type": "array", + "items": { + "type": "string" + } + }, + "gate_results": { + "type": "array", + "items": { + "type": "object", + "properties": { + "command": { + "type": "string" + }, + "exit_code": { + "type": "integer" + }, + "type": { + "type": "string", + "enum": [ + "mechanical", + "ai-review", + "ci-pipeline" + ] + } + } + } + }, + "error": { + "type": [ + "string", + "null" + ] + }, + "escalation_reason": { + "type": [ + "string", + "null" + ] + }, + "metadata": { + "type": "object" + } + }, + "additionalProperties": true +} diff --git a/tools/orchestrator-matrix/protocol/task.schema.json b/tools/orchestrator-matrix/protocol/task.schema.json index af97a29..1600b18 100644 --- a/tools/orchestrator-matrix/protocol/task.schema.json +++ b/tools/orchestrator-matrix/protocol/task.schema.json @@ -23,14 +23,85 @@ "enum": [ "pending", "running", + "gated", "completed", - "failed" + "failed", + "escalated" ] }, + "type": { + "type": "string", + "enum": [ + "coding", + 
"deploy", + "research", + "review", + "documentation", + "infrastructure" + ], + "description": "Task type - determines dispatch strategy and gate requirements" + }, + "dispatch": { + "type": "string", + "enum": [ + "yolo", + "acp", + "exec" + ], + "description": "Execution backend: yolo=mosaic yolo (full system), acp=OpenClaw sessions_spawn (sandboxed), exec=direct shell" + }, "runtime": { "type": "string", "description": "Preferred worker runtime, e.g. codex, claude, opencode" }, + "worktree": { + "type": "string", + "description": "Path to git worktree for this task, e.g. ~/src/repo-worktrees/task-042" + }, + "branch": { + "type": "string", + "description": "Git branch name for this task" + }, + "brief_path": { + "type": "string", + "description": "Path to markdown task brief relative to repo root" + }, + "result_path": { + "type": "string", + "description": "Path to JSON result file relative to .mosaic/orchestrator/" + }, + "issue": { + "type": "string", + "description": "Issue reference (e.g. #42)" + }, + "pr": { + "type": [ + "string", + "null" + ], + "description": "PR number/URL once opened" + }, + "depends_on": { + "type": "array", + "items": { + "type": "string" + }, + "description": "List of task IDs this task depends on" + }, + "max_attempts": { + "type": "integer", + "minimum": 1, + "default": 1 + }, + "attempts": { + "type": "integer", + "minimum": 0, + "default": 0 + }, + "timeout_seconds": { + "type": "integer", + "description": "Override default timeout for this task" + }, "command": { "type": "string", "description": "Worker command to execute for this task"