Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions docs/docs/en/redis/sentinel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 10
---

# Redis Sentinel Broker

## Overview

**FastStream** provides a `RedisSentinelBroker` for working with [**Redis Sentinel**](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/){.external-link target="_blank"} — Redis' built-in high-availability solution. Sentinel monitors a master/replica set and automatically promotes a replica when the master fails. `RedisSentinelBroker` discovers the current master from the sentinel nodes, so publishers and consumers keep working across a failover without any manual reconfiguration.

Unlike a Redis Cluster, Sentinel does **not** shard data — it is a single logical Redis with automatic master failover. Commands, Pub/Sub, Lists, Streams and Pipelines therefore behave **exactly** like a plain `RedisBroker`; only the way the connection is acquired differs.

## When to Use

| Use RedisBroker | Use RedisSentinelBroker |
|---|---|
| Single Redis instance | Master/replica set behind Sentinel |
| Development / testing | Production with automatic failover |
| Fixed master address | Master address discovered dynamically |

## Connecting

Pass the sentinel `(host, port)` nodes and the monitored master group name:

```python linenums="1"
from faststream.redis import RedisSentinelBroker

broker = RedisSentinelBroker(
sentinels=[
("sentinel-1", 26379),
("sentinel-2", 26379),
("sentinel-3", 26379),
],
sentinel_master_name="mymaster",
)
```

Both `sentinels` and `sentinel_master_name` are required. Connection options such as `db`, `socket_timeout` or `security` are forwarded to the discovered master connection. Use `sentinel_kwargs` to pass options that apply to the **sentinel** connections themselves:

```python linenums="1"
broker = RedisSentinelBroker(
sentinels=[("sentinel-1", 26379), ("sentinel-2", 26379)],
sentinel_master_name="mymaster",
sentinel_kwargs={"socket_timeout": 1.0},
db=1,
)
```

## How Failover Works

Under the hood the broker builds the client through `Sentinel(...).master_for(master_name)`, which returns a client backed by a `SentinelConnectionPool`. That pool re-discovers the current master from the sentinels on every reconnect. Because every publisher and stream consumer goes through `connection.client`, they all fail over transparently — a dropped connection during a master promotion is simply re-established against the new master.

## FastAPI Integration

For FastAPI applications use `RedisSentinelRouter`, which builds a `RedisSentinelBroker` under the hood and otherwise behaves like `RedisRouter`:

```python linenums="1"
from faststream.redis.fastapi import RedisSentinelRouter

router = RedisSentinelRouter(
sentinels=[("sentinel-1", 26379), ("sentinel-2", 26379)],
sentinel_master_name="mymaster",
)

@router.subscriber("channel-name")
async def handle(msg: str) -> None: ...
```

## Feature Support

| Feature | RedisBroker | RedisSentinelBroker |
|---|---|---|
| List | ✅ | ✅ |
| Stream + XAUTOCLAIM | ✅ | ✅ |
| Pub/Sub | ✅ | ✅ |
| Pipeline | ✅ | ✅ |

## Migration from RedisBroker

`RedisSentinelBroker` accepts the same parameters as `RedisBroker` plus the sentinel-specific ones, so migration only requires pointing the broker at the sentinels instead of a fixed host:

```python
# Before
from faststream.redis import RedisBroker
broker = RedisBroker(url="redis://master-host:6379")

# After
from faststream.redis import RedisSentinelBroker
broker = RedisSentinelBroker(
sentinels=[("sentinel-1", 26379), ("sentinel-2", 26379)],
sentinel_master_name="mymaster",
)
```

## References

- [Redis Sentinel Documentation](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/){.external-link target="_blank"}
- [High Availability with Redis Sentinel](https://redis.io/docs/latest/operate/oss_and_stack/management/sentinel/#high-availability){.external-link target="_blank"}
1 change: 1 addition & 0 deletions docs/docs/navigation_template.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ search:
- [In-Progress](howto/nats/in-progress.md)
- [Redis](redis/index.md)
- [Cluster](redis/cluster.md)
- [Sentinel](redis/sentinel.md)
- [Pub/Sub](redis/pubsub/index.md)
- [Subscription](redis/pubsub/subscription.md)
- [Publishing](redis/pubsub/publishing.md)
Expand Down
2 changes: 2 additions & 0 deletions faststream/redis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
RedisPublisher,
RedisRoute,
RedisRouter,
RedisSentinelBroker,
)
from .exceptions import StreamGroupNotFoundError
from .parser import BinaryMessageFormatV1
Expand Down Expand Up @@ -50,6 +51,7 @@
"RedisResponse",
"RedisRoute",
"RedisRouter",
"RedisSentinelBroker",
"RedisStreamMessage",
"StreamGroupNotFoundError",
"StreamSub",
Expand Down
2 changes: 2 additions & 0 deletions faststream/redis/broker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from .broker import RedisBroker
from .cluster_broker import RedisClusterBroker
from .router import RedisPublisher, RedisRoute, RedisRouter
from .sentinel_broker import RedisSentinelBroker

