Skip to content

Commit

Permalink
FinishedMessageProcessing not fired for messages that fail FLR and SLR
Browse files Browse the repository at this point in the history
Added a failing test proving StartedProcessingMessage is not invoked for messages without an ID
Fixed StartedMessageProcessing behavior
Added a failing test for invoking FinishedMessageProcessing after sending to SLR
Added raising FinishedMessageProcessing when handing msg off to SLR
Patched Distributor in the core
  • Loading branch information
John Simons committed Oct 24, 2014
1 parent 2ca53cb commit fea2216
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
namespace NServiceBus.AcceptanceTests.Exceptions
{
using System;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Config;
using NServiceBus.MessageMutator;
using NServiceBus.Unicast;
using NServiceBus.Unicast.Messages;
using NServiceBus.Unicast.Transport;
using NUnit.Framework;

public class Message_without_an_id : NServiceBusAcceptanceTest
{
[Test]
public void Should_invoke_start_message_processing_listeners()
{
var context = new Context();

Scenario.Define(context)
.WithEndpoint<Endpoint>()
.Done(c => c.StartMessageProcessingCalled)
.Run();

Assert.IsTrue(context.StartMessageProcessingCalled);
}

public class Context : ScenarioContext
{
public bool StartMessageProcessingCalled { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>()
.WithConfig<TransportConfig>(c =>
{
c.MaxRetries = 0;
});
}

class StartProcessingListener : IWantToRunWhenBusStartsAndStops
{
readonly UnicastBus bus;
Context context;

public StartProcessingListener(UnicastBus bus, Context context)
{
this.bus = bus;
this.context = context;
bus.Transport.StartedMessageProcessing += transport_StartedMessageProcessing;
}

void transport_StartedMessageProcessing(object sender, StartedMessageProcessingEventArgs e)
{
context.StartMessageProcessingCalled = true;
}

public void Start()
{
bus.SendLocal(new Message());
}

public void Stop()
{
bus.Transport.StartedMessageProcessing -= transport_StartedMessageProcessing;
}
}

class CorruptionMutator : IMutateOutgoingTransportMessages
{
public void MutateOutgoing(LogicalMessage logicalMessage, TransportMessage transportMessage)
{
transportMessage.Headers[Headers.MessageId] = "";
}
}

class Handler : IHandleMessages<Message>
{
public void Handle(Message message)
{
}
}
}

[Serializable]
public class Message : IMessage
{
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
<Compile Include="Config\When__startup_is_complete.cs" />
<Compile Include="CriticalError\When_registering_a_custom_criticalErrorHandler.cs" />
<Compile Include="DataBus\When_sending_with_custom_IDataBus.cs" />
<Compile Include="Exceptions\Message_without_an_id.cs" />
<Compile Include="Sagas\When_message_has_a_saga_id.cs" />
<Compile Include="Volatile\When_sending_to_non_durable_endpoint.cs" />
<Compile Include="Encryption\When_using_Rijndael_with_multikey.cs" />
Expand Down
51 changes: 50 additions & 1 deletion src/NServiceBus.AcceptanceTests/Retries/When_sending_to_slr.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
using AcceptanceTesting;
using MessageMutator;
using NServiceBus.Config;
using NServiceBus.Unicast;
using NServiceBus.Unicast.Transport;
using NUnit.Framework;
using Unicast.Messages;

Expand All @@ -26,6 +28,21 @@ public void Should_preserve_the_original_body_for_regular_exceptions()
Assert.AreEqual(context.OriginalBodyChecksum, context.SlrChecksum, "The body of the message sent to slr should be the same as the original message coming off the queue");

}

[Test]
public void Should_raise_FinishedMessageProcessing_event()
{
var context = new Context();

Scenario.Define(context)
.WithEndpoint<RetryEndpoint>(b => b.Given(bus => bus.SendLocal(new MessageToBeRetried())))
.Done(c => c.FinishedMessageProcessingCalledAfterFaultManagerInvoked)
.Run();

Assert.IsTrue(context.FinishedMessageProcessingCalledAfterFaultManagerInvoked);

}

[Test]
public void Should_preserve_the_original_body_for_serialization_exceptions()
{
Expand All @@ -46,6 +63,9 @@ public void Should_preserve_the_original_body_for_serialization_exceptions()

public class Context : ScenarioContext
{
public bool FinishedMessageProcessingCalledAfterFaultManagerInvoked { get; set; }
public bool FaultManagerInvoked { get; set; }

public byte OriginalBodyChecksum { get; set; }

public byte SlrChecksum { get; set; }
Expand All @@ -65,6 +85,33 @@ public RetryEndpoint()
});
}

class FinishedProcessingListener : IWantToRunWhenBusStartsAndStops
{
readonly Context context;

public FinishedProcessingListener(UnicastBus bus, Context context)
{
this.context = context;
bus.Transport.FinishedMessageProcessing += Transport_FinishedMessageProcessing;
}

void Transport_FinishedMessageProcessing(object sender, FinishedMessageProcessingEventArgs e)
{
if (context.FaultManagerInvoked)
{
context.FinishedMessageProcessingCalledAfterFaultManagerInvoked = true;
}
}

public void Start()
{
}

public void Stop()
{
}
}

class BodyMutator : IMutateTransportMessages, INeedInitialization
{
public Context Context { get; set; }
Expand Down Expand Up @@ -107,11 +154,13 @@ class CustomFaultManager : IManageMessageFailures

public void SerializationFailedForMessage(TransportMessage message, Exception e)
{
Context.FaultManagerInvoked = true;
Context.SlrChecksum = Checksum(message.Body);
}

public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e)
{
Context.FaultManagerInvoked = true;
Context.SlrChecksum = Checksum(message.Body);
}

Expand Down Expand Up @@ -141,4 +190,4 @@ public class MessageToBeRetried : IMessage
{
}
}
}
}
33 changes: 22 additions & 11 deletions src/NServiceBus.Core/Unicast/Transport/TransportReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void Start(Address address)

