Implements the Coordinator class with main orchestration loop: - Async loop architecture with configurable poll interval - process_queue() method gets next ready issue and spawns agent (stub) - Graceful shutdown handling with stop() method - Error handling that allows loop to continue after failures - Logging for all actions (start, stop, processing, errors) - Integration with QueueManager from #159 - Active agent tracking for future agent management Configuration settings added: - COORDINATOR_POLL_INTERVAL (default: 5.0s) - COORDINATOR_MAX_CONCURRENT_AGENTS (default: 10) - COORDINATOR_ENABLED (default: true) Tests: 27 new tests covering all acceptance criteria Coverage: 92% overall (100% for coordinator.py) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
83 lines
3.2 KiB
Python
83 lines
3.2 KiB
Python
"""Tests for HMAC signature verification."""
|
|
|
|
import hmac
|
|
import json
|
|
|
|
|
|
class TestSignatureVerification:
|
|
"""Test suite for HMAC SHA256 signature verification."""
|
|
|
|
def test_verify_signature_valid(self, webhook_secret: str) -> None:
|
|
"""Test that valid signature is accepted."""
|
|
from src.security import verify_signature
|
|
|
|
payload = json.dumps({"action": "assigned", "number": 157}).encode("utf-8")
|
|
signature = hmac.new(
|
|
webhook_secret.encode("utf-8"), payload, "sha256"
|
|
).hexdigest()
|
|
|
|
assert verify_signature(payload, signature, webhook_secret) is True
|
|
|
|
def test_verify_signature_invalid(self, webhook_secret: str) -> None:
|
|
"""Test that invalid signature is rejected."""
|
|
from src.security import verify_signature
|
|
|
|
payload = json.dumps({"action": "assigned", "number": 157}).encode("utf-8")
|
|
invalid_signature = "invalid_signature_12345"
|
|
|
|
assert verify_signature(payload, invalid_signature, webhook_secret) is False
|
|
|
|
def test_verify_signature_empty_signature(self, webhook_secret: str) -> None:
|
|
"""Test that empty signature is rejected."""
|
|
from src.security import verify_signature
|
|
|
|
payload = json.dumps({"action": "assigned", "number": 157}).encode("utf-8")
|
|
|
|
assert verify_signature(payload, "", webhook_secret) is False
|
|
|
|
def test_verify_signature_wrong_secret(self, webhook_secret: str) -> None:
|
|
"""Test that signature with wrong secret is rejected."""
|
|
from src.security import verify_signature
|
|
|
|
payload = json.dumps({"action": "assigned", "number": 157}).encode("utf-8")
|
|
wrong_secret = "wrong-secret-67890"
|
|
signature = hmac.new(
|
|
wrong_secret.encode("utf-8"), payload, "sha256"
|
|
).hexdigest()
|
|
|
|
assert verify_signature(payload, signature, webhook_secret) is False
|
|
|
|
def test_verify_signature_modified_payload(self, webhook_secret: str) -> None:
|
|
"""Test that signature fails when payload is modified."""
|
|
from src.security import verify_signature
|
|
|
|
original_payload = json.dumps({"action": "assigned", "number": 157}).encode(
|
|
"utf-8"
|
|
)
|
|
signature = hmac.new(
|
|
webhook_secret.encode("utf-8"), original_payload, "sha256"
|
|
).hexdigest()
|
|
|
|
# Modify the payload
|
|
modified_payload = json.dumps({"action": "assigned", "number": 999}).encode(
|
|
"utf-8"
|
|
)
|
|
|
|
assert verify_signature(modified_payload, signature, webhook_secret) is False
|
|
|
|
def test_verify_signature_timing_safe(self, webhook_secret: str) -> None:
|
|
"""Test that signature comparison is timing-attack safe."""
|
|
from src.security import verify_signature
|
|
|
|
payload = json.dumps({"action": "assigned", "number": 157}).encode("utf-8")
|
|
signature = hmac.new(
|
|
webhook_secret.encode("utf-8"), payload, "sha256"
|
|
).hexdigest()
|
|
|
|
# Valid signature should work
|
|
assert verify_signature(payload, signature, webhook_secret) is True
|
|
|
|
# Similar but wrong signature should fail (timing-safe comparison)
|
|
wrong_signature = signature[:-1] + ("0" if signature[-1] != "0" else "1")
|
|
assert verify_signature(payload, wrong_signature, webhook_secret) is False
|