diff --git a/docs/docs/en/redis/sentinel.md b/docs/docs/en/redis/sentinel.md new file mode 100644 index 0000000000..1025a8309a --- /dev/null +++ b/docs/docs/en/redis/sentinel.md @@ -0,0 +1,104 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 10 +--- + +# Redis Sentinel Broker + +## Overview + +**FastStream** provides a `RedisSentinelBroker` for working with [**Redis Sentinel**](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/){.external-link target="_blank"} — Redis' built-in high-availability solution. Sentinel monitors a master/replica set and automatically promotes a replica when the master fails. `RedisSentinelBroker` discovers the current master from the sentinel nodes, so publishers and consumers keep working across a failover without any manual reconfiguration. + +Unlike a Redis Cluster, Sentinel does **not** shard data — it is a single logical Redis with automatic master failover. Commands, Pub/Sub, Lists, Streams and Pipelines therefore behave **exactly** like a plain `RedisBroker`; only the way the connection is acquired differs. + +## When to Use + +| Use RedisBroker | Use RedisSentinelBroker | +|---|---| +| Single Redis instance | Master/replica set behind Sentinel | +| Development / testing | Production with automatic failover | +| Fixed master address | Master address discovered dynamically | + +## Connecting + +Pass the sentinel `(host, port)` nodes and the monitored master group name: + +```python linenums="1" +from faststream.redis import RedisSentinelBroker + +broker = RedisSentinelBroker( + sentinels=[ + ("sentinel-1", 26379), + ("sentinel-2", 26379), + ("sentinel-3", 26379), + ], + sentinel_master_name="mymaster", +) +``` + +Both `sentinels` and `sentinel_master_name` are required. Connection options such as `db`, `socket_timeout` or `security` are forwarded to the discovered master connection. Use `sentinel_kwargs` to pass options that apply to the **sentinel** connections themselves: + +```python linenums="1" +broker = RedisSentinelBroker( + sentinels=[("sentinel-1", 26379), ("sentinel-2", 26379)], + sentinel_master_name="mymaster", + sentinel_kwargs={"socket_timeout": 1.0}, + db=1, +) +``` + +## How Failover Works + +Under the hood the broker builds the client through `Sentinel(...).master_for(master_name)`, which returns a client backed by a `SentinelConnectionPool`. That pool re-discovers the current master from the sentinels on every reconnect. Because every publisher and stream consumer goes through `connection.client`, they all fail over transparently — a dropped connection during a master promotion is simply re-established against the new master. + +## FastAPI Integration + +For FastAPI applications use `RedisSentinelRouter`, which builds a `RedisSentinelBroker` under the hood and otherwise behaves like `RedisRouter`: + +```python linenums="1" +from faststream.redis.fastapi import RedisSentinelRouter + +router = RedisSentinelRouter( + sentinels=[("sentinel-1", 26379), ("sentinel-2", 26379)], + sentinel_master_name="mymaster", +) + +@router.subscriber("channel-name") +async def handle(msg: str) -> None: ... +``` + +## Feature Support + +| Feature | RedisBroker | RedisSentinelBroker | +|---|---|---| +| List | ✅ | ✅ | +| Stream + XAUTOCLAIM | ✅ | ✅ | +| Pub/Sub | ✅ | ✅ | +| Pipeline | ✅ | ✅ | + +## Migration from RedisBroker + +`RedisSentinelBroker` accepts the same parameters as `RedisBroker` plus the sentinel-specific ones, so migration only requires pointing the broker at the sentinels instead of a fixed host: + +```python +# Before +from faststream.redis import RedisBroker +broker = RedisBroker(url="redis://master-host:6379") + +# After +from faststream.redis import RedisSentinelBroker +broker = RedisSentinelBroker( + sentinels=[("sentinel-1", 26379), ("sentinel-2", 26379)], + sentinel_master_name="mymaster", +) +``` + +## References + +- [Redis Sentinel Documentation](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/){.external-link target="_blank"} +- [High Availability with Redis Sentinel](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/#high-availability){.external-link target="_blank"} diff --git a/docs/docs/navigation_template.txt b/docs/docs/navigation_template.txt index 8ae793fa49..72433372bc 100644 --- a/docs/docs/navigation_template.txt +++ b/docs/docs/navigation_template.txt @@ -116,6 +116,7 @@ search: - [In-Progress](howto/nats/in-progress.md) - [Redis](redis/index.md) - [Cluster](redis/cluster.md) + - [Sentinel](redis/sentinel.md) - [Pub/Sub](redis/pubsub/index.md) - [Subscription](redis/pubsub/subscription.md) - [Publishing](redis/pubsub/publishing.md) diff --git a/faststream/redis/__init__.py b/faststream/redis/__init__.py index dd7150c088..55ff367d97 100644 --- a/faststream/redis/__init__.py +++ b/faststream/redis/__init__.py @@ -18,6 +18,7 @@ RedisPublisher, RedisRoute, RedisRouter, + RedisSentinelBroker, ) from .exceptions import StreamGroupNotFoundError from .parser import BinaryMessageFormatV1 @@ -50,6 +51,7 @@ "RedisResponse", "RedisRoute", "RedisRouter", + "RedisSentinelBroker", "RedisStreamMessage", "StreamGroupNotFoundError", "StreamSub", diff --git a/faststream/redis/broker/__init__.py b/faststream/redis/broker/__init__.py index 7ae04eb819..af99b48867 100644 --- a/faststream/redis/broker/__init__.py +++ b/faststream/redis/broker/__init__.py @@ -1,6 +1,7 @@ from .broker import RedisBroker from .cluster_broker import RedisClusterBroker from .router import RedisPublisher, RedisRoute, RedisRouter +from .sentinel_broker import RedisSentinelBroker __all__ = ( "RedisBroker", @@ -8,4 +9,5 @@ "RedisPublisher", "RedisRoute", "RedisRouter", + "RedisSentinelBroker", ) diff --git a/faststream/redis/broker/broker.py b/faststream/redis/broker/broker.py index cd9a25114e..b2d23f2d8f 100644 --- a/faststream/redis/broker/broker.py +++ b/faststream/redis/broker/broker.py @@ -21,7 +21,10 @@ from faststream._internal.context.repository import ContextRepo from faststream._internal.di import FastDependsConfig from faststream.message import gen_cor_id -from faststream.redis.configs import RedisBrokerConfig, RedisConnectionState +from faststream.redis.configs import ( + RedisBrokerConfig, + RedisConnectionState, +) from faststream.redis.message import UnifyRedisDict from faststream.redis.parser import BinaryMessageFormatV1 from faststream.redis.publisher.producer import RedisFastProducer diff --git a/faststream/redis/broker/sentinel_broker.py b/faststream/redis/broker/sentinel_broker.py new file mode 100644 index 0000000000..01a644511d --- /dev/null +++ b/faststream/redis/broker/sentinel_broker.py @@ -0,0 +1,126 @@ +import logging +from typing import TYPE_CHECKING +from urllib.parse import urlparse + +from fast_depends import dependency_provider +from typing_extensions import Unpack + +from faststream._internal.constants import EMPTY +from faststream._internal.context.repository import ContextRepo +from faststream._internal.di import FastDependsConfig +from faststream.exceptions import SetupError +from faststream.redis.broker import RedisBroker +from faststream.redis.configs import RedisBrokerConfig +from faststream.redis.configs.state import RedisSentinelConnectionState +from faststream.redis.parser import BinaryMessageFormatV1 +from faststream.redis.publisher.producer import RedisFastProducer +from faststream.redis.schemas.types import NON_CONNECTION_PARAMS, SENTINEL_PARAMS +from faststream.specification.schema import BrokerSpec + +from .logging import make_redis_logger_state + +if TYPE_CHECKING: + from faststream.redis.schemas.types import RedisSentinelParams + + +class RedisSentinelBroker(RedisBroker): + """A Redis broker backed by Redis Sentinel (high-availability with failover). + + The master address is discovered from the ``sentinels`` nodes through + ``Sentinel.master_for(...)``; the underlying ``SentinelConnectionPool`` + re-discovers the current master on reconnect, so publishers and consumers + survive a master failover transparently. Commands, Pub/Sub, Lists and + Streams behave exactly like a plain ``RedisBroker`` — only the connection + acquisition differs. + """ + + def __init__( + self, + url: str = "redis://localhost:6379", + **kwargs: Unpack["RedisSentinelParams"], + ) -> None: + sentinels = kwargs.pop("sentinels", None) + sentinel_master_name = kwargs.pop("sentinel_master_name", None) + sentinel_kwargs = kwargs.pop("sentinel_kwargs", None) + + if not sentinels: + msg = "`sentinels` is required for RedisSentinelBroker." + raise SetupError(msg) + if not sentinel_master_name: + msg = "`sentinel_master_name` is required for RedisSentinelBroker." + raise SetupError(msg) + + host = kwargs.pop("host", EMPTY) + port = kwargs.pop("port", EMPTY) + security = kwargs.pop("security", None) + specification_url = kwargs.pop("specification_url", None) + protocol = kwargs.pop("protocol", None) + message_format = kwargs.pop("message_format", BinaryMessageFormatV1) + self.message_format = message_format + + if specification_url is None: + specification_url = url + if protocol is None: + protocol = urlparse(specification_url).scheme + + connection_kwargs = { + k: v + for k, v in kwargs.items() + if k not in NON_CONNECTION_PARAMS | SENTINEL_PARAMS + } + connection_options = self._resolve_url_options( + url, + security=security, + host=host, + port=port, + **connection_kwargs, + ) + + connection_state = RedisSentinelConnectionState( + connection_options, + sentinels=list(sentinels), + master_name=sentinel_master_name, + sentinel_kwargs=sentinel_kwargs, + ) + + super(RedisBroker, self).__init__( + **connection_options, + routers=kwargs.get("routers", ()), + config=RedisBrokerConfig( + connection=connection_state, + producer=RedisFastProducer( + connection=connection_state, + parser=kwargs.get("parser"), + decoder=kwargs.get("decoder"), + message_format=self.message_format, + serializer=kwargs.get("serializer"), + ), + message_format=self.message_format, + broker_middlewares=kwargs.get("middlewares", ()), + broker_parser=kwargs.get("parser"), + broker_decoder=kwargs.get("decoder"), + broker_codec=kwargs.get("codec"), + logger=make_redis_logger_state( + logger=kwargs.get("logger", EMPTY), + log_level=kwargs.get("log_level", logging.INFO), + ), + fd_config=FastDependsConfig( + use_fastdepends=kwargs.get("apply_types", True), + serializer=kwargs.get("serializer", EMPTY), + provider=kwargs.get("provider") or dependency_provider, + context=kwargs.get("context") or ContextRepo(), + ), + broker_dependencies=kwargs.get("dependencies", ()), + graceful_timeout=kwargs.get("graceful_timeout", 15.0), + ack_policy=kwargs.get("ack_policy", EMPTY), + extra_context={"broker": self}, + ), + specification=BrokerSpec( + description=kwargs.get("description"), + url=[specification_url], + protocol=protocol, + protocol_version=kwargs.get("protocol_version", "custom"), + security=security, + tags=kwargs.get("tags", ()), + ), + ) diff --git a/faststream/redis/configs/__init__.py b/faststream/redis/configs/__init__.py index 8dda192e8b..6fa7a17049 100644 --- a/faststream/redis/configs/__init__.py +++ b/faststream/redis/configs/__init__.py @@ -3,6 +3,7 @@ ConnectionState, RedisClusterConnectionState, RedisConnectionState, + RedisSentinelConnectionState, ) __all__ = ( @@ -10,4 +11,5 @@ "RedisBrokerConfig", "RedisClusterConnectionState", "RedisConnectionState", + "RedisSentinelConnectionState", ) diff --git a/faststream/redis/configs/state.py b/faststream/redis/configs/state.py index fe9d729ae6..0d73b26b2c 100644 --- a/faststream/redis/configs/state.py +++ b/faststream/redis/configs/state.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +from collections.abc import Mapping, Sequence from concurrent.futures import ThreadPoolExecutor from functools import partial from typing import Any, Generic, TypeVar @@ -6,6 +7,7 @@ from redis.asyncio.client import Redis from redis.asyncio.cluster import RedisCluster from redis.asyncio.connection import ConnectionPool +from redis.asyncio.sentinel import Sentinel from redis.cluster import ( ClusterNode, RedisCluster as SyncRC, @@ -66,6 +68,51 @@ async def connect(self) -> "Redis[bytes]": return client +class RedisSentinelConnectionState(RedisConnectionState): + """Builds the client via ``Sentinel.master_for`` for HA / failover. + + The underlying ``SentinelConnectionPool`` re-discovers the current master + on every reconnect, so publishers and stream consumers fail over for free + (both go through ``connection.client``). + """ + + def __init__( + self, + options: dict[str, Any] | None = None, + *, + sentinels: Sequence[tuple[str, int]], + master_name: str, + sentinel_kwargs: Mapping[str, Any] | None = None, + ) -> None: + super().__init__(options) + self._sentinels = list(sentinels) + self._master_name = master_name + self._sentinel_kwargs = sentinel_kwargs + + async def connect(self) -> "Redis[bytes]": + # ``host``/``port`` describe a single node and are meaningless for + # Sentinel — the master address is discovered from the sentinels. + connection_kwargs = { + k: v for k, v in self._options.items() if k not in {"host", "port"} + } + connection_kwargs["lib_name"] = "faststream" + connection_kwargs["lib_version"] = __version__ + + manager = Sentinel( + self._sentinels, + sentinel_kwargs=dict(self._sentinel_kwargs) + if self._sentinel_kwargs is not None + else None, + **connection_kwargs, + ) + client: Redis[bytes] = manager.master_for(self._master_name) + + self._client = client + self._connected = True + + return client + + class RedisClusterConnectionState(ConnectionState["RedisCluster[bytes]"]): """Manages a Redis Cluster connection lifecycle. diff --git a/faststream/redis/fastapi/__init__.py b/faststream/redis/fastapi/__init__.py index e5cfb320f2..059b426c1d 100644 --- a/faststream/redis/fastapi/__init__.py +++ b/faststream/redis/fastapi/__init__.py @@ -6,7 +6,7 @@ from faststream.redis.broker.broker import RedisBroker as RB from faststream.redis.message import BaseMessage as RM # noqa: N814 -from .fastapi import RedisRouter +from .fastapi import RedisRouter, RedisSentinelRouter __all__ = ( "Context", @@ -16,6 +16,7 @@ "RedisBroker", "RedisChannelMessage", "RedisRouter", + "RedisSentinelRouter", ) RedisChannelMessage = Annotated[RM, Context("message")] diff --git a/faststream/redis/fastapi/fastapi.py b/faststream/redis/fastapi/fastapi.py index be7bae6a67..93899de223 100644 --- a/faststream/redis/fastapi/fastapi.py +++ b/faststream/redis/fastapi/fastapi.py @@ -26,6 +26,7 @@ from faststream._internal.fastapi.router import StreamRouter from faststream.middlewares import AckPolicy from faststream.redis.broker.broker import RedisBroker as RB +from faststream.redis.broker.sentinel_broker import RedisSentinelBroker as RSB from faststream.redis.message import UnifyRedisDict from faststream.redis.schemas import ListSub, PubSub, StreamSub @@ -134,6 +135,7 @@ def __init__( generate_unique_id_function: Callable[["APIRoute"], str] = Default( generate_unique_id ), + **connection_kwargs: Any, ) -> None: """Initialize the RedisRouter object. @@ -314,6 +316,8 @@ def __init__( Read more about it in the [FastAPI docs about how to Generate Clients](https://fastapi.tiangolo.com/advanced/generate-clients/#custom-generate-unique-id-function). + connection_kwargs: + Extra connection arguments forwarded to the underlying broker. """ super().__init__( @@ -372,6 +376,7 @@ def __init__( include_in_schema=include_in_schema, lifespan=lifespan, generate_unique_id_function=generate_unique_id_function, + **connection_kwargs, ) @overload # type: ignore[override] @@ -737,3 +742,32 @@ def publisher( schema=schema, include_in_schema=include_in_schema, ) + + +class RedisSentinelRouter(RedisRouter): + """A Redis Sentinel router (high-availability with master failover). + + Behaves like :class:`RedisRouter` but builds a :class:`RedisSentinelBroker`, + discovering the master through the ``sentinels`` nodes. All other arguments + are forwarded to the underlying broker unchanged. + """ + + broker_class = RSB + broker: RSB + + def __init__( + self, + url: str = "redis://localhost:6379", + *, + sentinels: Sequence[tuple[str, int]], + sentinel_master_name: str | None = None, + sentinel_kwargs: Mapping[str, Any] | None = None, + **kwargs: Any, + ) -> None: + super().__init__( + url, + sentinels=sentinels, + sentinel_master_name=sentinel_master_name, + sentinel_kwargs=sentinel_kwargs, + **kwargs, + ) diff --git a/faststream/redis/schemas/types.py b/faststream/redis/schemas/types.py index 60ce107e66..526c6e0a4b 100644 --- a/faststream/redis/schemas/types.py +++ b/faststream/redis/schemas/types.py @@ -109,6 +109,21 @@ class RedisClusterParams(RedisBrokerParams, total=False): ] +class RedisSentinelParams(RedisBrokerParams, total=False): + sentinels: Annotated[ + Sequence[tuple[str, int]], + "Redis Sentinel ``(host, port)`` nodes to discover the master from. Required.", + ] + sentinel_master_name: Annotated[ + str, + "Sentinel master group name. Required.", + ] + sentinel_kwargs: Annotated[ + Mapping[str, Any] | None, + "Connection kwargs for the Sentinel nodes themselves. Defaults to ``None``.", + ] + + CLUSTER_INCOMPATIBLE_PARAMS = frozenset({ "db", "socket_read_size", @@ -121,6 +136,13 @@ class RedisClusterParams(RedisBrokerParams, total=False): "port", }) + +SENTINEL_PARAMS = frozenset({ + "sentinels", + "sentinel_master_name", + "sentinel_kwargs", +}) + NON_CONNECTION_PARAMS = frozenset({ "graceful_timeout", "ack_policy", diff --git a/faststream/redis/subscriber/usecases/basic.py b/faststream/redis/subscriber/usecases/basic.py index 969e3bf4c3..3d526944f4 100644 --- a/faststream/redis/subscriber/usecases/basic.py +++ b/faststream/redis/subscriber/usecases/basic.py @@ -32,6 +32,11 @@ TopicName: TypeAlias = bytes Offset: TypeAlias = bytes +# Delay (seconds) before a subscriber retries after a message-fetch error. +# Shared by all Redis subscriber loops to avoid busy-looping on persistent +# connection errors (e.g. while a Sentinel pool fails over to a new master). +CONSUME_ERROR_BACKOFF_SECONDS = 5 + class LogicSubscriber(TasksMixin, SubscriberUsecase[UnifyRedisDict]): """A class to represent a Redis handler.""" @@ -100,7 +105,7 @@ async def _consume(self, *args: Any, start_signal: anyio.Event) -> None: if connected: connected = False - await anyio.sleep(5) + await anyio.sleep(CONSUME_ERROR_BACKOFF_SECONDS) else: if not connected: diff --git a/faststream/redis/subscriber/usecases/stream_subscriber.py b/faststream/redis/subscriber/usecases/stream_subscriber.py index fd3fb3e4e9..02dbe0cdca 100644 --- a/faststream/redis/subscriber/usecases/stream_subscriber.py +++ b/faststream/redis/subscriber/usecases/stream_subscriber.py @@ -4,6 +4,7 @@ from collections.abc import AsyncIterator, Awaitable, Callable from typing import TYPE_CHECKING, Any, Optional, TypeAlias +import anyio from redis.exceptions import ResponseError from typing_extensions import override @@ -20,7 +21,7 @@ RedisStreamParser, ) -from .basic import LogicSubscriber +from .basic import CONSUME_ERROR_BACKOFF_SECONDS, LogicSubscriber if TYPE_CHECKING: from anyio import Event @@ -107,6 +108,7 @@ async def _consume(self, *args: Any, start_signal: "Event") -> None: message="Message fetch error", exc_info=e, ) + await anyio.sleep(CONSUME_ERROR_BACKOFF_SECONDS) finally: if not start_signal.is_set(): diff --git a/pyproject.toml b/pyproject.toml index a2cec5674f..feaef4e869 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -213,6 +213,7 @@ markers = [ "nats", "redis", "redis_cluster", + "redis_sentinel", "mqtt", "slow", "connected", diff --git a/tests/brokers/redis/test_sentinel.py b/tests/brokers/redis/test_sentinel.py new file mode 100644 index 0000000000..f9d328b56c --- /dev/null +++ b/tests/brokers/redis/test_sentinel.py @@ -0,0 +1,71 @@ +import pytest +from redis.asyncio.sentinel import SentinelConnectionPool + +from faststream.exceptions import SetupError +from faststream.redis import RedisSentinelBroker +from faststream.redis.configs.state import RedisSentinelConnectionState + +SENTINELS = [("sentinel-1", 26379), ("sentinel-2", 26379)] + + +@pytest.mark.redis() +class TestRedisSentinelBrokerUnit: + """Unit tests for RedisSentinelBroker (no running Redis/Sentinel needed).""" + + def test_builds_sentinel_connection_state(self) -> None: + broker = RedisSentinelBroker(sentinels=SENTINELS, sentinel_master_name="mymaster") + connection = broker.config.broker_config.connection + assert isinstance(connection, RedisSentinelConnectionState) + assert connection._master_name == "mymaster" + assert connection._sentinels == SENTINELS + + def test_sentinel_kwargs_stored(self) -> None: + broker = RedisSentinelBroker( + sentinels=SENTINELS, + sentinel_master_name="mymaster", + sentinel_kwargs={"socket_timeout": 1.0}, + ) + connection = broker.config.broker_config.connection + assert connection._sentinel_kwargs == {"socket_timeout": 1.0} + + def test_requires_sentinels(self) -> None: + with pytest.raises(SetupError, match="sentinels"): + RedisSentinelBroker(sentinels=[], sentinel_master_name="mymaster") + + def test_requires_master_name(self) -> None: + with pytest.raises(SetupError, match="sentinel_master_name"): + RedisSentinelBroker(sentinels=SENTINELS) + + @pytest.mark.asyncio() + async def test_connect_builds_sentinel_pool(self) -> None: + broker = RedisSentinelBroker( + sentinels=SENTINELS, sentinel_master_name="mymaster", db=1 + ) + connection = broker.config.broker_config.connection + client = await connection.connect() + try: + assert isinstance(client.connection_pool, SentinelConnectionPool) + assert client.connection_pool.service_name == "mymaster" + finally: + await connection.disconnect() + + +@pytest.mark.redis() +class TestRedisSentinelFastAPIRouterUnit: + """RedisSentinelRouter (FastAPI) must build a Sentinel-backed broker.""" + + def test_router_builds_sentinel_broker(self) -> None: + from faststream.redis.fastapi import RedisSentinelRouter + + router = RedisSentinelRouter(sentinels=SENTINELS, sentinel_master_name="mymaster") + assert isinstance(router.broker, RedisSentinelBroker) + connection = router.broker.config.broker_config.connection + assert isinstance(connection, RedisSentinelConnectionState) + assert connection._master_name == "mymaster" + assert connection._sentinels == SENTINELS + + def test_router_requires_master_name(self) -> None: + from faststream.redis.fastapi import RedisSentinelRouter + + with pytest.raises(SetupError, match="sentinel_master_name"): + RedisSentinelRouter(sentinels=SENTINELS)