#!/usr/bin/env python3 """Matrix transport bridge for Mosaic orchestrator events/tasks.""" from __future__ import annotations import argparse import json import pathlib import urllib.parse import urllib.request import uuid from typing import Any def load_json(path: pathlib.Path, default: Any) -> Any: if not path.exists(): return default with path.open("r", encoding="utf-8") as f: return json.load(f) def save_json(path: pathlib.Path, data: Any) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") with tmp.open("w", encoding="utf-8") as f: json.dump(data, f, indent=2) f.write("\n") tmp.replace(path) def matrix_request( homeserver: str, access_token: str, method: str, path: str, payload: dict[str, Any] | None = None, ) -> dict[str, Any]: url = homeserver.rstrip("/") + path body = None headers = {"Authorization": f"Bearer {access_token}"} if payload is not None: body = json.dumps(payload, ensure_ascii=True).encode("utf-8") headers["Content-Type"] = "application/json" req = urllib.request.Request(url, method=method, data=body, headers=headers) with urllib.request.urlopen(req, timeout=30) as resp: raw = resp.read().decode("utf-8") return json.loads(raw) if raw else {} def matrix_send_message(homeserver: str, access_token: str, room_id: str, message: str) -> None: txn = str(uuid.uuid4()) path = f"/_matrix/client/v3/rooms/{urllib.parse.quote(room_id, safe='')}/send/m.room.message/{txn}" matrix_request( homeserver, access_token, "PUT", path, {"msgtype": "m.text", "body": message}, ) def format_event_message(event: dict[str, Any]) -> str: et = event.get("event_type", "unknown") tid = event.get("task_id", "unknown") status = event.get("status", "unknown") msg = event.get("message", "") return f"[mosaic-orch] {et} task={tid} status={status} :: {msg}" def publish_events(repo: pathlib.Path, config: dict[str, Any]) -> int: orch = repo / ".mosaic" / "orchestrator" events_path = orch / "events.ndjson" bridge_state_path = orch / "matrix_state.json" state = load_json(bridge_state_path, {"last_published_line": 0, "since": None}) homeserver = str(config.get("matrix", {}).get("homeserver_url", "")).strip() token = str(config.get("matrix", {}).get("access_token", "")).strip() room_id = str(config.get("matrix", {}).get("control_room_id", "")).strip() if not homeserver or not token or not room_id: raise ValueError("matrix homeserver_url, access_token, and control_room_id are required") if not events_path.exists(): return 0 lines = events_path.read_text(encoding="utf-8").splitlines() start = int(state.get("last_published_line", 0)) published = 0 for idx, line in enumerate(lines[start:], start=start + 1): if not line.strip(): continue event = json.loads(line) matrix_send_message(homeserver, token, room_id, format_event_message(event)) state["last_published_line"] = idx published += 1 save_json(bridge_state_path, state) return published def parse_task_command(body: str) -> dict[str, Any] | None: raw = body.strip() if raw.startswith("!mosaic-task "): payload = raw[len("!mosaic-task ") :].strip() elif raw.startswith("@mosaic task "): payload = raw[len("@mosaic task ") :].strip() else: return None task = json.loads(payload) if not isinstance(task, dict): raise ValueError("task payload must be a JSON object") if "id" not in task or "title" not in task: raise ValueError("task payload requires id and title") task.setdefault("status", "pending") return task def consume_tasks(repo: pathlib.Path, config: dict[str, Any]) -> int: orch = repo / ".mosaic" / "orchestrator" tasks_path = orch / "tasks.json" bridge_state_path = orch / "matrix_state.json" state = load_json(bridge_state_path, {"last_published_line": 0, "since": None}) tasks = load_json(tasks_path, {"tasks": []}) task_items = tasks.get("tasks", []) if not isinstance(task_items, list): raise ValueError("tasks.json must contain {'tasks': [...]} structure") homeserver = str(config.get("matrix", {}).get("homeserver_url", "")).strip() token = str(config.get("matrix", {}).get("access_token", "")).strip() room_id = str(config.get("matrix", {}).get("control_room_id", "")).strip() bot_user_id = str(config.get("matrix", {}).get("bot_user_id", "")).strip() if not homeserver or not token or not room_id: raise ValueError("matrix homeserver_url, access_token, and control_room_id are required") since = state.get("since") path = "/_matrix/client/v3/sync?timeout=1" if since: path += "&since=" + urllib.parse.quote(str(since), safe="") sync = matrix_request(homeserver, token, "GET", path) if "next_batch" in sync: state["since"] = sync["next_batch"] room_timeline = ( sync.get("rooms", {}) .get("join", {}) .get(room_id, {}) .get("timeline", {}) .get("events", []) ) added = 0 existing = {str(t.get("id")) for t in task_items if isinstance(t, dict)} for evt in room_timeline: if evt.get("type") != "m.room.message": continue sender = str(evt.get("sender", "")) if bot_user_id and sender == bot_user_id: continue body = str(evt.get("content", {}).get("body", "")) task = parse_task_command(body) if not task: continue task_id = str(task.get("id")) if task_id in existing: continue task_items.append(task) existing.add(task_id) added += 1 save_json(tasks_path, {"tasks": task_items}) save_json(bridge_state_path, state) return added def main() -> int: p = argparse.ArgumentParser(description="Mosaic Matrix transport bridge") p.add_argument("--repo", default=".", help="Repository root") p.add_argument("--mode", required=True, choices=["publish", "consume"], help="Bridge mode") args = p.parse_args() repo = pathlib.Path(args.repo).resolve() config = load_json(repo / ".mosaic" / "orchestrator" / "config.json", {}) if not config.get("enabled", False): print("[mosaic-orch-matrix] disabled in config (enabled=false)") return 0 if str(config.get("transport", "")).strip() != "matrix": print("[mosaic-orch-matrix] config transport != matrix; nothing to do") return 0 if args.mode == "publish": count = publish_events(repo, config) print(f"[mosaic-orch-matrix] published_events={count}") return 0 count = consume_tasks(repo, config) print(f"[mosaic-orch-matrix] consumed_tasks={count}") return 0 if __name__ == "__main__": raise SystemExit(main())