From fea2216dd30f25cdeb381b7131ce5fe9b2c83c88 Mon Sep 17 00:00:00 2001 From: John Simons Date: Fri, 24 Oct 2014 11:41:30 +1000 Subject: [PATCH] FinishedMessageProcessing not fired for messages that fail FLR and SLR Added a failing test proving StartedProcessingMessage is not invoked for messages without an ID Fixed StartedMessageProcessing behavior Added a failing test for invoking FinishedMessageProcessing after sending to SLR Added raising FinishedMessageProcessing when handing msg off to SLR Patched Distributor in the core --- .../Exceptions/Message_without_an_id.cs | 94 +++++++++++++++++++ .../NServiceBus.AcceptanceTests.csproj | 1 + .../Retries/When_sending_to_slr.cs | 51 +++++++++- .../Unicast/Transport/TransportReceiver.cs | 33 ++++--- 4 files changed, 167 insertions(+), 12 deletions(-) create mode 100644 src/NServiceBus.AcceptanceTests/Exceptions/Message_without_an_id.cs diff --git a/src/NServiceBus.AcceptanceTests/Exceptions/Message_without_an_id.cs b/src/NServiceBus.AcceptanceTests/Exceptions/Message_without_an_id.cs new file mode 100644 index 00000000000..56ea06560f3 --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Exceptions/Message_without_an_id.cs @@ -0,0 +1,94 @@ +namespace NServiceBus.AcceptanceTests.Exceptions +{ + using System; + using NServiceBus.AcceptanceTesting; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NServiceBus.Config; + using NServiceBus.MessageMutator; + using NServiceBus.Unicast; + using NServiceBus.Unicast.Messages; + using NServiceBus.Unicast.Transport; + using NUnit.Framework; + + public class Message_without_an_id : NServiceBusAcceptanceTest + { + [Test] + public void Should_invoke_start_message_processing_listeners() + { + var context = new Context(); + + Scenario.Define(context) + .WithEndpoint() + .Done(c => c.StartMessageProcessingCalled) + .Run(); + + Assert.IsTrue(context.StartMessageProcessingCalled); + } + + public class Context : ScenarioContext + { + public bool StartMessageProcessingCalled { get; set; } + } + + public class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() + { + EndpointSetup() + .WithConfig(c => + { + c.MaxRetries = 0; + }); + } + + class StartProcessingListener : IWantToRunWhenBusStartsAndStops + { + readonly UnicastBus bus; + Context context; + + public StartProcessingListener(UnicastBus bus, Context context) + { + this.bus = bus; + this.context = context; + bus.Transport.StartedMessageProcessing += transport_StartedMessageProcessing; + } + + void transport_StartedMessageProcessing(object sender, StartedMessageProcessingEventArgs e) + { + context.StartMessageProcessingCalled = true; + } + + public void Start() + { + bus.SendLocal(new Message()); + } + + public void Stop() + { + bus.Transport.StartedMessageProcessing -= transport_StartedMessageProcessing; + } + } + + class CorruptionMutator : IMutateOutgoingTransportMessages + { + public void MutateOutgoing(LogicalMessage logicalMessage, TransportMessage transportMessage) + { + transportMessage.Headers[Headers.MessageId] = ""; + } + } + + class Handler : IHandleMessages + { + public void Handle(Message message) + { + } + } + } + + [Serializable] + public class Message : IMessage + { + } + } + +} \ No newline at end of file diff --git a/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj b/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj index fe759cd0758..92181ed356b 100644 --- a/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj +++ b/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj @@ -65,6 +65,7 @@ + diff --git a/src/NServiceBus.AcceptanceTests/Retries/When_sending_to_slr.cs b/src/NServiceBus.AcceptanceTests/Retries/When_sending_to_slr.cs index 668f68d853b..538c0347520 100644 --- a/src/NServiceBus.AcceptanceTests/Retries/When_sending_to_slr.cs +++ b/src/NServiceBus.AcceptanceTests/Retries/When_sending_to_slr.cs @@ -7,6 +7,8 @@ using AcceptanceTesting; using MessageMutator; using NServiceBus.Config; + using NServiceBus.Unicast; + using NServiceBus.Unicast.Transport; using NUnit.Framework; using Unicast.Messages; @@ -26,6 +28,21 @@ public void Should_preserve_the_original_body_for_regular_exceptions() Assert.AreEqual(context.OriginalBodyChecksum, context.SlrChecksum, "The body of the message sent to slr should be the same as the original message coming off the queue"); } + + [Test] + public void Should_raise_FinishedMessageProcessing_event() + { + var context = new Context(); + + Scenario.Define(context) + .WithEndpoint(b => b.Given(bus => bus.SendLocal(new MessageToBeRetried()))) + .Done(c => c.FinishedMessageProcessingCalledAfterFaultManagerInvoked) + .Run(); + + Assert.IsTrue(context.FinishedMessageProcessingCalledAfterFaultManagerInvoked); + + } + [Test] public void Should_preserve_the_original_body_for_serialization_exceptions() { @@ -46,6 +63,9 @@ public void Should_preserve_the_original_body_for_serialization_exceptions() public class Context : ScenarioContext { + public bool FinishedMessageProcessingCalledAfterFaultManagerInvoked { get; set; } + public bool FaultManagerInvoked { get; set; } + public byte OriginalBodyChecksum { get; set; } public byte SlrChecksum { get; set; } @@ -65,6 +85,33 @@ public RetryEndpoint() }); } + class FinishedProcessingListener : IWantToRunWhenBusStartsAndStops + { + readonly Context context; + + public FinishedProcessingListener(UnicastBus bus, Context context) + { + this.context = context; + bus.Transport.FinishedMessageProcessing += Transport_FinishedMessageProcessing; + } + + void Transport_FinishedMessageProcessing(object sender, FinishedMessageProcessingEventArgs e) + { + if (context.FaultManagerInvoked) + { + context.FinishedMessageProcessingCalledAfterFaultManagerInvoked = true; + } + } + + public void Start() + { + } + + public void Stop() + { + } + } + class BodyMutator : IMutateTransportMessages, INeedInitialization { public Context Context { get; set; } @@ -107,11 +154,13 @@ class CustomFaultManager : IManageMessageFailures public void SerializationFailedForMessage(TransportMessage message, Exception e) { + Context.FaultManagerInvoked = true; Context.SlrChecksum = Checksum(message.Body); } public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e) { + Context.FaultManagerInvoked = true; Context.SlrChecksum = Checksum(message.Body); } @@ -141,4 +190,4 @@ public class MessageToBeRetried : IMessage { } } -} \ No newline at end of file +} diff --git a/src/NServiceBus.Core/Unicast/Transport/TransportReceiver.cs b/src/NServiceBus.Core/Unicast/Transport/TransportReceiver.cs index 234ccdee96b..8666489dd7c 100644 --- a/src/NServiceBus.Core/Unicast/Transport/TransportReceiver.cs +++ b/src/NServiceBus.Core/Unicast/Transport/TransportReceiver.cs @@ -158,7 +158,7 @@ public void Start(Address address) if (workerRunsOnThisEndpoint && (returnAddressForFailures.Queue.ToLower().EndsWith(".worker") || address == config.LocalAddress)) - //this is a hack until we can refactor the SLR to be a feature. "Worker" is there to catch the local worker in the distributor + //this is a hack until we can refactor the SLR to be a feature. "Worker" is there to catch the local worker in the distributor { returnAddressForFailures = settings.Get
("MasterNode.Address"); @@ -284,15 +284,6 @@ void EndProcess(TransportMessage message, Exception ex) void ProcessMessage(TransportMessage message) { - if (string.IsNullOrWhiteSpace(message.Id)) - { - Logger.Error("Message without message id detected"); - - FailureManager.SerializationFailedForMessage(message, - new SerializationException("Message without message id received.")); - - return; - } try { OnStartedMessageProcessing(message); @@ -307,8 +298,27 @@ void ProcessMessage(TransportMessage message) throw; } + if (string.IsNullOrWhiteSpace(message.Id)) + { + Logger.Error("Message without message id detected"); + + FailureManager.SerializationFailedForMessage(message, + new SerializationException("Message without message id received.")); + + return; + } + + if (ShouldExitBecauseOfRetries(message)) { + try + { + OnFinishedMessageProcessing(message); + } + catch (Exception exception) + { + Logger.Error("Failed raising 'finished message processing' event.", exception); + } return; } @@ -427,7 +437,8 @@ void DisposeManaged() } - [ThreadStatic] static volatile bool needToAbort; + [ThreadStatic] + static volatile bool needToAbort; static ILog Logger = LogManager.GetLogger(); object changeMaximumMessageThroughputPerSecondLock = new object();