diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index 13278547b53..c2afc0f462d 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -5860,6 +5860,7 @@ version = "0.8.0"
 dependencies = [
  "anyhow",
  "async-trait",
+ "bytesize",
  "fnv",
  "futures",
  "itertools 0.13.0",
diff --git a/quickwit/quickwit-control-plane/Cargo.toml b/quickwit/quickwit-control-plane/Cargo.toml
index ffb57b1b732..024704736e0 100644
--- a/quickwit/quickwit-control-plane/Cargo.toml
+++ b/quickwit/quickwit-control-plane/Cargo.toml
@@ -13,6 +13,7 @@ license.workspace = true
 [dependencies]
 anyhow = { workspace = true }
 async-trait = { workspace = true }
+bytesize = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
index 45543609da2..6ec869819e4 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -26,11 +26,13 @@ use std::num::NonZeroU32;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 
+use bytesize::ByteSize;
 use fnv::{FnvHashMap, FnvHashSet};
 use itertools::Itertools;
 use quickwit_proto::control_plane::{RebuildPlanRequest, RebuildPlanResponse};
 use quickwit_proto::indexing::{
     ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
+    PIPELINE_THROUGHTPUT,
 };
 use quickwit_proto::metastore::SourceType;
 use quickwit_proto::types::NodeId;
@@ -127,20 +129,25 @@ impl fmt::Debug for IndexingScheduler {
 /// This function averages their statistics.
 ///
 /// For the moment, this function only takes in account the measured throughput,
-/// and assumes a constant CPU usage of 4 vCPU = 30mb/s.
+/// and assumes a constant CPU usage of 4 vCPU = 20mb/s.
 ///
 /// It does not take in account the variation that could raise from the different
 /// doc mapping / nature of the data, etc.
 fn compute_load_per_shard(shard_entries: &[&ShardEntry]) -> NonZeroU32 {
-    let num_shards = shard_entries.len().max(1) as u32;
-    let average_throughput_per_shard: u32 = shard_entries
+    let num_shards = shard_entries.len().max(1) as u64;
+    let average_throughput_per_shard_bytes: u64 = shard_entries
         .iter()
-        .map(|shard_entry| u32::from(shard_entry.ingestion_rate.0))
-        .sum::<u32>()
-        .div_ceil(num_shards);
-    let num_cpu_millis = (PIPELINE_FULL_CAPACITY.cpu_millis() * average_throughput_per_shard) / 20;
+        .map(|shard_entry| shard_entry.ingestion_rate.0 as u64 * bytesize::MIB)
+        .sum::<u64>()
+        .div_ceil(num_shards)
+        // A shard throughput cannot exceed PIPELINE_THROUGHPUT in the long term (this is enforced
+        // by the configuration).
+        .min(PIPELINE_THROUGHTPUT.as_u64());
+    let num_cpu_millis = ((PIPELINE_FULL_CAPACITY.cpu_millis() as u64
+        * average_throughput_per_shard_bytes) as u64)
+        / PIPELINE_THROUGHTPUT.as_u64();
     const MIN_CPU_LOAD_PER_SHARD: u32 = 50u32;
-    NonZeroU32::new(num_cpu_millis.max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
+    NonZeroU32::new((num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)).unwrap()
 }
 
 fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
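To make the unit juggling in `compute_load_per_shard` concrete, here is a small self-contained sketch (not Quickwit source; the function name `load_per_shard` and the plain-`u64` constants are illustrative stand-ins for the values this diff introduces) that mirrors the new formula and works through a few cases:

```rust
// Illustrative copies of the values introduced by this diff (assumptions,
// not imports from quickwit): 4_000 cpu millis per pipeline, 20 MB/s cap.
const PIPELINE_FULL_CAPACITY_CPU_MILLIS: u64 = 4_000;
const PIPELINE_THROUGHTPUT_BYTES: u64 = 20_000_000; // ByteSize::mb(20)
const MIB: u64 = 1_048_576; // bytesize::MIB
const MIN_CPU_LOAD_PER_SHARD: u32 = 50;

/// Mirrors the arithmetic of `compute_load_per_shard` for a list of
/// per-shard ingestion rates given in MiB/s.
fn load_per_shard(ingestion_rates_mib_per_sec: &[u64]) -> u32 {
    let num_shards = ingestion_rates_mib_per_sec.len().max(1) as u64;
    let average_throughput_bytes = ingestion_rates_mib_per_sec
        .iter()
        .map(|rate| rate * MIB)
        .sum::<u64>()
        .div_ceil(num_shards)
        // Cap: a shard cannot exceed the pipeline throughput long term.
        .min(PIPELINE_THROUGHTPUT_BYTES);
    let num_cpu_millis = PIPELINE_FULL_CAPACITY_CPU_MILLIS * average_throughput_bytes
        / PIPELINE_THROUGHTPUT_BYTES;
    (num_cpu_millis as u32).max(MIN_CPU_LOAD_PER_SHARD)
}

fn main() {
    // One shard at 5 MiB/s: 4_000 * 5 * 1_048_576 / 20_000_000 = 1_048 cpu millis (~1 vCPU).
    assert_eq!(load_per_shard(&[5]), 1_048);
    // An idle shard is floored at 50 cpu millis.
    assert_eq!(load_per_shard(&[0]), 50);
    // A shard reporting more than 20 MB/s is capped at one full pipeline.
    assert_eq!(load_per_shard(&[40]), 4_000);
}
```

Doing the arithmetic in `u64` matters here: `4_000 * 20_000_000` already overflows `u32`, which is presumably why the diff widens the computation up front and only narrows back to `u32` at the very end.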
diff --git a/quickwit/quickwit-proto/src/indexing/mod.rs b/quickwit/quickwit-proto/src/indexing/mod.rs
index b79311f21fd..968c575aee5 100644
--- a/quickwit/quickwit-proto/src/indexing/mod.rs
+++ b/quickwit/quickwit-proto/src/indexing/mod.rs
@@ -22,6 +22,7 @@ use std::fmt::{Display, Formatter};
 use std::hash::Hash;
 use std::ops::{Add, Mul, Sub};
 
+use bytesize::ByteSize;
 use quickwit_actors::AskError;
 use quickwit_common::pubsub::Event;
 use quickwit_common::tower::{MakeLoadShedError, RpcName};
@@ -176,9 +177,17 @@ impl Display for PipelineMetrics {
 }
 
 /// One full pipeline (including merging) is assumed to consume 4 CPU threads.
-/// The actual number somewhere between 3 and 4.
+/// The actual number is somewhere between 3 and 4. Quickwit is not super sensitive to this number.
+///
+/// It simply impacts the point where we prefer to work on balancing the load over the different
+/// indexers and the point where we prefer improving other features of the system (shard locality,
+/// grouping pipelines associated with a given index on the same node, etc.).
 pub const PIPELINE_FULL_CAPACITY: CpuCapacity = CpuCapacity::from_cpu_millis(4_000u32);
 
+/// One full pipeline (including merging) is supposed to have the capacity to index at least 20mb/s.
+/// This is a defensive value: in reality, this is typically above 30mb/s.
+pub const PIPELINE_THROUGHTPUT: ByteSize = ByteSize::mb(20);
+
 /// The CpuCapacity represents an amount of CPU resource available.
 ///
 /// It is usually expressed in CPU millis (For instance, one full CPU thread is
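One detail worth double-checking in the constants above: `ByteSize::mb` is a decimal unit, while the scheduler scales ingestion rates with the binary `bytesize::MIB` constant. A tiny sketch, assuming only the `bytesize` crate, to confirm the conversion factors and the implied cost per MiB/s:

```rust
use bytesize::ByteSize;

fn main() {
    // ByteSize::mb is decimal: 20 MB == 20_000_000 bytes, not 20 * 1024 * 1024.
    let throughput = ByteSize::mb(20);
    assert_eq!(throughput.as_u64(), 20_000_000);

    // Shard ingestion rates, by contrast, are scaled with the binary constant.
    assert_eq!(bytesize::MIB, 1 << 20);

    // Implied cost of sustained ingestion: one pipeline (4_000 cpu millis)
    // per 20 MB/s, i.e. roughly 209 cpu millis (~0.21 vCPU) per MiB/s.
    let cpu_millis_per_mib_per_sec = 4_000u64 * bytesize::MIB / throughput.as_u64();
    assert_eq!(cpu_millis_per_mib_per_sec, 209);
}
```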