add matrix publish/consume transport for orchestrator rail

This commit is contained in:
Jason Woltje
2026-02-17 13:35:25 -06:00
parent 4c7c314aac
commit d7d5415bce
11 changed files with 266 additions and 2 deletions

View File

@@ -138,6 +138,8 @@ Run from a bootstrapped repo:
```bash
~/.mosaic/bin/mosaic-orchestrator-run --once
~/.mosaic/bin/mosaic-orchestrator-run --poll-sec 10
~/.mosaic/bin/mosaic-orchestrator-matrix-publish
~/.mosaic/bin/mosaic-orchestrator-matrix-consume
```
The controller reads/writes repo-local state in `.mosaic/orchestrator/` and emits

View File

@@ -58,6 +58,7 @@ copy_file "$TEMPLATE_ROOT/.mosaic/quality-rails.yml" "$TARGET_DIR/.mosaic/qualit
copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/config.json" "$TARGET_DIR/.mosaic/orchestrator/config.json"
copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/tasks.json" "$TARGET_DIR/.mosaic/orchestrator/tasks.json"
copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/state.json" "$TARGET_DIR/.mosaic/orchestrator/state.json"
copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/matrix_state.json" "$TARGET_DIR/.mosaic/orchestrator/matrix_state.json"
copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/logs/.gitkeep" "$TARGET_DIR/.mosaic/orchestrator/logs/.gitkeep"
copy_file "$TEMPLATE_ROOT/.mosaic/orchestrator/results/.gitkeep" "$TARGET_DIR/.mosaic/orchestrator/results/.gitkeep"

View File

@@ -126,6 +126,9 @@ expect_file "$MOSAIC_HOME/bin/mosaic-sync-skills"
expect_file "$MOSAIC_HOME/bin/mosaic-quality-apply"
expect_file "$MOSAIC_HOME/bin/mosaic-quality-verify"
expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-run"
expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-matrix-publish"
expect_file "$MOSAIC_HOME/bin/mosaic-orchestrator-matrix-consume"
expect_file "$MOSAIC_HOME/rails/orchestrator-matrix/transport/matrix_transport.py"
# Claude runtime file checks (copied, non-symlink).
for rf in CLAUDE.md settings.json hooks-config.json context7-integration.md; do

View File

@@ -0,0 +1,12 @@
#!/usr/bin/env bash
set -euo pipefail
MOSAIC_HOME="${MOSAIC_HOME:-$HOME/.mosaic}"
BRIDGE="$MOSAIC_HOME/rails/orchestrator-matrix/transport/matrix_transport.py"
if [[ ! -f "$BRIDGE" ]]; then
echo "[mosaic-orch-matrix] missing transport bridge: $BRIDGE" >&2
exit 1
fi
exec python3 "$BRIDGE" --repo "$(pwd)" --mode consume "$@"

View File

@@ -0,0 +1,12 @@
#!/usr/bin/env bash
set -euo pipefail
MOSAIC_HOME="${MOSAIC_HOME:-$HOME/.mosaic}"
BRIDGE="$MOSAIC_HOME/rails/orchestrator-matrix/transport/matrix_transport.py"
if [[ ! -f "$BRIDGE" ]]; then
echo "[mosaic-orch-matrix] missing transport bridge: $BRIDGE" >&2
exit 1
fi
exec python3 "$BRIDGE" --repo "$(pwd)" --mode publish "$@"

View File

