feat: rename rails/ to tools/ and add service tool suites
Rename the `rails/` directory to `tools/` for agent discoverability: agents frequently failed to locate helper scripts due to the non-intuitive directory name. Add backward-compat symlink `rails/ → tools/`.

New tool suites:
- Authentik: auth-token, user-list, user-create, group-list, app-list, flow-list, admin-status (8 scripts)
- Coolify: team-list, project-list, service-list, service-status, deploy, env-set (7 scripts)
- Woodpecker: pipeline-list, pipeline-status, pipeline-trigger (3 stubs)
- GLPI: session-init, computer-list, ticket-list, ticket-create, user-list (6 scripts)
- Health: stack-health.sh - stack-wide connectivity check

Infrastructure:
- Shared credential loader at tools/_lib/credentials.sh
- install.sh creates symlink + chmod on tool scripts
- All ~253 rails/ path references updated across 68+ files

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
85
tools/orchestrator-matrix/README.md
Normal file
@@ -0,0 +1,85 @@
# Mosaic Matrix Orchestrator Rail

Runtime-agnostic orchestration rail for delegating work to worker agents and enforcing
mechanical quality gates.

## Purpose

- Decouple orchestration from any single agent runtime feature set
- Persist state in repo-local `.mosaic/orchestrator/` files
- Emit structured events for Matrix transport and audit trails
- Enforce rails before marking tasks complete

## Components

- `protocol/` - JSON schemas for task/event payloads
- `controller/mosaic_orchestrator.py` - deterministic controller loop
- `adapters/` - runtime adapter guidance

## Repo Contract

The controller expects this layout in each bootstrapped repo:

```text
.mosaic/orchestrator/
  config.json
  tasks.json
  state.json
  events.ndjson
  logs/
  results/
```

## Quick Start

From a bootstrapped repo:

```bash
~/.config/mosaic/bin/mosaic-orchestrator-matrix-cycle
~/.config/mosaic/bin/mosaic-orchestrator-run --once
~/.config/mosaic/bin/mosaic-orchestrator-drain
```

Continuous loop:

```bash
~/.config/mosaic/bin/mosaic-orchestrator-run --poll-sec 10
```

Sync from `docs/TASKS.md` to queue:

```bash
~/.config/mosaic/bin/mosaic-orchestrator-sync-tasks --apply
```

Set worker command when needed:

```bash
export MOSAIC_WORKER_EXEC="codex -p"
# or
export MOSAIC_WORKER_EXEC="opencode -p"
```

Publish new orchestrator events to Matrix:

```bash
~/.config/mosaic/bin/mosaic-orchestrator-matrix-publish
```

Consume Matrix task messages into `tasks.json`:

```bash
~/.config/mosaic/bin/mosaic-orchestrator-matrix-consume
```

## Matrix Note

This rail writes canonical events to `.mosaic/orchestrator/events.ndjson`.
The Matrix transport bridge publishes those events into the configured control room
and can consume task commands from that room.

Task injection message format (room text):

```text
!mosaic-task {"id":"TASK-123","title":"Fix bug","command":"echo run","quality_gates":["pnpm lint"]}
```
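The Repo Contract above can be satisfied with a small bootstrap helper. This is a sketch, not part of the commit: the directory and file names come from the contract, while the specific `config.json` fields shown (`enabled`, `transport`, `worker`, `quality_gates`) are assumptions inferred from how the controller reads its config.

```python
# Sketch: create the .mosaic/orchestrator/ layout the controller expects.
# Config field names are assumptions based on the controller's options.
import json
import pathlib
import tempfile

def bootstrap_orchestrator(repo_root: pathlib.Path) -> pathlib.Path:
    """Create the repo-local orchestrator state directory with empty files."""
    orch = repo_root / ".mosaic" / "orchestrator"
    (orch / "logs").mkdir(parents=True, exist_ok=True)
    (orch / "results").mkdir(parents=True, exist_ok=True)
    config = {
        "enabled": True,
        "transport": "matrix",
        "worker": {"command_template": "", "max_attempts": 1},
        "quality_gates": [],
    }
    (orch / "config.json").write_text(json.dumps(config, indent=2) + "\n", encoding="utf-8")
    (orch / "tasks.json").write_text('{"tasks": []}\n', encoding="utf-8")
    (orch / "state.json").write_text('{"running_task_id": null, "updated_at": null}\n', encoding="utf-8")
    (orch / "events.ndjson").touch()
    return orch

demo_root = pathlib.Path(tempfile.mkdtemp())
orch_dir = bootstrap_orchestrator(demo_root)
```

After this runs, `mosaic-orchestrator-run --once` in that repo would find an empty queue and exit cleanly.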
52
tools/orchestrator-matrix/adapters/README.md
Normal file
@@ -0,0 +1,52 @@
# Adapter Contract

Runtime adapters translate task commands into concrete worker invocations.

## Minimal Contract

Each task should define either:

1. `command` directly in `tasks.json`, or
2. controller-level `worker.command_template` in `.mosaic/orchestrator/config.json`

`command_template` may use:

- `{task_id}`
- `{task_title}`
- `{task_file}`

## Examples

Codex:

```json
{
  "worker": {
    "command_template": "codex \"run task {task_id}: {task_title}\""
  }
}
```

Claude:

```json
{
  "worker": {
    "command_template": "claude -p \"Execute task {task_id}: {task_title}\""
  }
}
```

OpenCode:

```json
{
  "worker": {
    "command_template": "opencode \"execute task {task_id}: {task_title}\""
  }
}
```

## Recommendation

Prefer explicit per-task `command` for deterministic execution and auditability.
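The placeholder substitution implied by the contract is plain string replacement, not Python `str.format`, so literal braces elsewhere in the template are safe. A minimal sketch (function name chosen here for illustration):

```python
# Sketch of the adapter placeholder substitution: literal replacement of
# {task_id}, {task_title}, and {task_file} in the configured template.
def render_template(template: str, task: dict, task_file: str) -> str:
    return (
        template.replace("{task_id}", str(task.get("id", "")))
        .replace("{task_title}", str(task.get("title", "")))
        .replace("{task_file}", task_file)
    )

cmd = render_template(
    'codex "run task {task_id}: {task_title}"',
    {"id": "TASK-1", "title": "Fix bug"},
    "/tmp/task-TASK-1.json",
)
# cmd is now: codex "run task TASK-1: Fix bug"
```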
2
tools/orchestrator-matrix/controller/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
__pycache__/
*.pyc
346
tools/orchestrator-matrix/controller/mosaic_orchestrator.py
Executable file
@@ -0,0 +1,346 @@
#!/usr/bin/env python3
"""Deterministic orchestrator controller for Mosaic task delegation."""

from __future__ import annotations

import argparse
import datetime as dt
import json
import os
import pathlib
import subprocess
import sys
import time
import uuid
from typing import Any


def now_iso() -> str:
    return dt.datetime.now(dt.timezone.utc).isoformat()


def load_json(path: pathlib.Path, default: Any) -> Any:
    if not path.exists():
        return default
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: pathlib.Path, data: Any) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
        f.write("\n")
    tmp.replace(path)


def append_event(events_path: pathlib.Path, event: dict[str, Any]) -> None:
    events_path.parent.mkdir(parents=True, exist_ok=True)
    with events_path.open("a", encoding="utf-8") as f:
        f.write(json.dumps(event, ensure_ascii=True) + "\n")


def emit_event(
    events_path: pathlib.Path,
    event_type: str,
    task_id: str,
    status: str,
    source: str,
    message: str,
    metadata: dict[str, Any] | None = None,
) -> None:
    append_event(
        events_path,
        {
            "event_id": str(uuid.uuid4()),
            "event_type": event_type,
            "task_id": task_id,
            "status": status,
            "timestamp": now_iso(),
            "source": source,
            "message": message,
            "metadata": metadata or {},
        },
    )


def run_shell(command: str, cwd: pathlib.Path, log_path: pathlib.Path, timeout_sec: int) -> tuple[int, str, bool]:
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf-8") as log:
        log.write(f"\n[{now_iso()}] COMMAND: {command}\n")
        log.flush()
        proc = subprocess.Popen(
            ["bash", "-lc", command],
            cwd=str(cwd),
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
        )
        timed_out = False
        try:
            output, _ = proc.communicate(timeout=max(1, timeout_sec))
            code = proc.returncode
        except subprocess.TimeoutExpired:
            timed_out = True
            proc.kill()
            output, _ = proc.communicate()
            code = 124
            log.write(f"[{now_iso()}] TIMEOUT: exceeded {timeout_sec}s\n")

        if output:
            log.write(output)
        log.write(f"[{now_iso()}] EXIT: {code}\n")
    return code, output or "", timed_out


def render_command_template(template: str, task: dict[str, Any], task_file: pathlib.Path) -> str:
    return (
        template.replace("{task_id}", str(task.get("id", "")))
        .replace("{task_title}", str(task.get("title", "")))
        .replace("{task_file}", str(task_file))
    )


def parse_dep_list(raw: Any) -> list[str]:
    if isinstance(raw, list):
        return [str(x).strip() for x in raw if str(x).strip()]
    if isinstance(raw, str):
        return [x.strip() for x in raw.split(",") if x.strip()]
    return []


def is_completed_status(status: str) -> bool:
    return status in {"completed", "done"}


def pick_next_task(tasks: list[dict[str, Any]]) -> dict[str, Any] | None:
    status_by_id = {str(t.get("id", "")): str(t.get("status", "")) for t in tasks}
    for task in tasks:
        if task.get("status", "pending") != "pending":
            continue
        deps = parse_dep_list(task.get("depends_on"))
        deps_ready = all(is_completed_status(status_by_id.get(dep, "")) for dep in deps)
        if deps_ready:
            return task
    return None


def run_single_task(repo_root: pathlib.Path, orch_dir: pathlib.Path, config: dict[str, Any]) -> bool:
    tasks_path = orch_dir / "tasks.json"
    state_path = orch_dir / "state.json"
    events_path = orch_dir / "events.ndjson"
    logs_dir = orch_dir / "logs"
    results_dir = orch_dir / "results"

    tasks = load_json(tasks_path, {"tasks": []})
    task_items = tasks.get("tasks", [])
    if not isinstance(task_items, list):
        raise ValueError("tasks.json must contain {'tasks': [...]} structure")

    task = pick_next_task(task_items)
    if not task:
        return False

    task_id = str(task.get("id", "unknown-task"))
    max_attempts = int(task.get("max_attempts") or config.get("worker", {}).get("max_attempts") or 1)
    attempt = int(task.get("attempts", 0)) + 1
    task["attempts"] = attempt
    task["max_attempts"] = max_attempts
    task["status"] = "running"
    task["started_at"] = now_iso()
    save_json(tasks_path, {"tasks": task_items})

    state = load_json(state_path, {"running_task_id": None, "updated_at": None})
    state["running_task_id"] = task_id
    state["updated_at"] = now_iso()
    save_json(state_path, state)

    emit_event(events_path, "task.assigned", task_id, "running", "controller", "Task assigned")
    emit_event(events_path, "task.started", task_id, "running", "worker", "Worker execution started")

    log_path = logs_dir / f"{task_id}.log"
    task_file = orch_dir / f"task-{task_id}.json"
    save_json(task_file, task)

    cmd = str(task.get("command", "")).strip()
    if not cmd:
        template = str(config.get("worker", {}).get("command_template", "")).strip()
        if template:
            cmd = render_command_template(template, task, task_file)

    if not cmd:
        task["status"] = "failed"
        task["failed_at"] = now_iso()
        task["error"] = "No task command or worker command_template configured."
        save_json(tasks_path, {"tasks": task_items})
        emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"])
        state["running_task_id"] = None
        state["updated_at"] = now_iso()
        save_json(state_path, state)
        return True

    timeout_sec = int(task.get("timeout_seconds") or config.get("worker", {}).get("timeout_seconds") or 7200)
    rc, _, timed_out = run_shell(cmd, repo_root, log_path, timeout_sec)
    if rc != 0:
        task["error"] = f"Worker command timed out after {timeout_sec}s" if timed_out else f"Worker command failed with exit code {rc}"
        if attempt < max_attempts:
            task["status"] = "pending"
            task["last_failed_at"] = now_iso()
            emit_event(
                events_path,
                "task.retry.scheduled",
                task_id,
                "pending",
                "worker",
                f"{task['error']}; retry {attempt + 1}/{max_attempts}",
            )
        else:
            task["status"] = "failed"
            task["failed_at"] = now_iso()
            emit_event(events_path, "task.failed", task_id, "failed", "worker", task["error"])
        save_json(tasks_path, {"tasks": task_items})
        state["running_task_id"] = None
        state["updated_at"] = now_iso()
        save_json(state_path, state)
        save_json(
            results_dir / f"{task_id}.json",
            {"task_id": task_id, "status": task["status"], "exit_code": rc, "attempt": attempt, "max_attempts": max_attempts},
        )
        return True

    gates = task.get("quality_gates") or config.get("quality_gates") or []
    all_passed = True
    gate_results: list[dict[str, Any]] = []
    for gate in gates:
        gate_cmd = str(gate).strip()
        if not gate_cmd:
            continue
        emit_event(events_path, "rail.check.started", task_id, "running", "quality-gate", f"Running gate: {gate_cmd}")
        gate_rc, _, gate_timed_out = run_shell(gate_cmd, repo_root, log_path, timeout_sec)
        if gate_rc == 0:
            emit_event(events_path, "rail.check.passed", task_id, "running", "quality-gate", f"Gate passed: {gate_cmd}")
        else:
            all_passed = False
            emit_event(
                events_path,
                "rail.check.failed",
                task_id,
                "failed",
                "quality-gate",
                f"Gate timed out after {timeout_sec}s: {gate_cmd}" if gate_timed_out else f"Gate failed ({gate_rc}): {gate_cmd}",
            )
        gate_results.append({"command": gate_cmd, "exit_code": gate_rc})

    if all_passed:
        task["status"] = "completed"
        task["completed_at"] = now_iso()
        emit_event(events_path, "task.completed", task_id, "completed", "controller", "Task completed")
    else:
        task["error"] = "One or more quality gates failed"
        if attempt < max_attempts:
            task["status"] = "pending"
            task["last_failed_at"] = now_iso()
            emit_event(
                events_path,
                "task.retry.scheduled",
                task_id,
                "pending",
                "controller",
                f"{task['error']}; retry {attempt + 1}/{max_attempts}",
            )
        else:
            task["status"] = "failed"
            task["failed_at"] = now_iso()
            emit_event(events_path, "task.failed", task_id, "failed", "controller", task["error"])

    save_json(tasks_path, {"tasks": task_items})
    state["running_task_id"] = None
    state["updated_at"] = now_iso()
    save_json(state_path, state)
    save_json(
        results_dir / f"{task_id}.json",
        {
            "task_id": task_id,
            "status": task["status"],
            "completed_at": task.get("completed_at"),
            "failed_at": task.get("failed_at"),
            "gate_results": gate_results,
        },
    )
    return True


def queue_state(orch_dir: pathlib.Path) -> dict[str, int]:
    tasks = load_json(orch_dir / "tasks.json", {"tasks": []})
    task_items = tasks.get("tasks", [])
    if not isinstance(task_items, list):
        return {"pending": 0, "running": 0, "runnable": 0}

    pending = 0
    running = 0
    runnable = 0
    status_by_id = {str(t.get("id", "")): str(t.get("status", "")) for t in task_items}
    for task in task_items:
        status = str(task.get("status", "pending"))
        if status == "pending":
            pending += 1
            deps = parse_dep_list(task.get("depends_on"))
            if all(is_completed_status(status_by_id.get(dep, "")) for dep in deps):
                runnable += 1
        if status == "running":
            running += 1
    return {"pending": pending, "running": running, "runnable": runnable}


def main() -> int:
    parser = argparse.ArgumentParser(description="Mosaic deterministic orchestrator controller")
    parser.add_argument("--repo", default=os.getcwd(), help="Repository root (default: cwd)")
    parser.add_argument("--once", action="store_true", help="Process at most one pending task and exit")
    parser.add_argument("--until-drained", action="store_true", help="Run until no pending tasks remain (or blocked)")
    parser.add_argument("--poll-sec", type=int, default=10, help="Polling interval for continuous mode")
    args = parser.parse_args()

    repo_root = pathlib.Path(args.repo).resolve()
    orch_dir = repo_root / ".mosaic" / "orchestrator"
    config_path = orch_dir / "config.json"
    if not config_path.exists():
        print(f"[mosaic-orchestrator] missing config: {config_path}", file=sys.stderr)
        return 1

    config = load_json(config_path, {})
    if not config.get("enabled", False):
        print("[mosaic-orchestrator] disabled in .mosaic/orchestrator/config.json (enabled=false)")
        return 0

    if args.once:
        processed = run_single_task(repo_root, orch_dir, config)
        if not processed:
            print("[mosaic-orchestrator] no pending tasks")
        return 0

    print(f"[mosaic-orchestrator] loop start repo={repo_root} poll={args.poll_sec}s")
    while True:
        try:
            processed = run_single_task(repo_root, orch_dir, config)
            if not processed:
                qs = queue_state(orch_dir)
                if args.until_drained:
                    if qs["pending"] == 0 and qs["running"] == 0:
                        print("[mosaic-orchestrator] drained: no pending tasks")
                        return 0
                    if qs["pending"] > 0 and qs["runnable"] == 0 and qs["running"] == 0:
                        print("[mosaic-orchestrator] blocked: pending tasks remain but dependencies are unmet", file=sys.stderr)
                        return 2
                time.sleep(max(1, args.poll_sec))
        except KeyboardInterrupt:
            print("\n[mosaic-orchestrator] stopping")
            return 0
        except Exception as exc:  # pragma: no cover
            print(f"[mosaic-orchestrator] error: {exc}", file=sys.stderr)
            time.sleep(max(1, args.poll_sec))


if __name__ == "__main__":
    raise SystemExit(main())
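The controller's scheduling rule (a pending task runs only once every `depends_on` entry is completed or done) can be exercised in isolation. This sketch re-implements the dependency gating with simplified names for illustration; it is not the controller code itself:

```python
# Self-contained demo of the dependency gating behind pick_next_task:
# a pending task is runnable only when all of its depends_on ids are
# in a completed/done state.
def deps(raw):
    # Accept either a list of ids or a comma-separated string.
    if isinstance(raw, list):
        return [str(x).strip() for x in raw if str(x).strip()]
    if isinstance(raw, str):
        return [x.strip() for x in raw.split(",") if x.strip()]
    return []

def next_task(tasks):
    done = {t["id"] for t in tasks if t.get("status") in {"completed", "done"}}
    for t in tasks:
        if t.get("status", "pending") != "pending":
            continue
        if all(d in done for d in deps(t.get("depends_on"))):
            return t
    return None

queue = [
    {"id": "A", "status": "completed"},
    {"id": "B", "status": "pending", "depends_on": "A"},
    {"id": "C", "status": "pending", "depends_on": "B,A"},
]
picked = next_task(queue)  # B is runnable; C is blocked on B
```

Note the first-match loop also makes ordering in `tasks.json` the tie-breaker among runnable tasks.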
195
tools/orchestrator-matrix/controller/tasks_md_sync.py
Normal file
@@ -0,0 +1,195 @@
#!/usr/bin/env python3
"""Sync docs/TASKS.md rows into .mosaic/orchestrator/tasks.json."""

from __future__ import annotations

import argparse
import json
import os
import pathlib
from typing import Any


def load_json(path: pathlib.Path, default: Any) -> Any:
    if not path.exists():
        return default
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: pathlib.Path, data: Any) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
        f.write("\n")
    tmp.replace(path)


def split_pipe_row(line: str) -> list[str]:
    row = line.strip()
    if row.startswith("|"):
        row = row[1:]
    if row.endswith("|"):
        row = row[:-1]
    return [c.strip() for c in row.split("|")]


def parse_tasks_markdown(path: pathlib.Path) -> list[dict[str, str]]:
    if not path.exists():
        return []
    lines = path.read_text(encoding="utf-8").splitlines()

    header_idx = -1
    headers: list[str] = []
    for i, line in enumerate(lines):
        if "|" not in line:
            continue
        cells = [x.lower() for x in split_pipe_row(line)]
        if "id" in cells and "status" in cells and "description" in cells:
            header_idx = i
            headers = cells
            break
    if header_idx < 0:
        return []

    rows: list[dict[str, str]] = []
    for line in lines[header_idx + 2 :]:
        if not line.strip().startswith("|"):
            if rows:
                break
            continue
        cells = split_pipe_row(line)
        if len(cells) < len(headers):
            cells += [""] * (len(headers) - len(cells))
        row = {headers[i]: cells[i] for i in range(len(headers))}
        task_id = row.get("id", "").strip()
        if not task_id or task_id.lower() == "id":
            continue
        rows.append(row)
    return rows


def map_status(raw: str) -> str:
    value = raw.strip().lower()
    mapping = {
        "not-started": "pending",
        "todo": "pending",
        "pending": "pending",
        "in-progress": "pending",
        "needs-qa": "pending",
        "done": "completed",
        "completed": "completed",
        "failed": "failed",
    }
    return mapping.get(value, "pending")


def parse_depends(raw: str) -> list[str]:
    return [x.strip() for x in raw.split(",") if x.strip()]


def build_task(
    row: dict[str, str],
    existing: dict[str, Any],
    runtime_default: str,
    source_path: str,
) -> dict[str, Any]:
    task_id = row.get("id", "").strip()
    description = row.get("description", "").strip()
    issue = row.get("issue", "").strip()
    repo = row.get("repo", "").strip()
    branch = row.get("branch", "").strip()
    depends_on = parse_depends(row.get("depends_on", ""))

    task = dict(existing)
    task["id"] = task_id
    task["title"] = description or task_id
    task["description"] = description
    task["status"] = map_status(row.get("status", "pending"))
    task["depends_on"] = depends_on
    task["runtime"] = str(task.get("runtime") or runtime_default or "codex")
    task["command"] = str(task.get("command") or "")
    task["quality_gates"] = task.get("quality_gates") or []
    metadata = dict(task.get("metadata") or {})
    metadata.update(
        {
            "source": source_path,
            "issue": issue,
            "repo": repo,
            "branch": branch,
        }
    )
    task["metadata"] = metadata
    return task


def main() -> int:
    parser = argparse.ArgumentParser(description="Sync docs/TASKS.md into .mosaic/orchestrator/tasks.json")
    parser.add_argument("--repo", default=os.getcwd(), help="Repository root (default: cwd)")
    parser.add_argument("--docs", default="docs/TASKS.md", help="Path to tasks markdown (repo-relative)")
    parser.add_argument(
        "--tasks-json",
        default=".mosaic/orchestrator/tasks.json",
        help="Path to orchestrator tasks JSON (repo-relative)",
    )
    parser.add_argument("--keep-unlisted", action="store_true", help="Retain tasks already in JSON but missing from docs/TASKS.md")
    parser.add_argument("--apply", action="store_true", help="Write changes (default is dry-run)")
    args = parser.parse_args()

    repo = pathlib.Path(args.repo).resolve()
    docs_path = (repo / args.docs).resolve()
    # Backward compatibility: fall back to legacy lowercase path when default path is absent.
    if args.docs == "docs/TASKS.md" and not docs_path.exists():
        legacy_docs_path = (repo / "docs/tasks.md").resolve()
        if legacy_docs_path.exists():
            docs_path = legacy_docs_path
    tasks_path = (repo / args.tasks_json).resolve()
    config_path = repo / ".mosaic" / "orchestrator" / "config.json"
    config = load_json(config_path, {})
    runtime_default = str(config.get("worker", {}).get("runtime") or "codex")

    rows = parse_tasks_markdown(docs_path)
    try:
        source_path = str(docs_path.relative_to(repo))
    except ValueError:
        source_path = str(docs_path)
    existing_payload = load_json(tasks_path, {"tasks": []})
    existing_tasks = existing_payload.get("tasks", [])
    if not isinstance(existing_tasks, list):
        existing_tasks = []
    existing_by_id = {str(t.get("id", "")): t for t in existing_tasks}

    out_tasks: list[dict[str, Any]] = []
    seen: set[str] = set()
    for row in rows:
        task_id = row.get("id", "").strip()
        if not task_id:
            continue
        seen.add(task_id)
        out_tasks.append(
            build_task(
                row,
                existing_by_id.get(task_id, {}),
                runtime_default,
                source_path,
            )
        )

    if args.keep_unlisted:
        for task in existing_tasks:
            task_id = str(task.get("id", ""))
            if task_id and task_id not in seen:
                out_tasks.append(task)

    payload = {"tasks": out_tasks}
    if args.apply:
        save_json(tasks_path, payload)
        print(f"[mosaic-orchestrator-sync] wrote {len(out_tasks)} tasks -> {tasks_path}")
    else:
        print(f"[mosaic-orchestrator-sync] dry-run: {len(out_tasks)} tasks would be written -> {tasks_path}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
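The table parsing above rests on a simple rule: strip the outer pipes, split on `|`, trim each cell, then zip the lowercased header row against each data row. A standalone sketch of that rule (the sample table is illustrative, not from the commit):

```python
# Sketch of the pipe-row splitting that tasks_md_sync relies on.
def split_pipe_row(line: str) -> list[str]:
    row = line.strip()
    if row.startswith("|"):
        row = row[1:]
    if row.endswith("|"):
        row = row[:-1]
    return [c.strip() for c in row.split("|")]

header = split_pipe_row("| ID | Status | Description |")
data = split_pipe_row("| TASK-1 | done | Ship it |")
# Lowercased headers become the dict keys, matching parse_tasks_markdown.
record = dict(zip([h.lower() for h in header], data))
```

Note the header detection in `parse_tasks_markdown` skips the `|---|---|` separator row by starting two lines past the header, so this zip only ever sees data rows.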
64
tools/orchestrator-matrix/protocol/event.schema.json
Normal file
@@ -0,0 +1,64 @@
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://mosaicstack.dev/schemas/orchestrator/event.schema.json",
  "title": "Mosaic Orchestrator Event",
  "type": "object",
  "required": [
    "event_id",
    "event_type",
    "task_id",
    "status",
    "timestamp",
    "source"
  ],
  "properties": {
    "event_id": {
      "type": "string",
      "description": "UUID string"
    },
    "event_type": {
      "type": "string",
      "enum": [
        "task.assigned",
        "task.started",
        "task.progress",
        "task.completed",
        "task.failed",
        "task.retry.scheduled",
        "rail.check.started",
        "rail.check.passed",
        "rail.check.failed"
      ]
    },
    "task_id": {
      "type": "string"
    },
    "status": {
      "type": "string",
      "enum": [
        "pending",
        "running",
        "completed",
        "failed"
      ]
    },
    "timestamp": {
      "type": "string",
      "format": "date-time"
    },
    "source": {
      "type": "string",
      "enum": [
        "controller",
        "worker",
        "quality-gate"
      ]
    },
    "message": {
      "type": "string"
    },
    "metadata": {
      "type": "object"
    }
  },
  "additionalProperties": true
}
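A quick stdlib-only check can mirror the schema's required fields and status enum without pulling in a validator library; a real deployment would likely use `jsonschema` against the full document. The helper name and sample event below are illustrative:

```python
# Minimal sketch: check required fields and the status enum from the
# event schema. Not a full JSON Schema validator.
REQUIRED = {"event_id", "event_type", "task_id", "status", "timestamp", "source"}
STATUSES = {"pending", "running", "completed", "failed"}

def valid_event(event: dict) -> bool:
    return REQUIRED.issubset(event) and event.get("status") in STATUSES

sample = {
    "event_id": "0000-demo",
    "event_type": "task.completed",
    "task_id": "TASK-1",
    "status": "completed",
    "timestamp": "2025-01-01T00:00:00+00:00",
    "source": "controller",
}
```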
49
tools/orchestrator-matrix/protocol/task.schema.json
Normal file
@@ -0,0 +1,49 @@
{
  "$schema": "https://json-schema.org/draft/2020-12/schema",
  "$id": "https://mosaicstack.dev/schemas/orchestrator/task.schema.json",
  "title": "Mosaic Orchestrator Task",
  "type": "object",
  "required": [
    "id",
    "title",
    "status"
  ],
  "properties": {
    "id": {
      "type": "string"
    },
    "title": {
      "type": "string"
    },
    "description": {
      "type": "string"
    },
    "status": {
      "type": "string",
      "enum": [
        "pending",
        "running",
        "completed",
        "failed"
      ]
    },
    "runtime": {
      "type": "string",
      "description": "Preferred worker runtime, e.g. codex, claude, opencode"
    },
    "command": {
      "type": "string",
      "description": "Worker command to execute for this task"
    },
    "quality_gates": {
      "type": "array",
      "items": {
        "type": "string"
      }
    },
    "metadata": {
      "type": "object"
    }
  },
  "additionalProperties": true
}
2
tools/orchestrator-matrix/transport/.gitignore
vendored
Normal file
@@ -0,0 +1,2 @@
__pycache__/
*.pyc
200
tools/orchestrator-matrix/transport/matrix_transport.py
Executable file
@@ -0,0 +1,200 @@
#!/usr/bin/env python3
"""Matrix transport bridge for Mosaic orchestrator events/tasks."""

from __future__ import annotations

import argparse
import json
import pathlib
import urllib.parse
import urllib.request
import uuid
from typing import Any


def load_json(path: pathlib.Path, default: Any) -> Any:
    if not path.exists():
        return default
    with path.open("r", encoding="utf-8") as f:
        return json.load(f)


def save_json(path: pathlib.Path, data: Any) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    with tmp.open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
        f.write("\n")
    tmp.replace(path)


def matrix_request(
    homeserver: str,
    access_token: str,
    method: str,
    path: str,
    payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
    url = homeserver.rstrip("/") + path
    body = None
    headers = {"Authorization": f"Bearer {access_token}"}
    if payload is not None:
        body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url, method=method, data=body, headers=headers)
    with urllib.request.urlopen(req, timeout=30) as resp:
        raw = resp.read().decode("utf-8")
        return json.loads(raw) if raw else {}


def matrix_send_message(homeserver: str, access_token: str, room_id: str, message: str) -> None:
    txn = str(uuid.uuid4())
    path = f"/_matrix/client/v3/rooms/{urllib.parse.quote(room_id, safe='')}/send/m.room.message/{txn}"
    matrix_request(
        homeserver,
        access_token,
        "PUT",
        path,
        {"msgtype": "m.text", "body": message},
    )


def format_event_message(event: dict[str, Any]) -> str:
    et = event.get("event_type", "unknown")
    tid = event.get("task_id", "unknown")
    status = event.get("status", "unknown")
    msg = event.get("message", "")
    return f"[mosaic-orch] {et} task={tid} status={status} :: {msg}"


def publish_events(repo: pathlib.Path, config: dict[str, Any]) -> int:
    orch = repo / ".mosaic" / "orchestrator"
    events_path = orch / "events.ndjson"
    bridge_state_path = orch / "matrix_state.json"
    state = load_json(bridge_state_path, {"last_published_line": 0, "since": None})

    homeserver = str(config.get("matrix", {}).get("homeserver_url", "")).strip()
    token = str(config.get("matrix", {}).get("access_token", "")).strip()
    room_id = str(config.get("matrix", {}).get("control_room_id", "")).strip()
    if not homeserver or not token or not room_id:
        raise ValueError("matrix homeserver_url, access_token, and control_room_id are required")

    if not events_path.exists():
        return 0

    lines = events_path.read_text(encoding="utf-8").splitlines()
    start = int(state.get("last_published_line", 0))
    published = 0
    for idx, line in enumerate(lines[start:], start=start + 1):
        if not line.strip():
            continue
        event = json.loads(line)
        matrix_send_message(homeserver, token, room_id, format_event_message(event))
        state["last_published_line"] = idx
        published += 1
    save_json(bridge_state_path, state)
    return published


def parse_task_command(body: str) -> dict[str, Any] | None:
    raw = body.strip()
    if raw.startswith("!mosaic-task "):
        payload = raw[len("!mosaic-task ") :].strip()
    elif raw.startswith("@mosaic task "):
        payload = raw[len("@mosaic task ") :].strip()
    else:
        return None

    task = json.loads(payload)
    if not isinstance(task, dict):
        raise ValueError("task payload must be a JSON object")
    if "id" not in task or "title" not in task:
        raise ValueError("task payload requires id and title")
    task.setdefault("status", "pending")
    return task


def consume_tasks(repo: pathlib.Path, config: dict[str, Any]) -> int:
    orch = repo / ".mosaic" / "orchestrator"
    tasks_path = orch / "tasks.json"
    bridge_state_path = orch / "matrix_state.json"
    state = load_json(bridge_state_path, {"last_published_line": 0, "since": None})
    tasks = load_json(tasks_path, {"tasks": []})
    task_items = tasks.get("tasks", [])
    if not isinstance(task_items, list):
        raise ValueError("tasks.json must contain {'tasks': [...]} structure")

    homeserver = str(config.get("matrix", {}).get("homeserver_url", "")).strip()
    token = str(config.get("matrix", {}).get("access_token", "")).strip()
    room_id = str(config.get("matrix", {}).get("control_room_id", "")).strip()
    bot_user_id = str(config.get("matrix", {}).get("bot_user_id", "")).strip()
    if not homeserver or not token or not room_id:
        raise ValueError("matrix homeserver_url, access_token, and control_room_id are required")

    since = state.get("since")
    path = "/_matrix/client/v3/sync?timeout=1"
    if since:
        path += "&since=" + urllib.parse.quote(str(since), safe="")
    sync = matrix_request(homeserver, token, "GET", path)
    if "next_batch" in sync:
        state["since"] = sync["next_batch"]

    room_timeline = (
        sync.get("rooms", {})
        .get("join", {})
        .get(room_id, {})
        .get("timeline", {})
        .get("events", [])
    )
    added = 0
    existing = {str(t.get("id")) for t in task_items if isinstance(t, dict)}
    for evt in room_timeline:
        if evt.get("type") != "m.room.message":
            continue
        sender = str(evt.get("sender", ""))
        if bot_user_id and sender == bot_user_id:
            continue
        body = str(evt.get("content", {}).get("body", ""))
        task = parse_task_command(body)
        if not task:
            continue
        task_id = str(task.get("id"))
        if task_id in existing:
            continue
        task_items.append(task)
        existing.add(task_id)
        added += 1

    save_json(tasks_path, {"tasks": task_items})
    save_json(bridge_state_path, state)
    return added


def main() -> int:
    p = argparse.ArgumentParser(description="Mosaic Matrix transport bridge")
    p.add_argument("--repo", default=".", help="Repository root")
    p.add_argument("--mode", required=True, choices=["publish", "consume"], help="Bridge mode")
    args = p.parse_args()

    repo = pathlib.Path(args.repo).resolve()
    config = load_json(repo / ".mosaic" / "orchestrator" / "config.json", {})
    if not config.get("enabled", False):
        print("[mosaic-orch-matrix] disabled in config (enabled=false)")
        return 0

    if str(config.get("transport", "")).strip() != "matrix":
        print("[mosaic-orch-matrix] config transport != matrix; nothing to do")
        return 0

    if args.mode == "publish":
        count = publish_events(repo, config)
        print(f"[mosaic-orch-matrix] published_events={count}")
        return 0

    count = consume_tasks(repo, config)
    print(f"[mosaic-orch-matrix] consumed_tasks={count}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
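The room-message parsing used by the bridge accepts either a `!mosaic-task ` or `@mosaic task ` prefix followed by a JSON object with at least `id` and `title`, defaulting `status` to pending. A self-contained re-implementation of that rule for illustration:

```python
# Sketch of the bridge's task-injection parsing: prefix match, JSON body,
# required id/title, status defaulted to "pending".
import json

def parse_task_command(body: str):
    raw = body.strip()
    for prefix in ("!mosaic-task ", "@mosaic task "):
        if raw.startswith(prefix):
            task = json.loads(raw[len(prefix):])
            if not isinstance(task, dict) or "id" not in task or "title" not in task:
                raise ValueError("task payload requires id and title")
            task.setdefault("status", "pending")
            return task
    return None

task = parse_task_command('!mosaic-task {"id":"TASK-123","title":"Fix bug"}')
```

Messages without a recognized prefix return `None`, so ordinary room chatter is ignored rather than rejected.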