Skip to content

Commit

Permalink
Merge pull request #3372 from Particular/hotfix-5.2.14
Browse files Browse the repository at this point in the history
Hotfix 5.2.14
  • Loading branch information
danielmarbach committed Jan 28, 2016
2 parents 43a114a + 051644f commit 2a3f11c
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 48 deletions.
23 changes: 0 additions & 23 deletions src/NServiceBus.Core.Tests/Fakes/FakeFailureManager.cs

This file was deleted.

168 changes: 168 additions & 0 deletions src/NServiceBus.Core.Tests/Faults/FaultManagerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
namespace NServiceBus.Core.Tests.Faults
{
using System;
using System.Collections.Generic;
using NServiceBus.Faults;
using NServiceBus.Faults.Forwarder;
using NServiceBus.ObjectBuilder;
using NServiceBus.ObjectBuilder.Common;
using NServiceBus.SecondLevelRetries;
using NServiceBus.Settings;
using NServiceBus.Transports;
using NServiceBus.Unicast;
using NUnit.Framework;

[TestFixture]
public class FaultManagerTests
{
private IManageMessageFailures faultManager;
private FakeSender fakeSender;

[SetUp]
public void SetUp()
{
fakeSender = new FakeSender();
var settingsHolder = new SettingsHolder();
faultManager = new FaultManager(fakeSender, new Configure(settingsHolder, new FakeContainer(), new List<Action<IConfigureComponents>>(), null), new BusNotifications());
faultManager.Init(new Address("fake", "fake"));
}

[Test]
public void SendingToErrorQueue_WhenSerializationFailedForMessage_ShouldRemoveTTBR()
{
var exception = new InvalidOperationException();
var message = new TransportMessage("id", new Dictionary<string, string>());
message.TimeToBeReceived = TimeSpan.FromHours(1);

faultManager.SerializationFailedForMessage(message, exception);

var sentMessage = fakeSender.SentMessage;
Assert.AreEqual(TimeSpan.MaxValue, sentMessage.TimeToBeReceived);
}

[Test]
public void SendingToErrorQueue_ProcessingAlwaysFailsForMessage_WhenSentFromSLR_ShouldRemoveTTBR()
{
var exception = new InvalidOperationException();
var message = new TransportMessage("id", new Dictionary<string, string>());
message.TimeToBeReceived = TimeSpan.FromHours(1);

((FaultManager) faultManager).RetriesQueue = new Address("fake", "fake");

faultManager.ProcessingAlwaysFailsForMessage(message, exception);

var sentMessage = fakeSender.SentMessage;
Assert.AreEqual(TimeSpan.MaxValue, sentMessage.TimeToBeReceived);
}

[Test]
public void SendingToErrorQueue_ProcessingAlwaysFailsForMessage_WhenRetriesQueueIsNull_ShouldRemoveTTBR()
{
var exception = new InvalidOperationException();
var message = new TransportMessage("id", new Dictionary<string, string>());
message.TimeToBeReceived = TimeSpan.FromHours(1);

faultManager.ProcessingAlwaysFailsForMessage(message, exception);

var sentMessage = fakeSender.SentMessage;
Assert.AreEqual(TimeSpan.MaxValue, sentMessage.TimeToBeReceived);
}

[Test]
public void SendingToErrorQueue_ProcessingAlwaysFailsForMessage_WhenPolicyReturnsTimeSpanZeroOrLess_ShouldRemoveTTBR()
{
var exception = new InvalidOperationException();
var message = new TransportMessage("id", new Dictionary<string, string>());
message.TimeToBeReceived = TimeSpan.FromHours(1);

var manager = (FaultManager)faultManager;
manager.RetriesQueue = new Address("retries", "fake");
manager.SecondLevelRetriesConfiguration = new SecondLevelRetriesConfiguration
{
RetryPolicy = tm => TimeSpan.MinValue,
};

faultManager.ProcessingAlwaysFailsForMessage(message, exception);

var sentMessage = fakeSender.SentMessage;
Assert.AreEqual(TimeSpan.MaxValue, sentMessage.TimeToBeReceived);
}

[Test]
public void SendingToRetriesQueue_ProcessingAlwaysFailsForMessage_ShouldRemoveTTBR()
{
var exception = new InvalidOperationException();
var message = new TransportMessage("id", new Dictionary<string, string>());
message.TimeToBeReceived = TimeSpan.FromHours(1);

var manager = (FaultManager)faultManager;
manager.RetriesQueue = new Address("retries", "fake");
manager.SecondLevelRetriesConfiguration = new SecondLevelRetriesConfiguration
{
RetryPolicy = tm => TimeSpan.MaxValue,
};

faultManager.ProcessingAlwaysFailsForMessage(message, exception);

var sentMessage = fakeSender.SentMessage;
Assert.AreEqual(TimeSpan.MaxValue, sentMessage.TimeToBeReceived);
}

class FakeSender : ISendMessages
{
public void Send(TransportMessage message, SendOptions sendOptions)
{
SentMessage = message;
}

public TransportMessage SentMessage { get; set; }
}

class FakeContainer : IContainer
{
public void Dispose()
{
}

public object Build(Type typeToBuild)
{
return null;
}

public IContainer BuildChildContainer()
{
return null;
}

public IEnumerable<object> BuildAll(Type typeToBuild)
{
yield break;
}

public void Configure(Type component, DependencyLifecycle dependencyLifecycle)
{
}

public void Configure<T>(Func<T> component, DependencyLifecycle dependencyLifecycle)
{
}

public void ConfigureProperty(Type component, string property, object value)
{
}

public void RegisterSingleton(Type lookupType, object instance)
{
}

public bool HasComponent(Type componentType)
{
return true;
}

public void Release(object instance)
{
}
}
}
}
3 changes: 2 additions & 1 deletion src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
<Compile Include="DataBus\InMemoryDataBus.cs" />
<Compile Include="Encryption\ValidationFixture.cs" />
<Compile Include="Encryption\When_message_contains_props_and_fields_that_cannot_be_set.cs" />
<Compile Include="Faults\FaultManagerTests.cs" />
<Compile Include="Features\FeatureDefaultsTests.cs" />
<Compile Include="Logging\DefaultFactoryTests.cs" />
<Compile Include="Msmq\TimeToBeRceivedOverrideCheckerTest.cs" />
Expand Down Expand Up @@ -238,7 +239,6 @@
<Compile Include="Timeout\When_pooling_timeouts.cs" />
<Compile Include="Timeout\When_receiving_timeouts.cs" />
<Compile Include="Timeout\When_removing_timeouts_from_the_storage.cs" />
<Compile Include="Fakes\FakeFailureManager.cs" />
<Compile Include="Fakes\FakeReceiver.cs" />
<Compile Include="Transport\for_the_transactional_transport.cs" />
<Compile Include="Transport\When_specifying_a_non_zero_throughput_limit.cs" />
Expand Down Expand Up @@ -307,5 +307,6 @@
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup />
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
using System.Reflection;
using System.Transactions;
using Fakes;
using Faults;
using NServiceBus.Faults;
using NUnit.Framework;
using Satellites;
using Settings;
Expand All @@ -22,7 +22,7 @@ public abstract class SatelliteLauncherContext
public void SetUp()
{
Builder = new FuncBuilder();
InMemoryFaultManager = new Faults.InMemory.FaultManager();
InMemoryFaultManager = new NServiceBus.Faults.InMemory.FaultManager();
FakeReceiver = new FakeReceiver();

var configurationBuilder = new BusConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System;
using System.Transactions;
using Fakes;
using NServiceBus.Faults;
using NUnit.Framework;
using Settings;
using Unicast.Transport;
Expand All @@ -21,5 +22,23 @@ public void SetUp()

protected FakeReceiver fakeReceiver;
protected TransportReceiver TransportReceiver;

class FakeFailureManager : IManageMessageFailures
{
public void SerializationFailedForMessage(TransportMessage message, Exception e)
{

}

public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e)
{

}

public void Init(Address address)
{

}
}
}
}
56 changes: 35 additions & 21 deletions src/NServiceBus.Core/Faults/Forwarder/FaultManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public FaultManager(ISendMessages sender, Configure config, BusNotifications bus
/// <summary>
/// The address of the Second Level Retries input queue when SLR is enabled
/// </summary>
public Address RetriesErrorQueue { get; set; }
public Address RetriesQueue { get; set; }

public SecondLevelRetriesConfiguration SecondLevelRetriesConfiguration { get; set; }

Expand All @@ -51,7 +51,7 @@ void IManageMessageFailures.Init(Address address)
void HandleSerializationFailedForMessage(TransportMessage message, Exception e)
{
message.SetExceptionHeaders(e, localAddress ?? config.LocalAddress);
sender.Send(message, new SendOptions(ErrorQueue));
SendToErrorQueue(message, e);
}

void HandleProcessingAlwaysFailsForMessage(TransportMessage message, Exception e, int numberOfRetries)
Expand All @@ -60,51 +60,65 @@ void HandleProcessingAlwaysFailsForMessage(TransportMessage message, Exception e

if (MessageWasSentFromSLR(message))
{
sender.Send(message, new SendOptions(ErrorQueue));
busNotifications.Errors.InvokeMessageHasBeenSentToErrorQueue(message, e);
SendToErrorQueue(message, e);
return;
}

var flrPart = numberOfRetries > 0
? string.Format("Message with '{0}' id has failed FLR and", message.Id)
: string.Format("FLR is disabled and the message '{0}'", message.Id);
? $"Message with '{message.Id}' id has failed FLR and"
: $"FLR is disabled and the message '{message.Id}'";

//HACK: We need this hack here till we refactor the SLR to be a first class concept in the TransportReceiver
if (RetriesErrorQueue == null)
if (RetriesQueue == null)
{
sender.Send(message, new SendOptions(ErrorQueue));
Logger.ErrorFormat("{0} will be moved to the configured error queue.", flrPart);
busNotifications.Errors.InvokeMessageHasBeenSentToErrorQueue(message, e);
SendToErrorQueue(message, e);
return;
}

var defer = SecondLevelRetriesConfiguration.RetryPolicy.Invoke(message);

if (defer < TimeSpan.Zero)
{
Logger.ErrorFormat(
"SLR has failed to resolve the issue with message {0} and will be forwarded to the error queue at {1}",
message.Id, ErrorQueue);
SendToErrorQueue(message, e);
return;
}
sender.Send(message, new SendOptions(RetriesErrorQueue));

var retryAttempt = TransportMessageHeaderHelper.GetNumberOfRetries(message) + 1;

Logger.WarnFormat("{0} will be handed over to SLR for retry attempt {1}.", flrPart, retryAttempt);
busNotifications.Errors.InvokeMessageHasBeenSentToSecondLevelRetries(retryAttempt, message, e);
SendToRetriesQueue(message, e, flrPart);
}

void SendToErrorQueue(TransportMessage message, Exception exception)
{
Logger.ErrorFormat(
"SLR has failed to resolve the issue with message {0} and will be forwarded to the error queue at {1}",
message.Id, ErrorQueue);
message.TimeToBeReceived = TimeSpan.MaxValue;

if (message.Headers.ContainsKey(Headers.FLRetries))
{
message.Headers.Remove(Headers.FLRetries);
}

message.Headers.Remove(Headers.Retries);
if (message.Headers.ContainsKey(Headers.Retries))
{
message.Headers.Remove(Headers.Retries);
}

sender.Send(message, new SendOptions(ErrorQueue));
busNotifications.Errors.InvokeMessageHasBeenSentToErrorQueue(message, exception);
}

void SendToRetriesQueue(TransportMessage message, Exception e, string flrPart)
{
message.TimeToBeReceived = TimeSpan.MaxValue;
sender.Send(message, new SendOptions(RetriesQueue));

var retryAttempt = TransportMessageHeaderHelper.GetNumberOfRetries(message) + 1;

Logger.WarnFormat("{0} will be handed over to SLR for retry attempt {1}.", flrPart, retryAttempt);
busNotifications.Errors.InvokeMessageHasBeenSentToSecondLevelRetries(retryAttempt, message, e);
}

void TryHandleFailure(Action failureHandlingAction)
{
try
Expand All @@ -113,7 +127,7 @@ void TryHandleFailure(Action failureHandlingAction)
}
catch (QueueNotFoundException exception)
{
var errorMessage = string.Format("Could not forward failed message to error queue '{0}' as it could not be found.", exception.Queue);
var errorMessage = $"Could not forward failed message to error queue '{exception.Queue}' as it could not be found.";
Logger.Fatal(errorMessage);
throw new InvalidOperationException(errorMessage, exception);
}
Expand All @@ -127,14 +141,14 @@ void TryHandleFailure(Action failureHandlingAction)

bool MessageWasSentFromSLR(TransportMessage message)
{
if (RetriesErrorQueue == null)
if (RetriesQueue == null)
{
return false;
}

// if the reply to address == ErrorQueue and RealErrorQueue is not null, the
// SecondLevelRetries sat is running and the error happened within that sat.
return TransportMessageHeaderHelper.GetAddressOfFaultingEndpoint(message) == RetriesErrorQueue;
return TransportMessageHeaderHelper.GetAddressOfFaultingEndpoint(message) == RetriesQueue;
}

static int GetNumberOfFirstLevelRetries(TransportMessage message)
Expand Down
Loading

0 comments on commit 2a3f11c

Please sign in to comment.