Skip to content

Commit

Permalink
too many indexers in chitchat
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Feb 20, 2024
1 parent 3ef4f76 commit 93487f9
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 3 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/publish_docker_images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ jobs:
include:
- os: ubuntu-latest
platform: linux/amd64
- os: buildjet-4vcpu-ubuntu-2204-arm
platform: linux/arm64
runs-on: ${{ matrix.os }}
steps:
- name: Checkout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,17 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
.take(max_shard_in_pipeline)
.cloned()
.collect();
// if shard_ids.is_empty() {
// continue;
// }
remaining_num_shards_to_schedule_on_node -= shard_ids.len() as u32;
let new_task = IndexingTask {
index_uid: previous_task.index_uid.clone(),
source_id: previous_task.source_id.clone(),
pipeline_uid: previous_task.pipeline_uid,
shard_ids,
};

new_tasks.push(new_task);
if new_tasks.len() >= max_num_pipelines as usize {
break;
Expand Down Expand Up @@ -420,7 +424,7 @@ fn convert_scheduling_solution_to_physical_plan(
indexer_str,
&source.source_uid,
max_shard_per_pipeline,
&mut new_physical_plan,

);
}
}
Expand All @@ -442,6 +446,21 @@ fn assert_post_condition_physical_plan_match_solution(
let mut reconstructed_solution = SchedulingSolution::with_num_indexers(num_indexers);
convert_physical_plan_to_solution(physical_plan, id_to_ord_map, &mut reconstructed_solution);
assert_eq!(solution, &reconstructed_solution);

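// Also check that the physical plan does not contain any sharded pipeline with no shards.
// NOTE(review): the assertion below inspects the shard list of the source found in
// `id_to_ord_map`, not the shard list carried by `indexing_task` itself -- confirm this is intended.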
for (_, indexing_tasks) in physical_plan.indexing_tasks_per_indexer() {
for indexing_task in indexing_tasks {
let source_uid = SourceUid {
index_uid: indexing_task.index_uid.clone().unwrap(),
source_id: indexing_task.source_id.clone(),
};
if let Some((_, source)) = id_to_ord_map.source(&source_uid) {
if let SourceToScheduleType::Sharded { shard_ids, .. } = &source.source_type {
assert!(!shard_ids.is_empty());
}
}
}
}
}

fn add_shard_to_indexer(
Expand Down Expand Up @@ -575,6 +594,29 @@ pub fn build_physical_indexing_plan(
previous_plan_opt,
);


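// Also check that the physical plan does not contain any sharded pipeline with no shards,
// and log the scheduling inputs if one is found.
// NOTE(review): the check below reads the shard list of the source found in `id_to_ord_map`,
// not the shard list carried by `indexing_task` itself -- confirm this is intended.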
{
for (_, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer() {
for indexing_task in indexing_tasks {
let source_uid = SourceUid {
index_uid: indexing_task.index_uid.clone().unwrap(),
source_id: indexing_task.source_id.clone(),
};
if let Some((_, source)) = id_to_ord_map.source(&source_uid) {
if let SourceToScheduleType::Sharded { shard_ids, .. } = &source.source_type {
if shard_ids.is_empty() {
error!(sources=?sources,
previous_plan_opt=?previous_plan_opt,
num_indexers=indexer_id_to_cpu_capacities.len(),
indexing_task=?indexing_task,
"Sharded pipeline with no shards.");
}
}
}
}
}
}
assert_post_condition_physical_plan_match_solution(
&new_physical_plan,
&new_solution,
Expand Down Expand Up @@ -1080,6 +1122,39 @@ mod tests {
}
}

// #[test]
// fn test_convert_scheduling_solution_to_physical_plan_single_node_single_source_sharded_pipeline_entirely_removed() {
// let source_uid = SourceUid {
// index_uid: IndexUid::new_with_random_ulid("testindex"),
// source_id: "testsource".to_string(),
// };
// let previous_task1 = IndexingTask {
// index_uid: Some(source_uid.index_uid.clone()),
// source_id: source_uid.source_id.to_string(),
// pipeline_uid: Some(PipelineUid::new()),
// shard_ids: vec![ShardId::from(1)],
// };
// {
// let sharded_source = SourceToSchedule {
// source_uid: source_uid.clone(),
// source_type: SourceToScheduleType::Sharded {
// shard_ids: vec![
// ShardId::from(2),
// ],
// load_per_shard: NonZeroU32::new(1_000).unwrap(),
// },
// };
// let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
// 4,
// &[&previous_task1],
// &sharded_source,
// );
// assert_eq!(tasks.len(), 1);
// assert_eq!(tasks[0].index_uid(), &source_uid.index_uid);
// assert_eq!(tasks[0].shard_ids, [ShardId::from(2)]);
// }
// }

#[test]
fn test_convert_scheduling_solution_to_physical_plan_single_node_single_source_non_sharded() {
let source_uid = SourceUid {
Expand Down

0 comments on commit 93487f9

Please sign in to comment.