Skip to content

Commit

Permalink
Merge branch 'hotfix-4.1.3'
Browse files Browse the repository at this point in the history
  • Loading branch information
John Simons committed Jul 21, 2014
2 parents a6699ce + 5648d19 commit 581eee9
Show file tree
Hide file tree
Showing 12 changed files with 350 additions and 45 deletions.
2 changes: 1 addition & 1 deletion buildsupport/GitVersionTask/Build/GitVersionTask.targets
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
SolutionDirectory="$(SolutionDir)"
ProjectFile="$(ProjectPath)"
AppendRevision="$(GitVersionAppendRevision)"
SignAssembly="$(SignAssembly)"
AssemblyVersioningScheme="MajorMinor"
CompileFiles ="@(Compile)">
<Output
TaskParameter="AssemblyInfoTempFilePath"
Expand Down
Binary file modified buildsupport/GitVersionTask/GitVersionCore.dll
Binary file not shown.
Binary file modified buildsupport/GitVersionTask/GitVersionTask.dll
Binary file not shown.
Binary file modified buildsupport/GitVersionTask/LibGit2Sharp.dll
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions src/NServiceBus.Core.Tests/NServiceBus.Core.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@
<Compile Include="Serializers\XML\SomeEnum.cs" />
<Compile Include="Serializers\XML\Using_Infer_Type_With_Non_Nested_Class.cs" />
<Compile Include="Timeout\FakeMessageSender.cs" />
<Compile Include="Timeout\RavenTimeoutPersisterTests.cs" />
<Compile Include="Timeout\When_fetching_timeouts_from_storage.cs" />
<Compile Include="Timeout\When_pooling_timeouts.cs" />
<Compile Include="Timeout\When_receiving_timeouts.cs" />
Expand Down
260 changes: 260 additions & 0 deletions src/NServiceBus.Core.Tests/Timeout/RavenTimeoutPersisterTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
namespace NServiceBus.Core.Tests.Timeout
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using NServiceBus.Persistence.Raven;
using NServiceBus.Persistence.Raven.TimeoutPersister;
using NServiceBus.Timeout.Core;
using NUnit.Framework;
using Raven.Client;
using Raven.Client.Document;

[TestFixture]
public class RavenTimeoutPersisterTests
{
[TestCase, Repeat(200)]
public void Should_not_skip_timeouts()
{
var db = Guid.NewGuid().ToString();
documentStore = new DocumentStore
{
Url = "http://localhost:8080",
DefaultDatabase = db,
}.Initialize();
persister = new RavenTimeoutPersistence(new StoreAccessor(documentStore))
{
TriggerCleanupEvery = TimeSpan.FromHours(1), // Make sure cleanup doesn't run automatically
};

var startSlice = DateTime.UtcNow.AddYears(-10);
// avoid cleanup from running during the test by making it register as being run
Assert.AreEqual(0, persister.GetCleanupChunk(startSlice).Count());

var expected = new List<Tuple<string, DateTime>>();
var lastExpectedTimeout = DateTime.UtcNow;
var finishedAdding = false;

new Thread(() =>
{
var sagaId = Guid.NewGuid();
for (var i = 0; i < 10000; i++)
{
var td = new TimeoutData
{
SagaId = sagaId,
Destination = new Address("queue", "machine"),
Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(5, 20)),
OwningTimeoutManager = string.Empty,
};
persister.Add(td);
expected.Add(new Tuple<string, DateTime>(td.Id, td.Time));
lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout;
}
finishedAdding = true;
Console.WriteLine("*** Finished adding ***");
}).Start();

// Mimic the behavior of the TimeoutPersister coordinator
var found = 0;
TimeoutData tempTd;
while (!finishedAdding || startSlice < lastExpectedTimeout)
{
DateTime nextRetrieval;
var timeoutDatas = persister.GetNextChunk(startSlice, out nextRetrieval);
foreach (var timeoutData in timeoutDatas)
{
if (startSlice < timeoutData.Item2)
{
startSlice = timeoutData.Item2;
}

Assert.IsTrue(persister.TryRemove(timeoutData.Item1, out tempTd));
found++;
}
}

WaitForIndexing(documentStore);

// If the persister reports stale results have been seen at one point during its normal operation,
// we need to perform manual cleaup.
while (true)
{
var chunkToCleanup = persister.GetCleanupChunk(DateTime.UtcNow.AddDays(1)).ToArray();
Console.WriteLine("Cleanup: got a chunk of size " + chunkToCleanup.Length);
if (chunkToCleanup.Length == 0) break;

found += chunkToCleanup.Length;
foreach (var tuple in chunkToCleanup)
{
Assert.IsTrue(persister.TryRemove(tuple.Item1, out tempTd));
}

WaitForIndexing(documentStore);
}

using (var session = documentStore.OpenSession())
{
var results = session.Query<TimeoutData>().ToList();
Assert.AreEqual(0, results.Count);
}

Assert.AreEqual(expected.Count, found);
}

