Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions yarn-project/foundation/src/config/env_var.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ export type EnvVar =
| 'OTEL_EXPORTER_OTLP_TRACES_ENDPOINT'
| 'OTEL_EXPORTER_OTLP_LOGS_ENDPOINT'
| 'OTEL_COLLECT_INTERVAL_MS'
| 'OTEL_BSP_MAX_QUEUE_SIZE'
| 'OTEL_EXCLUDE_METRICS'
| 'OTEL_INCLUDE_METRICS'
| 'OTEL_MIN_TRACE_DURATION_MS'
| 'OTEL_EXPORT_TIMEOUT_MS'
| 'PUBLIC_OTEL_EXPORTER_OTLP_METRICS_ENDPOINT'
| 'PUBLIC_OTEL_INCLUDE_METRICS'
Expand Down
12 changes: 12 additions & 0 deletions yarn-project/telemetry-client/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export interface TelemetryClientConfig {
otelExportTimeoutMs: number;
otelExcludeMetrics: string[];
otelIncludeMetrics: string[];
otelMinTraceDurationMs: number;
otelBspMaxQueueSize: number;
}

export const telemetryClientConfigMappings: ConfigMappingsType<TelemetryClientConfig> = {
Expand Down Expand Up @@ -57,6 +59,16 @@ export const telemetryClientConfigMappings: ConfigMappingsType<TelemetryClientCo
: [],
defaultValue: [],
},
otelMinTraceDurationMs: {
env: 'OTEL_MIN_TRACE_DURATION_MS',
description: 'The minimum successful trace duration to export in milliseconds. Set to 0 to export all traces.',
...numberConfigHelper(10),
},
otelBspMaxQueueSize: {
env: 'OTEL_BSP_MAX_QUEUE_SIZE',
description: 'The maximum number of completed spans to queue before export.',
...numberConfigHelper(2048),
},
otelIncludeMetrics: {
env: 'OTEL_INCLUDE_METRICS',
description: 'A list of metric prefixes to include in export (ignored if OTEL_EXCLUDE_METRICS is set)',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { SpanKind, SpanStatusCode } from '@opentelemetry/api';
import { ExportResultCode } from '@opentelemetry/core';
import type { ReadableSpan, SpanExporter } from '@opentelemetry/sdk-trace-node';

import { MonitoredBatchSpanProcessor } from './monitored_batch_span_processor.js';

/**
 * In-memory SpanExporter for tests: every exported batch is appended to `spans`
 * and the export is reported as successful right away.
 */
class CollectingSpanExporter implements SpanExporter {
  /** All spans received so far, in the order they were exported. */
  public readonly spans: ReadableSpan[] = [];

  /** Stores the batch, then synchronously signals success through the callback. */
  export(spans: ReadableSpan[], resultCallback: Parameters<SpanExporter['export']>[1]): void {
    for (const exported of spans) {
      this.spans.push(exported);
    }
    const outcome = { code: ExportResultCode.SUCCESS };
    resultCallback(outcome);
  }

  /** Nothing to release; returns an already-resolved promise. */
  shutdown(): Promise<void> {
    return Promise.resolve();
  }
}

// Logger stub for these tests: only `warn` is provided, as a no-op.
const log = { warn: () => {} } as any;

/**
 * Builds a minimal ended ReadableSpan fixture whose duration is `durationMs`.
 *
 * The span starts at [0, 0], so its end time equals its duration. The name is
 * `span-<durationMs>` so tests can identify which spans were exported.
 *
 * @param durationMs - Span duration in milliseconds.
 * @param statusCode - Status code to put on the span; defaults to OK.
 * @returns A span object populated with empty/zero values for everything else.
 */
function makeSpan(durationMs: number, statusCode = SpanStatusCode.OK): ReadableSpan {
  // Split the millisecond duration into the [seconds, nanoseconds] pair used for span times.
  const wholeSeconds = Math.floor(durationMs / 1000);
  const leftoverNanos = (durationMs - wholeSeconds * 1000) * 1_000_000;

  const fixture: ReadableSpan = {
    attributes: {},
    droppedAttributesCount: 0,
    droppedEventsCount: 0,
    droppedLinksCount: 0,
    duration: [wholeSeconds, leftoverNanos],
    ended: true,
    endTime: [wholeSeconds, leftoverNanos],
    events: [],
    instrumentationLibrary: {} as any,
    kind: SpanKind.INTERNAL,
    links: [],
    name: `span-${durationMs}`,
    resource: {} as any,
    spanContext: () => ({ spanId: '0'.repeat(16), traceFlags: 1, traceId: '0'.repeat(32) }),
    startTime: [0, 0],
    status: { code: statusCode },
  };
  return fixture;
}

describe('MonitoredBatchSpanProcessor', () => {
  /**
   * Runs the given spans through a fresh processor configured with `minTraceDurationMs`,
   * flushes it, and returns the names of the spans that reached the exporter.
   */
  const exportedNames = async (minTraceDurationMs: number, ended: ReadableSpan[]): Promise<string[]> => {
    const sink = new CollectingSpanExporter();
    const underTest = new MonitoredBatchSpanProcessor(sink, log, { minTraceDurationMs });

    for (const span of ended) {
      underTest.onEnd(span);
    }
    await underTest.forceFlush();

    return sink.spans.map(span => span.name);
  };

  it('does not export successful spans shorter than the configured duration', async () => {
    // 9ms is below the 10ms threshold; 10ms is exactly on it and must be kept.
    const names = await exportedNames(10, [makeSpan(9), makeSpan(10)]);

    expect(names).toEqual(['span-10']);
  });

  it('exports short error spans', async () => {
    const names = await exportedNames(10, [makeSpan(1, SpanStatusCode.ERROR)]);

    expect(names).toEqual(['span-1']);
  });

  it('allows short successful spans when the minimum duration is disabled', async () => {
    const names = await exportedNames(0, [makeSpan(1)]);

    expect(names).toEqual(['span-1']);
  });
});
Original file line number Diff line number Diff line change
@@ -1,30 +1,39 @@
import type { Logger } from '@aztec/foundation/log';

import type { Context } from '@opentelemetry/api';
import { type Context, SpanStatusCode } from '@opentelemetry/api';
import { hrTimeToMilliseconds } from '@opentelemetry/core';
import type { SpanExporter } from '@opentelemetry/sdk-trace-base';
import { BatchSpanProcessor, type BufferConfig, type ReadableSpan, type Span } from '@opentelemetry/sdk-trace-node';

/** Minimum interval between drop warnings to avoid log spam. */
const DROP_WARNING_INTERVAL_MS = 30_000;

/** Minimum span duration in milliseconds applied when `minTraceDurationMs` is not supplied. */
const DEFAULT_MIN_TRACE_DURATION_MS = 10;

/** BufferConfig options accepted by BatchSpanProcessor, extended with the short-span filter setting. */
export type MonitoredBatchSpanProcessorConfig = BufferConfig & {
/**
 * Spans whose status is not ERROR and whose duration is below this many milliseconds
 * are discarded in onEnd. Negative values are clamped to 0, and 0 disables the filter.
 */
minTraceDurationMs?: number;
};

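/**
* Wraps BatchSpanProcessor to emit warnings when spans are dropped due to a full queue.
* The standard BatchSpanProcessor silently discards spans when its internal queue reaches
* maxQueueSize, making telemetry data loss invisible to operators.
* It also discards spans that did not end with an error status and lasted less than
* minTraceDurationMs; these are filtered out before the queue-size check.
*/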
export class MonitoredBatchSpanProcessor extends BatchSpanProcessor {
private readonly maxQueueSize: number;
private readonly minTraceDurationMs: number;
private readonly log: Logger;

private approxQueueSize = 0;
private droppedSinceLastWarning = 0;
private totalDropped = 0;
private lastWarningTime = 0;

constructor(exporter: SpanExporter, log: Logger, config?: BufferConfig) {
/**
 * @param exporter - Exporter handed to the underlying BatchSpanProcessor.
 * @param log - Logger stored on the instance.
 * @param config - Optional buffer settings plus `minTraceDurationMs`.
 */
constructor(exporter: SpanExporter, log: Logger, config?: MonitoredBatchSpanProcessorConfig) {
  // Resolve the queue limit up front so the base class and this wrapper use the same value.
  const queueLimit = config?.maxQueueSize ?? 2048;
  super(exporter, { ...config, maxQueueSize: queueLimit });

  const requestedMinDurationMs = config?.minTraceDurationMs ?? DEFAULT_MIN_TRACE_DURATION_MS;

  this.log = log;
  this.maxQueueSize = queueLimit;
  // Negative thresholds are clamped to 0.
  this.minTraceDurationMs = Math.max(0, requestedMinDurationMs);
}

Expand All @@ -33,6 +42,10 @@ export class MonitoredBatchSpanProcessor extends BatchSpanProcessor {
}

override onEnd(span: ReadableSpan): void {
if (this.shouldDropShortSpan(span)) {
return;
}

if (this.approxQueueSize >= this.maxQueueSize) {
this.droppedSinceLastWarning++;
this.totalDropped++;
Expand All @@ -57,6 +70,14 @@ export class MonitoredBatchSpanProcessor extends BatchSpanProcessor {
await super.shutdown();
}

/**
 * Decides whether a finished span should be discarded for being too short.
 *
 * @param span - The ended span to inspect.
 * @returns true only when the minimum duration is positive, the span's status is
 * not ERROR, and its duration in milliseconds is below the minimum.
 */
private shouldDropShortSpan(span: ReadableSpan): boolean {
  // A non-positive threshold means the filter is off.
  if (!(this.minTraceDurationMs > 0)) {
    return false;
  }

  // Error spans are always kept, regardless of how short they are.
  if (span.status.code === SpanStatusCode.ERROR) {
    return false;
  }

  const elapsedMs = hrTimeToMilliseconds(span.duration);
  return elapsedMs < this.minTraceDurationMs;
}

private maybeLogDropWarning(): void {
const now = Date.now();
if (now - this.lastWarningTime >= DROP_WARNING_INTERVAL_MS) {
Expand Down
7 changes: 6 additions & 1 deletion yarn-project/telemetry-client/src/otel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,12 @@ export class OpenTelemetryClient implements TelemetryClient {
const tracerProvider = new NodeTracerProvider({
resource,
spanProcessors: config.tracesCollectorUrl
? [new MonitoredBatchSpanProcessor(new OTLPTraceExporter({ url: config.tracesCollectorUrl.href }), log)]
? [
new MonitoredBatchSpanProcessor(new OTLPTraceExporter({ url: config.tracesCollectorUrl.href }), log, {
maxQueueSize: config.otelBspMaxQueueSize,
minTraceDurationMs: config.otelMinTraceDurationMs,
}),
]
: [],
});

Expand Down
7 changes: 6 additions & 1 deletion yarn-project/telemetry-client/src/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ export async function initTelemetryClient(
return telemetry;
}

if (config.metricsCollectorUrl || config.publicMetricsCollectorUrl) {
if (
config.metricsCollectorUrl ||
config.publicMetricsCollectorUrl ||
config.tracesCollectorUrl ||
config.logsCollectorUrl
) {
log.info(`Using OpenTelemetry client with custom collector`);
// Lazy load OpenTelemetry to avoid loading heavy deps at startup
const { OpenTelemetryClient } = await import('./otel.js');
Expand Down
Loading