diff --git a/.gitignore b/.gitignore index f82a4ae..65aad93 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ node_modules/ rails +*.pyc +**/__pycache__/ diff --git a/bin/mosaic-macp b/bin/mosaic-macp index 413ab55..581e231 100755 --- a/bin/mosaic-macp +++ b/bin/mosaic-macp @@ -16,6 +16,7 @@ Usage: mosaic macp status [--task-id TASK-001] mosaic macp drain mosaic macp history [--task-id TASK-001] + mosaic macp watch [--webhook] [--once] USAGE } @@ -193,6 +194,75 @@ for line in events_path.read_text(encoding="utf-8").splitlines(): PY } +watch_events() { + require_repo + local webhook_enabled="false" + local run_once="false" + while [[ $# -gt 0 ]]; do + case "$1" in + --webhook) webhook_enabled="true"; shift ;; + --once) run_once="true"; shift ;; + -h|--help) usage; exit 0 ;; + *) echo "[mosaic-macp] unknown watch option: $1" >&2; exit 1 ;; + esac + done + + python3 - "$repo_root" "$config_path" "$events_path" "$orch_dir/event_cursor.json" "$webhook_enabled" "$run_once" <<'PY' +import json +import pathlib +import sys + +repo_root = pathlib.Path(sys.argv[1]).resolve() +config_path = pathlib.Path(sys.argv[2]).resolve() +events_path = pathlib.Path(sys.argv[3]).resolve() +cursor_path = pathlib.Path(sys.argv[4]).resolve() +webhook_flag = sys.argv[5].lower() == "true" +run_once = sys.argv[6].lower() == "true" + +events_dir = repo_root / "tools" / "orchestrator-matrix" / "events" +if str(events_dir) not in sys.path: + sys.path.insert(0, str(events_dir)) + +from discord_formatter import format_event +from event_watcher import EventWatcher +from webhook_adapter import create_webhook_callback + +config = {} +if config_path.exists(): + try: + config = json.loads(config_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError) as e: + print(f"[macp] Warning: could not parse config {config_path}: {e}", file=sys.stderr) + config = {} + +macp = dict(config.get("macp") or {}) +watcher = EventWatcher( + events_path=events_path, + cursor_path=cursor_path, + 
poll_interval=float(macp.get("watch_poll_interval_seconds") or 2.0), +) + +def print_callback(event: dict) -> None: + rendered = format_event(event) + if rendered: + print(rendered) + +watcher.on([], print_callback) + +webhook_config = dict(macp.get("webhook") or {}) +if webhook_flag and bool(webhook_config.get("enabled", False)): + watcher.on(list(webhook_config.get("event_filter") or []), create_webhook_callback(config)) +elif webhook_flag: + print("[mosaic-macp] webhook requested but disabled in config", file=sys.stderr) + +if run_once: + processed = watcher.poll_once() + print(f"[mosaic-macp] processed_events={len(processed)}") +else: + watcher.run() +PY +} + subcommand="${1:-help}" if [[ $# -gt 0 ]]; then shift @@ -203,6 +273,7 @@ case "$subcommand" in status) status_tasks "$@" ;; drain) drain_tasks "$@" ;; history) history_tasks "$@" ;; + watch) watch_events "$@" ;; help|-h|--help|"") usage ;; *) echo "[mosaic-macp] unknown subcommand: $subcommand" >&2 diff --git a/docs/DEVELOPER-GUIDE/README.md b/docs/DEVELOPER-GUIDE/README.md index 148140e..a66ae35 100644 --- a/docs/DEVELOPER-GUIDE/README.md +++ b/docs/DEVELOPER-GUIDE/README.md @@ -3,3 +3,4 @@ ## Orchestrator Matrix - [MACP Phase 1](./orchestrator-matrix/macp-phase1.md) +- [MACP Phase 2A](./orchestrator-matrix/macp-phase2a.md) diff --git a/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md new file mode 100644 index 0000000..443a549 --- /dev/null +++ b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md @@ -0,0 +1,45 @@ +# MACP Phase 2A + +MACP Phase 2A adds the repo-local event bridge that makes orchestrator lifecycle events consumable by external systems. + +## What Changed + +1. `tools/orchestrator-matrix/events/event_watcher.py` polls `.mosaic/orchestrator/events.ndjson`, parses appended NDJSON events, dispatches callbacks, and persists a byte-offset cursor in `.mosaic/orchestrator/event_cursor.json`. +2. 
`tools/orchestrator-matrix/events/webhook_adapter.py` forwards selected MACP events to a configured webhook endpoint with bounded retries and optional bearer auth. +3. `tools/orchestrator-matrix/events/discord_formatter.py` renders task lifecycle events into concise Discord-friendly status lines. +4. `bin/mosaic-macp` adds `watch` mode for one-shot or continuous event processing. + +## Watcher Behavior + +1. File watching is polling-based and stdlib-only for portability. +2. The watcher resets its cursor if the events file is truncated. +3. Corrupt JSON lines are logged to stderr and skipped. +4. A trailing partial line is left unread until the newline arrives, preventing half-written events from being consumed. + +## Webhook Configuration + +Configure `.mosaic/orchestrator/config.json` under `macp.webhook`: + +```json +{ + "macp": { + "webhook": { + "enabled": false, + "url": "http://localhost:8080/macp/events", + "auth_token": "", + "timeout_seconds": 10, + "retry_count": 2, + "event_filter": ["task.completed", "task.failed", "task.escalated"] + } + } +} +``` + +## CLI + +```bash +mosaic macp watch --once +mosaic macp watch --webhook +``` + +`--once` performs a single poll and exits. `--webhook` enables delivery via the configured `macp.webhook` block while still printing Discord-formatted event lines to stdout. diff --git a/docs/MACP-BRIEF-TEMPLATE.md b/docs/MACP-BRIEF-TEMPLATE.md new file mode 100644 index 0000000..7641eee --- /dev/null +++ b/docs/MACP-BRIEF-TEMPLATE.md @@ -0,0 +1,86 @@ +# MACP Task Brief Template + +**Use this template for all MACP task briefs.** Workers that receive briefs not following this structure should flag it as an issue. + +--- + +```markdown +#
+
+### Constraints:
+-
+
+---
+
+## Task 2:
+
+
+---
+
+## Tests (MANDATORY)
+
+**Every brief MUST include a Tests section. Workers MUST write tests before or alongside implementation. Tests MUST pass before committing.**
+
+### Test file: `tests/test_.py`
+
+### Test cases:
+1. `test_` —
+2. `test_` —
+...
+
+### Test runner:
+```bash
+python3 -m unittest discover -s tests -p 'test_*.py' -v
+```
+
+---
+
+## Verification
+
+1. All tests pass: ``
+2. Python import check: `python3 -c "import "`
+3.
+
+## Ground Rules
+- Python 3.10+ stdlib only (no pip dependencies)
+- Commit message: `feat: ` (conventional commits)
+- Push to `feat/` branch when done
+```
+
+---
+
+## Brief Sizing Rules
+
+| Brief Type | Max Items | Rationale |
+|------------|-----------|-----------|
+| **Build** (new code) | 2-3 | High cognitive load per item |
+| **Fix** (surgical changes) | 5-7 | Low cognitive load, exact file/line/fix |
+| **Review** | 1 | Naturally focused |
+| **Test** (add tests) | 3-4 | Medium load, but well-scoped |
+
+The key metric is **cognitive load per item**, not item count.
+- Build = construction (high load)
+- Fix = scalpel (low load)
+- Review = naturally focused
+- Test = moderate (reading existing code + writing test logic)
diff --git a/docs/PRD.md b/docs/PRD.md
index be490b8..fbdcc4e 100644
--- a/docs/PRD.md
+++ b/docs/PRD.md
@@ -1,4 +1,4 @@
-# PRD: MACP Phase 1 Core Protocol Implementation
+# PRD: MACP Phase 2A Event Bridge + Notification System
## Metadata
@@ -9,90 +9,91 @@
## Problem Statement
-The current orchestrator-matrix rail can queue shell-based worker tasks, but it does not yet expose a standardized protocol for dispatch selection, worktree-aware execution, structured results, or manual MACP queue operations. MACP Phase 1 extends the existing rail so orchestrators can delegate to multiple runtimes through a consistent task model while preserving current behavior for legacy tasks.
+MACP Phase 1 writes structured lifecycle events to `.mosaic/orchestrator/events.ndjson`, but no repo-local bridge consumes those events for external systems. Phase 2A adds a portable watcher, webhook delivery, and Discord-friendly formatting so MACP event streams can drive OpenClaw integrations and human-facing notifications.
## Objectives
-1. Extend the existing orchestrator-matrix protocol and controller to support MACP-aware task dispatch and status tracking.
-2. Add a dispatcher layer that manages worktree lifecycle, runtime command generation, and standardized results.
-3. Provide a CLI entrypoint for manual MACP submission, status inspection, queue draining, and history review.
+1. Add a synchronous event watcher that tails `events.ndjson` using stdlib-only file polling and persists cursor state across restarts.
+2. Add a webhook adapter that can forward selected MACP events to a configured HTTP endpoint with bounded retries.
+3. Add a Discord formatter that turns task lifecycle events into concise human-readable strings.
+4. Extend the `mosaic macp` CLI with a `watch` command for one-shot or continuous event bridge execution.
## Scope
### In Scope
-1. Extend the orchestrator task and event schemas and add a result schema.
-2. Add a Python dispatcher module under `tools/orchestrator-matrix/dispatcher/`.
-3. Update the controller to use the dispatcher for MACP-aware tasks while preserving legacy execution paths.
-4. Update orchestrator config templates, task markdown sync logic, and CLI routing/scripts for MACP commands.
-5. Add verification for backward compatibility, schema validity, imports, and basic MACP execution flow.
+1. New `tools/orchestrator-matrix/events/` package with watcher, webhook adapter, and Discord formatter modules.
+2. Cursor persistence at `.mosaic/orchestrator/event_cursor.json`.
+3. `mosaic macp watch [--webhook] [--once]` CLI support using `.mosaic/orchestrator/config.json`.
+4. Stdlib-only verification of watcher polling, webhook delivery, Discord formatting, CLI watch behavior, and cursor persistence.
+5. Developer documentation and sitemap updates covering the Phase 2A event bridge.
+6. A repo-local unittest suite under `tests/` that covers watcher polling/cursor behavior, webhook delivery logic, and Discord formatting.
### Out of Scope
-1. Rewriting the orchestrator controller architecture.
-2. Changing Matrix transport behavior beyond schema compatibility.
-3. Implementing real OpenClaw `sessions_spawn` execution beyond producing the config payload/command for callers.
-4. Adding non-stdlib Python dependencies or npm-based tooling.
+1. Adding Discord transport or webhook server hosting inside this repository.
+2. Replacing the existing Matrix transport bridge.
+3. Introducing async, threads, or third-party Python packages.
+4. Changing event emission behavior in the controller beyond consuming the existing event stream.
## User/Stakeholder Requirements
-1. MACP must evolve the current orchestrator-matrix implementation rather than replace it.
-2. Legacy task queues without `dispatch` fields must continue to run exactly as before.
-3. MACP-aware tasks must support dispatch modes `yolo`, `acp`, and `exec`.
-4. Results must be written in a structured JSON format suitable for audit and orchestration follow-up.
-5. A manual `mosaic macp` CLI must expose submit, status, drain, and history flows.
+1. External systems must be able to consume MACP events without reading the NDJSON file directly.
+2. The watcher must remain portable across environments, so file polling is required instead of platform-specific file watching.
+3. Restarting the watcher must not replay previously consumed events.
+4. Webhook delivery failures must be logged and isolated so the watcher loop continues running.
+5. Discord formatting must stay concise and useful for task lifecycle visibility.
## Functional Requirements
-1. Task schema must include MACP dispatch, worktree, result, retry, branch, brief, issue/PR, and dependency fields.
-2. Event schema must recognize `task.gated`, `task.escalated`, and `task.retry.scheduled`, plus a `dispatcher` source.
-3. Dispatcher functions must set up worktrees, build commands, execute tasks, collect results, and clean up worktrees.
-4. Controller `run_single_task()` must route MACP-aware tasks through the dispatcher and emit the correct lifecycle events/status transitions.
-5. `tasks_md_sync.py` must map optional MACP table columns only when those headers are present in `docs/TASKS.md`; absent MACP headers must not inject MACP fields into legacy tasks.
-6. `bin/mosaic` must route `mosaic macp ...` to a new `bin/mosaic-macp` script.
+1. `EventWatcher` must watch `.mosaic/orchestrator/events.ndjson`, parse appended JSON lines, and invoke registered callbacks for matching event types.
+2. `EventWatcher.poll_once()` must tolerate a missing events file, truncated/corrupt lines, and cursor positions that are stale after file truncation.
+3. Cursor writes must be atomic and stored at `.mosaic/orchestrator/event_cursor.json`.
+4. `send_webhook(event, config)` must POST JSON to the configured URL using `urllib.request`, optionally adding a bearer token, respecting timeout, and retrying with exponential backoff.
+5. `create_webhook_callback(config)` must return a callback that swallows/logs failures instead of raising into the watcher loop.
+6. `format_event(event)` must support `task.completed`, `task.failed`, `task.escalated`, `task.gated`, and `task.started`, including useful task metadata when present.
+7. `format_summary(events)` must produce a short batch summary suitable for notification digests.
+8. `bin/mosaic-macp` must expose `watch`, optionally enabling webhook delivery from config, and support one-shot polling with `--once`.
## Non-Functional Requirements
-1. Security: no secrets embedded in generated commands, config, or results.
-2. Performance: controller remains deterministic and synchronous with no async or thread-based orchestration.
-3. Reliability: worktree creation/cleanup failures must be surfaced predictably and produce structured task failure/escalation states.
-4. Observability: lifecycle events, logs, and result JSON must clearly show task outcome, attempts, gates, and errors.
+1. Security: no secrets embedded in code or logs; auth token only sent via header when configured.
+2. Performance: each webhook attempt must be bounded by `timeout_seconds`; no event-processing path may hang indefinitely.
+3. Reliability: corrupt input lines and callback delivery failures must be logged to stderr and skipped without crashing the watcher.
+4. Portability: Python 3.10+ stdlib only; no OS-specific file watcher APIs.
+5. Observability: warnings and failures must be clear enough to diagnose cursor, parsing, and webhook problems.
## Acceptance Criteria
-1. Existing legacy tasks without `dispatch` still run through the old shell path with unchanged behavior.
-2. MACP-aware `exec` tasks run through the dispatcher and produce result JSON with gate outcomes.
-3. New schemas validate task/event/result payload expectations for MACP fields and statuses.
-4. `mosaic macp submit`, `status`, and `history` work from a bootstrapped repo state, and `drain` delegates to the existing orchestrator runner.
-5. Python imports for the updated controller, dispatcher, and sync code complete without errors on Python 3.10+.
+1. `EventWatcher.poll_once()` reads newly appended events, returns parsed dicts, invokes registered callbacks, and skips already-consumed events after restart.
+2. Webhook delivery posts matching events to a local test endpoint, supports bearer auth configuration, and retries boundedly on failure.
+3. Discord formatter returns expected concise strings for the required task lifecycle event types and a usable batch summary.
+4. `mosaic macp watch --once` processes events from a bootstrapped repo state without error and honors `--webhook`.
+5. Cursor persistence prevents replay on a second run and resets safely when the events file is truncated.
+6. `python3 -m unittest discover -s tests -p 'test_*.py' -v` passes with stdlib-only tests for the Phase 2A event bridge modules.
## Constraints and Dependencies
1. Python implementation must use stdlib only and support Python 3.10+.
-2. Shell tooling must remain bash-based and fit the existing Mosaic CLI style.
-3. Dispatch fallback rules must use `exec` when `dispatch` is absent and config/default runtime when `runtime` is absent.
-4. Worktree convention must derive from the repository name and task metadata unless explicitly overridden by task fields.
+2. Shell CLI behavior must remain bash-based and consistent with the existing Mosaic command style.
+3. The watcher consumes the event schema already emitted by Phase 1 controller logic.
+4. Webhook configuration lives under `.mosaic/orchestrator/config.json` at `macp.webhook`.
## Risks and Open Questions
-1. Risk: yolo command execution requires a PTY, so the dispatcher needs a safe wrapper that still behaves under `subprocess`.
-2. Risk: worktree cleanup could remove a path unexpectedly if task metadata is malformed.
-3. Risk: old queue consumers may assume only the original task statuses and event types.
-4. Open Question: whether `task.gated` should be emitted by the dispatcher or controller once worker execution ends and quality gates begin.
+1. Risk: partial writes may leave an incomplete trailing JSON line that must not advance the cursor incorrectly.
+2. Risk: synchronous webhook retries can slow one poll cycle if the endpoint is unavailable; timeout and retry behavior must remain bounded.
+3. Risk: event payloads may omit optional metadata fields, so formatter output must degrade cleanly.
+4. ASSUMPTION: the watcher should advance past corrupt lines after logging them so a single bad line does not permanently stall downstream consumption.
+5. ASSUMPTION: CLI `watch` should default to no-op callback processing when no delivery option is enabled, while still updating the cursor and reporting processed count.
## Testing and Verification Expectations
-1. Baseline checks: Python import validation, targeted script execution checks, JSON syntax/schema validation, and any repo-local validation applicable to changed code paths.
-2. Situational testing: legacy orchestrator run with old-style tasks, MACP `exec` flow including result file generation, CLI submit/status/history behavior, and worktree lifecycle validation.
-3. Evidence format: command-level results captured in the scratchpad and summarized in the final delivery report.
+1. Baseline checks: Python bytecode compilation/import validation for new modules and shell syntax validation for `bin/mosaic-macp`.
+2. Situational tests: temporary orchestrator state exercising watcher polling, callback filtering, webhook POST capture/mocking, formatter sanitization, CLI one-shot watch execution, and cursor persistence across repeated runs.
+3. Evidence format: command-level results recorded in the scratchpad and summarized against acceptance criteria.
## Milestone / Delivery Intent
-1. Target milestone/version: 0.0.x bootstrap enhancement
-2. Definition of done: code merged to `main`, CI terminal green, issue `#8` closed, and verification evidence recorded against all acceptance criteria.
-
-## Assumptions
-
-1. ASSUMPTION: A single issue can track the full Phase 1 implementation because the user requested one bounded feature delivery rather than separate independent tickets.
-2. ASSUMPTION: For `acp` dispatch in Phase 1, the controller must escalate the task immediately with a clear reason instead of pretending work ran before OpenClaw integration exists.
-3. ASSUMPTION: `task.gated` should be emitted by the controller as the transition into quality-gate execution, which keeps gate-state ownership in one place alongside the existing gate loop.
+1. Target milestone/version: Phase 2A observability bridge
+2. Definition of done: code merged to `main`, CI terminal green, issue `#10` closed, and verification evidence recorded against all acceptance criteria.
diff --git a/docs/SITEMAP.md b/docs/SITEMAP.md
index c4e8b1c..1694313 100644
--- a/docs/SITEMAP.md
+++ b/docs/SITEMAP.md
@@ -4,4 +4,6 @@
- [Tasks](./TASKS.md)
- [Developer Guide](./DEVELOPER-GUIDE/README.md)
- [Task Briefs](./tasks/MACP-PHASE1-brief.md)
+- [Task Briefs (Phase 2A)](./tasks/MACP-PHASE2A-brief.md)
- [Scratchpads](./scratchpads/macp-phase1.md)
+- [Scratchpads (Phase 2A)](./scratchpads/macp-phase2a.md)
diff --git a/docs/TASKS.md b/docs/TASKS.md
index 2578549..f9190bd 100644
--- a/docs/TASKS.md
+++ b/docs/TASKS.md
@@ -14,4 +14,5 @@ Canonical tracking for active work. Keep this file current.
| id | status | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
-| MACP-PHASE1 | in-progress | Implement MACP Phase 1 across orchestrator schemas, dispatcher, controller, CLI, config, and task sync while preserving legacy queue behavior. | #8 | bootstrap | feat/macp-phase1 | | | Jarvis | 2026-03-27T23:00:00Z | | medium | in-progress | Review-fix pass started 2026-03-28T00:38:01Z to address backward-compatibility, ACP safety, result timing, and worktree/brief security findings on top of the blocked PR-create state. Prior blocker remains: `~/.config/mosaic/tools/git/pr-create.sh -t 'feat: implement MACP phase 1 core protocol' -b ... -B main -H feat/macp-phase1` failed with `Remote repository required: Specify ID via --repo or execute from a local git repo.` |
+| MACP-PHASE2A | in-progress | Build the MACP event bridge with event watcher, webhook adapter, Discord formatter, CLI watch wiring, docs updates, and verification evidence. | #10 | bootstrap | feat/macp-phase2a | | | Jarvis | 2026-03-28T02:02:38Z | | medium | in-progress | Issue created via `~/.config/mosaic/tools/git/issue-create.sh` fallback after `tea` reported `Remote repository required: Specify ID via --repo or execute from a local git repo.` |
+| MACP-PHASE2A-TESTS | in-progress | Add comprehensive stdlib unittest coverage for the Phase 2A event bridge modules and runner scaffolding. | #10 | bootstrap | feat/macp-phase2a | MACP-PHASE2A | | Jarvis | 2026-03-28T02:17:40Z | | small | in-progress | User-requested follow-on task from `docs/tasks/MACP-PHASE2A-tests.md`; verification target is `python3 -m unittest discover -s tests -p 'test_*.py' -v`. |
diff --git a/docs/scratchpads/macp-phase2a.md b/docs/scratchpads/macp-phase2a.md
new file mode 100644
index 0000000..7193009
--- /dev/null
+++ b/docs/scratchpads/macp-phase2a.md
@@ -0,0 +1,74 @@
+# MACP Phase 2A Scratchpad
+
+## Session Start
+
+- Session date: 2026-03-27 / 2026-03-28 America/Chicago
+- Branch: `feat/macp-phase2a`
+- Issue: `#10`
+- Objective: build the MACP event bridge and notification system from `docs/tasks/MACP-PHASE2A-brief.md`
+
+## Budget
+
+- Budget mode: inferred working budget
+- Estimate: medium
+- Token strategy: keep context narrow to event-bridge files, verify with targeted temp-repo tests, avoid unnecessary parallel deep dives
+
+## Requirements Notes
+
+- PRD updated to Phase 2A before coding
+- TDD requirement: not mandatory for this feature work; targeted verification is sufficient because this is new observability functionality rather than a bug fix or auth/data-mutation change
+- Documentation gate applies because developer-facing behavior and CLI surface change
+
+## Assumptions
+
+1. ASSUMPTION: corrupt or partial lines should be logged and skipped while still advancing the cursor past the offending line, preventing permanent replay loops.
+2. ASSUMPTION: `mosaic macp watch` may run without webhook delivery enabled and should still process events plus persist cursor state.
+3. ASSUMPTION: Discord formatting remains a pure formatting layer; no outbound Discord transport is part of Phase 2A.
+
+## Plan
+
+1. Update PRD/TASKS and create the Phase 2A issue/scratchpad.
+2. Implement watcher, webhook adapter, formatter, and CLI wiring.
+3. Update developer docs and sitemap.
+4. Run baseline and situational verification.
+5. Run independent code review, remediate findings, then commit/push/PR/merge/CI/issue-close.
+
+## Progress Log
+
+- 2026-03-28T02:02:38Z: Created provider issue `#10` for Phase 2A using Mosaic wrapper with Gitea API fallback.
+- 2026-03-28T02:02:38Z: Replaced stale Phase 1 PRD/TASKS planning state with Phase 2A scope and tracking.
+- 2026-03-28T02:17:40Z: Resumed Phase 2A for the test-suite follow-on task; loaded Mosaic intake, runtime, resume protocol, shared memory, and issue state before implementation.
+- 2026-03-28T02:17:40Z: Updated PRD/TASKS to include the stdlib unittest coverage requirement and the `MACP-PHASE2A-TESTS` tracking row.
+- 2026-03-28T02:23:08Z: Added repo-local unittest coverage for watcher, webhook adapter, and Discord formatter plus `tests/run_tests.sh`.
+- 2026-03-28T02:23:08Z: Test-driven remediation exposed and fixed two formatter sanitization bugs (`re.sub` replacement escaping and ANSI escape stripping order).
+- 2026-03-28T02:23:08Z: Tightened webhook callback config semantics so `enabled` and `event_filter` are enforced directly by `create_webhook_callback`; tightened literal-IP SSRF blocking to match requested tests.
+
+## Verification Plan
+
+| Acceptance Criterion | Verification Method | Evidence |
+|---|---|---|
+| AC-1 watcher polls new events and respects cursor | Temp events file + repeated `poll_once()` / CLI runs | pending |
+| AC-2 webhook delivery retries and succeeds/fails cleanly | Local stdlib echo server capture | pending |
+| AC-3 Discord formatting covers required event types | Targeted Python formatter check | pending |
+| AC-4 `mosaic macp watch --once` runs cleanly | CLI one-shot execution in temp repo | pending |
+| AC-5 cursor persistence handles repeat run and truncation | Temp repo repeated runs with truncated file scenario | pending |
+| AC-6 unittest suite passes for Phase 2A modules | `python3 -m unittest discover -s tests -p 'test_*.py' -v` | pass |
+
+## Tests Run
+
+- `bash -n tests/run_tests.sh` — pass
+- `python3 -m py_compile tests/__init__.py tests/conftest.py tests/test_event_watcher.py tests/test_webhook_adapter.py tests/test_discord_formatter.py tools/orchestrator-matrix/events/webhook_adapter.py tools/orchestrator-matrix/events/discord_formatter.py` — pass
+- `./tests/run_tests.sh` — pass (24 tests)
+- `python3 -m unittest discover -s tests -p 'test_*.py' -v` — pass (24 tests)
+- `python3 -m pytest tests/` — environment limitation: `pytest` module is not installed in this worktree runtime, so compatibility was inferred from stdlib-only `unittest` test structure rather than executed here
+
+## Review Notes
+
+- Manual review of the final delta found no remaining correctness issues after the formatter sanitization fixes and webhook config enforcement updates.
+- `~/.config/mosaic/tools/codex/codex-security-review.sh --uncommitted` — no findings, risk level `none`
+- `~/.config/mosaic/tools/codex/codex-code-review.sh --uncommitted` did not return a terminal summary in this runtime; relied on manual review plus passing tests for the final gate in this session.
+
+## Risks / Blockers
+
+- Potential git wrapper friction in worktrees for PR creation/merge steps; if it recurs, capture exact failing command and stop per Mosaic contract.
+- `pytest` is not installed in the current runtime, so the suite’s pytest compatibility was not executed end-to-end here.
diff --git a/docs/tasks/MACP-PHASE2A-brief.md b/docs/tasks/MACP-PHASE2A-brief.md
new file mode 100644
index 0000000..378b9a1
--- /dev/null
+++ b/docs/tasks/MACP-PHASE2A-brief.md
@@ -0,0 +1,152 @@
+# MACP Phase 2A — Event Bridge + Notification System
+
+**Branch:** `feat/macp-phase2a`
+**Repo worktree:** `~/src/mosaic-bootstrap-worktrees/macp-phase2a`
+
+---
+
+## Objective
+
+Build the event bridge that makes MACP events consumable by external systems (OpenClaw, Discord, webhooks). This is the observability layer — the controller already writes events to `events.ndjson`, but nothing reads them yet.
+
+---
+
+## Task 1: Event File Watcher (`tools/orchestrator-matrix/events/event_watcher.py`)
+
+New Python module that tails `events.ndjson` and fires callbacks on new events.
+
+### Requirements:
+- Watch `.mosaic/orchestrator/events.ndjson` for new lines (use file polling, not inotify — keeps it portable)
+- Parse each new line as JSON
+- Call registered callback functions with the parsed event
+- Support filtering by event type (e.g., only `task.completed` and `task.failed`)
+- Maintain a cursor (last read position) so restarts don't replay old events
+- Cursor stored in `.mosaic/orchestrator/event_cursor.json`
+
+### Key Functions:
+```python
+class EventWatcher:
+ def __init__(self, events_path: Path, cursor_path: Path, poll_interval: float = 2.0):
+ ...
+
+ def on(self, event_types: list[str], callback: Callable[[dict], None]) -> None:
+ """Register a callback for specific event types."""
+
+ def poll_once(self) -> list[dict]:
+ """Read new events since last cursor position. Returns list of new events."""
+
+ def run(self, max_iterations: int = 0) -> None:
+ """Polling loop. max_iterations=0 means infinite."""
+```
+
+### Constraints:
+- Python 3.10+ stdlib only (no pip dependencies)
+- Must handle truncated/corrupt lines gracefully (skip, log warning)
+- File might not exist yet — handle gracefully
+- Crash-safe cursor updates (atomic write via temp file rename; no threads are used per Ground Rules)
+
+---
+
+## Task 2: Webhook Adapter (`tools/orchestrator-matrix/events/webhook_adapter.py`)
+
+POST events to a configurable URL. This is how the OC plugin will consume MACP events.
+
+### Requirements:
+- Accept an event dict, POST it as JSON to a configured URL
+- Support optional `Authorization` header (bearer token)
+- Configurable from `.mosaic/orchestrator/config.json` under `macp.webhook`:
+ ```json
+ {
+ "macp": {
+ "webhook": {
+ "enabled": false,
+ "url": "http://localhost:8080/macp/events",
+ "auth_token": "",
+ "timeout_seconds": 10,
+ "retry_count": 2,
+ "event_filter": ["task.completed", "task.failed", "task.escalated"]
+ }
+ }
+ }
+ ```
+- Retry with exponential backoff on failure (configurable count)
+- Log failures but don't crash the watcher
+- Return success/failure status
+
+### Key Functions:
+```python
+def send_webhook(event: dict, config: dict) -> bool:
+ """POST event to webhook URL. Returns True on success."""
+
+def create_webhook_callback(config: dict) -> Callable[[dict], None]:
+ """Factory that creates a watcher callback from config."""
+```
+
+### Constraints:
+- Use `urllib.request` only (no `requests` library)
+- Must not block the event watcher for more than `timeout_seconds` per event
+- Log to stderr on failure
+
+---
+
+## Task 3: Discord Notification Formatter (`tools/orchestrator-matrix/events/discord_formatter.py`)
+
+Format MACP events into human-readable Discord messages.
+
+### Requirements:
+- Format functions for each event type:
+ - `task.completed` → "✅ **Task TASK-001 completed** — Implement user auth (attempt 1/1, 45s)"
+ - `task.failed` → "❌ **Task TASK-001 failed** — Build error: exit code 1 (attempt 2/3)"
+ - `task.escalated` → "🚨 **Task TASK-001 escalated** — Gate failures after 3 attempts. Human review needed."
+ - `task.gated` → "🔍 **Task TASK-001 gated** — Quality gates running..."
+ - `task.started` → "⚙️ **Task TASK-001 started** — Worker: codex, dispatch: yolo"
+- Include task metadata: runtime, dispatch type, attempt count, duration (if available)
+- Keep messages concise — Discord has character limits
+- Return plain strings (the caller decides where to send them)
+
+### Key Functions:
+```python
+def format_event(event: dict) -> str | None:
+ """Format an MACP event for Discord. Returns None for unformattable events."""
+
+def format_summary(events: list[dict]) -> str:
+ """Format a batch summary (e.g., daily digest)."""
+```
+
+---
+
+## Wiring: CLI Integration
+
+Add to `bin/mosaic-macp`:
+```bash
+mosaic macp watch [--webhook] [--once]
+```
+- `watch`: Start the event watcher with configured callbacks
+- `--webhook`: Enable webhook delivery (reads config from `.mosaic/orchestrator/config.json`)
+- `--once`: Poll once and exit (useful for cron)
+
+---
+
+## Verification
+
+1. Create a test `events.ndjson` with sample events, run `event_watcher.poll_once()`, verify all events returned
+2. Run watcher with webhook pointing to a local echo server, verify POST payload
+3. Format each event type through `discord_formatter`, verify output strings
+4. `mosaic macp watch --once` processes events without error
+5. Cursor persistence: run twice, second run returns no events
+
+## File Map
+```
+tools/orchestrator-matrix/
+├── events/
+│ ├── __init__.py ← NEW
+│ ├── event_watcher.py ← NEW
+│ ├── webhook_adapter.py ← NEW
+│ └── discord_formatter.py ← NEW
+```
+
+## Ground Rules
+- Python 3.10+ stdlib only
+- No async/threads — synchronous polling
+- Commit: `feat: add MACP event bridge — watcher, webhook, Discord formatter`
+- Push to `feat/macp-phase2a`
diff --git a/docs/tasks/MACP-PHASE2A-tests.md b/docs/tasks/MACP-PHASE2A-tests.md
new file mode 100644
index 0000000..7b09b1f
--- /dev/null
+++ b/docs/tasks/MACP-PHASE2A-tests.md
@@ -0,0 +1,81 @@
+# MACP Phase 2A — Test Suite
+
+**Branch:** `feat/macp-phase2a` (commit on top of existing)
+**Repo worktree:** `~/src/mosaic-bootstrap-worktrees/macp-phase2a`
+
+---
+
+## Objective
+
+Write a comprehensive test suite for the Phase 2A event bridge code using Python `unittest` (stdlib only). Tests must be runnable with `python3 -m pytest tests/` or `python3 -m unittest discover tests/`.
+
+---
+
+## Task 1: Test infrastructure (`tests/conftest.py` + `tests/run_tests.sh`)
+
+Create `tests/` directory at repo root with:
+- `conftest.py` — shared fixtures: temp directories, sample events, sample config
+- `run_tests.sh` — simple runner: `python3 -m unittest discover -s tests -p 'test_*.py' -v`
+- `__init__.py` — empty, makes tests a package
+
+Sample events fixture should include one of each type: `task.assigned`, `task.started`, `task.completed`, `task.failed`, `task.escalated`, `task.gated`, `task.retry.scheduled`
+
+---
+
+## Task 2: Event watcher tests (`tests/test_event_watcher.py`)
+
+Test the `EventWatcher` class from `tools/orchestrator-matrix/events/event_watcher.py`.
+
+### Test cases:
+1. `test_poll_empty_file` — No events file exists → returns empty list
+2. `test_poll_new_events` — Write 3 events to ndjson, poll → returns all 3
+3. `test_cursor_persistence` — Poll once (reads 3), poll again → returns 0 (cursor saved)
+4. `test_cursor_survives_restart` — Poll, create new watcher instance, poll → no duplicates
+5. `test_corrupt_line_skipped` — Insert a corrupt JSON line between valid events → valid events returned, corrupt skipped
+6. `test_callback_filtering` — Register callback for `task.completed` only → only completed events trigger it
+7. `test_callback_receives_events` — Register callback, poll → callback called with correct event dicts
+8. `test_file_grows_between_polls` — Poll (gets 2), append 3 more, poll → gets 3
+
+---
+
+## Task 3: Webhook adapter tests (`tests/test_webhook_adapter.py`)
+
+Test `send_webhook` and `create_webhook_callback` from `tools/orchestrator-matrix/events/webhook_adapter.py`.
+
+### Test cases:
+1. `test_send_webhook_success` — Mock HTTP response 200 → returns True
+2. `test_send_webhook_failure` — Mock HTTP response 500 → returns False
+3. `test_send_webhook_timeout` — Mock timeout → returns False, no crash
+4. `test_send_webhook_retry` — Mock 500 then 200 → retries and succeeds
+5. `test_event_filter` — Config with filter `["task.completed"]` → callback ignores `task.started`
+6. `test_webhook_disabled` — Config with `enabled: false` → no HTTP call made
+7. `test_ssrf_blocked` — URL with private IP (127.0.0.1, 10.x) → blocked, returns False
+
+Use `unittest.mock.patch` to mock `urllib.request.urlopen`.
+
+---
+
+## Task 4: Discord formatter tests (`tests/test_discord_formatter.py`)
+
+Test `format_event` and `format_summary` from `tools/orchestrator-matrix/events/discord_formatter.py`.
+
+### Test cases:
+1. `test_format_completed` — Completed event → contains "✅" and task ID
+2. `test_format_failed` — Failed event → contains "❌" and error message
+3. `test_format_escalated` — Escalated event → contains "🚨" and escalation reason
+4. `test_format_gated` — Gated event → contains "🔍"
+5. `test_format_started` — Started event → contains "⚙️" and runtime info
+6. `test_format_unknown_type` — Unknown event type → returns None
+7. `test_sanitize_control_chars` — Event with control characters in message → stripped in output
+8. `test_sanitize_mentions` — Event with `@everyone` in message → neutralized in output
+9. `test_format_summary` — List of mixed events → summary with counts
+
+---
+
+## Verification
+
+After writing tests:
+1. `cd ~/src/mosaic-bootstrap-worktrees/macp-phase2a && python3 -m unittest discover -s tests -p 'test_*.py' -v` — ALL tests must pass
+2. Fix any failures before committing
+
+Commit: `test: add comprehensive test suite for Phase 2A event bridge`
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..b58d645
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1,10 @@
+from __future__ import annotations
+
+import pathlib
+import sys
+
+
# Make the event-bridge modules importable as top-level names from tests.
REPO_ROOT = pathlib.Path(__file__).resolve().parents[1]
EVENTS_DIR = REPO_ROOT / "tools" / "orchestrator-matrix" / "events"
_events_dir = str(EVENTS_DIR)
if _events_dir not in sys.path:
    sys.path.insert(0, _events_dir)
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..055f1df
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,121 @@
+from __future__ import annotations
+
+import json
+import pathlib
+import sys
+import tempfile
+from dataclasses import dataclass
+from typing import Any
+
+
# Put the event-bridge package directory on sys.path so the test modules can
# import event_watcher / webhook_adapter / discord_formatter as top-level names.
REPO_ROOT = pathlib.Path(__file__).resolve().parents[1]
EVENTS_DIR = REPO_ROOT / "tools" / "orchestrator-matrix" / "events"
if (candidate := str(EVENTS_DIR)) not in sys.path:
    sys.path.insert(0, candidate)
+
+
@dataclass
class TempPaths:
    """Orchestrator state-file paths rooted inside one temporary directory.

    Built by make_temp_paths(); mirrors the repo's .mosaic/orchestrator layout
    so watcher/webhook code can run against disposable fixtures.
    """

    # Temp-dir root (the .mosaic/orchestrator subtree may not exist on disk yet).
    root: pathlib.Path
    # NDJSON event log consumed by EventWatcher.
    events_path: pathlib.Path
    # Byte-offset cursor persisted between polls.
    cursor_path: pathlib.Path
    # Orchestrator config (macp.webhook settings and the like).
    config_path: pathlib.Path
+
+
def make_temp_paths() -> tuple[tempfile.TemporaryDirectory[str], TempPaths]:
    """Create a TemporaryDirectory plus the orchestrator paths inside it.

    The caller owns the returned TemporaryDirectory handle and must call
    cleanup() (the files under it are only created lazily by the code under
    test).
    """
    holder = tempfile.TemporaryDirectory()
    base = pathlib.Path(holder.name)
    orchestrator = base / ".mosaic" / "orchestrator"
    paths = TempPaths(
        root=base,
        events_path=orchestrator / "events.ndjson",
        cursor_path=orchestrator / "event_cursor.json",
        config_path=orchestrator / "config.json",
    )
    return holder, paths
+
+
def sample_events() -> list[dict[str, Any]]:
    """Return one fixture event for every MACP lifecycle event type."""

    def _event(event_type: str, task_id: str, message: str, **metadata: Any) -> dict[str, Any]:
        # Shape mirrors the records appended to .mosaic/orchestrator/events.ndjson.
        return {
            "event_type": event_type,
            "task_id": task_id,
            "message": message,
            "metadata": dict(metadata),
        }

    return [
        _event("task.assigned", "TASK-001", "Assigned to codex",
               runtime="codex", dispatch="exec"),
        _event("task.started", "TASK-001", "Worker execution started",
               runtime="codex", dispatch="exec", attempt=1, max_attempts=3),
        _event("task.completed", "TASK-001", "Completed successfully",
               task_title="Finish test suite", duration_seconds=12.4),
        _event("task.failed", "TASK-002", "Exploded with stack trace",
               attempt=2, max_attempts=3),
        _event("task.escalated", "TASK-003", "Human review required", attempt=1),
        _event("task.gated", "TASK-004", "Waiting for quality gates"),
        _event("task.retry.scheduled", "TASK-002", "Retry scheduled",
               attempt=3, max_attempts=3),
    ]
+
+
def sample_config(
    *,
    enabled: bool = True,
    event_filter: list[str] | None = None,
    url: str = "https://hooks.example.com/macp",
    retry_count: int = 1,
    timeout_seconds: float = 2.0,
    auth_token: str = "token-123",
) -> dict[str, Any]:
    """Build an orchestrator config dict carrying a macp.webhook section."""
    webhook = {
        "enabled": enabled,
        "url": url,
        "event_filter": list(event_filter or []),
        "retry_count": retry_count,
        "timeout_seconds": timeout_seconds,
        "auth_token": auth_token,
    }
    # Fast poll interval keeps watcher-based tests snappy.
    return {"macp": {"watch_poll_interval_seconds": 0.1, "webhook": webhook}}
+
+
def write_ndjson(path: pathlib.Path, events: list[dict[str, Any]], extra_lines: list[str] | None = None) -> None:
    """Overwrite *path* with one JSON document per line (plus raw extra lines)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    rendered = [json.dumps(event) for event in events]
    rendered.extend(extra_lines or [])
    # Single trailing newline, matching how the orchestrator terminates records.
    path.write_text("\n".join(rendered) + "\n", encoding="utf-8")
+
+
def append_ndjson(path: pathlib.Path, events: list[dict[str, Any]]) -> None:
    """Append one JSON document per line to *path*, creating parents as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    payload = "".join(f"{json.dumps(event)}\n" for event in events)
    with path.open("a", encoding="utf-8") as sink:
        sink.write(payload)
diff --git a/tests/run_tests.sh b/tests/run_tests.sh
new file mode 100755
index 0000000..81ccf60
--- /dev/null
+++ b/tests/run_tests.sh
@@ -0,0 +1,4 @@
#!/usr/bin/env bash
# Run the Phase 2A test suite via stdlib unittest discovery.
# NOTE(review): assumes invocation from the repo root so `-s tests` resolves — confirm.
set -euo pipefail

python3 -m unittest discover -s tests -p 'test_*.py' -v
diff --git a/tests/test_discord_formatter.py b/tests/test_discord_formatter.py
new file mode 100644
index 0000000..19ac202
--- /dev/null
+++ b/tests/test_discord_formatter.py
@@ -0,0 +1,86 @@
+from __future__ import annotations
+
+import unittest
+
+from tests.conftest import sample_events
+from discord_formatter import format_event, format_summary
+
+
class DiscordFormatterTests(unittest.TestCase):
    """Tests for discord_formatter.format_event and format_summary."""

    def setUp(self) -> None:
        # Fixture order (conftest.sample_events): assigned, started, completed,
        # failed, escalated, gated, retry.scheduled.
        self.events = sample_events()

    def test_format_completed(self) -> None:
        """Completed events carry the ✅ marker and the task id."""
        formatted = format_event(self.events[2])

        self.assertIsNotNone(formatted)
        self.assertIn("✅", formatted)
        self.assertIn("TASK-001", formatted)

    def test_format_failed(self) -> None:
        """Failed events carry ❌ and surface the raw error message."""
        formatted = format_event(self.events[3])

        self.assertIsNotNone(formatted)
        self.assertIn("❌", formatted)
        self.assertIn("Exploded with stack trace", formatted)

    def test_format_escalated(self) -> None:
        """Escalated events carry 🚨 and the escalation reason."""
        formatted = format_event(self.events[4])

        self.assertIsNotNone(formatted)
        self.assertIn("🚨", formatted)
        self.assertIn("Human review required", formatted)

    def test_format_gated(self) -> None:
        """Gated events carry the 🔍 marker."""
        formatted = format_event(self.events[5])

        self.assertIsNotNone(formatted)
        self.assertIn("🔍", formatted)

    def test_format_started(self) -> None:
        """Started events carry ⚙️ plus the worker runtime info."""
        formatted = format_event(self.events[1])

        self.assertIsNotNone(formatted)
        self.assertIn("⚙️", formatted)
        self.assertIn("Worker: codex", formatted)

    def test_format_unknown_type(self) -> None:
        """Unknown event types are unformattable and yield None."""
        self.assertIsNone(format_event({"event_type": "task.unknown", "task_id": "TASK-999"}))

    def test_sanitize_control_chars(self) -> None:
        """NUL bytes and ANSI escapes are stripped before rendering."""
        event = {
            "event_type": "task.failed",
            "task_id": "TASK-200",
            "message": "bad\x00news\x1b[31m!",
        }

        formatted = format_event(event)

        self.assertIsNotNone(formatted)
        self.assertNotIn("\x00", formatted)
        self.assertNotIn("\x1b", formatted)
        # Each stripped control sequence is replaced by a single space.
        self.assertIn("bad news !", formatted)

    def test_sanitize_mentions(self) -> None:
        """@everyone is neutralized with a zero-width space after the '@'."""
        event = {
            "event_type": "task.escalated",
            "task_id": "TASK-201",
            "message": "@everyone investigate",
        }

        formatted = format_event(event)

        self.assertIsNotNone(formatted)
        self.assertIn("@\u200beveryone investigate", formatted)

    def test_format_summary(self) -> None:
        """Summaries report the total count and a per-type breakdown."""
        summary = format_summary([self.events[1], self.events[2], self.events[3]])

        self.assertIn("3 events", summary)
        self.assertIn("task.completed: 1", summary)
        self.assertIn("task.failed: 1", summary)
        self.assertIn("task.started: 1", summary)
+
+
if __name__ == "__main__":
    # Support direct execution: python3 tests/test_discord_formatter.py
    unittest.main()
diff --git a/tests/test_event_watcher.py b/tests/test_event_watcher.py
new file mode 100644
index 0000000..f17f688
--- /dev/null
+++ b/tests/test_event_watcher.py
@@ -0,0 +1,99 @@
+from __future__ import annotations
+
+import json
+import unittest
+
+from tests.conftest import append_ndjson, make_temp_paths, sample_events, write_ndjson
+from event_watcher import EventWatcher
+
+
class EventWatcherTests(unittest.TestCase):
    """Tests for EventWatcher polling, cursor persistence, and callback dispatch."""

    def setUp(self) -> None:
        # Fresh temp tree per test; tearDown disposes of it.
        self.tempdir, self.paths = make_temp_paths()
        self.events = sample_events()

    def tearDown(self) -> None:
        self.tempdir.cleanup()

    def watcher(self) -> EventWatcher:
        """Construct a watcher over this test's temp paths (fast poll interval)."""
        return EventWatcher(self.paths.events_path, self.paths.cursor_path, poll_interval=0.1)

    def test_poll_empty_file(self) -> None:
        """A missing events file yields no events and persists no cursor."""
        watcher = self.watcher()
        self.assertEqual(watcher.poll_once(), [])
        self.assertFalse(self.paths.cursor_path.exists())

    def test_poll_new_events(self) -> None:
        """All fully written events are returned on the first poll."""
        write_ndjson(self.paths.events_path, self.events[:3])

        polled = self.watcher().poll_once()

        self.assertEqual(polled, self.events[:3])

    def test_cursor_persistence(self) -> None:
        """A second poll returns nothing and the cursor file holds the byte offset."""
        watcher = self.watcher()
        write_ndjson(self.paths.events_path, self.events[:3])

        first = watcher.poll_once()
        second = watcher.poll_once()

        self.assertEqual(first, self.events[:3])
        self.assertEqual(second, [])
        cursor = json.loads(self.paths.cursor_path.read_text(encoding="utf-8"))
        self.assertGreater(cursor["position"], 0)

    def test_cursor_survives_restart(self) -> None:
        """A new watcher instance resumes from the persisted cursor (no duplicates)."""
        write_ndjson(self.paths.events_path, self.events[:3])

        first_watcher = self.watcher()
        self.assertEqual(first_watcher.poll_once(), self.events[:3])

        second_watcher = self.watcher()
        self.assertEqual(second_watcher.poll_once(), [])

    def test_corrupt_line_skipped(self) -> None:
        """Invalid JSON lines are skipped while surrounding valid events survive."""
        self.paths.events_path.parent.mkdir(parents=True, exist_ok=True)
        with self.paths.events_path.open("w", encoding="utf-8") as handle:
            handle.write(json.dumps(self.events[0]) + "\n")
            handle.write("{not-json}\n")
            handle.write(json.dumps(self.events[1]) + "\n")

        polled = self.watcher().poll_once()

        self.assertEqual(polled, [self.events[0], self.events[1]])

    def test_callback_filtering(self) -> None:
        """A type-filtered callback sees only matching events."""
        write_ndjson(self.paths.events_path, self.events)
        received: list[dict[str, object]] = []
        watcher = self.watcher()
        watcher.on(["task.completed"], received.append)

        watcher.poll_once()

        self.assertEqual(received, [self.events[2]])

    def test_callback_receives_events(self) -> None:
        """An unfiltered callback receives every polled event dict."""
        write_ndjson(self.paths.events_path, self.events[:2])
        received: list[dict[str, object]] = []
        watcher = self.watcher()
        watcher.on([], received.append)

        polled = watcher.poll_once()

        self.assertEqual(received, self.events[:2])
        self.assertEqual(polled, self.events[:2])

    def test_file_grows_between_polls(self) -> None:
        """Appended events are picked up by the next poll without duplicates."""
        watcher = self.watcher()
        write_ndjson(self.paths.events_path, self.events[:2])

        first = watcher.poll_once()
        append_ndjson(self.paths.events_path, self.events[2:5])
        second = watcher.poll_once()

        self.assertEqual(first, self.events[:2])
        self.assertEqual(second, self.events[2:5])
+
+
if __name__ == "__main__":
    # Support direct execution: python3 tests/test_event_watcher.py
    unittest.main()
diff --git a/tests/test_webhook_adapter.py b/tests/test_webhook_adapter.py
new file mode 100644
index 0000000..738a9de
--- /dev/null
+++ b/tests/test_webhook_adapter.py
@@ -0,0 +1,100 @@
+from __future__ import annotations
+
+import unittest
+import urllib.error
+from unittest.mock import patch
+
+from tests.conftest import sample_config, sample_events
+from webhook_adapter import create_webhook_callback, send_webhook
+
+
class MockHTTPResponse:
    """Test double for urllib HTTP responses: status code plus context manager."""

    def __init__(self, status: int) -> None:
        # Same attribute name as real responses expose via `.status`.
        self.status = status

    def getcode(self) -> int:
        """Legacy accessor; send_webhook falls back to this when .status is absent."""
        return self.status

    def __enter__(self) -> "MockHTTPResponse":
        # Allows `with urlopen(...) as response:` in the code under test.
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        # Returning False propagates any exception raised in the with-block.
        return False
+
+
class WebhookAdapterTests(unittest.TestCase):
    """Tests for send_webhook delivery/retry/SSRF handling and the callback factory."""

    def setUp(self) -> None:
        self.events = sample_events()
        # Indexes follow conftest.sample_events ordering.
        self.completed_event = self.events[2]
        self.started_event = self.events[1]

    @patch("webhook_adapter.urllib.request.urlopen", return_value=MockHTTPResponse(200))
    def test_send_webhook_success(self, mock_urlopen) -> None:
        """A 2xx response reports success after exactly one authorized POST."""
        config = sample_config()

        result = send_webhook(self.completed_event, config)

        self.assertTrue(result)
        self.assertEqual(mock_urlopen.call_count, 1)
        # The Request object is passed positionally to urlopen.
        request = mock_urlopen.call_args.args[0]
        self.assertEqual(request.full_url, "https://hooks.example.com/macp")
        self.assertEqual(request.get_method(), "POST")
        self.assertEqual(request.headers["Authorization"], "Bearer token-123")

    @patch("webhook_adapter.urllib.request.urlopen", return_value=MockHTTPResponse(500))
    def test_send_webhook_failure(self, mock_urlopen) -> None:
        """A 5xx response with no retries configured fails after one attempt."""
        result = send_webhook(self.completed_event, sample_config(retry_count=0))

        self.assertFalse(result)
        self.assertEqual(mock_urlopen.call_count, 1)

    @patch("webhook_adapter.urllib.request.urlopen", side_effect=TimeoutError("timed out"))
    def test_send_webhook_timeout(self, mock_urlopen) -> None:
        """Timeouts are swallowed and reported as failure, never raised."""
        result = send_webhook(self.completed_event, sample_config(retry_count=0))

        self.assertFalse(result)
        self.assertEqual(mock_urlopen.call_count, 1)

    @patch("webhook_adapter.time.sleep", return_value=None)
    @patch(
        "webhook_adapter.urllib.request.urlopen",
        side_effect=[MockHTTPResponse(500), MockHTTPResponse(200)],
    )
    def test_send_webhook_retry(self, mock_urlopen, mock_sleep) -> None:
        """One retry recovers from a transient 500, with a backoff sleep between."""
        result = send_webhook(self.completed_event, sample_config(retry_count=1))

        self.assertTrue(result)
        self.assertEqual(mock_urlopen.call_count, 2)
        mock_sleep.assert_called_once()

    @patch("webhook_adapter.send_webhook", return_value=True)
    def test_event_filter(self, mock_send_webhook) -> None:
        """The callback forwards only event types named in the filter."""
        callback = create_webhook_callback(sample_config(event_filter=["task.completed"]))

        callback(self.started_event)
        callback(self.completed_event)

        # sample_config is deterministic, so an equal fresh config matches the recorded call.
        mock_send_webhook.assert_called_once_with(self.completed_event, sample_config(event_filter=["task.completed"]))

    @patch("webhook_adapter.send_webhook")
    def test_webhook_disabled(self, mock_send_webhook) -> None:
        """A disabled webhook never attempts delivery."""
        callback = create_webhook_callback(sample_config(enabled=False))

        callback(self.completed_event)

        mock_send_webhook.assert_not_called()

    @patch("webhook_adapter.urllib.request.urlopen")
    def test_ssrf_blocked(self, mock_urlopen) -> None:
        """Private/loopback webhook URLs are rejected before any HTTP call."""
        for url in ("http://127.0.0.1/webhook", "http://10.1.2.3/webhook"):
            with self.subTest(url=url):
                result = send_webhook(
                    self.completed_event,
                    sample_config(url=url, auth_token="", retry_count=0),
                )
                self.assertFalse(result)
                mock_urlopen.assert_not_called()
+
+
if __name__ == "__main__":
    # Support direct execution: python3 tests/test_webhook_adapter.py
    unittest.main()
diff --git a/tools/orchestrator-matrix/README.md b/tools/orchestrator-matrix/README.md
index 0461c12..cf98a1d 100644
--- a/tools/orchestrator-matrix/README.md
+++ b/tools/orchestrator-matrix/README.md
@@ -109,4 +109,10 @@ mosaic macp submit ...
mosaic macp status
mosaic macp drain
mosaic macp history --task-id TASK-001
+mosaic macp watch --once
+mosaic macp watch --webhook
```
+
+The Phase 2A event bridge consumes `.mosaic/orchestrator/events.ndjson` through a polling watcher,
+persists cursor state in `.mosaic/orchestrator/event_cursor.json`, and can fan out events to
+Discord-formatted stdout lines or webhook callbacks.
diff --git a/tools/orchestrator-matrix/events/__init__.py b/tools/orchestrator-matrix/events/__init__.py
new file mode 100644
index 0000000..fcac56e
--- /dev/null
+++ b/tools/orchestrator-matrix/events/__init__.py
@@ -0,0 +1,15 @@
+"""Event bridge helpers for MACP orchestrator events."""
+
+from .discord_formatter import format_event
+from .discord_formatter import format_summary
+from .event_watcher import EventWatcher
+from .webhook_adapter import create_webhook_callback
+from .webhook_adapter import send_webhook
+
# Public API of the events package; keep in sync with the imports above.
__all__ = [
    "EventWatcher",
    "create_webhook_callback",
    "format_event",
    "format_summary",
    "send_webhook",
]
diff --git a/tools/orchestrator-matrix/events/discord_formatter.py b/tools/orchestrator-matrix/events/discord_formatter.py
new file mode 100644
index 0000000..505b2be
--- /dev/null
+++ b/tools/orchestrator-matrix/events/discord_formatter.py
@@ -0,0 +1,138 @@
+#!/usr/bin/env python3
+"""Format MACP orchestrator events for Discord-friendly text delivery."""
+
+from __future__ import annotations
+
+import re
+from typing import Any
+
+# Strip ANSI escapes before generic control characters so escape fragments do not survive.
+_CTRL_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]|[\x00-\x1f\x7f]")
+# Collapse Discord @-mentions / role pings to prevent deceptive pings
+_MENTION_RE = re.compile(r"@(everyone|here|&?\d+)")
+
+
+def _sanitize(value: str) -> str:
+ """Normalize untrusted text for safe rendering in Discord/terminal output."""
+ value = _CTRL_RE.sub(" ", value)
+ value = _MENTION_RE.sub(lambda match: "@\u200b" + match.group(1), value)
+ return value.strip()
+
+
+def _task_label(event: dict[str, Any]) -> str:
+ task_id = str(event.get("task_id") or "unknown")
+ return f"Task {task_id}"
+
+
def _title(event: dict[str, Any]) -> str:
    """Best human-readable label: metadata title fields first, then the message, then a stub."""
    metadata = event.get("metadata")
    if isinstance(metadata, dict):
        # First non-empty sanitized candidate wins, in priority order.
        for key in ("task_title", "title", "description"):
            cleaned = _sanitize(str(metadata.get(key) or ""))
            if cleaned:
                return cleaned
    fallback = _sanitize(str(event.get("message") or ""))
    return fallback or "No details provided"
+
+
+def _attempt_suffix(event: dict[str, Any]) -> str:
+ metadata = event.get("metadata")
+ if not isinstance(metadata, dict):
+ return ""
+ attempt = metadata.get("attempt")
+ max_attempts = metadata.get("max_attempts")
+ if attempt in (None, "") and max_attempts in (None, ""):
+ return ""
+ if attempt in (None, ""):
+ return f"attempt ?/{max_attempts}"
+ if max_attempts in (None, ""):
+ return f"attempt {attempt}"
+ return f"attempt {attempt}/{max_attempts}"
+
+
+def _duration_suffix(event: dict[str, Any]) -> str:
+ metadata = event.get("metadata")
+ if not isinstance(metadata, dict):
+ return ""
+ duration = metadata.get("duration_seconds")
+ if duration in (None, ""):
+ return ""
+ try:
+ seconds = int(round(float(duration)))
+ except (TypeError, ValueError):
+ return ""
+ return f"{seconds}s"
+
+
+def _runtime_dispatch_suffix(event: dict[str, Any]) -> str:
+ metadata = event.get("metadata")
+ if not isinstance(metadata, dict):
+ return ""
+ parts: list[str] = []
+ runtime = str(metadata.get("runtime") or "").strip()
+ dispatch = str(metadata.get("dispatch") or "").strip()
+ if runtime:
+ parts.append(f"Worker: {runtime}")
+ if dispatch:
+ parts.append(f"dispatch: {dispatch}")
+ return ", ".join(parts)
+
+
+def _meta_clause(*parts: str) -> str:
+ clean = [part for part in parts if part]
+ if not clean:
+ return ""
+ return f" ({', '.join(clean)})"
+
+
+def format_event(event: dict[str, Any]) -> str | None:
+ """Format an MACP event for Discord. Returns None for unformattable events."""
+
+ event_type = str(event.get("event_type") or "").strip()
+ task_label = _task_label(event)
+ title = _title(event)
+ attempt_suffix = _attempt_suffix(event)
+ duration_suffix = _duration_suffix(event)
+ runtime_dispatch = _runtime_dispatch_suffix(event)
+ message = _sanitize(str(event.get("message") or ""))
+
+ if event_type == "task.completed":
+ return f"✅ **{task_label} completed** — {title}{_meta_clause(attempt_suffix, duration_suffix)}"
+ if event_type == "task.failed":
+ detail = message or title
+ return f"❌ **{task_label} failed** — {detail}{_meta_clause(attempt_suffix)}"
+ if event_type == "task.escalated":
+ detail = message or title
+ return f"🚨 **{task_label} escalated** — {detail}{_meta_clause(attempt_suffix)}"
+ if event_type == "task.gated":
+ detail = message or "Quality gates running..."
+ return f"🔍 **{task_label} gated** — {detail}"
+ if event_type == "task.started":
+ detail = runtime_dispatch or message or "Worker execution started"
+ return f"⚙️ **{task_label} started** — {detail}{_meta_clause(attempt_suffix)}"
+ return None
+
+
def format_summary(events: list[dict[str, Any]]) -> str:
    """Format a batch summary (e.g., daily digest) with per-type counts."""

    tally: dict[str, int] = {}
    task_ids: list[str] = []
    for event in events:
        kind = str(event.get("event_type") or "unknown")
        tally[kind] = tally.get(kind, 0) + 1
        tid = str(event.get("task_id") or "").strip()
        if tid:
            task_ids.append(tid)

    if not tally:
        return "No MACP events processed."

    breakdown = ", ".join(f"{kind}: {tally[kind]}" for kind in sorted(tally))
    span = ""
    if task_ids:
        # First/last non-empty task ids bound the span; collapse when equal.
        first, last = task_ids[0], task_ids[-1]
        span = f" Tasks {first}" if first == last else f" Tasks {first} -> {last}"
    return f"MACP event summary: {len(events)} events ({breakdown}).{span}"
diff --git a/tools/orchestrator-matrix/events/event_watcher.py b/tools/orchestrator-matrix/events/event_watcher.py
new file mode 100644
index 0000000..ff77bee
--- /dev/null
+++ b/tools/orchestrator-matrix/events/event_watcher.py
@@ -0,0 +1,144 @@
+#!/usr/bin/env python3
+"""Portable file-polling watcher for MACP orchestrator events."""
+
+from __future__ import annotations
+
+import json
+import pathlib
+import sys
+import time
+from collections.abc import Callable
+from typing import Any
+
+
+def _warn(message: str) -> None:
+ print(f"[macp-event-watcher] {message}", file=sys.stderr)
+
+
+def _load_json(path: pathlib.Path, default: Any) -> Any:
+ if not path.exists():
+ return default
+ try:
+ with path.open("r", encoding="utf-8") as handle:
+ return json.load(handle)
+ except (OSError, json.JSONDecodeError) as exc:
+ _warn(f"failed to load cursor {path}: {exc}")
+ return default
+
+
+def _save_json_atomic(path: pathlib.Path, data: Any) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ tmp = path.with_suffix(path.suffix + ".tmp")
+ with tmp.open("w", encoding="utf-8") as handle:
+ json.dump(data, handle, indent=2)
+ handle.write("\n")
+ tmp.replace(path)
+
+
class EventWatcher:
    """Poll an events NDJSON file and dispatch matching callbacks.

    Progress is a byte offset into the events file, persisted to
    ``cursor_path`` so restarts do not re-deliver already-seen events.
    """

    def __init__(self, events_path: pathlib.Path, cursor_path: pathlib.Path, poll_interval: float = 2.0):
        self.events_path = events_path
        self.cursor_path = cursor_path
        # Floor of 0.1s keeps run() from busy-spinning on a bad config value.
        self.poll_interval = max(0.1, float(poll_interval))
        # Each entry: (event-type filter set, or None meaning "match all", callback).
        self._callbacks: list[tuple[set[str] | None, Callable[[dict[str, Any]], None]]] = []
        self._cursor_position = self._load_cursor()

    def on(self, event_types: list[str], callback: Callable[[dict[str, Any]], None]) -> None:
        """Register a callback for specific event types.

        An empty ``event_types`` list subscribes the callback to every event
        (the empty set normalizes to None, the match-all sentinel).
        """

        normalized = {str(event_type).strip() for event_type in event_types if str(event_type).strip()}
        self._callbacks.append((normalized or None, callback))

    def poll_once(self) -> list[dict[str, Any]]:
        """Read new events since last cursor position. Returns list of new events.

        Corrupt or non-object lines are skipped with a warning; a trailing
        line without a newline is treated as partially written and left for
        the next poll. Callbacks are dispatched as each event is parsed, and
        the cursor is persisted once at the end of the poll.
        """

        if not self.events_path.exists():
            return []

        try:
            file_size = self.events_path.stat().st_size
        except OSError as exc:
            _warn(f"failed to stat events file {self.events_path}: {exc}")
            return []

        # File shrank (truncated/rotated): restart from the beginning rather
        # than seeking past EOF.
        if file_size < self._cursor_position:
            _warn(
                f"events file shrank from cursor={self._cursor_position} to size={file_size}; "
                "resetting cursor to start"
            )
            self._cursor_position = 0
            self._persist_cursor()

        events: list[dict[str, Any]] = []
        new_position = self._cursor_position
        try:
            with self.events_path.open("r", encoding="utf-8") as handle:
                handle.seek(self._cursor_position)
                while True:
                    line_start = handle.tell()
                    line = handle.readline()
                    if not line:
                        break
                    line_end = handle.tell()
                    # No trailing newline: a writer may still be mid-append, so
                    # leave the cursor at the start of this line and retry later.
                    if not line.endswith("\n"):
                        new_position = line_start
                        break
                    stripped = line.strip()
                    if not stripped:
                        new_position = line_end
                        continue
                    try:
                        event = json.loads(stripped)
                    except json.JSONDecodeError as exc:
                        # Skip corrupt lines permanently (cursor advances past them).
                        _warn(f"skipping corrupt event at byte {line_start}: {exc}")
                        new_position = line_end
                        continue
                    if not isinstance(event, dict):
                        _warn(f"skipping non-object event at byte {line_start}")
                        new_position = line_end
                        continue
                    events.append(event)
                    self._dispatch(event)
                    new_position = line_end
        except OSError as exc:
            _warn(f"failed to read events file {self.events_path}: {exc}")
            return []

        # Persist only when we actually advanced (avoids writes on idle polls).
        if new_position != self._cursor_position:
            self._cursor_position = new_position
            self._persist_cursor()
        return events

    def run(self, max_iterations: int = 0) -> None:
        """Polling loop. max_iterations=0 means infinite."""

        iterations = 0
        while max_iterations <= 0 or iterations < max_iterations:
            self.poll_once()
            iterations += 1
            # Re-check before sleeping so a bounded run skips the final sleep.
            if max_iterations > 0 and iterations >= max_iterations:
                break
            time.sleep(self.poll_interval)

    def _dispatch(self, event: dict[str, Any]) -> None:
        # Fan out to every registered callback whose filter matches (None = all).
        event_type = str(event.get("event_type") or "").strip()
        for filters, callback in self._callbacks:
            if filters is not None and event_type not in filters:
                continue
            try:
                callback(event)
            except Exception as exc:  # pragma: no cover - defensive boundary
                # A failing subscriber must not break polling or other callbacks.
                _warn(f"callback failure for event {event_type or ''}: {exc}")

    def _load_cursor(self) -> int:
        # Malformed or negative cursor payloads degrade to offset 0 (re-read all).
        payload = _load_json(self.cursor_path, {"position": 0})
        try:
            position = int(payload.get("position", 0))
        except (AttributeError, TypeError, ValueError):
            position = 0
        return max(0, position)

    def _persist_cursor(self) -> None:
        # Atomic write so a crash mid-save cannot corrupt the cursor file.
        _save_json_atomic(self.cursor_path, {"position": self._cursor_position})
diff --git a/tools/orchestrator-matrix/events/webhook_adapter.py b/tools/orchestrator-matrix/events/webhook_adapter.py
new file mode 100644
index 0000000..7aff73a
--- /dev/null
+++ b/tools/orchestrator-matrix/events/webhook_adapter.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python3
+"""Webhook delivery helpers for MACP events."""
+
+from __future__ import annotations
+
+import json
+import sys
+import time
+import urllib.error
+import urllib.request
+from collections.abc import Callable
+from typing import Any
+
+
+def _warn(message: str) -> None:
+ print(f"[macp-webhook] {message}", file=sys.stderr)
+
+
+def _webhook_config(config: dict[str, Any]) -> dict[str, Any]:
+ macp = config.get("macp")
+ if isinstance(macp, dict) and isinstance(macp.get("webhook"), dict):
+ return dict(macp["webhook"])
+ return dict(config)
+
+
+def _validate_webhook_url(url: str, auth_token: str) -> str | None:
+ """Validate webhook URL for SSRF and cleartext credential risks.
+
+ Returns an error message if the URL is disallowed, or None if safe.
+ """
+ import ipaddress
+ import urllib.parse as urlparse
+
+ parsed = urlparse.urlparse(url)
+ scheme = parsed.scheme.lower()
+
+ if scheme not in ("http", "https"):
+ return f"unsupported scheme '{scheme}' — must be http or https"
+
+ if auth_token and scheme == "http":
+ host = parsed.hostname or ""
+ # Allow cleartext only for explicit loopback (dev use)
+ if host not in ("localhost", "127.0.0.1", "::1"):
+ return "refusing to send auth_token over non-HTTPS to non-localhost — use https://"
+
+ host = parsed.hostname or ""
+ # Block RFC1918, loopback, and link-local IPs outright.
+ try:
+ ip = ipaddress.ip_address(host)
+ if ip.is_loopback or ip.is_private or ip.is_link_local:
+ return f"refusing to send webhook to private/internal IP {ip}"
+ except ValueError:
+ pass # hostname — DNS resolution not validated here (best-effort)
+
+ return None
+
+
def send_webhook(event: dict[str, Any], config: dict[str, Any]) -> bool:
    """POST event to webhook URL. Returns True on success.

    Resolves the macp.webhook section of *config* (url, auth_token,
    timeout_seconds, retry_count), validates the target URL, then POSTs the
    event as JSON. Network/HTTP failures are logged and retried with
    exponential backoff; the function returns False rather than raising.
    """

    webhook = _webhook_config(config)
    # Only an explicit `enabled: false` short-circuits; a missing key proceeds.
    if webhook.get("enabled") is False:
        return False

    url = str(webhook.get("url") or "").strip()
    if not url:
        _warn("missing webhook url")
        return False

    # Floor the timeout at 1s; `or 10` also maps 0/None/"" to the default.
    timeout_seconds = max(1.0, float(webhook.get("timeout_seconds") or 10))
    retry_count = max(0, int(webhook.get("retry_count") or 0))
    auth_token = str(webhook.get("auth_token") or "").strip()

    url_err = _validate_webhook_url(url, auth_token)
    if url_err:
        _warn(f"webhook URL rejected: {url_err}")
        return False

    payload = json.dumps(event, ensure_ascii=True).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if auth_token:
        headers["Authorization"] = f"Bearer {auth_token}"

    # retry_count is the number of *retries*, so total attempts = retries + 1.
    attempts = retry_count + 1
    for attempt in range(1, attempts + 1):
        request = urllib.request.Request(url, data=payload, headers=headers, method="POST")
        try:
            with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
                # Prefer .status, falling back to getcode() for response
                # doubles that only implement the legacy accessor.
                status = getattr(response, "status", response.getcode())
                if 200 <= int(status) < 300:
                    return True
                _warn(f"webhook returned HTTP {status} on attempt {attempt}/{attempts}")
        except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, ValueError) as exc:
            _warn(f"webhook attempt {attempt}/{attempts} failed: {exc}")
        if attempt < attempts:
            # Exponential backoff (1s, 2s, 4s, ...) capped at the timeout.
            time.sleep(min(timeout_seconds, 2 ** (attempt - 1)))
    return False
+
+
def create_webhook_callback(config: dict[str, Any]) -> Callable[[dict[str, Any]], None]:
    """Factory that creates a watcher callback from config.

    The callback no-ops when the webhook is disabled or the event type is not
    in the configured filter; otherwise it delegates to send_webhook and logs
    delivery failures without raising.
    """

    webhook = _webhook_config(config)
    is_enabled = bool(webhook.get("enabled", False))
    allowed_types = {
        cleaned
        for cleaned in (str(raw).strip() for raw in (webhook.get("event_filter") or []))
        if cleaned
    }

    def callback(event: dict[str, Any]) -> None:
        if not is_enabled:
            return
        kind = str(event.get("event_type") or "").strip()
        # An empty filter set means "forward everything".
        if allowed_types and kind not in allowed_types:
            return
        # send_webhook is looked up at module level so tests can patch it.
        if not send_webhook(event, config):
            _warn(f"delivery failed for event {event.get('event_type', '')}")

    return callback