From c8afed6bde8fef434aacc1ee7159d20145d4d6f0 Mon Sep 17 00:00:00 2001 From: "daniel.sasu" Date: Wed, 3 Jun 2026 12:04:50 +0200 Subject: [PATCH 1/4] feat(codec): add destination parameter to encode --- faststream/_internal/parser.py | 3 +++ faststream/confluent/publisher/producer.py | 6 +++--- faststream/confluent/testing.py | 4 ++-- faststream/kafka/publisher/producer.py | 6 +++--- faststream/kafka/testing.py | 4 ++-- faststream/mqtt/publisher/producer.py | 8 ++++---- faststream/mqtt/testing.py | 2 +- faststream/nats/publisher/producer.py | 8 ++++---- faststream/nats/testing.py | 2 +- tests/brokers/base/codec.py | 12 +++++++----- 10 files changed, 30 insertions(+), 25 deletions(-) diff --git a/faststream/_internal/parser.py b/faststream/_internal/parser.py index 815075136a..60ff281950 100644 --- a/faststream/_internal/parser.py +++ b/faststream/_internal/parser.py @@ -66,6 +66,7 @@ async def encode( self, msg: "SendableMessage", serializer: "SerializerProto | None" = None, + destination: str = "", ) -> tuple[bytes, str | None]: ... @@ -76,6 +77,7 @@ async def encode_batch( self, msgs: Sequence["SendableMessage"], serializer: "SerializerProto | None" = None, + destination: str = "", ) -> list[tuple[bytes, str | None]]: ... 
@abstractmethod @@ -93,5 +95,6 @@ async def encode( self, msg: "SendableMessage", serializer: "SerializerProto | None" = None, + destination: str = "", ) -> tuple[bytes, str | None]: return encode_message(msg, serializer) diff --git a/faststream/confluent/publisher/producer.py b/faststream/confluent/publisher/producer.py index 87748ae1ae..bcc8055d96 100644 --- a/faststream/confluent/publisher/producer.py +++ b/faststream/confluent/publisher/producer.py @@ -139,7 +139,7 @@ async def publish( cmd: "KafkaPublishCommand", ) -> "asyncio.Future[Message | None] | Message | None": """Publish a message to a topic.""" - message, content_type = await self.codec.encode(cmd.body, self.serializer) + message, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) headers_to_send = { "content-type": content_type or "", @@ -165,11 +165,11 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: if isinstance(self.codec, BatchCodecProto): encoded_batch = await self.codec.encode_batch( - cmd.batch_bodies, self.serializer + cmd.batch_bodies, self.serializer, destination=cmd.destination ) else: encoded_batch = [ - await self.codec.encode(msg, self.serializer) for msg in cmd.batch_bodies + await self.codec.encode(msg, self.serializer, destination=cmd.destination) for msg in cmd.batch_bodies ] for message_position, (message, content_type) in enumerate(encoded_batch): diff --git a/faststream/confluent/testing.py b/faststream/confluent/testing.py index 1e5853b1f9..4bf3eeed44 100644 --- a/faststream/confluent/testing.py +++ b/faststream/confluent/testing.py @@ -161,10 +161,10 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: serializer = self.broker.config.fd_config._serializer if isinstance(self.codec, BatchCodecProto): - encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer) + encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer, destination=cmd.destination) else: encoded = [ - await 
self.codec.encode(body, serializer) for body in cmd.batch_bodies + await self.codec.encode(body, serializer, destination=cmd.destination) for body in cmd.batch_bodies ] for handler in _find_handler( diff --git a/faststream/kafka/publisher/producer.py b/faststream/kafka/publisher/producer.py index 3a6fd5c512..9f08009768 100644 --- a/faststream/kafka/publisher/producer.py +++ b/faststream/kafka/publisher/producer.py @@ -110,7 +110,7 @@ async def publish( cmd: "KafkaPublishCommand", ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: """Publish a message to a topic.""" - message, content_type = await self.codec.encode(cmd.body, self.serializer) + message, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) headers_to_send = { "content-type": content_type or "", @@ -142,11 +142,11 @@ async def publish_batch( if isinstance(self.codec, BatchCodecProto): encoded_batch = await self.codec.encode_batch( - cmd.batch_bodies, self.serializer + cmd.batch_bodies, self.serializer, destination=cmd.destination ) else: encoded_batch = [ - await self.codec.encode(body, self.serializer) + await self.codec.encode(body, self.serializer, destination=cmd.destination) for body in cmd.batch_bodies ] diff --git a/faststream/kafka/testing.py b/faststream/kafka/testing.py index cff58d56b0..4591780aa8 100755 --- a/faststream/kafka/testing.py +++ b/faststream/kafka/testing.py @@ -210,10 +210,10 @@ async def publish_batch( serializer = self.broker.config.fd_config._serializer if isinstance(self.codec, BatchCodecProto): - encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer) + encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer, destination=cmd.destination) else: encoded = [ - await self.codec.encode(body, serializer) for body in cmd.batch_bodies + await self.codec.encode(body, serializer, destination=cmd.destination) for body in cmd.batch_bodies ] for handler in _find_handler( diff --git 
a/faststream/mqtt/publisher/producer.py b/faststream/mqtt/publisher/producer.py index 6cb4dc83ef..f278beb481 100644 --- a/faststream/mqtt/publisher/producer.py +++ b/faststream/mqtt/publisher/producer.py @@ -92,7 +92,7 @@ async def publish(self, cmd: "MQTTPublishCommand") -> None: if cmd.headers: msg = "MQTT 3.1.1 does not support message headers. Use MQTT 5.0." raise FeatureNotSupportedException(msg) - payload, _ = await self.codec.encode(cmd.body, self.serializer) + payload, _ = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) await self._connected_client.publish( cmd.destination, payload, @@ -117,7 +117,7 @@ async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message": await sub.start() try: - payload, _ = await self.codec.encode(cmd.body, self.serializer) + payload, _ = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) await self._connected_client.publish( cmd.destination, payload, @@ -144,7 +144,7 @@ def __init__( @override async def publish(self, cmd: "MQTTPublishCommand") -> None: - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) user_props: list[tuple[str, str]] = [ (k, str(v)) for k, v in (cmd.headers or {}).items() @@ -174,7 +174,7 @@ async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message": ID explicitly so the responder echoes it back and the caller can verify it on the response StreamMessage. 
""" - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) correlation_id = cmd.correlation_id or gen_cor_id() user_props: list[tuple[str, str]] = [ diff --git a/faststream/mqtt/testing.py b/faststream/mqtt/testing.py index a73768f871..21ac23e31d 100644 --- a/faststream/mqtt/testing.py +++ b/faststream/mqtt/testing.py @@ -253,7 +253,7 @@ async def build_message( """ if codec is None: codec = DefaultCodec() - payload, content_type = await codec.encode(message, serializer=serializer) + payload, content_type = await codec.encode(message, serializer=serializer, destination=topic) if version == "3.1.1": return zmqtt.Message( diff --git a/faststream/nats/publisher/producer.py b/faststream/nats/publisher/producer.py index 8095efd391..ec816990b1 100644 --- a/faststream/nats/publisher/producer.py +++ b/faststream/nats/publisher/producer.py @@ -90,7 +90,7 @@ def disconnect(self) -> None: @override async def publish(self, cmd: "NatsPublishCommand") -> None: - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) headers_to_send = { "content-type": content_type or "", @@ -106,7 +106,7 @@ async def publish(self, cmd: "NatsPublishCommand") -> None: @override async def request(self, cmd: "NatsPublishCommand") -> "Msg": - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) headers_to_send = { "content-type": content_type or "", @@ -157,7 +157,7 @@ def disconnect(self) -> None: @override async def publish(self, cmd: "NatsPublishCommand") -> "PubAck": - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd.body, self.serializer, 
destination=cmd.destination) headers_to_send = { "content-type": content_type or "", @@ -174,7 +174,7 @@ async def publish(self, cmd: "NatsPublishCommand") -> "PubAck": @override async def request(self, cmd: "NatsPublishCommand") -> "Msg": - payload, content_type = await self.codec.encode(cmd.body, self.serializer) + payload, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) reply_to = self.__state.connection._nc.new_inbox() future: asyncio.Future[Msg] = asyncio.Future() diff --git a/faststream/nats/testing.py b/faststream/nats/testing.py index d46c91b157..bbc39aeb27 100644 --- a/faststream/nats/testing.py +++ b/faststream/nats/testing.py @@ -241,7 +241,7 @@ async def build_message( ) -> "PatchedMessage": if codec is None: codec = DefaultCodec() - msg, content_type = await codec.encode(message, serializer=serializer) + msg, content_type = await codec.encode(message, serializer=serializer, destination=subject) return PatchedMessage( _client=None, # type: ignore[arg-type] subject=subject, diff --git a/tests/brokers/base/codec.py b/tests/brokers/base/codec.py index 52ebfa692b..5c8d3a45e0 100644 --- a/tests/brokers/base/codec.py +++ b/tests/brokers/base/codec.py @@ -103,9 +103,9 @@ async def test_codec_encode_called(self, queue: str) -> None: mock = MagicMock() class TrackingCodec(DefaultCodec): - async def encode(self, msg, serializer=None): + async def encode(self, msg, serializer=None, destination=""): mock() - return await super().encode(msg, serializer) + return await super().encode(msg, serializer, destination=destination) broker = self.get_broker(codec=TrackingCodec()) @@ -131,7 +131,7 @@ async def test_default_codec_encode_matches_encode_message(self, queue: str) -> ] for msg in test_cases: - codec_result = await codec.encode(msg, None) + codec_result = await codec.encode(msg, None, destination="test") direct_result = encode_message(msg, None) assert codec_result == direct_result, ( f"DefaultCodec.encode({msg!r}) = 
{codec_result!r} " @@ -153,8 +153,9 @@ async def encode_batch( self, msgs: Sequence[Any], serializer: Any = None, + destination: str = "", ) -> list[tuple[bytes, str | None]]: - return [await DefaultCodec.encode(self, m, serializer) for m in msgs] + return [await DefaultCodec.encode(self, m, serializer, destination=destination) for m in msgs] async def decode_batch(self, msg: Any) -> list[Any]: decode_batch_mock() @@ -184,9 +185,10 @@ async def encode_batch( self, msgs: Sequence[Any], serializer: Any = None, + destination: str = "", ) -> list[tuple[bytes, str | None]]: encode_batch_mock() - return [await DefaultCodec.encode(self, m, serializer) for m in msgs] + return [await DefaultCodec.encode(self, m, serializer, destination=destination) for m in msgs] async def decode_batch(self, msg: Any) -> list[Any]: return [b.decode() if isinstance(b, bytes) else b for b in msg.body] From a4a0bbc33ee4a22f3e298b7232d0bbb01a6b76ef Mon Sep 17 00:00:00 2001 From: "daniel.sasu" Date: Wed, 3 Jun 2026 12:17:05 +0200 Subject: [PATCH 2/4] docs(codec): add gzip codec examples and documentation fix linter --- .../en/getting-started/serialization/codec.md | 88 +++++++++++++++++++ .../getting-started/serialization/decoder.md | 5 ++ docs/docs/navigation_template.txt | 1 + .../serialization/codec_gzip_confluent.py | 41 +++++++++ .../serialization/codec_gzip_kafka.py | 41 +++++++++ .../serialization/codec_gzip_mqtt.py | 41 +++++++++ .../serialization/codec_gzip_nats.py | 41 +++++++++ .../serialization/codec_gzip_rabbit.py | 41 +++++++++ .../serialization/codec_gzip_redis.py | 41 +++++++++ faststream/confluent/publisher/producer.py | 7 +- faststream/confluent/testing.py | 7 +- faststream/kafka/publisher/producer.py | 8 +- faststream/kafka/testing.py | 7 +- faststream/mqtt/publisher/producer.py | 16 +++- faststream/mqtt/testing.py | 4 +- faststream/nats/publisher/producer.py | 16 +++- faststream/nats/testing.py | 4 +- tests/brokers/base/codec.py | 14 ++- 18 files changed, 403 insertions(+), 
20 deletions(-) create mode 100644 docs/docs/en/getting-started/serialization/codec.md create mode 100644 docs/docs_src/getting_started/serialization/codec_gzip_confluent.py create mode 100644 docs/docs_src/getting_started/serialization/codec_gzip_kafka.py create mode 100644 docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py create mode 100644 docs/docs_src/getting_started/serialization/codec_gzip_nats.py create mode 100644 docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py create mode 100644 docs/docs_src/getting_started/serialization/codec_gzip_redis.py diff --git a/docs/docs/en/getting-started/serialization/codec.md b/docs/docs/en/getting-started/serialization/codec.md new file mode 100644 index 0000000000..3348dbc41b --- /dev/null +++ b/docs/docs/en/getting-started/serialization/codec.md @@ -0,0 +1,88 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 10 +--- + +# Custom Codec + +A codec provides a unified interface for both encoding (publishing) and decoding (consuming) messages. Unlike the older `decoder=` approach, a codec handles both directions in a single class. + +## Protocol + +Implement the `CodecProto` interface to create a custom codec: + +```python +class CodecProto(Protocol): + async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": ... + async def encode( + self, + msg: "SendableMessage", + serializer: "SerializerProto | None" = None, + destination: str = "", + ) -> tuple[bytes, str | None]: ... +``` + +- **`decode`** — receives a `StreamMessage` with raw bytes in `msg.body` and returns the decoded Python value. You can mutate `msg.body` before delegating to `decode_message`. +- **`encode`** — receives the outgoing message, an optional serializer, and the destination topic/subject/queue. Returns a `(bytes, content_type)` tuple. +- **`destination`** — the target topic, subject, or queue name. 
Useful for codecs that need destination-specific behavior (e.g. Schema Registry topic-to-schema resolution). + +If no codec is set, `DefaultCodec` is used automatically. It handles JSON objects, plain text, and raw bytes without any configuration. + +## Compression Example + +A Gzip codec that compresses outgoing messages and decompresses incoming ones: + +=== "AIOKafka" + ```python linenums="1" hl_lines="15-27 30" + {!> docs_src/getting_started/serialization/codec_gzip_kafka.py !} + ``` + +=== "Confluent" + ```python linenums="1" hl_lines="15-27 30" + {!> docs_src/getting_started/serialization/codec_gzip_confluent.py !} + ``` + +=== "RabbitMQ" + ```python linenums="1" hl_lines="15-27 30" + {!> docs_src/getting_started/serialization/codec_gzip_rabbit.py !} + ``` + +=== "NATS" + ```python linenums="1" hl_lines="15-27 30" + {!> docs_src/getting_started/serialization/codec_gzip_nats.py !} + ``` + +=== "Redis" + ```python linenums="1" hl_lines="15-27 30" + {!> docs_src/getting_started/serialization/codec_gzip_redis.py !} + ``` + +=== "MQTT" + ```python linenums="1" hl_lines="15-27 30" + {!> docs_src/getting_started/serialization/codec_gzip_mqtt.py !} + ``` + +## Priority + +You can set a codec at the broker level or override it per subscriber. The subscriber-level codec always wins: + +```python +broker = KafkaBroker(codec=BrokerCodec()) + +@broker.subscriber("test", codec=SubscriberCodec()) # ← this wins +async def handle(body: str) -> None: + ... + +# If no codec is set at any level, DefaultCodec is used (JSON/text/bytes) +``` + +## Compatibility + +- **`codec=` and `parser=`** work together. The parser controls how the raw broker message is parsed into a `StreamMessage`; the codec then decodes or encodes the body. +- **`codec=` and `decoder=`** cannot be used together. Specifying both raises a `ValueError`. +- For the legacy `decoder=` approach, see [Custom Decoder](./decoder.md){.internal-link}. 
diff --git a/docs/docs/en/getting-started/serialization/decoder.md b/docs/docs/en/getting-started/serialization/decoder.md index 36a97b7989..47732fe029 100644 --- a/docs/docs/en/getting-started/serialization/decoder.md +++ b/docs/docs/en/getting-started/serialization/decoder.md @@ -8,6 +8,11 @@ search: boost: 10 --- +!!! warning "Superseded by Codec" + The `decoder=` parameter has been superseded by the new **Codec** system, which handles both encoding and decoding in a single interface. See [Custom Codec](./codec.md){.internal-link} for the recommended approach. + + Note: `codec=` and `decoder=` cannot be used together — specifying both will raise a `ValueError`. + # Custom Decoder At this stage, the body of a **StreamMessage** is transformed into the format that it will take when it enters your handler function. This stage is the one you will need to redefine more often. diff --git a/docs/docs/navigation_template.txt b/docs/docs/navigation_template.txt index 8ae793fa49..d30018d7e3 100644 --- a/docs/docs/navigation_template.txt +++ b/docs/docs/navigation_template.txt @@ -23,6 +23,7 @@ search: - [Context](getting-started/context.md) - [Custom Serialization](getting-started/serialization/index.md) - [Parser](getting-started/serialization/parser.md) + - [Codec](getting-started/serialization/codec.md) - [Decoder](getting-started/serialization/decoder.md) - [Examples](getting-started/serialization/examples.md) - [Lifespan](getting-started/lifespan/index.md) diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_confluent.py b/docs/docs_src/getting_started/serialization/codec_gzip_confluent.py new file mode 100644 index 0000000000..b233fa70fd --- /dev/null +++ b/docs/docs_src/getting_started/serialization/codec_gzip_confluent.py @@ -0,0 +1,41 @@ +import gzip +from typing import TYPE_CHECKING, Any + +from faststream import FastStream +from faststream.confluent import KafkaBroker +from faststream.message.utils import decode_message, encode_message + +if 
TYPE_CHECKING: + from fast_depends.library.serializer import SerializerProto + + from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream.message import StreamMessage + + +class GzipCodec: + async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": + msg.body = gzip.decompress(msg.body) + return decode_message(msg) + + async def encode( + self, + msg: "SendableMessage", + serializer: "SerializerProto | None" = None, + destination: str = "", + ) -> tuple[bytes, str | None]: + raw_bytes, _ = encode_message(msg, serializer) + return gzip.compress(raw_bytes), "application/gzip" + + +broker = KafkaBroker(codec=GzipCodec()) +app = FastStream(broker) + + +@broker.subscriber("test") +async def handle(body: str) -> None: + ... + + +@app.after_startup +async def test() -> None: + await broker.publish("hello", "test") diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_kafka.py b/docs/docs_src/getting_started/serialization/codec_gzip_kafka.py new file mode 100644 index 0000000000..9708d5aed3 --- /dev/null +++ b/docs/docs_src/getting_started/serialization/codec_gzip_kafka.py @@ -0,0 +1,41 @@ +import gzip +from typing import TYPE_CHECKING, Any + +from faststream import FastStream +from faststream.kafka import KafkaBroker +from faststream.message.utils import decode_message, encode_message + +if TYPE_CHECKING: + from fast_depends.library.serializer import SerializerProto + + from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream.message import StreamMessage + + +class GzipCodec: + async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": + msg.body = gzip.decompress(msg.body) + return decode_message(msg) + + async def encode( + self, + msg: "SendableMessage", + serializer: "SerializerProto | None" = None, + destination: str = "", + ) -> tuple[bytes, str | None]: + raw_bytes, _ = encode_message(msg, serializer) + return gzip.compress(raw_bytes), 
"application/gzip" + + +broker = KafkaBroker(codec=GzipCodec()) +app = FastStream(broker) + + +@broker.subscriber("test") +async def handle(body: str) -> None: + ... + + +@app.after_startup +async def test() -> None: + await broker.publish("hello", "test") diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py b/docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py new file mode 100644 index 0000000000..b1b2f58de6 --- /dev/null +++ b/docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py @@ -0,0 +1,41 @@ +import gzip +from typing import TYPE_CHECKING, Any + +from faststream import FastStream +from faststream.mqtt import MQTTBroker +from faststream.message.utils import decode_message, encode_message + +if TYPE_CHECKING: + from fast_depends.library.serializer import SerializerProto + + from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream.message import StreamMessage + + +class GzipCodec: + async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": + msg.body = gzip.decompress(msg.body) + return decode_message(msg) + + async def encode( + self, + msg: "SendableMessage", + serializer: "SerializerProto | None" = None, + destination: str = "", + ) -> tuple[bytes, str | None]: + raw_bytes, _ = encode_message(msg, serializer) + return gzip.compress(raw_bytes), "application/gzip" + + +broker = MQTTBroker(codec=GzipCodec()) +app = FastStream(broker) + + +@broker.subscriber("test") +async def handle(body: str) -> None: + ... 
+ + +@app.after_startup +async def test() -> None: + await broker.publish("hello", "test") diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_nats.py b/docs/docs_src/getting_started/serialization/codec_gzip_nats.py new file mode 100644 index 0000000000..9082a493db --- /dev/null +++ b/docs/docs_src/getting_started/serialization/codec_gzip_nats.py @@ -0,0 +1,41 @@ +import gzip +from typing import TYPE_CHECKING, Any + +from faststream import FastStream +from faststream.nats import NatsBroker +from faststream.message.utils import decode_message, encode_message + +if TYPE_CHECKING: + from fast_depends.library.serializer import SerializerProto + + from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream.message import StreamMessage + + +class GzipCodec: + async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": + msg.body = gzip.decompress(msg.body) + return decode_message(msg) + + async def encode( + self, + msg: "SendableMessage", + serializer: "SerializerProto | None" = None, + destination: str = "", + ) -> tuple[bytes, str | None]: + raw_bytes, _ = encode_message(msg, serializer) + return gzip.compress(raw_bytes), "application/gzip" + + +broker = NatsBroker(codec=GzipCodec()) +app = FastStream(broker) + + +@broker.subscriber("test") +async def handle(body: str) -> None: + ... 
+ + +@app.after_startup +async def test() -> None: + await broker.publish("hello", "test") diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py b/docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py new file mode 100644 index 0000000000..e5888bd0c0 --- /dev/null +++ b/docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py @@ -0,0 +1,41 @@ +import gzip +from typing import TYPE_CHECKING, Any + +from faststream import FastStream +from faststream.rabbit import RabbitBroker +from faststream.message.utils import decode_message, encode_message + +if TYPE_CHECKING: + from fast_depends.library.serializer import SerializerProto + + from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream.message import StreamMessage + + +class GzipCodec: + async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": + msg.body = gzip.decompress(msg.body) + return decode_message(msg) + + async def encode( + self, + msg: "SendableMessage", + serializer: "SerializerProto | None" = None, + destination: str = "", + ) -> tuple[bytes, str | None]: + raw_bytes, _ = encode_message(msg, serializer) + return gzip.compress(raw_bytes), "application/gzip" + + +broker = RabbitBroker(codec=GzipCodec()) +app = FastStream(broker) + + +@broker.subscriber("test") +async def handle(body: str) -> None: + ... 
+ + +@app.after_startup +async def test() -> None: + await broker.publish("hello", "test") diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_redis.py b/docs/docs_src/getting_started/serialization/codec_gzip_redis.py new file mode 100644 index 0000000000..ce04441eae --- /dev/null +++ b/docs/docs_src/getting_started/serialization/codec_gzip_redis.py @@ -0,0 +1,41 @@ +import gzip +from typing import TYPE_CHECKING, Any + +from faststream import FastStream +from faststream.redis import RedisBroker +from faststream.message.utils import decode_message, encode_message + +if TYPE_CHECKING: + from fast_depends.library.serializer import SerializerProto + + from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream.message import StreamMessage + + +class GzipCodec: + async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": + msg.body = gzip.decompress(msg.body) + return decode_message(msg) + + async def encode( + self, + msg: "SendableMessage", + serializer: "SerializerProto | None" = None, + destination: str = "", + ) -> tuple[bytes, str | None]: + raw_bytes, _ = encode_message(msg, serializer) + return gzip.compress(raw_bytes), "application/gzip" + + +broker = RedisBroker(codec=GzipCodec()) +app = FastStream(broker) + + +@broker.subscriber("test") +async def handle(body: str) -> None: + ... 
+ + +@app.after_startup +async def test() -> None: + await broker.publish("hello", "test") diff --git a/faststream/confluent/publisher/producer.py b/faststream/confluent/publisher/producer.py index bcc8055d96..a2e2bd7a01 100644 --- a/faststream/confluent/publisher/producer.py +++ b/faststream/confluent/publisher/producer.py @@ -139,7 +139,9 @@ async def publish( cmd: "KafkaPublishCommand", ) -> "asyncio.Future[Message | None] | Message | None": """Publish a message to a topic.""" - message, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) + message, content_type = await self.codec.encode( + cmd.body, self.serializer, destination=cmd.destination + ) headers_to_send = { "content-type": content_type or "", @@ -169,7 +171,8 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: ) else: encoded_batch = [ - await self.codec.encode(msg, self.serializer, destination=cmd.destination) for msg in cmd.batch_bodies + await self.codec.encode(msg, self.serializer, destination=cmd.destination) + for msg in cmd.batch_bodies ] for message_position, (message, content_type) in enumerate(encoded_batch): diff --git a/faststream/confluent/testing.py b/faststream/confluent/testing.py index 4bf3eeed44..0ac6d25a54 100644 --- a/faststream/confluent/testing.py +++ b/faststream/confluent/testing.py @@ -161,10 +161,13 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: serializer = self.broker.config.fd_config._serializer if isinstance(self.codec, BatchCodecProto): - encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer, destination=cmd.destination) + encoded = await self.codec.encode_batch( + cmd.batch_bodies, serializer, destination=cmd.destination + ) else: encoded = [ - await self.codec.encode(body, serializer, destination=cmd.destination) for body in cmd.batch_bodies + await self.codec.encode(body, serializer, destination=cmd.destination) + for body in cmd.batch_bodies ] for handler in 
_find_handler( diff --git a/faststream/kafka/publisher/producer.py b/faststream/kafka/publisher/producer.py index 9f08009768..00df984cce 100644 --- a/faststream/kafka/publisher/producer.py +++ b/faststream/kafka/publisher/producer.py @@ -110,7 +110,9 @@ async def publish( cmd: "KafkaPublishCommand", ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: """Publish a message to a topic.""" - message, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) + message, content_type = await self.codec.encode( + cmd.body, self.serializer, destination=cmd.destination + ) headers_to_send = { "content-type": content_type or "", @@ -146,7 +148,9 @@ async def publish_batch( ) else: encoded_batch = [ - await self.codec.encode(body, self.serializer, destination=cmd.destination) + await self.codec.encode( + body, self.serializer, destination=cmd.destination + ) for body in cmd.batch_bodies ] diff --git a/faststream/kafka/testing.py b/faststream/kafka/testing.py index 4591780aa8..cf113ad07d 100755 --- a/faststream/kafka/testing.py +++ b/faststream/kafka/testing.py @@ -210,10 +210,13 @@ async def publish_batch( serializer = self.broker.config.fd_config._serializer if isinstance(self.codec, BatchCodecProto): - encoded = await self.codec.encode_batch(cmd.batch_bodies, serializer, destination=cmd.destination) + encoded = await self.codec.encode_batch( + cmd.batch_bodies, serializer, destination=cmd.destination + ) else: encoded = [ - await self.codec.encode(body, serializer, destination=cmd.destination) for body in cmd.batch_bodies + await self.codec.encode(body, serializer, destination=cmd.destination) + for body in cmd.batch_bodies ] for handler in _find_handler( diff --git a/faststream/mqtt/publisher/producer.py b/faststream/mqtt/publisher/producer.py index f278beb481..2b4b0483e8 100644 --- a/faststream/mqtt/publisher/producer.py +++ b/faststream/mqtt/publisher/producer.py @@ -92,7 +92,9 @@ async def publish(self, cmd: 
"MQTTPublishCommand") -> None: if cmd.headers: msg = "MQTT 3.1.1 does not support message headers. Use MQTT 5.0." raise FeatureNotSupportedException(msg) - payload, _ = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) + payload, _ = await self.codec.encode( + cmd.body, self.serializer, destination=cmd.destination + ) await self._connected_client.publish( cmd.destination, payload, @@ -117,7 +119,9 @@ async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message": await sub.start() try: - payload, _ = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) + payload, _ = await self.codec.encode( + cmd.body, self.serializer, destination=cmd.destination + ) await self._connected_client.publish( cmd.destination, payload, @@ -144,7 +148,9 @@ def __init__( @override async def publish(self, cmd: "MQTTPublishCommand") -> None: - payload, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) + payload, content_type = await self.codec.encode( + cmd.body, self.serializer, destination=cmd.destination + ) user_props: list[tuple[str, str]] = [ (k, str(v)) for k, v in (cmd.headers or {}).items() @@ -174,7 +180,9 @@ async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message": ID explicitly so the responder echoes it back and the caller can verify it on the response StreamMessage. 
""" - payload, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) + payload, content_type = await self.codec.encode( + cmd.body, self.serializer, destination=cmd.destination + ) correlation_id = cmd.correlation_id or gen_cor_id() user_props: list[tuple[str, str]] = [ diff --git a/faststream/mqtt/testing.py b/faststream/mqtt/testing.py index 21ac23e31d..adf4926b8f 100644 --- a/faststream/mqtt/testing.py +++ b/faststream/mqtt/testing.py @@ -253,7 +253,9 @@ async def build_message( """ if codec is None: codec = DefaultCodec() - payload, content_type = await codec.encode(message, serializer=serializer, destination=topic) + payload, content_type = await codec.encode( + message, serializer=serializer, destination=topic + ) if version == "3.1.1": return zmqtt.Message( diff --git a/faststream/nats/publisher/producer.py b/faststream/nats/publisher/producer.py index ec816990b1..c8303280e9 100644 --- a/faststream/nats/publisher/producer.py +++ b/faststream/nats/publisher/producer.py @@ -90,7 +90,9 @@ def disconnect(self) -> None: @override async def publish(self, cmd: "NatsPublishCommand") -> None: - payload, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) + payload, content_type = await self.codec.encode( + cmd.body, self.serializer, destination=cmd.destination + ) headers_to_send = { "content-type": content_type or "", @@ -106,7 +108,9 @@ async def publish(self, cmd: "NatsPublishCommand") -> None: @override async def request(self, cmd: "NatsPublishCommand") -> "Msg": - payload, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) + payload, content_type = await self.codec.encode( + cmd.body, self.serializer, destination=cmd.destination + ) headers_to_send = { "content-type": content_type or "", @@ -157,7 +161,9 @@ def disconnect(self) -> None: @override async def publish(self, cmd: "NatsPublishCommand") -> "PubAck": - payload, content_type = 
await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) + payload, content_type = await self.codec.encode( + cmd.body, self.serializer, destination=cmd.destination + ) headers_to_send = { "content-type": content_type or "", @@ -174,7 +180,9 @@ async def publish(self, cmd: "NatsPublishCommand") -> "PubAck": @override async def request(self, cmd: "NatsPublishCommand") -> "Msg": - payload, content_type = await self.codec.encode(cmd.body, self.serializer, destination=cmd.destination) + payload, content_type = await self.codec.encode( + cmd.body, self.serializer, destination=cmd.destination + ) reply_to = self.__state.connection._nc.new_inbox() future: asyncio.Future[Msg] = asyncio.Future() diff --git a/faststream/nats/testing.py b/faststream/nats/testing.py index bbc39aeb27..e03195fc80 100644 --- a/faststream/nats/testing.py +++ b/faststream/nats/testing.py @@ -241,7 +241,9 @@ async def build_message( ) -> "PatchedMessage": if codec is None: codec = DefaultCodec() - msg, content_type = await codec.encode(message, serializer=serializer, destination=subject) + msg, content_type = await codec.encode( + message, serializer=serializer, destination=subject + ) return PatchedMessage( _client=None, # type: ignore[arg-type] subject=subject, diff --git a/tests/brokers/base/codec.py b/tests/brokers/base/codec.py index 5c8d3a45e0..3066528988 100644 --- a/tests/brokers/base/codec.py +++ b/tests/brokers/base/codec.py @@ -155,7 +155,12 @@ async def encode_batch( serializer: Any = None, destination: str = "", ) -> list[tuple[bytes, str | None]]: - return [await DefaultCodec.encode(self, m, serializer, destination=destination) for m in msgs] + return [ + await DefaultCodec.encode( + self, m, serializer, destination=destination + ) + for m in msgs + ] async def decode_batch(self, msg: Any) -> list[Any]: decode_batch_mock() @@ -188,7 +193,12 @@ async def encode_batch( destination: str = "", ) -> list[tuple[bytes, str | None]]: encode_batch_mock() - return [await 
DefaultCodec.encode(self, m, serializer, destination=destination) for m in msgs] + return [ + await DefaultCodec.encode( + self, m, serializer, destination=destination + ) + for m in msgs + ] async def decode_batch(self, msg: Any) -> list[Any]: return [b.decode() if isinstance(b, bytes) else b for b in msg.body] From 3e0d1d16f39f92b32e1429e3cd205afaed9fdb51 Mon Sep 17 00:00:00 2001 From: "daniel.sasu" Date: Thu, 4 Jun 2026 16:55:16 +0200 Subject: [PATCH 3/4] feat(codec): change encode signature to accept PublishCommand --- .../en/getting-started/serialization/codec.md | 7 ++-- .../serialization/codec_gzip_confluent.py | 8 ++-- .../serialization/codec_gzip_kafka.py | 8 ++-- .../serialization/codec_gzip_mqtt.py | 8 ++-- .../serialization/codec_gzip_nats.py | 8 ++-- .../serialization/codec_gzip_rabbit.py | 8 ++-- .../serialization/codec_gzip_redis.py | 8 ++-- faststream/_internal/parser.py | 14 +++---- faststream/confluent/publisher/producer.py | 15 +++---- faststream/confluent/testing.py | 20 ++++++--- faststream/kafka/publisher/producer.py | 13 +++--- faststream/kafka/testing.py | 20 ++++++--- faststream/mqtt/publisher/producer.py | 16 ++------ faststream/mqtt/testing.py | 9 +++- faststream/nats/publisher/producer.py | 16 ++------ faststream/nats/testing.py | 9 +++- faststream/rabbit/parser.py | 7 +++- faststream/rabbit/publisher/producer.py | 1 + faststream/rabbit/testing.py | 1 + faststream/redis/parser/binary.py | 2 + faststream/redis/parser/message.py | 7 +++- faststream/redis/publisher/producer.py | 5 +++ faststream/redis/testing.py | 4 ++ tests/brokers/base/codec.py | 41 ++++++++++++++----- 24 files changed, 154 insertions(+), 101 deletions(-) diff --git a/docs/docs/en/getting-started/serialization/codec.md b/docs/docs/en/getting-started/serialization/codec.md index 3348dbc41b..3e61071107 100644 --- a/docs/docs/en/getting-started/serialization/codec.md +++ b/docs/docs/en/getting-started/serialization/codec.md @@ -21,15 +21,14 @@ class CodecProto(Protocol): 
async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": ... async def encode( self, - msg: "SendableMessage", + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, - destination: str = "", ) -> tuple[bytes, str | None]: ... ``` - **`decode`** — receives a `StreamMessage` with raw bytes in `msg.body` and returns the decoded Python value. You can mutate `msg.body` before delegating to `decode_message`. -- **`encode`** — receives the outgoing message, an optional serializer, and the destination topic/subject/queue. Returns a `(bytes, content_type)` tuple. -- **`destination`** — the target topic, subject, or queue name. Useful for codecs that need destination-specific behavior (e.g. Schema Registry topic-to-schema resolution). +- **`encode`** — receives a `PublishCommand` with the outgoing message body and metadata, and an optional serializer. Returns a `(bytes, content_type)` tuple. +- **`cmd.destination`** — the target topic, subject, or queue name. Useful for codecs that need destination-specific behavior (e.g. Schema Registry topic-to-schema resolution). If no codec is set, `DefaultCodec` is used automatically. It handles JSON objects, plain text, and raw bytes without any configuration. 
diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_confluent.py b/docs/docs_src/getting_started/serialization/codec_gzip_confluent.py index b233fa70fd..40fb37dbb0 100644 --- a/docs/docs_src/getting_started/serialization/codec_gzip_confluent.py +++ b/docs/docs_src/getting_started/serialization/codec_gzip_confluent.py @@ -8,8 +8,9 @@ if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto - from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream._internal.basic_types import DecodedMessage from faststream.message import StreamMessage + from faststream.response.response import PublishCommand class GzipCodec: @@ -19,11 +20,10 @@ async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": async def encode( self, - msg: "SendableMessage", + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, - destination: str = "", ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(msg, serializer) + raw_bytes, _ = encode_message(cmd.body, serializer) return gzip.compress(raw_bytes), "application/gzip" diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_kafka.py b/docs/docs_src/getting_started/serialization/codec_gzip_kafka.py index 9708d5aed3..425dccd820 100644 --- a/docs/docs_src/getting_started/serialization/codec_gzip_kafka.py +++ b/docs/docs_src/getting_started/serialization/codec_gzip_kafka.py @@ -8,8 +8,9 @@ if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto - from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream._internal.basic_types import DecodedMessage from faststream.message import StreamMessage + from faststream.response.response import PublishCommand class GzipCodec: @@ -19,11 +20,10 @@ async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": async def encode( self, - msg: "SendableMessage", + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, - 
destination: str = "", ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(msg, serializer) + raw_bytes, _ = encode_message(cmd.body, serializer) return gzip.compress(raw_bytes), "application/gzip" diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py b/docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py index b1b2f58de6..a1b81880e1 100644 --- a/docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py +++ b/docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py @@ -8,8 +8,9 @@ if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto - from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream._internal.basic_types import DecodedMessage from faststream.message import StreamMessage + from faststream.response.response import PublishCommand class GzipCodec: @@ -19,11 +20,10 @@ async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": async def encode( self, - msg: "SendableMessage", + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, - destination: str = "", ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(msg, serializer) + raw_bytes, _ = encode_message(cmd.body, serializer) return gzip.compress(raw_bytes), "application/gzip" diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_nats.py b/docs/docs_src/getting_started/serialization/codec_gzip_nats.py index 9082a493db..3757c5fded 100644 --- a/docs/docs_src/getting_started/serialization/codec_gzip_nats.py +++ b/docs/docs_src/getting_started/serialization/codec_gzip_nats.py @@ -8,8 +8,9 @@ if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto - from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream._internal.basic_types import DecodedMessage from faststream.message import StreamMessage + from faststream.response.response import PublishCommand class GzipCodec: @@ -19,11 +20,10 @@ async 
def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": async def encode( self, - msg: "SendableMessage", + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, - destination: str = "", ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(msg, serializer) + raw_bytes, _ = encode_message(cmd.body, serializer) return gzip.compress(raw_bytes), "application/gzip" diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py b/docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py index e5888bd0c0..38861f75cb 100644 --- a/docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py +++ b/docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py @@ -8,8 +8,9 @@ if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto - from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream._internal.basic_types import DecodedMessage from faststream.message import StreamMessage + from faststream.response.response import PublishCommand class GzipCodec: @@ -19,11 +20,10 @@ async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": async def encode( self, - msg: "SendableMessage", + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, - destination: str = "", ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(msg, serializer) + raw_bytes, _ = encode_message(cmd.body, serializer) return gzip.compress(raw_bytes), "application/gzip" diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_redis.py b/docs/docs_src/getting_started/serialization/codec_gzip_redis.py index ce04441eae..02136652ce 100644 --- a/docs/docs_src/getting_started/serialization/codec_gzip_redis.py +++ b/docs/docs_src/getting_started/serialization/codec_gzip_redis.py @@ -8,8 +8,9 @@ if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto - from faststream._internal.basic_types import DecodedMessage, SendableMessage + from 
faststream._internal.basic_types import DecodedMessage from faststream.message import StreamMessage + from faststream.response.response import PublishCommand class GzipCodec: @@ -19,11 +20,10 @@ async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": async def encode( self, - msg: "SendableMessage", + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, - destination: str = "", ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(msg, serializer) + raw_bytes, _ = encode_message(cmd.body, serializer) return gzip.compress(raw_bytes), "application/gzip" diff --git a/faststream/_internal/parser.py b/faststream/_internal/parser.py index 60ff281950..a70772f6ad 100644 --- a/faststream/_internal/parser.py +++ b/faststream/_internal/parser.py @@ -7,8 +7,9 @@ if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto - from faststream._internal.basic_types import DecodedMessage, SendableMessage + from faststream._internal.basic_types import DecodedMessage from faststream.message import StreamMessage + from faststream.response.response import PublishCommand MsgType = TypeVar("MsgType") @@ -64,9 +65,8 @@ async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": @abstractmethod async def encode( self, - msg: "SendableMessage", + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, - destination: str = "", ) -> tuple[bytes, str | None]: ... @@ -75,9 +75,8 @@ class BatchCodecProto(Protocol): @abstractmethod async def encode_batch( self, - msgs: Sequence["SendableMessage"], + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, - destination: str = "", ) -> list[tuple[bytes, str | None]]: ... 
@abstractmethod @@ -93,8 +92,7 @@ async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": async def encode( self, - msg: "SendableMessage", + cmd: "PublishCommand", serializer: "SerializerProto | None" = None, - destination: str = "", ) -> tuple[bytes, str | None]: - return encode_message(msg, serializer) + return encode_message(cmd.body, serializer) diff --git a/faststream/confluent/publisher/producer.py b/faststream/confluent/publisher/producer.py index a2e2bd7a01..fee74a9428 100644 --- a/faststream/confluent/publisher/producer.py +++ b/faststream/confluent/publisher/producer.py @@ -139,9 +139,7 @@ async def publish( cmd: "KafkaPublishCommand", ) -> "asyncio.Future[Message | None] | Message | None": """Publish a message to a topic.""" - message, content_type = await self.codec.encode( - cmd.body, self.serializer, destination=cmd.destination - ) + message, content_type = await self.codec.encode(cmd, self.serializer) headers_to_send = { "content-type": content_type or "", @@ -166,12 +164,15 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: headers_to_send = cmd.headers_to_publish() if isinstance(self.codec, BatchCodecProto): - encoded_batch = await self.codec.encode_batch( - cmd.batch_bodies, self.serializer, destination=cmd.destination - ) + encoded_batch = await self.codec.encode_batch(cmd, self.serializer) else: + from faststream.response.response import PublishCommand as _BaseCmd + encoded_batch = [ - await self.codec.encode(msg, self.serializer, destination=cmd.destination) + await self.codec.encode( + _BaseCmd(body=msg, destination=cmd.destination, _publish_type=cmd.publish_type), + self.serializer, + ) for msg in cmd.batch_bodies ] diff --git a/faststream/confluent/testing.py b/faststream/confluent/testing.py index 0ac6d25a54..1a2d9948dd 100644 --- a/faststream/confluent/testing.py +++ b/faststream/confluent/testing.py @@ -18,6 +18,7 @@ from faststream.confluent.subscriber.usecase import BatchSubscriber from 
faststream.exceptions import SubscriberNotFound from faststream.message import gen_cor_id +from faststream.response.response import PublishCommand as _BasePublishCommand if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto @@ -161,12 +162,17 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: serializer = self.broker.config.fd_config._serializer if isinstance(self.codec, BatchCodecProto): - encoded = await self.codec.encode_batch( - cmd.batch_bodies, serializer, destination=cmd.destination - ) + encoded = await self.codec.encode_batch(cmd, serializer) else: encoded = [ - await self.codec.encode(body, serializer, destination=cmd.destination) + await self.codec.encode( + _BasePublishCommand( + body=body, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), + serializer, + ) for body in cmd.batch_bodies ] @@ -314,8 +320,12 @@ async def build_message( codec: Optional["CodecProto"] = None, ) -> MockConfluentMessage: """Build a mock confluent_kafka.Message for a sendable message.""" + from faststream.response.publish_type import PublishType + from faststream.response.response import PublishCommand as _BaseCmd + codec_instance = codec or DefaultCodec() - msg, content_type = await codec_instance.encode(message, serializer) + publish_cmd = _BaseCmd(body=message, destination=topic, _publish_type=PublishType.PUBLISH) + msg, content_type = await codec_instance.encode(publish_cmd, serializer) k = key or b"" headers = { "content-type": content_type or "", diff --git a/faststream/kafka/publisher/producer.py b/faststream/kafka/publisher/producer.py index 00df984cce..3ad86e2b22 100644 --- a/faststream/kafka/publisher/producer.py +++ b/faststream/kafka/publisher/producer.py @@ -110,9 +110,7 @@ async def publish( cmd: "KafkaPublishCommand", ) -> Union["asyncio.Future[RecordMetadata]", "RecordMetadata"]: """Publish a message to a topic.""" - message, content_type = await self.codec.encode( - cmd.body, self.serializer, 
destination=cmd.destination - ) + message, content_type = await self.codec.encode(cmd, self.serializer) headers_to_send = { "content-type": content_type or "", @@ -143,13 +141,14 @@ async def publish_batch( headers_to_send = cmd.headers_to_publish() if isinstance(self.codec, BatchCodecProto): - encoded_batch = await self.codec.encode_batch( - cmd.batch_bodies, self.serializer, destination=cmd.destination - ) + encoded_batch = await self.codec.encode_batch(cmd, self.serializer) else: + from faststream.response.response import PublishCommand as _BaseCmd + encoded_batch = [ await self.codec.encode( - body, self.serializer, destination=cmd.destination + _BaseCmd(body=body, destination=cmd.destination, _publish_type=cmd.publish_type), + self.serializer, ) for body in cmd.batch_bodies ] diff --git a/faststream/kafka/testing.py b/faststream/kafka/testing.py index cf113ad07d..c179873690 100755 --- a/faststream/kafka/testing.py +++ b/faststream/kafka/testing.py @@ -21,6 +21,7 @@ from faststream.kafka.publisher.usecase import BatchPublisher from faststream.kafka.subscriber.usecase import BatchSubscriber from faststream.message import gen_cor_id +from faststream.response.response import PublishCommand as _BasePublishCommand if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto @@ -210,12 +211,17 @@ async def publish_batch( serializer = self.broker.config.fd_config._serializer if isinstance(self.codec, BatchCodecProto): - encoded = await self.codec.encode_batch( - cmd.batch_bodies, serializer, destination=cmd.destination - ) + encoded = await self.codec.encode_batch(cmd, serializer) else: encoded = [ - await self.codec.encode(body, serializer, destination=cmd.destination) + await self.codec.encode( + _BasePublishCommand( + body=body, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), + serializer, + ) for body in cmd.batch_bodies ] @@ -278,7 +284,11 @@ async def build_message( codec: Optional["CodecProto"] = None, ) -> 
"ConsumerRecord": """Build a Kafka ConsumerRecord for a sendable message.""" - msg, content_type = await (codec or DefaultCodec()).encode(message, serializer) + from faststream.response.publish_type import PublishType + from faststream.response.response import PublishCommand as _BaseCmd + + publish_cmd = _BaseCmd(body=message, destination=topic, _publish_type=PublishType.PUBLISH) + msg, content_type = await (codec or DefaultCodec()).encode(publish_cmd, serializer) k = key or b"" diff --git a/faststream/mqtt/publisher/producer.py b/faststream/mqtt/publisher/producer.py index 2b4b0483e8..8ad0026808 100644 --- a/faststream/mqtt/publisher/producer.py +++ b/faststream/mqtt/publisher/producer.py @@ -92,9 +92,7 @@ async def publish(self, cmd: "MQTTPublishCommand") -> None: if cmd.headers: msg = "MQTT 3.1.1 does not support message headers. Use MQTT 5.0." raise FeatureNotSupportedException(msg) - payload, _ = await self.codec.encode( - cmd.body, self.serializer, destination=cmd.destination - ) + payload, _ = await self.codec.encode(cmd, self.serializer) await self._connected_client.publish( cmd.destination, payload, @@ -119,9 +117,7 @@ async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message": await sub.start() try: - payload, _ = await self.codec.encode( - cmd.body, self.serializer, destination=cmd.destination - ) + payload, _ = await self.codec.encode(cmd, self.serializer) await self._connected_client.publish( cmd.destination, payload, @@ -148,9 +144,7 @@ def __init__( @override async def publish(self, cmd: "MQTTPublishCommand") -> None: - payload, content_type = await self.codec.encode( - cmd.body, self.serializer, destination=cmd.destination - ) + payload, content_type = await self.codec.encode(cmd, self.serializer) user_props: list[tuple[str, str]] = [ (k, str(v)) for k, v in (cmd.headers or {}).items() @@ -180,9 +174,7 @@ async def request(self, cmd: "MQTTPublishCommand") -> "zmqtt.Message": ID explicitly so the responder echoes it back and the caller can 
verify it on the response StreamMessage. """ - payload, content_type = await self.codec.encode( - cmd.body, self.serializer, destination=cmd.destination - ) + payload, content_type = await self.codec.encode(cmd, self.serializer) correlation_id = cmd.correlation_id or gen_cor_id() user_props: list[tuple[str, str]] = [ diff --git a/faststream/mqtt/testing.py b/faststream/mqtt/testing.py index adf4926b8f..a5fb5b0395 100644 --- a/faststream/mqtt/testing.py +++ b/faststream/mqtt/testing.py @@ -17,6 +17,8 @@ from faststream.mqtt.parser import MQTTParserV5, MQTTParserV311 from faststream.mqtt.publisher.producer import ZmqttBaseProducer from faststream.mqtt.response import MQTTPublishCommand +from faststream.response.publish_type import PublishType +from faststream.response.response import PublishCommand as _BasePublishCommand if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto @@ -253,9 +255,12 @@ async def build_message( """ if codec is None: codec = DefaultCodec() - payload, content_type = await codec.encode( - message, serializer=serializer, destination=topic + publish_cmd = _BasePublishCommand( + body=message, + destination=topic, + _publish_type=PublishType.PUBLISH, ) + payload, content_type = await codec.encode(publish_cmd, serializer=serializer) if version == "3.1.1": return zmqtt.Message( diff --git a/faststream/nats/publisher/producer.py b/faststream/nats/publisher/producer.py index c8303280e9..561baa9f4c 100644 --- a/faststream/nats/publisher/producer.py +++ b/faststream/nats/publisher/producer.py @@ -90,9 +90,7 @@ def disconnect(self) -> None: @override async def publish(self, cmd: "NatsPublishCommand") -> None: - payload, content_type = await self.codec.encode( - cmd.body, self.serializer, destination=cmd.destination - ) + payload, content_type = await self.codec.encode(cmd, self.serializer) headers_to_send = { "content-type": content_type or "", @@ -108,9 +106,7 @@ async def publish(self, cmd: "NatsPublishCommand") -> None: @override 
async def request(self, cmd: "NatsPublishCommand") -> "Msg": - payload, content_type = await self.codec.encode( - cmd.body, self.serializer, destination=cmd.destination - ) + payload, content_type = await self.codec.encode(cmd, self.serializer) headers_to_send = { "content-type": content_type or "", @@ -161,9 +157,7 @@ def disconnect(self) -> None: @override async def publish(self, cmd: "NatsPublishCommand") -> "PubAck": - payload, content_type = await self.codec.encode( - cmd.body, self.serializer, destination=cmd.destination - ) + payload, content_type = await self.codec.encode(cmd, self.serializer) headers_to_send = { "content-type": content_type or "", @@ -180,9 +174,7 @@ async def publish(self, cmd: "NatsPublishCommand") -> "PubAck": @override async def request(self, cmd: "NatsPublishCommand") -> "Msg": - payload, content_type = await self.codec.encode( - cmd.body, self.serializer, destination=cmd.destination - ) + payload, content_type = await self.codec.encode(cmd, self.serializer) reply_to = self.__state.connection._nc.new_inbox() future: asyncio.Future[Msg] = asyncio.Future() diff --git a/faststream/nats/testing.py b/faststream/nats/testing.py index e03195fc80..b65eac4ca0 100644 --- a/faststream/nats/testing.py +++ b/faststream/nats/testing.py @@ -16,6 +16,8 @@ from faststream.nats.parser import NatsParser from faststream.nats.publisher.producer import NatsFastProducer from faststream.nats.schemas.js_stream import is_subject_match_wildcard +from faststream.response.publish_type import PublishType +from faststream.response.response import PublishCommand as _BasePublishCommand if TYPE_CHECKING: from fast_depends.library.serializer import SerializerProto @@ -241,9 +243,12 @@ async def build_message( ) -> "PatchedMessage": if codec is None: codec = DefaultCodec() - msg, content_type = await codec.encode( - message, serializer=serializer, destination=subject + publish_cmd = _BasePublishCommand( + body=message, + destination=subject, + 
_publish_type=PublishType.PUBLISH, ) + msg, content_type = await codec.encode(publish_cmd, serializer=serializer) return PatchedMessage( _client=None, # type: ignore[arg-type] subject=subject, diff --git a/faststream/rabbit/parser.py b/faststream/rabbit/parser.py index 796b6d41f0..226a41f351 100644 --- a/faststream/rabbit/parser.py +++ b/faststream/rabbit/parser.py @@ -60,6 +60,7 @@ async def decode_message( async def encode_message( message: "AioPikaSendableMessage", *, + destination: str = "", persist: bool = False, reply_to: str | None = None, headers: Optional["HeadersType"] = None, @@ -80,8 +81,12 @@ async def encode_message( if isinstance(message, Message): return message + from faststream.response.publish_type import PublishType + from faststream.response.response import PublishCommand as _BaseCmd + + publish_cmd = _BaseCmd(body=message, destination=destination, _publish_type=PublishType.PUBLISH) message_body, generated_content_type = await (codec or DefaultCodec()).encode( - message, serializer + publish_cmd, serializer ) delivery_mode = ( diff --git a/faststream/rabbit/publisher/producer.py b/faststream/rabbit/publisher/producer.py index ddd1caf16a..b1960bd28e 100644 --- a/faststream/rabbit/publisher/producer.py +++ b/faststream/rabbit/publisher/producer.py @@ -195,6 +195,7 @@ async def _publish( ) -> Optional["aiormq.abc.ConfirmationFrameType"]: message = await AioPikaParser.encode_message( message=message, + destination=routing_key, serializer=self.serializer, codec=self.codec, **message_options, diff --git a/faststream/rabbit/testing.py b/faststream/rabbit/testing.py index 0b812df0ef..06b7539d25 100644 --- a/faststream/rabbit/testing.py +++ b/faststream/rabbit/testing.py @@ -152,6 +152,7 @@ async def build_message( correlation_id = correlation_id or gen_cor_id() msg = await AioPikaParser.encode_message( message=message, + destination=routing, persist=persist, reply_to=reply_to, headers=headers, diff --git a/faststream/redis/parser/binary.py 
b/faststream/redis/parser/binary.py index b64f3c396e..50d965e93a 100644 --- a/faststream/redis/parser/binary.py +++ b/faststream/redis/parser/binary.py @@ -31,6 +31,7 @@ async def encode( reply_to: str | None, headers: dict[str, Any] | None, correlation_id: str, + destination: str = "", serializer: Optional["SerializerProto"] = None, codec: Optional["CodecProto"] = None, ) -> bytes: @@ -39,6 +40,7 @@ async def encode( reply_to=reply_to, headers=headers, correlation_id=correlation_id, + destination=destination, serializer=serializer, codec=codec, ) diff --git a/faststream/redis/parser/message.py b/faststream/redis/parser/message.py index b0357f52cd..3496518ab9 100644 --- a/faststream/redis/parser/message.py +++ b/faststream/redis/parser/message.py @@ -35,11 +35,16 @@ async def build( reply_to: str | None, headers: dict[str, Any] | None, correlation_id: str, + destination: str = "", serializer: Optional["SerializerProto"] = None, codec: Optional["CodecProto"] = None, ) -> "MessageFormat": + from faststream.response.publish_type import PublishType + from faststream.response.response import PublishCommand as _BaseCmd + codec_instance = codec or DefaultCodec() - payload, content_type = await codec_instance.encode(message, serializer) # type: ignore[arg-type] + publish_cmd = _BaseCmd(body=message, destination=destination, _publish_type=PublishType.PUBLISH) + payload, content_type = await codec_instance.encode(publish_cmd, serializer) headers_to_send = { "correlation_id": correlation_id, diff --git a/faststream/redis/publisher/producer.py b/faststream/redis/publisher/producer.py index fb8bdc73f5..5a218282de 100644 --- a/faststream/redis/publisher/producer.py +++ b/faststream/redis/publisher/producer.py @@ -63,6 +63,7 @@ async def publish_batch(self, cmd: "RedisPublishCommand") -> int: correlation_id=cmd.correlation_id or "", reply_to=cmd.reply_to, headers=cmd.headers, + destination=cmd.destination, serializer=self.serializer, codec=self.codec, ) @@ -115,6 +116,7 @@ async 
def publish(self, cmd: "RedisPublishCommand") -> int | bytes: reply_to=cmd.reply_to, headers=cmd.headers, correlation_id=cmd.correlation_id or "", + destination=cmd.destination, serializer=self.serializer, codec=self.codec, ) @@ -161,6 +163,7 @@ async def request(self, cmd: "RedisPublishCommand") -> "Any": reply_to=reply_to, headers=cmd.headers, correlation_id=cmd.correlation_id or "", + destination=cmd.destination, serializer=self.serializer, codec=self.codec, ) @@ -218,6 +221,7 @@ async def publish(self, cmd: "RedisPublishCommand") -> int | bytes: reply_to=cmd.reply_to, headers=cmd.headers, correlation_id=cmd.correlation_id or "", + destination=cmd.destination, serializer=self.serializer, codec=self.codec, ) @@ -252,6 +256,7 @@ async def request(self, cmd: "RedisPublishCommand") -> "Any": reply_to=reply_to, headers=cmd.headers, correlation_id=cmd.correlation_id or "", + destination=cmd.destination, serializer=self.serializer, codec=self.codec, ) diff --git a/faststream/redis/testing.py b/faststream/redis/testing.py index 4ba89fe968..6d558f5bf5 100644 --- a/faststream/redis/testing.py +++ b/faststream/redis/testing.py @@ -195,6 +195,7 @@ async def publish(self, cmd: "RedisPublishCommand") -> int | bytes: correlation_id=cmd.correlation_id or gen_cor_id(), headers=cmd.headers, message_format=cmd.message_format, + destination=cmd.destination, serializer=self.broker.config.fd_config._serializer, codec=self.codec, ) @@ -222,6 +223,7 @@ async def request(self, cmd: "RedisPublishCommand") -> "PubSubMessage": correlation_id=cmd.correlation_id or gen_cor_id(), headers=cmd.headers, message_format=cmd.message_format, + destination=cmd.destination, serializer=self.broker.config.fd_config._serializer, codec=self.codec, ) @@ -302,6 +304,7 @@ async def build_message( message_format: type["MessageFormat"], reply_to: str = "", headers: dict[str, Any] | None = None, + destination: str = "", serializer: Optional["SerializerProto"] = None, codec: Optional["CodecProto"] = None, ) -> 
bytes: @@ -310,6 +313,7 @@ async def build_message( reply_to=reply_to, headers=headers, correlation_id=correlation_id, + destination=destination, serializer=serializer, codec=codec, ) diff --git a/tests/brokers/base/codec.py b/tests/brokers/base/codec.py index 3066528988..16b0bc01a4 100644 --- a/tests/brokers/base/codec.py +++ b/tests/brokers/base/codec.py @@ -103,9 +103,9 @@ async def test_codec_encode_called(self, queue: str) -> None: mock = MagicMock() class TrackingCodec(DefaultCodec): - async def encode(self, msg, serializer=None, destination=""): + async def encode(self, cmd, serializer=None): mock() - return await super().encode(msg, serializer, destination=destination) + return await super().encode(cmd, serializer) broker = self.get_broker(codec=TrackingCodec()) @@ -121,6 +121,9 @@ async def handle(m) -> None: assert mock.called, "codec.encode was not called on publish" async def test_default_codec_encode_matches_encode_message(self, queue: str) -> None: + from faststream.response.publish_type import PublishType + from faststream.response.response import PublishCommand + codec = DefaultCodec() test_cases = [ @@ -131,7 +134,9 @@ async def test_default_codec_encode_matches_encode_message(self, queue: str) -> ] for msg in test_cases: - codec_result = await codec.encode(msg, None, destination="test") + codec_result = await codec.encode( + PublishCommand(body=msg, destination="test", _publish_type=PublishType.PUBLISH), None + ) direct_result = encode_message(msg, None) assert codec_result == direct_result, ( f"DefaultCodec.encode({msg!r}) = {codec_result!r} " @@ -151,15 +156,22 @@ async def test_batch_codec_decode_batch_called( class TrackingBatchCodec(DefaultCodec): async def encode_batch( self, - msgs: Sequence[Any], + cmd, serializer: Any = None, - destination: str = "", ) -> list[tuple[bytes, str | None]]: + from faststream.response.response import PublishCommand + return [ await DefaultCodec.encode( - self, m, serializer, destination=destination + self, + 
PublishCommand( + body=body, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), + serializer, ) - for m in msgs + for body in cmd.batch_bodies ] async def decode_batch(self, msg: Any) -> list[Any]: @@ -188,16 +200,23 @@ async def test_batch_codec_encode_batch_called( class TrackingBatchCodec(DefaultCodec): async def encode_batch( self, - msgs: Sequence[Any], + cmd, serializer: Any = None, - destination: str = "", ) -> list[tuple[bytes, str | None]]: + from faststream.response.response import PublishCommand + encode_batch_mock() return [ await DefaultCodec.encode( - self, m, serializer, destination=destination + self, + PublishCommand( + body=body, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), + serializer, ) - for m in msgs + for body in cmd.batch_bodies ] async def decode_batch(self, msg: Any) -> list[Any]: From 1239f71c6e040d59262568ac5c95e34f35045a02 Mon Sep 17 00:00:00 2001 From: "daniel.sasu" Date: Thu, 4 Jun 2026 21:24:02 +0200 Subject: [PATCH 4/4] docs(codec): replace examples with Schema Registry codec --- .../en/getting-started/serialization/codec.md | 46 +++------ .../serialization/codec_gzip_confluent.py | 41 -------- .../serialization/codec_gzip_kafka.py | 41 -------- .../serialization/codec_gzip_mqtt.py | 41 -------- .../serialization/codec_gzip_nats.py | 41 -------- .../serialization/codec_gzip_rabbit.py | 41 -------- .../serialization/codec_gzip_redis.py | 41 -------- .../codec_schema_registry_kafka.py | 93 +++++++++++++++++++ faststream/confluent/publisher/producer.py | 6 +- faststream/confluent/testing.py | 4 +- faststream/kafka/publisher/producer.py | 6 +- faststream/kafka/testing.py | 4 +- faststream/rabbit/parser.py | 4 +- faststream/redis/parser/message.py | 5 +- tests/brokers/base/codec.py | 5 +- 15 files changed, 133 insertions(+), 286 deletions(-) delete mode 100644 docs/docs_src/getting_started/serialization/codec_gzip_confluent.py delete mode 100644 
docs/docs_src/getting_started/serialization/codec_gzip_kafka.py delete mode 100644 docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py delete mode 100644 docs/docs_src/getting_started/serialization/codec_gzip_nats.py delete mode 100644 docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py delete mode 100644 docs/docs_src/getting_started/serialization/codec_gzip_redis.py create mode 100644 docs/docs_src/getting_started/serialization/codec_schema_registry_kafka.py diff --git a/docs/docs/en/getting-started/serialization/codec.md b/docs/docs/en/getting-started/serialization/codec.md index 3e61071107..8e49f4534d 100644 --- a/docs/docs/en/getting-started/serialization/codec.md +++ b/docs/docs/en/getting-started/serialization/codec.md @@ -26,45 +26,25 @@ class CodecProto(Protocol): ) -> tuple[bytes, str | None]: ... ``` -- **`decode`** — receives a `StreamMessage` with raw bytes in `msg.body` and returns the decoded Python value. You can mutate `msg.body` before delegating to `decode_message`. -- **`encode`** — receives a `PublishCommand` with the outgoing message body and metadata, and an optional serializer. Returns a `(bytes, content_type)` tuple. -- **`cmd.destination`** — the target topic, subject, or queue name. Useful for codecs that need destination-specific behavior (e.g. Schema Registry topic-to-schema resolution). +- **`decode`** — receives a `StreamMessage` with raw bytes in `msg.body` and returns the decoded Python value. +- **`encode`** — receives a `PublishCommand` containing the message body, destination, and headers. Returns a `(bytes, content_type)` tuple. Access the payload via `cmd.body` and the target topic/subject/queue via `cmd.destination`. -If no codec is set, `DefaultCodec` is used automatically. It handles JSON objects, plain text, and raw bytes without any configuration. +If no codec is set, `DefaultCodec` is used automatically. It handles JSON objects, plain text, and raw bytes. 
-## Compression Example +## Example: Schema Registry -A Gzip codec that compresses outgoing messages and decompresses incoming ones: +A Confluent Avro codec that encodes and decodes messages using the [Confluent wire format](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format){target="_blank"} (magic byte + schema ID + Avro payload). Requires `fastavro` and `confluent-kafka`: -=== "AIOKafka" - ```python linenums="1" hl_lines="15-27 30" - {!> docs_src/getting_started/serialization/codec_gzip_kafka.py !} - ``` - -=== "Confluent" - ```python linenums="1" hl_lines="15-27 30" - {!> docs_src/getting_started/serialization/codec_gzip_confluent.py !} - ``` - -=== "RabbitMQ" - ```python linenums="1" hl_lines="15-27 30" - {!> docs_src/getting_started/serialization/codec_gzip_rabbit.py !} - ``` - -=== "NATS" - ```python linenums="1" hl_lines="15-27 30" - {!> docs_src/getting_started/serialization/codec_gzip_nats.py !} - ``` +```bash +pip install fastavro confluent-kafka +``` -=== "Redis" - ```python linenums="1" hl_lines="15-27 30" - {!> docs_src/getting_started/serialization/codec_gzip_redis.py !} - ``` +```python linenums="1" hl_lines="22-66 68-76" +{!> docs_src/getting_started/serialization/codec_schema_registry_kafka.py !} +``` -=== "MQTT" - ```python linenums="1" hl_lines="15-27 30" - {!> docs_src/getting_started/serialization/codec_gzip_mqtt.py !} - ``` +!!! note + The codec fetches the schemas for the configured topics from the registry when it is constructed and caches them; any other schema ID met while decoding is fetched on first encounter and cached as well. The `subject` follows Confluent's naming convention: `{topic}-value`. 
## Priority diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_confluent.py b/docs/docs_src/getting_started/serialization/codec_gzip_confluent.py deleted file mode 100644 index 40fb37dbb0..0000000000 --- a/docs/docs_src/getting_started/serialization/codec_gzip_confluent.py +++ /dev/null @@ -1,41 +0,0 @@ -import gzip -from typing import TYPE_CHECKING, Any - -from faststream import FastStream -from faststream.confluent import KafkaBroker -from faststream.message.utils import decode_message, encode_message - -if TYPE_CHECKING: - from fast_depends.library.serializer import SerializerProto - - from faststream._internal.basic_types import DecodedMessage - from faststream.message import StreamMessage - from faststream.response.response import PublishCommand - - -class GzipCodec: - async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": - msg.body = gzip.decompress(msg.body) - return decode_message(msg) - - async def encode( - self, - cmd: "PublishCommand", - serializer: "SerializerProto | None" = None, - ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(cmd.body, serializer) - return gzip.compress(raw_bytes), "application/gzip" - - -broker = KafkaBroker(codec=GzipCodec()) -app = FastStream(broker) - - -@broker.subscriber("test") -async def handle(body: str) -> None: - ... 
- - -@app.after_startup -async def test() -> None: - await broker.publish("hello", "test") diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_kafka.py b/docs/docs_src/getting_started/serialization/codec_gzip_kafka.py deleted file mode 100644 index 425dccd820..0000000000 --- a/docs/docs_src/getting_started/serialization/codec_gzip_kafka.py +++ /dev/null @@ -1,41 +0,0 @@ -import gzip -from typing import TYPE_CHECKING, Any - -from faststream import FastStream -from faststream.kafka import KafkaBroker -from faststream.message.utils import decode_message, encode_message - -if TYPE_CHECKING: - from fast_depends.library.serializer import SerializerProto - - from faststream._internal.basic_types import DecodedMessage - from faststream.message import StreamMessage - from faststream.response.response import PublishCommand - - -class GzipCodec: - async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": - msg.body = gzip.decompress(msg.body) - return decode_message(msg) - - async def encode( - self, - cmd: "PublishCommand", - serializer: "SerializerProto | None" = None, - ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(cmd.body, serializer) - return gzip.compress(raw_bytes), "application/gzip" - - -broker = KafkaBroker(codec=GzipCodec()) -app = FastStream(broker) - - -@broker.subscriber("test") -async def handle(body: str) -> None: - ... 
- - -@app.after_startup -async def test() -> None: - await broker.publish("hello", "test") diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py b/docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py deleted file mode 100644 index a1b81880e1..0000000000 --- a/docs/docs_src/getting_started/serialization/codec_gzip_mqtt.py +++ /dev/null @@ -1,41 +0,0 @@ -import gzip -from typing import TYPE_CHECKING, Any - -from faststream import FastStream -from faststream.mqtt import MQTTBroker -from faststream.message.utils import decode_message, encode_message - -if TYPE_CHECKING: - from fast_depends.library.serializer import SerializerProto - - from faststream._internal.basic_types import DecodedMessage - from faststream.message import StreamMessage - from faststream.response.response import PublishCommand - - -class GzipCodec: - async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": - msg.body = gzip.decompress(msg.body) - return decode_message(msg) - - async def encode( - self, - cmd: "PublishCommand", - serializer: "SerializerProto | None" = None, - ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(cmd.body, serializer) - return gzip.compress(raw_bytes), "application/gzip" - - -broker = MQTTBroker(codec=GzipCodec()) -app = FastStream(broker) - - -@broker.subscriber("test") -async def handle(body: str) -> None: - ... 
- - -@app.after_startup -async def test() -> None: - await broker.publish("hello", "test") diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_nats.py b/docs/docs_src/getting_started/serialization/codec_gzip_nats.py deleted file mode 100644 index 3757c5fded..0000000000 --- a/docs/docs_src/getting_started/serialization/codec_gzip_nats.py +++ /dev/null @@ -1,41 +0,0 @@ -import gzip -from typing import TYPE_CHECKING, Any - -from faststream import FastStream -from faststream.nats import NatsBroker -from faststream.message.utils import decode_message, encode_message - -if TYPE_CHECKING: - from fast_depends.library.serializer import SerializerProto - - from faststream._internal.basic_types import DecodedMessage - from faststream.message import StreamMessage - from faststream.response.response import PublishCommand - - -class GzipCodec: - async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": - msg.body = gzip.decompress(msg.body) - return decode_message(msg) - - async def encode( - self, - cmd: "PublishCommand", - serializer: "SerializerProto | None" = None, - ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(cmd.body, serializer) - return gzip.compress(raw_bytes), "application/gzip" - - -broker = NatsBroker(codec=GzipCodec()) -app = FastStream(broker) - - -@broker.subscriber("test") -async def handle(body: str) -> None: - ... 
- - -@app.after_startup -async def test() -> None: - await broker.publish("hello", "test") diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py b/docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py deleted file mode 100644 index 38861f75cb..0000000000 --- a/docs/docs_src/getting_started/serialization/codec_gzip_rabbit.py +++ /dev/null @@ -1,41 +0,0 @@ -import gzip -from typing import TYPE_CHECKING, Any - -from faststream import FastStream -from faststream.rabbit import RabbitBroker -from faststream.message.utils import decode_message, encode_message - -if TYPE_CHECKING: - from fast_depends.library.serializer import SerializerProto - - from faststream._internal.basic_types import DecodedMessage - from faststream.message import StreamMessage - from faststream.response.response import PublishCommand - - -class GzipCodec: - async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": - msg.body = gzip.decompress(msg.body) - return decode_message(msg) - - async def encode( - self, - cmd: "PublishCommand", - serializer: "SerializerProto | None" = None, - ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(cmd.body, serializer) - return gzip.compress(raw_bytes), "application/gzip" - - -broker = RabbitBroker(codec=GzipCodec()) -app = FastStream(broker) - - -@broker.subscriber("test") -async def handle(body: str) -> None: - ... 
- - -@app.after_startup -async def test() -> None: - await broker.publish("hello", "test") diff --git a/docs/docs_src/getting_started/serialization/codec_gzip_redis.py b/docs/docs_src/getting_started/serialization/codec_gzip_redis.py deleted file mode 100644 index 02136652ce..0000000000 --- a/docs/docs_src/getting_started/serialization/codec_gzip_redis.py +++ /dev/null @@ -1,41 +0,0 @@ -import gzip -from typing import TYPE_CHECKING, Any - -from faststream import FastStream -from faststream.redis import RedisBroker -from faststream.message.utils import decode_message, encode_message - -if TYPE_CHECKING: - from fast_depends.library.serializer import SerializerProto - - from faststream._internal.basic_types import DecodedMessage - from faststream.message import StreamMessage - from faststream.response.response import PublishCommand - - -class GzipCodec: - async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": - msg.body = gzip.decompress(msg.body) - return decode_message(msg) - - async def encode( - self, - cmd: "PublishCommand", - serializer: "SerializerProto | None" = None, - ) -> tuple[bytes, str | None]: - raw_bytes, _ = encode_message(cmd.body, serializer) - return gzip.compress(raw_bytes), "application/gzip" - - -broker = RedisBroker(codec=GzipCodec()) -app = FastStream(broker) - - -@broker.subscriber("test") -async def handle(body: str) -> None: - ... 
- - -@app.after_startup -async def test() -> None: - await broker.publish("hello", "test") diff --git a/docs/docs_src/getting_started/serialization/codec_schema_registry_kafka.py b/docs/docs_src/getting_started/serialization/codec_schema_registry_kafka.py new file mode 100644 index 0000000000..ad23d6bf4f --- /dev/null +++ b/docs/docs_src/getting_started/serialization/codec_schema_registry_kafka.py @@ -0,0 +1,93 @@ +import io +import json +import struct +from typing import TYPE_CHECKING, Any, Dict + +import fastavro +from confluent_kafka.schema_registry import SchemaRegistryClient + +from faststream import FastStream +from faststream.kafka import KafkaBroker + +if TYPE_CHECKING: + from fast_depends.library.serializer import SerializerProto + + from faststream._internal.basic_types import DecodedMessage + from faststream.message import StreamMessage + from faststream.response.response import PublishCommand + +HEADER = struct.Struct(">bI") # magic byte (0) + 4-byte schema ID + + +class SchemaRegistryCodec: + def __init__( + self, + registry_url: str, + topics: Dict[str, int], + ) -> None: + self._client = SchemaRegistryClient({"url": registry_url}) + self._schema_cache: Dict[int, Any] = {} + self._topic_schemas: Dict[str, tuple[int, Any]] = {} + + for topic, version in topics.items(): + subject = f"{topic}-value" + meta = self._client.get_version(subject, version) + schema = fastavro.parse_schema(json.loads(meta.schema.schema_str)) + self._topic_schemas[topic] = (meta.schema_id, schema) + self._schema_cache[meta.schema_id] = schema + + def _get_schema(self, schema_id: int) -> Any: + if schema_id not in self._schema_cache: + raw = self._client.get_schema(schema_id) + self._schema_cache[schema_id] = fastavro.parse_schema( + json.loads(raw.schema_str) + ) + return self._schema_cache[schema_id] + + async def decode(self, msg: "StreamMessage[Any]") -> "DecodedMessage": + schema_id = int.from_bytes(msg.body[1:5], byteorder="big") + schema = self._get_schema(schema_id) + 
decoded: dict[str, Any] = fastavro.schemaless_reader( + io.BytesIO(msg.body[5:]), schema + ) + return decoded # type: ignore[return-value] + + async def encode( + self, + cmd: "PublishCommand", + serializer: "SerializerProto | None" = None, + ) -> tuple[bytes, str | None]: + schema_id, schema = self._topic_schemas[cmd.destination] + body = cmd.body + data = body.model_dump(mode="json") if hasattr(body, "model_dump") else body + buf = io.BytesIO() + buf.write(HEADER.pack(0, schema_id)) + fastavro.schemaless_writer(buf, schema, data) + return buf.getvalue(), "application/avro" + + +codec = SchemaRegistryCodec( + registry_url="http://localhost:8081", + topics={ + "orders": 1, + "users": 2, + }, +) +broker = KafkaBroker(codec=codec) +app = FastStream(broker) + + +@broker.subscriber("orders") +async def handle_order(body: dict[str, Any]) -> None: + ... + + +@broker.subscriber("users") +async def handle_user(body: dict[str, Any]) -> None: + ... + + +@app.after_startup +async def test() -> None: + await broker.publish({"order_id": "123", "amount": 99.99}, "orders") + await broker.publish({"name": "John", "age": 25}, "users") diff --git a/faststream/confluent/publisher/producer.py b/faststream/confluent/publisher/producer.py index fee74a9428..a84d6668a0 100644 --- a/faststream/confluent/publisher/producer.py +++ b/faststream/confluent/publisher/producer.py @@ -170,7 +170,11 @@ async def publish_batch(self, cmd: "KafkaPublishCommand") -> None: encoded_batch = [ await self.codec.encode( - _BaseCmd(body=msg, destination=cmd.destination, _publish_type=cmd.publish_type), + _BaseCmd( + body=msg, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), self.serializer, ) for msg in cmd.batch_bodies diff --git a/faststream/confluent/testing.py b/faststream/confluent/testing.py index 1a2d9948dd..199973ad75 100644 --- a/faststream/confluent/testing.py +++ b/faststream/confluent/testing.py @@ -324,7 +324,9 @@ async def build_message( from faststream.response.response 
import PublishCommand as _BaseCmd codec_instance = codec or DefaultCodec() - publish_cmd = _BaseCmd(body=message, destination=topic, _publish_type=PublishType.PUBLISH) + publish_cmd = _BaseCmd( + body=message, destination=topic, _publish_type=PublishType.PUBLISH + ) msg, content_type = await codec_instance.encode(publish_cmd, serializer) k = key or b"" headers = { diff --git a/faststream/kafka/publisher/producer.py b/faststream/kafka/publisher/producer.py index 3ad86e2b22..6a9d95d4ee 100644 --- a/faststream/kafka/publisher/producer.py +++ b/faststream/kafka/publisher/producer.py @@ -147,7 +147,11 @@ async def publish_batch( encoded_batch = [ await self.codec.encode( - _BaseCmd(body=body, destination=cmd.destination, _publish_type=cmd.publish_type), + _BaseCmd( + body=body, + destination=cmd.destination, + _publish_type=cmd.publish_type, + ), self.serializer, ) for body in cmd.batch_bodies diff --git a/faststream/kafka/testing.py b/faststream/kafka/testing.py index c179873690..336b6c53d8 100755 --- a/faststream/kafka/testing.py +++ b/faststream/kafka/testing.py @@ -287,7 +287,9 @@ async def build_message( from faststream.response.publish_type import PublishType from faststream.response.response import PublishCommand as _BaseCmd - publish_cmd = _BaseCmd(body=message, destination=topic, _publish_type=PublishType.PUBLISH) + publish_cmd = _BaseCmd( + body=message, destination=topic, _publish_type=PublishType.PUBLISH + ) msg, content_type = await (codec or DefaultCodec()).encode(publish_cmd, serializer) k = key or b"" diff --git a/faststream/rabbit/parser.py b/faststream/rabbit/parser.py index 226a41f351..0437ad63e3 100644 --- a/faststream/rabbit/parser.py +++ b/faststream/rabbit/parser.py @@ -84,7 +84,9 @@ async def encode_message( from faststream.response.publish_type import PublishType from faststream.response.response import PublishCommand as _BaseCmd - publish_cmd = _BaseCmd(body=message, destination=destination, _publish_type=PublishType.PUBLISH) + publish_cmd = 
_BaseCmd( + body=message, destination=destination, _publish_type=PublishType.PUBLISH + ) message_body, generated_content_type = await (codec or DefaultCodec()).encode( publish_cmd, serializer ) diff --git a/faststream/redis/parser/message.py b/faststream/redis/parser/message.py index 3496518ab9..759b58998b 100644 --- a/faststream/redis/parser/message.py +++ b/faststream/redis/parser/message.py @@ -43,7 +43,9 @@ async def build( from faststream.response.response import PublishCommand as _BaseCmd codec_instance = codec or DefaultCodec() - publish_cmd = _BaseCmd(body=message, destination=destination, _publish_type=PublishType.PUBLISH) + publish_cmd = _BaseCmd( + body=message, destination=destination, _publish_type=PublishType.PUBLISH + ) payload, content_type = await codec_instance.encode(publish_cmd, serializer) headers_to_send = { @@ -73,6 +75,7 @@ async def encode( reply_to: str | None, headers: dict[str, Any] | None, correlation_id: str, + destination: str = "", serializer: Optional["SerializerProto"] = None, codec: Optional["CodecProto"] = None, ) -> bytes: diff --git a/tests/brokers/base/codec.py b/tests/brokers/base/codec.py index 16b0bc01a4..d529d90dd3 100644 --- a/tests/brokers/base/codec.py +++ b/tests/brokers/base/codec.py @@ -135,7 +135,10 @@ async def test_default_codec_encode_matches_encode_message(self, queue: str) -> for msg in test_cases: codec_result = await codec.encode( - PublishCommand(body=msg, destination="test", _publish_type=PublishType.PUBLISH), None + PublishCommand( + body=msg, destination="test", _publish_type=PublishType.PUBLISH + ), + None, ) direct_result = encode_message(msg, None) assert codec_result == direct_result, (