diff --git a/composer.json b/composer.json index 9aef88b0..f9c5cecf 100644 --- a/composer.json +++ b/composer.json @@ -43,6 +43,7 @@ "ext-grpc": "*", "ext-mbstring": "*", "ext-opentelemetry": "*", + "ext-rdkafka": "*", "ext-pdo": "*", "ext-pdo_sqlite": "*", "ext-xdebug": "*", diff --git a/docs/src/instrumentation/traces.md b/docs/src/instrumentation/traces.md index 3ae435b9..5949998f 100644 --- a/docs/src/instrumentation/traces.md +++ b/docs/src/instrumentation/traces.md @@ -42,13 +42,14 @@ A DSN starts with a transport and an exporter separated by a `+` character. The Here is table list of the available transport and exporter for traces: -| Transport | Exporter | Description | Example | Default | -|-----------|-----------|--------------------------------------------------------------|-------------------------------------------|--------------| -| http(s) | otlp | OpenTelemetry exporter using HTTP protocol (over TLS) | http+otlp://localhost:4318/v1/traces | N/A | -| grpc(s) | otlp | OpenTelemetry exporter using gRPC protocol (over TLS) | grpc+otlp://localhost:4317 | N/A | -| http(s) | zipkin | Zipkin exporter using HTTP protocol (over TLS) | http+zipkin://localhost:9411/api/v2/spans | N/A | -| empty | in-memory | In-memory exporter for testing purpose | in-memory://default | N/A | -| stream | console | Console exporter for testing purpose using a stream resource | stream+console://default | php://stdout | +| Transport | Exporter | Description | Example | Default | +|-----------|-----------|---------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|--------------| +| http(s) | otlp | OpenTelemetry exporter using HTTP protocol (over TLS) | http+otlp://localhost:4318/v1/traces | N/A | +| grpc(s) | otlp | OpenTelemetry exporter using gRPC protocol (over TLS) | grpc+otlp://localhost:4317 | N/A | +| kafka | otlp | OpenTelemetry exporter using the Kafka message broker. Add query parameters for configuring the message broker. | kafka+otlp://open_telemetry_local_alpha_traces?metadata.broker.list=kafka:9092 | N/A | +| http(s) | zipkin | Zipkin exporter using HTTP protocol (over TLS) | http+zipkin://localhost:9411/api/v2/spans | N/A | +| empty | in-memory | In-memory exporter for testing purpose | in-memory://default | N/A | +| stream | console | Console exporter for testing purpose using a stream resource | stream+console://default | php://stdout | Note: The `stream+console` DSN is the only DSN than can refer to a stream resource using the `path` block. For example: `stream+console://default/file.log`. diff --git a/src/Instrumentation/Symfony/HttpKernel/TraceableHttpKernelEventSubscriber.php b/src/Instrumentation/Symfony/HttpKernel/TraceableHttpKernelEventSubscriber.php index 4108f0e2..9e008104 100644 --- a/src/Instrumentation/Symfony/HttpKernel/TraceableHttpKernelEventSubscriber.php +++ b/src/Instrumentation/Symfony/HttpKernel/TraceableHttpKernelEventSubscriber.php @@ -375,9 +375,9 @@ private function requestAttributes(Request $request): iterable /** * @param array $headers * - * @return array + * @return \Generator> */ - private function headerAttributes(HeaderBag $headerBag, array $headers): iterable + private function headerAttributes(HeaderBag $headerBag, array $headers): \Generator { foreach ($headers as $header => $attribute) { if ($headerBag->has($header)) { diff --git a/src/OpenTelemetry/Exporter/ConsoleExporterEndpoint.php b/src/OpenTelemetry/Exporter/ConsoleExporterEndpoint.php index 66f48589..86428a57 100644 --- a/src/OpenTelemetry/Exporter/ConsoleExporterEndpoint.php +++ b/src/OpenTelemetry/Exporter/ConsoleExporterEndpoint.php @@ -30,4 +30,9 @@ public function getExporter(): string { return $this->dsn->getExporter(); } + + public function getDsn(): ExporterDsn + { + return $this->dsn; + } } diff --git a/src/OpenTelemetry/Exporter/ExporterDsn.php b/src/OpenTelemetry/Exporter/ExporterDsn.php index 412c426c..fd8ec790 100644 --- a/src/OpenTelemetry/Exporter/ExporterDsn.php +++ b/src/OpenTelemetry/Exporter/ExporterDsn.php @@ -4,6 +4,7 @@ use Zenstruck\Dsn; use Zenstruck\Uri; +use Zenstruck\Uri\Part\Query; final class ExporterDsn { @@ -60,6 +61,11 @@ public function getPort(?int $default = null): ?int return $this->uri->port() ?? $default; } + public function getQuery(): Query + { + return $this->uri->query(); + } + /** * @return string[] */ diff --git a/src/OpenTelemetry/Exporter/ExporterEndpointInterface.php b/src/OpenTelemetry/Exporter/ExporterEndpointInterface.php index 199433b0..8dc129d4 100644 --- a/src/OpenTelemetry/Exporter/ExporterEndpointInterface.php +++ b/src/OpenTelemetry/Exporter/ExporterEndpointInterface.php @@ -9,4 +9,6 @@ public function getExporter(): string; public function getTransport(): ?string; public static function fromDsn(ExporterDsn $dsn): self; + + public function getDsn(): ExporterDsn; } diff --git a/src/OpenTelemetry/Exporter/OtlpExporterEndpoint.php b/src/OpenTelemetry/Exporter/OtlpExporterEndpoint.php index 73eb8daa..3d30b051 100644 --- a/src/OpenTelemetry/Exporter/OtlpExporterEndpoint.php +++ b/src/OpenTelemetry/Exporter/OtlpExporterEndpoint.php @@ -42,6 +42,10 @@ public function withSignal(string $signal): self public function __toString() { + if (TransportEnum::Kafka === $this->transport) { + return \sprintf('kafka://%s?%s', $this->dsn->getHost(), $this->dsn->getQuery()->toString()); + } + $uri = $this->uriFactory->createUri(); $uri = $uri ->withScheme($this->transport->getScheme()) @@ -78,4 +82,9 @@ public function getExporter(): string { return 'otlp'; } + + public function getDsn(): ExporterDsn + { + return $this->dsn; + } } diff --git a/src/OpenTelemetry/Log/LogExporterEndpoint.php b/src/OpenTelemetry/Log/LogExporterEndpoint.php index 628c2acc..a484f3cf 100644 --- a/src/OpenTelemetry/Log/LogExporterEndpoint.php +++ b/src/OpenTelemetry/Log/LogExporterEndpoint.php @@ -50,4 +50,9 @@ public function getExporter(): string { return $this->exporter->value; } + + public function getDsn(): ExporterDsn + { + return $this->dsn; + } } diff --git a/src/OpenTelemetry/Metric/MetricExporterEndpoint.php b/src/OpenTelemetry/Metric/MetricExporterEndpoint.php index c154874b..fae0483f 100644 --- a/src/OpenTelemetry/Metric/MetricExporterEndpoint.php +++ b/src/OpenTelemetry/Metric/MetricExporterEndpoint.php @@ -48,4 +48,9 @@ public function getExporter(): string { return $this->exporter->value; } + + public function getDsn(): ExporterDsn + { + return $this->dsn; + } } diff --git a/src/OpenTelemetry/Trace/TraceExporterEndpoint.php b/src/OpenTelemetry/Trace/TraceExporterEndpoint.php index 51591d85..70383aaf 100644 --- a/src/OpenTelemetry/Trace/TraceExporterEndpoint.php +++ b/src/OpenTelemetry/Trace/TraceExporterEndpoint.php @@ -52,4 +52,9 @@ public function getTransport(): ?string { return $this->transport?->value; } + + public function getDsn(): ExporterDsn + { + return $this->dsn; + } } diff --git a/src/OpenTelemetry/Trace/ZipkinExporterEndpoint.php b/src/OpenTelemetry/Trace/ZipkinExporterEndpoint.php index 8b66ddbf..81fe99b0 100644 --- a/src/OpenTelemetry/Trace/ZipkinExporterEndpoint.php +++ b/src/OpenTelemetry/Trace/ZipkinExporterEndpoint.php @@ -53,4 +53,9 @@ public function getExporter(): string { return 'zipkin'; } + + public function getDsn(): ExporterDsn + { + return $this->dsn; + } } diff --git a/src/OpenTelemetry/Transport/KafkaTransport.php b/src/OpenTelemetry/Transport/KafkaTransport.php new file mode 100644 index 00000000..c11a88fa --- /dev/null +++ b/src/OpenTelemetry/Transport/KafkaTransport.php @@ -0,0 +1,85 @@ + + */ +final readonly class KafkaTransport implements TransportInterface +{ + private const FLUSH_TIMEOUT = 10000; + + private Producer $producer; + private ProducerTopic $topicHandle; + + public function __construct( + Conf $configuration, + string $topic, + ) { + if (!\class_exists(Conf::class)) { + throw new \RuntimeException('The PHP extension "rdkafka" is required to use the Kafka transport.'); + } + + $this->producer = new Producer($configuration); + $this->topicHandle = $this->producer->newTopic($topic); + } + + public function contentType(): string + { + return 'application/x-protobuf'; + } + + /** + * @phpstan-return FutureInterface + */ + public function send(string $payload, ?CancellationInterface $cancellation = null): FutureInterface + { + try { + $this->topicHandle->producev(\RD_KAFKA_PARTITION_UA, 0, $payload); + } catch (\Throwable $exception) { + return new ErrorFuture($exception); + } + + return new CompletedFuture(null); + } + + public function shutdown(?CancellationInterface $cancellation = null): bool + { + return $this->flushInternal(); + } + + public function forceFlush(?CancellationInterface $cancellation = null): bool + { + return $this->flushInternal(); + } + + private function flushInternal(): bool + { + // librdkafka recommends retrying the flush operation a couple of times when it returns a null result. + $timeout = self::FLUSH_TIMEOUT; + $start = \microtime(true); + do { + $res = $this->producer->flush($timeout); + if (\RD_KAFKA_RESP_ERR_NO_ERROR === $res) { + return true; + } + + // reduce timeout + $elapsedMs = (int) \round((\microtime(true) - $start) * 1000); + $timeout = \max(0, self::FLUSH_TIMEOUT - $elapsedMs); + } while ($timeout > 0); + + return false; + } +} diff --git a/src/OpenTelemetry/Transport/KafkaTransportFactory.php b/src/OpenTelemetry/Transport/KafkaTransportFactory.php new file mode 100644 index 00000000..c8fba052 --- /dev/null +++ b/src/OpenTelemetry/Transport/KafkaTransportFactory.php @@ -0,0 +1,30 @@ +getTransport()); + } + + public function createTransport(#[\SensitiveParameter] ExporterEndpointInterface $endpoint, ExporterOptionsInterface $options): TransportInterface + { + $dsn = $endpoint->getDsn(); + $queryParameters = $dsn->getQuery()->all(); + $conf = new Conf(); + foreach ($queryParameters as $k => $v) { + $conf->set(\str_replace('_', '.', $k), (string) $v); + } + + return new KafkaTransport($conf, $dsn->getHost()); + } +} diff --git a/src/OpenTelemetry/Transport/TransportEnum.php b/src/OpenTelemetry/Transport/TransportEnum.php index 2389cb08..30f1ed88 100644 --- a/src/OpenTelemetry/Transport/TransportEnum.php +++ b/src/OpenTelemetry/Transport/TransportEnum.php @@ -11,12 +11,14 @@ enum TransportEnum: string case Http = 'http'; case Https = 'https'; case Stream = 'stream'; + case Kafka = 'kafka'; public function getScheme(): ?string { return match ($this) { self::Http, self::Grpc => 'http', self::Https, self::Grpcs => 'https', + self::Kafka => 'kafka', default => null, }; } diff --git a/src/Resources/config/services_transports.php b/src/Resources/config/services_transports.php index da38d171..ca4c53f5 100644 --- a/src/Resources/config/services_transports.php +++ b/src/Resources/config/services_transports.php @@ -2,6 +2,7 @@ use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\AbstractTransportFactory; use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\GrpcTransportFactory; +use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\KafkaTransportFactory; use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\OtlpHttpTransportFactory; use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\PsrHttpTransportFactory; use FriendsOfOpenTelemetry\OpenTelemetryBundle\OpenTelemetry\Transport\StreamTransportFactory; @@ -39,6 +40,9 @@ ->parent('open_telemetry.transport_factory.abstract') ->tag('open_telemetry.transport_factory') + ->set('open_telemetry.transport_factory.kafka', KafkaTransportFactory::class) + ->tag('open_telemetry.transport_factory') + ->set('open_telemetry.transport_factory', TransportFactory::class) ->args([ tagged_iterator('open_telemetry.transport_factory'), diff --git a/tests/Unit/OpenTelemetry/Transport/KafkaTransportFactoryTest.php b/tests/Unit/OpenTelemetry/Transport/KafkaTransportFactoryTest.php new file mode 100644 index 00000000..55f2e3df --- /dev/null +++ b/tests/Unit/OpenTelemetry/Transport/KafkaTransportFactoryTest.php @@ -0,0 +1,89 @@ +supports($endpoint, $options)); + + if ($shouldSupport) { + $transport = $factory->createTransport($endpoint, $options); + self::assertSame('application/x-protobuf', $transport->contentType()); + } + } + + /** + * @return \Generator + */ + public static function exporterProvider(): \Generator + { + // Kafka for traces + yield [ + TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-traces?metadata_broker_list=localhost:9092')), + new OtlpExporterOptions(), + true, + ]; + + // Kafka for metrics + yield [ + MetricExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-metrics?metadata_broker_list=localhost:9092')), + new MetricExporterOptions(), + true, + ]; + + // Kafka for logs + yield [ + LogExporterEndpoint::fromDsn(ExporterDsn::fromString('kafka+otlp://otel-logs?metadata_broker_list=localhost:9092')), + new OtlpExporterOptions(), + true, + ]; + + // Not Kafka transports should not be supported + yield [ + TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('grpc+otlp://localhost')), + new OtlpExporterOptions(), + false, + ]; + + yield [ + TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('http+otlp://localhost')), + new OtlpExporterOptions(), + false, + ]; + + yield [ + TraceExporterEndpoint::fromDsn(ExporterDsn::fromString('stream+console://default')), + new EmptyExporterOptions(), + false, + ]; + } +} diff --git a/tests/Unit/OpenTelemetry/Transport/KafkaTransportTest.php b/tests/Unit/OpenTelemetry/Transport/KafkaTransportTest.php new file mode 100644 index 00000000..ea9cf4c2 --- /dev/null +++ b/tests/Unit/OpenTelemetry/Transport/KafkaTransportTest.php @@ -0,0 +1,39 @@ +contentType()); + } + + public function testSendReturnsCompletedFutureOnSuccess(): void + { + $conf = new Conf(); + $transport = new KafkaTransport($conf, 'test-topic'); + + // We cannot easily mock RdKafka internals without the extension. Basic smoke test for method existence. + $future = $transport->send('payload'); + /* @phpstan-ignore-next-line */ + self::assertTrue(\method_exists($future, 'await')); // interface method presence + } +}