Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8fa6c14
Introduce ReceiveProperties that are getting merged into dispatch pro…
danielmarbach Mar 19, 2026
ef4b178
Use ReceiveProperties for file metadata preservation in the learning …
danielmarbach Mar 19, 2026
9ecbe50
Pass TransportSeam to SendComponent for dispatch property support
danielmarbach Mar 19, 2026
a714fdb
Enhance test for LearningTransport to use multiple assertion scope an…
danielmarbach Mar 19, 2026
8ec1ceb
Invert the order of conditions to make it easier to understand
SzymonPobiega Apr 20, 2026
4d5970e
Add a test for propagating the receive to dispatch properties
SzymonPobiega Apr 20, 2026
9ba6f9e
Adjust the learning transport to save dispatch property of file creat…
SzymonPobiega Apr 20, 2026
f6a425e
Ensuring that receive properties are not passed along for outgoing me…
poornimanayar Apr 21, 2026
0280d87
formatting fix
poornimanayar Apr 21, 2026
c7c4022
Formatting fix
poornimanayar Apr 21, 2026
38e8982
Receive properties value should not override dispatch properties. Add…
poornimanayar Apr 21, 2026
68aa0bb
fix formatting
poornimanayar Apr 21, 2026
72eafa5
Test if we are not overriding user-set dispatch properties with ones …
SzymonPobiega Apr 24, 2026
60f35a1
Use IncomingMessage and context MessageID because they always contain…
danielmarbach Apr 30, 2026
e3dadb5
Declare outgoing message variable for slightly better readability
danielmarbach Apr 30, 2026
746701e
Improve test clarity by using `Assert.EnterMultipleScope` and refinin…
danielmarbach Apr 30, 2026
b162bbd
Set `IncomingMessage` in test extensions to ensure `MessageId` consis…
danielmarbach Apr 30, 2026
129001b
Remove unnecessary partial
danielmarbach Apr 30, 2026
7532130
Make ReceiveProperties readonly, move to IncomingMessage, and rename …
danielmarbach May 1, 2026
ff294f5
Refactor MessageContext and tests to simplify assertions and improve …
danielmarbach May 1, 2026
3835df7
Enable nullable reference types for ReceiveProperties
danielmarbach May 1, 2026
7216260
Add tests to verify error queue behavior for file created time and di…
danielmarbach May 1, 2026
ffaf6a1
Refactor RoutingToDispatchConnector to remove receive property names …
danielmarbach May 1, 2026
e2511b9
Remove unused System.Collections.Frozen import in TransportDefinition…
danielmarbach May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2419,6 +2419,7 @@ namespace NServiceBus.Transport
public class ErrorContext
{
public ErrorContext(System.Exception exception, System.Collections.Generic.Dictionary<string, string> headers, string nativeMessageId, System.ReadOnlyMemory<byte> body, NServiceBus.Transport.TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, NServiceBus.Extensibility.ContextBag context) { }
public ErrorContext(System.Exception exception, System.Collections.Generic.Dictionary<string, string> headers, string nativeMessageId, System.ReadOnlyMemory<byte> body, NServiceBus.Transport.ReceiveProperties receiveProperties, NServiceBus.Transport.TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, NServiceBus.Extensibility.ContextBag context) { }
public int DelayedDeliveriesPerformed { get; }
public System.Exception Exception { get; }
public NServiceBus.Extensibility.ContextBag Extensions { get; }
Expand Down Expand Up @@ -2481,10 +2482,12 @@ namespace NServiceBus.Transport
public class IncomingMessage
{
public IncomingMessage(string nativeMessageId, System.Collections.Generic.Dictionary<string, string> headers, System.ReadOnlyMemory<byte> body) { }
public IncomingMessage(string nativeMessageId, System.Collections.Generic.Dictionary<string, string> headers, System.ReadOnlyMemory<byte> body, NServiceBus.Transport.ReceiveProperties receiveProperties) { }
public System.ReadOnlyMemory<byte> Body { get; }
public System.Collections.Generic.Dictionary<string, string> Headers { get; }
public string MessageId { get; }
public string NativeMessageId { get; }
public NServiceBus.Transport.ReceiveProperties ReceiveProperties { get; }
}
public static class IncomingMessageExtensions
{
Expand All @@ -2494,11 +2497,13 @@ namespace NServiceBus.Transport
public class MessageContext : NServiceBus.Extensibility.IExtendable
{
public MessageContext(string nativeMessageId, System.Collections.Generic.Dictionary<string, string> headers, System.ReadOnlyMemory<byte> body, NServiceBus.Transport.TransportTransaction transportTransaction, string receiveAddress, NServiceBus.Extensibility.ContextBag context) { }
public MessageContext(string nativeMessageId, System.Collections.Generic.Dictionary<string, string> headers, System.ReadOnlyMemory<byte> body, NServiceBus.Transport.ReceiveProperties receiveProperties, NServiceBus.Transport.TransportTransaction transportTransaction, string receiveAddress, NServiceBus.Extensibility.ContextBag context) { }
public System.ReadOnlyMemory<byte> Body { get; }
public NServiceBus.Extensibility.ContextBag Extensions { get; }
public System.Collections.Generic.Dictionary<string, string> Headers { get; }
public string NativeMessageId { get; }
public string ReceiveAddress { get; }
public NServiceBus.Transport.ReceiveProperties ReceiveProperties { get; }
public NServiceBus.Transport.TransportTransaction TransportTransaction { get; }
}
public class MulticastTransportOperation : NServiceBus.Transport.IOutgoingTransportOperation
Expand Down Expand Up @@ -2541,6 +2546,19 @@ namespace NServiceBus.Transport
public System.Collections.Generic.IReadOnlyCollection<string> SendingAddresses { get; }
public void BindSending(string transportAddress) { }
}
public sealed class ReceiveProperties : System.Collections.Generic.IEnumerable<System.Collections.Generic.KeyValuePair<string, string>>, System.Collections.Generic.IReadOnlyCollection<System.Collections.Generic.KeyValuePair<string, string>>, System.Collections.Generic.IReadOnlyDictionary<string, string>, System.Collections.IEnumerable
{
public ReceiveProperties() { }
public ReceiveProperties(System.Collections.Generic.Dictionary<string, string> dictionary) { }
public int Count { get; }
public string this[string key] { get; }
public System.Collections.Generic.IEnumerable<string> Keys { get; }
public System.Collections.Generic.IEnumerable<string> Values { get; }
public static NServiceBus.Transport.ReceiveProperties Empty { get; }
public bool ContainsKey(string key) { }
public System.Collections.Generic.IEnumerator<System.Collections.Generic.KeyValuePair<string, string>> GetEnumerator() { }
public bool TryGetValue(string key, [System.Diagnostics.CodeAnalysis.MaybeNullWhen(false)] out string value) { }
}
public class ReceiveSettings
{
public ReceiveSettings(string id, NServiceBus.Transport.QueueAddress receiveAddress, bool usePublishSubscribe, bool purgeOnStartup, string errorQueue) { }
Expand Down
49 changes: 47 additions & 2 deletions src/NServiceBus.Core.Tests/Recoverability/ErrorContextTests.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
namespace NServiceBus.Core.Tests.Recoverability;

using System;
using Extensibility;
using System.Collections.Generic;
using NServiceBus.Extensibility;
using NServiceBus.Transport;
using NUnit.Framework;
using Transport;

[TestFixture]
public class ErrorContextTests
Expand All @@ -17,4 +18,48 @@ public void Can_pass_additional_information_via_context_bag()

Assert.That(context.Extensions.Get<string>("MyKey"), Is.EqualTo("MyValue"));
}

[Test]
public void Should_carry_receive_properties_on_incoming_message()
{
var context = new ContextBag();
var receiveProperties = new ReceiveProperties(new Dictionary<string, string> { ["Native.Key"] = "Value" });

var errorContext = new ErrorContext(
exception: new InvalidOperationException("Test"),
headers: new Dictionary<string, string> { [Headers.MessageId] = "id" },
nativeMessageId: "native-id",
body: ReadOnlyMemory<byte>.Empty,
receiveProperties: receiveProperties,
transportTransaction: new TransportTransaction(),
immediateProcessingFailures: 1,
receiveAddress: "queue",
context: context
);

using (Assert.EnterMultipleScope())
{
Assert.That(errorContext.Message.ReceiveProperties, Is.SameAs(receiveProperties));
Assert.That(errorContext.Message.ReceiveProperties["Native.Key"], Is.EqualTo("Value"));
}
}

[Test]
public void Should_default_receive_properties_to_empty_when_not_provided()
{
var context = new ContextBag();

var errorContext = new ErrorContext(
exception: new InvalidOperationException("Test"),
headers: [],
nativeMessageId: "native-id",
body: ReadOnlyMemory<byte>.Empty,
transportTransaction: new TransportTransaction(),
immediateProcessingFailures: 1,
receiveAddress: "queue",
context: context
);

Assert.That(errorContext.Message.ReceiveProperties, Is.SameAs(ReceiveProperties.Empty));
}
}
148 changes: 135 additions & 13 deletions src/NServiceBus.Core.Tests/Routing/RoutingToDispatchConnectorTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace NServiceBus.Core.Tests.Routing;

