diff --git a/aws_lambda_powertools/shared/constants.py b/aws_lambda_powertools/shared/constants.py index bc19ff13b30..6c808d38758 100644 --- a/aws_lambda_powertools/shared/constants.py +++ b/aws_lambda_powertools/shared/constants.py @@ -76,3 +76,6 @@ # Idempotency constants IDEMPOTENCY_DISABLED_ENV: str = "POWERTOOLS_IDEMPOTENCY_DISABLED" + +# Circuit breaker constants +CIRCUIT_BREAKER_DISABLED_ENV: str = "POWERTOOLS_CIRCUIT_BREAKER_DISABLED" diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/__init__.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/__init__.py new file mode 100644 index 00000000000..e931245f9d0 --- /dev/null +++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/__init__.py @@ -0,0 +1,35 @@ +""" +Circuit Breaker utility for protecting unhealthy downstream dependencies. + +!!! warning "Alpha / experimental" + This utility is published under the `_alpha` namespace while we collect + feedback. The public API may change in a backwards-incompatible way before it + is promoted to GA. Pin your version and follow the tracking discussion before + relying on it in production. 
+""" + +from aws_lambda_powertools.utilities.circuit_breaker_alpha.circuit_breaker import circuit_breaker +from aws_lambda_powertools.utilities.circuit_breaker_alpha.config import CircuitBreakerConfig +from aws_lambda_powertools.utilities.circuit_breaker_alpha.exceptions import ( + CircuitBreakerConfigError, + CircuitBreakerError, + CircuitBreakerOpenError, + CircuitBreakerPersistenceError, +) +from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import ( + CircuitInfo, + CircuitState, + CircuitTransition, +) + +__all__ = ( + "circuit_breaker", + "CircuitBreakerConfig", + "CircuitInfo", + "CircuitState", + "CircuitTransition", + "CircuitBreakerError", + "CircuitBreakerOpenError", + "CircuitBreakerConfigError", + "CircuitBreakerPersistenceError", +) diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/base.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/base.py new file mode 100644 index 00000000000..a23595bd317 --- /dev/null +++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/base.py @@ -0,0 +1,203 @@ +""" +Orchestrator for the Circuit Breaker utility. + +:class:`CircuitBreakerHandler` owns the state machine and the per-environment failure +counter; the persistence layer owns the shared truth. This split keeps the healthy +path write-free: failures are counted locally and only persisted on a state transition. 
logger = logging.getLogger(__name__)

# Consecutive-outcome counters keyed by circuit name. They live at module level so the
# values persist between handler instances (a new handler is built for every call).
_LOCAL_FAILURES: dict[str, int] = {}
_LOCAL_SUCCESSES: dict[str, int] = {}

# Random identifier generated once at import time; passed to the store as the owner
# when claiming the half-open probe lock and compared against ``half_open_owner``.
_ENVIRONMENT_ID = uuid.uuid4().hex


