diff --git a/src/lib.rs b/src/lib.rs index ba332a3..589e380 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,8 @@ /// This pkg provides a consistent hashring with bounded loads. This implementation also adds /// partitioning logic on top of the original algorithm. For more information about the underlying algorithm, -/// please take a look at https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html. +/// please take a look at . /// -/// This pkg is a port of ((and consistent with) the Go pkg https://github.com/buraksezer/consistent +/// This pkg is a port of ((and consistent with) the Go pkg [consistent](https://github.com/buraksezer/consistent) /// /// # Examples /// @@ -71,13 +71,13 @@ impl Display for HashRingError { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { HashRingError::IllegalArgument(msg) => { - write!(f, "[HashRingError] Illegal argument: {}", msg) + write!(f, "[HashRingError] Illegal argument: {msg}") } - HashRingError::InvalidValue(msg) => write!(f, "[HashRingError] Value error: {}", msg), + HashRingError::InvalidValue(msg) => write!(f, "[HashRingError] Value error: {msg}"), HashRingError::StorageLock(msg) => { - write!(f, "[HashRingError] Storage lock error: {}", msg) + write!(f, "[HashRingError] Storage lock error: {msg}") } - HashRingError::NotFound(msg) => write!(f, "[HashRingError] Not found: {}", msg), + HashRingError::NotFound(msg) => write!(f, "[HashRingError] Not found: {msg}"), } } } @@ -149,6 +149,7 @@ impl HashRingConfig { } } +#[derive(Default)] struct HashRingStorage { /// The hash ring. ring: BTreeMap>, @@ -160,20 +161,9 @@ struct HashRingStorage { loads: HashMap, f64>, } -impl Default for HashRingStorage { - fn default() -> Self { - HashRingStorage { - ring: BTreeMap::new(), - members: HashSet::new(), - partitions: HashMap::new(), - loads: HashMap::new(), - } - } -} - impl HashRingStorage { fn average_load(&self, partition_count: u64, load: f64) -> f64 { - if self.members.len() == 0 { + if self.members.is_empty() { return 0.0; } @@ -194,7 +184,7 @@ impl HashRingStorage { /// Adds nodes to both the hashring and the list of members. fn add_nodes(&mut self, nodes: Vec>, vnodes: Vec<(u64, Vec)>) { - self.members.extend(nodes.iter().map(|node| node.clone())); + self.members.extend(nodes.iter().cloned()); self.ring .extend(vnodes.iter().map(|(key, value)| (*key, value.clone()))); } @@ -208,7 +198,7 @@ impl HashRingStorage { let removed_vnodes: HashSet = HashSet::from_iter(vnode_keys); self.ring.retain(|key, _| !removed_vnodes.contains(key)); - if self.members.len() == 0 { + if self.members.is_empty() { self.partitions.clear(); } } @@ -220,11 +210,7 @@ impl HashRingStorage { /// Returns the node that owns the partition. fn get_node_for_partition(&self, partition_id: u64) -> Option> { - if let Some(member) = self.partitions.get(&partition_id) { - return Some(member.clone()); - } else { - return None; - } + self.partitions.get(&partition_id).cloned() } /// Distributes the partitions across the nodes on the hashring. @@ -297,7 +283,7 @@ impl HashRingStorage { .iter() .position(|(_, member)| member == &partition_owner) { - let member_hash_keys = members_by_hash.keys().cloned().collect::>(); + let member_hash_keys = members_by_hash.keys().copied().collect::>(); // This check is kind of silly, but still... if partition_owner_position >= members_by_hash.len() { @@ -305,28 +291,30 @@ impl HashRingStorage { } while nodes.len() < n { - if let Some(member_hash) = member_hash_keys.get(partition_owner_position) { - match members_by_hash.get(member_hash) { - Some(member) => { - nodes.push(member.clone()); - } - None => unreachable!(), - } - } else { - unreachable!() - } + let member_hash = member_hash_keys + .get(partition_owner_position) + .unwrap_or_else(|| { + unreachable!( + "Member hash not found at position {}", + partition_owner_position + ) + }); + + let member = members_by_hash.get(member_hash).unwrap_or_else(|| { + unreachable!("Member not found for hash {}", member_hash) + }); + + nodes.push(member.clone()); partition_owner_position += 1; - if partition_owner_position >= members_by_hash.len() { - partition_owner_position = 0; - } + partition_owner_position %= members_by_hash.len(); } - return Ok(nodes); + Ok(nodes) } else { - return Err(HashRingError::InvalidValue(format!( + Err(HashRingError::InvalidValue(format!( "partition({partition_id}) does not have a corresponding node on the ring. specifically, no corresponding node was found from the members_by_hash argument" - ))); + ))) } } None => Err(HashRingError::IllegalArgument(format!( @@ -363,7 +351,7 @@ impl HashRing { pub fn find_partition_for_key(&self, key: &K) -> Result { let mut hasher = self.config.get_hasher(); key.hash(&mut hasher); - Ok(hasher.finish() % self.config.partition_count() as u64) + Ok(hasher.finish() % self.config.partition_count()) } pub fn list_nodes(&self) -> Result, HashRingError> { @@ -394,6 +382,7 @@ impl HashRing { Ok(storage.average_load(self.config.partition_count(), self.config.load_factor())) } + #[must_use] pub fn get_node_for_partition(&self, partition_id: u64) -> Option { let storage = self .storage @@ -411,11 +400,9 @@ impl HashRing { } pub fn locate_key(&self, key: &K) -> Option { - if let Some(partition_id) = self.find_partition_for_key(key).ok() { - self.get_node_for_partition(partition_id) - } else { - None - } + self.find_partition_for_key(key) + .map(|partition_id| self.get_node_for_partition(partition_id)) + .unwrap_or(None) } pub fn load_distribution( @@ -426,26 +413,28 @@ impl HashRing { .lock() .map_err(|e| HashRingError::StorageLock(e.to_string()))?; - Ok(storage.load_distribution().iter().try_fold( - HashMap::new(), - |mut acc, (node, load)| match String::from_utf8(node.clone()) { - Ok(s) => match s.parse::() { - Ok(n) => { - acc.insert(n, *load); - Ok(acc) - } - Err(_) => Err(HashRingError::InvalidValue(format!( - "could not parse string ({}) into type({})", - s, - std::any::type_name::(), + storage + .load_distribution() + .iter() + .try_fold( + HashMap::new(), + |mut acc, (node, load)| match String::from_utf8(node.clone()) { + Ok(s) => match s.parse::() { + Ok(n) => { + acc.insert(n, *load); + Ok(acc) + } + Err(_) => Err(HashRingError::InvalidValue(format!( + "could not parse string ({}) into type({})", + s, + std::any::type_name::(), + ))), + }, + Err(e) => Err(HashRingError::InvalidValue(format!( + "could not parse vec into string; received error: {e}" ))), }, - Err(e) => Err(HashRingError::InvalidValue(format!( - "could not parse vec into string; received error: {}", - e.to_string() - ))), - }, - )?) + ) } pub fn add_nodes(&self, nodes: Vec) -> Result<(), HashRingError> { @@ -462,7 +451,7 @@ impl HashRing { (0..self.config.replication_factor()) .map(|i| { let mut hasher = self.config.get_hasher(); - write!(&mut uid, "{} (hashring_node_replica_{})", n, i).unwrap(); + write!(&mut uid, "{n} (hashring_node_replica_{i})").unwrap(); uid.hash(&mut hasher); (hasher.finish(), n.to_string().as_bytes().to_vec()) }) @@ -508,7 +497,7 @@ impl HashRing { (0..self.config.replication_factor()) .map(|i| { let mut hasher = self.config.get_hasher(); - write!(&mut uid, "{} (hashring_node_replica_{})", n, i).unwrap(); + write!(&mut uid, "{n} (hashring_node_replica_{i})").unwrap(); uid.hash(&mut hasher); hasher.finish() }) @@ -597,12 +586,10 @@ impl HashRing { Ok(nodes .iter() - .filter_map(|n| match String::from_utf8(n.clone()) { - Ok(s) => match s.parse::() { - Ok(n) => Some(n), - Err(_) => None, - }, - Err(_) => None, + .filter_map(|n| { + String::from_utf8(n.clone()) + .ok() + .and_then(|s| s.parse::().ok()) }) .collect()) } @@ -640,7 +627,7 @@ mod test { fn test_nodes(n: usize) -> Vec { (0..n) - .map(|i| HashRingNode::new(format!("node_{}", i))) + .map(|i| HashRingNode::new(format!("node_{i}"))) .collect() } @@ -662,8 +649,8 @@ mod test { ); let mut expected_nodes = Vec::::new(); - for node in nodes.iter() { - for ring_node in ring_nodes.iter() { + for node in &nodes { + for ring_node in &ring_nodes { if node == ring_node { expected_nodes.push(node.clone()); } @@ -671,8 +658,7 @@ mod test { } assert_eq!( expected_nodes, nodes, - "ring nodes are invalid after adding nodes. Expected {:?}, got {:?}", - expected_nodes, nodes + "ring nodes are invalid after adding nodes. Expected {expected_nodes:?}, got {nodes:?}" ); } @@ -729,14 +715,13 @@ mod test { let ring_nodes_count = ring.nodes_count().unwrap(); assert_eq!( ring_nodes_count, test_node_count, - "ring nodes count is invalid after adding nodes. Expected {}, got {}", - test_node_count, ring_nodes_count + "ring nodes count is invalid after adding nodes. Expected {test_node_count}, got {ring_nodes_count}" ); let max_load = ring.average_load().unwrap(); let load_distribution = ring.load_distribution::().unwrap(); let has_overloaded_nodes = load_distribution.iter().any(|(_, load)| load > &max_load); - assert_eq!(has_overloaded_nodes, false, "ring has overloaded nodes.",); + assert!(!has_overloaded_nodes, "ring has overloaded nodes.",); } #[test] @@ -748,21 +733,17 @@ mod test { assert_eq!( ring.locate_key::(&test_key), None, - "ring supposedly has node for key {} even though no nodes have been added.", - test_key + "ring supposedly has node for key {test_key} even though no nodes have been added." ); let test_node_count: usize = 8; let nodes = test_nodes(test_node_count); ring.add_nodes(nodes.clone()).unwrap(); - assert_eq!( + assert!( ring.locate_key::(&test_key.to_string()) .is_some(), - true, - "ring unable to locate node for key {} even though {} nodes have been added.", - test_key, - test_node_count + "ring unable to locate node for key {test_key} even though {test_node_count} nodes have been added." ); } @@ -783,8 +764,7 @@ mod test { closest_nodes_count ), Err(HashRingError::IllegalArgument(format!( - "cannot get {} nodes for partition; there are only {} nodes in the ring", - closest_nodes_count, test_node_count + "cannot get {closest_nodes_count} nodes for partition; there are only {test_node_count} nodes in the ring" ))), ) } @@ -806,13 +786,9 @@ mod test { closest_nodes_count, ); - assert_eq!( + assert!( closest_nodes.is_ok(), - true, - "ring unable to get closest {} nodes for key {} even though {} nodes have been added.", - closest_nodes_count, - test_key, - test_node_count + "ring unable to get closest {closest_nodes_count} nodes for key {test_key} even though {test_node_count} nodes have been added." ); let closest_nodes = closest_nodes.unwrap(); @@ -820,10 +796,7 @@ mod test { assert_eq!( closest_nodes.len(), closest_nodes_count, - "ring returned closest {} nodes for key {} instead of requested closest {} nodes.", - closest_nodes_count, - test_key, - test_node_count + "ring returned closest {closest_nodes_count} nodes for key {test_key} instead of requested closest {test_node_count} nodes." ); let node_for_partition = ring @@ -835,9 +808,7 @@ mod test { |node| *node == node_for_partition && node_for_partition != closest_nodes[0] ), None, - "ring returned node {} for key {} as closest node even though it is not.", - node_for_partition, - test_key + "ring returned node {node_for_partition} for key {test_key} as closest node even though it is not." ); } }