Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -778,4 +778,11 @@ public enum ENDPOINT_TIMEOUT_TYPE { ENDPOINT_TIMEOUT, GLOBAL_TIMEOUT, HTTP_CONNE
*/
public static final String ELASTICSEARCH_CUSTOM_DATA_PROVIDER_CLASS
= "analytics.custom_data_provider_class";

/**
* Message context property holding the stack of span wrapper IDs that tracks the chain of currently open
* tracing spans, used to resolve the parent of the next span.
*/
public static final String PARENT_STACK_PROPERTY = "synapse.parent.stack";

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,18 @@ public void init() {
}

sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
.addSpanProcessor(BatchSpanProcessor.builder(spanExporter)
.setMaxQueueSize(getIntProperty(TelemetryConstants.OPENTELEMETRY_BSP_MAX_QUEUE_SIZE,
TelemetryConstants.OPENTELEMETRY_BSP_DEFAULT_MAX_QUEUE_SIZE))
.setMaxExportBatchSize(getIntProperty(TelemetryConstants.OPENTELEMETRY_BSP_MAX_EXPORT_BATCH_SIZE,
TelemetryConstants.OPENTELEMETRY_BSP_DEFAULT_MAX_EXPORT_BATCH_SIZE))
Comment thread
KalinduGandara marked this conversation as resolved.
.setScheduleDelay(Duration.ofMillis(getIntProperty(
TelemetryConstants.OPENTELEMETRY_BSP_SCHEDULE_DELAY_MILLIS,
TelemetryConstants.OPENTELEMETRY_BSP_DEFAULT_SCHEDULE_DELAY_MILLIS)))
.setExporterTimeout(Duration.ofMillis(getIntProperty(
TelemetryConstants.OPENTELEMETRY_BSP_EXPORT_TIMEOUT_MILLIS,
TelemetryConstants.OPENTELEMETRY_BSP_DEFAULT_EXPORT_TIMEOUT_MILLIS)))
.build())
.setResource(Resource.getDefault().merge(TelemetryUtil.getTracerProviderResource(TelemetryConstants.SERVICE_NAME)))
.build();

Expand Down Expand Up @@ -243,4 +254,17 @@ private int getMetricIntervalSeconds() {
}
return metricIntervalSeconds;
}


