From a493bebb969f4196acaa21612a69fdb40a18bb15 Mon Sep 17 00:00:00 2001 From: Nikolay Komarevskiy Date: Wed, 17 Jul 2024 16:47:53 +0200 Subject: [PATCH] chore: remove errors in RoutingSnapshot --- .../dynamic_routing/health_check.rs | 7 +-- .../snapshot/latency_based_routing.rs | 46 ++++++++----------- .../snapshot/round_robin_routing.rs | 43 +++++++---------- .../snapshot/routing_snapshot.rs | 14 +++--- 4 files changed, 44 insertions(+), 66 deletions(-) diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs b/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs index befbd99e..1c0d0b39 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs @@ -245,10 +245,7 @@ where async fn handle_health_update(&mut self, msg: NodeHealthState) { let current_snapshot = self.routing_snapshot.load_full(); let mut new_snapshot = (*current_snapshot).clone(); - if let Err(err) = new_snapshot.update_node(&msg.node, msg.health.clone()) { - error!("{HEALTH_MANAGER_ACTOR}: failed to update snapshot: {err:?}"); - return; - } + new_snapshot.update_node(&msg.node, msg.health.clone()); self.routing_snapshot.store(Arc::new(new_snapshot)); if !self.is_initialized && msg.health.is_healthy() { self.is_initialized = true; @@ -269,7 +266,7 @@ where let current_snapshot = self.routing_snapshot.load_full(); let mut new_snapshot = (*current_snapshot).clone(); // If the snapshot has changed, store it and restart all node's health checks. - if let Ok(true) = new_snapshot.sync_nodes(&nodes) { + if new_snapshot.sync_nodes(&nodes) { self.routing_snapshot.store(Arc::new(new_snapshot)); self.stop_all_checks().await; self.start_checks(nodes.to_vec()); diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs index c5c33cfd..1ae10136 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs @@ -84,7 +84,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { idx.map(|idx| self.weighted_nodes[idx].node.clone()) } - fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result { + fn sync_nodes(&mut self, nodes: &[Node]) -> bool { let new_nodes = HashSet::from_iter(nodes.iter().cloned()); // Find nodes removed from topology. let nodes_removed: Vec<_> = self @@ -107,12 +107,13 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { let idx = self.weighted_nodes.iter().position(|x| x.node == node); idx.map(|idx| self.weighted_nodes.swap_remove(idx)); } - Ok(has_added_nodes || has_removed_nodes) + + has_added_nodes || has_removed_nodes } - fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result { + fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool { if !self.existing_nodes.contains(node) { - return Ok(false); + return false; } // If latency is None (meaning Node is unhealthy), we assign some big value @@ -135,7 +136,8 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { weight, }) } - Ok(true) + + true } } @@ -174,9 +176,7 @@ mod tests { let node = Node::new("api1.com").unwrap(); let health = HealthCheckStatus::new(Some(Duration::from_secs(1))); // Act - let is_updated = snapshot - .update_node(&node, health) - .expect("node update failed"); + let is_updated = snapshot.update_node(&node, health); // Assert assert!(!is_updated); assert!(snapshot.weighted_nodes.is_empty()); @@ -192,9 +192,7 @@ mod tests { let health = HealthCheckStatus::new(Some(Duration::from_secs(1))); snapshot.existing_nodes.insert(node.clone()); // Check first update - let is_updated = snapshot - .update_node(&node, health) - .expect("node update failed"); + let is_updated = snapshot.update_node(&node, health); assert!(is_updated); assert!(snapshot.has_nodes()); let weighted_node = snapshot.weighted_nodes.first().unwrap(); @@ -206,9 +204,7 @@ mod tests { assert_eq!(snapshot.next().unwrap(), node); // Check second update let health = HealthCheckStatus::new(Some(Duration::from_secs(2))); - let is_updated = snapshot - .update_node(&node, health) - .expect("node update failed"); + let is_updated = snapshot.update_node(&node, health); assert!(is_updated); let weighted_node = snapshot.weighted_nodes.first().unwrap(); assert_eq!( @@ -218,9 +214,7 @@ mod tests { assert_eq!(weighted_node.weight, 1.0 / 1.5); // Check third update let health = HealthCheckStatus::new(Some(Duration::from_secs(3))); - let is_updated = snapshot - .update_node(&node, health) - .expect("node update failed"); + let is_updated = snapshot.update_node(&node, health); assert!(is_updated); let weighted_node = snapshot.weighted_nodes.first().unwrap(); assert_eq!( @@ -230,9 +224,7 @@ mod tests { assert_eq!(weighted_node.weight, 0.5); // Check forth update with none let health = HealthCheckStatus::new(None); - let is_updated = snapshot - .update_node(&node, health) - .expect("node update failed"); + let is_updated = snapshot.update_node(&node, health); assert!(is_updated); let weighted_node = snapshot.weighted_nodes.first().unwrap(); let avg_latency = Duration::from_secs_f64((MAX_LATENCY.as_secs() as f64 + 6.0) / 4.0); @@ -249,7 +241,7 @@ mod tests { let mut snapshot = LatencyRoutingSnapshot::new(); let node_1 = Node::new("api1.com").unwrap(); // Sync with node_1 - let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]); assert!(nodes_changed); assert!(snapshot.weighted_nodes.is_empty()); assert_eq!( @@ -263,7 +255,7 @@ mod tests { weight: 0.0, }); // Sync with node_1 again - let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]); assert!(!nodes_changed); assert_eq!( snapshot.existing_nodes, @@ -272,7 +264,7 @@ mod tests { assert_eq!(snapshot.weighted_nodes[0].node, node_1); // Sync with node_2 let node_2 = Node::new("api2.com").unwrap(); - let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]).unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]); assert!(nodes_changed); assert_eq!( snapshot.existing_nodes, @@ -288,9 +280,7 @@ mod tests { }); // Sync with [node_2, node_3] let node_3 = Node::new("api3.com").unwrap(); - let nodes_changed = snapshot - .sync_nodes(&[node_3.clone(), node_2.clone()]) - .unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_3.clone(), node_2.clone()]); assert!(nodes_changed); assert_eq!( snapshot.existing_nodes, @@ -304,13 +294,13 @@ mod tests { weight: 0.0, }); // Sync with [] - let nodes_changed = snapshot.sync_nodes(&[]).unwrap(); + let nodes_changed = snapshot.sync_nodes(&[]); assert!(nodes_changed); assert!(snapshot.existing_nodes.is_empty()); // Make sure all nodes were removed from the healthy_nodes assert!(snapshot.weighted_nodes.is_empty()); // Sync with [] again - let nodes_changed = snapshot.sync_nodes(&[]).unwrap(); + let nodes_changed = snapshot.sync_nodes(&[]); assert!(!nodes_changed); assert!(snapshot.existing_nodes.is_empty()); } diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs index e468d262..149e49d2 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs @@ -45,7 +45,7 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot { .cloned() } - fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result { + fn sync_nodes(&mut self, nodes: &[Node]) -> bool { let new_nodes = HashSet::from_iter(nodes.iter().cloned()); // Find nodes removed from topology. let nodes_removed: Vec<_> = self @@ -67,17 +67,18 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot { self.existing_nodes.remove(node); self.healthy_nodes.remove(node); }); - Ok(has_added_nodes || has_removed_nodes) + + has_added_nodes || has_removed_nodes } - fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result { + fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool { if !self.existing_nodes.contains(node) { - return Ok(false); + return false; } if health.is_healthy() { - Ok(self.healthy_nodes.insert(node.clone())) + self.healthy_nodes.insert(node.clone()) } else { - Ok(self.healthy_nodes.remove(node)) + self.healthy_nodes.remove(node) } } } @@ -116,17 +117,13 @@ mod tests { let healthy = HealthCheckStatus::new(Some(Duration::from_secs(1))); let unhealthy = HealthCheckStatus::new(None); // Act 1 - let is_updated = snapshot - .update_node(&node, healthy) - .expect("node update failed"); + let is_updated = snapshot.update_node(&node, healthy); // Assert assert!(!is_updated); assert!(snapshot.existing_nodes.is_empty()); assert!(snapshot.next().is_none()); // Act 2 - let is_updated = snapshot - .update_node(&node, unhealthy) - .expect("node update failed"); + let is_updated = snapshot.update_node(&node, unhealthy); // Assert assert!(!is_updated); assert!(snapshot.existing_nodes.is_empty()); @@ -142,9 +139,7 @@ mod tests { snapshot.existing_nodes.insert(node.clone()); let health = HealthCheckStatus::new(Some(Duration::from_secs(1))); // Act - let is_updated = snapshot - .update_node(&node, health) - .expect("node update failed"); + let is_updated = snapshot.update_node(&node, health); assert!(is_updated); assert!(snapshot.has_nodes()); assert_eq!(snapshot.next().unwrap(), node); @@ -160,9 +155,7 @@ mod tests { snapshot.healthy_nodes.insert(node.clone()); let unhealthy = HealthCheckStatus::new(None); // Act - let is_updated = snapshot - .update_node(&node, unhealthy) - .expect("node update failed"); + let is_updated = snapshot.update_node(&node, unhealthy); assert!(is_updated); assert!(!snapshot.has_nodes()); assert!(snapshot.next().is_none()); @@ -174,7 +167,7 @@ mod tests { let mut snapshot = RoundRobinRoutingSnapshot::new(); let node_1 = Node::new("api1.com").unwrap(); // Sync with node_1 - let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]); assert!(nodes_changed); assert!(snapshot.healthy_nodes.is_empty()); assert_eq!( @@ -184,7 +177,7 @@ mod tests { // Add node_1 to healthy_nodes manually snapshot.healthy_nodes.insert(node_1.clone()); // Sync with node_1 again - let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]); assert!(!nodes_changed); assert_eq!( snapshot.existing_nodes, @@ -193,7 +186,7 @@ mod tests { assert_eq!(snapshot.healthy_nodes, HashSet::from_iter(vec![node_1])); // Sync with node_2 let node_2 = Node::new("api2.com").unwrap(); - let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]).unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]); assert!(nodes_changed); assert_eq!( snapshot.existing_nodes, @@ -205,9 +198,7 @@ mod tests { snapshot.healthy_nodes.insert(node_2.clone()); // Sync with [node_2, node_3] let node_3 = Node::new("api3.com").unwrap(); - let nodes_changed = snapshot - .sync_nodes(&[node_3.clone(), node_2.clone()]) - .unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_3.clone(), node_2.clone()]); assert!(nodes_changed); assert_eq!( snapshot.existing_nodes, @@ -216,13 +207,13 @@ mod tests { assert_eq!(snapshot.healthy_nodes, HashSet::from_iter(vec![node_2])); snapshot.healthy_nodes.insert(node_3); // Sync with [] - let nodes_changed = snapshot.sync_nodes(&[]).unwrap(); + let nodes_changed = snapshot.sync_nodes(&[]); assert!(nodes_changed); assert!(snapshot.existing_nodes.is_empty()); // Make sure all nodes were removed from the healthy_nodes assert!(snapshot.healthy_nodes.is_empty()); // Sync with [] again - let nodes_changed = snapshot.sync_nodes(&[]).unwrap(); + let nodes_changed = snapshot.sync_nodes(&[]); assert!(!nodes_changed); assert!(snapshot.existing_nodes.is_empty()); } diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs index ef88b8df..155b8eac 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs @@ -2,14 +2,14 @@ use std::fmt::Debug; use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node}; -/// +/// A trait for interacting with the snapshot of nodes (routing table). pub trait RoutingSnapshot: Send + Sync + Clone + Debug { - /// + /// Returns `true` if the snapshot has nodes. fn has_nodes(&self) -> bool; - /// + /// Get the next node in the snapshot. fn next(&self) -> Option; - /// - fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result; - /// - fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result; + /// Syncs the nodes in the snapshot with the provided list of nodes, returning `true` if the snapshot was updated. + fn sync_nodes(&mut self, nodes: &[Node]) -> bool; + /// Updates the health status of a specific node, returning `true` if the node was found and updated. + fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool; }