Skip to content

Commit

Permalink
chore: remove errors in RoutingSnapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolay-komarevskiy committed Jul 17, 2024
1 parent e7b6395 commit a493beb
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,7 @@ where
async fn handle_health_update(&mut self, msg: NodeHealthState) {
let current_snapshot = self.routing_snapshot.load_full();
let mut new_snapshot = (*current_snapshot).clone();
if let Err(err) = new_snapshot.update_node(&msg.node, msg.health.clone()) {
error!("{HEALTH_MANAGER_ACTOR}: failed to update snapshot: {err:?}");
return;
}
new_snapshot.update_node(&msg.node, msg.health.clone());
self.routing_snapshot.store(Arc::new(new_snapshot));
if !self.is_initialized && msg.health.is_healthy() {
self.is_initialized = true;
Expand All @@ -269,7 +266,7 @@ where
let current_snapshot = self.routing_snapshot.load_full();
let mut new_snapshot = (*current_snapshot).clone();
// If the snapshot has changed, store it and restart all node's health checks.
if let Ok(true) = new_snapshot.sync_nodes(&nodes) {
if new_snapshot.sync_nodes(&nodes) {
self.routing_snapshot.store(Arc::new(new_snapshot));
self.stop_all_checks().await;
self.start_checks(nodes.to_vec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
idx.map(|idx| self.weighted_nodes[idx].node.clone())
}

fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result<bool> {
fn sync_nodes(&mut self, nodes: &[Node]) -> bool {
let new_nodes = HashSet::from_iter(nodes.iter().cloned());
// Find nodes removed from topology.
let nodes_removed: Vec<_> = self
Expand All @@ -107,12 +107,13 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
let idx = self.weighted_nodes.iter().position(|x| x.node == node);
idx.map(|idx| self.weighted_nodes.swap_remove(idx));
}
Ok(has_added_nodes || has_removed_nodes)

has_added_nodes || has_removed_nodes
}

fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result<bool> {
fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool {
if !self.existing_nodes.contains(node) {
return Ok(false);
return false;
}

// If latency is None (meaning Node is unhealthy), we assign some big value
Expand All @@ -135,7 +136,8 @@ impl RoutingSnapshot for LatencyRoutingSnapshot {
weight,
})
}
Ok(true)

true
}
}

Expand Down Expand Up @@ -174,9 +176,7 @@ mod tests {
let node = Node::new("api1.com").unwrap();
let health = HealthCheckStatus::new(Some(Duration::from_secs(1)));
// Act
let is_updated = snapshot
.update_node(&node, health)
.expect("node update failed");
let is_updated = snapshot.update_node(&node, health);
// Assert
assert!(!is_updated);
assert!(snapshot.weighted_nodes.is_empty());
Expand All @@ -192,9 +192,7 @@ mod tests {
let health = HealthCheckStatus::new(Some(Duration::from_secs(1)));
snapshot.existing_nodes.insert(node.clone());
// Check first update
let is_updated = snapshot
.update_node(&node, health)
.expect("node update failed");
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
assert!(snapshot.has_nodes());
let weighted_node = snapshot.weighted_nodes.first().unwrap();
Expand All @@ -206,9 +204,7 @@ mod tests {
assert_eq!(snapshot.next().unwrap(), node);
// Check second update
let health = HealthCheckStatus::new(Some(Duration::from_secs(2)));
let is_updated = snapshot
.update_node(&node, health)
.expect("node update failed");
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
let weighted_node = snapshot.weighted_nodes.first().unwrap();
assert_eq!(
Expand All @@ -218,9 +214,7 @@ mod tests {
assert_eq!(weighted_node.weight, 1.0 / 1.5);
// Check third update
let health = HealthCheckStatus::new(Some(Duration::from_secs(3)));
let is_updated = snapshot
.update_node(&node, health)
.expect("node update failed");
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
let weighted_node = snapshot.weighted_nodes.first().unwrap();
assert_eq!(
Expand All @@ -230,9 +224,7 @@ mod tests {
assert_eq!(weighted_node.weight, 0.5);
// Check forth update with none
let health = HealthCheckStatus::new(None);
let is_updated = snapshot
.update_node(&node, health)
.expect("node update failed");
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
let weighted_node = snapshot.weighted_nodes.first().unwrap();
let avg_latency = Duration::from_secs_f64((MAX_LATENCY.as_secs() as f64 + 6.0) / 4.0);
Expand All @@ -249,7 +241,7 @@ mod tests {
let mut snapshot = LatencyRoutingSnapshot::new();
let node_1 = Node::new("api1.com").unwrap();
// Sync with node_1
let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap();
let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]);
assert!(nodes_changed);
assert!(snapshot.weighted_nodes.is_empty());
assert_eq!(
Expand All @@ -263,7 +255,7 @@ mod tests {
weight: 0.0,
});
// Sync with node_1 again
let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap();
let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]);
assert!(!nodes_changed);
assert_eq!(
snapshot.existing_nodes,
Expand All @@ -272,7 +264,7 @@ mod tests {
assert_eq!(snapshot.weighted_nodes[0].node, node_1);
// Sync with node_2
let node_2 = Node::new("api2.com").unwrap();
let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]).unwrap();
let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]);
assert!(nodes_changed);
assert_eq!(
snapshot.existing_nodes,
Expand All @@ -288,9 +280,7 @@ mod tests {
});
// Sync with [node_2, node_3]
let node_3 = Node::new("api3.com").unwrap();
let nodes_changed = snapshot
.sync_nodes(&[node_3.clone(), node_2.clone()])
.unwrap();
let nodes_changed = snapshot.sync_nodes(&[node_3.clone(), node_2.clone()]);
assert!(nodes_changed);
assert_eq!(
snapshot.existing_nodes,
Expand All @@ -304,13 +294,13 @@ mod tests {
weight: 0.0,
});
// Sync with []
let nodes_changed = snapshot.sync_nodes(&[]).unwrap();
let nodes_changed = snapshot.sync_nodes(&[]);
assert!(nodes_changed);
assert!(snapshot.existing_nodes.is_empty());
// Make sure all nodes were removed from the healthy_nodes
assert!(snapshot.weighted_nodes.is_empty());
// Sync with [] again
let nodes_changed = snapshot.sync_nodes(&[]).unwrap();
let nodes_changed = snapshot.sync_nodes(&[]);
assert!(!nodes_changed);
assert!(snapshot.existing_nodes.is_empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot {
.cloned()
}

fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result<bool> {
fn sync_nodes(&mut self, nodes: &[Node]) -> bool {
let new_nodes = HashSet::from_iter(nodes.iter().cloned());
// Find nodes removed from topology.
let nodes_removed: Vec<_> = self
Expand All @@ -67,17 +67,18 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot {
self.existing_nodes.remove(node);
self.healthy_nodes.remove(node);
});
Ok(has_added_nodes || has_removed_nodes)

has_added_nodes || has_removed_nodes
}

fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result<bool> {
fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool {
if !self.existing_nodes.contains(node) {
return Ok(false);
return false;
}
if health.is_healthy() {
Ok(self.healthy_nodes.insert(node.clone()))
self.healthy_nodes.insert(node.clone())
} else {
Ok(self.healthy_nodes.remove(node))
self.healthy_nodes.remove(node)
}
}
}
Expand Down Expand Up @@ -116,17 +117,13 @@ mod tests {
let healthy = HealthCheckStatus::new(Some(Duration::from_secs(1)));
let unhealthy = HealthCheckStatus::new(None);
// Act 1
let is_updated = snapshot
.update_node(&node, healthy)
.expect("node update failed");
let is_updated = snapshot.update_node(&node, healthy);
// Assert
assert!(!is_updated);
assert!(snapshot.existing_nodes.is_empty());
assert!(snapshot.next().is_none());
// Act 2
let is_updated = snapshot
.update_node(&node, unhealthy)
.expect("node update failed");
let is_updated = snapshot.update_node(&node, unhealthy);
// Assert
assert!(!is_updated);
assert!(snapshot.existing_nodes.is_empty());
Expand All @@ -142,9 +139,7 @@ mod tests {
snapshot.existing_nodes.insert(node.clone());
let health = HealthCheckStatus::new(Some(Duration::from_secs(1)));
// Act
let is_updated = snapshot
.update_node(&node, health)
.expect("node update failed");
let is_updated = snapshot.update_node(&node, health);
assert!(is_updated);
assert!(snapshot.has_nodes());
assert_eq!(snapshot.next().unwrap(), node);
Expand All @@ -160,9 +155,7 @@ mod tests {
snapshot.healthy_nodes.insert(node.clone());
let unhealthy = HealthCheckStatus::new(None);
// Act
let is_updated = snapshot
.update_node(&node, unhealthy)
.expect("node update failed");
let is_updated = snapshot.update_node(&node, unhealthy);
assert!(is_updated);
assert!(!snapshot.has_nodes());
assert!(snapshot.next().is_none());
Expand All @@ -174,7 +167,7 @@ mod tests {
let mut snapshot = RoundRobinRoutingSnapshot::new();
let node_1 = Node::new("api1.com").unwrap();
// Sync with node_1
let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap();
let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]);
assert!(nodes_changed);
assert!(snapshot.healthy_nodes.is_empty());
assert_eq!(
Expand All @@ -184,7 +177,7 @@ mod tests {
// Add node_1 to healthy_nodes manually
snapshot.healthy_nodes.insert(node_1.clone());
// Sync with node_1 again
let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap();
let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]);
assert!(!nodes_changed);
assert_eq!(
snapshot.existing_nodes,
Expand All @@ -193,7 +186,7 @@ mod tests {
assert_eq!(snapshot.healthy_nodes, HashSet::from_iter(vec![node_1]));
// Sync with node_2
let node_2 = Node::new("api2.com").unwrap();
let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]).unwrap();
let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]);
assert!(nodes_changed);
assert_eq!(
snapshot.existing_nodes,
Expand All @@ -205,9 +198,7 @@ mod tests {
snapshot.healthy_nodes.insert(node_2.clone());
// Sync with [node_2, node_3]
let node_3 = Node::new("api3.com").unwrap();
let nodes_changed = snapshot
.sync_nodes(&[node_3.clone(), node_2.clone()])
.unwrap();
let nodes_changed = snapshot.sync_nodes(&[node_3.clone(), node_2.clone()]);
assert!(nodes_changed);
assert_eq!(
snapshot.existing_nodes,
Expand All @@ -216,13 +207,13 @@ mod tests {
assert_eq!(snapshot.healthy_nodes, HashSet::from_iter(vec![node_2]));
snapshot.healthy_nodes.insert(node_3);
// Sync with []
let nodes_changed = snapshot.sync_nodes(&[]).unwrap();
let nodes_changed = snapshot.sync_nodes(&[]);
assert!(nodes_changed);
assert!(snapshot.existing_nodes.is_empty());
// Make sure all nodes were removed from the healthy_nodes
assert!(snapshot.healthy_nodes.is_empty());
// Sync with [] again
let nodes_changed = snapshot.sync_nodes(&[]).unwrap();
let nodes_changed = snapshot.sync_nodes(&[]);
assert!(!nodes_changed);
assert!(snapshot.existing_nodes.is_empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use std::fmt::Debug;

use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node};

///
/// A trait for interacting with the snapshot of nodes (routing table).
pub trait RoutingSnapshot: Send + Sync + Clone + Debug {
///
/// Returns `true` if the snapshot has nodes.
fn has_nodes(&self) -> bool;
///
/// Get the next node in the snapshot.
fn next(&self) -> Option<Node>;
///
fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result<bool>;
///
fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result<bool>;
/// Syncs the nodes in the snapshot with the provided list of nodes, returning `true` if the snapshot was updated.
fn sync_nodes(&mut self, nodes: &[Node]) -> bool;
/// Updates the health status of a specific node, returning `true` if the node was found and updated.
fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool;
}

0 comments on commit a493beb

Please sign in to comment.