From 539ee886732eb5c1c89a381075a477613ad603b9 Mon Sep 17 00:00:00 2001 From: ivankirpichnikov Date: Mon, 13 Oct 2025 19:26:01 +0300 Subject: [PATCH 1/3] feat: add support async context managers in subscribers --- .../getting-started/subscription/dynamic.md | 43 ++++++++++++------- docs/docs/en/release.md | 10 ++--- .../subscription/confluent/dynamic.py | 3 ++ .../subscription/confluent/dynamic_iter.py | 10 ++--- .../subscription/kafka/dynamic.py | 3 ++ .../subscription/kafka/dynamic_iter.py | 10 ++--- .../subscription/nats/dynamic.py | 3 ++ .../subscription/nats/dynamic_iter.py | 10 ++--- .../subscription/rabbit/dynamic.py | 3 ++ .../subscription/rabbit/dynamic_iter.py | 10 ++--- .../subscription/redis/dynamic.py | 3 ++ .../subscription/redis/dynamic_iter.py | 10 ++--- .../_internal/endpoint/subscriber/usecase.py | 13 ++++++ 13 files changed, 80 insertions(+), 51 deletions(-) diff --git a/docs/docs/en/getting-started/subscription/dynamic.md b/docs/docs/en/getting-started/subscription/dynamic.md index f0a144c0217..81fbec72f9f 100644 --- a/docs/docs/en/getting-started/subscription/dynamic.md +++ b/docs/docs/en/getting-started/subscription/dynamic.md @@ -17,9 +17,8 @@ However, the framework still allows you to do so in a suitable manner. async with TestKafkaBroker(broker) as br: subscriber = br.subscriber("test-topic", persistent=False) - await subscriber.start() - message = await subscriber.get_one() # does not work - await subscriber.stop() + async with subscriber: + message = await subscriber.get_one() # does not work ``` === "Confluent" ```python linenums="1" @@ -28,9 +27,9 @@ However, the framework still allows you to do so in a suitable manner. async with TestKafkaBroker(broker) as br: subscriber = br.subscriber("test-topic", persistent=False) - await subscriber.start() - message = await subscriber.get_one() # does not work - await subscriber.stop() + async with subscriber: + message = await subscriber.get_one() # does not work + ``` === "RabbitMQ" ```python linenums="1" @@ -39,9 +38,8 @@ However, the framework still allows you to do so in a suitable manner. async with TestRabbitBroker(broker) as br: subscriber = br.subscriber("test-queue", persistent=False) - await subscriber.start() - message = await subscriber.get_one() # does not work - await subscriber.stop() + async with subscriber: + message = await subscriber.get_one() # does not work ``` === "NATS" ```python linenums="1" @@ -50,9 +48,8 @@ However, the framework still allows you to do so in a suitable manner. async with TestNatsBroker(broker) as br: subscriber = br.subscriber("test-subject", persistent=False) - await subscriber.start() - message = await subscriber.get_one() # does not work - await subscriber.stop() + async with subscriber: + message = await subscriber.get_one() # does not work ``` === "Redis" ```python linenums="1" @@ -61,9 +58,8 @@ However, the framework still allows you to do so in a suitable manner. async with TestRedisBroker(broker) as br: subscriber = br.subscriber("test-channel", persistent=False) - await subscriber.start() - message = await subscriber.get_one() # does not work - await subscriber.stop() + async with subscriber: + message = await subscriber.get_one() # does not work ``` ## Consuming a Single Message @@ -93,6 +89,11 @@ To process a single message, you should create a subscriber and call the appropr ```python linenums="1" hl_lines="1 5" {!> docs_src/getting_started/subscription/confluent/dynamic.py [ln:6-10] !} ``` + Or so + ```python linenums="1" hl_lines="1" + {!> docs_src/getting_started/subscription/confluent/dynamic.py [ln:12-13] !} + ``` + === "RabbitMQ" ```python linenums="1" hl_lines="8" @@ -105,6 +106,10 @@ To process a single message, you should create a subscriber and call the appropr ```python linenums="1" hl_lines="1 5" {!> docs_src/getting_started/subscription/rabbit/dynamic.py [ln:6-10] !} ``` + Or so + ```python linenums="1" hl_lines="1" + {!> docs_src/getting_started/subscription/rabbit/dynamic.py [ln:12-13] !} + ``` === "NATS" ```python linenums="1" hl_lines="8" @@ -117,6 +122,10 @@ To process a single message, you should create a subscriber and call the appropr ```python linenums="1" hl_lines="1 5" {!> docs_src/getting_started/subscription/nats/dynamic.py [ln:6-10] !} ``` + Or so + ```python linenums="1" hl_lines="1" + {!> docs_src/getting_started/subscription/nats/dynamic.py [ln:12-13] !} + ``` === "Redis" ```python linenums="1" hl_lines="8" @@ -129,6 +138,10 @@ To process a single message, you should create a subscriber and call the appropr ```python linenums="1" hl_lines="1 5" {!> docs_src/getting_started/subscription/redis/dynamic.py [ln:6-10] !} ``` + Or so + ```python linenums="1" hl_lines="1" + {!> docs_src/getting_started/subscription/redis/dynamic.py [ln:12-13] !} + ``` ## Iteration over messages diff --git a/docs/docs/en/release.md b/docs/docs/en/release.md index 6cd6790bb97..036109bd276 100644 --- a/docs/docs/en/release.md +++ b/docs/docs/en/release.md @@ -2026,9 +2026,8 @@ subscriber = broker.subscriber("dynamic") subscriber(handler_method) ... broker.setup_subscriber(subscriber) -await subscriber.start() -... -await subscriber.close() +async with subscriber: + ... ``` 10. `faststream[docs]` distribution is removed. @@ -2202,9 +2201,8 @@ subscriber = broker.subscriber("dynamic") subscriber(handler_method) ... broker.setup_subscriber(subscriber) -await subscriber.start() -... -await subscriber.close() +async with subscriber: + ... ``` 10. `faststream[docs]` distribution is removed. diff --git a/docs/docs_src/getting_started/subscription/confluent/dynamic.py b/docs/docs_src/getting_started/subscription/confluent/dynamic.py index a31e57081a2..5983b063b37 100644 --- a/docs/docs_src/getting_started/subscription/confluent/dynamic.py +++ b/docs/docs_src/getting_started/subscription/confluent/dynamic.py @@ -9,4 +9,7 @@ async def main(): await subscriber.stop() + async with subscriber: + message: KafkaMessage | None = await subscriber.get_one(timeout=3.0) + return message diff --git a/docs/docs_src/getting_started/subscription/confluent/dynamic_iter.py b/docs/docs_src/getting_started/subscription/confluent/dynamic_iter.py index 0d300d1922c..4cccf4b712a 100644 --- a/docs/docs_src/getting_started/subscription/confluent/dynamic_iter.py +++ b/docs/docs_src/getting_started/subscription/confluent/dynamic_iter.py @@ -1,11 +1,9 @@ -from faststream.confluent import KafkaBroker, KafkaMessage +from faststream.confluent import KafkaBroker async def main(): async with KafkaBroker() as broker: subscriber = broker.subscriber("test-topic", persistent=False) - await subscriber.start() - async for msg in subscriber: # msg is KafkaMessage type - ... # do message process - - await subscriber.stop() + async with subscriber: + async for msg in subscriber: # msg is KafkaMessage type + ... # do message process diff --git a/docs/docs_src/getting_started/subscription/kafka/dynamic.py b/docs/docs_src/getting_started/subscription/kafka/dynamic.py index 2e1d4f5097c..64e44a90361 100644 --- a/docs/docs_src/getting_started/subscription/kafka/dynamic.py +++ b/docs/docs_src/getting_started/subscription/kafka/dynamic.py @@ -9,4 +9,7 @@ async def main(): await subscriber.stop() + async with subscriber: + message: KafkaMessage | None = await subscriber.get_one(timeout=3.0) + return message diff --git a/docs/docs_src/getting_started/subscription/kafka/dynamic_iter.py b/docs/docs_src/getting_started/subscription/kafka/dynamic_iter.py index be9efcd7cdd..31a3247115f 100644 --- a/docs/docs_src/getting_started/subscription/kafka/dynamic_iter.py +++ b/docs/docs_src/getting_started/subscription/kafka/dynamic_iter.py @@ -1,11 +1,9 @@ -from faststream.kafka import KafkaBroker, KafkaMessage +from faststream.kafka import KafkaBroker async def main(): async with KafkaBroker() as broker: subscriber = broker.subscriber("test-topic", persistent=False) - await subscriber.start() - async for msg in subscriber: # msg is KafkaMessage type - ... # do message process - - await subscriber.stop() + async with subscriber: + async for msg in subscriber: # msg is KafkaMessage type + ... # do message process diff --git a/docs/docs_src/getting_started/subscription/nats/dynamic.py b/docs/docs_src/getting_started/subscription/nats/dynamic.py index 876fca8787a..3cee2dbcad9 100644 --- a/docs/docs_src/getting_started/subscription/nats/dynamic.py +++ b/docs/docs_src/getting_started/subscription/nats/dynamic.py @@ -9,4 +9,7 @@ async def main(): await subscriber.stop() + async with subscriber: + message: NatsMessage | None = await subscriber.get_one(timeout=3.0) + return message diff --git a/docs/docs_src/getting_started/subscription/nats/dynamic_iter.py b/docs/docs_src/getting_started/subscription/nats/dynamic_iter.py index 69fe1df908e..2d6fe5ad9da 100644 --- a/docs/docs_src/getting_started/subscription/nats/dynamic_iter.py +++ b/docs/docs_src/getting_started/subscription/nats/dynamic_iter.py @@ -1,11 +1,9 @@ -from faststream.nats import NatsBroker, NatsMessage +from faststream.nats import NatsBroker async def main(): async with NatsBroker() as broker: subscriber = broker.subscriber("test-subject", persistent=False) - await subscriber.start() - async for msg in subscriber: # msg is NatsMessage type - ... # do message process - - await subscriber.stop() + async with subscriber: + async for msg in subscriber: # msg is NatsMessage type + ... # do message process diff --git a/docs/docs_src/getting_started/subscription/rabbit/dynamic.py b/docs/docs_src/getting_started/subscription/rabbit/dynamic.py index 891752eb511..f001f6a3d59 100644 --- a/docs/docs_src/getting_started/subscription/rabbit/dynamic.py +++ b/docs/docs_src/getting_started/subscription/rabbit/dynamic.py @@ -9,4 +9,7 @@ async def main(): await subscriber.stop() + async with subscriber: + message: RabbitMessage | None = await subscriber.get_one(timeout=3.0) + return message diff --git a/docs/docs_src/getting_started/subscription/rabbit/dynamic_iter.py b/docs/docs_src/getting_started/subscription/rabbit/dynamic_iter.py index 7a62951e7bc..57c0c2a91c0 100644 --- a/docs/docs_src/getting_started/subscription/rabbit/dynamic_iter.py +++ b/docs/docs_src/getting_started/subscription/rabbit/dynamic_iter.py @@ -1,11 +1,9 @@ -from faststream.rabbit import RabbitBroker, RabbitMessage +from faststream.rabbit import RabbitBroker async def main(): async with RabbitBroker() as broker: subscriber = broker.subscriber("test-queue", persistent=False) - await subscriber.start() - async for msg in subscriber: # msg is RabbitMessage type - ... # do message process - - await subscriber.stop() + async with subscriber: + async for msg in subscriber: # msg is RabbitMessage type + ... # do message process diff --git a/docs/docs_src/getting_started/subscription/redis/dynamic.py b/docs/docs_src/getting_started/subscription/redis/dynamic.py index 4c0ce1db0e3..b9c5ddeb634 100644 --- a/docs/docs_src/getting_started/subscription/redis/dynamic.py +++ b/docs/docs_src/getting_started/subscription/redis/dynamic.py @@ -9,4 +9,7 @@ async def main(): await subscriber.stop() + async with subscriber: + message: RedisChannelMessage | None = await subscriber.get_one(timeout=3.0) + return message diff --git a/docs/docs_src/getting_started/subscription/redis/dynamic_iter.py b/docs/docs_src/getting_started/subscription/redis/dynamic_iter.py index ddc25cfae65..1118652faf4 100644 --- a/docs/docs_src/getting_started/subscription/redis/dynamic_iter.py +++ b/docs/docs_src/getting_started/subscription/redis/dynamic_iter.py @@ -1,11 +1,9 @@ -from faststream.redis import RedisBroker, RedisMessage +from faststream.redis import RedisBroker async def main(): async with RedisBroker() as broker: subscriber = broker.subscriber("test-channel", persistent=False) - await subscriber.start() - async for msg in subscriber: # msg is RedisMessage type - ... # do message process - - await subscriber.stop() + async with subscriber: + async for msg in subscriber: # msg is RedisMessage type + ... # do message process diff --git a/faststream/_internal/endpoint/subscriber/usecase.py b/faststream/_internal/endpoint/subscriber/usecase.py index a5bb968badc..e0776b2a5cc 100644 --- a/faststream/_internal/endpoint/subscriber/usecase.py +++ b/faststream/_internal/endpoint/subscriber/usecase.py @@ -2,6 +2,7 @@ from collections.abc import AsyncIterator, Callable, Iterable, Sequence from contextlib import AbstractContextManager, AsyncExitStack from itertools import chain +from types import TracebackType from typing import ( TYPE_CHECKING, Annotated, @@ -105,6 +106,18 @@ def __init__( def _broker_middlewares(self) -> Sequence["BrokerMiddleware[MsgType]"]: return self._outer_config.broker_middlewares + async def __aenter__(self) -> Self: + await self.start() + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + await self.stop() + async def start(self) -> None: """Private method to start subscriber by broker.""" self.lock = MultiLock() From c13d1cf0f3bf99421c4fb5991ff6407121248f09 Mon Sep 17 00:00:00 2001 From: ivankirpichnikov Date: Mon, 13 Oct 2025 19:39:03 +0300 Subject: [PATCH 2/3] feat: add kafka example --- docs/docs/en/getting-started/subscription/dynamic.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/docs/en/getting-started/subscription/dynamic.md b/docs/docs/en/getting-started/subscription/dynamic.md index 81fbec72f9f..61dca76ac6e 100644 --- a/docs/docs/en/getting-started/subscription/dynamic.md +++ b/docs/docs/en/getting-started/subscription/dynamic.md @@ -77,6 +77,10 @@ To process a single message, you should create a subscriber and call the appropr ```python linenums="1" hl_lines="1 5" {!> docs_src/getting_started/subscription/kafka/dynamic.py [ln:6-10] !} ``` + Or so + ```python linenums="1" hl_lines="1" + {!> docs_src/getting_started/subscription/kafka/dynamic.py [ln:12-13] !} + ``` === "Confluent" ```python linenums="1" hl_lines="8" From 8557490650d6547c329d3c50a684ce27f5f7089e Mon Sep 17 00:00:00 2001 From: ivankirpichnikov Date: Mon, 13 Oct 2025 19:53:20 +0300 Subject: [PATCH 3/3] feat: revert edit docs in release.md --- docs/docs/en/release.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/docs/en/release.md b/docs/docs/en/release.md index 036109bd276..76fec13df0a 100644 --- a/docs/docs/en/release.md +++ b/docs/docs/en/release.md @@ -2026,8 +2026,9 @@ subscriber = broker.subscriber("dynamic") subscriber(handler_method) ... broker.setup_subscriber(subscriber) -async with subscriber: - ... +await subscriber.start() +... +await subscriber.close() ``` 10. `faststream[docs]` distribution is removed.