#!/usr/bin/env python3
"""Sync docs/TASKS.md rows into .mosaic/orchestrator/tasks.json."""

from __future__ import annotations

import argparse
import json
import os
import pathlib
from typing import Any

# Repo-relative default locations and fallbacks used by the CLI.
_DEFAULT_DOCS = "docs/TASKS.md"
_LEGACY_DOCS = "docs/tasks.md"
_DEFAULT_TASKS_JSON = ".mosaic/orchestrator/tasks.json"
_DEFAULT_RUNTIME = "codex"

# Markdown status label (lower-cased) -> status value written to tasks.json.
_STATUS_MAP = {
    "not-started": "pending",
    "todo": "pending",
    "pending": "pending",
    "in-progress": "pending",
    "needs-qa": "pending",
    "done": "completed",
    "completed": "completed",
    "failed": "failed",
}


def load_json(path: pathlib.Path, default: Any) -> Any:
    """Return the parsed JSON content of *path*, or *default* if it is absent.

    Malformed JSON is not swallowed: ``json.JSONDecodeError`` propagates.
    """
    if not path.exists():
        return default
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: pathlib.Path, data: Any) -> None:
    """Write *data* to *path* as indented JSON followed by a newline.

    Parent directories are created as needed. The content is written to a
    sibling ``<name>.tmp`` file first and then moved over *path*, so *path*
    is only replaced once the JSON has been written out completely.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
        f.write("\n")
    tmp.replace(path)


def split_pipe_row(line: str) -> list[str]:
    """Split one Markdown table line into stripped cell strings.

    A single leading and a single trailing ``|`` are dropped before
    splitting, so ``"| a | b |"`` yields ``["a", "b"]``.
    """
    row = line.strip()
    if row.startswith("|"):
        row = row[1:]
    if row.endswith("|"):
        row = row[:-1]
    return [cell.strip() for cell in row.split("|")]


def _is_separator_cell(cell: str) -> bool:
    """Return True if *cell* looks like a delimiter cell (``---``, ``:--:``)."""
    return "-" in cell and not cell.strip("-:")


def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]:
    """Extract task rows from the first task table found in *path*.

    The header is the first line containing ``|`` whose lower-cased cells
    include ``id``, ``status`` and ``description``. Every following line that
    starts with ``|`` becomes a dict keyed by the lower-cased header names;
    short rows are padded with empty strings. Rows with an empty id, a
    repeated header, or a delimiter row (``|---|---|``) are skipped. Parsing
    stops at the first non-table line after at least one row was collected.

    Returns an empty list if the file or the header does not exist.
    """
    if not path.exists():
        return []
    lines = path.read_text(encoding="utf-8").splitlines()

    header_idx = -1
    headers: list[str] = []
    for i, line in enumerate(lines):
        if "|" not in line:
            continue
        cells = [x.lower() for x in split_pipe_row(line)]
        if "id" in cells and "status" in cells and "description" in cells:
            header_idx = i
            headers = cells
            break
    if header_idx < 0:
        return []

    rows: list[dict[str, str]] = []
    # Start right after the header and filter delimiter rows by content
    # instead of blindly skipping one line: a table without a delimiter row
    # would otherwise lose its first task.
    for line in lines[header_idx + 1 :]:
        if not line.strip().startswith("|"):
            if rows:
                break
            continue
        cells = split_pipe_row(line)
        if len(cells) < len(headers):
            cells += [""] * (len(headers) - len(cells))
        # zip() stops at len(headers), so surplus cells are ignored.
        row = dict(zip(headers, cells))
        task_id = row.get("id", "").strip()
        if not task_id or task_id.lower() == "id":
            continue
        if _is_separator_cell(task_id):
            continue
        rows.append(row)
    return rows


def map_status(raw: str) -> str:
    """Translate a Markdown status label into a tasks.json status.

    Matching is case-insensitive and ignores surrounding whitespace.
    Unknown labels map to ``"pending"``.
    """
    return _STATUS_MAP.get(raw.strip().lower(), "pending")


def parse_depends(raw: str) -> list[str]:
    """Split a comma-separated dependency cell into non-empty, stripped ids."""
    return [x.strip() for x in raw.split(",") if x.strip()]


def build_task(
    row: dict[str, str],
    existing: dict[str, Any],
    runtime_default: str,
    source_path: str,
) -> dict[str, Any]:
    """Merge one Markdown *row* into a copy of its *existing* JSON task.

    ``id``, ``title``, ``description``, ``status`` and ``depends_on`` are
    always taken from the row. ``runtime``, ``command`` and ``quality_gates``
    keep a truthy existing value and are otherwise defaulted. Existing
    ``metadata`` keys are kept, with ``source``, ``issue``, ``repo`` and
    ``branch`` overwritten from the row. *existing* itself is not mutated.
    """
    task_id = row.get("id", "").strip()
    description = row.get("description", "").strip()

    task = dict(existing)
    task["id"] = task_id
    task["title"] = description or task_id
    task["description"] = description
    task["status"] = map_status(row.get("status", "pending"))
    task["depends_on"] = parse_depends(row.get("depends_on", ""))
    task["runtime"] = str(task.get("runtime") or runtime_default or _DEFAULT_RUNTIME)
    task["command"] = str(task.get("command") or "")
    task["quality_gates"] = task.get("quality_gates") or []

    metadata = dict(task.get("metadata") or {})
    metadata.update(
        {
            "source": source_path,
            "issue": row.get("issue", "").strip(),
            "repo": row.get("repo", "").strip(),
            "branch": row.get("branch", "").strip(),
        }
    )
    task["metadata"] = metadata
    return task


def main() -> int:
    """Run the sync CLI and return the process exit code (always 0).

    Without ``--apply`` nothing is written; only a dry-run summary is printed.
    """
    parser = argparse.ArgumentParser(description="Sync docs/TASKS.md into .mosaic/orchestrator/tasks.json")
    parser.add_argument("--repo", default=os.getcwd(), help="Repository root (default: cwd)")
    parser.add_argument("--docs", default=_DEFAULT_DOCS, help="Path to tasks markdown (repo-relative)")
    parser.add_argument(
        "--tasks-json",
        default=_DEFAULT_TASKS_JSON,
        help="Path to orchestrator tasks JSON (repo-relative)",
    )
    parser.add_argument(
        "--keep-unlisted",
        action="store_true",
        help="Retain tasks already in JSON but missing from docs/TASKS.md",
    )
    parser.add_argument("--apply", action="store_true", help="Write changes (default is dry-run)")
    args = parser.parse_args()

    repo = pathlib.Path(args.repo).resolve()
    docs_path = (repo / args.docs).resolve()
    # Backward compatibility: fall back to legacy lowercase path when default path is absent.
    if args.docs == _DEFAULT_DOCS and not docs_path.exists():
        legacy_docs_path = (repo / _LEGACY_DOCS).resolve()
        if legacy_docs_path.exists():
            docs_path = legacy_docs_path
    tasks_path = (repo / args.tasks_json).resolve()

    # Tolerate a config file whose top level or "worker" entry is not an
    # object (e.g. null or a list) instead of failing with AttributeError.
    config = load_json(repo / ".mosaic" / "orchestrator" / "config.json", {})
    worker = config.get("worker") if isinstance(config, dict) else None
    configured_runtime = worker.get("runtime") if isinstance(worker, dict) else None
    runtime_default = str(configured_runtime or _DEFAULT_RUNTIME)

    rows = parse_tasks_markdown(docs_path)
    try:
        source_path = str(docs_path.relative_to(repo))
    except ValueError:
        # docs_path lies outside the repo root; record it as-is.
        source_path = str(docs_path)

    # Same tolerance for tasks.json: a non-object payload or a non-list
    # "tasks" value counts as "no tasks", and non-object entries are dropped
    # because they cannot carry an id.
    existing_payload = load_json(tasks_path, {"tasks": []})
    raw_tasks = existing_payload.get("tasks", []) if isinstance(existing_payload, dict) else []
    if not isinstance(raw_tasks, list):
        raw_tasks = []
    existing_tasks = [t for t in raw_tasks if isinstance(t, dict)]
    existing_by_id = {str(t.get("id", "")): t for t in existing_tasks}

    out_tasks: list[dict[str, Any]] = []
    seen: set[str] = set()
    for row in rows:
        task_id = row.get("id", "").strip()
        if not task_id:
            continue
        seen.add(task_id)
        out_tasks.append(
            build_task(
                row,
                existing_by_id.get(task_id, {}),
                runtime_default,
                source_path,
            )
        )

    if args.keep_unlisted:
        for task in existing_tasks:
            task_id = str(task.get("id", ""))
            if task_id and task_id not in seen:
                out_tasks.append(task)

    payload = {"tasks": out_tasks}
    if args.apply:
        save_json(tasks_path, payload)
        print(f"[mosaic-orchestrator-sync] wrote {len(out_tasks)} tasks -> {tasks_path}")
    else:
        print(f"[mosaic-orchestrator-sync] dry-run: {len(out_tasks)} tasks would be written -> {tasks_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())