diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 6cfbdcfc7ac..cc5f2b33fc5 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -676,10 +676,8 @@ impl IngestController { model: &mut ControlPlaneModel, progress: &Progress, ) -> MetastoreResult<()> { - const NUM_PERMITS: u64 = 1; - if !model - .acquire_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS) + .acquire_scaling_permits(&source_uid, ScalingMode::Up) .unwrap_or(false) { return Ok(()); @@ -698,7 +696,7 @@ impl IngestController { if successful_source_uids.is_empty() { // We did not manage to create the shard. // We can release our permit. - model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Up); warn!( index_uid=%source_uid.index_uid, source_id=%source_uid.source_id, @@ -722,7 +720,7 @@ impl IngestController { source_id=%source_uid.source_id, "scaling up number of shards to {new_num_open_shards} failed: {metastore_error:?}" ); - model.release_scaling_permits(&source_uid, ScalingMode::Up, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Up); Err(metastore_error) } } @@ -860,10 +858,12 @@ impl IngestController { model: &mut ControlPlaneModel, progress: &Progress, ) -> MetastoreResult<()> { - const NUM_PERMITS: u64 = 1; + if shard_stats.num_open_shards == 0 { + return Ok(()); + } if !model - .acquire_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS) + .acquire_scaling_permits(&source_uid, ScalingMode::Down) .unwrap_or(false) { return Ok(()); @@ -876,12 +876,12 @@ impl IngestController { "scaling down number of shards to {new_num_open_shards}" ); let Some((leader_id, shard_id)) = find_scale_down_candidate(&source_uid, model) else { - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); }; info!("scaling down shard {shard_id} from {leader_id}"); let Some(ingester) = self.ingester_pool.get(&leader_id) else { - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); }; let shard_pkeys = vec![ShardPKey { @@ -896,7 +896,7 @@ impl IngestController { .await { warn!("failed to scale down number of shards: {error}"); - model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); + model.release_scaling_permits(&source_uid, ScalingMode::Down); return Ok(()); } model.close_shards(&source_uid, &[shard_id]); diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index ca314233f6a..d4e02f67c2c 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -378,10 +378,9 @@ impl ControlPlaneModel { &mut self, source_uid: &SourceUid, scaling_mode: ScalingMode, - num_permits: u64, ) -> Option { self.shard_table - .acquire_scaling_permits(source_uid, scaling_mode, num_permits) + .acquire_scaling_permits(source_uid, scaling_mode) } pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { @@ -389,14 +388,9 @@ impl ControlPlaneModel { .drain_scaling_permits(source_uid, scaling_mode) } - pub fn release_scaling_permits( - &mut self, - source_uid: &SourceUid, - scaling_mode: ScalingMode, - num_permits: u64, - ) { + pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { self.shard_table - .release_scaling_permits(source_uid, scaling_mode, num_permits) + .release_scaling_permits(source_uid, scaling_mode) } } diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 29c579cddcd..b2581adcbad 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -544,14 +544,13 @@ impl ShardTable { &mut self, source_uid: &SourceUid, scaling_mode: ScalingMode, - num_permits: u64, ) -> Option { let table_entry = self.table_entries.get_mut(source_uid)?; let scaling_rate_limiter = match scaling_mode { ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter, ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter, }; - Some(scaling_rate_limiter.acquire(num_permits)) + Some(scaling_rate_limiter.acquire(1)) } pub fn drain_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { @@ -564,18 +563,13 @@ impl ShardTable { } } - pub fn release_scaling_permits( - &mut self, - source_uid: &SourceUid, - scaling_mode: ScalingMode, - num_permits: u64, - ) { + pub fn release_scaling_permits(&mut self, source_uid: &SourceUid, scaling_mode: ScalingMode) { if let Some(table_entry) = self.table_entries.get_mut(source_uid) { let scaling_rate_limiter = match scaling_mode { ScalingMode::Up => &mut table_entry.scaling_up_rate_limiter, ScalingMode::Down => &mut table_entry.scaling_down_rate_limiter, }; - scaling_rate_limiter.release(num_permits); + scaling_rate_limiter.release(1); } } }