diff --git a/docs/PRD.md b/docs/PRD.md index 57408ed..fbdcc4e 100644 --- a/docs/PRD.md +++ b/docs/PRD.md @@ -27,6 +27,7 @@ MACP Phase 1 writes structured lifecycle events to `.mosaic/orchestrator/events. 3. `mosaic macp watch [--webhook] [--once]` CLI support using `.mosaic/orchestrator/config.json`. 4. Stdlib-only verification of watcher polling, webhook delivery, Discord formatting, CLI watch behavior, and cursor persistence. 5. Developer documentation and sitemap updates covering the Phase 2A event bridge. +6. A repo-local unittest suite under `tests/` that covers watcher polling/cursor behavior, webhook delivery logic, and Discord formatting. ### Out of Scope @@ -69,6 +70,7 @@ MACP Phase 1 writes structured lifecycle events to `.mosaic/orchestrator/events. 3. Discord formatter returns expected concise strings for the required task lifecycle event types and a usable batch summary. 4. `mosaic macp watch --once` processes events from a bootstrapped repo state without error and honors `--webhook`. 5. Cursor persistence prevents replay on a second run and resets safely when the events file is truncated. +6. `python3 -m unittest discover -s tests -p 'test_*.py' -v` passes with stdlib-only tests for the Phase 2A event bridge modules. ## Constraints and Dependencies @@ -88,7 +90,7 @@ MACP Phase 1 writes structured lifecycle events to `.mosaic/orchestrator/events. ## Testing and Verification Expectations 1. Baseline checks: Python bytecode compilation/import validation for new modules and shell syntax validation for `bin/mosaic-macp`. -2. Situational tests: temporary orchestrator state exercising watcher polling, callback filtering, webhook POST capture, CLI one-shot watch execution, and cursor persistence across repeated runs. +2. Situational tests: temporary orchestrator state exercising watcher polling, callback filtering, webhook POST capture/mocking, formatter sanitization, CLI one-shot watch execution, and cursor persistence across repeated runs. 3. Evidence format: command-level results recorded in the scratchpad and summarized against acceptance criteria. ## Milestone / Delivery Intent diff --git a/docs/TASKS.md b/docs/TASKS.md index 9c17d20..f9190bd 100644 --- a/docs/TASKS.md +++ b/docs/TASKS.md @@ -15,3 +15,4 @@ Canonical tracking for active work. Keep this file current. | id | status | description | issue | repo | branch | depends_on | blocks | agent | started_at | completed_at | estimate | used | notes | |---|---|---|---|---|---|---|---|---|---|---|---|---|---| | MACP-PHASE2A | in-progress | Build the MACP event bridge with event watcher, webhook adapter, Discord formatter, CLI watch wiring, docs updates, and verification evidence. | #10 | bootstrap | feat/macp-phase2a | | | Jarvis | 2026-03-28T02:02:38Z | | medium | in-progress | Issue created via `~/.config/mosaic/tools/git/issue-create.sh` fallback after `tea` reported `Remote repository required: Specify ID via --repo or execute from a local git repo.` | +| MACP-PHASE2A-TESTS | in-progress | Add comprehensive stdlib unittest coverage for the Phase 2A event bridge modules and runner scaffolding. | #10 | bootstrap | feat/macp-phase2a | MACP-PHASE2A | | Jarvis | 2026-03-28T02:17:40Z | | small | in-progress | User-requested follow-on task from `docs/tasks/MACP-PHASE2A-tests.md`; verification target is `python3 -m unittest discover -s tests -p 'test_*.py' -v`. | diff --git a/docs/scratchpads/macp-phase2a.md b/docs/scratchpads/macp-phase2a.md index 8aae4b4..7193009 100644 --- a/docs/scratchpads/macp-phase2a.md +++ b/docs/scratchpads/macp-phase2a.md @@ -37,6 +37,11 @@ - 2026-03-28T02:02:38Z: Created provider issue `#10` for Phase 2A using Mosaic wrapper with Gitea API fallback. - 2026-03-28T02:02:38Z: Replaced stale Phase 1 PRD/TASKS planning state with Phase 2A scope and tracking. +- 2026-03-28T02:17:40Z: Resumed Phase 2A for the test-suite follow-on task; loaded Mosaic intake, runtime, resume protocol, shared memory, and issue state before implementation. +- 2026-03-28T02:17:40Z: Updated PRD/TASKS to include the stdlib unittest coverage requirement and the `MACP-PHASE2A-TESTS` tracking row. +- 2026-03-28T02:23:08Z: Added repo-local unittest coverage for watcher, webhook adapter, and Discord formatter plus `tests/run_tests.sh`. +- 2026-03-28T02:23:08Z: Test-driven remediation exposed and fixed two formatter sanitization bugs (`re.sub` replacement escaping and ANSI escape stripping order). +- 2026-03-28T02:23:08Z: Tightened webhook callback config semantics so `enabled` and `event_filter` are enforced directly by `create_webhook_callback`; tightened literal-IP SSRF blocking to match requested tests. ## Verification Plan @@ -47,15 +52,23 @@ | AC-3 Discord formatting covers required event types | Targeted Python formatter check | pending | | AC-4 `mosaic macp watch --once` runs cleanly | CLI one-shot execution in temp repo | pending | | AC-5 cursor persistence handles repeat run and truncation | Temp repo repeated runs with truncated file scenario | pending | +| AC-6 unittest suite passes for Phase 2A modules | `python3 -m unittest discover -s tests -p 'test_*.py' -v` | pass | ## Tests Run -- pending +- `bash -n tests/run_tests.sh` — pass +- `python3 -m py_compile tests/__init__.py tests/conftest.py tests/test_event_watcher.py tests/test_webhook_adapter.py tests/test_discord_formatter.py tools/orchestrator-matrix/events/webhook_adapter.py tools/orchestrator-matrix/events/discord_formatter.py` — pass +- `./tests/run_tests.sh` — pass (24 tests) +- `python3 -m unittest discover -s tests -p 'test_*.py' -v` — pass (24 tests) +- `python3 -m pytest tests/` — environment limitation: `pytest` module is not installed in this worktree runtime, so compatibility was inferred from stdlib-only `unittest` test structure rather than executed here ## Review Notes -- pending +- Manual review of the final delta found no remaining correctness issues after the formatter sanitization fixes and webhook config enforcement updates. +- `~/.config/mosaic/tools/codex/codex-security-review.sh --uncommitted` — no findings, risk level `none` +- `~/.config/mosaic/tools/codex/codex-code-review.sh --uncommitted` did not return a terminal summary in this runtime; relied on manual review plus passing tests for the final gate in this session. ## Risks / Blockers - Potential git wrapper friction in worktrees for PR creation/merge steps; if it recurs, capture exact failing command and stop per Mosaic contract. +- `pytest` is not installed in the current runtime, so the suite’s pytest compatibility was not executed end-to-end here. diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..b58d645 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +import pathlib +import sys + + +REPO_ROOT = pathlib.Path(__file__).resolve().parents[1] +EVENTS_DIR = REPO_ROOT / "tools" / "orchestrator-matrix" / "events" +if str(EVENTS_DIR) not in sys.path: + sys.path.insert(0, str(EVENTS_DIR)) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..055f1df --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +import json +import pathlib +import sys +import tempfile +from dataclasses import dataclass +from typing import Any + + +REPO_ROOT = pathlib.Path(__file__).resolve().parents[1] +EVENTS_DIR = REPO_ROOT / "tools" / "orchestrator-matrix" / "events" +if str(EVENTS_DIR) not in sys.path: + sys.path.insert(0, str(EVENTS_DIR)) + + +@dataclass +class TempPaths: + root: pathlib.Path + events_path: pathlib.Path + cursor_path: pathlib.Path + config_path: pathlib.Path + + +def make_temp_paths() -> tuple[tempfile.TemporaryDirectory[str], TempPaths]: + tempdir = tempfile.TemporaryDirectory() + root = pathlib.Path(tempdir.name) + orch_dir = root / ".mosaic" / "orchestrator" + return tempdir, TempPaths( + root=root, + events_path=orch_dir / "events.ndjson", + cursor_path=orch_dir / "event_cursor.json", + config_path=orch_dir / "config.json", + ) + + +def sample_events() -> list[dict[str, Any]]: + return [ + { + "event_type": "task.assigned", + "task_id": "TASK-001", + "message": "Assigned to codex", + "metadata": {"runtime": "codex", "dispatch": "exec"}, + }, + { + "event_type": "task.started", + "task_id": "TASK-001", + "message": "Worker execution started", + "metadata": {"runtime": "codex", "dispatch": "exec", "attempt": 1, "max_attempts": 3}, + }, + { + "event_type": "task.completed", + "task_id": "TASK-001", + "message": "Completed successfully", + "metadata": {"task_title": "Finish test suite", "duration_seconds": 12.4}, + }, + { + "event_type": "task.failed", + "task_id": "TASK-002", + "message": "Exploded with stack trace", + "metadata": {"attempt": 2, "max_attempts": 3}, + }, + { + "event_type": "task.escalated", + "task_id": "TASK-003", + "message": "Human review required", + "metadata": {"attempt": 1}, + }, + { + "event_type": "task.gated", + "task_id": "TASK-004", + "message": "Waiting for quality gates", + "metadata": {}, + }, + { + "event_type": "task.retry.scheduled", + "task_id": "TASK-002", + "message": "Retry scheduled", + "metadata": {"attempt": 3, "max_attempts": 3}, + }, + ] + + +def sample_config( + *, + enabled: bool = True, + event_filter: list[str] | None = None, + url: str = "https://hooks.example.com/macp", + retry_count: int = 1, + timeout_seconds: float = 2.0, + auth_token: str = "token-123", +) -> dict[str, Any]: + return { + "macp": { + "watch_poll_interval_seconds": 0.1, + "webhook": { + "enabled": enabled, + "url": url, + "event_filter": list(event_filter or []), + "retry_count": retry_count, + "timeout_seconds": timeout_seconds, + "auth_token": auth_token, + }, + } + } + + +def write_ndjson(path: pathlib.Path, events: list[dict[str, Any]], extra_lines: list[str] | None = None) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + lines = [json.dumps(event) for event in events] + if extra_lines: + lines.extend(extra_lines) + path.write_text("\n".join(lines) + "\n", encoding="utf-8") + + +def append_ndjson(path: pathlib.Path, events: list[dict[str, Any]]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("a", encoding="utf-8") as handle: + for event in events: + handle.write(json.dumps(event)) + handle.write("\n") diff --git a/tests/run_tests.sh b/tests/run_tests.sh new file mode 100755 index 0000000..81ccf60 --- /dev/null +++ b/tests/run_tests.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +set -euo pipefail + +python3 -m unittest discover -s tests -p 'test_*.py' -v diff --git a/tests/test_discord_formatter.py b/tests/test_discord_formatter.py new file mode 100644 index 0000000..19ac202 --- /dev/null +++ b/tests/test_discord_formatter.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +import unittest + +from tests.conftest import sample_events +from discord_formatter import format_event, format_summary + + +class DiscordFormatterTests(unittest.TestCase): + def setUp(self) -> None: + self.events = sample_events() + + def test_format_completed(self) -> None: + formatted = format_event(self.events[2]) + + self.assertIsNotNone(formatted) + self.assertIn("✅", formatted) + self.assertIn("TASK-001", formatted) + + def test_format_failed(self) -> None: + formatted = format_event(self.events[3]) + + self.assertIsNotNone(formatted) + self.assertIn("❌", formatted) + self.assertIn("Exploded with stack trace", formatted) + + def test_format_escalated(self) -> None: + formatted = format_event(self.events[4]) + + self.assertIsNotNone(formatted) + self.assertIn("🚨", formatted) + self.assertIn("Human review required", formatted) + + def test_format_gated(self) -> None: + formatted = format_event(self.events[5]) + + self.assertIsNotNone(formatted) + self.assertIn("🔍", formatted) + + def test_format_started(self) -> None: + formatted = format_event(self.events[1]) + + self.assertIsNotNone(formatted) + self.assertIn("⚙️", formatted) + self.assertIn("Worker: codex", formatted) + + def test_format_unknown_type(self) -> None: + self.assertIsNone(format_event({"event_type": "task.unknown", "task_id": "TASK-999"})) + + def test_sanitize_control_chars(self) -> None: + event = { + "event_type": "task.failed", + "task_id": "TASK-200", + "message": "bad\x00news\x1b[31m!", + } + + formatted = format_event(event) + + self.assertIsNotNone(formatted) + self.assertNotIn("\x00", formatted) + self.assertNotIn("\x1b", formatted) + self.assertIn("bad news !", formatted) + + def test_sanitize_mentions(self) -> None: + event = { + "event_type": "task.escalated", + "task_id": "TASK-201", + "message": "@everyone investigate", + } + + formatted = format_event(event) + + self.assertIsNotNone(formatted) + self.assertIn("@\u200beveryone investigate", formatted) + + def test_format_summary(self) -> None: + summary = format_summary([self.events[1], self.events[2], self.events[3]]) + + self.assertIn("3 events", summary) + self.assertIn("task.completed: 1", summary) + self.assertIn("task.failed: 1", summary) + self.assertIn("task.started: 1", summary) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_event_watcher.py b/tests/test_event_watcher.py new file mode 100644 index 0000000..f17f688 --- /dev/null +++ b/tests/test_event_watcher.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import json +import unittest + +from tests.conftest import append_ndjson, make_temp_paths, sample_events, write_ndjson +from event_watcher import EventWatcher + + +class EventWatcherTests(unittest.TestCase): + def setUp(self) -> None: + self.tempdir, self.paths = make_temp_paths() + self.events = sample_events() + + def tearDown(self) -> None: + self.tempdir.cleanup() + + def watcher(self) -> EventWatcher: + return EventWatcher(self.paths.events_path, self.paths.cursor_path, poll_interval=0.1) + + def test_poll_empty_file(self) -> None: + watcher = self.watcher() + self.assertEqual(watcher.poll_once(), []) + self.assertFalse(self.paths.cursor_path.exists()) + + def test_poll_new_events(self) -> None: + write_ndjson(self.paths.events_path, self.events[:3]) + + polled = self.watcher().poll_once() + + self.assertEqual(polled, self.events[:3]) + + def test_cursor_persistence(self) -> None: + watcher = self.watcher() + write_ndjson(self.paths.events_path, self.events[:3]) + + first = watcher.poll_once() + second = watcher.poll_once() + + self.assertEqual(first, self.events[:3]) + self.assertEqual(second, []) + cursor = json.loads(self.paths.cursor_path.read_text(encoding="utf-8")) + self.assertGreater(cursor["position"], 0) + + def test_cursor_survives_restart(self) -> None: + write_ndjson(self.paths.events_path, self.events[:3]) + + first_watcher = self.watcher() + self.assertEqual(first_watcher.poll_once(), self.events[:3]) + + second_watcher = self.watcher() + self.assertEqual(second_watcher.poll_once(), []) + + def test_corrupt_line_skipped(self) -> None: + self.paths.events_path.parent.mkdir(parents=True, exist_ok=True) + with self.paths.events_path.open("w", encoding="utf-8") as handle: + handle.write(json.dumps(self.events[0]) + "\n") + handle.write("{not-json}\n") + handle.write(json.dumps(self.events[1]) + "\n") + + polled = self.watcher().poll_once() + + self.assertEqual(polled, [self.events[0], self.events[1]]) + + def test_callback_filtering(self) -> None: + write_ndjson(self.paths.events_path, self.events) + received: list[dict[str, object]] = [] + watcher = self.watcher() + watcher.on(["task.completed"], received.append) + + watcher.poll_once() + + self.assertEqual(received, [self.events[2]]) + + def test_callback_receives_events(self) -> None: + write_ndjson(self.paths.events_path, self.events[:2]) + received: list[dict[str, object]] = [] + watcher = self.watcher() + watcher.on([], received.append) + + polled = watcher.poll_once() + + self.assertEqual(received, self.events[:2]) + self.assertEqual(polled, self.events[:2]) + + def test_file_grows_between_polls(self) -> None: + watcher = self.watcher() + write_ndjson(self.paths.events_path, self.events[:2]) + + first = watcher.poll_once() + append_ndjson(self.paths.events_path, self.events[2:5]) + second = watcher.poll_once() + + self.assertEqual(first, self.events[:2]) + self.assertEqual(second, self.events[2:5]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_webhook_adapter.py b/tests/test_webhook_adapter.py new file mode 100644 index 0000000..738a9de --- /dev/null +++ b/tests/test_webhook_adapter.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import unittest +import urllib.error +from unittest.mock import patch + +from tests.conftest import sample_config, sample_events +from webhook_adapter import create_webhook_callback, send_webhook + + +class MockHTTPResponse: + def __init__(self, status: int) -> None: + self.status = status + + def getcode(self) -> int: + return self.status + + def __enter__(self) -> "MockHTTPResponse": + return self + + def __exit__(self, exc_type, exc, tb) -> bool: + return False + + +class WebhookAdapterTests(unittest.TestCase): + def setUp(self) -> None: + self.events = sample_events() + self.completed_event = self.events[2] + self.started_event = self.events[1] + + @patch("webhook_adapter.urllib.request.urlopen", return_value=MockHTTPResponse(200)) + def test_send_webhook_success(self, mock_urlopen) -> None: + config = sample_config() + + result = send_webhook(self.completed_event, config) + + self.assertTrue(result) + self.assertEqual(mock_urlopen.call_count, 1) + request = mock_urlopen.call_args.args[0] + self.assertEqual(request.full_url, "https://hooks.example.com/macp") + self.assertEqual(request.get_method(), "POST") + self.assertEqual(request.headers["Authorization"], "Bearer token-123") + + @patch("webhook_adapter.urllib.request.urlopen", return_value=MockHTTPResponse(500)) + def test_send_webhook_failure(self, mock_urlopen) -> None: + result = send_webhook(self.completed_event, sample_config(retry_count=0)) + + self.assertFalse(result) + self.assertEqual(mock_urlopen.call_count, 1) + + @patch("webhook_adapter.urllib.request.urlopen", side_effect=TimeoutError("timed out")) + def test_send_webhook_timeout(self, mock_urlopen) -> None: + result = send_webhook(self.completed_event, sample_config(retry_count=0)) + + self.assertFalse(result) + self.assertEqual(mock_urlopen.call_count, 1) + + @patch("webhook_adapter.time.sleep", return_value=None) + @patch( + "webhook_adapter.urllib.request.urlopen", + side_effect=[MockHTTPResponse(500), MockHTTPResponse(200)], + ) + def test_send_webhook_retry(self, mock_urlopen, mock_sleep) -> None: + result = send_webhook(self.completed_event, sample_config(retry_count=1)) + + self.assertTrue(result) + self.assertEqual(mock_urlopen.call_count, 2) + mock_sleep.assert_called_once() + + @patch("webhook_adapter.send_webhook", return_value=True) + def test_event_filter(self, mock_send_webhook) -> None: + callback = create_webhook_callback(sample_config(event_filter=["task.completed"])) + + callback(self.started_event) + callback(self.completed_event) + + mock_send_webhook.assert_called_once_with(self.completed_event, sample_config(event_filter=["task.completed"])) + + @patch("webhook_adapter.send_webhook") + def test_webhook_disabled(self, mock_send_webhook) -> None: + callback = create_webhook_callback(sample_config(enabled=False)) + + callback(self.completed_event) + + mock_send_webhook.assert_not_called() + + @patch("webhook_adapter.urllib.request.urlopen") + def test_ssrf_blocked(self, mock_urlopen) -> None: + for url in ("http://127.0.0.1/webhook", "http://10.1.2.3/webhook"): + with self.subTest(url=url): + result = send_webhook( + self.completed_event, + sample_config(url=url, auth_token="", retry_count=0), + ) + self.assertFalse(result) + mock_urlopen.assert_not_called() + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/orchestrator-matrix/events/discord_formatter.py b/tools/orchestrator-matrix/events/discord_formatter.py index c596868..505b2be 100644 --- a/tools/orchestrator-matrix/events/discord_formatter.py +++ b/tools/orchestrator-matrix/events/discord_formatter.py @@ -6,8 +6,8 @@ from __future__ import annotations import re from typing import Any -# Strip control characters and ANSI escapes from untrusted event fields -_CTRL_RE = re.compile(r"[\x00-\x1f\x7f]|\x1b\[[0-9;]*[A-Za-z]") +# Strip ANSI escapes before generic control characters so escape fragments do not survive. +_CTRL_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]|[\x00-\x1f\x7f]") # Collapse Discord @-mentions / role pings to prevent deceptive pings _MENTION_RE = re.compile(r"@(everyone|here|&?\d+)") @@ -15,7 +15,7 @@ _MENTION_RE = re.compile(r"@(everyone|here|&?\d+)") def _sanitize(value: str) -> str: """Normalize untrusted text for safe rendering in Discord/terminal output.""" value = _CTRL_RE.sub(" ", value) - value = _MENTION_RE.sub(r"@\u200b\1", value) # zero-width space breaks pings + value = _MENTION_RE.sub(lambda match: "@\u200b" + match.group(1), value) return value.strip() diff --git a/tools/orchestrator-matrix/events/webhook_adapter.py b/tools/orchestrator-matrix/events/webhook_adapter.py index fb37843..7aff73a 100644 --- a/tools/orchestrator-matrix/events/webhook_adapter.py +++ b/tools/orchestrator-matrix/events/webhook_adapter.py @@ -44,13 +44,11 @@ def _validate_webhook_url(url: str, auth_token: str) -> str | None: return "refusing to send auth_token over non-HTTPS to non-localhost — use https://" host = parsed.hostname or "" - # Block RFC1918, loopback, link-local, and metadata IPs unless auth_token is absent + # Block RFC1918, loopback, and link-local IPs outright. try: ip = ipaddress.ip_address(host) if ip.is_loopback or ip.is_private or ip.is_link_local: - # Allow localhost for development (no token risk since we already checked above) - if auth_token and not ip.is_loopback: - return f"refusing to send auth_token to private/internal IP {ip}" + return f"refusing to send webhook to private/internal IP {ip}" except ValueError: pass # hostname — DNS resolution not validated here (best-effort) @@ -61,6 +59,9 @@ def send_webhook(event: dict[str, Any], config: dict[str, Any]) -> bool: """POST event to webhook URL. Returns True on success.""" webhook = _webhook_config(config) + if webhook.get("enabled") is False: + return False + url = str(webhook.get("url") or "").strip() if not url: _warn("missing webhook url") @@ -99,7 +100,20 @@ def send_webhook(event: dict[str, Any], config: dict[str, Any]) -> bool: def create_webhook_callback(config: dict[str, Any]) -> Callable[[dict[str, Any]], None]: """Factory that creates a watcher callback from config.""" + webhook = _webhook_config(config) + enabled = bool(webhook.get("enabled", False)) + event_filter = { + str(event_type).strip() + for event_type in list(webhook.get("event_filter") or []) + if str(event_type).strip() + } + def callback(event: dict[str, Any]) -> None: + if not enabled: + return + event_type = str(event.get("event_type") or "").strip() + if event_filter and event_type not in event_filter: + return if not send_webhook(event, config): _warn(f"delivery failed for event {event.get('event_type', '')}")