Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clippy #1

Merged
merged 3 commits into from
Sep 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 75 additions & 104 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/// This pkg provides a consistent hashring with bounded loads. This implementation also adds
/// partitioning logic on top of the original algorithm. For more information about the underlying algorithm,
/// please take a look at https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html.
/// please take a look at <https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html>.
///
/// This pkg is a port of ((and consistent with) the Go pkg https://github.com/buraksezer/consistent
/// This pkg is a port of ((and consistent with) the Go pkg [consistent](https://github.com/buraksezer/consistent)
///
/// # Examples
///
Expand Down Expand Up @@ -71,13 +71,13 @@ impl Display for HashRingError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
HashRingError::IllegalArgument(msg) => {
write!(f, "[HashRingError] Illegal argument: {}", msg)
write!(f, "[HashRingError] Illegal argument: {msg}")
}
HashRingError::InvalidValue(msg) => write!(f, "[HashRingError] Value error: {}", msg),
HashRingError::InvalidValue(msg) => write!(f, "[HashRingError] Value error: {msg}"),
HashRingError::StorageLock(msg) => {
write!(f, "[HashRingError] Storage lock error: {}", msg)
write!(f, "[HashRingError] Storage lock error: {msg}")
}
HashRingError::NotFound(msg) => write!(f, "[HashRingError] Not found: {}", msg),
HashRingError::NotFound(msg) => write!(f, "[HashRingError] Not found: {msg}"),
}
}
}
Expand Down Expand Up @@ -149,6 +149,7 @@ impl<H: BuildHasher> HashRingConfig<H> {
}
}

