Files
2026-02-22 17:52:23 +00:00

196 lines
6.2 KiB
Python

#!/usr/bin/env python3
"""Sync docs/TASKS.md rows into .mosaic/orchestrator/tasks.json."""
from __future__ import annotations
import argparse
import json
import os
import pathlib
from typing import Any
def load_json(path: pathlib.Path, default: Any) -> Any:
    """Read and return JSON from *path*; return *default* when the file is absent."""
    if path.exists():
        with path.open("r", encoding="utf-8") as handle:
            return json.load(handle)
    return default
def save_json(path: pathlib.Path, data: Any) -> None:
    """Atomically write *data* as indented JSON (plus trailing newline) to *path*.

    The payload is written to a sibling ``*.tmp`` file first and then renamed
    over the target, so readers never observe a partially written file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    scratch = path.with_suffix(path.suffix + ".tmp")
    scratch.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8")
    scratch.replace(path)
def split_pipe_row(line: str) -> list[str]:
    """Split one markdown table row into a list of stripped cell strings."""
    row = line.strip()
    # Drop a single leading/trailing pipe so edge cells don't become empties.
    row = row[1:] if row.startswith("|") else row
    row = row[:-1] if row.endswith("|") else row
    return [cell.strip() for cell in row.split("|")]
def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]:
    """Extract the first task table from *path* as a list of row dicts.

    Each dict maps lowercased header names to the stripped cell text.
    Returns an empty list when the file or a recognizable header row
    (containing id/status/description columns) is missing.
    """
    if not path.exists():
        return []
    lines = path.read_text(encoding="utf-8").splitlines()

    # Find the header: the first pipe row containing id, status, description.
    headers: list[str] = []
    header_idx = -1
    for idx, line in enumerate(lines):
        if "|" not in line:
            continue
        candidate = [cell.lower() for cell in split_pipe_row(line)]
        if {"id", "status", "description"} <= set(candidate):
            header_idx = idx
            headers = candidate
            break
    if header_idx < 0:
        return []

    rows: list[dict[str, str]] = []
    # header_idx + 2 skips the markdown separator row under the header.
    for line in lines[header_idx + 2 :]:
        if not line.strip().startswith("|"):
            if rows:
                break  # first non-row line after data: table has ended
            continue  # stray non-row lines before any data row
        cells = split_pipe_row(line)
        # Pad short rows; for an over-long row the zip below drops extras.
        cells.extend([""] * (len(headers) - len(cells)))
        row = dict(zip(headers, cells))
        task_id = row.get("id", "").strip()
        if task_id and task_id.lower() != "id":
            rows.append(row)
    return rows
def map_status(raw: str) -> str:
    """Normalize a docs status label to an orchestrator status.

    "done"/"completed" become "completed", "failed" stays "failed", and
    every other label (including in-progress variants and unknown values)
    collapses to "pending".
    """
    normalized = raw.strip().lower()
    if normalized in ("done", "completed"):
        return "completed"
    if normalized == "failed":
        return "failed"
    return "pending"
def parse_depends(raw: str) -> list[str]:
    """Split a comma-separated dependency cell, discarding empty entries."""
    pieces = (piece.strip() for piece in raw.split(","))
    return [piece for piece in pieces if piece]
def build_task(
    row: dict[str, str],
    existing: dict[str, Any],
    runtime_default: str,
    source_path: str,
) -> dict[str, Any]:
    """Build an orchestrator task dict from a docs table row merged over *existing*.

    Docs-derived fields (id, title, description, status, depends_on) always win;
    runtime, command, and quality_gates are kept from *existing* when already set,
    falling back to *runtime_default* (then "codex") / empty values otherwise.
    Metadata is carried over and refreshed with source/issue/repo/branch.
    """

    def cell(key: str) -> str:
        # Markdown cells may be absent; treat a missing cell as empty text.
        return row.get(key, "").strip()

    task = dict(existing)
    task_id = cell("id")
    description = cell("description")
    task.update(
        id=task_id,
        title=description or task_id,
        description=description,
        status=map_status(row.get("status", "pending")),
        depends_on=parse_depends(row.get("depends_on", "")),
        runtime=str(task.get("runtime") or runtime_default or "codex"),
        command=str(task.get("command") or ""),
        quality_gates=task.get("quality_gates") or [],
    )
    metadata = dict(task.get("metadata") or {})
    metadata["source"] = source_path
    metadata["issue"] = cell("issue")
    metadata["repo"] = cell("repo")
    metadata["branch"] = cell("branch")
    task["metadata"] = metadata
    return task
def main() -> int:
    """CLI entry point: mirror docs/TASKS.md rows into the orchestrator JSON.

    Dry-run by default; pass --apply to actually write the tasks file.
    Always returns 0.
    """
    parser = argparse.ArgumentParser(
        description="Sync docs/TASKS.md into .mosaic/orchestrator/tasks.json"
    )
    parser.add_argument("--repo", default=os.getcwd(), help="Repository root (default: cwd)")
    parser.add_argument("--docs", default="docs/TASKS.md", help="Path to tasks markdown (repo-relative)")
    parser.add_argument(
        "--tasks-json",
        default=".mosaic/orchestrator/tasks.json",
        help="Path to orchestrator tasks JSON (repo-relative)",
    )
    parser.add_argument(
        "--keep-unlisted",
        action="store_true",
        help="Retain tasks already in JSON but missing from docs/TASKS.md",
    )
    parser.add_argument("--apply", action="store_true", help="Write changes (default is dry-run)")
    args = parser.parse_args()

    repo = pathlib.Path(args.repo).resolve()
    docs_path = (repo / args.docs).resolve()
    # Backward compatibility: fall back to legacy lowercase path when default path is absent.
    if args.docs == "docs/TASKS.md" and not docs_path.exists():
        legacy = (repo / "docs/tasks.md").resolve()
        if legacy.exists():
            docs_path = legacy
    tasks_path = (repo / args.tasks_json).resolve()

    # The default worker runtime comes from the orchestrator config, if present.
    config = load_json(repo / ".mosaic" / "orchestrator" / "config.json", {})
    runtime_default = str(config.get("worker", {}).get("runtime") or "codex")

    rows = parse_tasks_markdown(docs_path)
    try:
        source_path = str(docs_path.relative_to(repo))
    except ValueError:
        source_path = str(docs_path)  # docs file lives outside the repo root

    existing_payload = load_json(tasks_path, {"tasks": []})
    existing_tasks = existing_payload.get("tasks", [])
    if not isinstance(existing_tasks, list):
        existing_tasks = []
    existing_by_id = {str(task.get("id", "")): task for task in existing_tasks}

    seen: set[str] = set()
    out_tasks: list[dict[str, Any]] = []
    for row in rows:
        task_id = row.get("id", "").strip()
        if not task_id:
            continue
        seen.add(task_id)
        previous = existing_by_id.get(task_id, {})
        out_tasks.append(build_task(row, previous, runtime_default, source_path))

    if args.keep_unlisted:
        # Append JSON-only tasks that the docs table no longer mentions.
        out_tasks.extend(
            task
            for task in existing_tasks
            if str(task.get("id", "")) and str(task.get("id", "")) not in seen
        )

    payload = {"tasks": out_tasks}
    if args.apply:
        save_json(tasks_path, payload)
        print(f"[mosaic-orchestrator-sync] wrote {len(out_tasks)} tasks -> {tasks_path}")
    else:
        print(f"[mosaic-orchestrator-sync] dry-run: {len(out_tasks)} tasks would be written -> {tasks_path}")
    return 0
if __name__ == "__main__":
    # Propagate main()'s integer return value as the process exit status.
    raise SystemExit(main())