This repository has been archived on 2026-03-28. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
bootstrap/docs/tasks/MACP-PHASE2A-brief.md

5.4 KiB

MACP Phase 2A — Event Bridge + Notification System

Branch: feat/macp-phase2a Repo worktree: ~/src/mosaic-bootstrap-worktrees/macp-phase2a


Objective

Build the event bridge that makes MACP events consumable by external systems (OpenClaw, Discord, webhooks). This is the observability layer — the controller already writes events to events.ndjson, but nothing reads them yet.


Task 1: Event File Watcher (tools/orchestrator-matrix/events/event_watcher.py)

New Python module that tails events.ndjson and fires callbacks on new events.

Requirements:

  • Watch .mosaic/orchestrator/events.ndjson for new lines (use file polling, not inotify — keeps it portable)
  • Parse each new line as JSON
  • Call registered callback functions with the parsed event
  • Support filtering by event type (e.g., only task.completed and task.failed)
  • Maintain a cursor (last read position) so restarts don't replay old events
  • Cursor stored in .mosaic/orchestrator/event_cursor.json

Key Functions:

class EventWatcher:
    def __init__(self, events_path: Path, cursor_path: Path, poll_interval: float = 2.0):
        ...
    
    def on(self, event_types: list[str], callback: Callable[[dict], None]) -> None:
        """Register a callback for specific event types."""
    
    def poll_once(self) -> list[dict]:
        """Read new events since last cursor position. Returns list of new events."""
    
    def run(self, max_iterations: int = 0) -> None:
        """Polling loop. max_iterations=0 means infinite."""

Constraints:

  • Python 3.10+ stdlib only (no pip dependencies)
  • Must handle truncated/corrupt lines gracefully (skip, log warning)
  • File might not exist yet — handle gracefully
  • Thread-safe cursor updates (atomic write via temp file rename)

Task 2: Webhook Adapter (tools/orchestrator-matrix/events/webhook_adapter.py)

POST events to a configurable URL. This is how the OC plugin will consume MACP events.

Requirements:

  • Accept an event dict, POST it as JSON to a configured URL
  • Support optional Authorization header (bearer token)
  • Configurable from .mosaic/orchestrator/config.json under macp.webhook:
    {
      "macp": {
        "webhook": {
          "enabled": false,
          "url": "http://localhost:8080/macp/events",
          "auth_token": "",
          "timeout_seconds": 10,
          "retry_count": 2,
          "event_filter": ["task.completed", "task.failed", "task.escalated"]
        }
      }
    }
    
  • Retry with exponential backoff on failure (configurable count)
  • Log failures but don't crash the watcher
  • Return success/failure status

Key Functions:

def send_webhook(event: dict, config: dict) -> bool:
    """POST event to webhook URL. Returns True on success."""

def create_webhook_callback(config: dict) -> Callable[[dict], None]:
    """Factory that creates a watcher callback from config."""

Constraints:

  • Use urllib.request only (no requests library)
  • Must not block the event watcher for more than timeout_seconds per event
  • Log to stderr on failure

Task 3: Discord Notification Formatter (tools/orchestrator-matrix/events/discord_formatter.py)

Format MACP events into human-readable Discord messages.

Requirements:

  • Format functions for each event type:
    • task.completed → " Task TASK-001 completed — Implement user auth (attempt 1/1, 45s)"
    • task.failed → " Task TASK-001 failed — Build error: exit code 1 (attempt 2/3)"
    • task.escalated → "🚨 Task TASK-001 escalated — Gate failures after 3 attempts. Human review needed."
    • task.gated → "🔍 Task TASK-001 gated — Quality gates running..."
    • task.started → "⚙️ Task TASK-001 started — Worker: codex, dispatch: yolo"
  • Include task metadata: runtime, dispatch type, attempt count, duration (if available)
  • Keep messages concise — Discord has character limits
  • Return plain strings (the caller decides where to send them)

Key Functions:

def format_event(event: dict) -> str | None:
    """Format an MACP event for Discord. Returns None for unformattable events."""

def format_summary(events: list[dict]) -> str:
    """Format a batch summary (e.g., daily digest)."""

Wiring: CLI Integration

Add to bin/mosaic-macp:

mosaic macp watch [--webhook] [--once]
  • watch: Start the event watcher with configured callbacks
  • --webhook: Enable webhook delivery (reads config from .mosaic/orchestrator/config.json)
  • --once: Poll once and exit (useful for cron)

Verification

  1. Create a test events.ndjson with sample events, run event_watcher.poll_once(), verify all events returned
  2. Run watcher with webhook pointing to a local echo server, verify POST payload
  3. Format each event type through discord_formatter, verify output strings
  4. mosaic macp watch --once processes events without error
  5. Cursor persistence: run twice, second run returns no events

File Map

tools/orchestrator-matrix/
├── events/
│   ├── __init__.py               ← NEW
│   ├── event_watcher.py          ← NEW
│   ├── webhook_adapter.py        ← NEW
│   └── discord_formatter.py      ← NEW

Ground Rules

  • Python 3.10+ stdlib only
  • No async/threads — synchronous polling
  • Commit: feat: add MACP event bridge — watcher, webhook, Discord formatter
  • Push to feat/macp-phase2a