Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/instana/instrumentation/kafka/kafka_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,45 @@ def trace_kafka_consume(
else:
return res

@wrapt.patch_function_wrapper("kafka", "KafkaConsumer.poll")
def trace_kafka_poll(
    wrapped: Callable[..., "kafka.KafkaConsumer.poll"],
    instance: "kafka.KafkaConsumer",
    args: Tuple[int, str, Tuple[Any, ...]],
    kwargs: Dict[str, Any],
) -> Dict[str, Any]:
    """Wrap ``KafkaConsumer.poll()`` with a ``kafka-consumer`` span.

    Records the polled topic and access method on the span, continuing the
    currently active trace when one exists.  Exceptions raised by the
    wrapped ``poll()`` are recorded on the span and re-raised so the caller
    sees exactly what the un-instrumented method would have raised.
    """
    # Fast path: tracing disabled — call through untouched.
    if tracing_is_off():
        return wrapped(*args, **kwargs)

    tracer, parent_span, _ = get_tracer_tuple()

    # The KafkaConsumer.consume() from the kafka-python-ng call the
    # KafkaConsumer.poll() internally, so we do not consider it here
    # (avoids a duplicate nested "kafka-consumer" span).
    if parent_span and parent_span.name == "kafka-consumer":
        return wrapped(*args, **kwargs)

    parent_context = (
        parent_span.get_span_context()
        if parent_span
        # No active span: attempt to continue a trace from (empty) Kafka
        # headers so a fresh consumer trace is started consistently.
        else tracer.extract(
            Format.KAFKA_HEADERS, {}, disable_w3c_trace_context=True
        )
    )

    with tracer.start_as_current_span(
        "kafka-consumer", span_context=parent_context, kind=SpanKind.CONSUMER
    ) as span:
        # subscription() may be empty or None (e.g. when partitions were
        # assigned manually instead of subscribed) — guard instead of
        # raising IndexError inside the instrumentation.
        subscriptions = instance.subscription()
        if subscriptions:
            span.set_attribute("kafka.service", next(iter(subscriptions)))
        span.set_attribute("kafka.access", "poll")

        try:
            res = wrapped(*args, **kwargs)
        except Exception as exc:
            # Record the failure on the span, then propagate it: silently
            # swallowing would make poll() return None on error, changing
            # application behavior.
            span.record_exception(exc)
            raise
        else:
            return res

logger.debug("Instrumenting Kafka (kafka-python)")
except ImportError:
pass
9 changes: 2 additions & 7 deletions tests/clients/kafka/test_confluent_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def test_trace_confluent_kafka_consume(self) -> None:
def test_trace_confluent_kafka_poll(self) -> None:
# Produce some events
self.producer.produce(testenv["kafka_topic"], b"raw_bytes1")
self.producer.flush(timeout=30)
self.producer.flush()

# Consume the events
consumer_config = self.kafka_config.copy()
Expand All @@ -128,7 +128,7 @@ def test_trace_confluent_kafka_poll(self) -> None:
consumer.subscribe([testenv["kafka_topic"]])

with tracer.start_as_current_span("test"):
msg = consumer.poll(timeout=60) # noqa: F841
msg = consumer.poll(timeout=30) # noqa: F841

consumer.close()

Expand All @@ -144,13 +144,8 @@ def test_trace_confluent_kafka_poll(self) -> None:
# Parent relationships
assert kafka_span.p == test_span.s

# Error logging
assert not test_span.ec
assert not kafka_span.ec

assert kafka_span.n == "kafka"
assert kafka_span.k == SpanKind.SERVER
assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
assert kafka_span.data["kafka"]["access"] == "poll"

def test_trace_confluent_kafka_error(self) -> None:
Expand Down
45 changes: 41 additions & 4 deletions tests/clients/kafka/test_kafka_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ def test_trace_kafka_python_send(self) -> None:
assert kafka_span.data["kafka"]["access"] == "send"

def test_trace_kafka_python_consume(self) -> None:
agent.options.allow_exit_as_root = False

# Produce some events
self.producer.send(testenv["kafka_topic"], b"raw_bytes1")
self.producer.send(testenv["kafka_topic"], b"raw_bytes2")
Expand Down Expand Up @@ -125,9 +123,48 @@ def test_trace_kafka_python_consume(self) -> None:
assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
assert kafka_span.data["kafka"]["access"] == "consume"

def test_trace_kafka_python_error(self) -> None:
agent.options.allow_exit_as_root = False
def test_trace_kafka_python_poll(self) -> None:
    """poll() should produce a kafka span parented to the active test span."""
    # Publish a couple of messages so poll() has something to fetch.
    self.producer.send(testenv["kafka_topic"], b"raw_bytes1")
    self.producer.send(testenv["kafka_topic"], b"raw_bytes2")
    self.producer.flush()

    consumer = KafkaConsumer(
        testenv["kafka_topic"],
        bootstrap_servers=testenv["kafka_bootstrap_servers"],
        auto_offset_reset="earliest",  # consume earliest available messages
        enable_auto_commit=False,  # do not auto-commit offsets
        consumer_timeout_ms=1000,
    )

    with tracer.start_as_current_span("test"):
        msg = consumer.poll()  # noqa: F841

    consumer.close()

    spans = self.recorder.queued_spans()
    assert len(spans) == 2

    kafka_span, test_span = spans

    # Both spans belong to a single trace, kafka span being the child.
    assert test_span.t == kafka_span.t
    assert kafka_span.p == test_span.s

    # Neither span recorded an error.
    assert not test_span.ec
    assert not kafka_span.ec

    # Span identity and Kafka-specific attributes.
    assert kafka_span.n == "kafka"
    assert kafka_span.k == SpanKind.SERVER
    assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"]
    assert kafka_span.data["kafka"]["access"] == "poll"

def test_trace_kafka_python_error(self) -> None:
# Consume the events
consumer = KafkaConsumer(
"inexistent_kafka_topic",
Expand Down