From 7ef49a33d88c7c1f3493756d7138697e042a7cdc Mon Sep 17 00:00:00 2001 From: Jarvis Date: Fri, 27 Mar 2026 18:39:57 -0500 Subject: [PATCH] feat: implement MACP phase 1 core protocol --- bin/mosaic | 8 + bin/mosaic-macp | 212 +++++++++++ docs/DEVELOPER-GUIDE/README.md | 5 + .../orchestrator-matrix/macp-phase1.md | 31 ++ docs/PRD.md | 98 +++++ docs/SITEMAP.md | 7 + docs/TASKS.md | 17 + docs/scratchpads/macp-phase1.md | 80 ++++ docs/tasks/MACP-PHASE1-brief.md | 324 ++++++++++++++++ .../repo/.mosaic/orchestrator/config.json | 8 + tools/orchestrator-matrix/README.md | 25 ++ .../controller/mosaic_orchestrator.py | 183 ++++++--- .../controller/tasks_md_sync.py | 22 +- .../dispatcher/__init__.py | 15 + .../macp_dispatcher.cpython-312.pyc | Bin 0 -> 20667 bytes .../dispatcher/macp_dispatcher.py | 357 ++++++++++++++++++ .../protocol/event.schema.json | 10 +- .../protocol/result.schema.json | 119 ++++++ .../protocol/task.schema.json | 73 +++- 19 files changed, 1546 insertions(+), 48 deletions(-) create mode 100755 bin/mosaic-macp create mode 100644 docs/DEVELOPER-GUIDE/README.md create mode 100644 docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase1.md create mode 100644 docs/PRD.md create mode 100644 docs/SITEMAP.md create mode 100644 docs/TASKS.md create mode 100644 docs/scratchpads/macp-phase1.md create mode 100644 docs/tasks/MACP-PHASE1-brief.md create mode 100644 tools/orchestrator-matrix/dispatcher/__init__.py create mode 100644 tools/orchestrator-matrix/dispatcher/__pycache__/macp_dispatcher.cpython-312.pyc create mode 100644 tools/orchestrator-matrix/dispatcher/macp_dispatcher.py create mode 100644 tools/orchestrator-matrix/protocol/result.schema.json diff --git a/bin/mosaic b/bin/mosaic index 9276bf5..3cd450b 100755 --- a/bin/mosaic +++ b/bin/mosaic @@ -15,6 +15,7 @@ set -euo pipefail # mosaic init [args...] Generate SOUL.md interactively # mosaic doctor [args...] Health audit # mosaic sync [args...] Sync skills +# mosaic macp [args...] Manual MACP queue operations # mosaic seq [subcommand] sequential-thinking MCP management (check/fix/start) # mosaic bootstrap Bootstrap a repo # mosaic upgrade release Upgrade installed Mosaic release @@ -41,6 +42,7 @@ Management: init [args...] Generate SOUL.md (agent identity contract) doctor [args...] Audit runtime state and detect drift sync [args...] Sync skills from canonical source + macp [args...] Manual MACP queue operations seq [subcommand] sequential-thinking MCP management: check [--runtime ] [--strict] fix [--runtime ] @@ -487,6 +489,11 @@ run_sync() { exec "$MOSAIC_HOME/bin/mosaic-sync-skills" "$@" } +run_macp() { + check_mosaic_home + exec "$MOSAIC_HOME/bin/mosaic-macp" "$@" +} + run_seq() { check_mosaic_home local checker="$MOSAIC_HOME/bin/mosaic-ensure-sequential-thinking" @@ -774,6 +781,7 @@ case "$command" in init) run_init "$@" ;; doctor) run_doctor "$@" ;; sync) run_sync "$@" ;; + macp) run_macp "$@" ;; seq) run_seq "$@" ;; bootstrap) run_bootstrap "$@" ;; prdy) run_prdy "$@" ;; diff --git a/bin/mosaic-macp b/bin/mosaic-macp new file mode 100755 index 0000000..413ab55 --- /dev/null +++ b/bin/mosaic-macp @@ -0,0 +1,212 @@ +#!/usr/bin/env bash +set -euo pipefail + +repo_root="$(pwd)" +orch_dir="$repo_root/.mosaic/orchestrator" +config_path="$orch_dir/config.json" +tasks_path="$orch_dir/tasks.json" +events_path="$orch_dir/events.ndjson" + +usage() { + cat <<'USAGE' +mosaic macp — manual MACP queue operations + +Usage: + mosaic macp submit --task-id TASK-001 --title "..." [--type coding] [--dispatch yolo|acp|exec] [--runtime codex] [--brief docs/tasks/TASK-001.md] [--command "..."] [--branch feat/...] + mosaic macp status [--task-id TASK-001] + mosaic macp drain + mosaic macp history [--task-id TASK-001] +USAGE +} + +require_repo() { + if [[ ! -f "$config_path" ]]; then + echo "[mosaic-macp] missing orchestrator config: $config_path" >&2 + exit 1 + fi +} + +submit_task() { + require_repo + + local task_id="" + local title="" + local task_type="coding" + local dispatch="" + local runtime="" + local brief="" + local command="" + local branch="" + + while [[ $# -gt 0 ]]; do + case "$1" in + --task-id) task_id="$2"; shift 2 ;; + --title) title="$2"; shift 2 ;; + --type) task_type="$2"; shift 2 ;; + --dispatch) dispatch="$2"; shift 2 ;; + --runtime) runtime="$2"; shift 2 ;; + --brief) brief="$2"; shift 2 ;; + --command) command="$2"; shift 2 ;; + --branch) branch="$2"; shift 2 ;; + -h|--help) usage; exit 0 ;; + *) echo "[mosaic-macp] unknown submit option: $1" >&2; exit 1 ;; + esac + done + + if [[ -z "$task_id" || -z "$title" ]]; then + echo "[mosaic-macp] submit requires --task-id and --title" >&2 + exit 1 + fi + + python3 - "$config_path" "$tasks_path" "$task_id" "$title" "$task_type" "$dispatch" "$runtime" "$brief" "$command" "$branch" <<'PY' +import json +import pathlib +import sys + +config_path = pathlib.Path(sys.argv[1]) +tasks_path = pathlib.Path(sys.argv[2]) +task_id, title, task_type, dispatch, runtime, brief, command, branch = sys.argv[3:] + +config = json.loads(config_path.read_text(encoding="utf-8")) +macp = dict(config.get("macp") or {}) +worker = dict(config.get("worker") or {}) +payload = {"tasks": []} +if tasks_path.exists(): + payload = json.loads(tasks_path.read_text(encoding="utf-8")) +if not isinstance(payload.get("tasks"), list): + payload = {"tasks": []} + +resolved_dispatch = dispatch or ("yolo" if brief and not command else str(macp.get("default_dispatch") or "exec")) +resolved_runtime = runtime or str(macp.get("default_runtime") or worker.get("runtime") or "codex") +if resolved_dispatch == "exec" and not command: + raise SystemExit("[mosaic-macp] exec dispatch requires --command") + +task = { + "id": task_id, + "title": title, + "description": title, + "status": "pending", + "type": task_type or "coding", + "dispatch": resolved_dispatch, + "runtime": resolved_runtime, + "branch": branch or "", + "brief_path": brief or "", + "command": command or "", + "quality_gates": config.get("quality_gates") or [], + "metadata": {"source": "bin/mosaic-macp"}, +} + +updated = [] +replaced = False +for existing in payload["tasks"]: + if str(existing.get("id")) == task_id: + merged = dict(existing) + merged.update({k: v for k, v in task.items() if v not in ("", [])}) + updated.append(merged) + replaced = True + else: + updated.append(existing) +if not replaced: + updated.append(task) +payload["tasks"] = updated +tasks_path.parent.mkdir(parents=True, exist_ok=True) +tasks_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") +print(f"[mosaic-macp] queued {task_id} dispatch={resolved_dispatch} runtime={resolved_runtime}") +PY +} + +status_tasks() { + require_repo + local task_id="" + while [[ $# -gt 0 ]]; do + case "$1" in + --task-id) task_id="$2"; shift 2 ;; + -h|--help) usage; exit 0 ;; + *) echo "[mosaic-macp] unknown status option: $1" >&2; exit 1 ;; + esac + done + + python3 - "$tasks_path" "$task_id" <<'PY' +import json +import pathlib +import sys + +tasks_path = pathlib.Path(sys.argv[1]) +task_id = sys.argv[2] +payload = {"tasks": []} +if tasks_path.exists(): + payload = json.loads(tasks_path.read_text(encoding="utf-8")) +tasks = payload.get("tasks", []) +counts = {key: 0 for key in ["pending", "running", "gated", "completed", "failed", "escalated"]} +for task in tasks: + status = str(task.get("status") or "pending") + counts[status] = counts.get(status, 0) + 1 + +if task_id: + for task in tasks: + if str(task.get("id")) == task_id: + print(json.dumps(task, indent=2)) + break + else: + raise SystemExit(f"[mosaic-macp] task not found: {task_id}") +else: + print("Queue state:") + for key in ["pending", "running", "gated", "completed", "failed", "escalated"]: + print(f" {key}: {counts.get(key, 0)}") +PY +} + +drain_tasks() { + require_repo + local mosaic_home="${MOSAIC_HOME:-$HOME/.config/mosaic}" + exec "$mosaic_home/bin/mosaic-orchestrator-run" --repo "$repo_root" --until-drained +} + +history_tasks() { + require_repo + local task_id="" + while [[ $# -gt 0 ]]; do + case "$1" in + --task-id) task_id="$2"; shift 2 ;; + -h|--help) usage; exit 0 ;; + *) echo "[mosaic-macp] unknown history option: $1" >&2; exit 1 ;; + esac + done + + python3 - "$events_path" "$task_id" <<'PY' +import json +import pathlib +import sys + +events_path = pathlib.Path(sys.argv[1]) +task_id = sys.argv[2] +if not events_path.exists(): + raise SystemExit(f"[mosaic-macp] events file not found: {events_path}") + +for line in events_path.read_text(encoding="utf-8").splitlines(): + if not line.strip(): + continue + event = json.loads(line) + if task_id and str(event.get("task_id")) != task_id: + continue + print(json.dumps(event, indent=2)) +PY +} + +subcommand="${1:-help}" +if [[ $# -gt 0 ]]; then + shift +fi + +case "$subcommand" in + submit) submit_task "$@" ;; + status) status_tasks "$@" ;; + drain) drain_tasks "$@" ;; + history) history_tasks "$@" ;; + help|-h|--help|"") usage ;; + *) + echo "[mosaic-macp] unknown subcommand: $subcommand" >&2 + usage + exit 1 + ;; +esac diff --git a/docs/DEVELOPER-GUIDE/README.md b/docs/DEVELOPER-GUIDE/README.md new file mode 100644 index 0000000..148140e --- /dev/null +++ b/docs/DEVELOPER-GUIDE/README.md @@ -0,0 +1,5 @@ +# Developer Guide + +## Orchestrator Matrix + +- [MACP Phase 1](./orchestrator-matrix/macp-phase1.md) diff --git a/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase1.md b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase1.md new file mode 100644 index 0000000..4a081ff --- /dev/null +++ b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase1.md @@ -0,0 +1,31 @@ +# MACP Phase 1 + +MACP Phase 1 extends `tools/orchestrator-matrix/` without replacing the existing deterministic controller model. + +## What Changed + +1. Task and event schemas now describe MACP dispatch metadata, new lifecycle statuses, and dispatcher-originated events. +2. `tools/orchestrator-matrix/dispatcher/macp_dispatcher.py` manages task worktrees, dispatch command generation, result files, and cleanup. +3. `mosaic_orchestrator.py` routes MACP-aware tasks through the dispatcher while leaving legacy non-`dispatch` tasks on the original shell path. +4. `bin/mosaic-macp` adds manual submit, status, drain, and history operations for `.mosaic/orchestrator/`. + +## Dispatch Modes + +1. `exec`: runs the task's `command` directly inside the task worktree. +2. `yolo`: launches `mosaic yolo ` with the task brief content via a PTY wrapper. +3. `acp`: emits the config payload a caller can hand to an ACP/OpenClaw session spawner. + +## Result Contract + +MACP writes task result JSON under `.mosaic/orchestrator/results/` by default. Result files capture: + +1. Task status and timing +2. Attempt counters +3. Runtime and dispatch metadata +4. Changed files seen in the task worktree +5. Quality-gate command results +6. Error or escalation details + +## Compatibility + +Legacy tasks that omit `dispatch` still behave like the original matrix controller. This keeps existing `tasks.json` workflows functional while allowing orchestrators to opt into MACP incrementally. diff --git a/docs/PRD.md b/docs/PRD.md new file mode 100644 index 0000000..2359e17 --- /dev/null +++ b/docs/PRD.md @@ -0,0 +1,98 @@ +# PRD: MACP Phase 1 Core Protocol Implementation + +## Metadata + +- Owner: Jarvis +- Date: 2026-03-27 +- Status: in-progress +- Best-Guess Mode: true + +## Problem Statement + +The current orchestrator-matrix rail can queue shell-based worker tasks, but it does not yet expose a standardized protocol for dispatch selection, worktree-aware execution, structured results, or manual MACP queue operations. MACP Phase 1 extends the existing rail so orchestrators can delegate to multiple runtimes through a consistent task model while preserving current behavior for legacy tasks. + +## Objectives + +1. Extend the existing orchestrator-matrix protocol and controller to support MACP-aware task dispatch and status tracking. +2. Add a dispatcher layer that manages worktree lifecycle, runtime command generation, and standardized results. +3. Provide a CLI entrypoint for manual MACP submission, status inspection, queue draining, and history review. + +## Scope + +### In Scope + +1. Extend the orchestrator task and event schemas and add a result schema. +2. Add a Python dispatcher module under `tools/orchestrator-matrix/dispatcher/`. +3. Update the controller to use the dispatcher for MACP-aware tasks while preserving legacy execution paths. +4. Update orchestrator config templates, task markdown sync logic, and CLI routing/scripts for MACP commands. +5. Add verification for backward compatibility, schema validity, imports, and basic MACP execution flow. + +### Out of Scope + +1. Rewriting the orchestrator controller architecture. +2. Changing Matrix transport behavior beyond schema compatibility. +3. Implementing real OpenClaw `sessions_spawn` execution beyond producing the config payload/command for callers. +4. Adding non-stdlib Python dependencies or npm-based tooling. + +## User/Stakeholder Requirements + +1. MACP must evolve the current orchestrator-matrix implementation rather than replace it. +2. Legacy task queues without `dispatch` fields must continue to run exactly as before. +3. MACP-aware tasks must support dispatch modes `yolo`, `acp`, and `exec`. +4. Results must be written in a structured JSON format suitable for audit and orchestration follow-up. +5. A manual `mosaic macp` CLI must expose submit, status, drain, and history flows. + +## Functional Requirements + +1. Task schema must include MACP dispatch, worktree, result, retry, branch, brief, issue/PR, and dependency fields. +2. Event schema must recognize `task.gated`, `task.escalated`, and `task.retry.scheduled`, plus a `dispatcher` source. +3. Dispatcher functions must set up worktrees, build commands, execute tasks, collect results, and clean up worktrees. +4. Controller `run_single_task()` must route MACP-aware tasks through the dispatcher and emit the correct lifecycle events/status transitions. +5. `tasks_md_sync.py` must map optional MACP table columns when present and otherwise apply config defaults. +6. `bin/mosaic` must route `mosaic macp ...` to a new `bin/mosaic-macp` script. + +## Non-Functional Requirements + +1. Security: no secrets embedded in generated commands, config, or results. +2. Performance: controller remains deterministic and synchronous with no async or thread-based orchestration. +3. Reliability: worktree creation/cleanup failures must be surfaced predictably and produce structured task failure/escalation states. +4. Observability: lifecycle events, logs, and result JSON must clearly show task outcome, attempts, gates, and errors. + +## Acceptance Criteria + +1. Existing legacy tasks without `dispatch` still run through the old shell path with unchanged behavior. +2. MACP-aware `exec` tasks run through the dispatcher and produce result JSON with gate outcomes. +3. New schemas validate task/event/result payload expectations for MACP fields and statuses. +4. `mosaic macp submit`, `status`, and `history` work from a bootstrapped repo state, and `drain` delegates to the existing orchestrator runner. +5. Python imports for the updated controller, dispatcher, and sync code complete without errors on Python 3.10+. + +## Constraints and Dependencies + +1. Python implementation must use stdlib only and support Python 3.10+. +2. Shell tooling must remain bash-based and fit the existing Mosaic CLI style. +3. Dispatch fallback rules must use `exec` when `dispatch` is absent and config/default runtime when `runtime` is absent. +4. Worktree convention must derive from the repository name and task metadata unless explicitly overridden by task fields. + +## Risks and Open Questions + +1. Risk: yolo command execution requires a PTY, so the dispatcher needs a safe wrapper that still behaves under `subprocess`. +2. Risk: worktree cleanup could remove a path unexpectedly if task metadata is malformed. +3. Risk: old queue consumers may assume only the original task statuses and event types. +4. Open Question: whether `task.gated` should be emitted by the dispatcher or controller once worker execution ends and quality gates begin. + +## Testing and Verification Expectations + +1. Baseline checks: Python import validation, targeted script execution checks, JSON syntax/schema validation, and any repo-local validation applicable to changed code paths. +2. Situational testing: legacy orchestrator run with old-style tasks, MACP `exec` flow including result file generation, CLI submit/status/history behavior, and worktree lifecycle validation. +3. Evidence format: command-level results captured in the scratchpad and summarized in the final delivery report. + +## Milestone / Delivery Intent + +1. Target milestone/version: 0.0.x bootstrap enhancement +2. Definition of done: code merged to `main`, CI terminal green, issue `#8` closed, and verification evidence recorded against all acceptance criteria. + +## Assumptions + +1. ASSUMPTION: A single issue can track the full Phase 1 implementation because the user requested one bounded feature delivery rather than separate independent tickets. +2. ASSUMPTION: For `acp` dispatch, generating the config/payload and returning it as dispatcher output is sufficient for Phase 1 because the brief explicitly says the caller will use it with OpenClaw. +3. ASSUMPTION: `task.gated` should be emitted by the controller as the transition into quality-gate execution, which keeps gate-state ownership in one place alongside the existing gate loop. diff --git a/docs/SITEMAP.md b/docs/SITEMAP.md new file mode 100644 index 0000000..c4e8b1c --- /dev/null +++ b/docs/SITEMAP.md @@ -0,0 +1,7 @@ +# Sitemap + +- [PRD](./PRD.md) +- [Tasks](./TASKS.md) +- [Developer Guide](./DEVELOPER-GUIDE/README.md) +- [Task Briefs](./tasks/MACP-PHASE1-brief.md) +- [Scratchpads](./scratchpads/macp-phase1.md) diff --git a/docs/TASKS.md b/docs/TASKS.md new file mode 100644 index 0000000..36a68b9 --- /dev/null +++ b/docs/TASKS.md @@ -0,0 +1,17 @@ +# TASKS + +Canonical tracking for active work. Keep this file current. + +## Rules + +1. Update status as work progresses. +2. Link every non-trivial task to a provider issue (`#123`) or internal ref (`TASKS:T1`) if no provider is available. +3. Keep one row per active task. +4. Do not set `status=done` for source-code work until PR is merged, CI/pipeline is terminal green, and linked issue/ref is closed. +5. If merge/CI/issue closure fails, set `status=blocked` and record the exact failed wrapper command in `notes`. + +## Tasks + +| id | status | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes | +|---|---|---|---|---|---|---|---|---|---|---|---|---|---| +| MACP-PHASE1 | done | Implement MACP Phase 1 across orchestrator schemas, dispatcher, controller, CLI, config, and task sync while preserving legacy queue behavior. | #8 | bootstrap | feat/macp-phase1 | | | Jarvis | 2026-03-27T23:00:00Z | 2026-03-27T23:45:00Z | medium | completed | Implementation, verification, and review completed; awaiting PR merge, green CI, and issue closure in delivery flow. | diff --git a/docs/scratchpads/macp-phase1.md b/docs/scratchpads/macp-phase1.md new file mode 100644 index 0000000..383b458 --- /dev/null +++ b/docs/scratchpads/macp-phase1.md @@ -0,0 +1,80 @@ +# MACP Phase 1 Scratchpad + +## Objective + +Implement MACP Phase 1 in `mosaic-bootstrap` by extending the orchestrator-matrix protocol, adding a dispatcher layer, wiring the controller, updating sync/config/CLI behavior, and preserving backward compatibility for legacy tasks. + +## Tracking + +- Task ID: `MACP-PHASE1` +- Issue: `#8` +- Branch: `feat/macp-phase1` +- Repo: `bootstrap` + +## Budget + +- User hard token budget: not specified +- Working assumption: complete in one implementation pass with focused verification and no unnecessary parallelization +- Budget response plan: keep scope limited to the requested file map and verification set + +## Plan + +1. Extend task and event schemas and add the MACP result schema. +2. Build `dispatcher/macp_dispatcher.py` with isolated helpers for worktree, command, result, and cleanup logic. +3. Wire the controller to detect MACP-aware tasks, emit gated/escalated events, and preserve the legacy path for tasks without `dispatch`. +4. Add MACP defaults to config, add `mosaic macp` routing and shell script support, and extend `tasks_md_sync.py`. +5. Run backward-compatibility and MACP verification, then review/remediate before commit and PR flow. + +## Assumptions + +1. `dispatch` presence is the MACP-aware signal for the controller, matching the user brief. +2. `docs/tasks/MACP-PHASE1-brief.md` is an input artifact for this delivery and should remain available for reference. +3. A minimal developer-facing documentation update tied to the orchestrator README/CLI behavior will satisfy the documentation gate for this change surface. + +## Risks + +1. Worktree cleanup must be constrained to task-owned paths only. +2. Gate-state transitions need to remain compatible with current event consumers. +3. CLI submit behavior must avoid corrupting existing `tasks.json` structure. + +## Verification Plan + +| Acceptance Criterion | Verification Method | Evidence | +|---|---|---| +| AC-1 legacy tasks still work | Run controller against old-style tasks without `dispatch` | Passed: temp repo run completed legacy task and wrote completed result JSON | +| AC-2 MACP `exec` flow works | Run controller against MACP task queue and inspect result JSON/events/worktree cleanup | Passed: temp repo run produced `task.gated` and `task.completed`, wrote structured result JSON with `files_changed`, and removed the task worktree | +| AC-3 schemas are valid | Validate schema JSON loads and sample payloads parse/import cleanly | Passed: `jsonschema.validate(...)` succeeded for task, event, and result schema samples | +| AC-4 CLI works | Run `mosaic macp submit/status/history` against a test repo state | Passed: `bin/mosaic-macp` submit/status/history succeeded and `bin/mosaic --help` exposed `macp` | +| AC-5 imports are clean | Run `python3 -c "import ..."` for updated modules | Passed: importlib-based module loads and `py_compile` succeeded | + +## Progress Log + +- 2026-03-27: Loaded global/project/runtime contracts and required delivery guides. +- 2026-03-27: Created provider issue `#8`. +- 2026-03-27: Drafted PRD, task tracking, and this scratchpad before implementation. +- 2026-03-27: Implemented MACP schemas, dispatcher lifecycle, controller integration, CLI support, sync updates, and developer docs. +- 2026-03-27: Added explicit worker escalation handling via the `MACP_ESCALATE:` stdout marker. + +## Tests Run + +- `python3 -m py_compile tools/orchestrator-matrix/controller/mosaic_orchestrator.py tools/orchestrator-matrix/controller/tasks_md_sync.py tools/orchestrator-matrix/dispatcher/macp_dispatcher.py` +- importlib module-load check for updated Python files +- `jsonschema.validate(...)` for task/event/result samples +- Temp repo legacy controller run with old-style `tasks.json` +- Temp repo MACP `exec` controller run with worktree/result/cleanup verification +- Temp repo CLI checks using `bin/mosaic-macp submit`, `status`, and `history` +- Temp repo `tasks_md_sync.py --apply` run with MACP columns +- `git diff --check` +- `bash -n bin/mosaic bin/mosaic-macp` + +## Review + +- Automated review attempted with `~/.config/mosaic/tools/codex/codex-code-review.sh --uncommitted` +- Initial review attempt failed because generated `__pycache__` files introduced non-UTF-8 diff content; generated caches were removed +- Manual independent review completed on the dispatcher/controller/CLI delta +- Remediation applied: final MACP results are now written before worktree cleanup, and explicit worker escalation is handled instead of being ignored + +## Residual Risks + +- The repo-local `bin/mosaic` route was verified through help output; full runtime routing depends on installation into `$MOSAIC_HOME/bin`. +- Explicit worker escalation currently uses a stdout marker convention rather than a richer structured worker-to-controller handoff. diff --git a/docs/tasks/MACP-PHASE1-brief.md b/docs/tasks/MACP-PHASE1-brief.md new file mode 100644 index 0000000..d39cb22 --- /dev/null +++ b/docs/tasks/MACP-PHASE1-brief.md @@ -0,0 +1,324 @@ +# MACP Phase 1 — Core Protocol Implementation + +**Branch:** `feat/macp-phase1` +**Repo:** `mosaic-bootstrap` (worktree at `~/src/mosaic-bootstrap-worktrees/macp-phase1`) + +--- + +## Objective + +Extend the existing orchestrator-matrix into **MACP (Mosaic Agent Coordination Protocol)** — a standardized protocol that lets any orchestrator delegate work to any agent runtime, track progress via quality gates, and collect structured results. + +This is an **evolution** of the existing code, not a rewrite. Extend existing schemas, enhance the controller, and add the dispatcher layer. + +--- + +## Task 1: Extend Task Schema (`tools/orchestrator-matrix/protocol/task.schema.json`) + +Add these new fields to the existing schema: + +```json +{ + "type": { + "type": "string", + "enum": ["coding", "deploy", "research", "review", "documentation", "infrastructure"], + "description": "Task type — determines dispatch strategy and gate requirements" + }, + "dispatch": { + "type": "string", + "enum": ["yolo", "acp", "exec"], + "description": "Execution backend: yolo=mosaic yolo (full system), acp=OpenClaw sessions_spawn (sandboxed), exec=direct shell" + }, + "worktree": { + "type": "string", + "description": "Path to git worktree for this task, e.g. ~/src/repo-worktrees/task-042" + }, + "branch": { + "type": "string", + "description": "Git branch name for this task" + }, + "brief_path": { + "type": "string", + "description": "Path to markdown task brief relative to repo root" + }, + "result_path": { + "type": "string", + "description": "Path to JSON result file relative to .mosaic/orchestrator/" + }, + "issue": { + "type": "string", + "description": "Issue reference (e.g. #42)" + }, + "pr": { + "type": ["string", "null"], + "description": "PR number/URL once opened" + }, + "depends_on": { + "type": "array", + "items": { "type": "string" }, + "description": "List of task IDs this task depends on" + }, + "max_attempts": { + "type": "integer", + "minimum": 1, + "default": 1 + }, + "attempts": { + "type": "integer", + "minimum": 0, + "default": 0 + }, + "timeout_seconds": { + "type": "integer", + "description": "Override default timeout for this task" + } +} +``` + +Extend the `status` enum to include: `"pending"`, `"running"`, `"gated"`, `"completed"`, `"failed"`, `"escalated"` + +Keep all existing fields. Keep `additionalProperties: true`. + +--- + +## Task 2: Extend Event Schema (`tools/orchestrator-matrix/protocol/event.schema.json`) + +Add new event types to the enum: +- `task.gated` — Worker done coding, quality gates now running +- `task.escalated` — Requires human intervention +- `task.retry.scheduled` — Task will be retried (already emitted by controller, just not in schema) + +Add new source type: `"dispatcher"` + +Keep all existing event types and fields. + +--- + +## Task 3: Create Result Schema (`tools/orchestrator-matrix/protocol/result.schema.json`) + +New file — standardized worker completion report: + +```json +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://mosaicstack.dev/schemas/orchestrator/result.schema.json", + "title": "MACP Task Result", + "type": "object", + "required": ["task_id", "status", "completed_at"], + "properties": { + "task_id": { "type": "string" }, + "status": { "type": "string", "enum": ["completed", "failed", "escalated"] }, + "completed_at": { "type": "string", "format": "date-time" }, + "failed_at": { "type": ["string", "null"], "format": "date-time" }, + "exit_code": { "type": ["integer", "null"] }, + "attempt": { "type": "integer" }, + "max_attempts": { "type": "integer" }, + "runtime": { "type": "string" }, + "dispatch": { "type": "string" }, + "worktree": { "type": ["string", "null"] }, + "branch": { "type": ["string", "null"] }, + "pr": { "type": ["string", "null"] }, + "summary": { "type": "string", "description": "Human-readable summary of what the worker did" }, + "files_changed": { "type": "array", "items": { "type": "string" } }, + "gate_results": { + "type": "array", + "items": { + "type": "object", + "properties": { + "command": { "type": "string" }, + "exit_code": { "type": "integer" }, + "type": { "type": "string", "enum": ["mechanical", "ai-review", "ci-pipeline"] } + } + } + }, + "error": { "type": ["string", "null"] }, + "escalation_reason": { "type": ["string", "null"] }, + "metadata": { "type": "object" } + } +} +``` + +--- + +## Task 4: Build MACP Dispatcher (`tools/orchestrator-matrix/dispatcher/macp_dispatcher.py`) + +New Python module — the core of MACP. This translates task specs into execution commands and manages the full lifecycle. + +### Dispatcher Responsibilities: +1. **Worktree setup** — `git worktree add` before dispatch (if task has `worktree` field) +2. **Command generation** — Build the right command based on `dispatch` type: + - `yolo`: `mosaic yolo codex|claude|opencode ""` (reads from `brief_path`) + - `acp`: Generates a command that would be used with OpenClaw sessions_spawn (just outputs the config JSON for the caller) + - `exec`: Direct shell command from task `command` field +3. **Result collection** — After worker exits, write structured result JSON to `results/.json` +4. **Worktree cleanup** — After task completion (success or failure after max retries), remove the worktree + +### Key Functions: + +```python +def setup_worktree(task: dict, repo_root: Path) -> Path: + """Create git worktree for task. Returns worktree path.""" + +def build_dispatch_command(task: dict, repo_root: Path) -> str: + """Generate execution command based on task dispatch type and runtime.""" + +def collect_result(task: dict, exit_code: int, gate_results: list, orch_dir: Path) -> dict: + """Build standardized result JSON after worker completion.""" + +def cleanup_worktree(task: dict) -> None: + """Remove git worktree after task is done.""" + +def dispatch_task(task: dict, repo_root: Path, orch_dir: Path, config: dict) -> tuple[int, str]: + """Full dispatch lifecycle: setup → execute → collect → cleanup. Returns (exit_code, output).""" +``` + +### Worktree Convention: +- Base path: `~/src/-worktrees/` +- Worktree name: task ID slugified (e.g., `task-042-auth`) +- Branch: `feat/-` (or use task `branch` field if specified) +- Created from: `origin/main` + +### `mosaic yolo` Command Pattern: +The dispatcher needs to invoke `mosaic yolo` via PTY (it requires a terminal). The command pattern: +```bash +export PATH="$HOME/.config/mosaic/bin:$PATH" +cd +mosaic yolo codex "" +``` + +For the dispatcher, since it's called from the controller which uses `subprocess.Popen`, the command should be wrapped appropriately. Use `script -qec` or similar for PTY simulation if needed, but prefer passing the brief via a temp file and using the worker script pattern. + +### Important Constraints: +- The dispatcher is a **library module** imported by the controller — NOT a standalone script +- It should be testable (pure functions where possible, side effects isolated) +- Error handling: if worktree creation fails, task goes to `failed` immediately +- If `dispatch` is not set on a task, fall back to `exec` (backward compatible) +- If `runtime` is not set, fall back to config's `worker.runtime` + +--- + +## Task 5: Wire Controller to Use Dispatcher (`tools/orchestrator-matrix/controller/mosaic_orchestrator.py`) + +Modify the existing controller to use the new dispatcher: + +1. **Import the dispatcher module** +2. **In `run_single_task()`**: + - If task has `dispatch` field (or is MACP-aware), call `dispatcher.dispatch_task()` instead of raw `run_shell()` + - If task does NOT have `dispatch` field, keep existing `run_shell()` behavior (backward compatibility) +3. **Handle new statuses**: `gated` and `escalated` + - `gated`: Set when worker completes but gates are running (transition state between running and completed/failed) + - `escalated`: Set when task needs human intervention (gate failures after max retries, explicit worker escalation) +4. **Emit new event types**: `task.gated` and `task.escalated` + +### Backward Compatibility is Critical: +- Existing tasks.json files with no `dispatch` field MUST continue to work exactly as before +- The controller should detect MACP-aware tasks by checking for the `dispatch` field +- All existing tests/workflows must remain functional + +--- + +## Task 6: Update Config Template (`templates/repo/.mosaic/orchestrator/config.json`) + +Add MACP dispatcher configuration: + +```json +{ + "enabled": false, + "transport": "matrix", + "macp": { + "worktree_base": "~/src/{repo}-worktrees", + "default_dispatch": "exec", + "default_runtime": "codex", + "cleanup_worktrees": true, + "brief_dir": "docs/tasks", + "result_dir": ".mosaic/orchestrator/results" + }, + "worker": { ... existing ... }, + "quality_gates": [ ... existing ... ] +} +``` + +--- + +## Task 7: Build `mosaic macp` CLI (`bin/mosaic-macp`) + +Shell script that provides manual MACP interaction: + +```bash +mosaic macp submit --task-id TASK-001 --title "..." --type coding --runtime codex --brief docs/tasks/TASK-001.md +mosaic macp status [--task-id TASK-001] +mosaic macp drain # Run all pending tasks sequentially +mosaic macp history [--task-id TASK-001] # Show events for a task +``` + +Implementation: +- `submit`: Adds a task to `tasks.json` with MACP fields populated +- `status`: Pretty-prints task queue state (pending/running/gated/completed/failed/escalated counts) +- `drain`: Calls `mosaic-orchestrator-run --until-drained` +- `history`: Reads `events.ndjson` and filters by task ID + +Register in the main `bin/mosaic` dispatcher so `mosaic macp ...` works. + +--- + +## Task 8: Update `tasks_md_sync.py` + +Extend the sync script to handle MACP-specific columns in `docs/TASKS.md`: + +Optional new columns: `type`, `dispatch`, `runtime`, `branch` + +If these columns exist in the markdown table, map them to the task JSON. If they don't exist, use defaults from config. + +--- + +## Verification + +After all tasks are complete: + +1. **Existing behavior preserved**: Run `python3 tools/orchestrator-matrix/controller/mosaic_orchestrator.py --repo /tmp/test-repo --once` with an old-style tasks.json — must work identically +2. **New MACP flow works**: Create a tasks.json with `dispatch: "exec"` tasks, run controller, verify worktree creation, execution, result collection, and cleanup +3. **Schema validation**: All JSON files should validate against their schemas +4. **CLI works**: `mosaic macp status` and `mosaic macp submit` produce correct output +5. **No broken imports**: All Python files should pass `python3 -c "import ..."` + +--- + +## File Map (what goes where) + +``` +tools/orchestrator-matrix/ +├── protocol/ +│ ├── task.schema.json ← MODIFY (extend) +│ ├── event.schema.json ← MODIFY (extend) +│ └── result.schema.json ← NEW +├── dispatcher/ +│ ├── __init__.py ← NEW +│ └── macp_dispatcher.py ← NEW +├── controller/ +│ ├── mosaic_orchestrator.py ← MODIFY (wire dispatcher) +│ └── tasks_md_sync.py ← MODIFY (new columns) +├── transport/ +│ └── matrix_transport.py ← UNCHANGED +└── adapters/ + └── README.md ← UNCHANGED + +templates/repo/.mosaic/orchestrator/ +├── config.json ← MODIFY (add macp section) +├── tasks.json ← UNCHANGED +└── ... + +bin/ +├── mosaic ← MODIFY (add macp subcommand routing) +└── mosaic-macp ← NEW +``` + +--- + +## Ground Rules + +- This is the `mosaic-bootstrap` repo — a CLI framework, NOT a web app +- No npm/pnpm — this is bash + Python (stdlib only, no pip dependencies) +- All Python must work with Python 3.10+ +- Keep the deterministic controller pattern — no async, no threads, no external services +- Commit messages: `feat: ` (conventional commits) +- Push to `feat/macp-phase1` branch when done diff --git a/templates/repo/.mosaic/orchestrator/config.json b/templates/repo/.mosaic/orchestrator/config.json index d43fd05..e6ae0e6 100644 --- a/templates/repo/.mosaic/orchestrator/config.json +++ b/templates/repo/.mosaic/orchestrator/config.json @@ -1,6 +1,14 @@ { "enabled": false, "transport": "matrix", + "macp": { + "worktree_base": "~/src/{repo}-worktrees", + "default_dispatch": "exec", + "default_runtime": "codex", + "cleanup_worktrees": true, + "brief_dir": "docs/tasks", + "result_dir": ".mosaic/orchestrator/results" + }, "matrix": { "control_room_id": "", "workspace_id": "", diff --git a/tools/orchestrator-matrix/README.md b/tools/orchestrator-matrix/README.md index 68698bc..8aab396 100644 --- a/tools/orchestrator-matrix/README.md +++ b/tools/orchestrator-matrix/README.md @@ -13,6 +13,7 @@ mechanical quality gates. ## Components - `protocol/` - JSON schemas for task/event payloads +- `dispatcher/` - MACP dispatch helpers for worktrees, command generation, results, and cleanup - `controller/mosaic_orchestrator.py` - deterministic controller loop - `adapters/` - runtime adapter guidance @@ -83,3 +84,27 @@ Task injection message format (room text): ```text !mosaic-task {"id":"TASK-123","title":"Fix bug","command":"echo run","quality_gates":["pnpm lint"]} ``` + +## MACP Notes + +MACP-aware tasks add dispatch metadata on top of the existing queue model: + +- `dispatch`: `exec`, `yolo`, or `acp` +- `type`: task category used for orchestration intent +- `worktree` / `branch`: task-specific git execution context +- `brief_path`: markdown brief consumed by runtime-backed dispatchers +- `result_path`: structured result JSON written under `.mosaic/orchestrator/` + +Controller behavior remains backward compatible: + +- Tasks without `dispatch` continue through the legacy shell execution path. +- Tasks with `dispatch` use the MACP dispatcher and can emit `task.gated` and `task.escalated`. + +Manual queue operations are exposed through: + +```bash +mosaic macp submit ... +mosaic macp status +mosaic macp drain +mosaic macp history --task-id TASK-001 +``` diff --git a/tools/orchestrator-matrix/controller/mosaic_orchestrator.py b/tools/orchestrator-matrix/controller/mosaic_orchestrator.py index f7d798c..b096972 100755 --- a/tools/orchestrator-matrix/controller/mosaic_orchestrator.py +++ b/tools/orchestrator-matrix/controller/mosaic_orchestrator.py @@ -14,6 +14,12 @@ import time import uuid from typing import Any +DISPATCHER_DIR = pathlib.Path(__file__).resolve().parent.parent / "dispatcher" +if str(DISPATCHER_DIR) not in sys.path: + sys.path.insert(0, str(DISPATCHER_DIR)) + +import macp_dispatcher + def now_iso() -> str: return dt.datetime.now(dt.timezone.utc).isoformat() @@ -115,6 +121,29 @@ def is_completed_status(status: str) -> bool: return status in {"completed", "done"} +def is_macp_task(task: dict[str, Any]) -> bool: + return "dispatch" in task + + +def normalize_gate_result(gate: Any) -> dict[str, Any]: + if isinstance(gate, str): + return {"command": gate, "type": "mechanical"} + if isinstance(gate, dict): + return { + "command": str(gate.get("command") or ""), + "type": str(gate.get("type") or "mechanical"), + } + return {"command": "", "type": "mechanical"} + + +def detect_worker_escalation(output: str) -> str | None: + marker = "MACP_ESCALATE:" + for line in output.splitlines(): + if marker in line: + return line.split(marker, 1)[1].strip() or "Worker requested escalation" + return None + + def pick_next_task(tasks: list[dict[str, Any]]) -> dict[str, Any] | None: status_by_id = {str(t.get("id", "")): str(t.get("status", "")) for t in tasks} for task in tasks: @@ -164,27 +193,40 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic task_file = orch_dir / f"task-{task_id}.json" save_json(task_file, task) - cmd = str(task.get("command", "")).strip() - if not cmd: - template = str(config.get("worker", {}).get("command_template", "")).strip() - if template: - cmd = render_command_template(template, task, task_file) - - if not cmd: - task["status"] = "failed" - task["failed_at"] = now_iso() - task["error"] = "No task command or worker command_template configured." - save_json(tasks_path, {"tasks": task_items}) - emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"]) - state["running_task_id"] = None - state["updated_at"] = now_iso() - save_json(state_path, state) - return True - timeout_sec = int(task.get("timeout_seconds") or config.get("worker", {}).get("timeout_seconds") or 7200) - rc, _, timed_out = run_shell(cmd, repo_root, log_path, timeout_sec) + timed_out = False + output = "" + rc = 0 + if is_macp_task(task): + try: + rc, output = macp_dispatcher.dispatch_task(task, repo_root, orch_dir, config) + timed_out = bool(task.get("_timed_out")) + except Exception as exc: + rc = 1 + task["error"] = str(exc) + else: + cmd = str(task.get("command", "")).strip() + if not cmd: + template = str(config.get("worker", {}).get("command_template", "")).strip() + if template: + cmd = render_command_template(template, task, task_file) + + if not cmd: + task["status"] = "failed" + task["failed_at"] = now_iso() + task["error"] = "No task command or worker command_template configured." + save_json(tasks_path, {"tasks": task_items}) + emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"]) + state["running_task_id"] = None + state["updated_at"] = now_iso() + save_json(state_path, state) + return True + + rc, output, timed_out = run_shell(cmd, repo_root, log_path, timeout_sec) + if rc != 0: - task["error"] = f"Worker command timed out after {timeout_sec}s" if timed_out else f"Worker command failed with exit code {rc}" + if not task.get("error"): + task["error"] = f"Worker command timed out after {timeout_sec}s" if timed_out else f"Worker command failed with exit code {rc}" if attempt < max_attempts: task["status"] = "pending" task["last_failed_at"] = now_iso() @@ -204,34 +246,60 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic state["running_task_id"] = None state["updated_at"] = now_iso() save_json(state_path, state) - save_json( - results_dir / f"{task_id}.json", - {"task_id": task_id, "status": task["status"], "exit_code": rc, "attempt": attempt, "max_attempts": max_attempts}, - ) + if is_macp_task(task): + macp_dispatcher.collect_result(task, rc, [], orch_dir) + if task["status"] == "failed" and bool(config.get("macp", {}).get("cleanup_worktrees", True)): + macp_dispatcher.cleanup_worktree(task) + else: + save_json( + results_dir / f"{task_id}.json", + {"task_id": task_id, "status": task["status"], "exit_code": rc, "attempt": attempt, "max_attempts": max_attempts}, + ) return True + escalation_reason = detect_worker_escalation(output) if is_macp_task(task) else None + if escalation_reason: + task["status"] = "escalated" + task["failed_at"] = now_iso() + task["escalation_reason"] = escalation_reason + emit_event(events_path, "task.escalated", task_id, "escalated", "controller", escalation_reason) + save_json(tasks_path, {"tasks": task_items}) + state["running_task_id"] = None + state["updated_at"] = now_iso() + save_json(state_path, state) + macp_dispatcher.collect_result(task, rc, [], orch_dir) + if bool(config.get("macp", {}).get("cleanup_worktrees", True)): + macp_dispatcher.cleanup_worktree(task) + return True + + task["status"] = "gated" + save_json(tasks_path, {"tasks": task_items}) + emit_event(events_path, "task.gated", task_id, "gated", "controller", "Worker completed; quality gates starting") + gates = task.get("quality_gates") or config.get("quality_gates") or [] all_passed = True gate_results: list[dict[str, Any]] = [] for gate in gates: - gate_cmd = str(gate).strip() + gate_entry = normalize_gate_result(gate) + gate_cmd = gate_entry["command"] if not gate_cmd: continue - emit_event(events_path, "rail.check.started", task_id, "running", "quality-gate", f"Running gate: {gate_cmd}") - gate_rc, _, gate_timed_out = run_shell(gate_cmd, repo_root, log_path, timeout_sec) + gate_cwd = pathlib.Path(os.path.expanduser(str(task.get("worktree") or repo_root))).resolve() if is_macp_task(task) else repo_root + emit_event(events_path, "rail.check.started", task_id, "gated", "quality-gate", f"Running gate: {gate_cmd}") + gate_rc, _, gate_timed_out = run_shell(gate_cmd, gate_cwd, log_path, timeout_sec) if gate_rc == 0: - emit_event(events_path, "rail.check.passed", task_id, "running", "quality-gate", f"Gate passed: {gate_cmd}") + emit_event(events_path, "rail.check.passed", task_id, "gated", "quality-gate", f"Gate passed: {gate_cmd}") else: all_passed = False emit_event( events_path, "rail.check.failed", task_id, - "failed", + "gated", "quality-gate", f"Gate timed out after {timeout_sec}s: {gate_cmd}" if gate_timed_out else f"Gate failed ({gate_rc}): {gate_cmd}", ) - gate_results.append({"command": gate_cmd, "exit_code": gate_rc}) + gate_results.append({"command": gate_cmd, "exit_code": gate_rc, "type": gate_entry["type"]}) if all_passed: task["status"] = "completed" @@ -251,24 +319,33 @@ def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dic f"{task['error']}; retry {attempt + 1}/{max_attempts}", ) else: - task["status"] = "failed" + task["status"] = "escalated" if is_macp_task(task) else "failed" task["failed_at"] = now_iso() - emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"]) + if is_macp_task(task): + task["escalation_reason"] = "Quality gates failed after max retries" + emit_event(events_path, "task.escalated", task_id, "escalated", "controller", task["escalation_reason"]) + else: + emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"]) save_json(tasks_path, {"tasks": task_items}) state["running_task_id"] = None state["updated_at"] = now_iso() save_json(state_path, state) - save_json( - results_dir / f"{task_id}.json", - { - "task_id": task_id, - "status": task["status"], - "completed_at": task.get("completed_at"), - "failed_at": task.get("failed_at"), - "gate_results": gate_results, - }, - ) + if is_macp_task(task): + macp_dispatcher.collect_result(task, rc, gate_results, orch_dir) + if task["status"] in {"completed", "escalated"} and bool(config.get("macp", {}).get("cleanup_worktrees", True)): + macp_dispatcher.cleanup_worktree(task) + else: + save_json( + results_dir / f"{task_id}.json", + { + "task_id": task_id, + "status": task["status"], + "completed_at": task.get("completed_at"), + "failed_at": task.get("failed_at"), + "gate_results": gate_results, + }, + ) return True @@ -276,10 +353,14 @@ def queue_state(orch_dir: pathlib.Path) -> dict[str, int]: tasks = load_json(orch_dir / "tasks.json", {"tasks": []}) task_items = tasks.get("tasks", []) if not isinstance(task_items, list): - return {"pending": 0, "running": 0, "runnable": 0} + return {"pending": 0, "running": 0, "gated": 0, "completed": 0, "failed": 0, "escalated": 0, "runnable": 0} pending = 0 running = 0 + gated = 0 + completed = 0 + failed = 0 + escalated = 0 runnable = 0 status_by_id = {str(t.get("id", "")): str(t.get("status", "")) for t in task_items} for task in task_items: @@ -291,7 +372,23 @@ def queue_state(orch_dir: pathlib.Path) -> dict[str, int]: runnable += 1 if status == "running": running += 1 - return {"pending": pending, "running": running, "runnable": runnable} + if status == "gated": + gated += 1 + if status == "completed": + completed += 1 + if status == "failed": + failed += 1 + if status == "escalated": + escalated += 1 + return { + "pending": pending, + "running": running, + "gated": gated, + "completed": completed, + "failed": failed, + "escalated": escalated, + "runnable": runnable, + } def main() -> int: diff --git a/tools/orchestrator-matrix/controller/tasks_md_sync.py b/tools/orchestrator-matrix/controller/tasks_md_sync.py index 4bc0812..fc671ed 100644 --- a/tools/orchestrator-matrix/controller/tasks_md_sync.py +++ b/tools/orchestrator-matrix/controller/tasks_md_sync.py @@ -78,9 +78,11 @@ def map_status(raw: str) -> str: "pending": "pending", "in-progress": "pending", "needs-qa": "pending", + "gated": "gated", "done": "completed", "completed": "completed", "failed": "failed", + "escalated": "escalated", } return mapping.get(value, "pending") @@ -92,6 +94,7 @@ def parse_depends(raw: str) -> list[str]: def build_task( row: dict[str, str], existing: dict[str, Any], + macp_defaults: dict[str, str], runtime_default: str, source_path: str, ) -> dict[str, Any]: @@ -100,6 +103,9 @@ def build_task( issue = row.get("issue", "").strip() repo = row.get("repo", "").strip() branch = row.get("branch", "").strip() + task_type = row.get("type", "").strip() + dispatch = row.get("dispatch", "").strip() + runtime = row.get("runtime", "").strip() depends_on = parse_depends(row.get("depends_on", "")) task = dict(existing) @@ -108,7 +114,11 @@ def build_task( task["description"] = description task["status"] = map_status(row.get("status", "pending")) task["depends_on"] = depends_on - task["runtime"] = str(task.get("runtime") or runtime_default or "codex") + task["type"] = task_type or str(task.get("type") or macp_defaults.get("type") or "coding") + task["dispatch"] = dispatch or str(task.get("dispatch") or macp_defaults.get("dispatch") or "") + task["runtime"] = runtime or str(task.get("runtime") or macp_defaults.get("runtime") or runtime_default or "codex") + task["branch"] = branch or str(task.get("branch") or macp_defaults.get("branch") or "") + task["issue"] = issue or str(task.get("issue") or "") task["command"] = str(task.get("command") or "") task["quality_gates"] = task.get("quality_gates") or [] metadata = dict(task.get("metadata") or {}) @@ -147,7 +157,14 @@ def main() -> int: tasks_path = (repo / args.tasks_json).resolve() config_path = repo / ".mosaic" / "orchestrator" / "config.json" config = load_json(config_path, {}) - runtime_default = str(config.get("worker", {}).get("runtime") or "codex") + macp_config = dict(config.get("macp") or {}) + runtime_default = str(config.get("worker", {}).get("runtime") or macp_config.get("default_runtime") or "codex") + macp_defaults = { + "type": "coding", + "dispatch": str(macp_config.get("default_dispatch") or ""), + "runtime": str(macp_config.get("default_runtime") or runtime_default or "codex"), + "branch": "", + } rows = parse_tasks_markdown(docs_path) try: @@ -171,6 +188,7 @@ def main() -> int: build_task( row, existing_by_id.get(task_id, {}), + macp_defaults, runtime_default, source_path, ) diff --git a/tools/orchestrator-matrix/dispatcher/__init__.py b/tools/orchestrator-matrix/dispatcher/__init__.py new file mode 100644 index 0000000..3e9676d --- /dev/null +++ b/tools/orchestrator-matrix/dispatcher/__init__.py @@ -0,0 +1,15 @@ +"""MACP dispatcher helpers for orchestrator-matrix.""" + +from .macp_dispatcher import build_dispatch_command +from .macp_dispatcher import cleanup_worktree +from .macp_dispatcher import collect_result +from .macp_dispatcher import dispatch_task +from .macp_dispatcher import setup_worktree + +__all__ = [ + "build_dispatch_command", + "cleanup_worktree", + "collect_result", + "dispatch_task", + "setup_worktree", +] diff --git a/tools/orchestrator-matrix/dispatcher/__pycache__/macp_dispatcher.cpython-312.pyc b/tools/orchestrator-matrix/dispatcher/__pycache__/macp_dispatcher.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..f6dcc9136ecec2365d0d019b9102a87d5c8c02b4 GIT binary patch literal 20667 zcmd6P3ve6fnb-nYJQoj=ARZ(^5`0J^M3EFFN}}GONWCpemT8-kEHMxkBtek`y$k9A zgASeRjH#SkQMKhx zSn4drYFO>4W=wlltHEd8sO~IH?)tNOayOhckh}4$k=#vZP2_GqYbJNgSqt3hQD)3~ z)(U-iS^Ynu&)V1u)&O@qTge*X?qI7}6WmK!A8UqtDO=52;9drI2JTL{Tj5^L*046X zyVzRR4tF=}XB}|&uyt$++`Vi)TMG9Iwt+2!dnMF&!o3R0%i->08(A0JtJ!s|8}2pi zde#H?+H1PNhWPdqUEQbrY=oZ(#zMnk&OaKt$OXA;{=q2ck8+U3$GBiD%C(FIV_f8_ zKNjRK@vXl`umvLZ{xy%@frndEOf4IcA+Kh@ z!-zirO?u92VR*W9#PoFD#~LYbgeg)cXT1cI?+qA4EgKU}Y%ms%MaIISZajKLG~r`B zIu5Usv5;tv@KJyTto|eF5w?E3BNmO0@@++c)0Tr{+gLC(F_3$2owz0%q5A>oj&nda zCGgL$hh&;sVQe>cU*COW|MmTMw%pr&d-uKlxAzN7bB5WLX0{2;jugFvvq9BRKFkmr z1hV^Jy|<_YH3Auh*9gMD-~!y#+}D1rU_Q1i0(ZzQ0&i9qhu#zjPiYg{5pzNJ@XFGx z{=PvOVH-82i>cpY#QJpogeIY5jrUC-%d^!`3Ec<+so-MG%E+**$e%I>w9t-}7F37V zggRFZb&YDE+9_Uh1@`qBN&|oIR9)4bp{{5GmfpUPw4!k$$c4vayl4tvjqtI7=%qN_ z8XKDsHCG<%Cu4&xogZnSY&;ZYBjZEAMgRiJ8zSQ@w0f+u1k9pA>O$0yU1B4g$XtoU zh6nh`!NJH?(IDUH=tOv2q(}HDr0gX0ufM`YVqww9g(pUXp>Ti}Y1od#P#TN{MIBDT zFdQ6bN5eb~gZkm(?C>C(`QYX70QST+!FxIU^M3-#G?k@IZy$f_c!qYRY1bS(-kadD z^K9dsA^G|*O@XY*e#3g*I@`Bws{EZEs;n9)JsD}1{w3}Doe{IJ>0=(h^A8;x4>)x{ za_SHGj3IejLN-Mq!V%>Cb9f-gn_r4G;QCm)QN@s0DVx&7ROBs+ut3Zc8U?Wk!WJ2c z)hdW}Rl_6M*8zc`d*haK7lJME%`H36H$T?2h%^#E_&R!^i4pj0qtPp2PSkQhD)`BZ z0iCG792}j54@nBbtVr{tlS4dWwff0S1DF>XyvA*X>}vSucS8bS%H4KV9n0A3 z()K#R-Y|U_DZ}>b+h<#n&g785v`+VA8OM!X*LTf^{&fEm(>OPnq8laBfQX@7xFrd6 z4UhtKwf|_+pe;rkSRG5>*DG5Owl|%iM~uqgxjSo!sl-9~Uhh+^F+o9Ux~o@U4x5*L z7O~b)F{FOv8bQ1-xL7l5Nl-(&m`VWPi_2d*rjnbHvVvytnpcW@r{F>+exLbRCWBLY z);eV{QwjZur=TLd#wwIN)|SvIu(@wn=J6U(iz(xj30hYwHFNhVbHd1#7KdWOsPsYL zR1D4~Md9pBma4EYR*48=Z`idzj;Z5`?eD zIK+cPzeY?0K43(-0qXoZ0q>7B=i=q({e4GI96b4Q-!6anYA76L!>m89T?Kc4&lkwV}i_2)DMnM^24G9Xv+i_4TX7L)Sn{ELk9#%q)#0^bx<_C z)Yk(;h)gItHa0mP34t&p>c)asMGF^>O>)Ss!lJ!Tn%u#w6A=yx)1}DhXuu*Gu@!I@ zZU+u%8jTJO5YEHMiy7d#1Iq8anq0Rjq0CIBRw0nrTnGB$t%@hE8nt&=Z_ zOIZiF$#E!_M(Tqa4e-y81K^NRS>HbS*2xTAk)|t>jSJ-|x?-8$o;8==sJ>o3%O_u* zsTRyFtCUyYx!}y27{ZUb?-<`V&%Tjto*$Y&vv4t0wsYCEOD=qeS#eZk9JOgjZPw%F0OBeUm{rum9_W?@UJZ2Pil$M4!<6{}k)t7~@fpN}TB$)hQzB}KRV?=M_b zNoB^-ly)@zmeNbZ&m7HKOR(Q}j;oUqyc?AXRAD(8cTRI>!~21`s#Ig=V%4JU!AmLk z(Z8`CTV=4L$G@w^F%f)^`F4Q&CtceQ?$G_Hskz6a{cGn|c>b}ctMXun{wM8Pc=^dz z4L*11@Og*+5M_Kq>O^^)u$DLB0pxj=nHH1h6I6~l=SVWjpYW%>uTx1Y%8^B+&0?~p zLO=+E?ju_{pqJ2B586uTl4uVFCjjxoh^2s_+%A|Y*{V>l3!X(Cpd;6fxt*A7f&_Rs z9pr|1(h+wOa-QVc)`20!aH(VNER zUxm^`SZxa=2rK(c?8epWSLZB(qhZO>lDrIZO@`@6GaUl6J;UrtGkXMP-x71=!TJ<^ zuvP$) zd@z^=M?Hc8#XA)YOH9||h7{fP^y-oQll|8Z!~L;VKcz`%I1rm(qGGDJk=yBOz*(n= zDKhM6cG}OkF~k&|U)Nh%>N;g!X;vu69Z@Zjzos6a``8 zE{*F=fKamsfQzdRn0iG6m_f!v!`yzz6SWbR;JT=fMPj4jxPCAkjJ0tap-kQzTn-tK zC_pam4Q>!yTTyKoh_FQR7cHa<@?4-d2$e4gr6;D3WsRk?uPqtt zvQF=uZ_b`7-EiITne?0t&Rdd+RB6YIA?vD|?wRqv)thBXr%y--i<+@=sSLvlxqmc- z1qQ+$$e~v;vrr606LJ+ZK@>ALBvh)Zpl~r0gF->40M+t0%pYbBtS+G`rn!I&Oy5<} z|4Ypew6qvXzT|rDcK&i3(4dSgZ+cZ%QS##q=smi;(bPGs8xD1C*l?yRQSMTV3x~;; ziyPi-GqJ)E7fc=SPu)jnHxM&n?g3w|#5{ks64p1Qo^T?!$U@M8WZgsmV zGbFMzcLl1QhkyR(kif2Wl+AYB>b%)GcT8|@SaP<^ZxI}=8AnIj(IGgtPapZz=vZ;M zZzQfK?i{#x`1aw9uQly!oxdvhIt9nhjH4^<=n@>=AfH+&YsvKSFN{=)r?C2hd(r#@ z`}ggs9fu#pQ*EaO-%Em{FXMPE?RZUaT$rJ=4(EHrclcz-hn*jE&i8z4-%`V=g}(3g zf6~7=By2kbZx7cCwJ!?p(`m=)8G5CvW-j*r&KY{aFCBkJDXT)>Qq z34<{jz(KArE9!wz2-s72!G2J6`drz4RU%R_Pq`P{*&oXdanI(g1~I47AFvyBrzm+~ z)>P?%%9M)LQZaljMu>X?a9)oFa~g5(X~Z6fQMo846PO?tbFV`p8YEjZ$72qM39_5q zMo<_Jn$1NUqU}H|JO*aPSeUyEc?B%TJ-~(sgOj7NfhTboby(bJ{0Jl{u9V%_bA8Vp z?LFgdW5&BN?cF%PH`Ba7-Ms(7nV(+x;e}N5exbQf@V+cCuVk1DY372!e3i()?={`o zIv2Zl_4d_y`u#VTYF}6=|DN{~@8Slb{Q$f@7!;~b2xYx#rZ+|RJ}vJc(I)cF0zBNJ zhJZ_dQ-l1vhdTciAT8P)X(@(utd`Z~=^&~#7%kK&4X}p0+93mLgg?`L^T#rZB3cwf zQr3c!1h08#UqTxKGvP}VP3254EQ5xvyDE}=T@$eO26S;I%0-4Efq5`)zVHUB%#qlXq~(01<6Z?APfOzML5_=ZfItLy zEFqwn5p<34qFH^GR3iiv5<-a%maBA-j5k2W1L29T(!TYO#12> zGic~H-n{;1)>by#1L8>J{nq*Bh4_QWgLA1<=ThCTrL5<(RkgBqZq^g++zQkWeI#>1 zfvL?peM?r~YAN*gMH$6-QWZT9&OA7t@|+SZFQ(`hpB6+^6-$H({V@d*p2-O!tV#^N zjxweexBj!VT7r2+h#oPxqJja}vs64FD~4Xta*>OK2PJNV@@srO@$y}Zjz(2iA7IPt zlVA(t{dtvD!&tzOBPm415iPHvX`7%3p*zSegDeo2tFb3&jy>1i02jQ%U5D`thz;W8(0U`IiNHy?_Bvk?`>~#lTgvNRIzPgi{RRxaqUUF_6V+h)4iWE6)Sdc zs-jDLnc8_zim4Tt_6)Nl&Fm1Eohf?f zGw{M&z{J+jIaEd?lXrE$KkK`oNOugCzhAg4efOHzwamVz8vFl&a+9_w4t8wG@h zke8fNgj4`~07eu6^FS;rBaLW6KPs3DIMI>UKS7-v(53)&amepPUvnPFCH>XOZw z`km?eor_IEeYa5412WV3w5ck~xS_MBIU0T?!qN8pF%=@9HoSHSI8=@n{b3Z6r}5;O zN)cN2RIv(HE?)|^%X3u4s1dZv7hLFW%EMlDzp;AOFr+WayS)Wnz$@@>uyJz)CPV15 z(iL;WjG#lV;8Nxa62}3MIP?iU%dk}CGwCq6M+_h*RIt{B;W7m~-%!k1!dh9IvX5nl zhyk1-bp;cE*MzEQf!Df+28S6?dV4R29G2`wZW^E7f&_gO!Pq1pXIff7v<$(Pi;PPm z1c#f1hz-9645Z?;zh$4FL-%8#v~a_&C4oxB-#|R%Jh7)r;si<(#GVTN3Q(P5qmglt z9Q5Q_Gz2HW=gkI8lA>i*BowmE$r{k)3f>6tau1<<0$Yn_q%?%XaHpY|H$j482VRos zVIPoN3=BrVInI3mHISk3G|+M&1WxyC?AFzrSCbaO)4b$qpXUVUmW;DA?d&ARkWU#; z)=@fR{=A6(l@NS;1jpWt<6zoxP;eZcK9XgeD1R(7KB7G?9WwO5gwYR=DSGQ!WB5gmbsG2^UEh)dn++;E(>(eFc zg%Ys*8}#*{sg--@O6SUEkKF3L*(;Pc&X_-Sc(c}WFtE&?Ni#LsszwYDncu$9E>vy1 z{=!Vx%(bk^al?MyK6~w#rn*%dbhB!wtj=j!phiJaxlnf^=0Z9M`_oWT~Ai|@oK%N=+f*Gab94VTjpW`70 zhMCjza_v|Y{Z08gV9w@~poh#jHvF0ITRxUo1mmL@kz9FoL6IQFLs(gWiFb5^fD;S* zLB7uk;}}qdhQL<;Y2ugTb47^ft&izes4qDwbGw=Ba8Vaz*|@gl0C9JGM02me_yyWP zz)TDe+$Ai!jY-b0h_ZOzR_%sY+2$e1`Zjex65dXXmQ0>7UugNT^RR*mRb}Mnp~HUx z|NOs#WSaWIK)IUdI~FR0^4-%XR&2hxKEYN$eJE=z&ls!H#;Rpwb#m{*_H@I}Onq0n zzH7O@JL~YJOuo}^~CCl{gm9q88YeHH3^zjwi{&p`|8|7UW zcy~NGvaoKkeDU&+`Gg8)u9!o~069=aJ0%Zz znkutNvS<#mm6by|JEE%36dE^*8BCNdk318!B9Ah2kY0iK;00Qd z9*F^K!0~4_ob$OeP)&@5Gk9)GlQL>yOY{*ux3wC3R@nNvEepO9t9nMB+tRYujC0{8 zYhVUb(9){BKewe#sVT2h4Pjv&cg#};wq(l4md22{kZS}Z7YeRi$O?vPJiBhJR%txq zSDteBoZO?#3jH+%W1IwIxmbfzJApck!oW;y`E$n2?Jf0tZdn+r`NE zu0|}c-|Y`UNRNHDUn=nv3rpO}hafyK=5Ki&02jCT+t~2sw(-f)(YXKRaei_F`-hM6 zMC7#}zsN3spo}=oiMyCvQv^o_E;$^D1sDSuxNkrr>HrKNa}SS(uZsHDC!=8W^<%!C zK!w;^0v5?LiAFvN)&%MV*7)VoiE<4gC(7_4aXq4*SZhRmWE@bfXq1op;$&o$RUQ9o zywCTc(;vV;|9^-~4#Gi6U9wzote-xzVh1OKU~hy#$EOAG`@HOzJSJ>CytK9N;ZC9L zWx@A~;CMCT7)Uz?1V?cC$Y(~!?Di#N#Y&lLmc2E6b9nBW;0`Rg+vZOTWt%f)+tX#+ zg|f~WL{}vJW^#fcf$8B-M%!xPN>`KdFrTeqk>dg39VZ)7B;E zDgC$^>}NFL4_a0~u3-&R`aBCl@mPX6EET4N7Ay~!!17>Dn2Lq)MNVnWDcO); zxd!G`Dt&U5&xMg%S7`&gpbGl-tnd&JzH6Sl z;u@s|U_}}1UrVvQ06VT$kr9>p02P0}B%YUFmoPzT{Tl14&#T$6MxBPdI*oazgW@>Y zsMMA)JECGsMDrcZ6q9pM=K?uE>Q#*+2}W5Pte*i>#Jc#71IU%p6FANWIW`gp!9y~g z`;WbJvezFRL>U3~f4G52H!%uFp+Jn{ZUK=dQs6#NhSBdF1PdDru0W{{K8E>F5FN`b z5%9Rb!WRDqD_hk{An=K1sVO{4qUB3ig+_Ttw2Xz(fCoX#qYzX_1WzpG9zYUr>O+xL z?TU!2uTKWSws_4ygnmi?M34vRwl!WuYD&ZSFNVR=5?0#bTH**R!(HoY$!T98V<$eC33%jTK@(9`Tq#o0I?B) z#MXDK|7QQ(u;ALT@b(`L|NN-nd}aCtG_0C#n&zs6l6BKZvqsyj7evGl0xRWJ?;f2of7a5L z0{c&i@3)ju4{_gFlQ(0kO`B>z$DkJys$dpsw=C6mE?g9UJ;I3DtXY!rm)_Z_|=*&w^F(?){CCw%Pw)XE0k*|Cer(yc| zs>NwHWKE^-ZIw;a^t*2^d79_T1m~tz9h7}$Dw$oss)r{yTV?jlSyzqtY@*ENvu9S# z_-v7$8GN=<<}$S}$;`cZ{&m6GzG}l_J9KeJCyS*Q1YgIJZ|9;$@a|f5U~vhwUUoJl zdjw}-wG?y8D6@09d|i?i%9~f6m{SfpvL|fga{Iyey65P7rrV}urQqGPT;B4qW7UP# z+?2U=zV9#kKk8o?UT)d{UV|);UlrU<%VmKFeXAa<=B3QGWZ#GVAN0?&-@fopXtwcI z)6J&2j(1y^?RAUgs})$OGJjEK!LxD6(?5UdFQOks7Y9>^UKLvV1?QR7Dy-_G%-%aS z^R!UWvRaK9HLzj`)nBc}7e8fIVkS2TrS+?Im|0Jm!6bRF|8{?B{Z65J*J=aiHo`LB z>ye$$8=Q1lCx*3c_dc@=`~)wGoek;#3xZI<>aSIEz(Odr=f``5ms=@Rpz&op2sz)Y$Rab zq!zAJbsvt@JYBzF^{NpR4nx2RwqkG@GJ>lS|E%|Ic^-&=>?n3{mpY#A4-SHt#5bG{ zk3}yR_$(ypAclN6!uwe`6fGNy&^*+e=cV{MyvIvgTEK(R0vvlRI^F^AE4P1^^tco6>vp285B2tN?K2oar=z|DEc zk=!0#_=I6j5XjPu&v!9FUJb`X$ow$m0NszGCHGFeA-n-K2atrET$z)C(6<^#iG3L~xBv_pX$B zZ~1Qe=EjB67CG*vF3r>}GYwgHO|mlWZqB$m(yk7Opqa5`*R_2(@WH^MVQJmISxcs* zZmFd1!7*fyC9_ACj6OKp`*|_nO4H&|@T_$HHF!2(F6LFyJD&E(Y)HXrEW1*9jBN^d|g z$Kwce(J@q&QC3X#5!Lh$!8E!(FsB?{gChvV$5LJW&`Z;rW96GGa7Ox~Gs^D^&o!H# zm=W<=K3BCZG#{u&fnw3Q3Pt6&<@(K6a}%0e{u)qR({!Mm;#QO-fT@|lR1IfZ1EPg4 zjW4%?$>+{gb&bg1+o8CpEvnLEWRWX;(mR3@vwX*=Z;gu~@TBAc^ z4LCU2a)rY}E!SNdAZQ~HR5|n$P81_J=G@3Tzf^yoyCl4E%9L|gtkKS^%vIh^#c?;k z7UFJ&(rU__@5IV;`dD&~m4qc>OqddCNbhSv6Q-Cc>y#~FgH>1JsuMQO!d8(}(!RT} z^Vw?59j{3+Yz+w(s#s(Gs*!>>v9$>XgMO9PISBwD$qa#OQwb^tW1%XitkJJuS)q(i zs!3zY{@k{81#MO1#FRr_PjkYaa9nx~IN|!7+ii_q*uXX_xT5sTF|j%Y^Ku3&G&Llw zgBba+E`I(HxH)ppqS44;ICKs8=Po}98NhfW$ww8&M_ENAj}ni~1xGZgXNouZrGuP- zR*aqwf#NBdD~R_(EKvg3)f0#3VU9#W7adBOatbemMa0#C0*si8dq6i?+$|kBc>_c8 zr}B=S+`zhUnwmsEg4huEwaT8L{NI3Slhxt4t7vKOV@;c6PayKhP3)6~gVj9z#t5FT zcI2J7j*hc@#P7pV2h;=C@!D5aD-9mm_LCBXjcmjbH9Yt4ae<(8Q1NvIwPouF{vZi_ z0tigOqm2FnyM|a=DC_Wj5E}L~?3KiFeifgHwSmZML{v*a9t3@1tT~U}!+CM|eG>ym zek6m$hDb&VbOdpIn4m2H4%x%c=x``Xf%HdULR?bBG+~e&ZX!c|MMr)iUn|UkV+oyc z+}~nP$`yyokvuOg_fsJ|sN;9$Hy-P#wlkMQk;R;mPfFKYX>aBDh)C7!cd7^dQ_ zlZB^m@dP2WF~e+5Gn<#0EpX^(#aa*6lWa)^9+*FV>*URo$&;W1FO{E4b)FJRUVMZG zjz>C!59}u7Xb{BJSRL|_UYp=pzvS37UoS8%8D?vm*}BYZLqA&8z1rKg^FE}&YgvjsxI^RD6f2+RcKyeaRhMIG@>eD821a z>TtiX?F?k3&Ig6Ei}1nijmf_Goq~NE_$(XR7L1GD2b~Yk2@Pk`w14{O%;ni@*@}7q zwl7V)KQ)$QD|aq7FIGPYro2aBZm)jBmNl2n2ES1wyZJoJ{`PF;{s(3d?Y+mb#&z50 zG#nV4t4JQ0H!aicpBEe-+ce)UI9g!RYHXuq3quE@a~s&cK%DRT`kQl2$x-k%emAyw z{0Aq$e=_v~C=Fp@&*1Wop%oK^*e{!EK4Z#f2UCnMMf-?o9?+tyhCdlWlrhkp8etzi zwqi^e9I9(Qg>m2OAd90h`R|a3bT80{Ovq*>Ph`h_ zf_c~`?t+lA(eODi%WeX5HMhx+^x-_r;tZ5NHjlzDGM$6`^U@K0(nD@6>A3AkWYmeoMnpx03H$hnq zCFMxs^4iP2=H^FQ{V(iQ_SKVCgsc&8p$Ru$P5|=cde0NXZeG{eP&u7JIw4*E&%&+= z;Uy5wP3^xzB?%nZChmCh>zohxMdQE#8x0K%0FxRRoFu=vFu>`s8QN|*RHHdRCKH$t zg}xP^{xv3Dkcb9w0>Ce6N%-WzYzP9APqYz6fhs=tRZIxaxr9$wFhQO}ju?@c24dwV zrf-Se5KeNFb7f1>vMXR7}1 zDA%tj=2w*YSCpNk)?ZPje@nR^88n*CM<#>D{isZ*S@+0VrD=V%N2{rQWYK7jY95(1 zngftpHD!-#Oq!BMZms5|=8=WboYp+@={36^l^8XiN9C0o=c8R^nu str: + return dt.datetime.now(dt.timezone.utc).isoformat() + + +def save_json(path: pathlib.Path, data: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + with tmp.open("w", encoding="utf-8") as handle: + json.dump(data, handle, indent=2) + handle.write("\n") + tmp.replace(path) + + +def _slugify(value: str) -> str: + lowered = value.strip().lower() + slug = re.sub(r"[^a-z0-9]+", "-", lowered) + return slug.strip("-") or "task" + + +def _run_command(command: str, cwd: pathlib.Path, log_path: pathlib.Path, timeout_sec: int) -> tuple[int, str, bool]: + log_path.parent.mkdir(parents=True, exist_ok=True) + with log_path.open("a", encoding="utf-8") as log: + log.write(f"\n[{now_iso()}] COMMAND: {command}\n") + log.flush() + proc = subprocess.Popen( + ["bash", "-lc", command], + cwd=str(cwd), + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + encoding="utf-8", + ) + timed_out = False + try: + output, _ = proc.communicate(timeout=max(1, timeout_sec)) + exit_code = proc.returncode + except subprocess.TimeoutExpired: + timed_out = True + proc.kill() + output, _ = proc.communicate() + exit_code = 124 + log.write(f"[{now_iso()}] TIMEOUT: exceeded {timeout_sec}s\n") + + if output: + log.write(output) + log.write(f"[{now_iso()}] EXIT: {exit_code}\n") + return exit_code, output or "", timed_out + + +def _git_capture(args: list[str], cwd: pathlib.Path) -> str: + proc = subprocess.run(args, cwd=str(cwd), check=True, capture_output=True, text=True, encoding="utf-8") + return proc.stdout.strip() + + +def _resolve_dispatch(task: dict[str, Any]) -> str: + return str(task.get("dispatch") or "exec").strip().lower() or "exec" + + +def _resolve_runtime(task: dict[str, Any]) -> str: + return str(task.get("runtime") or "codex").strip() or "codex" + + +def _resolve_branch(task: dict[str, Any]) -> str: + branch = str(task.get("branch") or "").strip() + if branch: + return branch + + task_id_slug = _slugify(str(task.get("id") or "task")) + title_slug = _slugify(str(task.get("title") or task.get("id") or "task")) + if title_slug == task_id_slug: + branch = f"feat/{task_id_slug}" + else: + branch = f"feat/{task_id_slug}-{title_slug}" + task["branch"] = branch + return branch + + +def _resolve_worktree_path(task: dict[str, Any], repo_root: pathlib.Path) -> pathlib.Path: + explicit = str(task.get("worktree") or "").strip() + if explicit: + path = pathlib.Path(os.path.expanduser(explicit)).resolve() + else: + base = pathlib.Path(os.path.expanduser(f"~/src/{repo_root.name}-worktrees")).resolve() + path = base / _slugify(str(task.get("id") or task.get("title") or "task")) + task["worktree"] = str(path) + return path + + +def _default_worktree_path(task: dict[str, Any], repo_root: pathlib.Path, base_template: str) -> pathlib.Path: + base = pathlib.Path(os.path.expanduser(base_template.format(repo=repo_root.name))).resolve() + return base / _slugify(str(task.get("id") or task.get("title") or "task")) + + +def _resolve_start_point(repo_root: pathlib.Path) -> str: + candidates = ["origin/main", "main", "HEAD"] + for candidate in candidates: + proc = subprocess.run( + ["git", "-C", str(repo_root), "rev-parse", "--verify", f"{candidate}^{{commit}}"], + capture_output=True, + text=True, + encoding="utf-8", + ) + if proc.returncode == 0: + return candidate + raise RuntimeError(f"Unable to resolve worktree start point for {repo_root}") + + +def _read_brief(task: dict[str, Any], repo_root: pathlib.Path) -> str: + brief_path_raw = str(task.get("brief_path") or "").strip() + if not brief_path_raw: + raise ValueError("MACP yolo dispatch requires brief_path") + brief_path = (repo_root / brief_path_raw).resolve() + return brief_path.read_text(encoding="utf-8").strip() + + +def _resolve_result_path(task: dict[str, Any], orch_dir: pathlib.Path) -> pathlib.Path: + result_path_raw = str(task.get("result_path") or "").strip() + if result_path_raw: + result_path = (orch_dir / result_path_raw).resolve() + else: + result_path = (orch_dir / "results" / f"{task.get('id', 'task')}.json").resolve() + task["result_path"] = str(result_path.relative_to(orch_dir)) + return result_path + + +def _changed_files(task: dict[str, Any]) -> list[str]: + worktree_raw = str(task.get("worktree") or "").strip() + if not worktree_raw: + return [] + + worktree = pathlib.Path(os.path.expanduser(worktree_raw)).resolve() + if not worktree.exists(): + return [] + + proc = subprocess.run( + ["git", "-C", str(worktree), "status", "--porcelain"], + capture_output=True, + text=True, + encoding="utf-8", + ) + if proc.returncode != 0: + return [] + + changed: list[str] = [] + for line in proc.stdout.splitlines(): + path_text = line[3:].strip() + if not path_text: + continue + if " -> " in path_text: + _, path_text = path_text.split(" -> ", 1) + changed.append(path_text) + return changed + + +def setup_worktree(task: dict[str, Any], repo_root: pathlib.Path) -> pathlib.Path: + """Create git worktree for task. Returns worktree path.""" + + worktree_path = _resolve_worktree_path(task, repo_root) + branch = _resolve_branch(task) + if worktree_path.exists() and (worktree_path / ".git").exists(): + return worktree_path + + worktree_path.parent.mkdir(parents=True, exist_ok=True) + start_point = _resolve_start_point(repo_root) + subprocess.run( + ["git", "-C", str(repo_root), "worktree", "add", "-B", branch, str(worktree_path), start_point], + check=True, + capture_output=True, + text=True, + encoding="utf-8", + ) + task["worktree"] = str(worktree_path) + return worktree_path + + +def build_dispatch_command(task: dict[str, Any], repo_root: pathlib.Path) -> str: + """Generate execution command based on task dispatch type and runtime.""" + + dispatch = _resolve_dispatch(task) + runtime = _resolve_runtime(task) + worktree = pathlib.Path(os.path.expanduser(str(task.get("worktree") or repo_root))).resolve() + + if dispatch == "exec": + command = str(task.get("command") or "").strip() + if not command: + raise ValueError("MACP exec dispatch requires command") + return command + + if dispatch == "acp": + payload = { + "task_id": str(task.get("id") or ""), + "title": str(task.get("title") or ""), + "runtime": runtime, + "dispatch": dispatch, + "brief_path": str(task.get("brief_path") or ""), + "worktree": str(task.get("worktree") or ""), + "branch": str(task.get("branch") or ""), + "attempt": int(task.get("attempts") or 0), + "max_attempts": int(task.get("max_attempts") or 1), + } + python_code = "import json,sys; print(json.dumps(json.loads(sys.argv[1]), indent=2))" + return f"python3 -c {shlex.quote(python_code)} {shlex.quote(json.dumps(payload))}" + + if dispatch == "yolo": + brief = _read_brief(task, repo_root) + inner = ( + 'export PATH="$HOME/.config/mosaic/bin:$PATH"; ' + f"cd {shlex.quote(str(worktree))}; " + f"mosaic yolo {shlex.quote(runtime)} {shlex.quote(brief)}" + ) + return f"script -qec {shlex.quote(inner)} /dev/null" + + raise ValueError(f"Unsupported MACP dispatch type: {dispatch}") + + +def collect_result(task: dict[str, Any], exit_code: int, gate_results: list[dict[str, Any]], orch_dir: pathlib.Path) -> dict[str, Any]: + """Build standardized result JSON after worker completion.""" + + raw_status = str(task.get("status") or "") + if raw_status in {"completed", "failed", "escalated"}: + status = raw_status + else: + status = "completed" if exit_code == 0 else "failed" + completed_at = str(task.get("completed_at") or task.get("failed_at") or now_iso()) + failed_at = task.get("failed_at") + if status == "failed" and not failed_at: + failed_at = now_iso() + + normalized_gates: list[dict[str, Any]] = [] + for gate in gate_results: + normalized_gates.append( + { + "command": str(gate.get("command") or ""), + "exit_code": int(gate.get("exit_code") or 0), + "type": str(gate.get("type") or "mechanical"), + } + ) + + summary_map = { + "completed": "Task completed and quality gates passed.", + "failed": "Task failed before completion.", + "escalated": "Task requires human intervention.", + } + result = { + "task_id": str(task.get("id") or ""), + "status": status, + "completed_at": completed_at, + "failed_at": failed_at, + "exit_code": exit_code, + "attempt": int(task.get("attempts") or 0), + "max_attempts": int(task.get("max_attempts") or 1), + "runtime": _resolve_runtime(task), + "dispatch": _resolve_dispatch(task), + "worktree": str(task.get("worktree")) if task.get("worktree") else None, + "branch": str(task.get("branch")) if task.get("branch") else None, + "pr": str(task.get("pr")) if task.get("pr") else None, + "summary": str(task.get("summary") or summary_map.get(status, "Task processed.")), + "files_changed": _changed_files(task), + "gate_results": normalized_gates, + "error": task.get("error"), + "escalation_reason": task.get("escalation_reason"), + "metadata": dict(task.get("metadata") or {}), + } + save_json(_resolve_result_path(task, orch_dir), result) + return result + + +def cleanup_worktree(task: dict[str, Any]) -> None: + """Remove git worktree after task is done.""" + + worktree_raw = str(task.get("worktree") or "").strip() + if not worktree_raw: + return + + worktree = pathlib.Path(os.path.expanduser(worktree_raw)).resolve() + if not worktree.exists(): + return + + common_dir_raw = _git_capture(["git", "-C", str(worktree), "rev-parse", "--git-common-dir"], worktree) + common_dir = pathlib.Path(common_dir_raw) + if not common_dir.is_absolute(): + common_dir = (worktree / common_dir).resolve() + repo_root = common_dir.parent if common_dir.name == ".git" else common_dir + if repo_root == worktree: + return + + subprocess.run( + ["git", "-C", str(repo_root), "worktree", "remove", "--force", str(worktree)], + check=True, + capture_output=True, + text=True, + encoding="utf-8", + ) + subprocess.run( + ["git", "-C", str(repo_root), "worktree", "prune"], + check=False, + capture_output=True, + text=True, + encoding="utf-8", + ) + + +def dispatch_task(task: dict[str, Any], repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dict[str, Any]) -> tuple[int, str]: + """Full dispatch lifecycle: setup -> execute -> collect -> cleanup. Returns (exit_code, output).""" + + macp_config = dict(config.get("macp") or {}) + worker_config = dict(config.get("worker") or {}) + task["dispatch"] = _resolve_dispatch({"dispatch": task.get("dispatch") or macp_config.get("default_dispatch") or "exec"}) + task["runtime"] = _resolve_runtime({"runtime": task.get("runtime") or macp_config.get("default_runtime") or worker_config.get("runtime") or "codex"}) + if not str(task.get("worktree") or "").strip(): + task["worktree"] = str( + _default_worktree_path( + task, + repo_root, + str(macp_config.get("worktree_base") or "~/src/{repo}-worktrees"), + ) + ) + if not str(task.get("result_path") or "").strip(): + result_dir = str(macp_config.get("result_dir") or ".mosaic/orchestrator/results").strip() + if result_dir.startswith(".mosaic/orchestrator/"): + result_dir = result_dir[len(".mosaic/orchestrator/") :] + task["result_path"] = f"{result_dir.rstrip('/')}/{task.get('id', 'task')}.json" + + worktree = setup_worktree(task, repo_root) + log_path = orch_dir / "logs" / f"{task.get('id', 'task')}.log" + timeout_sec = int(task.get("timeout_seconds") or worker_config.get("timeout_seconds") or 7200) + command = build_dispatch_command(task, repo_root) + exit_code, output, timed_out = _run_command(command, worktree, log_path, timeout_sec) + task["_timed_out"] = timed_out + if timed_out: + task["error"] = f"Worker command timed out after {timeout_sec}s" + elif exit_code != 0 and not task.get("error"): + task["error"] = f"Worker command failed with exit code {exit_code}" + + collect_result(task, exit_code, [], orch_dir) + + attempts = int(task.get("attempts") or 0) + max_attempts = int(task.get("max_attempts") or 1) + if exit_code != 0 and attempts >= max_attempts and bool(macp_config.get("cleanup_worktrees", True)): + cleanup_worktree(task) + + return exit_code, output diff --git a/tools/orchestrator-matrix/protocol/event.schema.json b/tools/orchestrator-matrix/protocol/event.schema.json index 1d4d1d2..f493e42 100644 --- a/tools/orchestrator-matrix/protocol/event.schema.json +++ b/tools/orchestrator-matrix/protocol/event.schema.json @@ -22,8 +22,11 @@ "task.assigned", "task.started", "task.progress", + "task.gated", "task.completed", "task.failed", + "task.escalated", + "task.retry.scheduled", "rail.check.started", "rail.check.passed", "rail.check.failed" @@ -37,8 +40,10 @@ "enum": [ "pending", "running", + "gated", "completed", - "failed" + "failed", + "escalated" ] }, "timestamp": { @@ -50,7 +55,8 @@ "enum": [ "controller", "worker", - "quality-gate" + "quality-gate", + "dispatcher" ] }, "message": { diff --git a/tools/orchestrator-matrix/protocol/result.schema.json b/tools/orchestrator-matrix/protocol/result.schema.json new file mode 100644 index 0000000..a6a4786 --- /dev/null +++ b/tools/orchestrator-matrix/protocol/result.schema.json @@ -0,0 +1,119 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://mosaicstack.dev/schemas/orchestrator/result.schema.json", + "title": "MACP Task Result", + "type": "object", + "required": [ + "task_id", + "status", + "completed_at" + ], + "properties": { + "task_id": { + "type": "string" + }, + "status": { + "type": "string", + "enum": [ + "completed", + "failed", + "escalated" + ] + }, + "completed_at": { + "type": "string", + "format": "date-time" + }, + "failed_at": { + "type": [ + "string", + "null" + ], + "format": "date-time" + }, + "exit_code": { + "type": [ + "integer", + "null" + ] + }, + "attempt": { + "type": "integer" + }, + "max_attempts": { + "type": "integer" + }, + "runtime": { + "type": "string" + }, + "dispatch": { + "type": "string" + }, + "worktree": { + "type": [ + "string", + "null" + ] + }, + "branch": { + "type": [ + "string", + "null" + ] + }, + "pr": { + "type": [ + "string", + "null" + ] + }, + "summary": { + "type": "string", + "description": "Human-readable summary of what the worker did" + }, + "files_changed": { + "type": "array", + "items": { + "type": "string" + } + }, + "gate_results": { + "type": "array", + "items": { + "type": "object", + "properties": { + "command": { + "type": "string" + }, + "exit_code": { + "type": "integer" + }, + "type": { + "type": "string", + "enum": [ + "mechanical", + "ai-review", + "ci-pipeline" + ] + } + } + } + }, + "error": { + "type": [ + "string", + "null" + ] + }, + "escalation_reason": { + "type": [ + "string", + "null" + ] + }, + "metadata": { + "type": "object" + } + }, + "additionalProperties": true +} diff --git a/tools/orchestrator-matrix/protocol/task.schema.json b/tools/orchestrator-matrix/protocol/task.schema.json index af97a29..1600b18 100644 --- a/tools/orchestrator-matrix/protocol/task.schema.json +++ b/tools/orchestrator-matrix/protocol/task.schema.json @@ -23,14 +23,85 @@ "enum": [ "pending", "running", + "gated", "completed", - "failed" + "failed", + "escalated" ] }, + "type": { + "type": "string", + "enum": [ + "coding", + "deploy", + "research", + "review", + "documentation", + "infrastructure" + ], + "description": "Task type - determines dispatch strategy and gate requirements" + }, + "dispatch": { + "type": "string", + "enum": [ + "yolo", + "acp", + "exec" + ], + "description": "Execution backend: yolo=mosaic yolo (full system), acp=OpenClaw sessions_spawn (sandboxed), exec=direct shell" + }, "runtime": { "type": "string", "description": "Preferred worker runtime, e.g. codex, claude, opencode" }, + "worktree": { + "type": "string", + "description": "Path to git worktree for this task, e.g. ~/src/repo-worktrees/task-042" + }, + "branch": { + "type": "string", + "description": "Git branch name for this task" + }, + "brief_path": { + "type": "string", + "description": "Path to markdown task brief relative to repo root" + }, + "result_path": { + "type": "string", + "description": "Path to JSON result file relative to .mosaic/orchestrator/" + }, + "issue": { + "type": "string", + "description": "Issue reference (e.g. #42)" + }, + "pr": { + "type": [ + "string", + "null" + ], + "description": "PR number/URL once opened" + }, + "depends_on": { + "type": "array", + "items": { + "type": "string" + }, + "description": "List of task IDs this task depends on" + }, + "max_attempts": { + "type": "integer", + "minimum": 1, + "default": 1 + }, + "attempts": { + "type": "integer", + "minimum": 0, + "default": 0 + }, + "timeout_seconds": { + "type": "integer", + "description": "Override default timeout for this task" + }, "command": { "type": "string", "description": "Worker command to execute for this task"