Skip to content

Commit

Permalink
Make io limits for merges a node-level parameter.
Browse files Browse the repository at this point in the history
This PR also removes the index label from the IO metrics.
It also removes the limits for the delete pipeline.

Closes #4439
  • Loading branch information
fulmicoton committed Jan 22, 2024
1 parent f858455 commit ca59658
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 59 deletions.
43 changes: 31 additions & 12 deletions quickwit/quickwit-common/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use std::time::Duration;

use async_speed_limit::clock::StandardClock;
use async_speed_limit::limiter::Consume;
use async_speed_limit::Limiter;
pub use async_speed_limit::Limiter;
use bytesize::ByteSize;
use once_cell::sync::Lazy;
use pin_project::pin_project;
use prometheus::IntCounter;
Expand All @@ -53,7 +54,7 @@ fn truncate_bytes(bytes: &[u8]) -> &[u8] {
}

struct IoMetrics {
write_bytes: IntCounterVec<2>,
write_bytes: IntCounterVec<1>,
}

impl Default for IoMetrics {
Expand All @@ -63,7 +64,7 @@ impl Default for IoMetrics {
"Number of bytes written by a given component in [indexer, merger, deleter, \
split_downloader_{merge,delete}]",
"quickwit",
["index", "component"],
["component"],
);
Self { write_bytes }
}
Expand All @@ -85,9 +86,15 @@ const REFILL_DURATION: Duration = if cfg!(test) {
Duration::from_millis(100)
};

pub fn limiter(throughput: ByteSize) -> Limiter {
Limiter::builder(throughput.as_u64() as f64)
.refill(REFILL_DURATION)
.build()
}

