From 39567c94731b5d3a3e476e682020c4a720ba3042 Mon Sep 17 00:00:00 2001 From: Nikolay Komarevskiy Date: Fri, 28 Jun 2024 11:54:36 +0200 Subject: [PATCH] docs: add --- .../dynamic_routing/dynamic_route_provider.rs | 50 +++++++----- .../dynamic_routing/health_check.rs | 76 ++++++++++++------- .../dynamic_routing/messages.rs | 10 +-- .../http_transport/dynamic_routing/mod.rs | 13 ++-- .../http_transport/dynamic_routing/node.rs | 10 +-- .../dynamic_routing/nodes_fetch.rs | 36 +++++---- .../snapshot/latency_based_routing.rs | 36 +++++---- .../dynamic_routing/snapshot/mod.rs | 3 - .../snapshot/round_robin_routing.rs | 22 +++--- .../dynamic_routing/test_utils.rs | 10 +-- .../dynamic_routing/type_aliases.rs | 17 +++-- 11 files changed, 158 insertions(+), 125 deletions(-) diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs b/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs index acb3ae9bc..eff84d3b9 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs @@ -25,7 +25,7 @@ use crate::{ node::Node, nodes_fetch::{Fetch, NodesFetchActor, NodesFetcher}, snapshot::routing_snapshot::RoutingSnapshot, - type_aliases::GlobalShared, + type_aliases::AtomicSwap, }, route_provider::RouteProvider, }, @@ -46,17 +46,28 @@ const HEALTH_CHECK_PERIOD: Duration = Duration::from_secs(1); const DYNAMIC_ROUTE_PROVIDER: &str = "DynamicRouteProvider"; -/// +/// A dynamic route provider. +/// It spawns the discovery service (`NodesFetchActor`) for fetching the latest nodes topology. +/// It also spawns the `HealthManagerActor`, which orchestrates the health check tasks for each node and updates routing snapshot. #[derive(Debug)] pub struct DynamicRouteProvider { + /// Fetcher for fetching the latest nodes topology. fetcher: Arc, + /// Periodicity of fetching the latest nodes topology. fetch_period: Duration, + /// Interval for retrying fetching the nodes in case of error. fetch_retry_interval: Duration, + /// Health checker for checking the health of the nodes. checker: Arc, + /// Periodicity of checking the health of the nodes. check_period: Duration, - snapshot: GlobalShared, + /// Snapshot of the routing nodes. + routing_snapshot: AtomicSwap, + /// Task tracker for managing the spawned tasks. tracker: TaskTracker, + /// Initial seed nodes, which are used for the initial fetching of the nodes. seeds: Vec, + /// Cancellation token for stopping the spawned tasks. token: CancellationToken, } @@ -65,7 +76,7 @@ where S: RoutingSnapshot + 'static, { fn route(&self) -> Result { - let snapshot = self.snapshot.load(); + let snapshot = self.routing_snapshot.load(); let node = snapshot.next().ok_or_else(|| { AgentError::RouteProviderError("No healthy API nodes found.".to_string()) })?; @@ -77,7 +88,7 @@ impl DynamicRouteProvider where S: RoutingSnapshot + 'static, { - /// + /// Creates a new instance of `DynamicRouteProvider`. pub fn new(snapshot: S, seeds: Vec, http_client: Client) -> Self { let fetcher = Arc::new(NodesFetcher::new( http_client.clone(), @@ -91,31 +102,31 @@ where checker, check_period: HEALTH_CHECK_PERIOD, seeds, - snapshot: Arc::new(ArcSwap::from_pointee(snapshot)), + routing_snapshot: Arc::new(ArcSwap::from_pointee(snapshot)), tracker: TaskTracker::new(), token: CancellationToken::new(), } } - /// + /// Sets the fetcher for fetching the latest nodes topology. pub fn with_fetcher(mut self, fetcher: Arc) -> Self { self.fetcher = fetcher; self } - /// + /// Sets the periodicity of fetching the latest nodes topology. pub fn with_fetch_period(mut self, period: Duration) -> Self { self.fetch_period = period; self } - /// + /// Sets the interval for retrying fetching the nodes in case of error. pub fn with_checker(mut self, checker: Arc) -> Self { self.checker = checker; self } - /// + /// Sets the periodicity of checking the health of the nodes. pub fn with_check_period(mut self, period: Duration) -> Self { self.check_period = period; self @@ -133,14 +144,14 @@ where // Communication channel between NodesFetchActor and HealthManagerActor. let (fetch_sender, fetch_receiver) = watch::channel(None); - // Communication channel with HealthManagerActor to receive info about healthy seeds. + // Communication channel with HealthManagerActor to receive info about healthy seed nodes (used only once). let (init_sender, mut init_receiver) = mpsc::channel(1); // Start the receiving part first. let health_manager_actor = HealthManagerActor::new( Arc::clone(&self.checker), self.check_period, - Arc::clone(&self.snapshot), + Arc::clone(&self.routing_snapshot), fetch_receiver, init_sender, self.token.clone(), @@ -156,7 +167,7 @@ where error!("{DYNAMIC_ROUTE_PROVIDER}: failed to send results to HealthManager: {err:?}"); } - // Try await healthy seeds. + // Try await for healthy seeds. let found_healthy_seeds = match timeout(TIMEOUT_AWAIT_HEALTHY_SEED, init_receiver.recv()).await { Ok(_) => { @@ -174,6 +185,7 @@ where false } }; + // We can close the channel now. init_receiver.close(); let fetch_actor = NodesFetchActor::new( @@ -181,7 +193,7 @@ where self.fetch_period, self.fetch_retry_interval, fetch_sender, - Arc::clone(&self.snapshot), + Arc::clone(&self.routing_snapshot), self.token.clone(), ); self.tracker.spawn(async move { fetch_actor.run().await }); @@ -189,9 +201,9 @@ where "{DYNAMIC_ROUTE_PROVIDER}: NodesFetchActor and HealthManagerActor started successfully" ); - (found_healthy_seeds) - .then_some(()) - .ok_or(anyhow!("No healthy seeds found")) + (found_healthy_seeds).then_some(()).ok_or(anyhow!( + "No healthy seeds found, they may become healthy later ..." + )) } /// Kill all running tasks. @@ -364,7 +376,7 @@ mod tests { .await .unwrap_err() .to_string() - .contains("No healthy seeds found")); + .contains("No healthy seeds found, they may become healthy later ...")); // Test 1: calls to route() return an error, as no healthy seeds exist. for _ in 0..4 { @@ -461,7 +473,7 @@ mod tests { .await .unwrap_err() .to_string() - .contains("No healthy seeds found")); + .contains("No healthy seeds found, they may become healthy later ...")); // Test: calls to route() return an error, as no healthy seeds exist. for _ in 0..4 { diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs b/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs index 9ce88b99f..befbd99ec 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs @@ -16,48 +16,50 @@ use crate::agent::http_transport::dynamic_routing::{ messages::{FetchedNodes, NodeHealthState}, node::Node, snapshot::routing_snapshot::RoutingSnapshot, - type_aliases::{GlobalShared, ReceiverMpsc, ReceiverWatch, SenderMpsc}, + type_aliases::{AtomicSwap, ReceiverMpsc, ReceiverWatch, SenderMpsc}, }; const CHANNEL_BUFFER: usize = 128; -/// +/// A trait representing a health check of the node. #[async_trait] pub trait HealthCheck: Send + Sync + Debug { - /// + /// Checks the health of the node. async fn check(&self, node: &Node) -> anyhow::Result; } -/// +/// A struct representing the health check status of the node. #[derive(Clone, PartialEq, Debug, Default)] pub struct HealthCheckStatus { - /// - pub latency: Option, + latency: Option, } -/// impl HealthCheckStatus { - /// + /// Creates a new `HealthCheckStatus` instance. pub fn new(latency: Option) -> Self { Self { latency } } - /// + /// Checks if the node is healthy. pub fn is_healthy(&self) -> bool { self.latency.is_some() } + + /// Get the latency of the health check. + pub fn latency(&self) -> Option { + self.latency + } } -/// +/// A struct implementing the `HealthCheck` for the nodes. #[derive(Debug)] pub struct HealthChecker { http_client: Client, timeout: Duration, } -/// impl HealthChecker { - /// + /// Creates a new `HealthChecker` instance. pub fn new(http_client: Client, timeout: Duration) -> Self { Self { http_client, @@ -96,16 +98,22 @@ impl HealthCheck for HealthChecker { const HEALTH_CHECK_ACTOR: &str = "HealthCheckActor"; +/// A struct performing the health check of the node and sending the health status to the listener. struct HealthCheckActor { + /// The health checker. checker: Arc, + /// The period of the health check. period: Duration, + /// The node to check. node: Node, + /// The sender channel (listener) to send the health status. sender_channel: SenderMpsc, + /// The cancellation token of the actor. token: CancellationToken, } impl HealthCheckActor { - pub fn new( + fn new( checker: Arc, period: Duration, node: Node, @@ -121,7 +129,8 @@ impl HealthCheckActor { } } - pub async fn run(self) { + /// Runs the actor. + async fn run(self) { let mut interval = time::interval(self.period); loop { tokio::select! { @@ -143,21 +152,34 @@ impl HealthCheckActor { } } -/// +/// The name of the health manager actor. pub const HEALTH_MANAGER_ACTOR: &str = "HealthManagerActor"; -/// +/// A struct managing the health checks of the nodes. +/// It receives the fetched nodes from the `NodesFetchActor` and starts the health checks for them. +/// It also receives the health status of the nodes from the `HealthCheckActor/s` and updates the routing snapshot. pub struct HealthManagerActor { + /// The health checker. checker: Arc, + /// The period of the health check. period: Duration, - nodes_snapshot: GlobalShared, + /// The routing snapshot, storing the nodes. + routing_snapshot: AtomicSwap, + /// The receiver channel to listen to the fetched nodes messages. fetch_receiver: ReceiverWatch, + /// The sender channel to send the health status of the nodes back to HealthManagerActor. check_sender: SenderMpsc, + /// The receiver channel to receive the health status of the nodes from the `HealthCheckActor/s`. check_receiver: ReceiverMpsc, + /// The sender channel to send the initialization status to DynamicRouteProvider (used only once in the init phase). init_sender: SenderMpsc, + /// The cancellation token of the actor. token: CancellationToken, + /// The cancellation token for all the health checks. nodes_token: CancellationToken, + /// The task tracker of the health checks, waiting for the tasks to exit (graceful termination). nodes_tracker: TaskTracker, + /// The flag indicating if this actor is initialized with healthy nodes. is_initialized: bool, } @@ -165,11 +187,11 @@ impl HealthManagerActor where S: RoutingSnapshot, { - /// + /// Creates a new `HealthManagerActor` instance. pub fn new( checker: Arc, period: Duration, - nodes_snapshot: GlobalShared, + routing_snapshot: AtomicSwap, fetch_receiver: ReceiverWatch, init_sender: SenderMpsc, token: CancellationToken, @@ -179,7 +201,7 @@ where Self { checker, period, - nodes_snapshot, + routing_snapshot, fetch_receiver, check_sender, check_receiver, @@ -191,11 +213,11 @@ where } } - /// + /// Runs the actor. pub async fn run(mut self) { loop { tokio::select! { - // Check if a new array of fetched nodes appeared in the channel from NodesFetchService. + // Process a new array of fetched nodes from NodesFetchActor, if it appeared in the channel. result = self.fetch_receiver.changed() => { if let Err(err) = result { error!("{HEALTH_MANAGER_ACTOR}: nodes fetch sender has been dropped: {err:?}"); @@ -206,7 +228,7 @@ where let Some(FetchedNodes { nodes }) = self.fetch_receiver.borrow_and_update().clone() else { continue }; self.handle_fetch_update(nodes).await; } - // Receive health check messages from all running NodeHealthChecker/s. + // Receive health check messages from all running HealthCheckActor/s. Some(msg) = self.check_receiver.recv() => { self.handle_health_update(msg).await; } @@ -221,13 +243,13 @@ where } async fn handle_health_update(&mut self, msg: NodeHealthState) { - let current_snapshot = self.nodes_snapshot.load_full(); + let current_snapshot = self.routing_snapshot.load_full(); let mut new_snapshot = (*current_snapshot).clone(); if let Err(err) = new_snapshot.update_node(&msg.node, msg.health.clone()) { error!("{HEALTH_MANAGER_ACTOR}: failed to update snapshot: {err:?}"); return; } - self.nodes_snapshot.store(Arc::new(new_snapshot)); + self.routing_snapshot.store(Arc::new(new_snapshot)); if !self.is_initialized && msg.health.is_healthy() { self.is_initialized = true; // If TIMEOUT_AWAIT_HEALTHY_SEED has been exceeded, the receiver was dropped and send would thus fail. We ignore the failure. @@ -244,11 +266,11 @@ where return; } debug!("{HEALTH_MANAGER_ACTOR}: fetched nodes received {:?}", nodes); - let current_snapshot = self.nodes_snapshot.load_full(); + let current_snapshot = self.routing_snapshot.load_full(); let mut new_snapshot = (*current_snapshot).clone(); // If the snapshot has changed, store it and restart all node's health checks. if let Ok(true) = new_snapshot.sync_nodes(&nodes) { - self.nodes_snapshot.store(Arc::new(new_snapshot)); + self.routing_snapshot.store(Arc::new(new_snapshot)); self.stop_all_checks().await; self.start_checks(nodes.to_vec()); } diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs b/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs index 90e25cee9..5feeae25c 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs @@ -1,16 +1,16 @@ use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node}; -/// +/// Represents a message with fetched nodes. #[derive(Debug, Clone)] pub struct FetchedNodes { - /// + /// The fetched nodes. pub nodes: Vec, } -/// +/// Represents a message with the health state of a node. pub struct NodeHealthState { - /// + /// The node. pub node: Node, - /// + /// The health state of the node. pub health: HealthCheckStatus, } diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs b/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs index d450ce678..9b6b69def 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs @@ -1,15 +1,16 @@ +//! Dynamic routing implementation. pub mod dynamic_route_provider; -/// +/// Health check implementation. pub mod health_check; -/// +/// Messages used in dynamic routing. pub mod messages; -/// +/// Node implementation. pub mod node; -/// +/// Nodes fetch implementation. pub mod nodes_fetch; -/// +/// Routing snapshot implementation. pub mod snapshot; #[cfg(test)] pub mod test_utils; -/// +/// Type aliases used in dynamic routing. pub mod type_aliases; diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/node.rs b/ic-agent/src/agent/http_transport/dynamic_routing/node.rs index d74a7cd09..482dfe89b 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/node.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/node.rs @@ -3,14 +3,14 @@ use url::Url; use crate::agent::ApiBoundaryNode; use anyhow::anyhow; -/// +/// Represents a node in the dynamic routing. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Node { domain: String, } impl Node { - /// + /// Creates a new `Node` instance from the domain name. pub fn new(domain: &str) -> anyhow::Result { if !is_valid_domain(domain) { return Err(anyhow!("Invalid domain name {domain}")); @@ -20,14 +20,14 @@ impl Node { }) } - /// + /// Returns the domain name of the node. pub fn domain(&self) -> String { self.domain.clone() } } impl Node { - /// + /// Converts the node to a routing URL. pub fn to_routing_url(&self) -> Url { Url::parse(&format!("https://{}/api/v2/", self.domain)).expect("failed to parse URL") } @@ -48,7 +48,7 @@ impl TryFrom<&ApiBoundaryNode> for Node { } } -/// +/// Checks if the given domain is a valid URL. pub fn is_valid_domain>(domain: S) -> bool { // Prepend scheme to make it a valid URL let url_string = format!("http://{}", domain.as_ref()); diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs b/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs index e10b5f55b..9f81a4016 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs @@ -12,7 +12,7 @@ use crate::agent::{ http_transport::{ dynamic_routing::{ health_check::HEALTH_MANAGER_ACTOR, messages::FetchedNodes, node::Node, - snapshot::routing_snapshot::RoutingSnapshot, type_aliases::GlobalShared, + snapshot::routing_snapshot::RoutingSnapshot, type_aliases::AtomicSwap, type_aliases::SenderWatch, }, reqwest_transport::ReqwestTransport, @@ -22,14 +22,14 @@ use crate::agent::{ const NODES_FETCH_ACTOR: &str = "NodesFetchActor"; -/// +/// Fetcher of nodes in the topology. #[async_trait] pub trait Fetch: Sync + Send + Debug { - /// + /// Fetches the nodes from the topology. async fn fetch(&self, url: Url) -> anyhow::Result>; } -/// +/// A struct representing the fetcher of the nodes from the topology. #[derive(Debug)] pub struct NodesFetcher { http_client: Client, @@ -37,7 +37,7 @@ pub struct NodesFetcher { } impl NodesFetcher { - /// + /// Creates a new `NodesFetcher` instance. pub fn new(http_client: Client, subnet_id: Principal) -> Self { Self { http_client, @@ -71,13 +71,19 @@ impl Fetch for NodesFetcher { } } -/// +/// A struct representing the actor responsible for fetching existing nodes and communicating it with the listener. pub struct NodesFetchActor { + /// The fetcher object responsible for fetching the nodes. fetcher: Arc, + /// Time period between fetches. period: Duration, + /// The interval to wait before retrying to fetch the nodes in case of failures. fetch_retry_interval: Duration, + /// Communication channel with the listener. fetch_sender: SenderWatch, - snapshot: GlobalShared, + /// The snapshot of the routing table. + routing_snapshot: AtomicSwap, + /// The token to cancel/stop the actor. token: CancellationToken, } @@ -85,13 +91,13 @@ impl NodesFetchActor where S: RoutingSnapshot, { - /// + /// Creates a new `NodesFetchActor` instance. pub fn new( fetcher: Arc, period: Duration, retry_interval: Duration, fetch_sender: SenderWatch, - snapshot: GlobalShared, + snapshot: AtomicSwap, token: CancellationToken, ) -> Self { Self { @@ -99,26 +105,26 @@ where period, fetch_retry_interval: retry_interval, fetch_sender, - snapshot, + routing_snapshot: snapshot, token, } } - /// + /// Runs the actor. pub async fn run(self) { let mut interval = time::interval(self.period); loop { tokio::select! { _ = interval.tick() => { // Retry until success: - // - try to get a healthy node from the snapshot + // - try to get a healthy node from the routing snapshot // - if snapshot is empty, break the cycle and wait for the next fetch cycle - // - using the healthy node, try to fetch new nodes from topology + // - using the healthy node, try to fetch nodes from topology // - if failure, sleep and retry // - try send fetched nodes to the listener - // - failure should never happen + // - failure should never happen, but we trace it if it does loop { - let snapshot = self.snapshot.load(); + let snapshot = self.routing_snapshot.load(); if let Some(node) = snapshot.next() { match self.fetcher.fetch((&node).into()).await { Ok(nodes) => { diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs index cb4826a68..cec2580c1 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs @@ -16,23 +16,28 @@ const WINDOW_SIZE: usize = 15; // Space complexity: O(N) type LatencyMovAvg = SumTreeSMA; +/// A node, which stores health check latencies in the form of moving average. #[derive(Clone, Debug)] struct WeightedNode { node: Node, + /// Moving mean of latencies measurements. latency_mov_avg: LatencyMovAvg, + /// Weight of the node (invers of the average latency), used for stochastic weighted random sampling. weight: f64, } -/// +/// Routing snapshot for latency-based routing. +/// In this routing strategy, nodes are randomly selected based on their averaged latency of the last WINDOW_SIZE health checks. +/// Nodes with smaller average latencies are preferred for routing. #[derive(Default, Debug, Clone)] pub struct LatencyRoutingSnapshot { weighted_nodes: Vec, existing_nodes: HashSet, } -/// +/// Implementation of the LatencyRoutingSnapshot. impl LatencyRoutingSnapshot { - /// + /// Creates a new LatencyRoutingSnapshot. pub fn new() -> Self { Self { weighted_nodes: vec![], @@ -41,7 +46,8 @@ impl LatencyRoutingSnapshot { } } -// select weight index based on the input number in range [0, 1] +/// Helper function to sample nodes based on their weights. +/// Here weight index is selected based on the input number in range [0, 1] #[inline(always)] fn weighted_sample(weights: &[f64], number: f64) -> Option { if !(0.0..=1.0).contains(&number) { @@ -80,7 +86,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result { let new_nodes = HashSet::from_iter(nodes.iter().cloned()); - // Find nodes removed from snapshot. + // Find nodes removed from topology. let nodes_removed: Vec<_> = self .existing_nodes .difference(&new_nodes) @@ -110,7 +116,7 @@ impl RoutingSnapshot for LatencyRoutingSnapshot { } // If latency is None (meaning Node is unhealthy), we assign some big value - let latency = health.latency.unwrap_or(MAX_LATENCY); + let latency = health.latency().unwrap_or(MAX_LATENCY); if let Some(idx) = self.weighted_nodes.iter().position(|x| &x.node == node) { // Node is already in the array (it is not the first update_node() call). @@ -166,9 +172,7 @@ mod tests { // Arrange let mut snapshot = LatencyRoutingSnapshot::new(); let node = Node::new("api1.com").unwrap(); - let health = HealthCheckStatus { - latency: Some(Duration::from_secs(1)), - }; + let health = HealthCheckStatus::new(Some(Duration::from_secs(1))); // Act let is_updated = snapshot .update_node(&node, health) @@ -185,9 +189,7 @@ mod tests { // Arrange let mut snapshot = LatencyRoutingSnapshot::new(); let node = Node::new("api1.com").unwrap(); - let health = HealthCheckStatus { - latency: Some(Duration::from_secs(1)), - }; + let health = HealthCheckStatus::new(Some(Duration::from_secs(1))); snapshot.existing_nodes.insert(node.clone()); // Check first update let is_updated = snapshot @@ -203,9 +205,7 @@ mod tests { assert_eq!(weighted_node.weight, 1.0); assert_eq!(snapshot.next().unwrap(), node); // Check second update - let health = HealthCheckStatus { - latency: Some(Duration::from_secs(2)), - }; + let health = HealthCheckStatus::new(Some(Duration::from_secs(2))); let is_updated = snapshot .update_node(&node, health) .expect("node update failed"); @@ -217,9 +217,7 @@ mod tests { ); assert_eq!(weighted_node.weight, 1.0 / 1.5); // Check third update - let health = HealthCheckStatus { - latency: Some(Duration::from_secs(3)), - }; + let health = HealthCheckStatus::new(Some(Duration::from_secs(3))); let is_updated = snapshot .update_node(&node, health) .expect("node update failed"); @@ -231,7 +229,7 @@ mod tests { ); assert_eq!(weighted_node.weight, 0.5); // Check forth update with none - let health = HealthCheckStatus { latency: None }; + let health = HealthCheckStatus::new(None); let is_updated = snapshot .update_node(&node, health) .expect("node update failed"); diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs index 3695f1d3a..1c63df8bf 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs @@ -1,6 +1,3 @@ -/// pub mod latency_based_routing; -/// pub mod round_robin_routing; -/// pub mod routing_snapshot; diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs index 21216c5ce..e468d2629 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs @@ -10,7 +10,7 @@ use crate::agent::http_transport::dynamic_routing::{ health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot, }; -/// +/// Routing snapshot, which samples nodes in a round-robin fashion. #[derive(Default, Debug, Clone)] pub struct RoundRobinRoutingSnapshot { current_idx: Arc, @@ -19,7 +19,7 @@ pub struct RoundRobinRoutingSnapshot { } impl RoundRobinRoutingSnapshot { - /// + /// Creates a new instance of `RoundRobinRoutingSnapshot`. pub fn new() -> Self { Self { current_idx: Arc::new(AtomicUsize::new(0)), @@ -47,14 +47,14 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot { fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result { let new_nodes = HashSet::from_iter(nodes.iter().cloned()); - // Find nodes removed from snapshot. + // Find nodes removed from topology. let nodes_removed: Vec<_> = self .existing_nodes .difference(&new_nodes) .cloned() .collect(); let has_removed_nodes = !nodes_removed.is_empty(); - // Find nodes added to snapshot. + // Find nodes added to topology. let nodes_added: Vec<_> = new_nodes .difference(&self.existing_nodes) .cloned() @@ -74,7 +74,7 @@ impl RoutingSnapshot for RoundRobinRoutingSnapshot { if !self.existing_nodes.contains(node) { return Ok(false); } - if health.latency.is_some() { + if health.is_healthy() { Ok(self.healthy_nodes.insert(node.clone())) } else { Ok(self.healthy_nodes.remove(node)) @@ -113,10 +113,8 @@ mod tests { let mut snapshot = RoundRobinRoutingSnapshot::new(); // This node is not present in existing_nodes let node = Node::new("api1.com").unwrap(); - let healthy = HealthCheckStatus { - latency: Some(Duration::from_secs(1)), - }; - let unhealthy = HealthCheckStatus { latency: None }; + let healthy = HealthCheckStatus::new(Some(Duration::from_secs(1))); + let unhealthy = HealthCheckStatus::new(None); // Act 1 let is_updated = snapshot .update_node(&node, healthy) @@ -142,9 +140,7 @@ mod tests { let node = Node::new("api1.com").unwrap(); // node is present in existing_nodes, but not in healthy_nodes snapshot.existing_nodes.insert(node.clone()); - let health = HealthCheckStatus { - latency: Some(Duration::from_secs(1)), - }; + let health = HealthCheckStatus::new(Some(Duration::from_secs(1))); // Act let is_updated = snapshot .update_node(&node, health) @@ -162,7 +158,7 @@ mod tests { let node = Node::new("api1.com").unwrap(); snapshot.existing_nodes.insert(node.clone()); snapshot.healthy_nodes.insert(node.clone()); - let unhealthy = HealthCheckStatus { latency: None }; + let unhealthy = HealthCheckStatus::new(None); // Act let is_updated = snapshot .update_node(&node, unhealthy) diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs b/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs index a1d659c98..8b411ed20 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs @@ -11,7 +11,7 @@ use crate::agent::http_transport::{ health_check::{HealthCheck, HealthCheckStatus}, node::Node, nodes_fetch::Fetch, - type_aliases::GlobalShared, + type_aliases::AtomicSwap, }, route_provider::RouteProvider, }; @@ -54,8 +54,8 @@ where #[derive(Debug)] pub struct NodesFetcherMock { - // A mocked set of nodes existing in the topology. - pub nodes: GlobalShared>, + // A set of nodes, existing in the topology. + pub nodes: AtomicSwap>, } #[async_trait] @@ -86,7 +86,7 @@ impl NodesFetcherMock { #[derive(Debug)] pub struct NodeHealthCheckerMock { - pub healthy_nodes: GlobalShared>, + healthy_nodes: Arc>>, } impl Default for NodeHealthCheckerMock { @@ -103,7 +103,7 @@ impl HealthCheck for NodeHealthCheckerMock { true => Some(Duration::from_secs(1)), false => None, }; - Ok(HealthCheckStatus { latency }) + Ok(HealthCheckStatus::new(latency)) } } diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs b/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs index 33ef77ea2..04f32b661 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs @@ -1,17 +1,18 @@ -use std::sync::Arc; - use arc_swap::ArcSwap; +use std::sync::Arc; use tokio::sync::{mpsc, watch}; -/// +/// A type alias for the sender end of a watch channel. pub type SenderWatch = watch::Sender>; -/// + +/// A type alias for the receiver end of a watch channel. pub type ReceiverWatch = watch::Receiver>; -/// +/// A type alias for the sender end of a multi-producer, single-consumer channel. pub type SenderMpsc = mpsc::Sender; -/// + +/// A type alias for the receiver end of a multi-producer, single-consumer channel. pub type ReceiverMpsc = mpsc::Receiver; -/// -pub type GlobalShared = Arc>; +/// A type alias for an atomic swap operation on a shared value. +pub type AtomicSwap = Arc>;