Co-authored-by: Jason Woltje <jason@diversecanvas.com> Co-committed-by: Jason Woltje <jason@diversecanvas.com>
201 lines
6.9 KiB
Python
Executable File
201 lines
6.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Matrix transport bridge for Mosaic orchestrator events/tasks."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import pathlib
|
|
import urllib.parse
|
|
import urllib.request
|
|
import uuid
|
|
from typing import Any
|
|
|
|
|
|
def load_json(path: pathlib.Path, default: Any) -> Any:
|
|
if not path.exists():
|
|
return default
|
|
with path.open("r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
def save_json(path: pathlib.Path, data: Any) -> None:
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
tmp = path.with_suffix(path.suffix + ".tmp")
|
|
with tmp.open("w", encoding="utf-8") as f:
|
|
json.dump(data, f, indent=2)
|
|
f.write("\n")
|
|
tmp.replace(path)
|
|
|
|
|
|
def matrix_request(
|
|
homeserver: str,
|
|
access_token: str,
|
|
method: str,
|
|
path: str,
|
|
payload: dict[str, Any] | None = None,
|
|
) -> dict[str, Any]:
|
|
url = homeserver.rstrip("/") + path
|
|
body = None
|
|
headers = {"Authorization": f"Bearer {access_token}"}
|
|
if payload is not None:
|
|
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
|
|
headers["Content-Type"] = "application/json"
|
|
req = urllib.request.Request(url, method=method, data=body, headers=headers)
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
raw = resp.read().decode("utf-8")
|
|
return json.loads(raw) if raw else {}
|
|
|
|
|
|
def matrix_send_message(homeserver: str, access_token: str, room_id: str, message: str) -> None:
|
|
txn = str(uuid.uuid4())
|
|
path = f"/_matrix/client/v3/rooms/{urllib.parse.quote(room_id, safe='')}/send/m.room.message/{txn}"
|
|
matrix_request(
|
|
homeserver,
|
|
access_token,
|
|
"PUT",
|
|
path,
|
|
{"msgtype": "m.text", "body": message},
|
|
)
|
|
|
|
|
|
def format_event_message(event: dict[str, Any]) -> str:
|
|
et = event.get("event_type", "unknown")
|
|
tid = event.get("task_id", "unknown")
|
|
status = event.get("status", "unknown")
|
|
msg = event.get("message", "")
|
|
return f"[mosaic-orch] {et} task={tid} status={status} :: {msg}"
|
|
|
|
|
|
def publish_events(repo: pathlib.Path, config: dict[str, Any]) -> int:
|
|
orch = repo / ".mosaic" / "orchestrator"
|
|
events_path = orch / "events.ndjson"
|
|
bridge_state_path = orch / "matrix_state.json"
|
|
state = load_json(bridge_state_path, {"last_published_line": 0, "since": None})
|
|
|
|
homeserver = str(config.get("matrix", {}).get("homeserver_url", "")).strip()
|
|
token = str(config.get("matrix", {}).get("access_token", "")).strip()
|
|
room_id = str(config.get("matrix", {}).get("control_room_id", "")).strip()
|
|
if not homeserver or not token or not room_id:
|
|
raise ValueError("matrix homeserver_url, access_token, and control_room_id are required")
|
|
|
|
if not events_path.exists():
|
|
return 0
|
|
|
|
lines = events_path.read_text(encoding="utf-8").splitlines()
|
|
start = int(state.get("last_published_line", 0))
|
|
published = 0
|
|
for idx, line in enumerate(lines[start:], start=start + 1):
|
|
if not line.strip():
|
|
continue
|
|
event = json.loads(line)
|
|
matrix_send_message(homeserver, token, room_id, format_event_message(event))
|
|
state["last_published_line"] = idx
|
|
published += 1
|
|
save_json(bridge_state_path, state)
|
|
return published
|
|
|
|
|
|
def parse_task_command(body: str) -> dict[str, Any] | None:
|
|
raw = body.strip()
|
|
if raw.startswith("!mosaic-task "):
|
|
payload = raw[len("!mosaic-task ") :].strip()
|
|
elif raw.startswith("@mosaic task "):
|
|
payload = raw[len("@mosaic task ") :].strip()
|
|
else:
|
|
return None
|
|
|
|
task = json.loads(payload)
|
|
if not isinstance(task, dict):
|
|
raise ValueError("task payload must be a JSON object")
|
|
if "id" not in task or "title" not in task:
|
|
raise ValueError("task payload requires id and title")
|
|
task.setdefault("status", "pending")
|
|
return task
|
|
|
|
|
|
def consume_tasks(repo: pathlib.Path, config: dict[str, Any]) -> int:
|
|
orch = repo / ".mosaic" / "orchestrator"
|
|
tasks_path = orch / "tasks.json"
|
|
bridge_state_path = orch / "matrix_state.json"
|
|
state = load_json(bridge_state_path, {"last_published_line": 0, "since": None})
|
|
tasks = load_json(tasks_path, {"tasks": []})
|
|
task_items = tasks.get("tasks", [])
|
|
if not isinstance(task_items, list):
|
|
raise ValueError("tasks.json must contain {'tasks': [...]} structure")
|
|
|
|
homeserver = str(config.get("matrix", {}).get("homeserver_url", "")).strip()
|
|
token = str(config.get("matrix", {}).get("access_token", "")).strip()
|
|
room_id = str(config.get("matrix", {}).get("control_room_id", "")).strip()
|
|
bot_user_id = str(config.get("matrix", {}).get("bot_user_id", "")).strip()
|
|
if not homeserver or not token or not room_id:
|
|
raise ValueError("matrix homeserver_url, access_token, and control_room_id are required")
|
|
|
|
since = state.get("since")
|
|
path = "/_matrix/client/v3/sync?timeout=1"
|
|
if since:
|
|
path += "&since=" + urllib.parse.quote(str(since), safe="")
|
|
sync = matrix_request(homeserver, token, "GET", path)
|
|
if "next_batch" in sync:
|
|
state["since"] = sync["next_batch"]
|
|
|
|
room_timeline = (
|
|
sync.get("rooms", {})
|
|
.get("join", {})
|
|
.get(room_id, {})
|
|
.get("timeline", {})
|
|
.get("events", [])
|
|
)
|
|
added = 0
|
|
existing = {str(t.get("id")) for t in task_items if isinstance(t, dict)}
|
|
for evt in room_timeline:
|
|
if evt.get("type") != "m.room.message":
|
|
continue
|
|
sender = str(evt.get("sender", ""))
|
|
if bot_user_id and sender == bot_user_id:
|
|
continue
|
|
body = str(evt.get("content", {}).get("body", ""))
|
|
task = parse_task_command(body)
|
|
if not task:
|
|
continue
|
|
task_id = str(task.get("id"))
|
|
if task_id in existing:
|
|
continue
|
|
task_items.append(task)
|
|
existing.add(task_id)
|
|
added += 1
|
|
|
|
save_json(tasks_path, {"tasks": task_items})
|
|
save_json(bridge_state_path, state)
|
|
return added
|
|
|
|
|
|
def main() -> int:
|
|
p = argparse.ArgumentParser(description="Mosaic Matrix transport bridge")
|
|
p.add_argument("--repo", default=".", help="Repository root")
|
|
p.add_argument("--mode", required=True, choices=["publish", "consume"], help="Bridge mode")
|
|
args = p.parse_args()
|
|
|
|
repo = pathlib.Path(args.repo).resolve()
|
|
config = load_json(repo / ".mosaic" / "orchestrator" / "config.json", {})
|
|
if not config.get("enabled", False):
|
|
print("[mosaic-orch-matrix] disabled in config (enabled=false)")
|
|
return 0
|
|
|
|
if str(config.get("transport", "")).strip() != "matrix":
|
|
print("[mosaic-orch-matrix] config transport != matrix; nothing to do")
|
|
return 0
|
|
|
|
if args.mode == "publish":
|
|
count = publish_events(repo, config)
|
|
print(f"[mosaic-orch-matrix] published_events={count}")
|
|
return 0
|
|
|
|
count = consume_tasks(repo, config)
|
|
print(f"[mosaic-orch-matrix] consumed_tasks={count}")
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|