Files
bootstrap/tools/orchestrator-matrix/transport/matrix_transport.py
2026-02-22 17:52:23 +00:00

201 lines
6.9 KiB
Python
Executable File

#!/usr/bin/env python3
"""Matrix transport bridge for Mosaic orchestrator events/tasks."""
from __future__ import annotations
import argparse
import json
import pathlib
import urllib.parse
import urllib.request
import uuid
from typing import Any
def load_json(path: pathlib.Path, default: Any) -> Any:
    """Read and parse a UTF-8 JSON file, falling back to *default*.

    Args:
        path: Location of the JSON document.
        default: Value returned unchanged when *path* does not exist.

    Returns:
        The decoded JSON content, or *default* if the file is missing.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    if path.exists():
        with path.open("r", encoding="utf-8") as handle:
            return json.loads(handle.read())
    return default
def save_json(path: pathlib.Path, data: Any) -> None:
    """Write *data* as indented JSON to *path* via a temporary sibling file.

    The parent directory is created when needed. The content is first written
    to ``<name>.tmp`` next to the target and then moved over *path*, so the
    target is replaced in a single rename step.

    Args:
        path: Destination file.
        data: JSON-serializable value to store.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    staging = path.with_name(path.name + ".tmp")
    with staging.open("w", encoding="utf-8") as sink:
        json.dump(data, sink, indent=2)
        # Keep the file newline-terminated.
        sink.write("\n")
    staging.replace(path)