class CircuitBreakerHandler:
    """
    Route one protected call through the circuit breaker state machine.

    The handler reads the circuit record from the persistence store, then either runs
    the protected function (closed circuit), runs it as a probe (half-open circuit owned
    by this environment), or produces the open-circuit response.

    Parameters
    ----------
    function : Callable
        The protected function.
    name : str
        Circuit name; also the key of the module-level counters.
    config : CircuitBreakerConfig
        Supplies ``failure_threshold``, ``success_threshold``, ``recovery_timeout`` and
        ``counts_as_failure``.
    persistence_store : CircuitBreakerPersistenceLayer
        Store holding the circuit record. ``configure`` is called on it during
        construction.
    on_circuit_open : Callable | None
        Invoked with the protected call's ``*args``/``**kwargs`` plus a ``circuit``
        keyword argument when the call is rejected. When ``None``,
        :class:`CircuitBreakerOpenError` is raised instead.
    on_transition : Callable | None
        Invoked with a single ``CircuitTransition`` on every state change. Exceptions
        it raises are logged and suppressed.
    function_args : tuple | None
        Positional arguments for the protected function; a falsy value becomes ``()``.
    function_kwargs : dict | None
        Keyword arguments for the protected function; a falsy value becomes ``{}``.
    """

    def __init__(
        self,
        function: Callable,
        name: str,
        config: CircuitBreakerConfig,
        persistence_store: CircuitBreakerPersistenceLayer,
        on_circuit_open: Callable | None = None,
        on_transition: Callable | None = None,
        function_args: tuple | None = None,
        function_kwargs: dict | None = None,
    ):
        self.function = function
        self.name = name
        self.config = config
        self.on_circuit_open = on_circuit_open
        self.on_transition = on_transition
        self.fn_args = function_args if function_args else ()
        self.fn_kwargs = function_kwargs if function_kwargs else {}

        # Bind the store to this circuit before keeping a reference to it.
        persistence_store.configure(config=config, circuit_name=name)
        self.persistence_store = persistence_store

    def handle(self) -> Any:
        """
        Look up the circuit state and dispatch the call accordingly.

        Returns
        -------
        Any
            The protected function's return value when it runs, otherwise whatever
            the ``on_circuit_open`` callback returns.

        Raises
        ------
        CircuitBreakerOpenError
            If the call is rejected and no ``on_circuit_open`` callback was given.
        """
        record = self.persistence_store.get_state(self.name)
        state = record.state

        if state == CircuitState.CLOSED:
            return self._call_closed()

        if state == CircuitState.OPEN:
            # A timestamp of 0 is a valid value, so only ``None`` falls back to "now".
            opened_at = record.opened_at
            if opened_at is None:
                opened_at = self._now()

            window_elapsed = self._now() >= opened_at + self.config.recovery_timeout
            # The store is only asked for the probe lock once the window has elapsed.
            if window_elapsed and self.persistence_store.try_acquire_half_open(
                self.name,
                _ENVIRONMENT_ID,
                opened_at,
            ):
                self._notify(CircuitState.OPEN, CircuitState.HALF_OPEN, opened_at=opened_at)
                return self._call_probe()
            return self._open_response(record.to_circuit_info())

        # Remaining state is HALF_OPEN: every environment except the lock owner is rejected.
        if record.half_open_owner != _ENVIRONMENT_ID:
            return self._open_response(record.to_circuit_info())
        return self._call_probe()

    def _call_closed(self) -> Any:
        """Run the protected function for a closed circuit and maintain the failure counter."""
        try:
            outcome = self.function(*self.fn_args, **self.fn_kwargs)
        except Exception as error:
            # Exceptions the config does not count are re-raised with the counter untouched.
            if self.config.counts_as_failure(error):
                self._register_failure()
            raise

        _LOCAL_FAILURES[self.name] = 0
        return outcome

    def _register_failure(self) -> None:
        """Increment the local failure counter and open the circuit once the threshold is reached."""
        failures = _LOCAL_FAILURES.get(self.name, 0) + 1
        _LOCAL_FAILURES[self.name] = failures
        if failures < self.config.failure_threshold:
            return

        logger.debug("Circuit '%s' tripping CLOSED to OPEN after %d failures.", self.name, failures)
        opened_at = self._now()
        self.persistence_store.save_open(self.name, failure_count=failures, opened_at=opened_at)
        # The counter is cleared only after the store write returned.
        _LOCAL_FAILURES[self.name] = 0
        self._notify(CircuitState.CLOSED, CircuitState.OPEN, opened_at=opened_at)

    def _call_probe(self) -> Any:
        """Run the protected function as a half-open probe; reopen on failure, close after enough successes."""
        try:
            outcome = self.function(*self.fn_args, **self.fn_kwargs)
        except Exception as error:
            if self.config.counts_as_failure(error):
                logger.debug("Circuit '%s' probe failed; reopening.", self.name)
                reopened_at = self._now()
                self.persistence_store.save_reopen(self.name, opened_at=reopened_at)
                _LOCAL_SUCCESSES[self.name] = 0
                self._notify(CircuitState.HALF_OPEN, CircuitState.OPEN, opened_at=reopened_at)
            raise

        streak = _LOCAL_SUCCESSES.get(self.name, 0) + 1
        _LOCAL_SUCCESSES[self.name] = streak
        if streak >= self.config.success_threshold:
            logger.debug("Circuit '%s' closing after %d probe successes.", self.name, streak)
            self.persistence_store.save_closed(self.name)
            _LOCAL_SUCCESSES[self.name] = 0
            _LOCAL_FAILURES[self.name] = 0
            self._notify(CircuitState.HALF_OPEN, CircuitState.CLOSED)
        return outcome

    def _open_response(self, circuit: CircuitInfo) -> Any:
        """Return the ``on_circuit_open`` callback's result, or raise when no callback is set."""
        callback = self.on_circuit_open
        if callback is None:
            raise CircuitBreakerOpenError(
                f"Circuit '{self.name}' is open.",
                circuit=circuit,
            )
        # Arguments are forwarded exactly as received; the snapshot travels as the
        # ``circuit`` keyword so it cannot shift any positional argument.
        return callback(*self.fn_args, **self.fn_kwargs, circuit=circuit)

    def _notify(self, from_state: CircuitState, to_state: CircuitState, opened_at: int | None = None) -> None:
        """
        Call the ``on_transition`` hook, if any, with a ``CircuitTransition``.

        Any exception raised while building the transition or running the hook is
        logged as a warning and suppressed.
        """
        hook = self.on_transition
        if hook is None:
            return
        try:
            transition = CircuitTransition(
                circuit_name=self.name,
                from_state=from_state,
                to_state=to_state,
                opened_at=opened_at,
            )
            hook(transition)
        except Exception:
            logger.warning("on_transition hook for circuit '%s' raised; ignoring.", self.name, exc_info=True)

    @staticmethod
    def _now() -> int:
        """Return the current unix timestamp, truncated to whole seconds."""
        moment = datetime.datetime.now()
        return int(moment.timestamp())
