Skip to content

Commit

Permalink
Merge pull request #4186 from Particular/hotfix-5.2.17
Browse files Browse the repository at this point in the history
Hotfix 5.2.17
  • Loading branch information
timbussmann authored Oct 4, 2016
2 parents ec08478 + f64678f commit 9360a7e
Show file tree
Hide file tree
Showing 11 changed files with 376 additions and 306 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
<Compile Include="PerfMon\CriticalTime\When_deferring_a_message.cs" />
<Compile Include="PubSub\When_publishing_from_sendonly.cs" />
<Compile Include="PubSub\When_publishing_an_event_implementing_two_unrelated_interfaces.cs" />
<Compile Include="Sagas\When_saga_started_concurrently.cs" />
<Compile Include="Sagas\When_sagas_cant_be_found.cs" />
<Compile Include="Sagas\When_using_ReplyToOriginator.cs" />
<Compile Include="ScaleOut\When_individualization_is_enabled_for_msmq.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
namespace NServiceBus.AcceptanceTests.Sagas
{
using System;
using System.Threading.Tasks;
using NServiceBus.AcceptanceTesting;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NServiceBus.Config;
using NServiceBus.Saga;
using NUnit.Framework;

public class When_saga_started_concurrently : NServiceBusAcceptanceTest
{
[Test]
public void Should_start_single_saga()
{
var context = new Context
{
SomeId = Guid.NewGuid().ToString()
};

Scenario.Define(context)
.WithEndpoint<ConcurrentHandlerEndpoint>(b =>
{
b.When(bus =>
{
Parallel.Invoke(() =>
{
bus.SendLocal(new StartMessageOne
{
SomeId = context.SomeId
});
}, () =>
{
bus.SendLocal(new StartMessageTwo
{
SomeId = context.SomeId
}
);
});
});
})
.AllowExceptions()
.Done(c => c.PlacedSagaId != Guid.Empty && c.BilledSagaId != Guid.Empty)
.Run();

Assert.AreNotEqual(Guid.Empty, context.PlacedSagaId);
Assert.AreNotEqual(Guid.Empty, context.BilledSagaId);
Assert.AreEqual(context.PlacedSagaId, context.BilledSagaId, "Both messages should have been handled by the same saga, but SagaIds don't match.");
}

class Context : ScenarioContext
{
public string SomeId { get; set; }
public Guid PlacedSagaId { get; set; }
public Guid BilledSagaId { get; set; }
public bool SagaCompleted { get; set; }
}

class ConcurrentHandlerEndpoint : EndpointConfigurationBuilder
{
public ConcurrentHandlerEndpoint()
{
EndpointSetup<DefaultServer>(b => { })
.WithConfig<TransportConfig>(c =>
{
c.MaxRetries = 3;
c.MaximumConcurrencyLevel = 2;
})
.WithConfig<SecondLevelRetriesConfig>(c =>
{
c.Enabled = false;
});
}

class ConcurrentlyStartedSaga : Saga<ConcurrentlyStartedSagaData>,
IAmStartedByMessages<StartMessageTwo>,
IAmStartedByMessages<StartMessageOne>
{
public Context Context { get; set; }

public void Handle(StartMessageOne message)
{
Data.OrderId = message.SomeId;
Data.Placed = true;
Bus.SendLocal(new SuccessfulProcessing
{
SagaId = Data.Id,
Type = nameof(StartMessageOne)
});
CheckForCompletion();
}

public void Handle(StartMessageTwo message)
{
Data.OrderId = message.SomeId;
Data.Billed = true;
Bus.SendLocal(new SuccessfulProcessing
{
SagaId = Data.Id,
Type = nameof(StartMessageTwo)
});
CheckForCompletion();
}

protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ConcurrentlyStartedSagaData> mapper)
{
mapper.ConfigureMapping<StartMessageOne>(msg => msg.SomeId).ToSaga(saga => saga.OrderId);
mapper.ConfigureMapping<StartMessageTwo>(msg => msg.SomeId).ToSaga(saga => saga.OrderId);
}

void CheckForCompletion()
{
if (!Data.Billed || !Data.Placed)
{
return;
}
MarkAsComplete();
Context.SagaCompleted = true;
}
}

class ConcurrentlyStartedSagaData : ContainSagaData
{
[Unique]
public virtual string OrderId { get; set; }
public virtual bool Placed { get; set; }
public virtual bool Billed { get; set; }
}

// Intercepts the messages sent out by the saga
class LogSuccessfulHandler : IHandleMessages<SuccessfulProcessing>
{
public Context Context { get; set; }

public void Handle(SuccessfulProcessing message)
{
if (message.Type == nameof(StartMessageOne))
{
Context.PlacedSagaId = message.SagaId;
}
else if (message.Type == nameof(StartMessageTwo))
{
Context.BilledSagaId = message.SagaId;
}
else
{
throw new Exception("Unknown type");
}
}
}
}

[Serializable]
class StartMessageOne : ICommand
{
public string SomeId { get; set; }
}

[Serializable]
class StartMessageTwo : ICommand
{
public string SomeId { get; set; }
}

[Serializable]
class SuccessfulProcessing : ICommand
{
public string Type { get; set; }
public Guid SagaId { get; set; }
}
}
}
2 changes: 2 additions & 0 deletions src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@
<Compile Include="Msmq\ConnectionStringParserTests.cs" />
<Compile Include="ObservableTests.cs" />
<Compile Include="Persistence\InMemory\InMemorySagaPersistenceFixture.cs" />
<Compile Include="Persistence\InMemory\InMemorySubscriptionStorageTests.cs" />
<Compile Include="Persistence\Msmq\MsmqSubscriptionStorageTests.cs" />
<Compile Include="Persistence\PersistenceExtentionsTests.cs" />
<Compile Include="Persistence\PersistenceStorageMergerTests.cs" />
<Compile Include="Sagas\SagaModelTests.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace NServiceBus.Persistence.InMemory.Tests
{
using System.Linq;
using NServiceBus.InMemory.SubscriptionStorage;
using NServiceBus.Unicast.Subscriptions;
using NServiceBus.Unicast.Subscriptions.MessageDrivenSubscriptions;
using NUnit.Framework;

[TestFixture]
class InMemorySubscriptionStorageTests
{
[Test]
public void Should_ignore_message_version_on_subscriptions()
{
ISubscriptionStorage storage = new InMemorySubscriptionStorage();

storage.Subscribe(new Address("subscriberA", "subscriberA"), new[]
{
new MessageType("SomeMessage", "1.0.0")
});

var subscribers = storage.GetSubscriberAddressesForMessage(new[]
{
new MessageType("SomeMessage", "2.0.0")
});

Assert.AreEqual("subscriberA", subscribers.Single().Queue);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
namespace NServiceBus.Core.Tests.Persistence.Msmq
{
using System;
using System.Linq;
using System.Messaging;
using NServiceBus.Persistence.SubscriptionStorage;
using NServiceBus.Unicast.Subscriptions.MessageDrivenSubscriptions;
using NUnit.Framework;
using MessageType = NServiceBus.Unicast.Subscriptions.MessageType;

public class MsmqSubscriptionStorageTests
{
[Test]
public void Should_ignore_message_version_on_subscriptions()
{
var testQueueName = "ShouldIgnoreMessageVersionOnSubscriptions";
var testQueueNativeAddress = Environment.MachineName + MsmqUtilities.PRIVATE + testQueueName;

if (MessageQueue.Exists(testQueueNativeAddress))
{
new MessageQueue(testQueueNativeAddress).Purge();
}
else
{
MessageQueue.Create(testQueueNativeAddress);
}

ISubscriptionStorage subscriptionStorage = new MsmqSubscriptionStorage
{
Queue = Address.Parse(testQueueName)
};

subscriptionStorage.Init();

subscriptionStorage.Subscribe(new Address("subscriberA", "server1"), new[] { new MessageType("SomeMessage", "1.0.0") });


var subscribers = subscriptionStorage.GetSubscriberAddressesForMessage(new[]
{
new MessageType("SomeMessage", "2.0.0")
});

Assert.AreEqual("subscriberA", subscribers.Single().Queue);
}
}
}
1 change: 0 additions & 1 deletion src/NServiceBus.Core/NServiceBus.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@
<Compile Include="Unicast\Transport\Config\TransportExtensions.cs" />
<Compile Include="Unicast\Transport\Config\UseTransportExtensions_Obsolete.cs" />
<Compile Include="Utils\BaseDictionary.cs" />
<Compile Include="Utils\WeakKeyDictionary.cs" />
<Compile Include="Utils\Guard.cs" />
<Compile Include="Utils\ExceptionHeaderHelper.cs" />
<Compile Include="Utils\ElevateChecker.cs" />
Expand Down
Loading

0 comments on commit 9360a7e

Please sign in to comment.