@@ -44,7 +44,26 @@ Continuous loop:
~/.mosaic/bin/mosaic-orchestrator-run --poll-sec 10
```
Publish new orchestrator events to Matrix:
```bash
~/.mosaic/bin/mosaic-orchestrator-matrix-publish
```
Consume Matrix task messages into `tasks.json`:
```bash
~/.mosaic/bin/mosaic-orchestrator-matrix-consume
```
## Matrix Note
This rail writes canonical events to `.mosaic/orchestrator/events.ndjson`.
Matrix bridge services can consume and relay these events to Matrix rooms.
The Matrix transport bridge publishes those events into the configured control room
and can consume task commands from that room.
Task injection message format (room text):
```text
!mosaic-task {"id":"TASK-123","title":"Fix bug","command":"echo run","quality_gates":["pnpm lint"]}
```

View File

@@ -0,0 +1,2 @@
__pycache__/
*.pyc

View File

@@ -0,0 +1,200 @@
#!/usr/bin/env python3
"""Matrix transport bridge for Mosaic orchestrator events/tasks."""
from __future__ import annotations
import argparse
import json
import pathlib
import urllib.parse
import urllib.request
import uuid
from typing import Any
def load_json(path: pathlib.Path, default: Any) -> Any:
if not path.exists():
return default
with path.open("r", encoding="utf-8") as f:
return json.load(f)
def save_json(path: pathlib.Path, data: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
with tmp.open("w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
f.write("\n")
tmp.replace(path)
def matrix_request(
homeserver: str,
access_token: str,
method: str,
path: str,
payload: dict[str, Any] | None = None,
) -> dict[str, Any]:
url = homeserver.rstrip("/") + path
body = None
headers = {"Authorization": f"Bearer {access_token}"}
if payload is not None:
body = json.dumps(payload, ensure_ascii=True).encode("utf-8")
headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, method=method, data=body, headers=headers)
with urllib.request.urlopen(req, timeout=30) as resp:
raw = resp.read().decode("utf-8")
return json.loads(raw) if raw else {}
def matrix_send_message(homeserver: str, access_token: str, room_id: str, message: str) -> None:
txn = str(uuid.uuid4())
path = f"/_matrix/client/v3/rooms/{urllib.parse.quote(room_id, safe='')}/send/m.room.message/{txn}"
matrix_request(
homeserver,
access_token,
"PUT",
path,
{"msgtype": "m.text", "body": message},
)
def format_event_message(event: dict[str, Any]) -> str:
et = event.get("event_type", "unknown")
tid = event.get("task_id", "unknown")
status = event.get("status", "unknown")
msg = event.get("message", "")
return f"[mosaic-orch] {et} task={tid} status={status} :: {msg}"
def publish_events(repo: pathlib.Path, config: dict[str, Any]) -> int:
orch = repo / ".mosaic" / "orchestrator"
events_path = orch / "events.ndjson"
bridge_state_path = orch / "matrix_state.json"
state = load_json(bridge_state_path, {"last_published_line": 0, "since": None})
homeserver = str(config.get("matrix", {}).get("homeserver_url", "")).strip()
token = str(config.get("matrix", {}).get("access_token", "")).strip()
room_id = str(config.get("matrix", {}).get("control_room_id", "")).strip()
if not homeserver or not token or not room_id:
raise ValueError("matrix homeserver_url, access_token, and control_room_id are required")
if not events_path.exists():
return 0
lines = events_path.read_text(encoding="utf-8").splitlines()
start = int(state.get("last_published_line", 0))
published = 0
for idx, line in enumerate(lines[start:], start=start + 1):
if not line.strip():
continue
event = json.loads(line)
matrix_send_message(homeserver, token, room_id, format_event_message(event))
state["last_published_line"] = idx
published += 1
save_json(bridge_state_path, state)
return published
def parse_task_command(body: str) -> dict[str, Any] | None:
raw = body.strip()
if raw.startswith("!mosaic-task "):
payload = raw[len("!mosaic-task ") :].strip()
elif raw.startswith("@mosaic task "):
payload = raw[len("@mosaic task ") :].strip()
else:
return None
task = json.loads(payload)
if not isinstance(task, dict):
raise ValueError("task payload must be a JSON object")
if "id" not in task or "title" not in task:
raise ValueError("task payload requires id and title")
task.setdefault("status", "pending")
return task
def consume_tasks(repo: pathlib.Path, config: dict[str, Any]) -> int:
orch = repo / ".mosaic" / "orchestrator"
tasks_path = orch / "tasks.json"
bridge_state_path = orch / "matrix_state.json"
state = load_json(bridge_state_path, {"last_published_line": 0, "since": None})
tasks = load_json(tasks_path, {"tasks": []})
task_items = tasks.get("tasks", [])
if not isinstance(task_items, list):
raise ValueError("tasks.json must contain {'tasks': [...]} structure")
homeserver = str(config.get("matrix", {}).get("homeserver_url", "")).strip()
token = str(config.get("matrix", {}).get("access_token", "")).strip()
room_id = str(config.get("matrix", {}).get("control_room_id", "")).strip()
bot_user_id = str(config.get("matrix", {}).get("bot_user_id", "")).strip()
if not homeserver or not token or not room_id:
raise ValueError("matrix homeserver_url, access_token, and control_room_id are required")
since = state.get("since")
path = "/_matrix/client/v3/sync?timeout=1"
if since:
path += "&since=" + urllib.parse.quote(str(since), safe="")
sync = matrix_request(homeserver, token, "GET", path)
if "next_batch" in sync:
state["since"] = sync["next_batch"]
room_timeline = (
sync.get("rooms", {})
.get("join", {})
.get(room_id, {})
.get("timeline", {})
.get("events", [])
)
added = 0
existing = {str(t.get("id")) for t in task_items if isinstance(t, dict)}
for evt in room_timeline:
if evt.get("type") != "m.room.message":
continue
sender = str(evt.get("sender", ""))
if bot_user_id and sender == bot_user_id:
continue
body = str(evt.get("content", {}).get("body", ""))
task = parse_task_command(body)
if not task:
continue
task_id = str(task.get("id"))
if task_id in existing:
continue
task_items.append(task)
existing.add(task_id)
added += 1
save_json(tasks_path, {"tasks": task_items})
save_json(bridge_state_path, state)
return added
def main() -> int:
p = argparse.ArgumentParser(description="Mosaic Matrix transport bridge")
p.add_argument("--repo", default=".", help="Repository root")
p.add_argument("--mode", required=True, choices=["publish", "consume"], help="Bridge mode")
args = p.parse_args()
repo = pathlib.Path(args.repo).resolve()
config = load_json(repo / ".mosaic" / "orchestrator" / "config.json", {})
if not config.get("enabled", False):
print("[mosaic-orch-matrix] disabled in config (enabled=false)")
return 0
if str(config.get("transport", "")).strip() != "matrix":
print("[mosaic-orch-matrix] config transport != matrix; nothing to do")
return 0
if args.mode == "publish":
count = publish_events(repo, config)
print(f"[mosaic-orch-matrix] published_events={count}")
return 0
count = consume_tasks(repo, config)
print(f"[mosaic-orch-matrix] consumed_tasks={count}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -45,3 +45,10 @@ Run continuously:
```bash
~/.mosaic/bin/mosaic-orchestrator-run --poll-sec 10
```
Bridge events to Matrix:
```bash
~/.mosaic/bin/mosaic-orchestrator-matrix-publish
~/.mosaic/bin/mosaic-orchestrator-matrix-consume
```

View File

@@ -4,7 +4,9 @@
"matrix": {
"control_room_id": "",
"workspace_id": "",
"homeserver_url": ""
"homeserver_url": "",
"access_token": "",
"bot_user_id": ""
},
"worker": {
"runtime": "codex",

View File

@@ -0,0 +1,4 @@
{
"last_published_line": 0,
"since": null
}