Skip to content

Commit

Permalink
Merge branch 'hotfix-3.3.9' into support-3.3
Browse files Browse the repository at this point in the history
  • Loading branch information
John Simons committed Jul 17, 2014
2 parents 0ea5c06 + 6349782 commit 29cc04d
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ public class RavenTimeoutPersistence : IPersistTimeouts
{
readonly IDocumentStore store;

public TimeSpan CleanupGapFromTimeslice { get; set; }
public TimeSpan TriggerCleanupEvery { get; set; }
DateTime lastCleanupTime = DateTime.MinValue;

public RavenTimeoutPersistence(IDocumentStore store)
{
this.store = store;
Expand Down Expand Up @@ -42,36 +46,72 @@ public RavenTimeoutPersistence(IDocumentStore store)
Map = docs => from doc in docs
select new { doc.SagaId }
}, true);


TriggerCleanupEvery = TimeSpan.FromMinutes(2);
CleanupGapFromTimeslice = TimeSpan.FromMinutes(1);
}

private static IRavenQueryable<TimeoutData> GetChunkQuery(IDocumentSession session)
{
session.Advanced.AllowNonAuthoritativeInformation = true;
return session.Query<TimeoutData>("RavenTimeoutPersistence/TimeoutDataSortedByTime")
.OrderBy(t => t.Time)
.Where(t =>
t.OwningTimeoutManager == String.Empty ||
t.OwningTimeoutManager == Configure.EndpointName);
}

public IEnumerable<Tuple<string, DateTime>> GetCleanupChunk(DateTime startSlice)
{
using (var session = OpenSession())
{
var chunk = GetChunkQuery(session)
.Where(t => t.Time <= startSlice.Subtract(CleanupGapFromTimeslice))
.Select(t => new
{
t.Id,
t.Time
})
.Take(1024)
.ToList()
.Select(arg => new Tuple<string, DateTime>(arg.Id, arg.Time));

lastCleanupTime = DateTime.UtcNow;

return chunk;
}
}

public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateTime nextTimeToRunQuery)
{
try
{
var now = DateTime.UtcNow;
var skip = 0;
var results = new List<Tuple<string, DateTime>>();

// Allow for occasionally cleaning up old timeouts for edge cases where timeouts have been
// added after startSlice have been set to a later timout and we might have missed them
// because of stale indexes.
if (lastCleanupTime.Add(TriggerCleanupEvery) > now || lastCleanupTime == DateTime.MinValue)
{
results.AddRange(GetCleanupChunk(startSlice));
}

var skip = 0;
var numberOfRequestsExecutedSoFar = 0;
RavenQueryStatistics stats;

do
{
using (var session = OpenSession())
{
session.Advanced.AllowNonAuthoritativeInformation = true;

var query = session.Query<TimeoutData>("RavenTimeoutPersistence/TimeoutDataSortedByTime")
.Where(
t =>
t.OwningTimeoutManager == String.Empty ||
t.OwningTimeoutManager == Configure.EndpointName)
.Where(
t =>
t.Time > startSlice &&
t.Time <= now)
.OrderBy(t => t.Time)
.Select(t => new {t.Id, t.Time})
var query = GetChunkQuery(session)
.Where(
t =>
t.Time > startSlice &&
t.Time <= now)
.Select(t => new { t.Id, t.Time })
.Statistics(out stats);
do
{
Expand All @@ -87,33 +127,29 @@ public List<Tuple<string, DateTime>> GetNextChunk(DateTime startSlice, out DateT
}
} while (skip < stats.TotalResults);

using (var session = OpenSession())
// Set next execution to be now if we received stale results.
// Delay the next execution a bit if we results weren't stale and we got the full chunk.
if (stats.IsStale)
{
nextTimeToRunQuery = now;
}
else
{
session.Advanced.AllowNonAuthoritativeInformation = true;

//Retrieve next time we need to run query
var startOfNextChunk =
session.Query<TimeoutData>("RavenTimeoutPersistence/TimeoutDataSortedByTime")
.Where(
t =>
t.OwningTimeoutManager == String.Empty ||
t.OwningTimeoutManager == Configure.EndpointName)
using (var session = OpenSession())
{
var beginningOfNextChunk = GetChunkQuery(session)
.Where(t => t.Time > now)
.OrderBy(t => t.Time)
.Select(t => new {t.Id, t.Time})
.Take(1)
.Select(t => t.Time)
.FirstOrDefault();

if (startOfNextChunk != null)
{
nextTimeToRunQuery = startOfNextChunk.Time;
}
else
{
nextTimeToRunQuery = DateTime.UtcNow.AddMinutes(10);
nextTimeToRunQuery = (beginningOfNextChunk == default(DateTime))
? DateTime.UtcNow.AddMinutes(10)
: beginningOfNextChunk.ToUniversalTime();
}

return results;
}

return results;
}
catch (Exception)
{
Expand Down Expand Up @@ -151,9 +187,6 @@ public bool TryRemove(string timeoutId, out TimeoutData timeoutData)
if (timeoutData == null)
return false;

timeoutData.Time = DateTime.UtcNow.AddYears(-1);
session.SaveChanges();

session.Delete(timeoutData);
session.SaveChanges();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
<ItemGroup>
<Compile Include="RavenTimeoutPersisterTests.cs" />
<Compile Include="When_receiving_timeouts.cs" />
<Compile Include="When_pooling_timeouts.cs" />
<Compile Include="FakeMessageSender.cs" />
Expand Down
Loading

0 comments on commit 29cc04d

Please sign in to comment.