#!/usr/bin/env python3
"""Deterministic orchestrator controller for Mosaic task delegation."""
from __future__ import annotations

import argparse
import datetime as dt
import json
import os
import pathlib
import subprocess
import sys
import time
import uuid
from typing import Any


def now_iso() -> str:
    """Return the current UTC time as an ISO-8601 timestamp string."""
    return dt.datetime.now(dt.timezone.utc).isoformat()


def load_json(path: pathlib.Path, default: Any) -> Any:
    """Load JSON from *path*; return *default* when the file does not exist.

    Raises ``json.JSONDecodeError`` if the file exists but is corrupt.
    """
    if not path.exists():
        return default
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: pathlib.Path, data: Any) -> None:
    """Atomically write *data* as pretty-printed JSON to *path*.

    Writes to a sibling ``.tmp`` file and renames it into place so a crash
    mid-write never leaves a truncated JSON file behind.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
        f.write("\n")
    tmp.replace(path)


def append_event(events_path: pathlib.Path, event: dict[str, Any]) -> None:
    """Append *event* as a single NDJSON line to *events_path*."""
    events_path.parent.mkdir(parents=True, exist_ok=True)
    with events_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(event, ensure_ascii=True) + "\n")


def emit_event(
    events_path: pathlib.Path,
    event_type: str,
    task_id: str,
    status: str,
    source: str,
    message: str,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Build a structured event record (fresh UUID + timestamp) and log it."""
    append_event(
        events_path,
        {
            "event_id": str(uuid.uuid4()),
            "event_type": event_type,
            "task_id": task_id,
            "status": status,
            "timestamp": now_iso(),
            "source": source,
            "message": message,
            "metadata": metadata or {},
        },
    )


def run_shell(
    command: str,
    cwd: pathlib.Path,
    log_path: pathlib.Path,
    timeout_sec: int,
) -> tuple[int, str, bool]:
    """Run *command* under ``bash -lc`` in *cwd*, appending output to *log_path*.

    Returns ``(exit_code, combined_stdout_stderr, timed_out)``.  On timeout the
    process is killed and the exit code is forced to 124 (the conventional
    ``timeout(1)`` status).

    NOTE(review): if the command spawns grandchildren that inherit the stdout
    pipe, the post-kill ``communicate()`` can still block until they exit —
    confirm this is acceptable for the configured worker commands.
    """
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf-8") as log:
        log.write(f"\n[{now_iso()}] COMMAND: {command}\n")
        log.flush()  # keep the header ordered before the child's own output
        proc = subprocess.Popen(
            ["bash", "-lc", command],
            cwd=str(cwd),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # interleave stderr into one stream
            text=True,
            encoding="utf-8",
        )
        timed_out = False
        try:
            output, _ = proc.communicate(timeout=max(1, timeout_sec))
            code = proc.returncode
        except subprocess.TimeoutExpired:
            timed_out = True
            proc.kill()
            output, _ = proc.communicate()  # reap and collect partial output
            code = 124
            log.write(f"[{now_iso()}] TIMEOUT: exceeded {timeout_sec}s\n")
        if output:
            log.write(output)
        log.write(f"[{now_iso()}] EXIT: {code}\n")
        return code, output or "", timed_out


def render_command_template(
    template: str, task: dict[str, Any], task_file: pathlib.Path
) -> str:
    """Substitute ``{task_id}``, ``{task_title}`` and ``{task_file}`` in *template*.

    NOTE(review): values are inserted verbatim into a bash command line with
    no shell quoting — task titles containing shell metacharacters will be
    interpreted by bash; confirm task sources are trusted.
    """
    return (
        template.replace("{task_id}", str(task.get("id", "")))
        .replace("{task_title}", str(task.get("title", "")))
        .replace("{task_file}", str(task_file))
    )


def parse_dep_list(raw: Any) -> list[str]:
    """Normalize a dependency spec (list or comma-separated string) to id strings.

    Blank entries are dropped; anything else yields an empty list.
    """
    if isinstance(raw, list):
        return [str(x).strip() for x in raw if str(x).strip()]
    if isinstance(raw, str):
        return [x.strip() for x in raw.split(",") if x.strip()]
    return []


def is_completed_status(status: str) -> bool:
    """True when *status* counts as terminal success."""
    return status in {"completed", "done"}


