From 6e496102c62c082bf9f79e773023a277f8a1f886 Mon Sep 17 00:00:00 2001 From: Alexandr Yakovlev Date: Sun, 31 May 2026 20:14:21 +0300 Subject: [PATCH 1/4] feat(redis): add Redis Sentinel support Add connection-level Redis Sentinel support to RedisBroker: - New broker params: sentinels, sentinel_master_name, sentinel_kwargs. - RedisConnectionState builds the client via Sentinel.master_for() when configured, so the SentinelConnectionPool re-discovers the current master on reconnect. Publishers and stream consumers fail over for free, since both go through connection.client. - Stream subscriber now backs off on fetch errors like the base channel/list loop (shared CONSUME_ERROR_BACKOFF_SECONDS), avoiding a busy-loop during a master failover. - Unit tests (no running Sentinel needed) and a redis_sentinel marker. Refs #1812 --- faststream/redis/broker/broker.py | 27 +++++++- faststream/redis/configs/__init__.py | 2 + faststream/redis/configs/state.py | 61 +++++++++++++++++-- faststream/redis/schemas/types.py | 17 ++++++ faststream/redis/subscriber/usecases/basic.py | 7 ++- .../subscriber/usecases/stream_subscriber.py | 4 +- pyproject.toml | 1 + tests/brokers/redis/test_sentinel.py | 51 ++++++++++++++++ 8 files changed, 160 insertions(+), 10 deletions(-) create mode 100644 tests/brokers/redis/test_sentinel.py diff --git a/faststream/redis/broker/broker.py b/faststream/redis/broker/broker.py index cd9a25114e..42e8e81427 100644 --- a/faststream/redis/broker/broker.py +++ b/faststream/redis/broker/broker.py @@ -20,8 +20,13 @@ from faststream._internal.constants import EMPTY from faststream._internal.context.repository import ContextRepo from faststream._internal.di import FastDependsConfig +from faststream.exceptions import SetupError from faststream.message import gen_cor_id -from faststream.redis.configs import RedisBrokerConfig, RedisConnectionState +from faststream.redis.configs import ( + RedisBrokerConfig, + RedisConnectionState, + SentinelConfig, +) from faststream.redis.message import UnifyRedisDict from faststream.redis.parser import BinaryMessageFormatV1 from faststream.redis.publisher.producer import RedisFastProducer @@ -64,6 +69,10 @@ def __init__( protocol = kwargs.pop("protocol", None) message_format = kwargs.pop("message_format", BinaryMessageFormatV1) + sentinels = kwargs.pop("sentinels", None) + sentinel_master_name = kwargs.pop("sentinel_master_name", None) + sentinel_kwargs = kwargs.pop("sentinel_kwargs", None) + self.message_format = message_format if specification_url is None: @@ -82,7 +91,21 @@ def __init__( **connection_kwargs, ) - connection_state = RedisConnectionState(connection_options) + sentinel_config: SentinelConfig | None = None + if sentinels: + if not sentinel_master_name: + msg = "`sentinel_master_name` is required when `sentinels` is set." + raise SetupError(msg) + sentinel_config = SentinelConfig( + sentinels=list(sentinels), + master_name=sentinel_master_name, + sentinel_kwargs=sentinel_kwargs, + ) + + connection_state = RedisConnectionState( + connection_options, + sentinel=sentinel_config, + ) super().__init__( **connection_options, diff --git a/faststream/redis/configs/__init__.py b/faststream/redis/configs/__init__.py index 8dda192e8b..2d25307cf1 100644 --- a/faststream/redis/configs/__init__.py +++ b/faststream/redis/configs/__init__.py @@ -3,6 +3,7 @@ ConnectionState, RedisClusterConnectionState, RedisConnectionState, + SentinelConfig, ) __all__ = ( @@ -10,4 +11,5 @@ "RedisBrokerConfig", "RedisClusterConnectionState", "RedisConnectionState", + "SentinelConfig", ) diff --git a/faststream/redis/configs/state.py b/faststream/redis/configs/state.py index fe9d729ae6..b6b609279e 100644 --- a/faststream/redis/configs/state.py +++ b/faststream/redis/configs/state.py @@ -1,11 +1,14 @@ from abc import ABC, abstractmethod +from collections.abc import Mapping, Sequence from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass, field from functools import partial from typing import Any, Generic, TypeVar from redis.asyncio.client import Redis from redis.asyncio.cluster import RedisCluster from redis.asyncio.connection import ConnectionPool +from redis.asyncio.sentinel import Sentinel from redis.cluster import ( ClusterNode, RedisCluster as SyncRC, @@ -18,6 +21,21 @@ ClientT = TypeVar("ClientT") +@dataclass +class SentinelConfig: + """Redis Sentinel connection configuration. + + When set on ``RedisConnectionState``, the client is built through + ``Sentinel(...).master_for(...)`` so the underlying + ``SentinelConnectionPool`` re-discovers the current master on every + reconnect (high-availability / failover). + """ + + sentinels: Sequence[tuple[str, int]] + master_name: str + sentinel_kwargs: Mapping[str, Any] | None = field(default=None) + + class ConnectionState(ABC, Generic[ClientT]): """Base connection state.""" @@ -52,19 +70,50 @@ async def disconnect(self) -> None: class RedisConnectionState(ConnectionState["Redis[bytes]"]): + def __init__( + self, + options: dict[str, Any] | None = None, + *, + sentinel: SentinelConfig | None = None, + ) -> None: + super().__init__(options) + self._sentinel = sentinel + async def connect(self) -> "Redis[bytes]": - pool = ConnectionPool( - **self._options, - lib_name="faststream", - lib_version=__version__, - ) - client: Redis[bytes] = Redis.from_pool(pool) # type: ignore[attr-defined] + client: Redis[bytes] + if self._sentinel is not None: + client = self._connect_via_sentinel(self._sentinel) + else: + pool = ConnectionPool( + **self._options, + lib_name="faststream", + lib_version=__version__, + ) + client = Redis.from_pool(pool) # type: ignore[attr-defined] self._client = client self._connected = True return client + def _connect_via_sentinel(self, sentinel: SentinelConfig) -> "Redis[bytes]": + # ``host``/``port`` describe a single node and are meaningless for + # Sentinel — the master address is discovered from the sentinels. + connection_kwargs = { + k: v for k, v in self._options.items() if k not in {"host", "port"} + } + connection_kwargs["lib_name"] = "faststream" + connection_kwargs["lib_version"] = __version__ + + manager = Sentinel( + list(sentinel.sentinels), + sentinel_kwargs=dict(sentinel.sentinel_kwargs) + if sentinel.sentinel_kwargs is not None + else None, + **connection_kwargs, + ) + return manager.master_for(sentinel.master_name) + class RedisClusterConnectionState(ConnectionState["RedisCluster[bytes]"]): """Manages a Redis Cluster connection lifecycle. diff --git a/faststream/redis/schemas/types.py b/faststream/redis/schemas/types.py index 60ce107e66..ec96d54597 100644 --- a/faststream/redis/schemas/types.py +++ b/faststream/redis/schemas/types.py @@ -55,6 +55,20 @@ class RedisConnectionParams(TypedDict, total=False): type[BaseParser], "Parser class. Defaults to ``DefaultParser``." ] encoder_class: Annotated[type[Encoder], "Encoder class. Defaults to ``Encoder``."] + sentinels: Annotated[ + Sequence[tuple[str, int]], + "Redis Sentinel ``(host, port)`` nodes. Enables Sentinel mode " + "(high-availability with master failover). Defaults to ``None``.", + ] + sentinel_master_name: Annotated[ + str, + "Sentinel master group name. Required when ``sentinels`` is set. " + "Defaults to ``None``.", + ] + sentinel_kwargs: Annotated[ + Mapping[str, Any] | None, + "Connection kwargs for the Sentinel nodes themselves. Defaults to ``None``.", + ] class RedisBrokerParams(RedisConnectionParams, total=False): @@ -119,6 +133,9 @@ class RedisClusterParams(RedisBrokerParams, total=False): "connection_class", "host", "port", + "sentinels", + "sentinel_master_name", + "sentinel_kwargs", }) NON_CONNECTION_PARAMS = frozenset({ diff --git a/faststream/redis/subscriber/usecases/basic.py b/faststream/redis/subscriber/usecases/basic.py index 969e3bf4c3..3d526944f4 100644 --- a/faststream/redis/subscriber/usecases/basic.py +++ b/faststream/redis/subscriber/usecases/basic.py @@ -32,6 +32,11 @@ TopicName: TypeAlias = bytes Offset: TypeAlias = bytes +# Delay (seconds) before a subscriber retries after a message-fetch error. +# Shared by all Redis subscriber loops to avoid busy-looping on persistent +# connection errors (e.g. while a Sentinel pool fails over to a new master). +CONSUME_ERROR_BACKOFF_SECONDS = 5 + class LogicSubscriber(TasksMixin, SubscriberUsecase[UnifyRedisDict]): """A class to represent a Redis handler.""" @@ -100,7 +105,7 @@ async def _consume(self, *args: Any, start_signal: anyio.Event) -> None: if connected: connected = False - await anyio.sleep(5) + await anyio.sleep(CONSUME_ERROR_BACKOFF_SECONDS) else: if not connected: diff --git a/faststream/redis/subscriber/usecases/stream_subscriber.py b/faststream/redis/subscriber/usecases/stream_subscriber.py index fd3fb3e4e9..02dbe0cdca 100644 --- a/faststream/redis/subscriber/usecases/stream_subscriber.py +++ b/faststream/redis/subscriber/usecases/stream_subscriber.py @@ -4,6 +4,7 @@ from collections.abc import AsyncIterator, Awaitable, Callable from typing import TYPE_CHECKING, Any, Optional, TypeAlias +import anyio from redis.exceptions import ResponseError from typing_extensions import override @@ -20,7 +21,7 @@ RedisStreamParser, ) -from .basic import LogicSubscriber +from .basic import CONSUME_ERROR_BACKOFF_SECONDS, LogicSubscriber if TYPE_CHECKING: from anyio import Event @@ -107,6 +108,7 @@ async def _consume(self, *args: Any, start_signal: "Event") -> None: message="Message fetch error", exc_info=e, ) + await anyio.sleep(CONSUME_ERROR_BACKOFF_SECONDS) finally: if not start_signal.is_set(): diff --git a/pyproject.toml b/pyproject.toml index a2cec5674f..feaef4e869 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -213,6 +213,7 @@ markers = [ "nats", "redis", "redis_cluster", + "redis_sentinel", "mqtt", "slow", "connected", diff --git a/tests/brokers/redis/test_sentinel.py b/tests/brokers/redis/test_sentinel.py new file mode 100644 index 0000000000..622d98248e --- /dev/null +++ b/tests/brokers/redis/test_sentinel.py @@ -0,0 +1,51 @@ +import pytest +from redis.asyncio.sentinel import SentinelConnectionPool + +from faststream.exceptions import SetupError +from faststream.redis import RedisBroker +from faststream.redis.configs.state import RedisConnectionState, SentinelConfig + +SENTINELS = [("sentinel-1", 26379), ("sentinel-2", 26379)] + + +@pytest.mark.redis() +class TestSentinelConfigUnit: + """Unit tests for Sentinel mode (no running Redis/Sentinel needed).""" + + def test_sentinel_mode_builds_config(self) -> None: + broker = RedisBroker(sentinels=SENTINELS, sentinel_master_name="mymaster") + connection = broker.config.broker_config.connection + assert isinstance(connection._sentinel, SentinelConfig) + assert connection._sentinel.master_name == "mymaster" + assert list(connection._sentinel.sentinels) == SENTINELS + + def test_sentinel_kwargs_stored(self) -> None: + broker = RedisBroker( + sentinels=SENTINELS, + sentinel_master_name="mymaster", + sentinel_kwargs={"socket_timeout": 1.0}, + ) + sentinel = broker.config.broker_config.connection._sentinel + assert sentinel is not None + assert sentinel.sentinel_kwargs == {"socket_timeout": 1.0} + + def test_direct_mode_has_no_sentinel(self) -> None: + broker = RedisBroker("redis://localhost:6379") + assert broker.config.broker_config.connection._sentinel is None + + def test_sentinel_requires_master_name(self) -> None: + with pytest.raises(SetupError, match="sentinel_master_name"): + RedisBroker(sentinels=SENTINELS) + + @pytest.mark.asyncio() + async def test_connect_builds_sentinel_pool(self) -> None: + state = RedisConnectionState( + {"host": "localhost", "port": 6379, "db": 1}, + sentinel=SentinelConfig(sentinels=SENTINELS, master_name="mymaster"), + ) + client = await state.connect() + try: + assert isinstance(client.connection_pool, SentinelConnectionPool) + assert client.connection_pool.service_name == "mymaster" + finally: + await state.disconnect() From 31697299fc502975939d1c5cf08d8d0dade9ad31 Mon Sep 17 00:00:00 2001 From: Alexandr Yakovlev Date: Mon, 1 Jun 2026 01:04:26 +0300 Subject: [PATCH 2/4] feat(redis): forward Sentinel params through FastAPI RedisRouter RedisRouter (FastAPI integration) now accepts sentinels / sentinel_master_name / sentinel_kwargs and forwards them to the underlying RedisBroker, so FastAPI + FastStream applications get Sentinel support too (previously only the standalone RedisBroker did). Adds unit tests asserting the router forwards config to broker. --- faststream/redis/fastapi/fastapi.py | 13 +++++++++++++ tests/brokers/redis/test_sentinel.py | 20 ++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/faststream/redis/fastapi/fastapi.py b/faststream/redis/fastapi/fastapi.py index be7bae6a67..85a23e491f 100644 --- a/faststream/redis/fastapi/fastapi.py +++ b/faststream/redis/fastapi/fastapi.py @@ -94,6 +94,10 @@ def __init__( encoding_errors: str = "strict", parser_class: type["BaseParser"] = DefaultParser, encoder_class: type["Encoder"] = Encoder, + # Sentinel args + sentinels: Sequence[tuple[str, int]] | None = None, + sentinel_master_name: str | None = None, + sentinel_kwargs: Mapping[str, Any] | None = None, # broker base args graceful_timeout: float | None = 15.0, decoder: Optional["CustomCallable"] = None, @@ -176,6 +180,12 @@ def __init__( _ ... encoder_class: _ ... + sentinels: + Redis Sentinel ``(host, port)`` nodes. Enables Sentinel mode (HA with master failover). + sentinel_master_name: + Sentinel master group name. Required when ``sentinels`` is set. + sentinel_kwargs: + Connection kwargs for the Sentinel nodes themselves. graceful_timeout: Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down. decoder: @@ -334,6 +344,9 @@ def __init__( parser_class=parser_class, connection_class=connection_class, encoder_class=encoder_class, + sentinels=sentinels, + sentinel_master_name=sentinel_master_name, + sentinel_kwargs=sentinel_kwargs, graceful_timeout=graceful_timeout, decoder=decoder, parser=parser, diff --git a/tests/brokers/redis/test_sentinel.py b/tests/brokers/redis/test_sentinel.py index 622d98248e..c92b6d16c0 100644 --- a/tests/brokers/redis/test_sentinel.py +++ b/tests/brokers/redis/test_sentinel.py @@ -49,3 +49,23 @@ async def test_connect_builds_sentinel_pool(self) -> None: assert client.connection_pool.service_name == "mymaster" finally: await state.disconnect() + + +@pytest.mark.redis() +class TestSentinelFastAPIRouterUnit: + """RedisRouter (FastAPI) must forward Sentinel params down to the broker.""" + + def test_router_forwards_sentinel_to_broker(self) -> None: + from faststream.redis.fastapi import RedisRouter + + router = RedisRouter(sentinels=SENTINELS, sentinel_master_name="mymaster") + connection = router.broker.config.broker_config.connection + assert isinstance(connection._sentinel, SentinelConfig) + assert connection._sentinel.master_name == "mymaster" + assert list(connection._sentinel.sentinels) == SENTINELS + + def test_router_direct_mode_has_no_sentinel(self) -> None: + from faststream.redis.fastapi import RedisRouter + + router = RedisRouter("redis://localhost:6379") + assert router.broker.config.broker_config.connection._sentinel is None From 1a7226b977ac8bc17d29cdf383a81ae5b48d5ab6 Mon Sep 17 00:00:00 2001 From: Alexandr Yakovlev Date: Wed, 3 Jun 2026 15:29:06 +0300 Subject: [PATCH 3/4] refactor(redis): move Sentinel support into a dedicated RedisSentinelBroker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per review feedback on #2895, Sentinel support now lives in its own RedisSentinelBroker / RedisSentinelRouter instead of extra parameters on the generic RedisBroker — mirroring RedisClusterBroker (#2854), so each topology is a distinct broker class and Sentinel params no longer leak into RedisBroker / RedisClusterBroker. - RedisSentinelConnectionState builds the client via Sentinel.master_for(); the generic RedisBroker / RedisRouter keep their original API. - RedisSentinelParams holds sentinels / sentinel_master_name / sentinel_kwargs. - RedisSentinelRouter adds FastAPI Sentinel support (which RedisClusterBroker does not provide). - Docs: docs/en/redis/sentinel.md + navigation entry. Refs #1812 --- docs/docs/en/redis/sentinel.md | 104 +++++++++++++++++ docs/docs/navigation_template.txt | 1 + faststream/redis/__init__.py | 2 + faststream/redis/broker/__init__.py | 2 + faststream/redis/broker/broker.py | 22 +--- faststream/redis/broker/sentinel_broker.py | 126 +++++++++++++++++++++ faststream/redis/configs/__init__.py | 4 +- faststream/redis/configs/state.py | 76 ++++++------- faststream/redis/fastapi/__init__.py | 3 +- faststream/redis/fastapi/fastapi.py | 47 +++++--- faststream/redis/schemas/types.py | 33 +++--- tests/brokers/redis/test_sentinel.py | 74 ++++++------ 12 files changed, 369 insertions(+), 125 deletions(-) create mode 100644 docs/docs/en/redis/sentinel.md create mode 100644 faststream/redis/broker/sentinel_broker.py diff --git a/docs/docs/en/redis/sentinel.md b/docs/docs/en/redis/sentinel.md new file mode 100644 index 0000000000..1025a8309a --- /dev/null +++ b/docs/docs/en/redis/sentinel.md @@ -0,0 +1,104 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 10 +--- + +# Redis Sentinel Broker + +## Overview + +**FastStream** provides a `RedisSentinelBroker` for working with [**Redis Sentinel**](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/){.external-link target="_blank"} — Redis' built-in high-availability solution. Sentinel monitors a master/replica set and automatically promotes a replica when the master fails. `RedisSentinelBroker` discovers the current master from the sentinel nodes, so publishers and consumers keep working across a failover without any manual reconfiguration. + +Unlike a Redis Cluster, Sentinel does **not** shard data — it is a single logical Redis with automatic master failover. Commands, Pub/Sub, Lists, Streams and Pipelines therefore behave **exactly** like a plain `RedisBroker`; only the way the connection is acquired differs. + +## When to Use + +| Use RedisBroker | Use RedisSentinelBroker | +|---|---| +| Single Redis instance | Master/replica set behind Sentinel | +| Development / testing | Production with automatic failover | +| Fixed master address | Master address discovered dynamically | + +## Connecting + +Pass the sentinel `(host, port)` nodes and the monitored master group name: + +```python linenums="1" +from faststream.redis import RedisSentinelBroker + +broker = RedisSentinelBroker( + sentinels=[ + ("sentinel-1", 26379), + ("sentinel-2", 26379), + ("sentinel-3", 26379), + ], + sentinel_master_name="mymaster", +) +``` + +Both `sentinels` and `sentinel_master_name` are required. Connection options such as `db`, `socket_timeout` or `security` are forwarded to the discovered master connection. Use `sentinel_kwargs` to pass options that apply to the **sentinel** connections themselves: + +```python linenums="1" +broker = RedisSentinelBroker( + sentinels=[("sentinel-1", 26379), ("sentinel-2", 26379)], + sentinel_master_name="mymaster", + sentinel_kwargs={"socket_timeout": 1.0}, + db=1, +) +``` + +## How Failover Works + +Under the hood the broker builds the client through `Sentinel(...).master_for(master_name)`, which returns a client backed by a `SentinelConnectionPool`. That pool re-discovers the current master from the sentinels on every reconnect. Because every publisher and stream consumer goes through `connection.client`, they all fail over transparently — a dropped connection during a master promotion is simply re-established against the new master. + +## FastAPI Integration + +For FastAPI applications use `RedisSentinelRouter`, which builds a `RedisSentinelBroker` under the hood and otherwise behaves like `RedisRouter`: + +```python linenums="1" +from faststream.redis.fastapi import RedisSentinelRouter + +router = RedisSentinelRouter( + sentinels=[("sentinel-1", 26379), ("sentinel-2", 26379)], + sentinel_master_name="mymaster", +) + +@router.subscriber("channel-name") +async def handle(msg: str) -> None: ... +``` + +## Feature Support + +| Feature | RedisBroker | RedisSentinelBroker | +|---|---|---| +| List | ✅ | ✅ | +| Stream + XAUTOCLAIM | ✅ | ✅ | +| Pub/Sub | ✅ | ✅ | +| Pipeline | ✅ | ✅ | + +## Migration from RedisBroker + +`RedisSentinelBroker` accepts the same parameters as `RedisBroker` plus the sentinel-specific ones, so migration only requires pointing the broker at the sentinels instead of a fixed host: + +```python +# Before +from faststream.redis import RedisBroker +broker = RedisBroker(url="redis://master-host:6379") + +# After +from faststream.redis import RedisSentinelBroker +broker = RedisSentinelBroker( + sentinels=[("sentinel-1", 26379), ("sentinel-2", 26379)], + sentinel_master_name="mymaster", +) +``` + +## References + +- [Redis Sentinel Documentation](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/){.external-link target="_blank"} +- [High Availability with Redis Sentinel](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/#high-availability){.external-link target="_blank"} diff --git a/docs/docs/navigation_template.txt b/docs/docs/navigation_template.txt index 8ae793fa49..72433372bc 100644 --- a/docs/docs/navigation_template.txt +++ b/docs/docs/navigation_template.txt @@ -116,6 +116,7 @@ search: - [In-Progress](howto/nats/in-progress.md) - [Redis](redis/index.md) - [Cluster](redis/cluster.md) + - [Sentinel](redis/sentinel.md) - [Pub/Sub](redis/pubsub/index.md) - [Subscription](redis/pubsub/subscription.md) - [Publishing](redis/pubsub/publishing.md) diff --git a/faststream/redis/__init__.py b/faststream/redis/__init__.py index dd7150c088..55ff367d97 100644 --- a/faststream/redis/__init__.py +++ b/faststream/redis/__init__.py @@ -18,6 +18,7 @@ RedisPublisher, RedisRoute, RedisRouter, + RedisSentinelBroker, ) from .exceptions import StreamGroupNotFoundError from .parser import BinaryMessageFormatV1 @@ -50,6 +51,7 @@ "RedisResponse", "RedisRoute", "RedisRouter", + "RedisSentinelBroker", "RedisStreamMessage", "StreamGroupNotFoundError", "StreamSub", diff --git a/faststream/redis/broker/__init__.py b/faststream/redis/broker/__init__.py index 7ae04eb819..af99b48867 100644 --- a/faststream/redis/broker/__init__.py +++ b/faststream/redis/broker/__init__.py @@ -1,6 +1,7 @@ from .broker import RedisBroker from .cluster_broker import RedisClusterBroker from .router import RedisPublisher, RedisRoute, RedisRouter +from .sentinel_broker import RedisSentinelBroker __all__ = ( "RedisBroker", @@ -8,4 +9,5 @@ "RedisPublisher", "RedisRoute", "RedisRouter", + "RedisSentinelBroker", ) diff --git a/faststream/redis/broker/broker.py b/faststream/redis/broker/broker.py index 42e8e81427..b2d23f2d8f 100644 --- a/faststream/redis/broker/broker.py +++ b/faststream/redis/broker/broker.py @@ -20,12 +20,10 @@ from faststream._internal.constants import EMPTY from faststream._internal.context.repository import ContextRepo from faststream._internal.di import FastDependsConfig -from faststream.exceptions import SetupError from faststream.message import gen_cor_id from faststream.redis.configs import ( RedisBrokerConfig, RedisConnectionState, - SentinelConfig, ) from faststream.redis.message import UnifyRedisDict from faststream.redis.parser import BinaryMessageFormatV1 @@ -69,10 +67,6 @@ def __init__( protocol = kwargs.pop("protocol", None) message_format = kwargs.pop("message_format", BinaryMessageFormatV1) - sentinels = kwargs.pop("sentinels", None) - sentinel_master_name = kwargs.pop("sentinel_master_name", None) - sentinel_kwargs = kwargs.pop("sentinel_kwargs", None) - self.message_format = message_format if specification_url is None: @@ -91,21 +85,7 @@ def __init__( **connection_kwargs, ) - sentinel_config: SentinelConfig | None = None - if sentinels: - if not sentinel_master_name: - msg = "`sentinel_master_name` is required when `sentinels` is set." - raise SetupError(msg) - sentinel_config = SentinelConfig( - sentinels=list(sentinels), - master_name=sentinel_master_name, - sentinel_kwargs=sentinel_kwargs, - ) - - connection_state = RedisConnectionState( - connection_options, - sentinel=sentinel_config, - ) + connection_state = RedisConnectionState(connection_options) super().__init__( **connection_options, diff --git a/faststream/redis/broker/sentinel_broker.py b/faststream/redis/broker/sentinel_broker.py new file mode 100644 index 0000000000..01a644511d --- /dev/null +++ b/faststream/redis/broker/sentinel_broker.py @@ -0,0 +1,126 @@ +import logging +from typing import TYPE_CHECKING +from urllib.parse import urlparse + +from fast_depends import dependency_provider +from typing_extensions import Unpack + +from faststream._internal.constants import EMPTY +from faststream._internal.context.repository import ContextRepo +from faststream._internal.di import FastDependsConfig +from faststream.exceptions import SetupError +from faststream.redis.broker import RedisBroker +from faststream.redis.configs import RedisBrokerConfig +from faststream.redis.configs.state import RedisSentinelConnectionState +from faststream.redis.parser import BinaryMessageFormatV1 +from faststream.redis.publisher.producer import RedisFastProducer +from faststream.redis.schemas.types import NON_CONNECTION_PARAMS, SENTINEL_PARAMS +from faststream.specification.schema import BrokerSpec + +from .logging import make_redis_logger_state + +if TYPE_CHECKING: + from faststream.redis.schemas.types import RedisSentinelParams + + +class RedisSentinelBroker(RedisBroker): + """A Redis broker backed by Redis Sentinel (high-availability with failover). + + The master address is discovered from the ``sentinels`` nodes through + ``Sentinel.master_for(...)``; the underlying ``SentinelConnectionPool`` + re-discovers the current master on reconnect, so publishers and consumers + survive a master failover transparently. Commands, Pub/Sub, Lists and + Streams behave exactly like a plain ``RedisBroker`` — only the connection + acquisition differs. + """ + + def __init__( + self, + url: str = "redis://localhost:6379", + **kwargs: Unpack["RedisSentinelParams"], + ) -> None: + sentinels = kwargs.pop("sentinels", None) + sentinel_master_name = kwargs.pop("sentinel_master_name", None) + sentinel_kwargs = kwargs.pop("sentinel_kwargs", None) + + if not sentinels: + msg = "`sentinels` is required for RedisSentinelBroker." + raise SetupError(msg) + if not sentinel_master_name: + msg = "`sentinel_master_name` is required for RedisSentinelBroker." + raise SetupError(msg) + + host = kwargs.pop("host", EMPTY) + port = kwargs.pop("port", EMPTY) + security = kwargs.pop("security", None) + specification_url = kwargs.pop("specification_url", None) + protocol = kwargs.pop("protocol", None) + message_format = kwargs.pop("message_format", BinaryMessageFormatV1) + self.message_format = message_format + + if specification_url is None: + specification_url = url + if protocol is None: + protocol = urlparse(specification_url).scheme + + connection_kwargs = { + k: v + for k, v in kwargs.items() + if k not in NON_CONNECTION_PARAMS | SENTINEL_PARAMS + } + connection_options = self._resolve_url_options( + url, + security=security, + host=host, + port=port, + **connection_kwargs, + ) + + connection_state = RedisSentinelConnectionState( + connection_options, + sentinels=list(sentinels), + master_name=sentinel_master_name, + sentinel_kwargs=sentinel_kwargs, + ) + + super(RedisBroker, self).__init__( + **connection_options, + routers=kwargs.get("routers", ()), + config=RedisBrokerConfig( + connection=connection_state, + producer=RedisFastProducer( + connection=connection_state, + parser=kwargs.get("parser"), + decoder=kwargs.get("decoder"), + message_format=self.message_format, + serializer=kwargs.get("serializer"), + ), + message_format=self.message_format, + broker_middlewares=kwargs.get("middlewares", ()), + broker_parser=kwargs.get("parser"), + broker_decoder=kwargs.get("decoder"), + broker_codec=kwargs.get("codec"), + logger=make_redis_logger_state( + logger=kwargs.get("logger", EMPTY), + log_level=kwargs.get("log_level", logging.INFO), + ), + fd_config=FastDependsConfig( + use_fastdepends=kwargs.get("apply_types", True), + serializer=kwargs.get("serializer", EMPTY), + provider=kwargs.get("provider") or dependency_provider, + context=kwargs.get("context") or ContextRepo(), + ), + broker_dependencies=kwargs.get("dependencies", ()), + graceful_timeout=kwargs.get("graceful_timeout", 15.0), + ack_policy=kwargs.get("ack_policy", EMPTY), + extra_context={"broker": self}, + ), + specification=BrokerSpec( + description=kwargs.get("description"), + url=[specification_url], + protocol=protocol, + protocol_version=kwargs.get("protocol_version", "custom"), + security=security, + tags=kwargs.get("tags", ()), + ), + ) diff --git a/faststream/redis/configs/__init__.py b/faststream/redis/configs/__init__.py index 2d25307cf1..6fa7a17049 100644 --- a/faststream/redis/configs/__init__.py +++ b/faststream/redis/configs/__init__.py @@ -3,7 +3,7 @@ ConnectionState, RedisClusterConnectionState, RedisConnectionState, - SentinelConfig, + RedisSentinelConnectionState, ) __all__ = ( @@ -11,5 +11,5 @@ "RedisBrokerConfig", "RedisClusterConnectionState", "RedisConnectionState", - "SentinelConfig", + "RedisSentinelConnectionState", ) diff --git a/faststream/redis/configs/state.py b/faststream/redis/configs/state.py index b6b609279e..0d73b26b2c 100644 --- a/faststream/redis/configs/state.py +++ b/faststream/redis/configs/state.py @@ -1,7 +1,6 @@ from abc import ABC, abstractmethod from collections.abc import Mapping, Sequence from concurrent.futures import ThreadPoolExecutor -from dataclasses import dataclass, field from functools import partial from typing import Any, Generic, TypeVar @@ -21,21 +20,6 @@ ClientT = TypeVar("ClientT") -@dataclass -class SentinelConfig: - """Redis Sentinel connection configuration. - - When set on ``RedisConnectionState``, the client is built through - ``Sentinel(...).master_for(...)`` so the underlying - ``SentinelConnectionPool`` re-discovers the current master on every - reconnect (high-availability / failover). - """ - - sentinels: Sequence[tuple[str, int]] - master_name: str - sentinel_kwargs: Mapping[str, Any] | None = field(default=None) - - class ConnectionState(ABC, Generic[ClientT]): """Base connection state.""" @@ -70,33 +54,42 @@ async def disconnect(self) -> None: class RedisConnectionState(ConnectionState["Redis[bytes]"]): + async def connect(self) -> "Redis[bytes]": + pool = ConnectionPool( + **self._options, + lib_name="faststream", + lib_version=__version__, + ) + client: Redis[bytes] = Redis.from_pool(pool) # type: ignore[attr-defined] + + self._client = client + self._connected = True + + return client + + +class RedisSentinelConnectionState(RedisConnectionState): + """Builds the client via ``Sentinel.master_for`` for HA / failover. + + The underlying ``SentinelConnectionPool`` re-discovers the current master + on every reconnect, so publishers and stream consumers fail over for free + (both go through ``connection.client``). + """ + def __init__( self, options: dict[str, Any] | None = None, *, - sentinel: SentinelConfig | None = None, + sentinels: Sequence[tuple[str, int]], + master_name: str, + sentinel_kwargs: Mapping[str, Any] | None = None, ) -> None: super().__init__(options) - self._sentinel = sentinel + self._sentinels = list(sentinels) + self._master_name = master_name + self._sentinel_kwargs = sentinel_kwargs async def connect(self) -> "Redis[bytes]": - client: Redis[bytes] - if self._sentinel is not None: - client = self._connect_via_sentinel(self._sentinel) - else: - pool = ConnectionPool( - **self._options, - lib_name="faststream", - lib_version=__version__, - ) - client = Redis.from_pool(pool) # type: ignore[attr-defined] - - self._client = client - self._connected = True - - return client - - def _connect_via_sentinel(self, sentinel: SentinelConfig) -> "Redis[bytes]": # ``host``/``port`` describe a single node and are meaningless for # Sentinel — the master address is discovered from the sentinels. connection_kwargs = { @@ -106,13 +99,18 @@ def _connect_via_sentinel(self, sentinel: SentinelConfig) -> "Redis[bytes]": connection_kwargs["lib_version"] = __version__ manager = Sentinel( - list(sentinel.sentinels), - sentinel_kwargs=dict(sentinel.sentinel_kwargs) - if sentinel.sentinel_kwargs is not None + self._sentinels, + sentinel_kwargs=dict(self._sentinel_kwargs) + if self._sentinel_kwargs is not None else None, **connection_kwargs, ) - return manager.master_for(sentinel.master_name) + client: Redis[bytes] = manager.master_for(self._master_name) + + self._client = client + self._connected = True + + return client class RedisClusterConnectionState(ConnectionState["RedisCluster[bytes]"]): diff --git a/faststream/redis/fastapi/__init__.py b/faststream/redis/fastapi/__init__.py index e5cfb320f2..059b426c1d 100644 --- a/faststream/redis/fastapi/__init__.py +++ b/faststream/redis/fastapi/__init__.py @@ -6,7 +6,7 @@ from faststream.redis.broker.broker import RedisBroker as RB from faststream.redis.message import BaseMessage as RM # noqa: N814 -from .fastapi import RedisRouter +from .fastapi import RedisRouter, RedisSentinelRouter __all__ = ( "Context", @@ -16,6 +16,7 @@ "RedisBroker", "RedisChannelMessage", "RedisRouter", + "RedisSentinelRouter", ) RedisChannelMessage = Annotated[RM, Context("message")] diff --git a/faststream/redis/fastapi/fastapi.py b/faststream/redis/fastapi/fastapi.py index 85a23e491f..93899de223 100644 --- a/faststream/redis/fastapi/fastapi.py +++ b/faststream/redis/fastapi/fastapi.py @@ -26,6 +26,7 @@ from faststream._internal.fastapi.router import StreamRouter from faststream.middlewares import AckPolicy from faststream.redis.broker.broker import RedisBroker as RB +from faststream.redis.broker.sentinel_broker import RedisSentinelBroker as RSB from faststream.redis.message import UnifyRedisDict from faststream.redis.schemas import ListSub, PubSub, StreamSub @@ -94,10 +95,6 @@ def __init__( encoding_errors: str = "strict", parser_class: type["BaseParser"] = DefaultParser, encoder_class: type["Encoder"] = Encoder, - # Sentinel args - sentinels: Sequence[tuple[str, int]] | None = None, - sentinel_master_name: str | None = None, - sentinel_kwargs: Mapping[str, Any] | None = None, # broker base args graceful_timeout: float | None = 15.0, decoder: Optional["CustomCallable"] = None, @@ -138,6 +135,7 @@ def __init__( generate_unique_id_function: Callable[["APIRoute"], str] = Default( generate_unique_id ), + **connection_kwargs: Any, ) -> None: """Initialize the RedisRouter object. @@ -180,12 +178,6 @@ def __init__( _ ... encoder_class: _ ... - sentinels: - Redis Sentinel ``(host, port)`` nodes. Enables Sentinel mode (HA with master failover). - sentinel_master_name: - Sentinel master group name. Required when ``sentinels`` is set. - sentinel_kwargs: - Connection kwargs for the Sentinel nodes themselves. graceful_timeout: Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down. decoder: @@ -324,6 +316,8 @@ def __init__( Read more about it in the [FastAPI docs about how to Generate Clients](https://fastapi.tiangolo.com/advanced/generate-clients/#custom-generate-unique-id-function). + connection_kwargs: + Extra connection arguments forwarded to the underlying broker. """ super().__init__( @@ -344,9 +338,6 @@ def __init__( parser_class=parser_class, connection_class=connection_class, encoder_class=encoder_class, - sentinels=sentinels, - sentinel_master_name=sentinel_master_name, - sentinel_kwargs=sentinel_kwargs, graceful_timeout=graceful_timeout, decoder=decoder, parser=parser, @@ -385,6 +376,7 @@ def __init__( include_in_schema=include_in_schema, lifespan=lifespan, generate_unique_id_function=generate_unique_id_function, + **connection_kwargs, ) @overload # type: ignore[override] @@ -750,3 +742,32 @@ def publisher( schema=schema, include_in_schema=include_in_schema, ) + + +class RedisSentinelRouter(RedisRouter): + """A Redis Sentinel router (high-availability with master failover). + + Behaves like :class:`RedisRouter` but builds a :class:`RedisSentinelBroker`, + discovering the master through the ``sentinels`` nodes. All other arguments + are forwarded to the underlying broker unchanged. + """ + + broker_class = RSB + broker: RSB + + def __init__( + self, + url: str = "redis://localhost:6379", + *, + sentinels: Sequence[tuple[str, int]], + sentinel_master_name: str | None = None, + sentinel_kwargs: Mapping[str, Any] | None = None, + **kwargs: Any, + ) -> None: + super().__init__( + url, + sentinels=sentinels, + sentinel_master_name=sentinel_master_name, + sentinel_kwargs=sentinel_kwargs, + **kwargs, + ) diff --git a/faststream/redis/schemas/types.py b/faststream/redis/schemas/types.py index ec96d54597..526c6e0a4b 100644 --- a/faststream/redis/schemas/types.py +++ b/faststream/redis/schemas/types.py @@ -55,20 +55,6 @@ class RedisConnectionParams(TypedDict, total=False): type[BaseParser], "Parser class. Defaults to ``DefaultParser``." ] encoder_class: Annotated[type[Encoder], "Encoder class. Defaults to ``Encoder``."] - sentinels: Annotated[ - Sequence[tuple[str, int]], - "Redis Sentinel ``(host, port)`` nodes. Enables Sentinel mode " - "(high-availability with master failover). Defaults to ``None``.", - ] - sentinel_master_name: Annotated[ - str, - "Sentinel master group name. Required when ``sentinels`` is set. " - "Defaults to ``None``.", - ] - sentinel_kwargs: Annotated[ - Mapping[str, Any] | None, - "Connection kwargs for the Sentinel nodes themselves. Defaults to ``None``.", - ] class RedisBrokerParams(RedisConnectionParams, total=False): @@ -123,6 +109,21 @@ class RedisClusterParams(RedisBrokerParams, total=False): ] +class RedisSentinelParams(RedisBrokerParams, total=False): + sentinels: Annotated[ + Sequence[tuple[str, int]], + "Redis Sentinel ``(host, port)`` nodes to discover the master from. Required.", + ] + sentinel_master_name: Annotated[ + str, + "Sentinel master group name. Required.", + ] + sentinel_kwargs: Annotated[ + Mapping[str, Any] | None, + "Connection kwargs for the Sentinel nodes themselves. Defaults to ``None``.", + ] + + CLUSTER_INCOMPATIBLE_PARAMS = frozenset({ "db", "socket_read_size", @@ -133,6 +134,10 @@ class RedisClusterParams(RedisBrokerParams, total=False): "connection_class", "host", "port", +}) + + +SENTINEL_PARAMS = frozenset({ "sentinels", "sentinel_master_name", "sentinel_kwargs", diff --git a/tests/brokers/redis/test_sentinel.py b/tests/brokers/redis/test_sentinel.py index c92b6d16c0..98e86f77d8 100644 --- a/tests/brokers/redis/test_sentinel.py +++ b/tests/brokers/redis/test_sentinel.py @@ -2,70 +2,74 @@ from redis.asyncio.sentinel import SentinelConnectionPool from faststream.exceptions import SetupError -from faststream.redis import RedisBroker -from faststream.redis.configs.state import RedisConnectionState, SentinelConfig +from faststream.redis import RedisSentinelBroker +from faststream.redis.configs.state import RedisSentinelConnectionState SENTINELS = [("sentinel-1", 26379), ("sentinel-2", 26379)] @pytest.mark.redis() -class TestSentinelConfigUnit: - """Unit tests for Sentinel mode (no running Redis/Sentinel needed).""" +class TestRedisSentinelBrokerUnit: + """Unit tests for RedisSentinelBroker (no running Redis/Sentinel needed).""" - def test_sentinel_mode_builds_config(self) -> None: - broker = RedisBroker(sentinels=SENTINELS, sentinel_master_name="mymaster") + def test_builds_sentinel_connection_state(self) -> None: + broker = RedisSentinelBroker( + sentinels=SENTINELS, sentinel_master_name="mymaster" + ) connection = broker.config.broker_config.connection - assert isinstance(connection._sentinel, SentinelConfig) - assert connection._sentinel.master_name == "mymaster" - assert list(connection._sentinel.sentinels) == SENTINELS + assert isinstance(connection, RedisSentinelConnectionState) + assert connection._master_name == "mymaster" + assert connection._sentinels == SENTINELS def test_sentinel_kwargs_stored(self) -> None: - broker = RedisBroker( + broker = RedisSentinelBroker( sentinels=SENTINELS, sentinel_master_name="mymaster", sentinel_kwargs={"socket_timeout": 1.0}, ) - sentinel = broker.config.broker_config.connection._sentinel - assert sentinel is not None - assert sentinel.sentinel_kwargs == {"socket_timeout": 1.0} + connection = broker.config.broker_config.connection + assert connection._sentinel_kwargs == {"socket_timeout": 1.0} - def test_direct_mode_has_no_sentinel(self) -> None: - broker = RedisBroker("redis://localhost:6379") - assert broker.config.broker_config.connection._sentinel is None + def test_requires_sentinels(self) -> None: + with pytest.raises(SetupError, match="sentinels"): + RedisSentinelBroker(sentinels=[], sentinel_master_name="mymaster") - def test_sentinel_requires_master_name(self) -> None: + def test_requires_master_name(self) -> None: with pytest.raises(SetupError, match="sentinel_master_name"): - RedisBroker(sentinels=SENTINELS) + RedisSentinelBroker(sentinels=SENTINELS) @pytest.mark.asyncio() async def test_connect_builds_sentinel_pool(self) -> None: - state = RedisConnectionState( - {"host": "localhost", "port": 6379, "db": 1}, - sentinel=SentinelConfig(sentinels=SENTINELS, master_name="mymaster"), + broker = RedisSentinelBroker( + sentinels=SENTINELS, sentinel_master_name="mymaster", db=1 ) - client = await state.connect() + connection = broker.config.broker_config.connection + client = await connection.connect() try: assert isinstance(client.connection_pool, SentinelConnectionPool) assert client.connection_pool.service_name == "mymaster" finally: - await state.disconnect() + await connection.disconnect() @pytest.mark.redis() -class TestSentinelFastAPIRouterUnit: - """RedisRouter (FastAPI) must forward Sentinel params down to the broker.""" +class TestRedisSentinelFastAPIRouterUnit: + """RedisSentinelRouter (FastAPI) must build a Sentinel-backed broker.""" - def test_router_forwards_sentinel_to_broker(self) -> None: - from faststream.redis.fastapi import RedisRouter + def test_router_builds_sentinel_broker(self) -> None: + from faststream.redis.fastapi import RedisSentinelRouter - router = RedisRouter(sentinels=SENTINELS, sentinel_master_name="mymaster") + router = RedisSentinelRouter( + sentinels=SENTINELS, sentinel_master_name="mymaster" + ) + assert isinstance(router.broker, RedisSentinelBroker) connection = router.broker.config.broker_config.connection - assert isinstance(connection._sentinel, SentinelConfig) - assert connection._sentinel.master_name == "mymaster" - assert list(connection._sentinel.sentinels) == SENTINELS + assert isinstance(connection, RedisSentinelConnectionState) + assert connection._master_name == "mymaster" + assert connection._sentinels == SENTINELS - def test_router_direct_mode_has_no_sentinel(self) -> None: - from faststream.redis.fastapi import RedisRouter + def test_router_requires_master_name(self) -> None: + from faststream.redis.fastapi import RedisSentinelRouter - router = RedisRouter("redis://localhost:6379") - assert router.broker.config.broker_config.connection._sentinel is None + with pytest.raises(SetupError, match="sentinel_master_name"): + RedisSentinelRouter(sentinels=SENTINELS) From 7c95d67cb352593e6f0756caf26bbe87f0c2b024 Mon Sep 17 00:00:00 2001 From: Alexandr Yakovlev Date: Thu, 11 Jun 2026 14:31:46 +0300 Subject: [PATCH 4/4] style(redis): format test_sentinel.py with ruff --- tests/brokers/redis/test_sentinel.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/brokers/redis/test_sentinel.py b/tests/brokers/redis/test_sentinel.py index 98e86f77d8..f9d328b56c 100644 --- a/tests/brokers/redis/test_sentinel.py +++ b/tests/brokers/redis/test_sentinel.py @@ -13,9 +13,7 @@ class TestRedisSentinelBrokerUnit: """Unit tests for RedisSentinelBroker (no running Redis/Sentinel needed).""" def test_builds_sentinel_connection_state(self) -> None: - broker = RedisSentinelBroker( - sentinels=SENTINELS, sentinel_master_name="mymaster" - ) + broker = RedisSentinelBroker(sentinels=SENTINELS, sentinel_master_name="mymaster") connection = broker.config.broker_config.connection assert isinstance(connection, RedisSentinelConnectionState) assert connection._master_name == "mymaster" @@ -59,9 +57,7 @@ class TestRedisSentinelFastAPIRouterUnit: def test_router_builds_sentinel_broker(self) -> None: from faststream.redis.fastapi import RedisSentinelRouter - router = RedisSentinelRouter( - sentinels=SENTINELS, sentinel_master_name="mymaster" - ) + router = RedisSentinelRouter(sentinels=SENTINELS, sentinel_master_name="mymaster") assert isinstance(router.broker, RedisSentinelBroker) connection = router.broker.config.broker_config.connection assert isinstance(connection, RedisSentinelConnectionState)