diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 6f9f8a0bc45..3ffcbee4219 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -5665,6 +5665,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", + "smallvec", "time", "tokio", "tower", diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index ba234f1ccaa..88ece4c70c7 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -215,6 +215,7 @@ serde_qs = { version = "0.12", features = ["warp"] } serde_with = "3.6.0" serde_yaml = "0.9" siphasher = "0.3" +smallvec = "1" sqlx = { version = "0.7", features = [ "migrate", "postgres", diff --git a/quickwit/quickwit-control-plane/Cargo.toml b/quickwit/quickwit-control-plane/Cargo.toml index 4d4e7c07fe1..ffb57b1b732 100644 --- a/quickwit/quickwit-control-plane/Cargo.toml +++ b/quickwit/quickwit-control-plane/Cargo.toml @@ -21,6 +21,7 @@ once_cell = { workspace = true } rand = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +smallvec = { workspace = true } time = { workspace = true } tokio = { workspace = true } tower = { workspace = true } diff --git a/quickwit/quickwit-control-plane/src/indexing_plan.rs b/quickwit/quickwit-control-plane/src/indexing_plan.rs index 7f7b6f309a5..92fa1cd7d45 100644 --- a/quickwit/quickwit-control-plane/src/indexing_plan.rs +++ b/quickwit/quickwit-control-plane/src/indexing_plan.rs @@ -52,6 +52,10 @@ impl PhysicalIndexingPlan { &self.indexing_tasks_per_indexer_id } + pub fn num_indexers(&self) -> usize { + self.indexing_tasks_per_indexer_id.len() + } + /// Returns the hashmap of (indexer ID, indexing tasks). pub fn indexing_tasks_per_indexer_mut(&mut self) -> &mut FnvHashMap> { &mut self.indexing_tasks_per_indexer_id diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs index f429517831a..80dc39886ea 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs @@ -40,7 +40,8 @@ use tracing::{debug, info, warn}; use crate::indexing_plan::PhysicalIndexingPlan; use crate::indexing_scheduler::change_tracker::{NotifyChangeOnDrop, RebuildNotifier}; use crate::indexing_scheduler::scheduling::build_physical_indexing_plan; -use crate::model::ControlPlaneModel; +use crate::metrics::ShardLocalityMetrics; +use crate::model::{ControlPlaneModel, ShardLocations}; use crate::{IndexerNodeInfo, IndexerPool}; pub(crate) const MIN_DURATION_BETWEEN_SCHEDULING: Duration = @@ -231,11 +232,16 @@ impl IndexingScheduler { return; }; + let shard_locations = model.shard_locations(); let new_physical_plan = build_physical_indexing_plan( &sources, &indexer_id_to_cpu_capacities, self.state.last_applied_physical_plan.as_ref(), + &shard_locations, ); + let shard_locality_metrics = + get_shard_locality_metrics(&new_physical_plan, &shard_locations); + crate::metrics::CONTROL_PLANE_METRICS.set_shard_locality_metrics(shard_locality_metrics); if let Some(last_applied_plan) = &self.state.last_applied_physical_plan { let plans_diff = get_indexing_plans_diff( last_applied_plan.indexing_tasks_per_indexer(), @@ -370,6 +376,33 @@ impl<'a> IndexingPlansDiff<'a> { } } +fn get_shard_locality_metrics( + physical_plan: &PhysicalIndexingPlan, + shard_locations: &ShardLocations, +) -> ShardLocalityMetrics { + let mut num_local_shards = 0; + let mut num_remote_shards = 0; + for (indexer, tasks) in physical_plan.indexing_tasks_per_indexer() { + for task in tasks { + for shard_id in 
&task.shard_ids { + if shard_locations + .get_shard_locations(shard_id) + .iter() + .any(|node| node.as_str() == indexer) + { + num_local_shards += 1; + } else { + num_remote_shards += 1; + } + } + } + } + ShardLocalityMetrics { + num_remote_shards, + num_local_shards, + } +} + impl<'a> fmt::Debug for IndexingPlansDiff<'a> { fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { if self.has_same_nodes() && self.has_same_tasks() { @@ -510,6 +543,7 @@ mod tests { use quickwit_proto::types::{IndexUid, PipelineUid, SourceUid}; use super::*; + use crate::model::ShardLocations; #[test] fn test_indexing_plans_diff() { let index_uid = IndexUid::from_str("index-1:11111111111111111111111111").unwrap(); @@ -798,7 +832,9 @@ mod tests { let mut indexer_max_loads = FnvHashMap::default(); indexer_max_loads.insert("indexer1".to_string(), mcpu(3_000)); indexer_max_loads.insert("indexer2".to_string(), mcpu(3_000)); - let physical_plan = build_physical_indexing_plan(&sources[..], &indexer_max_loads, None); + let shard_locations = ShardLocations::default(); + let physical_plan = + build_physical_indexing_plan(&sources[..], &indexer_max_loads, None, &shard_locations); assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 2); let indexing_tasks_1 = physical_plan.indexer("indexer1").unwrap(); assert_eq!(indexing_tasks_1.len(), 2); @@ -828,7 +864,8 @@ mod tests { let indexer_id = format!("indexer-{i}"); indexer_max_loads.insert(indexer_id, mcpu(4_000)); } - let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None); + let shard_locations = ShardLocations::default(); + let _physical_indexing_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations); } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs index 96390b23c8c..c48dfedca25 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/mod.rs @@ -20,6 +20,7 @@ pub mod scheduling_logic; pub mod scheduling_logic_model; +use std::collections::HashMap; use std::num::NonZeroU32; use fnv::{FnvHashMap, FnvHashSet}; @@ -32,6 +33,7 @@ use crate::indexing_plan::PhysicalIndexingPlan; use crate::indexing_scheduler::scheduling::scheduling_logic_model::{ IndexerAssignment, SchedulingProblem, SchedulingSolution, }; +use crate::model::ShardLocations; /// If we have several pipelines below this threshold we /// reduce the number of pipelines. @@ -326,12 +328,6 @@ fn convert_scheduling_solution_to_physical_plan_single_node( tasks } -fn pick_indexer(capacity_per_node: &[(String, u32)]) -> impl Iterator { - capacity_per_node.iter().flat_map(|(node_id, capacity)| { - std::iter::repeat(node_id.as_str()).take(*capacity as usize) - }) -} - /// This function takes a scheduling solution (which abstracts the notion of pipelines, /// and shard ids) and builds a physical plan, attempting to make as little change as possible /// to the existing pipelines. 
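For context: the `pick_indexer` helper removed above handed missing shards to indexers purely by remaining capacity, ignoring where each shard is actually hosted. The `assign_shards` function introduced in the hunks below replaces it with a locality-first heuristic. A minimal, self-contained sketch of that idea, using plain `&str` shard/node ids and a flat capacity map instead of the crate's `ShardId`, `NodeId` and `ShardLocations` types (illustration only, not the patch's code):

use std::collections::HashMap;

/// Locality-first placement: put each shard on a node that already hosts it
/// while that node still has capacity, then fall back to any node with room.
fn assign_with_locality<'a>(
    missing_shards: &[&'a str],
    locations: &HashMap<&'a str, Vec<&'a str>>, // shard -> nodes hosting it
    mut capacity: HashMap<&'a str, u32>,        // node -> shard slots left
) -> HashMap<&'a str, &'a str> {
    let mut assignment = HashMap::new();
    let mut leftovers = Vec::new();
    // First pass: keep shards local whenever possible.
    for &shard in missing_shards {
        let local_node = locations
            .get(shard)
            .into_iter()
            .flatten()
            .copied()
            .find(|node| capacity.get(node).copied().unwrap_or(0) > 0);
        if let Some(node) = local_node {
            *capacity.get_mut(node).unwrap() -= 1;
            assignment.insert(shard, node);
        } else {
            leftovers.push(shard);
        }
    }
    // Second pass: place the remaining shards wherever capacity is left.
    for shard in leftovers {
        let node = capacity
            .iter()
            .find(|(_, slots)| **slots > 0)
            .map(|(node, _)| *node)
            .expect("not enough capacity to place all shards");
        *capacity.get_mut(node).unwrap() -= 1;
        assignment.insert(shard, node);
    }
    assignment
}

fn main() {
    let locations = HashMap::from([("shard-0", vec!["node-a"]), ("shard-1", vec!["node-b"])]);
    let capacity = HashMap::from([("node-a", 1u32), ("node-b", 2u32)]);
    let plan = assign_with_locality(&["shard-0", "shard-1", "shard-2"], &locations, capacity);
    assert_eq!(plan["shard-0"], "node-a"); // local
    assert_eq!(plan["shard-1"], "node-b"); // local
    assert_eq!(plan["shard-2"], "node-b"); // fallback: only node with room left
}

The real `assign_shards` additionally keeps the per-node counts as `NonZeroU32` and panics if the solver left it with fewer slots than shards, as shown in the hunks below.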
@@ -343,6 +339,7 @@ fn convert_scheduling_solution_to_physical_plan( id_to_ord_map: &IdToOrdMap, sources: &[SourceToSchedule], previous_plan_opt: Option<&PhysicalIndexingPlan>, + shard_locations: &ShardLocations, ) -> PhysicalIndexingPlan { let mut indexer_assignments = solution.indexer_assignments.clone(); let mut new_physical_plan = PhysicalIndexingPlan::with_indexer_ids(&id_to_ord_map.indexer_ids); @@ -375,7 +372,8 @@ fn convert_scheduling_solution_to_physical_plan( }; let source_ord = id_to_ord_map.source_ord(&source.source_uid).unwrap(); let mut scheduled_shards: FnvHashSet<ShardId> = FnvHashSet::default(); - let mut remaining_capacity_per_node: Vec<(String, u32)> = Vec::default(); + let mut remaining_num_shards_per_node: HashMap<String, NonZeroU32> = + HashMap::with_capacity(new_physical_plan.num_indexers()); for (indexer, indexing_tasks) in new_physical_plan.indexing_tasks_per_indexer_mut() { let indexer_ord = id_to_ord_map.indexer_ord(indexer).unwrap(); let mut num_shards_for_indexer_source: u32 = @@ -398,26 +396,33 @@ fn convert_scheduling_solution_to_physical_plan( num_shards_for_indexer_source -= indexing_task.shard_ids.len() as u32; } } - remaining_capacity_per_node.push((indexer.to_string(), num_shards_for_indexer_source)); + if let Some(num_shards_for_indexer_source_non_zero) = + NonZeroU32::new(num_shards_for_indexer_source) + { + remaining_num_shards_per_node + .insert(indexer.clone(), num_shards_for_indexer_source_non_zero); + } } // Missing shards is an iterator over the shards that are not scheduled into a pipeline yet. - let missing_shards = shard_ids + let missing_shards: Vec<ShardId> = shard_ids .iter() .filter(|shard_id| !scheduled_shards.contains(shard_id)) - .cloned(); + .cloned() + .collect(); // Let's assign the missing shards. - - // TODO that's the logic that has to change. Eventually we want to remove shards that - // were previously allocated and create new shards to replace them. let max_shard_per_pipeline = compute_max_num_shards_per_pipeline(&source.source_type); - for (missing_shard, indexer_str) in - missing_shards.zip(pick_indexer(&remaining_capacity_per_node)) - { + + let shard_to_indexer: HashMap<ShardId, String> = assign_shards( + missing_shards, + remaining_num_shards_per_node, + shard_locations, + ); + for (shard_id, indexer) in shard_to_indexer { add_shard_to_indexer( - missing_shard, - indexer_str, + shard_id, + indexer, &source.source_uid, max_shard_per_pipeline, &mut new_physical_plan, @@ -430,6 +435,78 @@ fn convert_scheduling_solution_to_physical_plan( new_physical_plan } +/// This function is meant to be called after we have solved the scheduling +/// problem, so we already know the number of shards to be assigned on each indexer node. +/// We now need to decide precisely where each shard should be assigned. +/// +/// It assigns the missing shards for a given source to the indexers, given: +/// - the total number of shards that are to be scheduled on each node +/// - the shard locations +/// +/// This function gives priority to assigning shards to a node that already hosts them. +/// +/// The current implementation is a heuristic. +/// In the first pass, we attempt to assign as many shards as possible to the +/// node hosting them. +fn assign_shards( + missing_shards: Vec<ShardId>, + mut remaining_num_shards_per_node: HashMap<String, NonZeroU32>, + shard_locations: &ShardLocations, +) -> HashMap<ShardId, String> { + let mut shard_to_indexer: HashMap<ShardId, String> = + HashMap::with_capacity(missing_shards.len()); + + // In a first pass, we assign as many shards as possible to their hosting nodes.
+ let mut remaining_missing_shards: Vec<ShardId> = Vec::new(); + for shard_id in missing_shards { + // As a heuristic, we pick the first node hosting the shard that is available. + let indexer_hosting_shard: Option<(NonZeroU32, &str)> = shard_locations + .get_shard_locations(&shard_id) + .iter() + .map(|node_id| node_id.as_str()) + .flat_map(|node_id| { + let num_shards = remaining_num_shards_per_node.get(node_id)?; + Some((*num_shards, node_id)) + }) + .min_by_key(|(num_shards, _node_id)| *num_shards); + if let Some((_num_shards, indexer)) = indexer_hosting_shard { + decrement_num_shards(indexer, &mut remaining_num_shards_per_node); + shard_to_indexer.insert(shard_id, indexer.to_string()); + } else { + remaining_missing_shards.push(shard_id); + } + } + + for shard_id in remaining_missing_shards { + let indexer = remaining_num_shards_per_node + .keys() + .next() + .expect("failed to assign all shards. please report") + .to_string(); + decrement_num_shards(&indexer, &mut remaining_num_shards_per_node); + shard_to_indexer.insert(shard_id, indexer.to_string()); + } + assert!(remaining_num_shards_per_node.is_empty()); + + shard_to_indexer +} + +fn decrement_num_shards( + node_id: &str, + remaining_num_shards_to_schedule_per_indexers: &mut HashMap<String, NonZeroU32>, +) { + { + let previous_num_shards = remaining_num_shards_to_schedule_per_indexers + .get_mut(node_id) + .unwrap(); + if let Some(new_num_shards) = NonZeroU32::new(previous_num_shards.get() - 1) { + *previous_num_shards = new_num_shards; + return; + } + } + remaining_num_shards_to_schedule_per_indexers.remove(node_id); +} + // Checks that's the physical solution indeed matches the scheduling solution. fn assert_post_condition_physical_plan_match_solution( physical_plan: &PhysicalIndexingPlan, @@ -446,14 +523,14 @@ fn assert_post_condition_physical_plan_match_solution( fn add_shard_to_indexer( missing_shard: ShardId, - indexer: &str, + indexer: String, source_uid: &SourceUid, max_shard_per_pipeline: NonZeroU32, new_physical_plan: &mut PhysicalIndexingPlan, ) { let indexer_tasks = new_physical_plan .indexing_tasks_per_indexer_mut() - .entry(indexer.to_string()) + .entry(indexer) .or_default(); let indexing_task_opt = indexer_tasks @@ -530,15 +607,63 @@ pub fn build_physical_indexing_plan( sources: &[SourceToSchedule], indexer_id_to_cpu_capacities: &FnvHashMap<String, CpuCapacity>, previous_plan_opt: Option<&PhysicalIndexingPlan>, + shard_locations: &ShardLocations, ) -> PhysicalIndexingPlan { + // Asserts that the sources are valid. + check_sources(sources); + + // We convert our problem into a simplified scheduling problem. + // In this simplified version, nodes and sources are just ids. + // Instead of individual shard ids, we just keep count of shards. + // Similarly, instead of accurate locality, we just keep the number of shards local + // to an indexer. + let (id_to_ord_map, problem) = + convert_to_simplified_problem(indexer_id_to_cpu_capacities, sources, shard_locations); + + // Populate the previous solution, if any. + let mut previous_solution = problem.new_solution(); + if let Some(previous_plan) = previous_plan_opt { + convert_physical_plan_to_solution(previous_plan, &id_to_ord_map, &mut previous_solution); + } + + // Compute the new scheduling solution using a heuristic. + let new_solution = scheduling_logic::solve(problem, previous_solution); + + // Convert the new scheduling solution back to a physical plan.
+ let new_physical_plan = convert_scheduling_solution_to_physical_plan( + &new_solution, + &id_to_ord_map, + sources, + previous_plan_opt, + shard_locations, + ); + + assert_post_condition_physical_plan_match_solution( + &new_physical_plan, + &new_solution, + &id_to_ord_map, + ); + + new_physical_plan +} + +/// Makes any checks on the sources. +/// Sharded sources are not allowed to have no shards. +fn check_sources(sources: &[SourceToSchedule]) { for source in sources { if let SourceToScheduleType::Sharded { shard_ids, .. } = &source.source_type { assert!(!shard_ids.is_empty()) } } +} +fn convert_to_simplified_problem<'a>( + indexer_id_to_cpu_capacities: &'a FnvHashMap, + sources: &'a [SourceToSchedule], + shard_locations: &ShardLocations, +) -> (IdToOrdMap<'a>, SchedulingProblem) { // Convert our problem to a scheduling problem. - let mut id_to_ord_map = IdToOrdMap::default(); + let mut id_to_ord_map: IdToOrdMap<'a> = IdToOrdMap::default(); // We use a Vec as a `IndexOrd` -> Max load map. let mut indexer_cpu_capacities: Vec = @@ -554,46 +679,36 @@ pub fn build_physical_indexing_plan( for source in sources { if let Some(source_ord) = populate_problem(source, &mut problem) { let registered_source_ord = id_to_ord_map.add_source(source); + if let SourceToScheduleType::Sharded { shard_ids, .. } = &source.source_type { + for shard_id in shard_ids { + for &indexer in shard_locations.get_shard_locations(shard_id) { + let Some(indexer_ord) = id_to_ord_map.indexer_ord(indexer.as_str()) else { + warn!("failed to find indexer ord for indexer {indexer}"); + continue; + }; + problem.inc_affinity(source_ord, indexer_ord); + } + } + } assert_eq!(source_ord, registered_source_ord); } } - - // Populate the previous solution. - let mut previous_solution = problem.new_solution(); - if let Some(previous_plan) = previous_plan_opt { - convert_physical_plan_to_solution(previous_plan, &id_to_ord_map, &mut previous_solution); - } - - // Compute the new scheduling solution - let new_solution = scheduling_logic::solve(problem, previous_solution); - - // Convert the new scheduling solution back to a physical plan. 
- let new_physical_plan = convert_scheduling_solution_to_physical_plan( - &new_solution, - &id_to_ord_map, - sources, - previous_plan_opt, - ); - - assert_post_condition_physical_plan_match_solution( - &new_physical_plan, - &new_solution, - &id_to_ord_map, - ); - - new_physical_plan + (id_to_ord_map, problem) } #[cfg(test)] mod tests { + use std::collections::{HashMap, HashSet}; use std::num::NonZeroU32; use std::str::FromStr; use std::sync::atomic::{AtomicUsize, Ordering}; use fnv::FnvHashMap; + use itertools::Itertools; use quickwit_proto::indexing::{mcpu, CpuCapacity, IndexingTask}; - use quickwit_proto::types::{IndexUid, PipelineUid, ShardId, SourceUid}; + use quickwit_proto::types::{IndexUid, NodeId, PipelineUid, ShardId, SourceUid}; + use rand::seq::SliceRandom; use super::{ build_physical_indexing_plan, @@ -601,6 +716,9 @@ mod tests { SourceToScheduleType, }; use crate::indexing_plan::PhysicalIndexingPlan; + use crate::indexing_scheduler::get_shard_locality_metrics; + use crate::indexing_scheduler::scheduling::assign_shards; + use crate::model::ShardLocations; fn source_id() -> SourceUid { static COUNTER: AtomicUsize = AtomicUsize::new(0); @@ -649,10 +767,12 @@ mod tests { let mut indexer_id_to_cpu_capacities = FnvHashMap::default(); indexer_id_to_cpu_capacities.insert(indexer1.clone(), mcpu(16_000)); indexer_id_to_cpu_capacities.insert(indexer2.clone(), mcpu(16_000)); + let shard_locations = ShardLocations::default(); let indexing_plan = build_physical_indexing_plan( &[source_0, source_1, source_2], &indexer_id_to_cpu_capacities, None, + &shard_locations, ); assert_eq!(indexing_plan.indexing_tasks_per_indexer().len(), 2); @@ -670,21 +790,67 @@ mod tests { assert_eq!(node2_plan.len(), 4); assert_eq!(&node2_plan[0].source_id, &source_uid0.source_id); - assert_eq!( - &node2_plan[0].shard_ids, - &[ShardId::from(0), ShardId::from(1), ShardId::from(2)] - ); + + let mut shard_ids: HashSet = HashSet::default(); + let mut shard_lens = Vec::new(); + shard_lens.push(node2_plan[0].shard_ids.len()); + shard_ids.extend(node2_plan[0].shard_ids.iter().cloned()); assert_eq!(&node2_plan[1].source_id, &source_uid0.source_id); - assert_eq!( - &node2_plan[1].shard_ids, - &[ShardId::from(3), ShardId::from(4), ShardId::from(5)] - ); + shard_lens.push(node2_plan[1].shard_ids.len()); + shard_ids.extend(node2_plan[1].shard_ids.iter().cloned()); assert_eq!(&node2_plan[2].source_id, &source_uid0.source_id); + shard_lens.push(node2_plan[2].shard_ids.len()); + shard_ids.extend(node2_plan[2].shard_ids.iter().cloned()); + assert_eq!(shard_ids.len(), 8); + assert_eq!(&node2_plan[3].source_id, &source_uid2.source_id); + shard_lens.sort(); + assert_eq!(&shard_lens[..], &[2, 3, 3]); + } + + #[test] + fn test_build_physical_plan_with_locality() { + let num_indexers = 10; + let num_shards: usize = 1000; + let indexers: Vec = (0..num_indexers) + .map(|indexer_id| NodeId::new(format!("indexer{indexer_id}"))) + .collect(); + let source_uids: Vec = std::iter::repeat_with(source_id).take(1_000).collect(); + let shard_ids: Vec = (0..num_shards as u64).map(ShardId::from).collect(); + let sources: Vec = (0..num_shards) + .map(|i| SourceToSchedule { + source_uid: source_uids[i].clone(), + source_type: SourceToScheduleType::Sharded { + shard_ids: vec![shard_ids[i].clone()], + load_per_shard: NonZeroU32::new(250).unwrap(), + }, + }) + .collect(); + + let mut indexer_id_to_cpu_capacities = FnvHashMap::default(); + for indexer in &indexers { + indexer_id_to_cpu_capacities.insert(indexer.as_str().to_string(), mcpu(16_000)); + } + 
let mut rng = rand::thread_rng(); + + let mut shard_locations = ShardLocations::default(); + for shard_id in &shard_ids { + let indexer = indexers[..].choose(&mut rng).unwrap(); + shard_locations.add_location(shard_id, indexer); + } + + let plan = build_physical_indexing_plan( + &sources, + &indexer_id_to_cpu_capacities, + None, + &shard_locations, + ); + assert_eq!(plan.indexing_tasks_per_indexer().len(), num_indexers); + let metrics = get_shard_locality_metrics(&plan, &shard_locations); assert_eq!( - &node2_plan[2].shard_ids, - &[ShardId::from(6), ShardId::from(7)] + metrics.num_remote_shards + metrics.num_local_shards, + num_shards ); - assert_eq!(&node2_plan[3].source_id, &source_uid2.source_id); + assert!(metrics.num_remote_shards < 10); } #[tokio::test] @@ -701,10 +867,12 @@ mod tests { let indexer1 = "indexer1".to_string(); let mut indexer_max_loads = FnvHashMap::default(); + let shard_locations = ShardLocations::default(); { indexer_max_loads.insert(indexer1.clone(), mcpu(1_999)); // This test what happens when there isn't enough capacity on the cluster. - let physical_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None); + let physical_plan = + build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations); assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1); let expected_tasks = physical_plan.indexer(&indexer1).unwrap(); assert_eq!(expected_tasks.len(), 2); @@ -713,7 +881,8 @@ mod tests { { indexer_max_loads.insert(indexer1.clone(), mcpu(2_000)); // This test what happens when there isn't enough capacity on the cluster. - let physical_plan = build_physical_indexing_plan(&sources, &indexer_max_loads, None); + let physical_plan = + build_physical_indexing_plan(&sources, &indexer_max_loads, None, &shard_locations); assert_eq!(physical_plan.indexing_tasks_per_indexer().len(), 1); let expected_tasks = physical_plan.indexer(&indexer1).unwrap(); assert_eq!(expected_tasks.len(), 2); @@ -775,10 +944,12 @@ mod tests { for indexing_task in indexing_tasks { indexing_plan.add_indexing_task("node1", indexing_task); } + let shard_locations = ShardLocations::default(); let new_plan = build_physical_indexing_plan( &sources, &indexer_id_to_cpu_capacities, Some(&indexing_plan), + &shard_locations, ); let indexing_tasks = new_plan.indexer("node1").unwrap(); assert_eq!(indexing_tasks.len(), 2); @@ -813,12 +984,17 @@ mod tests { for indexing_task in indexing_tasks { indexing_plan.add_indexing_task(NODE, indexing_task); } + let shard_locations = ShardLocations::default(); let new_plan = build_physical_indexing_plan( &sources, &indexer_id_to_cpu_capacities, Some(&indexing_plan), + &shard_locations, ); let mut indexing_tasks = new_plan.indexer(NODE).unwrap().to_vec(); + for indexing_task in &mut indexing_tasks { + indexing_task.shard_ids.sort(); + } // We sort indexing tasks for normalization purpose indexing_tasks.sort_by_key(|task| task.shard_ids[0].clone()); indexing_tasks @@ -842,23 +1018,12 @@ mod tests { mcpu(400), ); assert_eq!(indexing_tasks_1.len(), 2); - assert_eq!( - &indexing_tasks_1[0].shard_ids, - &[ - ShardId::from(0), - ShardId::from(1), - ShardId::from(2), - ShardId::from(3), - ShardId::from(4), - ShardId::from(5), - ShardId::from(6), - ShardId::from(7) - ] - ); - assert_eq!( - &indexing_tasks_1[1].shard_ids, - &[ShardId::from(8), ShardId::from(9), ShardId::from(10)] - ); + let indexing_tasks_len_1: Vec = indexing_tasks_1 + .iter() + .map(|task| task.shard_ids.len()) + .sorted() + .collect(); + assert_eq!(&indexing_tasks_len_1, &[3, 8]); 
let pipeline_tasks1: Vec<(PipelineUid, &[ShardId])> = indexing_tasks_1 .iter() @@ -873,27 +1038,12 @@ mod tests { mcpu(600), ); assert_eq!(indexing_tasks_2.len(), 3); - assert_eq!( - &indexing_tasks_2[0].shard_ids, - &[ - ShardId::from(0), - ShardId::from(1), - ShardId::from(2), - ShardId::from(3), - ShardId::from(4) - ] - ); - assert_eq!( - &indexing_tasks_2[1].shard_ids, - &[ - ShardId::from(5), - ShardId::from(6), - ShardId::from(8), - ShardId::from(9), - ShardId::from(10) - ] - ); - assert_eq!(&indexing_tasks_2[2].shard_ids, &[ShardId::from(7)]); + let indexing_tasks_len_2: Vec = indexing_tasks_2 + .iter() + .map(|task| task.shard_ids.len()) + .sorted() + .collect(); + assert_eq!(&indexing_tasks_len_2, &[1, 5, 5]); // Now the load comes back to normal // The hysteresis takes effect. We do not switch back to 2 pipelines. @@ -922,27 +1072,12 @@ mod tests { mcpu(200), ); assert_eq!(indexing_tasks_4.len(), 2); - assert_eq!( - &indexing_tasks_4[0].shard_ids, - &[ - ShardId::from(0), - ShardId::from(1), - ShardId::from(2), - ShardId::from(3), - ShardId::from(4), - ShardId::from(7) - ] - ); - assert_eq!( - &indexing_tasks_4[1].shard_ids, - &[ - ShardId::from(5), - ShardId::from(6), - ShardId::from(8), - ShardId::from(9), - ShardId::from(10) - ] - ); + let indexing_tasks_len_4: Vec = indexing_tasks_4 + .iter() + .map(|task| task.shard_ids.len()) + .sorted() + .collect(); + assert_eq!(&indexing_tasks_len_4, &[5, 6]); } /// We want to make sure for small pipelines, we still reschedule them with the same @@ -964,17 +1099,49 @@ mod tests { } #[test] - fn test_pick_indexer_for_shard() { - let indexer_capacity = vec![ - ("node1".to_string(), 1), - ("node2".to_string(), 0), - ("node3".to_string(), 2), - ("node4".to_string(), 2), - ("node5".to_string(), 0), - ("node6".to_string(), 0), + fn test_assign_missing_shards() { + let shard0 = ShardId::from(0); + let shard1 = ShardId::from(1); + let shard2 = ShardId::from(2); + let shard3 = ShardId::from(3); + + let missing_shards = vec![ + shard0.clone(), + shard1.clone(), + shard2.clone(), + shard3.clone(), ]; - let indexers: Vec<&str> = super::pick_indexer(&indexer_capacity).collect(); - assert_eq!(indexers, &["node1", "node3", "node3", "node4", "node4"]); + let node1 = NodeId::new("node1".to_string()); + let node2 = NodeId::new("node2".to_string()); + // This node is missing from the capacity map. + // It should not be assigned any task despite being pressent in shard locations. 
+ let node_missing = NodeId::new("node_missing".to_string()); + let mut remaining_num_shards_per_node = HashMap::default(); + remaining_num_shards_per_node + .insert(node1.as_str().to_string(), NonZeroU32::new(3).unwrap()); + remaining_num_shards_per_node + .insert(node2.as_str().to_string(), NonZeroU32::new(1).unwrap()); + + let mut shard_locations: ShardLocations = ShardLocations::default(); + // shard1 on 1 + shard_locations.add_location(&shard1, &node1); + // shard2 on 2 + shard_locations.add_location(&shard2, &node2); + // shard3 on both 1 and 2 + shard_locations.add_location(&shard3, &node1); + shard_locations.add_location(&shard3, &node2); + shard_locations.add_location(&shard0, &node_missing); + + let shard_to_indexer = assign_shards( + missing_shards, + remaining_num_shards_per_node, + &shard_locations, + ); + assert_eq!(shard_to_indexer.len(), 4); + assert_eq!(shard_to_indexer.get(&shard1).unwrap(), "node1"); + assert_eq!(shard_to_indexer.get(&shard2).unwrap(), "node2"); + assert_eq!(shard_to_indexer.get(&shard3).unwrap(), "node1"); + assert_eq!(shard_to_indexer.get(&shard0).unwrap(), "node1"); } #[test] @@ -1004,7 +1171,8 @@ mod tests { ]; let mut capacities = FnvHashMap::default(); capacities.insert("indexer-1".to_string(), CpuCapacity::from_cpu_millis(8000)); - build_physical_indexing_plan(&sources_to_schedule, &capacities, None); + let shard_locations = ShardLocations::default(); + build_physical_indexing_plan(&sources_to_schedule, &capacities, None, &shard_locations); } #[test] diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs index cc5dbbd1efb..15feb765b72 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic.rs @@ -19,8 +19,9 @@ use std::cmp::Reverse; use std::collections::btree_map::Entry; -use std::collections::{BTreeMap, BinaryHeap, HashMap}; +use std::collections::BTreeMap; +use itertools::Itertools; use quickwit_proto::indexing::CpuCapacity; use tracing::warn; @@ -44,14 +45,33 @@ pub fn solve( mut problem: SchedulingProblem, previous_solution: SchedulingSolution, ) -> SchedulingSolution { - // We first inflate the indexer capacities to make sure they globally have at least 110% of the - // total problem load. + // We first inflate the indexer capacities to make sure they globally + // have at least 110% of the total problem load. This is done proportionally + // to their original capacity. inflate_node_capacities_if_necessary(&mut problem); + // As a heuristic, to offer stability, we work iteratively + // from the previous solution. let mut solution = previous_solution; + // We first run a few asserts to ensure that the problem is correct. check_contract_conditions(&problem, &solution); + // Due to scale down, or the entire removal of sources, we might have + // too many shards in the current solution. + // Let's first shave off the extraneous shards. remove_extraneous_shards(&problem, &mut solution); + // Because the load associated with shards can change, some indexers + // may have too much work assigned to them. + // Again, we shave off some shards to make sure they are + // within their capacity.
enforce_indexers_cpu_capacity(&problem, &mut solution); - place_unassigned_shards(problem, solution) + // The solution now meets the constraints, but it does not necessarily + // contain all of the shards that we need to assign. + // + // We first assign sources to indexers that have some affinity with them + // (provided they have the capacity). + place_unassigned_shards_with_affinity(&problem, &mut solution); + // Finally we assign the remaining shards, regardless of whether they have affinity + // or not. + place_unassigned_shards_ignoring_affinity(problem, &solution) } // ------------------------------------------------------------------------- @@ -81,7 +101,7 @@ fn remove_extraneous_shards(problem: &SchedulingProblem, solution: &mut Scheduli }) .collect(); - let mut nodes_with_source: HashMap<SourceOrd, Vec<IndexerOrd>> = HashMap::default(); + let mut nodes_with_source: BTreeMap<SourceOrd, Vec<IndexerOrd>> = BTreeMap::default(); for (node_id, indexer_assignment) in solution.indexer_assignments.iter().enumerate() { for (&source, &num_shards) in &indexer_assignment.num_shards_per_source { if num_shards > 0 { @@ -200,12 +220,13 @@ fn attempt_place_unassigned_shards( partial_solution: &SchedulingSolution, ) -> Result<SchedulingSolution, NotEnoughCapacity> { let mut solution = partial_solution.clone(); - let mut indexers_with_most_available_capacity: BinaryHeap<(CpuCapacity, IndexerOrd)> = - compute_indexer_available_capacity(problem, &solution); for source in unassigned_shards { + let indexers_with_most_available_capacity = + compute_indexer_available_capacity(problem, &solution) + .sorted_by_key(|(indexer_ord, capacity)| Reverse((*capacity, *indexer_ord))); place_unassigned_shards_single_source( source, - &mut indexers_with_most_available_capacity, + indexers_with_most_available_capacity, &mut solution, )?; } @@ -213,25 +234,66 @@ fn attempt_place_unassigned_shards( Ok(solution) } +fn place_unassigned_shards_with_affinity( + problem: &SchedulingProblem, + solution: &mut SchedulingSolution, +) { + let mut unassigned_shards: Vec<Source> = compute_unassigned_sources(problem, solution); + unassigned_shards.sort_by_key(|source| { + let load = source.num_shards * source.load_per_shard.get(); + Reverse(load) + }); + for source in &unassigned_shards { + // List of indexers with a non-zero affinity and some available capacity, sorted by + // (affinity, available capacity) in that order. + let indexers_with_affinity_and_available_capacity = source + .affinities + .iter() + .filter(|&(_, &affinity)| affinity != 0u32) + .map(|(&indexer_ord, affinity)| { + let available_capacity = + solution.indexer_assignments[indexer_ord].indexer_available_capacity(problem); + let capacity = CpuCapacity::from_cpu_millis(available_capacity as u32); + (indexer_ord, affinity, capacity) + }) + .sorted_by_key(|(indexer_ord, affinity, capacity)| { + Reverse((*affinity, *capacity, *indexer_ord)) + }) + .map(|(indexer_ord, _, capacity)| (indexer_ord, capacity)); + let _ = place_unassigned_shards_single_source( + source, + indexers_with_affinity_and_available_capacity, + solution, + ); + } +} + // ---------------------------------------------------- // Phase 3 // Place unassigned sources. // // We use a greedy algorithm as a simple heuristic here. -// We go through the sources in decreasing order of their load. // -// We then try to place as many shards as possible in the node with the -// highest available capacity. +// We go through the sources in decreasing order of their load, +// in two passes. +// +// In the first pass, we have a look at +// the nodes with which there is an affinity.
// -// If this algorithm fails to place all remaining shards, we inflate the node capacities by 20% -// in the scheduling problem and start from the beginning. +// If one of them has room for all of the shards, then we assign all +// of the shards to it. +// +// In the second pass, we just put as many shards as possible on the node +// with the highest available capacity. +// +// If this algorithm fails to place all remaining shards, we inflate +// the node capacities by 20% in the scheduling problem and start from the beginning. #[must_use] -fn place_unassigned_shards( +fn place_unassigned_shards_ignoring_affinity( mut problem: SchedulingProblem, - partial_solution: SchedulingSolution, + partial_solution: &SchedulingSolution, ) -> SchedulingSolution { - let mut unassigned_shards: Vec = - compute_unassigned_sources(&problem, &partial_solution); + let mut unassigned_shards: Vec = compute_unassigned_sources(&problem, partial_solution); unassigned_shards.sort_by_key(|source| { let load = source.num_shards * source.load_per_shard.get(); Reverse(load) @@ -244,7 +306,7 @@ fn place_unassigned_shards( // 1.2^30 is about 240. // If we reach 30 attempts we are certain to have a logical bug. for attempt_number in 0..30 { - match attempt_place_unassigned_shards(&unassigned_shards[..], &problem, &partial_solution) { + match attempt_place_unassigned_shards(&unassigned_shards[..], &problem, partial_solution) { Ok(solution) => { if attempt_number != 0 { warn!( @@ -289,29 +351,30 @@ struct NotEnoughCapacity; /// amongst the node with their given node capacity. fn place_unassigned_shards_single_source( source: &Source, - indexer_available_capacities: &mut BinaryHeap<(CpuCapacity, IndexerOrd)>, + mut indexer_with_capacities: impl Iterator, solution: &mut SchedulingSolution, ) -> Result<(), NotEnoughCapacity> { let mut num_shards = source.num_shards; while num_shards > 0 { - let mut node_with_most_capacity = indexer_available_capacities.peek_mut().unwrap(); - let node_id = node_with_most_capacity.1; - let available_capacity = &mut node_with_most_capacity.0; + let Some((indexer_ord, available_capacity)) = indexer_with_capacities.next() else { + return Err(NotEnoughCapacity); + }; let num_placable_shards = available_capacity.cpu_millis() / source.load_per_shard; let num_shards_to_place = num_placable_shards.min(num_shards); - if num_shards_to_place == 0 { - return Err(NotEnoughCapacity); - } - // TODO take in account colocation. // Update the solution, the shard load, and the number of shards to place. - solution.indexer_assignments[node_id].add_shards(source.source_ord, num_shards_to_place); - *available_capacity = *available_capacity - - CpuCapacity::from_cpu_millis(num_shards_to_place * source.load_per_shard.get()); + if num_shards_to_place == 0u32 { + // No need to fill indexer_assignments with empty assignments. + continue; + } + solution.indexer_assignments[indexer_ord] + .add_shards(source.source_ord, num_shards_to_place); num_shards -= num_shards_to_place; } Ok(()) } +/// Compute the sources/shards that have not been assigned to any indexer yet. +/// Affinity are also updated, with the limitation described in `Source`. 
fn compute_unassigned_sources( problem: &SchedulingProblem, solution: &SchedulingSolution, @@ -320,7 +383,7 @@ fn compute_unassigned_sources( .sources() .map(|source| (source.source_ord as SourceOrd, source)) .collect(); - for indexer_assignment in &solution.indexer_assignments { + for (indexer_ord, indexer_assignment) in solution.indexer_assignments.iter().enumerate() { for (&source_ord, &num_shards) in &indexer_assignment.num_shards_per_source { if num_shards == 0 { continue; @@ -328,8 +391,7 @@ fn compute_unassigned_sources( let Entry::Occupied(mut entry) = unassigned_sources.entry(source_ord) else { panic!("The solution contains more shards than the actual problem."); }; - entry.get_mut().num_shards -= num_shards; - if entry.get().num_shards == 0 { + if !entry.get_mut().remove_shards(indexer_ord, num_shards) { entry.remove(); } } @@ -340,22 +402,23 @@ fn compute_unassigned_sources( /// Builds a BinaryHeap with the different indexer capacities. /// /// Panics if one of the indexer is over-assigned. -fn compute_indexer_available_capacity( - problem: &SchedulingProblem, - solution: &SchedulingSolution, -) -> BinaryHeap<(CpuCapacity, IndexerOrd)> { - let mut indexer_available_capacity: BinaryHeap<(CpuCapacity, IndexerOrd)> = - BinaryHeap::with_capacity(problem.num_indexers()); - for indexer_assignment in &solution.indexer_assignments { - let available_capacity: i32 = indexer_assignment.indexer_available_capacity(problem); - assert!(available_capacity >= 0i32); - indexer_available_capacity.push(( - CpuCapacity::from_cpu_millis(available_capacity as u32), - indexer_assignment.indexer_ord, - )); - } - indexer_available_capacity +fn compute_indexer_available_capacity<'a>( + problem: &'a SchedulingProblem, + solution: &'a SchedulingSolution, +) -> impl Iterator + 'a { + solution + .indexer_assignments + .iter() + .map(|indexer_assignment| { + let available_capacity: i32 = indexer_assignment.indexer_available_capacity(problem); + assert!(available_capacity >= 0i32); + ( + indexer_assignment.indexer_ord, + CpuCapacity::from_cpu_millis(available_capacity as u32), + ) + }) } + #[cfg(test)] mod tests { use std::num::NonZeroU32; @@ -482,7 +545,8 @@ mod tests { Source { source_ord: 0, load_per_shard: NonZeroU32::new(1_000).unwrap(), - num_shards: 4 + num_shards: 4, + affinities: BTreeMap::default(), } ); } @@ -505,7 +569,8 @@ mod tests { Source { source_ord: 0, load_per_shard: NonZeroU32::new(1_000).unwrap(), - num_shards: 5 - (1 + 2) + num_shards: 5 - (1 + 2), + affinities: Default::default(), } ); assert_eq!( @@ -513,7 +578,8 @@ mod tests { Source { source_ord: 1, load_per_shard: NonZeroU32::new(2_000).unwrap(), - num_shards: 15 - (3 + 3) + num_shards: 15 - (3 + 3), + affinities: Default::default(), } ); } @@ -523,10 +589,24 @@ mod tests { let mut problem = SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(4_000)]); problem.add_source(4, NonZeroU32::new(1_000).unwrap()); let partial_solution = problem.new_solution(); - let solution = place_unassigned_shards(problem, partial_solution); + let solution = place_unassigned_shards_ignoring_affinity(problem, &partial_solution); assert_eq!(solution.indexer_assignments[0].num_shards(0), 4); } + #[test] + fn test_place_unassigned_shards_with_affinity() { + let mut problem = + SchedulingProblem::with_indexer_cpu_capacities(vec![mcpu(4_000), mcpu(4000)]); + problem.add_source(4, NonZeroU32::new(1_000).unwrap()); + problem.add_source(4, NonZeroU32::new(1_000).unwrap()); + problem.inc_affinity(0, 1); + problem.inc_affinity(1, 0); + let mut solution = 
problem.new_solution(); + place_unassigned_shards_with_affinity(&problem, &mut solution); + assert_eq!(solution.indexer_assignments[0].num_shards(1), 4); + assert_eq!(solution.indexer_assignments[1].num_shards(0), 4); + } + #[test] fn test_place_unassigned_shards_reach_capacity() { let mut problem = @@ -548,7 +628,8 @@ mod tests { Source { source_ord: 0, load_per_shard: NonZeroU32::new(1_000).unwrap(), - num_shards: 5 - (1 + 2) + num_shards: 5 - (1 + 2), + affinities: Default::default(), } ); assert_eq!( @@ -556,7 +637,8 @@ mod tests { Source { source_ord: 1, load_per_shard: NonZeroU32::new(2_000).unwrap(), - num_shards: 15 - (3 + 3) + num_shards: 15 - (3 + 3), + affinities: Default::default(), } ); } } diff --git a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs index e503edaa45e..89d47fc50b4 100644 --- a/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs +++ b/quickwit/quickwit-control-plane/src/indexing_scheduler/scheduling/scheduling_logic_model.rs @@ -26,13 +26,59 @@ use quickwit_proto::indexing::CpuCapacity; pub type SourceOrd = u32; pub type IndexerOrd = usize; -#[derive(Clone, Copy, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub struct Source { pub source_ord: SourceOrd, pub load_per_shard: NonZeroU32, + /// Affinities of the source for each indexer. + /// In the beginning, affinities are initialized to be the count of shards of the source + /// that are located on the indexer. + /// + /// As we compute unassigned sources, we decrease the affinity by the given number of shards, + /// saturating at 0. + /// + /// As a result, we only have the invariant + /// `affinity(source, indexer) <= num shards of source on indexer`. + pub affinities: BTreeMap<IndexerOrd, u32>, pub num_shards: u32, } +impl Source { + // Removes a given number of shards, located on the given indexer. + // Returns `false` if and only if all of the shards have been removed. + // + // This function also decreases the affinity of the source for the given indexer + // by num_shards_to_remove in a saturating way. + // + // # Panics + // + // If the source does not have that many shards to begin with. + pub fn remove_shards(&mut self, indexer_ord: usize, num_shards_to_remove: u32) -> bool { + if num_shards_to_remove == 0u32 { + return self.num_shards > 0u32; + } + let entry = self.affinities.entry(indexer_ord); + self.num_shards = self + .num_shards + .checked_sub(num_shards_to_remove) + .expect("removing more shards than available."); + if self.num_shards == 0u32 { + self.affinities.clear(); + return false; + } + if let Entry::Occupied(mut affinity_with_indexer_entry) = entry { + let affinity_with_indexer: &mut u32 = affinity_with_indexer_entry.get_mut(); + let affinity_after_removal = affinity_with_indexer.saturating_sub(num_shards_to_remove); + if affinity_after_removal == 0u32 { + affinity_with_indexer_entry.remove(); + } else { + *affinity_with_indexer = affinity_after_removal; + } + } + true + } +} + #[derive(Debug)] pub struct SchedulingProblem { sources: Vec<Source>, @@ -51,6 +97,7 @@ impl SchedulingProblem { assert!(indexer_cpu_capacities .iter() .all(|cpu_capacity| cpu_capacity.cpu_millis() > 0)); + // TODO assert for affinity.
SchedulingProblem { sources: Vec::new(), indexer_cpu_capacities, @@ -90,7 +137,7 @@ impl SchedulingProblem { } pub fn sources(&self) -> impl Iterator + '_ { - self.sources.iter().copied() + self.sources.iter().cloned() } pub fn add_source(&mut self, num_shards: u32, load_per_shard: NonZeroU32) -> SourceOrd { @@ -99,10 +146,21 @@ impl SchedulingProblem { source_ord, num_shards, load_per_shard, + affinities: Default::default(), }); source_ord } + /// Increases the affinity source <-> indexer by 1. + /// This is done to record that the indexer is hosting one shard of the source. + pub fn inc_affinity(&mut self, source_ord: SourceOrd, indexer_ord: IndexerOrd) { + let affinity: &mut u32 = self.sources[source_ord as usize] + .affinities + .entry(indexer_ord) + .or_default(); + *affinity += 1; + } + pub fn source_load_per_shard(&self, source_ord: SourceOrd) -> NonZeroU32 { self.sources[source_ord as usize].load_per_shard } @@ -189,3 +247,62 @@ impl SchedulingSolution { self.indexer_assignments.len() } } + +#[cfg(test)] +mod tests { + use super::*; + + fn test_source() -> Source { + let mut affinities: BTreeMap = Default::default(); + affinities.insert(7, 3u32); + affinities.insert(11, 2u32); + Source { + source_ord: 0u32, + load_per_shard: NonZeroU32::new(1000u32).unwrap(), + affinities, + num_shards: 2 + 3, + } + } + + #[test] + fn test_source_remove_simple() { + let mut source = test_source(); + assert!(source.remove_shards(7, 2)); + assert_eq!(source.num_shards, 5 - 2); + assert_eq!(source.affinities.get(&7).copied(), Some(1)); + assert_eq!(source.affinities.get(&11).copied(), Some(2)); + } + + #[test] + fn test_source_remove_all_affinity() { + let mut source = test_source(); + assert!(source.remove_shards(7, 3)); + assert_eq!(source.num_shards, 5 - 3); + assert!(!source.affinities.contains_key(&7)); + assert_eq!(source.affinities.get(&11).copied(), Some(2)); + } + + #[test] + fn test_source_remove_more_than_affinity() { + let mut source = test_source(); + assert!(source.remove_shards(7, 4)); + assert_eq!(source.num_shards, 5 - 4); + assert!(!source.affinities.contains_key(&7)); + assert_eq!(source.affinities.get(&11).copied(), Some(2)); + } + + #[test] + fn test_source_remove_all_shards() { + let mut source = test_source(); + assert!(!source.remove_shards(7, 5)); + assert_eq!(source.num_shards, 0); + assert!(source.affinities.is_empty()); + } + + #[test] + #[should_panic] + fn test_source_remove_more_than_all_shards() { + let mut source = test_source(); + assert!(source.remove_shards(7, 6)); + } +} diff --git a/quickwit/quickwit-control-plane/src/metrics.rs b/quickwit/quickwit-control-plane/src/metrics.rs index 6ebf7db5c21..564576429c3 100644 --- a/quickwit/quickwit-control-plane/src/metrics.rs +++ b/quickwit/quickwit-control-plane/src/metrics.rs @@ -22,6 +22,12 @@ use quickwit_common::metrics::{ new_counter, new_gauge, new_gauge_vec, IntCounter, IntGauge, IntGaugeVec, }; +#[derive(Debug, Clone, Copy)] +pub struct ShardLocalityMetrics { + pub num_remote_shards: usize, + pub num_local_shards: usize, +} + pub struct ControlPlaneMetrics { pub indexes_total: IntGauge, pub restart_total: IntCounter, @@ -29,10 +35,30 @@ pub struct ControlPlaneMetrics { pub metastore_error_aborted: IntCounter, pub metastore_error_maybe_executed: IntCounter, pub open_shards_total: IntGaugeVec<1>, + pub local_shards: IntGauge, + pub remote_shards: IntGauge, +} + +impl ControlPlaneMetrics { + pub fn set_shard_locality_metrics(&self, shard_locality_metrics: ShardLocalityMetrics) { + self.local_shards + 
.set(shard_locality_metrics.num_local_shards as i64); + self.remote_shards + .set(shard_locality_metrics.num_remote_shards as i64); + } } impl Default for ControlPlaneMetrics { fn default() -> Self { + let shards = new_gauge_vec( + "shards", + "Number of (remote/local) shards in the indexing plan", + "control_plane", + &[], + ["locality"], + ); + let local_shards = shards.with_label_values(["local"]); + let remote_shards = shards.with_label_values(["remote"]); ControlPlaneMetrics { indexes_total: new_gauge("indexes_total", "Number of indexes.", "control_plane", &[]), restart_total: new_counter( @@ -64,6 +90,8 @@ impl Default for ControlPlaneMetrics { &[], ["index_id"], ), + local_shards, + remote_shards, } } } diff --git a/quickwit/quickwit-control-plane/src/model/mod.rs b/quickwit/quickwit-control-plane/src/model/mod.rs index af64cc9c479..4bf3d3ddd8c 100644 --- a/quickwit/quickwit-control-plane/src/model/mod.rs +++ b/quickwit/quickwit-control-plane/src/model/mod.rs @@ -39,7 +39,7 @@ use quickwit_proto::metastore::{ MetastoreError, MetastoreService, MetastoreServiceClient, SourceType, }; use quickwit_proto::types::{IndexId, IndexUid, NodeId, ShardId, SourceId, SourceUid}; -pub(super) use shard_table::{ScalingMode, ShardEntry, ShardStats, ShardTable}; +pub(super) use shard_table::{ScalingMode, ShardEntry, ShardLocations, ShardStats, ShardTable}; use tracing::{info, instrument, warn}; /// The control plane maintains a model in sync with the metastore. @@ -72,6 +72,10 @@ impl ControlPlaneModel { self.shard_table.num_sources() } + pub fn shard_locations(&self) -> ShardLocations { + self.shard_table.shard_locations() + } + #[cfg(test)] pub fn num_shards(&self) -> usize { self.shard_table.num_shards() diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index 06f21354346..4f61f68cbc6 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -18,7 +18,7 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. use std::collections::hash_map::Entry; -use std::collections::BTreeSet; +use std::collections::{BTreeSet, HashMap}; use std::ops::{Deref, DerefMut}; use std::time::Duration; @@ -111,6 +111,31 @@ impl ShardTableEntry { } } +#[derive(Default)] +pub struct ShardLocations<'a> { + shard_locations: HashMap<&'a ShardId, smallvec::SmallVec<[&'a NodeId; 2]>>, +} + +impl<'a> ShardLocations<'a> { + pub(crate) fn add_location(&mut self, shard_id: &'a ShardId, ingester_id: &'a NodeId) { + let locations = self.shard_locations.entry(shard_id).or_default(); + if locations.contains(&ingester_id) { + warn!("shard {shard_id:?} was registered twice on the same ingester {ingester_id:?}"); + } else { + locations.push(ingester_id); + } + } + + /// Returns the list of indexers hosting the given shard. + /// No guarantee is made on the order of the returned list. + pub fn get_shard_locations(&self, shard_id: &ShardId) -> &[&'a NodeId] { + let Some(node_ids) = self.shard_locations.get(shard_id) else { + return &[]; + }; + node_ids.as_slice() + } +} + // A table that keeps track of the existing shards for each index and source, // and for each ingester, the list of shards it is supposed to host. // @@ -146,6 +171,20 @@ fn remove_shard_from_ingesters_internal( } impl ShardTable { + /// Returns a ShardLocations object that maps each shard to the list of ingesters hosting it. + /// All shards are considered regardless of their state (including unavailable).
+ pub fn shard_locations(&self) -> ShardLocations { + let mut shard_locations = ShardLocations::default(); + for (ingester_id, source_shards) in &self.ingester_shards { + for shard_ids in source_shards.values() { + for shard_id in shard_ids { + shard_locations.add_location(shard_id, ingester_id); + } + } + } + shard_locations + } + /// Removes all the entries that match the target index ID. pub fn delete_index(&mut self, index_id: &str) { let shards_removed = self @@ -1154,4 +1193,131 @@ mod tests { assert_eq!(new_available_permits, previous_available_permits); } + + #[test] + fn test_shard_locations() { + let shard1 = ShardId::from("shard1"); + let shard2 = ShardId::from("shard1"); + let unlisted_shard = ShardId::from("unlisted"); + let node1 = NodeId::new("node1".to_string()); + let node2 = NodeId::new("node2".to_string()); + let mut shard_locations = ShardLocations::default(); + shard_locations.add_location(&shard1, &node1); + shard_locations.add_location(&shard1, &node2); + // add location called several times should counted once. + shard_locations.add_location(&shard2, &node2); + assert_eq!( + shard_locations.get_shard_locations(&shard1), + &[&node1, &node2] + ); + assert_eq!( + shard_locations.get_shard_locations(&shard2), + &[&node1, &node2] + ); + // If the shard is not listed, we do not panic but just return an empty list. + assert!(shard_locations + .get_shard_locations(&unlisted_shard) + .is_empty()); + } + + #[test] + fn test_shard_table_shard_locations() { + let mut shard_table = ShardTable::default(); + + let index_uid0: IndexUid = IndexUid::for_test("test-index0", 0); + let source_id = "test-source0".to_string(); + shard_table.add_source(&index_uid0, &source_id); + + let index_uid1: IndexUid = IndexUid::for_test("test-index1", 0); + let source_id = "test-source1".to_string(); + shard_table.add_source(&index_uid1, &source_id); + + let source_uid0 = SourceUid { + index_uid: index_uid0.clone(), + source_id: source_id.clone(), + }; + + let source_uid1 = SourceUid { + index_uid: index_uid1.clone(), + source_id: source_id.clone(), + }; + + let make_shard = |source_uid: &SourceUid, + leader_id: &str, + shard_id: u64, + follower_id: Option<&str>, + shard_state: ShardState| { + Shard { + index_uid: source_uid.index_uid.clone().into(), + source_id: source_uid.source_id.clone(), + shard_id: Some(ShardId::from(shard_id)), + leader_id: leader_id.to_string(), + follower_id: follower_id.map(|s| s.to_string()), + shard_state: shard_state as i32, + ..Default::default() + } + }; + + shard_table.insert_shards( + &source_uid0.index_uid, + &source_uid0.source_id, + vec![ + make_shard( + &source_uid0, + "indexer1", + 0, + Some("indexer2"), + ShardState::Open, + ), + make_shard(&source_uid0, "indexer1", 1, None, ShardState::Closed), + make_shard(&source_uid0, "indexer2", 2, None, ShardState::Open), + ], + ); + + shard_table.insert_shards( + &source_uid1.index_uid, + &source_uid1.source_id, + vec![ + make_shard( + &source_uid1, + "indexer2", + 3, + Some("indexer1"), + ShardState::Unavailable, + ), + make_shard( + &source_uid1, + "indexer2", + 3, + Some("indexer1"), + ShardState::Open, + ), + ], + ); + + let shard_locations = shard_table.shard_locations(); + let get_sorted_locations_for_shard = |shard_id: u64| { + let mut locations = shard_locations + .get_shard_locations(&ShardId::from(shard_id)) + .to_vec(); + locations.sort(); + locations + }; + assert_eq!( + &get_sorted_locations_for_shard(0u64), + &[&NodeId::from("indexer1"), &NodeId::from("indexer2")] + ); + assert_eq!( + 
&get_sorted_locations_for_shard(1u64), + &[&NodeId::from("indexer1")] + ); + assert_eq!( + &get_sorted_locations_for_shard(2u64), + &[&NodeId::from("indexer2")] + ); + assert_eq!( + &get_sorted_locations_for_shard(3u64), + &[&NodeId::from("indexer1"), &NodeId::from("indexer2")] + ); + } } diff --git a/quickwit/quickwit-proto/src/types/shard_id.rs b/quickwit/quickwit-proto/src/types/shard_id.rs index 9d31ce5e77f..525487a4ae9 100644 --- a/quickwit/quickwit-proto/src/types/shard_id.rs +++ b/quickwit/quickwit-proto/src/types/shard_id.rs @@ -26,6 +26,10 @@ use serde::{Deserialize, Serialize}; use ulid::Ulid; /// Shard ID. +/// Shard IDs are required to be globally unique. +/// +/// In other words, there cannot be two shards belonging to two different sources +/// with the same shard ID. #[derive(Clone, Debug, Default, Eq, PartialEq, Hash, Ord, PartialOrd)] pub struct ShardId(ByteString);
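Taken together, the scheduling path now threads locality end to end: the control plane derives `ShardLocations` from the shard table, passes it to the planner, and exports the local/remote split as metrics. The wrapper below is a hedged restatement of the code added in `indexing_scheduler/mod.rs` above, not part of the patch; the function name is hypothetical and the sketch assumes it lives inside the `indexing_scheduler` module so the crate-private items it calls are visible:

use fnv::FnvHashMap;
use quickwit_proto::indexing::CpuCapacity;

use crate::indexing_plan::PhysicalIndexingPlan;
use crate::indexing_scheduler::get_shard_locality_metrics;
use crate::indexing_scheduler::scheduling::{build_physical_indexing_plan, SourceToSchedule};
use crate::model::ControlPlaneModel;

// Hypothetical wrapper illustrating the new flow: derive shard locations from the
// model, thread them through the planner, and export the locality split as metrics.
fn rebuild_plan_with_locality(
    model: &ControlPlaneModel,
    sources: &[SourceToSchedule],
    indexer_cpu_capacities: &FnvHashMap<String, CpuCapacity>,
    previous_plan: Option<&PhysicalIndexingPlan>,
) -> PhysicalIndexingPlan {
    // ShardLocations borrows from the model, so it is rebuilt on every scheduling pass.
    let shard_locations = model.shard_locations();
    let plan = build_physical_indexing_plan(
        sources,
        indexer_cpu_capacities,
        previous_plan,
        &shard_locations,
    );
    // Reported through the `shards{locality="local"|"remote"}` gauges added in metrics.rs.
    let metrics = get_shard_locality_metrics(&plan, &shard_locations);
    crate::metrics::CONTROL_PLANE_METRICS.set_shard_locality_metrics(metrics);
    plan
}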