From bfb4a82c3e889d1cb56a36c1094a53f8ab1d48e7 Mon Sep 17 00:00:00 2001
From: 00yhj22-debug <00yhj22@gmail.com>
Date: Thu, 21 May 2026 19:04:11 +0900
Subject: [PATCH] Document the concurrency model for subscribers

Closes #2749.

Add a dedicated docs page (`getting-started/subscription/concurrency.md`)
that walks through how `max_workers` and RabbitMQ's `prefetch_count`
control concurrency, how they combine, and what `prefetch_count=N`
actually means (broker-side flow control, not "N processed at once").
Cross-references the existing Kafka subscriber notes and links the new
page from the navigation template.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 .../subscription/concurrency.md               | 69 +++++++++++++++++++
 docs/docs/navigation_template.txt             |  1 +
 2 files changed, 70 insertions(+)
 create mode 100644 docs/docs/en/getting-started/subscription/concurrency.md

diff --git a/docs/docs/en/getting-started/subscription/concurrency.md b/docs/docs/en/getting-started/subscription/concurrency.md
new file mode 100644
index 00000000000..0e3c7697756
--- /dev/null
+++ b/docs/docs/en/getting-started/subscription/concurrency.md
@@ -0,0 +1,69 @@
+---
+# 0.5 - API
+# 2 - Release
+# 3 - Contributing
+# 5 - Template Page
+# 10 - Default
+search:
+  boost: 10
+---
+
+# Concurrency
+
+**FastStream** gives you two independent levers for processing messages concurrently. Mixing them up is a common source of confusion, so it helps to keep their roles straight.
+
+## `max_workers` — handler-level parallelism
+
+`max_workers` is the **number of concurrent handler invocations** a single subscriber may run. It is available on every broker's subscriber decorator:
+
+```python hl_lines="3"
+@broker.subscriber(
+    "test",
+    max_workers=4,
+)
+async def handle(msg): ...
+```
+
+With `max_workers=1` (the default) **FastStream** processes one message at a time per subscriber. With `max_workers=N`, up to `N` invocations of the handler can run at the same time.
+
+## `prefetch_count` — RabbitMQ flow control
+
+`prefetch_count` is **RabbitMQ-specific**. It comes from AMQP's `basic.qos` and tells the broker how many unacknowledged messages it may have outstanding on a given channel at any time.
+
+It is set at the channel level:
+
+```python hl_lines="6"
+from faststream.rabbit import RabbitBroker
+from faststream.rabbit.schemas import Channel
+
+broker = RabbitBroker(
+    "amqp://guest:guest@localhost:5672/",
+    channel=Channel(prefetch_count=10),
+)
+```
+
+`prefetch_count=N` does **not** mean "process `N` messages at the same time". It means "let the broker push up to `N` messages to this consumer before requiring an ack". It is a buffer / flow-control window — not parallelism.
+
+## How they combine
+
+The two settings are orthogonal:
+
+| `max_workers` | `prefetch_count` | Behavior                                                                                |
+|---------------|------------------|-----------------------------------------------------------------------------------------|
+| `1`           | `1`              | One message in flight, processed sequentially. The classic per-message round trip.      |
+| `1`           | `N`              | Up to `N` messages held locally by the consumer, but **still processed one at a time**. |
+| `K`           | `N` (≥ `K`)      | Up to `K` invocations of the handler run concurrently; up to `N` may be buffered.       |
+| `K`           | `< K`            | Concurrency is throttled by `prefetch_count`: the broker will not push enough messages to keep all workers busy. |
+
+A useful rule of thumb for RabbitMQ subscribers: set `prefetch_count` at least as large as `max_workers`, otherwise extra workers sit idle waiting for messages.
+
+## Other brokers
+
+- **Kafka / Confluent.** There is no `prefetch_count`. The consumer pulls in batches; tune `max_poll_records` (and friends) to control how many records are fetched per poll, and use `max_workers` for handler parallelism. See [Kafka Subscriber concurrent processing](../../kafka/Subscriber/index.md#concurrent-processing) for the interaction with `AckPolicy`.
+- **Redis, NATS, MQTT.** Only `max_workers` applies. There is no broker-side prefetch knob equivalent to RabbitMQ's.
+
+## The common confusion
+
+> Does `prefetch_count=10` mean ten messages are processed at the same time?
+
+No. It means the **broker** may deliver up to ten messages before it expects acknowledgments. How many are *processed in parallel* is determined by `max_workers`. With the default `max_workers=1`, the ten prefetched messages are processed one after another — `prefetch_count` only changes how aggressively they are pulled from the broker.
diff --git a/docs/docs/navigation_template.txt b/docs/docs/navigation_template.txt
index 3434da56e05..0407d55c814 100644
--- a/docs/docs/navigation_template.txt
+++ b/docs/docs/navigation_template.txt
@@ -12,6 +12,7 @@ search:
         - [Filtering](getting-started/subscription/filtering.md)
         - [Testing](getting-started/subscription/test.md)
         - [Dynamic Subscribers](getting-started/subscription/dynamic.md)
+        - [Concurrency](getting-started/subscription/concurrency.md)
     - [Publishing](getting-started/publishing/index.md)
         - [Broker Publish](getting-started/publishing/broker.md)
         - [Decorator](getting-started/publishing/decorator.md)
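
Note on the "How they combine" table in `concurrency.md`: the four rows can be sanity-checked with a small toy model. The sketch below is not FastStream or RabbitMQ code. It assumes the unacknowledged-message window and the handler slots can each be modeled as an `asyncio.Semaphore`, and that a message is acked as soon as its handler returns. The names `simulate`, `deliver_and_handle`, `unacked`, and `workers` are hypothetical and exist only for this illustration.

```python
import asyncio


async def simulate(total: int, prefetch_count: int, max_workers: int) -> int:
    """Toy model: return the peak number of handlers running at once."""
    # Assumption: the broker's unacked-message window behaves like a semaphore.
    unacked = asyncio.Semaphore(prefetch_count)
    # Assumption: handler slots (max_workers) behave like a second semaphore.
    workers = asyncio.Semaphore(max_workers)
    running = 0
    peak = 0

    async def deliver_and_handle() -> None:
        nonlocal running, peak
        async with unacked:      # a message is delivered only if the window has room
            async with workers:  # the handler starts only when a worker slot is free
                running += 1
                peak = max(peak, running)
                await asyncio.sleep(0.01)  # stand-in for real handler work
                running -= 1
        # leaving the `unacked` block models the ack, which frees one window slot

    await asyncio.gather(*(deliver_and_handle() for _ in range(total)))
    return peak


if __name__ == "__main__":
    # One (max_workers, prefetch_count) pair per table row, with K=4 and N=10 or 2.
    for max_workers, prefetch_count in [(1, 1), (1, 10), (4, 10), (4, 2)]:
        peak = asyncio.run(simulate(20, prefetch_count, max_workers))
        print(f"max_workers={max_workers} prefetch_count={prefetch_count} peak={peak}")
```

In this model the peak is the smaller of the two settings, which matches the table: a large `prefetch_count` with `max_workers=1` still runs one handler at a time, and `prefetch_count` below `max_workers` leaves workers idle. A real consumer has network latency and ack policies the model ignores, so treat it as an aid for reading the table rather than a description of broker internals.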