diff --git a/quickwit/quickwit-common/src/tower/rate_estimator.rs b/quickwit/quickwit-common/src/tower/rate_estimator.rs index a5ee5e43b6f..151d1dde99d 100644 --- a/quickwit/quickwit-common/src/tower/rate_estimator.rs +++ b/quickwit/quickwit-common/src/tower/rate_estimator.rs @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; -use super::{ConstantRate, Rate}; +use super::Rate; pub trait RateEstimator: Rate { fn update(&mut self, started_at: Instant, ended_at: Instant, work: u64); @@ -40,9 +40,8 @@ struct InnerSmaRateEstimator { anchor: Instant, buckets: Box<[Bucket]>, bucket_period_millis: u64, - num_buckets: u64, - period: Duration, period_millis: u64, + num_buckets: u64, } impl SmaRateEstimator { @@ -75,7 +74,6 @@ impl SmaRateEstimator { buckets: buckets.into_boxed_slice(), bucket_period_millis: bucket_period.as_millis() as u64, num_buckets: num_buckets.get() as u64, - period, period_millis: period.as_millis() as u64, }; Self { @@ -83,14 +81,21 @@ impl SmaRateEstimator { } } - /// Seeds the rate estimator with an initial rate. - pub fn with_initial_rate(self, initial_rate: ConstantRate) -> Self { - let initial_work = initial_rate.work() * self.inner.bucket_period_millis - / initial_rate.period().as_millis() as u64; - for bucket_ord in 0..self.inner.num_buckets { - self.inner.buckets[bucket_ord as usize].increment_work(initial_work, 0); - } - self + fn work_in_bucket(&self, bucket_ord: u64) -> u64 { + self.inner.buckets[bucket_ord as usize % self.inner.buckets.len()] + .work_for_bucket(bucket_ord) + } + + fn work_at(&self, now: Instant) -> u64 { + let elapsed_ms: u64 = now.duration_since(self.inner.anchor).as_millis() as u64; + let current_bucket_ord = elapsed_ms / self.inner.bucket_period_millis; + let num_buckets = (self.inner.num_buckets - 1) as u64; + let bucket_range = current_bucket_ord.saturating_sub(num_buckets)..current_bucket_ord; + let cumulative_work: u64 = bucket_range + .map(|bucket_ord| self.work_in_bucket(bucket_ord)) + .sum(); + (cumulative_work * self.inner.period_millis) + / (self.inner.bucket_period_millis * num_buckets) } } @@ -100,69 +105,95 @@ impl Rate for SmaRateEstimator { /// This estimation is computed by summing the amount of work performed tracked in the previous /// `n-1` buckets and dividing it by the duration of the `n-1` periods. fn work(&self) -> u64 { - let now = Instant::now(); - let elapsed = now.duration_since(self.inner.anchor).as_millis() as u64; - let current_bucket_ord = elapsed / self.inner.bucket_period_millis; - let current_bucket_idx = (current_bucket_ord % self.inner.num_buckets) as usize; - let cumulative_work: u64 = self - .inner - .buckets - .iter() - .enumerate() - .filter(|(bucket_idx, _)| *bucket_idx != current_bucket_idx) - .map(|(_, bucket)| bucket.work()) - .sum(); - let num_bucket = self.inner.num_buckets - 1; - (cumulative_work * self.inner.period_millis) - / (self.inner.bucket_period_millis * num_bucket) + self.work_at(Instant::now()) } fn period(&self) -> Duration { - self.inner.period + Duration::from_millis(self.inner.period_millis) } } +#[inline] +fn compute_bucket_ord_hash(bucket_ord: u64) -> u8 { + // We pick 241 because it is the highest prime number below 256 + // that can be computed easily. + // + // The fact that it is prime makes it so that it is complemented by the + // bucket id for any value of num_buckets (well except multiples of 241) + // thanks to the chinese theorem. + (bucket_ord % 241) as u8 +} + impl RateEstimator for SmaRateEstimator { fn update(&mut self, _started_at: Instant, ended_at: Instant, work: u64) { let elapsed = ended_at.duration_since(self.inner.anchor).as_millis() as u64; let num_buckets = self.inner.num_buckets; let bucket_ord = elapsed / self.inner.bucket_period_millis; - let bucket_idx = bucket_ord % num_buckets; - let bucket_color = ((bucket_ord / num_buckets) & 1) << 63; - let bucket = &self.inner.buckets[bucket_idx as usize]; - bucket.increment_work(work, bucket_color); + let bucket = &self.inner.buckets[(bucket_ord % num_buckets) as usize]; + bucket.increment_work(work, bucket_ord); } } -/// Rate estimator bucket. The 63 least significant bits of the atomic integer store the amount of -/// work, while the most significant bit is used to indicate whether the bucket needs to be reset. -/// The reset bit is also called the "color" of a bucket in an attempt to make the code more -/// readable. After each complete pass over the buckets, the color is flipped. The color `0` -/// corresponds to the even passes, while the color `1` corresponds to the odd passes. +/// Rate estimator bucket. The 56 least significant bits of the atomic integer store the amount of +/// work, while the most significant 8 bits are encoding a well-thought hash of the bucket ord. +/// +/// The hash is used to ensure that we know exactly when to reset the bucket's work. #[derive(Debug, Default)] struct Bucket { + // This atomic is actually encoding two things: + // - low bits [0..56): the amount of work recorded in the bucket. + // - high bits [56..64): the bucket ord, or rather its last 8 bits. bits: AtomicU64, } -impl Bucket { - const COLOR_MASK: u64 = 1 << 63; +const WORK_MASK: u64 = (1u64 << 56) - 1; - const WORK_MASK: u64 = u64::MAX - Self::COLOR_MASK; +struct BucketVal { + work: u64, + bucket_ord_hash: u8, +} - fn work(&self) -> u64 { - self.bits.load(Ordering::Relaxed) & Self::WORK_MASK +impl From for BucketVal { + #[inline] + fn from(bucket_bits: u64) -> BucketVal { + BucketVal { + work: bucket_bits & WORK_MASK, + bucket_ord_hash: (bucket_bits >> 56) as u8, + } } +} - fn increment_work(&self, work: u64, expected_bucket_color: u64) { +impl From for u64 { + #[inline] + fn from(value: BucketVal) -> Self { + (value.bucket_ord_hash as u64) << 56 | value.work + } +} + +impl Bucket { + fn work_for_bucket(&self, bucket_ord: u64) -> u64 { + let bucket_val = BucketVal::from(self.bits.load(Ordering::Relaxed)); + if bucket_val.bucket_ord_hash == compute_bucket_ord_hash(bucket_ord) { + bucket_val.work + } else { + 0 + } + } + + fn increment_work(&self, work: u64, bucket_ord: u64) { + let expected_bucket_ord_hash: u8 = compute_bucket_ord_hash(bucket_ord); let current_bits = self.bits.fetch_add(work, Ordering::Relaxed) + work; - let current_bucket_color = current_bits & Self::COLOR_MASK; + let bucket_val = BucketVal::from(current_bits); - // If the current bucket color is not the expected one, we need to flip its color and reset - // the amount of work. - if current_bucket_color != expected_bucket_color { + // This is not the bucket we targetted, we need to retry and update the bucket with the new + // bucket_ord and a resetted value. + if bucket_val.bucket_ord_hash != expected_bucket_ord_hash { let mut expected_bits = current_bits; - let new_color = !current_bits & Self::COLOR_MASK; - let new_bits = new_color | work; + let new_bits: u64 = BucketVal { + work, + bucket_ord_hash: expected_bucket_ord_hash, + } + .into(); while let Err(current_bits) = self.bits.compare_exchange( expected_bits, @@ -170,7 +201,7 @@ impl Bucket { Ordering::AcqRel, Ordering::Acquire, ) { - if current_bits & Self::COLOR_MASK == new_color { + if BucketVal::from(current_bits).bucket_ord_hash == expected_bucket_ord_hash { // Some thread managed to successfully flip the color. We're good. self.bits.fetch_add(work, Ordering::Relaxed); break; @@ -192,32 +223,31 @@ mod tests { #[test] fn test_bucket() { - const RED: u64 = 0 << 63; - const BLACK: u64 = 1 << 63; - let bucket = Bucket::default(); - assert_eq!(bucket.work(), 0); + assert_eq!(bucket.work_for_bucket(0u64), 0); // First pass, the bucket is red. - bucket.increment_work(1, RED); - assert_eq!(bucket.work(), 1); + bucket.increment_work(1, 0u64); + assert_eq!(bucket.work_for_bucket(0u64), 1); + assert_eq!(bucket.work_for_bucket(1u64), 0); - bucket.increment_work(2, RED); - assert_eq!(bucket.work(), 3); + bucket.increment_work(2, 0u64); + assert_eq!(bucket.work_for_bucket(0u64), 3); // Second pass, the bucket is now black. - bucket.increment_work(5, BLACK); - assert_eq!(bucket.work(), 5); + bucket.increment_work(5, 1u64); + assert_eq!(bucket.work_for_bucket(1u64), 5); + assert_eq!(bucket.work_for_bucket(0u64), 0); - bucket.increment_work(7, BLACK); - assert_eq!(bucket.work(), 12); + bucket.increment_work(7, 1u64); + assert_eq!(bucket.work_for_bucket(1u64), 12); // Third pass, the bucket is red again. - bucket.increment_work(9, RED); - assert_eq!(bucket.work(), 9); + bucket.increment_work(9, 2u64); + assert_eq!(bucket.work_for_bucket(2u64), 9); - bucket.increment_work(11, RED); - assert_eq!(bucket.work(), 20); + bucket.increment_work(11, 2u64); + assert_eq!(bucket.work_for_bucket(2u64), 20); for num_threads in [1, 2, 3, 5, 10, 20] { let barrier = Arc::new(Barrier::new(num_threads)); @@ -234,13 +264,13 @@ mod tests { barrier.wait(); // First time we increment the work in this second pass. All the threads will // attempt to flip the bucket's color. Only one should succeed. - bucket.increment_work(i as u64, BLACK); + bucket.increment_work(i as u64, 3u64); })); } for handle in handles { handle.join().unwrap(); } - assert_eq!(bucket.work(), cumulative_work); + assert_eq!(bucket.work_for_bucket(3u64), cumulative_work); } } @@ -259,36 +289,72 @@ mod tests { let started_at = anchor; let ended_at = started_at + Duration::from_millis(0); estimator.update(started_at, ended_at, 100); - assert_eq!(estimator.inner.buckets[0].work(), 100); + assert_eq!(estimator.inner.buckets[0].work_for_bucket(0), 100); let ended_at = started_at + Duration::from_millis(999); estimator.update(started_at, ended_at, 200); - assert_eq!(estimator.inner.buckets[0].work(), 300); + assert_eq!(estimator.inner.buckets[0].work_for_bucket(0), 300); - assert_eq!(estimator.work(), 0); + assert_eq!(estimator.work_at(anchor), 0); let ended_at = started_at + Duration::from_millis(1_000); estimator.update(started_at, ended_at, 300); - assert_eq!(estimator.inner.buckets[1].work(), 300); + assert_eq!(estimator.inner.buckets[1].work_for_bucket(1), 300); let ended_at = started_at + Duration::from_millis(1_999); estimator.update(started_at, ended_at, 600); - assert_eq!(estimator.inner.buckets[1].work(), 900); + assert_eq!(estimator.inner.buckets[1].work_for_bucket(1), 900); - assert_eq!(estimator.work(), 45); + assert_eq!( + estimator.work_at(anchor + Duration::from_secs(2)), + (300 + 900) / 20 + ); let ended_at = started_at + Duration::from_millis(2_000); estimator.update(started_at, ended_at, 800); - assert_eq!(estimator.inner.buckets[2].work(), 800); + assert_eq!(estimator.inner.buckets[2].work_for_bucket(2), 800); let ended_at = started_at + Duration::from_millis(2_999); estimator.update(started_at, ended_at, 1_000); - assert_eq!(estimator.inner.buckets[2].work(), 1_800); + assert_eq!(estimator.inner.buckets[2].work_for_bucket(2), 1_800); - assert_eq!(estimator.work(), 135); + assert_eq!(estimator.work_at(anchor + Duration::from_secs(3)), 135); let ended_at = started_at + Duration::from_millis(3_000); estimator.update(started_at, ended_at, 500); - assert_eq!(estimator.inner.buckets[0].work(), 500); + assert_eq!(estimator.inner.buckets[0].work_for_bucket(0), 0); + assert_eq!(estimator.inner.buckets[0].work_for_bucket(3), 500); + } + + #[test] + fn test_sma_rate_skipped_bucket() { + let num_buckets = NonZeroUsize::new(10).unwrap(); + let bucket_period = Duration::from_secs(1); + let period = Duration::from_secs(1); + + let mut estimator = SmaRateEstimator::new(num_buckets, bucket_period, period); + + assert_eq!(estimator.work(), 0); + + let anchor = estimator.inner.anchor; + + // We fill all of the bucket with 100 work. + for i in 0..10 { + let ended_at = anchor + Duration::from_secs(1) * i; + estimator.update(ended_at, ended_at, 100); + } + + assert_eq!(estimator.work_at(anchor + Duration::from_secs(10)), 100); + + // Now let's assume there isn't any work ongoing for 4s. + // Over the last 9 seconds, we have received 500 works + // + // After the reset, we should have the following buckets: + // We expect a mean of 44 work/s. + // |0, 0, 0, 0, 0, 100*, 100, 100, 100, 100| + // + // Since the current bucket (idx = 5) is not taken into account, this leads + // to an average of 500 / 9 = 55 work units. + assert_eq!(estimator.work_at(anchor + Duration::from_secs(15)), 44); } } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 7d3b57defac..2a9a68b47be 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -261,13 +261,11 @@ async fn start_ingest_client_if_needed( ) .await?; let num_buckets = NonZeroUsize::new(60).expect("60 should be non-zero"); - let initial_rate = ConstantRate::new(ByteSize::mib(50).as_u64(), Duration::from_secs(1)); let rate_estimator = SmaRateEstimator::new( num_buckets, Duration::from_secs(10), Duration::from_millis(100), - ) - .with_initial_rate(initial_rate); + ); let memory_capacity = ingest_api_service.ask(GetMemoryCapacity).await?; let min_rate = ConstantRate::new(ByteSize::mib(1).as_u64(), Duration::from_millis(100)); let rate_modulator = RateModulator::new(rate_estimator.clone(), memory_capacity, min_rate);