#!/usr/bin/env python3
"""Portable file-polling watcher for MACP orchestrator events."""

from __future__ import annotations

import json
import pathlib
import sys
import time
from collections.abc import Callable
from typing import Any


def _warn(message: str) -> None:
    """Write a prefixed diagnostic line to stderr."""
    print(f"[macp-event-watcher] {message}", file=sys.stderr)


def _load_json(path: pathlib.Path, default: Any) -> Any:
    """Return the JSON document stored at *path*, or *default* if unusable.

    A missing file yields *default* silently. A file that cannot be read,
    is not valid UTF-8, or is not valid JSON yields *default* after a warning.
    """
    if not path.exists():
        return default
    try:
        with path.open("r", encoding="utf-8") as handle:
            return json.load(handle)
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError,
        # so a cursor file holding garbage bytes cannot crash the caller.
        _warn(f"failed to load cursor {path}: {exc}")
        return default


def _save_json_atomic(path: pathlib.Path, data: Any) -> None:
    """Serialize *data* as JSON to *path* via a sibling temp file and rename.

    Parent directories are created as needed. If writing or renaming fails,
    the temp file is removed and the original exception is re-raised.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8") as handle:
            json.dump(data, handle, indent=2)
            handle.write("\n")
        tmp.replace(path)
    except BaseException:
        # Best-effort cleanup so a failed write leaves no stale temp file;
        # the original error is what the caller needs to see.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise


class EventWatcher:
    """Poll an events NDJSON file and dispatch matching callbacks.

    The read position is a byte offset into the events file. It is loaded
    from ``cursor_path`` on construction and written back whenever it changes.
    """

    def __init__(self, events_path: pathlib.Path, cursor_path: pathlib.Path, poll_interval: float = 2.0):
        """Create a watcher.

        Args:
            events_path: NDJSON file to read events from.
            cursor_path: JSON file used to persist the read position.
            poll_interval: Seconds to sleep between polls in ``run``;
                clamped to a minimum of 0.1.
        """
        self.events_path = events_path
        self.cursor_path = cursor_path
        self.poll_interval = max(0.1, float(poll_interval))
        self._callbacks: list[tuple[set[str] | None, Callable[[dict[str, Any]], None]]] = []
        self._cursor_position = self._load_cursor()

    def on(self, event_types: list[str] | str, callback: Callable[[dict[str, Any]], None]) -> None:
        """Register a callback for specific event types.

        Args:
            event_types: Event type names to match. Names are stripped and
                blank entries are dropped. A bare string is treated as a
                single event type. If no names remain (for example an empty
                list), the callback receives every event.
            callback: Called with each matching event dict.
        """
        if isinstance(event_types, str):
            # A str is iterable; without this guard "task.done" would be
            # split into single-character filters.
            event_types = [event_types]
        normalized: set[str] = set()
        for event_type in event_types:
            name = str(event_type).strip()
            if name:
                normalized.add(name)
        self._callbacks.append((normalized or None, callback))

    def poll_once(self) -> list[dict[str, Any]]:
        """Read new events since last cursor position.

        Complete lines after the cursor are parsed and dispatched in file
        order. A trailing line without a newline is left for a later poll.
        Blank lines are skipped silently. Lines that are not valid UTF-8, not
        valid JSON, or not JSON objects are skipped with a warning. If the
        file is smaller than the cursor, the cursor is reset to the start.

        Returns:
            The events dispatched during this call, possibly empty. If a read
            error interrupts the poll, the events handled before the error
            are returned and the cursor is advanced past them.
        """
        if not self.events_path.exists():
            return []
        try:
            file_size = self.events_path.stat().st_size
        except OSError as exc:
            _warn(f"failed to stat events file {self.events_path}: {exc}")
            return []

        if file_size < self._cursor_position:
            _warn(
                f"events file shrank from cursor={self._cursor_position} to size={file_size}; "
                "resetting cursor to start"
            )
            self._cursor_position = 0
            self._persist_cursor()

        events: list[dict[str, Any]] = []
        new_position = self._cursor_position
        try:
            # Binary mode keeps the cursor a true byte offset, comparable with
            # st_size; text-mode tell() values are opaque and not byte counts.
            with self.events_path.open("rb") as handle:
                handle.seek(new_position)
                while True:
                    raw = handle.readline()
                    if not raw.endswith(b"\n"):
                        # Either EOF (empty read) or a partially written line;
                        # leave the cursor before it so it is re-read later.
                        break
                    line_start = new_position
                    event = self._parse_line(raw, line_start)
                    if event is not None:
                        events.append(event)
                        self._dispatch(event)
                    new_position = line_start + len(raw)
        except OSError as exc:
            # Fall through: lines already handled stay handled, so they are
            # not dispatched a second time on the next poll.
            _warn(f"failed to read events file {self.events_path}: {exc}")

        if new_position != self._cursor_position:
            self._cursor_position = new_position
            self._persist_cursor()
        return events

    @staticmethod
    def _parse_line(raw: bytes, offset: int) -> dict[str, Any] | None:
        """Decode one newline-terminated line into an event dict.

        Returns None for blank lines (silently) and for lines that are not
        UTF-8, not JSON, or not a JSON object (after a warning naming the
        byte *offset* of the line).
        """
        try:
            text = raw.decode("utf-8").strip()
            if not text:
                return None
            event = json.loads(text)
        except ValueError as exc:
            # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
            _warn(f"skipping corrupt event at byte {offset}: {exc}")
            return None
        if not isinstance(event, dict):
            _warn(f"skipping non-object event at byte {offset}")
            return None
        return event

    def run(self, max_iterations: int = 0) -> None:
        """Polling loop.

        Polls, then sleeps ``poll_interval`` seconds between polls. There is
        no sleep after the final poll.

        Args:
            max_iterations: Number of polls to perform; 0 (or any value
                <= 0) means infinite.
        """
        iterations = 0
        while True:
            self.poll_once()
            iterations += 1
            if max_iterations > 0 and iterations >= max_iterations:
                break
            time.sleep(self.poll_interval)

    def _dispatch(self, event: dict[str, Any]) -> None:
        """Invoke every registered callback whose filter accepts *event*.

        An exception raised by a callback is reported as a warning and does
        not stop the remaining callbacks.
        """
        event_type = str(event.get("event_type") or "").strip()
        for filters, callback in self._callbacks:
            if filters is not None and event_type not in filters:
                continue
            try:
                callback(event)
            except Exception as exc:  # pragma: no cover - defensive boundary
                _warn(f"callback failure for event {event_type or ''}: {exc}")

    def _load_cursor(self) -> int:
        """Return the persisted cursor position, or 0 if absent or invalid."""
        payload = _load_json(self.cursor_path, {"position": 0})
        try:
            position = int(payload.get("position", 0))
        except (AttributeError, TypeError, ValueError, OverflowError):
            # Non-dict payload, non-numeric value, NaN, or infinity.
            position = 0
        return max(0, position)

    def _persist_cursor(self) -> None:
        """Write the current cursor position to the cursor file."""
        _save_json_atomic(self.cursor_path, {"position": self._cursor_position})