Skip to content

Commit

Permalink
bugfix control plane
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Dec 19, 2023
1 parent 2e08c05 commit 2c23aa6
Showing 1 changed file with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub fn solve(

fn remove_extraneous_shards(problem: &SchedulingProblem, solution: &mut SchedulingSolution) {
let mut num_shards_per_source: Vec<u32> = vec![0; problem.num_sources()];
for indexer_assignment in &mut solution.indexer_assignments {
for indexer_assignment in &solution.indexer_assignments {
if let Some((&source_ord, _)) = indexer_assignment.num_shards_per_source.last_key_value() {
assert!(source_ord < problem.num_sources() as SourceOrd);
}
Expand All @@ -81,8 +81,10 @@ fn remove_extraneous_shards(problem: &SchedulingProblem, solution: &mut Scheduli

let mut nodes_with_source: HashMap<SourceOrd, Vec<IndexerOrd>> = HashMap::default();
for (node_id, indexer_assignment) in solution.indexer_assignments.iter().enumerate() {
for &source in indexer_assignment.num_shards_per_source.keys() {
nodes_with_source.entry(source).or_default().push(node_id);
for (&source, &num_shards) in &indexer_assignment.num_shards_per_source {
if num_shards > 0 {
nodes_with_source.entry(source).or_default().push(node_id);
}
}
}

Expand Down Expand Up @@ -292,6 +294,9 @@ fn compute_unassigned_sources(
.collect();
for indexer_assignment in &solution.indexer_assignments {
for (&source_ord, &num_shards) in &indexer_assignment.num_shards_per_source {
if num_shards == 0 {
continue;
}
let Entry::Occupied(mut entry) = unassigned_sources.entry(source_ord) else {
panic!("The solution contains more shards than the actual problem.");
};
Expand Down Expand Up @@ -603,10 +608,8 @@ mod tests {
let mut solution = SchedulingSolution::with_num_indexers(num_nodes);
for (node_id, indexer_assignment) in indexer_assignments.iter().enumerate() {
for (source_ord, num_shards) in indexer_assignment.iter().copied().enumerate() {
if num_shards > 0 {
solution.indexer_assignments[node_id]
.add_shards(source_ord as u32, num_shards);
}
solution.indexer_assignments[node_id]
.add_shards(source_ord as u32, num_shards);
}
}
solution
Expand All @@ -624,6 +627,23 @@ mod tests {
})
}

#[test]
fn test_problem_leading_to_zero_shard() {
let mut problem = SchedulingProblem::with_indexer_cpu_capacities(
vec![CpuCapacity::from_cpu_millis(0),
CpuCapacity::from_cpu_millis(0),
]);
problem.add_source(0, NonZeroU32::new(1).unwrap());
let mut previous_solution = problem.new_solution();
previous_solution.indexer_assignments[0].add_shards(0, 0);
previous_solution.indexer_assignments[1].add_shards(0, 0);
let (solution, still_unassigned) = solve(&problem, previous_solution);
assert_eq!(solution.indexer_assignments[0].num_shards(0), 0);
assert_eq!(solution.indexer_assignments[1].num_shards(0), 0);
assert!(still_unassigned.is_empty());

}

proptest! {
#[test]
fn test_proptest_post_conditions((problem, solution) in problem_solution_strategy()) {
Expand Down

0 comments on commit 2c23aa6

Please sign in to comment.