test: add comprehensive test suite for Phase 2A event bridge

This commit is contained in:
Jarvis
2026-03-27 21:23:41 -05:00
parent b42762d7f4
commit b381d81bfc
11 changed files with 460 additions and 10 deletions

View File

@@ -6,8 +6,8 @@ from __future__ import annotations
import re
from typing import Any
# Strip control characters and ANSI escapes from untrusted event fields
_CTRL_RE = re.compile(r"[\x00-\x1f\x7f]|\x1b\[[0-9;]*[A-Za-z]")
# Strip ANSI escapes before generic control characters so escape fragments do not survive.
_CTRL_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]|[\x00-\x1f\x7f]")
# Collapse Discord @-mentions / role pings to prevent deceptive pings
_MENTION_RE = re.compile(r"@(everyone|here|&?\d+)")
@@ -15,7 +15,7 @@ _MENTION_RE = re.compile(r"@(everyone|here|&?\d+)")
def _sanitize(value: str) -> str:
"""Normalize untrusted text for safe rendering in Discord/terminal output."""
value = _CTRL_RE.sub(" ", value)
value = _MENTION_RE.sub(r"@\u200b\1", value) # zero-width space breaks pings
value = _MENTION_RE.sub(lambda match: "@\u200b" + match.group(1), value)
return value.strip()

View File

@@ -44,13 +44,11 @@ def _validate_webhook_url(url: str, auth_token: str) -> str | None:
return "refusing to send auth_token over non-HTTPS to non-localhost — use https://"
host = parsed.hostname or ""
# Block RFC1918, loopback, link-local, and metadata IPs unless auth_token is absent
# Block RFC1918, loopback, and link-local IPs outright.
try:
ip = ipaddress.ip_address(host)
if ip.is_loopback or ip.is_private or ip.is_link_local:
# Allow localhost for development (no token risk since we already checked above)
if auth_token and not ip.is_loopback:
return f"refusing to send auth_token to private/internal IP {ip}"
return f"refusing to send webhook to private/internal IP {ip}"
except ValueError:
pass # hostname — DNS resolution not validated here (best-effort)
@@ -61,6 +59,9 @@ def send_webhook(event: dict[str, Any], config: dict[str, Any]) -> bool:
"""POST event to webhook URL. Returns True on success."""
webhook = _webhook_config(config)
if webhook.get("enabled") is False:
return False
url = str(webhook.get("url") or "").strip()
if not url:
_warn("missing webhook url")
@@ -99,7 +100,20 @@ def send_webhook(event: dict[str, Any], config: dict[str, Any]) -> bool:
def create_webhook_callback(config: dict[str, Any]) -> Callable[[dict[str, Any]], None]:
"""Factory that creates a watcher callback from config."""
webhook = _webhook_config(config)
enabled = bool(webhook.get("enabled", False))
event_filter = {
str(event_type).strip()
for event_type in list(webhook.get("event_filter") or [])
if str(event_type).strip()
}
def callback(event: dict[str, Any]) -> None:
if not enabled:
return
event_type = str(event.get("event_type") or "").strip()
if event_filter and event_type not in event_filter:
return
if not send_webhook(event, config):
_warn(f"delivery failed for event {event.get('event_type', '<unknown>')}")