diff --git a/quickwit/quickwit-cli/src/index/mod.rs b/quickwit/quickwit-cli/src/index/mod.rs index 5102119bbe3..d3551918122 100644 --- a/quickwit/quickwit-cli/src/index/mod.rs +++ b/quickwit/quickwit-cli/src/index/mod.rs @@ -21,6 +21,7 @@ use std::borrow::Cow; use std::collections::VecDeque; use std::fmt::Display; use std::io::{stdout, Stdout, Write}; +use std::num::NonZeroUsize; use std::ops::Div; use std::path::PathBuf; use std::str::FromStr; @@ -36,6 +37,7 @@ use indicatif::{ProgressBar, ProgressStyle}; use itertools::Itertools; use numfmt::{Formatter, Scales}; use quickwit_actors::ActorHandle; +use quickwit_common::tower::{Rate, RateEstimator, SmaRateEstimator}; use quickwit_common::uri::Uri; use quickwit_config::{ConfigFormat, IndexConfig}; use quickwit_indexing::models::IndexingStatistics; @@ -932,6 +934,11 @@ impl Tabled for Quantiles { pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> { debug!(args=?args, "ingest-docs"); + let mut rate_estimator = SmaRateEstimator::new( + NonZeroUsize::new(8).unwrap(), + Duration::from_millis(250), + Duration::from_secs(1), + ); if let Some(input_path) = &args.input_path_opt { println!("❯ Ingesting documents from {}.", input_path.display()); } else { @@ -947,13 +954,17 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> { progress_bar.enable_steady_tick(Duration::from_millis(100)); progress_bar.set_style(progress_bar_style()); progress_bar.set_message("0MiB/s"); - let update_progress_bar = |ingest_event: IngestEvent| { + // It is not used by the rate estimator anyway. 
+ let useless_start_time = Instant::now(); + let mut update_progress_bar = |ingest_event: IngestEvent| { match ingest_event { - IngestEvent::IngestedDocBatch(num_bytes) => progress_bar.inc(num_bytes as u64), + IngestEvent::IngestedDocBatch(num_bytes) => { + rate_estimator.update(useless_start_time, Instant::now(), num_bytes as u64); + progress_bar.inc(num_bytes as u64) + } IngestEvent::Sleep => {} // To }; - let throughput = - progress_bar.position() as f64 / progress_bar.elapsed().as_secs_f64() / 1024.0 / 1024.0; + let throughput = rate_estimator.work() as f64 / (1024 * 1024) as f64; progress_bar.set_message(format!("{throughput:.1} MiB/s")); }; @@ -970,7 +981,7 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> { &args.index_id, ingest_source, batch_size_limit_opt, - Some(&update_progress_bar), + Some(&mut update_progress_bar), args.commit_type, ) .await?; diff --git a/quickwit/quickwit-common/src/tower/rate_estimator.rs b/quickwit/quickwit-common/src/tower/rate_estimator.rs index 589e5237704..22ad5a671cf 100644 --- a/quickwit/quickwit-common/src/tower/rate_estimator.rs +++ b/quickwit/quickwit-common/src/tower/rate_estimator.rs @@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; -use super::{ConstantRate, Rate}; +use super::Rate; pub trait RateEstimator: Rate { fn update(&mut self, started_at: Instant, ended_at: Instant, work: u64); @@ -39,16 +39,22 @@ pub struct SmaRateEstimator { struct InnerSmaRateEstimator { anchor: Instant, buckets: Box<[Bucket]>, - bucket_period_secs: u64, bucket_period_millis: u64, - num_buckets: u64, - period: Duration, period_millis: u64, + num_buckets: u64, } impl SmaRateEstimator { /// Creates a new simple moving average rate estimator. /// + /// The rate returned is the rate measured over the last `n - 1` buckets. The + /// ongoing bucket is not taken in account. 
+ /// In other words, we are returning a rolling average that spans over a period + /// of `(num_buckets - 1) * bucket_period`. + /// + /// The `period` argument is just a `scaling unit`. A period of 1s means that + /// the number returned by `work` is expressed in `bytes / second`. + /// /// This rate estimator is bucket-based and outputs the average rate of work over the previous /// closed `n-1` buckets. /// @@ -56,7 +62,7 @@ impl SmaRateEstimator { /// /// This function panics if `bucket_period` is < 1s or `period` is < 1ms. pub fn new(num_buckets: NonZeroUsize, bucket_period: Duration, period: Duration) -> Self { - assert!(bucket_period.as_secs() > 0); + assert!(bucket_period.as_millis() >= 100); assert!(period.as_millis() > 0); let mut buckets = Vec::with_capacity(num_buckets.get()); @@ -66,10 +72,8 @@ impl SmaRateEstimator { let inner = InnerSmaRateEstimator { anchor: Instant::now(), buckets: buckets.into_boxed_slice(), - bucket_period_secs: bucket_period.as_secs(), bucket_period_millis: bucket_period.as_millis() as u64, num_buckets: num_buckets.get() as u64, - period, period_millis: period.as_millis() as u64, }; Self { @@ -77,14 +81,21 @@ impl SmaRateEstimator { } } - /// Seeds the rate estimator with an initial rate. 
- pub fn with_initial_rate(self, initial_rate: ConstantRate) -> Self { - let initial_work = initial_rate.work() * self.inner.bucket_period_millis - / initial_rate.period().as_millis() as u64; - for bucket_ord in 0..self.inner.num_buckets { - self.inner.buckets[bucket_ord as usize].increment_work(initial_work, 0); - } - self + fn work_in_bucket(&self, bucket_ord: u64) -> u64 { + self.inner.buckets[bucket_ord as usize % self.inner.buckets.len()] + .work_for_bucket(bucket_ord) + } + + fn work_at(&self, now: Instant) -> u64 { + let elapsed_ms: u64 = now.duration_since(self.inner.anchor).as_millis() as u64; + let current_bucket_ord = elapsed_ms / self.inner.bucket_period_millis; + let num_buckets = self.inner.num_buckets - 1u64; + let bucket_range = current_bucket_ord.saturating_sub(num_buckets)..current_bucket_ord; + let cumulative_work: u64 = bucket_range + .map(|bucket_ord| self.work_in_bucket(bucket_ord)) + .sum(); + (cumulative_work * self.inner.period_millis) + / (self.inner.bucket_period_millis * num_buckets) } } @@ -94,68 +105,95 @@ impl Rate for SmaRateEstimator { /// This estimation is computed by summing the amount of work performed tracked in the previous /// `n-1` buckets and dividing it by the duration of the `n-1` periods. 
fn work(&self) -> u64 { - let now = Instant::now(); - let elapsed = now.duration_since(self.inner.anchor).as_secs(); - let current_bucket_ord = elapsed / self.inner.bucket_period_secs; - let current_bucket_idx = (current_bucket_ord % self.inner.num_buckets) as usize; - let cumulative_work: u64 = self - .inner - .buckets - .iter() - .enumerate() - .filter(|(bucket_idx, _)| *bucket_idx != current_bucket_idx) - .map(|(_, bucket)| bucket.work()) - .sum(); - let num_bucket = self.inner.num_buckets - 1; - cumulative_work * self.inner.period_millis / self.inner.bucket_period_millis / num_bucket + self.work_at(Instant::now()) } fn period(&self) -> Duration { - self.inner.period + Duration::from_millis(self.inner.period_millis) } } +#[inline] +fn compute_bucket_ord_hash(bucket_ord: u64) -> u8 { + // We pick 241 because it is the highest prime number below 256 + // that can be computed easily. + // + // The fact that it is prime makes it so that it is complemented by the + // bucket id for any value of num_buckets (well except multiples of 241) + // thanks to the Chinese remainder theorem. + (bucket_ord % 241) as u8 +} + impl RateEstimator for SmaRateEstimator { fn update(&mut self, _started_at: Instant, ended_at: Instant, work: u64) { - let elapsed = ended_at.duration_since(self.inner.anchor).as_secs(); + let elapsed = ended_at.duration_since(self.inner.anchor).as_millis() as u64; let num_buckets = self.inner.num_buckets; - let bucket_ord = elapsed / self.inner.bucket_period_secs; - let bucket_idx = bucket_ord % num_buckets; - let bucket_color = ((bucket_ord / num_buckets) & 1) << 63; - let bucket = &self.inner.buckets[bucket_idx as usize]; - bucket.increment_work(work, bucket_color); + let bucket_ord = elapsed / self.inner.bucket_period_millis; + let bucket = &self.inner.buckets[(bucket_ord % num_buckets) as usize]; + bucket.increment_work(work, bucket_ord); } } -/// Rate estimator bucket. 
The 63 least significant bits of the atomic integer store the amount of -/// work, while the most significant bit is used to indicate whether the bucket needs to be reset. -/// The reset bit is also called the "color" of a bucket in an attempt to make the code more -/// readable. After each complete pass over the buckets, the color is flipped. The color `0` -/// corresponds to the even passes, while the color `1` corresponds to the odd passes. +/// Rate estimator bucket. The 56 least significant bits of the atomic integer store the amount of +/// work, while the most significant 8 bits are encoding a well-thought hash of the bucket ord. +/// +/// The hash is used to ensure that we know exactly when to reset the bucket's work. #[derive(Debug, Default)] struct Bucket { + // This atomic is actually encoding two things: + // - low bits [0..56): the amount of work recorded in the bucket. + // - high bits [56..64): an 8-bit hash of the bucket ord (`bucket_ord % 241`). bits: AtomicU64, } -impl Bucket { - const COLOR_MASK: u64 = 1 << 63; +const WORK_MASK: u64 = (1u64 << 56) - 1; - const WORK_MASK: u64 = u64::MAX - Self::COLOR_MASK; +struct BucketVal { + work: u64, + bucket_ord_hash: u8, +} - fn work(&self) -> u64 { - self.bits.load(Ordering::Relaxed) & Self::WORK_MASK +impl From<u64> for BucketVal { + #[inline] + fn from(bucket_bits: u64) -> BucketVal { + BucketVal { + work: bucket_bits & WORK_MASK, + bucket_ord_hash: (bucket_bits >> 56) as u8, + } } +} - fn increment_work(&self, work: u64, expected_bucket_color: u64) { +impl From<BucketVal> for u64 { + #[inline] + fn from(value: BucketVal) -> Self { + (value.bucket_ord_hash as u64) << 56 | value.work + } +} + +impl Bucket { + fn work_for_bucket(&self, bucket_ord: u64) -> u64 { + let bucket_val = BucketVal::from(self.bits.load(Ordering::Relaxed)); + if bucket_val.bucket_ord_hash == compute_bucket_ord_hash(bucket_ord) { + bucket_val.work + } else { + 0 + } + } + + fn increment_work(&self, work: u64, bucket_ord: u64) { + let expected_bucket_ord_hash: 
u8 = compute_bucket_ord_hash(bucket_ord); let current_bits = self.bits.fetch_add(work, Ordering::Relaxed) + work; - let current_bucket_color = current_bits & Self::COLOR_MASK; + let bucket_val = BucketVal::from(current_bits); - // If the current bucket color is not the expected one, we need to flip its color and reset - // the amount of work. - if current_bucket_color != expected_bucket_color { + // This is not the bucket we targeted, we need to retry and update the bucket with the new + // bucket_ord and a reset value. + if bucket_val.bucket_ord_hash != expected_bucket_ord_hash { let mut expected_bits = current_bits; - let new_color = !current_bits & Self::COLOR_MASK; - let new_bits = new_color | work; + let new_bits: u64 = BucketVal { + work, + bucket_ord_hash: expected_bucket_ord_hash, + } + .into(); while let Err(current_bits) = self.bits.compare_exchange( expected_bits, @@ -163,7 +201,7 @@ impl Bucket { Ordering::AcqRel, Ordering::Acquire, ) { - if current_bits & Self::COLOR_MASK == new_color { + if BucketVal::from(current_bits).bucket_ord_hash == expected_bucket_ord_hash { // Some thread managed to successfully flip the color. We're good. self.bits.fetch_add(work, Ordering::Relaxed); break; @@ -185,32 +223,31 @@ mod tests { #[test] fn test_bucket() { - const RED: u64 = 0 << 63; - const BLACK: u64 = 1 << 63; - let bucket = Bucket::default(); - assert_eq!(bucket.work(), 0); + assert_eq!(bucket.work_for_bucket(0u64), 0); // First pass, the bucket is red. - bucket.increment_work(1, RED); - assert_eq!(bucket.work(), 1); + bucket.increment_work(1, 0u64); + assert_eq!(bucket.work_for_bucket(0u64), 1); + assert_eq!(bucket.work_for_bucket(1u64), 0); - bucket.increment_work(2, RED); - assert_eq!(bucket.work(), 3); + bucket.increment_work(2, 0u64); + assert_eq!(bucket.work_for_bucket(0u64), 3); // Second pass, the bucket is now black. 
- bucket.increment_work(5, BLACK); - assert_eq!(bucket.work(), 5); + bucket.increment_work(5, 1u64); + assert_eq!(bucket.work_for_bucket(1u64), 5); + assert_eq!(bucket.work_for_bucket(0u64), 0); - bucket.increment_work(7, BLACK); - assert_eq!(bucket.work(), 12); + bucket.increment_work(7, 1u64); + assert_eq!(bucket.work_for_bucket(1u64), 12); // Third pass, the bucket is red again. - bucket.increment_work(9, RED); - assert_eq!(bucket.work(), 9); + bucket.increment_work(9, 2u64); + assert_eq!(bucket.work_for_bucket(2u64), 9); - bucket.increment_work(11, RED); - assert_eq!(bucket.work(), 20); + bucket.increment_work(11, 2u64); + assert_eq!(bucket.work_for_bucket(2u64), 20); for num_threads in [1, 2, 3, 5, 10, 20] { let barrier = Arc::new(Barrier::new(num_threads)); @@ -227,13 +264,13 @@ mod tests { barrier.wait(); // First time we increment the work in this second pass. All the threads will // attempt to flip the bucket's color. Only one should succeed. - bucket.increment_work(i as u64, BLACK); + bucket.increment_work(i as u64, 3u64); })); } for handle in handles { handle.join().unwrap(); } - assert_eq!(bucket.work(), cumulative_work); + assert_eq!(bucket.work_for_bucket(3u64), cumulative_work); } } @@ -252,36 +289,72 @@ mod tests { let started_at = anchor; let ended_at = started_at + Duration::from_millis(0); estimator.update(started_at, ended_at, 100); - assert_eq!(estimator.inner.buckets[0].work(), 100); + assert_eq!(estimator.inner.buckets[0].work_for_bucket(0), 100); let ended_at = started_at + Duration::from_millis(999); estimator.update(started_at, ended_at, 200); - assert_eq!(estimator.inner.buckets[0].work(), 300); + assert_eq!(estimator.inner.buckets[0].work_for_bucket(0), 300); - assert_eq!(estimator.work(), 0); + assert_eq!(estimator.work_at(anchor), 0); let ended_at = started_at + Duration::from_millis(1_000); estimator.update(started_at, ended_at, 300); - assert_eq!(estimator.inner.buckets[1].work(), 300); + 
assert_eq!(estimator.inner.buckets[1].work_for_bucket(1), 300); let ended_at = started_at + Duration::from_millis(1_999); estimator.update(started_at, ended_at, 600); - assert_eq!(estimator.inner.buckets[1].work(), 900); + assert_eq!(estimator.inner.buckets[1].work_for_bucket(1), 900); - assert_eq!(estimator.work(), 45); + assert_eq!( + estimator.work_at(anchor + Duration::from_secs(2)), + (300 + 900) / 20 + ); let ended_at = started_at + Duration::from_millis(2_000); estimator.update(started_at, ended_at, 800); - assert_eq!(estimator.inner.buckets[2].work(), 800); + assert_eq!(estimator.inner.buckets[2].work_for_bucket(2), 800); let ended_at = started_at + Duration::from_millis(2_999); estimator.update(started_at, ended_at, 1_000); - assert_eq!(estimator.inner.buckets[2].work(), 1_800); + assert_eq!(estimator.inner.buckets[2].work_for_bucket(2), 1_800); - assert_eq!(estimator.work(), 135); + assert_eq!(estimator.work_at(anchor + Duration::from_secs(3)), 135); let ended_at = started_at + Duration::from_millis(3_000); estimator.update(started_at, ended_at, 500); - assert_eq!(estimator.inner.buckets[0].work(), 500); + assert_eq!(estimator.inner.buckets[0].work_for_bucket(0), 0); + assert_eq!(estimator.inner.buckets[0].work_for_bucket(3), 500); + } + + #[test] + fn test_sma_rate_skipped_bucket() { + let num_buckets = NonZeroUsize::new(10).unwrap(); + let bucket_period = Duration::from_secs(1); + let period = Duration::from_secs(1); + + let mut estimator = SmaRateEstimator::new(num_buckets, bucket_period, period); + + assert_eq!(estimator.work(), 0); + + let anchor = estimator.inner.anchor; + + // We fill all of the bucket with 100 work. + for i in 0..10 { + let ended_at = anchor + Duration::from_secs(1) * i; + estimator.update(ended_at, ended_at, 100); + } + + assert_eq!(estimator.work_at(anchor + Duration::from_secs(10)), 100); + + // Now let's assume there isn't any work ongoing for 4s. 
+ // Over the last 9 seconds, we have received 400 units of work + // + // After the reset, we should have the following buckets: + // We expect a mean of 44 work/s. + // |0, 0, 0, 0, 0, 100*, 100, 100, 100, 100| + // + // Since the current bucket (idx = 5) is not taken into account, this leads + // to an average of 400 / 9 = 44 work units. + assert_eq!(estimator.work_at(anchor + Duration::from_secs(15)), 44); } } diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index e90267482a8..fe2f796a38c 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -259,7 +259,7 @@ impl QuickwitClient { index_id: &str, ingest_source: IngestSource, batch_size_limit_opt: Option, - on_ingest_event: Option<&(dyn Fn(IngestEvent) + Sync)>, + mut on_ingest_event: Option<&mut (dyn FnMut(IngestEvent) + Sync)>, last_block_commit: CommitType, ) -> Result<(), Error> { let ingest_path = if self.ingest_v2 { @@ -297,16 +297,16 @@ impl QuickwitClient { ) .await?; if response.status_code() == StatusCode::TOO_MANY_REQUESTS { - if let Some(event_fn) = &on_ingest_event { + if let Some(event_fn) = &mut on_ingest_event { event_fn(IngestEvent::Sleep) } - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(500)).await; } else { response.check().await?; break; } } - if let Some(event_fn) = on_ingest_event { + if let Some(event_fn) = &mut on_ingest_event { event_fn(IngestEvent::IngestedDocBatch(batch.len())) } } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index ff0d39bebf2..5e92984328c 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -270,13 +270,11 @@ async fn start_ingest_client_if_needed( ) .await?; let num_buckets = NonZeroUsize::new(60).expect("60 should be non-zero"); - let initial_rate = ConstantRate::new(ByteSize::mib(50).as_u64(), Duration::from_secs(1)); let 
rate_estimator = SmaRateEstimator::new( num_buckets, Duration::from_secs(10), Duration::from_millis(100), - ) - .with_initial_rate(initial_rate); + ); let memory_capacity = ingest_api_service.ask(GetMemoryCapacity).await?; let min_rate = ConstantRate::new(ByteSize::mib(1).as_u64(), Duration::from_millis(100)); let rate_modulator = RateModulator::new(rate_estimator.clone(), memory_capacity, min_rate);