Skip to content

Commit

Permalink
In-memory persister does not throw concurrency exceptions (#4149)
Browse files Browse the repository at this point in the history
* Failing test for concurrently processing saga using in-memory persistence

* InMemorySagaPersister.Save is now concurrency safe

* Fix the acceptance test

* Addressed review comments
  • Loading branch information
DavidBoike authored and danielmarbach committed Sep 28, 2016
1 parent bf207b3 commit d061342
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
<Compile Include="Routing\MessageDrivenSubscriptions\When_subscribing_to_scaled_out_publisher.cs" />
<Compile Include="Sagas\When_a_base_class_mapped_is_handled_by_a_saga.cs" />
<Compile Include="Sagas\When_a_base_class_message_starts_a_saga.cs" />
<Compile Include="Sagas\When_saga_started_concurrently.cs" />
<Compile Include="Serialization\When_configuring_custom_xml_namespace.cs" />
<Compile Include="Serialization\When_registering_additional_deserializers.cs" />
<Compile Include="Basic\When_no_content_type.cs" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
namespace NServiceBus.AcceptanceTests.Sagas
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NUnit.Framework;

public class When_saga_started_concurrently : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_start_single_saga()
{
var context = await Scenario.Define<Context>(c => { c.SomeId = Guid.NewGuid().ToString(); })
.WithEndpoint<ConcurrentHandlerEndpoint>(b =>
{
b.When((session, ctx) =>
{
var t1 = session.SendLocal(new StartMessageOne
{
SomeId = ctx.SomeId
});
var t2 = session.SendLocal(new StartMessageTwo
{
SomeId = ctx.SomeId
});
return Task.WhenAll(t1, t2);
});
})
.Done(c => c.PlacedSagaId != Guid.Empty && c.BilledSagaId != Guid.Empty)
.Run();

Assert.AreNotEqual(Guid.Empty, context.PlacedSagaId);
Assert.AreNotEqual(Guid.Empty, context.BilledSagaId);
Assert.AreEqual(context.PlacedSagaId, context.BilledSagaId, "Both messages should have been handled by the same saga, but SagaIds don't match.");
}

class Context : ScenarioContext
{
public string SomeId { get; set; }
public Guid PlacedSagaId { get; set; }
public Guid BilledSagaId { get; set; }
public bool SagaCompleted { get; set; }
}

class ConcurrentHandlerEndpoint : EndpointConfigurationBuilder
{
public ConcurrentHandlerEndpoint()
{
EndpointSetup<DefaultServer>(b =>
{
b.LimitMessageProcessingConcurrencyTo(2);
b.Recoverability().Immediate(immediate => immediate.NumberOfRetries(3));
});
}

class ConcurrentlyStartedSaga : Saga<ConcurrentlyStartedSagaData>,
IAmStartedByMessages<StartMessageTwo>,
IAmStartedByMessages<StartMessageOne>
{
public Context Context { get; set; }

public async Task Handle(StartMessageOne message, IMessageHandlerContext context)
{
Data.Placed = true;
await context.SendLocal(new SuccessfulProcessing
{
SagaId = Data.Id,
Type = nameof(StartMessageOne)
});
CheckForCompletion(context);
}

public async Task Handle(StartMessageTwo message, IMessageHandlerContext context)
{
Data.Billed = true;
await context.SendLocal(new SuccessfulProcessing
{
SagaId = Data.Id,
Type = nameof(StartMessageTwo)
});
CheckForCompletion(context);
}

protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ConcurrentlyStartedSagaData> mapper)
{
mapper.ConfigureMapping<StartMessageOne>(msg => msg.SomeId).ToSaga(saga => saga.OrderId);
mapper.ConfigureMapping<StartMessageTwo>(msg => msg.SomeId).ToSaga(saga => saga.OrderId);
}

void CheckForCompletion(IMessageHandlerContext context)
{
if (!Data.Billed || !Data.Placed)
{
return;
}
MarkAsComplete();
Context.SagaCompleted = true;
}
}

class ConcurrentlyStartedSagaData : ContainSagaData
{
public virtual string OrderId { get; set; }
public virtual bool Placed { get; set; }
public virtual bool Billed { get; set; }
}

// Intercepts the messages sent out by the saga
class LogSuccessfulHandler : IHandleMessages<SuccessfulProcessing>
{
public Context Context { get; set; }

public Task Handle(SuccessfulProcessing message, IMessageHandlerContext context)
{
if (message.Type == nameof(StartMessageOne))
{
Context.PlacedSagaId = message.SagaId;
}
else if (message.Type == nameof(StartMessageTwo))
{
Context.BilledSagaId = message.SagaId;
}
else
{
throw new Exception("Unknown type");
}

return Task.FromResult(0);
}
}
}

