From 6d7d5f79d40ffd2efcebc376e5f5b4a0c28dceec Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 17 Jul 2024 15:35:48 +0900 Subject: [PATCH] Removing bias from shard allocation. We have noticed after running for a long time, in airmail the distribution of shard count amongst ingester seems uniform, but one or two indexers are getting most of the throughput. This could be caused by an indirect bias in the allocation of shard to ingester. For instance, in airmail, most indexes are very small, but a few of them are much larger. Small indexes have 1 shard with a very low throughput. Large indexes on the other hands have several shards with typically >2MB of throughut. Larger indexes are also more subject to scale up/down, since other indexes tend to stick to having 1 shard (we don't scale down to 0). This PR tries to remove any possible bias when assigning / removing shards in - scale up - scale down - rebalance. Scale up --------------------------- This is the most important change/bias. In presence of a tie, we were picking the ingester with the lowest ingester id. Also, on replication, the logic picking a follower was broken (for a given leader, we were always picking the same follower). The `max_num_shards_to_allocate_per_node` was also wrong (division instead of ceil div) (probably minor). Scale down ---------------------------- The code was relying on the long term ingestion rate, and then ties were solved by the hashmap iterator. The Hashmap iterator is quite random so this was probably not a problem. Rebalance ---------------------------- Arithmetic used to compute the target number of shards was a little bit inaccurate. The shard that are rebalanced are now picked at random (instead of picking the oldest shards in the model). --- .../src/control_plane.rs | 2 + .../src/ingest/ingest_controller.rs | 492 +++++++++++++----- .../src/model/shard_table.rs | 10 +- quickwit/quickwit-proto/src/ingest/mod.rs | 7 +- 4 files changed, 364 insertions(+), 147 deletions(-) diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index e26fe2d021d..26b6a00b7be 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -626,6 +626,7 @@ impl Handler for ControlPlane { .model .list_shards_for_index(&index_uid) .flat_map(|shard_entry| shard_entry.ingesters()) + .map(|node_id_ref| node_id_ref.to_owned()) .collect(); self.model.delete_index(&index_uid); @@ -750,6 +751,7 @@ impl Handler for ControlPlane { shard_entries .values() .flat_map(|shard_entry| shard_entry.ingesters()) + .map(|node_id_ref| node_id_ref.to_owned()) .collect() } else { BTreeSet::new() diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs index 25185da878e..d59fbb1ca58 100644 --- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs +++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs @@ -17,7 +17,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::collections::{BTreeSet, HashMap}; +use std::collections::btree_map::Entry; +use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::fmt; use std::future::Future; use std::sync::Arc; @@ -26,7 +27,6 @@ use std::time::Duration; use fnv::FnvHashSet; use futures::stream::FuturesUnordered; use futures::StreamExt; -use itertools::Itertools; use quickwit_actors::Mailbox; use quickwit_common::pretty::PrettySample; use quickwit_common::Progress; @@ -48,7 +48,10 @@ use quickwit_proto::metastore::{ serde_utils, MetastoreResult, MetastoreService, MetastoreServiceClient, OpenShardSubrequest, OpenShardsRequest, OpenShardsResponse, }; -use quickwit_proto::types::{IndexUid, NodeId, Position, ShardId, SourceUid}; +use quickwit_proto::types::{IndexUid, NodeId, NodeIdRef, Position, ShardId, SourceUid}; +use rand::rngs::ThreadRng; +use rand::seq::SliceRandom; +use rand::{thread_rng, Rng, RngCore}; use serde::{Deserialize, Serialize}; use tokio::sync::{Mutex, OwnedMutexGuard}; use tokio::task::JoinHandle; @@ -91,6 +94,110 @@ fn fire_and_forget( }); } +// Returns a random position of the els `slice`, such that the element in this array is NOT +// `except_el`. +fn pick_position( + els: &[&NodeIdRef], + except_el_opt: Option<&NodeIdRef>, + rng: &mut ThreadRng, +) -> Option { + let except_pos_opt = + except_el_opt.and_then(|except_el| els.iter().position(|el| *el == except_el)); + if let Some(expect_pos) = except_pos_opt { + let pos = rng.gen_range(0..els.len() - 1); + if pos >= expect_pos { + Some(pos + 1) + } else { + Some(pos) + } + } else { + Some(rng.gen_range(0..els.len())) + } +} + +/// Pick a node from the `shard_count_to_node_ids` that is different from `except_node_opt`. +/// We pick in priority nodes with the least number of shards, and we break any tie randomly. +/// +/// Once a node has been found, we update the `shard_count_to_node_ids` to reflect the new state. +/// In particular, the ingester node is moved from its previous shard_count level to its new +/// shard_count level. In particular, a shard_count entry that is empty should be removed from the +/// BTreeMap. +fn pick_one<'a>( + shard_count_to_node_ids: &mut BTreeMap>, + except_node_opt: Option<&'a NodeIdRef>, + rng: &mut ThreadRng, +) -> Option<&'a NodeIdRef> { + let (&shard_count, _) = shard_count_to_node_ids.iter().find(|(_, node_ids)| { + let Some(except_node) = except_node_opt else { + return true; + }; + if node_ids.len() >= 2 { + return true; + } + let Some(&single_node_id) = node_ids.first() else { + return false; + }; + single_node_id != except_node + })?; + let mut shard_entry = shard_count_to_node_ids.entry(shard_count); + let Entry::Occupied(occupied_shard_entry) = &mut shard_entry else { + panic!(); + }; + let nodes = occupied_shard_entry.get_mut(); + let position = pick_position(nodes, except_node_opt, rng)?; + + let node_id = nodes.swap_remove(position); + let new_shard_count = shard_count + 1; + let should_remove_entry = nodes.is_empty(); + + if should_remove_entry { + shard_count_to_node_ids.remove(&shard_count); + } + shard_count_to_node_ids + .entry(new_shard_count) + .or_default() + .push(node_id); + Some(node_id) +} + +/// Pick two ingester nodes from `shard_count_to_node_ids` different one from each other. +/// Ingesters with the lower number of shards are preferred. +fn pick_two<'a>( + shard_count_to_node_ids: &mut BTreeMap>, + rng: &mut ThreadRng, +) -> Option<(&'a NodeIdRef, &'a NodeIdRef)> { + let leader = pick_one(shard_count_to_node_ids, None, rng)?; + let follower = pick_one(shard_count_to_node_ids, Some(leader), rng)?; + Some((leader, follower)) +} + +fn allocate_shards<'a>( + node_id_shard_counts: &'a HashMap, + num_shards: usize, + replication_enabled: bool, +) -> Option)>> { + let mut shard_count_to_node_ids: BTreeMap> = BTreeMap::default(); + for (node_id, &num_shards) in node_id_shard_counts { + shard_count_to_node_ids + .entry(num_shards) + .or_default() + .push(node_id.as_ref()); + } + let mut rng = thread_rng(); + let mut shard_allocations: Vec<(&NodeIdRef, Option<&NodeIdRef>)> = + Vec::with_capacity(num_shards); + for _ in 0..num_shards { + if replication_enabled { + let (leader, follower) = pick_two(&mut shard_count_to_node_ids, &mut rng)?; + shard_allocations.push((leader, Some(follower))); + } else { + let leader = pick_one(&mut shard_count_to_node_ids, None, &mut rng)?; + shard_allocations.push((leader, None)); + } + } + Some(shard_allocations) +} + #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] pub struct IngestControllerStats { pub num_rebalance_shards_ops: usize, @@ -424,87 +531,64 @@ impl IngestController { unavailable_leaders: &FnvHashSet, model: &ControlPlaneModel, ) -> Option)>> { - let ingesters: Vec = self + // Count of open shards per available ingester node (including the ingester with 0 open + // shards). + let mut per_node_num_open_shards: HashMap = self .ingester_pool .keys() .into_iter() .filter(|ingester| !unavailable_leaders.contains(ingester)) - .sorted_by(|left, right| left.cmp(right)) + .map(|ingester| (ingester, 0)) .collect(); - let num_ingesters = ingesters.len(); + let num_ingesters = per_node_num_open_shards.len(); if num_ingesters == 0 { warn!("failed to allocate {num_shards_to_allocate} shards: no ingesters available"); return None; - } else if self.replication_factor > num_ingesters { + } + + if self.replication_factor > num_ingesters { warn!( "failed to allocate {num_shards_to_allocate} shards: replication factor is \ greater than the number of available ingesters" ); return None; } - let mut leader_follower_pairs = Vec::with_capacity(num_shards_to_allocate); - - let mut num_open_shards: usize = 0; - let mut per_leader_num_open_shards: HashMap<&str, usize> = - HashMap::with_capacity(num_ingesters); for shard in model.all_shards() { if shard.is_open() && !unavailable_leaders.contains(&shard.leader_id) { - num_open_shards += 1; - - *per_leader_num_open_shards - .entry(&shard.leader_id) - .or_default() += 1; - } - } - let mut num_remaining_shards_to_allocate = num_shards_to_allocate; - let num_open_shards_target = num_shards_to_allocate + num_open_shards; - let max_num_shards_to_allocate_per_node = num_open_shards_target / num_ingesters; - - // Allocate at most `max_num_shards_to_allocate_per_node` shards to each ingester. - for (leader_id, follower_id) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) { - if num_remaining_shards_to_allocate == 0 { - break; - } - let num_open_shards_inner = per_leader_num_open_shards - .get(leader_id.as_str()) - .copied() - .unwrap_or_default(); - - let num_shards_to_allocate_inner = max_num_shards_to_allocate_per_node - .saturating_sub(num_open_shards_inner) - .min(num_remaining_shards_to_allocate); - - for _ in 0..num_shards_to_allocate_inner { - num_remaining_shards_to_allocate -= 1; - - let leader = leader_id.clone(); - let mut follower_opt = None; - - if self.replication_factor > 1 { - follower_opt = Some(follower_id.clone()); + for ingest_node in shard.ingesters() { + if let Some(shard_count) = + per_node_num_open_shards.get_mut(ingest_node.as_str()) + { + *shard_count += 1; + } else { + // The shard is not present in the `per_node_num_open_shards` map. + // This is normal. It just means an ingester is temporarily unavailable, + // either from the control plane view (not present in the indexer pool, + // because as a result of information from + // chitchat), or because it is in the unavailable + // leaders map. + } } - leader_follower_pairs.push((leader, follower_opt)); } } - // Allocate remaining shards one by one. - for (leader_id, follower_id) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) { - if num_remaining_shards_to_allocate == 0 { - break; - } - num_remaining_shards_to_allocate -= 1; - - let leader = leader_id.clone(); - let mut follower_opt = None; - if self.replication_factor > 1 { - follower_opt = Some(follower_id.clone()); - } - leader_follower_pairs.push((leader, follower_opt)); - } - Some(leader_follower_pairs) + assert!(self.replication_factor == 1 || self.replication_factor == 2); + let leader_follower_pairs: Vec<(&NodeIdRef, Option<&NodeIdRef>)> = allocate_shards( + &per_node_num_open_shards, + num_shards_to_allocate, + self.replication_factor == 2, + )?; + Some( + leader_follower_pairs + .into_iter() + .map(|(leader_id, follower_id)| { + (leader_id.to_owned(), follower_id.map(NodeIdRef::to_owned)) + }) + .collect(), + ) } /// Calls init shards on the leaders hosting newly opened shards. @@ -786,6 +870,7 @@ impl IngestController { model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); return Ok(()); }; + info!("scaling down shard {shard_id} from {leader_id}"); let Some(ingester) = self.ingester_pool.get(&leader_id) else { model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS); return Ok(()); @@ -922,24 +1007,24 @@ impl IngestController { } } - let num_open_shards_per_leader_target = num_open_shards / num_ingesters; - // We tolerate an ingester with 10% more shards than the average. - // Let's first identify the list of shards we want to "move". let num_open_shards_per_leader_threshold = - (num_open_shards_per_leader_target * 11).div_ceil(10); + (num_open_shards * 11).div_ceil(10 * num_ingesters); let mut shards_to_move: Vec = Vec::new(); + let mut rng = thread_rng(); for open_shards in per_leader_open_shards.values() { - if open_shards.len() <= num_open_shards_per_leader_threshold { - continue; + if let Some(num_shards_to_move) = open_shards + .len() + .checked_sub(num_open_shards_per_leader_threshold) + { + shards_to_move.extend( + open_shards[..] + .choose_multiple(&mut rng, num_shards_to_move) + .map(|shard_entry| shard_entry.shard.clone()), + ); } - shards_to_move.extend( - open_shards[num_open_shards_per_leader_threshold..] - .iter() - .map(|shard_entry| shard_entry.shard.clone()), - ); } shards_to_move @@ -1103,39 +1188,36 @@ pub(crate) struct RebalanceShardsCallback { pub rebalance_guard: OwnedMutexGuard<()>, } -/// Finds the shard with the highest ingestion rate on the ingester with the most number of open -/// shards. If multiple shards have the same ingestion rate, the shard with the lowest (oldest) +/// Finds a shard on the ingester with the highest number of open +/// shards for this source. +/// +/// If multiple shards are hosted on that ingester, the shard with the lowest (oldest) /// shard ID is chosen. fn find_scale_down_candidate( source_uid: &SourceUid, model: &ControlPlaneModel, ) -> Option<(NodeId, ShardId)> { - let mut per_leader_candidates: HashMap<&String, (usize, &ShardEntry)> = HashMap::new(); + let mut per_leader_shard_entries: HashMap<&String, Vec<&ShardEntry>> = HashMap::new(); + let mut rng = thread_rng(); for shard in model.get_shards_for_source(source_uid)?.values() { if shard.is_open() { - per_leader_candidates + per_leader_shard_entries .entry(&shard.leader_id) - .and_modify(|(num_shards, candidate)| { - *num_shards += 1; - - if shard - .long_term_ingestion_rate - .cmp(&candidate.long_term_ingestion_rate) - .then_with(|| shard.shard_id.cmp(&candidate.shard_id)) - .is_gt() - { - *candidate = shard; - } - }) - .or_insert((1, shard)); + .or_default() + .push(shard); } } - per_leader_candidates + per_leader_shard_entries .into_iter() - .min_by_key(|(_leader_id, (num_shards, _shard))| *num_shards) - .map(|(leader_id, (_num_shards, shard))| { - (leader_id.clone().into(), shard.shard_id().clone()) + // We use a random number to break ties... The HashMap is randomly seeded so this is + // should not make much difference, but we might want to be as explicit as possible. + .max_by_key(|(_leader_id, shard_entries)| (shard_entries.len(), rng.next_u32())) + .map(|(leader_id, shard_entries)| { + ( + leader_id.clone().into(), + shard_entries.choose(&mut rng).unwrap().shard_id().clone(), + ) }) } @@ -1147,6 +1229,7 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; + use itertools::Itertools; use quickwit_actors::Universe; use quickwit_common::setup_logging_for_tests; use quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT; @@ -1552,6 +1635,9 @@ mod tests { let leader_follower_pairs_opt = controller.allocate_shards(0, &FnvHashSet::default(), &model); + + // We have only one node so with a replication factor of 2, we can't + // find any solution. assert!(leader_follower_pairs_opt.is_none()); ingester_pool.insert("test-ingester-2".into(), IngesterServiceClient::mocked()); @@ -1559,57 +1645,56 @@ mod tests { let leader_follower_pairs = controller .allocate_shards(0, &FnvHashSet::default(), &model) .unwrap(); + + // We tried to allocate 0 shards, so an empty vec makes sense. assert!(leader_follower_pairs.is_empty()); let leader_follower_pairs = controller .allocate_shards(1, &FnvHashSet::default(), &model) .unwrap(); + assert_eq!(leader_follower_pairs.len(), 1); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) - ); + + // The leader follower is picked at random: both ingester have the same number of shards. + if leader_follower_pairs[0].0 == "test-ingester-1" { + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-2")) + ); + } else { + assert_eq!(leader_follower_pairs[0].0, "test-ingester-2"); + assert_eq!( + leader_follower_pairs[0].1, + Some(NodeId::from("test-ingester-1")) + ); + } let leader_follower_pairs = controller .allocate_shards(2, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 2); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) - ); - assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); - assert_eq!( - leader_follower_pairs[1].1, - Some(NodeId::from("test-ingester-1")) - ); + for leader_follower_pair in leader_follower_pairs { + if leader_follower_pair.0 == "test-ingester-1" { + assert_eq!( + leader_follower_pair.1, + Some(NodeId::from("test-ingester-2")) + ); + } else { + assert_eq!(leader_follower_pair.0, "test-ingester-2"); + assert_eq!( + leader_follower_pair.1, + Some(NodeId::from("test-ingester-1")) + ); + } + } let leader_follower_pairs = controller .allocate_shards(3, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 3); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) - ); - - assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); - assert_eq!( - leader_follower_pairs[1].1, - Some(NodeId::from("test-ingester-1")) - ); - - assert_eq!(leader_follower_pairs[2].0, "test-ingester-1"); - assert_eq!( - leader_follower_pairs[2].1, - Some(NodeId::from("test-ingester-2")) - ); - let index_uid = IndexUid::for_test("test-index", 0); + let source_id: SourceId = "test-source".to_string(); let open_shards = vec![Shard { index_uid: Some(index_uid.clone()), @@ -1625,10 +1710,10 @@ mod tests { .allocate_shards(3, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 3); - assert_eq!(leader_follower_pairs[0].0, "test-ingester-1"); + assert_eq!(leader_follower_pairs[0].0, "test-ingester-2"); assert_eq!( leader_follower_pairs[0].1, - Some(NodeId::from("test-ingester-2")) + Some(NodeId::from("test-ingester-1")) ); assert_eq!(leader_follower_pairs[1].0, "test-ingester-2"); @@ -1667,6 +1752,7 @@ mod tests { .allocate_shards(1, &FnvHashSet::default(), &model) .unwrap(); assert_eq!(leader_follower_pairs.len(), 1); + // Ingester 1 already has two shards, so ingester 2 is picked as leader assert_eq!(leader_follower_pairs[0].0, "test-ingester-2"); assert_eq!( leader_follower_pairs[0].1, @@ -1678,6 +1764,7 @@ mod tests { let leader_follower_pairs = controller .allocate_shards(4, &unavailable_leaders, &model) .unwrap(); + // Ingester 2 is unavailable. Ingester 1 has open shards. Ingester 3 ends up leader. assert_eq!(leader_follower_pairs.len(), 4); assert_eq!(leader_follower_pairs[0].0, "test-ingester-3"); assert_eq!( @@ -1697,10 +1784,10 @@ mod tests { Some(NodeId::from("test-ingester-1")) ); - assert_eq!(leader_follower_pairs[3].0, "test-ingester-1"); + assert_eq!(leader_follower_pairs[3].0, "test-ingester-3"); assert_eq!( leader_follower_pairs[3].1, - Some(NodeId::from("test-ingester-3")) + Some(NodeId::from("test-ingester-1")) ); } @@ -2152,8 +2239,6 @@ mod tests { assert_eq!(request.shard_pkeys.len(), 1); assert_eq!(request.shard_pkeys[0].index_uid(), &index_uid_clone); assert_eq!(request.shard_pkeys[0].source_id, "test-source"); - assert_eq!(request.shard_pkeys[0].shard_id(), ShardId::from(2)); - Err(IngestV2Error::Internal( "failed to close shards".to_string(), )) @@ -2523,7 +2608,7 @@ mod tests { index_uid: index_uid.clone().into(), source_id: source_id.clone(), shard_id: Some(ShardId::from(3)), - shard_state: ShardState::Closed as i32, + shard_state: ShardState::Closed as i32, //< this one is closed leader_id: "test-ingester-0".to_string(), ..Default::default() }, @@ -2552,6 +2637,7 @@ mod tests { ..Default::default() }, ]; + // That's 3 open shards on indexer-1, 2 open shard and one closed shard on indexer-0.. model.insert_shards(&index_uid, &source_id, shards); let shard_infos = BTreeSet::from_iter([ @@ -2594,9 +2680,9 @@ mod tests { ]); model.update_shards(&source_uid, &shard_infos); - let (leader_id, shard_id) = find_scale_down_candidate(&source_uid, &model).unwrap(); - assert_eq!(leader_id, "test-ingester-0"); - assert_eq!(shard_id, ShardId::from(2)); + let (leader_id, _shard_id) = find_scale_down_candidate(&source_uid, &model).unwrap(); + // We pick ingester 1 has it has more open shard + assert_eq!(leader_id, "test-ingester-1"); } #[tokio::test] @@ -3073,4 +3159,138 @@ mod tests { let callback = &callbacks[0]; assert_eq!(callback.closed_shards.len(), 1); } + + // #[track_caller] + fn test_allocate_shards_aux_aux( + shard_counts_map: &HashMap, + num_shards: usize, + replication_enabled: bool, + ) { + let shard_allocations_opt = + super::allocate_shards(shard_counts_map, num_shards, replication_enabled); + if num_shards == 0 { + assert_eq!(shard_allocations_opt, Some(Vec::new())); + return; + } + let num_nodes_required = if replication_enabled { 2 } else { 1 }; + if shard_counts_map.len() < num_nodes_required { + assert!(shard_allocations_opt.is_none()); + return; + } + let shard_allocations = shard_allocations_opt.unwrap(); + let mut total_counts: HashMap<&NodeIdRef, usize> = HashMap::default(); + assert_eq!(shard_allocations.len(), num_shards); + if num_shards == 0 { + return; + } + for (leader, follower_opt) in shard_allocations { + assert_eq!(follower_opt.is_some(), replication_enabled); + *total_counts.entry(leader).or_default() += 1; + if let Some(follower) = follower_opt { + *total_counts.entry(follower).or_default() += 1; + assert_ne!(follower, leader); + } + } + for (shard, count) in shard_counts_map { + if let Some(shard_count) = total_counts.get_mut(shard.as_ref()) { + *shard_count += *count; + } + } + let (min, max) = total_counts + .values() + .copied() + .minmax() + .into_option() + .unwrap(); + if !replication_enabled { + // If replication is enabled, we can end up being forced to not spread shards as evenly + // as we would wish. For instance, if there are only two nodes initially + // unbalanced. + assert!(min + 1 >= max); + } else { + let (previous_min, previous_max) = shard_counts_map + .values() + .copied() + .minmax() + .into_option() + .unwrap(); + // The algorithm is supposed to reduce the variance. + // Of course sometimes it is not possible. For instance for 3 nodes that are + // perfectly balanced to begin with, if we as for a single shard. + assert!((previous_max - previous_min).max(1) >= (max - min)); + } + } + + fn test_allocate_shards_aux(shard_counts: &[usize]) { + let mut shard_counts_map: HashMap = HashMap::new(); + let shards: Vec = (0..shard_counts.len()) + .map(|i| format!("shard-{}", i)) + .collect(); + for (shard, &shard_count) in shards.into_iter().zip(shard_counts.iter()) { + shard_counts_map.insert(NodeId::from(shard), shard_count); + } + for i in 0..10 { + test_allocate_shards_aux_aux(&shard_counts_map, i, false); + test_allocate_shards_aux_aux(&shard_counts_map, i, true); + } + } + + use proptest::prelude::*; + + proptest! { + #[test] + fn test_proptest_allocate_shards(shard_counts in proptest::collection::vec(0..10usize, 0..10usize)) { + test_allocate_shards_aux(&shard_counts); + } + } + + #[test] + fn test_allocate_shards_prop_test() { + test_allocate_shards_aux(&[]); + test_allocate_shards_aux(&[1]); + test_allocate_shards_aux(&[1, 1]); + test_allocate_shards_aux(&[1, 2]); + test_allocate_shards_aux(&[1, 4]); + test_allocate_shards_aux(&[2, 3, 2]); + test_allocate_shards_aux(&[2, 4, 6]); + test_allocate_shards_aux(&[2, 3, 10]); + } + + #[test] + fn test_allocate_shards_prop_test_bug() { + test_allocate_shards_aux(&[7, 7, 7]); + } + + #[test] + fn test_pick_one() { + let mut shard_counts = BTreeMap::default(); + shard_counts.insert( + 1, + vec![NodeIdRef::from_str("node1"), NodeIdRef::from_str("node2")], + ); + let mut rng = rand::thread_rng(); + let node = pick_one( + &mut shard_counts, + Some(NodeIdRef::from_str("node2")), + &mut rng, + ) + .unwrap(); + assert_eq!(node.as_str(), "node1"); + assert_eq!(shard_counts.len(), 2); + assert_eq!( + &shard_counts.get(&1).unwrap()[..], + &[NodeIdRef::from_str("node2")] + ); + assert_eq!( + &shard_counts.get(&2).unwrap()[..], + &[NodeIdRef::from_str("node1")] + ); + let node = pick_one(&mut shard_counts, None, &mut rng).unwrap(); + assert_eq!(node.as_str(), "node2"); + assert_eq!(shard_counts.len(), 1); + assert_eq!( + &shard_counts.get(&2).unwrap()[..], + &[NodeIdRef::from_str("node1"), NodeIdRef::from_str("node2")] + ); + } } diff --git a/quickwit/quickwit-control-plane/src/model/shard_table.rs b/quickwit/quickwit-control-plane/src/model/shard_table.rs index f07f93839c1..29c579cddcd 100644 --- a/quickwit/quickwit-control-plane/src/model/shard_table.rs +++ b/quickwit/quickwit-control-plane/src/model/shard_table.rs @@ -159,7 +159,7 @@ fn remove_shard_from_ingesters_internal( ) { for node in shard.ingesters() { let ingester_shards = ingester_shards - .get_mut(&node) + .get_mut(node) .expect("shard table reached inconsistent state"); let shard_ids = ingester_shards.get_mut(source_uid).unwrap(); let shard_was_removed = shard_ids.remove(shard.shard_id()); @@ -231,11 +231,7 @@ impl ShardTable { for shard_id in shard_ids { let shard_table_entry = self.table_entries.get(source_uid).unwrap(); debug_assert!(shard_table_entry.shard_entries.contains_key(shard_id)); - debug_assert!(shard_sets_in_shard_table.remove(&( - node.clone(), - source_uid, - shard_id - ))); + debug_assert!(shard_sets_in_shard_table.remove(&(node, source_uid, shard_id))); } } } @@ -365,7 +361,7 @@ impl ShardTable { } for shard in &opened_shards { for node in shard.ingesters() { - let ingester_shards = self.ingester_shards.entry(node).or_default(); + let ingester_shards = self.ingester_shards.entry(node.to_owned()).or_default(); let shard_ids = ingester_shards.entry(source_uid.clone()).or_default(); shard_ids.insert(shard.shard_id().clone()); } diff --git a/quickwit/quickwit-proto/src/ingest/mod.rs b/quickwit/quickwit-proto/src/ingest/mod.rs index a60be74653d..f83f4f42c86 100644 --- a/quickwit/quickwit-proto/src/ingest/mod.rs +++ b/quickwit/quickwit-proto/src/ingest/mod.rs @@ -25,9 +25,8 @@ use quickwit_common::tower::MakeLoadShedError; use self::ingester::{PersistFailureReason, ReplicateFailureReason}; use self::router::IngestFailureReason; -use super::types::NodeId; use super::GrpcServiceError; -use crate::types::{queue_id, DocUid, Position, QueueId, ShardId, SourceUid}; +use crate::types::{queue_id, DocUid, NodeIdRef, Position, QueueId, ShardId, SourceUid}; use crate::{ServiceError, ServiceErrorCode}; pub mod ingester; @@ -95,11 +94,11 @@ impl MakeLoadShedError for IngestV2Error { impl Shard { /// List of nodes that are storing the shard (the leader, and optionally the follower). - pub fn ingesters(&self) -> impl Iterator + '_ { + pub fn ingesters(&self) -> impl Iterator + '_ { [Some(&self.leader_id), self.follower_id.as_ref()] .into_iter() .flatten() - .map(|node_id| NodeId::new(node_id.clone())) + .map(|node_id| NodeIdRef::from_str(node_id)) } pub fn source_uid(&self) -> SourceUid {