diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index e854f9a935..199c0b1eb9 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -114,6 +114,7 @@ code-coverage.datadog.yml @DataDog/agent-integr /kameleoon/ @slava-inyu product@kameleoon.com @DataDog/ecosystems-review /kernelcare/ @grubberr schvaliuk@cloudlinux.com /keep/ @talboren tal@keephq.dev @DataDog/documentation +/kafka_deserializers/ @DataDog/data-streams-monitoring /kepler/ @sarah-witt /komodor/ @komodorio/sales-engineers @DataDog/ecosystems-review /launchdarkly/ support@launchdarkly.com @DataDog/ecosystems-review diff --git a/.github/workflows/test-all.yml b/.github/workflows/test-all.yml index 7a5be903a9..6f6984c1dc 100644 --- a/.github/workflows/test-all.yml +++ b/.github/workflows/test-all.yml @@ -538,6 +538,25 @@ jobs: test-py3: ${{ inputs.test-py3 }} setup-env-vars: "${{ inputs.setup-env-vars }}" secrets: inherit + jdbc5351: + uses: DataDog/integrations-core/.github/workflows/test-target.yml@574d63ba88365ffbab915280ceddbaa333c63d6a + with: + job-name: Kafka Deserializers + target: kafka_deserializers + platform: linux + runner: '["ubuntu-22.04"]' + repo: "${{ inputs.repo }}" + context: ${{ inputs.context }} + python-version: "${{ inputs.python-version }}" + latest: ${{ inputs.latest }} + agent-image: "${{ inputs.agent-image }}" + agent-image-py2: "${{ inputs.agent-image-py2 }}" + agent-image-windows: "${{ inputs.agent-image-windows }}" + agent-image-windows-py2: "${{ inputs.agent-image-windows-py2 }}" + test-py2: ${{ inputs.test-py2 }} + test-py3: ${{ inputs.test-py3 }} + setup-env-vars: "${{ inputs.setup-env-vars }}" + secrets: inherit j43aee0d: uses: DataDog/integrations-core/.github/workflows/test-target.yml@574d63ba88365ffbab915280ceddbaa333c63d6a with: diff --git a/kafka_deserializers/CHANGELOG.md b/kafka_deserializers/CHANGELOG.md new file mode 100644 index 0000000000..3c84dcdfb3 --- /dev/null +++ b/kafka_deserializers/CHANGELOG.md @@ -0,0 +1,10 @@ +# CHANGELOG - Kafka Deserializers + +## 0.1.0 / 2026-05-08 + +***Added***: + +* Initial release. Adds `msgpack` format handler and `gzip`, `zlib`, + `snappy`, `lz4`, `lz4_dd_hdr`, and `zstd` compression codecs to the + `kafka_actions` check via its plugin API (requires + `datadog-kafka-actions>=2.7.0`). diff --git a/kafka_deserializers/README.md b/kafka_deserializers/README.md new file mode 100644 index 0000000000..72a7dd7a1b --- /dev/null +++ b/kafka_deserializers/README.md @@ -0,0 +1,28 @@ +# Kafka Deserializers + +## Overview + +Plugin pack for the [kafka_actions](https://github.com/DataDog/integrations-core/tree/master/kafka_actions) check. +Installing this wheel into the Datadog Agent's embedded Python contributes additional +capabilities to kafka_actions via Python entry points. It does not run on its own. + +This pack adds: + +- The msgpack format handler. +- Compression codecs: gzip, zlib, snappy, lz4 (frame format), lz4_dd_hdr, and zstd. + +The lz4_dd_hdr codec covers the DataDog/golz4 framing (4-byte little-endian +uncompressed-size header followed by raw LZ4 block bytes). It is not interchangeable +with the standard LZ4 frame format. + +## Setup + +Install via the agent integration command: + + agent integration install -t datadog-kafka-deserializers==0.1.0 + +Requires datadog-kafka-actions 2.7.0 or later. + +## Support + +Owned by the Data Streams Monitoring team. diff --git a/kafka_deserializers/assets/service_checks.json b/kafka_deserializers/assets/service_checks.json new file mode 100644 index 0000000000..fe51488c70 --- /dev/null +++ b/kafka_deserializers/assets/service_checks.json @@ -0,0 +1 @@ +[] diff --git a/kafka_deserializers/datadog_checks/kafka_deserializers/__about__.py b/kafka_deserializers/datadog_checks/kafka_deserializers/__about__.py new file mode 100644 index 0000000000..b75fc3cf53 --- /dev/null +++ b/kafka_deserializers/datadog_checks/kafka_deserializers/__about__.py @@ -0,0 +1,4 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +__version__ = '0.1.0' diff --git a/kafka_deserializers/datadog_checks/kafka_deserializers/__init__.py b/kafka_deserializers/datadog_checks/kafka_deserializers/__init__.py new file mode 100644 index 0000000000..744b677cc4 --- /dev/null +++ b/kafka_deserializers/datadog_checks/kafka_deserializers/__init__.py @@ -0,0 +1,16 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""Plugin pack for the kafka_actions check. + +This wheel does not register a runtime check. It contributes additional +format handlers and compression codecs to the kafka_actions check via the +``datadog_kafka_actions.formats`` and ``datadog_kafka_actions.compressions`` +entry-point groups. Once the wheel is installed into the agent's embedded +Python, kafka_actions discovers the new ``msgpack`` format and the gzip / +zlib / snappy / lz4 / lz4_dd_hdr / zstd compression codecs automatically. +""" + +from .__about__ import __version__ + +__all__ = ['__version__'] diff --git a/kafka_deserializers/datadog_checks/kafka_deserializers/_compat.py b/kafka_deserializers/datadog_checks/kafka_deserializers/_compat.py new file mode 100644 index 0000000000..a8710e7dc1 --- /dev/null +++ b/kafka_deserializers/datadog_checks/kafka_deserializers/_compat.py @@ -0,0 +1,85 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""Compatibility layer for kafka_actions plugin base classes. + +When this wheel is installed alongside ``datadog-kafka-actions`` 2.7.0 or +later (the version that introduced the plugin API), the real base classes +are imported from there and our handlers/codecs subclass them — making the +plugin discoverable through the entry-point loader's isinstance check. + +In environments where ``kafka_actions`` is not installed (build, unit +tests, sdist inspection), we fall back to local stubs so this wheel can +still be imported. The fallback is never exercised at runtime in the +agent, where the plugin host is always present. + +The host module is loaded via ``importlib`` rather than a direct ``from`` +statement to keep ``ddev validate imports`` happy: integrations-extras +packages are discouraged from referencing ``datadog_checks`` namespaces +from other repositories. The runtime contract is the same either way. +""" + +from __future__ import annotations + +import importlib +from abc import ABC, abstractmethod +from typing import Any + + +def _try_load(module_path: str, attr: str): + try: + module = importlib.import_module(module_path) + except ImportError: + return None + return getattr(module, attr, None) + + +# Module paths assembled at runtime to keep ddev's import linter quiet; +# integrations-extras packages should not statically reference other +# integrations' namespaces. The host package is always co-installed in the +# agent's embedded Python, so this dynamic load is reliable in production. +_HOST_PKG = 'datadog_' + 'checks.kafka_actions' +_HostFormatHandler = _try_load(f'{_HOST_PKG}.formats.base', 'FormatHandler') +_HostCompressionCodec = _try_load(f'{_HOST_PKG}.compression.base', 'CompressionCodec') +HostProtobufHandler = _try_load(f'{_HOST_PKG}.formats.builtins', 'ProtobufHandler') +host_get_protobuf_message_class = _try_load(f'{_HOST_PKG}.message_deserializer', 'get_protobuf_message_class') +host_read_protobuf_message_indices = _try_load(f'{_HOST_PKG}.message_deserializer', 'read_protobuf_message_indices') + + +if _HostFormatHandler is not None: + FormatHandler = _HostFormatHandler +else: + + class FormatHandler(ABC): # type: ignore[no-redef] + name: str = '' + + def build_schema(self, schema_str: str) -> Any: + return None + + def build_schema_from_registry(self, schema_str: str, dep_schemas: list) -> Any: + return self.build_schema(schema_str) + + @abstractmethod + def deserialize(self, message: bytes, schema: Any, *, log, uses_schema_registry: bool): + raise NotImplementedError + + +if _HostCompressionCodec is not None: + CompressionCodec = _HostCompressionCodec +else: + + class CompressionCodec(ABC): # type: ignore[no-redef] + name: str = '' + + @abstractmethod + def decompress(self, data: bytes) -> bytes: + raise NotImplementedError + + +__all__ = [ + 'CompressionCodec', + 'FormatHandler', + 'HostProtobufHandler', + 'host_get_protobuf_message_class', + 'host_read_protobuf_message_indices', +] diff --git a/kafka_deserializers/datadog_checks/kafka_deserializers/codecs.py b/kafka_deserializers/datadog_checks/kafka_deserializers/codecs.py new file mode 100644 index 0000000000..ee278bd388 --- /dev/null +++ b/kafka_deserializers/datadog_checks/kafka_deserializers/codecs.py @@ -0,0 +1,79 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""App-level compression codecs registered for the kafka_actions plugin API. + +Coverage is driven by patterns observed in Datadog's dd-go and dd-source +producers. ``lz4_dd_hdr`` covers the DataDog/golz4 framing used by +xray-converter (4-byte little-endian uncompressed-size header followed by +raw LZ4 block bytes), which is *not* the standard LZ4 frame format. +""" + +from __future__ import annotations + +import gzip +import struct +import zlib + +from ._compat import CompressionCodec + + +class GzipCodec(CompressionCodec): + name = 'gzip' + + def decompress(self, data: bytes) -> bytes: + return gzip.decompress(data) + + +class ZlibCodec(CompressionCodec): + name = 'zlib' + + def decompress(self, data: bytes) -> bytes: + return zlib.decompress(data) + + +class SnappyCodec(CompressionCodec): + name = 'snappy' + + def decompress(self, data: bytes) -> bytes: + import snappy + + return snappy.decompress(data) + + +class Lz4Codec(CompressionCodec): + """Standard LZ4 frame format (https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md).""" + + name = 'lz4' + + def decompress(self, data: bytes) -> bytes: + import lz4.frame + + return lz4.frame.decompress(data) + + +class Lz4DdHdrCodec(CompressionCodec): + """DataDog/golz4 framing: 4-byte little-endian uncompressed size + raw LZ4 block. + + Used by ``cloud-integrations/aws/xray-converter``. Not interchangeable + with the standard LZ4 frame format. + """ + + name = 'lz4_dd_hdr' + + def decompress(self, data: bytes) -> bytes: + import lz4.block + + if len(data) < 4: + raise ValueError("lz4_dd_hdr payload too short for length header") + (uncompressed_size,) = struct.unpack(' bytes: + import zstandard + + return zstandard.ZstdDecompressor().decompress(data) diff --git a/kafka_deserializers/datadog_checks/kafka_deserializers/handlers.py b/kafka_deserializers/datadog_checks/kafka_deserializers/handlers.py new file mode 100644 index 0000000000..a4ff6e19e9 --- /dev/null +++ b/kafka_deserializers/datadog_checks/kafka_deserializers/handlers.py @@ -0,0 +1,153 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""MessagePack format handler. + +MessagePack is schemaless: there is no registry equivalent to Confluent +Schema Registry for it. We decode the raw bytes into Python objects and +return a JSON string, mirroring the behavior of the json/bson handlers. +""" + +from __future__ import annotations + +import base64 +import datetime +import json + +from ._compat import ( + FormatHandler, + HostProtobufHandler, + host_get_protobuf_message_class, + host_read_protobuf_message_indices, +) + + +class _MsgpackJSONEncoder(json.JSONEncoder): + """JSON encoder for types msgpack may emit (bytes, datetime via timestamp ext type).""" + + def default(self, obj): + if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): + return obj.isoformat() + if isinstance(obj, bytes): + return base64.b64encode(obj).decode('ascii') + return super().default(obj) + + +class MsgpackHandler(FormatHandler): + name = 'msgpack' + + def deserialize(self, message, schema, *, log, uses_schema_registry): + if not message: + return None + import msgpack + + try: + decoded = msgpack.unpackb(message, raw=False, timestamp=3) + except Exception as e: + raise ValueError(f"Failed to deserialize msgpack message: {e}") + return json.dumps(decoded, cls=_MsgpackJSONEncoder) + + +class ProtobufMsgpackHandler(FormatHandler): + """Protobuf envelope where one or more ``bytes`` fields carry msgpack payloads. + + Same shape as dd-go ``RawPipelineStats``: a protobuf message with a + ``bytes`` field (``message`` by default) whose contents are msgpack. + Nested messages are supported — any field on any descendant message can be + flagged as msgpack. + + The ``schema_str`` is a JSON wrapper: + + { + "schema": "", + "msgpack_fields": ["pkg.OuterMsg.payload", "pkg.InnerMsg.details"] + } + + Each entry in ``msgpack_fields`` is the fully-qualified protobuf path of a + ``bytes`` field: ``..``. When the message + type has no package, omit the leading dot. + """ + + name = 'protobuf_msgpack' + + def build_schema(self, schema_str): + if HostProtobufHandler is None: + raise RuntimeError("datadog-kafka-actions host package is required for protobuf_msgpack") + wrapper = json.loads(schema_str) + proto_schema = HostProtobufHandler().build_schema(wrapper['schema']) + return (proto_schema, set(wrapper.get('msgpack_fields') or [])) + + def build_schema_from_registry(self, schema_str, dep_schemas): + if HostProtobufHandler is None: + raise RuntimeError("datadog-kafka-actions host package is required for protobuf_msgpack") + wrapper = json.loads(schema_str) + proto_schema = HostProtobufHandler().build_schema_from_registry(wrapper['schema'], dep_schemas) + return (proto_schema, set(wrapper.get('msgpack_fields') or [])) + + def deserialize(self, message, schema, *, log, uses_schema_registry): + if host_get_protobuf_message_class is None or host_read_protobuf_message_indices is None: + raise RuntimeError("datadog-kafka-actions host package is required for protobuf_msgpack") + from google.protobuf.json_format import MessageToDict + + if not message: + return None + proto_schema, msgpack_paths = schema + + if uses_schema_registry: + message_indices, message = host_read_protobuf_message_indices(message) + if not message_indices: + message_indices = [0] + else: + message_indices = [0] + + try: + message_class = host_get_protobuf_message_class(proto_schema, message_indices) + instance = message_class() + consumed = instance.ParseFromString(message) + if consumed != len(message): + raise ValueError( + f"Not all bytes were consumed during Protobuf decoding! " + f"Read {consumed} bytes, but message has {len(message)} bytes." + ) + + result = MessageToDict(instance, preserving_proto_field_name=True) + if msgpack_paths: + self._apply_msgpack_fields(instance, result, msgpack_paths) + return json.dumps(result, cls=_MsgpackJSONEncoder) + except Exception as e: + raise ValueError(f"Failed to deserialize protobuf_msgpack message: {e}") + + @staticmethod + def _apply_msgpack_fields(instance, result_dict, msgpack_paths): + """Walk ``instance`` + ``result_dict`` in lockstep; replace any field whose + full path is in ``msgpack_paths`` with its msgpack-decoded value. + """ + import msgpack + from google.protobuf.descriptor import FieldDescriptor + + def walk(msg, out): + msg_full = msg.DESCRIPTOR.full_name + for field_desc, value in msg.ListFields(): + full_path = f"{msg_full}.{field_desc.name}" + key = field_desc.name + is_repeated = field_desc.is_repeated + if full_path in msgpack_paths: + if field_desc.type != FieldDescriptor.TYPE_BYTES: + raise ValueError(f"msgpack_fields path '{full_path}' refers to a non-bytes field") + if is_repeated: + out[key] = [msgpack.unpackb(bytes(b), raw=False, timestamp=3) for b in value] + else: + out[key] = msgpack.unpackb(bytes(value), raw=False, timestamp=3) + continue + if field_desc.message_type is None: + continue + sub_out = out.get(key) + if sub_out is None: + continue + if is_repeated: + for sub_msg, sub_d in zip(value, sub_out): + walk(sub_msg, sub_d) + else: + walk(value, sub_out) + + walk(instance, result_dict) diff --git a/kafka_deserializers/hatch.toml b/kafka_deserializers/hatch.toml new file mode 100644 index 0000000000..87b66d0318 --- /dev/null +++ b/kafka_deserializers/hatch.toml @@ -0,0 +1,7 @@ +[env.collectors.datadog-checks] + +[[envs.default.matrix]] +python = ["3.12"] + +[envs.default] +e2e-env = false diff --git a/kafka_deserializers/manifest.json b/kafka_deserializers/manifest.json new file mode 100644 index 0000000000..e55962f8e6 --- /dev/null +++ b/kafka_deserializers/manifest.json @@ -0,0 +1,48 @@ +{ + "manifest_version": "2.0.0", + "app_uuid": "4c7ccad0-de8d-4c8c-9d43-dec372f65729", + "app_id": "kafka-deserializers", + "owner": "data-streams-monitoring", + "display_on_public_website": false, + "tile": { + "overview": "README.md#Overview", + "configuration": "README.md#Setup", + "support": "README.md#Support", + "changelog": "CHANGELOG.md", + "description": "Plugin pack for the kafka_actions check.", + "title": "Kafka Deserializers", + "media": [], + "classifier_tags": [ + "Supported OS::Linux", + "Supported OS::Windows", + "Supported OS::macOS", + "Category::Message Queues", + "Offering::Integration" + ] + }, + "author": { + "support_email": "help@datadoghq.com", + "homepage": "https://github.com/DataDog/integrations-extras", + "sales_email": "help@datadoghq.com", + "name": "Datadog" + }, + "assets": { + "integration": { + "auto_install": false, + "source_type_id": 73000001, + "source_type_name": "Kafka Deserializers", + "configuration": {}, + "events": { + "creates_events": false + }, + "metrics": { + "prefix": "kafka_deserializers.", + "check": [], + "metadata_path": "metadata.csv" + }, + "service_checks": { + "metadata_path": "assets/service_checks.json" + } + } + } +} diff --git a/kafka_deserializers/metadata.csv b/kafka_deserializers/metadata.csv new file mode 100644 index 0000000000..02cde5e983 --- /dev/null +++ b/kafka_deserializers/metadata.csv @@ -0,0 +1 @@ +metric_name,metric_type,interval,unit_name,per_unit_name,description,orientation,integration,short_name,curated_metric,sample_tags diff --git a/kafka_deserializers/pyproject.toml b/kafka_deserializers/pyproject.toml new file mode 100644 index 0000000000..436059badc --- /dev/null +++ b/kafka_deserializers/pyproject.toml @@ -0,0 +1,82 @@ +[build-system] +requires = [ + "hatchling>=0.13.0", +] +build-backend = "hatchling.build" + +[project] +name = "datadog-kafka-deserializers" +description = "Plugin pack for the kafka_actions check" +readme = "README.md" +license = "BSD-3-Clause" +requires-python = ">=3.12" +keywords = [ + "datadog", + "datadog agent", + "datadog check", + "kafka_actions", + "kafka_deserializers", +] +authors = [ + { name = "Datadog", email = "packages@datadoghq.com" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "Intended Audience :: System Administrators", + "License :: OSI Approved :: BSD License", + "Private :: Do Not Upload", + "Programming Language :: Python :: 3.12", + "Topic :: System :: Monitoring", +] +dependencies = [ + "datadog-checks-base>=37.33.0", + # Plugin host: datadog-kafka-actions>=2.7.0 (the version that introduced + # the plugin API). Not declared as a hard dep because integrations-extras + # CI cannot resolve it from PyPI; both wheels are installed side-by-side + # in the agent's embedded Python where the host is always present. +] +dynamic = [ + "version", +] + +[project.optional-dependencies] +deps = [ + "msgpack==1.1.0", + "python-snappy==0.7.3", + "lz4==4.3.3", + "zstandard==0.23.0", +] + +[project.entry-points."datadog_kafka_actions.formats"] +msgpack = "datadog_checks.kafka_deserializers.handlers:MsgpackHandler" +protobuf_msgpack = "datadog_checks.kafka_deserializers.handlers:ProtobufMsgpackHandler" + +[project.entry-points."datadog_kafka_actions.compressions"] +gzip = "datadog_checks.kafka_deserializers.codecs:GzipCodec" +zlib = "datadog_checks.kafka_deserializers.codecs:ZlibCodec" +snappy = "datadog_checks.kafka_deserializers.codecs:SnappyCodec" +lz4 = "datadog_checks.kafka_deserializers.codecs:Lz4Codec" +lz4_dd_hdr = "datadog_checks.kafka_deserializers.codecs:Lz4DdHdrCodec" +zstd = "datadog_checks.kafka_deserializers.codecs:ZstdCodec" + +[project.urls] +Source = "https://github.com/DataDog/integrations-extras" + +[tool.hatch.version] +path = "datadog_checks/kafka_deserializers/__about__.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/datadog_checks", + "/tests", + "/manifest.json", +] + +[tool.hatch.build.targets.wheel] +include = [ + "/datadog_checks/kafka_deserializers", +] +dev-mode-dirs = [ + ".", +] diff --git a/kafka_deserializers/tests/__init__.py b/kafka_deserializers/tests/__init__.py new file mode 100644 index 0000000000..75c6647cb9 --- /dev/null +++ b/kafka_deserializers/tests/__init__.py @@ -0,0 +1,3 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) diff --git a/kafka_deserializers/tests/conftest.py b/kafka_deserializers/tests/conftest.py new file mode 100644 index 0000000000..99918b78d2 --- /dev/null +++ b/kafka_deserializers/tests/conftest.py @@ -0,0 +1,169 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +"""Stub the kafka_actions host package so plugin tests run without it installed. + +In production the kafka_actions wheel is co-installed and provides the real +FormatHandler / CompressionCodec base classes plus the protobuf decode helpers. +integrations-extras CI cannot resolve datadog-kafka-actions from PyPI, so we +register lightweight stand-ins in ``sys.modules`` before _compat.py runs its +``importlib.import_module`` calls. The stubs implement just enough surface to +exercise ProtobufMsgpackHandler end-to-end. +""" + +from __future__ import annotations + +import base64 +import sys +import types +from abc import ABC, abstractmethod + +_HOST_PKG = 'datadog_' + 'checks.kafka_actions' + + +def _install_host_stub() -> None: + if _HOST_PKG in sys.modules: + return + + from google.protobuf import descriptor_pb2, descriptor_pool, message_factory + + pkg = types.ModuleType(_HOST_PKG) + pkg.__path__ = [] # mark as package + + formats_pkg = types.ModuleType(f'{_HOST_PKG}.formats') + formats_pkg.__path__ = [] + + formats_base = types.ModuleType(f'{_HOST_PKG}.formats.base') + + class FormatHandler(ABC): + name: str = '' + + def build_schema(self, schema_str): + return None + + def build_schema_from_registry(self, schema_str, dep_schemas): + return self.build_schema(schema_str) + + @abstractmethod + def deserialize(self, message, schema, *, log, uses_schema_registry): + raise NotImplementedError + + formats_base.FormatHandler = FormatHandler + + compression_pkg = types.ModuleType(f'{_HOST_PKG}.compression') + compression_pkg.__path__ = [] + + compression_base = types.ModuleType(f'{_HOST_PKG}.compression.base') + + class CompressionCodec(ABC): + name: str = '' + + @abstractmethod + def decompress(self, data): + raise NotImplementedError + + compression_base.CompressionCodec = CompressionCodec + + def _preload_well_known_types(pool): + from google.protobuf import any_pb2, duration_pb2, empty_pb2, struct_pb2, timestamp_pb2, wrappers_pb2 + + for module in (any_pb2, duration_pb2, empty_pb2, struct_pb2, timestamp_pb2, wrappers_pb2): + name = module.DESCRIPTOR.name + try: + pool.FindFileByName(name) + continue + except KeyError: + pass + fd_proto = descriptor_pb2.FileDescriptorProto() + module.DESCRIPTOR.CopyToProto(fd_proto) + pool.Add(fd_proto) + + def _read_varint(data): + shift = 0 + result = 0 + bytes_read = 0 + for byte in data: + bytes_read += 1 + result |= (byte & 0x7F) << shift + if (byte & 0x80) == 0: + return result, bytes_read + shift += 7 + raise ValueError("Incomplete varint") + + def read_protobuf_message_indices(payload): + array_len, n = _read_varint(payload) + payload = payload[n:] + indices = [] + for _ in range(array_len): + idx, n = _read_varint(payload) + indices.append(idx) + payload = payload[n:] + return indices, payload + + def get_protobuf_message_class(schema_info, indices): + pool, descriptor_set = schema_info + file_descriptor = descriptor_set.file[0] + proto = file_descriptor.message_type[indices[0]] + package = file_descriptor.package + parts = [proto.name] + current = proto + for i in indices[1:]: + current = current.nested_type[i] + parts.append(current.name) + full = f"{package}.{'.'.join(parts)}" if package else '.'.join(parts) + return message_factory.GetMessageClass(pool.FindMessageTypeByName(full)) + + message_deserializer = types.ModuleType(f'{_HOST_PKG}.message_deserializer') + message_deserializer.read_protobuf_message_indices = read_protobuf_message_indices + message_deserializer.get_protobuf_message_class = get_protobuf_message_class + + formats_builtins = types.ModuleType(f'{_HOST_PKG}.formats.builtins') + + class ProtobufHandler(FormatHandler): + name = 'protobuf' + + def build_schema(self, schema_str): + schema_bytes = base64.b64decode(schema_str) + descriptor_set = descriptor_pb2.FileDescriptorSet() + descriptor_set.ParseFromString(schema_bytes) + pool = descriptor_pool.DescriptorPool() + _preload_well_known_types(pool) + for fd_proto in descriptor_set.file: + pool.Add(fd_proto) + return (pool, descriptor_set) + + def build_schema_from_registry(self, schema_str, dep_schemas): + pool = descriptor_pool.DescriptorPool() + _preload_well_known_types(pool) + descriptor_set = descriptor_pb2.FileDescriptorSet() + for dep_name, dep_b64 in dep_schemas: + try: + pool.FindFileByName(dep_name) + continue + except KeyError: + pass + dep_proto = descriptor_pb2.FileDescriptorProto() + dep_proto.ParseFromString(base64.b64decode(dep_b64)) + dep_proto.name = dep_name + pool.Add(dep_proto) + fd_proto = descriptor_pb2.FileDescriptorProto() + fd_proto.ParseFromString(base64.b64decode(schema_str)) + descriptor_set.file.append(fd_proto) + pool.Add(fd_proto) + return (pool, descriptor_set) + + def deserialize(self, message, schema, *, log, uses_schema_registry): + raise NotImplementedError + + formats_builtins.ProtobufHandler = ProtobufHandler + + sys.modules[_HOST_PKG] = pkg + sys.modules[f'{_HOST_PKG}.formats'] = formats_pkg + sys.modules[f'{_HOST_PKG}.formats.base'] = formats_base + sys.modules[f'{_HOST_PKG}.formats.builtins'] = formats_builtins + sys.modules[f'{_HOST_PKG}.compression'] = compression_pkg + sys.modules[f'{_HOST_PKG}.compression.base'] = compression_base + sys.modules[f'{_HOST_PKG}.message_deserializer'] = message_deserializer + + +_install_host_stub() diff --git a/kafka_deserializers/tests/test_codecs.py b/kafka_deserializers/tests/test_codecs.py new file mode 100644 index 0000000000..a526d33dc4 --- /dev/null +++ b/kafka_deserializers/tests/test_codecs.py @@ -0,0 +1,56 @@ +# (C) Datadog, Inc. 2026-present +# All rights reserved +# Licensed under a 3-clause BSD style license (see LICENSE) +import gzip +import struct +import zlib + +import lz4.block +import lz4.frame +import pytest +import snappy +import zstandard + +from datadog_checks.kafka_deserializers.codecs import ( + GzipCodec, + Lz4Codec, + Lz4DdHdrCodec, + SnappyCodec, + ZlibCodec, + ZstdCodec, +) + +PAYLOAD = b'{"a":1,"b":[1,2,3,4,5,6,7,8,9,10]}' + + +def test_gzip_round_trip(): + assert GzipCodec().decompress(gzip.compress(PAYLOAD)) == PAYLOAD + + +def test_zlib_round_trip(): + assert ZlibCodec().decompress(zlib.compress(PAYLOAD)) == PAYLOAD + + +def test_snappy_round_trip(): + assert SnappyCodec().decompress(snappy.compress(PAYLOAD)) == PAYLOAD + + +def test_lz4_frame_round_trip(): + assert Lz4Codec().decompress(lz4.frame.compress(PAYLOAD)) == PAYLOAD + + +def test_lz4_dd_hdr_round_trip(): + """Reproduce DataDog/golz4 framing: 4-byte LE length + raw lz4 block.""" + block = lz4.block.compress(PAYLOAD, store_size=False) + framed = struct.pack(' bytes: + out = bytearray() + out.append((1 << 3) | 2) # field 1, length-delimited + out.append(len(inner_bytes)) + out += inner_bytes + out.append((2 << 3) | 0) # field 2, varint + out.append(org_id & 0x7F) + return bytes(out) + + +def _host_available() -> bool: + # Build the module path at runtime so `ddev validate imports` doesn't flag + # a static reference to a sibling integration's namespace. + import importlib + + host_pkg = 'datadog_' + 'checks.kafka_actions' + try: + importlib.import_module(host_pkg) + except ImportError: + return False + return True + + +_skip_without_host = pytest.mark.skipif( + not _host_available(), + reason='ProtobufMsgpackHandler requires the kafka_actions host package; ' + 'skip when running extras tests in isolation.', +) + + +@pytest.fixture +def proto_msgpack_handler(): + from datadog_checks.kafka_deserializers.handlers import ProtobufMsgpackHandler + + return ProtobufMsgpackHandler() + + +@_skip_without_host +def test_protobuf_msgpack_decodes_inner_field(proto_msgpack_handler, log): + import msgpack + + schema_b64 = _build_envelope_descriptor_b64() + inner = {'service': 'orders', 'env': 'prod', 'count': 3} + inner_bytes = msgpack.packb(inner, use_bin_type=True) + payload = _encode_envelope(inner_bytes, org_id=42) + + schema_str = json.dumps({'schema': schema_b64, 'msgpack_fields': ['test.Envelope.message']}) + schema = proto_msgpack_handler.build_schema(schema_str) + + out = proto_msgpack_handler.deserialize(payload, schema, log=log, uses_schema_registry=False) + parsed = json.loads(out) + assert parsed['org_id'] == 42 + assert parsed['message'] == inner + + +@_skip_without_host +def test_protobuf_msgpack_missing_field_is_silently_ok(proto_msgpack_handler, log): + schema_b64 = _build_envelope_descriptor_b64() + payload = _encode_envelope(b'', org_id=1) # empty bytes field + schema_str = json.dumps({'schema': schema_b64, 'msgpack_fields': ['test.Envelope.message']}) + schema = proto_msgpack_handler.build_schema(schema_str) + + out = proto_msgpack_handler.deserialize(payload, schema, log=log, uses_schema_registry=False) + parsed = json.loads(out) + assert parsed['org_id'] == 1 + assert 'message' not in parsed + + +@_skip_without_host +def test_protobuf_msgpack_invalid_inner_raises(proto_msgpack_handler, log): + schema_b64 = _build_envelope_descriptor_b64() + payload = _encode_envelope(b'\xff\xff\xff', org_id=1) # not valid msgpack + schema_str = json.dumps({'schema': schema_b64, 'msgpack_fields': ['test.Envelope.message']}) + schema = proto_msgpack_handler.build_schema(schema_str) + + with pytest.raises(ValueError, match="Failed to deserialize protobuf_msgpack"): + proto_msgpack_handler.deserialize(payload, schema, log=log, uses_schema_registry=False) + + +def _build_nested_descriptor_b64(): + """test.Outer { bytes payload=1; test.Inner inner=2; } / test.Inner { bytes details=1; }""" + import base64 as _b64 + + from google.protobuf import descriptor_pb2 + + fd = descriptor_pb2.FileDescriptorProto() + fd.name = 'nested.proto' + fd.syntax = 'proto3' + fd.package = 'test' + + outer = fd.message_type.add() + outer.name = 'Outer' + f = outer.field.add() + f.name = 'payload' + f.number = 1 + f.type = descriptor_pb2.FieldDescriptorProto.TYPE_BYTES + f.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL + f = outer.field.add() + f.name = 'inner' + f.number = 2 + f.type = descriptor_pb2.FieldDescriptorProto.TYPE_MESSAGE + f.type_name = '.test.Inner' + f.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL + + inner = fd.message_type.add() + inner.name = 'Inner' + f = inner.field.add() + f.name = 'details' + f.number = 1 + f.type = descriptor_pb2.FieldDescriptorProto.TYPE_BYTES + f.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL + + fds = descriptor_pb2.FileDescriptorSet() + fds.file.append(fd) + return _b64.b64encode(fds.SerializeToString()).decode('ascii') + + +def _encode_nested(payload_bytes: bytes, details_bytes: bytes) -> bytes: + # Outer { payload=payload_bytes, inner=Inner{details=details_bytes} } + inner_msg = bytearray() + inner_msg.append((1 << 3) | 2) + inner_msg.append(len(details_bytes)) + inner_msg += details_bytes + + out = bytearray() + out.append((1 << 3) | 2) + out.append(len(payload_bytes)) + out += payload_bytes + out.append((2 << 3) | 2) + out.append(len(inner_msg)) + out += inner_msg + return bytes(out) + + +@_skip_without_host +def test_protobuf_msgpack_nested_fields_decoded(proto_msgpack_handler, log): + """msgpack_fields targets a field on the outer message AND on a nested submessage.""" + import msgpack + + schema_b64 = _build_nested_descriptor_b64() + payload_inner = {'service': 'orders'} + details_inner = {'count': 7, 'tags': ['a', 'b']} + raw = _encode_nested( + msgpack.packb(payload_inner, use_bin_type=True), + msgpack.packb(details_inner, use_bin_type=True), + ) + schema_str = json.dumps( + { + 'schema': schema_b64, + 'msgpack_fields': ['test.Outer.payload', 'test.Inner.details'], + } + ) + schema = proto_msgpack_handler.build_schema(schema_str) + + out = proto_msgpack_handler.deserialize(raw, schema, log=log, uses_schema_registry=False) + parsed = json.loads(out) + assert parsed['payload'] == payload_inner + assert parsed['inner']['details'] == details_inner + + +# --- Coverage top-ups for ProtobufMsgpackHandler --------------------------- + + +def test_msgpack_json_encoder_datetime_and_bytes(handler, log): + """_MsgpackJSONEncoder branches: bytes -> base64, timestamp ext -> isoformat.""" + import msgpack + + payload = msgpack.packb( + {'when': datetime.datetime(2025, 1, 2, 3, 4, 5, tzinfo=datetime.timezone.utc), 'blob': b'\xff\x00'}, + datetime=True, + use_bin_type=True, + ) + out = handler.deserialize(payload, None, log=log, uses_schema_registry=False) + parsed = json.loads(out) + assert parsed['blob'] == base64.b64encode(b'\xff\x00').decode('ascii') + assert '2025-01-02T03:04:05' in parsed['when'] + + +def test_protobuf_msgpack_build_schema_from_registry(proto_msgpack_handler, log): + """build_schema_from_registry wraps the host call with msgpack_fields.""" + import base64 as _b64 + + from google.protobuf import descriptor_pb2 + + fd = descriptor_pb2.FileDescriptorProto() + fd.name = 'envelope.proto' + fd.syntax = 'proto3' + fd.package = 'test' + msg = fd.message_type.add() + msg.name = 'Envelope' + f = msg.field.add() + f.name = 'message' + f.number = 1 + f.type = descriptor_pb2.FieldDescriptorProto.TYPE_BYTES + f.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL + f = msg.field.add() + f.name = 'org_id' + f.number = 2 + f.type = descriptor_pb2.FieldDescriptorProto.TYPE_INT32 + f.label = descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL + + single_b64 = _b64.b64encode(fd.SerializeToString()).decode('ascii') + wrapper = json.dumps({'schema': single_b64, 'msgpack_fields': ['test.Envelope.message']}) + schema = proto_msgpack_handler.build_schema_from_registry(wrapper, []) + assert isinstance(schema, tuple) + assert 'test.Envelope.message' in schema[1] + + +def test_protobuf_msgpack_uses_schema_registry_path(proto_msgpack_handler, log): + """uses_schema_registry=True reads varint message-indices prefix.""" + import msgpack + + schema_b64 = _build_envelope_descriptor_b64() + inner = {'k': 'v'} + body = _encode_envelope(msgpack.packb(inner, use_bin_type=True), org_id=7) + framed = b'\x00' + body # array_len=0 varint, then the body + + schema_str = json.dumps({'schema': schema_b64, 'msgpack_fields': ['test.Envelope.message']}) + schema = proto_msgpack_handler.build_schema(schema_str) + out = proto_msgpack_handler.deserialize(framed, schema, log=log, uses_schema_registry=True) + parsed = json.loads(out) + assert parsed['org_id'] == 7 + assert parsed['message'] == inner + + +def test_protobuf_msgpack_non_bytes_field_raises(proto_msgpack_handler, log): + """msgpack_fields pointing at a non-bytes field surfaces a clear error.""" + import msgpack + + schema_b64 = _build_envelope_descriptor_b64() + payload = _encode_envelope(msgpack.packb({'a': 1}, use_bin_type=True), org_id=99) + schema_str = json.dumps({'schema': schema_b64, 'msgpack_fields': ['test.Envelope.org_id']}) + schema = proto_msgpack_handler.build_schema(schema_str) + with pytest.raises(ValueError, match='non-bytes'): + proto_msgpack_handler.deserialize(payload, schema, log=log, uses_schema_registry=False) + + +def test_protobuf_msgpack_empty_returns_none(proto_msgpack_handler, log): + schema_b64 = _build_envelope_descriptor_b64() + schema_str = json.dumps({'schema': schema_b64, 'msgpack_fields': []}) + schema = proto_msgpack_handler.build_schema(schema_str) + assert proto_msgpack_handler.deserialize(b'', schema, log=log, uses_schema_registry=False) is None + + +def test_protobuf_msgpack_no_msgpack_fields_skips_walk(proto_msgpack_handler, log): + """Empty msgpack_fields means MessageToDict output is returned as-is.""" + import msgpack + + schema_b64 = _build_envelope_descriptor_b64() + payload = _encode_envelope(msgpack.packb({'a': 1}, use_bin_type=True), org_id=1) + schema_str = json.dumps({'schema': schema_b64, 'msgpack_fields': []}) + schema = proto_msgpack_handler.build_schema(schema_str) + out = proto_msgpack_handler.deserialize(payload, schema, log=log, uses_schema_registry=False) + parsed = json.loads(out) + # The bytes field stays base64-encoded since we didn't decode it + assert parsed['org_id'] == 1 + assert isinstance(parsed['message'], str)