diff --git a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs
index 7e376b8a2fe..6c134df1249 100644
--- a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs
+++ b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs
@@ -14,6 +14,10 @@ public class RavenTimeoutPersistence : IPersistTimeouts
     {
         readonly IDocumentStore store;
 
+        public TimeSpan CleanupGapFromTimeslice { get; set; }
+        public TimeSpan TriggerCleanupEvery { get; set; }
+        DateTime lastCleanupTime = DateTime.MinValue;
+
         public RavenTimeoutPersistence(IDocumentStore store)
         {
             this.store = store;
@@ -42,7 +46,40 @@ public RavenTimeoutPersistence(IDocumentStore store)
                 Map = docs => from doc in docs select new { doc.SagaId }
             }, true);
-
+
+            TriggerCleanupEvery = TimeSpan.FromMinutes(2);
+            CleanupGapFromTimeslice = TimeSpan.FromMinutes(1);
+        }
+
+        private static IRavenQueryable<TimeoutData> GetChunkQuery(IDocumentSession session)
+        {
+            session.Advanced.AllowNonAuthoritativeInformation = true;
+            return session.Query<TimeoutData>("RavenTimeoutPersistence/TimeoutDataSortedByTime")
+                .OrderBy(t => t.Time)
+                .Where(t =>
+                    t.OwningTimeoutManager == String.Empty ||
+                    t.OwningTimeoutManager == Configure.EndpointName);
+        }
+
+        public IEnumerable<Tuple<string, DateTime>> GetCleanupChunk(DateTime startSlice)
+        {
+            using (var session = OpenSession())
+            {
+                var chunk = GetChunkQuery(session)
+                    .Where(t => t.Time <= startSlice.Subtract(CleanupGapFromTimeslice))
+                    .Select(t => new
+                    {
+                        t.Id,
+                        t.Time
+                    })
+                    .Take(1024)
+                    .ToList()
+                    .Select(arg => new Tuple<string, DateTime>(arg.Id, arg.Time));
+
+                lastCleanupTime = DateTime.UtcNow;
+
+                return chunk;
+            }
         }
 
         public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery)
@@ -50,28 +87,31 @@ public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery)
             try
             {
                 var now = DateTime.UtcNow;
-                var skip = 0;
                 var results = new List<Tuple<string, DateTime>>();
+
+                // Allow for occasionally cleaning up old timeouts, covering edge cases where
+                // timeouts added after startSlice was set carry an earlier time and might have
+                // been missed because of stale indexes.
+                if (lastCleanupTime.Add(TriggerCleanupEvery) < now || lastCleanupTime == DateTime.MinValue)
+                {
+                    results.AddRange(GetCleanupChunk(startSlice));
+                }
+
+                var skip = 0;
                 var numberOfRequestsExecutedSoFar = 0;
                 RavenQueryStatistics stats;
-
                 do
                 {
                     using (var session = OpenSession())
                     {
                         session.Advanced.AllowNonAuthoritativeInformation = true;
 
-                        var query = session.Query<TimeoutData>("RavenTimeoutPersistence/TimeoutDataSortedByTime")
-                            .Where(
-                                t =>
-                                    t.OwningTimeoutManager == String.Empty ||
-                                    t.OwningTimeoutManager == Configure.EndpointName)
-                            .Where(
-                                t =>
-                                    t.Time > startSlice &&
-                                    t.Time <= now)
-                            .OrderBy(t => t.Time)
-                            .Select(t => new {t.Id, t.Time})
+                        var query = GetChunkQuery(session)
+                            .Where(
+                                t =>
+                                    t.Time > startSlice &&
+                                    t.Time <= now)
+                            .Select(t => new { t.Id, t.Time })
                            .Statistics(out stats);
                         do
                         {
@@ -87,33 +127,29 @@ public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery)
                     }
                 } while (skip < stats.TotalResults);
 
-                using (var session = OpenSession())
+                // Set next execution to be now if we received stale results.
+                // Delay the next execution a bit if the results weren't stale and we got the full chunk.
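+                // (stats.IsStale reflects RavenDB's asynchronous indexing: documents written
+                // just before the query may not be in the index yet, so a chunk can be incomplete.)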
+                if (stats.IsStale)
+                {
+                    nextTimeToRunQuery = now;
+                }
+                else
                 {
-                    session.Advanced.AllowNonAuthoritativeInformation = true;
-
-                    //Retrieve next time we need to run query
-                    var startOfNextChunk =
-                        session.Query<TimeoutData>("RavenTimeoutPersistence/TimeoutDataSortedByTime")
-                            .Where(
-                                t =>
-                                    t.OwningTimeoutManager == String.Empty ||
-                                    t.OwningTimeoutManager == Configure.EndpointName)
+                    using (var session = OpenSession())
+                    {
+                        var beginningOfNextChunk = GetChunkQuery(session)
                             .Where(t => t.Time > now)
-                            .OrderBy(t => t.Time)
-                            .Select(t => new {t.Id, t.Time})
+                            .Take(1)
+                            .Select(t => t.Time)
                             .FirstOrDefault();
 
-                    if (startOfNextChunk != null)
-                    {
-                        nextTimeToRunQuery = startOfNextChunk.Time;
-                    }
-                    else
-                    {
-                        nextTimeToRunQuery = DateTime.UtcNow.AddMinutes(10);
+                        nextTimeToRunQuery = (beginningOfNextChunk == default(DateTime))
+                            ? DateTime.UtcNow.AddMinutes(10)
+                            : beginningOfNextChunk.ToUniversalTime();
                     }
-
-                    return results;
                 }
+
+                return results;
             }
             catch (Exception)
             {
@@ -151,9 +187,6 @@ public bool TryRemove(string timeoutId, out TimeoutData timeoutData)
                 if (timeoutData == null)
                     return false;
 
-                timeoutData.Time = DateTime.UtcNow.AddYears(-1);
-                session.SaveChanges();
-
                 session.Delete(timeoutData);
                 session.SaveChanges();
diff --git a/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj b/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj
index dfa6ce3a6d9..29496992794 100644
--- a/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj
+++ b/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj
@@ -108,6 +108,7 @@
+    <Compile Include="RavenTimeoutPersisterTests.cs" />
diff --git a/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs b/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs
new file mode 100644
index 00000000000..8a469f789e0
--- /dev/null
+++ b/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs
@@ -0,0 +1,263 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading;
+using NServiceBus.Timeout.Core;
+using NServiceBus.Timeout.Hosting.Windows.Persistence;
+using NUnit.Framework;
+using Raven.Client;
+using Raven.Client.Document;
+
+namespace NServiceBus.Timeout.Tests
+{
+    public class RavenTimeoutPersisterTests
+    {
+        static RavenTimeoutPersisterTests()
+        {
+            Configure.GetEndpointNameAction = () => string.Empty;
+        }
+
+        [TestCase]
+        public void Should_not_skip_timeouts()
+        {
+            var db = Guid.NewGuid().ToString();
+            documentStore = new DocumentStore
+            {
+                Url = "http://localhost:8080",
+                DefaultDatabase = db,
+            }.Initialize();
+            persister = new RavenTimeoutPersistence(documentStore)
+            {
+                TriggerCleanupEvery = TimeSpan.FromHours(1), // Make sure cleanup doesn't run automatically
+            };
+
+            var startSlice = DateTime.UtcNow.AddYears(-10);
+            // Prevent cleanup from running during the test by making it register as having just run
+            Assert.AreEqual(0, persister.GetCleanupChunk(startSlice).Count());
+
+            var expected = new List<Tuple<string, DateTime>>();
+            var lastExpectedTimeout = DateTime.UtcNow;
+            var finishedAdding = false;
+
+            new Thread(() =>
+            {
+                var sagaId = Guid.NewGuid();
+                for (var i = 0; i < 10000; i++)
+                {
+                    var td = new TimeoutData
+                    {
+                        SagaId = sagaId,
+                        Destination = new Address("queue", "machine"),
+                        Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(5, 20)),
+                        OwningTimeoutManager = string.Empty,
+                    };
+                    persister.Add(td);
+                    expected.Add(new Tuple<string, DateTime>(td.Id, td.Time));
+                    lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout;
+                }
+                finishedAdding = true;
+                Console.WriteLine("*** Finished adding ***");
+            }).Start();
+
+            // Mimic the behavior of the TimeoutPersister coordinator
+            var found = 0;
+            TimeoutData tempTd;
+            while (!finishedAdding || startSlice < lastExpectedTimeout)
+            {
+                DateTime nextRetrieval;
+                var timeoutDatas = persister.GetNextChunk(startSlice, out nextRetrieval);
+                foreach (var timeoutData in timeoutDatas)
+                {
+                    if (startSlice < timeoutData.Item2)
+                    {
+                        startSlice = timeoutData.Item2;
+                    }
+
+                    if (persister.TryRemove(timeoutData.Item1, out tempTd))
+                        found++;
+                }
+            }
+
+            WaitForIndexing(documentStore);
+
+            // If the persister reported stale results at any point during its normal operation,
+            // we need to perform a manual cleanup.
+            while (true)
+            {
+                var chunkToCleanup = persister.GetCleanupChunk(DateTime.UtcNow.AddDays(1)).ToArray();
+                Console.WriteLine("Cleanup: got a chunk of size " + chunkToCleanup.Length);
+                if (chunkToCleanup.Length == 0) break;
+
+                found += chunkToCleanup.Length;
+                foreach (var tuple in chunkToCleanup)
+                {
+                    Assert.IsTrue(persister.TryRemove(tuple.Item1, out tempTd));
+                }
+
+                WaitForIndexing(documentStore);
+            }
+
+            using (var session = documentStore.OpenSession())
+            {
+                var results = session.Query<TimeoutData>().ToList();
+                Assert.AreEqual(0, results.Count);
+            }
+
+            Assert.AreEqual(expected.Count, found);
+        }
+
+        [TestCase]
+        public void Should_not_skip_timeouts_also_with_multiple_clients_adding_timeouts()
+        {
+            var db = Guid.NewGuid().ToString();
+            documentStore = new DocumentStore
+            {
+                Url = "http://localhost:8080",
+                DefaultDatabase = db,
+            }.Initialize();
+            persister = new RavenTimeoutPersistence(documentStore)
+            {
+                TriggerCleanupEvery = TimeSpan.FromDays(1), // Make sure cleanup doesn't run automatically
+            };
+
+            var startSlice = DateTime.UtcNow.AddYears(-10);
+            // Prevent cleanup from running during the test by making it register as having just run
+            Assert.AreEqual(0, persister.GetCleanupChunk(startSlice).Count());
+
+            const int insertsPerThread = 10000;
+            var expected1 = new List<Tuple<string, DateTime>>();
+            var expected2 = new List<Tuple<string, DateTime>>();
+            var lastExpectedTimeout = DateTime.UtcNow;
+            var finishedAdding1 = false;
+            var finishedAdding2 = false;
+
+            new Thread(() =>
+            {
+                var sagaId = Guid.NewGuid();
+                for (var i = 0; i < insertsPerThread; i++)
+                {
+                    var td = new TimeoutData
+                    {
+                        SagaId = sagaId,
+                        Destination = new Address("queue", "machine"),
+                        Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(1, 20)),
+                        OwningTimeoutManager = string.Empty,
+                    };
+                    persister.Add(td);
+                    expected1.Add(new Tuple<string, DateTime>(td.Id, td.Time));
+                    lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout;
+                }
+                finishedAdding1 = true;
+                Console.WriteLine("*** Finished adding ***");
+            }).Start();
+
+            new Thread(() =>
+            {
+                using (var store = new DocumentStore
+                {
+                    Url = "http://localhost:8080",
+                    DefaultDatabase = db,
+                }.Initialize())
+                {
+                    var persister2 = new RavenTimeoutPersistence(store);
+
+                    var sagaId = Guid.NewGuid();
+                    for (var i = 0; i < insertsPerThread; i++)
+                    {
+                        var td = new TimeoutData
+                        {
+                            SagaId = sagaId,
+                            Destination = new Address("queue", "machine"),
+                            Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(1, 20)),
+                            OwningTimeoutManager = string.Empty,
+                        };
+                        persister2.Add(td);
+                        expected2.Add(new Tuple<string, DateTime>(td.Id, td.Time));
+                        lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout;
+                    }
+                }
+                finishedAdding2 = true;
+                Console.WriteLine("*** Finished adding via a second client connection ***");
+            }).Start();
+
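+            // Note: when indexes are stale a timeout can be returned more than once (in a
+            // regular chunk and again in a later cleanup chunk), so only timeouts that
+            // TryRemove actually removes are counted below.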
+            // Mimic the behavior of the TimeoutPersister coordinator
+            var found = 0;
+            TimeoutData tempTd;
+            while (!finishedAdding1 || !finishedAdding2 || startSlice < lastExpectedTimeout)
+            {
+                DateTime nextRetrieval;
+                var timeoutDatas = persister.GetNextChunk(startSlice, out nextRetrieval);
+                foreach (var timeoutData in timeoutDatas)
+                {
+                    if (startSlice < timeoutData.Item2)
+                    {
+                        startSlice = timeoutData.Item2;
+                    }
+
+                    if (persister.TryRemove(timeoutData.Item1, out tempTd))
+                        found++;
+                }
+            }
+
+            WaitForIndexing(documentStore);
+
+            // If the persister reported stale results at any point during its normal operation,
+            // we need to perform a manual cleanup.
+            while (true)
+            {
+                var chunkToCleanup = persister.GetCleanupChunk(DateTime.UtcNow.AddDays(1)).ToArray();
+                Console.WriteLine("Cleanup: got a chunk of size " + chunkToCleanup.Length);
+                if (chunkToCleanup.Length == 0) break;
+
+                found += chunkToCleanup.Length;
+                foreach (var tuple in chunkToCleanup)
+                {
+                    Assert.IsTrue(persister.TryRemove(tuple.Item1, out tempTd));
+                }
+
+                WaitForIndexing(documentStore);
+            }
+
+            using (var session = documentStore.OpenSession())
+            {
+                var results = session.Query<TimeoutData>().ToList();
+                Assert.AreEqual(0, results.Count);
+            }
+
+            Assert.AreEqual(expected1.Count + expected2.Count, found);
+        }
+
+        IDocumentStore documentStore;
+        RavenTimeoutPersistence persister;
+
+        [TearDown]
+        public void TearDown()
+        {
+            if (documentStore != null)
+                documentStore.Dispose();
+        }
+
+        static void WaitForIndexing(IDocumentStore store, string db = null, TimeSpan? timeout = null)
+        {
+            var databaseCommands = store.DatabaseCommands;
+            if (db != null)
+                databaseCommands = databaseCommands.ForDatabase(db);
+            var spinUntil = SpinWait.SpinUntil(() => databaseCommands.GetStatistics().StaleIndexes.Length == 0, timeout ?? TimeSpan.FromSeconds(20));
+            Assert.True(spinUntil);
+        }
+
+        static class RandomProvider
+        {
+            private static int seed = Environment.TickCount;
+
+            private static ThreadLocal<Random> randomWrapper = new ThreadLocal<Random>(() =>
+                new Random(Interlocked.Increment(ref seed))
+            );
+
+            public static Random GetThreadRandom()
+            {
+                return randomWrapper.Value;
+            }
+        }
+    }
+}
diff --git a/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs b/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs
index 49fe74ada12..5ccb3d4bd75 100644
--- a/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs
+++ b/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs
@@ -34,6 +34,18 @@ public void Cleanup()
         {
             store.Dispose();
         }
+
+        [Test]
+        public void Should_retrieve_all_timeout_messages_that_expired_even_if_it_needs_to_page()
+        {
+            expected = 1024 + 5;
+
+            Enumerable.Range(1, expected).ToList().ForEach(i => persister.Add(CreateData(DateTime.UtcNow.AddSeconds(-5))));
+
+            StartAndStopReceiver(5);
+
+            WaitForMessagesThenAssert(5);
+        }
     }
 
     [TestFixture]
@@ -50,7 +62,7 @@ public abstract class When_pooling_timeouts
         private IManageTimeouts manager;
         private FakeMessageSender messageSender;
         readonly Random rand = new Random();
-        private int expected;
+        protected int expected;
 
         protected IPersistTimeouts persister;
         protected TimeoutPersisterReceiver receiver;
@@ -182,14 +194,14 @@ private void Push(int total, DateTime time)
             Enumerable.Range(1, total).ToList().ForEach(i => manager.PushTimeout(CreateData(time)));
         }
 
-        private void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1)
+        protected void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1)
        {
             receiver.Start();
             Thread.Sleep(TimeSpan.FromSeconds(secondsToWaitBeforeCallingStop));
             receiver.Stop();
         }
 
-        private static TimeoutData CreateData(DateTime time)
+        protected static TimeoutData CreateData(DateTime time)
         {
             return new TimeoutData
             {
@@ -199,7 +211,7 @@ private static TimeoutData CreateData(DateTime time)
             };
         }
 
-        private void WaitForMessagesThenAssert(int maxSecondsToWait)
+        protected void WaitForMessagesThenAssert(int maxSecondsToWait)
         {
             var maxTime = DateTime.Now.AddSeconds(maxSecondsToWait);
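
A minimal usage sketch (not part of the patch; the store URL mirrors the tests above and the
values here are illustrative) showing how the two new cleanup knobs are meant to be tuned:

    var store = new DocumentStore { Url = "http://localhost:8080" }.Initialize();
    var persister = new RavenTimeoutPersistence(store)
    {
        // Constructor defaults are 2 minutes / 1 minute: a cleanup pass runs at most every
        // TriggerCleanupEvery, picking up timeouts at least CleanupGapFromTimeslice older
        // than the current time slice.
        TriggerCleanupEvery = TimeSpan.FromMinutes(5),
        CleanupGapFromTimeslice = TimeSpan.FromMinutes(2),
    };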