def circuit_breaker(
    name: str,
    persistence_store: CircuitBreakerPersistenceLayer,
    on_circuit_open: Callable | None = None,
    on_transition: Callable | None = None,
    config: CircuitBreakerConfig | None = None,
) -> Callable:
    """
    Build a decorator that runs the decorated function behind a circuit breaker.

    Every call to the decorated function creates a ``CircuitBreakerHandler`` and
    returns the result of its ``handle()`` method. Decorate the function that performs
    the downstream call rather than the whole Lambda handler, so that a tripped circuit
    maps to a single dependency.

    If the environment variable named by ``constants.CIRCUIT_BREAKER_DISABLED_ENV``
    evaluates to true, the circuit is bypassed: a ``PowertoolsUserWarning`` is emitted
    and the decorated function is called directly.

    Parameters
    ----------
    name : str
        Circuit name. Use one name per downstream dependency.
    persistence_store : CircuitBreakerPersistenceLayer
        Store holding the circuit state (for example ``CircuitBreakerDynamoDBPersistence``).
    on_circuit_open : Callable | None
        Forwarded to the handler. Called with the protected function's own arguments
        plus a ``circuit`` keyword argument when the call is rejected; its return value
        becomes the call's result. When ``None``, a rejected call raises
        ``CircuitBreakerOpenError``.
    on_transition : Callable | None
        Forwarded to the handler. Called with a single ``CircuitTransition`` whenever
        the circuit changes state; exceptions it raises are logged and suppressed.
    config : CircuitBreakerConfig | None
        Tunables. A falsy value is replaced by ``CircuitBreakerConfig()``.

    Returns
    -------
    Callable
        A decorator producing the wrapped function.

    Example
    -------
    **Protect a payment backend, buffering rejected requests**

        from aws_lambda_powertools.utilities.circuit_breaker_alpha import circuit_breaker, CircuitInfo
        from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import (
            CircuitBreakerDynamoDBPersistence,
        )

        persistence = CircuitBreakerDynamoDBPersistence(table_name="CircuitBreakerState")

        def buffer(order: dict, circuit: CircuitInfo):
            sqs.send_message(QueueUrl=url, MessageBody=json.dumps(order))

        @circuit_breaker(name="payment-backend", persistence_store=persistence, on_circuit_open=buffer)
        def charge(order: dict) -> dict:
            return payment_api.charge(order)
    """
    # Resolved once at decoration time and shared by every call of the wrapped function.
    effective_config = config if config else CircuitBreakerConfig()

    def decorate(function: Callable) -> Callable:
        @functools.wraps(function)
        def guarded(*args, **kwargs) -> Any:
            # The switch is read on every call, not once at decoration time.
            disabled = strtobool(os.getenv(constants.CIRCUIT_BREAKER_DISABLED_ENV, "false"))
            if not disabled:
                return CircuitBreakerHandler(
                    function=function,
                    name=name,
                    config=effective_config,
                    persistence_store=persistence_store,
                    on_circuit_open=on_circuit_open,
                    on_transition=on_transition,
                    function_args=args,
                    function_kwargs=kwargs,
                ).handle()

            # Warn from inside the wrapper itself so ``stacklevel=2`` points at its caller.
            warnings.warn(
                message="Disabling the circuit breaker is intended for development environments only "
                "and should not be used in production.",
                category=PowertoolsUserWarning,
                stacklevel=2,
            )
            return function(*args, **kwargs)

        return guarded

    return decorate
