diff --git a/src/NServiceBus.AcceptanceTests/DataBus/When_sending_databus_properties_from_different_environments.cs b/src/NServiceBus.AcceptanceTests/DataBus/When_sending_databus_properties_from_different_environments.cs new file mode 100644 index 00000000000..3df85c0d6f0 --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/DataBus/When_sending_databus_properties_from_different_environments.cs @@ -0,0 +1,154 @@ +namespace NServiceBus.AcceptanceTests.DataBus +{ + using System; + using System.IO; + using System.Linq; + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Customization; + using EndpointTemplates; + using MessageMutator; + using NUnit.Framework; + + public class When_sending_databus_properties_from_different_environments : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_receive_messages_with_largepayload_correctly_from_windows() + { + var payloadToSend = new byte[PayloadSize]; + + var context = await Scenario.Define() + .WithEndpoint(b => b.When(session => session.Send(new MyMessageWithLargePayload + { + Payload = new DataBusProperty(payloadToSend) + }))) + .WithEndpoint() + .Done(c => c.ReceivedPayload != null) + .Run(); + + Assert.AreEqual(payloadToSend, context.ReceivedPayload, "The large payload should be marshalled correctly using the databus"); + } + + [Test] + public async Task Should_receive_messages_with_largepayload_correctly_from_linux() + { + var payloadToSend = new byte[PayloadSize]; + + var context = await Scenario.Define() + .WithEndpoint(b => b.When(session => session.Send(new MyMessageWithLargePayload + { + Payload = new DataBusProperty(payloadToSend) + }))) + .WithEndpoint() + .Done(c => c.ReceivedPayload != null) + .Run(); + + Assert.AreEqual(payloadToSend, context.ReceivedPayload, "The large payload should be marshalled correctly using the databus"); + } + + const int PayloadSize = 500; + + public class Context : ScenarioContext + { + public byte[] ReceivedPayload { get; set; } + } + + public class WindowsSender : EndpointConfigurationBuilder + { + public WindowsSender() + { + EndpointSetup(builder => + { + var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender"); + builder.UseDataBus().BasePath(basePath); + builder.UseSerialization(); + builder.ConfigureRouting().RouteToEndpoint(typeof(MyMessageWithLargePayload), typeof(Receiver)); + builder.RegisterMessageMutator(new MutateOutgoingForWindows()); + }); + } + + public class MutateOutgoingForWindows : IMutateOutgoingTransportMessages + { + public Task MutateOutgoing(MutateOutgoingTransportMessageContext context) + { + var databusHeaderKey = context.OutgoingHeaders.FirstOrDefault(f => f.Key.StartsWith("NServiceBus.DataBus.", StringComparison.OrdinalIgnoreCase)).Key; + context.OutgoingHeaders[databusHeaderKey] = context.OutgoingHeaders[databusHeaderKey].Replace("/", "\\"); + return Task.CompletedTask; + } + } + } + + + public class LinuxSender : EndpointConfigurationBuilder + { + public LinuxSender() + { + EndpointSetup(builder => + { + var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender"); + builder.UseDataBus().BasePath(basePath); + builder.UseSerialization(); + builder.ConfigureRouting().RouteToEndpoint(typeof(MyMessageWithLargePayload), typeof(Receiver)); + builder.RegisterMessageMutator(new MutateOutgoingForLinux()); + }); + } + + public class MutateOutgoingForLinux : IMutateOutgoingTransportMessages + { + public Task MutateOutgoing(MutateOutgoingTransportMessageContext context) + { + var databusHeaderKey = context.OutgoingHeaders.FirstOrDefault(f => f.Key.StartsWith("NServiceBus.DataBus.", StringComparison.OrdinalIgnoreCase)).Key; + context.OutgoingHeaders[databusHeaderKey] = context.OutgoingHeaders[databusHeaderKey].Replace("\\", "/"); + return Task.CompletedTask; + } + } + } + + public class Receiver : EndpointConfigurationBuilder + { + public Receiver() + { + EndpointSetup(builder => + { + var basePath = Path.Combine(TestContext.CurrentContext.TestDirectory, "databus", "sender"); + builder.UseDataBus().BasePath(basePath); + builder.UseSerialization(); + builder.RegisterMessageMutator(new Mutator()); + }); + } + + public class MyMessageHandler : IHandleMessages + { + public MyMessageHandler(Context context) + { + testContext = context; + } + + public Task Handle(MyMessageWithLargePayload messageWithLargePayload, IMessageHandlerContext context) + { + testContext.ReceivedPayload = messageWithLargePayload.Payload.Value; + + return Task.CompletedTask; + } + + Context testContext; + } + public class Mutator : IMutateIncomingTransportMessages + { + public Task MutateIncoming(MutateIncomingTransportMessageContext context) + { + if (context.Body.Length > PayloadSize) + { + throw new Exception("The message body is too large, which means the DataBus was not used to transfer the payload."); + } + return Task.CompletedTask; + } + } + } + + public class MyMessageWithLargePayload : ICommand + { + public DataBusProperty Payload { get; set; } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/DataBus/FileShare/AcceptanceTests.cs b/src/NServiceBus.Core.Tests/DataBus/FileShare/AcceptanceTests.cs index 70e8a156808..28191e1d71a 100644 --- a/src/NServiceBus.Core.Tests/DataBus/FileShare/AcceptanceTests.cs +++ b/src/NServiceBus.Core.Tests/DataBus/FileShare/AcceptanceTests.cs @@ -58,6 +58,20 @@ public async Task Should_handle_be_able_to_read_stored_values_concurrently() }); } + [TestCase('/')] + [TestCase('\\')] + public async Task Should_handle_be_able_to_read_stored_values_when_received_from_different_environments(char pathSeparator) + { + const string content = "Test"; + + var key = await Put(content, TimeSpan.MaxValue); + using (var stream = await dataBus.Get(key.Replace(Path.DirectorySeparatorChar, pathSeparator))) + using (var streamReader = new StreamReader(stream)) + { + Assert.AreEqual(await streamReader.ReadToEndAsync(), content); + } + } + [Test] public async Task Should_handle_max_ttl() {