diff --git a/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs b/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs index 0bd4961f125..dd85e06692d 100644 --- a/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs +++ b/quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs @@ -22,7 +22,16 @@ use crate::model::{ScalingMode, ShardStats}; pub(crate) struct ScalingArbiter { // Threshold in MiB/s below which we decrease the number of shards. scale_down_shards_threshold_mib_per_sec: f32, + // Threshold in MiB/s above which we increase the number of shards. + // In order to make scaling up reactive, the decision is mostly taken by inspecting the short + // term threshold. + // + // However, this threshold is based on a very short window of time: 5s. + // + // In order to avoid having back and forth scaling up and down in response to temporary + // spikes of a few MB, we also compute what would be the long term ingestion rate + // after scaling up, and double check that it is above the long term threshold. 
scale_up_shards_short_term_threshold_mib_per_sec: f32, scale_up_shards_long_term_threshold_mib_per_sec: f32, } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs index 8c4502b8270..526a8dae81a 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs @@ -160,17 +160,17 @@ impl LocalShardsSnapshot { pub(super) struct BroadcastLocalShardsTask { cluster: Cluster, weak_state: WeakIngesterState, - shard_throughput_time_series: ShardThroughputTimeSeries, + shard_throughput_time_series_map: ShardThroughputTimeSeriesMap, } const SHARD_THROUGHPUT_LONG_TERM_WINDOW_LEN: usize = 12; #[derive(Default)] -struct ShardThroughputTimeSeries { +struct ShardThroughputTimeSeriesMap { shard_time_series: HashMap<(SourceUid, ShardId), ShardThroughputTimeSerie>, } -impl ShardThroughputTimeSeries { +impl ShardThroughputTimeSeriesMap { // Records a list of shard throughputs. // // A new time series is created for each new shard_ids. 
@@ -262,7 +262,7 @@ impl BroadcastLocalShardsTask { let mut broadcaster = Self { cluster, weak_state, - shard_throughput_time_series: Default::default(), + shard_throughput_time_series_map: Default::default(), }; tokio::spawn(async move { broadcaster.run().await }) } @@ -311,11 +311,11 @@ impl BroadcastLocalShardsTask { }) .collect(); - self.shard_throughput_time_series + self.shard_throughput_time_series_map .record_shard_throughputs(ingestion_rates); let per_source_shard_infos = self - .shard_throughput_time_series + .shard_throughput_time_series_map .get_per_source_shard_infos(); for shard_infos in per_source_shard_infos.values() { @@ -581,7 +581,7 @@ mod tests { let mut task = BroadcastLocalShardsTask { cluster, weak_state, - shard_throughput_time_series: Default::default(), + shard_throughput_time_series_map: Default::default(), }; let previous_snapshot = task.snapshot_local_shards().await.unwrap(); assert!(previous_snapshot.per_source_shard_infos.is_empty());