__all__ = (
"RedisBroker",
"RedisClusterBroker",
"RedisPublisher",
"RedisRoute",
"RedisRouter",
"RedisSentinelBroker",
)
5 changes: 4 additions & 1 deletion faststream/redis/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
from faststream._internal.context.repository import ContextRepo
from faststream._internal.di import FastDependsConfig
from faststream.message import gen_cor_id
from faststream.redis.configs import RedisBrokerConfig, RedisConnectionState
from faststream.redis.configs import (
RedisBrokerConfig,
RedisConnectionState,
)
from faststream.redis.message import UnifyRedisDict
from faststream.redis.parser import BinaryMessageFormatV1
from faststream.redis.publisher.producer import RedisFastProducer
Expand Down
126 changes: 126 additions & 0 deletions faststream/redis/broker/sentinel_broker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import logging
from typing import TYPE_CHECKING
from urllib.parse import urlparse

from fast_depends import dependency_provider
from typing_extensions import Unpack

from faststream._internal.constants import EMPTY
from faststream._internal.context.repository import ContextRepo
from faststream._internal.di import FastDependsConfig
from faststream.exceptions import SetupError
from faststream.redis.broker import RedisBroker
from faststream.redis.configs import RedisBrokerConfig
from faststream.redis.configs.state import RedisSentinelConnectionState
from faststream.redis.parser import BinaryMessageFormatV1
from faststream.redis.publisher.producer import RedisFastProducer
from faststream.redis.schemas.types import NON_CONNECTION_PARAMS, SENTINEL_PARAMS
from faststream.specification.schema import BrokerSpec

from .logging import make_redis_logger_state

if TYPE_CHECKING:
from faststream.redis.schemas.types import RedisSentinelParams


class RedisSentinelBroker(RedisBroker):
"""A Redis broker backed by Redis Sentinel (high-availability with failover).

The master address is discovered from the ``sentinels`` nodes through
``Sentinel.master_for(...)``; the underlying ``SentinelConnectionPool``
re-discovers the current master on reconnect, so publishers and consumers
survive a master failover transparently. Commands, Pub/Sub, Lists and
Streams behave exactly like a plain ``RedisBroker`` — only the connection
acquisition differs.
"""

def __init__(
self,
url: str = "redis://localhost:6379",
**kwargs: Unpack["RedisSentinelParams"],
) -> None:
sentinels = kwargs.pop("sentinels", None)
sentinel_master_name = kwargs.pop("sentinel_master_name", None)
sentinel_kwargs = kwargs.pop("sentinel_kwargs", None)

if not sentinels:
msg = "`sentinels` is required for RedisSentinelBroker."
raise SetupError(msg)
if not sentinel_master_name:
msg = "`sentinel_master_name` is required for RedisSentinelBroker."
raise SetupError(msg)

host = kwargs.pop("host", EMPTY)
port = kwargs.pop("port", EMPTY)
security = kwargs.pop("security", None)
specification_url = kwargs.pop("specification_url", None)
protocol = kwargs.pop("protocol", None)
message_format = kwargs.pop("message_format", BinaryMessageFormatV1)
self.message_format = message_format

if specification_url is None:
specification_url = url
if protocol is None:
protocol = urlparse(specification_url).scheme

connection_kwargs = {
k: v
for k, v in kwargs.items()
if k not in NON_CONNECTION_PARAMS | SENTINEL_PARAMS
}
connection_options = self._resolve_url_options(
url,
security=security,
host=host,
port=port,
**connection_kwargs,
)

connection_state = RedisSentinelConnectionState(
connection_options,
sentinels=list(sentinels),
master_name=sentinel_master_name,
sentinel_kwargs=sentinel_kwargs,
)

