diff --git a/bin/mosaic-macp b/bin/mosaic-macp index 413ab55..43024fa 100755 --- a/bin/mosaic-macp +++ b/bin/mosaic-macp @@ -16,6 +16,7 @@ Usage: mosaic macp status [--task-id TASK-001] mosaic macp drain mosaic macp history [--task-id TASK-001] + mosaic macp watch [--webhook] [--once] USAGE } @@ -193,6 +194,71 @@ for line in events_path.read_text(encoding="utf-8").splitlines(): PY } +watch_events() { + require_repo + local webhook_enabled="false" + local run_once="false" + while [[ $# -gt 0 ]]; do + case "$1" in + --webhook) webhook_enabled="true"; shift ;; + --once) run_once="true"; shift ;; + -h|--help) usage; exit 0 ;; + *) echo "[mosaic-macp] unknown watch option: $1" >&2; exit 1 ;; + esac + done + + python3 - "$repo_root" "$config_path" "$events_path" "$orch_dir/event_cursor.json" "$webhook_enabled" "$run_once" <<'PY' +import json +import pathlib +import sys + +repo_root = pathlib.Path(sys.argv[1]).resolve() +config_path = pathlib.Path(sys.argv[2]).resolve() +events_path = pathlib.Path(sys.argv[3]).resolve() +cursor_path = pathlib.Path(sys.argv[4]).resolve() +webhook_flag = sys.argv[5].lower() == "true" +run_once = sys.argv[6].lower() == "true" + +events_dir = repo_root / "tools" / "orchestrator-matrix" / "events" +if str(events_dir) not in sys.path: + sys.path.insert(0, str(events_dir)) + +from discord_formatter import format_event +from event_watcher import EventWatcher +from webhook_adapter import create_webhook_callback + +config = {} +if config_path.exists(): + config = json.loads(config_path.read_text(encoding="utf-8")) + +macp = dict(config.get("macp") or {}) +watcher = EventWatcher( + events_path=events_path, + cursor_path=cursor_path, + poll_interval=float(macp.get("watch_poll_interval_seconds") or 2.0), +) + +def print_callback(event: dict) -> None: + rendered = format_event(event) + if rendered: + print(rendered) + +watcher.on([], print_callback) + +webhook_config = dict(macp.get("webhook") or {}) +if webhook_flag and bool(webhook_config.get("enabled", False)): + watcher.on(list(webhook_config.get("event_filter") or []), create_webhook_callback(config)) +elif webhook_flag: + print("[mosaic-macp] webhook requested but disabled in config", file=sys.stderr) + +if run_once: + processed = watcher.poll_once() + print(f"[mosaic-macp] processed_events={len(processed)}") +else: + watcher.run() +PY +} + subcommand="${1:-help}" if [[ $# -gt 0 ]]; then shift @@ -203,6 +269,7 @@ case "$subcommand" in status) status_tasks "$@" ;; drain) drain_tasks "$@" ;; history) history_tasks "$@" ;; + watch) watch_events "$@" ;; help|-h|--help|"") usage ;; *) echo "[mosaic-macp] unknown subcommand: $subcommand" >&2 diff --git a/docs/DEVELOPER-GUIDE/README.md b/docs/DEVELOPER-GUIDE/README.md index 148140e..a66ae35 100644 --- a/docs/DEVELOPER-GUIDE/README.md +++ b/docs/DEVELOPER-GUIDE/README.md @@ -3,3 +3,4 @@ ## Orchestrator Matrix - [MACP Phase 1](./orchestrator-matrix/macp-phase1.md) +- [MACP Phase 2A](./orchestrator-matrix/macp-phase2a.md) diff --git a/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md new file mode 100644 index 0000000..443a549 --- /dev/null +++ b/docs/DEVELOPER-GUIDE/orchestrator-matrix/macp-phase2a.md @@ -0,0 +1,45 @@ +# MACP Phase 2A + +MACP Phase 2A adds the repo-local event bridge that makes orchestrator lifecycle events consumable by external systems. + +## What Changed + +1. `tools/orchestrator-matrix/events/event_watcher.py` polls `.mosaic/orchestrator/events.ndjson`, parses appended NDJSON events, dispatches callbacks, and persists a byte-offset cursor in `.mosaic/orchestrator/event_cursor.json`. +2. `tools/orchestrator-matrix/events/webhook_adapter.py` forwards selected MACP events to a configured webhook endpoint with bounded retries and optional bearer auth. +3. `tools/orchestrator-matrix/events/discord_formatter.py` renders task lifecycle events into concise Discord-friendly status lines. +4. `bin/mosaic-macp` adds `watch` mode for one-shot or continuous event processing. + +## Watcher Behavior + +1. File watching is polling-based and stdlib-only for portability. +2. The watcher resets its cursor if the events file is truncated. +3. Corrupt JSON lines are logged to stderr and skipped. +4. A trailing partial line is left unread until the newline arrives, preventing half-written events from being consumed. + +## Webhook Configuration + +Configure `.mosaic/orchestrator/config.json` under `macp.webhook`: + +```json +{ + "macp": { + "webhook": { + "enabled": false, + "url": "http://localhost:8080/macp/events", + "auth_token": "", + "timeout_seconds": 10, + "retry_count": 2, + "event_filter": ["task.completed", "task.failed", "task.escalated"] + } + } +} +``` + +## CLI + +```bash +mosaic macp watch --once +mosaic macp watch --webhook +``` + +`--once` performs a single poll and exits. `--webhook` enables delivery via the configured `macp.webhook` block while still printing Discord-formatted event lines to stdout. diff --git a/docs/PRD.md b/docs/PRD.md index be490b8..57408ed 100644 --- a/docs/PRD.md +++ b/docs/PRD.md @@ -1,4 +1,4 @@ -# PRD: MACP Phase 1 Core Protocol Implementation +# PRD: MACP Phase 2A Event Bridge + Notification System ## Metadata @@ -9,90 +9,89 @@ ## Problem Statement -The current orchestrator-matrix rail can queue shell-based worker tasks, but it does not yet expose a standardized protocol for dispatch selection, worktree-aware execution, structured results, or manual MACP queue operations. MACP Phase 1 extends the existing rail so orchestrators can delegate to multiple runtimes through a consistent task model while preserving current behavior for legacy tasks. +MACP Phase 1 writes structured lifecycle events to `.mosaic/orchestrator/events.ndjson`, but no repo-local bridge consumes those events for external systems. Phase 2A adds a portable watcher, webhook delivery, and Discord-friendly formatting so MACP event streams can drive OpenClaw integrations and human-facing notifications. ## Objectives -1. Extend the existing orchestrator-matrix protocol and controller to support MACP-aware task dispatch and status tracking. -2. Add a dispatcher layer that manages worktree lifecycle, runtime command generation, and standardized results. -3. Provide a CLI entrypoint for manual MACP submission, status inspection, queue draining, and history review. +1. Add a synchronous event watcher that tails `events.ndjson` using stdlib-only file polling and persists cursor state across restarts. +2. Add a webhook adapter that can forward selected MACP events to a configured HTTP endpoint with bounded retries. +3. Add a Discord formatter that turns task lifecycle events into concise human-readable strings. +4. Extend the `mosaic macp` CLI with a `watch` command for one-shot or continuous event bridge execution. ## Scope ### In Scope -1. Extend the orchestrator task and event schemas and add a result schema. -2. Add a Python dispatcher module under `tools/orchestrator-matrix/dispatcher/`. -3. Update the controller to use the dispatcher for MACP-aware tasks while preserving legacy execution paths. -4. Update orchestrator config templates, task markdown sync logic, and CLI routing/scripts for MACP commands. -5. Add verification for backward compatibility, schema validity, imports, and basic MACP execution flow. +1. New `tools/orchestrator-matrix/events/` package with watcher, webhook adapter, and Discord formatter modules. +2. Cursor persistence at `.mosaic/orchestrator/event_cursor.json`. +3. `mosaic macp watch [--webhook] [--once]` CLI support using `.mosaic/orchestrator/config.json`. +4. Stdlib-only verification of watcher polling, webhook delivery, Discord formatting, CLI watch behavior, and cursor persistence. +5. Developer documentation and sitemap updates covering the Phase 2A event bridge. ### Out of Scope -1. Rewriting the orchestrator controller architecture. -2. Changing Matrix transport behavior beyond schema compatibility. -3. Implementing real OpenClaw `sessions_spawn` execution beyond producing the config payload/command for callers. -4. Adding non-stdlib Python dependencies or npm-based tooling. +1. Adding Discord transport or webhook server hosting inside this repository. +2. Replacing the existing Matrix transport bridge. +3. Introducing async, threads, or third-party Python packages. +4. Changing event emission behavior in the controller beyond consuming the existing event stream. ## User/Stakeholder Requirements -1. MACP must evolve the current orchestrator-matrix implementation rather than replace it. -2. Legacy task queues without `dispatch` fields must continue to run exactly as before. -3. MACP-aware tasks must support dispatch modes `yolo`, `acp`, and `exec`. -4. Results must be written in a structured JSON format suitable for audit and orchestration follow-up. -5. A manual `mosaic macp` CLI must expose submit, status, drain, and history flows. +1. External systems must be able to consume MACP events without reading the NDJSON file directly. +2. The watcher must remain portable across environments, so file polling is required instead of platform-specific file watching. +3. Restarting the watcher must not replay previously consumed events. +4. Webhook delivery failures must be logged and isolated so the watcher loop continues running. +5. Discord formatting must stay concise and useful for task lifecycle visibility. ## Functional Requirements -1. Task schema must include MACP dispatch, worktree, result, retry, branch, brief, issue/PR, and dependency fields. -2. Event schema must recognize `task.gated`, `task.escalated`, and `task.retry.scheduled`, plus a `dispatcher` source. -3. Dispatcher functions must set up worktrees, build commands, execute tasks, collect results, and clean up worktrees. -4. Controller `run_single_task()` must route MACP-aware tasks through the dispatcher and emit the correct lifecycle events/status transitions. -5. `tasks_md_sync.py` must map optional MACP table columns only when those headers are present in `docs/TASKS.md`; absent MACP headers must not inject MACP fields into legacy tasks. -6. `bin/mosaic` must route `mosaic macp ...` to a new `bin/mosaic-macp` script. +1. `EventWatcher` must watch `.mosaic/orchestrator/events.ndjson`, parse appended JSON lines, and invoke registered callbacks for matching event types. +2. `EventWatcher.poll_once()` must tolerate a missing events file, truncated/corrupt lines, and cursor positions that are stale after file truncation. +3. Cursor writes must be atomic and stored at `.mosaic/orchestrator/event_cursor.json`. +4. `send_webhook(event, config)` must POST JSON to the configured URL using `urllib.request`, optionally adding a bearer token, respecting timeout, and retrying with exponential backoff. +5. `create_webhook_callback(config)` must return a callback that swallows/logs failures instead of raising into the watcher loop. +6. `format_event(event)` must support `task.completed`, `task.failed`, `task.escalated`, `task.gated`, and `task.started`, including useful task metadata when present. +7. `format_summary(events)` must produce a short batch summary suitable for notification digests. +8. `bin/mosaic-macp` must expose `watch`, optionally enabling webhook delivery from config, and support one-shot polling with `--once`. ## Non-Functional Requirements -1. Security: no secrets embedded in generated commands, config, or results. -2. Performance: controller remains deterministic and synchronous with no async or thread-based orchestration. -3. Reliability: worktree creation/cleanup failures must be surfaced predictably and produce structured task failure/escalation states. -4. Observability: lifecycle events, logs, and result JSON must clearly show task outcome, attempts, gates, and errors. +1. Security: no secrets embedded in code or logs; auth token only sent via header when configured. +2. Performance: each webhook attempt must be bounded by `timeout_seconds`; no event-processing path may hang indefinitely. +3. Reliability: corrupt input lines and callback delivery failures must be logged to stderr and skipped without crashing the watcher. +4. Portability: Python 3.10+ stdlib only; no OS-specific file watcher APIs. +5. Observability: warnings and failures must be clear enough to diagnose cursor, parsing, and webhook problems. ## Acceptance Criteria -1. Existing legacy tasks without `dispatch` still run through the old shell path with unchanged behavior. -2. MACP-aware `exec` tasks run through the dispatcher and produce result JSON with gate outcomes. -3. New schemas validate task/event/result payload expectations for MACP fields and statuses. -4. `mosaic macp submit`, `status`, and `history` work from a bootstrapped repo state, and `drain` delegates to the existing orchestrator runner. -5. Python imports for the updated controller, dispatcher, and sync code complete without errors on Python 3.10+. +1. `EventWatcher.poll_once()` reads newly appended events, returns parsed dicts, invokes registered callbacks, and skips already-consumed events after restart. +2. Webhook delivery posts matching events to a local test endpoint, supports bearer auth configuration, and retries boundedly on failure. +3. Discord formatter returns expected concise strings for the required task lifecycle event types and a usable batch summary. +4. `mosaic macp watch --once` processes events from a bootstrapped repo state without error and honors `--webhook`. +5. Cursor persistence prevents replay on a second run and resets safely when the events file is truncated. ## Constraints and Dependencies 1. Python implementation must use stdlib only and support Python 3.10+. -2. Shell tooling must remain bash-based and fit the existing Mosaic CLI style. -3. Dispatch fallback rules must use `exec` when `dispatch` is absent and config/default runtime when `runtime` is absent. -4. Worktree convention must derive from the repository name and task metadata unless explicitly overridden by task fields. +2. Shell CLI behavior must remain bash-based and consistent with the existing Mosaic command style. +3. The watcher consumes the event schema already emitted by Phase 1 controller logic. +4. Webhook configuration lives under `.mosaic/orchestrator/config.json` at `macp.webhook`. ## Risks and Open Questions -1. Risk: yolo command execution requires a PTY, so the dispatcher needs a safe wrapper that still behaves under `subprocess`. -2. Risk: worktree cleanup could remove a path unexpectedly if task metadata is malformed. -3. Risk: old queue consumers may assume only the original task statuses and event types. -4. Open Question: whether `task.gated` should be emitted by the dispatcher or controller once worker execution ends and quality gates begin. +1. Risk: partial writes may leave an incomplete trailing JSON line that must not advance the cursor incorrectly. +2. Risk: synchronous webhook retries can slow one poll cycle if the endpoint is unavailable; timeout and retry behavior must remain bounded. +3. Risk: event payloads may omit optional metadata fields, so formatter output must degrade cleanly. +4. ASSUMPTION: the watcher should advance past corrupt lines after logging them so a single bad line does not permanently stall downstream consumption. +5. ASSUMPTION: CLI `watch` should default to no-op callback processing when no delivery option is enabled, while still updating the cursor and reporting processed count. ## Testing and Verification Expectations -1. Baseline checks: Python import validation, targeted script execution checks, JSON syntax/schema validation, and any repo-local validation applicable to changed code paths. -2. Situational testing: legacy orchestrator run with old-style tasks, MACP `exec` flow including result file generation, CLI submit/status/history behavior, and worktree lifecycle validation. -3. Evidence format: command-level results captured in the scratchpad and summarized in the final delivery report. +1. Baseline checks: Python bytecode compilation/import validation for new modules and shell syntax validation for `bin/mosaic-macp`. +2. Situational tests: temporary orchestrator state exercising watcher polling, callback filtering, webhook POST capture, CLI one-shot watch execution, and cursor persistence across repeated runs. +3. Evidence format: command-level results recorded in the scratchpad and summarized against acceptance criteria. ## Milestone / Delivery Intent -1. Target milestone/version: 0.0.x bootstrap enhancement -2. Definition of done: code merged to `main`, CI terminal green, issue `#8` closed, and verification evidence recorded against all acceptance criteria. - -## Assumptions - -1. ASSUMPTION: A single issue can track the full Phase 1 implementation because the user requested one bounded feature delivery rather than separate independent tickets. -2. ASSUMPTION: For `acp` dispatch in Phase 1, the controller must escalate the task immediately with a clear reason instead of pretending work ran before OpenClaw integration exists. -3. ASSUMPTION: `task.gated` should be emitted by the controller as the transition into quality-gate execution, which keeps gate-state ownership in one place alongside the existing gate loop. +1. Target milestone/version: Phase 2A observability bridge +2. Definition of done: code merged to `main`, CI terminal green, issue `#10` closed, and verification evidence recorded against all acceptance criteria. diff --git a/docs/SITEMAP.md b/docs/SITEMAP.md index c4e8b1c..1694313 100644 --- a/docs/SITEMAP.md +++ b/docs/SITEMAP.md @@ -4,4 +4,6 @@ - [Tasks](./TASKS.md) - [Developer Guide](./DEVELOPER-GUIDE/README.md) - [Task Briefs](./tasks/MACP-PHASE1-brief.md) +- [Task Briefs](./tasks/MACP-PHASE2A-brief.md) - [Scratchpads](./scratchpads/macp-phase1.md) +- [Scratchpads](./scratchpads/macp-phase2a.md) diff --git a/docs/TASKS.md b/docs/TASKS.md index 2578549..9c17d20 100644 --- a/docs/TASKS.md +++ b/docs/TASKS.md @@ -14,4 +14,4 @@ Canonical tracking for active work. Keep this file current. | id | status | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes | |---|---|---|---|---|---|---|---|---|---|---|---|---|---| -| MACP-PHASE1 | in-progress | Implement MACP Phase 1 across orchestrator schemas, dispatcher, controller, CLI, config, and task sync while preserving legacy queue behavior. | #8 | bootstrap | feat/macp-phase1 | | | Jarvis | 2026-03-27T23:00:00Z | | medium | in-progress | Review-fix pass started 2026-03-28T00:38:01Z to address backward-compatibility, ACP safety, result timing, and worktree/brief security findings on top of the blocked PR-create state. Prior blocker remains: `~/.config/mosaic/tools/git/pr-create.sh -t 'feat: implement MACP phase 1 core protocol' -b ... -B main -H feat/macp-phase1` failed with `Remote repository required: Specify ID via --repo or execute from a local git repo.` | +| MACP-PHASE2A | in-progress | Build the MACP event bridge with event watcher, webhook adapter, Discord formatter, CLI watch wiring, docs updates, and verification evidence. | #10 | bootstrap | feat/macp-phase2a | | | Jarvis | 2026-03-28T02:02:38Z | | medium | in-progress | Issue created via `~/.config/mosaic/tools/git/issue-create.sh` fallback after `tea` reported `Remote repository required: Specify ID via --repo or execute from a local git repo.` | diff --git a/docs/scratchpads/macp-phase2a.md b/docs/scratchpads/macp-phase2a.md new file mode 100644 index 0000000..8aae4b4 --- /dev/null +++ b/docs/scratchpads/macp-phase2a.md @@ -0,0 +1,61 @@ +# MACP Phase 2A Scratchpad + +## Session Start + +- Session date: 2026-03-27 / 2026-03-28 America/Chicago +- Branch: `feat/macp-phase2a` +- Issue: `#10` +- Objective: build the MACP event bridge and notification system from `docs/tasks/MACP-PHASE2A-brief.md` + +## Budget + +- Budget mode: inferred working budget +- Estimate: medium +- Token strategy: keep context narrow to event-bridge files, verify with targeted temp-repo tests, avoid unnecessary parallel deep dives + +## Requirements Notes + +- PRD updated to Phase 2A before coding +- TDD requirement: not mandatory for this feature work; targeted verification is sufficient because this is new observability functionality rather than a bug fix or auth/data-mutation change +- Documentation gate applies because developer-facing behavior and CLI surface change + +## Assumptions + +1. ASSUMPTION: corrupt or partial lines should be logged and skipped while still advancing the cursor past the offending line, preventing permanent replay loops. +2. ASSUMPTION: `mosaic macp watch` may run without webhook delivery enabled and should still process events plus persist cursor state. +3. ASSUMPTION: Discord formatting remains a pure formatting layer; no outbound Discord transport is part of Phase 2A. + +## Plan + +1. Update PRD/TASKS and create the Phase 2A issue/scratchpad. +2. Implement watcher, webhook adapter, formatter, and CLI wiring. +3. Update developer docs and sitemap. +4. Run baseline and situational verification. +5. Run independent code review, remediate findings, then commit/push/PR/merge/CI/issue-close. + +## Progress Log + +- 2026-03-28T02:02:38Z: Created provider issue `#10` for Phase 2A using Mosaic wrapper with Gitea API fallback. +- 2026-03-28T02:02:38Z: Replaced stale Phase 1 PRD/TASKS planning state with Phase 2A scope and tracking. + +## Verification Plan + +| Acceptance Criterion | Verification Method | Evidence | +|---|---|---| +| AC-1 watcher polls new events and respects cursor | Temp events file + repeated `poll_once()` / CLI runs | pending | +| AC-2 webhook delivery retries and succeeds/fails cleanly | Local stdlib echo server capture | pending | +| AC-3 Discord formatting covers required event types | Targeted Python formatter check | pending | +| AC-4 `mosaic macp watch --once` runs cleanly | CLI one-shot execution in temp repo | pending | +| AC-5 cursor persistence handles repeat run and truncation | Temp repo repeated runs with truncated file scenario | pending | + +## Tests Run + +- pending + +## Review Notes + +- pending + +## Risks / Blockers + +- Potential git wrapper friction in worktrees for PR creation/merge steps; if it recurs, capture exact failing command and stop per Mosaic contract. diff --git a/docs/tasks/MACP-PHASE2A-brief.md b/docs/tasks/MACP-PHASE2A-brief.md new file mode 100644 index 0000000..378b9a1 --- /dev/null +++ b/docs/tasks/MACP-PHASE2A-brief.md @@ -0,0 +1,152 @@ +# MACP Phase 2A — Event Bridge + Notification System + +**Branch:** `feat/macp-phase2a` +**Repo worktree:** `~/src/mosaic-bootstrap-worktrees/macp-phase2a` + +--- + +## Objective + +Build the event bridge that makes MACP events consumable by external systems (OpenClaw, Discord, webhooks). This is the observability layer — the controller already writes events to `events.ndjson`, but nothing reads them yet. + +--- + +## Task 1: Event File Watcher (`tools/orchestrator-matrix/events/event_watcher.py`) + +New Python module that tails `events.ndjson` and fires callbacks on new events. + +### Requirements: +- Watch `.mosaic/orchestrator/events.ndjson` for new lines (use file polling, not inotify — keeps it portable) +- Parse each new line as JSON +- Call registered callback functions with the parsed event +- Support filtering by event type (e.g., only `task.completed` and `task.failed`) +- Maintain a cursor (last read position) so restarts don't replay old events +- Cursor stored in `.mosaic/orchestrator/event_cursor.json` + +### Key Functions: +```python +class EventWatcher: + def __init__(self, events_path: Path, cursor_path: Path, poll_interval: float = 2.0): + ... + + def on(self, event_types: list[str], callback: Callable[[dict], None]) -> None: + """Register a callback for specific event types.""" + + def poll_once(self) -> list[dict]: + """Read new events since last cursor position. Returns list of new events.""" + + def run(self, max_iterations: int = 0) -> None: + """Polling loop. max_iterations=0 means infinite.""" +``` + +### Constraints: +- Python 3.10+ stdlib only (no pip dependencies) +- Must handle truncated/corrupt lines gracefully (skip, log warning) +- File might not exist yet — handle gracefully +- Thread-safe cursor updates (atomic write via temp file rename) + +--- + +## Task 2: Webhook Adapter (`tools/orchestrator-matrix/events/webhook_adapter.py`) + +POST events to a configurable URL. This is how the OC plugin will consume MACP events. + +### Requirements: +- Accept an event dict, POST it as JSON to a configured URL +- Support optional `Authorization` header (bearer token) +- Configurable from `.mosaic/orchestrator/config.json` under `macp.webhook`: + ```json + { + "macp": { + "webhook": { + "enabled": false, + "url": "http://localhost:8080/macp/events", + "auth_token": "", + "timeout_seconds": 10, + "retry_count": 2, + "event_filter": ["task.completed", "task.failed", "task.escalated"] + } + } + } + ``` +- Retry with exponential backoff on failure (configurable count) +- Log failures but don't crash the watcher +- Return success/failure status + +### Key Functions: +```python +def send_webhook(event: dict, config: dict) -> bool: + """POST event to webhook URL. Returns True on success.""" + +def create_webhook_callback(config: dict) -> Callable[[dict], None]: + """Factory that creates a watcher callback from config.""" +``` + +### Constraints: +- Use `urllib.request` only (no `requests` library) +- Must not block the event watcher for more than `timeout_seconds` per event +- Log to stderr on failure + +--- + +## Task 3: Discord Notification Formatter (`tools/orchestrator-matrix/events/discord_formatter.py`) + +Format MACP events into human-readable Discord messages. + +### Requirements: +- Format functions for each event type: + - `task.completed` → "✅ **Task TASK-001 completed** — Implement user auth (attempt 1/1, 45s)" + - `task.failed` → "❌ **Task TASK-001 failed** — Build error: exit code 1 (attempt 2/3)" + - `task.escalated` → "🚨 **Task TASK-001 escalated** — Gate failures after 3 attempts. Human review needed." + - `task.gated` → "🔍 **Task TASK-001 gated** — Quality gates running..." + - `task.started` → "⚙️ **Task TASK-001 started** — Worker: codex, dispatch: yolo" +- Include task metadata: runtime, dispatch type, attempt count, duration (if available) +- Keep messages concise — Discord has character limits +- Return plain strings (the caller decides where to send them) + +### Key Functions: +```python +def format_event(event: dict) -> str | None: + """Format an MACP event for Discord. Returns None for unformattable events.""" + +def format_summary(events: list[dict]) -> str: + """Format a batch summary (e.g., daily digest).""" +``` + +--- + +## Wiring: CLI Integration + +Add to `bin/mosaic-macp`: +```bash +mosaic macp watch [--webhook] [--once] +``` +- `watch`: Start the event watcher with configured callbacks +- `--webhook`: Enable webhook delivery (reads config from `.mosaic/orchestrator/config.json`) +- `--once`: Poll once and exit (useful for cron) + +--- + +## Verification + +1. Create a test `events.ndjson` with sample events, run `event_watcher.poll_once()`, verify all events returned +2. Run watcher with webhook pointing to a local echo server, verify POST payload +3. Format each event type through `discord_formatter`, verify output strings +4. `mosaic macp watch --once` processes events without error +5. Cursor persistence: run twice, second run returns no events + +## File Map +``` +tools/orchestrator-matrix/ +├── events/ +│ ├── __init__.py ← NEW +│ ├── event_watcher.py ← NEW +│ ├── webhook_adapter.py ← NEW +│ └── discord_formatter.py ← NEW +``` + +## Ground Rules +- Python 3.10+ stdlib only +- No async/threads — synchronous polling +- Commit: `feat: add MACP event bridge — watcher, webhook, Discord formatter` +- Push to `feat/macp-phase2a` diff --git a/tools/orchestrator-matrix/README.md b/tools/orchestrator-matrix/README.md index 0461c12..cf98a1d 100644 --- a/tools/orchestrator-matrix/README.md +++ b/tools/orchestrator-matrix/README.md @@ -109,4 +109,10 @@ mosaic macp submit ... mosaic macp status mosaic macp drain mosaic macp history --task-id TASK-001 +mosaic macp watch --once +mosaic macp watch --webhook ``` + +The Phase 2A event bridge consumes `.mosaic/orchestrator/events.ndjson` through a polling watcher, +persists cursor state in `.mosaic/orchestrator/event_cursor.json`, and can fan out events to +Discord-formatted stdout lines or webhook callbacks. diff --git a/tools/orchestrator-matrix/events/__init__.py b/tools/orchestrator-matrix/events/__init__.py new file mode 100644 index 0000000..fcac56e --- /dev/null +++ b/tools/orchestrator-matrix/events/__init__.py @@ -0,0 +1,15 @@ +"""Event bridge helpers for MACP orchestrator events.""" + +from .discord_formatter import format_event +from .discord_formatter import format_summary +from .event_watcher import EventWatcher +from .webhook_adapter import create_webhook_callback +from .webhook_adapter import send_webhook + +__all__ = [ + "EventWatcher", + "create_webhook_callback", + "format_event", + "format_summary", + "send_webhook", +] diff --git a/tools/orchestrator-matrix/events/__pycache__/__init__.cpython-312.pyc b/tools/orchestrator-matrix/events/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..58ea1d4 Binary files /dev/null and b/tools/orchestrator-matrix/events/__pycache__/__init__.cpython-312.pyc differ diff --git a/tools/orchestrator-matrix/events/__pycache__/discord_formatter.cpython-312.pyc b/tools/orchestrator-matrix/events/__pycache__/discord_formatter.cpython-312.pyc new file mode 100644 index 0000000..dde00eb Binary files /dev/null and b/tools/orchestrator-matrix/events/__pycache__/discord_formatter.cpython-312.pyc differ diff --git a/tools/orchestrator-matrix/events/__pycache__/event_watcher.cpython-312.pyc b/tools/orchestrator-matrix/events/__pycache__/event_watcher.cpython-312.pyc new file mode 100644 index 0000000..41e5be5 Binary files /dev/null and b/tools/orchestrator-matrix/events/__pycache__/event_watcher.cpython-312.pyc differ diff --git a/tools/orchestrator-matrix/events/__pycache__/webhook_adapter.cpython-312.pyc b/tools/orchestrator-matrix/events/__pycache__/webhook_adapter.cpython-312.pyc new file mode 100644 index 0000000..8fb4516 Binary files /dev/null and b/tools/orchestrator-matrix/events/__pycache__/webhook_adapter.cpython-312.pyc differ diff --git a/tools/orchestrator-matrix/events/discord_formatter.py b/tools/orchestrator-matrix/events/discord_formatter.py new file mode 100644 index 0000000..f07fa8c --- /dev/null +++ b/tools/orchestrator-matrix/events/discord_formatter.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python3 +"""Format MACP orchestrator events for Discord-friendly text delivery.""" + +from __future__ import annotations + +from typing import Any + + +def _task_label(event: dict[str, Any]) -> str: + task_id = str(event.get("task_id") or "unknown") + return f"Task {task_id}" + + +def _title(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if isinstance(metadata, dict): + for key in ("task_title", "title", "description"): + value = str(metadata.get(key) or "").strip() + if value: + return value + message = str(event.get("message") or "").strip() + return message if message else "No details provided" + + +def _attempt_suffix(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if not isinstance(metadata, dict): + return "" + attempt = metadata.get("attempt") + max_attempts = metadata.get("max_attempts") + if attempt in (None, "") and max_attempts in (None, ""): + return "" + if attempt in (None, ""): + return f"attempt ?/{max_attempts}" + if max_attempts in (None, ""): + return f"attempt {attempt}" + return f"attempt {attempt}/{max_attempts}" + + +def _duration_suffix(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if not isinstance(metadata, dict): + return "" + duration = metadata.get("duration_seconds") + if duration in (None, ""): + return "" + try: + seconds = int(round(float(duration))) + except (TypeError, ValueError): + return "" + return f"{seconds}s" + + +def _runtime_dispatch_suffix(event: dict[str, Any]) -> str: + metadata = event.get("metadata") + if not isinstance(metadata, dict): + return "" + parts: list[str] = [] + runtime = str(metadata.get("runtime") or "").strip() + dispatch = str(metadata.get("dispatch") or "").strip() + if runtime: + parts.append(f"Worker: {runtime}") + if dispatch: + parts.append(f"dispatch: {dispatch}") + return ", ".join(parts) + + +def _meta_clause(*parts: str) -> str: + clean = [part for part in parts if part] + if not clean: + return "" + return f" ({', '.join(clean)})" + + +def format_event(event: dict[str, Any]) -> str | None: + """Format an MACP event for Discord. Returns None for unformattable events.""" + + event_type = str(event.get("event_type") or "").strip() + task_label = _task_label(event) + title = _title(event) + attempt_suffix = _attempt_suffix(event) + duration_suffix = _duration_suffix(event) + runtime_dispatch = _runtime_dispatch_suffix(event) + message = str(event.get("message") or "").strip() + + if event_type == "task.completed": + return f"✅ **{task_label} completed** — {title}{_meta_clause(attempt_suffix, duration_suffix)}" + if event_type == "task.failed": + detail = message or title + return f"❌ **{task_label} failed** — {detail}{_meta_clause(attempt_suffix)}" + if event_type == "task.escalated": + detail = message or title + return f"🚨 **{task_label} escalated** — {detail}{_meta_clause(attempt_suffix)}" + if event_type == "task.gated": + detail = message or "Quality gates running..." + return f"🔍 **{task_label} gated** — {detail}" + if event_type == "task.started": + detail = runtime_dispatch or message or "Worker execution started" + return f"⚙️ **{task_label} started** — {detail}{_meta_clause(attempt_suffix)}" + return None + + +def format_summary(events: list[dict[str, Any]]) -> str: + """Format a batch summary (e.g., daily digest).""" + + counts: dict[str, int] = {} + first_task = "" + last_task = "" + for event in events: + event_type = str(event.get("event_type") or "unknown") + counts[event_type] = counts.get(event_type, 0) + 1 + task_id = str(event.get("task_id") or "").strip() + if task_id and not first_task: + first_task = task_id + if task_id: + last_task = task_id + + if not counts: + return "No MACP events processed." + + ordered = ", ".join(f"{event_type}: {counts[event_type]}" for event_type in sorted(counts)) + task_span = "" + if first_task and last_task: + task_span = f" Tasks {first_task}" if first_task == last_task else f" Tasks {first_task} -> {last_task}" + return f"MACP event summary: {len(events)} events ({ordered}).{task_span}" diff --git a/tools/orchestrator-matrix/events/event_watcher.py b/tools/orchestrator-matrix/events/event_watcher.py new file mode 100644 index 0000000..ff77bee --- /dev/null +++ b/tools/orchestrator-matrix/events/event_watcher.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +"""Portable file-polling watcher for MACP orchestrator events.""" + +from __future__ import annotations + +import json +import pathlib +import sys +import time +from collections.abc import Callable +from typing import Any + + +def _warn(message: str) -> None: + print(f"[macp-event-watcher] {message}", file=sys.stderr) + + +def _load_json(path: pathlib.Path, default: Any) -> Any: + if not path.exists(): + return default + try: + with path.open("r", encoding="utf-8") as handle: + return json.load(handle) + except (OSError, json.JSONDecodeError) as exc: + _warn(f"failed to load cursor {path}: {exc}") + return default + + +def _save_json_atomic(path: pathlib.Path, data: Any) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + with tmp.open("w", encoding="utf-8") as handle: + json.dump(data, handle, indent=2) + handle.write("\n") + tmp.replace(path) + + +class EventWatcher: + """Poll an events NDJSON file and dispatch matching callbacks.""" + + def __init__(self, events_path: pathlib.Path, cursor_path: pathlib.Path, poll_interval: float = 2.0): + self.events_path = events_path + self.cursor_path = cursor_path + self.poll_interval = max(0.1, float(poll_interval)) + self._callbacks: list[tuple[set[str] | None, Callable[[dict[str, Any]], None]]] = [] + self._cursor_position = self._load_cursor() + + def on(self, event_types: list[str], callback: Callable[[dict[str, Any]], None]) -> None: + """Register a callback for specific event types.""" + + normalized = {str(event_type).strip() for event_type in event_types if str(event_type).strip()} + self._callbacks.append((normalized or None, callback)) + + def poll_once(self) -> list[dict[str, Any]]: + """Read new events since last cursor position. Returns list of new events.""" + + if not self.events_path.exists(): + return [] + + try: + file_size = self.events_path.stat().st_size + except OSError as exc: + _warn(f"failed to stat events file {self.events_path}: {exc}") + return [] + + if file_size < self._cursor_position: + _warn( + f"events file shrank from cursor={self._cursor_position} to size={file_size}; " + "resetting cursor to start" + ) + self._cursor_position = 0 + self._persist_cursor() + + events: list[dict[str, Any]] = [] + new_position = self._cursor_position + try: + with self.events_path.open("r", encoding="utf-8") as handle: + handle.seek(self._cursor_position) + while True: + line_start = handle.tell() + line = handle.readline() + if not line: + break + line_end = handle.tell() + if not line.endswith("\n"): + new_position = line_start + break + stripped = line.strip() + if not stripped: + new_position = line_end + continue + try: + event = json.loads(stripped) + except json.JSONDecodeError as exc: + _warn(f"skipping corrupt event at byte {line_start}: {exc}") + new_position = line_end + continue + if not isinstance(event, dict): + _warn(f"skipping non-object event at byte {line_start}") + new_position = line_end + continue + events.append(event) + self._dispatch(event) + new_position = line_end + except OSError as exc: + _warn(f"failed to read events file {self.events_path}: {exc}") + return [] + + if new_position != self._cursor_position: + self._cursor_position = new_position + self._persist_cursor() + return events + + def run(self, max_iterations: int = 0) -> None: + """Polling loop. max_iterations=0 means infinite.""" + + iterations = 0 + while max_iterations <= 0 or iterations < max_iterations: + self.poll_once() + iterations += 1 + if max_iterations > 0 and iterations >= max_iterations: + break + time.sleep(self.poll_interval) + + def _dispatch(self, event: dict[str, Any]) -> None: + event_type = str(event.get("event_type") or "").strip() + for filters, callback in self._callbacks: + if filters is not None and event_type not in filters: + continue + try: + callback(event) + except Exception as exc: # pragma: no cover - defensive boundary + _warn(f"callback failure for event {event_type or ''}: {exc}") + + def _load_cursor(self) -> int: + payload = _load_json(self.cursor_path, {"position": 0}) + try: + position = int(payload.get("position", 0)) + except (AttributeError, TypeError, ValueError): + position = 0 + return max(0, position) + + def _persist_cursor(self) -> None: + _save_json_atomic(self.cursor_path, {"position": self._cursor_position}) diff --git a/tools/orchestrator-matrix/events/webhook_adapter.py b/tools/orchestrator-matrix/events/webhook_adapter.py new file mode 100644 index 0000000..b3a042b --- /dev/null +++ b/tools/orchestrator-matrix/events/webhook_adapter.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +"""Webhook delivery helpers for MACP events.""" + +from __future__ import annotations + +import json +import sys +import time +import urllib.error +import urllib.request +from collections.abc import Callable +from typing import Any + + +def _warn(message: str) -> None: + print(f"[macp-webhook] {message}", file=sys.stderr) + + +def _webhook_config(config: dict[str, Any]) -> dict[str, Any]: + macp = config.get("macp") + if isinstance(macp, dict) and isinstance(macp.get("webhook"), dict): + return dict(macp["webhook"]) + return dict(config) + + +def send_webhook(event: dict[str, Any], config: dict[str, Any]) -> bool: + """POST event to webhook URL. Returns True on success.""" + + webhook = _webhook_config(config) + url = str(webhook.get("url") or "").strip() + if not url: + _warn("missing webhook url") + return False + + timeout_seconds = max(1.0, float(webhook.get("timeout_seconds") or 10)) + retry_count = max(0, int(webhook.get("retry_count") or 0)) + auth_token = str(webhook.get("auth_token") or "").strip() + payload = json.dumps(event, ensure_ascii=True).encode("utf-8") + headers = {"Content-Type": "application/json"} + if auth_token: + headers["Authorization"] = f"Bearer {auth_token}" + + attempts = retry_count + 1 + for attempt in range(1, attempts + 1): + request = urllib.request.Request(url, data=payload, headers=headers, method="POST") + try: + with urllib.request.urlopen(request, timeout=timeout_seconds) as response: + status = getattr(response, "status", response.getcode()) + if 200 <= int(status) < 300: + return True + _warn(f"webhook returned HTTP {status} on attempt {attempt}/{attempts}") + except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, ValueError) as exc: + _warn(f"webhook attempt {attempt}/{attempts} failed: {exc}") + if attempt < attempts: + time.sleep(min(timeout_seconds, 2 ** (attempt - 1))) + return False + + +def create_webhook_callback(config: dict[str, Any]) -> Callable[[dict[str, Any]], None]: + """Factory that creates a watcher callback from config.""" + + def callback(event: dict[str, Any]) -> None: + if not send_webhook(event, config): + _warn(f"delivery failed for event {event.get('event_type', '')}") + + return callback