[TestCase, Repeat(200)]
public void Should_not_skip_timeouts_also_with_multiple_clients_adding_timeouts()
{
var db = Guid.NewGuid().ToString();
documentStore = new DocumentStore
{
Url = "http://localhost:8080",
DefaultDatabase = db,
}.Initialize();
persister = new RavenTimeoutPersistence(new StoreAccessor(documentStore))
{
TriggerCleanupEvery = TimeSpan.FromDays(1), // Make sure cleanup doesn't run automatically
};

var startSlice = DateTime.UtcNow.AddYears(-10);
// avoid cleanup from running during the test by making it register as being run
Assert.AreEqual(0, persister.GetCleanupChunk(startSlice).Count());

const int insertsPerThread = 10000;
var expected1 = new List<Tuple<string, DateTime>>();
var expected2 = new List<Tuple<string, DateTime>>();
var lastExpectedTimeout = DateTime.UtcNow;
var finishedAdding1 = false;
var finishedAdding2 = false;

new Thread(() =>
{
var sagaId = Guid.NewGuid();
for (var i = 0; i < insertsPerThread; i++)
{
var td = new TimeoutData
{
SagaId = sagaId,
Destination = new Address("queue", "machine"),
Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(1, 20)),
OwningTimeoutManager = string.Empty,
};
persister.Add(td);
expected1.Add(new Tuple<string, DateTime>(td.Id, td.Time));
lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout;
}
finishedAdding1 = true;
Console.WriteLine("*** Finished adding ***");
}).Start();

new Thread(() =>
{
using (var store = new DocumentStore
{
Url = "http://localhost:8080",
DefaultDatabase = db,
}.Initialize())
{
var persister2 = new RavenTimeoutPersistence(new StoreAccessor(store));
var sagaId = Guid.NewGuid();
for (var i = 0; i < insertsPerThread; i++)
{
var td = new TimeoutData
{
SagaId = sagaId,
Destination = new Address("queue", "machine"),
Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(1, 20)),
OwningTimeoutManager = string.Empty,
};
persister2.Add(td);
expected2.Add(new Tuple<string, DateTime>(td.Id, td.Time));
lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout;
}
}
finishedAdding2 = true;
Console.WriteLine("*** Finished adding via a second client connection ***");
}).Start();

// Mimic the behavior of the TimeoutPersister coordinator
var found = 0;
TimeoutData tempTd;
while (!finishedAdding1 || !finishedAdding2 || startSlice < lastExpectedTimeout)
{
DateTime nextRetrieval;
var timeoutDatas = persister.GetNextChunk(startSlice, out nextRetrieval);
foreach (var timeoutData in timeoutDatas)
{
if (startSlice < timeoutData.Item2)
{
startSlice = timeoutData.Item2;
}

Assert.IsTrue(persister.TryRemove(timeoutData.Item1, out tempTd)); // Raven returns duplicates, so we can't assert on this here
found++;
}
}

WaitForIndexing(documentStore);

