From 6ba97169c238d639c5940d855db7deb31bee311d Mon Sep 17 00:00:00 2001 From: Alex Gherghisan Date: Wed, 20 May 2026 13:50:25 +0000 Subject: [PATCH] feat(telemetry): add trace export controls --- yarn-project/foundation/src/config/env_var.ts | 2 + yarn-project/telemetry-client/src/config.ts | 12 +++ .../monitored_batch_span_processor.test.ts | 76 +++++++++++++++++++ .../src/monitored_batch_span_processor.ts | 25 +++++- yarn-project/telemetry-client/src/otel.ts | 7 +- yarn-project/telemetry-client/src/start.ts | 7 +- 6 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 yarn-project/telemetry-client/src/monitored_batch_span_processor.test.ts diff --git a/yarn-project/foundation/src/config/env_var.ts b/yarn-project/foundation/src/config/env_var.ts index e086a79195e7..c62136c29ca7 100644 --- a/yarn-project/foundation/src/config/env_var.ts +++ b/yarn-project/foundation/src/config/env_var.ts @@ -102,8 +102,10 @@ export type EnvVar = | 'OTEL_EXPORTER_OTLP_TRACES_ENDPOINT' | 'OTEL_EXPORTER_OTLP_LOGS_ENDPOINT' | 'OTEL_COLLECT_INTERVAL_MS' + | 'OTEL_BSP_MAX_QUEUE_SIZE' | 'OTEL_EXCLUDE_METRICS' | 'OTEL_INCLUDE_METRICS' + | 'OTEL_MIN_TRACE_DURATION_MS' | 'OTEL_EXPORT_TIMEOUT_MS' | 'PUBLIC_OTEL_EXPORTER_OTLP_METRICS_ENDPOINT' | 'PUBLIC_OTEL_INCLUDE_METRICS' diff --git a/yarn-project/telemetry-client/src/config.ts b/yarn-project/telemetry-client/src/config.ts index 4e705c7a6fcf..814aa2474ba8 100644 --- a/yarn-project/telemetry-client/src/config.ts +++ b/yarn-project/telemetry-client/src/config.ts @@ -17,6 +17,8 @@ export interface TelemetryClientConfig { otelExportTimeoutMs: number; otelExcludeMetrics: string[]; otelIncludeMetrics: string[]; + otelMinTraceDurationMs: number; + otelBspMaxQueueSize: number; } export const telemetryClientConfigMappings: ConfigMappingsType = { @@ -57,6 +59,16 @@ export const telemetryClientConfigMappings: ConfigMappingsType[1]): void { + this.spans.push(...spans); + resultCallback({ code: ExportResultCode.SUCCESS }); + } + + shutdown(): Promise { + return Promise.resolve(); + } +} + +const log = { warn: () => {} } as any; + +function makeSpan(durationMs: number, statusCode = SpanStatusCode.OK): ReadableSpan { + const seconds = Math.floor(durationMs / 1000); + const nanos = (durationMs - seconds * 1000) * 1_000_000; + return { + attributes: {}, + droppedAttributesCount: 0, + droppedEventsCount: 0, + droppedLinksCount: 0, + duration: [seconds, nanos], + ended: true, + endTime: [seconds, nanos], + events: [], + instrumentationLibrary: {} as any, + kind: SpanKind.INTERNAL, + links: [], + name: `span-${durationMs}`, + resource: {} as any, + spanContext: () => ({ spanId: '0'.repeat(16), traceFlags: 1, traceId: '0'.repeat(32) }), + startTime: [0, 0], + status: { code: statusCode }, + }; +} + +describe('MonitoredBatchSpanProcessor', () => { + it('does not export successful spans shorter than the configured duration', async () => { + const exporter = new CollectingSpanExporter(); + const processor = new MonitoredBatchSpanProcessor(exporter, log, { minTraceDurationMs: 10 }); + + processor.onEnd(makeSpan(9)); + processor.onEnd(makeSpan(10)); + await processor.forceFlush(); + + expect(exporter.spans.map(span => span.name)).toEqual(['span-10']); + }); + + it('exports short error spans', async () => { + const exporter = new CollectingSpanExporter(); + const processor = new MonitoredBatchSpanProcessor(exporter, log, { minTraceDurationMs: 10 }); + + processor.onEnd(makeSpan(1, SpanStatusCode.ERROR)); + await processor.forceFlush(); + + expect(exporter.spans.map(span => span.name)).toEqual(['span-1']); + }); + + it('allows short successful spans when the minimum duration is disabled', async () => { + const exporter = new CollectingSpanExporter(); + const processor = new MonitoredBatchSpanProcessor(exporter, log, { minTraceDurationMs: 0 }); + + processor.onEnd(makeSpan(1)); + await processor.forceFlush(); + + expect(exporter.spans.map(span => span.name)).toEqual(['span-1']); + }); +}); diff --git a/yarn-project/telemetry-client/src/monitored_batch_span_processor.ts b/yarn-project/telemetry-client/src/monitored_batch_span_processor.ts index 669c136f8aaf..c5fd7bb81918 100644 --- a/yarn-project/telemetry-client/src/monitored_batch_span_processor.ts +++ b/yarn-project/telemetry-client/src/monitored_batch_span_processor.ts @@ -1,12 +1,19 @@ import type { Logger } from '@aztec/foundation/log'; -import type { Context } from '@opentelemetry/api'; +import { type Context, SpanStatusCode } from '@opentelemetry/api'; +import { hrTimeToMilliseconds } from '@opentelemetry/core'; import type { SpanExporter } from '@opentelemetry/sdk-trace-base'; import { BatchSpanProcessor, type BufferConfig, type ReadableSpan, type Span } from '@opentelemetry/sdk-trace-node'; /** Minimum interval between drop warnings to avoid log spam. */ const DROP_WARNING_INTERVAL_MS = 30_000; +const DEFAULT_MIN_TRACE_DURATION_MS = 10; + +export type MonitoredBatchSpanProcessorConfig = BufferConfig & { + minTraceDurationMs?: number; +}; + /** * Wraps BatchSpanProcessor to emit warnings when spans are dropped due to a full queue. * The standard BatchSpanProcessor silently discards spans when its internal queue reaches @@ -14,6 +21,7 @@ const DROP_WARNING_INTERVAL_MS = 30_000; */ export class MonitoredBatchSpanProcessor extends BatchSpanProcessor { private readonly maxQueueSize: number; + private readonly minTraceDurationMs: number; private readonly log: Logger; private approxQueueSize = 0; @@ -21,10 +29,11 @@ export class MonitoredBatchSpanProcessor extends BatchSpanProcessor { private totalDropped = 0; private lastWarningTime = 0; - constructor(exporter: SpanExporter, log: Logger, config?: BufferConfig) { + constructor(exporter: SpanExporter, log: Logger, config?: MonitoredBatchSpanProcessorConfig) { const maxQueueSize = config?.maxQueueSize ?? 2048; super(exporter, { ...config, maxQueueSize }); this.maxQueueSize = maxQueueSize; + this.minTraceDurationMs = Math.max(0, config?.minTraceDurationMs ?? DEFAULT_MIN_TRACE_DURATION_MS); this.log = log; } @@ -33,6 +42,10 @@ export class MonitoredBatchSpanProcessor extends BatchSpanProcessor { } override onEnd(span: ReadableSpan): void { + if (this.shouldDropShortSpan(span)) { + return; + } + if (this.approxQueueSize >= this.maxQueueSize) { this.droppedSinceLastWarning++; this.totalDropped++; @@ -57,6 +70,14 @@ export class MonitoredBatchSpanProcessor extends BatchSpanProcessor { await super.shutdown(); } + private shouldDropShortSpan(span: ReadableSpan): boolean { + return ( + this.minTraceDurationMs > 0 && + span.status.code !== SpanStatusCode.ERROR && + hrTimeToMilliseconds(span.duration) < this.minTraceDurationMs + ); + } + private maybeLogDropWarning(): void { const now = Date.now(); if (now - this.lastWarningTime >= DROP_WARNING_INTERVAL_MS) { diff --git a/yarn-project/telemetry-client/src/otel.ts b/yarn-project/telemetry-client/src/otel.ts index 42d698ab6ed0..a67cb76ef637 100644 --- a/yarn-project/telemetry-client/src/otel.ts +++ b/yarn-project/telemetry-client/src/otel.ts @@ -374,7 +374,12 @@ export class OpenTelemetryClient implements TelemetryClient { const tracerProvider = new NodeTracerProvider({ resource, spanProcessors: config.tracesCollectorUrl - ? [new MonitoredBatchSpanProcessor(new OTLPTraceExporter({ url: config.tracesCollectorUrl.href }), log)] + ? [ + new MonitoredBatchSpanProcessor(new OTLPTraceExporter({ url: config.tracesCollectorUrl.href }), log, { + maxQueueSize: config.otelBspMaxQueueSize, + minTraceDurationMs: config.otelMinTraceDurationMs, + }), + ] : [], }); diff --git a/yarn-project/telemetry-client/src/start.ts b/yarn-project/telemetry-client/src/start.ts index 6006b001dd9f..4204cede80ce 100644 --- a/yarn-project/telemetry-client/src/start.ts +++ b/yarn-project/telemetry-client/src/start.ts @@ -19,7 +19,12 @@ export async function initTelemetryClient( return telemetry; } - if (config.metricsCollectorUrl || config.publicMetricsCollectorUrl) { + if ( + config.metricsCollectorUrl || + config.publicMetricsCollectorUrl || + config.tracesCollectorUrl || + config.logsCollectorUrl + ) { log.info(`Using OpenTelemetry client with custom collector`); // Lazy load OpenTelemetry to avoid loading heavy deps at startup const { OpenTelemetryClient } = await import('./otel.js');