Skip to content

Commit

Permalink
Fix the ingest rate displayed in the CLI. (#4682)
Browse files Browse the repository at this point in the history
* Fix the ingest rate displayed in the CLI.

The CLI was showing the average rate since the CLI was started.

This PR changes it to an estimation over the last 2 seconds.

* Bugfix in the rate estimator

The rate estimator was returning inflated results whenever
we did not record any work during a bucket period.

The fix does two things:
- It replace the 1-bit color logic `(bucket_ord / num_buckets) % 2`` to a more generic and more accurate 8 bit hash `bucket_ord % 243`.

Collision may only happen if no write targetted this specific bucket for `num_buckets x 243 x bucket_period`.

Upon read, it checks for the hash and dismisses buckets that are not matching the hash.

* Update quickwit/quickwit-common/src/tower/rate_estimator.rs

Co-authored-by: trinity-1686a <[email protected]>

---------

Co-authored-by: trinity-1686a <[email protected]>
  • Loading branch information
fulmicoton and trinity-1686a authored May 10, 2024
1 parent 894188f commit fab785c
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 94 deletions.
21 changes: 16 additions & 5 deletions quickwit/quickwit-cli/src/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::borrow::Cow;
use std::collections::VecDeque;
use std::fmt::Display;
use std::io::{stdout, Stdout, Write};
use std::num::NonZeroUsize;
use std::ops::Div;
use std::path::PathBuf;
use std::str::FromStr;
Expand All @@ -36,6 +37,7 @@ use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use numfmt::{Formatter, Scales};
use quickwit_actors::ActorHandle;
use quickwit_common::tower::{Rate, RateEstimator, SmaRateEstimator};
use quickwit_common::uri::Uri;
use quickwit_config::{ConfigFormat, IndexConfig};
use quickwit_indexing::models::IndexingStatistics;
Expand Down Expand Up @@ -932,6 +934,11 @@ impl Tabled for Quantiles {

pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
debug!(args=?args, "ingest-docs");
let mut rate_estimator = SmaRateEstimator::new(
NonZeroUsize::new(8).unwrap(),
Duration::from_millis(250),
Duration::from_secs(1),
);
if let Some(input_path) = &args.input_path_opt {
println!("❯ Ingesting documents from {}.", input_path.display());
} else {
Expand All @@ -947,13 +954,17 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
progress_bar.enable_steady_tick(Duration::from_millis(100));
progress_bar.set_style(progress_bar_style());
progress_bar.set_message("0MiB/s");
let update_progress_bar = |ingest_event: IngestEvent| {
// It is not used by the rate estimator anyway.
let useless_start_time = Instant::now();
let mut update_progress_bar = |ingest_event: IngestEvent| {
match ingest_event {
IngestEvent::IngestedDocBatch(num_bytes) => progress_bar.inc(num_bytes as u64),
IngestEvent::IngestedDocBatch(num_bytes) => {
rate_estimator.update(useless_start_time, Instant::now(), num_bytes as u64);
progress_bar.inc(num_bytes as u64)
}
IngestEvent::Sleep => {} // To
};
let throughput =
progress_bar.position() as f64 / progress_bar.elapsed().as_secs_f64() / 1024.0 / 1024.0;
let throughput = rate_estimator.work() as f64 / (1024 * 1024) as f64;
progress_bar.set_message(format!("{throughput:.1} MiB/s"));
};

Expand All @@ -970,7 +981,7 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
&args.index_id,
ingest_source,
batch_size_limit_opt,
Some(&update_progress_bar),
Some(&mut update_progress_bar),
args.commit_type,
)
.await?;
Expand Down
Loading

0 comments on commit fab785c

Please sign in to comment.