Skip to content

Commit

Permalink
added comment
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Jul 16, 2024
1 parent 075be7a commit 9d88c2d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
9 changes: 9 additions & 0 deletions quickwit/quickwit-control-plane/src/ingest/scaling_arbiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,16 @@ use crate::model::{ScalingMode, ShardStats};
pub(crate) struct ScalingArbiter {
// Threshold in MiB/s below which we decrease the number of shards.
scale_down_shards_threshold_mib_per_sec: f32,

// Threshold in MiB/s above which we increase the number of shards.
// In order to make scaling up reactive, the decision is mostly taken by inspecting the short
// term threshold.
//
// However, this threshold is based on a very short window of time: 5s.
//
// In order to avoid having back and forth scaling up and down in response to temporary
// punctual spikes of a few MB, we also compute what would be the long term ingestion rate
// after scaling up, and double check that it is above the long term threshold.
scale_up_shards_short_term_threshold_mib_per_sec: f32,
scale_up_shards_long_term_threshold_mib_per_sec: f32,
}
Expand Down
14 changes: 7 additions & 7 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,17 +160,17 @@ impl LocalShardsSnapshot {
pub(super) struct BroadcastLocalShardsTask {
cluster: Cluster,
weak_state: WeakIngesterState,
shard_throughput_time_series: ShardThroughputTimeSeries,
shard_throughput_time_series_map: ShardThroughputTimeSeriesMap,
}

const SHARD_THROUGHPUT_LONG_TERM_WINDOW_LEN: usize = 12;

#[derive(Default)]
struct ShardThroughputTimeSeries {
struct ShardThroughputTimeSeriesMap {
shard_time_series: HashMap<(SourceUid, ShardId), ShardThroughputTimeSerie>,
}

impl ShardThroughputTimeSeries {
impl ShardThroughputTimeSeriesMap {
// Records a list of shard throughputs.
//
// A new time series is created for each new shard_ids.
Expand Down Expand Up @@ -262,7 +262,7 @@ impl BroadcastLocalShardsTask {
let mut broadcaster = Self {
cluster,
weak_state,
shard_throughput_time_series: Default::default(),
shard_throughput_time_series_map: Default::default(),
};
tokio::spawn(async move { broadcaster.run().await })
}
Expand Down Expand Up @@ -311,11 +311,11 @@ impl BroadcastLocalShardsTask {
})
.collect();

self.shard_throughput_time_series
self.shard_throughput_time_series_map
.record_shard_throughputs(ingestion_rates);

let per_source_shard_infos = self
.shard_throughput_time_series
.shard_throughput_time_series_map
.get_per_source_shard_infos();

for shard_infos in per_source_shard_infos.values() {
Expand Down Expand Up @@ -581,7 +581,7 @@ mod tests {
let mut task = BroadcastLocalShardsTask {
cluster,
weak_state,
shard_throughput_time_series: Default::default(),
shard_throughput_time_series_map: Default::default(),
};
let previous_snapshot = task.snapshot_local_shards().await.unwrap();
assert!(previous_snapshot.per_source_shard_infos.is_empty());
Expand Down

0 comments on commit 9d88c2d

Please sign in to comment.