class CircuitBreakerConfig:
    """
    Tunables for a circuit breaker.

    All values have defaults, so ``CircuitBreakerConfig()`` is a valid configuration.
    Pass an instance to ``@circuit_breaker(config=...)`` to override them.

    Parameters
    ----------
    failure_threshold : int
        Number of *consecutive* failures that trips a closed circuit to open. Defaults to 5.
    recovery_timeout : int
        Seconds the circuit stays open before allowing a half-open probe. Defaults to 30.
    success_threshold : int
        Number of *consecutive* probe successes required to close a half-open circuit.
        Defaults to 3.
    handled_exceptions : tuple[type[Exception], ...] | None
        Allowlist: only these exception types count as failures; anything else
        propagates without affecting the circuit. Mutually exclusive with
        ``ignored_exceptions``. Defaults to ``None`` (every exception counts).
    ignored_exceptions : tuple[type[Exception], ...] | None
        Denylist: every exception counts as a failure *except* these. Mutually
        exclusive with ``handled_exceptions``. Defaults to ``None``.
    local_cache_max_age : int
        Seconds a circuit's state is cached in the execution environment before a
        read-through to the store. ``0`` is allowed. Defaults to 5.

    Raises
    ------
    CircuitBreakerConfigError
        If both ``handled_exceptions`` and ``ignored_exceptions`` are provided (that
        is, neither is ``None``), or a numeric tunable is not an integer in its
        allowed range. ``bool`` values are rejected even though ``bool`` subclasses
        ``int``.

    Example
    -------
    **Only count timeouts and connection errors as failures**

        config = CircuitBreakerConfig(
            failure_threshold=5,
            recovery_timeout=30,
            handled_exceptions=(TimeoutError, ConnectionError),
        )
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 30,
        success_threshold: int = 3,
        handled_exceptions: tuple[type[Exception], ...] | None = None,
        ignored_exceptions: tuple[type[Exception], ...] | None = None,
        local_cache_max_age: int = 5,
    ):
        # Validate first so an invalid combination never produces a half-built instance.
        self._validate(
            failure_threshold=failure_threshold,
            recovery_timeout=recovery_timeout,
            success_threshold=success_threshold,
            handled_exceptions=handled_exceptions,
            ignored_exceptions=ignored_exceptions,
            local_cache_max_age=local_cache_max_age,
        )

        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.handled_exceptions = handled_exceptions
        self.ignored_exceptions = ignored_exceptions
        self.local_cache_max_age = local_cache_max_age

    @staticmethod
    def _is_strict_int(value: object) -> bool:
        """Return ``True`` for real integers only; ``True``/``False`` are not accepted as 1/0."""
        return isinstance(value, int) and not isinstance(value, bool)

    @staticmethod
    def _validate(
        failure_threshold: int,
        recovery_timeout: int,
        success_threshold: int,
        handled_exceptions: tuple[type[Exception], ...] | None,
        ignored_exceptions: tuple[type[Exception], ...] | None,
        local_cache_max_age: int,
    ) -> None:
        """
        Reject unsupported option combinations and out-of-range tunables.

        Raises
        ------
        CircuitBreakerConfigError
            On the first violated rule.
        """
        # Compare against None rather than truthiness. ``counts_as_failure`` treats any
        # non-None tuple, including an empty one, as the active filter, so a truthiness
        # check here would let ``handled_exceptions=()`` silently override a denylist.
        if handled_exceptions is not None and ignored_exceptions is not None:
            raise CircuitBreakerConfigError(
                "handled_exceptions and ignored_exceptions are mutually exclusive; pass only one.",
            )

        # Thresholds and timeouts must be strictly positive; cache age may be 0 (always read through).
        for field, value in (
            ("failure_threshold", failure_threshold),
            ("recovery_timeout", recovery_timeout),
            ("success_threshold", success_threshold),
        ):
            if not CircuitBreakerConfig._is_strict_int(value) or value <= 0:
                raise CircuitBreakerConfigError(f"{field} must be a positive integer, got {value!r}.")

        if not CircuitBreakerConfig._is_strict_int(local_cache_max_age) or local_cache_max_age < 0:
            raise CircuitBreakerConfigError(
                f"local_cache_max_age must be a non-negative integer, got {local_cache_max_age!r}.",
            )

    def counts_as_failure(self, exception: Exception) -> bool:
        """
        Decide whether an exception raised by the protected call counts as a circuit failure.

        Parameters
        ----------
        exception : Exception
            The exception raised by the protected function.

        Returns
        -------
        bool
            ``True`` if the exception should increment the failure counter, ``False`` if
            it should propagate without affecting the circuit.
        """
        if self.handled_exceptions is not None:
            return isinstance(exception, self.handled_exceptions)
        if self.ignored_exceptions is not None:
            return not isinstance(exception, self.ignored_exceptions)
        # Neither list configured: any exception counts as a failure.
        return True
class CircuitBreakerError(Exception):
    """
    Root of the Circuit Breaker exception hierarchy.

    The first positional argument becomes ``message``; any further arguments are
    stringified and concatenated into ``details``. ``str()`` renders both so the
    printed exception stays readable.
    See https://github.com/aws-powertools/powertools-lambda-python/issues/1772
    """

    def __init__(self, *args: str | Exception | None):
        head, extras = args[:1], args[1:]
        self.message = str(head[0]) if head else ""
        # ``details`` stays ``None`` (not an empty string) when only a message was given.
        self.details = "".join(map(str, extras)) if extras else None

    def __str__(self):
        """Return ``"message - (details)"`` when both are non-empty, otherwise the message alone."""
        if not (self.message and self.details):
            return self.message
        return f"{self.message} - ({self.details})"


class CircuitBreakerOpenError(CircuitBreakerError):
    """
    Raised when the circuit is open and no ``on_circuit_open`` callback is registered.

    The circuit snapshot passed at construction is exposed as ``circuit`` so the caller
    can decide how to respond.

    Parameters
    ----------
    *args : str | Exception | None
        Standard error message/details.
    circuit : CircuitInfo | None
        Snapshot of the circuit at rejection time.

    Example
    -------
    **Handling an open circuit when no callback is registered**

        try:
            charge(order)
        except CircuitBreakerOpenError as exc:
            logger.warning("rejected by circuit %s", exc.circuit.name)
            return {"statusCode": 202}
    """

    def __init__(self, *args: str | Exception | None, circuit: CircuitInfo | None = None):
        super().__init__(*args)
        self.circuit = circuit


class CircuitBreakerConfigError(CircuitBreakerError):
    """
    Raised when ``CircuitBreakerConfig`` is built with an unsupported combination of
    options (for example, both ``handled_exceptions`` and ``ignored_exceptions``).
    """


class CircuitBreakerPersistenceError(CircuitBreakerError):
    """
    Raised by a persistence backend for an unrecoverable store error on a *write* path
    (persisting a state transition), where there is no safe local fallback.

    Reads never raise this: ``get_state`` fails open (treats the circuit as closed) and
    only logs, so a degraded store can never become the outage the breaker is meant to
    prevent. Custom backends may raise this from their write primitives.
    """
# Upper bound on the number of locally cached circuits. Circuit names are normally
# static in user code; the cap only matters when names are generated dynamically.
LOCAL_CACHE_MAX_ITEMS = 1024

# Seconds added on top of the recovery timeout when computing the store TTL, so a
# persisted item outlives an in-flight recovery window.
PERSISTED_STATE_TTL_BUFFER = 3600


def _unix_now() -> int:
    """Return the current unix timestamp, truncated to whole seconds."""
    return int(datetime.datetime.now().timestamp())


class CircuitBreakerExistingLockError(Exception):
    """Internal signal that a conditional half-open probe write lost the race."""


class CircuitBreakerRecordNotFoundError(Exception):
    """Internal signal that no record exists for a circuit name."""


class CircuitBreakerPersistenceLayer(ABC):
    """
    Abstract base class for circuit breaker persistence layers.

    The base class implements the local read cache, the fail-open read policy and the
    state-transition helpers. A concrete backend supplies :meth:`_get_record`,
    :meth:`_put_record` and :meth:`_update_record` for a specific store.

    Records are keyed by **circuit name**.
    """

    def __init__(self) -> None:
        """Set placeholder defaults; :meth:`configure` supplies the real values."""
        self.circuit_name: str = ""
        self.local_cache_max_age: int = 5
        self.recovery_timeout: int = 30
        # Maps cache key -> ``(local_expiry, record)``. ``local_expiry`` is the unix
        # timestamp at which the in-memory copy goes stale; it is independent of the
        # record's ``expiry_timestamp``, which carries the store TTL.
        self._cache: LRUDict = LRUDict(max_items=LOCAL_CACHE_MAX_ITEMS)

    def configure(self, config: CircuitBreakerConfig, circuit_name: str) -> None:
        """
        Bind the layer to a circuit name and copy the settings it needs from ``config``.

        Parameters
        ----------
        config : CircuitBreakerConfig
            Source of ``local_cache_max_age`` and ``recovery_timeout``.
        circuit_name : str
            The circuit this layer instance serves.
        """
        self.circuit_name = circuit_name
        self.local_cache_max_age = config.local_cache_max_age
        self.recovery_timeout = config.recovery_timeout

    # ------------------------------------------------------------------ cache

    def _cache_key(self, name: str) -> str:
        """Return the local cache key for a circuit name (the name itself)."""
        return name

    def _durable_ttl(self) -> int:
        """Return the store TTL: now + recovery timeout + ``PERSISTED_STATE_TTL_BUFFER``."""
        return _unix_now() + self.recovery_timeout + PERSISTED_STATE_TTL_BUFFER

    def _save_to_cache(self, record: CircuitStateRecord) -> None:
        """Store ``record`` in the local cache, stamped with its local staleness time."""
        stale_at = _unix_now() + self.local_cache_max_age
        self._cache[self._cache_key(record.name)] = (stale_at, record)

    def _retrieve_from_cache(self, name: str) -> CircuitStateRecord | None:
        """Return the cached record while still fresh; evict it and return ``None`` once stale."""
        key = self._cache_key(name)
        entry = self._cache.get(key)
        if entry is None:
            return None

        stale_at, record = entry
        if _unix_now() >= stale_at:
            del self._cache[key]
            return None
        return record

    # ------------------------------------------------------------- public API

    def get_state(self, name: str) -> CircuitStateRecord:
        """
        Return the circuit record, consulting the store only when the cache has no fresh entry.

        If the store reports no record, a closed record is synthesized and cached. If
        the store read raises anything else, a warning is logged and a closed record is
        returned *without* caching it, so the next call tries the store again.

        Parameters
        ----------
        name : str
            Circuit name.

        Returns
        -------
        CircuitStateRecord
            The cached or fetched record, or a synthesized ``CLOSED`` record.
        """
        hit = self._retrieve_from_cache(name)
        if hit is not None:
            return hit

        try:
            fetched = self._get_record(name)
        except CircuitBreakerRecordNotFoundError:
            fetched = CircuitStateRecord(name=name, state=CircuitState.CLOSED)
        except Exception:
            logger.warning(
                "Failed to read circuit state for '%s'; failing open (treating as CLOSED).",
                name,
                exc_info=True,
            )
            return CircuitStateRecord(name=name, state=CircuitState.CLOSED)

        self._save_to_cache(fetched)
        return fetched

    def save_open(self, name: str, failure_count: int, opened_at: int) -> None:
        """
        Write an ``OPEN`` record through :meth:`_put_record` and cache it.

        Parameters
        ----------
        name : str
            Circuit name.
        failure_count : int
            Consecutive failures that tripped the circuit.
        opened_at : int
            Unix timestamp at which the circuit opened.
        """
        opened = CircuitStateRecord(
            name=name,
            state=CircuitState.OPEN,
            failure_count=failure_count,
            opened_at=opened_at,
            expiry_timestamp=self._durable_ttl(),
        )
        self._put_record(opened)
        self._save_to_cache(opened)

    def try_acquire_half_open(self, name: str, owner: str, opened_at: int) -> bool:
        """
        Attempt a conditional write of a ``HALF_OPEN`` record owned by ``owner``.

        Parameters
        ----------
        name : str
            Circuit name.
        owner : str
            Identifier of the environment attempting the probe.
        opened_at : int
            The ``opened_at`` value the caller observed; written back unchanged.

        Returns
        -------
        bool
            ``True`` when the write succeeded (the record is then cached), ``False``
            when the backend raised :class:`CircuitBreakerExistingLockError`.
        """
        claim = CircuitStateRecord(
            name=name,
            state=CircuitState.HALF_OPEN,
            opened_at=opened_at,
            half_open_owner=owner,
            expiry_timestamp=self._durable_ttl(),
        )
        try:
            self._put_record(claim, condition="half_open")
        except CircuitBreakerExistingLockError:
            return False
        else:
            self._save_to_cache(claim)
            return True

    def save_closed(self, name: str) -> None:
        """Write a ``CLOSED`` record with a zero failure count through :meth:`_update_record` and cache it."""
        closed = CircuitStateRecord(
            name=name,
            state=CircuitState.CLOSED,
            failure_count=0,
            expiry_timestamp=self._durable_ttl(),
        )
        self._update_record(closed)
        self._save_to_cache(closed)

    def save_reopen(self, name: str, opened_at: int) -> None:
        """Write an ``OPEN`` record with a new ``opened_at`` through :meth:`_update_record` and cache it."""
        reopened = CircuitStateRecord(
            name=name,
            state=CircuitState.OPEN,
            opened_at=opened_at,
            expiry_timestamp=self._durable_ttl(),
        )
        self._update_record(reopened)
        self._save_to_cache(reopened)

    # --------------------------------------------------------- backend hooks

    @abstractmethod
    def _get_record(self, name: str) -> CircuitStateRecord:
        """
        Fetch a circuit record from the store.

        Raises
        ------
        CircuitBreakerRecordNotFoundError
            If no record exists for ``name``.
        """
        raise NotImplementedError

    @abstractmethod
    def _put_record(self, record: CircuitStateRecord, condition: str | None = None) -> None:
        """
        Write a circuit record.

        Parameters
        ----------
        record : CircuitStateRecord
            Record to write.
        condition : str | None
            When ``"half_open"``, the write must be conditional so only one
            environment wins the probe lock; on a lost race the backend raises
            :class:`CircuitBreakerExistingLockError`.
        """
        raise NotImplementedError

    @abstractmethod
    def _update_record(self, record: CircuitStateRecord) -> None:
        """Update an existing circuit record (unconditional state change)."""
        raise NotImplementedError
+ state_attr : str + Attribute holding the circuit state. Defaults to ``"state"``. + failure_count_attr : str + Attribute holding the consecutive failure count. Defaults to ``"failure_count"``. + opened_at_attr : str + Attribute holding the open timestamp. Defaults to ``"opened_at"``. + half_open_owner_attr : str + Attribute holding the half-open probe lock owner. Defaults to ``"half_open_owner"``. + expiry_attr : str + TTL attribute. Defaults to ``"expiration"``. + boto_config : botocore.config.Config, optional + Botocore configuration used when creating the client. + boto3_session : boto3.session.Session, optional + Session used to create the client. + boto3_client : DynamoDBClient, optional + Pre-built client; ``boto3_session`` and ``boto_config`` are ignored if given. + + Example + ------- + **Create a DynamoDB-backed circuit breaker store** + + from aws_lambda_powertools.utilities.circuit_breaker_alpha.persistence import ( + CircuitBreakerDynamoDBPersistence, + ) + + persistence = CircuitBreakerDynamoDBPersistence(table_name="CircuitBreakerState") + """ + + def __init__( + self, + table_name: str, + key_attr: str = "id", + state_attr: str = "state", + failure_count_attr: str = "failure_count", + opened_at_attr: str = "opened_at", + half_open_owner_attr: str = "half_open_owner", + expiry_attr: str = "expiration", + boto_config: Config | None = None, + boto3_session: boto3.session.Session | None = None, + boto3_client: DynamoDBClient | None = None, + ): + if boto3_client is None: + boto3_session = boto3_session or boto3.session.Session() + boto3_client = boto3_session.client("dynamodb", config=boto_config) + self.client = boto3_client + + user_agent.register_feature_to_client(client=self.client, feature="circuit_breaker") + + self.table_name = table_name + self.key_attr = key_attr + self.state_attr = state_attr + self.failure_count_attr = failure_count_attr + self.opened_at_attr = opened_at_attr + self.half_open_owner_attr = half_open_owner_attr + 
self.expiry_attr = expiry_attr + + self._deserializer = TypeDeserializer() + + super().__init__() + + def _item_to_record(self, item: dict) -> CircuitStateRecord: + """Translate a raw DynamoDB item into a :class:`CircuitStateRecord`.""" + data = self._deserializer.deserialize({"M": item}) + opened_at = data.get(self.opened_at_attr) + return CircuitStateRecord( + name=data[self.key_attr], + state=CircuitState(data[self.state_attr]), + failure_count=int(data.get(self.failure_count_attr, 0)), + opened_at=int(opened_at) if opened_at is not None else None, + half_open_owner=data.get(self.half_open_owner_attr), + expiry_timestamp=data.get(self.expiry_attr), + ) + + def _record_to_item(self, record: CircuitStateRecord) -> dict: + """Translate a :class:`CircuitStateRecord` into a DynamoDB item.""" + item: dict = { + self.key_attr: {"S": record.name}, + self.state_attr: {"S": str(record.state)}, + self.failure_count_attr: {"N": str(record.failure_count)}, + } + if record.opened_at is not None: + item[self.opened_at_attr] = {"N": str(record.opened_at)} + if record.half_open_owner is not None: + item[self.half_open_owner_attr] = {"S": record.half_open_owner} + if record.expiry_timestamp is not None: + item[self.expiry_attr] = {"N": str(record.expiry_timestamp)} + return item + + def _get_record(self, name: str) -> CircuitStateRecord: + # Eventually consistent on purpose: matches the local cache's stale tolerance + # and halves the read cost on the hot path. + response = self.client.get_item( + TableName=self.table_name, + Key={self.key_attr: {"S": name}}, + ConsistentRead=False, + ) + try: + item = response["Item"] + except KeyError as exc: + raise CircuitBreakerRecordNotFoundError from exc + return self._item_to_record(item) + + def _put_record(self, record: CircuitStateRecord, condition: str | None = None) -> None: + # Persist the open timestamp for the store's TTL so abandoned circuits self-clean. 
+ item = self._record_to_item(record) + + put_kwargs: dict = {"TableName": self.table_name, "Item": item} + + if condition == "half_open": + # Elect exactly one prober. The write only succeeds while the circuit is still + # OPEN and no environment has claimed the probe. The winner flips the state to + # HALF_OPEN and stamps its owner, so every concurrent attempt fails both clauses + # (state is no longer OPEN and the owner now exists). DynamoDB serializes these + # conditional writes, so the election is atomic. + put_kwargs["ConditionExpression"] = "#state = :open AND attribute_not_exists(#half_open_owner)" + put_kwargs["ExpressionAttributeNames"] = { + "#state": self.state_attr, + "#half_open_owner": self.half_open_owner_attr, + } + put_kwargs["ExpressionAttributeValues"] = {":open": {"S": str(CircuitState.OPEN)}} + + try: + self.client.put_item(**put_kwargs) + except ClientError as exc: + if exc.response.get("Error", {}).get("Code") == "ConditionalCheckFailedException": + raise CircuitBreakerExistingLockError from exc + raise + + def _update_record(self, record: CircuitStateRecord) -> None: + update_expression = "SET #state = :state, #failure_count = :failure_count" + expression_attr_names = { + "#state": self.state_attr, + "#failure_count": self.failure_count_attr, + } + expression_attr_values: dict = { + ":state": {"S": str(record.state)}, + ":failure_count": {"N": str(record.failure_count)}, + } + + if record.expiry_timestamp is not None: + # Refresh the store TTL so a live circuit is never reaped mid-recovery. + update_expression += ", #expiration = :expiration" + expression_attr_names["#expiration"] = self.expiry_attr + expression_attr_values[":expiration"] = {"N": str(record.expiry_timestamp)} + + # The half-open owner lock must be cleared on every state change out of HALF_OPEN, + # whether the probe closed the circuit (opened_at is None) or reopened it (opened_at + # set). 
Otherwise the stale owner makes the next probe election's + # attribute_not_exists(half_open_owner) condition fail forever, stranding the circuit. + if record.opened_at is not None: + update_expression += ", #opened_at = :opened_at REMOVE #half_open_owner" + expression_attr_names["#opened_at"] = self.opened_at_attr + expression_attr_names["#half_open_owner"] = self.half_open_owner_attr + expression_attr_values[":opened_at"] = {"N": str(record.opened_at)} + else: + update_expression += " REMOVE #opened_at, #half_open_owner" + expression_attr_names["#opened_at"] = self.opened_at_attr + expression_attr_names["#half_open_owner"] = self.half_open_owner_attr + + self.client.update_item( + TableName=self.table_name, + Key={self.key_attr: {"S": record.name}}, + UpdateExpression=update_expression, + ExpressionAttributeNames=expression_attr_names, + ExpressionAttributeValues=expression_attr_values, + ) diff --git a/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/record.py b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/record.py new file mode 100644 index 00000000000..afaaee7b3fa --- /dev/null +++ b/aws_lambda_powertools/utilities/circuit_breaker_alpha/persistence/record.py @@ -0,0 +1,62 @@ +""" +Internal record type for circuit state held in a persistence store. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from aws_lambda_powertools.utilities.circuit_breaker_alpha.states import CircuitInfo, CircuitState + + +@dataclass +class CircuitStateRecord: + """ + The persisted state of a single circuit. + + One record exists per circuit name. This is the utility's internal representation; + user code never sees it directly, only the ``CircuitInfo`` produced by + :meth:`to_circuit_info`. + + Parameters + ---------- + name : str + Circuit name, used as the partition key in the store. + state : CircuitState + Current circuit state. 
class CircuitState(str, Enum):
    """
    The mode a circuit is currently in.

    Members are also ``str`` instances, so a member compares equal to its plain
    string form and can be written to a persistence store (e.g. DynamoDB) as-is.

    Attributes
    ----------
    CLOSED : str
        Normal operation. Requests reach the downstream and failures are counted.
    OPEN : str
        The downstream is considered unhealthy. The protected call is skipped.
    HALF_OPEN : str
        Recovery is being tested. A limited number of probe requests are allowed
        through to decide whether the circuit should close again.
    """

    CLOSED = "CLOSED"
    OPEN = "OPEN"
    HALF_OPEN = "HALF_OPEN"

    def __str__(self) -> str:
        """Render as the bare value (``"OPEN"``), not as ``CircuitState.OPEN``."""
        return str(self.value)


