From ca59658a381027ef22dca950b104c8f8e7da33cd Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Mon, 22 Jan 2024 18:16:46 +0900
Subject: [PATCH] Make IO limits for merges a node-level parameter.

This PR also removes the index label from the IO metrics and removes the
IO limit from the delete pipeline.

Closes #4439
---
 quickwit/quickwit-common/src/io.rs               | 43 +++++++++++++------
 .../quickwit-config/src/index_config/mod.rs      | 20 ++++++---
 .../src/index_config/serialize.rs                |  1 +
 .../quickwit-config/src/node_config/mod.rs       |  7 +++
 .../src/actors/index_serializer.rs               |  5 +--
 quickwit/quickwit-indexing/src/actors/indexer.rs |  2 +-
 .../src/actors/indexing_pipeline.rs              |  2 +-
 .../src/actors/indexing_service.rs               | 12 +++---
 .../src/actors/merge_pipeline.rs                 | 26 +++--
 .../src/actors/delete_task_pipeline.rs           | 15 ++-----
 10 files changed, 74 insertions(+), 59 deletions(-)

diff --git a/quickwit/quickwit-common/src/io.rs b/quickwit/quickwit-common/src/io.rs
index 64cb4413e3f..73e197e7d9c 100644
--- a/quickwit/quickwit-common/src/io.rs
+++ b/quickwit/quickwit-common/src/io.rs
@@ -35,7 +35,8 @@ use std::time::Duration;
 
 use async_speed_limit::clock::StandardClock;
 use async_speed_limit::limiter::Consume;
-use async_speed_limit::Limiter;
+pub use async_speed_limit::Limiter;
+use bytesize::ByteSize;
 use once_cell::sync::Lazy;
 use pin_project::pin_project;
 use prometheus::IntCounter;
@@ -53,7 +54,7 @@ fn truncate_bytes(bytes: &[u8]) -> &[u8] {
 }
 
 struct IoMetrics {
-    write_bytes: IntCounterVec<2>,
+    write_bytes: IntCounterVec<1>,
 }
 
 impl Default for IoMetrics {
@@ -63,7 +64,7 @@ impl Default for IoMetrics {
             "Number of bytes written by a given component in [indexer, merger, deleter, \
              split_downloader_{merge,delete}]",
             "quickwit",
-            ["index", "component"],
+            ["component"],
         );
         Self { write_bytes }
     }
 }
@@ -85,9 +86,15 @@ const REFILL_DURATION: Duration = if cfg!(test) {
     Duration::from_millis(100)
 };
 
+pub fn limiter(throughput: ByteSize) -> Limiter {
+    Limiter::builder(throughput.as_u64() as f64)
+        .refill(REFILL_DURATION)
+        .build()
+}
+
 #[derive(Clone)]
 pub struct IoControls {
-    throughput_limiter: Limiter,
+    throughput_limiter_opt: Option<Limiter>,
     bytes_counter: IntCounter,
     progress: Progress,
     kill_switch: KillSwitch,
@@ -98,7 +105,7 @@ impl Default for IoControls {
         let default_bytes_counter =
             IntCounter::new("default_write_num_bytes", "Default write counter.").unwrap();
         IoControls {
-            throughput_limiter: Limiter::new(f64::INFINITY),
+            throughput_limiter_opt: None,
             progress: Progress::default(),
             kill_switch: KillSwitch::default(),
             bytes_counter: default_bytes_counter,
@@ -131,13 +138,20 @@ impl IoControls {
         Ok(guard)
     }
 
-    pub fn set_index_and_component(mut self, index: &str, component: &str) -> Self {
-        self.bytes_counter = IO_METRICS.write_bytes.with_label_values([index, component]);
+    pub fn set_component(mut self, component: &str) -> Self {
+        self.bytes_counter = IO_METRICS.write_bytes.with_label_values([component]);
         self
     }
 
-    pub fn set_throughput_limit(mut self, throughput: f64) -> Self {
-        self.throughput_limiter = Limiter::builder(throughput).refill(REFILL_DURATION).build();
+    pub fn set_throughput_limit(self, throughput: ByteSize) -> Self {
+        let throughput_limiter = Limiter::builder(throughput.as_u64() as f64)
+            .refill(REFILL_DURATION)
+            .build();
+        self.set_throughput_limiter_opt(Some(throughput_limiter))
+    }
+
+    pub fn set_throughput_limiter_opt(mut self, throughput_limiter_opt: Option<Limiter>) -> Self {
+        self.throughput_limiter_opt = throughput_limiter_opt;
         self
     }
 
@@ -157,7 +171,9 @@ impl IoControls {
     }
 
     fn consume_blocking(&self, num_bytes: usize) -> io::Result<()> {
         let _guard = self.check_if_alive()?;
-        self.throughput_limiter.blocking_consume(num_bytes);
+        if let Some(throughput_limiter) = &self.throughput_limiter_opt {
+            throughput_limiter.blocking_consume(num_bytes);
+        }
         self.bytes_counter.inc_by(num_bytes as u64);
         Ok(())
     }
@@ -212,9 +228,12 @@ impl ControlledWrite {
             if len > 0 {
                 let waiter = this.io_controls_access.apply(|io_controls| {
                     io_controls.bytes_counter.inc_by(len as u64);
-                    io_controls.throughput_limiter.consume(len)
+                    io_controls
+                        .throughput_limiter_opt
+                        .as_ref()
+                        .map(|limiter| limiter.consume(len))
                 });
-                *this.waiter = Some(waiter)
+                *this.waiter = waiter
             }
         }
         res
diff --git a/quickwit/quickwit-config/src/index_config/mod.rs b/quickwit/quickwit-config/src/index_config/mod.rs
index 4ad20c4e4e2..61cf9fc1ec1 100644
--- a/quickwit/quickwit-config/src/index_config/mod.rs
+++ b/quickwit/quickwit-config/src/index_config/mod.rs
@@ -38,6 +38,7 @@ use quickwit_doc_mapper::{
 use quickwit_proto::types::IndexId;
 use serde::{Deserialize, Serialize};
 pub use serialize::load_index_config_from_user_config;
+use tracing::warn;
 
 use crate::index_config::serialize::VersionedIndexConfig;
 use crate::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig};
@@ -97,14 +98,11 @@ pub struct IndexingResources {
     #[schema(value_type = String, default = "2 GB")]
     #[serde(default = "IndexingResources::default_heap_size")]
     pub heap_size: ByteSize,
-    /// Sets the maximum write IO throughput in bytes/sec for the merge and delete pipelines.
-    /// The IO limit is applied both to the downloader and to the merge executor.
-    /// On hardware where IO is limited, this parameter can help limiting the impact of
-    /// merges/deletes on indexing.
+    // DEPRECATED: See #4439
     #[schema(value_type = String)]
     #[serde(default)]
-    #[serde(skip_serializing_if = "Option::is_none")]
-    pub max_merge_write_throughput: Option<ByteSize>,
+    #[serde(skip_serializing)]
+    max_merge_write_throughput: Option<ByteSize>,
 }
 
 impl PartialEq for IndexingResources {
@@ -125,6 +123,16 @@ impl IndexingResources {
             ..Default::default()
         }
     }
+
+    pub fn validate(&self) -> anyhow::Result<()> {
+        if self.max_merge_write_throughput.is_some() {
+            warn!(
+                "`max_merge_write_throughput` is deprecated and will be removed in a future \
+                 version. See #4439. A global limit now exists in the indexer configuration."
+            );
+        }
+        Ok(())
+    }
 }
 
 impl Default for IndexingResources {
diff --git a/quickwit/quickwit-config/src/index_config/serialize.rs b/quickwit/quickwit-config/src/index_config/serialize.rs
index 065126cb803..a293d5bd01d 100644
--- a/quickwit/quickwit-config/src/index_config/serialize.rs
+++ b/quickwit/quickwit-config/src/index_config/serialize.rs
@@ -105,6 +105,7 @@ impl IndexConfigForSerialization {
         build_doc_mapper(&self.doc_mapping, &self.search_settings)?;
 
         self.indexing_settings.merge_policy.validate()?;
+        self.indexing_settings.resources.validate()?;
 
         Ok(IndexConfig {
             index_id: self.index_id,
diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs
index 78ba58caa9d..5c39b57c0f8 100644
--- a/quickwit/quickwit-config/src/node_config/mod.rs
+++ b/quickwit/quickwit-config/src/node_config/mod.rs
@@ -91,6 +91,11 @@ pub struct IndexerConfig {
     pub split_store_max_num_splits: usize,
     #[serde(default = "IndexerConfig::default_max_concurrent_split_uploads")]
     pub max_concurrent_split_uploads: usize,
+    /// Limits the IO throughput of the `SplitDownloader` and the `MergeExecutor`.
+    /// On hardware where IO is constrained, this ensures that merges (a batch operation)
+    /// do not starve indexing itself (a latency-sensitive operation).
+    #[serde(default)]
+    pub max_merge_write_throughput: Option<ByteSize>,
     /// Enables the OpenTelemetry exporter endpoint to ingest logs and traces via the OpenTelemetry
     /// Protocol (OTLP).
     #[serde(default = "IndexerConfig::default_enable_otlp_endpoint")]
@@ -143,6 +148,7 @@ impl IndexerConfig {
             split_store_max_num_splits: 3,
             max_concurrent_split_uploads: 4,
             cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32,
+            max_merge_write_throughput: None,
         };
         Ok(indexer_config)
     }
@@ -157,6 +163,7 @@ impl Default for IndexerConfig {
             split_store_max_num_splits: Self::default_split_store_max_num_splits(),
             max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(),
             cpu_capacity: Self::default_cpu_capacity(),
+            max_merge_write_throughput: None,
         }
     }
 }
diff --git a/quickwit/quickwit-indexing/src/actors/index_serializer.rs b/quickwit/quickwit-indexing/src/actors/index_serializer.rs
index b16620794ec..47960384a45 100644
--- a/quickwit/quickwit-indexing/src/actors/index_serializer.rs
+++ b/quickwit/quickwit-indexing/src/actors/index_serializer.rs
@@ -84,10 +84,7 @@ impl Handler for IndexSerializer {
             let io_controls = IoControls::default()
                 .set_progress(ctx.progress().clone())
                 .set_kill_switch(ctx.kill_switch().clone())
-                .set_index_and_component(
-                    split_builder.split_attrs.pipeline_id.index_uid.index_id(),
-                    "index_serializer",
-                );
+                .set_component("index_serializer");
             controlled_directory.set_io_controls(io_controls);
         }
         let split = split_builder.finalize()?;
diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs
index c007b1f96c2..f97cdee7eed 100644
--- a/quickwit/quickwit-indexing/src/actors/indexer.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexer.rs
@@ -125,7 +125,7 @@ impl IndexerState {
         let io_controls = IoControls::default()
             .set_progress(ctx.progress().clone())
             .set_kill_switch(ctx.kill_switch().clone())
-            .set_index_and_component(self.pipeline_id.index_uid.index_id(), "indexer");
+            .set_component("indexer");
 
         let indexed_split = IndexedSplitBuilder::new_in_dir(
             self.pipeline_id.clone(),
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
index b7c8adc8aa1..44f7b94f9fc 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -878,7 +878,7 @@ mod tests {
             split_store: split_store.clone(),
             merge_policy: default_merge_policy(),
             max_concurrent_split_uploads: 2,
-            merge_max_io_num_bytes_per_sec: None,
+            merge_io_throughput_limiter_opt: None,
             event_broker: Default::default(),
         };
         let merge_pipeline = MergePipeline::new(merge_pipeline_params, universe.spawn_ctx());
diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
index a3cfa219a23..d3132ed63a4 100644
--- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs
+++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -33,8 +33,9 @@ use quickwit_actors::{
 };
 use quickwit_cluster::Cluster;
 use quickwit_common::fs::get_cache_directory_path;
+use quickwit_common::io::Limiter;
 use quickwit_common::pubsub::EventBroker;
-use quickwit_common::temp_dir;
+use quickwit_common::{io, temp_dir};
 use quickwit_config::{
     build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, INGEST_API_SOURCE_ID,
 };
@@ -128,6 +129,7 @@ pub struct IndexingService {
     max_concurrent_split_uploads: usize,
     merge_pipeline_handles: HashMap<MergePipelineId, MergePipelineHandle>,
     cooperative_indexing_permits: Option<Arc<Semaphore>>,
+    merge_io_throughput_limiter_opt: Option<Limiter>,
     event_broker: EventBroker,
 }
 
@@ -160,6 +162,8 @@ impl IndexingService {
             indexer_config.split_store_max_num_splits,
             indexer_config.split_store_max_num_bytes,
         );
+        let merge_io_throughput_limiter_opt =
+            indexer_config.max_merge_write_throughput.map(io::limiter);
         let split_cache_dir_path = get_cache_directory_path(&data_dir_path);
         let local_split_store =
             LocalSplitStore::open(split_cache_dir_path, split_store_space_quota).await?;
@@ -185,6 +189,7 @@ impl IndexingService {
             counters: Default::default(),
             max_concurrent_split_uploads: indexer_config.max_concurrent_split_uploads,
             merge_pipeline_handles: HashMap::new(),
+            merge_io_throughput_limiter_opt,
             cooperative_indexing_permits,
             event_broker,
         })
@@ -293,10 +298,7 @@ impl IndexingService {
             metastore: self.metastore.clone(),
             split_store: split_store.clone(),
             merge_policy: merge_policy.clone(),
-            merge_max_io_num_bytes_per_sec: index_config
-                .indexing_settings
-                .resources
-                .max_merge_write_throughput,
+            merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
             max_concurrent_split_uploads: self.max_concurrent_split_uploads,
             event_broker: self.event_broker.clone(),
         };
diff --git a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
index 01cbf23ff70..69f71936268 100644
--- a/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
+++ b/quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
@@ -21,12 +21,11 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 
 use async_trait::async_trait;
-use bytesize::ByteSize;
 use quickwit_actors::{
     Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Inbox, Mailbox,
     SpawnContext, Supervisable, HEARTBEAT,
 };
-use quickwit_common::io::IoControls;
+use quickwit_common::io::{IoControls, Limiter};
 use quickwit_common::pubsub::EventBroker;
 use quickwit_common::temp_dir::TempDirectory;
 use quickwit_common::KillSwitch;
@@ -270,25 +269,14 @@ impl MergePipeline {
             .set_kill_switch(self.kill_switch.clone())
             .spawn(merge_packager);
 
-        let max_merge_write_throughput: f64 = self
-            .params
-            .merge_max_io_num_bytes_per_sec
-            .as_ref()
-            .map(|bytes_per_sec| bytes_per_sec.as_u64() as f64)
-            .unwrap_or(f64::INFINITY);
-
         let split_downloader_io_controls = IoControls::default()
-            .set_throughput_limit(max_merge_write_throughput)
-            .set_index_and_component(
-                self.params.pipeline_id.index_uid.index_id(),
-                "split_downloader_merge",
-            );
+            .set_throughput_limiter_opt(self.params.merge_io_throughput_limiter_opt.clone())
+            .set_component("split_downloader_merge");
 
         // The merge and split download share the same throughput limiter.
         // This is how cloning the `IoControls` works.
-        let merge_executor_io_controls = split_downloader_io_controls
-            .clone()
-            .set_index_and_component(self.params.pipeline_id.index_uid.index_id(), "merger");
+        let merge_executor_io_controls =
+            split_downloader_io_controls.clone().set_component("merger");
 
         let merge_executor = MergeExecutor::new(
             self.params.pipeline_id.clone(),
@@ -473,7 +461,7 @@ pub struct MergePipelineParams {
     pub split_store: IndexingSplitStore,
     pub merge_policy: Arc<dyn MergePolicy>,
     pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline.
-    pub merge_max_io_num_bytes_per_sec: Option<ByteSize>,
+    pub merge_io_throughput_limiter_opt: Option<Limiter>,
     pub event_broker: EventBroker,
 }
 
@@ -533,7 +521,7 @@ mod tests {
             split_store,
             merge_policy: default_merge_policy(),
             max_concurrent_split_uploads: 2,
-            merge_max_io_num_bytes_per_sec: None,
+            merge_io_throughput_limiter_opt: None,
             event_broker: Default::default(),
         };
         let pipeline = MergePipeline::new(pipeline_params, universe.spawn_ctx());
diff --git a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
index 3a51b18fe59..278a8efd22b 100644
--- a/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
+++ b/quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
@@ -194,19 +194,12 @@ impl DeleteTaskPipeline {
             pipeline_uid: PipelineUid::from_u128(0u128),
             source_id: "unknown".to_string(),
         };
-        let throughput_limit: f64 = index_config
-            .indexing_settings
-            .resources
-            .max_merge_write_throughput
-            .as_ref()
-            .map(|bytes_per_sec| bytes_per_sec.as_u64() as f64)
-            .unwrap_or(f64::INFINITY);
-        let delete_executor_io_controls = IoControls::default()
-            .set_throughput_limit(throughput_limit)
-            .set_index_and_component(self.index_uid.index_id(), "deleter");
+
+        let delete_executor_io_controls = IoControls::default().set_component("deleter");
+
         let split_download_io_controls = delete_executor_io_controls
             .clone()
-            .set_index_and_component(self.index_uid.index_id(), "split_downloader_delete");
+            .set_component("split_downloader_delete");
         let delete_executor = MergeExecutor::new(
             index_pipeline_id,
             self.metastore.clone(),
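
Usage note (not part of the diff): the sketch below shows how the pieces introduced above fit together. The free function, its name, and the tuple return are illustrative; `io::limiter`, `IoControls::set_throughput_limiter_opt`, `IoControls::set_component`, and the `Clone` behavior come from the patch itself. The node-level `max_merge_write_throughput` is mapped to a single shared `Limiter`, and the merge split downloader and the merge executor clone the same `IoControls`, so both draw from one throughput budget.

```rust
use bytesize::ByteSize;
use quickwit_common::io::{self, IoControls, Limiter};

// Hypothetical helper mirroring the wiring done in `IndexingService` and
// `MergePipeline` in this patch: one limiter per node, shared by all merge IO.
fn merge_io_controls(max_merge_write_throughput: Option<ByteSize>) -> (IoControls, IoControls) {
    // `io::limiter` is the helper added to quickwit-common/src/io.rs above.
    let merge_io_throughput_limiter_opt: Option<Limiter> =
        max_merge_write_throughput.map(io::limiter);

    // Cloning `IoControls` clones the limiter handle, not the quota, so the
    // split downloader and the merge executor consume from the same budget.
    let split_downloader_io_controls = IoControls::default()
        .set_throughput_limiter_opt(merge_io_throughput_limiter_opt)
        .set_component("split_downloader_merge");
    let merge_executor_io_controls = split_downloader_io_controls
        .clone()
        .set_component("merger");

    (split_downloader_io_controls, merge_executor_io_controls)
}
```

In the node configuration, the new knob would sit under the `indexer` section (e.g. `max_merge_write_throughput: 80mb`; the value is illustrative) and replaces the now-deprecated per-index `indexing_settings.resources.max_merge_write_throughput`.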