#!/usr/bin/env python3 """Webhook delivery helpers for MACP events.""" from __future__ import annotations import json import sys import time import urllib.error import urllib.request from collections.abc import Callable from typing import Any def _warn(message: str) -> None: print(f"[macp-webhook] {message}", file=sys.stderr) def _webhook_config(config: dict[str, Any]) -> dict[str, Any]: macp = config.get("macp") if isinstance(macp, dict) and isinstance(macp.get("webhook"), dict): return dict(macp["webhook"]) return dict(config) def _validate_webhook_url(url: str, auth_token: str) -> str | None: """Validate webhook URL for SSRF and cleartext credential risks. Returns an error message if the URL is disallowed, or None if safe. """ import ipaddress import urllib.parse as urlparse parsed = urlparse.urlparse(url) scheme = parsed.scheme.lower() if scheme not in ("http", "https"): return f"unsupported scheme '{scheme}' — must be http or https" if auth_token and scheme == "http": host = parsed.hostname or "" # Allow cleartext only for explicit loopback (dev use) if host not in ("localhost", "127.0.0.1", "::1"): return "refusing to send auth_token over non-HTTPS to non-localhost — use https://" host = parsed.hostname or "" # Block RFC1918, loopback, and link-local IPs outright. try: ip = ipaddress.ip_address(host) if ip.is_loopback or ip.is_private or ip.is_link_local: return f"refusing to send webhook to private/internal IP {ip}" except ValueError: pass # hostname — DNS resolution not validated here (best-effort) return None def send_webhook(event: dict[str, Any], config: dict[str, Any]) -> bool: """POST event to webhook URL. Returns True on success.""" webhook = _webhook_config(config) if webhook.get("enabled") is False: return False url = str(webhook.get("url") or "").strip() if not url: _warn("missing webhook url") return False timeout_seconds = max(1.0, float(webhook.get("timeout_seconds") or 10)) retry_count = max(0, int(webhook.get("retry_count") or 0)) auth_token = str(webhook.get("auth_token") or "").strip() url_err = _validate_webhook_url(url, auth_token) if url_err: _warn(f"webhook URL rejected: {url_err}") return False payload = json.dumps(event, ensure_ascii=True).encode("utf-8") headers = {"Content-Type": "application/json"} if auth_token: headers["Authorization"] = f"Bearer {auth_token}" attempts = retry_count + 1 for attempt in range(1, attempts + 1): request = urllib.request.Request(url, data=payload, headers=headers, method="POST") try: with urllib.request.urlopen(request, timeout=timeout_seconds) as response: status = getattr(response, "status", response.getcode()) if 200 <= int(status) < 300: return True _warn(f"webhook returned HTTP {status} on attempt {attempt}/{attempts}") except (urllib.error.HTTPError, urllib.error.URLError, TimeoutError, ValueError) as exc: _warn(f"webhook attempt {attempt}/{attempts} failed: {exc}") if attempt < attempts: time.sleep(min(timeout_seconds, 2 ** (attempt - 1))) return False def create_webhook_callback(config: dict[str, Any]) -> Callable[[dict[str, Any]], None]: """Factory that creates a watcher callback from config.""" webhook = _webhook_config(config) enabled = bool(webhook.get("enabled", False)) event_filter = { str(event_type).strip() for event_type in list(webhook.get("event_filter") or []) if str(event_type).strip() } def callback(event: dict[str, Any]) -> None: if not enabled: return event_type = str(event.get("event_type") or "").strip() if event_filter and event_type not in event_filter: return if not send_webhook(event, config): _warn(f"delivery failed for event {event.get('event_type', '')}") return callback