if (workerRunsOnThisEndpoint
&& (returnAddressForFailures.Queue.ToLower().EndsWith(".worker") || address == config.LocalAddress))
//this is a hack until we can refactor the SLR to be a feature. "Worker" is there to catch the local worker in the distributor
//this is a hack until we can refactor the SLR to be a feature. "Worker" is there to catch the local worker in the distributor
{
returnAddressForFailures = settings.Get<Address>("MasterNode.Address");

Expand Down Expand Up @@ -284,15 +284,6 @@ void EndProcess(TransportMessage message, Exception ex)

void ProcessMessage(TransportMessage message)
{
if (string.IsNullOrWhiteSpace(message.Id))
{
Logger.Error("Message without message id detected");

FailureManager.SerializationFailedForMessage(message,
new SerializationException("Message without message id received."));

return;
}
try
{
OnStartedMessageProcessing(message);
Expand All @@ -307,8 +298,27 @@ void ProcessMessage(TransportMessage message)
throw;
}

if (string.IsNullOrWhiteSpace(message.Id))
{
Logger.Error("Message without message id detected");

FailureManager.SerializationFailedForMessage(message,
new SerializationException("Message without message id received."));

return;
}


if (ShouldExitBecauseOfRetries(message))
{
try
{
OnFinishedMessageProcessing(message);
}
catch (Exception exception)
{
Logger.Error("Failed raising 'finished message processing' event.", exception);
}
return;
}

Expand Down Expand Up @@ -427,7 +437,8 @@ void DisposeManaged()
}


[ThreadStatic] static volatile bool needToAbort;
[ThreadStatic]
static volatile bool needToAbort;

static ILog Logger = LogManager.GetLogger<TransportReceiver>();
object changeMaximumMessageThroughputPerSecondLock = new object();
Expand Down

0 comments on commit fea2216

Please sign in to comment.