From 93487f95565ddf4b35563dc8442a55209cf99558 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Tue, 20 Feb 2024 15:02:06 +0900
Subject: [PATCH] too many indexers in chitchat

---
 .github/workflows/publish_docker_images.yml  |  2 -
 .../src/indexing_scheduler/scheduling/mod.rs | 77 ++++++++++++++++++-
 2 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/publish_docker_images.yml b/.github/workflows/publish_docker_images.yml
index 74852e5b47b..bf8acd49d3d 100644
--- a/.github/workflows/publish_docker_images.yml
+++ b/.github/workflows/publish_docker_images.yml
@@ -20,8 +20,6 @@ jobs:
         include:
           - os: ubuntu-latest
             platform: linux/amd64
-          - os: buildjet-4vcpu-ubuntu-2204-arm
-            platform: linux/arm64
     runs-on: ${{ matrix.os }}
     steps:
       - name: Checkout
diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
index 0addeff8f56..e4fb04f7929 100644
--- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
+++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs
@@ -236,6 +236,9 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
             .take(max_shard_in_pipeline)
             .cloned()
             .collect();
+        // if shard_ids.is_empty() {
+        //     continue;
+        // }
         remaining_num_shards_to_schedule_on_node -= shard_ids.len() as u32;
         let new_task = IndexingTask {
             index_uid: previous_task.index_uid.clone(),
@@ -243,6 +246,7 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
             pipeline_uid: previous_task.pipeline_uid,
             shard_ids,
         };
+
         new_tasks.push(new_task);
         if new_tasks.len() >= max_num_pipelines as usize {
             break;
@@ -420,7 +424,7 @@ fn convert_scheduling_solution_to_physical_plan(
                 indexer_str,
                 &source.source_uid,
                 max_shard_per_pipeline,
-                &mut new_physical_plan,
+
             );
         }
     }
@@ -442,6 +446,21 @@ fn assert_post_condition_physical_plan_match_solution(
     let mut reconstructed_solution = SchedulingSolution::with_num_indexers(num_indexers);
     convert_physical_plan_to_solution(physical_plan, id_to_ord_map, &mut reconstructed_solution);
     assert_eq!(solution, &reconstructed_solution);
+
+    // let's also check that the physical plan does not contain any sharded pipeline with no shards.
+    for (_, indexing_tasks) in physical_plan.indexing_tasks_per_indexer() {
+        for indexing_task in indexing_tasks {
+            let source_uid = SourceUid {
+                index_uid: indexing_task.index_uid.clone().unwrap(),
+                source_id: indexing_task.source_id.clone(),
+            };
+            if let Some((_, source)) = id_to_ord_map.source(&source_uid) {
+                if let SourceToScheduleType::Sharded { shard_ids, .. } = &source.source_type {
+                    assert!(!shard_ids.is_empty());
+                }
+            }
+        }
+    }
 }
 
 fn add_shard_to_indexer(
@@ -575,6 +594,29 @@ pub fn build_physical_indexing_plan(
         previous_plan_opt,
     );
+
+    // let's also check that the physical plan does not contain any sharded pipeline with no shards.
+    {
+        for (_, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer() {
+            for indexing_task in indexing_tasks {
+                let source_uid = SourceUid {
+                    index_uid: indexing_task.index_uid.clone().unwrap(),
+                    source_id: indexing_task.source_id.clone(),
+                };
+                if let Some((_, source)) = id_to_ord_map.source(&source_uid) {
+                    if let SourceToScheduleType::Sharded { shard_ids, .. } = &source.source_type {
+                        if shard_ids.is_empty() {
+                            error!(sources=?sources,
+                                previous_plan_opt=?previous_plan_opt,
+                                num_indexers=indexer_id_to_cpu_capacities.len(),
+                                indexing_task=?indexing_task,
+                                "Sharded pipeline with no shards.");
+                        }
+                    }
+                }
+            }
+        }
+    }
     assert_post_condition_physical_plan_match_solution(
         &new_physical_plan,
         &new_solution,
@@ -1080,6 +1122,39 @@ mod tests {
         }
     }
 
+    // #[test]
+    // fn test_convert_scheduling_solution_to_physical_plan_single_node_single_source_sharded_pipeline_entirely_removed() {
+    //     let source_uid = SourceUid {
+    //         index_uid: IndexUid::new_with_random_ulid("testindex"),
+    //         source_id: "testsource".to_string(),
+    //     };
+    //     let previous_task1 = IndexingTask {
+    //         index_uid: Some(source_uid.index_uid.clone()),
+    //         source_id: source_uid.source_id.to_string(),
+    //         pipeline_uid: Some(PipelineUid::new()),
+    //         shard_ids: vec![ShardId::from(1)],
+    //     };
+    //     {
+    //         let sharded_source = SourceToSchedule {
+    //             source_uid: source_uid.clone(),
+    //             source_type: SourceToScheduleType::Sharded {
+    //                 shard_ids: vec![
+    //                     ShardId::from(2),
+    //                 ],
+    //                 load_per_shard: NonZeroU32::new(1_000).unwrap(),
+    //             },
+    //         };
+    //         let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
+    //             4,
+    //             &[&previous_task1],
+    //             &sharded_source,
+    //         );
+    //         assert_eq!(tasks.len(), 1);
+    //         assert_eq!(tasks[0].index_uid(), &source_uid.index_uid);
+    //         assert_eq!(tasks[0].shard_ids, [ShardId::from(2)]);
+    //     }
+    // }
+
     #[test]
     fn test_convert_scheduling_solution_to_physical_plan_single_node_single_source_non_sharded() {
         let source_uid = SourceUid {
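
Note on the post-condition introduced above: stripped of Quickwit's concrete types, the invariant is
that no indexing task attached to a sharded source may end up with an empty shard list. The sketch
below restates that check as a self-contained program. It is a minimal illustration with simplified
stand-in types and a hypothetical helper name (check_no_empty_sharded_pipeline); it is not the real
PhysicalIndexingPlan / SourceToScheduleType API from quickwit-control-plane.

    use std::collections::BTreeMap;

    type ShardId = u64;

    // Simplified stand-ins for the scheduler's types; the real ones carry more fields.
    struct IndexingTask {
        source_id: String,
        shard_ids: Vec<ShardId>,
    }

    #[allow(dead_code)]
    enum SourceType {
        Sharded,
        NonSharded,
    }

    // Walk every task of every indexer: a task whose source is sharded must
    // carry at least one shard, otherwise the pipeline would sit idle.
    fn check_no_empty_sharded_pipeline(
        plan: &BTreeMap<String, Vec<IndexingTask>>, // indexer id -> its tasks
        sources: &BTreeMap<String, SourceType>,
    ) -> Result<(), String> {
        for (indexer, tasks) in plan {
            for task in tasks {
                if let Some(SourceType::Sharded) = sources.get(&task.source_id) {
                    if task.shard_ids.is_empty() {
                        return Err(format!(
                            "sharded pipeline with no shards: indexer `{indexer}`, source `{}`",
                            task.source_id
                        ));
                    }
                }
            }
        }
        Ok(())
    }

    fn main() {
        let sources = BTreeMap::from([("testsource".to_string(), SourceType::Sharded)]);
        let plan = BTreeMap::from([(
            "indexer-1".to_string(),
            vec![IndexingTask {
                source_id: "testsource".to_string(),
                shard_ids: Vec::new(), // empty: violates the invariant
            }],
        )]);
        assert!(check_no_empty_sharded_pipeline(&plan, &sources).is_err());
        println!("empty sharded pipeline correctly rejected");
    }

The patch enforces this invariant twice: as a hard assert! in
assert_post_condition_physical_plan_match_solution, and as an error! log in
build_physical_indexing_plan, so a violation both fails tests and stays visible in production logs.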