Skip to content

Commit

Permalink
Merge pull request #1 from chris-ha458/clippy
Browse files Browse the repository at this point in the history
Clippy
  • Loading branch information
ahmedtadde committed Sep 1, 2023
2 parents b47c23f + 720a73d commit e445ebb
Showing 1 changed file with 75 additions and 104 deletions.
179 changes: 75 additions & 104 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/// This pkg provides a consistent hashring with bounded loads. This implementation also adds
/// partitioning logic on top of the original algorithm. For more information about the underlying algorithm,
/// please take a look at https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html.
/// please take a look at <https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html>.
///
/// This pkg is a port of ((and consistent with) the Go pkg https://github.com/buraksezer/consistent
/// This pkg is a port of ((and consistent with) the Go pkg [consistent](https://github.com/buraksezer/consistent)
///
/// # Examples
///
Expand Down Expand Up @@ -71,13 +71,13 @@ impl Display for HashRingError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
HashRingError::IllegalArgument(msg) => {
write!(f, "[HashRingError] Illegal argument: {}", msg)
write!(f, "[HashRingError] Illegal argument: {msg}")
}
HashRingError::InvalidValue(msg) => write!(f, "[HashRingError] Value error: {}", msg),
HashRingError::InvalidValue(msg) => write!(f, "[HashRingError] Value error: {msg}"),
HashRingError::StorageLock(msg) => {
write!(f, "[HashRingError] Storage lock error: {}", msg)
write!(f, "[HashRingError] Storage lock error: {msg}")
}
HashRingError::NotFound(msg) => write!(f, "[HashRingError] Not found: {}", msg),
HashRingError::NotFound(msg) => write!(f, "[HashRingError] Not found: {msg}"),
}
}
}
Expand Down Expand Up @@ -149,6 +149,7 @@ impl<H: BuildHasher> HashRingConfig<H> {
}
}

