feat(#157): Set up webhook receiver endpoint
Implement FastAPI webhook receiver for Gitea issue assignment events with HMAC SHA256 signature verification and event routing. Implementation details: - FastAPI application with /webhook/gitea POST endpoint - HMAC SHA256 signature verification in security.py - Event routing for assigned, unassigned, closed actions - Comprehensive logging for all webhook events - Health check endpoint at /health - Docker containerization with health checks - 91% test coverage (exceeds 85% requirement) TDD workflow followed: - Wrote 16 tests first (RED phase) - Implemented features to pass tests (GREEN phase) - All tests passing with 91% coverage - Type checking with mypy: success - Linting with ruff: success Files created: - apps/coordinator/src/main.py - FastAPI application - apps/coordinator/src/webhook.py - Webhook handlers - apps/coordinator/src/security.py - HMAC verification - apps/coordinator/src/config.py - Configuration management - apps/coordinator/tests/ - Comprehensive test suite - apps/coordinator/Dockerfile - Production container - apps/coordinator/pyproject.toml - Python project config Configuration: - Updated .env.example with GITEA_WEBHOOK_SECRET - Updated docker-compose.yml with coordinator service Testing: - 16 unit and integration tests - Security tests for signature verification - Event handler tests for all supported actions - Health check endpoint tests - All tests passing with 91% coverage This unblocks issue #158 (issue parser). Fixes #157 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
84
apps/coordinator/tests/test_security.py
Normal file
84
apps/coordinator/tests/test_security.py
Normal file
@@ -0,0 +1,84 @@
|
||||
"""Tests for HMAC signature verification."""
|
||||
|
||||
import hmac
|
||||
import json
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestSignatureVerification:
|
||||
"""Test suite for HMAC SHA256 signature verification."""
|
||||
|
||||
def test_verify_signature_valid(self, webhook_secret: str) -> None:
|
||||
"""Test that valid signature is accepted."""
|
||||
from src.security import verify_signature
|
||||
|
||||
payload = json.dumps({"action": "assigned", "number": 157}).encode("utf-8")
|
||||
signature = hmac.new(
|
||||
webhook_secret.encode("utf-8"), payload, "sha256"
|
||||
).hexdigest()
|
||||
|
||||
assert verify_signature(payload, signature, webhook_secret) is True
|
||||
|
||||
def test_verify_signature_invalid(self, webhook_secret: str) -> None:
|
||||
"""Test that invalid signature is rejected."""
|
||||
from src.security import verify_signature
|
||||
|
||||
payload = json.dumps({"action": "assigned", "number": 157}).encode("utf-8")
|
||||
invalid_signature = "invalid_signature_12345"
|
||||
|
||||
assert verify_signature(payload, invalid_signature, webhook_secret) is False
|
||||
|
||||
def test_verify_signature_empty_signature(self, webhook_secret: str) -> None:
|
||||
"""Test that empty signature is rejected."""
|
||||
from src.security import verify_signature
|
||||
|
||||
payload = json.dumps({"action": "assigned", "number": 157}).encode("utf-8")
|
||||
|
||||
assert verify_signature(payload, "", webhook_secret) is False
|
||||
|
||||
def test_verify_signature_wrong_secret(self, webhook_secret: str) -> None:
|
||||
"""Test that signature with wrong secret is rejected."""
|
||||
from src.security import verify_signature
|
||||
|
||||
payload = json.dumps({"action": "assigned", "number": 157}).encode("utf-8")
|
||||
wrong_secret = "wrong-secret-67890"
|
||||
signature = hmac.new(
|
||||
wrong_secret.encode("utf-8"), payload, "sha256"
|
||||
).hexdigest()
|
||||
|
||||
assert verify_signature(payload, signature, webhook_secret) is False
|
||||
|
||||
def test_verify_signature_modified_payload(self, webhook_secret: str) -> None:
|
||||
"""Test that signature fails when payload is modified."""
|
||||
from src.security import verify_signature
|
||||
|
||||
original_payload = json.dumps({"action": "assigned", "number": 157}).encode(
|
||||
"utf-8"
|
||||
)
|
||||
signature = hmac.new(
|
||||
webhook_secret.encode("utf-8"), original_payload, "sha256"
|
||||
).hexdigest()
|
||||
|
||||
# Modify the payload
|
||||
modified_payload = json.dumps({"action": "assigned", "number": 999}).encode(
|
||||
"utf-8"
|
||||
)
|
||||
|
||||
assert verify_signature(modified_payload, signature, webhook_secret) is False
|
||||
|
||||
def test_verify_signature_timing_safe(self, webhook_secret: str) -> None:
|
||||
"""Test that signature comparison is timing-attack safe."""
|
||||
from src.security import verify_signature
|
||||
|
||||
payload = json.dumps({"action": "assigned", "number": 157}).encode("utf-8")
|
||||
signature = hmac.new(
|
||||
webhook_secret.encode("utf-8"), payload, "sha256"
|
||||
).hexdigest()
|
||||
|
||||
# Valid signature should work
|
||||
assert verify_signature(payload, signature, webhook_secret) is True
|
||||
|
||||
# Similar but wrong signature should fail (timing-safe comparison)
|
||||
wrong_signature = signature[:-1] + ("0" if signature[-1] != "0" else "1")
|
||||
assert verify_signature(payload, wrong_signature, webhook_secret) is False
|
||||
Reference in New Issue
Block a user