From 486aaf26a9b002fa2c627e6ef2bc116e2b9a1ff0 Mon Sep 17 00:00:00 2001 From: andreasohlund Date: Thu, 29 Sep 2016 14:50:56 +0200 Subject: [PATCH 1/4] Adding failing tests --- .../NServiceBus.Core.Tests.csproj | 2 + .../InMemorySubscriptionStorageTests.cs | 30 ++++++++++++ .../Msmq/MsmqSubscriptionStorageTests.cs | 46 +++++++++++++++++++ src/NServiceBus.sln.DotSettings | 1 + 4 files changed, 79 insertions(+) create mode 100644 src/NServiceBus.Core.Tests/Persistence/InMemory/InMemorySubscriptionStorageTests.cs create mode 100644 src/NServiceBus.Core.Tests/Persistence/Msmq/MsmqSubscriptionStorageTests.cs diff --git a/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj b/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj index 69f1f479a01..800bc3fce10 100644 --- a/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj +++ b/src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj @@ -128,6 +128,8 @@ + + diff --git a/src/NServiceBus.Core.Tests/Persistence/InMemory/InMemorySubscriptionStorageTests.cs b/src/NServiceBus.Core.Tests/Persistence/InMemory/InMemorySubscriptionStorageTests.cs new file mode 100644 index 00000000000..6c1a4a431a7 --- /dev/null +++ b/src/NServiceBus.Core.Tests/Persistence/InMemory/InMemorySubscriptionStorageTests.cs @@ -0,0 +1,30 @@ +namespace NServiceBus.Persistence.InMemory.Tests +{ + using System.Linq; + using NServiceBus.InMemory.SubscriptionStorage; + using NServiceBus.Unicast.Subscriptions; + using NServiceBus.Unicast.Subscriptions.MessageDrivenSubscriptions; + using NUnit.Framework; + + [TestFixture] + class InMemorySubscriptionStorageTests + { + [Test] + public void Should_ignore_message_version_on_subscriptions() + { + ISubscriptionStorage storage = new InMemorySubscriptionStorage(); + + storage.Subscribe(new Address("subscriberA", "subscriberA"), new[] + { + new MessageType("SomeMessage", "1.0.0") + }); + + var subscribers = storage.GetSubscriberAddressesForMessage(new[] + { + new MessageType("SomeMessage", "2.0.0") + }); + + Assert.AreEqual("subscriberA", subscribers.Single().Queue); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Persistence/Msmq/MsmqSubscriptionStorageTests.cs b/src/NServiceBus.Core.Tests/Persistence/Msmq/MsmqSubscriptionStorageTests.cs new file mode 100644 index 00000000000..e426c5d3baa --- /dev/null +++ b/src/NServiceBus.Core.Tests/Persistence/Msmq/MsmqSubscriptionStorageTests.cs @@ -0,0 +1,46 @@ +namespace NServiceBus.Core.Tests.Persistence.Msmq +{ + using System; + using System.Linq; + using System.Messaging; + using NServiceBus.Persistence.SubscriptionStorage; + using NServiceBus.Unicast.Subscriptions.MessageDrivenSubscriptions; + using NUnit.Framework; + using MessageType = NServiceBus.Unicast.Subscriptions.MessageType; + + public class MsmqSubscriptionStorageTests + { + [Test] + public void Should_ignore_message_version_on_subscriptions() + { + var testQueueName = "ShouldIgnoreMessageVersionOnSubscriptions"; + var testQueueNativeAddress = Environment.MachineName + MsmqUtilities.PRIVATE + testQueueName; + + if (MessageQueue.Exists(testQueueNativeAddress)) + { + new MessageQueue(testQueueNativeAddress).Purge(); + } + else + { + MessageQueue.Create(testQueueNativeAddress); + } + + ISubscriptionStorage subscriptionStorage = new MsmqSubscriptionStorage + { + Queue = Address.Parse(testQueueName) + }; + + subscriptionStorage.Init(); + + subscriptionStorage.Subscribe(new Address("subscriberA", "server1"), new[] { new MessageType("SomeMessage", "1.0.0") }); + + + var subscribers = subscriptionStorage.GetSubscriberAddressesForMessage(new[] + { + new MessageType("SomeMessage", "2.0.0") + }); + + Assert.AreEqual("subscriberA", subscribers.Single().Queue); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.sln.DotSettings b/src/NServiceBus.sln.DotSettings index 87e28c5706a..ddcbdb9c4e8 100644 --- a/src/NServiceBus.sln.DotSettings +++ b/src/NServiceBus.sln.DotSettings @@ -581,6 +581,7 @@ II.2.12 <HandlesEvent /> True True True + True From 3a6750da7543288b814e5704a0bfcaaabe546bc2 Mon Sep 17 00:00:00 2001 From: andreasohlund Date: Thu, 29 Sep 2016 14:52:24 +0200 Subject: [PATCH 2/4] Ignore message version for subscriptions --- .../MessageDrivenSubscriptions/MessageType.cs | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageType.cs b/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageType.cs index a18a35c057a..e91bbc221cc 100644 --- a/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageType.cs +++ b/src/NServiceBus.Core/Unicast/Subscriptions/MessageDrivenSubscriptions/MessageType.cs @@ -18,19 +18,19 @@ public MessageType(Type type) } /// - /// Initializes the message type from the given string. + /// Initializes the message type from the given string. /// public MessageType(string messageTypeString) { var parts = messageTypeString.Split(','); - - Version = ParseVersion(messageTypeString); + + Version = ParseVersion(messageTypeString); TypeName = parts.First(); } /// - /// Initializes the message type from the given string. + /// Initializes the message type from the given string. /// public MessageType(string typeName, string versionString) { @@ -39,20 +39,20 @@ public MessageType(string typeName, string versionString) } /// - /// Initializes the message type from the given string. + /// Initializes the message type from the given string. /// - public MessageType(string typeName,Version version) + public MessageType(string typeName, Version version) { Version = version; TypeName = typeName; } - + Version ParseVersion(string versionString) { const string version = "Version="; var index = versionString.IndexOf(version); - - if(index >= 0) + + if (index >= 0) versionString = versionString.Substring(index + version.Length) .Split(',').First(); return Version.Parse(versionString); @@ -62,7 +62,7 @@ Version ParseVersion(string versionString) /// /// TypeName of the message /// - public string TypeName { get;private set; } + public string TypeName { get; private set; } /// /// Version of the message @@ -84,7 +84,7 @@ public bool Equals(MessageType other) { if (ReferenceEquals(null, other)) return false; if (ReferenceEquals(this, other)) return true; - return Equals(other.TypeName, TypeName) && other.Version.Major == Version.Major; + return Equals(other.TypeName, TypeName); } /// @@ -94,8 +94,8 @@ public override bool Equals(object obj) { if (ReferenceEquals(null, obj)) return false; if (ReferenceEquals(this, obj)) return true; - if (obj.GetType() != typeof (MessageType)) return false; - return Equals((MessageType) obj); + if (obj.GetType() != typeof(MessageType)) return false; + return Equals((MessageType)obj); } /// @@ -103,10 +103,7 @@ public override bool Equals(object obj) /// public override int GetHashCode() { - unchecked - { - return (TypeName.GetHashCode()*397) ^ Version.GetHashCode(); - } + return TypeName.GetHashCode(); } /// From 26388b2eb8e744f4eadab84dc0032885b6958aec Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Mon, 3 Oct 2016 10:18:15 +0200 Subject: [PATCH 3/4] InMemorySagaPersistence handles concurrently started sagas properly --- .../NServiceBus.AcceptanceTests.csproj | 1 + .../Sagas/When_saga_started_concurrently.cs | 172 +++++++++++++ src/NServiceBus.Core/NServiceBus.Core.csproj | 1 - .../SagaPersister/InMemorySagaPersister.cs | 149 ++++++++---- src/NServiceBus.Core/Sagas/SagaMetadata.cs | 2 +- .../Utils/WeakKeyDictionary.cs | 227 ------------------ 6 files changed, 270 insertions(+), 282 deletions(-) create mode 100644 src/NServiceBus.AcceptanceTests/Sagas/When_saga_started_concurrently.cs delete mode 100644 src/NServiceBus.Core/Utils/WeakKeyDictionary.cs diff --git a/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj b/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj index 31e4ee13e9b..d364c011279 100644 --- a/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj +++ b/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj @@ -96,6 +96,7 @@ + diff --git a/src/NServiceBus.AcceptanceTests/Sagas/When_saga_started_concurrently.cs b/src/NServiceBus.AcceptanceTests/Sagas/When_saga_started_concurrently.cs new file mode 100644 index 00000000000..f9914d225c5 --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Sagas/When_saga_started_concurrently.cs @@ -0,0 +1,172 @@ +namespace NServiceBus.AcceptanceTests.Sagas +{ + using System; + using System.Threading.Tasks; + using NServiceBus.AcceptanceTesting; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NServiceBus.Config; + using NServiceBus.Saga; + using NUnit.Framework; + + public class When_saga_started_concurrently : NServiceBusAcceptanceTest + { + [Test] + public void Should_start_single_saga() + { + var context = new Context + { + SomeId = Guid.NewGuid().ToString() + }; + + Scenario.Define(context) + .WithEndpoint(b => + { + b.When(bus => + { + Parallel.Invoke(() => + { + bus.SendLocal(new StartMessageOne + { + SomeId = context.SomeId + }); + }, () => + { + bus.SendLocal(new StartMessageTwo + { + SomeId = context.SomeId + } + ); + }); + }); + }) + .AllowExceptions() + .Done(c => c.PlacedSagaId != Guid.Empty && c.BilledSagaId != Guid.Empty) + .Run(); + + Assert.AreNotEqual(Guid.Empty, context.PlacedSagaId); + Assert.AreNotEqual(Guid.Empty, context.BilledSagaId); + Assert.AreEqual(context.PlacedSagaId, context.BilledSagaId, "Both messages should have been handled by the same saga, but SagaIds don't match."); + } + + class Context : ScenarioContext + { + public string SomeId { get; set; } + public Guid PlacedSagaId { get; set; } + public Guid BilledSagaId { get; set; } + public bool SagaCompleted { get; set; } + } + + class ConcurrentHandlerEndpoint : EndpointConfigurationBuilder + { + public ConcurrentHandlerEndpoint() + { + EndpointSetup(b => { }) + .WithConfig(c => + { + c.MaxRetries = 3; + c.MaximumConcurrencyLevel = 2; + }) + .WithConfig(c => + { + c.Enabled = false; + }); + } + + class ConcurrentlyStartedSaga : Saga, + IAmStartedByMessages, + IAmStartedByMessages + { + public Context Context { get; set; } + + public void Handle(StartMessageOne message) + { + Data.OrderId = message.SomeId; + Data.Placed = true; + Bus.SendLocal(new SuccessfulProcessing + { + SagaId = Data.Id, + Type = nameof(StartMessageOne) + }); + CheckForCompletion(); + } + + public void Handle(StartMessageTwo message) + { + Data.OrderId = message.SomeId; + Data.Billed = true; + Bus.SendLocal(new SuccessfulProcessing + { + SagaId = Data.Id, + Type = nameof(StartMessageTwo) + }); + CheckForCompletion(); + } + + protected override void ConfigureHowToFindSaga(SagaPropertyMapper mapper) + { + mapper.ConfigureMapping(msg => msg.SomeId).ToSaga(saga => saga.OrderId); + mapper.ConfigureMapping(msg => msg.SomeId).ToSaga(saga => saga.OrderId); + } + + void CheckForCompletion() + { + if (!Data.Billed || !Data.Placed) + { + return; + } + MarkAsComplete(); + Context.SagaCompleted = true; + } + } + + class ConcurrentlyStartedSagaData : ContainSagaData + { + [Unique] + public virtual string OrderId { get; set; } + public virtual bool Placed { get; set; } + public virtual bool Billed { get; set; } + } + + // Intercepts the messages sent out by the saga + class LogSuccessfulHandler : IHandleMessages + { + public Context Context { get; set; } + + public void Handle(SuccessfulProcessing message) + { + if (message.Type == nameof(StartMessageOne)) + { + Context.PlacedSagaId = message.SagaId; + } + else if (message.Type == nameof(StartMessageTwo)) + { + Context.BilledSagaId = message.SagaId; + } + else + { + throw new Exception("Unknown type"); + } + } + } + } + + [Serializable] + class StartMessageOne : ICommand + { + public string SomeId { get; set; } + } + + [Serializable] + class StartMessageTwo : ICommand + { + public string SomeId { get; set; } + } + + [Serializable] + class SuccessfulProcessing : ICommand + { + public string Type { get; set; } + public Guid SagaId { get; set; } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/NServiceBus.Core.csproj b/src/NServiceBus.Core/NServiceBus.Core.csproj index d74a5b1846d..7cbf274954d 100644 --- a/src/NServiceBus.Core/NServiceBus.Core.csproj +++ b/src/NServiceBus.Core/NServiceBus.Core.csproj @@ -266,7 +266,6 @@ - diff --git a/src/NServiceBus.Core/Persistence/InMemory/SagaPersister/InMemorySagaPersister.cs b/src/NServiceBus.Core/Persistence/InMemory/SagaPersister/InMemorySagaPersister.cs index 63bb38f5ebc..67e3e0a8dd3 100644 --- a/src/NServiceBus.Core/Persistence/InMemory/SagaPersister/InMemorySagaPersister.cs +++ b/src/NServiceBus.Core/Persistence/InMemory/SagaPersister/InMemorySagaPersister.cs @@ -2,10 +2,8 @@ namespace NServiceBus.InMemory.SagaPersister { using System; using System.Collections.Concurrent; - using System.Linq; - using System.Threading; + using System.Runtime.CompilerServices; using NServiceBus.Sagas; - using NServiceBus.Utils; using Saga; using Serializers.Json; @@ -15,10 +13,8 @@ namespace NServiceBus.InMemory.SagaPersister class InMemorySagaPersister : ISagaPersister { readonly SagaMetaModel sagaModel; - int version; - - JsonMessageSerializer serializer = new JsonMessageSerializer(null); ConcurrentDictionary data = new ConcurrentDictionary(); + ConcurrentDictionary lockers = new ConcurrentDictionary(); public InMemorySagaPersister(SagaMetaModel sagaModel) { @@ -28,26 +24,35 @@ public InMemorySagaPersister(SagaMetaModel sagaModel) public void Complete(IContainSagaData saga) { VersionedSagaEntity value; - data.TryRemove(saga.Id, out value); + if (data.TryRemove(saga.Id, out value)) + { + object lockToken; + lockers.TryRemove(value.LockTokenKey, out lockToken); + } } public TSagaData Get(string propertyName, object propertyValue) where TSagaData : IContainSagaData { - var values = data.Values.Where(x => x.SagaEntity is TSagaData); - foreach (var entity in values) + foreach (var entity in data.Values) { - var prop = entity.SagaEntity.GetType().GetProperty(propertyName); + if (!(entity.SagaData is TSagaData)) + { + continue; + } + + var prop = typeof(TSagaData).GetProperty(propertyName); if (prop == null) { continue; } - if (!prop.GetValue(entity.SagaEntity, null).Equals(propertyValue)) + var existingValue = prop.GetValue(entity.SagaData); + + if (existingValue.ToString() != propertyValue.ToString()) { continue; } - var clone = (TSagaData)DeepClone(entity.SagaEntity); - entity.RecordRead(clone, version); - return clone; + var sagaData = entity.Read(); + return sagaData; } return default(TSagaData); } @@ -55,28 +60,26 @@ public TSagaData Get(string propertyName, object propertyValue) where public TSagaData Get(Guid sagaId) where TSagaData : IContainSagaData { VersionedSagaEntity result; - if (data.TryGetValue(sagaId, out result) && (result != null) && (result.SagaEntity is TSagaData)) + if (data.TryGetValue(sagaId, out result) && result?.SagaData is TSagaData) { - var clone = (TSagaData)DeepClone(result.SagaEntity); - result.RecordRead(clone, version); - return clone; + var sagaData = result.Read(); + return sagaData; } return default(TSagaData); } public void Save(IContainSagaData saga) { - ValidateUniqueProperties(saga); - - VersionedSagaEntity sagaEntity; - if (data.TryGetValue(saga.Id, out sagaEntity)) + var lockenTokenKey = $"{saga.GetType().FullName}"; + var lockToken = lockers.GetOrAdd(lockenTokenKey, key => new object()); + lock (lockToken) { - sagaEntity.ConcurrencyCheck(saga, version); - } + ValidateUniqueProperties(saga); - data.AddOrUpdate(saga.Id, id => new VersionedSagaEntity { SagaEntity = DeepClone(saga) }, (id, original) => new VersionedSagaEntity { SagaEntity = DeepClone(saga), VersionCache = original.VersionCache }); - - Interlocked.Increment(ref version); + data.AddOrUpdate(saga.Id, + id => new VersionedSagaEntity(saga, lockenTokenKey), + (id, original) => new VersionedSagaEntity(saga, lockenTokenKey, original)); // we can never end up here. + } } public void Update(IContainSagaData saga) @@ -86,17 +89,16 @@ public void Update(IContainSagaData saga) void ValidateUniqueProperties(IContainSagaData saga) { - var sagaMetaData = sagaModel.FindByEntityName(saga.GetType().FullName); - - if (!sagaMetaData.CorrelationProperties.Any()) return; + var sagaType = saga.GetType(); + var sagaMetaData = sagaModel.FindByEntityName(sagaType.FullName); - var sagasFromSameType = from s in data - where - (s.Value.SagaEntity.GetType() == saga.GetType() && (s.Key != saga.Id)) - select s.Value; + if (sagaMetaData.CorrelationProperties.Count == 0) return; - foreach (var storedSaga in sagasFromSameType) + // ReSharper disable once LoopCanBeConvertedToQuery + foreach (var storedSaga in data) { + if (storedSaga.Value.SagaData.GetType() != sagaType || (storedSaga.Key == saga.Id)) continue; + foreach (var correlationProperty in sagaMetaData.CorrelationProperties) { var uniqueProperty = saga.GetType().GetProperty(correlationProperty.Name); @@ -104,43 +106,84 @@ void ValidateUniqueProperties(IContainSagaData saga) { continue; } + var inComingSagaPropertyValue = uniqueProperty.GetValue(saga, null); - var storedSagaPropertyValue = uniqueProperty.GetValue(storedSaga.SagaEntity, null); + var storedSagaPropertyValue = uniqueProperty.GetValue(storedSaga.Value.SagaData, null); if (inComingSagaPropertyValue.Equals(storedSagaPropertyValue)) { - var message = string.Format("Cannot store a saga. The saga with id '{0}' already has property '{1}' with value '{2}'.", storedSaga.SagaEntity.Id, uniqueProperty, storedSagaPropertyValue); + var message = $"Cannot store a saga. The saga with id '{storedSaga.Value.SagaData.Id}' already has property '{uniqueProperty}' with value '{storedSagaPropertyValue}'."; throw new InvalidOperationException(message); } } } } - IContainSagaData DeepClone(IContainSagaData source) + class VersionedSagaEntity { - var json = serializer.SerializeObject(source); + public VersionedSagaEntity(IContainSagaData sagaData, string lockTokenKey, VersionedSagaEntity original = null) + { + LockTokenKey = lockTokenKey; + SagaData = DeepClone(sagaData); + if (original != null) + { + original.ConcurrencyCheck(sagaData); - return (IContainSagaData)serializer.DeserializeObject(json, source.GetType()); - } + versionCache = original.versionCache; + version = original.version; + version++; + } + else + { + versionCache = new ConditionalWeakTable(); + versionCache.Add(sagaData, new SagaVersion(version)); + } + } - class VersionedSagaEntity - { - public IContainSagaData SagaEntity; + public TSagaData Read() + where TSagaData : IContainSagaData + { + var clone = DeepClone(SagaData); + versionCache.Add(clone, new SagaVersion(version)); + return (TSagaData)clone; + } + + void ConcurrencyCheck(IContainSagaData sagaEntity) + { + SagaVersion v; + if (!versionCache.TryGetValue(sagaEntity, out v)) + { + throw new Exception($"InMemorySagaPersister in an inconsistent state: entity Id[{sagaEntity.Id}] not read."); + } - public WeakKeyDictionary VersionCache = new WeakKeyDictionary(); + if (v.Version != version) + { + throw new Exception($"InMemorySagaPersister concurrency violation: saga entity Id[{sagaEntity.Id}] already saved."); + } + } - public void RecordRead(IContainSagaData sagaEntity, int currentVersion) + static IContainSagaData DeepClone(IContainSagaData source) { - VersionCache[sagaEntity] = currentVersion; + var json = serializer.SerializeObject(source); + return (IContainSagaData)serializer.DeserializeObject(json, source.GetType()); } - public void ConcurrencyCheck(IContainSagaData sagaEntity, int currentVersion) + public IContainSagaData SagaData; + public string LockTokenKey; + + ConditionalWeakTable versionCache; + + int version; + + static JsonMessageSerializer serializer = new JsonMessageSerializer(null); + + class SagaVersion { - int v; - if (!VersionCache.TryGetValue(sagaEntity, out v)) - throw new Exception(string.Format("InMemorySagaPersister in an inconsistent state: entity Id[{0}] not read.", sagaEntity.Id)); + public SagaVersion(long version) + { + Version = version; + } - if (v != currentVersion) - throw new Exception(string.Format("InMemorySagaPersister concurrency violation: saga entity Id[{0}] already saved.", sagaEntity.Id)); + public long Version; } } } diff --git a/src/NServiceBus.Core/Sagas/SagaMetadata.cs b/src/NServiceBus.Core/Sagas/SagaMetadata.cs index 5cdcb7320ff..8356a623661 100644 --- a/src/NServiceBus.Core/Sagas/SagaMetadata.cs +++ b/src/NServiceBus.Core/Sagas/SagaMetadata.cs @@ -33,7 +33,7 @@ public SagaMetadata(IEnumerable messages, IEnumerable /// Properties this saga is correlated on /// - public IEnumerable CorrelationProperties; + public List CorrelationProperties; /// /// The name of the saga diff --git a/src/NServiceBus.Core/Utils/WeakKeyDictionary.cs b/src/NServiceBus.Core/Utils/WeakKeyDictionary.cs deleted file mode 100644 index c49ab617f94..00000000000 --- a/src/NServiceBus.Core/Utils/WeakKeyDictionary.cs +++ /dev/null @@ -1,227 +0,0 @@ -using System; - -namespace NServiceBus.Utils -{ - using System.Collections.Generic; - - internal class WeakReference : WeakReference where T : class - { - public static WeakReference Create(T target) - { - if (target == null) - return WeakNullReference.Singleton; - - return new WeakReference(target); - } - - protected WeakReference(T target) - : base(target, false) { } - - public new T Target - { - get { return (T)base.Target; } - } - } - - internal class WeakNullReference : WeakReference where T : class - { - public static readonly WeakNullReference Singleton = new WeakNullReference(); - - private WeakNullReference() - : base(null) - { - } - - public override bool IsAlive - { - get { return true; } - } - } - - internal sealed class WeakKeyReference : WeakReference where T : class - { - public readonly int HashCode; - - public WeakKeyReference(T key, WeakKeyComparer comparer) - : base(key) - { - // retain the object's hash code immediately so that even - // if the target is GC'ed we will be able to find and - // remove the dead weak reference. - HashCode = comparer.GetHashCode(key); - } - } - - internal sealed class WeakKeyComparer : IEqualityComparer - where T : class - { - private IEqualityComparer comparer; - - internal WeakKeyComparer(IEqualityComparer comparer) - { - if (comparer == null) - comparer = EqualityComparer.Default; - - this.comparer = comparer; - } - - public int GetHashCode(object obj) - { - var weakKey = obj as WeakKeyReference; - if (weakKey != null) return weakKey.HashCode; - return comparer.GetHashCode((T)obj); - } - - // Note: There are actually 9 cases to handle here. - // - // Let Wa = Alive Weak Reference - // Let Wd = Dead Weak Reference - // Let S = Strong Reference - // - // x | y | Equals(x,y) - // ------------------------------------------------- - // Wa | Wa | comparer.Equals(x.Target, y.Target) - // Wa | Wd | false - // Wa | S | comparer.Equals(x.Target, y) - // Wd | Wa | false - // Wd | Wd | x == y - // Wd | S | false - // S | Wa | comparer.Equals(x, y.Target) - // S | Wd | false - // S | S | comparer.Equals(x, y) - // ------------------------------------------------- - public new bool Equals(object x, object y) - { - bool xIsDead, yIsDead; - var first = GetTarget(x, out xIsDead); - var second = GetTarget(y, out yIsDead); - - if (xIsDead) - return yIsDead && x == y; - - if (yIsDead) - return false; - - return comparer.Equals(first, second); - } - - private static T GetTarget(object obj, out bool isDead) - { - var wref = obj as WeakKeyReference; - T target; - if (wref != null) - { - target = wref.Target; - isDead = !wref.IsAlive; - } - else - { - target = (T)obj; - isDead = false; - } - return target; - } - } - - internal sealed class WeakKeyDictionary : BaseDictionary - where TKey : class - { - private Dictionary dictionary; - private WeakKeyComparer comparer; - - public WeakKeyDictionary() - : this(0, null) { } - - public WeakKeyDictionary(int capacity) - : this(capacity, null) { } - - public WeakKeyDictionary(IEqualityComparer comparer) - : this(0, comparer) { } - - public WeakKeyDictionary(int capacity, IEqualityComparer comparer) - { - this.comparer = new WeakKeyComparer(comparer); - dictionary = new Dictionary(capacity, this.comparer); - } - - // WARNING: The count returned here may include entries for which - // either the key or value objects have already been garbage - // collected. Call RemoveCollectedEntries to weed out collected - // entries and update the count accordingly. - public override int Count - { - get { return dictionary.Count; } - } - - public override void Add(TKey key, TValue value) - { - if (key == null) throw new ArgumentNullException("key"); - WeakReference weakKey = new WeakKeyReference(key, comparer); - dictionary.Add(weakKey, value); - } - - public override bool ContainsKey(TKey key) - { - return dictionary.ContainsKey(key); - } - - public override bool Remove(TKey key) - { - return dictionary.Remove(key); - } - - public override bool TryGetValue(TKey key, out TValue value) - { - return dictionary.TryGetValue(key, out value); - } - - protected override void SetValue(TKey key, TValue value) - { - WeakReference weakKey = new WeakKeyReference(key, comparer); - dictionary[weakKey] = value; - } - - public override void Clear() - { - dictionary.Clear(); - } - - public override IEnumerator> GetEnumerator() - { - foreach (var kvp in dictionary) - { - var weakKey = (WeakReference)(kvp.Key); - var key = weakKey.Target; - var value = kvp.Value; - if (weakKey.IsAlive) - yield return new KeyValuePair(key, value); - } - } - - // Removes the left-over weak references for entries in the dictionary - // whose key or value has already been reclaimed by the garbage - // collector. This will reduce the dictionary's Count by the number - // of dead key-value pairs that were eliminated. - public void RemoveCollectedEntries() - { - List toRemove = null; - foreach (var pair in dictionary) - { - var weakKey = (WeakReference)(pair.Key); - - if (!weakKey.IsAlive) - { - if (toRemove == null) - toRemove = new List(); - toRemove.Add(weakKey); - } - } - - if (toRemove != null) - { - foreach (var key in toRemove) - dictionary.Remove(key); - } - } - } -} \ No newline at end of file From f64678f9985fce22e0e0c21c0ec249ce420096ca Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Tue, 4 Oct 2016 10:31:02 +0200 Subject: [PATCH 4/4] Apply reformatting --- .../SagaPersister/InMemorySagaPersister.cs | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/src/NServiceBus.Core/Persistence/InMemory/SagaPersister/InMemorySagaPersister.cs b/src/NServiceBus.Core/Persistence/InMemory/SagaPersister/InMemorySagaPersister.cs index 67e3e0a8dd3..458e43eabd0 100644 --- a/src/NServiceBus.Core/Persistence/InMemory/SagaPersister/InMemorySagaPersister.cs +++ b/src/NServiceBus.Core/Persistence/InMemory/SagaPersister/InMemorySagaPersister.cs @@ -3,19 +3,15 @@ namespace NServiceBus.InMemory.SagaPersister using System; using System.Collections.Concurrent; using System.Runtime.CompilerServices; + using NServiceBus.Saga; using NServiceBus.Sagas; - using Saga; - using Serializers.Json; + using NServiceBus.Serializers.Json; /// - /// In memory implementation of ISagaPersister for quick development. + /// In memory implementation of ISagaPersister for quick development. /// class InMemorySagaPersister : ISagaPersister { - readonly SagaMetaModel sagaModel; - ConcurrentDictionary data = new ConcurrentDictionary(); - ConcurrentDictionary lockers = new ConcurrentDictionary(); - public InMemorySagaPersister(SagaMetaModel sagaModel) { this.sagaModel = sagaModel; @@ -45,12 +41,14 @@ public TSagaData Get(string propertyName, object propertyValue) where { continue; } + var existingValue = prop.GetValue(entity.SagaData); if (existingValue.ToString() != propertyValue.ToString()) { continue; } + var sagaData = entity.Read(); return sagaData; } @@ -97,7 +95,10 @@ void ValidateUniqueProperties(IContainSagaData saga) // ReSharper disable once LoopCanBeConvertedToQuery foreach (var storedSaga in data) { - if (storedSaga.Value.SagaData.GetType() != sagaType || (storedSaga.Key == saga.Id)) continue; + if (storedSaga.Value.SagaData.GetType() != sagaType || (storedSaga.Key == saga.Id)) + { + continue; + } foreach (var correlationProperty in sagaMetaData.CorrelationProperties) { @@ -118,8 +119,14 @@ void ValidateUniqueProperties(IContainSagaData saga) } } + readonly SagaMetaModel sagaModel; + ConcurrentDictionary data = new ConcurrentDictionary(); + ConcurrentDictionary lockers = new ConcurrentDictionary(); + class VersionedSagaEntity { + static JsonMessageSerializer serializer = new JsonMessageSerializer(null); + public VersionedSagaEntity(IContainSagaData sagaData, string lockTokenKey, VersionedSagaEntity original = null) { LockTokenKey = lockTokenKey; @@ -144,7 +151,7 @@ public TSagaData Read() { var clone = DeepClone(SagaData); versionCache.Add(clone, new SagaVersion(version)); - return (TSagaData)clone; + return (TSagaData) clone; } void ConcurrencyCheck(IContainSagaData sagaEntity) @@ -164,17 +171,16 @@ void ConcurrencyCheck(IContainSagaData sagaEntity) static IContainSagaData DeepClone(IContainSagaData source) { var json = serializer.SerializeObject(source); - return (IContainSagaData)serializer.DeserializeObject(json, source.GetType()); + return (IContainSagaData) serializer.DeserializeObject(json, source.GetType()); } - public IContainSagaData SagaData; public string LockTokenKey; - ConditionalWeakTable versionCache; + public IContainSagaData SagaData; int version; - static JsonMessageSerializer serializer = new JsonMessageSerializer(null); + ConditionalWeakTable versionCache; class SagaVersion { @@ -187,4 +193,4 @@ public SagaVersion(long version) } } } -} +} \ No newline at end of file