private int getIntProperty(String propertyName, String defaultValue) {
String value = SynapsePropertiesLoader.getPropertyValue(propertyName, defaultValue);
try {
return Integer.parseInt(value.trim());
} catch (NumberFormatException e) {
logger.warn("Invalid integer for " + propertyName + ": '" + value
+ "'. Using default value: " + defaultValue);
return Integer.parseInt(defaultValue);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2026, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management;

import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;

import java.util.Stack;

/**
* Maintains a per-message stack of span wrapper IDs, tracking the chain of currently open spans so that the most
* recently started span can be resolved as the parent of the next one.
* <p>
* The stack is stored as a message context property ({@link SynapseConstants#PARENT_STACK_PROPERTY}) and is therefore
* carried along the message flow, and cloned when the message context is cloned.
*/
public final class ParentSpanWrapperStackManager {

private ParentSpanWrapperStackManager() {}

/**
* Pushes the given span wrapper ID onto the parent stack of the message, creating the stack if it does not
* exist yet.
*
* @param spanWrapperId ID of the span wrapper that has just been started.
* @param synCtx Message context that owns the stack.
*/
public static void push(String spanWrapperId, MessageContext synCtx) {
if (spanWrapperId == null || synCtx == null) {
return;
}
Stack<String> parentStack = getStack(synCtx);
if (parentStack == null) {
parentStack = new Stack<>();
synCtx.setProperty(SynapseConstants.PARENT_STACK_PROPERTY, parentStack);
}
parentStack.push(spanWrapperId);
}

/**
* Pops the topmost span wrapper ID from the parent stack of the message, if a non-empty stack exists.
*
* @param synCtx Message context that owns the stack.
*/
public static void pop(MessageContext synCtx) {
if (synCtx == null) {
return;
}
Stack<String> parentStack = getStack(synCtx);
if (parentStack != null && !parentStack.isEmpty()) {
parentStack.pop();
}
}

/**
* Returns the span wrapper ID at the top of the parent stack without removing it.
*
* @param synCtx Message context that owns the stack.
* @return The current parent span wrapper ID, or {@code null} if there is no parent.
*/
public static String peekParentSpanWrapperId(MessageContext synCtx) {
if (synCtx == null) {
return null;
}
Stack<String> parentStack = getStack(synCtx);
if (parentStack != null && !parentStack.isEmpty()) {
return parentStack.peek();
}
return null;
}

/**
* Creates a copy of the given parent stack. Used when a message context is cloned, so that each branch
* maintains its own independent stack.
*
* @param parentStack Parent stack to copy.
* @return A new stack containing the same span wrapper IDs.
*/
public static Stack<String> copyOf(Stack<String> parentStack) {
Stack<String> clone = new Stack<>();
if (parentStack != null){
clone.addAll(parentStack);
}
return clone;
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

@SuppressWarnings("unchecked")
private static Stack<String> getStack(MessageContext synCtx) {
Object parentStackObj = synCtx.getProperty(SynapseConstants.PARENT_STACK_PROPERTY);
if (parentStackObj instanceof Stack) {
return (Stack<String>) parentStackObj;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,31 @@ public class TelemetryConstants {
public static final String OPENTELEMETRY_METRIC_PUSH_INTERVAL_SECONDS = "opentelemetry.metric.push.interval.seconds";

public static final String OPENTELEMETRY_METRIC_DEFAULT_PUSH_INTERVAL_SECONDS = "60";

/**
* Maximum number of spans kept in the queue before being dropped.
*/
public static final String OPENTELEMETRY_BSP_MAX_QUEUE_SIZE = "opentelemetry.bsp.max.queue.size";
public static final String OPENTELEMETRY_BSP_DEFAULT_MAX_QUEUE_SIZE = "2048";

/**
* Maximum number of spans exported in a single batch.
*/
public static final String OPENTELEMETRY_BSP_MAX_EXPORT_BATCH_SIZE = "opentelemetry.bsp.max.export.batch.size";
public static final String OPENTELEMETRY_BSP_DEFAULT_MAX_EXPORT_BATCH_SIZE = "512";

/**
* Delay interval (in milliseconds) between two consecutive span exports.
*/
public static final String OPENTELEMETRY_BSP_SCHEDULE_DELAY_MILLIS = "opentelemetry.bsp.schedule.delay.millis";
public static final String OPENTELEMETRY_BSP_DEFAULT_SCHEDULE_DELAY_MILLIS = "5000";

/**
* Maximum time (in milliseconds) the exporter is allowed to run before being cancelled.
*/
public static final String OPENTELEMETRY_BSP_EXPORT_TIMEOUT_MILLIS = "opentelemetry.bsp.export.timeout.millis";
public static final String OPENTELEMETRY_BSP_DEFAULT_EXPORT_TIMEOUT_MILLIS = "30000";

/**
* HTTP protocol constant.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.parentresolving;

import org.apache.synapse.MessageContext;
import org.apache.synapse.aspects.flow.statistics.data.raw.StatisticDataUnit;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.ParentSpanWrapperStackManager;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.stores.SpanStore;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.models.SpanWrapper;

Expand All @@ -33,7 +35,14 @@ public class LatestActiveParentResolver extends AbstractParentResolver {
* @param spanStore The span store object.
* @return Resolved parent span wrapper.
*/
public static SpanWrapper resolveParent(SpanStore spanStore) {
public static SpanWrapper resolveParent(SpanStore spanStore, MessageContext synCtx) {
String parentSpanWrapperId = ParentSpanWrapperStackManager.peekParentSpanWrapperId(synCtx);
if (parentSpanWrapperId != null) {
SpanWrapper parentSpan = spanStore.getSpanWrapper(parentSpanWrapperId);
if (parentSpan != null) {
return parentSpan;
}
}
return resolveLatestActiveSpanWrapper(spanStore);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ public static SpanWrapper resolveParent(StatisticDataUnit child,
}

// Resolve based on latest active span
return LatestActiveParentResolver.resolveParent(spanStore);
return LatestActiveParentResolver.resolveParent(spanStore, synCtx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.aspects.flow.statistics.data.raw.StatisticDataUnit;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.ParentSpanWrapperStackManager;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.helpers.SpanTagger;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.helpers.TracingUtils;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.models.ContinuationStateSequenceInfo;
Expand Down Expand Up @@ -114,6 +115,7 @@ public SpanWrapper addSpanWrapper(String spanId,
componentUniqueIdWiseSpanWrappers.put(statisticDataUnit.getComponentId(), spanWrapper);
}
activeSpanWrappers.add(spanWrapper);
ParentSpanWrapperStackManager.push(spanId, synCtx);
return spanWrapper;
}

Expand Down Expand Up @@ -197,6 +199,7 @@ public void finishSpan(SpanWrapper spanWrapper, MessageContext synCtx, boolean i
}
spanWrapper.getSpan().end();
activeSpanWrappers.remove(spanWrapper);
ParentSpanWrapperStackManager.pop(synCtx);
}
Comment thread
KalinduGandara marked this conversation as resolved.
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.ParentSpanWrapperStackManager;
import org.apache.synapse.commons.json.JsonUtil;
import org.apache.synapse.commons.CorrelationConstants;
import org.apache.synapse.continuation.ContinuationStackManager;
Expand Down Expand Up @@ -193,7 +194,10 @@ public static MessageContext cloneMessageContext(MessageContext synCtx, boolean
obj = (OMElement) ((OMElement) obj).cloneOMElement();
} else if (obj instanceof ResponseState) {
// do nothing and let the same reference to go to the cloned context
} else{
} else if (obj instanceof Stack
&& strkey.equals(SynapseConstants.PARENT_STACK_PROPERTY)) {
obj = ParentSpanWrapperStackManager.copyOf((Stack<String>) obj);
} else {
/**
* Need to add conditions according to type if found in
* future
Expand Down
Loading