@dataclass(frozen=True)
class CircuitInfo:
    """
    Read-only snapshot of a circuit, handed to user code.

    This type is the utility's public boundary. An ``on_circuit_open`` callback
    receives it next to the payload, and ``CircuitBreakerOpenError`` carries it so
    a caller can see why the request was rejected. Persistence details such as the
    probe lock and the TTL are deliberately absent.

    Parameters
    ----------
    name : str
        The circuit name, as given to the ``@circuit_breaker`` decorator.
    state : CircuitState
        The circuit state at the moment the request was evaluated.
    failure_count : int
        Snapshot of the *consecutive* failures counted by the environment that last
        wrote the record, taken at a state transition. It is **not** a fleet-wide
        running total: failures are counted in memory per execution environment (so
        the healthy path stays write-free), and only the tripping environment's
        count is persisted when the circuit opens. The value is ``0`` in states
        reached without a fresh trip (for example ``HALF_OPEN``, or ``OPEN``
        re-entered after a failed probe). For failure *volume*, emit a CloudWatch
        metric from your own code or an ``on_transition`` hook instead.
    opened_at : int | None
        Unix timestamp (seconds) at which the circuit opened, or ``None`` while the
        circuit is closed. Drives the recovery timeout.

    Example
    -------
    **Inspecting circuit details inside a callback**

        def on_open(payload: dict, circuit: CircuitInfo):
            logger.warning("circuit %s open since %s", circuit.name, circuit.opened_at)
            return {"statusCode": 503}
    """

    name: str
    state: CircuitState
    failure_count: int
    opened_at: int | None = None


