diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
index 896a62bebb1..11fcf287596 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs
@@ -154,7 +154,8 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
             source_type: SourceToScheduleType::Sharded {
                 shard_ids,
                 // FIXME
-                load_per_shard: NonZeroU32::new(250u32).unwrap(),
+                load_per_shard: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4)
+                    .unwrap(),
             },
         });
     }
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
index e4a750e9411..9ae4c7a4183 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
@@ -197,10 +197,15 @@ fn compute_max_num_shards_per_pipeline(source_type: &SourceToScheduleType) -> No
     }
 }
 
+// Converts a scheduling solution for a given node and a given source
+// into a list of physical indexing tasks.
+// Major quirk however: for sharded sources, this function only performs
+// the conversion partially. Some of the shards may be left unassigned in
+// the resulting tasks; they will be allocated in a later postprocessing pass.
 fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
     mut remaining_num_shards_to_schedule_on_node: u32,
     // Specific to the source.
-    mut previous_tasks: &[&IndexingTask],
+    previous_tasks: &[&IndexingTask],
     source: &SourceToSchedule,
 ) -> Vec<IndexingTask> {
     match &source.source_type {
@@ -208,14 +213,17 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
             shard_ids,
             load_per_shard,
         } => {
+            if remaining_num_shards_to_schedule_on_node == 0 {
+                return Vec::new();
+            }
             // For the moment we do something voluntarily suboptimal.
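+            // For intuition, with assumed numbers: 4 shards left to schedule at
+            // 1_000 mcpu each and a lower threshold of (say) 1_200 mcpu yield
+            // div_ceil_u32(4 * 1_000, 1_200) = 4 pipelines at most.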
             let max_num_pipelines = quickwit_common::div_ceil_u32(
-                shard_ids.len() as u32 * load_per_shard.get(),
+                remaining_num_shards_to_schedule_on_node * load_per_shard.get(),
                 CPU_PER_PIPELINE_LOAD_LOWER_THRESHOLD.cpu_millis(),
             );
-            if previous_tasks.len() > max_num_pipelines as usize {
-                previous_tasks = &previous_tasks[..max_num_pipelines as usize];
-            }
             let max_num_shards_per_pipeline: NonZeroU32 =
                 compute_max_num_shards_per_pipeline(&source.source_type);
             let mut new_tasks = Vec::new();
@@ -239,6 +247,9 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
                 shard_ids,
             };
             new_tasks.push(new_task);
+            if new_tasks.len() >= max_num_pipelines as usize {
+                break;
+            }
             if remaining_num_shards_to_schedule_on_node == 0 {
                 break;
             }
@@ -586,7 +597,11 @@ mod tests {
     use quickwit_proto::indexing::{mcpu, CpuCapacity, IndexingTask};
     use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid};
 
-    use super::{build_physical_indexing_plan, SourceToSchedule, SourceToScheduleType};
+    use super::{
+        build_physical_indexing_plan,
+        convert_scheduling_solution_to_physical_plan_single_node_single_source, SourceToSchedule,
+        SourceToScheduleType,
+    };
     use crate::indexing_plan::PhysicalIndexingPlan;
 
     fn source_id() -> SourceUid {
@@ -905,4 +920,158 @@ mod tests {
         capacities.insert("indexer-1".to_string(), CpuCapacity::from_cpu_millis(8000));
         build_physical_indexing_plan(&sources_to_schedule, &capacities, None);
     }
+
+    #[test]
+    fn test_convert_scheduling_solution_to_physical_plan_single_node_single_source_sharded() {
+        let source_uid = SourceUid {
+            index_uid: IndexUid::new_with_random_ulid("testindex"),
+            source_id: "testsource".to_string(),
+        };
+        let previous_task1 = IndexingTask {
+            index_uid: source_uid.index_uid.to_string(),
+            source_id: source_uid.source_id.to_string(),
+            pipeline_uid: Some(PipelineUid::new()),
+            shard_ids: vec![1, 4, 5],
+        };
+        let previous_task2 = IndexingTask {
+            index_uid: source_uid.index_uid.to_string(),
+            source_id: source_uid.source_id.to_string(),
+            pipeline_uid: Some(PipelineUid::new()),
+            shard_ids: vec![6, 7, 8, 9, 10],
+        };
+        {
+            let sharded_source = SourceToSchedule {
+                source_uid: source_uid.clone(),
+                source_type: SourceToScheduleType::Sharded {
+                    shard_ids: vec![1, 2, 4, 6],
+                    load_per_shard: NonZeroU32::new(1_000).unwrap(),
+                },
+            };
+            let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
+                4,
+                &[&previous_task1, &previous_task2],
+                &sharded_source,
+            );
+            assert_eq!(tasks.len(), 2);
+            assert_eq!(&tasks[0].index_uid, source_uid.index_uid.as_str());
+            assert_eq!(&tasks[0].shard_ids, &[1, 4]);
+            assert_eq!(&tasks[1].index_uid, source_uid.index_uid.as_str());
+            assert_eq!(&tasks[1].shard_ids, &[6]);
+        }
+        {
+            // A lighter per-shard load caps the source at a single pipeline.
+            let sharded_source = SourceToSchedule {
+                source_uid: source_uid.clone(),
+                source_type: SourceToScheduleType::Sharded {
+                    shard_ids: vec![1, 2, 4, 6],
+                    load_per_shard: NonZeroU32::new(250).unwrap(),
+                },
+            };
+            let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
+                4,
+                &[&previous_task1, &previous_task2],
+                &sharded_source,
+            );
+            assert_eq!(tasks.len(), 1);
+            assert_eq!(&tasks[0].index_uid, source_uid.index_uid.as_str());
+            assert_eq!(&tasks[0].shard_ids, &[1, 4]);
+        }
+    }
+
+    #[test]
+    fn test_convert_scheduling_solution_to_physical_plan_single_node_single_source_non_sharded() {
+        let source_uid = SourceUid {
+            index_uid: IndexUid::new_with_random_ulid("testindex"),
+            source_id: "testsource".to_string(),
+        };
+        let pipeline_uid1 = PipelineUid::new();
+        let previous_task1 = IndexingTask {
+            index_uid: source_uid.index_uid.to_string(),
+            source_id: source_uid.source_id.to_string(),
+            pipeline_uid: Some(pipeline_uid1),
+            shard_ids: vec![],
+        };
+        let pipeline_uid2 = PipelineUid::new();
+        let previous_task2 = IndexingTask {
+            index_uid: source_uid.index_uid.to_string(),
+            source_id: source_uid.source_id.to_string(),
+            pipeline_uid: Some(pipeline_uid2),
+            shard_ids: vec![],
+        };
+        {
+            let non_sharded_source = SourceToSchedule {
+                source_uid: source_uid.clone(),
+                source_type: SourceToScheduleType::NonSharded {
+                    num_pipelines: 1,
+                    load_per_pipeline: NonZeroU32::new(4000).unwrap(),
+                },
+            };
+            let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
+                1,
+                &[&previous_task1, &previous_task2],
+                &non_sharded_source,
+            );
+            assert_eq!(tasks.len(), 1);
+            assert_eq!(&tasks[0].index_uid, source_uid.index_uid.as_str());
+            assert!(tasks[0].shard_ids.is_empty());
+            assert_eq!(tasks[0].pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
+        }
+        {
+            let non_sharded_source = SourceToSchedule {
+                source_uid: source_uid.clone(),
+                source_type: SourceToScheduleType::NonSharded {
+                    num_pipelines: 0,
+                    load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
+                },
+            };
+            let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
+                0,
+                &[&previous_task1, &previous_task2],
+                &non_sharded_source,
+            );
+            assert_eq!(tasks.len(), 0);
+        }
+        {
+            let non_sharded_source = SourceToSchedule {
+                source_uid: source_uid.clone(),
+                source_type: SourceToScheduleType::NonSharded {
+                    num_pipelines: 2,
+                    load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
+                },
+            };
+            let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
+                2,
+                &[&previous_task1, &previous_task2],
+                &non_sharded_source,
+            );
+            assert_eq!(tasks.len(), 2);
+            assert_eq!(&tasks[0].index_uid, source_uid.index_uid.as_str());
+            assert!(tasks[0].shard_ids.is_empty());
+            assert_eq!(tasks[0].pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
+            assert_eq!(&tasks[1].index_uid, source_uid.index_uid.as_str());
+            assert!(tasks[1].shard_ids.is_empty());
+            assert_eq!(tasks[1].pipeline_uid.as_ref().unwrap(), &pipeline_uid2);
+        }
+        {
+            let non_sharded_source = SourceToSchedule {
+                source_uid: source_uid.clone(),
+                source_type: SourceToScheduleType::NonSharded {
+                    num_pipelines: 2,
+                    load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
+                },
+            };
+            let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
+                2,
+                &[&previous_task1],
+                &non_sharded_source,
+            );
+            assert_eq!(tasks.len(), 2);
+            assert_eq!(&tasks[0].index_uid, source_uid.index_uid.as_str());
+            assert!(tasks[0].shard_ids.is_empty());
+            assert_eq!(tasks[0].pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
+            assert_eq!(&tasks[1].index_uid, source_uid.index_uid.as_str());
+            assert!(tasks[1].shard_ids.is_empty());
+            assert_ne!(tasks[1].pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
+        }
+    }
 }
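For reference, the shard-affinity rule that the sharded test above asserts can be sketched standalone. This is a hedged illustration, not Quickwit code: the helper name keep_scheduled_shards is made up, and the real conversion additionally enforces the max-pipeline cap shown in the patch.

fn keep_scheduled_shards(previous: &[u64], to_schedule: &[u64]) -> Vec<u64> {
    // A previous task keeps exactly those of its shards that are still scheduled.
    previous
        .iter()
        .copied()
        .filter(|shard_id| to_schedule.contains(shard_id))
        .collect()
}

fn main() {
    // previous_task1 held shards [1, 4, 5]; shards [1, 2, 4, 6] must be scheduled.
    assert_eq!(keep_scheduled_shards(&[1, 4, 5], &[1, 2, 4, 6]), vec![1, 4]);
    // previous_task2 held [6, 7, 8, 9, 10]; only shard 6 is still wanted.
    assert_eq!(keep_scheduled_shards(&[6, 7, 8, 9, 10], &[1, 2, 4, 6]), vec![6]);
    // Shard 2 matches no previous task; the postprocessing pass will place it.
}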