super(RedisBroker, self).__init__(
**connection_options,
routers=kwargs.get("routers", ()),
config=RedisBrokerConfig(
connection=connection_state,
producer=RedisFastProducer(
connection=connection_state,
parser=kwargs.get("parser"),
decoder=kwargs.get("decoder"),
message_format=self.message_format,
serializer=kwargs.get("serializer"),
),
message_format=self.message_format,
broker_middlewares=kwargs.get("middlewares", ()),
broker_parser=kwargs.get("parser"),
broker_decoder=kwargs.get("decoder"),
broker_codec=kwargs.get("codec"),
logger=make_redis_logger_state(
logger=kwargs.get("logger", EMPTY),
log_level=kwargs.get("log_level", logging.INFO),
),
fd_config=FastDependsConfig(
use_fastdepends=kwargs.get("apply_types", True),
serializer=kwargs.get("serializer", EMPTY),
provider=kwargs.get("provider") or dependency_provider,
context=kwargs.get("context") or ContextRepo(),
),
broker_dependencies=kwargs.get("dependencies", ()),
graceful_timeout=kwargs.get("graceful_timeout", 15.0),
ack_policy=kwargs.get("ack_policy", EMPTY),
extra_context={"broker": self},
),
specification=BrokerSpec(
description=kwargs.get("description"),
url=[specification_url],
protocol=protocol,
protocol_version=kwargs.get("protocol_version", "custom"),
security=security,
tags=kwargs.get("tags", ()),
),
)
2 changes: 2 additions & 0 deletions faststream/redis/configs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
ConnectionState,
RedisClusterConnectionState,
RedisConnectionState,
RedisSentinelConnectionState,
)

__all__ = (
"ConnectionState",
"RedisBrokerConfig",
"RedisClusterConnectionState",
"RedisConnectionState",
"RedisSentinelConnectionState",
)
47 changes: 47 additions & 0 deletions faststream/redis/configs/state.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from abc import ABC, abstractmethod
from collections.abc import Mapping, Sequence
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Generic, TypeVar

from redis.asyncio.client import Redis
from redis.asyncio.cluster import RedisCluster
from redis.asyncio.connection import ConnectionPool
from redis.asyncio.sentinel import Sentinel
from redis.cluster import (
ClusterNode,
RedisCluster as SyncRC,
Expand Down Expand Up @@ -66,6 +68,51 @@ async def connect(self) -> "Redis[bytes]":
return client


class RedisSentinelConnectionState(RedisConnectionState):
"""Builds the client via ``Sentinel.master_for`` for HA / failover.

The underlying ``SentinelConnectionPool`` re-discovers the current master
on every reconnect, so publishers and stream consumers fail over for free
(both go through ``connection.client``).
"""

def __init__(
self,
options: dict[str, Any] | None = None,
*,
sentinels: Sequence[tuple[str, int]],
master_name: str,
sentinel_kwargs: Mapping[str, Any] | None = None,
) -> None:
super().__init__(options)
self._sentinels = list(sentinels)
self._master_name = master_name
self._sentinel_kwargs = sentinel_kwargs

async def connect(self) -> "Redis[bytes]":
# ``host``/``port`` describe a single node and are meaningless for
# Sentinel — the master address is discovered from the sentinels.
connection_kwargs = {
k: v for k, v in self._options.items() if k not in {"host", "port"}
}
connection_kwargs["lib_name"] = "faststream"
connection_kwargs["lib_version"] = __version__

manager = Sentinel(
self._sentinels,
sentinel_kwargs=dict(self._sentinel_kwargs)
if self._sentinel_kwargs is not None
else None,
**connection_kwargs,
)
client: Redis[bytes] = manager.master_for(self._master_name)

self._client = client
self._connected = True

return client


class RedisClusterConnectionState(ConnectionState["RedisCluster[bytes]"]):
"""Manages a Redis Cluster connection lifecycle.

Expand Down
3 changes: 2 additions & 1 deletion faststream/redis/fastapi/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from faststream.redis.broker.broker import RedisBroker as RB
from faststream.redis.message import BaseMessage as RM # noqa: N814

from .fastapi import RedisRouter
from .fastapi import RedisRouter, RedisSentinelRouter

__all__ = (
"Context",
Expand All @@ -16,6 +16,7 @@
"RedisBroker",
"RedisChannelMessage",
"RedisRouter",
"RedisSentinelRouter",
)

RedisChannelMessage = Annotated[RM, Context("message")]
Expand Down
Loading