#[derive(Clone)]
pub struct IoControls {
throughput_limiter: Limiter,
throughput_limiter_opt: Option<Limiter>,
bytes_counter: IntCounter,
progress: Progress,
kill_switch: KillSwitch,
Expand All @@ -98,7 +105,7 @@ impl Default for IoControls {
let default_bytes_counter =
IntCounter::new("default_write_num_bytes", "Default write counter.").unwrap();
IoControls {
throughput_limiter: Limiter::new(f64::INFINITY),
throughput_limiter_opt: None,
progress: Progress::default(),
kill_switch: KillSwitch::default(),
bytes_counter: default_bytes_counter,
Expand Down Expand Up @@ -131,13 +138,20 @@ impl IoControls {
Ok(guard)
}

pub fn set_index_and_component(mut self, index: &str, component: &str) -> Self {
self.bytes_counter = IO_METRICS.write_bytes.with_label_values([index, component]);
pub fn set_component(mut self, component: &str) -> Self {
self.bytes_counter = IO_METRICS.write_bytes.with_label_values([component]);
self
}

pub fn set_throughput_limit(mut self, throughput: f64) -> Self {
self.throughput_limiter = Limiter::builder(throughput).refill(REFILL_DURATION).build();
pub fn set_throughput_limit(self, throughput: ByteSize) -> Self {
let throughput_limiter = Limiter::builder(throughput.as_u64() as f64)
.refill(REFILL_DURATION)
.build();
self.set_throughput_limiter_opt(Some(throughput_limiter))
}

pub fn set_throughput_limiter_opt(mut self, throughput_limiter_opt: Option<Limiter>) -> Self {
self.throughput_limiter_opt = throughput_limiter_opt;
self
}

Expand All @@ -157,7 +171,9 @@ impl IoControls {
}
fn consume_blocking(&self, num_bytes: usize) -> io::Result<()> {
let _guard = self.check_if_alive()?;
self.throughput_limiter.blocking_consume(num_bytes);
if let Some(throughput_limiter) = &self.throughput_limiter_opt {
throughput_limiter.blocking_consume(num_bytes);
}
self.bytes_counter.inc_by(num_bytes as u64);
Ok(())
}
Expand Down Expand Up @@ -212,9 +228,12 @@ impl<A: IoControlsAccess, W: AsyncWrite> ControlledWrite<A, W> {
if len > 0 {
let waiter = this.io_controls_access.apply(|io_controls| {
io_controls.bytes_counter.inc_by(len as u64);
io_controls.throughput_limiter.consume(len)
io_controls
.throughput_limiter_opt
.as_ref()
.map(|limiter| limiter.consume(len))
});
*this.waiter = Some(waiter)
*this.waiter = waiter
}
}
res
Expand Down
20 changes: 14 additions & 6 deletions quickwit/quickwit-config/src/index_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use quickwit_doc_mapper::{
use quickwit_proto::types::IndexId;
use serde::{Deserialize, Serialize};
pub use serialize::load_index_config_from_user_config;
use tracing::warn;

use crate::index_config::serialize::VersionedIndexConfig;
use crate::merge_policy_config::{MergePolicyConfig, StableLogMergePolicyConfig};
Expand Down Expand Up @@ -97,14 +98,11 @@ pub struct IndexingResources {
#[schema(value_type = String, default = "2 GB")]
#[serde(default = "IndexingResources::default_heap_size")]
pub heap_size: ByteSize,
/// Sets the maximum write IO throughput in bytes/sec for the merge and delete pipelines.
/// The IO limit is applied both to the downloader and to the merge executor.
/// On hardware where IO is limited, this parameter can help limiting the impact of
/// merges/deletes on indexing.
// DEPRECATED: See #4439
#[schema(value_type = String)]
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub max_merge_write_throughput: Option<ByteSize>,
#[serde(skip_serializing)]
max_merge_write_throughput: Option<ByteSize>,
}

impl PartialEq for IndexingResources {
Expand All @@ -125,6 +123,16 @@ impl IndexingResources {
..Default::default()
}
}

pub fn validate(&self) -> anyhow::Result<()> {
if self.max_merge_write_throughput.is_some() {
warn!(
"`max_merge_write_throughput` is deprecated and will be removed in a future \
version. See #4439. A global limit now exists in indexer configuration."
);
}
Ok(())
}
}

impl Default for IndexingResources {
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-config/src/index_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl IndexConfigForSerialization {
build_doc_mapper(&self.doc_mapping, &self.search_settings)?;

self.indexing_settings.merge_policy.validate()?;
self.indexing_settings.resources.validate()?;

Ok(IndexConfig {
index_id: self.index_id,
Expand Down
7 changes: 7 additions & 0 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ pub struct IndexerConfig {
pub split_store_max_num_splits: usize,
#[serde(default = "IndexerConfig::default_max_concurrent_split_uploads")]
pub max_concurrent_split_uploads: usize,
/// Limits the IO throughput of the `SplitDownloader` and the `MergeExecutor`.
/// On hardware where IO is constraints, it makes sure that Merges (a batch operation)
/// does not starve indexing itself (as it is a latency sensitive operation).
#[serde(default)]
pub max_merge_write_throughput: Option<ByteSize>,
/// Enables the OpenTelemetry exporter endpoint to ingest logs and traces via the OpenTelemetry
/// Protocol (OTLP).
#[serde(default = "IndexerConfig::default_enable_otlp_endpoint")]
Expand Down Expand Up @@ -143,6 +148,7 @@ impl IndexerConfig {
split_store_max_num_splits: 3,
max_concurrent_split_uploads: 4,
cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32,
max_merge_write_throughput: None,
};
Ok(indexer_config)
}
Expand All @@ -157,6 +163,7 @@ impl Default for IndexerConfig {
split_store_max_num_splits: Self::default_split_store_max_num_splits(),
max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(),
cpu_capacity: Self::default_cpu_capacity(),
max_merge_write_throughput: None,
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions quickwit/quickwit-indexing/src/actors/index_serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ impl Handler<IndexedSplitBatchBuilder> for IndexSerializer {
let io_controls = IoControls::default()
.set_progress(ctx.progress().clone())
.set_kill_switch(ctx.kill_switch().clone())
.set_index_and_component(
split_builder.split_attrs.pipeline_id.index_uid.index_id(),
"index_serializer",
);
.set_component("index_serializer");
controlled_directory.set_io_controls(io_controls);
}
let split = split_builder.finalize()?;
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl IndexerState {
let io_controls = IoControls::default()
.set_progress(ctx.progress().clone())
.set_kill_switch(ctx.kill_switch().clone())
.set_index_and_component(self.pipeline_id.index_uid.index_id(), "indexer");
.set_component("indexer");

let indexed_split = IndexedSplitBuilder::new_in_dir(
self.pipeline_id.clone(),
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,7 @@ mod tests {
split_store: split_store.clone(),
merge_policy: default_merge_policy(),
max_concurrent_split_uploads: 2,
merge_max_io_num_bytes_per_sec: None,
merge_io_throughput_limiter_opt: None,
event_broker: Default::default(),
};
let merge_pipeline = MergePipeline::new(merge_pipeline_params, universe.spawn_ctx());
Expand Down
12 changes: 7 additions & 5 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ use quickwit_actors::{
};
use quickwit_cluster::Cluster;
use quickwit_common::fs::get_cache_directory_path;
use quickwit_common::io::Limiter;
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir;
use quickwit_common::{io, temp_dir};
use quickwit_config::{
build_doc_mapper, IndexConfig, IndexerConfig, SourceConfig, INGEST_API_SOURCE_ID,
};
Expand Down Expand Up @@ -128,6 +129,7 @@ pub struct IndexingService {
max_concurrent_split_uploads: usize,
merge_pipeline_handles: HashMap<MergePipelineId, MergePipelineHandle>,
cooperative_indexing_permits: Option<Arc<Semaphore>>,
merge_io_throughput_limiter_opt: Option<Limiter>,
event_broker: EventBroker,
}

Expand Down Expand Up @@ -160,6 +162,8 @@ impl IndexingService {
indexer_config.split_store_max_num_splits,
indexer_config.split_store_max_num_bytes,
);
let merge_io_throughput_limiter_opt =
indexer_config.max_merge_write_throughput.map(io::limiter);
let split_cache_dir_path = get_cache_directory_path(&data_dir_path);
let local_split_store =
LocalSplitStore::open(split_cache_dir_path, split_store_space_quota).await?;
Expand All @@ -185,6 +189,7 @@ impl IndexingService {
counters: Default::default(),
max_concurrent_split_uploads: indexer_config.max_concurrent_split_uploads,
merge_pipeline_handles: HashMap::new(),
merge_io_throughput_limiter_opt,
cooperative_indexing_permits,
event_broker,
})
Expand Down Expand Up @@ -293,10 +298,7 @@ impl IndexingService {
metastore: self.metastore.clone(),
split_store: split_store.clone(),
merge_policy: merge_policy.clone(),
merge_max_io_num_bytes_per_sec: index_config
.indexing_settings
.resources
.max_merge_write_throughput,
merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
event_broker: self.event_broker.clone(),
};
Expand Down
26 changes: 7 additions & 19 deletions quickwit/quickwit-indexing/src/actors/merge_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use bytesize::ByteSize;
use quickwit_actors::{
Actor, ActorContext, ActorExitStatus, ActorHandle, Handler, Health, Inbox, Mailbox,
SpawnContext, Supervisable, HEARTBEAT,
};
use quickwit_common::io::IoControls;
use quickwit_common::io::{IoControls, Limiter};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::TempDirectory;
use quickwit_common::KillSwitch;
Expand Down Expand Up @@ -270,25 +269,14 @@ impl MergePipeline {
.set_kill_switch(self.kill_switch.clone())
.spawn(merge_packager);

let max_merge_write_throughput: f64 = self
.params
.merge_max_io_num_bytes_per_sec
.as_ref()
.map(|bytes_per_sec| bytes_per_sec.as_u64() as f64)
.unwrap_or(f64::INFINITY);

let split_downloader_io_controls = IoControls::default()
.set_throughput_limit(max_merge_write_throughput)
.set_index_and_component(
self.params.pipeline_id.index_uid.index_id(),
"split_downloader_merge",
);
.set_throughput_limiter_opt(self.params.merge_io_throughput_limiter_opt.clone())
.set_component("split_downloader_merge");

// The merge and split download share the same throughput limiter.
// This is how cloning the `IoControls` works.
let merge_executor_io_controls = split_downloader_io_controls
.clone()
.set_index_and_component(self.params.pipeline_id.index_uid.index_id(), "merger");
let merge_executor_io_controls =
split_downloader_io_controls.clone().set_component("merger");

let merge_executor = MergeExecutor::new(
self.params.pipeline_id.clone(),
Expand Down Expand Up @@ -473,7 +461,7 @@ pub struct MergePipelineParams {
pub split_store: IndexingSplitStore,
pub merge_policy: Arc<dyn MergePolicy>,
pub max_concurrent_split_uploads: usize, //< TODO share with the indexing pipeline.
pub merge_max_io_num_bytes_per_sec: Option<ByteSize>,
pub merge_io_throughput_limiter_opt: Option<Limiter>,
pub event_broker: EventBroker,
}

Expand Down Expand Up @@ -533,7 +521,7 @@ mod tests {
split_store,
merge_policy: default_merge_policy(),
max_concurrent_split_uploads: 2,
merge_max_io_num_bytes_per_sec: None,
merge_io_throughput_limiter_opt: None,
event_broker: Default::default(),
};
let pipeline = MergePipeline::new(pipeline_params, universe.spawn_ctx());
Expand Down
15 changes: 4 additions & 11 deletions quickwit/quickwit-janitor/src/actors/delete_task_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,19 +194,12 @@ impl DeleteTaskPipeline {
pipeline_uid: PipelineUid::from_u128(0u128),
source_id: "unknown".to_string(),
};
let throughput_limit: f64 = index_config
.indexing_settings
.resources
.max_merge_write_throughput
.as_ref()
.map(|bytes_per_sec| bytes_per_sec.as_u64() as f64)
.unwrap_or(f64::INFINITY);
let delete_executor_io_controls = IoControls::default()
.set_throughput_limit(throughput_limit)
.set_index_and_component(self.index_uid.index_id(), "deleter");

let delete_executor_io_controls = IoControls::default().set_component("deleter");

let split_download_io_controls = delete_executor_io_controls
.clone()
.set_index_and_component(self.index_uid.index_id(), "split_downloader_delete");
.set_component("split_downloader_delete");
let delete_executor = MergeExecutor::new(
index_pipeline_id,
self.metastore.clone(),
Expand Down

0 comments on commit ca59658

Please sign in to comment.