diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs b/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs index eff84d3b..ceb87e08 100644 --- a/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs +++ b/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs @@ -10,6 +10,7 @@ use arc_swap::ArcSwap; use candid::Principal; use reqwest::Client; use tokio::{ + runtime::Handle, sync::{mpsc, watch}, time::timeout, }; @@ -71,24 +72,19 @@ pub struct DynamicRouteProvider { token: CancellationToken, } -impl RouteProvider for DynamicRouteProvider -where - S: RoutingSnapshot + 'static, -{ - fn route(&self) -> Result { - let snapshot = self.routing_snapshot.load(); - let node = snapshot.next().ok_or_else(|| { - AgentError::RouteProviderError("No healthy API nodes found.".to_string()) - })?; - Ok(node.to_routing_url()) - } +/// A builder for the `DynamicRouteProvider`. +pub struct DynamicRouteProviderBuilder { + fetcher: Arc, + fetch_period: Duration, + fetch_retry_interval: Duration, + checker: Arc, + check_period: Duration, + routing_snapshot: AtomicSwap, + seeds: Vec, } -impl DynamicRouteProvider -where - S: RoutingSnapshot + 'static, -{ - /// Creates a new instance of `DynamicRouteProvider`. +impl DynamicRouteProviderBuilder { + /// Creates a new instance of the builder. pub fn new(snapshot: S, seeds: Vec, http_client: Client) -> Self { let fetcher = Arc::new(NodesFetcher::new( http_client.clone(), @@ -103,35 +99,75 @@ where check_period: HEALTH_CHECK_PERIOD, seeds, routing_snapshot: Arc::new(ArcSwap::from_pointee(snapshot)), - tracker: TaskTracker::new(), - token: CancellationToken::new(), } } - /// Sets the fetcher for fetching the latest nodes topology. + /// Sets the fetcher of the nodes in the topology. pub fn with_fetcher(mut self, fetcher: Arc) -> Self { self.fetcher = fetcher; self } - /// Sets the periodicity of fetching the latest nodes topology. + /// Sets the fetching periodicity. pub fn with_fetch_period(mut self, period: Duration) -> Self { self.fetch_period = period; self } - /// Sets the interval for retrying fetching the nodes in case of error. + /// Sets the node health checker. pub fn with_checker(mut self, checker: Arc) -> Self { self.checker = checker; self } - /// Sets the periodicity of checking the health of the nodes. + /// Sets the periodicity of node health checking. pub fn with_check_period(mut self, period: Duration) -> Self { self.check_period = period; self } + /// Builds an instance of the `DynamicRouteProvider`. + pub async fn build(self) -> DynamicRouteProvider + where + S: RoutingSnapshot + 'static, + { + let route_provider = DynamicRouteProvider { + fetcher: self.fetcher, + fetch_period: self.fetch_period, + fetch_retry_interval: self.fetch_retry_interval, + checker: self.checker, + check_period: self.check_period, + routing_snapshot: self.routing_snapshot, + tracker: TaskTracker::new(), + seeds: self.seeds, + token: CancellationToken::new(), + }; + + if let Err(err) = route_provider.run().await { + error!("{DYNAMIC_ROUTE_PROVIDER}: started in unhealthy state: {err:?}"); + } + + route_provider + } +} + +impl RouteProvider for DynamicRouteProvider +where + S: RoutingSnapshot + 'static, +{ + fn route(&self) -> Result { + let snapshot = self.routing_snapshot.load(); + let node = snapshot.next().ok_or_else(|| { + AgentError::RouteProviderError("No healthy API nodes found.".to_string()) + })?; + Ok(node.to_routing_url()) + } +} + +impl DynamicRouteProvider +where + S: RoutingSnapshot + 'static, +{ /// Starts two background tasks: /// - Task1: NodesFetchActor /// - Periodically fetches existing API nodes (gets latest nodes topology) and sends discovered nodes to HealthManagerActor. @@ -202,16 +238,26 @@ where ); (found_healthy_seeds).then_some(()).ok_or(anyhow!( - "No healthy seeds found, they may become healthy later ..." + "No healthy seeds found within {TIMEOUT_AWAIT_HEALTHY_SEED:?}, they may become healthy later ..." )) } +} - /// Kill all running tasks. - pub async fn stop(&self) { +// Gracefully stop the inner spawned tasks running in the background. +impl Drop for DynamicRouteProvider { + fn drop(&mut self) { self.token.cancel(); self.tracker.close(); - self.tracker.wait().await; - warn!("{DYNAMIC_ROUTE_PROVIDER}: gracefully stopped"); + let tracker = self.tracker.clone(); + // If no runtime is available do nothing. + if let Ok(handle) = Handle::try_current() { + let _ = handle.spawn(async move { + tracker.wait().await; + warn!("{DYNAMIC_ROUTE_PROVIDER}: stopped gracefully"); + }); + } else { + error!("{DYNAMIC_ROUTE_PROVIDER}: no runtime available, cannot stop the spawned tasks"); + } } } @@ -228,7 +274,7 @@ mod tests { use crate::{ agent::http_transport::{ dynamic_routing::{ - dynamic_route_provider::{DynamicRouteProvider, IC0_SEED_DOMAIN}, + dynamic_route_provider::{DynamicRouteProviderBuilder, IC0_SEED_DOMAIN}, node::Node, snapshot::round_robin_routing::RoundRobinRoutingSnapshot, test_utils::{ @@ -266,14 +312,15 @@ mod tests { // Configure RouteProvider let snapshot = RoundRobinRoutingSnapshot::new(); let client = Client::builder().build().unwrap(); - let route_provider = Arc::new( - DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client) + let route_provider = + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) .with_fetcher(fetcher.clone()) .with_checker(checker.clone()) .with_fetch_period(fetch_interval) - .with_check_period(check_interval), - ); - route_provider.run().await.expect("no healthy seeds found"); + .with_check_period(check_interval) + .build() + .await; + let route_provider = Arc::new(route_provider); // This time span is required for the snapshot to be fully updated with the new nodes and their health info. let snapshot_update_duration = fetch_interval + 2 * check_interval; @@ -341,9 +388,6 @@ mod tests { tokio::time::sleep(snapshot_update_duration).await; let routed_domains = route_n_times(3, Arc::clone(&route_provider)); assert_routed_domains(routed_domains, vec![node_2.domain()], 3); - - // Teardown. - route_provider.stop().await; } #[tokio::test] @@ -364,19 +408,18 @@ mod tests { // Configure RouteProvider let snapshot = RoundRobinRoutingSnapshot::new(); let client = Client::builder().build().unwrap(); - let route_provider = Arc::new( - DynamicRouteProvider::new(snapshot, vec![node_1.clone(), node_2.clone()], client) - .with_fetcher(fetcher.clone()) - .with_checker(checker.clone()) - .with_fetch_period(fetch_interval) - .with_check_period(check_interval), - ); - assert!(route_provider - .run() - .await - .unwrap_err() - .to_string() - .contains("No healthy seeds found, they may become healthy later ...")); + let route_provider = DynamicRouteProviderBuilder::new( + snapshot, + vec![node_1.clone(), node_2.clone()], + client, + ) + .with_fetcher(fetcher) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval) + .build() + .await; + let route_provider = Arc::new(route_provider); // Test 1: calls to route() return an error, as no healthy seeds exist. for _ in 0..4 { @@ -393,9 +436,6 @@ mod tests { tokio::time::sleep(3 * check_interval).await; let routed_domains = route_n_times(6, Arc::clone(&route_provider)); assert_routed_domains(routed_domains, vec![node_1.domain(), node_2.domain()], 3); - - // Teardown. - route_provider.stop().await; } #[tokio::test] @@ -415,14 +455,15 @@ mod tests { // Configure RouteProvider let snapshot = RoundRobinRoutingSnapshot::new(); let client = Client::builder().build().unwrap(); - let route_provider = Arc::new( - DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client) - .with_fetcher(fetcher.clone()) + let route_provider = + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) + .with_fetcher(fetcher) .with_checker(checker.clone()) .with_fetch_period(fetch_interval) - .with_check_period(check_interval), - ); - route_provider.run().await.expect("no healthy seeds found"); + .with_check_period(check_interval) + .build() + .await; + let route_provider = Arc::new(route_provider); // Test 1: multiple route() calls return a single domain=ic0.app, as the seed is healthy. tokio::time::sleep(2 * check_interval).await; @@ -439,9 +480,6 @@ mod tests { AgentError::RouteProviderError("No healthy API nodes found.".to_string()) ); } - - // Teardown. - route_provider.stop().await; } #[tokio::test] @@ -461,19 +499,14 @@ mod tests { // Configure RouteProvider let snapshot = RoundRobinRoutingSnapshot::new(); let client = Client::builder().build().unwrap(); - let route_provider = Arc::new( - DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client) - .with_fetcher(fetcher.clone()) - .with_checker(checker.clone()) + let route_provider = + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) + .with_fetcher(fetcher) + .with_checker(checker) .with_fetch_period(fetch_interval) - .with_check_period(check_interval), - ); - assert!(route_provider - .run() - .await - .unwrap_err() - .to_string() - .contains("No healthy seeds found, they may become healthy later ...")); + .with_check_period(check_interval) + .build() + .await; // Test: calls to route() return an error, as no healthy seeds exist. for _ in 0..4 { @@ -484,9 +517,6 @@ mod tests { AgentError::RouteProviderError("No healthy API nodes found.".to_string()) ); } - - // Teardown. - route_provider.stop().await; } #[tokio::test] @@ -507,14 +537,18 @@ mod tests { // Configure RouteProvider let snapshot = RoundRobinRoutingSnapshot::new(); let client = Client::builder().build().unwrap(); - let route_provider = Arc::new( - DynamicRouteProvider::new(snapshot, vec![node_1.clone(), node_2.clone()], client) - .with_fetcher(fetcher.clone()) - .with_checker(checker.clone()) - .with_fetch_period(fetch_interval) - .with_check_period(check_interval), - ); - route_provider.run().await.expect("no healthy seeds found"); + let route_provider = DynamicRouteProviderBuilder::new( + snapshot, + vec![node_1.clone(), node_2.clone()], + client, + ) + .with_fetcher(fetcher) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval) + .build() + .await; + let route_provider = Arc::new(route_provider); // Test 1: calls to route() return only a healthy seed ic0.app. let routed_domains = route_n_times(3, Arc::clone(&route_provider)); @@ -525,9 +559,6 @@ mod tests { tokio::time::sleep(2 * check_interval).await; let routed_domains = route_n_times(6, Arc::clone(&route_provider)); assert_routed_domains(routed_domains, vec![node_1.domain(), node_2.domain()], 3); - - // Teardown. - route_provider.stop().await; } #[tokio::test] @@ -548,14 +579,15 @@ mod tests { // Configure RouteProvider let snapshot = RoundRobinRoutingSnapshot::new(); let client = Client::builder().build().unwrap(); - let route_provider = Arc::new( - DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client) + let route_provider = + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) .with_fetcher(fetcher.clone()) .with_checker(checker.clone()) .with_fetch_period(fetch_interval) - .with_check_period(check_interval), - ); - route_provider.run().await.expect("no healthy seeds found"); + .with_check_period(check_interval) + .build() + .await; + let route_provider = Arc::new(route_provider); // This time span is required for the snapshot to be fully updated with the new nodes topology and health info. let snapshot_update_duration = fetch_interval + 2 * check_interval; @@ -579,8 +611,5 @@ mod tests { vec![node_1.domain(), node_2.domain(), node_3.domain()], 2, ); - - // Teardown. - route_provider.stop().await; } }