#[derive(Default)]
struct HashRingStorage {
/// The hash ring.
ring: BTreeMap<u64, Vec<u8>>,
Expand All @@ -160,20 +161,9 @@ struct HashRingStorage {
loads: HashMap<Vec<u8>, f64>,
}

impl Default for HashRingStorage {
fn default() -> Self {
HashRingStorage {
ring: BTreeMap::new(),
members: HashSet::new(),
partitions: HashMap::new(),
loads: HashMap::new(),
}
}
}

impl HashRingStorage {
fn average_load(&self, partition_count: u64, load: f64) -> f64 {
if self.members.len() == 0 {
if self.members.is_empty() {
return 0.0;
}

Expand All @@ -194,7 +184,7 @@ impl HashRingStorage {

/// Adds nodes to both the hashring and the list of members.
fn add_nodes(&mut self, nodes: Vec<Vec<u8>>, vnodes: Vec<(u64, Vec<u8>)>) {
self.members.extend(nodes.iter().map(|node| node.clone()));
self.members.extend(nodes.iter().cloned());
self.ring
.extend(vnodes.iter().map(|(key, value)| (*key, value.clone())));
}
Expand All @@ -208,7 +198,7 @@ impl HashRingStorage {
let removed_vnodes: HashSet<u64> = HashSet::from_iter(vnode_keys);
self.ring.retain(|key, _| !removed_vnodes.contains(key));

if self.members.len() == 0 {
if self.members.is_empty() {
self.partitions.clear();
}
}
Expand All @@ -220,11 +210,7 @@ impl HashRingStorage {

/// Returns the node that owns the partition.
fn get_node_for_partition(&self, partition_id: u64) -> Option<Vec<u8>> {
if let Some(member) = self.partitions.get(&partition_id) {
return Some(member.clone());
} else {
return None;
}
self.partitions.get(&partition_id).cloned()
}

/// Distributes the partitions across the nodes on the hashring.
Expand Down Expand Up @@ -297,36 +283,38 @@ impl HashRingStorage {
.iter()
.position(|(_, member)| member == &partition_owner)
{
let member_hash_keys = members_by_hash.keys().cloned().collect::<Vec<_>>();
let member_hash_keys = members_by_hash.keys().copied().collect::<Vec<_>>();

// This check is kind of silly, but still...
if partition_owner_position >= members_by_hash.len() {
partition_owner_position = 0;
}

while nodes.len() < n {
if let Some(member_hash) = member_hash_keys.get(partition_owner_position) {
match members_by_hash.get(member_hash) {
Some(member) => {
nodes.push(member.clone());
}
None => unreachable!(),
}
} else {
unreachable!()
}
let member_hash = member_hash_keys
.get(partition_owner_position)
.unwrap_or_else(|| {
unreachable!(
"Member hash not found at position {}",
partition_owner_position
)
});

let member = members_by_hash.get(member_hash).unwrap_or_else(|| {
unreachable!("Member not found for hash {}", member_hash)
});

nodes.push(member.clone());

partition_owner_position += 1;
if partition_owner_position >= members_by_hash.len() {
partition_owner_position = 0;
}
partition_owner_position %= members_by_hash.len();
}

return Ok(nodes);
Ok(nodes)
} else {
return Err(HashRingError::InvalidValue(format!(
Err(HashRingError::InvalidValue(format!(
"partition({partition_id}) does not have a corresponding node on the ring. specifically, no corresponding node was found from the members_by_hash argument"
)));
)))
}
}
None => Err(HashRingError::IllegalArgument(format!(
Expand Down Expand Up @@ -363,7 +351,7 @@ impl<H: BuildHasher> HashRing<H> {
pub fn find_partition_for_key<K: Hash>(&self, key: &K) -> Result<u64, HashRingError> {
let mut hasher = self.config.get_hasher();
key.hash(&mut hasher);
Ok(hasher.finish() % self.config.partition_count() as u64)
Ok(hasher.finish() % self.config.partition_count())
}

pub fn list_nodes<N: FromStr>(&self) -> Result<Vec<N>, HashRingError> {
Expand Down Expand Up @@ -394,6 +382,7 @@ impl<H: BuildHasher> HashRing<H> {
Ok(storage.average_load(self.config.partition_count(), self.config.load_factor()))
}

#[must_use]
pub fn get_node_for_partition<N: FromStr>(&self, partition_id: u64) -> Option<N> {
let storage = self
.storage
Expand All @@ -411,11 +400,9 @@ impl<H: BuildHasher> HashRing<H> {
}

pub fn locate_key<K: Hash, N: FromStr>(&self, key: &K) -> Option<N> {
if let Some(partition_id) = self.find_partition_for_key(key).ok() {
self.get_node_for_partition(partition_id)
} else {
None
}
self.find_partition_for_key(key)
.map(|partition_id| self.get_node_for_partition(partition_id))
.unwrap_or(None)
}

pub fn load_distribution<N: FromStr + Hash + Eq>(
Expand All @@ -426,26 +413,28 @@ impl<H: BuildHasher> HashRing<H> {
.lock()
.map_err(|e| HashRingError::StorageLock(e.to_string()))?;

Ok(storage.load_distribution().iter().try_fold(
HashMap::new(),
|mut acc, (node, load)| match String::from_utf8(node.clone()) {
Ok(s) => match s.parse::<N>() {
Ok(n) => {
acc.insert(n, *load);
Ok(acc)
}
Err(_) => Err(HashRingError::InvalidValue(format!(
"could not parse string ({}) into type({})",
s,
std::any::type_name::<N>(),
storage
.load_distribution()
.iter()
.try_fold(
HashMap::new(),
|mut acc, (node, load)| match String::from_utf8(node.clone()) {
Ok(s) => match s.parse::<N>() {
Ok(n) => {
acc.insert(n, *load);
Ok(acc)
}
Err(_) => Err(HashRingError::InvalidValue(format!(
"could not parse string ({}) into type({})",
s,
std::any::type_name::<N>(),
))),
},
Err(e) => Err(HashRingError::InvalidValue(format!(
"could not parse vec<u8> into string; received error: {e}"
))),
},
Err(e) => Err(HashRingError::InvalidValue(format!(
"could not parse vec<u8> into string; received error: {}",
e.to_string()
))),
},
)?)
)
}

pub fn add_nodes<N: Display>(&self, nodes: Vec<N>) -> Result<(), HashRingError> {
Expand All @@ -462,7 +451,7 @@ impl<H: BuildHasher> HashRing<H> {
(0..self.config.replication_factor())
.map(|i| {
let mut hasher = self.config.get_hasher();
write!(&mut uid, "{} (hashring_node_replica_{})", n, i).unwrap();
write!(&mut uid, "{n} (hashring_node_replica_{i})").unwrap();
uid.hash(&mut hasher);
(hasher.finish(), n.to_string().as_bytes().to_vec())
})
Expand Down Expand Up @@ -508,7 +497,7 @@ impl<H: BuildHasher> HashRing<H> {
(0..self.config.replication_factor())
.map(|i| {
let mut hasher = self.config.get_hasher();
write!(&mut uid, "{} (hashring_node_replica_{})", n, i).unwrap();
write!(&mut uid, "{n} (hashring_node_replica_{i})").unwrap();
uid.hash(&mut hasher);
hasher.finish()
})
Expand Down Expand Up @@ -597,12 +586,10 @@ impl<H: BuildHasher> HashRing<H> {

Ok(nodes
.iter()
.filter_map(|n| match String::from_utf8(n.clone()) {
Ok(s) => match s.parse::<N>() {
Ok(n) => Some(n),
Err(_) => None,
},
Err(_) => None,
.filter_map(|n| {
String::from_utf8(n.clone())
.ok()
.and_then(|s| s.parse::<N>().ok())
})
.collect())
}
Expand Down Expand Up @@ -640,7 +627,7 @@ mod test {

fn test_nodes(n: usize) -> Vec<HashRingNode> {
(0..n)
.map(|i| HashRingNode::new(format!("node_{}", i)))
.map(|i| HashRingNode::new(format!("node_{i}")))
.collect()
}

Expand All @@ -662,17 +649,16 @@ mod test {
);

let mut expected_nodes = Vec::<HashRingNode>::new();
for node in nodes.iter() {
for ring_node in ring_nodes.iter() {
for node in &nodes {
for ring_node in &ring_nodes {
if node == ring_node {
expected_nodes.push(node.clone());
}
}
}
assert_eq!(
expected_nodes, nodes,
"ring nodes are invalid after adding nodes. Expected {:?}, got {:?}",
expected_nodes, nodes
"ring nodes are invalid after adding nodes. Expected {expected_nodes:?}, got {nodes:?}"
);
}

Expand Down Expand Up @@ -729,14 +715,13 @@ mod test {
let ring_nodes_count = ring.nodes_count().unwrap();
assert_eq!(
ring_nodes_count, test_node_count,
"ring nodes count is invalid after adding nodes. Expected {}, got {}",
test_node_count, ring_nodes_count
"ring nodes count is invalid after adding nodes. Expected {test_node_count}, got {ring_nodes_count}"
);

let max_load = ring.average_load().unwrap();
let load_distribution = ring.load_distribution::<HashRingNode>().unwrap();
let has_overloaded_nodes = load_distribution.iter().any(|(_, load)| load > &max_load);
assert_eq!(has_overloaded_nodes, false, "ring has overloaded nodes.",);
assert!(!has_overloaded_nodes, "ring has overloaded nodes.",);
}

#[test]
Expand All @@ -748,21 +733,17 @@ mod test {
assert_eq!(
ring.locate_key::<String, HashRingNode>(&test_key),
None,
"ring supposedly has node for key {} even though no nodes have been added.",
test_key
"ring supposedly has node for key {test_key} even though no nodes have been added."
);

let test_node_count: usize = 8;
let nodes = test_nodes(test_node_count);
ring.add_nodes(nodes.clone()).unwrap();

assert_eq!(
assert!(
ring.locate_key::<String, HashRingNode>(&test_key.to_string())
.is_some(),
true,
"ring unable to locate node for key {} even though {} nodes have been added.",
test_key,
test_node_count
"ring unable to locate node for key {test_key} even though {test_node_count} nodes have been added."
);
}

Expand All @@ -783,8 +764,7 @@ mod test {
closest_nodes_count
),
Err(HashRingError::IllegalArgument(format!(
"cannot get {} nodes for partition; there are only {} nodes in the ring",
closest_nodes_count, test_node_count
"cannot get {closest_nodes_count} nodes for partition; there are only {test_node_count} nodes in the ring"
))),
)
}
Expand All @@ -806,24 +786,17 @@ mod test {
closest_nodes_count,
);

assert_eq!(
assert!(
closest_nodes.is_ok(),
true,
"ring unable to get closest {} nodes for key {} even though {} nodes have been added.",
closest_nodes_count,
test_key,
test_node_count
"ring unable to get closest {closest_nodes_count} nodes for key {test_key} even though {test_node_count} nodes have been added."
);

let closest_nodes = closest_nodes.unwrap();

assert_eq!(
closest_nodes.len(),
closest_nodes_count,
"ring returned closest {} nodes for key {} instead of requested closest {} nodes.",
closest_nodes_count,
test_key,
test_node_count
"ring returned closest {closest_nodes_count} nodes for key {test_key} instead of requested closest {test_node_count} nodes."
);

let node_for_partition = ring
Expand All @@ -835,9 +808,7 @@ mod test {
|node| *node == node_for_partition && node_for_partition != closest_nodes[0]
),
None,
"ring returned node {} for key {} as closest node even though it is not.",
node_for_partition,
test_key
"ring returned node {node_for_partition} for key {test_key} as closest node even though it is not."
);
}
}

0 comments on commit e445ebb

Please sign in to comment.