def pick_next_task(tasks: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Return the first pending task whose dependencies are all completed.

    Returns ``None`` when no task is runnable.  A dependency id that is not
    present in *tasks* is treated as not completed, so the dependent task
    stays blocked.
    """
    status_by_id = {str(t.get("id", "")): str(t.get("status", "")) for t in tasks}
    for task in tasks:
        if task.get("status", "pending") != "pending":
            continue
        deps = parse_dep_list(task.get("depends_on"))
        deps_ready = all(is_completed_status(status_by_id.get(dep, "")) for dep in deps)
        if deps_ready:
            return task
    return None


def _clear_running(state_path: pathlib.Path, state: dict[str, Any]) -> None:
    """Clear the running-task marker in *state* and persist it."""
    state["running_task_id"] = None
    state["updated_at"] = now_iso()
    save_json(state_path, state)


def run_single_task(
    repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dict[str, Any]
) -> bool:
    """Pick and execute at most one runnable task from the queue.

    Returns True when a task was processed (success, retry, or failure),
    False when nothing was runnable.  Persists task/state/result JSON files
    under *orch_dir* and emits NDJSON events for every transition.

    Raises ``ValueError`` when tasks.json does not hold a list under "tasks".
    """
    tasks_path = orch_dir / "tasks.json"
    state_path = orch_dir / "state.json"
    events_path = orch_dir / "events.ndjson"
    logs_dir = orch_dir / "logs"
    results_dir = orch_dir / "results"

    tasks = load_json(tasks_path, {"tasks": []})
    task_items = tasks.get("tasks", [])
    if not isinstance(task_items, list):
        raise ValueError("tasks.json must contain {'tasks': [...]} structure")

    task = pick_next_task(task_items)
    if not task:
        return False

    task_id = str(task.get("id", "unknown-task"))
    # Per-task settings override config; final fallbacks: 1 attempt, 2h timeout.
    max_attempts = int(task.get("max_attempts") or config.get("worker", {}).get("max_attempts") or 1)
    attempt = int(task.get("attempts", 0)) + 1
    task["attempts"] = attempt
    task["max_attempts"] = max_attempts
    task["status"] = "running"
    task["started_at"] = now_iso()
    save_json(tasks_path, {"tasks": task_items})

    state = load_json(state_path, {"running_task_id": None, "updated_at": None})
    state["running_task_id"] = task_id
    state["updated_at"] = now_iso()
    save_json(state_path, state)

    emit_event(events_path, "task.assigned", task_id, "running", "controller", "Task assigned")
    emit_event(events_path, "task.started", task_id, "running", "worker", "Worker execution started")

    log_path = logs_dir / f"{task_id}.log"
    task_file = orch_dir / f"task-{task_id}.json"
    save_json(task_file, task)  # snapshot for {task_file} template substitution

    cmd = str(task.get("command", "")).strip()
    if not cmd:
        template = str(config.get("worker", {}).get("command_template", "")).strip()
        if template:
            cmd = render_command_template(template, task, task_file)
    if not cmd:
        # Misconfiguration: retrying cannot help, so fail terminally at once.
        task["status"] = "failed"
        task["failed_at"] = now_iso()
        task["error"] = "No task command or worker command_template configured."
        save_json(tasks_path, {"tasks": task_items})
        emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"])
        _clear_running(state_path, state)
        # Fix: record a result file on this path too, consistent with the
        # worker-failure and quality-gate paths below.
        save_json(
            results_dir / f"{task_id}.json",
            {
                "task_id": task_id,
                "status": task["status"],
                "attempt": attempt,
                "max_attempts": max_attempts,
                "error": task["error"],
            },
        )
        return True

    timeout_sec = int(task.get("timeout_seconds") or config.get("worker", {}).get("timeout_seconds") or 7200)
    rc, _, timed_out = run_shell(cmd, repo_root, log_path, timeout_sec)

    if rc != 0:
        task["error"] = (
            f"Worker command timed out after {timeout_sec}s"
            if timed_out
            else f"Worker command failed with exit code {rc}"
        )
        if attempt < max_attempts:
            # Re-queue: pick_next_task will select it again on a later cycle.
            task["status"] = "pending"
            task["last_failed_at"] = now_iso()
            emit_event(
                events_path,
                "task.retry.scheduled",
                task_id,
                "pending",
                "worker",
                f"{task['error']}; retry {attempt + 1}/{max_attempts}",
            )
        else:
            task["status"] = "failed"
            task["failed_at"] = now_iso()
            emit_event(events_path, "task.failed", task_id, "failed", "worker", task["error"])
        save_json(tasks_path, {"tasks": task_items})
        _clear_running(state_path, state)
        save_json(
            results_dir / f"{task_id}.json",
            {
                "task_id": task_id,
                "status": task["status"],
                "exit_code": rc,
                "attempt": attempt,
                "max_attempts": max_attempts,
            },
        )
        return True

    # Worker succeeded: run quality gates (task-level override, else config).
    gates = task.get("quality_gates") or config.get("quality_gates") or []
    all_passed = True
    gate_results: list[dict[str, Any]] = []
    for gate in gates:
        gate_cmd = str(gate).strip()
        if not gate_cmd:
            continue
        emit_event(events_path, "rail.check.started", task_id, "running", "quality-gate", f"Running gate: {gate_cmd}")
        # Gates share the worker's timeout budget (per gate, not cumulative).
        gate_rc, _, gate_timed_out = run_shell(gate_cmd, repo_root, log_path, timeout_sec)
        if gate_rc == 0:
            emit_event(events_path, "rail.check.passed", task_id, "running", "quality-gate", f"Gate passed: {gate_cmd}")
        else:
            all_passed = False
            emit_event(
                events_path,
                "rail.check.failed",
                task_id,
                "failed",
                "quality-gate",
                f"Gate timed out after {timeout_sec}s: {gate_cmd}"
                if gate_timed_out
                else f"Gate failed ({gate_rc}): {gate_cmd}",
            )
        gate_results.append({"command": gate_cmd, "exit_code": gate_rc})

    if all_passed:
        task["status"] = "completed"
        task["completed_at"] = now_iso()
        emit_event(events_path, "task.completed", task_id, "completed", "controller", "Task completed")
    else:
        task["error"] = "One or more quality gates failed"
        if attempt < max_attempts:
            task["status"] = "pending"
            task["last_failed_at"] = now_iso()
            emit_event(
                events_path,
                "task.retry.scheduled",
                task_id,
                "pending",
                "controller",
                f"{task['error']}; retry {attempt + 1}/{max_attempts}",
            )
        else:
            task["status"] = "failed"
            task["failed_at"] = now_iso()
            emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"])

    save_json(tasks_path, {"tasks": task_items})
    _clear_running(state_path, state)
    save_json(
        results_dir / f"{task_id}.json",
        {
            "task_id": task_id,
            "status": task["status"],
            "completed_at": task.get("completed_at"),
            "failed_at": task.get("failed_at"),
            "gate_results": gate_results,
        },
    )
    return True


def queue_state(orch_dir: pathlib.Path) -> dict[str, int]:
    """Summarize the queue: counts of pending, running and runnable tasks.

    "runnable" counts pending tasks whose dependencies are all completed.
    A malformed tasks.json yields all-zero counts rather than raising.
    """
    tasks = load_json(orch_dir / "tasks.json", {"tasks": []})
    task_items = tasks.get("tasks", [])
    if not isinstance(task_items, list):
        return {"pending": 0, "running": 0, "runnable": 0}
    pending = 0
    running = 0
    runnable = 0
    status_by_id = {str(t.get("id", "")): str(t.get("status", "")) for t in task_items}
    for task in task_items:
        status = str(task.get("status", "pending"))
        if status == "pending":
            pending += 1
            deps = parse_dep_list(task.get("depends_on"))
            if all(is_completed_status(status_by_id.get(dep, "")) for dep in deps):
                runnable += 1
        if status == "running":
            running += 1
    return {"pending": pending, "running": running, "runnable": runnable}


def main() -> int:
    """CLI entry point.  Returns a process exit code.

    Exit codes: 0 success/drained/disabled, 1 missing config, 2 blocked
    (pending tasks remain but their dependencies can never be met).
    """
    parser = argparse.ArgumentParser(description="Mosaic deterministic orchestrator controller")
    parser.add_argument("--repo", default=os.getcwd(), help="Repository root (default: cwd)")
    parser.add_argument("--once", action="store_true", help="Process at most one pending task and exit")
    parser.add_argument("--until-drained", action="store_true", help="Run until no pending tasks remain (or blocked)")
    parser.add_argument("--poll-sec", type=int, default=10, help="Polling interval for continuous mode")
    args = parser.parse_args()

    repo_root = pathlib.Path(args.repo).resolve()
    orch_dir = repo_root / ".mosaic" / "orchestrator"
    config_path = orch_dir / "config.json"
    if not config_path.exists():
        print(f"[mosaic-orchestrator] missing config: {config_path}", file=sys.stderr)
        return 1
    config = load_json(config_path, {})
    if not config.get("enabled", False):
        print("[mosaic-orchestrator] disabled in .mosaic/orchestrator/config.json (enabled=false)")
        return 0

    if args.once:
        processed = run_single_task(repo_root, orch_dir, config)
        if not processed:
            print("[mosaic-orchestrator] no pending tasks")
        return 0

    print(f"[mosaic-orchestrator] loop start repo={repo_root} poll={args.poll_sec}s")
    while True:
        try:
            processed = run_single_task(repo_root, orch_dir, config)
            if not processed:
                qs = queue_state(orch_dir)
                if args.until_drained:
                    if qs["pending"] == 0 and qs["running"] == 0:
                        print("[mosaic-orchestrator] drained: no pending tasks")
                        return 0
                    if qs["pending"] > 0 and qs["runnable"] == 0 and qs["running"] == 0:
                        print(
                            "[mosaic-orchestrator] blocked: pending tasks remain but dependencies are unmet",
                            file=sys.stderr,
                        )
                        return 2
                time.sleep(max(1, args.poll_sec))
        except KeyboardInterrupt:
            print("\n[mosaic-orchestrator] stopping")
            return 0
        except Exception as exc:  # pragma: no cover
            # Top-level boundary: log and keep the daemon alive.
            print(f"[mosaic-orchestrator] error: {exc}", file=sys.stderr)
            time.sleep(max(1, args.poll_sec))


if __name__ == "__main__":
    raise SystemExit(main())