From 356c756cfb040ea19e15ae834b0c3458eb493da1 Mon Sep 17 00:00:00 2001 From: "jason.woltje" Date: Sat, 28 Mar 2026 13:05:28 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20MACP=20Phase=202A=20=E2=80=94=20Event?= =?UTF-8?q?=20Bridge=20+=20Notification=20System=20(#11)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 + bin/mosaic-macp | 71 ++++++++ docs/DEVELOPER-GUIDE/README.md | 1 + .../orchestrator-matrix/macp-phase2a.md | 45 ++++++ docs/MACP-BRIEF-TEMPLATE.md | 86 ++++++++++ docs/PRD.md | 105 ++++++------ docs/SITEMAP.md | 2 + docs/TASKS.md | 3 +- docs/scratchpads/macp-phase2a.md | 74 +++++++++ docs/tasks/MACP-PHASE2A-brief.md | 152 ++++++++++++++++++ docs/tasks/MACP-PHASE2A-tests.md | 81 ++++++++++ tests/__init__.py | 10 ++ tests/conftest.py | 121 ++++++++++++++ tests/run_tests.sh | 4 + tests/test_discord_formatter.py | 86 ++++++++++ tests/test_event_watcher.py | 99 ++++++++++++ tests/test_webhook_adapter.py | 100 ++++++++++++ tools/orchestrator-matrix/README.md | 6 + tools/orchestrator-matrix/events/__init__.py | 15 ++ .../events/discord_formatter.py | 138 ++++++++++++++++ .../events/event_watcher.py | 144 +++++++++++++++++ .../events/webhook_adapter.py | 120 ++++++++++++++ 22 files changed, 1412 insertions(+), 53 deletions(-) create mode 100644 docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md create mode 100644 docs/MACP-BRIEF-TEMPLATE.md create mode 100644 docs/scratchpads/macp-phase2a.md create mode 100644 docs/tasks/MACP-PHASE2A-brief.md create mode 100644 docs/tasks/MACP-PHASE2A-tests.md create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100755 tests/run_tests.sh create mode 100644 tests/test_discord_formatter.py create mode 100644 tests/test_event_watcher.py create mode 100644 tests/test_webhook_adapter.py create mode 100644 tools/orchestrator-matrix/events/__init__.py create mode 100644 tools/orchestrator-matrix/events/discord_formatter.py create mode 100644 tools/orchestrator-matrix/events/event_watcher.py create mode 100644 tools/orchestrator-matrix/events/webhook_adapter.py diff --git a/.gitignore b/.gitignore index f82a4ae..65aad93 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ node_modules/ rails +*.pyc +**/__pycache__/ diff --git a/bin/mosaic-macp b/bin/mosaic-macp index 413ab55..581e231 100755 --- a/bin/mosaic-macp +++ b/bin/mosaic-macp @@ -16,6 +16,7 @@ Usage: mosaic macp status [--task-id TASK-001] mosaic macp drain mosaic macp history [--task-id TASK-001] + mosaic macp watch [--webhook] [--once] USAGE } @@ -193,6 +194,75 @@ for line in events_path.read_text(encoding="utf-8").splitlines(): PY } +watch_events() { + require_repo + local webhook_enabled="false" + local run_once="false" + while [[ $# -gt 0 ]]; do + case "$1" in + --webhook) webhook_enabled="true"; shift ;; + --once) run_once="true"; shift ;; + -h|--help) usage; exit 0 ;; + *) echo "[mosaic-macp] unknown watch option: $1" >&2; exit 1 ;; + esac + done + + python3 - "$repo_root" "$config_path" "$events_path" "$orch_dir/event_cursor.json" "$webhook_enabled" "$run_once" <<'PY' +import json +import pathlib +import sys + +repo_root = pathlib.Path(sys.argv[1]).resolve() +config_path = pathlib.Path(sys.argv[2]).resolve() +events_path = pathlib.Path(sys.argv[3]).resolve() +cursor_path = pathlib.Path(sys.argv[4]).resolve() +webhook_flag = sys.argv[5].lower() == "true" +run_once = sys.argv[6].lower() == "true" + +events_dir = repo_root / "tools" / "orchestrator-matrix" / "events" +if str(events_dir) not in sys.path: + sys.path.insert(0, str(events_dir)) + +from discord_formatter import format_event +from event_watcher import EventWatcher +from webhook_adapter import create_webhook_callback + +config = {} +if config_path.exists(): + try: + config = json.loads(config_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError) as e: + print(f"[macp] Warning: could not parse config {config_path}: {e}", file=sys.stderr) + config = {} + +macp = dict(config.get("macp") or {}) +watcher = EventWatcher( + events_path=events_path, + cursor_path=cursor_path, + poll_interval=float(macp.get("watch_poll_interval_seconds") or 2.0), +) + +def print_callback(event: dict) -> None: + rendered = format_event(event) + if rendered: + print(rendered) + +watcher.on([], print_callback) + +webhook_config = dict(macp.get("webhook") or {}) +if webhook_flag and bool(webhook_config.get("enabled", False)): + watcher.on(list(webhook_config.get("event_filter") or []), create_webhook_callback(config)) +elif webhook_flag: + print("[mosaic-macp] webhook requested but disabled in config", file=sys.stderr) + +if run_once: + processed = watcher.poll_once() + print(f"[mosaic-macp] processed_events={len(processed)}") +else: + watcher.run() +PY +} + subcommand="${1:-help}" if [[ $# -gt 0 ]]; then shift @@ -203,6 +273,7 @@ case "$subcommand" in status) status_tasks "$@" ;; drain) drain_tasks "$@" ;; history) history_tasks "$@" ;; + watch) watch_events "$@" ;; help|-h|--help|"") usage ;; *) echo "[mosaic-macp] unknown subcommand: $subcommand" >&2 diff --git a/docs/DEVELOPER-GUIDE/README.md b/docs/DEVELOPER-GUIDE/README.md index 148140e..a66ae35 100644 --- a/docs/DEVELOPER-GUIDE/README.md +++ b/docs/DEVELOPER-GUIDE/README.md @@ -3,3 +3,4 @@ ## Orchestrator Matrix - [MACP Phase 1](./orchestrator-matrix/macp-phase1.md) +- [MACP Phase 2A](./orchestrator-matrix/macp-phase2a.md) diff --git a/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md new file mode 100644 index 0000000..443a549 --- /dev/null +++ b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md @@ -0,0 +1,45 @@ +# MACP Phase 2A + +MACP Phase 2A adds the repo-local event bridge that makes orchestrator lifecycle events consumable by external systems. + +## What Changed + +1. `tools/orchestrator-matrix/events/event_watcher.py` polls `.mosaic/orchestrator/events.ndjson`, parses appended NDJSON events, dispatches callbacks, and persists a byte-offset cursor in `.mosaic/orchestrator/event_cursor.json`. +2. `tools/orchestrator-matrix/events/webhook_adapter.py` forwards selected MACP events to a configured webhook endpoint with bounded retries and optional bearer auth. +3. `tools/orchestrator-matrix/events/discord_formatter.py` renders task lifecycle events into concise Discord-friendly status lines. +4. `bin/mosaic-macp` adds `watch` mode for one-shot or continuous event processing. + +## Watcher Behavior + +1. File watching is polling-based and stdlib-only for portability. +2. The watcher resets its cursor if the events file is truncated. +3. Corrupt JSON lines are logged to stderr and skipped. +4. A trailing partial line is left unread until the newline arrives, preventing half-written events from being consumed. + +## Webhook Configuration + +Configure `.mosaic/orchestrator/config.json` under `macp.webhook`: + +```json +{ + "macp": { + "webhook": { + "enabled": false, + "url": "http://localhost:8080/macp/events", + "auth_token": "", + "timeout_seconds": 10, + "retry_count": 2, + "event_filter": ["task.completed", "task.failed", "task.escalated"] + } + } +} +``` + +## CLI + +```bash +mosaic macp watch --once +mosaic macp watch --webhook +``` + +`--once` performs a single poll and exits. `--webhook` enables delivery via the configured `macp.webhook` block while still printing Discord-formatted event lines to stdout. diff --git a/docs/MACP-BRIEF-TEMPLATE.md b/docs/MACP-BRIEF-TEMPLATE.md new file mode 100644 index 0000000..7641eee --- /dev/null +++ b/docs/MACP-BRIEF-TEMPLATE.md @@ -0,0 +1,86 @@ +# MACP Task Brief Template + +**Use this template for all MACP task briefs.** Workers that receive briefs not following this structure should flag it as an issue. + +--- + +```markdown +# + +**Branch:** `feat/<branch-name>` +**Repo worktree:** `~/src/<repo>-worktrees/<task-slug>` + +--- + +## Objective + +<1-2 sentences: what is being built and why> + +--- + +## Task 1: <Component Name> + +<Description of what to build> + +### Requirements: +- <Specific, testable requirements> + +### Key Functions/APIs: +<Code signatures or interface definitions> + +### Constraints: +- <Language, dependencies, patterns to follow> + +--- + +## Task 2: <Component Name> +<Same structure as Task 1> + +--- + +## Tests (MANDATORY) + +**Every brief MUST include a Tests section. Workers MUST write tests before or alongside implementation. Tests MUST pass before committing.** + +### Test file: `tests/test_<module>.py` + +### Test cases: +1. `test_<name>` — <what it verifies> +2. `test_<name>` — <what it verifies> +... + +### Test runner: +```bash +python3 -m unittest discover -s tests -p 'test_*.py' -v +``` + +--- + +## Verification + +1. All tests pass: `<test command>` +2. Python syntax: `python3 -c "import <module>"` +3. <Any additional verification steps> + +## Ground Rules +- Python 3.10+ stdlib only (no pip dependencies) +- Commit message: `feat: <what changed>` (conventional commits) +- Push to `feat/<branch>` branch when done +``` + +--- + +## Brief Sizing Rules + +| Brief Type | Max Items | Rationale | +|------------|-----------|-----------| +| **Build** (new code) | 2-3 | High cognitive load per item | +| **Fix** (surgical changes) | 5-7 | Low cognitive load, exact file/line/fix | +| **Review** | 1 | Naturally focused | +| **Test** (add tests) | 3-4 | Medium load, but well-scoped | + +The key metric is **cognitive load per item**, not item count. +- Build = construction (high load) +- Fix = scalpel (low load) +- Review = naturally focused +- Test = moderate (reading existing code + writing test logic) diff --git a/docs/PRD.md b/docs/PRD.md index be490b8..fbdcc4e 100644 --- a/docs/PRD.md +++ b/docs/PRD.md @@ -1,4 +1,4 @@ -# PRD: MACP Phase 1 Core Protocol Implementation +# PRD: MACP Phase 2A Event Bridge + Notification System ## Metadata @@ -9,90 +9,91 @@ ## Problem Statement -The current orchestrator-matrix rail can queue shell-based worker tasks, but it does not yet expose a standardized protocol for dispatch selection, worktree-aware execution, structured results, or manual MACP queue operations. MACP Phase 1 extends the existing rail so orchestrators can delegate to multiple runtimes through a consistent task model while preserving current behavior for legacy tasks. +MACP Phase 1 writes structured lifecycle events to `.mosaic/orchestrator/events.ndjson`, but no repo-local bridge consumes those events for external systems. Phase 2A adds a portable watcher, webhook delivery, and Discord-friendly formatting so MACP event streams can drive OpenClaw integrations and human-facing notifications. ## Objectives -1. Extend the existing orchestrator-matrix protocol and controller to support MACP-aware task dispatch and status tracking. -2. Add a dispatcher layer that manages worktree lifecycle, runtime command generation, and standardized results. -3. Provide a CLI entrypoint for manual MACP submission, status inspection, queue draining, and history review. +1. Add a synchronous event watcher that tails `events.ndjson` using stdlib-only file polling and persists cursor state across restarts. +2. Add a webhook adapter that can forward selected MACP events to a configured HTTP endpoint with bounded retries. +3. Add a Discord formatter that turns task lifecycle events into concise human-readable strings. +4. Extend the `mosaic macp` CLI with a `watch` command for one-shot or continuous event bridge execution. ## Scope ### In Scope -1. Extend the orchestrator task and event schemas and add a result schema. -2. Add a Python dispatcher module under `tools/orchestrator-matrix/dispatcher/`. -3. Update the controller to use the dispatcher for MACP-aware tasks while preserving legacy execution paths. -4. Update orchestrator config templates, task markdown sync logic, and CLI routing/scripts for MACP commands. -5. Add verification for backward compatibility, schema validity, imports, and basic MACP execution flow. +1. New `tools/orchestrator-matrix/events/` package with watcher, webhook adapter, and Discord formatter modules. +2. Cursor persistence at `.mosaic/orchestrator/event_cursor.json`. +3. `mosaic macp watch [--webhook] [--once]` CLI support using `.mosaic/orchestrator/config.json`. +4. Stdlib-only verification of watcher polling, webhook delivery, Discord formatting, CLI watch behavior, and cursor persistence. +5. Developer documentation and sitemap updates covering the Phase 2A event bridge. +6. A repo-local unittest suite under `tests/` that covers watcher polling/cursor behavior, webhook delivery logic, and Discord formatting. ### Out of Scope -1. Rewriting the orchestrator controller architecture. -2. Changing Matrix transport behavior beyond schema compatibility. -3. Implementing real OpenClaw `sessions_spawn` execution beyond producing the config payload/command for callers. -4. Adding non-stdlib Python dependencies or npm-based tooling. +1. Adding Discord transport or webhook server hosting inside this repository. +2. Replacing the existing Matrix transport bridge. +3. Introducing async, threads, or third-party Python packages. +4. Changing event emission behavior in the controller beyond consuming the existing event stream. ## User/Stakeholder Requirements -1. MACP must evolve the current orchestrator-matrix implementation rather than replace it. -2. Legacy task queues without `dispatch` fields must continue to run exactly as before. -3. MACP-aware tasks must support dispatch modes `yolo`, `acp`, and `exec`. -4. Results must be written in a structured JSON format suitable for audit and orchestration follow-up. -5. A manual `mosaic macp` CLI must expose submit, status, drain, and history flows. +1. External systems must be able to consume MACP events without reading the NDJSON file directly. +2. The watcher must remain portable across environments, so file polling is required instead of platform-specific file watching. +3. Restarting the watcher must not replay previously consumed events. +4. Webhook delivery failures must be logged and isolated so the watcher loop continues running. +5. Discord formatting must stay concise and useful for task lifecycle visibility. ## Functional Requirements -1. Task schema must include MACP dispatch, worktree, result, retry, branch, brief, issue/PR, and dependency fields. -2. Event schema must recognize `task.gated`, `task.escalated`, and `task.retry.scheduled`, plus a `dispatcher` source. -3. Dispatcher functions must set up worktrees, build commands, execute tasks, collect results, and clean up worktrees. -4. Controller `run_single_task()` must route MACP-aware tasks through the dispatcher and emit the correct lifecycle events/status transitions. -5. `tasks_md_sync.py` must map optional MACP table columns only when those headers are present in `docs/TASKS.md`; absent MACP headers must not inject MACP fields into legacy tasks. -6. `bin/mosaic` must route `mosaic macp ...` to a new `bin/mosaic-macp` script. +1. `EventWatcher` must watch `.mosaic/orchestrator/events.ndjson`, parse appended JSON lines, and invoke registered callbacks for matching event types. +2. `EventWatcher.poll_once()` must tolerate a missing events file, truncated/corrupt lines, and cursor positions that are stale after file truncation. +3. Cursor writes must be atomic and stored at `.mosaic/orchestrator/event_cursor.json`. +4. `send_webhook(event, config)` must POST JSON to the configured URL using `urllib.request`, optionally adding a bearer token, respecting timeout, and retrying with exponential backoff. +5. `create_webhook_callback(config)` must return a callback that swallows/logs failures instead of raising into the watcher loop. +6. `format_event(event)` must support `task.completed`, `task.failed`, `task.escalated`, `task.gated`, and `task.started`, including useful task metadata when present. +7. `format_summary(events)` must produce a short batch summary suitable for notification digests. +8. `bin/mosaic-macp` must expose `watch`, optionally enabling webhook delivery from config, and support one-shot polling with `--once`. ## Non-Functional Requirements -1. Security: no secrets embedded in generated commands, config, or results. -2. Performance: controller remains deterministic and synchronous with no async or thread-based orchestration. -3. Reliability: worktree creation/cleanup failures must be surfaced predictably and produce structured task failure/escalation states. -4. Observability: lifecycle events, logs, and result JSON must clearly show task outcome, attempts, gates, and errors. +1. Security: no secrets embedded in code or logs; auth token only sent via header when configured. +2. Performance: each webhook attempt must be bounded by `timeout_seconds`; no event-processing path may hang indefinitely. +3. Reliability: corrupt input lines and callback delivery failures must be logged to stderr and skipped without crashing the watcher. +4. Portability: Python 3.10+ stdlib only; no OS-specific file watcher APIs. +5. Observability: warnings and failures must be clear enough to diagnose cursor, parsing, and webhook problems. ## Acceptance Criteria -1. Existing legacy tasks without `dispatch` still run through the old shell path with unchanged behavior. -2. MACP-aware `exec` tasks run through the dispatcher and produce result JSON with gate outcomes. -3. New schemas validate task/event/result payload expectations for MACP fields and statuses. -4. `mosaic macp submit`, `status`, and `history` work from a bootstrapped repo state, and `drain` delegates to the existing orchestrator runner. -5. Python imports for the updated controller, dispatcher, and sync code complete without errors on Python 3.10+. +1. `EventWatcher.poll_once()` reads newly appended events, returns parsed dicts, invokes registered callbacks, and skips already-consumed events after restart. +2. Webhook delivery posts matching events to a local test endpoint, supports bearer auth configuration, and retries boundedly on failure. +3. Discord formatter returns expected concise strings for the required task lifecycle event types and a usable batch summary. +4. `mosaic macp watch --once` processes events from a bootstrapped repo state without error and honors `--webhook`. +5. Cursor persistence prevents replay on a second run and resets safely when the events file is truncated. +6. `python3 -m unittest discover -s tests -p 'test_*.py' -v` passes with stdlib-only tests for the Phase 2A event bridge modules. ## Constraints and Dependencies 1. Python implementation must use stdlib only and support Python 3.10+. -2. Shell tooling must remain bash-based and fit the existing Mosaic CLI style. -3. Dispatch fallback rules must use `exec` when `dispatch` is absent and config/default runtime when `runtime` is absent. -4. Worktree convention must derive from the repository name and task metadata unless explicitly overridden by task fields. +2. Shell CLI behavior must remain bash-based and consistent with the existing Mosaic command style. +3. The watcher consumes the event schema already emitted by Phase 1 controller logic. +4. Webhook configuration lives under `.mosaic/orchestrator/config.json` at `macp.webhook`. ## Risks and Open Questions -1. Risk: yolo command execution requires a PTY, so the dispatcher needs a safe wrapper that still behaves under `subprocess`. -2. Risk: worktree cleanup could remove a path unexpectedly if task metadata is malformed. -3. Risk: old queue consumers may assume only the original task statuses and event types. -4. Open Question: whether `task.gated` should be emitted by the dispatcher or controller once worker execution ends and quality gates begin. +1. Risk: partial writes may leave an incomplete trailing JSON line that must not advance the cursor incorrectly. +2. Risk: synchronous webhook retries can slow one poll cycle if the endpoint is unavailable; timeout and retry behavior must remain bounded. +3. Risk: event payloads may omit optional metadata fields, so formatter output must degrade cleanly. +4. ASSUMPTION: the watcher should advance past corrupt lines after logging them so a single bad line does not permanently stall downstream consumption. +5. ASSUMPTION: CLI `watch` should default to no-op callback processing when no delivery option is enabled, while still updating the cursor and reporting processed count. ## Testing and Verification Expectations -1. Baseline checks: Python import validation, targeted script execution checks, JSON syntax/schema validation, and any repo-local validation applicable to changed code paths. -2. Situational testing: legacy orchestrator run with old-style tasks, MACP `exec` flow including result file generation, CLI submit/status/history behavior, and worktree lifecycle validation. -3. Evidence format: command-level results captured in the scratchpad and summarized in the final delivery report. +1. Baseline checks: Python bytecode compilation/import validation for new modules and shell syntax validation for `bin/mosaic-macp`. +2. Situational tests: temporary orchestrator state exercising watcher polling, callback filtering, webhook POST capture/mocking, formatter sanitization, CLI one-shot watch execution, and cursor persistence across repeated runs. +3. Evidence format: command-level results recorded in the scratchpad and summarized against acceptance criteria. ## Milestone / Delivery Intent -1. Target milestone/version: 0.0.x bootstrap enhancement -2. Definition of done: code merged to `main`, CI terminal green, issue `#8` closed, and verification evidence recorded against all acceptance criteria. - -## Assumptions - -1. ASSUMPTION: A single issue can track the full Phase 1 implementation because the user requested one bounded feature delivery rather than separate independent tickets. -2. ASSUMPTION: For `acp` dispatch in Phase 1, the controller must escalate the task immediately with a clear reason instead of pretending work ran before OpenClaw integration exists. -3. ASSUMPTION: `task.gated` should be emitted by the controller as the transition into quality-gate execution, which keeps gate-state ownership in one place alongside the existing gate loop. +1. Target milestone/version: Phase 2A observability bridge +2. Definition of done: code merged to `main`, CI terminal green, issue `#10` closed, and verification evidence recorded against all acceptance criteria. diff --git a/docs/SITEMAP.md b/docs/SITEMAP.md index c4e8b1c..1694313 100644 --- a/docs/SITEMAP.md +++ b/docs/SITEMAP.md @@ -4,4 +4,6 @@ - [Tasks](./TASKS.md) - [Developer Guide](./DEVELOPER-GUIDE/README.md) - [Task Briefs](./tasks/MACP-PHASE1-brief.md) +- [Task Briefs](./tasks/MACP-PHASE2A-brief.md) - [Scratchpads](./scratchpads/macp-phase1.md) +- [Scratchpads](./scratchpads/macp-phase2a.md) diff --git a/docs/TASKS.md b/docs/TASKS.md index 2578549..f9190bd 100644 --- a/docs/TASKS.md +++ b/docs/TASKS.md @@ -14,4 +14,5 @@ Canonical tracking for active work. Keep this file current. | id | status | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes | |---|---|---|---|---|---|---|---|---|---|---|---|---|---| -| MACP-PHASE1 | in-progress | Implement MACP Phase 1 across orchestrator schemas, dispatcher, controller, CLI, config, and task sync while preserving legacy queue behavior. | #8 | bootstrap | feat/macp-phase1 | | | Jarvis | 2026-03-27T23:00:00Z | | medium | in-progress | Review-fix pass started 2026-03-28T00:38:01Z to address backward-compatibility, ACP safety, result timing, and worktree/brief security findings on top of the blocked PR-create state. Prior blocker remains: `~/.config/mosaic/tools/git/pr-create.sh -t 'feat: implement MACP phase 1 core protocol' -b ... -B main -H feat/macp-phase1` failed with `Remote repository required: Specify ID via --repo or execute from a local git repo.` | +| MACP-PHASE2A | in-progress | Build the MACP event bridge with event watcher, webhook adapter, Discord formatter, CLI watch wiring, docs updates, and verification evidence. | #10 | bootstrap | feat/macp-phase2a | | | Jarvis | 2026-03-28T02:02:38Z | | medium | in-progress | Issue created via `~/.config/mosaic/tools/git/issue-create.sh` fallback after `tea` reported `Remote repository required: Specify ID via --repo or execute from a local git repo.` | +| MACP-PHASE2A-TESTS | in-progress | Add comprehensive stdlib unittest coverage for the Phase 2A event bridge modules and runner scaffolding. | #10 | bootstrap | feat/macp-phase2a | MACP-PHASE2A | | Jarvis | 2026-03-28T02:17:40Z | | small | in-progress | User-requested follow-on task from `docs/tasks/MACP-PHASE2A-tests.md`; verification target is `python3 -m unittest discover -s tests -p 'test_*.py' -v`. | diff --git a/docs/scratchpads/macp-phase2a.md b/docs/scratchpads/macp-phase2a.md new file mode 100644 index 0000000..7193009 --- /dev/null +++ b/docs/scratchpads/macp-phase2a.md @@ -0,0 +1,74 @@ +# MACP Phase 2A Scratchpad + +## Session Start + +- Session date: 2026-03-27 / 2026-03-28 America/Chicago +- Branch: `feat/macp-phase2a` +- Issue: `#10` +- Objective: build the MACP event bridge and notification system from `docs/tasks/MACP-PHASE2A-brief.md` + +## Budget + +- Budget mode: inferred working budget +- Estimate: medium +- Token strategy: keep context narrow to event-bridge files, verify with targeted temp-repo tests, avoid unnecessary parallel deep dives + +## Requirements Notes + +- PRD updated to Phase 2A before coding +- TDD requirement: not mandatory for this feature work; targeted verification is sufficient because this is new observability functionality rather than a bug fix or auth/data-mutation change +- Documentation gate applies because developer-facing behavior and CLI surface change + +## Assumptions + +1. ASSUMPTION: corrupt or partial lines should be logged and skipped while still advancing the cursor past the offending line, preventing permanent replay loops. +2. ASSUMPTION: `mosaic macp watch` may run without webhook delivery enabled and should still process events plus persist cursor state. +3. ASSUMPTION: Discord formatting remains a pure formatting layer; no outbound Discord transport is part of Phase 2A. + +## Plan + +1. Update PRD/TASKS and create the Phase 2A issue/scratchpad. +2. Implement watcher, webhook adapter, formatter, and CLI wiring. +3. Update developer docs and sitemap. +4. Run baseline and situational verification. +5. Run independent code review, remediate findings, then commit/push/PR/merge/CI/issue-close. + +## Progress Log + +- 2026-03-28T02:02:38Z: Created provider issue `#10` for Phase 2A using Mosaic wrapper with Gitea API fallback. +- 2026-03-28T02:02:38Z: Replaced stale Phase 1 PRD/TASKS planning state with Phase 2A scope and tracking. +- 2026-03-28T02:17:40Z: Resumed Phase 2A for the test-suite follow-on task; loaded Mosaic intake, runtime, resume protocol, shared memory, and issue state before implementation. +- 2026-03-28T02:17:40Z: Updated PRD/TASKS to include the stdlib unittest coverage requirement and the `MACP-PHASE2A-TESTS` tracking row. +- 2026-03-28T02:23:08Z: Added repo-local unittest coverage for watcher, webhook adapter, and Discord formatter plus `tests/run_tests.sh`. +- 2026-03-28T02:23:08Z: Test-driven remediation exposed and fixed two formatter sanitization bugs (`re.sub` replacement escaping and ANSI escape stripping order). +- 2026-03-28T02:23:08Z: Tightened webhook callback config semantics so `enabled` and `event_filter` are enforced directly by `create_webhook_callback`; tightened literal-IP SSRF blocking to match requested tests. + +## Verification Plan + +| Acceptance Criterion | Verification Method | Evidence | +|---|---|---| +| AC-1 watcher polls new events and respects cursor | Temp events file + repeated `poll_once()` / CLI runs | pending | +| AC-2 webhook delivery retries and succeeds/fails cleanly | Local stdlib echo server capture | pending | +| AC-3 Discord formatting covers required event types | Targeted Python formatter check | pending | +| AC-4 `mosaic macp watch --once` runs cleanly | CLI one-shot execution in temp repo | pending | +| AC-5 cursor persistence handles repeat run and truncation | Temp repo repeated runs with truncated file scenario | pending | +| AC-6 unittest suite passes for Phase 2A modules | `python3 -m unittest discover -s tests -p 'test_*.py' -v` | pass | + +## Tests Run + +- `bash -n tests/run_tests.sh` — pass +- `python3 -m py_compile tests/__init__.py tests/conftest.py tests/test_event_watcher.py tests/test_webhook_adapter.py tests/test_discord_formatter.py tools/orchestrator-matrix/events/webhook_adapter.py tools/orchestrator-matrix/events/discord_formatter.py` — pass +- `./tests/run_tests.sh` — pass (24 tests) +- `python3 -m unittest discover -s tests -p 'test_*.py' -v` — pass (24 tests) +- `python3 -m pytest tests/` — environment limitation: `pytest` module is not installed in this worktree runtime, so compatibility was inferred from stdlib-only `unittest` test structure rather than executed here + +## Review Notes + +- Manual review of the final delta found no remaining correctness issues after the formatter sanitization fixes and webhook config enforcement updates. +- `~/.config/mosaic/tools/codex/codex-security-review.sh --uncommitted` — no findings, risk level `none` +- `~/.config/mosaic/tools/codex/codex-code-review.sh --uncommitted` did not return a terminal summary in this runtime; relied on manual review plus passing tests for the final gate in this session. + +## Risks / Blockers + +- Potential git wrapper friction in worktrees for PR creation/merge steps; if it recurs, capture exact failing command and stop per Mosaic contract. +- `pytest` is not installed in the current runtime, so the suite’s pytest compatibility was not executed end-to-end here. diff --git a/docs/tasks/MACP-PHASE2A-brief.md b/docs/tasks/MACP-PHASE2A-brief.md new file mode 100644 index 0000000..378b9a1 --- /dev/null +++ b/docs/tasks/MACP-PHASE2A-brief.md @@ -0,0 +1,152 @@ +# MACP Phase 2A — Event Bridge + Notification System + +**Branch:** `feat/macp-phase2a` +**Repo worktree:** `~/src/mosaic-bootstrap-worktrees/macp-phase2a` + +--- + +## Objective + +Build the event bridge that makes MACP events consumable by external systems (OpenClaw, Discord, webhooks). This is the observability layer — the controller already writes events to `events.ndjson`, but nothing reads them yet. + +--- + +## Task 1: Event File Watcher (`tools/orchestrator-matrix/events/event_watcher.py`) + +New Python module that tails `events.ndjson` and fires callbacks on new events. + +### Requirements: +- Watch `.mosaic/orchestrator/events.ndjson` for new lines (use file polling, not inotify — keeps it portable) +- Parse each new line as JSON +- Call registered callback functions with the parsed event +- Support filtering by event type (e.g., only `task.completed` and `task.failed`) +- Maintain a cursor (last read position) so restarts don't replay old events +- Cursor stored in `.mosaic/orchestrator/event_cursor.json` + +### Key Functions: +```python +class EventWatcher: + def __init__(self, events_path: Path, cursor_path: Path, poll_interval: float = 2.0): + ... + + def on(self, event_types: list[str], callback: Callable[[dict], None]) -> None: + """Register a callback for specific event types.""" + + def poll_once(self) -> list[dict]: + """Read new events since last cursor position. Returns list of new events.""" + + def run(self, max_iterations: int = 0) -> None: + """Polling loop. max_iterations=0 means infinite.""" +``` + +### Constraints: +- Python 3.10+ stdlib only (no pip dependencies) +- Must handle truncated/corrupt lines gracefully (skip, log warning) +- File might not exist yet — handle gracefully +- Thread-safe cursor updates (atomic write via temp file rename) + +--- + +## Task 2: Webhook Adapter (`tools/orchestrator-matrix/events/webhook_adapter.py`) + +POST events to a configurable URL. This is how the OC plugin will consume MACP events. + +### Requirements: +- Accept an event dict, POST it as JSON to a configured URL +- Support optional `Authorization` header (bearer token) +- Configurable from `.mosaic/orchestrator/config.json` under `macp.webhook`: + ```json + { + "macp": { + "webhook": { + "enabled": false, + "url": "http://localhost:8080/macp/events", + "auth_token": "", + "timeout_seconds": 10, + "retry_count": 2, + "event_filter": ["task.completed", "task.failed", "task.escalated"] + } + } + } + ``` +- Retry with exponential backoff on failure (configurable count) +- Log failures but don't crash the watcher +- Return success/failure status + +### Key Functions: +```python +def send_webhook(event: dict, config: dict) -> bool: + """POST event to webhook URL. Returns True on success.""" + +def create_webhook_callback(config: dict) -> Callable[[dict], None]: + """Factory that creates a watcher callback from config.""" +``` + +### Constraints: +- Use `urllib.request` only (no `requests` library) +- Must not block the event watcher for more than `timeout_seconds` per event +- Log to stderr on failure + +--- + +## Task 3: Discord Notification Formatter (`tools/orchestrator-matrix/events/discord_formatter.py`) + +Format MACP events into human-readable Discord messages. + +### Requirements: +- Format functions for each event type: + - `task.completed` → "✅ **Task TASK-001 completed** — Implement user auth (attempt 1/1, 45s)" + - `task.failed` → "❌ **Task TASK-001 failed** — Build error: exit code 1 (attempt 2/3)" + - `task.escalated` → "🚨 **Task TASK-001 escalated** — Gate failures after 3 attempts. Human review needed." + - `task.gated` → "🔍 **Task TASK-001 gated** — Quality gates running..." + - `task.started` → "⚙️ **Task TASK-001 started** — Worker: codex, dispatch: yolo" +- Include task metadata: runtime, dispatch type, attempt count, duration (if available) +- Keep messages concise — Discord has character limits +- Return plain strings (the caller decides where to send them) + +### Key Functions: +```python +def format_event(event: dict) -> str | None: + """Format an MACP event for Discord. Returns None for unformattable events.""" + +def format_summary(events: list[dict]) -> str: + """Format a batch summary (e.g., daily digest).""" +``` + +--- + +## Wiring: CLI Integration + +Add to `bin/mosaic-macp`: +```bash +mosaic macp watch [--webhook] [--once] +``` +- `watch`: Start the event watcher with configured callbacks +- `--webhook`: Enable webhook delivery (reads config from `.mosaic/orchestrator/config.json`) +- `--once`: Poll once and exit (useful for cron) + +--- + +## Verification + +1. Create a test `events.ndjson` with sample events, run `event_watcher.poll_once()`, verify all events returned +2. Run watcher with webhook pointing to a local echo server, verify POST payload +3. Format each event type through `discord_formatter`, verify output strings +4. `mosaic macp watch --once` processes events without error +5. Cursor persistence: run twice, second run returns no events + +## File Map +``` +tools/orchestrator-matrix/ +├── events/ +│ ├── __init__.py ← NEW +│ ├── event_watcher.py ← NEW +│ ├── webhook_adapter.py ← NEW +│ └── discord_formatter.py ← NEW +``` + +## Ground Rules +- Python 3.10+ stdlib only +- No async/threads — synchronous polling +- Commit: `feat: add MACP event bridge — watcher, webhook, Discord formatter` +- Push to `feat/macp-phase2a` diff --git a/docs/tasks/MACP-PHASE2A-tests.md b/docs/tasks/MACP-PHASE2A-tests.md new file mode 100644 index 0000000..7b09b1f --- /dev/null +++ b/docs/tasks/MACP-PHASE2A-tests.md @@ -0,0 +1,81 @@ +# MACP Phase 2A — Test Suite + +**Branch:** `feat/macp-phase2a` (commit on top of existing) +**Repo worktree:** `~/src/mosaic-bootstrap-worktrees/macp-phase2a` + +--- + +## Objective + +Write a comprehensive test suite for the Phase 2A event bridge code using Python `unittest` (stdlib only). Tests must be runnable with `python3 -m pytest tests/` or `python3 -m unittest discover tests/`. + +--- + +## Task 1: Test infrastructure (`tests/conftest.py` + `tests/run_tests.sh`) + +Create `tests/` directory at repo root with: +- `conftest.py` — shared fixtures: temp directories, sample events, sample config +- `run_tests.sh` — simple runner: `python3 -m unittest discover -s tests -p 'test_*.py' -v` +- `__init__.py` — empty, makes tests a package + +Sample events fixture should include one of each type: `task.assigned`, `task.started`, `task.completed`, `task.failed`, `task.escalated`, `task.gated`, `task.retry.scheduled` + +--- + +## Task 2: Event watcher tests (`tests/test_event_watcher.py`) + +Test the `EventWatcher` class from `tools/orchestrator-matrix/events/event_watcher.py`. + +### Test cases: +1. `test_poll_empty_file` — No events file exists → returns empty list +2. `test_poll_new_events` — Write 3 events to ndjson, poll → returns all 3 +3. `test_cursor_persistence` — Poll once (reads 3), poll again → returns 0 (cursor saved) +4. `test_cursor_survives_restart` — Poll, create new watcher instance, poll → no duplicates +5. `test_corrupt_line_skipped` — Insert a corrupt JSON line between valid events → valid events returned, corrupt skipped +6. `test_callback_filtering` — Register callback for `task.completed` only → only completed events trigger it +7. `test_callback_receives_events` — Register callback, poll → callback called with correct event dicts +8. `test_file_grows_between_polls` — Poll (gets 2), append 3 more, poll → gets 3 + +--- + +## Task 3: Webhook adapter tests (`tests/test_webhook_adapter.py`) + +Test `send_webhook` and `create_webhook_callback` from `tools/orchestrator-matrix/events/webhook_adapter.py`. + +### Test cases: +1. `test_send_webhook_success` — Mock HTTP response 200 → returns True +2. `test_send_webhook_failure` — Mock HTTP response 500 → returns False +3. `test_send_webhook_timeout` — Mock timeout → returns False, no crash +4. `test_send_webhook_retry` — Mock 500 then 200 → retries and succeeds +5. `test_event_filter` — Config with filter `["task.completed"]` → callback ignores `task.started` +6. `test_webhook_disabled` — Config with `enabled: false` → no HTTP call made +7. `test_ssrf_blocked` — URL with private IP (127.0.0.1, 10.x) → blocked, returns False + +Use `unittest.mock.patch` to mock `urllib.request.urlopen`. + +--- + +## Task 4: Discord formatter tests (`tests/test_discord_formatter.py`) + +Test `format_event` and `format_summary` from `tools/orchestrator-matrix/events/discord_formatter.py`. + +### Test cases: +1. `test_format_completed` — Completed event → contains "✅" and task ID +2. `test_format_failed` — Failed event → contains "❌" and error message +3. `test_format_escalated` — Escalated event → contains "🚨" and escalation reason +4. `test_format_gated` — Gated event → contains "🔍" +5. `test_format_started` — Started event → contains "⚙️" and runtime info +6. `test_format_unknown_type` — Unknown event type → returns None +7. `test_sanitize_control_chars` — Event with control characters in message → stripped in output +8. `test_sanitize_mentions` — Event with `@everyone` in message → neutralized in output +9. `test_format_summary` — List of mixed events → summary with counts + +--- + +## Verification + +After writing tests: +1. `cd ~/src/mosaic-bootstrap-worktrees/macp-phase2a && python3 -m unittest discover -s tests -p 'test_*.py' -v` — ALL tests must pass +2. Fix any failures before committing + +Commit: `test: add comprehensive test suite for Phase 2A event bridge` diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..b58d645 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +import pathlib +import sys + + +REPO_ROOT = pathlib.Path(__file__).resolve().parents[1] +EVENTS_DIR = REPO_ROOT / "tools" / "orchestrator-matrix" / "events" +if str(EVENTS_DIR) not in sys.path: + sys.path.insert(0, str(EVENTS_DIR)) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..055f1df --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +import json +import pathlib +import sys +import tempfile +from dataclasses import dataclass +from typing import Any + + +REPO_ROOT = pathlib.Path(__file__).resolve().parents[1] +EVENTS_DIR = REPO_ROOT / "tools" / "orchestrator-matrix" / "events" +if str(EVENTS_DIR) not in sys.path: + sys.path.insert(0, str(EVENTS_DIR)) + + +@dataclass +class TempPaths: + root: pathlib.Path + events_path: pathlib.Path + cursor_path: pathlib.Path + config_path: pathlib.Path + + +def make_temp_paths() -> tuple[tempfile.TemporaryDirectory[str], TempPaths]: + tempdir = tempfile.TemporaryDirectory() + root = pathlib.Path(tempdir.name) + orch_dir = root / ".mosaic" / "orchestrator" + return tempdir, TempPaths( + root=root, + events_path=orch_dir / "events.ndjson", + cursor_path=orch_dir / "event_cursor.json", + config_path=orch_dir / "config.json", + ) + + +def sample_events() -> list[dict[str, Any]]: + return [ + { + "event_type": "task.assigned", + "task_id": "TASK-001", + "message": "Assigned to codex", + "metadata": {"runtime": "codex", "dispatch": "exec"}, + }, + { + "event_type": "task.started", + "task_id": "TASK-001", + "message": "Worker execution started", + "metadata": {"runtime": "codex", "dispatch": "exec", "attempt": 1, "max_attempts": 3}, + }, + { + "event_type": "task.completed", + "task_id": "TASK-001", + "message": "Completed successfully", + "metadata": {"task_title": "Finish test suite", "duration_seconds": 12.4}, + }, + { + "event_type": "task.failed", + "task_id": "TASK-002", + "message": "Exploded with stack trace", + "metadata": {"attempt": 2, "max_attempts": 3}, + }, + { + "event_type": "task.escalated", + "task_id": "TASK-003", + "message": "Human review required", + "metadata": {"attempt": 1}, + }, + { + "event_type": "task.gated", + "task_id": "TASK-004", + "message": "Waiting for quality gates", + "metadata": {}, + }, + { + "event_type": "task.retry.scheduled", + "task_id": "TASK-002", + "message": "Retry scheduled", + "metadata": {"attempt": 3, "max_attempts": 3}, + }, + ] + + +def sample_config( + *, + enabled: bool = True, + event_filter: list[str] | None = None, + url: str = "https://hooks.example.com/macp", + retry_count: int = 1, + timeout_seconds: float = 2.0, + auth_token: str = "token-123", +) -> dict[str, Any]: + return { + "macp": { + "watch_poll_interval_seconds": 0.1, + "webhook": { + "enabled": enabled, + "url": url, + "event_filter": list(event_filter or []), + "retry_count": retry_count, + "timeout_seconds": timeout_seconds, + "auth_token": auth_token, + }, + } + } + + +def write_ndjson(path: pathlib.Path, events: list[dict[str, Any]], extra_lines: list[str] | None = None) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + lines = [json.dumps(event) for event in events] + if extra_lines: + lines.extend(extra_lines) + path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + +def append_ndjson(path: pathlib.Path, events: list[dict[str, Any]]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("a", encoding="utf-8") as handle: + for event in events: + handle.write(json.dumps(event)) + handle.write("\n") diff --git a/tests/run_tests.sh b/tests/run_tests.sh new file mode 100755 index 0000000..81ccf60 --- /dev/null +++ b/tests/run_tests.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +set -euo pipefail + +python3 -m unittest discover -s tests -p 'test_*.py' -v diff --git a/tests/test_discord_formatter.py b/tests/test_discord_formatter.py new file mode 100644 index 0000000..19ac202 --- /dev/null +++ b/tests/test_discord_formatter.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +import unittest + +from tests.conftest import sample_events +from discord_formatter import format_event, format_summary + + +class DiscordFormatterTests(unittest.TestCase): + def setUp(self) -> None: + self.events = sample_events() + + def test_format_completed(self) -> None: + formatted = format_event(self.events[2]) + + self.assertIsNotNone(formatted) + self.assertIn("✅", formatted) + self.assertIn("TASK-001", formatted) + + def test_format_failed(self) -> None: + formatted = format_event(self.events[3]) + + self.assertIsNotNone(formatted) + self.assertIn("❌", formatted) + self.assertIn("Exploded with stack trace", formatted) + + def test_format_escalated(self) -> None: + formatted = format_event(self.events[4]) + + self.assertIsNotNone(formatted) + self.assertIn("🚨", formatted) + self.assertIn("Human review required", formatted) + + def test_format_gated(self) -> None: + formatted = format_event(self.events[5]) + + self.assertIsNotNone(formatted) + self.assertIn("🔍", formatted) + + def test_format_started(self) -> None: + formatted = format_event(self.events[1]) + + self.assertIsNotNone(formatted) + self.assertIn("⚙️", formatted) + self.assertIn("Worker: codex", formatted) + + def test_format_unknown_type(self) -> None: + self.assertIsNone(format_event({"event_type": "task.unknown", "task_id": "TASK-999"})) + + def test_sanitize_control_chars(self) -> None: + event = { + "event_type": "task.failed", + "task_id": "TASK-200", + "message": "bad\x00news\x1b[31m!", + } + + formatted = format_event(event) + + self.assertIsNotNone(formatted) + self.assertNotIn("\x00", formatted) + self.assertNotIn("\x1b", formatted) + self.assertIn("bad news !", formatted) + + def test_sanitize_mentions(self) -> None: + event = { + "event_type": "task.escalated", + "task_id": "TASK-201", + "message": "@everyone investigate", + } + + formatted = format_event(event) + + self.assertIsNotNone(formatted) + self.assertIn("@\u200beveryone investigate", formatted) + + def test_format_summary(self) -> None: + summary = format_summary([self.events[1], self.events[2], self.events[3]]) + + self.assertIn("3 events", summary) + self.assertIn("task.completed: 1", summary) + self.assertIn("task.failed: 1", summary) + self.assertIn("task.started: 1", summary) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_event_watcher.py b/tests/test_event_watcher.py new file mode 100644 index 0000000..f17f688 --- /dev/null +++ b/tests/test_event_watcher.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import json +import unittest + +from tests.conftest import append_ndjson, make_temp_paths, sample_events, write_ndjson +from event_watcher import EventWatcher + + +class EventWatcherTests(unittest.TestCase): + def setUp(self) -> None: + self.tempdir, self.paths = make_temp_paths() + self.events = sample_events() + + def tearDown(self) -> None: + self.tempdir.cleanup() + + def watcher(self) -> EventWatcher: + return EventWatcher(self.paths.events_path, self.paths.cursor_path, poll_interval=0.1) + + def test_poll_empty_file(self) -> None: + watcher = self.watcher() + self.assertEqual(watcher.poll_once(), []) + self.assertFalse(self.paths.cursor_path.exists()) + + def test_poll_new_events(self) -> None: + write_ndjson(self.paths.events_path, self.events[:3]) + + polled = self.watcher().poll_once() + + self.assertEqual(polled, self.events[:3]) + + def test_cursor_persistence(self) -> None: + watcher = self.watcher() + write_ndjson(self.paths.events_path, self.events[:3]) + + first = watcher.poll_once() + second = watcher.poll_once() + + self.assertEqual(first, self.events[:3]) + self.assertEqual(second, []) + cursor = json.loads(self.paths.cursor_path.read_text(encoding="utf-8")) + self.assertGreater(cursor["position"], 0) + + def test_cursor_survives_restart(self) -> None: + write_ndjson(self.paths.events_path, self.events[:3]) + + first_watcher = self.watcher() + self.assertEqual(first_watcher.poll_once(), self.events[:3]) + + second_watcher = self.watcher() + self.assertEqual(second_watcher.poll_once(), []) + + def test_corrupt_line_skipped(self) -> None: + self.paths.events_path.parent.mkdir(parents=True, exist_ok=True) + with self.paths.events_path.open("w", encoding="utf-8") as handle: + handle.write(json.dumps(self.events[0]) + "\n") + handle.write("{not-json}\n") + handle.write(json.dumps(self.events[1]) + "\n") + + polled = self.watcher().poll_once() + + self.assertEqual(polled, [self.events[0], self.events[1]]) + + def test_callback_filtering(self) -> None: + write_ndjson(self.paths.events_path, self.events) + received: list[dict[str, object]] = [] + watcher = self.watcher() + watcher.on(["task.completed"], received.append) + + watcher.poll_once() + + self.assertEqual(received, [self.events[2]]) + + def test_callback_receives_events(self) -> None: + write_ndjson(self.paths.events_path, self.events[:2]) + received: list[dict[str, object]] = [] + watcher = self.watcher() + watcher.on([], received.append) + + polled = watcher.poll_once() + + self.assertEqual(received, self.events[:2]) + self.assertEqual(polled, self.events[:2]) + + def test_file_grows_between_polls(self) -> None: + watcher = self.watcher() + write_ndjson(self.paths.events_path, self.events[:2]) + + first = watcher.poll_once() + append_ndjson(self.paths.events_path, self.events[2:5]) + second = watcher.poll_once() + + self.assertEqual(first, self.events[:2]) + self.assertEqual(second, self.events[2:5]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_webhook_adapter.py b/tests/test_webhook_adapter.py new file mode 100644 index 0000000..738a9de --- /dev/null +++ b/tests/test_webhook_adapter.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import unittest +import urllib.error +from unittest.mock import patch + +from tests.conftest import sample_config, sample_events +from webhook_adapter import create_webhook_callback, send_webhook + + +class MockHTTPResponse: + def __init__(self, status: int) -> None: + self.status = status + + def getcode(self) -> int: + return self.status + + def __enter__(self) -> "MockHTTPResponse": + return self + + def __exit__(self, exc_type, exc, tb) -> bool: + return False + + +class WebhookAdapterTests(unittest.TestCase): + def setUp(self) -> None: + self.events = sample_events() + self.completed_event = self.events[2] + self.started_event = self.events[1] + + @patch("webhook_adapter.urllib.request.urlopen", return_value=MockHTTPResponse(200)) + def test_send_webhook_success(self, mock_urlopen) -> None: + config = sample_config() + + result = send_webhook(self.completed_event, config) + + self.assertTrue(result) + self.assertEqual(mock_urlopen.call_count, 1) + request = mock_urlopen.call_args.args[0] + self.assertEqual(request.full_url, "https://hooks.example.com/macp") + self.assertEqual(request.get_method(), "POST") + self.assertEqual(request.headers["Authorization"], "Bearer token-123") + + @patch("webhook_adapter.urllib.request.urlopen", return_value=MockHTTPResponse(500)) + def test_send_webhook_failure(self, mock_urlopen) -> None: + result = send_webhook(self.completed_event, sample_config(retry_count=0)) + + self.assertFalse(result) + self.assertEqual(mock_urlopen.call_count, 1) + + @patch("webhook_adapter.urllib.request.urlopen", side_effect=TimeoutError("timed out")) + def test_send_webhook_timeout(self, mock_urlopen) -> None: + result = send_webhook(self.completed_event, sample_config(retry_count=0)) + + self.assertFalse(result) + self.assertEqual(mock_urlopen.call_count, 1) + + @patch("webhook_adapter.time.sleep", return_value=None) + @patch( + "webhook_adapter.urllib.request.urlopen", + side_effect=[MockHTTPResponse(500), MockHTTPResponse(200)], + ) + def test_send_webhook_retry(self, mock_urlopen, mock_sleep) -> None: + result = send_webhook(self.completed_event, sample_config(retry_count=1)) + + self.assertTrue(result) + self.assertEqual(mock_urlopen.call_count, 2) + mock_sleep.assert_called_once() + + @patch("webhook_adapter.send_webhook", return_value=True) + def test_event_filter(self, mock_send_webhook) -> None: + callback = create_webhook_callback(sample_config(event_filter=["task.completed"])) + + callback(self.started_event) + callback(self.completed_event) + + mock_send_webhook.assert_called_once_with(self.completed_event, sample_config(event_filter=["task.completed"])) + + @patch("webhook_adapter.send_webhook") + def test_webhook_disabled(self, mock_send_webhook) -> None: + callback = create_webhook_callback(sample_config(enabled=False)) + + callback(self.completed_event) + + mock_send_webhook.assert_not_called() + + @patch("webhook_adapter.urllib.request.urlopen") + def test_ssrf_blocked(self, mock_urlopen) -> None: + for url in ("http://127.0.0.1/webhook", "http://10.1.2.3/webhook"): + with self.subTest(url=url): + result = send_webhook( + self.completed_event, + sample_config(url=url, auth_token="", retry_count=0), + ) + self.assertFalse(result) + mock_urlopen.assert_not_called() + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/orchestrator-matrix/README.md b/tools/orchestrator-matrix/README.md index 0461c12..cf98a1d 100644 --- a/tools/orchestrator-matrix/README.md +++ b/tools/orchestrator-matrix/README.md @@ -109,4 +109,10 @@ mosaic macp submit ... mosaic macp status mosaic macp drain mosaic macp history --task-id TASK-001 +mosaic macp watch --once +mosaic macp watch --webhook ``` + +The Phase 2A event bridge consumes `.mosaic/orchestrator/events.ndjson` through a polling watcher, +persists cursor state in `.mosaic/orchestrator/event_cursor.json`, and can fan out events to +Discord-formatted stdout lines or webhook callbacks. diff --git a/tools/orchestrator-matrix/events/__init__.py b/tools/orchestrator-matrix/events/__init__.py new file mode 100644 index 0000000..fcac56e --- /dev/null +++ b/tools/orchestrator-matrix/events/__init__.py @@ -0,0 +1,15 @@ +"""Event bridge helpers for MACP orchestrator events.""" + +from .discord_formatter import format_event +from .discord_formatter import format_summary +from .event_watcher import EventWatcher +from .webhook_adapter import create_webhook_callback +from .webhook_adapter import send_webhook + +__all__ = [ + "EventWatcher", + "create_webhook_callback", + "format_event", + "format_summary", + "send_webhook", +] diff --git a/tools/orchestrator-matrix/events/discord_formatter.py b/tools/orchestrator-matrix/events/discord_formatter.py new file mode 100644 index 0000000..505b2be --- /dev/null +++ b/tools/orchestrator-matrix/events/discord_formatter.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +"""Format MACP orchestrator events for Discord-friendly text delivery.""" + +from __future__ import annotations + +import re +from typing import Any + +# Strip ANSI escapes before generic control characters so escape fragments do not survive. +_CTRL_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]|[\x00-\x1f\x7f]") +# Collapse Discord @-mentions / role pings to prevent deceptive pings +_MENTION_RE = re.compile(r"@(everyone|here|&?\d+)") + + +def _sanitize(value: str) -> str: + """Normalize untrusted text for safe rendering in Discord/terminal output.""" + value = _CTRL_RE.sub(" ", value) + value = _MENTION_RE.sub(lambda match: "@\u200b" + match.group(1), value) + return value.strip() + + +def _task_label(event: dict[str, Any]) -> str: + task_id = str(event.get("task_id") or "unknown") + return f"Task {task_id}" + + +def _title(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if isinstance(metadata, dict): + for key in ("task_title", "title", "description"): + value = _sanitize(str(metadata.get(key) or "")) + if value: + return value + message = _sanitize(str(event.get("message") or "")) + return message if message else "No details provided" + + +def _attempt_suffix(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if not isinstance(metadata, dict): + return "" + attempt = metadata.get("attempt") + max_attempts = metadata.get("max_attempts") + if attempt in (None, "") and max_attempts in (None, ""): + return "" + if attempt in (None, ""): + return f"attempt ?/{max_attempts}" + if max_attempts in (None, ""): + return f"attempt {attempt}" + return f"attempt {attempt}/{max_attempts}" + + +def _duration_suffix(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if not isinstance(metadata, dict): + return "" + duration = metadata.get("duration_seconds") + if duration in (None, ""): + return "" + try: + seconds = int(round(float(duration))) + except (TypeError, ValueError): + return "" + return f"{seconds}s" + + +def _runtime_dispatch_suffix(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if not isinstance(metadata, dict): + return "" + parts: list[str] = [] + runtime = str(metadata.get("runtime") or "").strip() + dispatch = str(metadata.get("dispatch") or "").strip() + if runtime: + parts.append(f"Worker: {runtime}") + if dispatch: + parts.append(f"dispatch: {dispatch}") + return ", ".join(parts) + + +def _meta_clause(*parts: str) -> str: + clean = [part for part in parts if part] + if not clean: + return "" + return f" ({', '.join(clean)})" + + +def format_event(event: dict[str, Any]) -> str | None: + """Format an MACP event for Discord. Returns None for unformattable events.""" + + event_type = str(event.get("event_type") or "").strip() + task_label = _task_label(event) + title = _title(event) + attempt_suffix = _attempt_suffix(event) + duration_suffix = _duration_suffix(event) + runtime_dispatch = _runtime_dispatch_suffix(event) + message = _sanitize(str(event.get("message") or "")) + + if event_type == "task.completed": + return f"✅ **{task_label} completed** — {title}{_meta_clause(attempt_suffix, duration_suffix)}" + if event_type == "task.failed": + detail = message or title + return f"❌ **{task_label} failed** — {detail}{_meta_clause(attempt_suffix)}" + if event_type == "task.escalated": + detail = message or title + return f"🚨 **{task_label} escalated** — {detail}{_meta_clause(attempt_suffix)}" + if event_type == "task.gated": + detail = message or "Quality gates running..." + return f"🔍 **{task_label} gated** — {detail}" + if event_type == "task.started": + detail = runtime_dispatch or message or "Worker execution started" + return f"⚙️ **{task_label} started** — {detail}{_meta_clause(attempt_suffix)}" + return None + + +def format_summary(events: list[dict[str, Any]]) -> str: + """Format a batch summary (e.g., daily digest).""" + + counts: dict[str, int] = {} + first_task = "" + last_task = "" + for event in events: + event_type = str(event.get("event_type") or "unknown") + counts[event_type] = counts.get(event_type, 0) + 1 + task_id = str(event.get("task_id") or "").strip() + if task_id and not first_task: + first_task = task_id + if task_id: + last_task = task_id + + if not counts: + return "No MACP events processed." + + ordered = ", ".join(f"{event_type}: {counts[event_type]}" for event_type in sorted(counts)) + task_span = "" + if first_task and last_task: + task_span = f" Tasks {first_task}" if first_task == last_task else f" Tasks {first_task} -> {last_task}" + return f"MACP event summary: {len(events)} events ({ordered}).{task_span}" diff --git a/tools/orchestrator-matrix/events/event_watcher.py b/tools/orchestrator-matrix/events/event_watcher.py new file mode 100644 index 0000000..ff77bee --- /dev/null +++ b/tools/orchestrator-matrix/events/event_watcher.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +"""Portable file-polling watcher for MACP orchestrator events.""" + +from __future__ import annotations + +import json +import pathlib +import sys +import time +from collections.abc import Callable +from typing import Any + + +def _warn(message: str) -> None: + print(f"[macp-event-watcher] {message}", file=sys.stderr) + + +def _load_json(path: pathlib.Path, default: Any) -> Any: + if not path.exists(): + return default + try: + with path.open("r", encoding="utf-8") as handle: + return json.load(handle) + except (OSError, json.JSONDecodeError) as exc: + _warn(f"failed to load cursor {path}: {exc}") + return default + + +def _save_json_atomic(path: pathlib.Path, data: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + with tmp.open("w", encoding="utf-8") as handle: + json.dump(data, handle, indent=2) + handle.write("\n") + tmp.replace(path) + + +class EventWatcher: + """Poll an events NDJSON file and dispatch matching callbacks.""" + + def __init__(self, events_path: pathlib.Path, cursor_path: pathlib.Path, poll_interval: float = 2.0): + self.events_path = events_path + self.cursor_path = cursor_path + self.poll_interval = max(0.1, float(poll_interval)) + self._callbacks: list[tuple[set[str] | None, Callable[[dict[str, Any]], None]]] = [] + self._cursor_position = self._load_cursor() + + def on(self, event_types: list[str], callback: Callable[[dict[str, Any]], None]) -> None: + """Register a callback for specific event types.""" + + normalized = {str(event_type).strip() for event_type in event_types if str(event_type).strip()} + self._callbacks.append((normalized or None, callback)) + + def poll_once(self) -> list[dict[str, Any]]: + """Read new events since last cursor position. Returns list of new events.""" + + if not self.events_path.exists(): + return [] + + try: + file_size = self.events_path.stat().st_size + except OSError as exc: + _warn(f"failed to stat events file {self.events_path}: {exc}") + return [] + + if file_size < self._cursor_position: + _warn( + f"events file shrank from cursor={self._cursor_position} to size={file_size}; " + "resetting cursor to start" + ) + self._cursor_position = 0 + self._persist_cursor() + + events: list[dict[str, Any]] = [] + new_position = self._cursor_position + try: + with self.events_path.open("r", encoding="utf-8") as handle: + handle.seek(self._cursor_position) + while True: + line_start = handle.tell() + line = handle.readline() + if not line: + break + line_end = handle.tell() + if not line.endswith("\n"): + new_position = line_start + break + stripped = line.strip() + if not stripped: + new_position = line_end + continue + try: + event = json.loads(stripped) + except json.JSONDecodeError as exc: + _warn(f"skipping corrupt event at byte {line_start}: {exc}") + new_position = line_end + continue + if not isinstance(event, dict): + _warn(f"skipping non-object event at byte {line_start}") + new_position = line_end + continue + events.append(event) + self._dispatch(event) + new_position = line_end + except OSError as exc: + _warn(f"failed to read events file {self.events_path}: {exc}") + return [] + + if new_position != self._cursor_position: + self._cursor_position = new_position + self._persist_cursor() + return events + + def run(self, max_iterations: int = 0) -> None: + """Polling loop. max_iterations=0 means infinite.""" + + iterations = 0 + while max_iterations <= 0 or iterations < max_iterations: + self.poll_once() + iterations += 1 + if max_iterations > 0 and iterations >= max_iterations: + break + time.sleep(self.poll_interval) + + def _dispatch(self, event: dict[str, Any]) -> None: + event_type = str(event.get("event_type") or "").strip() + for filters, callback in self._callbacks: + if filters is not None and event_type not in filters: + continue + try: + callback(event) + except Exception as exc: # pragma: no cover - defensive boundary + _warn(f"callback failure for event {event_type or '<unknown>'}: {exc}") + + def _load_cursor(self) -> int: + payload = _load_json(self.cursor_path, {"position": 0}) + try: + position = int(payload.get("position", 0)) + except (AttributeError, TypeError, ValueError): + position = 0 + return max(0, position) + + def _persist_cursor(self) -> None: + _save_json_atomic(self.cursor_path, {"position": self._cursor_position}) diff --git a/tools/orchestrator-matrix/events/webhook_adapter.py b/tools/orchestrator-matrix/events/webhook_adapter.py new file mode 100644 index 0000000..7aff73a --- /dev/null +++ b/tools/orchestrator-matrix/events/webhook_adapter.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +"""Webhook delivery helpers for MACP events.""" + +from __future__ import annotations + +import json +import sys +import time +import urllib.error +import urllib.request +from collections.abc import Callable +from typing import Any + + +def _warn(message: str) -> None: + print(f"[macp-webhook] {message}", file=sys.stderr) + + +def _webhook_config(config: dict[str, Any]) -> dict[str, Any]: + macp = config.get("macp") + if isinstance(macp, dict) and isinstance(macp.get("webhook"), dict): + return dict(macp["webhook"]) + return dict(config) + + +def _validate_webhook_url(url: str, auth_token: str) -> str | None: + """Validate webhook URL for SSRF and cleartext credential risks. + + Returns an error message if the URL is disallowed, or None if safe. + """ + import ipaddress + import urllib.parse as urlparse + + parsed = urlparse.urlparse(url) + scheme = parsed.scheme.lower() + + if scheme not in ("http", "https"): + return f"unsupported scheme '{scheme}' — must be http or https" + + if auth_token and scheme == "http": + host = parsed.hostname or "" + # Allow cleartext only for explicit loopback (dev use) + if host not in ("localhost", "127.0.0.1", "::1"): + return "refusing to send auth_token over non-HTTPS to non-localhost — use https://" + + host = parsed.hostname or "" + # Block RFC1918, loopback, and link-local IPs outright. + try: + ip = ipaddress.ip_address(host) + if ip.is_loopback or ip.is_private or ip.is_link_local: + return f"refusing to send webhook to private/internal IP {ip}" + except ValueError: + pass # hostname — DNS resolution not validated here (best-effort) + + return None + + +def send_webhook(event: dict[str, Any], config: dict[str, Any]) -> bool: + """POST event to webhook URL. Returns True on success.""" + + webhook = _webhook_config(config) + if webhook.get("enabled") is False: + return False + + url = str(webhook.get("url") or "").strip() + if not url: + _warn("missing webhook url") + return False + + timeout_seconds = max(1.0, float(webhook.get("timeout_seconds") or 10)) + retry_count = max(0, int(webhook.get("retry_count") or 0)) + auth_token = str(webhook.get("auth_token") or "").strip() + + url_err = _validate_webhook_url(url, auth_token) + if url_err: + _warn(f"webhook URL rejected: {url_err}") + return False + + payload = json.dumps(event, ensure_ascii=True).encode("utf-8") + headers = {"Content-Type": "application/json"} + if auth_token: + headers["Authorization"] = f"Bearer {auth_token}" + + attempts = retry_count + 1 + for attempt in range(1, attempts + 1): + request = urllib.request.Request(url, data=payload, headers=headers, method="POST") + try: + with urllib.request.urlopen(request, timeout=timeout_seconds) as response: + status = getattr(response, "status", response.getcode()) + if 200 <= int(status) < 300: + return True + _warn(f"webhook returned HTTP {status} on attempt {attempt}/{attempts}") + except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, ValueError) as exc: + _warn(f"webhook attempt {attempt}/{attempts} failed: {exc}") + if attempt < attempts: + time.sleep(min(timeout_seconds, 2 ** (attempt - 1))) + return False + + +def create_webhook_callback(config: dict[str, Any]) -> Callable[[dict[str, Any]], None]: + """Factory that creates a watcher callback from config.""" + + webhook = _webhook_config(config) + enabled = bool(webhook.get("enabled", False)) + event_filter = { + str(event_type).strip() + for event_type in list(webhook.get("event_filter") or []) + if str(event_type).strip() + } + + def callback(event: dict[str, Any]) -> None: + if not enabled: + return + event_type = str(event.get("event_type") or "").strip() + if event_filter and event_type not in event_filter: + return + if not send_webhook(event, config): + _warn(f"delivery failed for event {event.get('event_type', '<unknown>')}") + + return callback