Skip to content

Commit

Permalink
Merge branch 'hotfix-4.7.2' into support-4.7
Browse files Browse the repository at this point in the history
  • Loading branch information
John Simons committed Oct 24, 2014
2 parents 281fa01 + 8af0bf8 commit b717ff2
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 15 deletions.
133 changes: 133 additions & 0 deletions src/NServiceBus.AcceptanceTests/Distributor/Processing_SLR_message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
namespace NServiceBus.AcceptanceTests.Distributor
{
using System;
using System.Messaging;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.AcceptanceTests.ScenarioDescriptors;
using NServiceBus.Config;
using NServiceBus.Transports.Msmq;
using NUnit.Framework;

[TestFixture]
public class Processing_SLR_message : NServiceBusAcceptanceTest
{
static TimeSpan SlrDelay = TimeSpan.FromSeconds(5);

[Test]
public void Worker_should_sends_a_ready_message_to_the_distributor()
{
try
{
var queue = new MessageQueue(MsmqUtilities.GetFullPath(Address.Parse("distributor.distributor.processingslrmessage.msmq.distributor.storage")), false, true, QueueAccessMode.Receive);
queue.Purge();
}
// ReSharper disable once EmptyGeneralCatchClause
catch (Exception)
{
//NOOP
}

var context = new Context
{
Id = Guid.NewGuid()
};
Scenario.Define(context)
.WithEndpoint<Client>(b => b
.Given((bus, c) => bus.Send(new MyMessage
{
Id = c.Id
}))
)
.WithEndpoint<Distributor>()
.WithEndpoint<Worker>()
.Done(c => c.SecondAttemptSucceeded)
.Repeat(r => r.For(Transports.Msmq))
.Run();

Assert.IsTrue(context.FirstAttemptFailed);
Assert.IsTrue(context.SecondAttemptSucceeded);
}

public class Context : ScenarioContext
{
public Guid Id { get; set; }
public bool FirstAttemptFailed { get; set; }
public bool SecondAttemptSucceeded { get; set; }
}

public class Client : EndpointConfigurationBuilder
{
public Client()
{
EndpointSetup<DefaultServer>()
.AddMapping<MyMessage>(typeof(Distributor));
}
}

public class Distributor : EndpointConfigurationBuilder
{
public Distributor()
{
EndpointSetup<DefaultServer>(c => c.RunDistributor(false));
}
}

public class Worker : EndpointConfigurationBuilder
{
public Worker()
{
EndpointSetup<DefaultServer>(c => c.EnlistWithDistributor())
.AllowExceptions()
.WithConfig<TransportConfig>(c =>
{
c.MaxRetries = 0; //to skip the FLR
})
.WithConfig<UnicastBusConfig>(c =>
{
c.DistributorControlAddress = "distributor.distributor.processingslrmessage.msmq.distributor.control";
c.DistributorDataAddress = "distributor.distributor.processingslrmessage.msmq";
})
.WithConfig<SecondLevelRetriesConfig>(c =>
{
c.NumberOfRetries = 1;
c.TimeIncrease = SlrDelay;
}).WithConfig<MasterNodeConfig>(c =>
{
c.Node = "particular.net";
});
}

public class MyMessageHandler : IHandleMessages<MyMessage>
{
public Context Context { get; set; }

public IBus Bus { get; set; }

public void Handle(MyMessage request)
{
if (Context.Id != request.Id)
{
return;
}

if (!Context.FirstAttemptFailed)
{
Context.FirstAttemptFailed = true;
throw new Exception("Triggering SLR");
}
if (Bus.CurrentMessageContext.Headers.ContainsKey(Headers.Retries))
{
Context.SecondAttemptSucceeded = true;
}
}
}
}

[Serializable]
public class MyMessage : IMessage
{
public Guid Id { get; set; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
namespace NServiceBus.AcceptanceTests.Exceptions
{
using System;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Config;
using NServiceBus.MessageMutator;
using NServiceBus.Unicast;
using NServiceBus.Unicast.Transport;
using NUnit.Framework;

public class When_processing_a_message_without_an_id : NServiceBusAcceptanceTest
{
[Test]
public void Should_invoke_start_message_processing_listeners()
{
var context = new Context();

Scenario.Define(context)
.WithEndpoint<Endpoint>(b => b.Given(bus => bus.SendLocal(new Message())))
.Done(c => c.StartMessageProcessingCalled)
.Run();

Assert.IsTrue(context.StartMessageProcessingCalled);
}

public class Context : ScenarioContext
{
public bool StartMessageProcessingCalled { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>(c =>
{
c.Configurer.ConfigureComponent<CorruptionMutator>(DependencyLifecycle.InstancePerCall);
c.Configurer.ConfigureComponent<StartProcessingListener>(DependencyLifecycle.SingleInstance);
c.DisableTimeoutManager();
})
.WithConfig<TransportConfig>(c =>
{
c.MaxRetries = 0;
})
.AllowExceptions();
}

class StartProcessingListener : IWantToRunWhenBusStartsAndStops
{
Context context;

public StartProcessingListener(UnicastBus bus, Context context)
{
this.context = context;
bus.Transport.StartedMessageProcessing += transport_StartedMessageProcessing;
}

void transport_StartedMessageProcessing(object sender, StartedMessageProcessingEventArgs e)
{
context.StartMessageProcessingCalled = true;
}

public void Start()
{
}

public void Stop()
{
}
}

class CorruptionMutator : IMutateTransportMessages
{
public void MutateIncoming(TransportMessage transportMessage)
{
}

public void MutateOutgoing(object[] messages, TransportMessage transportMessage)
{
transportMessage.Headers[Headers.MessageId] = "";
}
}

class Handler : IHandleMessages<Message>
{
public void Handle(Message message)
{
}
}
}

[Serializable]
public class Message : IMessage
{
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<NoWarn>618</NoWarn>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
Expand All @@ -32,6 +33,7 @@
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<NoWarn>618</NoWarn>
</PropertyGroup>
<ItemGroup>
<Reference Include="Autofac">
Expand Down Expand Up @@ -97,6 +99,7 @@
<Reference Include="System.Core" />
<Reference Include="System.Data" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="System.Messaging" />
<Reference Include="System.Transactions" />
<Reference Include="System.Web" />
<Reference Include="System.Xml" />
Expand All @@ -108,6 +111,7 @@
<Compile Include="BasicMessaging\When_using_a_message_with_TimeToBeReceived_has_expired.cs" />
<Compile Include="BasicMessaging\When_Deferring_a_message.cs" />
<Compile Include="BusStartStop\When_bus_start_raises_an_inmemory_message.cs" />
<Compile Include="Distributor\Processing_SLR_message.cs" />
<Compile Include="Exceptions\StackTraceAssert.cs" />
<Compile Include="Exceptions\When_handler_and_Module_End_throws.cs" />
<Compile Include="Exceptions\When_handler_and_Uow_End_throws.cs" />
Expand All @@ -118,6 +122,7 @@
<Compile Include="Exceptions\When_Module_Begin_throws.cs" />
<Compile Include="Exceptions\When_Module_End_throws.cs" />
<Compile Include="Exceptions\When_Module_Error_throws.cs" />
<Compile Include="Exceptions\When_processing_a_message_without_an_id.cs" />
<Compile Include="Exceptions\When_serialization_throws.cs" />
<Compile Include="Exceptions\When_Uow_Begin_and_different_End_throws.cs" />
<Compile Include="Exceptions\When_Uow_Begin_throws.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
using EndpointTemplates;
using AcceptanceTesting;
using MessageMutator;
using NServiceBus.Unicast;
using NServiceBus.Unicast.Transport;
using NUnit.Framework;

#pragma warning disable 612, 618
Expand All @@ -26,6 +28,21 @@ public void Should_preserve_the_original_body_for_regular_exceptions()
Assert.AreEqual(context.OriginalBodyChecksum, context.SlrChecksum, "The body of the message sent to slr should be the same as the original message coming off the queue");

}

[Test]
public void Should_raise_FinishedMessageProcessing_event()
{
var context = new Context();

Scenario.Define(context)
.WithEndpoint<RetryEndpoint>(b => b.Given(bus => bus.SendLocal(new MessageToBeRetried())))
.Done(c => c.FinishedMessageProcessingCalledAfterFaultManagerInvoked)
.Run();

Assert.IsTrue(context.FinishedMessageProcessingCalledAfterFaultManagerInvoked);

}

[Test]
public void Should_preserve_the_original_body_for_serialization_exceptions()
{
Expand All @@ -45,6 +62,9 @@ public void Should_preserve_the_original_body_for_serialization_exceptions()

public class Context : ScenarioContext
{
public bool FinishedMessageProcessingCalledAfterFaultManagerInvoked { get; set; }
public bool FaultManagerInvoked { get; set; }

public byte OriginalBodyChecksum { get; set; }

public byte SlrChecksum { get; set; }
Expand All @@ -56,14 +76,45 @@ public class RetryEndpoint : EndpointConfigurationBuilder
{
public RetryEndpoint()
{
EndpointSetup<DefaultServer>(c => c.Configurer.ConfigureComponent<CustomFaultManager>(DependencyLifecycle.SingleInstance))
EndpointSetup<DefaultServer>(c =>
{
c.Configurer.ConfigureComponent<CustomFaultManager>(DependencyLifecycle.SingleInstance);
c.Configurer.ConfigureComponent<FinishedProcessingListener>(DependencyLifecycle.SingleInstance);
})
.WithConfig<TransportConfig>(c =>
{
c.MaxRetries = 0;
})
.AllowExceptions();
}

class FinishedProcessingListener : IWantToRunWhenBusStartsAndStops
{
readonly Context context;

public FinishedProcessingListener(UnicastBus bus, Context context)
{
this.context = context;
bus.Transport.FinishedMessageProcessing += Transport_FinishedMessageProcessing;
}

void Transport_FinishedMessageProcessing(object sender, FinishedMessageProcessingEventArgs e)
{
if (context.FaultManagerInvoked)
{
context.FinishedMessageProcessingCalledAfterFaultManagerInvoked = true;
}
}

public void Start()
{
}

public void Stop()
{
}
}

class BodyMutator : IMutateTransportMessages, NServiceBus.INeedInitialization
{
public Context Context { get; set; }
Expand Down Expand Up @@ -106,11 +157,13 @@ class CustomFaultManager : IManageMessageFailures

public void SerializationFailedForMessage(TransportMessage message, Exception e)
{
Context.FaultManagerInvoked = true;
Context.SlrChecksum = Checksum(message.Body);
}

public void ProcessingAlwaysFailsForMessage(TransportMessage message, Exception e)
{
Context.FaultManagerInvoked = true;
Context.SlrChecksum = Checksum(message.Body);
}

Expand Down
Loading

0 comments on commit b717ff2

Please sign in to comment.