"""Circuit breaker pattern for preventing infinite retry loops. This module provides a CircuitBreaker class that implements the circuit breaker pattern to protect against cascading failures in coordinator loops. Circuit breaker states: - CLOSED: Normal operation, requests pass through - OPEN: After N consecutive failures, all requests are blocked - HALF_OPEN: After cooldown, allow one request to test recovery Reference: SEC-ORCH-7 from security review """ import logging import time from collections.abc import Callable from enum import StrEnum from typing import Any logger = logging.getLogger(__name__) class CircuitState(StrEnum): """States for the circuit breaker.""" CLOSED = "closed" # Normal operation OPEN = "open" # Blocking requests after failures HALF_OPEN = "half_open" # Testing if service recovered class CircuitBreakerError(Exception): """Exception raised when circuit is open and blocking requests.""" def __init__(self, state: CircuitState, time_until_retry: float) -> None: """Initialize CircuitBreakerError. Args: state: Current circuit state time_until_retry: Seconds until circuit may close """ self.state = state self.time_until_retry = time_until_retry super().__init__( f"Circuit breaker is {state.value}. " f"Retry in {time_until_retry:.1f} seconds." ) class CircuitBreaker: """Circuit breaker for protecting against cascading failures. The circuit breaker tracks consecutive failures and opens the circuit after a threshold is reached, preventing further requests until a cooldown period has elapsed. Attributes: name: Identifier for this circuit breaker (for logging) failure_threshold: Number of consecutive failures before opening cooldown_seconds: Seconds to wait before allowing retry state: Current circuit state failure_count: Current consecutive failure count """ def __init__( self, name: str, failure_threshold: int = 5, cooldown_seconds: float = 30.0, ) -> None: """Initialize CircuitBreaker. Args: name: Identifier for this circuit breaker failure_threshold: Consecutive failures before opening (default: 5) cooldown_seconds: Seconds to wait before half-open (default: 30) """ self.name = name self.failure_threshold = failure_threshold self.cooldown_seconds = cooldown_seconds self._state = CircuitState.CLOSED self._failure_count = 0 self._last_failure_time: float | None = None self._total_failures = 0 self._total_successes = 0 self._state_transitions = 0 @property def state(self) -> CircuitState: """Get the current circuit state. This also handles automatic state transitions based on cooldown. Returns: Current CircuitState """ if self._state == CircuitState.OPEN: # Check if cooldown has elapsed if self._last_failure_time is not None: elapsed = time.time() - self._last_failure_time if elapsed >= self.cooldown_seconds: self._transition_to(CircuitState.HALF_OPEN) return self._state @property def failure_count(self) -> int: """Get current consecutive failure count. Returns: Number of consecutive failures """ return self._failure_count @property def total_failures(self) -> int: """Get total failure count (all-time). Returns: Total number of failures """ return self._total_failures @property def total_successes(self) -> int: """Get total success count (all-time). Returns: Total number of successes """ return self._total_successes @property def state_transitions(self) -> int: """Get total state transition count. Returns: Number of state transitions """ return self._state_transitions @property def time_until_retry(self) -> float: """Get time remaining until retry is allowed. Returns: Seconds until circuit may transition to half-open, or 0 if not open """ if self._state != CircuitState.OPEN or self._last_failure_time is None: return 0.0 elapsed = time.time() - self._last_failure_time remaining = self.cooldown_seconds - elapsed return max(0.0, remaining) def can_execute(self) -> bool: """Check if a request can be executed. This method checks the current state and determines if a request should be allowed through. Returns: True if request can proceed, False otherwise """ current_state = self.state # This handles cooldown transitions if current_state == CircuitState.CLOSED: return True elif current_state == CircuitState.HALF_OPEN: # Allow one test request return True else: # OPEN return False def record_success(self) -> None: """Record a successful operation. This resets the failure count and closes the circuit if it was in half-open state. """ self._total_successes += 1 if self._state == CircuitState.HALF_OPEN: logger.info( f"Circuit breaker '{self.name}': Recovery confirmed, closing circuit" ) self._transition_to(CircuitState.CLOSED) # Reset failure count on any success self._failure_count = 0 logger.debug(f"Circuit breaker '{self.name}': Success recorded, failure count reset") def record_failure(self) -> None: """Record a failed operation. This increments the failure count and may open the circuit if the threshold is reached. """ self._failure_count += 1 self._total_failures += 1 self._last_failure_time = time.time() logger.warning( f"Circuit breaker '{self.name}': Failure recorded " f"({self._failure_count}/{self.failure_threshold})" ) if self._state == CircuitState.HALF_OPEN: # Failed during test request, go back to open logger.warning( f"Circuit breaker '{self.name}': Test request failed, reopening circuit" ) self._transition_to(CircuitState.OPEN) elif self._failure_count >= self.failure_threshold: logger.error( f"Circuit breaker '{self.name}': Failure threshold reached, opening circuit" ) self._transition_to(CircuitState.OPEN) def reset(self) -> None: """Reset the circuit breaker to initial state. This should be used carefully, typically only for testing or manual intervention. """ old_state = self._state self._state = CircuitState.CLOSED self._failure_count = 0 self._last_failure_time = None logger.info( f"Circuit breaker '{self.name}': Manual reset " f"(was {old_state.value}, now closed)" ) def _transition_to(self, new_state: CircuitState) -> None: """Transition to a new state. Args: new_state: The state to transition to """ old_state = self._state self._state = new_state self._state_transitions += 1 logger.info( f"Circuit breaker '{self.name}': State transition " f"{old_state.value} -> {new_state.value}" ) def get_stats(self) -> dict[str, Any]: """Get circuit breaker statistics. Returns: Dictionary with current stats """ return { "name": self.name, "state": self.state.value, "failure_count": self._failure_count, "failure_threshold": self.failure_threshold, "cooldown_seconds": self.cooldown_seconds, "time_until_retry": self.time_until_retry, "total_failures": self._total_failures, "total_successes": self._total_successes, "state_transitions": self._state_transitions, } async def execute( self, func: Callable[..., Any], *args: Any, **kwargs: Any, ) -> Any: """Execute a function with circuit breaker protection. This is a convenience method that wraps async function execution with automatic success/failure recording. Args: func: Async function to execute *args: Positional arguments for the function **kwargs: Keyword arguments for the function Returns: Result of the function execution Raises: CircuitBreakerError: If circuit is open Exception: If function raises and circuit is closed/half-open """ if not self.can_execute(): raise CircuitBreakerError(self.state, self.time_until_retry) try: result = await func(*args, **kwargs) self.record_success() return result except Exception: self.record_failure() raise