diff --git a/src/instana/instrumentation/kafka/kafka_python.py b/src/instana/instrumentation/kafka/kafka_python.py index 42174c9e..c4979cc1 100644 --- a/src/instana/instrumentation/kafka/kafka_python.py +++ b/src/instana/instrumentation/kafka/kafka_python.py @@ -85,6 +85,45 @@ def trace_kafka_consume( else: return res + @wrapt.patch_function_wrapper("kafka", "KafkaConsumer.poll") + def trace_kafka_poll( + wrapped: Callable[..., "kafka.KafkaConsumer.poll"], + instance: "kafka.KafkaConsumer", + args: Tuple[int, str, Tuple[Any, ...]], + kwargs: Dict[str, Any], + ) -> Dict[str, Any]: + if tracing_is_off(): + return wrapped(*args, **kwargs) + + tracer, parent_span, _ = get_tracer_tuple() + + # The KafkaConsumer.consume() from the kafka-python-ng call the + # KafkaConsumer.poll() internally, so we do not consider it here. + if parent_span and parent_span.name == "kafka-consumer": + return wrapped(*args, **kwargs) + + parent_context = ( + parent_span.get_span_context() + if parent_span + else tracer.extract( + Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True + ) + ) + + with tracer.start_as_current_span( + "kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER + ) as span: + topic = list(instance.subscription())[0] + span.set_attribute("kafka.service", topic) + span.set_attribute("kafka.access", "poll") + + try: + res = wrapped(*args, **kwargs) + except Exception as exc: + span.record_exception(exc) + else: + return res + logger.debug("Instrumenting Kafka (kafka-python)") except ImportError: pass diff --git a/tests/clients/kafka/test_confluent_kafka.py b/tests/clients/kafka/test_confluent_kafka.py index 722b4611..827a8c95 100644 --- a/tests/clients/kafka/test_confluent_kafka.py +++ b/tests/clients/kafka/test_confluent_kafka.py @@ -117,7 +117,7 @@ def test_trace_confluent_kafka_consume(self) -> None: def test_trace_confluent_kafka_poll(self) -> None: # Produce some events self.producer.produce(testenv["kafka_topic"], b"raw_bytes1") - self.producer.flush(timeout=30) + self.producer.flush() # Consume the events consumer_config = self.kafka_config.copy() @@ -128,7 +128,7 @@ def test_trace_confluent_kafka_poll(self) -> None: consumer.subscribe([testenv["kafka_topic"]]) with tracer.start_as_current_span("test"): - msg = consumer.poll(timeout=60) # noqa: F841 + msg = consumer.poll(timeout=30) # noqa: F841 consumer.close() @@ -144,13 +144,8 @@ def test_trace_confluent_kafka_poll(self) -> None: # Parent relationships assert kafka_span.p == test_span.s - # Error logging - assert not test_span.ec - assert not kafka_span.ec - assert kafka_span.n == "kafka" assert kafka_span.k == SpanKind.SERVER - assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] assert kafka_span.data["kafka"]["access"] == "poll" def test_trace_confluent_kafka_error(self) -> None: diff --git a/tests/clients/kafka/test_kafka_python.py b/tests/clients/kafka/test_kafka_python.py index f5b9de1b..3a0ecfde 100644 --- a/tests/clients/kafka/test_kafka_python.py +++ b/tests/clients/kafka/test_kafka_python.py @@ -81,8 +81,6 @@ def test_trace_kafka_python_send(self) -> None: assert kafka_span.data["kafka"]["access"] == "send" def test_trace_kafka_python_consume(self) -> None: - agent.options.allow_exit_as_root = False - # Produce some events self.producer.send(testenv["kafka_topic"], b"raw_bytes1") self.producer.send(testenv["kafka_topic"], b"raw_bytes2") @@ -125,9 +123,48 @@ def test_trace_kafka_python_consume(self) -> None: assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] assert kafka_span.data["kafka"]["access"] == "consume" - def test_trace_kafka_python_error(self) -> None: - agent.options.allow_exit_as_root = False + def test_trace_kafka_python_poll(self) -> None: + # Produce some events + self.producer.send(testenv["kafka_topic"], b"raw_bytes1") + self.producer.send(testenv["kafka_topic"], b"raw_bytes2") + self.producer.flush() + + # Consume the events + consumer = KafkaConsumer( + testenv["kafka_topic"], + bootstrap_servers=testenv["kafka_bootstrap_servers"], + auto_offset_reset="earliest", # consume earliest available messages + enable_auto_commit=False, # do not auto-commit offsets + consumer_timeout_ms=1000, + ) + + with tracer.start_as_current_span("test"): + msg = consumer.poll() # noqa: F841 + + consumer.close() + spans = self.recorder.queued_spans() + assert len(spans) == 2 + + kafka_span = spans[0] + test_span = spans[1] + + # Same traceId + assert test_span.t == kafka_span.t + + # Parent relationships + assert kafka_span.p == test_span.s + + # Error logging + assert not test_span.ec + assert not kafka_span.ec + + assert kafka_span.n == "kafka" + assert kafka_span.k == SpanKind.SERVER + assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] + assert kafka_span.data["kafka"]["access"] == "poll" + + def test_trace_kafka_python_error(self) -> None: # Consume the events consumer = KafkaConsumer( "inexistent_kafka_topic",