using System;
using System.Collections.Frozen;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
Expand Down Expand Up @@ -60,7 +61,7 @@ await behavior.Invoke(testableRoutingContext, context =>
public async Task Should_copy_message_state_for_multiple_routing_strategies()
{
var behavior = new RoutingToDispatchConnector();
IEnumerable<TransportOperation> operations = null;
List<TransportOperation> operations = null;
var testableRoutingContext = new TestableRoutingContext
{
RoutingStrategies =
Expand All @@ -78,11 +79,11 @@ public async Task Should_copy_message_state_for_multiple_routing_strategies()
testableRoutingContext.Message = new OutgoingMessage("ID", originalHeaders, Array.Empty<byte>());
await behavior.Invoke(testableRoutingContext, context =>
{
operations = context.Operations;
operations = [.. context.Operations];
return Task.CompletedTask;
});

Assert.That(operations.ToList(), Has.Count.EqualTo(2));
Assert.That(operations, Has.Count.EqualTo(2));

TransportOperation destination1Operation = operations.ElementAt(0);
using (Assert.EnterMultipleScope())
Expand All @@ -91,13 +92,20 @@ await behavior.Invoke(testableRoutingContext, context =>
Assert.That((destination1Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination1"));
}
Dictionary<string, string> destination1Headers = destination1Operation.Message.Headers;
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
Assert.That(destination1Headers, Does.Not.Contain(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")));
Assert.That(destination1Headers, Is.Not.SameAs(originalHeaders));
using (Assert.EnterMultipleScope())
{
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
Assert.That(destination1Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
Assert.That(destination1Headers, Does.Not.Contain(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")));
Assert.That(destination1Headers, Is.Not.SameAs(originalHeaders));
}

DispatchProperties destination1DispatchProperties = destination1Operation.Properties;
Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair<string, string>("SomeKey", "SomeValue")));
Assert.That(destination1DispatchProperties, Is.Not.SameAs(originalDispatchProperties));
using (Assert.EnterMultipleScope())
{
Assert.That(destination1DispatchProperties, Contains.Item(new KeyValuePair<string, string>("SomeKey", "SomeValue")));
Assert.That(destination1DispatchProperties, Is.Not.SameAs(originalDispatchProperties));
}

TransportOperation destination2Operation = operations.ElementAt(1);
using (Assert.EnterMultipleScope())
Expand All @@ -106,10 +114,14 @@ await behavior.Invoke(testableRoutingContext, context =>
Assert.That((destination2Operation.AddressTag as UnicastAddressTag)?.Destination, Is.EqualTo("destination2"));
}
Dictionary<string, string> destination2Headers = destination2Operation.Message.Headers;
Assert.That(destination2Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
Assert.That(destination2Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")));
Assert.That(destination2Headers, Does.Not.Contain(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
Assert.That(destination2Headers, Is.Not.SameAs(originalHeaders));
using (Assert.EnterMultipleScope())
{
Assert.That(destination2Headers, Contains.Item(new KeyValuePair<string, string>("SomeHeaderKey", "SomeHeaderValue")));
Assert.That(destination2Headers, Contains.Item(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy2", "HeaderValueAddedByTheRoutingStrategy2")));
Assert.That(destination2Headers, Does.Not.Contain(new KeyValuePair<string, string>("HeaderKeyAddedByTheRoutingStrategy1", "HeaderValueAddedByTheRoutingStrategy1")));
Assert.That(destination2Headers, Is.Not.SameAs(originalHeaders));
}

DispatchProperties destination2DispatchProperties = destination2Operation.Properties;
Assert.That(destination2DispatchProperties, Is.Not.SameAs(originalDispatchProperties));
using (Assert.EnterMultipleScope())
Expand Down Expand Up @@ -242,4 +254,114 @@ public override AddressTag Apply(Dictionary<string, string> headers)
}

class MyMessage : IMessage;

[Test]
public async Task Should_merge_receive_properties_when_declared_by_transport()
{
var behavior = new RoutingToDispatchConnector();

var receiveProperties = new ReceiveProperties(new Dictionary<string, string>
{
["MessageGroupId"] = "group-123",
["MessageDeduplicationId"] = "dedup-456"
});

TransportOperation transportOperation = null;

var routingContext = new TestableRoutingContext
{
RoutingStrategies = [new UnicastRoutingStrategy("destination")]
};
routingContext.Extensions.Set(new IncomingMessage(routingContext.Message.MessageId, [], Array.Empty<byte>(), receiveProperties));

await behavior.Invoke(routingContext, context =>
{
transportOperation = context.Operations.Single();
return Task.CompletedTask;
});

using (Assert.EnterMultipleScope())
{
Assert.That(transportOperation, Is.Not.Null);
Assert.That(transportOperation!.Properties["MessageGroupId"], Is.EqualTo("group-123"));
Assert.That(transportOperation!.Properties["MessageDeduplicationId"], Is.EqualTo("dedup-456"));
}
}

[Test]
public async Task Should_not_override_user_set_dispatch_property()
{
var behavior = new RoutingToDispatchConnector();

var receiveProperties = new ReceiveProperties(new Dictionary<string, string>
{
["MessageGroupId"] = "incoming-group"
});

var userDispatchProperties = new DispatchProperties
{
["MessageGroupId"] = "user-specified-group"
};

TransportOperation transportOperation = null;

var routingContext = new TestableRoutingContext
{
RoutingStrategies = [new UnicastRoutingStrategy("destination")]
};
routingContext.Extensions.Set(new IncomingMessage(routingContext.Message.MessageId, [], Array.Empty<byte>(), receiveProperties));
routingContext.Extensions.Set(userDispatchProperties);

await behavior.Invoke(routingContext, context =>
{
transportOperation = context.Operations.Single();
return Task.CompletedTask;
});

Assert.That(transportOperation!.Properties["MessageGroupId"], Is.EqualTo("user-specified-group"),
"User-set DispatchProperties should take precedence over ReceiveProperties");
}

[Test]
public async Task Should_preserve_user_dispatch_properties_even_with_receive_properties()
{
var behavior = new RoutingToDispatchConnector();

var receiveProperties = new ReceiveProperties(new Dictionary<string, string>
{
["MessageGroupId"] = "incoming-group",
["DeduplicationId"] = "incoming-dedup"
});

var userDispatchProperties = new DispatchProperties
{
["MessageGroupId"] = "user-group",
["Custom.Property"] = "custom-value"
};

TransportOperation transportOperation = null;

var routingContext = new TestableRoutingContext
{
RoutingStrategies = [new UnicastRoutingStrategy("destination")]
};
routingContext.Extensions.Set(new IncomingMessage(routingContext.Message.MessageId, [], Array.Empty<byte>(), receiveProperties));
routingContext.Extensions.Set(userDispatchProperties);

await behavior.Invoke(routingContext, context =>
{
transportOperation = context.Operations.Single();
return Task.CompletedTask;
});

using (Assert.EnterMultipleScope())
{
Assert.That(transportOperation!.Properties["MessageGroupId"], Is.EqualTo("user-group"),
"User-set property wins");
Assert.That(transportOperation.Properties["DeduplicationId"], Is.EqualTo("incoming-dedup"),
"Receive property merged when not set by user");
Assert.That(transportOperation.Properties["Custom.Property"], Is.EqualTo("custom-value"),
"User custom property preserved");
}
}
}
48 changes: 48 additions & 0 deletions src/NServiceBus.Core.Tests/Transports/MessageContextTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
namespace NServiceBus.Core.Tests.Transports;

using System;
using System.Collections.Generic;
using NServiceBus.Extensibility;
using NServiceBus.Transport;
using NUnit.Framework;

[TestFixture]
public class MessageContextTests
{
[Test]
public void Should_accept_receive_properties_via_ctor()
{
var receiveProperties = new ReceiveProperties(new Dictionary<string, string>
{
["Native.CustomProperty"] = "CustomValue",
["MessageGroupId"] = "group-123"
});

var context = new MessageContext(
nativeMessageId: "native-id",
headers: [],
body: ReadOnlyMemory<byte>.Empty,
receiveProperties: receiveProperties,
transportTransaction: new TransportTransaction(),
receiveAddress: "queue@machine",
context: new ContextBag()
);

Assert.That(context.ReceiveProperties, Is.SameAs(receiveProperties));
}

[Test]
public void Should_default_receive_properties_to_empty()
{
var context = new MessageContext(
nativeMessageId: "native-id",
headers: [],
body: ReadOnlyMemory<byte>.Empty,
transportTransaction: new TransportTransaction(),
receiveAddress: "queue@machine",
context: new ContextBag()
);

Assert.That(context.ReceiveProperties, Is.SameAs(ReceiveProperties.Empty));
}
}
Loading