@dataclass(frozen=True)
class CircuitTransition:
    """
    Read-only description of one state change, handed to an ``on_transition`` hook.

    The hook runs only on the rare transitions a circuit makes (open, probe, close,
    reopen) and never on the per-invocation hot path, so emitting a metric from it
    keeps the healthy path write-free.

    Parameters
    ----------
    circuit_name : str
        The circuit name, as given to the ``@circuit_breaker`` decorator.
    from_state : CircuitState
        The state the circuit was in before the transition.
    to_state : CircuitState
        The state the circuit moved to.
    opened_at : int | None
        Unix timestamp (seconds) the circuit opened, when relevant to the new state.

    Example
    -------
    **Emit a CloudWatch metric per transition**

        from aws_lambda_powertools.metrics import MetricUnit, single_metric

        def emit(transition: CircuitTransition) -> None:
            with single_metric(
                namespace="MyApp",
                name=f"Circuit{transition.to_state}",
                unit=MetricUnit.Count,
                value=1,
            ) as metric:
                metric.add_dimension(name="circuit", value=transition.circuit_name)
    """

    circuit_name: str
    from_state: CircuitState
    to_state: CircuitState
    opened_at: int | None = None
b/docs/api_doc/circuit_breaker_alpha/states.md @@ -0,0 +1,2 @@ + +::: aws_lambda_powertools.utilities.circuit_breaker_alpha.states diff --git a/docs/utilities/circuit_breaker.md b/docs/utilities/circuit_breaker.md new file mode 100644 index 00000000000..a59d3fde292 --- /dev/null +++ b/docs/utilities/circuit_breaker.md @@ -0,0 +1,228 @@ +--- +title: Circuit Breaker +description: Utility +--- + + + +!!! warning "Alpha / experimental" + This utility ships under the **`circuit_breaker_alpha`** namespace while we collect + feedback. The public API may change in a backwards-incompatible way before it is + promoted to GA, at which point the import path becomes `circuit_breaker`. Pin your + Powertools version and follow the tracking discussion before relying on it in + production. + +The circuit breaker utility stops sending traffic to an unhealthy downstream dependency, giving it room to recover while you decide what happens to the rejected requests. + +## Key features + +* Stops calling an unhealthy downstream after a configurable number of consecutive failures +* Hands rejected requests to an `on_circuit_open` callback so you decide what happens next (buffer, drop, return a cached value) +* Tests recovery with an explicit half-open probe rather than blindly retrying everything at once +* Shares circuit state across execution environments via Amazon DynamoDB +* Keeps the healthy path write-free: failures are counted in memory and only persisted on a state transition + +## Terminology + +**Circuit** is a named guard around a single downstream dependency. Each `name` is an independent circuit. + +**State** is the circuit's current mode: `CLOSED` (normal), `OPEN` (downstream considered unhealthy, calls skipped), or `HALF_OPEN` (testing recovery). + +**Persistence layer** is the shared storage that holds each circuit's state so every execution environment agrees on whether a circuit is open. 
+ +**Recovery timeout** is how long a circuit stays open before allowing a half-open probe. + +