#[derive(Default)]
struct HashRingStorage {
/// The hash ring.
ring: BTreeMap<u64, Vec<u8>>,
Expand All @@ -160,20 +161,9 @@ struct HashRingStorage {
loads: HashMap<Vec<u8>, f64>,
}

impl Default for HashRingStorage {
fn default() -> Self {
HashRingStorage {
ring: BTreeMap::new(),
members: HashSet::new(),
partitions: HashMap::new(),
loads: HashMap::new(),
}
}
}

impl HashRingStorage {
fn average_load(&self, partition_count: u64, load: f64) -> f64 {
if self.members.len() == 0 {
if self.members.is_empty() {
return 0.0;
}

Expand All @@ -194,7 +184,7 @@ impl HashRingStorage {

/// Adds nodes to both the hashring and the list of members.
fn add_nodes(&mut self, nodes: Vec<Vec<u8>>, vnodes: Vec<(u64, Vec<u8>)>) {
self.members.extend(nodes.iter().map(|node| node.clone()));
self.members.extend(nodes.iter().cloned());
self.ring
.extend(vnodes.iter().map(|(key, value)| (*key, value.clone())));
}
Expand All @@ -208,7 +198,7 @@ impl HashRingStorage {
let removed_vnodes: HashSet<u64> = HashSet::from_iter(vnode_keys);
self.ring.retain(|key, _| !removed_vnodes.contains(key));

if self.members.len() == 0 {
if self.members.is_empty() {
self.partitions.clear();
}
}
Expand All @@ -220,11 +210,7 @@ impl HashRingStorage {

/// Returns the node that owns the partition.
fn get_node_for_partition(&self, partition_id: u64) -> Option<Vec<u8>> {
if let Some(member) = self.partitions.get(&partition_id) {
return Some(member.clone());
} else {
return None;
}
self.partitions.get(&partition_id).cloned()
}

/// Distributes the partitions across the nodes on the hashring.
Expand Down Expand Up @@ -297,36 +283,38 @@ impl HashRingStorage {
.iter()
.position(|(_, member)| member == &partition_owner)
{
let member_hash_keys = members_by_hash.keys().cloned().collect::<Vec<_>>();
let member_hash_keys = members_by_hash.keys().copied().collect::<Vec<_>>();

// This check is kind of silly, but still...
if partition_owner_position >= members_by_hash.len() {
partition_owner_position = 0;
}

while nodes.len() < n {
if let Some(member_hash) = member_hash_keys.get(partition_owner_position) {
match members_by_hash.get(member_hash) {
Some(member) => {
nodes.push(member.clone());
}
None => unreachable!(),
}
} else {
unreachable!()
}
let member_hash = member_hash_keys
.get(partition_owner_position)
.unwrap_or_else(|| {
unreachable!(
"Member hash not found at position {}",
partition_owner_position
)
});

let member = members_by_hash.get(member_hash).unwrap_or_else(|| {
unreachable!("Member not found for hash {}", member_hash)
});

nodes.push(member.clone());

partition_owner_position += 1;
if partition_owner_position >= members_by_hash.len() {
partition_owner_position = 0;
}
partition_owner_position %= members_by_hash.len();
}

return Ok(nodes);
Ok(nodes)
} else {
return Err(HashRingError::InvalidValue(format!(
Err(HashRingError::InvalidValue(format!(
"partition({partition_id}) does not have a corresponding node on the ring. specifically, no corresponding node was found from the members_by_hash argument"
)));
)))
}
}
None => Err(HashRingError::IllegalArgument(format!(
Expand Down Expand Up @@ -363,7 +351,7 @@ impl<H: BuildHasher> HashRing<H> {
pub fn find_partition_for_key<K: Hash>(&self, key: &K) -> Result<u64, HashRingError> {
let mut hasher = self.config.get_hasher();
key.hash(&mut hasher);
Ok(hasher.finish() % self.config.partition_count() as u64)
Ok(hasher.finish() % self.config.partition_count())
}

pub fn list_nodes<N: FromStr>(&self) -> Result<Vec<N>, HashRingError> {
Expand Down Expand Up @@ -394,6 +382,7 @@ impl<H: BuildHasher> HashRing<H> {
Ok(storage.average_load(self.config.partition_count(), self.config.load_factor()))
}

#[must_use]
pub fn get_node_for_partition<N: FromStr>(&self, partition_id: u64) -> Option<N> {
let storage = self
.storage
Expand All @@ -411,11 +400,9 @@ impl<H: BuildHasher> HashRing<H> {
}

pub fn locate_key<K: Hash, N: FromStr>(&self, key: &K) -> Option<N> {
if let Some(partition_id) = self.find_partition_for_key(key).ok() {
self.get_node_for_partition(partition_id)
} else {
None
}
self.find_partition_for_key(key)
.map(|partition_id| self.get_node_for_partition(partition_id))
.unwrap_or(None)
}

pub fn load_distribution<N: FromStr + Hash + Eq>(
Expand All @@ -426,26 +413,28 @@ impl<H: BuildHasher> HashRing<H> {
.lock()
.map_err(|e| HashRingError::StorageLock(e.to_string()))?;

Ok(storage.load_distribution().iter().try_fold(
HashMap::new(),
|mut acc, (node, load)| match String::from_utf8(node.clone()) {
Ok(s) => match s.parse::<N>() {
Ok(n) => {
acc.insert(n, *load);
Ok(acc)
}
Err(_) => Err(HashRingError::InvalidValue(format!(
"could not parse string ({}) into type({})",
s,
std::any::type_name::<N>(),
storage
.load_distribution()
.iter()
.try_fold(
HashMap::new(),
|mut acc, (node, load)| match String::from_utf8(node.clone()) {
Ok(s) => match s.parse::<N>() {
Ok(n) => {
acc.insert(n, *load);
Ok(acc)
}
Err(_) => Err(HashRingError::InvalidValue(format!(
"could not parse string ({}) into type({})",
s,
std::any::type_name::<N>(),
))),
},
Err(e) => Err(HashRingError::InvalidValue(format!(
"could not parse vec<u8> into string; received error: {e}"
))),
},
Err(e) => Err(HashRingError::InvalidValue(format!(
"could not parse vec<u8> into string; received error: {}",
e.to_string()
))),
},
)?)
)
}

pub fn add_nodes<N: Display>(&self, nodes: Vec<N>) -> Result<(), HashRingError> {
Expand All @@ -462,7 +451,7 @@ impl<H: BuildHasher> HashRing<H> {
(0..self.config.replication_factor())
.map(|i| {
let mut hasher = self.config.get_hasher();
write!(&mut uid, "{} (hashring_node_replica_{})", n, i).unwrap();
write!(&mut uid, "{n} (hashring_node_replica_{i})").unwrap();
uid.hash(&mut hasher);
(hasher.finish(), n.to_string().as_bytes().to_vec())
})
Expand Down Expand Up @@ -508,7 +497,7 @@ impl<H: BuildHasher> HashRing<H> {
(0..self.config.replication_factor())
.map(|i| {
let mut hasher = self.config.get_hasher();
write!(&mut uid, "{} (hashring_node_replica_{})", n, i).unwrap();
write!(&mut uid, "{n} (hashring_node_replica_{i})").unwrap();
uid.hash(&mut hasher);
hasher.finish()
})
Expand Down Expand Up @@ -597,12 +586,10 @@ impl<H: BuildHasher> HashRing<H> {

Ok(nodes
.iter()
.filter_map(|n| match String::from_utf8(n.clone()) {
Ok(s) => match s.parse::<N>() {
Ok(n) => Some(n),
Err(_) => None,
},
Err(_) => None,
.filter_map(|n| {
String::from_utf8(n.clone())
.ok()
.and_then(|s| s.parse::<N>().ok())
})
.collect())
}
Expand Down Expand Up @@ -640,7 +627,7 @@ mod test {

fn test_nodes(n: usize) -> Vec<HashRingNode> {
(0..n)
.map(|i| HashRingNode::new(format!("node_{}", i)))
.map(|i| HashRingNode::new(format!("node_{i}")))
.collect()
}

Expand All @@ -662,17 +649,16 @@ mod test {
);

let mut expected_nodes = Vec::<HashRingNode>::new();
for node in nodes.iter() {
for ring_node in ring_nodes.iter() {
for node in &nodes {
for ring_node in &ring_nodes {
if node == ring_node {
expected_nodes.push(node.clone());
}
}
}
assert_eq!(
expected_nodes, nodes,
"ring nodes are invalid after adding nodes. Expected {:?}, got {:?}",
expected_nodes, nodes
"ring nodes are invalid after adding nodes. Expected {expected_nodes:?}, got {nodes:?}"
);
}

Expand Down Expand Up @@ -729,14 +715,13 @@ mod test {
let ring_nodes_count = ring.nodes_count().unwrap();
assert_eq!(
ring_nodes_count, test_node_count,
"ring nodes count is invalid after adding nodes. Expected {}, got {}",
test_node_count, ring_nodes_count
"ring nodes count is invalid after adding nodes. Expected {test_node_count}, got {ring_nodes_count}"
);

let max_load = ring.average_load().unwrap();
let load_distribution = ring.load_distribution::<HashRingNode>().unwrap();
let has_overloaded_nodes = load_distribution.iter().any(|(_, load)| load > &max_load);
assert_eq!(has_overloaded_nodes, false, "ring has overloaded nodes.",);
assert!(!has_overloaded_nodes, "ring has overloaded nodes.",);
}

#[test]
Expand All @@ -748,21 +733,17 @@ mod test {
assert_eq!(
ring.locate_key::<String, HashRingNode>(&test_key),
None,
"ring supposedly has node for key {} even though no nodes have been added.",
test_key
"ring supposedly has node for key {test_key} even though no nodes have been added."
);

let test_node_count: usize = 8;
let nodes = test_nodes(test_node_count);
ring.add_nodes(nodes.clone()).unwrap();

assert_eq!(
assert!(
ring.locate_key::<String, HashRingNode>(&test_key.to_string())
.is_some(),
true,
"ring unable to locate node for key {} even though {} nodes have been added.",
test_key,
test_node_count
"ring unable to locate node for key {test_key} even though {test_node_count} nodes have been added."
);
}

Expand All @@ -783,8 +764,7 @@ mod test {
closest_nodes_count
),
Err(HashRingError::IllegalArgument(format!(
"cannot get {} nodes for partition; there are only {} nodes in the ring",
closest_nodes_count, test_node_count
"cannot get {closest_nodes_count} nodes for partition; there are only {test_node_count} nodes in the ring"
))),
)
}
Expand All @@ -806,24 +786,17 @@ mod test {
closest_nodes_count,
);

assert_eq!(
assert!(
closest_nodes.is_ok(),
true,
"ring unable to get closest {} nodes for key {} even though {} nodes have been added.",
closest_nodes_count,
test_key,
test_node_count
"ring unable to get closest {closest_nodes_count} nodes for key {test_key} even though {test_node_count} nodes have been added."
);

let closest_nodes = closest_nodes.unwrap();

assert_eq!(
closest_nodes.len(),
closest_nodes_count,
"ring returned closest {} nodes for key {} instead of requested closest {} nodes.",
closest_nodes_count,
test_key,
test_node_count
"ring returned closest {closest_nodes_count} nodes for key {test_key} instead of requested closest {test_node_count} nodes."
);

let node_for_partition = ring
Expand All @@ -835,9 +808,7 @@ mod test {
|node| *node == node_for_partition && node_for_partition != closest_nodes[0]
),
None,
"ring returned node {} for key {} as closest node even though it is not.",
node_for_partition,
test_key
"ring returned node {node_for_partition} for key {test_key} as closest node even though it is not."
);
}
}