Skip to content

Commit

Permalink
fixedWindow
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillKurdyukov committed Aug 15, 2024
1 parent c0ca09e commit 5354ec1
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 39 deletions.
1 change: 1 addition & 0 deletions slo/src/Internal/Internal.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
<PackageReference Include="Polly" Version="8.4.1" />
<PackageReference Include="prometheus-net" Version="8.0.1"/>
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1"/>
<PackageReference Include="System.Threading.RateLimiting" Version="8.0.0" />
</ItemGroup>
</Project>
72 changes: 33 additions & 39 deletions slo/src/Internal/SloContext.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
using System.Diagnostics;
using System.Threading.RateLimiting;
using Internal.Cli;
using Microsoft.Extensions.Logging;
using Polly;
using Polly.RateLimit;
using Prometheus;
using Ydb.Sdk;
using Ydb.Sdk.Value;
Expand Down Expand Up @@ -130,8 +129,10 @@ public async Task Run(RunConfig runConfig)
errorsGauge.WithLabels(statusCode.ToString(), "finally").IncTo(0);
}

var writeLimiter = Policy.RateLimit(runConfig.WriteRps, TimeSpan.FromSeconds(1), 10);
var readLimiter = Policy.RateLimit(runConfig.ReadRps, TimeSpan.FromSeconds(1), 10);
var writeLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
{ Window = TimeSpan.FromSeconds(1), PermitLimit = runConfig.WriteRps, QueueLimit = int.MaxValue });
var readLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
{ Window = TimeSpan.FromSeconds(1), PermitLimit = runConfig.ReadRps, QueueLimit = int.MaxValue });

var cancellationTokenSource = new CancellationTokenSource();
cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(runConfig.ShutdownTime));
Expand All @@ -149,7 +150,7 @@ public async Task Run(RunConfig runConfig)
_logger.LogInformation("Run task is finished");
return;

Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName,
Task ShootingTask(RateLimiter rateLimitPolicy, string shootingName,
Func<T, RunConfig, Gauge?, Task<(int, StatusCode)>> action)
{
return Task.Run(async () =>
Expand All @@ -160,42 +161,35 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName,
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
try
{
tasks.Add(rateLimitPolicy.Execute(async () =>
{
// ReSharper disable once AccessToModifiedClosure
Interlocked.Increment(ref activeTasks);
var sw = Stopwatch.StartNew();
var (attempts, statusCode) = await action(client, runConfig, errorsGauge);
string label;
if (statusCode != StatusCode.Success)
{
_logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName,
statusCode);
notOkGauge.Inc();
label = "err";
}
else
{
okGauge.Inc();
label = "ok";
}
Interlocked.Decrement(ref activeTasks);
attemptsHistogram.WithLabels(label).Observe(attempts);
latencySummary.WithLabels(label).Observe(sw.ElapsedMilliseconds);
}));
}
catch (RateLimitRejectedException e)
using var lease = await rateLimitPolicy
.AcquireAsync(cancellationToken: cancellationTokenSource.Token);
tasks.Add(Task.Run(async () =>
{
_logger.LogInformation(e, "Waiting {ShootingName} task, count active tasks: {}", shootingName,
Interlocked.Read(ref activeTasks));
// ReSharper disable once AccessToModifiedClosure
Interlocked.Increment(ref activeTasks);
await Task.Delay(e.RetryAfter, cancellationTokenSource.Token);
}
var sw = Stopwatch.StartNew();
var (attempts, statusCode) = await action(client, runConfig, errorsGauge);
string label;
if (statusCode != StatusCode.Success)
{
_logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName,
statusCode);
notOkGauge.Inc();
label = "err";
}
else
{
okGauge.Inc();
label = "ok";
}
Interlocked.Decrement(ref activeTasks);
attemptsHistogram.WithLabels(label).Observe(attempts);
latencySummary.WithLabels(label).Observe(sw.ElapsedMilliseconds);
}, cancellationTokenSource.Token));
}
await Task.WhenAll(tasks);
Expand Down

0 comments on commit 5354ec1

Please sign in to comment.