diff --git a/README.md b/README.md index d8de5ca..e1c7cec 100644 --- a/README.md +++ b/README.md @@ -138,6 +138,8 @@ Run from a bootstrapped repo: ```bash ~/.mosaic/bin/mosaic-orchestrator-run --once ~/.mosaic/bin/mosaic-orchestrator-run --poll-sec 10 +~/.mosaic/bin/mosaic-orchestrator-matrix-publish +~/.mosaic/bin/mosaic-orchestrator-matrix-consume ``` The controller reads/writes repo-local state in `.mosaic/orchestrator/` and emits diff --git a/bin/mosaic-bootstrap-repo b/bin/mosaic-bootstrap-repo index 1a4210d..82ff1c7 100755 --- a/bin/mosaic-bootstrap-repo +++ b/bin/mosaic-bootstrap-repo @@ -58,6 +58,7 @@ copy_file "$TEMPLATE_ROOT/.mosaic/quality-rails.yml" "$TARGET_DIR/.mosaic/qualit copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/config.json" "$TARGET_DIR/.mosaic/orchestrator/config.json" copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/tasks.json" "$TARGET_DIR/.mosaic/orchestrator/tasks.json" copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/state.json" "$TARGET_DIR/.mosaic/orchestrator/state.json" +copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/matrix_state.json" "$TARGET_DIR/.mosaic/orchestrator/matrix_state.json" copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/logs/.gitkeep" "$TARGET_DIR/.mosaic/orchestrator/logs/.gitkeep" copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/results/.gitkeep" "$TARGET_DIR/.mosaic/orchestrator/results/.gitkeep" diff --git a/bin/mosaic-doctor b/bin/mosaic-doctor index ffab18a..e640f6f 100755 --- a/bin/mosaic-doctor +++ b/bin/mosaic-doctor @@ -126,6 +126,9 @@ expect_file "$MOSAIC_HOME/bin/mosaic-sync-skills" expect_file "$MOSAIC_HOME/bin/mosaic-quality-apply" expect_file "$MOSAIC_HOME/bin/mosaic-quality-verify" expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-run" +expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-matrix-publish" +expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-matrix-consume" +expect_file "$MOSAIC_HOME/rails/orchestrator-matrix/transport/matrix_transport.py" # Claude runtime file checks (copied, non-symlink). for rf in CLAUDE.md settings.json hooks-config.json context7-integration.md; do diff --git a/bin/mosaic-orchestrator-matrix-consume b/bin/mosaic-orchestrator-matrix-consume new file mode 100755 index 0000000..21fd8d3 --- /dev/null +++ b/bin/mosaic-orchestrator-matrix-consume @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail + +MOSAIC_HOME="${MOSAIC_HOME:-$HOME/.mosaic}" +BRIDGE="$MOSAIC_HOME/rails/orchestrator-matrix/transport/matrix_transport.py" + +if [[ ! -f "$BRIDGE" ]]; then + echo "[mosaic-orch-matrix] missing transport bridge: $BRIDGE" >&2 + exit 1 +fi + +exec python3 "$BRIDGE" --repo "$(pwd)" --mode consume "$@" diff --git a/bin/mosaic-orchestrator-matrix-publish b/bin/mosaic-orchestrator-matrix-publish new file mode 100755 index 0000000..f438480 --- /dev/null +++ b/bin/mosaic-orchestrator-matrix-publish @@ -0,0 +1,12 @@ +#!/usr/bin/env bash +set -euo pipefail + +MOSAIC_HOME="${MOSAIC_HOME:-$HOME/.mosaic}" +BRIDGE="$MOSAIC_HOME/rails/orchestrator-matrix/transport/matrix_transport.py" + +if [[ ! -f "$BRIDGE" ]]; then + echo "[mosaic-orch-matrix] missing transport bridge: $BRIDGE" >&2 + exit 1 +fi + +exec python3 "$BRIDGE" --repo "$(pwd)" --mode publish "$@" diff --git a/rails/orchestrator-matrix/README.md b/rails/orchestrator-matrix/README.md index 1e9e4d6..9f594a2 100644 --- a/rails/orchestrator-matrix/README.md +++ b/rails/orchestrator-matrix/README.md @@ -44,7 +44,26 @@ Continuous loop: ~/.mosaic/bin/mosaic-orchestrator-run --poll-sec 10 ``` +Publish new orchestrator events to Matrix: + +```bash +~/.mosaic/bin/mosaic-orchestrator-matrix-publish +``` + +Consume Matrix task messages into `tasks.json`: + +```bash +~/.mosaic/bin/mosaic-orchestrator-matrix-consume +``` + ## Matrix Note This rail writes canonical events to `.mosaic/orchestrator/events.ndjson`. -Matrix bridge services can consume and relay these events to Matrix rooms. +The Matrix transport bridge publishes those events into the configured control room +and can consume task commands from that room. + +Task injection message format (room text): + +```text +!mosaic-task {"id":"TASK-123","title":"Fix bug","command":"echo run","quality_gates":["pnpm lint"]} +``` diff --git a/rails/orchestrator-matrix/transport/.gitignore b/rails/orchestrator-matrix/transport/.gitignore new file mode 100644 index 0000000..7a60b85 --- /dev/null +++ b/rails/orchestrator-matrix/transport/.gitignore @@ -0,0 +1,2 @@ +__pycache__/ +*.pyc diff --git a/rails/orchestrator-matrix/transport/matrix_transport.py b/rails/orchestrator-matrix/transport/matrix_transport.py new file mode 100755 index 0000000..2c6d8b4 --- /dev/null +++ b/rails/orchestrator-matrix/transport/matrix_transport.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +"""Matrix transport bridge for Mosaic orchestrator events/tasks.""" + +from __future__ import annotations + +import argparse +import json +import pathlib +import urllib.parse +import urllib.request +import uuid +from typing import Any + + +def load_json(path: pathlib.Path, default: Any) -> Any: + if not path.exists(): + return default + with path.open("r", encoding="utf-8") as f: + return json.load(f) + + +def save_json(path: pathlib.Path, data: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + with tmp.open("w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + f.write("\n") + tmp.replace(path) + + +def matrix_request( + homeserver: str, + access_token: str, + method: str, + path: str, + payload: dict[str, Any] | None = None, +) -> dict[str, Any]: + url = homeserver.rstrip("/") + path + body = None + headers = {"Authorization": f"Bearer {access_token}"} + if payload is not None: + body = json.dumps(payload, ensure_ascii=True).encode("utf-8") + headers["Content-Type"] = "application/json" + req = urllib.request.Request(url, method=method, data=body, headers=headers) + with urllib.request.urlopen(req, timeout=30) as resp: + raw = resp.read().decode("utf-8") + return json.loads(raw) if raw else {} + + +def matrix_send_message(homeserver: str, access_token: str, room_id: str, message: str) -> None: + txn = str(uuid.uuid4()) + path = f"/_matrix/client/v3/rooms/{urllib.parse.quote(room_id, safe='')}/send/m.room.message/{txn}" + matrix_request( + homeserver, + access_token, + "PUT", + path, + {"msgtype": "m.text", "body": message}, + ) + + +def format_event_message(event: dict[str, Any]) -> str: + et = event.get("event_type", "unknown") + tid = event.get("task_id", "unknown") + status = event.get("status", "unknown") + msg = event.get("message", "") + return f"[mosaic-orch] {et} task={tid} status={status} :: {msg}" + + +def publish_events(repo: pathlib.Path, config: dict[str, Any]) -> int: + orch = repo / ".mosaic" / "orchestrator" + events_path = orch / "events.ndjson" + bridge_state_path = orch / "matrix_state.json" + state = load_json(bridge_state_path, {"last_published_line": 0, "since": None}) + + homeserver = str(config.get("matrix", {}).get("homeserver_url", "")).strip() + token = str(config.get("matrix", {}).get("access_token", "")).strip() + room_id = str(config.get("matrix", {}).get("control_room_id", "")).strip() + if not homeserver or not token or not room_id: + raise ValueError("matrix homeserver_url, access_token, and control_room_id are required") + + if not events_path.exists(): + return 0 + + lines = events_path.read_text(encoding="utf-8").splitlines() + start = int(state.get("last_published_line", 0)) + published = 0 + for idx, line in enumerate(lines[start:], start=start + 1): + if not line.strip(): + continue + event = json.loads(line) + matrix_send_message(homeserver, token, room_id, format_event_message(event)) + state["last_published_line"] = idx + published += 1 + save_json(bridge_state_path, state) + return published + + +def parse_task_command(body: str) -> dict[str, Any] | None: + raw = body.strip() + if raw.startswith("!mosaic-task "): + payload = raw[len("!mosaic-task ") :].strip() + elif raw.startswith("@mosaic task "): + payload = raw[len("@mosaic task ") :].strip() + else: + return None + + task = json.loads(payload) + if not isinstance(task, dict): + raise ValueError("task payload must be a JSON object") + if "id" not in task or "title" not in task: + raise ValueError("task payload requires id and title") + task.setdefault("status", "pending") + return task + + +def consume_tasks(repo: pathlib.Path, config: dict[str, Any]) -> int: + orch = repo / ".mosaic" / "orchestrator" + tasks_path = orch / "tasks.json" + bridge_state_path = orch / "matrix_state.json" + state = load_json(bridge_state_path, {"last_published_line": 0, "since": None}) + tasks = load_json(tasks_path, {"tasks": []}) + task_items = tasks.get("tasks", []) + if not isinstance(task_items, list): + raise ValueError("tasks.json must contain {'tasks': [...]} structure") + + homeserver = str(config.get("matrix", {}).get("homeserver_url", "")).strip() + token = str(config.get("matrix", {}).get("access_token", "")).strip() + room_id = str(config.get("matrix", {}).get("control_room_id", "")).strip() + bot_user_id = str(config.get("matrix", {}).get("bot_user_id", "")).strip() + if not homeserver or not token or not room_id: + raise ValueError("matrix homeserver_url, access_token, and control_room_id are required") + + since = state.get("since") + path = "/_matrix/client/v3/sync?timeout=1" + if since: + path += "&since=" + urllib.parse.quote(str(since), safe="") + sync = matrix_request(homeserver, token, "GET", path) + if "next_batch" in sync: + state["since"] = sync["next_batch"] + + room_timeline = ( + sync.get("rooms", {}) + .get("join", {}) + .get(room_id, {}) + .get("timeline", {}) + .get("events", []) + ) + added = 0 + existing = {str(t.get("id")) for t in task_items if isinstance(t, dict)} + for evt in room_timeline: + if evt.get("type") != "m.room.message": + continue + sender = str(evt.get("sender", "")) + if bot_user_id and sender == bot_user_id: + continue + body = str(evt.get("content", {}).get("body", "")) + task = parse_task_command(body) + if not task: + continue + task_id = str(task.get("id")) + if task_id in existing: + continue + task_items.append(task) + existing.add(task_id) + added += 1 + + save_json(tasks_path, {"tasks": task_items}) + save_json(bridge_state_path, state) + return added + + +def main() -> int: + p = argparse.ArgumentParser(description="Mosaic Matrix transport bridge") + p.add_argument("--repo", default=".", help="Repository root") + p.add_argument("--mode", required=True, choices=["publish", "consume"], help="Bridge mode") + args = p.parse_args() + + repo = pathlib.Path(args.repo).resolve() + config = load_json(repo / ".mosaic" / "orchestrator" / "config.json", {}) + if not config.get("enabled", False): + print("[mosaic-orch-matrix] disabled in config (enabled=false)") + return 0 + + if str(config.get("transport", "")).strip() != "matrix": + print("[mosaic-orch-matrix] config transport != matrix; nothing to do") + return 0 + + if args.mode == "publish": + count = publish_events(repo, config) + print(f"[mosaic-orch-matrix] published_events={count}") + return 0 + + count = consume_tasks(repo, config) + print(f"[mosaic-orch-matrix] consumed_tasks={count}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/templates/repo/.mosaic/README.md b/templates/repo/.mosaic/README.md index f61cf37..49f48d6 100644 --- a/templates/repo/.mosaic/README.md +++ b/templates/repo/.mosaic/README.md @@ -45,3 +45,10 @@ Run continuously: ```bash ~/.mosaic/bin/mosaic-orchestrator-run --poll-sec 10 ``` + +Bridge events to Matrix: + +```bash +~/.mosaic/bin/mosaic-orchestrator-matrix-publish +~/.mosaic/bin/mosaic-orchestrator-matrix-consume +``` diff --git a/templates/repo/.mosaic/orchestrator/config.json b/templates/repo/.mosaic/orchestrator/config.json index 8ede0fb..3cf84b0 100644 --- a/templates/repo/.mosaic/orchestrator/config.json +++ b/templates/repo/.mosaic/orchestrator/config.json @@ -4,7 +4,9 @@ "matrix": { "control_room_id": "", "workspace_id": "", - "homeserver_url": "" + "homeserver_url": "", + "access_token": "", + "bot_user_id": "" }, "worker": { "runtime": "codex", diff --git a/templates/repo/.mosaic/orchestrator/matrix_state.json b/templates/repo/.mosaic/orchestrator/matrix_state.json new file mode 100644 index 0000000..4ca7856 --- /dev/null +++ b/templates/repo/.mosaic/orchestrator/matrix_state.json @@ -0,0 +1,4 @@ +{ + "last_published_line": 0, + "since": null +}