From 5354ec17f2dc967b46cfdfa4eedf8e60d0049a96 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 15 Aug 2024 11:26:42 +0300 Subject: [PATCH] fixedWindow --- slo/src/Internal/Internal.csproj | 1 + slo/src/Internal/SloContext.cs | 72 +++++++++++++++----------------- 2 files changed, 34 insertions(+), 39 deletions(-) diff --git a/slo/src/Internal/Internal.csproj b/slo/src/Internal/Internal.csproj index fec702c1..831fad78 100644 --- a/slo/src/Internal/Internal.csproj +++ b/slo/src/Internal/Internal.csproj @@ -17,5 +17,6 @@ + diff --git a/slo/src/Internal/SloContext.cs b/slo/src/Internal/SloContext.cs index 10db55a7..7766eaa1 100644 --- a/slo/src/Internal/SloContext.cs +++ b/slo/src/Internal/SloContext.cs @@ -1,8 +1,7 @@ using System.Diagnostics; +using System.Threading.RateLimiting; using Internal.Cli; using Microsoft.Extensions.Logging; -using Polly; -using Polly.RateLimit; using Prometheus; using Ydb.Sdk; using Ydb.Sdk.Value; @@ -130,8 +129,10 @@ public async Task Run(RunConfig runConfig) errorsGauge.WithLabels(statusCode.ToString(), "finally").IncTo(0); } - var writeLimiter = Policy.RateLimit(runConfig.WriteRps, TimeSpan.FromSeconds(1), 10); - var readLimiter = Policy.RateLimit(runConfig.ReadRps, TimeSpan.FromSeconds(1), 10); + var writeLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions + { Window = TimeSpan.FromSeconds(1), PermitLimit = runConfig.WriteRps, QueueLimit = int.MaxValue }); + var readLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions + { Window = TimeSpan.FromSeconds(1), PermitLimit = runConfig.ReadRps, QueueLimit = int.MaxValue }); var cancellationTokenSource = new CancellationTokenSource(); cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(runConfig.ShutdownTime)); @@ -149,7 +150,7 @@ public async Task Run(RunConfig runConfig) _logger.LogInformation("Run task is finished"); return; - Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, + Task ShootingTask(RateLimiter rateLimitPolicy, string shootingName, Func> action) { return Task.Run(async () => @@ -160,42 +161,35 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName, while (!cancellationTokenSource.Token.IsCancellationRequested) { - try - { - tasks.Add(rateLimitPolicy.Execute(async () => - { - // ReSharper disable once AccessToModifiedClosure - Interlocked.Increment(ref activeTasks); - - var sw = Stopwatch.StartNew(); - var (attempts, statusCode) = await action(client, runConfig, errorsGauge); - string label; - - if (statusCode != StatusCode.Success) - { - _logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName, - statusCode); - notOkGauge.Inc(); - label = "err"; - } - else - { - okGauge.Inc(); - label = "ok"; - } - - Interlocked.Decrement(ref activeTasks); - attemptsHistogram.WithLabels(label).Observe(attempts); - latencySummary.WithLabels(label).Observe(sw.ElapsedMilliseconds); - })); - } - catch (RateLimitRejectedException e) + using var lease = await rateLimitPolicy + .AcquireAsync(cancellationToken: cancellationTokenSource.Token); + + tasks.Add(Task.Run(async () => { - _logger.LogInformation(e, "Waiting {ShootingName} task, count active tasks: {}", shootingName, - Interlocked.Read(ref activeTasks)); + // ReSharper disable once AccessToModifiedClosure + Interlocked.Increment(ref activeTasks); - await Task.Delay(e.RetryAfter, cancellationTokenSource.Token); - } + var sw = Stopwatch.StartNew(); + var (attempts, statusCode) = await action(client, runConfig, errorsGauge); + string label; + + if (statusCode != StatusCode.Success) + { + _logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName, + statusCode); + notOkGauge.Inc(); + label = "err"; + } + else + { + okGauge.Inc(); + label = "ok"; + } + + Interlocked.Decrement(ref activeTasks); + attemptsHistogram.WithLabels(label).Observe(attempts); + latencySummary.WithLabels(label).Observe(sw.ElapsedMilliseconds); + }, cancellationTokenSource.Token)); } await Task.WhenAll(tasks);