Skip to content

Commit

Permalink
Avoid creating sharded pipelines with no shards. (#4393)
Browse files Browse the repository at this point in the history
Added unit tests.
  • Loading branch information
fulmicoton authored Jan 16, 2024
1 parent 24482d9 commit 9981ad4
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ fn get_sources_to_schedule(model: &ControlPlaneModel) -> Vec<SourceToSchedule> {
source_type: SourceToScheduleType::Sharded {
shard_ids,
// FIXME
load_per_shard: NonZeroU32::new(250u32).unwrap(),
load_per_shard: NonZeroU32::new(PIPELINE_FULL_CAPACITY.cpu_millis() / 4)
.unwrap(),
},
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,25 +197,30 @@ fn compute_max_num_shards_per_pipeline(source_type: &SourceToScheduleType) -> No
}
}

// This converts a scheduling solution for a given node and a given source.
// Major quirk however:
// For sharded function, this function only partially performs this conversion.
// In the resulting function some of the shards may not be allocated.
// The remaining shards will be added in postprocessing pass.
fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
mut remaining_num_shards_to_schedule_on_node: u32,
// Specific to the source.
mut previous_tasks: &[&IndexingTask],
previous_tasks: &[&IndexingTask],
source: &SourceToSchedule,
) -> Vec<IndexingTask> {
match &source.source_type {
SourceToScheduleType::Sharded {
shard_ids,
load_per_shard,
} => {
if remaining_num_shards_to_schedule_on_node == 0 {
return Vec::new();
}
// For the moment we do something voluntarily suboptimal.
let max_num_pipelines = quickwit_common::div_ceil_u32(
shard_ids.len() as u32 * load_per_shard.get(),
remaining_num_shards_to_schedule_on_node * load_per_shard.get(),
CPU_PER_PIPELINE_LOAD_LOWER_THRESHOLD.cpu_millis(),
);
if previous_tasks.len() > max_num_pipelines as usize {
previous_tasks = &previous_tasks[..max_num_pipelines as usize];
}
let max_num_shards_per_pipeline: NonZeroU32 =
compute_max_num_shards_per_pipeline(&source.source_type);
let mut new_tasks = Vec::new();
Expand All @@ -239,6 +244,9 @@ fn convert_scheduling_solution_to_physical_plan_single_node_single_source(
shard_ids,
};
new_tasks.push(new_task);
if new_tasks.len() >= max_num_pipelines as usize {
break;
}
if remaining_num_shards_to_schedule_on_node == 0 {
break;
}
Expand Down Expand Up @@ -586,7 +594,11 @@ mod tests {
use quickwit_proto::indexing::{mcpu, CpuCapacity, IndexingTask};
use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid};

use super::{build_physical_indexing_plan, SourceToSchedule, SourceToScheduleType};
use super::{
build_physical_indexing_plan,
convert_scheduling_solution_to_physical_plan_single_node_single_source, SourceToSchedule,
SourceToScheduleType,
};
use crate::indexing_plan::PhysicalIndexingPlan;

fn source_id() -> SourceUid {
Expand Down Expand Up @@ -905,4 +917,158 @@ mod tests {
capacities.insert("indexer-1".to_string(), CpuCapacity::from_cpu_millis(8000));
build_physical_indexing_plan(&sources_to_schedule, &capacities, None);
}

#[test]
fn test_convert_scheduling_solution_to_physical_plan_single_node_single_source_sharded() {
    // One sharded source, with two pipelines already running on the node.
    let source_uid = SourceUid {
        index_uid: IndexUid::new_with_random_ulid("testindex"),
        source_id: "testsource".to_string(),
    };
    // First pre-existing pipeline currently holds shards {1, 4, 5}.
    let first_pipeline = IndexingTask {
        index_uid: source_uid.index_uid.to_string(),
        source_id: source_uid.source_id.to_string(),
        pipeline_uid: Some(PipelineUid::new()),
        shard_ids: vec![1, 4, 5],
    };
    // Second pre-existing pipeline currently holds shards {6, 7, 8, 9, 10}.
    let second_pipeline = IndexingTask {
        index_uid: source_uid.index_uid.to_string(),
        source_id: source_uid.source_id.to_string(),
        pipeline_uid: Some(PipelineUid::new()),
        shard_ids: vec![6, 7, 8, 9, 10],
    };
    {
        // Heavy shards (1 cpu each): the 4 shards to schedule do not fit in a
        // single pipeline, so both existing pipelines are kept. Shards no longer
        // present in the source (5, 7, 8, 9, 10) are dropped from the tasks.
        let source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::Sharded {
                shard_ids: vec![1, 2, 4, 6],
                load_per_shard: NonZeroU32::new(1_000).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            4,
            &[&first_pipeline, &second_pipeline],
            &source,
        );
        assert_eq!(tasks.len(), 2);
        let first_task = &tasks[0];
        assert_eq!(&first_task.index_uid, source_uid.index_uid.as_str());
        assert_eq!(&first_task.shard_ids, &[1, 4]);
        let second_task = &tasks[1];
        assert_eq!(&second_task.index_uid, source_uid.index_uid.as_str());
        assert_eq!(&second_task.shard_ids, &[6]);
    }
    {
        // Light shards (0.25 cpu each): everything fits in one pipeline, so the
        // number of emitted tasks collapses to a single one.
        let source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::Sharded {
                shard_ids: vec![1, 2, 4, 6],
                load_per_shard: NonZeroU32::new(250).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            4,
            &[&first_pipeline, &second_pipeline],
            &source,
        );
        assert_eq!(tasks.len(), 1);
        let only_task = &tasks[0];
        assert_eq!(&only_task.index_uid, source_uid.index_uid.as_str());
        assert_eq!(&only_task.shard_ids, &[1, 4]);
    }
}

#[test]
fn test_convert_scheduling_solution_to_physical_plan_single_node_single_source_non_sharded() {
    // One non-sharded source, with two pipelines already running on the node.
    let source_uid = SourceUid {
        index_uid: IndexUid::new_with_random_ulid("testindex"),
        source_id: "testsource".to_string(),
    };
    let pipeline_uid1 = PipelineUid::new();
    let previous_task1 = IndexingTask {
        index_uid: source_uid.index_uid.to_string(),
        source_id: source_uid.source_id.to_string(),
        pipeline_uid: Some(pipeline_uid1),
        shard_ids: vec![],
    };
    let pipeline_uid2 = PipelineUid::new();
    let previous_task2 = IndexingTask {
        index_uid: source_uid.index_uid.to_string(),
        source_id: source_uid.source_id.to_string(),
        pipeline_uid: Some(pipeline_uid2),
        shard_ids: vec![],
    };
    {
        // One pipeline requested: the first existing pipeline uid is reused.
        let non_sharded_source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::NonSharded {
                num_pipelines: 1,
                load_per_pipeline: NonZeroU32::new(4000).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            1,
            &[&previous_task1, &previous_task2],
            &non_sharded_source,
        );
        assert_eq!(tasks.len(), 1);
        let task = &tasks[0];
        assert_eq!(&task.index_uid, source_uid.index_uid.as_str());
        assert!(&task.shard_ids.is_empty());
        assert_eq!(task.pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
    }
    {
        // Zero pipelines requested: nothing is scheduled.
        let non_sharded_source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::NonSharded {
                num_pipelines: 0,
                load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            0,
            &[&previous_task1, &previous_task2],
            &non_sharded_source,
        );
        assert_eq!(tasks.len(), 0);
    }
    {
        // Two pipelines requested with two existing ones: both uids are reused.
        let non_sharded_source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::NonSharded {
                num_pipelines: 2,
                load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            2,
            &[&previous_task1, &previous_task2],
            &non_sharded_source,
        );
        assert_eq!(tasks.len(), 2);
        let first_task = &tasks[0];
        assert_eq!(&first_task.index_uid, source_uid.index_uid.as_str());
        assert!(&first_task.shard_ids.is_empty());
        assert_eq!(first_task.pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
        let second_task = &tasks[1];
        assert_eq!(&second_task.index_uid, source_uid.index_uid.as_str());
        assert!(&second_task.shard_ids.is_empty());
        assert_eq!(second_task.pipeline_uid.as_ref().unwrap(), &pipeline_uid2);
    }
    {
        // Two pipelines requested but only one previous task: the existing uid is
        // reused for the first task, and the second task gets a fresh uid.
        let non_sharded_source = SourceToSchedule {
            source_uid: source_uid.clone(),
            source_type: SourceToScheduleType::NonSharded {
                num_pipelines: 2,
                load_per_pipeline: NonZeroU32::new(1_000).unwrap(),
            },
        };
        let tasks = convert_scheduling_solution_to_physical_plan_single_node_single_source(
            2,
            &[&previous_task1],
            &non_sharded_source,
        );
        assert_eq!(tasks.len(), 2);
        let first_task = &tasks[0];
        assert_eq!(&first_task.index_uid, source_uid.index_uid.as_str());
        assert!(&first_task.shard_ids.is_empty());
        assert_eq!(first_task.pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
        let second_task = &tasks[1];
        assert_eq!(&second_task.index_uid, source_uid.index_uid.as_str());
        assert!(&second_task.shard_ids.is_empty());
        assert_ne!(second_task.pipeline_uid.as_ref().unwrap(), &pipeline_uid1);
    }
}
}

0 comments on commit 9981ad4

Please sign in to comment.