From 4dbcfb43ea5c65aa0b7d52c4a4f5cfe964a074ea Mon Sep 17 00:00:00 2001 From: Chen Chen Date: Wed, 14 Aug 2024 14:58:07 -0500 Subject: [PATCH] bench: fix async backoff --- src/bench.rs | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/bench.rs b/src/bench.rs index f3df46a..642da5c 100644 --- a/src/bench.rs +++ b/src/bench.rs @@ -645,6 +645,22 @@ impl RateLimiter { Self { ops } } + /// If this returns `true`, the backoff is not done. + #[inline(always)] + fn try_backoff(&self, count: u64, start: Instant) -> bool { + if self.ops == 0 { + return false; + } + // self.ops is the target rate in op/s; compare it against the observed op/s + let elapsed = u64::try_from(start.elapsed().as_nanos()).unwrap(); + let ops = count * 1_000_000_000 / elapsed; + if ops <= self.ops { + return false; + } + true + } + + /// Blocking backoff. #[inline(always)] fn backoff(&self, count: u64, start: Instant) { if self.ops == 0 { @@ -838,13 +854,11 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) { } } - // try limit rate after a batch is sent - rate_limiter.backoff(*counter, start); - if bench_phase_should_break(&benchmark.len, *counter, start, &mut workload) { workload.reset(); break; } + // use a loop to make sure that pending is under qd, only drain the handle if the bench // phase is not ending loop { @@ -857,7 +871,8 @@ fn bench_worker_async(map: Arc>, context: WorkerContext) { l.async_record(r.id, submit); } } - if pending <= benchmark.qd { + // if the pending queue is under depth, and backoff is not needed + if pending <= benchmark.qd && !rate_limiter.try_backoff(*counter, start) { break; } }