From 8af0bf85cfc1faa256a4fad9d01f96daf2b86da4 Mon Sep 17 00:00:00 2001 From: SzymonPobiega Date: Thu, 23 Oct 2014 14:12:18 +0200 Subject: [PATCH] Added a failing test proving StartedProcessingMessage is not invoked for messages without an ID Fixed StartedMessageProcessing behavior Added a failing test for invoking FinishedMessageProcessing after sending to SLR Added raising FinishedMessageProcessing when handing msg off to SLR remove some redundant props Patched Distributor in the core --- .../Distributor/Processing_SLR_message.cs | 133 ++++++++++++++++++ ...When_processing_a_message_without_an_id.cs | 99 +++++++++++++ .../NServiceBus.AcceptanceTests.csproj | 5 + .../When_sending_a_message_off_to_slr.cs | 55 +++++++- .../ReadyMessages/ReadyMessageSender.cs | 8 +- .../Unicast/Transport/TransportReceiver.cs | 33 +++-- 6 files changed, 318 insertions(+), 15 deletions(-) create mode 100644 src/NServiceBus.AcceptanceTests/Distributor/Processing_SLR_message.cs create mode 100644 src/NServiceBus.AcceptanceTests/Exceptions/When_processing_a_message_without_an_id.cs diff --git a/src/NServiceBus.AcceptanceTests/Distributor/Processing_SLR_message.cs b/src/NServiceBus.AcceptanceTests/Distributor/Processing_SLR_message.cs new file mode 100644 index 00000000000..6472cdc739c --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Distributor/Processing_SLR_message.cs @@ -0,0 +1,133 @@ +namespace NServiceBus.AcceptanceTests.Distributor +{ + using System; + using System.Messaging; + using NServiceBus.AcceptanceTesting; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NServiceBus.AcceptanceTests.ScenarioDescriptors; + using NServiceBus.Config; + using NServiceBus.Transports.Msmq; + using NUnit.Framework; + + [TestFixture] + public class Processing_SLR_message : NServiceBusAcceptanceTest + { + static TimeSpan SlrDelay = TimeSpan.FromSeconds(5); + + [Test] + public void Worker_should_sends_a_ready_message_to_the_distributor() + { + try + { + var queue = new MessageQueue(MsmqUtilities.GetFullPath(Address.Parse("distributor.distributor.processingslrmessage.msmq.distributor.storage")), false, true, QueueAccessMode.Receive); + queue.Purge(); + } + // ReSharper disable once EmptyGeneralCatchClause + catch (Exception) + { + //NOOP + } + + var context = new Context + { + Id = Guid.NewGuid() + }; + Scenario.Define(context) + .WithEndpoint(b => b + .Given((bus, c) => bus.Send(new MyMessage + { + Id = c.Id + })) + ) + .WithEndpoint() + .WithEndpoint() + .Done(c => c.SecondAttemptSucceeded) + .Repeat(r => r.For(Transports.Msmq)) + .Run(); + + Assert.IsTrue(context.FirstAttemptFailed); + Assert.IsTrue(context.SecondAttemptSucceeded); + } + + public class Context : ScenarioContext + { + public Guid Id { get; set; } + public bool FirstAttemptFailed { get; set; } + public bool SecondAttemptSucceeded { get; set; } + } + + public class Client : EndpointConfigurationBuilder + { + public Client() + { + EndpointSetup() + .AddMapping(typeof(Distributor)); + } + } + + public class Distributor : EndpointConfigurationBuilder + { + public Distributor() + { + EndpointSetup(c => c.RunDistributor(false)); + } + } + + public class Worker : EndpointConfigurationBuilder + { + public Worker() + { + EndpointSetup(c => c.EnlistWithDistributor()) + .AllowExceptions() + .WithConfig(c => + { + c.MaxRetries = 0; //to skip the FLR + }) + .WithConfig(c => + { + c.DistributorControlAddress = "distributor.distributor.processingslrmessage.msmq.distributor.control"; + c.DistributorDataAddress = "distributor.distributor.processingslrmessage.msmq"; + }) + .WithConfig(c => + { + c.NumberOfRetries = 1; + c.TimeIncrease = SlrDelay; + }).WithConfig(c => + { + c.Node = "particular.net"; + }); + } + + public class MyMessageHandler : IHandleMessages + { + public Context Context { get; set; } + + public IBus Bus { get; set; } + + public void Handle(MyMessage request) + { + if (Context.Id != request.Id) + { + return; + } + + if (!Context.FirstAttemptFailed) + { + Context.FirstAttemptFailed = true; + throw new Exception("Triggering SLR"); + } + if (Bus.CurrentMessageContext.Headers.ContainsKey(Headers.Retries)) + { + Context.SecondAttemptSucceeded = true; + } + } + } + } + + [Serializable] + public class MyMessage : IMessage + { + public Guid Id { get; set; } + } + } +} diff --git a/src/NServiceBus.AcceptanceTests/Exceptions/When_processing_a_message_without_an_id.cs b/src/NServiceBus.AcceptanceTests/Exceptions/When_processing_a_message_without_an_id.cs new file mode 100644 index 00000000000..69218e90810 --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Exceptions/When_processing_a_message_without_an_id.cs @@ -0,0 +1,99 @@ +namespace NServiceBus.AcceptanceTests.Exceptions +{ + using System; + using NServiceBus.AcceptanceTesting; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NServiceBus.Config; + using NServiceBus.MessageMutator; + using NServiceBus.Unicast; + using NServiceBus.Unicast.Transport; + using NUnit.Framework; + + public class When_processing_a_message_without_an_id : NServiceBusAcceptanceTest + { + [Test] + public void Should_invoke_start_message_processing_listeners() + { + var context = new Context(); + + Scenario.Define(context) + .WithEndpoint(b => b.Given(bus => bus.SendLocal(new Message()))) + .Done(c => c.StartMessageProcessingCalled) + .Run(); + + Assert.IsTrue(context.StartMessageProcessingCalled); + } + + public class Context : ScenarioContext + { + public bool StartMessageProcessingCalled { get; set; } + } + + public class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() + { + EndpointSetup(c => + { + c.Configurer.ConfigureComponent(DependencyLifecycle.InstancePerCall); + c.Configurer.ConfigureComponent(DependencyLifecycle.SingleInstance); + c.DisableTimeoutManager(); + }) + .WithConfig(c => + { + c.MaxRetries = 0; + }) + .AllowExceptions(); + } + + class StartProcessingListener : IWantToRunWhenBusStartsAndStops + { + Context context; + + public StartProcessingListener(UnicastBus bus, Context context) + { + this.context = context; + bus.Transport.StartedMessageProcessing += transport_StartedMessageProcessing; + } + + void transport_StartedMessageProcessing(object sender, StartedMessageProcessingEventArgs e) + { + context.StartMessageProcessingCalled = true; + } + + public void Start() + { + } + + public void Stop() + { + } + } + + class CorruptionMutator : IMutateTransportMessages + { + public void MutateIncoming(TransportMessage transportMessage) + { + } + + public void MutateOutgoing(object[] messages, TransportMessage transportMessage) + { + transportMessage.Headers[Headers.MessageId] = ""; + } + } + + class Handler : IHandleMessages + { + public void Handle(Message message) + { + } + } + } + + [Serializable] + public class Message : IMessage + { + } + } + +} \ No newline at end of file diff --git a/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj b/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj index 4e846f7ef94..14924ff7825 100644 --- a/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj +++ b/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj @@ -23,6 +23,7 @@ prompt 4 true + 618 pdbonly @@ -32,6 +33,7 @@ prompt 4 true + 618 @@ -97,6 +99,7 @@ + @@ -108,6 +111,7 @@ + @@ -118,6 +122,7 @@ + diff --git a/src/NServiceBus.AcceptanceTests/Retries/When_sending_a_message_off_to_slr.cs b/src/NServiceBus.AcceptanceTests/Retries/When_sending_a_message_off_to_slr.cs index a33530c57e3..4f1a8c6fdad 100644 --- a/src/NServiceBus.AcceptanceTests/Retries/When_sending_a_message_off_to_slr.cs +++ b/src/NServiceBus.AcceptanceTests/Retries/When_sending_a_message_off_to_slr.cs @@ -7,6 +7,8 @@ using EndpointTemplates; using AcceptanceTesting; using MessageMutator; + using NServiceBus.Unicast; + using NServiceBus.Unicast.Transport; using NUnit.Framework; #pragma warning disable 612, 618 @@ -26,6 +28,21 @@ public void Should_preserve_the_original_body_for_regular_exceptions() Assert.AreEqual(context.OriginalBodyChecksum, context.SlrChecksum, "The body of the message sent to slr should be the same as the original message coming off the queue"); } + + [Test] + public void Should_raise_FinishedMessageProcessing_event() + { + var context = new Context(); + + Scenario.Define(context) + .WithEndpoint(b => b.Given(bus => bus.SendLocal(new MessageToBeRetried()))) + .Done(c => c.FinishedMessageProcessingCalledAfterFaultManagerInvoked) + .Run(); + + Assert.IsTrue(context.FinishedMessageProcessingCalledAfterFaultManagerInvoked); + + } + [Test] public void Should_preserve_the_original_body_for_serialization_exceptions() { @@ -45,6 +62,9 @@ public void Should_preserve_the_original_body_for_serialization_exceptions() public class Context : ScenarioContext { + public bool FinishedMessageProcessingCalledAfterFaultManagerInvoked { get; set; } + public bool FaultManagerInvoked { get; set; } + public byte OriginalBodyChecksum { get; set; } public byte SlrChecksum { get; set; } @@ -56,7 +76,11 @@ public class RetryEndpoint : EndpointConfigurationBuilder { public RetryEndpoint() { - EndpointSetup(c => c.Configurer.ConfigureComponent(DependencyLifecycle.SingleInstance)) + EndpointSetup(c => + { + c.Configurer.ConfigureComponent(DependencyLifecycle.SingleInstance); + c.Configurer.ConfigureComponent(DependencyLifecycle.SingleInstance); + }) .WithConfig(c => { c.MaxRetries = 0; @@ -64,6 +88,33 @@ public RetryEndpoint() .AllowExceptions(); } + class FinishedProcessingListener : IWantToRunWhenBusStartsAndStops + { + readonly Context context; + + public FinishedProcessingListener(UnicastBus bus, Context context) + { + this.context = context; + bus.Transport.FinishedMessageProcessing += Transport_FinishedMessageProcessing; + } + + void Transport_FinishedMessageProcessing(object sender, FinishedMessageProcessingEventArgs e) + { + if (context.FaultManagerInvoked) + { + context.FinishedMessageProcessingCalledAfterFaultManagerInvoked = true; + } + } + + public void Start() + { + } + + public void Stop() + { + } + } + class BodyMutator : IMutateTransportMessages, NServiceBus.INeedInitialization { public Context Context { get; set; } @@ -106,11 +157,13 @@ class CustomFaultManager : IManageMessageFailures public void SerializationFailedForMessage(TransportMessage message, Exception e) { + Context.FaultManagerInvoked = true; Context.SlrChecksum = Checksum(message.Body); } public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e) { + Context.FaultManagerInvoked = true; Context.SlrChecksum = Checksum(message.Body); } diff --git a/src/NServiceBus.Core/Distributor/ReadyMessages/ReadyMessageSender.cs b/src/NServiceBus.Core/Distributor/ReadyMessages/ReadyMessageSender.cs index d01f570dc49..5a14bcd0de6 100644 --- a/src/NServiceBus.Core/Distributor/ReadyMessages/ReadyMessageSender.cs +++ b/src/NServiceBus.Core/Distributor/ReadyMessages/ReadyMessageSender.cs @@ -28,7 +28,7 @@ public void Start() var capacityAvailable = transport.MaximumConcurrencyLevel; SendReadyMessage(capacityAvailable, true); - transport.FinishedMessageProcessing += TransportOnFinishedMessageProcessing; + transport.StartedMessageProcessing += TransportOnStartedMessageProcessing; } public void Stop() @@ -36,11 +36,11 @@ public void Stop() //transport will be null if !WorkerRunsOnThisEndpoint if (transport != null) { - transport.FinishedMessageProcessing -= TransportOnFinishedMessageProcessing; + transport.StartedMessageProcessing -= TransportOnStartedMessageProcessing; } } - void TransportOnFinishedMessageProcessing(object sender, FinishedMessageProcessingEventArgs eventArgs) + void TransportOnStartedMessageProcessing(object sender, StartedMessageProcessingEventArgs startedMessageProcessingEventArgs) { //if there was a failure this "send" will be rolled back SendReadyMessage(); @@ -54,7 +54,9 @@ void SendReadyMessage(int capacityAvailable = 1, bool isStarting = false) readyMessage.Headers.Add(Headers.WorkerCapacityAvailable, capacityAvailable.ToString()); if (isStarting) + { readyMessage.Headers.Add(Headers.WorkerStarting, Boolean.TrueString); + } MessageSender.Send(readyMessage, DistributorControlAddress); } diff --git a/src/NServiceBus.Core/Unicast/Transport/TransportReceiver.cs b/src/NServiceBus.Core/Unicast/Transport/TransportReceiver.cs index 891a92cc97f..b6221c8961c 100644 --- a/src/NServiceBus.Core/Unicast/Transport/TransportReceiver.cs +++ b/src/NServiceBus.Core/Unicast/Transport/TransportReceiver.cs @@ -188,7 +188,7 @@ public void Start(Address address) if (Configure.Instance.WorkerRunsOnThisEndpoint() && (returnAddressForFailures.Queue.ToLower().EndsWith(".worker") || address == Address.Local)) - //this is a hack until we can refactor the SLR to be a feature. "Worker" is there to catch the local worker in the distributor + //this is a hack until we can refactor the SLR to be a feature. "Worker" is there to catch the local worker in the distributor { returnAddressForFailures = Configure.Instance.GetMasterNodeAddress(); @@ -313,15 +313,6 @@ void EndProcess(TransportMessage message, Exception ex) } void ProcessMessage(TransportMessage message) { - if (string.IsNullOrWhiteSpace(message.Id)) - { - Logger.Error("Message without message id detected"); - - FailureManager.SerializationFailedForMessage(message, - new SerializationException("Message without message id received.")); - - return; - } try { OnStartedMessageProcessing(message); @@ -336,8 +327,27 @@ void ProcessMessage(TransportMessage message) throw; } + if (string.IsNullOrWhiteSpace(message.Id)) + { + Logger.Error("Message without message id detected"); + + FailureManager.SerializationFailedForMessage(message, + new SerializationException("Message without message id received.")); + + return; + } + + if (ShouldExitBecauseOfRetries(message)) { + try + { + OnFinishedMessageProcessing(message); + } + catch (Exception exception) + { + Logger.Error("Failed raising 'finished message processing' event.", exception); + } return; } @@ -456,7 +466,8 @@ public void DisposeManaged() } - [ThreadStatic] static volatile bool needToAbort; + [ThreadStatic] + static volatile bool needToAbort; static readonly ILog Logger = LogManager.GetLogger(typeof(TransportReceiver)); readonly object changeMaximumMessageThroughputPerSecondLock = new object();