def matrix_request(
    homeserver: str,
    access_token: str,
    method: str,
    path: str,
    payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Issue one authenticated HTTP request and decode the JSON reply.

    Args:
        homeserver: Base URL; trailing slashes are removed before *path* is
            appended.
        access_token: Token sent as an ``Authorization: Bearer`` header.
        method: HTTP method name passed to the request.
        path: Request path (including any query string) appended to the base.
        payload: Optional JSON body. When given it is serialized with ASCII
            escaping and sent with ``Content-Type: application/json``.

    Returns:
        The decoded JSON response, or an empty dict when the body is empty.

    Raises:
        json.JSONDecodeError: If a non-empty response body is not valid JSON.
        Any exception raised by ``urllib.request.urlopen`` propagates.
    """
    request_headers = {"Authorization": f"Bearer {access_token}"}
    encoded = None
    if payload is not None:
        encoded = json.dumps(payload, ensure_ascii=True).encode("utf-8")
        request_headers["Content-Type"] = "application/json"

    request = urllib.request.Request(
        homeserver.rstrip("/") + path,
        data=encoded,
        headers=request_headers,
        method=method,
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        text = response.read().decode("utf-8")

    if not text:
        return {}
    return json.loads(text)
def matrix_send_message(homeserver: str, access_token: str, room_id: str, message: str) -> None:
    """Post a plain-text ``m.room.message`` event to a room.

    A fresh UUID4 is generated as the transaction id for every call, and the
    room id is fully percent-encoded before being placed in the URL path.

    Args:
        homeserver: Base URL of the homeserver.
        access_token: Access token used for the request.
        room_id: Target room identifier.
        message: Text sent as the ``body`` of an ``m.text`` message.
    """
    transaction_id = str(uuid.uuid4())
    quoted_room = urllib.parse.quote(room_id, safe="")
    content = {"msgtype": "m.text", "body": message}
    endpoint = f"/_matrix/client/v3/rooms/{quoted_room}/send/m.room.message/{transaction_id}"
    matrix_request(homeserver, access_token, "PUT", endpoint, content)
def format_event_message(event: dict[str, Any]) -> str:
    """Render an orchestrator event as a one-line chat message.

    Missing ``event_type``, ``task_id`` and ``status`` keys are shown as
    ``unknown``; a missing ``message`` is rendered as an empty string.

    Args:
        event: Event record to format.

    Returns:
        The formatted single-line message.
    """
    kind, task, state = (
        event.get(key, "unknown") for key in ("event_type", "task_id", "status")
    )
    detail = event.get("message", "")
    return f"[mosaic-orch] {kind} task={task} status={state} :: {detail}"
def publish_events(repo: pathlib.Path, config: dict[str, Any]) -> int:
    """Send not-yet-published orchestrator events to the Matrix control room.

    Events are read line by line from ``.mosaic/orchestrator/events.ndjson``.
    The 1-based number of the last line that was sent is kept under
    ``last_published_line`` in ``.mosaic/orchestrator/matrix_state.json`` so
    later runs resume after it. Blank lines are skipped.

    Args:
        repo: Repository root containing the ``.mosaic`` directory.
        config: Orchestrator configuration; the ``matrix`` section must supply
            ``homeserver_url``, ``access_token`` and ``control_room_id``.

    Returns:
        Number of events sent during this call (0 when the events file does
        not exist).

    Raises:
        ValueError: If a required Matrix setting is missing or empty.
        json.JSONDecodeError: If an event line is not valid JSON.
        Any exception raised while sending a message propagates.
    """
    orch = repo / ".mosaic" / "orchestrator"
    events_path = orch / "events.ndjson"
    bridge_state_path = orch / "matrix_state.json"
    state = load_json(bridge_state_path, {"last_published_line": 0, "since": None})

    matrix_cfg = config.get("matrix", {})
    homeserver = str(matrix_cfg.get("homeserver_url", "")).strip()
    token = str(matrix_cfg.get("access_token", "")).strip()
    room_id = str(matrix_cfg.get("control_room_id", "")).strip()
    if not homeserver or not token or not room_id:
        raise ValueError("matrix homeserver_url, access_token, and control_room_id are required")
    if not events_path.exists():
        return 0

    lines = events_path.read_text(encoding="utf-8").splitlines()
    # A negative cursor would make the slice count from the end of the file.
    start = max(0, int(state.get("last_published_line", 0)))
    published = 0
    try:
        for idx, line in enumerate(lines[start:], start=start + 1):
            if not line.strip():
                continue
            event = json.loads(line)
            matrix_send_message(homeserver, token, room_id, format_event_message(event))
            state["last_published_line"] = idx
            published += 1
    finally:
        # Persist the cursor even when a send or parse fails part-way through;
        # every send uses a new transaction id, so re-running from a stale
        # cursor would post the already-delivered events a second time.
        save_json(bridge_state_path, state)
    return published
def parse_task_command(body: str) -> dict[str, Any] | None:
    """Extract a task definition from a chat message, if it is a task command.

    A command is a message that, after trimming surrounding whitespace,
    starts with ``!mosaic-task `` or ``@mosaic task `` followed by a JSON
    object. The object must contain ``id`` and ``title``; ``status`` defaults
    to ``pending`` when absent.

    Args:
        body: Raw message text.

    Returns:
        The task dict, or ``None`` when the message is not a task command.

    Raises:
        ValueError: If the payload is not a JSON object or lacks ``id`` or
            ``title``.
        json.JSONDecodeError: If the payload is not valid JSON.
    """
    text = body.strip()
    for prefix in ("!mosaic-task ", "@mosaic task "):
        if text.startswith(prefix):
            candidate = json.loads(text[len(prefix):].strip())
            break
    else:
        # Neither command prefix matched: an ordinary chat message.
        return None

    if not isinstance(candidate, dict):
        raise ValueError("task payload must be a JSON object")
    if not {"id", "title"}.issubset(candidate):
        raise ValueError("task payload requires id and title")
    if "status" not in candidate:
        candidate["status"] = "pending"
    return candidate
def consume_tasks(repo: pathlib.Path, config: dict[str, Any]) -> int:
    """Pull task commands from the Matrix control room into ``tasks.json``.

    One ``/sync`` request is made, resuming from the ``since`` token stored in
    ``.mosaic/orchestrator/matrix_state.json``. Each ``m.room.message`` event
    in the control room's timeline that parses as a task command is appended
    to the ``tasks`` list in ``.mosaic/orchestrator/tasks.json`` unless a task
    with the same id (compared as strings) is already present. Messages sent
    by ``bot_user_id``, when configured, are ignored.

    Args:
        repo: Repository root containing the ``.mosaic`` directory.
        config: Orchestrator configuration; the ``matrix`` section must supply
            ``homeserver_url``, ``access_token`` and ``control_room_id`` and
            may supply ``bot_user_id``.

    Returns:
        Number of tasks added during this call.

    Raises:
        ValueError: If ``tasks.json`` does not have the ``{"tasks": [...]}``
            shape or a required Matrix setting is missing or empty.
        Any exception raised by the sync request propagates.
    """
    import sys

    orch = repo / ".mosaic" / "orchestrator"
    tasks_path = orch / "tasks.json"
    bridge_state_path = orch / "matrix_state.json"
    state = load_json(bridge_state_path, {"last_published_line": 0, "since": None})
    tasks = load_json(tasks_path, {"tasks": []})
    # A non-object document would otherwise fail with an opaque AttributeError.
    if not isinstance(tasks, dict):
        raise ValueError("tasks.json must contain {'tasks': [...]} structure")
    task_items = tasks.get("tasks", [])
    if not isinstance(task_items, list):
        raise ValueError("tasks.json must contain {'tasks': [...]} structure")

    matrix_cfg = config.get("matrix", {})
    homeserver = str(matrix_cfg.get("homeserver_url", "")).strip()
    token = str(matrix_cfg.get("access_token", "")).strip()
    room_id = str(matrix_cfg.get("control_room_id", "")).strip()
    bot_user_id = str(matrix_cfg.get("bot_user_id", "")).strip()
    if not homeserver or not token or not room_id:
        raise ValueError("matrix homeserver_url, access_token, and control_room_id are required")

    since = state.get("since")
    path = "/_matrix/client/v3/sync?timeout=1"
    if since:
        path += "&since=" + urllib.parse.quote(str(since), safe="")
    sync = matrix_request(homeserver, token, "GET", path)
    if "next_batch" in sync:
        state["since"] = sync["next_batch"]

    room_timeline = (
        sync.get("rooms", {})
        .get("join", {})
        .get(room_id, {})
        .get("timeline", {})
        .get("events", [])
    )
    added = 0
    existing = {str(t.get("id")) for t in task_items if isinstance(t, dict)}
    for evt in room_timeline:
        if evt.get("type") != "m.room.message":
            continue
        sender = str(evt.get("sender", ""))
        if bot_user_id and sender == bot_user_id:
            continue
        body = str(evt.get("content", {}).get("body", ""))
        try:
            task = parse_task_command(body)
        except ValueError as exc:
            # Room members control this text. Letting a malformed command
            # abort the run would skip saving the new sync token, so the same
            # event would be fetched and fail again on every later run.
            print(
                f"[mosaic-orch-matrix] ignoring malformed task command from {sender}: {exc}",
                file=sys.stderr,
            )
            continue
        if not task:
            continue
        task_id = str(task.get("id"))
        if task_id in existing:
            continue
        task_items.append(task)
        existing.add(task_id)
        added += 1

    # Write back the loaded document so any other top-level keys survive.
    tasks["tasks"] = task_items
    save_json(tasks_path, tasks)
    save_json(bridge_state_path, state)
    return added
def main() -> int:
    """Command-line entry point for the Matrix transport bridge.

    Reads ``.mosaic/orchestrator/config.json`` under ``--repo`` and, when the
    bridge is enabled and the configured transport is ``matrix``, runs either
    the publish or the consume step selected by ``--mode``. A short status
    line is printed in every case.

    Returns:
        Always 0; failures surface as exceptions.
    """
    parser = argparse.ArgumentParser(description="Mosaic Matrix transport bridge")
    parser.add_argument("--repo", default=".", help="Repository root")
    parser.add_argument("--mode", required=True, choices=["publish", "consume"], help="Bridge mode")
    options = parser.parse_args()

    repo_root = pathlib.Path(options.repo).resolve()
    config_path = repo_root / ".mosaic" / "orchestrator" / "config.json"
    settings = load_json(config_path, {})

    if not settings.get("enabled", False):
        print("[mosaic-orch-matrix] disabled in config (enabled=false)")
        return 0
    if str(settings.get("transport", "")).strip() != "matrix":
        print("[mosaic-orch-matrix] config transport != matrix; nothing to do")
        return 0

    # argparse restricts --mode to "publish" or "consume".
    if options.mode != "publish":
        count = consume_tasks(repo_root, settings)
        print(f"[mosaic-orch-matrix] consumed_tasks={count}")
    else:
        count = publish_events(repo_root, settings)
        print(f"[mosaic-orch-matrix] published_events={count}")
    return 0
# Script entry point: run the bridge and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())