#!/usr/bin/env bash set -euo pipefail repo_root="$(pwd)" orch_dir="$repo_root/.mosaic/orchestrator" config_path="$orch_dir/config.json" tasks_path="$orch_dir/tasks.json" events_path="$orch_dir/events.ndjson" usage() { cat <<'USAGE' mosaic macp — manual MACP queue operations Usage: mosaic macp submit --task-id TASK-001 --title "..." [--type coding] [--dispatch yolo|acp|exec] [--runtime codex] [--brief docs/tasks/TASK-001.md] [--command "..."] [--branch feat/...] mosaic macp status [--task-id TASK-001] mosaic macp drain mosaic macp history [--task-id TASK-001] mosaic macp watch [--webhook] [--once] USAGE } require_repo() { if [[ ! -f "$config_path" ]]; then echo "[mosaic-macp] missing orchestrator config: $config_path" >&2 exit 1 fi } submit_task() { require_repo local task_id="" local title="" local task_type="coding" local dispatch="" local runtime="" local brief="" local command="" local branch="" while [[ $# -gt 0 ]]; do case "$1" in --task-id) task_id="$2"; shift 2 ;; --title) title="$2"; shift 2 ;; --type) task_type="$2"; shift 2 ;; --dispatch) dispatch="$2"; shift 2 ;; --runtime) runtime="$2"; shift 2 ;; --brief) brief="$2"; shift 2 ;; --command) command="$2"; shift 2 ;; --branch) branch="$2"; shift 2 ;; -h|--help) usage; exit 0 ;; *) echo "[mosaic-macp] unknown submit option: $1" >&2; exit 1 ;; esac done if [[ -z "$task_id" || -z "$title" ]]; then echo "[mosaic-macp] submit requires --task-id and --title" >&2 exit 1 fi python3 - "$config_path" "$tasks_path" "$task_id" "$title" "$task_type" "$dispatch" "$runtime" "$brief" "$command" "$branch" <<'PY' import json import pathlib import sys config_path = pathlib.Path(sys.argv[1]) tasks_path = pathlib.Path(sys.argv[2]) task_id, title, task_type, dispatch, runtime, brief, command, branch = sys.argv[3:] config = json.loads(config_path.read_text(encoding="utf-8")) macp = dict(config.get("macp") or {}) worker = dict(config.get("worker") or {}) payload = {"tasks": []} if tasks_path.exists(): payload = json.loads(tasks_path.read_text(encoding="utf-8")) if not isinstance(payload.get("tasks"), list): payload = {"tasks": []} resolved_dispatch = dispatch or ("yolo" if brief and not command else str(macp.get("default_dispatch") or "exec")) resolved_runtime = runtime or str(macp.get("default_runtime") or worker.get("runtime") or "codex") if resolved_dispatch == "exec" and not command: raise SystemExit("[mosaic-macp] exec dispatch requires --command") task = { "id": task_id, "title": title, "description": title, "status": "pending", "type": task_type or "coding", "dispatch": resolved_dispatch, "runtime": resolved_runtime, "branch": branch or "", "brief_path": brief or "", "command": command or "", "quality_gates": config.get("quality_gates") or [], "metadata": {"source": "bin/mosaic-macp"}, } updated = [] replaced = False for existing in payload["tasks"]: if str(existing.get("id")) == task_id: merged = dict(existing) merged.update({k: v for k, v in task.items() if v not in ("", [])}) updated.append(merged) replaced = True else: updated.append(existing) if not replaced: updated.append(task) payload["tasks"] = updated tasks_path.parent.mkdir(parents=True, exist_ok=True) tasks_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") print(f"[mosaic-macp] queued {task_id} dispatch={resolved_dispatch} runtime={resolved_runtime}") PY } status_tasks() { require_repo local task_id="" while [[ $# -gt 0 ]]; do case "$1" in --task-id) task_id="$2"; shift 2 ;; -h|--help) usage; exit 0 ;; *) echo "[mosaic-macp] unknown status option: $1" >&2; exit 1 ;; esac done python3 - "$tasks_path" "$task_id" <<'PY' import json import pathlib import sys tasks_path = pathlib.Path(sys.argv[1]) task_id = sys.argv[2] payload = {"tasks": []} if tasks_path.exists(): payload = json.loads(tasks_path.read_text(encoding="utf-8")) tasks = payload.get("tasks", []) counts = {key: 0 for key in ["pending", "running", "gated", "completed", "failed", "escalated"]} for task in tasks: status = str(task.get("status") or "pending") counts[status] = counts.get(status, 0) + 1 if task_id: for task in tasks: if str(task.get("id")) == task_id: print(json.dumps(task, indent=2)) break else: raise SystemExit(f"[mosaic-macp] task not found: {task_id}") else: print("Queue state:") for key in ["pending", "running", "gated", "completed", "failed", "escalated"]: print(f" {key}: {counts.get(key, 0)}") PY } drain_tasks() { require_repo local mosaic_home="${MOSAIC_HOME:-$HOME/.config/mosaic}" exec "$mosaic_home/bin/mosaic-orchestrator-run" --repo "$repo_root" --until-drained } history_tasks() { require_repo local task_id="" while [[ $# -gt 0 ]]; do case "$1" in --task-id) task_id="$2"; shift 2 ;; -h|--help) usage; exit 0 ;; *) echo "[mosaic-macp] unknown history option: $1" >&2; exit 1 ;; esac done python3 - "$events_path" "$task_id" <<'PY' import json import pathlib import sys events_path = pathlib.Path(sys.argv[1]) task_id = sys.argv[2] if not events_path.exists(): raise SystemExit(f"[mosaic-macp] events file not found: {events_path}") for line in events_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue event = json.loads(line) if task_id and str(event.get("task_id")) != task_id: continue print(json.dumps(event, indent=2)) PY } watch_events() { require_repo local webhook_enabled="false" local run_once="false" while [[ $# -gt 0 ]]; do case "$1" in --webhook) webhook_enabled="true"; shift ;; --once) run_once="true"; shift ;; -h|--help) usage; exit 0 ;; *) echo "[mosaic-macp] unknown watch option: $1" >&2; exit 1 ;; esac done python3 - "$repo_root" "$config_path" "$events_path" "$orch_dir/event_cursor.json" "$webhook_enabled" "$run_once" <<'PY' import json import pathlib import sys repo_root = pathlib.Path(sys.argv[1]).resolve() config_path = pathlib.Path(sys.argv[2]).resolve() events_path = pathlib.Path(sys.argv[3]).resolve() cursor_path = pathlib.Path(sys.argv[4]).resolve() webhook_flag = sys.argv[5].lower() == "true" run_once = sys.argv[6].lower() == "true" events_dir = repo_root / "tools" / "orchestrator-matrix" / "events" if str(events_dir) not in sys.path: sys.path.insert(0, str(events_dir)) from discord_formatter import format_event from event_watcher import EventWatcher from webhook_adapter import create_webhook_callback config = {} if config_path.exists(): try: config = json.loads(config_path.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError) as e: print(f"[macp] Warning: could not parse config {config_path}: {e}", file=sys.stderr) config = {} macp = dict(config.get("macp") or {}) watcher = EventWatcher( events_path=events_path, cursor_path=cursor_path, poll_interval=float(macp.get("watch_poll_interval_seconds") or 2.0), ) def print_callback(event: dict) -> None: rendered = format_event(event) if rendered: print(rendered) watcher.on([], print_callback) webhook_config = dict(macp.get("webhook") or {}) if webhook_flag and bool(webhook_config.get("enabled", False)): watcher.on(list(webhook_config.get("event_filter") or []), create_webhook_callback(config)) elif webhook_flag: print("[mosaic-macp] webhook requested but disabled in config", file=sys.stderr) if run_once: processed = watcher.poll_once() print(f"[mosaic-macp] processed_events={len(processed)}") else: watcher.run() PY } subcommand="${1:-help}" if [[ $# -gt 0 ]]; then shift fi case "$subcommand" in submit) submit_task "$@" ;; status) status_tasks "$@" ;; drain) drain_tasks "$@" ;; history) history_tasks "$@" ;; watch) watch_events "$@" ;; help|-h|--help|"") usage ;; *) echo "[mosaic-macp] unknown subcommand: $subcommand" >&2 usage exit 1 ;; esac