From 9bd8c14e1973a5f5b1aa9eb5a031df12512a59e0 Mon Sep 17 00:00:00 2001 From: KalinduGandara Date: Fri, 15 May 2026 12:14:35 +0530 Subject: [PATCH] Improve Parent Resolving in Opentelemetry tracing adding support for configure Batch Span Processor (BSP) --- .../org/apache/synapse/SynapseConstants.java | 7 ++ .../management/OTLPTelemetryManager.java | 26 +++- .../ParentSpanWrapperStackManager.java | 111 ++++++++++++++++++ .../management/TelemetryConstants.java | 25 ++++ .../LatestActiveParentResolver.java | 11 +- .../parentresolving/ParentResolver.java | 2 +- .../opentelemetry/stores/SpanStore.java | 3 + .../apache/synapse/util/MessageHelper.java | 6 +- 8 files changed, 187 insertions(+), 4 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/ParentSpanWrapperStackManager.java diff --git a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java index 81a7b0683f..6a6d68b854 100644 --- a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java @@ -778,4 +778,11 @@ public enum ENDPOINT_TIMEOUT_TYPE { ENDPOINT_TIMEOUT, GLOBAL_TIMEOUT, HTTP_CONNE */ public static final String ELASTICSEARCH_CUSTOM_DATA_PROVIDER_CLASS = "analytics.custom_data_provider_class"; + + /** + * Message context property holding the stack of span wrapper IDs that tracks the chain of currently open + * tracing spans, used to resolve the parent of the next span. + */ + public static final String PARENT_STACK_PROPERTY = "synapse.parent.stack"; + } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/OTLPTelemetryManager.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/OTLPTelemetryManager.java index 9a0b2ef7e1..71fb202b5e 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/OTLPTelemetryManager.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/OTLPTelemetryManager.java @@ -138,7 +138,18 @@ public void init() { } sdkTracerProvider = SdkTracerProvider.builder() - .addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build()) + .addSpanProcessor(BatchSpanProcessor.builder(spanExporter) + .setMaxQueueSize(getIntProperty(TelemetryConstants.OPENTELEMETRY_BSP_MAX_QUEUE_SIZE, + TelemetryConstants.OPENTELEMETRY_BSP_DEFAULT_MAX_QUEUE_SIZE)) + .setMaxExportBatchSize(getIntProperty(TelemetryConstants.OPENTELEMETRY_BSP_MAX_EXPORT_BATCH_SIZE, + TelemetryConstants.OPENTELEMETRY_BSP_DEFAULT_MAX_EXPORT_BATCH_SIZE)) + .setScheduleDelay(Duration.ofMillis(getIntProperty( + TelemetryConstants.OPENTELEMETRY_BSP_SCHEDULE_DELAY_MILLIS, + TelemetryConstants.OPENTELEMETRY_BSP_DEFAULT_SCHEDULE_DELAY_MILLIS))) + .setExporterTimeout(Duration.ofMillis(getIntProperty( + TelemetryConstants.OPENTELEMETRY_BSP_EXPORT_TIMEOUT_MILLIS, + TelemetryConstants.OPENTELEMETRY_BSP_DEFAULT_EXPORT_TIMEOUT_MILLIS))) + .build()) .setResource(Resource.getDefault().merge(TelemetryUtil.getTracerProviderResource(TelemetryConstants.SERVICE_NAME))) .build(); @@ -243,4 +254,17 @@ private int getMetricIntervalSeconds() { } return metricIntervalSeconds; } + + + private int getIntProperty(String propertyName, String defaultValue) { + String value = SynapsePropertiesLoader.getPropertyValue(propertyName, defaultValue); + try { + return Integer.parseInt(value.trim()); + } catch (NumberFormatException e) { + logger.warn("Invalid integer for " + propertyName + ": '" + value + + "'. Using default value: " + defaultValue); + return Integer.parseInt(defaultValue); + } + } + } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/ParentSpanWrapperStackManager.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/ParentSpanWrapperStackManager.java new file mode 100644 index 0000000000..ed088fa130 --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/ParentSpanWrapperStackManager.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2026, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management; + +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseConstants; + +import java.util.Stack; + +/** + * Maintains a per-message stack of span wrapper IDs, tracking the chain of currently open spans so that the most + * recently started span can be resolved as the parent of the next one. + *

+ * The stack is stored as a message context property ({@link SynapseConstants#PARENT_STACK_PROPERTY}) and is therefore + * carried along the message flow, and cloned when the message context is cloned. + */ +public final class ParentSpanWrapperStackManager { + + private ParentSpanWrapperStackManager() {} + + /** + * Pushes the given span wrapper ID onto the parent stack of the message, creating the stack if it does not + * exist yet. + * + * @param spanWrapperId ID of the span wrapper that has just been started. + * @param synCtx Message context that owns the stack. + */ + public static void push(String spanWrapperId, MessageContext synCtx) { + if (spanWrapperId == null || synCtx == null) { + return; + } + Stack parentStack = getStack(synCtx); + if (parentStack == null) { + parentStack = new Stack<>(); + synCtx.setProperty(SynapseConstants.PARENT_STACK_PROPERTY, parentStack); + } + parentStack.push(spanWrapperId); + } + + /** + * Pops the topmost span wrapper ID from the parent stack of the message, if a non-empty stack exists. + * + * @param synCtx Message context that owns the stack. + */ + public static void pop(MessageContext synCtx) { + if (synCtx == null) { + return; + } + Stack parentStack = getStack(synCtx); + if (parentStack != null && !parentStack.isEmpty()) { + parentStack.pop(); + } + } + + /** + * Returns the span wrapper ID at the top of the parent stack without removing it. + * + * @param synCtx Message context that owns the stack. + * @return The current parent span wrapper ID, or {@code null} if there is no parent. + */ + public static String peekParentSpanWrapperId(MessageContext synCtx) { + if (synCtx == null) { + return null; + } + Stack parentStack = getStack(synCtx); + if (parentStack != null && !parentStack.isEmpty()) { + return parentStack.peek(); + } + return null; + } + + /** + * Creates a copy of the given parent stack. Used when a message context is cloned, so that each branch + * maintains its own independent stack. + * + * @param parentStack Parent stack to copy. + * @return A new stack containing the same span wrapper IDs. + */ + public static Stack copyOf(Stack parentStack) { + Stack clone = new Stack<>(); + if (parentStack != null){ + clone.addAll(parentStack); + } + return clone; + } + + @SuppressWarnings("unchecked") + private static Stack getStack(MessageContext synCtx) { + Object parentStackObj = synCtx.getProperty(SynapseConstants.PARENT_STACK_PROPERTY); + if (parentStackObj instanceof Stack) { + return (Stack) parentStackObj; + } + return null; + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/TelemetryConstants.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/TelemetryConstants.java index 7525bee2ac..143a1f7682 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/TelemetryConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/TelemetryConstants.java @@ -57,6 +57,31 @@ public class TelemetryConstants { public static final String OPENTELEMETRY_METRIC_PUSH_INTERVAL_SECONDS = "opentelemetry.metric.push.interval.seconds"; public static final String OPENTELEMETRY_METRIC_DEFAULT_PUSH_INTERVAL_SECONDS = "60"; + + /** + * Maximum number of spans kept in the queue before being dropped. + */ + public static final String OPENTELEMETRY_BSP_MAX_QUEUE_SIZE = "opentelemetry.bsp.max.queue.size"; + public static final String OPENTELEMETRY_BSP_DEFAULT_MAX_QUEUE_SIZE = "2048"; + + /** + * Maximum number of spans exported in a single batch. + */ + public static final String OPENTELEMETRY_BSP_MAX_EXPORT_BATCH_SIZE = "opentelemetry.bsp.max.export.batch.size"; + public static final String OPENTELEMETRY_BSP_DEFAULT_MAX_EXPORT_BATCH_SIZE = "512"; + + /** + * Delay interval (in milliseconds) between two consecutive span exports. + */ + public static final String OPENTELEMETRY_BSP_SCHEDULE_DELAY_MILLIS = "opentelemetry.bsp.schedule.delay.millis"; + public static final String OPENTELEMETRY_BSP_DEFAULT_SCHEDULE_DELAY_MILLIS = "5000"; + + /** + * Maximum time (in milliseconds) the exporter is allowed to run before being cancelled. + */ + public static final String OPENTELEMETRY_BSP_EXPORT_TIMEOUT_MILLIS = "opentelemetry.bsp.export.timeout.millis"; + public static final String OPENTELEMETRY_BSP_DEFAULT_EXPORT_TIMEOUT_MILLIS = "30000"; + /** * HTTP protocol constant. */ diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/LatestActiveParentResolver.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/LatestActiveParentResolver.java index 4435de1bb2..66e86b175f 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/LatestActiveParentResolver.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/LatestActiveParentResolver.java @@ -18,7 +18,9 @@ package org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.parentresolving; +import org.apache.synapse.MessageContext; import org.apache.synapse.aspects.flow.statistics.data.raw.StatisticDataUnit; +import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.ParentSpanWrapperStackManager; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.stores.SpanStore; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.models.SpanWrapper; @@ -33,7 +35,14 @@ public class LatestActiveParentResolver extends AbstractParentResolver { * @param spanStore The span store object. * @return Resolved parent span wrapper. */ - public static SpanWrapper resolveParent(SpanStore spanStore) { + public static SpanWrapper resolveParent(SpanStore spanStore, MessageContext synCtx) { + String parentSpanWrapperId = ParentSpanWrapperStackManager.peekParentSpanWrapperId(synCtx); + if (parentSpanWrapperId != null) { + SpanWrapper parentSpan = spanStore.getSpanWrapper(parentSpanWrapperId); + if (parentSpan != null) { + return parentSpan; + } + } return resolveLatestActiveSpanWrapper(spanStore); } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/ParentResolver.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/ParentResolver.java index 32a69f33f4..33eeffa013 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/ParentResolver.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/ParentResolver.java @@ -66,6 +66,6 @@ public static SpanWrapper resolveParent(StatisticDataUnit child, } // Resolve based on latest active span - return LatestActiveParentResolver.resolveParent(spanStore); + return LatestActiveParentResolver.resolveParent(spanStore, synCtx); } } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/stores/SpanStore.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/stores/SpanStore.java index 794eba06f5..bb9f8168a2 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/stores/SpanStore.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/stores/SpanStore.java @@ -24,6 +24,7 @@ import org.apache.synapse.MessageContext; import org.apache.synapse.SynapseConstants; import org.apache.synapse.aspects.flow.statistics.data.raw.StatisticDataUnit; +import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.ParentSpanWrapperStackManager; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.helpers.SpanTagger; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.helpers.TracingUtils; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.models.ContinuationStateSequenceInfo; @@ -114,6 +115,7 @@ public SpanWrapper addSpanWrapper(String spanId, componentUniqueIdWiseSpanWrappers.put(statisticDataUnit.getComponentId(), spanWrapper); } activeSpanWrappers.add(spanWrapper); + ParentSpanWrapperStackManager.push(spanId, synCtx); return spanWrapper; } @@ -197,6 +199,7 @@ public void finishSpan(SpanWrapper spanWrapper, MessageContext synCtx, boolean i } spanWrapper.getSpan().end(); activeSpanWrappers.remove(spanWrapper); + ParentSpanWrapperStackManager.pop(synCtx); } } diff --git a/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java b/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java index 7eaaa4d5ad..2a5676000c 100644 --- a/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java +++ b/modules/core/src/main/java/org/apache/synapse/util/MessageHelper.java @@ -35,6 +35,7 @@ import org.apache.synapse.MessageContext; import org.apache.synapse.SynapseConstants; import org.apache.synapse.SynapseException; +import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.ParentSpanWrapperStackManager; import org.apache.synapse.commons.json.JsonUtil; import org.apache.synapse.commons.CorrelationConstants; import org.apache.synapse.continuation.ContinuationStackManager; @@ -193,7 +194,10 @@ public static MessageContext cloneMessageContext(MessageContext synCtx, boolean obj = (OMElement) ((OMElement) obj).cloneOMElement(); } else if (obj instanceof ResponseState) { // do nothing and let the same reference to go to the cloned context - } else{ + } else if (obj instanceof Stack + && strkey.equals(SynapseConstants.PARENT_STACK_PROPERTY)) { + obj = ParentSpanWrapperStackManager.copyOf((Stack) obj); + } else { /** * Need to add conditions according to type if found in * future