From 4ea624d1d3fc3460f3babdeeaf48ffd176b39d8d Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Thu, 28 Jan 2016 15:26:12 +0100 Subject: [PATCH] Making sure we reset TimeToBeReceived before sending to error or retries queue, fixes #3363 --- .../Fakes/FakeFailureManager.cs | 23 --- .../Faults/FaultManagerTests.cs | 193 ++++++++++++++++++ .../NServiceBus.Core.Tests.csproj | 5 +- .../Satellite/SatelliteLauncherContext.cs | 4 +- .../for_the_transactional_transport.cs | 19 ++ .../Faults/Forwarder/FaultManager.cs | 55 ++--- .../SecondLevelRetries/SecondLevelRetries.cs | 2 +- 7 files changed, 246 insertions(+), 55 deletions(-) delete mode 100644 src/NServiceBus.Core.Tests/Fakes/FakeFailureManager.cs create mode 100644 src/NServiceBus.Core.Tests/Faults/FaultManagerTests.cs diff --git a/src/NServiceBus.Core.Tests/Fakes/FakeFailureManager.cs b/src/NServiceBus.Core.Tests/Fakes/FakeFailureManager.cs deleted file mode 100644 index 00d87574b2a..00000000000 --- a/src/NServiceBus.Core.Tests/Fakes/FakeFailureManager.cs +++ /dev/null @@ -1,23 +0,0 @@ -namespace NServiceBus.Core.Tests.Fakes -{ - using System; - using Faults; - - public class FakeFailureManager : IManageMessageFailures - { - public void SerializationFailedForMessage(TransportMessage message, Exception e) - { - - } - - public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e) - { - - } - - public void Init(Address address) - { - - } - } -} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Faults/FaultManagerTests.cs b/src/NServiceBus.Core.Tests/Faults/FaultManagerTests.cs new file mode 100644 index 00000000000..82000b201f3 --- /dev/null +++ b/src/NServiceBus.Core.Tests/Faults/FaultManagerTests.cs @@ -0,0 +1,193 @@ +namespace NServiceBus.Core.Tests.Faults +{ + using System; + using System.Collections.Generic; + using NServiceBus.Faults; + using NServiceBus.Faults.Forwarder; + using NServiceBus.ObjectBuilder; + using NServiceBus.ObjectBuilder.Common; + using NServiceBus.Settings; + using NServiceBus.Transports; + using NServiceBus.Unicast; + using NUnit.Framework; + + [TestFixture] + public class FaultManagerTests + { + private IManageMessageFailures faultManager; + private FakeSender fakeSender; + + [SetUp] + public void SetUp() + { + fakeSender = new FakeSender(); + var settingsHolder = new SettingsHolder(); + faultManager = new FaultManager(new FakeBuilder(fakeSender), new Configure(settingsHolder, new FakeContainer(), new List>(), null)); + faultManager.Init(new Address("fake", "fake")); + } + + [Test] + public void SendingToErrorQueue_WhenSerializationFailedForMessage_ShouldRemoveTTBR() + { + var exception = new InvalidOperationException(); + var message = new TransportMessage("id", new Dictionary()); + message.TimeToBeReceived = TimeSpan.FromHours(1); + + faultManager.SerializationFailedForMessage(message, exception); + + var sentMessage = fakeSender.SentMessage; + Assert.AreEqual(TimeSpan.MaxValue, sentMessage.TimeToBeReceived); + } + + [Test] + public void SendingToErrorQueue_ProcessingAlwaysFailsForMessage_WhenSentFromSLR_ShouldRemoveTTBR() + { + var exception = new InvalidOperationException(); + var message = new TransportMessage("id", new Dictionary()); + message.TimeToBeReceived = TimeSpan.FromHours(1); + + ((FaultManager) faultManager).RetriesQueue = new Address("fake", "fake"); + + faultManager.ProcessingAlwaysFailsForMessage(message, exception); + + var sentMessage = fakeSender.SentMessage; + Assert.AreEqual(TimeSpan.MaxValue, sentMessage.TimeToBeReceived); + } + + [Test] + public void SendingToErrorQueue_ProcessingAlwaysFailsForMessage_WhenRetriesQueueIsNull_ShouldRemoveTTBR() + { + var exception = new InvalidOperationException(); + var message = new TransportMessage("id", new Dictionary()); + message.TimeToBeReceived = TimeSpan.FromHours(1); + + faultManager.ProcessingAlwaysFailsForMessage(message, exception); + + var sentMessage = fakeSender.SentMessage; + Assert.AreEqual(TimeSpan.MaxValue, sentMessage.TimeToBeReceived); + } + + [Test] + public void SendingToRetriesQueue_ProcessingAlwaysFailsForMessage_ShouldRemoveTTBR() + { + var exception = new InvalidOperationException(); + var message = new TransportMessage("id", new Dictionary()); + message.TimeToBeReceived = TimeSpan.FromHours(1); + + var manager = (FaultManager)faultManager; + manager.RetriesQueue = new Address("retries", "fake"); + + faultManager.ProcessingAlwaysFailsForMessage(message, exception); + + var sentMessage = fakeSender.SentMessage; + Assert.AreEqual(TimeSpan.MaxValue, sentMessage.TimeToBeReceived); + } + + class FakeBuilder : IBuilder + { + private readonly FakeSender sender; + + public FakeBuilder(FakeSender sender) + { + this.sender = sender; + } + + public object Build(Type typeToBuild) + { + throw new NotImplementedException(); + } + + public T Build() + { + return (dynamic) sender; + } + + public IEnumerable BuildAll(Type typeToBuild) + { + throw new NotImplementedException(); + } + + public IEnumerable BuildAll() + { + throw new NotImplementedException(); + } + + public void BuildAndDispatch(Type typeToBuild, Action action) + { + throw new NotImplementedException(); + } + + public IBuilder CreateChildBuilder() + { + throw new NotImplementedException(); + } + + public void Dispose() + { + throw new NotImplementedException(); + } + + public void Release(object instance) + { + throw new NotImplementedException(); + } + } + + class FakeSender : ISendMessages + { + public void Send(TransportMessage message, SendOptions sendOptions) + { + SentMessage = message; + } + + public TransportMessage SentMessage { get; set; } + } + + class FakeContainer : IContainer + { + public void Dispose() + { + } + + public object Build(Type typeToBuild) + { + return null; + } + + public IContainer BuildChildContainer() + { + return null; + } + + public IEnumerable BuildAll(Type typeToBuild) + { + yield break; + } + + public void Configure(Type component, DependencyLifecycle dependencyLifecycle) + { + } + + public void Configure(Func component, DependencyLifecycle dependencyLifecycle) + { + } + + public void ConfigureProperty(Type component, string property, object value) + { + } + + public void RegisterSingleton(Type lookupType, object instance) + { + } + + public bool HasComponent(Type componentType) + { + return true; + } + + public void Release(object instance) + { + } + } + } +} diff --git a/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj b/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj index 4ea7c20f8f3..bca1b500774 100644 --- a/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj +++ b/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj @@ -43,6 +43,7 @@ false + TestDlls\NServiceBus.NewCompilerBits.dll @@ -97,7 +98,8 @@ - + + @@ -213,7 +215,6 @@ - diff --git a/src/NServiceBus.Core.Tests/Satellite/SatelliteLauncherContext.cs b/src/NServiceBus.Core.Tests/Satellite/SatelliteLauncherContext.cs index 64cbc1de8b9..5272d2cc7db 100644 --- a/src/NServiceBus.Core.Tests/Satellite/SatelliteLauncherContext.cs +++ b/src/NServiceBus.Core.Tests/Satellite/SatelliteLauncherContext.cs @@ -4,7 +4,7 @@ using System.Reflection; using System.Transactions; using Fakes; - using Faults; + using NServiceBus.Faults; using NUnit.Framework; using Satellites; using Settings; @@ -22,7 +22,7 @@ public abstract class SatelliteLauncherContext public void SetUp() { Builder = new FuncBuilder(); - InMemoryFaultManager = new Faults.InMemory.FaultManager(); + InMemoryFaultManager = new NServiceBus.Faults.InMemory.FaultManager(); FakeReceiver = new FakeReceiver(); var configurationBuilder = new BusConfiguration(); diff --git a/src/NServiceBus.Core.Tests/Transport/for_the_transactional_transport.cs b/src/NServiceBus.Core.Tests/Transport/for_the_transactional_transport.cs index b277a458e69..4c6a9995356 100644 --- a/src/NServiceBus.Core.Tests/Transport/for_the_transactional_transport.cs +++ b/src/NServiceBus.Core.Tests/Transport/for_the_transactional_transport.cs @@ -3,6 +3,7 @@ using System; using System.Transactions; using Fakes; + using NServiceBus.Faults; using NUnit.Framework; using Settings; using Unicast.Transport; @@ -21,5 +22,23 @@ public void SetUp() protected FakeReceiver fakeReceiver; protected TransportReceiver TransportReceiver; + + public class FakeFailureManager : IManageMessageFailures + { + public void SerializationFailedForMessage(TransportMessage message, Exception e) + { + + } + + public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e) + { + + } + + public void Init(Address address) + { + + } + } } } \ No newline at end of file diff --git a/src/NServiceBus.Core/Faults/Forwarder/FaultManager.cs b/src/NServiceBus.Core/Faults/Forwarder/FaultManager.cs index 893eb53a71f..bc077ee1eef 100644 --- a/src/NServiceBus.Core/Faults/Forwarder/FaultManager.cs +++ b/src/NServiceBus.Core/Faults/Forwarder/FaultManager.cs @@ -1,21 +1,19 @@ namespace NServiceBus.Faults.Forwarder { using System; - using Logging; - using ObjectBuilder; - using SecondLevelRetries.Helpers; - using Transports; - using Unicast; - using Unicast.Queuing; + using NServiceBus.Logging; + using NServiceBus.ObjectBuilder; + using NServiceBus.SecondLevelRetries.Helpers; + using NServiceBus.Transports; + using NServiceBus.Unicast; + using NServiceBus.Unicast.Queuing; /// - /// Implementation of IManageMessageFailures by forwarding messages - /// using ISendMessages. + /// Implementation of IManageMessageFailures by forwarding messages using ISendMessages. /// class FaultManager : IManageMessageFailures { - readonly IBuilder builder; - readonly Configure config; + static ILog Logger = LogManager.GetLogger(); public FaultManager(IBuilder builder, Configure config) { @@ -23,6 +21,16 @@ public FaultManager(IBuilder builder, Configure config) this.config = config; } + /// + /// Endpoint to which message failures are forwarded + /// + public Address ErrorQueue { get; set; } + + /// + /// The address of the Second Level Retries input queue when SLR is enabled + /// + public Address RetriesQueue { get; set; } + void IManageMessageFailures.SerializationFailedForMessage(TransportMessage message, Exception e) { SendFailureMessage(message, e, true); @@ -44,12 +52,13 @@ void SendFailureMessage(TransportMessage message, Exception e, bool serializatio try { - var destinationQ = RetriesErrorQueue ?? ErrorQueue; - + var destinationQ = RetriesQueue ?? ErrorQueue; + // Intentionally service-locate ISendMessages to avoid circular // resolution problem in the container var sender = builder.Build(); + message.TimeToBeReceived = TimeSpan.MaxValue; if (serializationException || MessageWasSentFromSLR(message)) { sender.Send(message, new SendOptions(ErrorQueue)); @@ -59,7 +68,7 @@ void SendFailureMessage(TransportMessage message, Exception e, bool serializatio sender.Send(message, new SendOptions(destinationQ)); //HACK: We need this hack here till we refactor the SLR to be a first class concept in the TransportReceiver - if (RetriesErrorQueue == null) + if (RetriesQueue == null) { Logger.ErrorFormat("Message with '{0}' id has failed FLR and will be moved to the configured error queue.", message.Id); } @@ -77,7 +86,7 @@ void SendFailureMessage(TransportMessage message, Exception e, bool serializatio if (queueNotFoundException != null) { - errorMessage = string.Format("Could not forward failed message to error queue '{0}' as it could not be found.", queueNotFoundException.Queue); + errorMessage = $"Could not forward failed message to error queue '{queueNotFoundException.Queue}' as it could not be found."; Logger.Fatal(errorMessage); } else @@ -92,27 +101,19 @@ void SendFailureMessage(TransportMessage message, Exception e, bool serializatio bool MessageWasSentFromSLR(TransportMessage message) { - if (RetriesErrorQueue == null) + if (RetriesQueue == null) { return false; } // if the reply to address == ErrorQueue and RealErrorQueue is not null, the // SecondLevelRetries sat is running and the error happened within that sat. - return TransportMessageHeaderHelper.GetAddressOfFaultingEndpoint(message) == RetriesErrorQueue; + return TransportMessageHeaderHelper.GetAddressOfFaultingEndpoint(message) == RetriesQueue; } - /// - /// Endpoint to which message failures are forwarded - /// - public Address ErrorQueue { get; set; } - - /// - /// The address of the Second Level Retries input queue when SLR is enabled - /// - public Address RetriesErrorQueue { get; set; } + readonly IBuilder builder; + readonly Configure config; Address localAddress; - static ILog Logger = LogManager.GetLogger(); } -} +} \ No newline at end of file diff --git a/src/NServiceBus.Core/SecondLevelRetries/SecondLevelRetries.cs b/src/NServiceBus.Core/SecondLevelRetries/SecondLevelRetries.cs index 87f25624582..b9101503eb6 100644 --- a/src/NServiceBus.Core/SecondLevelRetries/SecondLevelRetries.cs +++ b/src/NServiceBus.Core/SecondLevelRetries/SecondLevelRetries.cs @@ -34,7 +34,7 @@ protected internal override void Setup(FeatureConfigurationContext context) } var container = context.Container; - container.ConfigureProperty(fm => fm.RetriesErrorQueue, processorAddress); + container.ConfigureProperty(fm => fm.RetriesQueue, processorAddress); container.ConfigureProperty(rs => rs.InputAddress, processorAddress); var retryPolicy = context.Settings.GetOrDefault>("SecondLevelRetries.RetryPolicy"); if (retryPolicy != null)