class StartMessageOne : ICommand
{
public string SomeId { get; set; }
}

class StartMessageTwo : ICommand
{
public string SomeId { get; set; }
}

class SuccessfulProcessing : ICommand
{
public string Type { get; set; }
public Guid SagaId { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ public Task Complete(IContainSagaData sagaData, SynchronizedStorageSession sessi
inMemSession.Enlist(() =>
{
VersionedSagaEntity value;
data.TryRemove(sagaData.Id, out value);
if (data.TryRemove(sagaData.Id, out value))
{
object lockToken;
lockers.TryRemove(value.LockTokenKey, out lockToken);
}
});
return TaskEx.CompletedTask;
}
Expand Down Expand Up @@ -66,14 +70,19 @@ public Task Save(IContainSagaData sagaData, SagaCorrelationProperty correlationP
var inMemSession = (InMemorySynchronizedStorageSession) session;
inMemSession.Enlist(() =>
{
if (correlationProperty != SagaCorrelationProperty.None)
var lockenTokenKey = $"{sagaData.GetType().FullName}.{correlationProperty?.Name ?? "None"}.{correlationProperty?.Value ?? "None"}";
var lockToken = lockers.GetOrAdd(lockenTokenKey, key => new object());
lock (lockToken)
{
ValidateUniqueProperties(correlationProperty, sagaData);
if (correlationProperty != SagaCorrelationProperty.None)
{
ValidateUniqueProperties(correlationProperty, sagaData);
}
data.AddOrUpdate(sagaData.Id,
id => new VersionedSagaEntity(sagaData, lockenTokenKey),
(id, original) => new VersionedSagaEntity(sagaData, lockenTokenKey, original)); // we can never end up here.
}
data.AddOrUpdate(sagaData.Id,
id => new VersionedSagaEntity(sagaData),
(id, original) => new VersionedSagaEntity(sagaData, original));
});
return TaskEx.CompletedTask;
}
Expand All @@ -84,8 +93,8 @@ public Task Update(IContainSagaData sagaData, SynchronizedStorageSession session
inMemSession.Enlist(() =>
{
data.AddOrUpdate(sagaData.Id,
id => new VersionedSagaEntity(sagaData),
(id, original) => new VersionedSagaEntity(sagaData, original));
id => new VersionedSagaEntity(sagaData, $"{sagaData.GetType().FullName}.None.None"), // we can never end up here.
(id, original) => new VersionedSagaEntity(sagaData, original.LockTokenKey, original));
});
return TaskEx.CompletedTask;
}
Expand All @@ -94,6 +103,7 @@ void ValidateUniqueProperties(SagaCorrelationProperty correlationProperty, ICont
{
var sagaType = saga.GetType();
var existingSagas = new List<VersionedSagaEntity>();
// ReSharper disable once LoopCanBeConvertedToQuery
foreach (var s in data)
{
if (s.Value.SagaData.GetType() == sagaType && (s.Key != saga.Id))
Expand Down Expand Up @@ -121,11 +131,13 @@ void ValidateUniqueProperties(SagaCorrelationProperty correlationProperty, ICont
}

ConcurrentDictionary<Guid, VersionedSagaEntity> data = new ConcurrentDictionary<Guid, VersionedSagaEntity>();
ConcurrentDictionary<string, object> lockers = new ConcurrentDictionary<string, object>();

class VersionedSagaEntity
{
public VersionedSagaEntity(IContainSagaData sagaData, VersionedSagaEntity original = null)
public VersionedSagaEntity(IContainSagaData sagaData, string lockTokenKey, VersionedSagaEntity original = null)
{
LockTokenKey = lockTokenKey;
SagaData = DeepClone(sagaData);
if (original != null)
{
Expand Down Expand Up @@ -171,6 +183,7 @@ static IContainSagaData DeepClone(IContainSagaData source)
}

public IContainSagaData SagaData;
public string LockTokenKey;

ConditionalWeakTable<IContainSagaData, SagaVersion> versionCache;

Expand Down

0 comments on commit d061342

Please sign in to comment.