diff --git a/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt b/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt index ee54cabd8de..19f9f9da639 100644 --- a/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt +++ b/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt @@ -2419,6 +2419,7 @@ namespace NServiceBus.Transport public class ErrorContext { public ErrorContext(System.Exception exception, System.Collections.Generic.Dictionary headers, string nativeMessageId, System.ReadOnlyMemory body, NServiceBus.Transport.TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, NServiceBus.Extensibility.ContextBag context) { } + public ErrorContext(System.Exception exception, System.Collections.Generic.Dictionary headers, string nativeMessageId, System.ReadOnlyMemory body, NServiceBus.Transport.ReceiveProperties receiveProperties, NServiceBus.Transport.TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, NServiceBus.Extensibility.ContextBag context) { } public int DelayedDeliveriesPerformed { get; } public System.Exception Exception { get; } public NServiceBus.Extensibility.ContextBag Extensions { get; } @@ -2481,10 +2482,12 @@ namespace NServiceBus.Transport public class IncomingMessage { public IncomingMessage(string nativeMessageId, System.Collections.Generic.Dictionary headers, System.ReadOnlyMemory body) { } + public IncomingMessage(string nativeMessageId, System.Collections.Generic.Dictionary headers, System.ReadOnlyMemory body, NServiceBus.Transport.ReceiveProperties receiveProperties) { } public System.ReadOnlyMemory Body { get; } public System.Collections.Generic.Dictionary Headers { get; } public string MessageId { get; } public string NativeMessageId { get; } + public NServiceBus.Transport.ReceiveProperties ReceiveProperties { get; } } public static class IncomingMessageExtensions { @@ -2494,11 +2497,13 @@ namespace NServiceBus.Transport public class MessageContext : NServiceBus.Extensibility.IExtendable { public MessageContext(string nativeMessageId, System.Collections.Generic.Dictionary headers, System.ReadOnlyMemory body, NServiceBus.Transport.TransportTransaction transportTransaction, string receiveAddress, NServiceBus.Extensibility.ContextBag context) { } + public MessageContext(string nativeMessageId, System.Collections.Generic.Dictionary headers, System.ReadOnlyMemory body, NServiceBus.Transport.ReceiveProperties receiveProperties, NServiceBus.Transport.TransportTransaction transportTransaction, string receiveAddress, NServiceBus.Extensibility.ContextBag context) { } public System.ReadOnlyMemory Body { get; } public NServiceBus.Extensibility.ContextBag Extensions { get; } public System.Collections.Generic.Dictionary Headers { get; } public string NativeMessageId { get; } public string ReceiveAddress { get; } + public NServiceBus.Transport.ReceiveProperties ReceiveProperties { get; } public NServiceBus.Transport.TransportTransaction TransportTransaction { get; } } public class MulticastTransportOperation : NServiceBus.Transport.IOutgoingTransportOperation @@ -2541,6 +2546,19 @@ namespace NServiceBus.Transport public System.Collections.Generic.IReadOnlyCollection SendingAddresses { get; } public void BindSending(string transportAddress) { } } + public sealed class ReceiveProperties : System.Collections.Generic.IEnumerable>, System.Collections.Generic.IReadOnlyCollection>, System.Collections.Generic.IReadOnlyDictionary, System.Collections.IEnumerable + { + public ReceiveProperties() { } + public ReceiveProperties(System.Collections.Generic.Dictionary dictionary) { } + public int Count { get; } + public string this[string key] { get; } + public System.Collections.Generic.IEnumerable Keys { get; } + public System.Collections.Generic.IEnumerable Values { get; } + public static NServiceBus.Transport.ReceiveProperties Empty { get; } + public bool ContainsKey(string key) { } + public System.Collections.Generic.IEnumerator> GetEnumerator() { } + public bool TryGetValue(string key, [System.Diagnostics.CodeAnalysis.MaybeNullWhen(false)] out string value) { } + } public class ReceiveSettings { public ReceiveSettings(string id, NServiceBus.Transport.QueueAddress receiveAddress, bool usePublishSubscribe, bool purgeOnStartup, string errorQueue) { } diff --git a/src/NServiceBus.Core.Tests/Recoverability/ErrorContextTests.cs b/src/NServiceBus.Core.Tests/Recoverability/ErrorContextTests.cs index ece0664acf5..31cc54c7107 100644 --- a/src/NServiceBus.Core.Tests/Recoverability/ErrorContextTests.cs +++ b/src/NServiceBus.Core.Tests/Recoverability/ErrorContextTests.cs @@ -1,9 +1,10 @@ namespace NServiceBus.Core.Tests.Recoverability; using System; -using Extensibility; +using System.Collections.Generic; +using NServiceBus.Extensibility; +using NServiceBus.Transport; using NUnit.Framework; -using Transport; [TestFixture] public class ErrorContextTests @@ -17,4 +18,48 @@ public void Can_pass_additional_information_via_context_bag() Assert.That(context.Extensions.Get("MyKey"), Is.EqualTo("MyValue")); } + + [Test] + public void Should_carry_receive_properties_on_incoming_message() + { + var context = new ContextBag(); + var receiveProperties = new ReceiveProperties(new Dictionary { ["Native.Key"] = "Value" }); + + var errorContext = new ErrorContext( + exception: new InvalidOperationException("Test"), + headers: new Dictionary { [Headers.MessageId] = "id" }, + nativeMessageId: "native-id", + body: ReadOnlyMemory.Empty, + receiveProperties: receiveProperties, + transportTransaction: new TransportTransaction(), + immediateProcessingFailures: 1, + receiveAddress: "queue", + context: context + ); + + using (Assert.EnterMultipleScope()) + { + Assert.That(errorContext.Message.ReceiveProperties, Is.SameAs(receiveProperties)); + Assert.That(errorContext.Message.ReceiveProperties["Native.Key"], Is.EqualTo("Value")); + } + } + + [Test] + public void Should_default_receive_properties_to_empty_when_not_provided() + { + var context = new ContextBag(); + + var errorContext = new ErrorContext( + exception: new InvalidOperationException("Test"), + headers: [], + nativeMessageId: "native-id", + body: ReadOnlyMemory.Empty, + transportTransaction: new TransportTransaction(), + immediateProcessingFailures: 1, + receiveAddress: "queue", + context: context + ); + + Assert.That(errorContext.Message.ReceiveProperties, Is.SameAs(ReceiveProperties.Empty)); + } } \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs b/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs index fa88ed971f3..c09c1851aaf 100644 --- a/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs +++ b/src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs @@ -1,6 +1,7 @@ namespace NServiceBus.Core.Tests.Routing; using System; +using System.Collections.Frozen; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -60,7 +61,7 @@ await behavior.Invoke(testableRoutingContext, context => public async Task Should_copy_message_state_for_multiple_routing_strategies() { var behavior = new RoutingToDispatchConnector(); - IEnumerable operations = null; + List operations = null; var testableRoutingContext = new TestableRoutingContext { RoutingStrategies = @@ -78,11 +79,11 @@ public async Task Should_copy_message_state_for_multiple_routing_strategies() testableRoutingContext.Message = new OutgoingMessage("ID", originalHeaders, Array.Empty()); await behavior.Invoke(testableRoutingContext, context => { - operations = context.Operations; + operations = [.. context.Operations]; return Task.CompletedTask; }); - Assert.That(operations.ToList(), Has.Count.EqualTo(2)); + Assert.That(operations, Has.Count.EqualTo(2)); TransportOperation destination1Operation = operations.ElementAt(0); using (Assert.EnterMultipleScope()) @@ -91,13 +92,20 @@ await behavior.Invoke(testableRoutingContext, context => Assert.That((destination1Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination1")); } Dictionary destination1Headers = destination1Operation.Message.Headers; - Assert.That(destination1Headers, Contains.Item(new KeyValuePair("SomeHeaderKey", "SomeHeaderValue"))); - Assert.That(destination1Headers, Contains.Item(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1"))); - Assert.That(destination1Headers, Does.Not.Contain(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2"))); - Assert.That(destination1Headers, Is.Not.SameAs(originalHeaders)); + using (Assert.EnterMultipleScope()) + { + Assert.That(destination1Headers, Contains.Item(new KeyValuePair("SomeHeaderKey", "SomeHeaderValue"))); + Assert.That(destination1Headers, Contains.Item(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1"))); + Assert.That(destination1Headers, Does.Not.Contain(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2"))); + Assert.That(destination1Headers, Is.Not.SameAs(originalHeaders)); + } + DispatchProperties destination1DispatchProperties = destination1Operation.Properties; - Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair("SomeKey", "SomeValue"))); - Assert.That(destination1DispatchProperties, Is.Not.SameAs(originalDispatchProperties)); + using (Assert.EnterMultipleScope()) + { + Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair("SomeKey", "SomeValue"))); + Assert.That(destination1DispatchProperties, Is.Not.SameAs(originalDispatchProperties)); + } TransportOperation destination2Operation = operations.ElementAt(1); using (Assert.EnterMultipleScope()) @@ -106,10 +114,14 @@ await behavior.Invoke(testableRoutingContext, context => Assert.That((destination2Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination2")); } Dictionary destination2Headers = destination2Operation.Message.Headers; - Assert.That(destination2Headers, Contains.Item(new KeyValuePair("SomeHeaderKey", "SomeHeaderValue"))); - Assert.That(destination2Headers, Contains.Item(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2"))); - Assert.That(destination2Headers, Does.Not.Contain(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1"))); - Assert.That(destination2Headers, Is.Not.SameAs(originalHeaders)); + using (Assert.EnterMultipleScope()) + { + Assert.That(destination2Headers, Contains.Item(new KeyValuePair("SomeHeaderKey", "SomeHeaderValue"))); + Assert.That(destination2Headers, Contains.Item(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2"))); + Assert.That(destination2Headers, Does.Not.Contain(new KeyValuePair("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1"))); + Assert.That(destination2Headers, Is.Not.SameAs(originalHeaders)); + } + DispatchProperties destination2DispatchProperties = destination2Operation.Properties; Assert.That(destination2DispatchProperties, Is.Not.SameAs(originalDispatchProperties)); using (Assert.EnterMultipleScope()) @@ -242,4 +254,114 @@ public override AddressTag Apply(Dictionary headers) } class MyMessage : IMessage; + + [Test] + public async Task Should_merge_receive_properties_when_declared_by_transport() + { + var behavior = new RoutingToDispatchConnector(); + + var receiveProperties = new ReceiveProperties(new Dictionary + { + ["MessageGroupId"] = "group-123", + ["MessageDeduplicationId"] = "dedup-456" + }); + + TransportOperation transportOperation = null; + + var routingContext = new TestableRoutingContext + { + RoutingStrategies = [new UnicastRoutingStrategy("destination")] + }; + routingContext.Extensions.Set(new IncomingMessage(routingContext.Message.MessageId, [], Array.Empty(), receiveProperties)); + + await behavior.Invoke(routingContext, context => + { + transportOperation = context.Operations.Single(); + return Task.CompletedTask; + }); + + using (Assert.EnterMultipleScope()) + { + Assert.That(transportOperation, Is.Not.Null); + Assert.That(transportOperation!.Properties["MessageGroupId"], Is.EqualTo("group-123")); + Assert.That(transportOperation!.Properties["MessageDeduplicationId"], Is.EqualTo("dedup-456")); + } + } + + [Test] + public async Task Should_not_override_user_set_dispatch_property() + { + var behavior = new RoutingToDispatchConnector(); + + var receiveProperties = new ReceiveProperties(new Dictionary + { + ["MessageGroupId"] = "incoming-group" + }); + + var userDispatchProperties = new DispatchProperties + { + ["MessageGroupId"] = "user-specified-group" + }; + + TransportOperation transportOperation = null; + + var routingContext = new TestableRoutingContext + { + RoutingStrategies = [new UnicastRoutingStrategy("destination")] + }; + routingContext.Extensions.Set(new IncomingMessage(routingContext.Message.MessageId, [], Array.Empty(), receiveProperties)); + routingContext.Extensions.Set(userDispatchProperties); + + await behavior.Invoke(routingContext, context => + { + transportOperation = context.Operations.Single(); + return Task.CompletedTask; + }); + + Assert.That(transportOperation!.Properties["MessageGroupId"], Is.EqualTo("user-specified-group"), + "User-set DispatchProperties should take precedence over ReceiveProperties"); + } + + [Test] + public async Task Should_preserve_user_dispatch_properties_even_with_receive_properties() + { + var behavior = new RoutingToDispatchConnector(); + + var receiveProperties = new ReceiveProperties(new Dictionary + { + ["MessageGroupId"] = "incoming-group", + ["DeduplicationId"] = "incoming-dedup" + }); + + var userDispatchProperties = new DispatchProperties + { + ["MessageGroupId"] = "user-group", + ["Custom.Property"] = "custom-value" + }; + + TransportOperation transportOperation = null; + + var routingContext = new TestableRoutingContext + { + RoutingStrategies = [new UnicastRoutingStrategy("destination")] + }; + routingContext.Extensions.Set(new IncomingMessage(routingContext.Message.MessageId, [], Array.Empty(), receiveProperties)); + routingContext.Extensions.Set(userDispatchProperties); + + await behavior.Invoke(routingContext, context => + { + transportOperation = context.Operations.Single(); + return Task.CompletedTask; + }); + + using (Assert.EnterMultipleScope()) + { + Assert.That(transportOperation!.Properties["MessageGroupId"], Is.EqualTo("user-group"), + "User-set property wins"); + Assert.That(transportOperation.Properties["DeduplicationId"], Is.EqualTo("incoming-dedup"), + "Receive property merged when not set by user"); + Assert.That(transportOperation.Properties["Custom.Property"], Is.EqualTo("custom-value"), + "User custom property preserved"); + } + } } \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Transports/MessageContextTests.cs b/src/NServiceBus.Core.Tests/Transports/MessageContextTests.cs new file mode 100644 index 00000000000..546caa985e1 --- /dev/null +++ b/src/NServiceBus.Core.Tests/Transports/MessageContextTests.cs @@ -0,0 +1,48 @@ +namespace NServiceBus.Core.Tests.Transports; + +using System; +using System.Collections.Generic; +using NServiceBus.Extensibility; +using NServiceBus.Transport; +using NUnit.Framework; + +[TestFixture] +public class MessageContextTests +{ + [Test] + public void Should_accept_receive_properties_via_ctor() + { + var receiveProperties = new ReceiveProperties(new Dictionary + { + ["Native.CustomProperty"] = "CustomValue", + ["MessageGroupId"] = "group-123" + }); + + var context = new MessageContext( + nativeMessageId: "native-id", + headers: [], + body: ReadOnlyMemory.Empty, + receiveProperties: receiveProperties, + transportTransaction: new TransportTransaction(), + receiveAddress: "queue@machine", + context: new ContextBag() + ); + + Assert.That(context.ReceiveProperties, Is.SameAs(receiveProperties)); + } + + [Test] + public void Should_default_receive_properties_to_empty() + { + var context = new MessageContext( + nativeMessageId: "native-id", + headers: [], + body: ReadOnlyMemory.Empty, + transportTransaction: new TransportTransaction(), + receiveAddress: "queue@machine", + context: new ContextBag() + ); + + Assert.That(context.ReceiveProperties, Is.SameAs(ReceiveProperties.Empty)); + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Transports/ReceivePropertiesTests.cs b/src/NServiceBus.Core.Tests/Transports/ReceivePropertiesTests.cs new file mode 100644 index 00000000000..67f86708167 --- /dev/null +++ b/src/NServiceBus.Core.Tests/Transports/ReceivePropertiesTests.cs @@ -0,0 +1,69 @@ +namespace NServiceBus.Core.Tests.Transports; + +using System.Collections.Generic; +using NServiceBus.Transport; +using NUnit.Framework; + +[TestFixture] +public class ReceivePropertiesTests +{ + [Test] + public void Should_initialize_empty() + { + var properties = new ReceiveProperties(); + Assert.That(properties, Is.Empty); + } + + [Test] + public void Should_wrap_provided_dictionary() + { + var source = new Dictionary + { + ["Key1"] = "Value1", + ["Key2"] = "Value2" + }; + + var properties = new ReceiveProperties(source); + + using (Assert.EnterMultipleScope()) + { + Assert.That(properties, Has.Count.EqualTo(2)); + Assert.That(properties["Key1"], Is.EqualTo("Value1")); + Assert.That(properties["Key2"], Is.EqualTo("Value2")); + } + } + + [Test] + public void Should_be_same_reference_as_source() + { + var source = new Dictionary { ["Key"] = "Value" }; + var properties = new ReceiveProperties(source); + + Assert.That(properties, Is.Not.Empty); + Assert.That(properties["Key"], Is.EqualTo("Value")); + } + + [Test] + public void Empty_should_be_immutable_singleton() + { + var empty1 = ReceiveProperties.Empty; + var empty2 = ReceiveProperties.Empty; + + using (Assert.EnterMultipleScope()) + { + Assert.That(empty1, Is.SameAs(empty2)); + Assert.That(empty1, Is.Empty); + } + } + + [Test] + public void Should_be_readonly_via_interface() + { + var properties = new ReceiveProperties(new Dictionary { ["Key"] = "Value" }); + + Assert.That(properties, Is.InstanceOf>()); + Assert.That(properties.ContainsKey("Key"), Is.True); + Assert.That(properties.TryGetValue("Key", out var value), Is.True); + Assert.That(value, Is.EqualTo("Value")); + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Envelopes/EnvelopeUnwrapper.cs b/src/NServiceBus.Core/Envelopes/EnvelopeUnwrapper.cs index 74e47b26544..3b3bd71202b 100644 --- a/src/NServiceBus.Core/Envelopes/EnvelopeUnwrapper.cs +++ b/src/NServiceBus.Core/Envelopes/EnvelopeUnwrapper.cs @@ -8,7 +8,8 @@ namespace NServiceBus; class EnvelopeUnwrapper(IEnvelopeHandler[] envelopeHandlers, IncomingPipelineMetrics metrics) { - static IncomingMessage GetDefaultIncomingMessage(MessageContext messageContext) => new(messageContext.NativeMessageId, messageContext.Headers, messageContext.Body); + static IncomingMessage GetDefaultIncomingMessage(MessageContext messageContext) => + new(messageContext.NativeMessageId, messageContext.Headers, messageContext.Body, messageContext.ReceiveProperties); internal IncomingMessageHandle UnwrapEnvelope(MessageContext messageContext) { @@ -36,7 +37,7 @@ internal IncomingMessageHandle UnwrapEnvelope(MessageContext messageContext) "Unwrapped the message (NativeID: {0} using {1}", messageContext.NativeMessageId, envelopeHandler.GetType().Name); } - return new IncomingMessageHandle(new IncomingMessage(messageContext.NativeMessageId, headers, bufferWriter.WrittenMemory), bufferWriter); + return new IncomingMessageHandle(new IncomingMessage(messageContext.NativeMessageId, headers, bufferWriter.WrittenMemory, messageContext.ReceiveProperties), bufferWriter); } if (Log.IsDebugEnabled) diff --git a/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs index c8181545073..bc4be17db8b 100644 --- a/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs +++ b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs @@ -22,9 +22,17 @@ public override Task Invoke(IRoutingContext context, Func 1; foreach (var strategy in context.RoutingStrategies) { - operations[index] = context.ToTransportOperation(strategy, dispatchConsistency, copySharedMutableMessageState); - index++; + var transportOperation = context.ToTransportOperation(strategy, dispatchConsistency, copySharedMutableMessageState); + + foreach (var (propertyName, propertyValue) in receiveProperties) + { + //if dispatch property is not already set, set it + _ = transportOperation.Properties.TryAdd(propertyName, propertyValue); + } + + operations[index++] = transportOperation; } if (isDebugEnabled) @@ -44,7 +59,7 @@ public override Task Invoke(IRoutingContext context, Func(out var pendingOperations)) diff --git a/src/NServiceBus.Core/Pipeline/Outgoing/SendComponent.cs b/src/NServiceBus.Core/Pipeline/Outgoing/SendComponent.cs index 8228a648012..c00d0880656 100644 --- a/src/NServiceBus.Core/Pipeline/Outgoing/SendComponent.cs +++ b/src/NServiceBus.Core/Pipeline/Outgoing/SendComponent.cs @@ -27,7 +27,8 @@ public static SendComponent Initialize(PipelineSettings pipelineSettings, Hostin pipelineSettings.Register(new OutgoingPhysicalToRoutingConnector(), "Starts the message dispatch pipeline"); - pipelineSettings.Register(new RoutingToDispatchConnector(), "Decides if the current message should be batched or immediately be dispatched to the transport"); + pipelineSettings.Register(new RoutingToDispatchConnector(), + "Decides if the current message should be batched or immediately be dispatched to the transport"); pipelineSettings.Register(new BatchToDispatchConnector(), "Passes batched messages over to the immediate dispatch part of the pipeline"); pipelineSettings.Register(b => new ImmediateDispatchTerminator(b.GetRequiredService()), "Hands the outgoing messages over to the transport for immediate delivery"); diff --git a/src/NServiceBus.Core/Transports/ErrorContext.cs b/src/NServiceBus.Core/Transports/ErrorContext.cs index 52829dd0970..b94e89e4a4a 100644 --- a/src/NServiceBus.Core/Transports/ErrorContext.cs +++ b/src/NServiceBus.Core/Transports/ErrorContext.cs @@ -21,18 +21,36 @@ public class ErrorContext /// The receive address. /// A which can be used to extend the current object. public ErrorContext(Exception exception, Dictionary headers, string nativeMessageId, ReadOnlyMemory body, TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, ContextBag context) + : this(exception, headers, nativeMessageId, body, ReceiveProperties.Empty, transportTransaction, immediateProcessingFailures, receiveAddress, context) + { + } + + /// + /// Initializes the error context with receive properties. + /// + /// The exception that caused the message processing failure. + /// The message headers. + /// The native message ID. + /// The message body. + /// Properties received from the transport that can be propagated to outgoing messages. + /// Transaction (along with connection if applicable) used to receive the message. + /// Number of failed immediate processing attempts. + /// The receive address. + /// A which can be used to extend the current object. + public ErrorContext(Exception exception, Dictionary headers, string nativeMessageId, ReadOnlyMemory body, ReceiveProperties receiveProperties, TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, ContextBag context) { ArgumentNullException.ThrowIfNull(exception); ArgumentNullException.ThrowIfNull(transportTransaction); ArgumentOutOfRangeException.ThrowIfNegative(immediateProcessingFailures); ArgumentException.ThrowIfNullOrWhiteSpace(receiveAddress); ArgumentNullException.ThrowIfNull(context); + ArgumentNullException.ThrowIfNull(receiveProperties); Exception = exception; TransportTransaction = transportTransaction; ImmediateProcessingFailures = immediateProcessingFailures; - Message = new IncomingMessage(nativeMessageId, headers, body); + Message = new IncomingMessage(nativeMessageId, headers, body, receiveProperties); ReceiveAddress = receiveAddress; diff --git a/src/NServiceBus.Core/Transports/IncomingMessage.cs b/src/NServiceBus.Core/Transports/IncomingMessage.cs index 8d6c11123b6..e18163cf15e 100644 --- a/src/NServiceBus.Core/Transports/IncomingMessage.cs +++ b/src/NServiceBus.Core/Transports/IncomingMessage.cs @@ -15,9 +15,22 @@ public class IncomingMessage /// The message headers. /// The message body. public IncomingMessage(string nativeMessageId, Dictionary headers, ReadOnlyMemory body) + : this(nativeMessageId, headers, body, ReceiveProperties.Empty) + { + } + + /// + /// Creates a new message with receive properties. + /// + /// The native message ID. + /// The message headers. + /// The message body. + /// Properties received from the transport. + public IncomingMessage(string nativeMessageId, Dictionary headers, ReadOnlyMemory body, ReceiveProperties receiveProperties) { ArgumentException.ThrowIfNullOrWhiteSpace(nativeMessageId); ArgumentNullException.ThrowIfNull(headers); + ArgumentNullException.ThrowIfNull(receiveProperties); if (headers.TryGetValue(NServiceBus.Headers.MessageId, out var originalMessageId) && !string.IsNullOrEmpty(originalMessageId)) { @@ -35,6 +48,8 @@ public IncomingMessage(string nativeMessageId, Dictionary header Headers = headers; Body = body; + + ReceiveProperties = receiveProperties; } /// @@ -52,6 +67,11 @@ public IncomingMessage(string nativeMessageId, Dictionary header /// public Dictionary Headers { get; } + /// + /// Properties received from the transport that can be propagated to outgoing dispatch operations. + /// + public ReceiveProperties ReceiveProperties { get; } + /// /// Gets/sets a byte array to the body content of the message. /// diff --git a/src/NServiceBus.Core/Transports/Learning/AsyncFile.cs b/src/NServiceBus.Core/Transports/Learning/AsyncFile.cs index 98e508059dc..51a46256015 100644 --- a/src/NServiceBus.Core/Transports/Learning/AsyncFile.cs +++ b/src/NServiceBus.Core/Transports/Learning/AsyncFile.cs @@ -20,7 +20,7 @@ public static async Task WriteBytes(string filePath, ReadOnlyMemory bytes, } //write to temp file first so we can do a atomic move - public static async Task WriteTextAtomic(string targetPath, string text, CancellationToken cancellationToken = default) + public static async Task WriteTextAtomic(string targetPath, string text, DateTime? creationTime, CancellationToken cancellationToken = default) { var tempFile = Path.GetTempFileName(); var bytes = Encoding.UTF8.GetBytes(text); @@ -56,6 +56,10 @@ public static async Task WriteTextAtomic(string targetPath, string text, Cancell } File.Move(tempFile, targetPath); + if (creationTime.HasValue) + { + File.SetCreationTime(targetPath, creationTime.Value); + } } static FileStream CreateWriteStream(string filePath, FileMode fileMode) @@ -63,11 +67,23 @@ static FileStream CreateWriteStream(string filePath, FileMode fileMode) return new FileStream(filePath, fileMode, FileAccess.Write, FileShare.None, bufferSize: 4096, useAsync: true); } - public static Task WriteText(string filePath, string text, CancellationToken cancellationToken = default) + public static async Task WriteText(string filePath, string text, CancellationToken cancellationToken = default) + { + var bytes = Encoding.UTF8.GetBytes(text); + + await WriteBytes(filePath, bytes, cancellationToken).ConfigureAwait(false); + } + + public static async Task WriteText(string filePath, string text, DateTime? creationTime, CancellationToken cancellationToken = default) { var bytes = Encoding.UTF8.GetBytes(text); - return WriteBytes(filePath, bytes, cancellationToken); + await WriteBytes(filePath, bytes, cancellationToken).ConfigureAwait(false); + + if (creationTime.HasValue) + { + File.SetCreationTime(filePath, creationTime.Value); + } } public static async Task ReadText(string filePath, CancellationToken cancellationToken = default) diff --git a/src/NServiceBus.Core/Transports/Learning/DirectoryBasedTransaction.cs b/src/NServiceBus.Core/Transports/Learning/DirectoryBasedTransaction.cs index 4680e632462..5ba34336f15 100644 --- a/src/NServiceBus.Core/Transports/Learning/DirectoryBasedTransaction.cs +++ b/src/NServiceBus.Core/Transports/Learning/DirectoryBasedTransaction.cs @@ -53,7 +53,7 @@ public void ClearPendingOutgoingOperations() { } } - public Task Enlist(string messagePath, string messageContents, CancellationToken cancellationToken = default) + public Task Enlist(string messagePath, string messageContents, DateTime? creationTime, CancellationToken cancellationToken = default) { var inProgressFileName = Path.GetFileNameWithoutExtension(messagePath) + ".out"; @@ -62,7 +62,7 @@ public Task Enlist(string messagePath, string messageContents, CancellationToken outgoingFiles.Enqueue(new OutgoingFile(committedPath, messagePath)); - return AsyncFile.WriteText(txPath, messageContents, cancellationToken); + return AsyncFile.WriteText(txPath, messageContents, creationTime, cancellationToken); } public bool Complete() diff --git a/src/NServiceBus.Core/Transports/Learning/ILearningTransportTransaction.cs b/src/NServiceBus.Core/Transports/Learning/ILearningTransportTransaction.cs index 0b18d40500f..fb9956ee7f1 100644 --- a/src/NServiceBus.Core/Transports/Learning/ILearningTransportTransaction.cs +++ b/src/NServiceBus.Core/Transports/Learning/ILearningTransportTransaction.cs @@ -1,5 +1,6 @@ namespace NServiceBus; +using System; using System.Threading; using System.Threading.Tasks; @@ -15,7 +16,7 @@ interface ILearningTransportTransaction void ClearPendingOutgoingOperations(); - Task Enlist(string messagePath, string messageContents, CancellationToken cancellationToken = default); + Task Enlist(string messagePath, string messageContents, DateTime? creationTime, CancellationToken cancellationToken = default); bool Complete(); } \ No newline at end of file diff --git a/src/NServiceBus.Core/Transports/Learning/LearningTransportDispatcher.cs b/src/NServiceBus.Core/Transports/Learning/LearningTransportDispatcher.cs index be40c54ac2d..efa24037ee1 100644 --- a/src/NServiceBus.Core/Transports/Learning/LearningTransportDispatcher.cs +++ b/src/NServiceBus.Core/Transports/Learning/LearningTransportDispatcher.cs @@ -110,6 +110,12 @@ await AsyncFile.WriteBytes(bodyPath, message.Body, cancellationToken) message.Headers[LearningTransportHeaders.TimeToBeReceived] = timeToBeReceived.MaxTime.ToString(); } + DateTime? creationTime = null; + if (transportOperation.Properties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedString)) + { + creationTime = DateTime.Parse(fileCreatedString); + } + var messagePath = Path.Combine(destinationPath, nativeMessageId) + ".metadata.txt"; var headerPayload = HeaderSerializer.Serialize(message.Headers); @@ -122,13 +128,13 @@ await AsyncFile.WriteBytes(bodyPath, message.Body, cancellationToken) if (transportOperation.RequiredDispatchConsistency != DispatchConsistency.Isolated && transaction.TryGet(out var directoryBasedTransaction)) { - await directoryBasedTransaction.Enlist(messagePath, headerPayload, cancellationToken) + await directoryBasedTransaction.Enlist(messagePath, headerPayload, creationTime, cancellationToken) .ConfigureAwait(false); } else { // atomic avoids the file being locked when the receiver tries to process it - await AsyncFile.WriteTextAtomic(messagePath, headerPayload, cancellationToken) + await AsyncFile.WriteTextAtomic(messagePath, headerPayload, creationTime, cancellationToken) .ConfigureAwait(false); } } diff --git a/src/NServiceBus.Core/Transports/Learning/LearningTransportMessagePump.cs b/src/NServiceBus.Core/Transports/Learning/LearningTransportMessagePump.cs index 6d924c61ce0..a6a8068aa24 100644 --- a/src/NServiceBus.Core/Transports/Learning/LearningTransportMessagePump.cs +++ b/src/NServiceBus.Core/Transports/Learning/LearningTransportMessagePump.cs @@ -2,6 +2,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.IO; using System.Threading; @@ -310,19 +311,18 @@ async Task ProcessFile(ILearningTransportTransaction transaction, string message var bodyPath = Path.Combine(bodyDir, $"{messageId}{BodyFileSuffix}"); var headers = HeaderSerializer.Deserialize(message); + var fileCreatedAt = File.GetCreationTimeUtc(transaction.FileToProcess); + if (headers.Remove(LearningTransportHeaders.TimeToBeReceived, out var ttbrString)) { var ttbr = TimeSpan.Parse(ttbrString); - //file.move preserves create time - var sentTime = File.GetCreationTimeUtc(transaction.FileToProcess); - var utcNow = DateTime.UtcNow; - if (sentTime + ttbr < utcNow) + if (fileCreatedAt + ttbr < utcNow) { await transaction.Commit(messageProcessingCancellationToken).ConfigureAwait(false); - log.InfoFormat("Dropping message '{0}' as the specified TimeToBeReceived of '{1}' expired since sending the message at '{2:O}'. Current UTC time is '{3:O}'", messageId, ttbrString, sentTime, utcNow); + log.InfoFormat("Dropping message '{0}' as the specified TimeToBeReceived of '{1}' expired since sending the message at '{2:O}'. Current UTC time is '{3:O}'", messageId, ttbrString, fileCreatedAt, utcNow); return; } } @@ -337,8 +337,12 @@ async Task ProcessFile(ILearningTransportTransaction transaction, string message } var processingContext = new ContextBag(); + var receiveProperties = new ReceiveProperties(new Dictionary + { + ["LearningTransport.FileCreatedAt"] = fileCreatedAt.ToString("O") + }); - var messageContext = new MessageContext(messageId, headers, body, transportTransaction, ReceiveAddress, processingContext); + var messageContext = new MessageContext(messageId, headers, body, receiveProperties, transportTransaction, ReceiveAddress, processingContext); try { @@ -359,7 +363,7 @@ async Task ProcessFile(ILearningTransportTransaction transaction, string message headers = HeaderSerializer.Deserialize(message); headers.Remove(LearningTransportHeaders.TimeToBeReceived); - var errorContext = new ErrorContext(exception, headers, messageId, body, transportTransaction, processingFailures, ReceiveAddress, processingContext); + var errorContext = new ErrorContext(exception, headers, messageId, body, receiveProperties, transportTransaction, processingFailures, ReceiveAddress, processingContext); ErrorHandleResult result; diff --git a/src/NServiceBus.Core/Transports/Learning/NoTransaction.cs b/src/NServiceBus.Core/Transports/Learning/NoTransaction.cs index 264dfc0d12d..f128ff60a0d 100644 --- a/src/NServiceBus.Core/Transports/Learning/NoTransaction.cs +++ b/src/NServiceBus.Core/Transports/Learning/NoTransaction.cs @@ -22,7 +22,8 @@ public Task BeginTransaction(string incomingFilePath, CancellationToken ca return AsyncFile.Move(incomingFilePath, FileToProcess, cancellationToken); } - public Task Enlist(string messagePath, string messageContents, CancellationToken cancellationToken = default) => AsyncFile.WriteText(messagePath, messageContents, cancellationToken); + public Task Enlist(string messagePath, string messageContents, DateTime? creationTime, CancellationToken cancellationToken = default) + => AsyncFile.WriteText(messagePath, messageContents, creationTime, cancellationToken); public Task Commit(CancellationToken cancellationToken = default) => Task.CompletedTask; diff --git a/src/NServiceBus.Core/Transports/MessageContext.cs b/src/NServiceBus.Core/Transports/MessageContext.cs index 3c1fed2cd6f..7d191154923 100644 --- a/src/NServiceBus.Core/Transports/MessageContext.cs +++ b/src/NServiceBus.Core/Transports/MessageContext.cs @@ -7,7 +7,7 @@ /// /// Allows the transport to pass relevant info to the pipeline. /// -public partial class MessageContext : IExtendable +public class MessageContext : IExtendable { /// /// Initializes the context. @@ -19,9 +19,25 @@ public partial class MessageContext : IExtendable /// The receive address. /// A which can be used to extend the current object. public MessageContext(string nativeMessageId, Dictionary headers, ReadOnlyMemory body, TransportTransaction transportTransaction, string receiveAddress, ContextBag context) + : this(nativeMessageId, headers, body, ReceiveProperties.Empty, transportTransaction, receiveAddress, context) + { + } + + /// + /// Initializes the context with receive properties. + /// + /// The native message ID. + /// The message headers. + /// The message body. + /// Properties received from the transport that can be propagated to outgoing messages. + /// Transaction (along with connection if applicable) used to receive the message. + /// The receive address. + /// A which can be used to extend the current object. + public MessageContext(string nativeMessageId, Dictionary headers, ReadOnlyMemory body, ReceiveProperties receiveProperties, TransportTransaction transportTransaction, string receiveAddress, ContextBag context) { ArgumentException.ThrowIfNullOrWhiteSpace(nativeMessageId); ArgumentNullException.ThrowIfNull(headers); + ArgumentNullException.ThrowIfNull(receiveProperties); ArgumentNullException.ThrowIfNull(transportTransaction); ArgumentException.ThrowIfNullOrWhiteSpace(receiveAddress); ArgumentNullException.ThrowIfNull(context); @@ -29,6 +45,7 @@ public MessageContext(string nativeMessageId, Dictionary headers Headers = headers; Body = body; NativeMessageId = nativeMessageId; + ReceiveProperties = receiveProperties; Extensions = context; ReceiveAddress = receiveAddress; TransportTransaction = transportTransaction; @@ -51,6 +68,11 @@ public MessageContext(string nativeMessageId, Dictionary headers /// public ReadOnlyMemory Body { get; } + /// + /// Properties received from the transport that can be propagated to outgoing dispatch operations. + /// + public ReceiveProperties ReceiveProperties { get; } + /// /// Transaction (along with connection if applicable) used to receive the message. /// diff --git a/src/NServiceBus.Core/Transports/ReceiveProperties.cs b/src/NServiceBus.Core/Transports/ReceiveProperties.cs new file mode 100644 index 00000000000..f4eecbab4c0 --- /dev/null +++ b/src/NServiceBus.Core/Transports/ReceiveProperties.cs @@ -0,0 +1,56 @@ +#nullable enable + +namespace NServiceBus.Transport; + +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; + +/// +/// Properties received from the transport that can be propagated to outgoing dispatch operations. +/// Transports populate this with native message metadata that should survive audit and error operations. +/// +[SuppressMessage("Naming", "CA1710:Identifiers should have correct suffix", Justification = "Name reflects domain semantics, not collection implementation.")] +public sealed class ReceiveProperties : IReadOnlyDictionary +{ + readonly Dictionary properties; + + /// + /// An empty instance. + /// + public static ReceiveProperties Empty { get; } = new(); + + /// + /// Creates an empty instance of . + /// + public ReceiveProperties() => properties = []; + + /// + /// Creates a from the provided dictionary. + /// The dictionary is stored by reference — do not mutate it after passing to this constructor. + /// + public ReceiveProperties(Dictionary dictionary) => properties = dictionary; + + /// + public string this[string key] => properties[key]; + + /// + public IEnumerable Keys => properties.Keys; + + /// + public IEnumerable Values => properties.Values; + + /// + public int Count => properties.Count; + + /// + public bool ContainsKey(string key) => properties.ContainsKey(key); + + /// + public bool TryGetValue(string key, [MaybeNullWhen(false)] out string value) => properties.TryGetValue(key, out value); + + /// + public IEnumerator> GetEnumerator() => properties.GetEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); +} \ No newline at end of file diff --git a/src/NServiceBus.Learning.AcceptanceTests/When_message_sent_with_LearningTransport.cs b/src/NServiceBus.Learning.AcceptanceTests/When_message_sent_with_LearningTransport.cs new file mode 100644 index 00000000000..998485d6070 --- /dev/null +++ b/src/NServiceBus.Learning.AcceptanceTests/When_message_sent_with_LearningTransport.cs @@ -0,0 +1,440 @@ +namespace NServiceBus.AcceptanceTests; + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using AcceptanceTesting; +using AcceptanceTesting.Customization; +using EndpointTemplates; +using NServiceBus.Pipeline; +using NUnit.Framework; +using Transport; + +public class When_message_sent_with_LearningTransport : NServiceBusAcceptanceTest +{ + [Test] + public async Task Should_preserve_file_created_time_as_receive_property() + { + var context = await Scenario.Define() + .WithEndpoint(b => b.When(session => session.SendLocal(new TestMessage()))) + .Run(); + + using (Assert.EnterMultipleScope()) + { + Assert.That(context.MessageReceived, Is.True, "Message was not received"); + Assert.That(context.FileCreatedAt, Is.Not.Null, "FileCreatedAt property should be present"); + Assert.That(DateTime.TryParse(context.FileCreatedAt, out _), Is.True, "FileCreatedAt should be a valid datetime"); + } + } + + [Test] + public async Task Should_preserve_file_created_time_property_on_dispatched_copies() + { + var context = await Scenario.Define() + .WithEndpoint(b => b.When(session => session.SendLocal(new TestMessage()))) + .WithEndpoint() + .Run(); + + Assert.That(context.MessageAudited, Is.True, "Message was not audited"); + } + + [Test] + public async Task Should_not_preserve_receive_properties_on_outgoing_messages() + { + var context = await Scenario.Define() + .WithEndpoint(b => b.When(session => session.SendLocal(new TestMessage()))) + .Run(); + + Assert.That(context.MessageReceived, Is.True, "Message was received"); + } + + [Test] + public async Task Should_not_override_audit_properties_with_receive_properties_when_dispatch_properties_are_used() + { + var context = await Scenario.Define() + .WithEndpoint(b => b.When(session => session.SendLocal(new OutgoingTestMessage()))) + .WithEndpoint() + .Run(); + + Assert.That(context.MessageReceived, Is.True, "Message was not received"); + } + + [Test] + public async Task Should_preserve_file_created_time_property_when_moved_to_error_queue() + { + var context = await Scenario.Define() + .WithEndpoint(b => b + .DoNotFailOnErrorMessages() + .When((session, ctx) => session.SendLocal(new TestMessage())) + ) + .WithEndpoint() + .Run(); + + using (Assert.EnterMultipleScope()) + { + Assert.That(context.MessageMovedToErrorQueue, Is.True, "Message was not moved to error queue"); + Assert.That(context.FileCreatedAt, Is.Not.Null, "FileCreatedAt property should be present on message in error queue"); + Assert.That(DateTime.TryParse(context.FileCreatedAt, out _), Is.True, "FileCreatedAt should be a valid datetime"); + } + } + + [Test] + public async Task Should_not_override_error_queue_dispatch_properties_with_receive_properties() + { + var context = await Scenario.Define() + .WithEndpoint(b => b + .DoNotFailOnErrorMessages() + .When((session, ctx) => session.SendLocal(new TestMessage())) + ) + .WithEndpoint() + .Run(); + + using (Assert.EnterMultipleScope()) + { + Assert.That(context.MessageMovedToErrorQueue, Is.True, "Message was not moved to error queue"); + Assert.That(context.FileCreatedAt, Is.Not.Null, "FileCreatedAt property should be captured from original message"); + Assert.That(context.ErrorQueueFileCreatedAtDiffersFromOriginal, Is.True, "Error queue message should have FileCreatedAt from dispatch properties, not receive properties"); + } + } + + class Context : ScenarioContext + { + public bool MessageReceived { get; set; } + public string FileCreatedAt { get; set; } + public bool MessageAudited { get; set; } + public bool MessageMovedToErrorQueue { get; set; } + public bool ErrorQueueFileCreatedAtDiffersFromOriginal { get; set; } + } + + class EndPointThatReceivesFromAnotherAndAuditsEndpoint : EndpointConfigurationBuilder + { + public EndPointThatReceivesFromAnotherAndAuditsEndpoint() => EndpointSetup(endpointConfiguration => + { + endpointConfiguration.AuditProcessedMessagesTo(Conventions.EndpointNamingConvention(typeof(AuditSpyForEndPointThatReceivesFromAnotherAndAuditsEndpoint))); + endpointConfiguration.Pipeline.Register(behavior: new AuditHeaderOverrideBehavior(), description: "Override headers on audit messages"); + }); + + class OutgoingTestMessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(OutgoingTestMessage message, IMessageHandlerContext context) + { + testContext.MessageReceived = true; + + if (context.Extensions.TryGet(out var incomingMessage) && incomingMessage.ReceiveProperties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedAt)) + { + testContext.FileCreatedAt = fileCreatedAt; + } + else + { + testContext.MarkAsFailed(new Exception("Failed to propagate receive properties from the original message.")); + } + + return Task.CompletedTask; + } + } + } + + class AuditHeaderOverrideBehavior : Behavior + { + public override Task Invoke(IRoutingContext context, Func next) + { + context.Extensions.Get()["LearningTransport.FileCreatedAt"] = DateTime.UtcNow.AddDays(10).ToString("o"); + + return next(); + } + } + + class AuditSpyForEndPointThatReceivesFromAnotherAndAuditsEndpoint : EndpointConfigurationBuilder + { + public AuditSpyForEndPointThatReceivesFromAnotherAndAuditsEndpoint() => + EndpointSetup(); + + public class AuditMessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(OutgoingTestMessage message, IMessageHandlerContext context) + { + testContext.MessageAudited = true; + + if (context.Extensions.TryGet(out var incomingMessage) && incomingMessage.ReceiveProperties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedAt)) + { + if (fileCreatedAt == testContext.FileCreatedAt) + { + testContext.MarkAsFailed(new Exception("Receive properties from the original message is propagated to audit messages.")); + } + + testContext.MarkAsCompleted(testContext.MessageAudited, testContext.FileCreatedAt != fileCreatedAt); + } + else + { + testContext.MarkAsFailed(new Exception("Failed to propagate receive properties from the original message.")); + } + + return Task.CompletedTask; + } + } + } + + class SendingEndpoint : EndpointConfigurationBuilder + { + public SendingEndpoint() => EndpointSetup(); + + class TestMessageHandler(Context testContext) : IHandleMessages + { + public async Task Handle(TestMessage message, IMessageHandlerContext context) + { + testContext.MessageReceived = true; + if (context.Extensions.TryGet(out var incomingMessage) && incomingMessage.ReceiveProperties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedAt)) + { + testContext.FileCreatedAt = fileCreatedAt; + + await context.SendLocal(new OutgoingTestMessage()); + } + else + { + testContext.MarkAsFailed(new Exception("Failed to retrieve receive properties from the message context.")); + } + } + } + + //handler for the outgoing message to verify that receive properties are not propagated to outgoing messages + class OutgoingTestMessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(OutgoingTestMessage message, IMessageHandlerContext context) + { + testContext.MessageReceived = true; + if (context.Extensions.TryGet(out var incomingMessage) && incomingMessage.ReceiveProperties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedAt)) + { + if (fileCreatedAt == testContext.FileCreatedAt) + { + testContext.MarkAsFailed(new Exception("Receive properties from the original message is propagated to outgoing messages.")); + } + + testContext.MarkAsCompleted(testContext.MessageReceived, testContext.FileCreatedAt != fileCreatedAt); + } + else + { + testContext.MarkAsFailed(new Exception("Failed to retrieve receive properties from the message context.")); + } + return Task.CompletedTask; + } + } + } + + class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() => EndpointSetup(endpointConfiguration => + { + endpointConfiguration.AuditProcessedMessagesTo(Conventions.EndpointNamingConvention(typeof(AuditSpy))); + }); + + class TestMessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(TestMessage message, IMessageHandlerContext context) + { + testContext.MessageReceived = true; + + if (context.Extensions.TryGet(out var incomingMessage) && incomingMessage.ReceiveProperties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedAt)) + { + testContext.FileCreatedAt = fileCreatedAt; + } + else + { + testContext.MarkAsFailed(new Exception("Failed to retrieve receive properties from the message context.")); + } + + testContext.MarkAsCompleted(testContext.MessageReceived, testContext.FileCreatedAt != null); + + return Task.CompletedTask; + } + } + } + + class EndpointWithAuditOn : EndpointConfigurationBuilder + { + public EndpointWithAuditOn() => EndpointSetup(endpointConfiguration => + { + endpointConfiguration.AuditProcessedMessagesTo(Conventions.EndpointNamingConvention(typeof(AuditSpy))); + }); + + class TestMessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(TestMessage message, IMessageHandlerContext context) + { + testContext.MessageReceived = true; + + if (context.Extensions.TryGet(out var incomingMessage) && incomingMessage.ReceiveProperties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedAt)) + { + testContext.FileCreatedAt = fileCreatedAt; + } + + return Task.CompletedTask; + } + } + } + + class AuditSpy : EndpointConfigurationBuilder + { + public AuditSpy() => + EndpointSetup(); + + public class AuditMessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(TestMessage message, IMessageHandlerContext context) + { + testContext.MessageAudited = true; + + if (context.Extensions.TryGet(out var incomingMessage) && incomingMessage.ReceiveProperties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedAt)) + { + if (fileCreatedAt != testContext.FileCreatedAt) + { + testContext.MarkAsFailed(new Exception("Receive properties from the original message is not propagated to audit messages.")); + } + + testContext.MarkAsCompleted(testContext.MessageAudited, testContext.FileCreatedAt == fileCreatedAt); + } + else + { + testContext.MarkAsFailed(new Exception("Failed to propagate receive properties from the original message.")); + } + + return Task.CompletedTask; + } + } + } + + class EndpointWithFailingHandler : EndpointConfigurationBuilder + { + public EndpointWithFailingHandler() => EndpointSetup(endpointConfiguration => + { + endpointConfiguration.Recoverability().AddUnrecoverableException(); + endpointConfiguration.SendFailedMessagesTo(Conventions.EndpointNamingConvention(typeof(ErrorSpy))); + }); + + class TestMessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(TestMessage message, IMessageHandlerContext context) + { + testContext.MessageReceived = true; + + if (context.Extensions.TryGet(out var incomingMessage) && incomingMessage.ReceiveProperties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedAt)) + { + testContext.FileCreatedAt = fileCreatedAt; + } + + throw new SimulatedException("Message should be moved to error queue"); + } + } + } + + class EndpointWithFailingHandlerAndDispatchOverride : EndpointConfigurationBuilder + { + public EndpointWithFailingHandlerAndDispatchOverride() => EndpointSetup(endpointConfiguration => + { + endpointConfiguration.Recoverability().AddUnrecoverableException(); + endpointConfiguration.SendFailedMessagesTo(Conventions.EndpointNamingConvention(typeof(ErrorSpyWithDispatchPropertyVerification))); + endpointConfiguration.Pipeline.Register(behavior: new ErrorQueueDispatchPropertyOverrideBehavior(), description: "Override FileCreatedAt dispatch property on error queue messages"); + }); + + class TestMessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(TestMessage message, IMessageHandlerContext context) + { + testContext.MessageReceived = true; + + if (context.Extensions.TryGet(out var incomingMessage) && incomingMessage.ReceiveProperties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedAt)) + { + testContext.FileCreatedAt = fileCreatedAt; + } + + throw new SimulatedException("Message should be moved to error queue"); + } + } + } + + class ErrorQueueDispatchPropertyOverrideBehavior : Behavior + { + public override Task Invoke(IRecoverabilityContext context, Func next) + { + if (context.RecoverabilityAction is MoveToError) + { + context.RecoverabilityAction = new CustomMoveToError(context.RecoverabilityConfiguration.Failed.ErrorQueue); + } + + return next(); + } + + class CustomMoveToError(string errorQueue) : MoveToError(errorQueue) + { + public override IReadOnlyCollection GetRoutingContexts(IRecoverabilityActionContext context) + { + var routingContexts = base.GetRoutingContexts(context); + + foreach (var routingContext in routingContexts) + { + routingContext.Extensions.GetOrCreate()["LearningTransport.FileCreatedAt"] = DateTime.UtcNow.AddDays(10).ToString("o"); + } + + return routingContexts; + } + } + } + + class ErrorSpy : EndpointConfigurationBuilder + { + public ErrorSpy() => EndpointSetup(); + + public class ErrorMessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(TestMessage message, IMessageHandlerContext context) + { + testContext.MessageMovedToErrorQueue = true; + + if (context.Extensions.TryGet(out var incomingMessage) && incomingMessage.ReceiveProperties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedAt)) + { + testContext.FileCreatedAt = fileCreatedAt; + testContext.MarkAsCompleted(testContext.MessageMovedToErrorQueue, testContext.FileCreatedAt != null); + } + else + { + testContext.MarkAsFailed(new Exception("Failed to propagate receive properties to error queue message.")); + } + + return Task.CompletedTask; + } + } + } + + class ErrorSpyWithDispatchPropertyVerification : EndpointConfigurationBuilder + { + public ErrorSpyWithDispatchPropertyVerification() => EndpointSetup(); + + public class ErrorMessageHandler(Context testContext) : IHandleMessages + { + public Task Handle(TestMessage message, IMessageHandlerContext context) + { + testContext.MessageMovedToErrorQueue = true; + + if (context.Extensions.TryGet(out var incomingMessage) && incomingMessage.ReceiveProperties.TryGetValue("LearningTransport.FileCreatedAt", out var fileCreatedAt)) + { + if (fileCreatedAt == testContext.FileCreatedAt) + { + testContext.MarkAsFailed(new Exception("Receive properties from the original message were propagated to error queue message instead of dispatch properties.")); + } + + testContext.ErrorQueueFileCreatedAtDiffersFromOriginal = testContext.FileCreatedAt != fileCreatedAt; + testContext.MarkAsCompleted(testContext.MessageMovedToErrorQueue, testContext.ErrorQueueFileCreatedAtDiffersFromOriginal); + } + else + { + testContext.MarkAsFailed(new Exception("Failed to retrieve receive properties from error queue message.")); + } + + return Task.CompletedTask; + } + } + } + + public class TestMessage : IMessage; + + public class OutgoingTestMessage : IMessage; +} \ No newline at end of file