Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ public async Task Invoke(IRecoverabilityContext context, Func<IRecoverabilityCon
case MoveToError moveToErrorAction:
{
var failedMessage = new FailedMessage(
context.FailedMessage.MessageId,
new Dictionary<string, string>(context.FailedMessage.Headers),
context.FailedMessage.Body,
context.MessageId,
new Dictionary<string, string>(context.Headers),
context.Body,
context.Exception,
moveToErrorAction.ErrorQueue);

Expand All @@ -45,6 +45,6 @@ public async Task Invoke(IRecoverabilityContext context, Func<IRecoverabilityCon

return;

void MarkMessageAsCompleted() => scenarioContext.UnfinishedFailedMessages.AddOrUpdate(context.FailedMessage.MessageId, _ => false, static (_, _) => false);
void MarkMessageAsCompleted() => scenarioContext.UnfinishedFailedMessages.AddOrUpdate(context.MessageId, _ => false, static (_, _) => false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@
(IInvokeHandlerContext context10) => InvokeHandlerTerminator.Invoke(context10)))))))))))

(IRecoverabilityContext context0) => CaptureRecoverabilityActionBehavior.Invoke(context0,
(IRecoverabilityContext context1) => RecoverabilityRoutingConnector.Invoke(context1,
(IRoutingContext context2) => ThrowIfCannotDeferMessageBehavior.Invoke(context2,
(IRoutingContext context3) => AttachSenderRelatedInfoOnMessageBehavior.Invoke(context3,
(IRoutingContext context4) => RoutingToDispatchConnector.Invoke(context4,
(IDispatchContext context5) => ImmediateDispatchTerminator.Invoke(context5))))))
(IRecoverabilityContext context1) => ForceNewParentWhenNecessaryDuringRecoverabilityBehavior.Invoke(context1,
(IRecoverabilityContext context2) => RecoverabilityRoutingConnector.Invoke(context2,
(IRoutingContext context3) => ThrowIfCannotDeferMessageBehavior.Invoke(context3,
(IRoutingContext context4) => AttachSenderRelatedInfoOnMessageBehavior.Invoke(context4,
(IRoutingContext context5) => RoutingToDispatchConnector.Invoke(context5,
(IDispatchContext context6) => ImmediateDispatchTerminator.Invoke(context6)))))))
Original file line number Diff line number Diff line change
@@ -1,62 +1,65 @@
namespace NServiceBus.AcceptanceTests.Core.OpenTelemetry.Traces;

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting.Support;
using EndpointTemplates;
using NServiceBus;
using NServiceBus.AcceptanceTesting;
using NServiceBus.Pipeline;
using NUnit.Framework;
using Conventions = AcceptanceTesting.Customization.Conventions;

public class When_incoming_message_moved_to_error_queue : OpenTelemetryAcceptanceTest
{
[Test]
public void Should_add_start_new_trace_header()
public async Task Should_add_start_new_trace_header()
{
var exception = Assert.CatchAsync<MessageFailedException>(async () =>
{
await Scenario.Define<Context>()
.WithEndpoint<FailingEndpoint>(e => e
.When(s => s.SendLocal(new FailingMessage())))
.Run();
});

var failedMessage = exception.FailedMessage;
using (Assert.EnterMultipleScope())
{
Assert.That(failedMessage.Headers.ContainsKey(Headers.StartNewTrace), Is.True);
Assert.That(failedMessage.Headers[Headers.StartNewTrace], Is.EqualTo(bool.TrueString));
}
var context = await Scenario.Define<Context>()
.WithEndpoint<FailingEndpoint>(e => e
.When(s => s.SendLocal(new FailingMessage()))
.DoNotFailOnErrorMessages())
.WithEndpoint<ErrorSpy>()
.Run();

Assert.That(context.ErrorMessageHeaders[Headers.StartNewTrace], Is.EqualTo(bool.TrueString));
}

public class Context : ScenarioContext
{
public bool HandlerInvoked { get; set; }
public Dictionary<string, string> ErrorMessageHeaders { get; set; }
}

public class FailingEndpoint : EndpointConfigurationBuilder
{
public FailingEndpoint() => EndpointSetup<DefaultServer>();
static readonly string ErrorQueueAddress = Conventions.EndpointNamingConvention(typeof(ErrorSpy));

public FailingEndpoint() => EndpointSetup<DefaultServer>(c => c.SendFailedMessagesTo(ErrorQueueAddress));

[Handler]
public class FailingMessageHandler : IHandleMessages<FailingMessage>
public class FailingMessageHandler() : IHandleMessages<FailingMessage>
{
public Task Handle(FailingMessage message, IMessageHandlerContext context) => throw new SimulatedException(ErrorMessage);
}
}

Context textContext;

public FailingMessageHandler(Context textContext) => this.textContext = textContext;
class ErrorSpy : EndpointConfigurationBuilder
{
public ErrorSpy() => EndpointSetup<DefaultServer>(c => c.Pipeline.Register<ErrorMessageDetector>("Detect incoming error messages"));

public Task Handle(FailingMessage message, IMessageHandlerContext context)
class ErrorMessageDetector(Context testContext) : Behavior<ITransportReceiveContext>
{
public override Task Invoke(ITransportReceiveContext context, Func<Task> next)
{
textContext.HandlerInvoked = true;
throw new SimulatedException(ErrorMessage);
testContext.ErrorMessageHeaders = context.Message.Headers;
testContext.MarkAsCompleted();
return next();
}
}
}

public class FailingMessage : IMessage
{
}
public class FailingMessage : IMessage;

const string ErrorMessage = "oh no!";
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ public void Should_provide_error_context_to_policy()
Assert.That(context.ErrorContexts, Has.Count.EqualTo(2), "because the custom policy should have been invoked twice");
using (Assert.EnterMultipleScope())
{
Assert.That(context.ErrorContexts[0].Message, Is.Not.Null);
Assert.That(context.ErrorContexts[0].MessageId, Is.Not.Null);
Assert.That(context.ErrorContexts[0].NativeMessageId, Is.Not.Null);
Assert.That(context.ErrorContexts[0].Headers, Is.Not.Empty);
Assert.That(context.ErrorContexts[0].ReceiveProperties, Is.Not.Null);
Assert.That(context.ErrorContexts[0].Exception, Is.TypeOf<SimulatedException>());
Assert.That(context.ErrorContexts[0].DelayedDeliveriesPerformed, Is.EqualTo(0));
Assert.That(context.ErrorContexts[1].Message, Is.Not.Null);
Assert.That(context.ErrorContexts[0].DelayedDeliveriesPerformed, Is.Zero);
Assert.That(context.ErrorContexts[1].MessageId, Is.Not.Null);
Assert.That(context.ErrorContexts[1].NativeMessageId, Is.Not.Null);
Assert.That(context.ErrorContexts[1].Headers, Is.Not.Empty);
Assert.That(context.ErrorContexts[1].ReceiveProperties, Is.Not.Null);
Assert.That(context.ErrorContexts[1].Exception, Is.TypeOf<SimulatedException>());
Assert.That(context.ErrorContexts[1].DelayedDeliveriesPerformed, Is.EqualTo(1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1933,21 +1933,37 @@ namespace NServiceBus.Pipeline
}
public interface IRecoverabilityActionContext : NServiceBus.Extensibility.IExtendable, NServiceBus.ICancellableContext, NServiceBus.Pipeline.IBehaviorContext
{
System.ReadOnlyMemory<byte> Body { get; }
int DelayedDeliveriesPerformed { get; }
System.Exception Exception { get; }
[System.Obsolete("For access to the message body, headers, native message ID, or the receive proper" +
"ties use the corresponding properties directly exposed on the context. Will be t" +
"reated as an error from version 11.0.0. Will be removed in version 12.0.0.", false)]
NServiceBus.Transport.IncomingMessage FailedMessage { get; }
System.Collections.Generic.Dictionary<string, string> Headers { get; }
int ImmediateProcessingFailures { get; }
string MessageId { get; }
System.Collections.Generic.IReadOnlyDictionary<string, string> Metadata { get; }
string NativeMessageId { get; }
string ReceiveAddress { get; }
NServiceBus.Transport.ReceiveProperties ReceiveProperties { get; }
}
public interface IRecoverabilityContext : NServiceBus.Extensibility.IExtendable, NServiceBus.ICancellableContext, NServiceBus.Pipeline.IBehaviorContext
{
System.ReadOnlyMemory<byte> Body { get; }
int DelayedDeliveriesPerformed { get; }
System.Exception Exception { get; }
[System.Obsolete("For access to the message body, headers, native message ID, or the receive proper" +
"ties use the corresponding properties directly exposed on the context. Will be t" +
"reated as an error from version 11.0.0. Will be removed in version 12.0.0.", false)]
NServiceBus.Transport.IncomingMessage FailedMessage { get; }
System.Collections.Generic.Dictionary<string, string> Headers { get; }
int ImmediateProcessingFailures { get; }
string MessageId { get; }
System.Collections.Generic.Dictionary<string, string> Metadata { get; }
string NativeMessageId { get; }
string ReceiveAddress { get; }
NServiceBus.Transport.ReceiveProperties ReceiveProperties { get; }
NServiceBus.RecoverabilityAction RecoverabilityAction { get; set; }
NServiceBus.RecoverabilityConfig RecoverabilityConfiguration { get; }
NServiceBus.Pipeline.IRecoverabilityActionContext PreventChanges();
Expand Down Expand Up @@ -2420,12 +2436,20 @@ namespace NServiceBus.Transport
{
public ErrorContext(System.Exception exception, System.Collections.Generic.Dictionary<string, string> headers, string nativeMessageId, System.ReadOnlyMemory<byte> body, NServiceBus.Transport.TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, NServiceBus.Extensibility.ContextBag context) { }
public ErrorContext(System.Exception exception, System.Collections.Generic.Dictionary<string, string> headers, string nativeMessageId, System.ReadOnlyMemory<byte> body, NServiceBus.Transport.ReceiveProperties receiveProperties, NServiceBus.Transport.TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, NServiceBus.Extensibility.ContextBag context) { }
public System.ReadOnlyMemory<byte> Body { get; }
public int DelayedDeliveriesPerformed { get; }
public System.Exception Exception { get; }
public NServiceBus.Extensibility.ContextBag Extensions { get; }
public System.Collections.Generic.Dictionary<string, string> Headers { get; }
public int ImmediateProcessingFailures { get; }
[System.Obsolete("For access to the message body, headers, native message ID, or the receive proper" +
"ties use the corresponding properties directly exposed on the context. Will be t" +
"reated as an error from version 11.0.0. Will be removed in version 12.0.0.", false)]
public NServiceBus.Transport.IncomingMessage Message { get; }
public string MessageId { get; }
public string NativeMessageId { get; }
public string ReceiveAddress { get; }
public NServiceBus.Transport.ReceiveProperties ReceiveProperties { get; }
public NServiceBus.Transport.TransportTransaction TransportTransaction { get; }
}
public enum ErrorHandleResult
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
namespace NServiceBus.Core.Tests.OpenTelemetry;

using System;
using System.Threading.Tasks;
using NUnit.Framework;
using Testing;

[TestFixture]
public class ForceNewParentWhenNecessaryDuringRecoverabilityBehaviorTests
{
[Test]
public async Task Should_not_modify_when_trace_not_present()
{
var behavior = new ForceNewParentWhenNecessaryDuringRecoverabilityBehavior();

var context = new TestableRecoverabilityContext();
await behavior.Invoke(context, _ => Task.CompletedTask);

using (Assert.EnterMultipleScope())
{
Assert.That(context.Headers, Does.Not.ContainKey(Headers.StartNewTrace));
Assert.That(context.Metadata, Does.Not.ContainKey(Headers.StartNewTrace));
}
}

[Test]
public async Task Should_add_start_new_trace_header_when_trace_present_during_delayed_delivery()
{
var behavior = new ForceNewParentWhenNecessaryDuringRecoverabilityBehavior();

var context = new TestableRecoverabilityContext
{
Headers = { { Headers.DiagnosticsTraceParent, "traceparent" } },
RecoverabilityAction = new DelayedRetry(TimeSpan.FromSeconds(10))
};

await behavior.Invoke(context, _ => Task.CompletedTask);

using (Assert.EnterMultipleScope())
{
Assert.That(context.Headers, Does.ContainKey(Headers.StartNewTrace));
Assert.That(context.Metadata, Does.ContainKey(Headers.StartNewTrace));
}
}

[Test]
public async Task Should_add_start_new_trace_metadata_when_trace_present_during_move_to_error()
{
var behavior = new ForceNewParentWhenNecessaryDuringRecoverabilityBehavior();

var context = new TestableRecoverabilityContext
{
Headers = { { Headers.DiagnosticsTraceParent, "traceparent" } },
RecoverabilityAction = new MoveToError("errorqueue")
};

await behavior.Invoke(context, _ => Task.CompletedTask);

using (Assert.EnterMultipleScope())
{
Assert.That(context.Headers, Does.Not.ContainKey(Headers.StartNewTrace));
Assert.That(context.Metadata, Does.ContainKey(Headers.StartNewTrace));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,12 @@ public void Should_update_retry_headers_when_present()
var now = DateTimeOffset.UtcNow;
var routingContexts = delayedRetryAction.GetRoutingContexts(recoverabilityContext);

var incomingMessage = recoverabilityContext.FailedMessage;

var outgoingMessageHeaders = routingContexts.Single().Message.Headers;

using (Assert.EnterMultipleScope())
{
Assert.That(outgoingMessageHeaders[Headers.DelayedRetries], Is.EqualTo("3"));
Assert.That(incomingMessage.Headers[Headers.DelayedRetries], Is.EqualTo(delayedDeliveriesPerformed.ToString()));
Assert.That(recoverabilityContext.Headers[Headers.DelayedRetries], Is.EqualTo(delayedDeliveriesPerformed.ToString()));
}

var utcDateTime = DateTimeOffsetHelper.ToDateTimeOffset(outgoingMessageHeaders[Headers.DelayedRetriesTimestamp]);
Expand All @@ -62,7 +60,7 @@ public void Should_update_retry_headers_when_present()
using (Assert.EnterMultipleScope())
{
Assert.That(utcDateTime, Is.GreaterThanOrEqualTo(adjustedNow));
Assert.That(incomingMessage.Headers[Headers.DelayedRetriesTimestamp], Is.EqualTo(originalHeadersTimestamp));
Assert.That(recoverabilityContext.Headers[Headers.DelayedRetriesTimestamp], Is.EqualTo(originalHeadersTimestamp));
}
}

Expand All @@ -79,18 +77,18 @@ public void Should_add_retry_headers_when_not_present()
using (Assert.EnterMultipleScope())
{
Assert.That(outgoingMessageHeaders[Headers.DelayedRetries], Is.EqualTo("1"));
Assert.That(recoverabilityContext.FailedMessage.Headers.ContainsKey(Headers.DelayedRetries), Is.False);
Assert.That(recoverabilityContext.Headers.ContainsKey(Headers.DelayedRetries), Is.False);
Assert.That(outgoingMessageHeaders.ContainsKey(Headers.DelayedRetriesTimestamp), Is.True);
Assert.That(recoverabilityContext.FailedMessage.Headers.ContainsKey(Headers.DelayedRetriesTimestamp), Is.False);
Assert.That(recoverabilityContext.Headers.ContainsKey(Headers.DelayedRetriesTimestamp), Is.False);
}
}

static TestableRecoverabilityContext CreateRecoverabilityContext(Dictionary<string, string> headers = null, int delayedDeliveriesPerformed = 0)
{
return new TestableRecoverabilityContext
static TestableRecoverabilityContext CreateRecoverabilityContext(Dictionary<string, string> headers = null, int delayedDeliveriesPerformed = 0) =>
new()
{
FailedMessage = new IncomingMessage("messageId", headers ?? [], ReadOnlyMemory<byte>.Empty),
MessageId = "messageId",
Headers = headers ?? [],
Body = ReadOnlyMemory<byte>.Empty,
DelayedDeliveriesPerformed = delayedDeliveriesPerformed
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public void Should_carry_receive_properties_on_incoming_message()

using (Assert.EnterMultipleScope())
{
Assert.That(errorContext.Message.ReceiveProperties, Is.SameAs(receiveProperties));
Assert.That(errorContext.Message.ReceiveProperties["Native.Key"], Is.EqualTo("Value"));
Assert.That(errorContext.ReceiveProperties, Is.SameAs(receiveProperties));
Assert.That(errorContext.ReceiveProperties["Native.Key"], Is.EqualTo("Value"));
}
}

Expand All @@ -60,6 +60,6 @@ public void Should_default_receive_properties_to_empty_when_not_provided()
context: context
);

Assert.That(errorContext.Message.ReceiveProperties, Is.SameAs(ReceiveProperties.Empty));
Assert.That(errorContext.ReceiveProperties, Is.SameAs(ReceiveProperties.Empty));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void MoveToErrorQueue_should_preserve_incoming_message_headers()

var outgoingMessageHeaders = routingContext.Message.Headers;

Assert.That(recoverabilityContext.FailedMessage.Headers, Is.SupersetOf(outgoingMessageHeaders));
Assert.That(recoverabilityContext.Headers, Is.SupersetOf(outgoingMessageHeaders));
}

[Test]
Expand Down Expand Up @@ -85,19 +85,21 @@ public void MoveToErrorQueue_should_add_metadata_to_headers()
{
Assert.That(outgoingMessageHeaders, Contains.Item(new KeyValuePair<string, string>("staticFaultMetadataKey", "staticFaultMetadataValue")));
// check for leaking headers
Assert.That(recoverabilityContext.FailedMessage.Headers.ContainsKey("staticFaultMetadataKey"), Is.False);
Assert.That(recoverabilityContext.Headers.ContainsKey("staticFaultMetadataKey"), Is.False);
}
}

static TestableRecoverabilityContext CreateRecoverabilityContext(Exception raisedException = null, string exceptionMessage = "default-message", string messageId = "default-id", Dictionary<string, string> messageHeaders = default, Dictionary<string, string> metadata = default)
{
var recoverabilityContext = new TestableRecoverabilityContext
{
FailedMessage = new IncomingMessage(messageId, messageHeaders ?? [], ReadOnlyMemory<byte>.Empty),
MessageId = messageId,
Headers = messageHeaders ?? [],
Body = ReadOnlyMemory<byte>.Empty,
Exception = raisedException ?? new Exception(exceptionMessage)
};

if (metadata != default)
if (metadata != null)
{
recoverabilityContext.Metadata = metadata;
}
Expand Down
2 changes: 1 addition & 1 deletion src/NServiceBus.Core.Tests/Routing/RoutingSettingsTests.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
namespace NServiceBus.Core.Tests.Routing
{
using NServiceBus;
using System;
using System.Reflection;
using MessageNamespaceA;
using MessageNamespaceB;
using NServiceBus;
using NServiceBus.Features;
using NServiceBus.Routing;
using NUnit.Framework;
Expand Down
Loading
Loading