From 63c30b564de863dfa12a85f47ffedb21e74ba64c Mon Sep 17 00:00:00 2001 From: Jarvis Date: Fri, 27 Mar 2026 21:06:22 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20add=20MACP=20event=20bridge=20=E2=80=94?= =?UTF-8?q?=20watcher,=20webhook,=20Discord=20formatter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bin/mosaic-macp | 67 ++++++++ docs/DEVELOPER-GUIDE/README.md | 1 + .../orchestrator-matrix/macp-phase2a.md | 45 ++++++ docs/PRD.md | 103 ++++++------ docs/SITEMAP.md | 2 + docs/TASKS.md | 2 +- docs/scratchpads/macp-phase2a.md | 61 +++++++ docs/tasks/MACP-PHASE2A-brief.md | 152 ++++++++++++++++++ tools/orchestrator-matrix/README.md | 6 + tools/orchestrator-matrix/events/__init__.py | 15 ++ .../__pycache__/__init__.cpython-312.pyc | Bin 0 -> 506 bytes .../discord_formatter.cpython-312.pyc | Bin 0 -> 6321 bytes .../__pycache__/event_watcher.cpython-312.pyc | Bin 0 -> 8097 bytes .../webhook_adapter.cpython-312.pyc | Bin 0 -> 4313 bytes .../events/discord_formatter.py | 125 ++++++++++++++ .../events/event_watcher.py | 144 +++++++++++++++++ .../events/webhook_adapter.py | 66 ++++++++ 17 files changed, 736 insertions(+), 53 deletions(-) create mode 100644 docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md create mode 100644 docs/scratchpads/macp-phase2a.md create mode 100644 docs/tasks/MACP-PHASE2A-brief.md create mode 100644 tools/orchestrator-matrix/events/__init__.py create mode 100644 tools/orchestrator-matrix/events/__pycache__/__init__.cpython-312.pyc create mode 100644 tools/orchestrator-matrix/events/__pycache__/discord_formatter.cpython-312.pyc create mode 100644 tools/orchestrator-matrix/events/__pycache__/event_watcher.cpython-312.pyc create mode 100644 tools/orchestrator-matrix/events/__pycache__/webhook_adapter.cpython-312.pyc create mode 100644 tools/orchestrator-matrix/events/discord_formatter.py create mode 100644 tools/orchestrator-matrix/events/event_watcher.py create mode 100644 tools/orchestrator-matrix/events/webhook_adapter.py diff --git a/bin/mosaic-macp b/bin/mosaic-macp index 413ab55..43024fa 100755 --- a/bin/mosaic-macp +++ b/bin/mosaic-macp @@ -16,6 +16,7 @@ Usage: mosaic macp status [--task-id TASK-001] mosaic macp drain mosaic macp history [--task-id TASK-001] + mosaic macp watch [--webhook] [--once] USAGE } @@ -193,6 +194,71 @@ for line in events_path.read_text(encoding="utf-8").splitlines(): PY } +watch_events() { + require_repo + local webhook_enabled="false" + local run_once="false" + while [[ $# -gt 0 ]]; do + case "$1" in + --webhook) webhook_enabled="true"; shift ;; + --once) run_once="true"; shift ;; + -h|--help) usage; exit 0 ;; + *) echo "[mosaic-macp] unknown watch option: $1" >&2; exit 1 ;; + esac + done + + python3 - "$repo_root" "$config_path" "$events_path" "$orch_dir/event_cursor.json" "$webhook_enabled" "$run_once" <<'PY' +import json +import pathlib +import sys + +repo_root = pathlib.Path(sys.argv[1]).resolve() +config_path = pathlib.Path(sys.argv[2]).resolve() +events_path = pathlib.Path(sys.argv[3]).resolve() +cursor_path = pathlib.Path(sys.argv[4]).resolve() +webhook_flag = sys.argv[5].lower() == "true" +run_once = sys.argv[6].lower() == "true" + +events_dir = repo_root / "tools" / "orchestrator-matrix" / "events" +if str(events_dir) not in sys.path: + sys.path.insert(0, str(events_dir)) + +from discord_formatter import format_event +from event_watcher import EventWatcher +from webhook_adapter import create_webhook_callback + +config = {} +if config_path.exists(): + config = json.loads(config_path.read_text(encoding="utf-8")) + +macp = dict(config.get("macp") or {}) +watcher = EventWatcher( + events_path=events_path, + cursor_path=cursor_path, + poll_interval=float(macp.get("watch_poll_interval_seconds") or 2.0), +) + +def print_callback(event: dict) -> None: + rendered = format_event(event) + if rendered: + print(rendered) + +watcher.on([], print_callback) + +webhook_config = dict(macp.get("webhook") or {}) +if webhook_flag and bool(webhook_config.get("enabled", False)): + watcher.on(list(webhook_config.get("event_filter") or []), create_webhook_callback(config)) +elif webhook_flag: + print("[mosaic-macp] webhook requested but disabled in config", file=sys.stderr) + +if run_once: + processed = watcher.poll_once() + print(f"[mosaic-macp] processed_events={len(processed)}") +else: + watcher.run() +PY +} + subcommand="${1:-help}" if [[ $# -gt 0 ]]; then shift @@ -203,6 +269,7 @@ case "$subcommand" in status) status_tasks "$@" ;; drain) drain_tasks "$@" ;; history) history_tasks "$@" ;; + watch) watch_events "$@" ;; help|-h|--help|"") usage ;; *) echo "[mosaic-macp] unknown subcommand: $subcommand" >&2 diff --git a/docs/DEVELOPER-GUIDE/README.md b/docs/DEVELOPER-GUIDE/README.md index 148140e..a66ae35 100644 --- a/docs/DEVELOPER-GUIDE/README.md +++ b/docs/DEVELOPER-GUIDE/README.md @@ -3,3 +3,4 @@ ## Orchestrator Matrix - [MACP Phase 1](./orchestrator-matrix/macp-phase1.md) +- [MACP Phase 2A](./orchestrator-matrix/macp-phase2a.md) diff --git a/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md new file mode 100644 index 0000000..443a549 --- /dev/null +++ b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md @@ -0,0 +1,45 @@ +# MACP Phase 2A + +MACP Phase 2A adds the repo-local event bridge that makes orchestrator lifecycle events consumable by external systems. + +## What Changed + +1. `tools/orchestrator-matrix/events/event_watcher.py` polls `.mosaic/orchestrator/events.ndjson`, parses appended NDJSON events, dispatches callbacks, and persists a byte-offset cursor in `.mosaic/orchestrator/event_cursor.json`. +2. `tools/orchestrator-matrix/events/webhook_adapter.py` forwards selected MACP events to a configured webhook endpoint with bounded retries and optional bearer auth. +3. `tools/orchestrator-matrix/events/discord_formatter.py` renders task lifecycle events into concise Discord-friendly status lines. +4. `bin/mosaic-macp` adds `watch` mode for one-shot or continuous event processing. + +## Watcher Behavior + +1. File watching is polling-based and stdlib-only for portability. +2. The watcher resets its cursor if the events file is truncated. +3. Corrupt JSON lines are logged to stderr and skipped. +4. A trailing partial line is left unread until the newline arrives, preventing half-written events from being consumed. + +## Webhook Configuration + +Configure `.mosaic/orchestrator/config.json` under `macp.webhook`: + +```json +{ + "macp": { + "webhook": { + "enabled": false, + "url": "http://localhost:8080/macp/events", + "auth_token": "", + "timeout_seconds": 10, + "retry_count": 2, + "event_filter": ["task.completed", "task.failed", "task.escalated"] + } + } +} +``` + +## CLI + +```bash +mosaic macp watch --once +mosaic macp watch --webhook +``` + +`--once` performs a single poll and exits. `--webhook` enables delivery via the configured `macp.webhook` block while still printing Discord-formatted event lines to stdout. diff --git a/docs/PRD.md b/docs/PRD.md index be490b8..57408ed 100644 --- a/docs/PRD.md +++ b/docs/PRD.md @@ -1,4 +1,4 @@ -# PRD: MACP Phase 1 Core Protocol Implementation +# PRD: MACP Phase 2A Event Bridge + Notification System ## Metadata @@ -9,90 +9,89 @@ ## Problem Statement -The current orchestrator-matrix rail can queue shell-based worker tasks, but it does not yet expose a standardized protocol for dispatch selection, worktree-aware execution, structured results, or manual MACP queue operations. MACP Phase 1 extends the existing rail so orchestrators can delegate to multiple runtimes through a consistent task model while preserving current behavior for legacy tasks. +MACP Phase 1 writes structured lifecycle events to `.mosaic/orchestrator/events.ndjson`, but no repo-local bridge consumes those events for external systems. Phase 2A adds a portable watcher, webhook delivery, and Discord-friendly formatting so MACP event streams can drive OpenClaw integrations and human-facing notifications. ## Objectives -1. Extend the existing orchestrator-matrix protocol and controller to support MACP-aware task dispatch and status tracking. -2. Add a dispatcher layer that manages worktree lifecycle, runtime command generation, and standardized results. -3. Provide a CLI entrypoint for manual MACP submission, status inspection, queue draining, and history review. +1. Add a synchronous event watcher that tails `events.ndjson` using stdlib-only file polling and persists cursor state across restarts. +2. Add a webhook adapter that can forward selected MACP events to a configured HTTP endpoint with bounded retries. +3. Add a Discord formatter that turns task lifecycle events into concise human-readable strings. +4. Extend the `mosaic macp` CLI with a `watch` command for one-shot or continuous event bridge execution. ## Scope ### In Scope -1. Extend the orchestrator task and event schemas and add a result schema. -2. Add a Python dispatcher module under `tools/orchestrator-matrix/dispatcher/`. -3. Update the controller to use the dispatcher for MACP-aware tasks while preserving legacy execution paths. -4. Update orchestrator config templates, task markdown sync logic, and CLI routing/scripts for MACP commands. -5. Add verification for backward compatibility, schema validity, imports, and basic MACP execution flow. +1. New `tools/orchestrator-matrix/events/` package with watcher, webhook adapter, and Discord formatter modules. +2. Cursor persistence at `.mosaic/orchestrator/event_cursor.json`. +3. `mosaic macp watch [--webhook] [--once]` CLI support using `.mosaic/orchestrator/config.json`. +4. Stdlib-only verification of watcher polling, webhook delivery, Discord formatting, CLI watch behavior, and cursor persistence. +5. Developer documentation and sitemap updates covering the Phase 2A event bridge. ### Out of Scope -1. Rewriting the orchestrator controller architecture. -2. Changing Matrix transport behavior beyond schema compatibility. -3. Implementing real OpenClaw `sessions_spawn` execution beyond producing the config payload/command for callers. -4. Adding non-stdlib Python dependencies or npm-based tooling. +1. Adding Discord transport or webhook server hosting inside this repository. +2. Replacing the existing Matrix transport bridge. +3. Introducing async, threads, or third-party Python packages. +4. Changing event emission behavior in the controller beyond consuming the existing event stream. ## User/Stakeholder Requirements -1. MACP must evolve the current orchestrator-matrix implementation rather than replace it. -2. Legacy task queues without `dispatch` fields must continue to run exactly as before. -3. MACP-aware tasks must support dispatch modes `yolo`, `acp`, and `exec`. -4. Results must be written in a structured JSON format suitable for audit and orchestration follow-up. -5. A manual `mosaic macp` CLI must expose submit, status, drain, and history flows. +1. External systems must be able to consume MACP events without reading the NDJSON file directly. +2. The watcher must remain portable across environments, so file polling is required instead of platform-specific file watching. +3. Restarting the watcher must not replay previously consumed events. +4. Webhook delivery failures must be logged and isolated so the watcher loop continues running. +5. Discord formatting must stay concise and useful for task lifecycle visibility. ## Functional Requirements -1. Task schema must include MACP dispatch, worktree, result, retry, branch, brief, issue/PR, and dependency fields. -2. Event schema must recognize `task.gated`, `task.escalated`, and `task.retry.scheduled`, plus a `dispatcher` source. -3. Dispatcher functions must set up worktrees, build commands, execute tasks, collect results, and clean up worktrees. -4. Controller `run_single_task()` must route MACP-aware tasks through the dispatcher and emit the correct lifecycle events/status transitions. -5. `tasks_md_sync.py` must map optional MACP table columns only when those headers are present in `docs/TASKS.md`; absent MACP headers must not inject MACP fields into legacy tasks. -6. `bin/mosaic` must route `mosaic macp ...` to a new `bin/mosaic-macp` script. +1. `EventWatcher` must watch `.mosaic/orchestrator/events.ndjson`, parse appended JSON lines, and invoke registered callbacks for matching event types. +2. `EventWatcher.poll_once()` must tolerate a missing events file, truncated/corrupt lines, and cursor positions that are stale after file truncation. +3. Cursor writes must be atomic and stored at `.mosaic/orchestrator/event_cursor.json`. +4. `send_webhook(event, config)` must POST JSON to the configured URL using `urllib.request`, optionally adding a bearer token, respecting timeout, and retrying with exponential backoff. +5. `create_webhook_callback(config)` must return a callback that swallows/logs failures instead of raising into the watcher loop. +6. `format_event(event)` must support `task.completed`, `task.failed`, `task.escalated`, `task.gated`, and `task.started`, including useful task metadata when present. +7. `format_summary(events)` must produce a short batch summary suitable for notification digests. +8. `bin/mosaic-macp` must expose `watch`, optionally enabling webhook delivery from config, and support one-shot polling with `--once`. ## Non-Functional Requirements -1. Security: no secrets embedded in generated commands, config, or results. -2. Performance: controller remains deterministic and synchronous with no async or thread-based orchestration. -3. Reliability: worktree creation/cleanup failures must be surfaced predictably and produce structured task failure/escalation states. -4. Observability: lifecycle events, logs, and result JSON must clearly show task outcome, attempts, gates, and errors. +1. Security: no secrets embedded in code or logs; auth token only sent via header when configured. +2. Performance: each webhook attempt must be bounded by `timeout_seconds`; no event-processing path may hang indefinitely. +3. Reliability: corrupt input lines and callback delivery failures must be logged to stderr and skipped without crashing the watcher. +4. Portability: Python 3.10+ stdlib only; no OS-specific file watcher APIs. +5. Observability: warnings and failures must be clear enough to diagnose cursor, parsing, and webhook problems. ## Acceptance Criteria -1. Existing legacy tasks without `dispatch` still run through the old shell path with unchanged behavior. -2. MACP-aware `exec` tasks run through the dispatcher and produce result JSON with gate outcomes. -3. New schemas validate task/event/result payload expectations for MACP fields and statuses. -4. `mosaic macp submit`, `status`, and `history` work from a bootstrapped repo state, and `drain` delegates to the existing orchestrator runner. -5. Python imports for the updated controller, dispatcher, and sync code complete without errors on Python 3.10+. +1. `EventWatcher.poll_once()` reads newly appended events, returns parsed dicts, invokes registered callbacks, and skips already-consumed events after restart. +2. Webhook delivery posts matching events to a local test endpoint, supports bearer auth configuration, and retries boundedly on failure. +3. Discord formatter returns expected concise strings for the required task lifecycle event types and a usable batch summary. +4. `mosaic macp watch --once` processes events from a bootstrapped repo state without error and honors `--webhook`. +5. Cursor persistence prevents replay on a second run and resets safely when the events file is truncated. ## Constraints and Dependencies 1. Python implementation must use stdlib only and support Python 3.10+. -2. Shell tooling must remain bash-based and fit the existing Mosaic CLI style. -3. Dispatch fallback rules must use `exec` when `dispatch` is absent and config/default runtime when `runtime` is absent. -4. Worktree convention must derive from the repository name and task metadata unless explicitly overridden by task fields. +2. Shell CLI behavior must remain bash-based and consistent with the existing Mosaic command style. +3. The watcher consumes the event schema already emitted by Phase 1 controller logic. +4. Webhook configuration lives under `.mosaic/orchestrator/config.json` at `macp.webhook`. ## Risks and Open Questions -1. Risk: yolo command execution requires a PTY, so the dispatcher needs a safe wrapper that still behaves under `subprocess`. -2. Risk: worktree cleanup could remove a path unexpectedly if task metadata is malformed. -3. Risk: old queue consumers may assume only the original task statuses and event types. -4. Open Question: whether `task.gated` should be emitted by the dispatcher or controller once worker execution ends and quality gates begin. +1. Risk: partial writes may leave an incomplete trailing JSON line that must not advance the cursor incorrectly. +2. Risk: synchronous webhook retries can slow one poll cycle if the endpoint is unavailable; timeout and retry behavior must remain bounded. +3. Risk: event payloads may omit optional metadata fields, so formatter output must degrade cleanly. +4. ASSUMPTION: the watcher should advance past corrupt lines after logging them so a single bad line does not permanently stall downstream consumption. +5. ASSUMPTION: CLI `watch` should default to no-op callback processing when no delivery option is enabled, while still updating the cursor and reporting processed count. ## Testing and Verification Expectations -1. Baseline checks: Python import validation, targeted script execution checks, JSON syntax/schema validation, and any repo-local validation applicable to changed code paths. -2. Situational testing: legacy orchestrator run with old-style tasks, MACP `exec` flow including result file generation, CLI submit/status/history behavior, and worktree lifecycle validation. -3. Evidence format: command-level results captured in the scratchpad and summarized in the final delivery report. +1. Baseline checks: Python bytecode compilation/import validation for new modules and shell syntax validation for `bin/mosaic-macp`. +2. Situational tests: temporary orchestrator state exercising watcher polling, callback filtering, webhook POST capture, CLI one-shot watch execution, and cursor persistence across repeated runs. +3. Evidence format: command-level results recorded in the scratchpad and summarized against acceptance criteria. ## Milestone / Delivery Intent -1. Target milestone/version: 0.0.x bootstrap enhancement -2. Definition of done: code merged to `main`, CI terminal green, issue `#8` closed, and verification evidence recorded against all acceptance criteria. - -## Assumptions - -1. ASSUMPTION: A single issue can track the full Phase 1 implementation because the user requested one bounded feature delivery rather than separate independent tickets. -2. ASSUMPTION: For `acp` dispatch in Phase 1, the controller must escalate the task immediately with a clear reason instead of pretending work ran before OpenClaw integration exists. -3. ASSUMPTION: `task.gated` should be emitted by the controller as the transition into quality-gate execution, which keeps gate-state ownership in one place alongside the existing gate loop. +1. Target milestone/version: Phase 2A observability bridge +2. Definition of done: code merged to `main`, CI terminal green, issue `#10` closed, and verification evidence recorded against all acceptance criteria. diff --git a/docs/SITEMAP.md b/docs/SITEMAP.md index c4e8b1c..1694313 100644 --- a/docs/SITEMAP.md +++ b/docs/SITEMAP.md @@ -4,4 +4,6 @@ - [Tasks](./TASKS.md) - [Developer Guide](./DEVELOPER-GUIDE/README.md) - [Task Briefs](./tasks/MACP-PHASE1-brief.md) +- [Task Briefs](./tasks/MACP-PHASE2A-brief.md) - [Scratchpads](./scratchpads/macp-phase1.md) +- [Scratchpads](./scratchpads/macp-phase2a.md) diff --git a/docs/TASKS.md b/docs/TASKS.md index 2578549..9c17d20 100644 --- a/docs/TASKS.md +++ b/docs/TASKS.md @@ -14,4 +14,4 @@ Canonical tracking for active work. Keep this file current. | id | status | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes | |---|---|---|---|---|---|---|---|---|---|---|---|---|---| -| MACP-PHASE1 | in-progress | Implement MACP Phase 1 across orchestrator schemas, dispatcher, controller, CLI, config, and task sync while preserving legacy queue behavior. | #8 | bootstrap | feat/macp-phase1 | | | Jarvis | 2026-03-27T23:00:00Z | | medium | in-progress | Review-fix pass started 2026-03-28T00:38:01Z to address backward-compatibility, ACP safety, result timing, and worktree/brief security findings on top of the blocked PR-create state. Prior blocker remains: `~/.config/mosaic/tools/git/pr-create.sh -t 'feat: implement MACP phase 1 core protocol' -b ... -B main -H feat/macp-phase1` failed with `Remote repository required: Specify ID via --repo or execute from a local git repo.` | +| MACP-PHASE2A | in-progress | Build the MACP event bridge with event watcher, webhook adapter, Discord formatter, CLI watch wiring, docs updates, and verification evidence. | #10 | bootstrap | feat/macp-phase2a | | | Jarvis | 2026-03-28T02:02:38Z | | medium | in-progress | Issue created via `~/.config/mosaic/tools/git/issue-create.sh` fallback after `tea` reported `Remote repository required: Specify ID via --repo or execute from a local git repo.` | diff --git a/docs/scratchpads/macp-phase2a.md b/docs/scratchpads/macp-phase2a.md new file mode 100644 index 0000000..8aae4b4 --- /dev/null +++ b/docs/scratchpads/macp-phase2a.md @@ -0,0 +1,61 @@ +# MACP Phase 2A Scratchpad + +## Session Start + +- Session date: 2026-03-27 / 2026-03-28 America/Chicago +- Branch: `feat/macp-phase2a` +- Issue: `#10` +- Objective: build the MACP event bridge and notification system from `docs/tasks/MACP-PHASE2A-brief.md` + +## Budget + +- Budget mode: inferred working budget +- Estimate: medium +- Token strategy: keep context narrow to event-bridge files, verify with targeted temp-repo tests, avoid unnecessary parallel deep dives + +## Requirements Notes + +- PRD updated to Phase 2A before coding +- TDD requirement: not mandatory for this feature work; targeted verification is sufficient because this is new observability functionality rather than a bug fix or auth/data-mutation change +- Documentation gate applies because developer-facing behavior and CLI surface change + +## Assumptions + +1. ASSUMPTION: corrupt or partial lines should be logged and skipped while still advancing the cursor past the offending line, preventing permanent replay loops. +2. ASSUMPTION: `mosaic macp watch` may run without webhook delivery enabled and should still process events plus persist cursor state. +3. ASSUMPTION: Discord formatting remains a pure formatting layer; no outbound Discord transport is part of Phase 2A. + +## Plan + +1. Update PRD/TASKS and create the Phase 2A issue/scratchpad. +2. Implement watcher, webhook adapter, formatter, and CLI wiring. +3. Update developer docs and sitemap. +4. Run baseline and situational verification. +5. Run independent code review, remediate findings, then commit/push/PR/merge/CI/issue-close. + +## Progress Log + +- 2026-03-28T02:02:38Z: Created provider issue `#10` for Phase 2A using Mosaic wrapper with Gitea API fallback. +- 2026-03-28T02:02:38Z: Replaced stale Phase 1 PRD/TASKS planning state with Phase 2A scope and tracking. + +## Verification Plan + +| Acceptance Criterion | Verification Method | Evidence | +|---|---|---| +| AC-1 watcher polls new events and respects cursor | Temp events file + repeated `poll_once()` / CLI runs | pending | +| AC-2 webhook delivery retries and succeeds/fails cleanly | Local stdlib echo server capture | pending | +| AC-3 Discord formatting covers required event types | Targeted Python formatter check | pending | +| AC-4 `mosaic macp watch --once` runs cleanly | CLI one-shot execution in temp repo | pending | +| AC-5 cursor persistence handles repeat run and truncation | Temp repo repeated runs with truncated file scenario | pending | + +## Tests Run + +- pending + +## Review Notes + +- pending + +## Risks / Blockers + +- Potential git wrapper friction in worktrees for PR creation/merge steps; if it recurs, capture exact failing command and stop per Mosaic contract. diff --git a/docs/tasks/MACP-PHASE2A-brief.md b/docs/tasks/MACP-PHASE2A-brief.md new file mode 100644 index 0000000..378b9a1 --- /dev/null +++ b/docs/tasks/MACP-PHASE2A-brief.md @@ -0,0 +1,152 @@ +# MACP Phase 2A — Event Bridge + Notification System + +**Branch:** `feat/macp-phase2a` +**Repo worktree:** `~/src/mosaic-bootstrap-worktrees/macp-phase2a` + +--- + +## Objective + +Build the event bridge that makes MACP events consumable by external systems (OpenClaw, Discord, webhooks). This is the observability layer — the controller already writes events to `events.ndjson`, but nothing reads them yet. + +--- + +## Task 1: Event File Watcher (`tools/orchestrator-matrix/events/event_watcher.py`) + +New Python module that tails `events.ndjson` and fires callbacks on new events. + +### Requirements: +- Watch `.mosaic/orchestrator/events.ndjson` for new lines (use file polling, not inotify — keeps it portable) +- Parse each new line as JSON +- Call registered callback functions with the parsed event +- Support filtering by event type (e.g., only `task.completed` and `task.failed`) +- Maintain a cursor (last read position) so restarts don't replay old events +- Cursor stored in `.mosaic/orchestrator/event_cursor.json` + +### Key Functions: +```python +class EventWatcher: + def __init__(self, events_path: Path, cursor_path: Path, poll_interval: float = 2.0): + ... + + def on(self, event_types: list[str], callback: Callable[[dict], None]) -> None: + """Register a callback for specific event types.""" + + def poll_once(self) -> list[dict]: + """Read new events since last cursor position. Returns list of new events.""" + + def run(self, max_iterations: int = 0) -> None: + """Polling loop. max_iterations=0 means infinite.""" +``` + +### Constraints: +- Python 3.10+ stdlib only (no pip dependencies) +- Must handle truncated/corrupt lines gracefully (skip, log warning) +- File might not exist yet — handle gracefully +- Thread-safe cursor updates (atomic write via temp file rename) + +--- + +## Task 2: Webhook Adapter (`tools/orchestrator-matrix/events/webhook_adapter.py`) + +POST events to a configurable URL. This is how the OC plugin will consume MACP events. + +### Requirements: +- Accept an event dict, POST it as JSON to a configured URL +- Support optional `Authorization` header (bearer token) +- Configurable from `.mosaic/orchestrator/config.json` under `macp.webhook`: + ```json + { + "macp": { + "webhook": { + "enabled": false, + "url": "http://localhost:8080/macp/events", + "auth_token": "", + "timeout_seconds": 10, + "retry_count": 2, + "event_filter": ["task.completed", "task.failed", "task.escalated"] + } + } + } + ``` +- Retry with exponential backoff on failure (configurable count) +- Log failures but don't crash the watcher +- Return success/failure status + +### Key Functions: +```python +def send_webhook(event: dict, config: dict) -> bool: + """POST event to webhook URL. Returns True on success.""" + +def create_webhook_callback(config: dict) -> Callable[[dict], None]: + """Factory that creates a watcher callback from config.""" +``` + +### Constraints: +- Use `urllib.request` only (no `requests` library) +- Must not block the event watcher for more than `timeout_seconds` per event +- Log to stderr on failure + +--- + +## Task 3: Discord Notification Formatter (`tools/orchestrator-matrix/events/discord_formatter.py`) + +Format MACP events into human-readable Discord messages. + +### Requirements: +- Format functions for each event type: + - `task.completed` → "✅ **Task TASK-001 completed** — Implement user auth (attempt 1/1, 45s)" + - `task.failed` → "❌ **Task TASK-001 failed** — Build error: exit code 1 (attempt 2/3)" + - `task.escalated` → "🚨 **Task TASK-001 escalated** — Gate failures after 3 attempts. Human review needed." + - `task.gated` → "🔍 **Task TASK-001 gated** — Quality gates running..." + - `task.started` → "⚙️ **Task TASK-001 started** — Worker: codex, dispatch: yolo" +- Include task metadata: runtime, dispatch type, attempt count, duration (if available) +- Keep messages concise — Discord has character limits +- Return plain strings (the caller decides where to send them) + +### Key Functions: +```python +def format_event(event: dict) -> str | None: + """Format an MACP event for Discord. Returns None for unformattable events.""" + +def format_summary(events: list[dict]) -> str: + """Format a batch summary (e.g., daily digest).""" +``` + +--- + +## Wiring: CLI Integration + +Add to `bin/mosaic-macp`: +```bash +mosaic macp watch [--webhook] [--once] +``` +- `watch`: Start the event watcher with configured callbacks +- `--webhook`: Enable webhook delivery (reads config from `.mosaic/orchestrator/config.json`) +- `--once`: Poll once and exit (useful for cron) + +--- + +## Verification + +1. Create a test `events.ndjson` with sample events, run `event_watcher.poll_once()`, verify all events returned +2. Run watcher with webhook pointing to a local echo server, verify POST payload +3. Format each event type through `discord_formatter`, verify output strings +4. `mosaic macp watch --once` processes events without error +5. Cursor persistence: run twice, second run returns no events + +## File Map +``` +tools/orchestrator-matrix/ +├── events/ +│ ├── __init__.py ← NEW +│ ├── event_watcher.py ← NEW +│ ├── webhook_adapter.py ← NEW +│ └── discord_formatter.py ← NEW +``` + +## Ground Rules +- Python 3.10+ stdlib only +- No async/threads — synchronous polling +- Commit: `feat: add MACP event bridge — watcher, webhook, Discord formatter` +- Push to `feat/macp-phase2a` diff --git a/tools/orchestrator-matrix/README.md b/tools/orchestrator-matrix/README.md index 0461c12..cf98a1d 100644 --- a/tools/orchestrator-matrix/README.md +++ b/tools/orchestrator-matrix/README.md @@ -109,4 +109,10 @@ mosaic macp submit ... mosaic macp status mosaic macp drain mosaic macp history --task-id TASK-001 +mosaic macp watch --once +mosaic macp watch --webhook ``` + +The Phase 2A event bridge consumes `.mosaic/orchestrator/events.ndjson` through a polling watcher, +persists cursor state in `.mosaic/orchestrator/event_cursor.json`, and can fan out events to +Discord-formatted stdout lines or webhook callbacks. diff --git a/tools/orchestrator-matrix/events/__init__.py b/tools/orchestrator-matrix/events/__init__.py new file mode 100644 index 0000000..fcac56e --- /dev/null +++ b/tools/orchestrator-matrix/events/__init__.py @@ -0,0 +1,15 @@ +"""Event bridge helpers for MACP orchestrator events.""" + +from .discord_formatter import format_event +from .discord_formatter import format_summary +from .event_watcher import EventWatcher +from .webhook_adapter import create_webhook_callback +from .webhook_adapter import send_webhook + +__all__ = [ + "EventWatcher", + "create_webhook_callback", + "format_event", + "format_summary", + "send_webhook", +] diff --git a/tools/orchestrator-matrix/events/__pycache__/__init__.cpython-312.pyc b/tools/orchestrator-matrix/events/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..58ea1d475b533b83efabb487ffa4c7cd546ec03b GIT binary patch literal 506 zcmYjNJx{|h6nsvaCM}eYi4A2#5ww*M5)23lAvQp4h%A;Hn>K1vSB^_58^3~qjg4Qz zz%L+DC&a*p=+=qnv=w+b-8tvGm-kkyRRPcS{%ib106vUZ9BWP{XGk7_LkOI3D

B zTEuW0xgFXCFD0ea37vvFNjY^xx8P;uUg!~M19zWESQ&Xi<$m|{R`j&rQZa7}zax^q zP?>)tm4AMGa_LJIbws8W(|99{HEVp~d=QX)8C@yUw75pSGS}H4O_>@QM-hI-G=?bD z8<7%B3py07j+9**u_S4+sB5g5=<&sez)=o%sImduK*@lMa1m5-~m|PQz(@^>X6O=IB(NZQ^^Zx?s__r#)YtHu2 zq%`iun$kvpH1!Ts$p?uzQfv6wEGXMV__hfl?{M%5+heGYVQUO~V+ejqdV-J$};jpN)@SjD#bh`JoVh!ne`72 zX_Ts0ntSiud+y(vGvE2n{cA;qhk$f)=U=2yfFS;XU+hGcD&*oBs7whF8a)L^?*|J}Wt)bPxo&cmOKXgi`KPi4kaD zodoi>B{5FQ`0SMZJ^@-#$?L>T>L?MS2Mk7u%I71JU@)n~`9$(!BI7&;EqsVF9B>@N zF)S)jcbZ2+E?7v$dgl43JCtNHE_W_Jcn7EhN#mWR^LGlRvqwM=(4mMZJeD@R5p%_O z^sE?1KA6gl=X?>eEV1AZvw&OyxAfp{EH@Nm)Huwv?7sGC?Y3m8qmc+Y+}e142ClwU-!tnII-8g|T~8a;;g_ zvSKYoJ8VUd;B&1?x2^XHhw6~YOWRaO3_i|Lf+m)&v-a+3d)^ygGl2>-B%!%dF!V%l(Ch{jVQs)hy` zTQ!dp^5TA1#M}jHgcm>V|0QHuqR3TWPEV!tjXJj}dtiYHjo(7Qv-DNK8i}?nP zYth)2Vnt1EICnB1E%@>uXx^|!ho7|vj1s0jJcP>s%^oO$hOZ4@*}V)ulrrup_@^?{ z5=z-9VwV_+t^+z?chEv87>~`Q;Q!A|L)0b$IY{Cpk)|#Z-*274Hltb()8!#MFLNS~unMw$Tb10sSDu(-5dQ9AhP!buQcQ7=V_2t%L zu=WB5W38~tY$_wRgl#ulUD=;G%xmTs7p8=qbU2ez>N+lF& zR0M;Qh_3{~y-PX%LrP}!rv&yzjuZQYjF~b!d8w0Xu&?8AHvBCp7 z$D&A)Q3pt&Fl4JeV12~eLd0IpHt6jj-GxWog6#PQUDuG^hnZCCbo-U|d|D5N=7ZQH z`nTTqcg^{`bboht{{pzcZWAIsdexTss+VSv&TYHTb?p>$R>q__t`z$An@_>;xHCKtf z#APx@4HGeH5ZWh5l>|QIQpqdSB{vNw3@sFRMWFA3S>0p9^f+}C5FS{ur<%skMTU%c1;Pvp2KHu5W*H@&QgMT zUK|%=DNJQ}t0#s3kcXdK``4Yzcphx+c9%8NK*(K!ek^I?zZun;?+1G30zGor@l0}Hh;6jFL^NA~bSUFWP*uj|t2>eXTVGvU=&OhBvIJkM+{ z2I}&joT|0FJbQe0|L1Rfxn4VYPVbYnT_YNIUSrP}A=(?9XEqiC_49!Z`F)cvYe=Dg z_J!H{&)e^Qp!dF`^_WNVt_DA;1G)GnfQ-=vW4zjy3Oh%8bYY&@!M-&{OPM zBUBpLnRUS-I5S`YQ>;q&LHUDgxolXp)+*mEJ=f(pRVSjsty8KKoVgR+dje|IrC-pn zGPrg|U`NVM3ASOSNp^^YJ2!U8HBGVh`g;l6q1i1HF3=me1{ev$H!2AW`hM1))~Wn@g(4}+#fEBq+vOEBV^zH zllV4E%-2QkwwcC%cBXdLGZ)&Wwf5bs-#u3s`Ob{aepy{dW*Gh$I1ZW5(0oTfpQ&C_ zfvrto)^7X0v^7J8!XejcIK{-xaLGxG)P^H2CPHq*Rnnv38Iq8!m{*F1J07*FO9*Rf z-(cVdRzxt`jZr@W+&+O`!{=EJm#>uI(!Wv@=rG6)W7+Q@gM&l#x1P_wRt!{S4;3qe z*@FN`j|s-fPdXQB>hm?X8m=`I`t*ja^9{Ra59&4jlb&L8%dJz_PR)2`_vp=CcLHMzN{OP&swl7a;$0ORYGujzJ6BC+{)LtK(bQdc&eA-eN`s0>D^Ne(dnN8oN z?)Kc>qIm~2dcZ`o0V^th>c`i7@4*FMI}aDtgN6&4h1bO0HhxXiF1jeP pdC|j?J&UzIvU730muy&U@{&!9O*FY_(ceH;KIyI{50mD^{{fWSv{?WE literal 0 HcmV?d00001 diff --git a/tools/orchestrator-matrix/events/__pycache__/event_watcher.cpython-312.pyc b/tools/orchestrator-matrix/events/__pycache__/event_watcher.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..41e5be50b802b19c6346fec3340b78d56f64d098 GIT binary patch literal 8097 zcmb_hT~HfWmhM(}>mS6Qgg{`L7-I_q2FH$LJJ`lQF>w;?#GWL^@oHpsgCz?|x!nZN zNRv!$wgzu8WwM(o<4Mh$)TSy-#Sd)N)NXcbA53a0VQTh4GI*qPNY!r5?nB;K<0@zI z%bs&vEf6NNGgW&j?!A3~&iy&(p7Wif-<6fw8Aulod>L+YGt3|G!$`bdVOPhXa+8sm z0Y+jaQ9>`f2qdP*|WdF|G;lL1Lr~=9k02;)U#8W|_BToXs;kXX-Nc-M`-c zPi_Cu_Iti(75bP^@nrB}fq!}r9@xN5CdEXGH-TgI^l70~Oeyv}W5ZD}XLAB!Fx}yd zd)d#VO{!pwiI|PrdYLkPT39P&l0ft%#+b`8=b7zHH>0qlAcPkg7XHSWG43KW%5G;= zt3g98awbAW+-TR!K~oDu8q=JLS3{G;BX1=k#Q}$TV|A8Eu_4n%SU=7Em^)u6N7>QB zihgsy#*#H*L>=sS_E#WRax4^=Ks=M4!65Kf64kgEjRz$$G(r@R%Qe#>(Qnbr@>p0= z6^)N4138&CP%ip;jZXyCtC~fU2ZJL~RWn}= z#-ymMaq?J5!841Hh!ZNBJ%AJgNSri5LlyihX~^DX@($OH!`BZ__T*ijiSiYbAk^l| zDsNo8esM~fd41yIVp+!u!`gPtd6uhdZytRA;Ph*Y)h&-WXnKV04={WDpU&zJF67+) ztlOV)v^`)@=D>M;5u$em3o#ilsP za>x2<;qidraS_+1LGUTrjFmx?(m!Jjm|==kJ~<;*+$%nm%};02r#~IPz5jyWq*)R{ zf{GIa83i;Dzn0`X)!~H3juNCzP=>xmB?0=$9F9pac#XCDZJJr1sR_f^q%hGOqha-G zKp7bv43B|Iqb2bmg<8loUK#ALIs4IxBm3SpS+mk-C5o}EzMrY@a`pkZ{A*>v)5X)|2(^S+sY(dm?Xh-tb=cPHmsDOn4Vu7pU0Bh=$IIxNWSVta8vs5m*BmsIXQU$f2R;f}#`)QM^ zsQt8$+x*p<^Ay^>*LBl~mNOa^gRz1&6#Gx2PN8NLS|w2mE1*n5S4A)*p{rvf4$(Sp#F;ta7h#1-ZgR~Q1T5hy-D zcy>jtY|MuTXHa4SMXF$3P5nc^xP|GTkNT~eLpOv0sv;e_%GE!(;oz7i41z*bwK9Yd z0Wi2Sc{>;dHCa5h=G0ZDK2xg*7$o9K7_kGYUqy}%l6|lnvLCZ&A=@OO)&LL>tARk$ zy^$TAMzbHM9Dxi()z><~FIl}tx!fquRDUebzP#9aXtD7y_SNSH-kjhkU3sx3Cw6DW z?nUwG3I4vdk)TNr6{HEO8`zYA4=~v5N#+J<%&TD5FT-X*2Nl`0&XBG2qjA{^JK6Z> z&iNbDU#6+&zV*oeJm475RaQYb^HHVU8aAgt z%5c*V;46yaHcx}^t&F50DqES#G>e*ID|2@ZObB^;-6SIV+G-p{WGv+7UkZi>0mEkj zeNi;cFmnq-Y3z$-hCuMQrk>`%_oQ1GQpqaW?i8I9FzG@{h*Up$O_p*G<}@FB3&x?@-IR)}*-|zb zX-)H!mu-xSW|pSNHlwy)ri$PsZCAG&?J2w3V$==ouT*Z}eObzkUX`&g05Z7R2LOO} z8uQl6lzqcECW5+jQz{vOls=O<{4>@D`rjjM*A5^)!QMYzx>Dd3Z1g+8uhzhA|>6Q!%j6wRDzh8RA5%b$)puNlTFICa3Vpc#0eQm6t*G4 zey)tGvY33TI3N~}b;Pekt(DR4D*Y;GmiXpgZoFpaP(dBRPz{ zu0TR23Icl_*ib}{kVd|y@v0n+YE~o<@f8_eSxG?@PmUoG3fl#AC>2tVwCrJE3#7G(^we8v3_Ir-@ zWoKo+s(Y>_xA$~*@99i;--0dk>ZQypZ-NhfnQe!>vi%{)m6fkpnH{YYCzqVfd1pn= zDQ2DGlJlv5-|3$)=WBgBy$R=vsR>rEbmuCYvz5)c%GPXU>x?q@dZw~P+6(eEaCFql>=wsgqy%n)2Sp zeAABGKf3j!57Se9d0*4*o?AV$HA}wE{Lb!aC+*&TJ9#VlVQQ-HE8q6~&OO*FLTJF< zyezhSV*kiK`|P5)Z(4Y0XBt{(?#4Djz~FJP#_ z=6U=O$Tx^N-yZPv9xxmn1h;$iuKlyr!rq0@!Vfar&M$c{&`p2i6(6ryV9eU1?YJ|% z<;M@7{d3n`Q{Dmj`n#6r_cNbYH}>sfKJV&1*w?`Qrp$mlY6NImsN=BSAoT69EHqny!on`w z3tn!ayWxdu?zd$u)PGwoKwc6gK?i|i!e2uL!niDiELe%X29H<~z;|OQ7JCWwchNb3 zfrVtC01PfjrX=`fh1n-TC79{#H=)Lkb1JSw(^jA#s2A~?F>XDG0@MpTBJAX;;-1*a z7sojW0rSaDbYxM5M&t2BC%CI)0SFvGq!;74Jze6k90U^(jt!#UC3jL~{4E{_d;oF@ zO3-fMVHrGus4OR_&*tZJ?NnY$4AfeYu!2Vd<)UjAj*P^T6(u?aS_lkLeh3-WCZ5^} zd%nD3hP_wbJToxYd2h!tc<49!ulFxG8}d%ibo2GI6KC?SMi}90o{7x$-)lXduW&EJ zpflIcpi1#)YyC^kwiO41qZV~K_jerAy*;!tMk<2+HE7uwrcSY2L41idfK$QODgv%Q z2J3;{Te9vXa~bT-o3NiWm*!L4Pf(P&w^?9dnT;_a#44`GGLyADjXkZ0X8}MorG*VJ ziA$LZ`{F6+7fPWK4Z#XO7ABvAP%M<|un0o11f-GCn555e;6yb1BLsdiFn$UUleXtZ zV%K8v(by4c`u%2d6lZZmvZ`A+>L1!pjfLa{%9S2(HB%`e129xy#RAq0Bt#)6un7&C zf|9+#29a0cdz1Ak8bu7hDmNg5FudD4?Y>=it8V7G#k#J0b&cd4=Xd*6DFDo*&#ZwLlrItDKv!*^>=j1oq}S31u@sc$6#?bJFD@B|qccpQqP4SkDC z06#C&N1qf=6n;FsbO~`U_Oy%eBjV-D?P7mCCSTUfgnA_!kDq=Mc?-$ZP%@ilz`?2P z@#KdvhM=cNM-wzGV7yTAB+E-DZ5W`Frqc?`=o1|p1R-O=VL1?h_-0@@E{y;N4?uh~ z@YYB$TIjI^0#ZB_2#_9J_Ib?ESduH4p%$a!N;&Z>hTfyjn!VVCSu6Z2yCHjz`HDaF zkQan~tL40Kbk%uC@H`x2ZNlzVk5zbsU9B+-;=>A7c!kyJ;A}V^VtA^DH=M~#e+3|v z7wGU36h*xsYrqvBHqidg(td*dF7h@McFw+`=( zW&eZe{V(RJKQNB3newk0``3))8>ZzO#*(0k~D_gtjF|(dWRS}kbg?#`>3<^s7 Fe*vYO3W@*# literal 0 HcmV?d00001 diff --git a/tools/orchestrator-matrix/events/__pycache__/webhook_adapter.cpython-312.pyc b/tools/orchestrator-matrix/events/__pycache__/webhook_adapter.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..8fb4516047f8905de04e8c7722eb262555500558 GIT binary patch literal 4313 zcmb7HO>7&-6`m!RT>h*`k<@=IYh%fl8Cz7H*ma!vC$i$mRvg=jl}MIquxsv0B1P^p zv&%?iS1k_#Vxs{p11ZcPE~)|rO2Y>10zLT9gAQrpqCr8KO66=Aphen)Z))Tsg>&hf zT`no5Zi5cNd2eUl%)I$#=KcJ!p}~WooZJ6#Ty!J!CHb(5&19%`6_~3?K;uZD1Y43y z*~V=Y;q6KLxZR-iIBihJxWk~%amJu7gVITN$}{dU#yFDR6g$oW?-87D)8h?-S73np z1XgeX^$QJx8)#$J9`@zJqvC~WS)SnpF&TeL#93}yOr}MwaFa6TUKtoX!-;Q+lBz@= z5N*Sh?&T#(R(UlpOUfzub`A2$B!3|(0&gFXvN6ismQmAC+YzGHUjXJRQfqwzrJ)bd z&uwZI1cVYq_FLtq?S=&s%A&CSRL(z<;$!K)SySut94tyt#$j>0?nvXfr0RAhtLRQe z6-11omr03=!cU0`S(bquM%8sUU3@jZ$l%SU}*ujs1_E%Vy2>n9)9!=wlQO?pix=mh@S)p>IPg zz^>B-b)BhV17O>yL{<01mAIs+yc834T8PKguuXTyWN9)!g&SdJg&ZEoVHSw)k6Or9 zC0d|0M5K;El}Gozfy>WbdTwE|?Cs3ISYZMSJ=f?Jrt?=%6-RE3T_3w0FZaFhVb998 zp^87W@M59;%IUJdr{wCXc>N2`pB^mHt=I=M#cDSSKFH%Uz+V6$H{L(34!CUtpteK} z9vE3I^~0H?6!I`@04EqrrcQ5E*?GHWS6CGuy;%gi1vG`SwCb(3J5+M9jG{e$eWx|D zx;|rlTbE*f<9sfaT+;tFpW7N=ozG=`*XDD4Cng?i%>RXnQw>-1|1U0wWYOU}h zI{yjCc+@saCedMWb(-j`^A@MsH3rrS-E6v1eWLSTH~Nlwx~$Q4s(Cf~X0xTwJ~Yp& zPg=biE3{~AqTBkeQ|pf1sG5rT2BCGssn;-y+2(9Bup{gc+GgNgV4no{R{~9icJi~< zr4Zan?}oeDp*4JfgeNo))LS$+AX~eGaR0wGR`Y0XGP^>(TU7fkEfe3fxH?szv3QLZ zx-<`*2E5Rm3ye_p)mo=Zp?Cj8!#n9ldtlyK(E1$c2(A0)?B~!d)x8nx9&6S*P3)-i z7PsMa9yemAC0D2FQx>o8wzx^U2jQ*rz71$~)^QTHYQD{Aoddk)eLwcS`<_W>saa(1 zAb(fw*XCCV8!s)%541?2&;g_yvsd$FkEY&0fkz+Za!j5-}v@-wlIL znU57g`@i-74 zs2CI5qdS0$rwzBR+f)3U?wCxM$aKp3GjuUQQA$Z1j18Ssyw2{ssWEW{%+*4;$^V+QMjV>UT<#5^nW%ku`G%!npW z6Kp`!q$s9A$+#5u>kcDQ!NfXZVoou!UYHmzyoFT6T#MjLO1|zDSI>Qa)8AU!3*FZ; zi=)4u_~k@t&(ZSs!F$_JEa7t3aJl8>{OL-td4azcDvbVO;{AzYXSs9#z0QMo_LSQX zm4k=#FINIh3!_&ju1qX$DK|&%HSfK>tsL058hCy=@O(LNC_h~B1urKqB?`^Okrm(5 ztG)xvz5{o9%f6%eq5J;MQrDi6@9F%|XH4M!_T9zt;>l9?fs4$7f7!+T#otkQ_0muB z$19<>LZEPb@y+*7mP6sMkk>IlU1Tc3wyTjVk=5YN<>1a@==T1lq0-2BDY&y7oVduW z*%_G4*4hvo_&Q_^GZy&fp$CEH>=kykamRAwj+@R~>~(f^=b`1DhkmDhGG6kJeT68p zKQ>Tm>RB9JJW+hLI9E*EaW8c*HJ5@ztL*V*_V_nzb|UpiIRtm*!#z(9?XdkmxOb?- z@sZO7{6~#Vgzq8z#~qHLZuiGs4B>YSP$wLYyS+^(kmC=?0sN+@nm9iAQ|nE@yhE}l z+eI`<3Dn!{Jf%@*Az8@neTk1je4pjiXlq`wY5$J1Jv6snG z1e_mMlZDp*q9r~9LeCRvl3##g|8&|Mqtu!Uu}%5Y*3u-l#>92wS4edVDp1H+>13^y zaABv3m(jYpU#BlXruZMja@=4w-JO`&ytnhaIkKtq;egIWqkaVE(?`YeX$oo0+d`{}n i=cxG$)bj;uS$9&@v+FLJYFqbhr2-F+QVi8(O#5$Qi0QWg literal 0 HcmV?d00001 diff --git a/tools/orchestrator-matrix/events/discord_formatter.py b/tools/orchestrator-matrix/events/discord_formatter.py new file mode 100644 index 0000000..f07fa8c --- /dev/null +++ b/tools/orchestrator-matrix/events/discord_formatter.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +"""Format MACP orchestrator events for Discord-friendly text delivery.""" + +from __future__ import annotations + +from typing import Any + + +def _task_label(event: dict[str, Any]) -> str: + task_id = str(event.get("task_id") or "unknown") + return f"Task {task_id}" + + +def _title(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if isinstance(metadata, dict): + for key in ("task_title", "title", "description"): + value = str(metadata.get(key) or "").strip() + if value: + return value + message = str(event.get("message") or "").strip() + return message if message else "No details provided" + + +def _attempt_suffix(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if not isinstance(metadata, dict): + return "" + attempt = metadata.get("attempt") + max_attempts = metadata.get("max_attempts") + if attempt in (None, "") and max_attempts in (None, ""): + return "" + if attempt in (None, ""): + return f"attempt ?/{max_attempts}" + if max_attempts in (None, ""): + return f"attempt {attempt}" + return f"attempt {attempt}/{max_attempts}" + + +def _duration_suffix(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if not isinstance(metadata, dict): + return "" + duration = metadata.get("duration_seconds") + if duration in (None, ""): + return "" + try: + seconds = int(round(float(duration))) + except (TypeError, ValueError): + return "" + return f"{seconds}s" + + +def _runtime_dispatch_suffix(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if not isinstance(metadata, dict): + return "" + parts: list[str] = [] + runtime = str(metadata.get("runtime") or "").strip() + dispatch = str(metadata.get("dispatch") or "").strip() + if runtime: + parts.append(f"Worker: {runtime}") + if dispatch: + parts.append(f"dispatch: {dispatch}") + return ", ".join(parts) + + +def _meta_clause(*parts: str) -> str: + clean = [part for part in parts if part] + if not clean: + return "" + return f" ({', '.join(clean)})" + + +def format_event(event: dict[str, Any]) -> str | None: + """Format an MACP event for Discord. Returns None for unformattable events.""" + + event_type = str(event.get("event_type") or "").strip() + task_label = _task_label(event) + title = _title(event) + attempt_suffix = _attempt_suffix(event) + duration_suffix = _duration_suffix(event) + runtime_dispatch = _runtime_dispatch_suffix(event) + message = str(event.get("message") or "").strip() + + if event_type == "task.completed": + return f"✅ **{task_label} completed** — {title}{_meta_clause(attempt_suffix, duration_suffix)}" + if event_type == "task.failed": + detail = message or title + return f"❌ **{task_label} failed** — {detail}{_meta_clause(attempt_suffix)}" + if event_type == "task.escalated": + detail = message or title + return f"🚨 **{task_label} escalated** — {detail}{_meta_clause(attempt_suffix)}" + if event_type == "task.gated": + detail = message or "Quality gates running..." + return f"🔍 **{task_label} gated** — {detail}" + if event_type == "task.started": + detail = runtime_dispatch or message or "Worker execution started" + return f"⚙️ **{task_label} started** — {detail}{_meta_clause(attempt_suffix)}" + return None + + +def format_summary(events: list[dict[str, Any]]) -> str: + """Format a batch summary (e.g., daily digest).""" + + counts: dict[str, int] = {} + first_task = "" + last_task = "" + for event in events: + event_type = str(event.get("event_type") or "unknown") + counts[event_type] = counts.get(event_type, 0) + 1 + task_id = str(event.get("task_id") or "").strip() + if task_id and not first_task: + first_task = task_id + if task_id: + last_task = task_id + + if not counts: + return "No MACP events processed." + + ordered = ", ".join(f"{event_type}: {counts[event_type]}" for event_type in sorted(counts)) + task_span = "" + if first_task and last_task: + task_span = f" Tasks {first_task}" if first_task == last_task else f" Tasks {first_task} -> {last_task}" + return f"MACP event summary: {len(events)} events ({ordered}).{task_span}" diff --git a/tools/orchestrator-matrix/events/event_watcher.py b/tools/orchestrator-matrix/events/event_watcher.py new file mode 100644 index 0000000..ff77bee --- /dev/null +++ b/tools/orchestrator-matrix/events/event_watcher.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +"""Portable file-polling watcher for MACP orchestrator events.""" + +from __future__ import annotations + +import json +import pathlib +import sys +import time +from collections.abc import Callable +from typing import Any + + +def _warn(message: str) -> None: + print(f"[macp-event-watcher] {message}", file=sys.stderr) + + +def _load_json(path: pathlib.Path, default: Any) -> Any: + if not path.exists(): + return default + try: + with path.open("r", encoding="utf-8") as handle: + return json.load(handle) + except (OSError, json.JSONDecodeError) as exc: + _warn(f"failed to load cursor {path}: {exc}") + return default + + +def _save_json_atomic(path: pathlib.Path, data: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + with tmp.open("w", encoding="utf-8") as handle: + json.dump(data, handle, indent=2) + handle.write("\n") + tmp.replace(path) + + +class EventWatcher: + """Poll an events NDJSON file and dispatch matching callbacks.""" + + def __init__(self, events_path: pathlib.Path, cursor_path: pathlib.Path, poll_interval: float = 2.0): + self.events_path = events_path + self.cursor_path = cursor_path + self.poll_interval = max(0.1, float(poll_interval)) + self._callbacks: list[tuple[set[str] | None, Callable[[dict[str, Any]], None]]] = [] + self._cursor_position = self._load_cursor() + + def on(self, event_types: list[str], callback: Callable[[dict[str, Any]], None]) -> None: + """Register a callback for specific event types.""" + + normalized = {str(event_type).strip() for event_type in event_types if str(event_type).strip()} + self._callbacks.append((normalized or None, callback)) + + def poll_once(self) -> list[dict[str, Any]]: + """Read new events since last cursor position. Returns list of new events.""" + + if not self.events_path.exists(): + return [] + + try: + file_size = self.events_path.stat().st_size + except OSError as exc: + _warn(f"failed to stat events file {self.events_path}: {exc}") + return [] + + if file_size < self._cursor_position: + _warn( + f"events file shrank from cursor={self._cursor_position} to size={file_size}; " + "resetting cursor to start" + ) + self._cursor_position = 0 + self._persist_cursor() + + events: list[dict[str, Any]] = [] + new_position = self._cursor_position + try: + with self.events_path.open("r", encoding="utf-8") as handle: + handle.seek(self._cursor_position) + while True: + line_start = handle.tell() + line = handle.readline() + if not line: + break + line_end = handle.tell() + if not line.endswith("\n"): + new_position = line_start + break + stripped = line.strip() + if not stripped: + new_position = line_end + continue + try: + event = json.loads(stripped) + except json.JSONDecodeError as exc: + _warn(f"skipping corrupt event at byte {line_start}: {exc}") + new_position = line_end + continue + if not isinstance(event, dict): + _warn(f"skipping non-object event at byte {line_start}") + new_position = line_end + continue + events.append(event) + self._dispatch(event) + new_position = line_end + except OSError as exc: + _warn(f"failed to read events file {self.events_path}: {exc}") + return [] + + if new_position != self._cursor_position: + self._cursor_position = new_position + self._persist_cursor() + return events + + def run(self, max_iterations: int = 0) -> None: + """Polling loop. max_iterations=0 means infinite.""" + + iterations = 0 + while max_iterations <= 0 or iterations < max_iterations: + self.poll_once() + iterations += 1 + if max_iterations > 0 and iterations >= max_iterations: + break + time.sleep(self.poll_interval) + + def _dispatch(self, event: dict[str, Any]) -> None: + event_type = str(event.get("event_type") or "").strip() + for filters, callback in self._callbacks: + if filters is not None and event_type not in filters: + continue + try: + callback(event) + except Exception as exc: # pragma: no cover - defensive boundary + _warn(f"callback failure for event {event_type or ''}: {exc}") + + def _load_cursor(self) -> int: + payload = _load_json(self.cursor_path, {"position": 0}) + try: + position = int(payload.get("position", 0)) + except (AttributeError, TypeError, ValueError): + position = 0 + return max(0, position) + + def _persist_cursor(self) -> None: + _save_json_atomic(self.cursor_path, {"position": self._cursor_position}) diff --git a/tools/orchestrator-matrix/events/webhook_adapter.py b/tools/orchestrator-matrix/events/webhook_adapter.py new file mode 100644 index 0000000..b3a042b --- /dev/null +++ b/tools/orchestrator-matrix/events/webhook_adapter.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +"""Webhook delivery helpers for MACP events.""" + +from __future__ import annotations + +import json +import sys +import time +import urllib.error +import urllib.request +from collections.abc import Callable +from typing import Any + + +def _warn(message: str) -> None: + print(f"[macp-webhook] {message}", file=sys.stderr) + + +def _webhook_config(config: dict[str, Any]) -> dict[str, Any]: + macp = config.get("macp") + if isinstance(macp, dict) and isinstance(macp.get("webhook"), dict): + return dict(macp["webhook"]) + return dict(config) + + +def send_webhook(event: dict[str, Any], config: dict[str, Any]) -> bool: + """POST event to webhook URL. Returns True on success.""" + + webhook = _webhook_config(config) + url = str(webhook.get("url") or "").strip() + if not url: + _warn("missing webhook url") + return False + + timeout_seconds = max(1.0, float(webhook.get("timeout_seconds") or 10)) + retry_count = max(0, int(webhook.get("retry_count") or 0)) + auth_token = str(webhook.get("auth_token") or "").strip() + payload = json.dumps(event, ensure_ascii=True).encode("utf-8") + headers = {"Content-Type": "application/json"} + if auth_token: + headers["Authorization"] = f"Bearer {auth_token}" + + attempts = retry_count + 1 + for attempt in range(1, attempts + 1): + request = urllib.request.Request(url, data=payload, headers=headers, method="POST") + try: + with urllib.request.urlopen(request, timeout=timeout_seconds) as response: + status = getattr(response, "status", response.getcode()) + if 200 <= int(status) < 300: + return True + _warn(f"webhook returned HTTP {status} on attempt {attempt}/{attempts}") + except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, ValueError) as exc: + _warn(f"webhook attempt {attempt}/{attempts} failed: {exc}") + if attempt < attempts: + time.sleep(min(timeout_seconds, 2 ** (attempt - 1))) + return False + + +def create_webhook_callback(config: dict[str, Any]) -> Callable[[dict[str, Any]], None]: + """Factory that creates a watcher callback from config.""" + + def callback(event: dict[str, Any]) -> None: + if not send_webhook(event, config): + _warn(f"delivery failed for event {event.get('event_type', '')}") + + return callback