fix: address code and security review findings from Phase 2A

- Remove committed __pycache__ artifacts; add to .gitignore
- Wrap config JSON parse in try/except to prevent CLI crash on malformed config
- Add SSRF mitigation to webhook_adapter: reject non-http(s) schemes,
  refuse auth_token over cleartext to non-localhost, block private IPs
- Add _sanitize() to discord_formatter: strip ANSI/control chars,
  neutralize @everyone/@here Discord mentions
This commit is contained in:
Jarvis
2026-03-27 21:12:04 -05:00
parent 63c30b564d
commit b42762d7f4
8 changed files with 63 additions and 4 deletions

View File

@@ -23,6 +23,40 @@ def _webhook_config(config: dict[str, Any]) -> dict[str, Any]:
return dict(config)
def _validate_webhook_url(url: str, auth_token: str) -> str | None:
"""Validate webhook URL for SSRF and cleartext credential risks.
Returns an error message if the URL is disallowed, or None if safe.
"""
import ipaddress
import urllib.parse as urlparse
parsed = urlparse.urlparse(url)
scheme = parsed.scheme.lower()
if scheme not in ("http", "https"):
return f"unsupported scheme '{scheme}' — must be http or https"
if auth_token and scheme == "http":
host = parsed.hostname or ""
# Allow cleartext only for explicit loopback (dev use)
if host not in ("localhost", "127.0.0.1", "::1"):
return "refusing to send auth_token over non-HTTPS to non-localhost — use https://"
host = parsed.hostname or ""
# Block RFC1918, loopback, link-local, and metadata IPs unless auth_token is absent
try:
ip = ipaddress.ip_address(host)
if ip.is_loopback or ip.is_private or ip.is_link_local:
# Allow localhost for development (no token risk since we already checked above)
if auth_token and not ip.is_loopback:
return f"refusing to send auth_token to private/internal IP {ip}"
except ValueError:
pass # hostname — DNS resolution not validated here (best-effort)
return None
def send_webhook(event: dict[str, Any], config: dict[str, Any]) -> bool:
"""POST event to webhook URL. Returns True on success."""
@@ -35,6 +69,12 @@ def send_webhook(event: dict[str, Any], config: dict[str, Any]) -> bool:
timeout_seconds = max(1.0, float(webhook.get("timeout_seconds") or 10))
retry_count = max(0, int(webhook.get("retry_count") or 0))
auth_token = str(webhook.get("auth_token") or "").strip()
url_err = _validate_webhook_url(url, auth_token)
if url_err:
_warn(f"webhook URL rejected: {url_err}")
return False
payload = json.dumps(event, ensure_ascii=True).encode("utf-8")
headers = {"Content-Type": "application/json"}
if auth_token: