diff --git a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
index 25185da878e..d24de5b054b 100644
--- a/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
+++ b/quickwit/quickwit-control-plane/src/ingest/ingest_controller.rs
@@ -17,7 +17,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
-use std::collections::{BTreeSet, HashMap};
+use std::collections::btree_map::Entry;
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt;
use std::future::Future;
use std::sync::Arc;
@@ -26,7 +27,6 @@ use std::time::Duration;
use fnv::FnvHashSet;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
-use itertools::Itertools;
use quickwit_actors::Mailbox;
use quickwit_common::pretty::PrettySample;
use quickwit_common::Progress;
@@ -49,6 +49,9 @@ use quickwit_proto::metastore::{
OpenShardsRequest, OpenShardsResponse,
};
use quickwit_proto::types::{IndexUid, NodeId, Position, ShardId, SourceUid};
+use rand::rngs::ThreadRng;
+use rand::seq::SliceRandom;
+use rand::{thread_rng, Rng, RngCore};
use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, OwnedMutexGuard};
use tokio::task::JoinHandle;
@@ -91,6 +94,105 @@ fn fire_and_forget(
});
}
+// Returns a random position in the `els` slice, such that the element at this position is NOT
+// `except_el`.
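+//
+// For example (illustrative values only): with `els = ["a", "b", "c"]` and
+// `except_el = Some("b")`, we draw `pos` in `0..2` and shift any position at or after
+// the excluded index, so the result is 0 ("a") or 2 ("c"), never 1 ("b").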
+fn pick_position(els: &[&str], except_el_opt: Option<&str>, rng: &mut ThreadRng) -> Option<usize> {
+ let except_pos_opt =
+ except_el_opt.and_then(|except_el| els.iter().position(|el| *el == except_el));
+ if let Some(except_pos) = except_pos_opt {
+ let pos = rng.gen_range(0..els.len() - 1);
+ if pos >= except_pos {
+ Some(pos + 1)
+ } else {
+ Some(pos)
+ }
+ } else {
+ Some(rng.gen_range(0..els.len()))
+ }
+}
+
+/// Picks a node from `shard_count_to_node_ids` that is different from `except_node_opt`.
+/// Nodes with the lowest number of shards are picked in priority, and ties are broken randomly.
+///
+/// Once a node has been found, `shard_count_to_node_ids` is updated to reflect the new state:
+/// the ingester node is moved from its previous shard_count level to its new shard_count level,
+/// and a shard_count entry left empty is removed from the BTreeMap.
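+///
+/// For example (hypothetical state): starting from `{1: ["a", "b"], 3: ["c"]}`,
+/// picking `"a"` moves it from the `1` bucket to the `2` bucket, leaving
+/// `{1: ["b"], 2: ["a"], 3: ["c"]}`.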
+fn pick_one<'a>(
+ shard_count_to_node_ids: &mut BTreeMap<usize, Vec<&'a str>>,
+ except_node_opt: Option<&'a str>,
+ rng: &mut ThreadRng,
+) -> Option<&'a str> {
+ let (&shard_count, _) = shard_count_to_node_ids.iter().find(|(_, node_ids)| {
+ let Some(except_node) = except_node_opt else {
+ return true;
+ };
+ if node_ids.len() >= 2 {
+ return true;
+ }
+ let Some(&single_node_id) = node_ids.first() else {
+ return false;
+ };
+ single_node_id != except_node
+ })?;
+ let mut shard_entry = shard_count_to_node_ids.entry(shard_count);
+ let Entry::Occupied(occupied_shard_entry) = &mut shard_entry else {
+ panic!();
+ };
+ let nodes = occupied_shard_entry.get_mut();
+ let position = pick_position(nodes, except_node_opt, rng)?;
+
+ let node_id = nodes.swap_remove(position);
+ let new_shard_count = shard_count + 1;
+ let should_remove_entry = nodes.is_empty();
+
+ if should_remove_entry {
+ shard_count_to_node_ids.remove(&shard_count);
+ }
+ shard_count_to_node_ids
+ .entry(new_shard_count)
+ .or_default()
+ .push(node_id);
+ Some(node_id)
+}
+
+/// Picks two distinct ingester nodes from `shard_count_to_node_ids`.
+/// Ingesters with a lower number of shards are preferred.
+fn pick_two<'a>(
+ shard_count_to_node_ids: &mut BTreeMap<usize, Vec<&'a str>>,
+ rng: &mut ThreadRng,
+) -> Option<(&'a str, &'a str)> {
+ let leader = pick_one(shard_count_to_node_ids, None, rng)?;
+ let follower = pick_one(shard_count_to_node_ids, Some(leader), rng)?;
+ Some((leader, follower))
+}
+
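+/// Allocates `num_shards` new shards over the given nodes, preferring nodes with the
+/// fewest shards already assigned. Returns `None` when there are not enough distinct
+/// nodes (e.g. a single node while replication is enabled).
+///
+/// A sketch of the expected behavior (hypothetical counts): starting from
+/// `{"node-1": 0, "node-2": 2}` with `num_shards = 2` and replication disabled, both
+/// new shards land on `"node-1"`, evening out the counts.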
+fn allocate_shards<'a>(
+ node_id_shard_counts: &HashMap<&'a str, usize>,
+ num_shards: usize,
+ replication_enabled: bool,
+) -> Option<Vec<(&'a str, Option<&'a str>)>> {
+ let mut shard_count_to_node_ids: BTreeMap<usize, Vec<&'a str>> = BTreeMap::default();
+ for (&node_id, &num_shards) in node_id_shard_counts {
+ shard_count_to_node_ids
+ .entry(num_shards)
+ .or_default()
+ .push(node_id);
+ }
+ let mut rng = thread_rng();
+ let mut shard_allocations: Vec<(&str, Option<&str>)> = Vec::with_capacity(num_shards);
+ for _ in 0..num_shards {
+ if replication_enabled {
+ let (leader, follower) = pick_two(&mut shard_count_to_node_ids, &mut rng)?;
+ shard_allocations.push((leader, Some(follower)));
+ } else {
+ let leader = pick_one(&mut shard_count_to_node_ids, None, &mut rng)?;
+ shard_allocations.push((leader, None));
+ }
+ }
+ Some(shard_allocations)
+}
+
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
pub struct IngestControllerStats {
pub num_rebalance_shards_ops: usize,
@@ -424,12 +526,11 @@ impl IngestController {
unavailable_leaders: &FnvHashSet<NodeId>,
model: &ControlPlaneModel,
) -> Option<Vec<(NodeId, Option<NodeId>)>> {
- let ingesters: Vec<NodeId> = self
+ let ingesters: HashSet<NodeId> = self
.ingester_pool
.keys()
.into_iter()
.filter(|ingester| !unavailable_leaders.contains(ingester))
- .sorted_by(|left, right| left.cmp(right))
.collect();
let num_ingesters = ingesters.len();
@@ -444,67 +545,45 @@ impl IngestController {
);
return None;
}
- let mut leader_follower_pairs = Vec::with_capacity(num_shards_to_allocate);
- let mut num_open_shards: usize = 0;
- let mut per_leader_num_open_shards: HashMap<&str, usize> =
- HashMap::with_capacity(num_ingesters);
+ // Count of open shards per available ingester node (including ingesters with 0 open
+ // shards).
+ let mut per_node_num_open_shards: HashMap<&str, usize> = ingesters
+ .iter()
+ .map(|ingester| (ingester.as_str(), 0))
+ .collect();
for shard in model.all_shards() {
if shard.is_open() && !unavailable_leaders.contains(&shard.leader_id) {
- num_open_shards += 1;
-
- *per_leader_num_open_shards
- .entry(&shard.leader_id)
- .or_default() += 1;
- }
- }
- let mut num_remaining_shards_to_allocate = num_shards_to_allocate;
- let num_open_shards_target = num_shards_to_allocate + num_open_shards;
- let max_num_shards_to_allocate_per_node = num_open_shards_target / num_ingesters;
-
- // Allocate at most `max_num_shards_to_allocate_per_node` shards to each ingester.
- for (leader_id, follower_id) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) {
- if num_remaining_shards_to_allocate == 0 {
- break;
- }
- let num_open_shards_inner = per_leader_num_open_shards
- .get(leader_id.as_str())
- .copied()
- .unwrap_or_default();
-
- let num_shards_to_allocate_inner = max_num_shards_to_allocate_per_node
- .saturating_sub(num_open_shards_inner)
- .min(num_remaining_shards_to_allocate);
-
- for _ in 0..num_shards_to_allocate_inner {
- num_remaining_shards_to_allocate -= 1;
-
- let leader = leader_id.clone();
- let mut follower_opt = None;
-
- if self.replication_factor > 1 {
- follower_opt = Some(follower_id.clone());
+ if let Some(shard_count) =
+ per_node_num_open_shards.get_mut(shard.leader_id.as_str())
+ {
+ *shard_count += 1;
+ }
+ if let Some(follower_id) = shard.follower_id.as_ref() {
+ if let Some(shard_count) =
+ per_node_num_open_shards.get_mut(follower_id.as_str())
+ {
+ *shard_count += 1;
+ }
}
- leader_follower_pairs.push((leader, follower_opt));
}
}
- // Allocate remaining shards one by one.
- for (leader_id, follower_id) in ingesters.iter().zip(ingesters.iter().cycle().skip(1)) {
- if num_remaining_shards_to_allocate == 0 {
- break;
- }
- num_remaining_shards_to_allocate -= 1;
- let leader = leader_id.clone();
- let mut follower_opt = None;
-
- if self.replication_factor > 1 {
- follower_opt = Some(follower_id.clone());
- }
- leader_follower_pairs.push((leader, follower_opt));
- }
- Some(leader_follower_pairs)
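+ // The ingest replication factor is currently limited to 1 or 2 (asserted below),
+ // which is why a simple boolean is enough for `allocate_shards`.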
+ assert!(self.replication_factor == 1 || self.replication_factor == 2);
+ let leader_follower_pairs: Vec<(&str, Option<&str>)> = allocate_shards(
+ &per_node_num_open_shards,
+ num_shards_to_allocate,
+ self.replication_factor == 2,
+ )?;
+ Some(
+ leader_follower_pairs
+ .into_iter()
+ .map(|(leader_id, follower_id)| {
+ (NodeId::from(leader_id), follower_id.map(NodeId::from))
+ })
+ .collect(),
+ )
}
/// Calls init shards on the leaders hosting newly opened shards.
@@ -786,6 +865,7 @@ impl IngestController {
model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS);
return Ok(());
};
+ info!("scaling down shard {shard_id} from {leader_id}");
let Some(ingester) = self.ingester_pool.get(&leader_id) else {
model.release_scaling_permits(&source_uid, ScalingMode::Down, NUM_PERMITS);
return Ok(());
@@ -922,24 +1002,24 @@ impl IngestController {
}
}
- let num_open_shards_per_leader_target = num_open_shards / num_ingesters;
-
// We tolerate an ingester with 10% more shards than the average.
-
// Let's first identify the list of shards we want to "move".
let num_open_shards_per_leader_threshold =
- (num_open_shards_per_leader_target * 11).div_ceil(10);
+ (num_open_shards * 11).div_ceil(10 * num_ingesters);
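+ // For instance (illustrative numbers): with 20 open shards spread over 3 ingesters,
+ // the threshold is (20 * 11).div_ceil(10 * 3) = 8 open shards per ingester.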
let mut shards_to_move: Vec<Shard> = Vec::new();
+ let mut rng = thread_rng();
for open_shards in per_leader_open_shards.values() {
- if open_shards.len() <= num_open_shards_per_leader_threshold {
- continue;
+ if let Some(num_shards_to_move) = open_shards
+ .len()
+ .checked_sub(num_open_shards_per_leader_threshold)
+ {
+ shards_to_move.extend(
+ open_shards[..]
+ .choose_multiple(&mut rng, num_shards_to_move)
+ .map(|shard_entry| shard_entry.shard.clone()),
+ );
}
- shards_to_move.extend(
- open_shards[num_open_shards_per_leader_threshold..]
- .iter()
- .map(|shard_entry| shard_entry.shard.clone()),
- );
}
shards_to_move
@@ -1103,39 +1183,36 @@ pub(crate) struct RebalanceShardsCallback {
pub rebalance_guard: OwnedMutexGuard<()>,
}
-/// Finds the shard with the highest ingestion rate on the ingester with the most number of open
-/// shards. If multiple shards have the same ingestion rate, the shard with the lowest (oldest)
-/// shard ID is chosen.
+/// Finds a shard on the ingester with the highest number of open shards for this
+/// source.
+///
+/// If multiple shards are hosted on that ingester, one of them is picked at random.
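+///
+/// For example (hypothetical state): if `"ingester-1"` hosts 3 open shards for the
+/// source and `"ingester-2"` hosts only 1, the candidate is one of the 3 shards
+/// hosted on `"ingester-1"`, chosen at random.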
fn find_scale_down_candidate(
source_uid: &SourceUid,
model: &ControlPlaneModel,
) -> Option<(NodeId, ShardId)> {
- let mut per_leader_candidates: HashMap<&String, (usize, &ShardEntry)> = HashMap::new();
+ let mut per_leader_shard_entries: HashMap<&String, Vec<&ShardEntry>> = HashMap::new();
+ let mut rng = thread_rng();
for shard in model.get_shards_for_source(source_uid)?.values() {
if shard.is_open() {
- per_leader_candidates
+ per_leader_shard_entries
.entry(&shard.leader_id)
- .and_modify(|(num_shards, candidate)| {
- *num_shards += 1;
-
- if shard
- .long_term_ingestion_rate
- .cmp(&candidate.long_term_ingestion_rate)
- .then_with(|| shard.shard_id.cmp(&candidate.shard_id))
- .is_gt()
- {
- *candidate = shard;
- }
- })
- .or_insert((1, shard));
+ .or_default()
+ .push(shard);
}
}
- per_leader_candidates
+ per_leader_shard_entries
.into_iter()
- .min_by_key(|(_leader_id, (num_shards, _shard))| *num_shards)
- .map(|(leader_id, (_num_shards, shard))| {
- (leader_id.clone().into(), shard.shard_id().clone())
+ // We use a random number to break ties... The HashMap is randomly seeded, so this
+ // should not make much difference, but we prefer to be as explicit as possible.
+ .max_by_key(|(_leader_id, shard_entries)| (shard_entries.len(), rng.next_u32()))
+ .map(|(leader_id, shard_entries)| {
+ (
+ leader_id.clone().into(),
+ shard_entries.choose(&mut rng).unwrap().shard_id().clone(),
+ )
})
}
@@ -1147,6 +1224,7 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
+ use itertools::Itertools;
use quickwit_actors::Universe;
use quickwit_common::setup_logging_for_tests;
use quickwit_common::shared_consts::DEFAULT_SHARD_THROUGHPUT_LIMIT;
@@ -1552,6 +1630,9 @@ mod tests {
let leader_follower_pairs_opt =
controller.allocate_shards(0, &FnvHashSet::default(), &model);
+
+ // We have only one node, so with a replication factor of 2, we can't
+ // find any solution.
assert!(leader_follower_pairs_opt.is_none());
ingester_pool.insert("test-ingester-2".into(), IngesterServiceClient::mocked());
@@ -1559,57 +1640,56 @@ mod tests {
let leader_follower_pairs = controller
.allocate_shards(0, &FnvHashSet::default(), &model)
.unwrap();
+
+ // We tried to allocate 0 shards, so an empty vec makes sense.
assert!(leader_follower_pairs.is_empty());
let leader_follower_pairs = controller
.allocate_shards(1, &FnvHashSet::default(), &model)
.unwrap();
+
assert_eq!(leader_follower_pairs.len(), 1);
- assert_eq!(leader_follower_pairs[0].0, "test-ingester-1");
- assert_eq!(
- leader_follower_pairs[0].1,
- Some(NodeId::from("test-ingester-2"))
- );
+
+ // The leader/follower pair is picked at random: both ingesters have the same number of shards.
+ if leader_follower_pairs[0].0 == "test-ingester-1" {
+ assert_eq!(
+ leader_follower_pairs[0].1,
+ Some(NodeId::from("test-ingester-2"))
+ );
+ } else {
+ assert_eq!(leader_follower_pairs[0].0, "test-ingester-2");
+ assert_eq!(
+ leader_follower_pairs[0].1,
+ Some(NodeId::from("test-ingester-1"))
+ );
+ }
let leader_follower_pairs = controller
.allocate_shards(2, &FnvHashSet::default(), &model)
.unwrap();
assert_eq!(leader_follower_pairs.len(), 2);
- assert_eq!(leader_follower_pairs[0].0, "test-ingester-1");
- assert_eq!(
- leader_follower_pairs[0].1,
- Some(NodeId::from("test-ingester-2"))
- );
- assert_eq!(leader_follower_pairs[1].0, "test-ingester-2");
- assert_eq!(
- leader_follower_pairs[1].1,
- Some(NodeId::from("test-ingester-1"))
- );
+ for leader_follower_pair in leader_follower_pairs {
+ if leader_follower_pair.0 == "test-ingester-1" {
+ assert_eq!(
+ leader_follower_pair.1,
+ Some(NodeId::from("test-ingester-2"))
+ );
+ } else {
+ assert_eq!(leader_follower_pair.0, "test-ingester-2");
+ assert_eq!(
+ leader_follower_pair.1,
+ Some(NodeId::from("test-ingester-1"))
+ );
+ }
+ }
let leader_follower_pairs = controller
.allocate_shards(3, &FnvHashSet::default(), &model)
.unwrap();
assert_eq!(leader_follower_pairs.len(), 3);
- assert_eq!(leader_follower_pairs[0].0, "test-ingester-1");
- assert_eq!(
- leader_follower_pairs[0].1,
- Some(NodeId::from("test-ingester-2"))
- );
-
- assert_eq!(leader_follower_pairs[1].0, "test-ingester-2");
- assert_eq!(
- leader_follower_pairs[1].1,
- Some(NodeId::from("test-ingester-1"))
- );
-
- assert_eq!(leader_follower_pairs[2].0, "test-ingester-1");
- assert_eq!(
- leader_follower_pairs[2].1,
- Some(NodeId::from("test-ingester-2"))
- );
-
let index_uid = IndexUid::for_test("test-index", 0);
+
let source_id: SourceId = "test-source".to_string();
let open_shards = vec![Shard {
index_uid: Some(index_uid.clone()),
@@ -1625,10 +1705,10 @@ mod tests {
.allocate_shards(3, &FnvHashSet::default(), &model)
.unwrap();
assert_eq!(leader_follower_pairs.len(), 3);
- assert_eq!(leader_follower_pairs[0].0, "test-ingester-1");
+ assert_eq!(leader_follower_pairs[0].0, "test-ingester-2");
assert_eq!(
leader_follower_pairs[0].1,
- Some(NodeId::from("test-ingester-2"))
+ Some(NodeId::from("test-ingester-1"))
);
assert_eq!(leader_follower_pairs[1].0, "test-ingester-2");
@@ -1667,6 +1747,7 @@ mod tests {
.allocate_shards(1, &FnvHashSet::default(), &model)
.unwrap();
assert_eq!(leader_follower_pairs.len(), 1);
+ // Ingester 1 already has two shards, so ingester 2 is picked as leader
assert_eq!(leader_follower_pairs[0].0, "test-ingester-2");
assert_eq!(
leader_follower_pairs[0].1,
@@ -1678,6 +1759,7 @@ mod tests {
let leader_follower_pairs = controller
.allocate_shards(4, &unavailable_leaders, &model)
.unwrap();
+ // Ingester 2 is unavailable. Ingester 1 has open shards. Ingester 3 ends up leader.
assert_eq!(leader_follower_pairs.len(), 4);
assert_eq!(leader_follower_pairs[0].0, "test-ingester-3");
assert_eq!(
@@ -1697,10 +1779,10 @@ mod tests {
Some(NodeId::from("test-ingester-1"))
);
- assert_eq!(leader_follower_pairs[3].0, "test-ingester-1");
+ assert_eq!(leader_follower_pairs[3].0, "test-ingester-3");
assert_eq!(
leader_follower_pairs[3].1,
- Some(NodeId::from("test-ingester-3"))
+ Some(NodeId::from("test-ingester-1"))
);
}
@@ -2152,8 +2234,6 @@ mod tests {
assert_eq!(request.shard_pkeys.len(), 1);
assert_eq!(request.shard_pkeys[0].index_uid(), &index_uid_clone);
assert_eq!(request.shard_pkeys[0].source_id, "test-source");
- assert_eq!(request.shard_pkeys[0].shard_id(), ShardId::from(2));
-
Err(IngestV2Error::Internal(
"failed to close shards".to_string(),
))
@@ -2523,7 +2603,7 @@ mod tests {
index_uid: index_uid.clone().into(),
source_id: source_id.clone(),
shard_id: Some(ShardId::from(3)),
- shard_state: ShardState::Closed as i32,
+ shard_state: ShardState::Closed as i32, //< this one is closed
leader_id: "test-ingester-0".to_string(),
..Default::default()
},
@@ -2552,6 +2632,7 @@ mod tests {
..Default::default()
},
];
+ // That's 3 open shards on ingester-1, and 2 open shards plus one closed shard on ingester-0.
model.insert_shards(&index_uid, &source_id, shards);
let shard_infos = BTreeSet::from_iter([
@@ -2594,9 +2675,9 @@ mod tests {
]);
model.update_shards(&source_uid, &shard_infos);
- let (leader_id, shard_id) = find_scale_down_candidate(&source_uid, &model).unwrap();
- assert_eq!(leader_id, "test-ingester-0");
- assert_eq!(shard_id, ShardId::from(2));
+ let (leader_id, _shard_id) = find_scale_down_candidate(&source_uid, &model).unwrap();
+ // We pick ingester 1 as it has more open shards.
+ assert_eq!(leader_id, "test-ingester-1");
}
#[tokio::test]
@@ -3073,4 +3154,121 @@ mod tests {
let callback = &callbacks[0];
assert_eq!(callback.closed_shards.len(), 1);
}
+
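+ // Helper: runs `allocate_shards` on the given shard counts and checks the basic
+ // invariants: right number of allocations, a follower iff replication is enabled,
+ // leader != follower, and a bounded spread of the resulting per-node shard counts.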
+ // #[track_caller]
+ fn test_allocate_shards_aux_aux(
+ shard_counts_map: &HashMap<&str, usize>,
+ num_shards: usize,
+ replication_enabled: bool,
+ ) {
+ let shard_allocations_opt =
+ super::allocate_shards(shard_counts_map, num_shards, replication_enabled);
+ if num_shards == 0 {
+ assert_eq!(shard_allocations_opt, Some(Vec::new()));
+ return;
+ }
+ let num_nodes_required = if replication_enabled { 2 } else { 1 };
+ if shard_counts_map.len() < num_nodes_required {
+ assert!(shard_allocations_opt.is_none());
+ return;
+ }
+ let shard_allocations = shard_allocations_opt.unwrap();
+ let mut total_counts: HashMap<&str, usize> = HashMap::default();
+ assert_eq!(shard_allocations.len(), num_shards);
+ if num_shards == 0 {
+ return;
+ }
+ for (leader, follower_opt) in shard_allocations {
+ assert_eq!(follower_opt.is_some(), replication_enabled);
+ *total_counts.entry(leader).or_default() += 1;
+ if let Some(follower) = follower_opt {
+ *total_counts.entry(follower).or_default() += 1;
+ assert_ne!(follower, leader);
+ }
+ }
+ for (shard, count) in shard_counts_map {
+ if let Some(shard_count) = total_counts.get_mut(shard) {
+ *shard_count += *count;
+ }
+ }
+ let (min, max) = total_counts
+ .values()
+ .copied()
+ .minmax()
+ .into_option()
+ .unwrap();
+ if !replication_enabled {
+ // If replication is enabled, we can end up being forced to spread shards less evenly
+ // than we would wish: for instance, if there are only two nodes and they are
+ // initially unbalanced.
+ assert!(min + 1 >= max);
+ } else {
+ let (previous_min, previous_max) = shard_counts_map
+ .values()
+ .copied()
+ .minmax()
+ .into_option()
+ .unwrap();
+ // The algorithm is not supposed to increase the spread between the most and the
+ // least loaded nodes. Of course, a spread of 1 is sometimes unavoidable: for
+ // instance, with 3 nodes that are perfectly balanced to begin with, if we ask for
+ // a single shard.
+ assert!((previous_max - previous_min).max(1) >= (max - min));
+ }
+ }
+
+ fn test_allocate_shards_aux(shard_counts: &[usize]) {
+ let mut shard_counts_map: HashMap<&str, usize> = HashMap::new();
+ let shards = (0..shard_counts.len())
+ .map(|i| format!("shard-{}", i))
+ .collect::<Vec<String>>();
+ for (shard, &shard_count) in shards.iter().zip(shard_counts.iter()) {
+ shard_counts_map.insert(shard.as_str(), shard_count);
+ }
+ for i in 0..10 {
+ test_allocate_shards_aux_aux(&shard_counts_map, i, false);
+ test_allocate_shards_aux_aux(&shard_counts_map, i, true);
+ }
+ }
+
+ use proptest::prelude::*;
+
+ proptest! {
+ #[test]
+ fn test_proptest_allocate_shards(shard_counts in proptest::collection::vec(0..10usize, 0..10usize)) {
+ test_allocate_shards_aux(&shard_counts);
+ }
+ }
+
+ #[test]
+ fn test_allocate_shards_prop_test() {
+ test_allocate_shards_aux(&[]);
+ test_allocate_shards_aux(&[1]);
+ test_allocate_shards_aux(&[1, 1]);
+ test_allocate_shards_aux(&[1, 2]);
+ test_allocate_shards_aux(&[1, 4]);
+ test_allocate_shards_aux(&[2, 3, 2]);
+ test_allocate_shards_aux(&[2, 4, 6]);
+ test_allocate_shards_aux(&[2, 3, 10]);
+ }
+
+ #[test]
+ fn test_allocate_shards_prop_test_bug() {
+ test_allocate_shards_aux(&[7, 7, 7]);
+ }
+
+ #[test]
+ fn test_pick_one() {
+ let mut shard_counts = BTreeMap::default();
+ shard_counts.insert(1, vec!["node1", "node2"]);
+ let mut rng = rand::thread_rng();
+ let node = pick_one(&mut shard_counts, Some("node2"), &mut rng).unwrap();
+ assert_eq!(node, "node1");
+ assert_eq!(shard_counts.len(), 2);
+ assert_eq!(&shard_counts.get(&1).unwrap()[..], &["node2"]);
+ assert_eq!(&shard_counts.get(&2).unwrap()[..], &["node1"]);
+ let node = pick_one(&mut shard_counts, None, &mut rng).unwrap();
+ assert_eq!(node, "node2");
+ assert_eq!(shard_counts.len(), 1);
+ assert_eq!(&shard_counts.get(&2).unwrap()[..], &["node1", "node2"]);
+ }
}