diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 25185da878e..dc725d4e6a4 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -17,7 +17,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::{BTreeSet, HashMap}; +use std::collections::btree_map::Entry; +use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fmt; use std::future::Future; use std::sync::Arc; @@ -26,7 +27,6 @@ use std::time::Duration; use fnv::FnvHashSet; use futures::stream::FuturesUnordered; use futures::StreamExt; -use itertools::Itertools; use quickwit_actors::Mailbox; use quickwit_common::pretty::PrettySample; use quickwit_common::Progress; @@ -49,6 +49,9 @@ use quickwit_proto::metastore::{ OpenShardsRequest, OpenShardsResponse, }; use quickwit_proto::types::{IndexUid, NodeId, Position, ShardId, SourceUid}; +use rand::rngs::ThreadRng; +use rand::seq::SliceRandom; +use rand::{thread_rng, Rng, RngCore}; use serde::{Deserialize, Serialize}; use tokio::sync::{Mutex, OwnedMutexGuard}; use tokio::task::JoinHandle; @@ -91,6 +94,105 @@ fn fire_and_forget( }); } +// Returns a random position of the els `slice`, such that the element in this array is NOT +// `except_el`. +fn pick_position(els: &[&str], except_el_opt: Option<&str>, rng: &mut ThreadRng) -> Option { + let except_pos_opt = + except_el_opt.and_then(|except_el| els.iter().position(|el| *el == except_el)); + if let Some(expect_pos) = except_pos_opt { + let pos = rng.gen_range(0..els.len() - 1); + if pos >= expect_pos { + Some(pos + 1) + } else { + Some(pos) + } + } else { + Some(rng.gen_range(0..els.len())) + } +} + +/// Pick a node from the `shard_count_to_node_ids` that is different from `except_node_opt`. +/// We pick in priority nodes with the least number of shards, and we break any tie randomly. +/// +/// Once a node has been found, we update the `shard_count_to_node_ids` to reflect the new state. +/// In particular, the ingester node is moved from its previous shard_count level to its new +/// shard_count level. In particular, a shard_count entry that is empty should be removed from the +/// BTreeMap. +fn pick_one<'a>( + shard_count_to_node_ids: &mut BTreeMap>, + except_node_opt: Option<&'a str>, + rng: &mut ThreadRng, +) -> Option<&'a str> { + let (&shard_count, _) = shard_count_to_node_ids.iter().find(|(_, node_ids)| { + let Some(except_node) = except_node_opt else { + return true; + }; + if node_ids.len() >= 2 { + return true; + } + let Some(&single_node_id) = node_ids.first() else { + return false; + }; + single_node_id != except_node + })?; + let mut shard_entry = shard_count_to_node_ids.entry(shard_count); + let Entry::Occupied(occupied_shard_entry) = &mut shard_entry else { + panic!(); + }; + let nodes = occupied_shard_entry.get_mut(); + let position = pick_position(nodes, except_node_opt, rng)?; + + let node_id = nodes.swap_remove(position); + let new_shard_count = shard_count + 1; + let should_remove_entry = nodes.is_empty(); + + if should_remove_entry { + shard_count_to_node_ids.remove(&shard_count); + } + shard_count_to_node_ids + .entry(new_shard_count) + .or_default() + .push(node_id); + Some(node_id) +} + +/// Pick two ingester nodes from `shard_count_to_node_ids` different one from each other. +/// Ingesters with the lower number of shards are preferred. +fn pick_two<'a>( + shard_count_to_node_ids: &mut BTreeMap>, + rng: &mut ThreadRng, +) -> Option<(&'a str, &'a str)> { + let leader = pick_one(shard_count_to_node_ids, None, rng)?; + let follower = pick_one(shard_count_to_node_ids, Some(leader), rng)?; + Some((leader, follower)) +} + +fn allocate_shards<'a>( + node_id_shard_counts: &HashMap<&'a str, usize>, + num_shards: usize, + replication_enabled: bool, +) -> Option)>> { + let mut shard_count_to_node_ids: BTreeMap> = BTreeMap::default(); + for (&node_id, &num_shards) in node_id_shard_counts { + shard_count_to_node_ids + .entry(num_shards) + .or_default() + .push(node_id); + } + let mut rng = thread_rng(); + let mut shard_allocations: Vec<(&str, Option<&str>)> = Vec::with_capacity(num_shards); + for _ in 0..num_shards { + if replication_enabled { + let (leader, follower) = pick_two(&mut shard_count_to_node_ids, &mut rng)?; + shard_allocations.push((leader, Some(follower))); + } else { + let leader = pick_one(&mut shard_count_to_node_ids, None, &mut rng)?; + shard_allocations.push((leader, None)); + } + } + Some(shard_allocations) +} + #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] pub struct IngestControllerStats { pub num_rebalance_shards_ops: usize, @@ -424,12 +526,11 @@ impl IngestController { unavailable_leaders: &FnvHashSet, model: &ControlPlaneModel, ) -> Option)>> { - let ingesters: Vec = self + let ingesters: HashSet = self .ingester_pool .keys() .into_iter() .filter(|ingester| !unavailable_leaders.contains(ingester)) - .sorted_by(|left, right| left.cmp(right)) .collect(); let num_ingesters = ingesters.len(); @@ -444,67 +545,45 @@ impl IngestController { ); return None; } - let mut leader_follower_pairs = Vec::with_capacity(num_shards_to_allocate); - let mut num_open_shards: usize = 0; - let mut per_leader_num_open_shards: HashMap<&str, usize> = - HashMap::with_capacity(num_ingesters); + // Count of open shards per available ingester node (including the ingester with 0 open + // shards). + let mut per_node_num_open_shards: HashMap<&str, usize> = ingesters + .iter() + .map(|ingester| (ingester.as_str(), 0)) + .collect(); for shard in model.all_shards() { if shard.is_open() && !unavailable_leaders.contains(&shard.leader_id) { - num_open_shards += 1; - - *per_leader_num_open_shards - .entry(&shard.leader_id) - .or_default() += 1; - } - } - let mut num_remaining_shards_to_allocate = num_shards_to_allocate; - let num_open_shards_target = num_shards_to_allocate + num_open_shards; - let max_num_shards_to_allocate_per_node = num_open_shards_target / num_ingesters; - - // Allocate at most `max_num_shards_to_allocate_per_node` shards to each ingester. - for (leader_id, follower_id) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) { - if num_remaining_shards_to_allocate == 0 { - break; - } - let num_open_shards_inner = per_leader_num_open_shards - .get(leader_id.as_str()) - .copied() - .unwrap_or_default(); - - let num_shards_to_allocate_inner = max_num_shards_to_allocate_per_node - .saturating_sub(num_open_shards_inner) - .min(num_remaining_shards_to_allocate); - - for _ in 0..num_shards_to_allocate_inner { - num_remaining_shards_to_allocate -= 1; - - let leader = leader_id.clone(); - let mut follower_opt = None; - - if self.replication_factor > 1 { - follower_opt = Some(follower_id.clone()); + if let Some(shard_count) = + per_node_num_open_shards.get_mut(shard.leader_id.as_str()) + { + *shard_count += 1; + } + if let Some(follower_id) = shard.follower_id.as_ref() { + if let Some(shard_count) = + per_node_num_open_shards.get_mut(follower_id.as_str()) + { + *shard_count += 1; + } } - leader_follower_pairs.push((leader, follower_opt)); } } - // Allocate remaining shards one by one. - for (leader_id, follower_id) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) { - if num_remaining_shards_to_allocate == 0 { - break; - } - num_remaining_shards_to_allocate -= 1; - let leader = leader_id.clone(); - let mut follower_opt = None; - - if self.replication_factor > 1 { - follower_opt = Some(follower_id.clone()); - } - leader_follower_pairs.push((leader, follower_opt)); - } - Some(leader_follower_pairs) + assert!(self.replication_factor == 1 || self.replication_factor == 2); + let leader_follower_pairs: Vec<(&str, Option<&str>)> = allocate_shards( + &per_node_num_open_shards, + num_shards_to_allocate, + self.replication_factor == 2, + )?; + Some( + leader_follower_pairs + .into_iter() + .map(|(leader_id, follower_id)| { + (NodeId::from(leader_id), follower_id.map(NodeId::from)) + }) + .collect(), + ) } /// Calls init shards on the leaders hosting newly opened shards. @@ -786,6 +865,7 @@ impl IngestController { model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); return Ok(()); }; + info!("scaling down shard {shard_id} from {leader_id}"); let Some(ingester) = self.ingester_pool.get(&leader_id) else { model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); return Ok(()); @@ -922,24 +1002,24 @@ impl IngestController { } } - let num_open_shards_per_leader_target = num_open_shards / num_ingesters; - // We tolerate an ingester with 10% more shards than the average. - // Let's first identify the list of shards we want to "move". let num_open_shards_per_leader_threshold = - (num_open_shards_per_leader_target * 11).div_ceil(10); + (num_open_shards * 11).div_ceil(10 * num_ingesters); let mut shards_to_move: Vec = Vec::new(); + let mut rng = thread_rng(); for open_shards in per_leader_open_shards.values() { - if open_shards.len() <= num_open_shards_per_leader_threshold { - continue; + if let Some(num_shards_to_move) = open_shards + .len() + .checked_sub(num_open_shards_per_leader_threshold) + { + shards_to_move.extend( + open_shards[..] + .choose_multiple(&mut rng, num_shards_to_move) + .map(|shard_entry| shard_entry.shard.clone()), + ); } - shards_to_move.extend( - open_shards[num_open_shards_per_leader_threshold..] - .iter() - .map(|shard_entry| shard_entry.shard.clone()), - ); } shards_to_move @@ -1103,8 +1183,10 @@ pub(crate) struct RebalanceShardsCallback { pub rebalance_guard: OwnedMutexGuard<()>, } -/// Finds the shard with the highest ingestion rate on the ingester with the most number of open -/// shards. If multiple shards have the same ingestion rate, the shard with the lowest (oldest) +/// Finds a shard on the ingester with the highest number of open +/// shards for this source. +/// +/// If multiple shards are hosted on that ingester, the shard with the lowest (oldest) /// shard ID is chosen. fn find_scale_down_candidate( source_uid: &SourceUid, @@ -1118,22 +1200,19 @@ fn find_scale_down_candidate( .entry(&shard.leader_id) .and_modify(|(num_shards, candidate)| { *num_shards += 1; - - if shard - .long_term_ingestion_rate - .cmp(&candidate.long_term_ingestion_rate) - .then_with(|| shard.shard_id.cmp(&candidate.shard_id)) - .is_gt() - { + if shard.shard_id < candidate.shard_id { *candidate = shard; } }) .or_insert((1, shard)); } } + let mut rng = thread_rng(); per_leader_candidates .into_iter() - .min_by_key(|(_leader_id, (num_shards, _shard))| *num_shards) + // We use a random number to break ties... The HashMap is randomly seeded so this is + // should not make much difference, but we might want to be as explicit as possible. + .max_by_key(|(_leader_id, (num_shards, _shard))| (*num_shards, rng.next_u32())) .map(|(leader_id, (_num_shards, shard))| { (leader_id.clone().into(), shard.shard_id().clone()) }) @@ -1147,6 +1226,7 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; + use itertools::Itertools; use quickwit_actors::Universe; use quickwit_common::setup_logging_for_tests; use quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT; @@ -1552,6 +1632,9 @@ mod tests { let leader_follower_pairs_opt = controller.allocate_shards(0, &FnvHashSet::default(), &model); + + // We have only one node so with a replication factor of 2, we can't + // find any solution. assert!(leader_follower_pairs_opt.is_none()); ingester_pool.insert("test-ingester-2".into(), IngesterServiceClient::mocked()); @@ -1559,57 +1642,56 @@ mod tests { let leader_follower_pairs = controller .allocate_shards(0, &FnvHashSet::default(), &model) .unwrap(); + + // We tried to allocate 0 shards, so an empty vec makes sense. assert!(leader_follower_pairs.is_empty()); let leader_follower_pairs = controller .allocate_shards(1, &FnvHashSet::default(), &model) .unwrap(); + assert_eq!(leader_follower_pairs.len(), 1); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) - ); + + // The leader follower is picked at random: both ingester have the same number of shards. + if leader_follower_pairs[0].0 == "test-ingester-1" { + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-2")) + ); + } else { + assert_eq!(leader_follower_pairs[0].0, "test-ingester-2"); + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-1")) + ); + } let leader_follower_pairs = controller .allocate_shards(2, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 2); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) - ); - assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); - assert_eq!( - leader_follower_pairs[1].1, - Some(NodeId::from("test-ingester-1")) - ); + for leader_follower_pair in leader_follower_pairs { + if leader_follower_pair.0 == "test-ingester-1" { + assert_eq!( + leader_follower_pair.1, + Some(NodeId::from("test-ingester-2")) + ); + } else { + assert_eq!(leader_follower_pair.0, "test-ingester-2"); + assert_eq!( + leader_follower_pair.1, + Some(NodeId::from("test-ingester-1")) + ); + } + } let leader_follower_pairs = controller .allocate_shards(3, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 3); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) - ); - - assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); - assert_eq!( - leader_follower_pairs[1].1, - Some(NodeId::from("test-ingester-1")) - ); - - assert_eq!(leader_follower_pairs[2].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[2].1, - Some(NodeId::from("test-ingester-2")) - ); - let index_uid = IndexUid::for_test("test-index", 0); + let source_id: SourceId = "test-source".to_string(); let open_shards = vec![Shard { index_uid: Some(index_uid.clone()), @@ -1625,10 +1707,10 @@ mod tests { .allocate_shards(3, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 3); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); + assert_eq!(leader_follower_pairs[0].0, "test-ingester-2"); assert_eq!( leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) + Some(NodeId::from("test-ingester-1")) ); assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); @@ -1667,6 +1749,7 @@ mod tests { .allocate_shards(1, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 1); + // Ingester 1 already has two shards, so ingester 2 is picked as leader assert_eq!(leader_follower_pairs[0].0, "test-ingester-2"); assert_eq!( leader_follower_pairs[0].1, @@ -1678,6 +1761,7 @@ mod tests { let leader_follower_pairs = controller .allocate_shards(4, &unavailable_leaders, &model) .unwrap(); + // Ingester 2 is unavailable. Ingester 1 has open shards. Ingester 3 ends up leader. assert_eq!(leader_follower_pairs.len(), 4); assert_eq!(leader_follower_pairs[0].0, "test-ingester-3"); assert_eq!( @@ -1697,10 +1781,10 @@ mod tests { Some(NodeId::from("test-ingester-1")) ); - assert_eq!(leader_follower_pairs[3].0, "test-ingester-1"); + assert_eq!(leader_follower_pairs[3].0, "test-ingester-3"); assert_eq!( leader_follower_pairs[3].1, - Some(NodeId::from("test-ingester-3")) + Some(NodeId::from("test-ingester-1")) ); } @@ -2152,7 +2236,8 @@ mod tests { assert_eq!(request.shard_pkeys.len(), 1); assert_eq!(request.shard_pkeys[0].index_uid(), &index_uid_clone); assert_eq!(request.shard_pkeys[0].source_id, "test-source"); - assert_eq!(request.shard_pkeys[0].shard_id(), ShardId::from(2)); + // We close the shard with the lowest id first. + assert_eq!(request.shard_pkeys[0].shard_id(), ShardId::from(1)); Err(IngestV2Error::Internal( "failed to close shards".to_string(), @@ -2523,7 +2608,7 @@ mod tests { index_uid: index_uid.clone().into(), source_id: source_id.clone(), shard_id: Some(ShardId::from(3)), - shard_state: ShardState::Closed as i32, + shard_state: ShardState::Closed as i32, //< this one is closed leader_id: "test-ingester-0".to_string(), ..Default::default() }, @@ -2552,6 +2637,7 @@ mod tests { ..Default::default() }, ]; + // That's 3 open shards on indexer-1, 2 open shard and one closed shard on indexer-0.. model.insert_shards(&index_uid, &source_id, shards); let shard_infos = BTreeSet::from_iter([ @@ -2595,8 +2681,10 @@ mod tests { model.update_shards(&source_uid, &shard_infos); let (leader_id, shard_id) = find_scale_down_candidate(&source_uid, &model).unwrap(); - assert_eq!(leader_id, "test-ingester-0"); - assert_eq!(shard_id, ShardId::from(2)); + // We pick ingester 1 has it has more open shard + assert_eq!(leader_id, "test-ingester-1"); + // Out of its shard, we take the one with the one with the lowest shard id. + assert_eq!(shard_id, ShardId::from(4)); } #[tokio::test] @@ -3073,4 +3161,121 @@ mod tests { let callback = &callbacks[0]; assert_eq!(callback.closed_shards.len(), 1); } + + // #[track_caller] + fn test_allocate_shards_aux_aux( + shard_counts_map: &HashMap<&str, usize>, + num_shards: usize, + replication_enabled: bool, + ) { + let shard_allocations_opt = + super::allocate_shards(shard_counts_map, num_shards, replication_enabled); + if num_shards == 0 { + assert_eq!(shard_allocations_opt, Some(Vec::new())); + return; + } + let num_nodes_required = if replication_enabled { 2 } else { 1 }; + if shard_counts_map.len() < num_nodes_required { + assert!(shard_allocations_opt.is_none()); + return; + } + let shard_allocations = shard_allocations_opt.unwrap(); + let mut total_counts: HashMap<&str, usize> = HashMap::default(); + assert_eq!(shard_allocations.len(), num_shards); + if num_shards == 0 { + return; + } + for (leader, follower_opt) in shard_allocations { + assert_eq!(follower_opt.is_some(), replication_enabled); + *total_counts.entry(leader).or_default() += 1; + if let Some(follower) = follower_opt { + *total_counts.entry(follower).or_default() += 1; + assert_ne!(follower, leader); + } + } + for (shard, count) in shard_counts_map { + if let Some(shard_count) = total_counts.get_mut(shard) { + *shard_count += *count; + } + } + let (min, max) = total_counts + .values() + .copied() + .minmax() + .into_option() + .unwrap(); + if !replication_enabled { + // If replication is enabled, we can end up being forced to not spread shards as evenly + // as we would wish. For instance, if there are only two nodes initially + // unbalanced. + assert!(min + 1 >= max); + } else { + let (previous_min, previous_max) = shard_counts_map + .values() + .copied() + .minmax() + .into_option() + .unwrap(); + // The algorithm is supposed to reduce the variance. + // Of course sometimes it is not possible. For instance for 3 nodes that are + // perfectly balanced to begin with, if we as for a single shard. + assert!((previous_max - previous_min).max(1) >= (max - min)); + } + } + + fn test_allocate_shards_aux(shard_counts: &[usize]) { + let mut shard_counts_map: HashMap<&str, usize> = HashMap::new(); + let shards = (0..shard_counts.len()) + .map(|i| format!("shard-{}", i)) + .collect::>(); + for (shard, &shard_count) in shards.iter().zip(shard_counts.iter()) { + shard_counts_map.insert(shard.as_str(), shard_count); + } + for i in 0..10 { + test_allocate_shards_aux_aux(&shard_counts_map, i, false); + test_allocate_shards_aux_aux(&shard_counts_map, i, true); + } + } + + use proptest::prelude::*; + + proptest! { + #[test] + fn test_proptest_allocate_shards(shard_counts in proptest::collection::vec(0..10usize, 0..10usize)) { + test_allocate_shards_aux(&shard_counts); + } + } + + #[test] + fn test_allocate_shards_prop_test() { + test_allocate_shards_aux(&[]); + test_allocate_shards_aux(&[1]); + test_allocate_shards_aux(&[1, 1]); + test_allocate_shards_aux(&[1, 2]); + test_allocate_shards_aux(&[1, 4]); + test_allocate_shards_aux(&[2, 3, 2]); + test_allocate_shards_aux(&[2, 4, 6]); + test_allocate_shards_aux(&[2, 3, 10]); + } + + #[test] + fn test_allocate_shards_prop_test_bug() { + test_allocate_shards_aux(&[7, 7, 7]); + } + + #[test] + fn test_pick_one() { + let mut shard_counts = BTreeMap::default(); + shard_counts.insert(1, vec!["node1", "node2"]); + let mut rng = rand::thread_rng(); + let node = pick_one(&mut shard_counts, Some("node2"), &mut rng).unwrap(); + assert_eq!(node, "node1"); + assert_eq!(shard_counts.len(), 2); + assert_eq!(&shard_counts.get(&1).unwrap()[..], &["node2"]); + assert_eq!(&shard_counts.get(&2).unwrap()[..], &["node1"]); + let node = pick_one(&mut shard_counts, None, &mut rng).unwrap(); + assert_eq!(node, "node2"); + assert_eq!(shard_counts.len(), 1); + assert_eq!(&shard_counts.get(&2).unwrap()[..], &["node1", "node2"]); + } } diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 2c8cfcc20a6..73472ce2631 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -69,9 +69,9 @@ fn max_scroll_ttl() -> Duration { *MAX_SCROLL_TTL_LOCK.get_or_init(|| { let split_deletion_grace_period = shared_consts::split_deletion_grace_period(); assert!( - split_deletion_grace_period - >= shared_consts::MINIMUM_DELETION_GRACE_PERIOD, - "The split deletion grace period is too short ({split_deletion_grace_period:?}). This should not happen." + split_deletion_grace_period >= shared_consts::MINIMUM_DELETION_GRACE_PERIOD, + "The split deletion grace period is too short ({split_deletion_grace_period:?}). This \ + should not happen." ); // We remove an extra margin of 2minutes from the split deletion grace period. split_deletion_grace_period - Duration::from_secs(60 * 2)