// If the persister reports stale results have been seen at one point during its normal operation,
// we need to perform manual cleaup.
while (true)
{
var chunkToCleanup = persister.GetCleanupChunk(DateTime.UtcNow.AddDays(1)).ToArray();
Console.WriteLine("Cleanup: got a chunk of size " + chunkToCleanup.Length);
if (chunkToCleanup.Length == 0) break;

found += chunkToCleanup.Length;
foreach (var tuple in chunkToCleanup)
{
Assert.IsTrue(persister.TryRemove(tuple.Item1, out tempTd));
}

WaitForIndexing(documentStore);
}

using (var session = documentStore.OpenSession())
{
var results = session.Query<TimeoutData>().ToList();
Assert.AreEqual(0, results.Count);
}

Assert.AreEqual(expected1.Count + expected2.Count, found);
}

IDocumentStore documentStore;
RavenTimeoutPersistence persister;

[TearDown]
public void TearDown()
{
if (documentStore != null)
documentStore.Dispose();
}

static void WaitForIndexing(IDocumentStore store, string db = null, TimeSpan? timeout = null)
{
var databaseCommands = store.DatabaseCommands;
if (db != null)
databaseCommands = databaseCommands.ForDatabase(db);
var spinUntil = SpinWait.SpinUntil(() => databaseCommands.GetStatistics().StaleIndexes.Length == 0, timeout ?? TimeSpan.FromSeconds(20));
Assert.True(spinUntil);
}

static class RandomProvider
{
private static int seed = Environment.TickCount;

private static ThreadLocal<Random> randomWrapper = new ThreadLocal<Random>(() =>
new Random(Interlocked.Increment(ref seed))
);

public static Random GetThreadRandom()
{
return randomWrapper.Value;
}
}
}
}
24 changes: 17 additions & 7 deletions src/NServiceBus.Core.Tests/Timeout/When_pooling_timeouts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace NServiceBus.Core.Tests.Timeout
using Raven.Client.Embedded;

[TestFixture]
[Explicit]
public class When_pooling_timeouts_with_raven : When_pooling_timeouts
{
private IDocumentStore store;
Expand All @@ -37,10 +36,21 @@ public void Cleanup()
{
store.Dispose();
}

[Test]
public void Should_retrieve_all_timeout_messages_that_expired_even_if_it_needs_to_page()
{
expected = 1024 + 5;

Enumerable.Range(1, expected).ToList().ForEach(i => persister.Add(CreateData(DateTime.UtcNow.AddSeconds(-5))));

StartAndStopReceiver(5);

WaitForMessagesThenAssert(5);
}
}

[TestFixture]
[Explicit]
public class When_pooling_timeouts_with_inMemory : When_pooling_timeouts
{
protected override IPersistTimeouts CreateTimeoutPersister()
Expand All @@ -54,7 +64,7 @@ public abstract class When_pooling_timeouts
private IManageTimeouts manager;
private FakeMessageSender messageSender;
readonly Random rand = new Random();
private int expected;
protected int expected;

protected IPersistTimeouts persister;
protected TimeoutPersisterReceiver receiver;
Expand Down Expand Up @@ -187,14 +197,14 @@ private void Push(int total, DateTime time)
Enumerable.Range(1, total).ToList().ForEach(i => manager.PushTimeout(CreateData(time)));
}

private void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1)
protected void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1)
{
receiver.Start();
Thread.Sleep(TimeSpan.FromSeconds(secondsToWaitBeforeCallingStop));
receiver.Stop();
}

private static TimeoutData CreateData(DateTime time)
protected static TimeoutData CreateData(DateTime time)
{
return new TimeoutData
{
Expand All @@ -204,7 +214,7 @@ private static TimeoutData CreateData(DateTime time)
};
}

private void WaitForMessagesThenAssert(int maxSecondsToWait)
protected void WaitForMessagesThenAssert(int maxSecondsToWait)
{
var maxTime = DateTime.Now.AddSeconds(maxSecondsToWait);

Expand All @@ -216,4 +226,4 @@ private void WaitForMessagesThenAssert(int maxSecondsToWait)
Assert.AreEqual(expected, messageSender.MessagesSent);
}
}
}
}
Loading

0 comments on commit 581eee9

Please sign in to comment.