From 4d48da038ea44d42dc0de52ae70b4c260f52e5e0 Mon Sep 17 00:00:00 2001 From: Itamar Syn-Hershko Date: Wed, 16 Jul 2014 02:42:41 +0300 Subject: [PATCH 1/8] Don't lose timeouts Backwards port of the fix to #2133 --- .../Persistence/RavenTimeoutPersistence.cs | 129 +++++---- .../NServiceBus.Timeout.Tests.csproj | 1 + .../RavenTimeoutPersisterTests.cs | 263 ++++++++++++++++++ 3 files changed, 333 insertions(+), 60 deletions(-) create mode 100644 tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs diff --git a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs index 7e376b8a2fe..4e76adb5d80 100644 --- a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs +++ b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs @@ -14,6 +14,10 @@ public class RavenTimeoutPersistence : IPersistTimeouts { readonly IDocumentStore store; + public TimeSpan CleanupGapFromTimeslice { get; set; } + public TimeSpan TriggerCleanupEvery { get; set; } + DateTime lastCleanupTime = DateTime.MinValue; + public RavenTimeoutPersistence(IDocumentStore store) { this.store = store; @@ -42,7 +46,40 @@ public RavenTimeoutPersistence(IDocumentStore store) Map = docs => from doc in docs select new { doc.SagaId } }, true); - + + TriggerCleanupEvery = TimeSpan.FromMinutes(2); + CleanupGapFromTimeslice = TimeSpan.FromMinutes(1); + } + + private static IRavenQueryable GetChunkQuery(IDocumentSession session) + { + session.Advanced.AllowNonAuthoritativeInformation = true; + return session.Query("RavenTimeoutPersistence/TimeoutDataSortedByTime") + .OrderBy(t => t.Time) + .Where(t => + t.OwningTimeoutManager == String.Empty || + t.OwningTimeoutManager == Configure.EndpointName); + } + + public IEnumerable> GetCleanupChunk(DateTime startSlice) + { + using (var session = OpenSession()) + { + var chunk = 
GetChunkQuery(session) + .Where(t => t.Time <= startSlice.Subtract(CleanupGapFromTimeslice)) + .Select(t => new + { + t.Id, + t.Time + }) + .Take(1024) + .ToList() + .Select(arg => new Tuple(arg.Id, arg.Time)); + + lastCleanupTime = DateTime.UtcNow; + + return chunk; + } } public List> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery) @@ -50,70 +87,45 @@ public List> GetNextChunk(DateTime startSlice, out DateT try { var now = DateTime.UtcNow; - var skip = 0; var results = new List>(); - var numberOfRequestsExecutedSoFar = 0; - RavenQueryStatistics stats; - do + // Allow for occasionally cleaning up old timeouts for edge cases where timeouts have been + // added after startSlice have been set to a later timout and we might have missed them + // because of stale indexes. + if (lastCleanupTime.Add(TriggerCleanupEvery) > now || lastCleanupTime == DateTime.MinValue) { - using (var session = OpenSession()) - { - session.Advanced.AllowNonAuthoritativeInformation = true; - - var query = session.Query("RavenTimeoutPersistence/TimeoutDataSortedByTime") - .Where( - t => - t.OwningTimeoutManager == String.Empty || - t.OwningTimeoutManager == Configure.EndpointName) - .Where( - t => - t.Time > startSlice && - t.Time <= now) - .OrderBy(t => t.Time) - .Select(t => new {t.Id, t.Time}) - .Statistics(out stats); - do - { - results.AddRange(query - .Skip(skip) - .Take(1024) - .ToList() - .Select(arg => new Tuple(arg.Id, arg.Time))); - - skip += 1024; - } while (skip < stats.TotalResults && - ++numberOfRequestsExecutedSoFar < session.Advanced.MaxNumberOfRequestsPerSession); - } - } while (skip < stats.TotalResults); + results.AddRange(GetCleanupChunk(startSlice)); + } + RavenQueryStatistics stats; using (var session = OpenSession()) { - session.Advanced.AllowNonAuthoritativeInformation = true; - - //Retrieve next time we need to run query - var startOfNextChunk = - session.Query("RavenTimeoutPersistence/TimeoutDataSortedByTime") - .Where( - t => - 
t.OwningTimeoutManager == String.Empty || - t.OwningTimeoutManager == Configure.EndpointName) - .Where(t => t.Time > now) - .OrderBy(t => t.Time) - .Select(t => new {t.Id, t.Time}) - .FirstOrDefault(); - - if (startOfNextChunk != null) - { - nextTimeToRunQuery = startOfNextChunk.Time; - } - else - { - nextTimeToRunQuery = DateTime.UtcNow.AddMinutes(10); - } + var query = GetChunkQuery(session) + .Where( + t => + t.Time > startSlice && + t.Time <= now) + .Select(t => new {t.Id, t.Time}) + .Statistics(out stats) + .Take(1024); + + results.AddRange(query + .ToList() + .Select(arg => new Tuple(arg.Id, arg.Time))); + } - return results; + // Set next execution to be now if we haven't consumed the entire thing or received stale results. + // Delay the next execution a bit if we results weren't stale and we got the full chunk. + if (stats.TotalResults > 1024 || stats.IsStale) + { + nextTimeToRunQuery = now; + } + else + { + nextTimeToRunQuery = DateTime.UtcNow.AddMinutes(10); } + + return results; } catch (Exception) { @@ -151,9 +163,6 @@ public bool TryRemove(string timeoutId, out TimeoutData timeoutData) if (timeoutData == null) return false; - timeoutData.Time = DateTime.UtcNow.AddYears(-1); - session.SaveChanges(); - session.Delete(timeoutData); session.SaveChanges(); diff --git a/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj b/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj index dfa6ce3a6d9..29496992794 100644 --- a/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj +++ b/tests/timeout/NServiceBus.Timeout.Tests/NServiceBus.Timeout.Tests.csproj @@ -108,6 +108,7 @@ + diff --git a/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs b/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs new file mode 100644 index 00000000000..8a469f789e0 --- /dev/null +++ b/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs @@ -0,0 +1,263 @@ +using System; 
+using System.Collections.Generic; +using System.Linq; +using System.Threading; +using NServiceBus.Timeout.Core; +using NServiceBus.Timeout.Hosting.Windows.Persistence; +using NUnit.Framework; +using Raven.Client; +using Raven.Client.Document; + +namespace NServiceBus.Timeout.Tests +{ + public class RavenTimeoutPersisterTests + { + static RavenTimeoutPersisterTests() + { + Configure.GetEndpointNameAction = () => string.Empty; + } + + [TestCase] + public void Should_not_skip_timeouts() + { + var db = Guid.NewGuid().ToString(); + documentStore = new DocumentStore + { + Url = "http://localhost:8080", + DefaultDatabase = db, + }.Initialize(); + persister = new RavenTimeoutPersistence(documentStore) + { + TriggerCleanupEvery = TimeSpan.FromHours(1), // Make sure cleanup doesn't run automatically + }; + + var startSlice = DateTime.UtcNow.AddYears(-10); + // avoid cleanup from running during the test by making it register as being run + Assert.AreEqual(0, persister.GetCleanupChunk(startSlice).Count()); + + var expected = new List>(); + var lastExpectedTimeout = DateTime.UtcNow; + var finishedAdding = false; + + new Thread(() => + { + var sagaId = Guid.NewGuid(); + for (var i = 0; i < 10000; i++) + { + var td = new TimeoutData + { + SagaId = sagaId, + Destination = new Address("queue", "machine"), + Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(5, 20)), + OwningTimeoutManager = string.Empty, + }; + persister.Add(td); + expected.Add(new Tuple(td.Id, td.Time)); + lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? 
td.Time : lastExpectedTimeout; + } + finishedAdding = true; + Console.WriteLine("*** Finished adding ***"); + }).Start(); + + // Mimic the behavior of the TimeoutPersister coordinator + var found = 0; + TimeoutData tempTd; + while (!finishedAdding || startSlice < lastExpectedTimeout) + { + DateTime nextRetrieval; + var timeoutDatas = persister.GetNextChunk(startSlice, out nextRetrieval); + foreach (var timeoutData in timeoutDatas) + { + if (startSlice < timeoutData.Item2) + { + startSlice = timeoutData.Item2; + } + + if (persister.TryRemove(timeoutData.Item1, out tempTd)) + found++; + } + } + + WaitForIndexing(documentStore); + + // If the persister reports stale results have been seen at one point during its normal operation, + // we need to perform manual cleaup. + while (true) + { + var chunkToCleanup = persister.GetCleanupChunk(DateTime.UtcNow.AddDays(1)).ToArray(); + Console.WriteLine("Cleanup: got a chunk of size " + chunkToCleanup.Length); + if (chunkToCleanup.Length == 0) break; + + found += chunkToCleanup.Length; + foreach (var tuple in chunkToCleanup) + { + Assert.IsTrue(persister.TryRemove(tuple.Item1, out tempTd)); + } + + WaitForIndexing(documentStore); + } + + using (var session = documentStore.OpenSession()) + { + var results = session.Query().ToList(); + Assert.AreEqual(0, results.Count); + } + + Assert.AreEqual(expected.Count, found); + } + + [TestCase] + public void Should_not_skip_timeouts_also_with_multiple_clients_adding_timeouts() + { + var db = Guid.NewGuid().ToString(); + documentStore = new DocumentStore + { + Url = "http://localhost:8080", + DefaultDatabase = db, + }.Initialize(); + persister = new RavenTimeoutPersistence(documentStore) + { + TriggerCleanupEvery = TimeSpan.FromDays(1), // Make sure cleanup doesn't run automatically + }; + + var startSlice = DateTime.UtcNow.AddYears(-10); + // avoid cleanup from running during the test by making it register as being run + Assert.AreEqual(0, persister.GetCleanupChunk(startSlice).Count()); + 
+ const int insertsPerThread = 10000; + var expected1 = new List>(); + var expected2 = new List>(); + var lastExpectedTimeout = DateTime.UtcNow; + var finishedAdding1 = false; + var finishedAdding2 = false; + + new Thread(() => + { + var sagaId = Guid.NewGuid(); + for (var i = 0; i < insertsPerThread; i++) + { + var td = new TimeoutData + { + SagaId = sagaId, + Destination = new Address("queue", "machine"), + Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(1, 20)), + OwningTimeoutManager = string.Empty, + }; + persister.Add(td); + expected1.Add(new Tuple(td.Id, td.Time)); + lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? td.Time : lastExpectedTimeout; + } + finishedAdding1 = true; + Console.WriteLine("*** Finished adding ***"); + }).Start(); + + new Thread(() => + { + using (var store = new DocumentStore + { + Url = "http://localhost:8080", + DefaultDatabase = db, + }.Initialize()) + { + var persister2 = new RavenTimeoutPersistence(store); + + var sagaId = Guid.NewGuid(); + for (var i = 0; i < insertsPerThread; i++) + { + var td = new TimeoutData + { + SagaId = sagaId, + Destination = new Address("queue", "machine"), + Time = DateTime.UtcNow.AddSeconds(RandomProvider.GetThreadRandom().Next(1, 20)), + OwningTimeoutManager = string.Empty, + }; + persister2.Add(td); + expected2.Add(new Tuple(td.Id, td.Time)); + lastExpectedTimeout = (td.Time > lastExpectedTimeout) ? 
td.Time : lastExpectedTimeout; + } + } + finishedAdding2 = true; + Console.WriteLine("*** Finished adding via a second client connection ***"); + }).Start(); + + // Mimic the behavior of the TimeoutPersister coordinator + var found = 0; + TimeoutData tempTd; + while (!finishedAdding1 || !finishedAdding2 || startSlice < lastExpectedTimeout) + { + DateTime nextRetrieval; + var timeoutDatas = persister.GetNextChunk(startSlice, out nextRetrieval); + foreach (var timeoutData in timeoutDatas) + { + if (startSlice < timeoutData.Item2) + { + startSlice = timeoutData.Item2; + } + + if(persister.TryRemove(timeoutData.Item1, out tempTd)) + found++; + } + } + + WaitForIndexing(documentStore); + + // If the persister reports stale results have been seen at one point during its normal operation, + // we need to perform manual cleaup. + while (true) + { + var chunkToCleanup = persister.GetCleanupChunk(DateTime.UtcNow.AddDays(1)).ToArray(); + Console.WriteLine("Cleanup: got a chunk of size " + chunkToCleanup.Length); + if (chunkToCleanup.Length == 0) break; + + found += chunkToCleanup.Length; + foreach (var tuple in chunkToCleanup) + { + Assert.IsTrue(persister.TryRemove(tuple.Item1, out tempTd)); + } + + WaitForIndexing(documentStore); + } + + using (var session = documentStore.OpenSession()) + { + var results = session.Query().ToList(); + Assert.AreEqual(0, results.Count); + } + + Assert.AreEqual(expected1.Count + expected2.Count, found); + } + + IDocumentStore documentStore; + RavenTimeoutPersistence persister; + + [TearDown] + public void TearDown() + { + if (documentStore != null) + documentStore.Dispose(); + } + + static void WaitForIndexing(IDocumentStore store, string db = null, TimeSpan? timeout = null) + { + var databaseCommands = store.DatabaseCommands; + if (db != null) + databaseCommands = databaseCommands.ForDatabase(db); + var spinUntil = SpinWait.SpinUntil(() => databaseCommands.GetStatistics().StaleIndexes.Length == 0, timeout ?? 
TimeSpan.FromSeconds(20)); + Assert.True(spinUntil); + } + + static class RandomProvider + { + private static int seed = Environment.TickCount; + + private static ThreadLocal randomWrapper = new ThreadLocal(() => + new Random(Interlocked.Increment(ref seed)) + ); + + public static Random GetThreadRandom() + { + return randomWrapper.Value; + } + } + } +} From 8a8afdd523017f044788b234c89186cf83f1b8bb Mon Sep 17 00:00:00 2001 From: John Simons Date: Wed, 16 Jul 2014 12:33:53 +1000 Subject: [PATCH 2/8] Touching file to trigger build --- .../Persistence/RavenTimeoutPersistence.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs index 4e76adb5d80..51ade9b8c1b 100644 --- a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs +++ b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs @@ -13,10 +13,10 @@ namespace NServiceBus.Timeout.Hosting.Windows.Persistence public class RavenTimeoutPersistence : IPersistTimeouts { readonly IDocumentStore store; - + DateTime lastCleanupTime = DateTime.MinValue; + public TimeSpan CleanupGapFromTimeslice { get; set; } public TimeSpan TriggerCleanupEvery { get; set; } - DateTime lastCleanupTime = DateTime.MinValue; public RavenTimeoutPersistence(IDocumentStore store) { @@ -194,4 +194,4 @@ IDocumentSession OpenSession() static readonly ILog Logger = LogManager.GetLogger("RavenTimeoutPersistence"); } -} \ No newline at end of file +} From 8f9bb9efad7ce16ee8047407b6628181938d1b4b Mon Sep 17 00:00:00 2001 From: Itamar Syn-Hershko Date: Thu, 17 Jul 2014 01:30:46 +0300 Subject: [PATCH 3/8] Fix test --- .../Persistence/RavenTimeoutPersistence.cs | 4 ++-- .../RavenTimeoutPersisterTests.cs | 4 +++- .../When_fetching_timeouts_from_storage.cs | 14 ++++++++++++-- 
.../When_pooling_timeouts.cs | 2 +- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs index 4e76adb5d80..2e426aa95ba 100644 --- a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs +++ b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs @@ -12,7 +12,7 @@ namespace NServiceBus.Timeout.Hosting.Windows.Persistence public class RavenTimeoutPersistence : IPersistTimeouts { - readonly IDocumentStore store; + public readonly IDocumentStore store; public TimeSpan CleanupGapFromTimeslice { get; set; } public TimeSpan TriggerCleanupEvery { get; set; } @@ -92,7 +92,7 @@ public List> GetNextChunk(DateTime startSlice, out DateT // Allow for occasionally cleaning up old timeouts for edge cases where timeouts have been // added after startSlice have been set to a later timout and we might have missed them // because of stale indexes. 
- if (lastCleanupTime.Add(TriggerCleanupEvery) > now || lastCleanupTime == DateTime.MinValue) + if (TriggerCleanupEvery == TimeSpan.MinValue || lastCleanupTime == DateTime.MinValue || lastCleanupTime.Add(TriggerCleanupEvery) > now) { results.AddRange(GetCleanupChunk(startSlice)); } diff --git a/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs b/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs index 8a469f789e0..4b4a0e066a8 100644 --- a/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs +++ b/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs @@ -7,6 +7,8 @@ using NUnit.Framework; using Raven.Client; using Raven.Client.Document; +using Raven.Client.Embedded; +using Raven.Database; namespace NServiceBus.Timeout.Tests { @@ -237,7 +239,7 @@ public void TearDown() documentStore.Dispose(); } - static void WaitForIndexing(IDocumentStore store, string db = null, TimeSpan? timeout = null) + public static void WaitForIndexing(IDocumentStore store, string db = null, TimeSpan? 
timeout = null) { var databaseCommands = store.DatabaseCommands; if (db != null) diff --git a/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs b/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs index 0591937e63a..681d06923b4 100644 --- a/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs +++ b/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs @@ -12,7 +12,7 @@ namespace NServiceBus.Timeout.Tests [TestFixture] public class When_fetching_timeouts_from_storage_with_raven : When_fetching_timeouts_from_storage { - private IDocumentStore store; + protected IDocumentStore store; protected override IPersistTimeouts CreateTimeoutPersister() { @@ -138,11 +138,21 @@ public void Should_set_the_next_run() }); DateTime nextTimeToRunQuery; + var ravenPersiter = persister as RavenTimeoutPersistence; + if (ravenPersiter != null) + { + RavenTimeoutPersisterTests.WaitForIndexing(ravenPersiter.store); + persister.GetNextChunk(DateTime.UtcNow.AddYears(-3), out nextTimeToRunQuery); + + Assert.LessOrEqual(nextTimeToRunQuery.Ticks, DateTime.UtcNow.AddMinutes(10).Ticks); + } + persister.GetNextChunk(DateTime.UtcNow.AddYears(-3), out nextTimeToRunQuery); var totalMilliseconds = (expected - nextTimeToRunQuery).Duration().TotalMilliseconds; - Assert.True(totalMilliseconds < 200); + if (ravenPersiter == null) + Assert.Less(totalMilliseconds, 200); } protected List> GetNextChunk() diff --git a/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs b/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs index 49fe74ada12..f04fe892b93 100644 --- a/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs +++ b/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs @@ -26,7 +26,7 @@ protected override IPersistTimeouts CreateTimeoutPersister() store.Conventions.MaxNumberOfRequestsPerSession = 10; store.Initialize(); - return new 
RavenTimeoutPersistence(store); + return new RavenTimeoutPersistence(store){CleanupGapFromTimeslice = TimeSpan.FromSeconds(1), TriggerCleanupEvery = TimeSpan.MinValue}; } [TearDown] From 521ef2db317844932a5d7fc4243fd80f4bdeadfa Mon Sep 17 00:00:00 2001 From: Itamar Syn-Hershko Date: Thu, 17 Jul 2014 02:34:24 +0300 Subject: [PATCH 4/8] Minimizing the wait time for nextTimeToQuery --- .../Persistence/RavenTimeoutPersistence.cs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs index 2e426aa95ba..a78f282a694 100644 --- a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs +++ b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs @@ -97,6 +97,7 @@ public List> GetNextChunk(DateTime startSlice, out DateT results.AddRange(GetCleanupChunk(startSlice)); } + DateTime beginningOfNextChunk; RavenQueryStatistics stats; using (var session = OpenSession()) { @@ -105,25 +106,31 @@ public List> GetNextChunk(DateTime startSlice, out DateT t => t.Time > startSlice && t.Time <= now) - .Select(t => new {t.Id, t.Time}) + .Select(t => new { t.Id, t.Time }) .Statistics(out stats) .Take(1024); results.AddRange(query .ToList() .Select(arg => new Tuple(arg.Id, arg.Time))); + + beginningOfNextChunk = GetChunkQuery(session) + .Where(t => t.Time > now) + .Take(1) + .Select(t => t.Time) + .FirstOrDefault(); } + nextTimeToRunQuery = (beginningOfNextChunk == default(DateTime)) + ? nextTimeToRunQuery = DateTime.UtcNow.AddMinutes(10) + : beginningOfNextChunk.ToUniversalTime(); + // Set next execution to be now if we haven't consumed the entire thing or received stale results. // Delay the next execution a bit if we results weren't stale and we got the full chunk. 
if (stats.TotalResults > 1024 || stats.IsStale) { nextTimeToRunQuery = now; } - else - { - nextTimeToRunQuery = DateTime.UtcNow.AddMinutes(10); - } return results; } From c34b0d2fdf420af8609d5c636bc01ebe5a3b5321 Mon Sep 17 00:00:00 2001 From: Itamar Syn-Hershko Date: Thu, 17 Jul 2014 02:34:57 +0300 Subject: [PATCH 5/8] Revert "Fix test" This reverts commit 8f9bb9efad7ce16ee8047407b6628181938d1b4b. --- .../Persistence/RavenTimeoutPersistence.cs | 4 ++-- .../RavenTimeoutPersisterTests.cs | 4 +--- .../When_fetching_timeouts_from_storage.cs | 14 ++------------ .../When_pooling_timeouts.cs | 2 +- 4 files changed, 6 insertions(+), 18 deletions(-) diff --git a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs index a78f282a694..44eee8d11ae 100644 --- a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs +++ b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs @@ -12,7 +12,7 @@ namespace NServiceBus.Timeout.Hosting.Windows.Persistence public class RavenTimeoutPersistence : IPersistTimeouts { - public readonly IDocumentStore store; + readonly IDocumentStore store; public TimeSpan CleanupGapFromTimeslice { get; set; } public TimeSpan TriggerCleanupEvery { get; set; } @@ -92,7 +92,7 @@ public List> GetNextChunk(DateTime startSlice, out DateT // Allow for occasionally cleaning up old timeouts for edge cases where timeouts have been // added after startSlice have been set to a later timout and we might have missed them // because of stale indexes. 
- if (TriggerCleanupEvery == TimeSpan.MinValue || lastCleanupTime == DateTime.MinValue || lastCleanupTime.Add(TriggerCleanupEvery) > now) + if (lastCleanupTime.Add(TriggerCleanupEvery) > now || lastCleanupTime == DateTime.MinValue) { results.AddRange(GetCleanupChunk(startSlice)); } diff --git a/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs b/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs index 4b4a0e066a8..8a469f789e0 100644 --- a/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs +++ b/tests/timeout/NServiceBus.Timeout.Tests/RavenTimeoutPersisterTests.cs @@ -7,8 +7,6 @@ using NUnit.Framework; using Raven.Client; using Raven.Client.Document; -using Raven.Client.Embedded; -using Raven.Database; namespace NServiceBus.Timeout.Tests { @@ -239,7 +237,7 @@ public void TearDown() documentStore.Dispose(); } - public static void WaitForIndexing(IDocumentStore store, string db = null, TimeSpan? timeout = null) + static void WaitForIndexing(IDocumentStore store, string db = null, TimeSpan? 
timeout = null) { var databaseCommands = store.DatabaseCommands; if (db != null) diff --git a/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs b/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs index 681d06923b4..0591937e63a 100644 --- a/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs +++ b/tests/timeout/NServiceBus.Timeout.Tests/When_fetching_timeouts_from_storage.cs @@ -12,7 +12,7 @@ namespace NServiceBus.Timeout.Tests [TestFixture] public class When_fetching_timeouts_from_storage_with_raven : When_fetching_timeouts_from_storage { - protected IDocumentStore store; + private IDocumentStore store; protected override IPersistTimeouts CreateTimeoutPersister() { @@ -138,21 +138,11 @@ public void Should_set_the_next_run() }); DateTime nextTimeToRunQuery; - var ravenPersiter = persister as RavenTimeoutPersistence; - if (ravenPersiter != null) - { - RavenTimeoutPersisterTests.WaitForIndexing(ravenPersiter.store); - persister.GetNextChunk(DateTime.UtcNow.AddYears(-3), out nextTimeToRunQuery); - - Assert.LessOrEqual(nextTimeToRunQuery.Ticks, DateTime.UtcNow.AddMinutes(10).Ticks); - } - persister.GetNextChunk(DateTime.UtcNow.AddYears(-3), out nextTimeToRunQuery); var totalMilliseconds = (expected - nextTimeToRunQuery).Duration().TotalMilliseconds; - if (ravenPersiter == null) - Assert.Less(totalMilliseconds, 200); + Assert.True(totalMilliseconds < 200); } protected List> GetNextChunk() diff --git a/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs b/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs index f04fe892b93..49fe74ada12 100644 --- a/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs +++ b/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs @@ -26,7 +26,7 @@ protected override IPersistTimeouts CreateTimeoutPersister() store.Conventions.MaxNumberOfRequestsPerSession = 10; store.Initialize(); - return new 
RavenTimeoutPersistence(store){CleanupGapFromTimeslice = TimeSpan.FromSeconds(1), TriggerCleanupEvery = TimeSpan.MinValue}; + return new RavenTimeoutPersistence(store); } [TearDown] From c88e11c53981b57cc16f5f92cb69425406ffa829 Mon Sep 17 00:00:00 2001 From: Itamar Syn-Hershko Date: Thu, 17 Jul 2014 02:51:58 +0300 Subject: [PATCH 6/8] Optimization --- .../Persistence/RavenTimeoutPersistence.cs | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs index 44eee8d11ae..762a286e036 100644 --- a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs +++ b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs @@ -97,7 +97,6 @@ public List> GetNextChunk(DateTime startSlice, out DateT results.AddRange(GetCleanupChunk(startSlice)); } - DateTime beginningOfNextChunk; RavenQueryStatistics stats; using (var session = OpenSession()) { @@ -113,24 +112,29 @@ public List> GetNextChunk(DateTime startSlice, out DateT results.AddRange(query .ToList() .Select(arg => new Tuple(arg.Id, arg.Time))); - - beginningOfNextChunk = GetChunkQuery(session) - .Where(t => t.Time > now) - .Take(1) - .Select(t => t.Time) - .FirstOrDefault(); } - nextTimeToRunQuery = (beginningOfNextChunk == default(DateTime)) - ? nextTimeToRunQuery = DateTime.UtcNow.AddMinutes(10) - : beginningOfNextChunk.ToUniversalTime(); - // Set next execution to be now if we haven't consumed the entire thing or received stale results. // Delay the next execution a bit if we results weren't stale and we got the full chunk. 
if (stats.TotalResults > 1024 || stats.IsStale) { nextTimeToRunQuery = now; } + else + { + using (var session = OpenSession()) + { + var beginningOfNextChunk = GetChunkQuery(session) + .Where(t => t.Time > now) + .Take(1) + .Select(t => t.Time) + .FirstOrDefault(); + + nextTimeToRunQuery = (beginningOfNextChunk == default(DateTime)) + ? nextTimeToRunQuery = DateTime.UtcNow.AddMinutes(10) + : beginningOfNextChunk.ToUniversalTime(); + } + } return results; } From 809fcbb0069925094218dc006bdcdf612e88497f Mon Sep 17 00:00:00 2001 From: John Simons Date: Thu, 17 Jul 2014 10:52:15 +1000 Subject: [PATCH 7/8] Added back the loop that keeps querying for more results. Otherwise we could skip results that are supposed to trigger at the same time because the startSlice uses greater than logic. --- .../Persistence/RavenTimeoutPersistence.cs | 60 ++++++++++++------- .../When_pooling_timeouts.cs | 20 +++++-- 2 files changed, 55 insertions(+), 25 deletions(-) diff --git a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs index 762a286e036..6bfbb9e5089 100644 --- a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs +++ b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs @@ -97,26 +97,44 @@ public List> GetNextChunk(DateTime startSlice, out DateT results.AddRange(GetCleanupChunk(startSlice)); } + var skip = 0; + var numberOfRequestsExecutedSoFar = 0; RavenQueryStatistics stats; - using (var session = OpenSession()) + do { - var query = GetChunkQuery(session) - .Where( - t => - t.Time > startSlice && - t.Time <= now) - .Select(t => new { t.Id, t.Time }) - .Statistics(out stats) - .Take(1024); - - results.AddRange(query - .ToList() - .Select(arg => new Tuple(arg.Id, arg.Time))); - } + using (var session = OpenSession()) + { + session.Advanced.AllowNonAuthoritativeInformation = true; + + var 
query = session.Query("RavenTimeoutPersistence/TimeoutDataSortedByTime") + .Where( + t => + t.OwningTimeoutManager == String.Empty || + t.OwningTimeoutManager == Configure.EndpointName) + .Where( + t => + t.Time > startSlice && + t.Time <= now) + .OrderBy(t => t.Time) + .Select(t => new { t.Id, t.Time }) + .Statistics(out stats); + do + { + results.AddRange(query + .Skip(skip) + .Take(1024) + .ToList() + .Select(arg => new Tuple(arg.Id, arg.Time))); + + skip += 1024; + } while (skip < stats.TotalResults && + ++numberOfRequestsExecutedSoFar < session.Advanced.MaxNumberOfRequestsPerSession); + } + } while (skip < stats.TotalResults); - // Set next execution to be now if we haven't consumed the entire thing or received stale results. + // Set next execution to be now if we received stale results. // Delay the next execution a bit if we results weren't stale and we got the full chunk. - if (stats.TotalResults > 1024 || stats.IsStale) + if (stats.IsStale) { nextTimeToRunQuery = now; } @@ -125,13 +143,13 @@ public List> GetNextChunk(DateTime startSlice, out DateT using (var session = OpenSession()) { var beginningOfNextChunk = GetChunkQuery(session) - .Where(t => t.Time > now) - .Take(1) - .Select(t => t.Time) - .FirstOrDefault(); + .Where(t => t.Time > now) + .Take(1) + .Select(t => t.Time) + .FirstOrDefault(); nextTimeToRunQuery = (beginningOfNextChunk == default(DateTime)) - ? nextTimeToRunQuery = DateTime.UtcNow.AddMinutes(10) + ? 
DateTime.UtcNow.AddMinutes(10) : beginningOfNextChunk.ToUniversalTime(); } } diff --git a/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs b/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs index 49fe74ada12..5ccb3d4bd75 100644 --- a/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs +++ b/tests/timeout/NServiceBus.Timeout.Tests/When_pooling_timeouts.cs @@ -34,6 +34,18 @@ public void Cleanup() { store.Dispose(); } + + [Test] + public void Should_retrieve_all_timeout_messages_that_expired_even_if_it_needs_to_page() + { + expected = 1024 + 5; + + Enumerable.Range(1, expected).ToList().ForEach(i => persister.Add(CreateData(DateTime.UtcNow.AddSeconds(-5)))); + + StartAndStopReceiver(5); + + WaitForMessagesThenAssert(5); + } } [TestFixture] @@ -50,7 +62,7 @@ public abstract class When_pooling_timeouts private IManageTimeouts manager; private FakeMessageSender messageSender; readonly Random rand = new Random(); - private int expected; + protected int expected; protected IPersistTimeouts persister; protected TimeoutPersisterReceiver receiver; @@ -182,14 +194,14 @@ private void Push(int total, DateTime time) Enumerable.Range(1, total).ToList().ForEach(i => manager.PushTimeout(CreateData(time))); } - private void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1) + protected void StartAndStopReceiver(int secondsToWaitBeforeCallingStop = 1) { receiver.Start(); Thread.Sleep(TimeSpan.FromSeconds(secondsToWaitBeforeCallingStop)); receiver.Stop(); } - private static TimeoutData CreateData(DateTime time) + protected static TimeoutData CreateData(DateTime time) { return new TimeoutData { @@ -199,7 +211,7 @@ private static TimeoutData CreateData(DateTime time) }; } - private void WaitForMessagesThenAssert(int maxSecondsToWait) + protected void WaitForMessagesThenAssert(int maxSecondsToWait) { var maxTime = DateTime.Now.AddSeconds(maxSecondsToWait); From 6349782784c85e71ebe90b20657e9e7416577fd0 Mon Sep 17 00:00:00 2001 From: 
John Simons Date: Thu, 17 Jul 2014 11:02:11 +1000 Subject: [PATCH 8/8] Making use of GetChunkQuery --- .../Persistence/RavenTimeoutPersistence.cs | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs index 6bfbb9e5089..6c134df1249 100644 --- a/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs +++ b/src/timeout/NServiceBus.Timeout.Hosting.Windows/Persistence/RavenTimeoutPersistence.cs @@ -106,17 +106,12 @@ public List> GetNextChunk(DateTime startSlice, out DateT { session.Advanced.AllowNonAuthoritativeInformation = true; - var query = session.Query("RavenTimeoutPersistence/TimeoutDataSortedByTime") - .Where( - t => - t.OwningTimeoutManager == String.Empty || - t.OwningTimeoutManager == Configure.EndpointName) - .Where( - t => - t.Time > startSlice && - t.Time <= now) - .OrderBy(t => t.Time) - .Select(t => new { t.Id, t.Time }) + var query = GetChunkQuery(session) + .Where( + t => + t.Time > startSlice && + t.Time <= now) + .Select(t => new { t.Id, t.Time }) .Statistics(out stats); do {