#!/usr/bin/env python3
"""Deterministic orchestrator controller for Mosaic task delegation."""
from __future__ import annotations

import argparse
import datetime as dt
import json
import os
import pathlib
import subprocess
import sys
import time
import uuid
from typing import Any
def now_iso() -> str:
    """Return the current UTC moment as an ISO-8601 timestamp string."""
    utc_now = dt.datetime.now(dt.timezone.utc)
    return utc_now.isoformat()
def load_json(path: pathlib.Path, default: Any) -> Any:
    """Parse *path* as JSON, returning *default* when the file is absent."""
    if path.exists():
        with path.open("r", encoding="utf-8") as handle:
            return json.load(handle)
    return default
def save_json(path: pathlib.Path, data: Any) -> None:
    """Atomically persist *data* as indented JSON at *path*.

    The payload is staged in a sibling ``.tmp`` file and then renamed into
    place, so concurrent readers never observe a half-written document.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    staging = path.with_suffix(path.suffix + ".tmp")
    payload = json.dumps(data, indent=2) + "\n"
    staging.write_text(payload, encoding="utf-8")
    staging.replace(path)
def append_event(events_path: pathlib.Path, event: dict[str, Any]) -> None:
    """Append *event* as one ASCII-safe JSON line to the NDJSON event log."""
    events_path.parent.mkdir(parents=True, exist_ok=True)
    line = json.dumps(event, ensure_ascii=True)
    with events_path.open("a", encoding="utf-8") as sink:
        sink.write(line + "\n")
def emit_event(
    events_path: pathlib.Path,
    event_type: str,
    task_id: str,
    status: str,
    source: str,
    message: str,
    metadata: dict[str, Any] | None = None,
) -> None:
    """Record one structured orchestrator event in the NDJSON log.

    Each event carries a fresh UUID and a UTC timestamp; *metadata* defaults
    to an empty dict when omitted.
    """
    record = {
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        "task_id": task_id,
        "status": status,
        "timestamp": now_iso(),
        "source": source,
        "message": message,
        "metadata": metadata or {},
    }
    append_event(events_path, record)
def run_shell(command: str, cwd: pathlib.Path, log_path: pathlib.Path, timeout_sec: int) -> tuple[int, str, bool]:
    """Run *command* via ``bash -lc``, appending its combined output to *log_path*.

    Returns ``(exit_code, combined_output, timed_out)``. A process that
    exceeds *timeout_sec* is killed and reported with exit code 124
    (mirroring coreutils' ``timeout``). The timeout floor is 1 second.
    """
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf-8") as log:
        log.write(f"\n[{now_iso()}] COMMAND: {command}\n")
        log.flush()
        proc = subprocess.Popen(
            ["bash", "-lc", command],
            cwd=str(cwd),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge streams so the log is a single timeline
            text=True,
            encoding="utf-8",
            # Worker commands may emit bytes that are not valid UTF-8; with the
            # default strict handler that raises UnicodeDecodeError and crashes
            # the controller mid-task, leaving the task stuck in "running".
            errors="replace",
        )
        timed_out = False
        try:
            output, _ = proc.communicate(timeout=max(1, timeout_sec))
            code = proc.returncode
        except subprocess.TimeoutExpired:
            timed_out = True
            proc.kill()
            # Drain the pipe after the kill so the child is fully reaped.
            output, _ = proc.communicate()
            code = 124
            log.write(f"[{now_iso()}] TIMEOUT: exceeded {timeout_sec}s\n")

        if output:
            log.write(output)
        log.write(f"[{now_iso()}] EXIT: {code}\n")
    return code, output or "", timed_out
def render_command_template(template: str, task: dict[str, Any], task_file: pathlib.Path) -> str:
    """Expand the {task_id}, {task_title}, and {task_file} placeholders in *template*."""
    substitutions = (
        ("{task_id}", str(task.get("id", ""))),
        ("{task_title}", str(task.get("title", ""))),
        ("{task_file}", str(task_file)),
    )
    rendered = template
    for placeholder, value in substitutions:
        rendered = rendered.replace(placeholder, value)
    return rendered
def parse_dep_list(raw: Any) -> list[str]:
    """Normalize a dependency field into a list of non-empty id strings.

    Accepts a list (entries are stringified) or a comma-separated string;
    any other value yields an empty list.
    """
    if isinstance(raw, list):
        candidates = (str(entry).strip() for entry in raw)
    elif isinstance(raw, str):
        candidates = (piece.strip() for piece in raw.split(","))
    else:
        return []
    return [candidate for candidate in candidates if candidate]
def is_completed_status(status: str) -> bool:
    """Return True when *status* marks a finished task ("completed" or "done")."""
    return status == "completed" or status == "done"
def pick_next_task(tasks: list[dict[str, Any]]) -> dict[str, Any] | None:
    """Return the first pending task whose dependencies are all finished.

    Tasks are scanned in manifest order (deterministic scheduling); None
    means nothing is currently runnable.
    """
    status_by_id = {str(item.get("id", "")): str(item.get("status", "")) for item in tasks}
    for candidate in tasks:
        if candidate.get("status", "pending") != "pending":
            continue
        unmet = [
            dep
            for dep in parse_dep_list(candidate.get("depends_on"))
            if not is_completed_status(status_by_id.get(dep, ""))
        ]
        if not unmet:
            return candidate
    return None
def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dict[str, Any]) -> bool:
    """Claim and execute at most one runnable task from the queue.

    Returns True when a task was processed (whatever its outcome) and False
    when nothing is currently runnable. Queue, controller state, and result
    files are persisted via save_json at each transition so progress
    survives a controller restart.
    """
    # Well-known orchestrator files, all rooted under *orch_dir*.
    tasks_path = orch_dir / "tasks.json"
    state_path = orch_dir / "state.json"
    events_path = orch_dir / "events.ndjson"
    logs_dir = orch_dir / "logs"
    results_dir = orch_dir / "results"

    tasks = load_json(tasks_path, {"tasks": []})
    task_items = tasks.get("tasks", [])
    if not isinstance(task_items, list):
        raise ValueError("tasks.json must contain {'tasks': [...]} structure")

    task = pick_next_task(task_items)
    if not task:
        # Nothing pending with satisfied dependencies.
        return False

    task_id = str(task.get("id", "unknown-task"))
    # Per-task settings win over worker config; the trailing "or 1" keeps the
    # expression total even when both are missing/zero.
    max_attempts = int(task.get("max_attempts") or config.get("worker", {}).get("max_attempts") or 1)
    attempt = int(task.get("attempts", 0)) + 1
    task["attempts"] = attempt
    task["max_attempts"] = max_attempts
    task["status"] = "running"
    task["started_at"] = now_iso()
    # Persist the "running" claim before doing any work.
    save_json(tasks_path, {"tasks": task_items})

    state = load_json(state_path, {"running_task_id": None, "updated_at": None})
    state["running_task_id"] = task_id
    state["updated_at"] = now_iso()
    save_json(state_path, state)

    emit_event(events_path, "task.assigned", task_id, "running", "controller", "Task assigned")
    emit_event(events_path, "task.started", task_id, "running", "worker", "Worker execution started")

    log_path = logs_dir / f"{task_id}.log"
    # Snapshot of the claimed task, exposed to the worker command through the
    # {task_file} template placeholder.
    task_file = orch_dir / f"task-{task_id}.json"
    save_json(task_file, task)

    # Resolve the worker command: explicit per-task command first, then the
    # configured worker command_template.
    cmd = str(task.get("command", "")).strip()
    if not cmd:
        template = str(config.get("worker", {}).get("command_template", "")).strip()
        if template:
            cmd = render_command_template(template, task, task_file)

    if not cmd:
        # Misconfiguration: fail immediately (no retry) and release the slot.
        task["status"] = "failed"
        task["failed_at"] = now_iso()
        task["error"] = "No task command or worker command_template configured."
        save_json(tasks_path, {"tasks": task_items})
        emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"])
        state["running_task_id"] = None
        state["updated_at"] = now_iso()
        save_json(state_path, state)
        return True

    timeout_sec = int(task.get("timeout_seconds") or config.get("worker", {}).get("timeout_seconds") or 7200)
    rc, _, timed_out = run_shell(cmd, repo_root, log_path, timeout_sec)
    if rc != 0:
        task["error"] = f"Worker command timed out after {timeout_sec}s" if timed_out else f"Worker command failed with exit code {rc}"
        if attempt < max_attempts:
            # Requeue for another attempt; the attempts counter was already
            # advanced above, so retries are bounded.
            task["status"] = "pending"
            task["last_failed_at"] = now_iso()
            emit_event(
                events_path,
                "task.retry.scheduled",
                task_id,
                "pending",
                "worker",
                f"{task['error']}; retry {attempt + 1}/{max_attempts}",
            )
        else:
            task["status"] = "failed"
            task["failed_at"] = now_iso()
            emit_event(events_path, "task.failed", task_id, "failed", "worker", task["error"])
        save_json(tasks_path, {"tasks": task_items})
        state["running_task_id"] = None
        state["updated_at"] = now_iso()
        save_json(state_path, state)
        save_json(
            results_dir / f"{task_id}.json",
            {"task_id": task_id, "status": task["status"], "exit_code": rc, "attempt": attempt, "max_attempts": max_attempts},
        )
        return True

    # Worker succeeded; run quality gates (task-level gates override global).
    gates = task.get("quality_gates") or config.get("quality_gates") or []
    all_passed = True
    gate_results: list[dict[str, Any]] = []
    for gate in gates:
        gate_cmd = str(gate).strip()
        if not gate_cmd:
            continue
        emit_event(events_path, "rail.check.started", task_id, "running", "quality-gate", f"Running gate: {gate_cmd}")
        # Gates share the worker's log file and timeout budget. All gates run
        # even after a failure so the result file lists every gate's outcome.
        gate_rc, _, gate_timed_out = run_shell(gate_cmd, repo_root, log_path, timeout_sec)
        if gate_rc == 0:
            emit_event(events_path, "rail.check.passed", task_id, "running", "quality-gate", f"Gate passed: {gate_cmd}")
        else:
            all_passed = False
            emit_event(
                events_path,
                "rail.check.failed",
                task_id,
                "failed",
                "quality-gate",
                f"Gate timed out after {timeout_sec}s: {gate_cmd}" if gate_timed_out else f"Gate failed ({gate_rc}): {gate_cmd}",
            )
        gate_results.append({"command": gate_cmd, "exit_code": gate_rc})

    if all_passed:
        task["status"] = "completed"
        task["completed_at"] = now_iso()
        emit_event(events_path, "task.completed", task_id, "completed", "controller", "Task completed")
    else:
        task["error"] = "One or more quality gates failed"
        if attempt < max_attempts:
            task["status"] = "pending"
            task["last_failed_at"] = now_iso()
            emit_event(
                events_path,
                "task.retry.scheduled",
                task_id,
                "pending",
                "controller",
                f"{task['error']}; retry {attempt + 1}/{max_attempts}",
            )
        else:
            task["status"] = "failed"
            task["failed_at"] = now_iso()
            emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"])

    # Final persistence: queue, controller state, and a per-task result record.
    save_json(tasks_path, {"tasks": task_items})
    state["running_task_id"] = None
    state["updated_at"] = now_iso()
    save_json(state_path, state)
    save_json(
        results_dir / f"{task_id}.json",
        {
            "task_id": task_id,
            "status": task["status"],
            "completed_at": task.get("completed_at"),
            "failed_at": task.get("failed_at"),
            "gate_results": gate_results,
        },
    )
    return True
def queue_state(orch_dir: pathlib.Path) -> dict[str, int]:
    """Summarize the queue: how many tasks are pending, running, and runnable.

    "runnable" counts pending tasks whose dependencies are all finished.
    A malformed tasks.json yields all-zero counts rather than an error.
    """
    manifest = load_json(orch_dir / "tasks.json", {"tasks": []})
    task_items = manifest.get("tasks", [])
    if not isinstance(task_items, list):
        return {"pending": 0, "running": 0, "runnable": 0}

    counts = {"pending": 0, "running": 0, "runnable": 0}
    status_by_id = {str(item.get("id", "")): str(item.get("status", "")) for item in task_items}
    for item in task_items:
        status = str(item.get("status", "pending"))
        if status == "running":
            counts["running"] += 1
        elif status == "pending":
            counts["pending"] += 1
            unmet = [
                dep
                for dep in parse_dep_list(item.get("depends_on"))
                if not is_completed_status(status_by_id.get(dep, ""))
            ]
            if not unmet:
                counts["runnable"] += 1
    return counts
def main() -> int:
    """CLI entry point.

    Exit codes: 0 on normal completion (including "disabled" and drained),
    1 when the orchestrator config file is missing, 2 when --until-drained
    detects a deadlocked queue (pending tasks with unmet dependencies).
    """
    parser = argparse.ArgumentParser(description="Mosaic deterministic orchestrator controller")
    parser.add_argument("--repo", default=os.getcwd(), help="Repository root (default: cwd)")
    parser.add_argument("--once", action="store_true", help="Process at most one pending task and exit")
    parser.add_argument("--until-drained", action="store_true", help="Run until no pending tasks remain (or blocked)")
    parser.add_argument("--poll-sec", type=int, default=10, help="Polling interval for continuous mode")
    args = parser.parse_args()

    repo_root = pathlib.Path(args.repo).resolve()
    # All orchestrator state lives under .mosaic/orchestrator in the repo.
    orch_dir = repo_root / ".mosaic" / "orchestrator"
    config_path = orch_dir / "config.json"
    if not config_path.exists():
        print(f"[mosaic-orchestrator] missing config: {config_path}", file=sys.stderr)
        return 1

    config = load_json(config_path, {})
    # Kill switch: the config must explicitly opt in with enabled=true.
    if not config.get("enabled", False):
        print("[mosaic-orchestrator] disabled in .mosaic/orchestrator/config.json (enabled=false)")
        return 0

    if args.once:
        # Single-shot mode: process at most one task, then exit.
        processed = run_single_task(repo_root, orch_dir, config)
        if not processed:
            print("[mosaic-orchestrator] no pending tasks")
        return 0

    print(f"[mosaic-orchestrator] loop start repo={repo_root} poll={args.poll_sec}s")
    while True:
        try:
            processed = run_single_task(repo_root, orch_dir, config)
            if not processed:
                # Idle: decide whether we are drained, deadlocked, or should
                # just wait for new work.
                qs = queue_state(orch_dir)
                if args.until_drained:
                    if qs["pending"] == 0 and qs["running"] == 0:
                        print("[mosaic-orchestrator] drained: no pending tasks")
                        return 0
                    if qs["pending"] > 0 and qs["runnable"] == 0 and qs["running"] == 0:
                        print("[mosaic-orchestrator] blocked: pending tasks remain but dependencies are unmet", file=sys.stderr)
                        return 2
                # Sleep only when idle so back-to-back tasks drain quickly.
                time.sleep(max(1, args.poll_sec))
        except KeyboardInterrupt:
            print("\n[mosaic-orchestrator] stopping")
            return 0
        except Exception as exc:  # pragma: no cover
            # Deliberate catch-all: the controller loop must outlive any
            # single task's failure; log and keep polling.
            print(f"[mosaic-orchestrator] error: {exc}", file=sys.stderr)
            time.sleep(max(1, args.poll_sec))
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    raise SystemExit(main())
|