diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs index 82af447012a..1bd649dce60 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs @@ -58,7 +58,7 @@ pub fn solve( fn remove_extraneous_shards(problem: &SchedulingProblem, solution: &mut SchedulingSolution) { let mut num_shards_per_source: Vec = vec![0; problem.num_sources()]; - for indexer_assignment in &mut solution.indexer_assignments { + for indexer_assignment in &solution.indexer_assignments { if let Some((&source_ord, _)) = indexer_assignment.num_shards_per_source.last_key_value() { assert!(source_ord < problem.num_sources() as SourceOrd); } @@ -81,8 +81,10 @@ fn remove_extraneous_shards(problem: &SchedulingProblem, solution: &mut Scheduli let mut nodes_with_source: HashMap> = HashMap::default(); for (node_id, indexer_assignment) in solution.indexer_assignments.iter().enumerate() { - for &source in indexer_assignment.num_shards_per_source.keys() { - nodes_with_source.entry(source).or_default().push(node_id); + for (&source, &num_shards) in &indexer_assignment.num_shards_per_source { + if num_shards > 0 { + nodes_with_source.entry(source).or_default().push(node_id); + } } } @@ -292,6 +294,9 @@ fn compute_unassigned_sources( .collect(); for indexer_assignment in &solution.indexer_assignments { for (&source_ord, &num_shards) in &indexer_assignment.num_shards_per_source { + if num_shards == 0 { + continue; + } let Entry::Occupied(mut entry) = unassigned_sources.entry(source_ord) else { panic!("The solution contains more shards than the actual problem."); }; @@ -603,10 +608,8 @@ mod tests { let mut solution = SchedulingSolution::with_num_indexers(num_nodes); for (node_id, indexer_assignment) in indexer_assignments.iter().enumerate() { for (source_ord, num_shards) in indexer_assignment.iter().copied().enumerate() { - if num_shards > 0 { - solution.indexer_assignments[node_id] - .add_shards(source_ord as u32, num_shards); - } + solution.indexer_assignments[node_id] + .add_shards(source_ord as u32, num_shards); } } solution @@ -624,6 +627,23 @@ mod tests { }) } + #[test] + fn test_problem_leading_to_zero_shard() { + let mut problem = SchedulingProblem::with_indexer_cpu_capacities( + vec![CpuCapacity::from_cpu_millis(0), + CpuCapacity::from_cpu_millis(0), + ]); + problem.add_source(0, NonZeroU32::new(1).unwrap()); + let mut previous_solution = problem.new_solution(); + previous_solution.indexer_assignments[0].add_shards(0, 0); + previous_solution.indexer_assignments[1].add_shards(0, 0); + let (solution, still_unassigned) = solve(&problem, previous_solution); + assert_eq!(solution.indexer_assignments[0].num_shards(0), 0); + assert_eq!(solution.indexer_assignments[1].num_shards(0), 0); + assert!(still_unassigned.is_empty()); + + } + proptest! { #[test] fn test_proptest_post_conditions((problem, solution) in problem_solution_strategy()) {