Skip to content

Commit

Permalink
Fix the ingest rate displayed in the CLI.
Browse files Browse the repository at this point in the history
The CLI was showing the average rate since the CLI was started.

This PR changes it to an estimation over the last 2 seconds.
  • Loading branch information
fulmicoton committed Mar 6, 2024
1 parent 6a79dd9 commit 4493938
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 16 deletions.
17 changes: 12 additions & 5 deletions quickwit/quickwit-cli/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt::Display;
use std::io::{stdout, Stdout, Write};
use std::num::NonZeroUsize;
use std::ops::Div;
use std::path::PathBuf;
use std::str::FromStr;
Expand All @@ -36,6 +37,7 @@ use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use numfmt::{Formatter, Scales};
use quickwit_actors::ActorHandle;
use quickwit_common::tower::{RateEstimator, SmaRateEstimator, Rate};
use quickwit_common::uri::Uri;
use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_indexing::models::IndexingStatistics;
Expand Down Expand Up @@ -923,6 +925,7 @@ impl Tabled for Quantiles {

pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
debug!(args=?args, "ingest-docs");
let mut rate_estimator = SmaRateEstimator::new(NonZeroUsize::new(8).unwrap(), Duration::from_millis(250), Duration::from_secs(1));
if let Some(input_path) = &args.input_path_opt {
println!("❯ Ingesting documents from {}.", input_path.display());
} else {
Expand All @@ -938,13 +941,17 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
progress_bar.enable_steady_tick(Duration::from_millis(100));
progress_bar.set_style(progress_bar_style());
progress_bar.set_message("0MiB/s");
let update_progress_bar = |ingest_event: IngestEvent| {
// It is not used by the rate estimator anyway.
let useless_start_time = Instant::now();
let mut update_progress_bar = |ingest_event: IngestEvent| {
match ingest_event {
IngestEvent::IngestedDocBatch(num_bytes) => progress_bar.inc(num_bytes as u64),
IngestEvent::IngestedDocBatch(num_bytes) => {
rate_estimator.update(useless_start_time, Instant::now(), num_bytes as u64);
progress_bar.inc(num_bytes as u64)
},
IngestEvent::Sleep => {} // To
};
let throughput =
progress_bar.position() as f64 / progress_bar.elapsed().as_secs_f64() / 1024.0 / 1024.0;
let throughput = rate_estimator.work() as f64 / (1024 * 1024) as f64;
progress_bar.set_message(format!("{throughput:.1} MiB/s"));
};

Expand All @@ -961,7 +968,7 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
&args.index_id,
ingest_source,
batch_size_limit_opt,
Some(&update_progress_bar),
Some(&mut update_progress_bar),
args.commit_type,
)
.await?;
Expand Down
23 changes: 15 additions & 8 deletions quickwit/quickwit-common/src/tower/rate_estimator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct SmaRateEstimator {
struct InnerSmaRateEstimator {
anchor: Instant,
buckets: Box<[Bucket]>,
bucket_period_secs: u64,
// bucket_period_secs: u64,
bucket_period_millis: u64,
num_buckets: u64,
period: Duration,
Expand All @@ -49,14 +49,22 @@ struct InnerSmaRateEstimator {
impl SmaRateEstimator {
/// Creates a new simple moving average rate estimator.
///
/// The rate returned is the rate measured over the last `n - 1` buckets. The
/// ongoing bucket is not taken in account.
/// In other words, we are returning a rolling average that spans over a period
/// of `num_buckets * bucket_period`.
///
/// The `period` argument is just a `scaling unit`. A period of 1s means that the
/// the number returned by `work` is expressed in `bytes / second`.
///
/// This rate estimator is bucket-based and outputs the average rate of work over the previous
/// closed `n-1` buckets.
///
/// # Panics
///
/// This function panics if `bucket_period` is < 1s or `period` is < 1ms.
pub fn new(num_buckets: NonZeroUsize, bucket_period: Duration, period: Duration) -> Self {
assert!(bucket_period.as_secs() > 0);
assert!(bucket_period.as_millis() >= 100);
assert!(period.as_millis() > 0);

let mut buckets = Vec::with_capacity(num_buckets.get());
Expand All @@ -66,7 +74,6 @@ impl SmaRateEstimator {
let inner = InnerSmaRateEstimator {
anchor: Instant::now(),
buckets: buckets.into_boxed_slice(),
bucket_period_secs: bucket_period.as_secs(),
bucket_period_millis: bucket_period.as_millis() as u64,
num_buckets: num_buckets.get() as u64,
period,
Expand Down Expand Up @@ -95,8 +102,8 @@ impl Rate for SmaRateEstimator {
/// `n-1` buckets and dividing it by the duration of the `n-1` periods.
fn work(&self) -> u64 {
let now = Instant::now();
let elapsed = now.duration_since(self.inner.anchor).as_secs();
let current_bucket_ord = elapsed / self.inner.bucket_period_secs;
let elapsed = now.duration_since(self.inner.anchor).as_millis() as u64;
let current_bucket_ord = elapsed / self.inner.bucket_period_millis;
let current_bucket_idx = (current_bucket_ord % self.inner.num_buckets) as usize;
let cumulative_work: u64 = self
.inner
Expand All @@ -107,7 +114,7 @@ impl Rate for SmaRateEstimator {
.map(|(_, bucket)| bucket.work())
.sum();
let num_bucket = self.inner.num_buckets - 1;
cumulative_work * self.inner.period_millis / self.inner.bucket_period_millis / num_bucket
(cumulative_work * self.inner.period_millis) / (self.inner.bucket_period_millis * num_bucket)
}

fn period(&self) -> Duration {
Expand All @@ -117,9 +124,9 @@ impl Rate for SmaRateEstimator {

impl RateEstimator for SmaRateEstimator {
fn update(&mut self, _started_at: Instant, ended_at: Instant, work: u64) {
let elapsed = ended_at.duration_since(self.inner.anchor).as_secs();
let elapsed = ended_at.duration_since(self.inner.anchor).as_millis() as u64;
let num_buckets = self.inner.num_buckets;
let bucket_ord = elapsed / self.inner.bucket_period_secs;
let bucket_ord = elapsed / self.inner.bucket_period_millis;
let bucket_idx = bucket_ord % num_buckets;
let bucket_color = ((bucket_ord / num_buckets) & 1) << 63;
let bucket = &self.inner.buckets[bucket_idx as usize];
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-rest-client/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl QuickwitClient {
index_id: &str,
ingest_source: IngestSource,
batch_size_limit_opt: Option<usize>,
on_ingest_event: Option<&(dyn Fn(IngestEvent) + Sync)>,
mut on_ingest_event: Option<&mut (dyn FnMut(IngestEvent) + Sync)>,
last_block_commit: CommitType,
) -> Result<(), Error> {
let ingest_path = if self.ingest_v2 {
Expand Down Expand Up @@ -295,7 +295,7 @@ impl QuickwitClient {
)
.await?;
if response.status_code() == StatusCode::TOO_MANY_REQUESTS {
if let Some(event_fn) = &on_ingest_event {
if let Some(event_fn) = &mut on_ingest_event {
event_fn(IngestEvent::Sleep)
}
tokio::time::sleep(Duration::from_secs(1)).await;
Expand All @@ -304,7 +304,7 @@ impl QuickwitClient {
break;
}
}
if let Some(event_fn) = on_ingest_event {
if let Some(event_fn) = &mut on_ingest_event {
event_fn(IngestEvent::IngestedDocBatch(batch.len()))
}
}
Expand Down

0 comments on commit 4493938

Please sign in to comment.