diff --git a/CHANGELOG.md b/CHANGELOG.md index f95901ec..09e8a970 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +## [0.37.1] - 2024-07-25 + * Bug fix: Add `api/v2` prefix to read_state requests for hyper transport ## [0.37.0] - 2024-07-23 diff --git a/Cargo.lock b/Cargo.lock index 7a31d329..c2e4fb46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1083,7 +1083,7 @@ dependencies = [ [[package]] name = "ic-agent" -version = "0.37.0" +version = "0.37.1" dependencies = [ "async-lock", "backoff", @@ -1148,7 +1148,7 @@ dependencies = [ [[package]] name = "ic-identity-hsm" -version = "0.37.0" +version = "0.37.1" dependencies = [ "hex", "ic-agent", @@ -1160,7 +1160,7 @@ dependencies = [ [[package]] name = "ic-transport-types" -version = "0.37.0" +version = "0.37.1" dependencies = [ "candid", "hex", @@ -1176,7 +1176,7 @@ dependencies = [ [[package]] name = "ic-utils" -version = "0.37.0" +version = "0.37.1" dependencies = [ "async-trait", "candid", @@ -1239,7 +1239,7 @@ dependencies = [ [[package]] name = "icx" -version = "0.37.0" +version = "0.37.1" dependencies = [ "anyhow", "candid", @@ -1257,7 +1257,7 @@ dependencies = [ [[package]] name = "icx-cert" -version = "0.37.0" +version = "0.37.1" dependencies = [ "anyhow", "base64", diff --git a/Cargo.toml b/Cargo.toml index a435ed5e..535c20fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ members = [ ] [workspace.package] -version = "0.37.0" +version = "0.37.1" authors = ["DFINITY Stiftung "] edition = "2021" repository = "https://github.com/dfinity/agent-rs" @@ -22,9 +22,9 @@ rust-version = "1.75.0" license = "Apache-2.0" [workspace.dependencies] -ic-agent = { path = "ic-agent", version = "0.37.0", default-features = false } -ic-utils = { path = "ic-utils", version = "0.37.0" } -ic-transport-types = { path = "ic-transport-types", version = "0.37.0" } +ic-agent = { path = "ic-agent", version = "0.37.1", 
default-features = false } +ic-utils = { path = "ic-utils", version = "0.37.1" } +ic-transport-types = { path = "ic-transport-types", version = "0.37.1" } ic-certification = "2.2" candid = "0.10.1" diff --git a/ic-agent/Cargo.toml b/ic-agent/Cargo.toml index 18923d8e..6be9470f 100644 --- a/ic-agent/Cargo.toml +++ b/ic-agent/Cargo.toml @@ -74,8 +74,14 @@ hyper-rustls = { version = "0.27", default-features = false, features = [ "http1", "http2", ], optional = true } -tokio = { version = "1.24.2", features = ["time"] } +tokio = { version = "1.24.2", features = ["macros", "time"] } tower = { version = "0.4.13", optional = true } +async-trait = "^0.1.0" +tracing = "^0.1.0" +arc-swap = "^1.0.0" +simple_moving_average = "^1.0.0" +tracing-subscriber = "^0.2.0" +tokio-util = { version = "^0.7.0", features = ["rt"] } rustls-webpki = "0.102" [target.'cfg(target_family = "wasm")'.dependencies] diff --git a/ic-agent/src/agent/builder.rs b/ic-agent/src/agent/builder.rs index 77c998d4..02a5d037 100644 --- a/ic-agent/src/agent/builder.rs +++ b/ic-agent/src/agent/builder.rs @@ -16,6 +16,38 @@ impl AgentBuilder { Agent::new(self.config) } + #[cfg(all(feature = "reqwest", not(target_family = "wasm")))] + /// Set the dynamic transport layer for the [`Agent`], performing continuous discovery of the API boundary nodes and routing traffic via them based on the latencies. + pub async fn with_discovery_transport(self, client: reqwest::Client) -> Self { + use crate::agent::http_transport::{ + dynamic_routing::{ + dynamic_route_provider::{DynamicRouteProviderBuilder, IC0_SEED_DOMAIN}, + node::Node, + snapshot::latency_based_routing::LatencyRoutingSnapshot, + }, + route_provider::RouteProvider, + ReqwestTransport, + }; + + // TODO: This is a temporary solution to get the seed node. 
+ let seed = Node::new(IC0_SEED_DOMAIN).unwrap(); + + let route_provider = DynamicRouteProviderBuilder::new( + LatencyRoutingSnapshot::new(), + vec![seed], + client.clone(), + ) + .build() + .await; + + let route_provider = Arc::new(route_provider) as Arc; + + let transport = ReqwestTransport::create_with_client_route(route_provider, client) + .expect("failed to create transport"); + + self.with_transport(transport) + } + /// Set the URL of the [Agent]. #[cfg(feature = "reqwest")] pub fn with_url>(self, url: S) -> Self { diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs b/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs new file mode 100644 index 00000000..cb657ae2 --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs @@ -0,0 +1,704 @@ +//! An implementation of the [`RouteProvider`](crate::agent::http_transport::route_provider::RouteProvider) for dynamic generation of routing urls. 
+ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use arc_swap::ArcSwap; +use candid::Principal; +use reqwest::Client; +use thiserror::Error; +use tokio::{ + runtime::Handle, + sync::{mpsc, watch}, + time::timeout, +}; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; +use tracing::{error, info, warn}; +use url::Url; + +use crate::{ + agent::http_transport::{ + dynamic_routing::{ + health_check::{HealthCheck, HealthChecker, HealthManagerActor}, + messages::FetchedNodes, + node::Node, + nodes_fetch::{Fetch, NodesFetchActor, NodesFetcher}, + snapshot::routing_snapshot::RoutingSnapshot, + type_aliases::AtomicSwap, + }, + route_provider::RouteProvider, + }, + AgentError, +}; + +/// The domain of the default seed node used for the initial discovery of API boundary nodes. +pub const IC0_SEED_DOMAIN: &str = "ic0.app"; + +const MAINNET_ROOT_SUBNET_ID: &str = + "tdb26-jop6k-aogll-7ltgs-eruif-6kk7m-qpktf-gdiqx-mxtrf-vb5e6-eqe"; + +const FETCH_PERIOD: Duration = Duration::from_secs(5); +const FETCH_RETRY_INTERVAL: Duration = Duration::from_millis(250); +const TIMEOUT_AWAIT_HEALTHY_SEED: Duration = Duration::from_millis(1000); +const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(2); +const HEALTH_CHECK_PERIOD: Duration = Duration::from_secs(1); + +const DYNAMIC_ROUTE_PROVIDER: &str = "DynamicRouteProvider"; + +/// A dynamic route provider. +/// It spawns the discovery service (`NodesFetchActor`) for fetching the latest nodes topology. +/// It also spawns the `HealthManagerActor`, which orchestrates the health check tasks for each node and updates routing snapshot. +#[derive(Debug)] +pub struct DynamicRouteProvider { + /// Fetcher for fetching the latest nodes topology. + fetcher: Arc, + /// Periodicity of fetching the latest nodes topology. + fetch_period: Duration, + /// Interval for retrying fetching the nodes in case of error. + fetch_retry_interval: Duration, + /// Health checker for checking the health of the nodes. + checker: Arc, + /// Periodicity of checking the health of the nodes. 
+ check_period: Duration, + /// Snapshot of the routing nodes. + routing_snapshot: AtomicSwap, + /// Task tracker for managing the spawned tasks. + tracker: TaskTracker, + /// Initial seed nodes, which are used for the initial fetching of the nodes. + seeds: Vec, + /// Cancellation token for stopping the spawned tasks. + token: CancellationToken, +} + +/// An error that occurred when the DynamicRouteProvider service was running. +#[derive(Error, Debug)] +pub enum DynamicRouteProviderError { + /// An error when fetching topology of the API nodes. + #[error("An error when fetching API nodes: {0}")] + NodesFetchError(String), + /// An error when checking API node's health. + #[error("An error when checking API node's health: {0}")] + HealthCheckError(String), + /// An invalid domain name provided. + #[error("Provided domain name is invalid: {0}")] + InvalidDomainName(String), +} + +/// A builder for the `DynamicRouteProvider`. +pub struct DynamicRouteProviderBuilder { + fetcher: Arc, + fetch_period: Duration, + fetch_retry_interval: Duration, + checker: Arc, + check_period: Duration, + routing_snapshot: AtomicSwap, + seeds: Vec, +} + +impl DynamicRouteProviderBuilder { + /// Creates a new instance of the builder. + pub fn new(snapshot: S, seeds: Vec, http_client: Client) -> Self { + let fetcher = Arc::new(NodesFetcher::new( + http_client.clone(), + Principal::from_text(MAINNET_ROOT_SUBNET_ID).unwrap(), + None, + )); + let checker = Arc::new(HealthChecker::new(http_client, HEALTH_CHECK_TIMEOUT)); + Self { + fetcher, + fetch_period: FETCH_PERIOD, + fetch_retry_interval: FETCH_RETRY_INTERVAL, + checker, + check_period: HEALTH_CHECK_PERIOD, + seeds, + routing_snapshot: Arc::new(ArcSwap::from_pointee(snapshot)), + } + } + + /// Sets the fetcher of the nodes in the topology. + pub fn with_fetcher(mut self, fetcher: Arc) -> Self { + self.fetcher = fetcher; + self + } + + /// Sets the fetching periodicity. 
+ pub fn with_fetch_period(mut self, period: Duration) -> Self { + self.fetch_period = period; + self + } + + /// Sets the node health checker. + pub fn with_checker(mut self, checker: Arc) -> Self { + self.checker = checker; + self + } + + /// Sets the periodicity of node health checking. + pub fn with_check_period(mut self, period: Duration) -> Self { + self.check_period = period; + self + } + + /// Builds an instance of the `DynamicRouteProvider`. + pub async fn build(self) -> DynamicRouteProvider + where + S: RoutingSnapshot + 'static, + { + let route_provider = DynamicRouteProvider { + fetcher: self.fetcher, + fetch_period: self.fetch_period, + fetch_retry_interval: self.fetch_retry_interval, + checker: self.checker, + check_period: self.check_period, + routing_snapshot: self.routing_snapshot, + tracker: TaskTracker::new(), + seeds: self.seeds, + token: CancellationToken::new(), + }; + + route_provider.run().await; + + route_provider + } +} + +impl RouteProvider for DynamicRouteProvider +where + S: RoutingSnapshot + 'static, +{ + fn route(&self) -> Result { + let snapshot = self.routing_snapshot.load(); + let node = snapshot.next().ok_or_else(|| { + AgentError::RouteProviderError("No healthy API nodes found.".to_string()) + })?; + Ok(node.to_routing_url()) + } +} + +impl DynamicRouteProvider +where + S: RoutingSnapshot + 'static, +{ + /// Starts two background tasks: + /// - Task1: NodesFetchActor + /// - Periodically fetches existing API nodes (gets latest nodes topology) and sends discovered nodes to HealthManagerActor. + /// - Task2: HealthManagerActor: + /// - Listens to the fetched nodes messages from the NodesFetchActor. + /// - Starts/stops health check tasks (HealthCheckActors) based on the newly added/removed nodes. + /// - These spawned health check tasks periodically update the snapshot with the latest node health info. 
+ pub async fn run(&self) { + info!("{DYNAMIC_ROUTE_PROVIDER}: started ..."); + // Communication channel between NodesFetchActor and HealthManagerActor. + let (fetch_sender, fetch_receiver) = watch::channel(None); + + // Communication channel with HealthManagerActor to receive info about healthy seed nodes (used only once). + let (init_sender, mut init_receiver) = mpsc::channel(1); + + // Start the receiving part first. + let health_manager_actor = HealthManagerActor::new( + Arc::clone(&self.checker), + self.check_period, + Arc::clone(&self.routing_snapshot), + fetch_receiver, + init_sender, + self.token.clone(), + ); + self.tracker + .spawn(async move { health_manager_actor.run().await }); + + // Dispatch all seed nodes for initial health checks + if let Err(err) = fetch_sender.send(Some(FetchedNodes { + nodes: self.seeds.clone(), + })) { + error!("{DYNAMIC_ROUTE_PROVIDER}: failed to send results to HealthManager: {err:?}"); + } + + // Try await for healthy seeds. + let start = Instant::now(); + match timeout(TIMEOUT_AWAIT_HEALTHY_SEED, init_receiver.recv()).await { + Ok(_) => info!( + "{DYNAMIC_ROUTE_PROVIDER}: found healthy seeds within {:?}", + start.elapsed() + ), + Err(_) => warn!( + "{DYNAMIC_ROUTE_PROVIDER}: no healthy seeds found within {:?}", + start.elapsed() + ), + }; + // We can close the channel now. + init_receiver.close(); + + let fetch_actor = NodesFetchActor::new( + Arc::clone(&self.fetcher), + self.fetch_period, + self.fetch_retry_interval, + fetch_sender, + Arc::clone(&self.routing_snapshot), + self.token.clone(), + ); + self.tracker.spawn(async move { fetch_actor.run().await }); + info!( + "{DYNAMIC_ROUTE_PROVIDER}: NodesFetchActor and HealthManagerActor started successfully" + ); + } +} + +// Gracefully stop the inner spawned tasks running in the background. 
+impl Drop for DynamicRouteProvider { + fn drop(&mut self) { + self.token.cancel(); + self.tracker.close(); + let tracker = self.tracker.clone(); + // If no runtime is available do nothing. + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + tracker.wait().await; + warn!("{DYNAMIC_ROUTE_PROVIDER}: stopped gracefully"); + }); + } else { + error!("{DYNAMIC_ROUTE_PROVIDER}: no runtime available, cannot stop the spawned tasks"); + } + } +} + +#[cfg(test)] +mod tests { + use candid::Principal; + use reqwest::Client; + use std::{ + sync::{Arc, Once}, + time::{Duration, Instant}, + }; + use tracing::Level; + use tracing_subscriber::FmtSubscriber; + + use crate::{ + agent::http_transport::{ + dynamic_routing::{ + dynamic_route_provider::{ + DynamicRouteProviderBuilder, IC0_SEED_DOMAIN, MAINNET_ROOT_SUBNET_ID, + }, + node::Node, + snapshot::{ + latency_based_routing::LatencyRoutingSnapshot, + round_robin_routing::RoundRobinRoutingSnapshot, + }, + test_utils::{ + assert_routed_domains, route_n_times, NodeHealthCheckerMock, NodesFetcherMock, + }, + }, + route_provider::RouteProvider, + ReqwestTransport, + }, + Agent, AgentError, + }; + + static TRACING_INIT: Once = Once::new(); + + pub fn setup_tracing() { + TRACING_INIT.call_once(|| { + FmtSubscriber::builder().with_max_level(Level::TRACE).init(); + }); + } + + async fn assert_no_routing_via_domains( + route_provider: Arc, + excluded_domains: Vec<&str>, + timeout: Duration, + route_call_interval: Duration, + ) { + if excluded_domains.is_empty() { + panic!("List of excluded domains can't be empty"); + } + + let route_calls = 30; + let start = Instant::now(); + + while start.elapsed() < timeout { + let routed_domains = (0..route_calls) + .map(|_| { + route_provider.route().map(|url| { + let domain = url.domain().expect("no domain name in url"); + domain.to_string() + }) + }) + .collect::, _>>() + .unwrap_or_default(); + + // Exit when excluded domains are not used for routing any more. 
+ if !routed_domains.is_empty() + && !routed_domains + .iter() + .any(|d| excluded_domains.contains(&d.as_str())) + { + return; + } + + tokio::time::sleep(route_call_interval).await; + } + panic!("Expected excluded domains {excluded_domains:?} are still observed in routing over the last {route_calls} calls"); + } + + #[tokio::test] + async fn test_mainnet() { + // Setup. + setup_tracing(); + let seed = Node::new(IC0_SEED_DOMAIN).unwrap(); + let client = Client::builder().build().unwrap(); + let route_provider = DynamicRouteProviderBuilder::new( + LatencyRoutingSnapshot::new(), + vec![seed], + client.clone(), + ) + .build() + .await; + let route_provider = Arc::new(route_provider) as Arc; + let transport = + ReqwestTransport::create_with_client_route(Arc::clone(&route_provider), client) + .expect("failed to create transport"); + let agent = Agent::builder() + .with_transport(transport) + .build() + .expect("failed to create an agent"); + let subnet_id = Principal::from_text(MAINNET_ROOT_SUBNET_ID).unwrap(); + // Assert that seed (ic0.app) is not used for routing. Henceforth, only discovered API nodes are used. + assert_no_routing_via_domains( + Arc::clone(&route_provider), + vec![IC0_SEED_DOMAIN], + Duration::from_secs(40), + Duration::from_secs(2), + ) + .await; + // Act: perform /read_state call via dynamically discovered API BNs. + let api_bns = agent + .fetch_api_boundary_nodes_by_subnet_id(subnet_id) + .await + .expect("failed to fetch api boundary nodes"); + assert!(!api_bns.is_empty()); + } + + #[tokio::test] + async fn test_routing_with_topology_and_node_health_updates() { + // Setup. + setup_tracing(); + let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); + // Set nodes fetching params: topology, fetching periodicity. + let fetcher = Arc::new(NodesFetcherMock::new()); + fetcher.overwrite_nodes(vec![node_1.clone()]); + let fetch_interval = Duration::from_secs(2); + // Set health checking params: healthy nodes, checking periodicity. 
+ let checker = Arc::new(NodeHealthCheckerMock::new()); + let check_interval = Duration::from_secs(1); + // A single healthy node exists in the topology. This node happens to be the seed node. + fetcher.overwrite_nodes(vec![node_1.clone()]); + checker.overwrite_healthy_nodes(vec![node_1.clone()]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) + .with_fetcher(fetcher.clone()) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval) + .build() + .await; + let route_provider = Arc::new(route_provider); + + // This time span is required for the snapshot to be fully updated with the new nodes and their health info. + let snapshot_update_duration = fetch_interval + 2 * check_interval; + + // Test 1: multiple route() calls return a single domain=ic0.app. + // Only a single node exists, which is initially healthy. + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_1.domain()], 6); + + // Test 2: multiple route() calls return 3 different domains with equal fairness (repetition). + // Two healthy nodes are added to the topology. + let node_2 = Node::new("api1.com").unwrap(); + let node_3 = Node::new("api2.com").unwrap(); + checker.overwrite_healthy_nodes(vec![node_1.clone(), node_2.clone(), node_3.clone()]); + fetcher.overwrite_nodes(vec![node_1.clone(), node_2.clone(), node_3.clone()]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains( + routed_domains, + vec![node_1.domain(), node_2.domain(), node_3.domain()], + 2, + ); + + // Test 3: multiple route() calls return 2 different domains with equal fairness (repetition). 
+ // One node is set to unhealthy. + checker.overwrite_healthy_nodes(vec![node_1.clone(), node_3.clone()]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_1.domain(), node_3.domain()], 3); + + // Test 4: multiple route() calls return 3 different domains with equal fairness (repetition). + // Unhealthy node is set back to healthy. + checker.overwrite_healthy_nodes(vec![node_1.clone(), node_2.clone(), node_3.clone()]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains( + routed_domains, + vec![node_1.domain(), node_2.domain(), node_3.domain()], + 2, + ); + + // Test 5: multiple route() calls return 3 different domains with equal fairness (repetition). + // One healthy node is added, but another one goes unhealthy. + let node_4 = Node::new("api3.com").unwrap(); + checker.overwrite_healthy_nodes(vec![node_2.clone(), node_3.clone(), node_4.clone()]); + fetcher.overwrite_nodes(vec![ + node_1.clone(), + node_2.clone(), + node_3.clone(), + node_4.clone(), + ]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains( + routed_domains, + vec![node_2.domain(), node_3.domain(), node_4.domain()], + 2, + ); + + // Test 6: multiple route() calls return a single domain=api1.com. + // One node is set to unhealthy and one is removed from the topology. 
+ checker.overwrite_healthy_nodes(vec![node_2.clone(), node_3.clone()]); + fetcher.overwrite_nodes(vec![node_1.clone(), node_2.clone(), node_4.clone()]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(3, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_2.domain()], 3); + } + + #[tokio::test] + async fn test_route_with_initially_unhealthy_seeds_becoming_healthy() { + // Setup. + setup_tracing(); + let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); + let node_2 = Node::new("api1.com").unwrap(); + // Set nodes fetching params: topology, fetching periodicity. + let fetcher = Arc::new(NodesFetcherMock::new()); + let fetch_interval = Duration::from_secs(2); + // Set health checking params: healthy nodes, checking periodicity. + let checker = Arc::new(NodeHealthCheckerMock::new()); + let check_interval = Duration::from_secs(1); + // Two nodes exist, which are initially unhealthy. + fetcher.overwrite_nodes(vec![node_1.clone(), node_2.clone()]); + checker.overwrite_healthy_nodes(vec![]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = DynamicRouteProviderBuilder::new( + snapshot, + vec![node_1.clone(), node_2.clone()], + client, + ) + .with_fetcher(fetcher) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval) + .build() + .await; + let route_provider = Arc::new(route_provider); + + // Test 1: calls to route() return an error, as no healthy seeds exist. + for _ in 0..4 { + tokio::time::sleep(check_interval).await; + let result = route_provider.route(); + assert_eq!( + result.unwrap_err(), + AgentError::RouteProviderError("No healthy API nodes found.".to_string()) + ); + } + + // Test 2: calls to route() return both seeds, as they become healthy. 
+ checker.overwrite_healthy_nodes(vec![node_1.clone(), node_2.clone()]); + tokio::time::sleep(3 * check_interval).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_1.domain(), node_2.domain()], 3); + } + + #[tokio::test] + async fn test_routing_with_no_healthy_nodes_returns_an_error() { + // Setup. + setup_tracing(); + let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); + // Set nodes fetching params: topology, fetching periodicity. + let fetcher = Arc::new(NodesFetcherMock::new()); + let fetch_interval = Duration::from_secs(2); + // Set health checking params: healthy nodes, checking periodicity. + let checker = Arc::new(NodeHealthCheckerMock::new()); + let check_interval = Duration::from_secs(1); + // A single seed node which is initially healthy. + fetcher.overwrite_nodes(vec![node_1.clone()]); + checker.overwrite_healthy_nodes(vec![node_1.clone()]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) + .with_fetcher(fetcher) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval) + .build() + .await; + let route_provider = Arc::new(route_provider); + + // Test 1: multiple route() calls return a single domain=ic0.app, as the seed is healthy. + tokio::time::sleep(2 * check_interval).await; + let routed_domains = route_n_times(3, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_1.domain()], 3); + + // Test 2: calls to route() return an error, as no healthy nodes exist. 
+ checker.overwrite_healthy_nodes(vec![]); + tokio::time::sleep(2 * check_interval).await; + for _ in 0..4 { + let result = route_provider.route(); + assert_eq!( + result.unwrap_err(), + AgentError::RouteProviderError("No healthy API nodes found.".to_string()) + ); + } + } + + #[tokio::test] + async fn test_route_with_no_healthy_seeds_errors() { + // Setup. + setup_tracing(); + let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); + // Set nodes fetching params: topology, fetching periodicity. + let fetcher = Arc::new(NodesFetcherMock::new()); + let fetch_interval = Duration::from_secs(2); + // Set health checking params: healthy nodes, checking periodicity. + let checker = Arc::new(NodeHealthCheckerMock::new()); + let check_interval = Duration::from_secs(1); + // No healthy seed nodes present, this should lead to errors. + fetcher.overwrite_nodes(vec![]); + checker.overwrite_healthy_nodes(vec![]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) + .with_fetcher(fetcher) + .with_checker(checker) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval) + .build() + .await; + + // Test: calls to route() return an error, as no healthy seeds exist. + for _ in 0..4 { + tokio::time::sleep(check_interval).await; + let result = route_provider.route(); + assert_eq!( + result.unwrap_err(), + AgentError::RouteProviderError("No healthy API nodes found.".to_string()) + ); + } + } + + #[tokio::test] + async fn test_route_with_one_healthy_and_one_unhealthy_seed() { + // Setup. + setup_tracing(); + let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); + let node_2 = Node::new("api1.com").unwrap(); + // Set nodes fetching params: topology, fetching periodicity. 
+ let fetcher = Arc::new(NodesFetcherMock::new()); + let fetch_interval = Duration::from_secs(2); + // Set health checking params: healthy nodes, checking periodicity. + let checker = Arc::new(NodeHealthCheckerMock::new()); + let check_interval = Duration::from_secs(1); + // One healthy seed is present, it should be discovered during the initialization time. + fetcher.overwrite_nodes(vec![node_1.clone(), node_2.clone()]); + checker.overwrite_healthy_nodes(vec![node_1.clone()]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = DynamicRouteProviderBuilder::new( + snapshot, + vec![node_1.clone(), node_2.clone()], + client, + ) + .with_fetcher(fetcher) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval) + .build() + .await; + let route_provider = Arc::new(route_provider); + + // Test 1: calls to route() return only a healthy seed ic0.app. + let routed_domains = route_n_times(3, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_1.domain()], 3); + + // Test 2: calls to route() return two healthy seeds, as the unhealthy seed becomes healthy. + checker.overwrite_healthy_nodes(vec![node_1.clone(), node_2.clone()]); + tokio::time::sleep(2 * check_interval).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_1.domain(), node_2.domain()], 3); + } + + #[tokio::test] + async fn test_routing_with_an_empty_fetched_list_of_api_nodes() { + // Check resiliency to an empty list of fetched API nodes (this should never happen in normal IC operation). + // Setup. + setup_tracing(); + let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); + // Set nodes fetching params: topology, fetching periodicity. 
+ let fetcher = Arc::new(NodesFetcherMock::new()); + let fetch_interval = Duration::from_secs(2); + // Set health checking params: healthy nodes, checking periodicity. + let checker = Arc::new(NodeHealthCheckerMock::new()); + let check_interval = Duration::from_secs(1); + // One healthy seed is initially present, but the topology has no node. + fetcher.overwrite_nodes(vec![]); + checker.overwrite_healthy_nodes(vec![node_1.clone()]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = + DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client) + .with_fetcher(fetcher.clone()) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval) + .build() + .await; + let route_provider = Arc::new(route_provider); + + // This time span is required for the snapshot to be fully updated with the new nodes topology and health info. + let snapshot_update_duration = fetch_interval + 2 * check_interval; + + // Test 1: multiple route() calls return a single domain=ic0.app. + // HealthManagerActor shouldn't update the snapshot, if the list of fetched nodes is empty, thus we observe the healthy seed. + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(3, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_1.domain()], 3); + + // Test 2: multiple route() calls should now return 3 different domains with equal fairness (repetition). + // Three nodes are added to the topology, i.e. now the fetched nodes list is non-empty. 
+ let node_2 = Node::new("api1.com").unwrap(); + let node_3 = Node::new("api2.com").unwrap(); + fetcher.overwrite_nodes(vec![node_1.clone(), node_2.clone(), node_3.clone()]); + checker.overwrite_healthy_nodes(vec![node_1.clone(), node_2.clone(), node_3.clone()]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains( + routed_domains, + vec![node_1.domain(), node_2.domain(), node_3.domain()], + 2, + ); + } +} + +// - none of the seeds [] are healthy +// - none of the API node [] is healthy +// - return a vector of errors: HealthCheckErrors, FetchErrors, etc. diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs b/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs new file mode 100644 index 00000000..491f010b --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs @@ -0,0 +1,302 @@ +use async_trait::async_trait; +use http::{Method, StatusCode}; +use reqwest::{Client, Request}; +use std::{ + fmt::Debug, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::{sync::mpsc, time}; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; +use tracing::{debug, error, info, warn}; +use url::Url; + +use crate::agent::http_transport::dynamic_routing::{ + dynamic_route_provider::DynamicRouteProviderError, + messages::{FetchedNodes, NodeHealthState}, + node::Node, + snapshot::routing_snapshot::RoutingSnapshot, + type_aliases::{AtomicSwap, ReceiverMpsc, ReceiverWatch, SenderMpsc}, +}; + +const CHANNEL_BUFFER: usize = 128; + +/// A trait representing a health check of the node. +#[async_trait] +pub trait HealthCheck: Send + Sync + Debug { + /// Checks the health of the node. + async fn check(&self, node: &Node) -> Result; +} + +/// A struct representing the health check status of the node. 
+#[derive(Clone, PartialEq, Debug, Default)] +pub struct HealthCheckStatus { + latency: Option, +} + +impl HealthCheckStatus { + /// Creates a new `HealthCheckStatus` instance. + pub fn new(latency: Option) -> Self { + Self { latency } + } + + /// Checks if the node is healthy. + pub fn is_healthy(&self) -> bool { + self.latency.is_some() + } + + /// Get the latency of the health check. + pub fn latency(&self) -> Option { + self.latency + } +} + +/// A struct implementing the `HealthCheck` for the nodes. +#[derive(Debug)] +pub struct HealthChecker { + http_client: Client, + timeout: Duration, +} + +impl HealthChecker { + /// Creates a new `HealthChecker` instance. + pub fn new(http_client: Client, timeout: Duration) -> Self { + Self { + http_client, + timeout, + } + } +} + +const HEALTH_CHECKER: &str = "HealthChecker"; + +#[async_trait] +impl HealthCheck for HealthChecker { + async fn check(&self, node: &Node) -> Result { + // API boundary node exposes /health endpoint and should respond with 204 (No Content) if it's healthy. + let url = Url::parse(&format!("https://{}/health", node.domain())).unwrap(); + + let mut request = Request::new(Method::GET, url.clone()); + *request.timeout_mut() = Some(self.timeout); + + let start = Instant::now(); + let response = self.http_client.execute(request).await.map_err(|err| { + DynamicRouteProviderError::HealthCheckError(format!( + "Failed to execute GET request to {url}: {err}" + )) + })?; + let latency = start.elapsed(); + + if response.status() != StatusCode::NO_CONTENT { + let err_msg = format!( + "{HEALTH_CHECKER}: Unexpected http status code {} for url={url} received", + response.status() + ); + error!(err_msg); + return Err(DynamicRouteProviderError::HealthCheckError(err_msg)); + } + + Ok(HealthCheckStatus::new(Some(latency))) + } +} + +const HEALTH_CHECK_ACTOR: &str = "HealthCheckActor"; + +/// A struct performing the health check of the node and sending the health status to the listener. 
+struct HealthCheckActor { + /// The health checker. + checker: Arc, + /// The period of the health check. + period: Duration, + /// The node to check. + node: Node, + /// The sender channel (listener) to send the health status. + sender_channel: SenderMpsc, + /// The cancellation token of the actor. + token: CancellationToken, +} + +impl HealthCheckActor { + fn new( + checker: Arc, + period: Duration, + node: Node, + sender_channel: SenderMpsc, + token: CancellationToken, + ) -> Self { + Self { + checker, + period, + node, + sender_channel, + token, + } + } + + /// Runs the actor. + async fn run(self) { + let mut interval = time::interval(self.period); + loop { + tokio::select! { + _ = interval.tick() => { + let health = self.checker.check(&self.node).await.unwrap_or_default(); + let message = NodeHealthState { + node: self.node.clone(), + health, + }; + // Inform the listener about node's health. It can only fail if the listener was closed/dropped. + self.sender_channel.send(message).await.expect("Failed to send node's health state"); + } + _ = self.token.cancelled() => { + info!("{HEALTH_CHECK_ACTOR}: was gracefully cancelled for node {:?}", self.node); + break; + } + } + } + } +} + +/// The name of the health manager actor. +pub(super) const HEALTH_MANAGER_ACTOR: &str = "HealthManagerActor"; + +/// A struct managing the health checks of the nodes. +/// It receives the fetched nodes from the `NodesFetchActor` and starts the health checks for them. +/// It also receives the health status of the nodes from the `HealthCheckActor/s` and updates the routing snapshot. +pub(super) struct HealthManagerActor { + /// The health checker. + checker: Arc, + /// The period of the health check. + period: Duration, + /// The routing snapshot, storing the nodes. + routing_snapshot: AtomicSwap, + /// The receiver channel to listen to the fetched nodes messages. 
+ fetch_receiver: ReceiverWatch, + /// The sender channel to send the health status of the nodes back to HealthManagerActor. + check_sender: SenderMpsc, + /// The receiver channel to receive the health status of the nodes from the `HealthCheckActor/s`. + check_receiver: ReceiverMpsc, + /// The sender channel to send the initialization status to DynamicRouteProvider (used only once in the init phase). + init_sender: SenderMpsc, + /// The cancellation token of the actor. + token: CancellationToken, + /// The cancellation token for all the health checks. + nodes_token: CancellationToken, + /// The task tracker of the health checks, waiting for the tasks to exit (graceful termination). + nodes_tracker: TaskTracker, + /// The flag indicating if this actor is initialized with healthy nodes. + is_initialized: bool, +} + +impl HealthManagerActor +where + S: RoutingSnapshot, +{ + /// Creates a new `HealthManagerActor` instance. + pub fn new( + checker: Arc, + period: Duration, + routing_snapshot: AtomicSwap, + fetch_receiver: ReceiverWatch, + init_sender: SenderMpsc, + token: CancellationToken, + ) -> Self { + let (check_sender, check_receiver) = mpsc::channel(CHANNEL_BUFFER); + + Self { + checker, + period, + routing_snapshot, + fetch_receiver, + check_sender, + check_receiver, + init_sender, + token, + nodes_token: CancellationToken::new(), + nodes_tracker: TaskTracker::new(), + is_initialized: false, + } + } + + /// Runs the actor. + pub async fn run(mut self) { + loop { + tokio::select! { + // Process a new array of fetched nodes from NodesFetchActor, if it appeared in the channel. + result = self.fetch_receiver.changed() => { + if let Err(err) = result { + error!("{HEALTH_MANAGER_ACTOR}: nodes fetch sender has been dropped: {err:?}"); + self.token.cancel(); + continue; + } + // Get the latest value from the channel and mark it as seen. 
+ let Some(FetchedNodes { nodes }) = self.fetch_receiver.borrow_and_update().clone() else { continue }; + self.handle_fetch_update(nodes).await; + } + // Receive health check messages from all running HealthCheckActor/s. + Some(msg) = self.check_receiver.recv() => { + self.handle_health_update(msg).await; + } + _ = self.token.cancelled() => { + self.stop_all_checks().await; + self.check_receiver.close(); + warn!("{HEALTH_MANAGER_ACTOR}: was gracefully cancelled, all nodes health checks stopped"); + break; + } + } + } + } + + async fn handle_health_update(&mut self, msg: NodeHealthState) { + let current_snapshot = self.routing_snapshot.load_full(); + let mut new_snapshot = (*current_snapshot).clone(); + new_snapshot.update_node(&msg.node, msg.health.clone()); + self.routing_snapshot.store(Arc::new(new_snapshot)); + if !self.is_initialized && msg.health.is_healthy() { + self.is_initialized = true; + // If TIMEOUT_AWAIT_HEALTHY_SEED has been exceeded, the receiver was dropped and send would thus fail. We ignore the failure. + let _ = self.init_sender.send(true).await; + } + } + + async fn handle_fetch_update(&mut self, nodes: Vec) { + if nodes.is_empty() { + // This is a bug in the IC registry. There should be at least one API Boundary Node in the registry. + // Updating nodes snapshot with an empty array, would lead to an irrecoverable error, as new nodes couldn't be fetched. + // We avoid such updates and just wait for a non-empty list. + error!("{HEALTH_MANAGER_ACTOR}: list of fetched nodes is empty"); + return; + } + debug!("{HEALTH_MANAGER_ACTOR}: fetched nodes received {:?}", nodes); + let current_snapshot = self.routing_snapshot.load_full(); + let mut new_snapshot = (*current_snapshot).clone(); + // If the snapshot has changed, store it and restart all node's health checks. 
+ if new_snapshot.sync_nodes(&nodes) { + self.routing_snapshot.store(Arc::new(new_snapshot)); + self.stop_all_checks().await; + self.start_checks(nodes.to_vec()); + } + } + + fn start_checks(&mut self, nodes: Vec) { + // Create a single cancellation token for all started health checks. + self.nodes_token = CancellationToken::new(); + for node in nodes { + debug!("{HEALTH_MANAGER_ACTOR}: starting health check for node {node:?}"); + let actor = HealthCheckActor::new( + Arc::clone(&self.checker), + self.period, + node, + self.check_sender.clone(), + self.nodes_token.clone(), + ); + self.nodes_tracker.spawn(async move { actor.run().await }); + } + } + + async fn stop_all_checks(&self) { + warn!("{HEALTH_MANAGER_ACTOR}: stopping all running health checks"); + self.nodes_token.cancel(); + self.nodes_tracker.close(); + self.nodes_tracker.wait().await; + } +} diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs b/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs new file mode 100644 index 00000000..5feeae25 --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs @@ -0,0 +1,16 @@ +use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node}; + +/// Represents a message with fetched nodes. +#[derive(Debug, Clone)] +pub struct FetchedNodes { + /// The fetched nodes. + pub nodes: Vec, +} + +/// Represents a message with the health state of a node. +pub struct NodeHealthState { + /// The node. + pub node: Node, + /// The health state of the node. + pub health: HealthCheckStatus, +} diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs b/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs new file mode 100644 index 00000000..07570f0f --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs @@ -0,0 +1,16 @@ +//! Dynamic routing implementation. +pub mod dynamic_route_provider; +/// Health check implementation. 
+pub mod health_check; +/// Messages used in dynamic routing. +pub(super) mod messages; +/// Node implementation. +pub mod node; +/// Nodes fetch implementation. +pub mod nodes_fetch; +/// Routing snapshot implementation. +pub mod snapshot; +#[cfg(test)] +pub(super) mod test_utils; +/// Type aliases used in dynamic routing. +pub(super) mod type_aliases; diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/node.rs b/ic-agent/src/agent/http_transport/dynamic_routing/node.rs new file mode 100644 index 00000000..37716da3 --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/node.rs @@ -0,0 +1,60 @@ +use url::Url; + +use crate::agent::{ + http_transport::dynamic_routing::dynamic_route_provider::DynamicRouteProviderError, + ApiBoundaryNode, +}; + +/// Represents a node in the dynamic routing. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Node { + domain: String, +} + +impl Node { + /// Creates a new `Node` instance from the domain name. + pub fn new(domain: &str) -> Result { + if !is_valid_domain(domain) { + return Err(DynamicRouteProviderError::InvalidDomainName( + domain.to_string(), + )); + } + Ok(Self { + domain: domain.to_string(), + }) + } + + /// Returns the domain name of the node. + pub fn domain(&self) -> String { + self.domain.clone() + } +} + +impl Node { + /// Converts the node to a routing URL. + pub fn to_routing_url(&self) -> Url { + Url::parse(&format!("https://{}", self.domain)).expect("failed to parse URL") + } +} + +impl From<&Node> for Url { + fn from(node: &Node) -> Self { + // Parsing can't fail, as the domain was checked at node instantiation. + Url::parse(&format!("https://{}", node.domain)).expect("failed to parse URL") + } +} + +impl TryFrom<&ApiBoundaryNode> for Node { + type Error = DynamicRouteProviderError; + + fn try_from(value: &ApiBoundaryNode) -> Result { + Node::new(&value.domain) + } +} + +/// Checks if the given domain is a valid URL. 
+fn is_valid_domain>(domain: S) -> bool { + // Prepend scheme to make it a valid URL + let url_string = format!("http://{}", domain.as_ref()); + Url::parse(&url_string).is_ok() +} diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs b/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs new file mode 100644 index 00000000..7e01d145 --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs @@ -0,0 +1,178 @@ +use async_trait::async_trait; +use candid::Principal; +use reqwest::Client; +use std::{fmt::Debug, sync::Arc, time::Duration}; +use tokio::time::{self, sleep}; +use tokio_util::sync::CancellationToken; +use tracing::{error, warn}; +use url::Url; + +use crate::agent::{ + http_transport::{ + dynamic_routing::{ + dynamic_route_provider::DynamicRouteProviderError, + health_check::HEALTH_MANAGER_ACTOR, + messages::FetchedNodes, + node::Node, + snapshot::routing_snapshot::RoutingSnapshot, + type_aliases::{AtomicSwap, SenderWatch}, + }, + reqwest_transport::ReqwestTransport, + }, + Agent, +}; + +const NODES_FETCH_ACTOR: &str = "NodesFetchActor"; + +/// Fetcher of nodes in the topology. +#[async_trait] +pub trait Fetch: Sync + Send + Debug { + /// Fetches the nodes from the topology. + async fn fetch(&self, url: Url) -> Result, DynamicRouteProviderError>; +} + +/// A struct representing the fetcher of the nodes from the topology. +#[derive(Debug)] +pub struct NodesFetcher { + http_client: Client, + subnet_id: Principal, + // By default, the nodes fetcher is configured to talk to the mainnet of Internet Computer, and verifies responses using a hard-coded public key. + // However, for testnets one can set up a custom public key. + root_key: Option>, +} + +impl NodesFetcher { + /// Creates a new `NodesFetcher` instance. 
+ pub fn new(http_client: Client, subnet_id: Principal, root_key: Option>) -> Self { + Self { + http_client, + subnet_id, + root_key, + } + } +} + +#[async_trait] +impl Fetch for NodesFetcher { + async fn fetch(&self, url: Url) -> Result, DynamicRouteProviderError> { + let transport = ReqwestTransport::create_with_client(url, self.http_client.clone()) + .map_err(|err| { + DynamicRouteProviderError::NodesFetchError(format!( + "Failed to build transport: {err}" + )) + })?; + let agent = Agent::builder() + .with_transport(transport) + .build() + .map_err(|err| { + DynamicRouteProviderError::NodesFetchError(format!( + "Failed to build the agent: {err}" + )) + })?; + if let Some(key) = self.root_key.clone() { + agent.set_root_key(key); + } + let api_bns = agent + .fetch_api_boundary_nodes_by_subnet_id(self.subnet_id) + .await + .map_err(|err| { + DynamicRouteProviderError::NodesFetchError(format!( + "Failed to fetch API nodes: {err}" + )) + })?; + // If some API BNs have invalid domain names, they are discarded. + let nodes = api_bns + .iter() + .filter_map(|api_node| api_node.try_into().ok()) + .collect(); + return Ok(nodes); + } +} + +/// A struct representing the actor responsible for fetching existing nodes and communicating it with the listener. +pub(super) struct NodesFetchActor { + /// The fetcher object responsible for fetching the nodes. + fetcher: Arc, + /// Time period between fetches. + period: Duration, + /// The interval to wait before retrying to fetch the nodes in case of failures. + fetch_retry_interval: Duration, + /// Communication channel with the listener. + fetch_sender: SenderWatch, + /// The snapshot of the routing table. + routing_snapshot: AtomicSwap, + /// The token to cancel/stop the actor. + token: CancellationToken, +} + +impl NodesFetchActor +where + S: RoutingSnapshot, +{ + /// Creates a new `NodesFetchActor` instance. 
+ pub fn new( + fetcher: Arc, + period: Duration, + retry_interval: Duration, + fetch_sender: SenderWatch, + snapshot: AtomicSwap, + token: CancellationToken, + ) -> Self { + Self { + fetcher, + period, + fetch_retry_interval: retry_interval, + fetch_sender, + routing_snapshot: snapshot, + token, + } + } + + /// Runs the actor. + pub async fn run(self) { + let mut interval = time::interval(self.period); + loop { + tokio::select! { + _ = interval.tick() => { + // Retry until success: + // - try to get a healthy node from the routing snapshot + // - if snapshot is empty, break the cycle and wait for the next fetch cycle + // - using the healthy node, try to fetch nodes from topology + // - if failure, sleep and retry + // - try send fetched nodes to the listener + // - failure should never happen, but we trace it if it does + loop { + let snapshot = self.routing_snapshot.load(); + if let Some(node) = snapshot.next() { + match self.fetcher.fetch((&node).into()).await { + Ok(nodes) => { + let msg = Some( + FetchedNodes {nodes}); + match self.fetch_sender.send(msg) { + Ok(()) => break, // message sent successfully, exist the loop + Err(err) => { + error!("{NODES_FETCH_ACTOR}: failed to send results to {HEALTH_MANAGER_ACTOR}: {err:?}"); + } + } + }, + Err(err) => { + error!("{NODES_FETCH_ACTOR}: failed to fetch nodes: {err:?}"); + } + }; + } else { + // No healthy nodes in the snapshot, break the cycle and wait for the next fetch cycle + error!("{NODES_FETCH_ACTOR}: no nodes in the snapshot"); + break; + }; + warn!("Retrying to fetch the nodes in {:?}", self.fetch_retry_interval); + sleep(self.fetch_retry_interval).await; + } + } + _ = self.token.cancelled() => { + warn!("{NODES_FETCH_ACTOR}: was gracefully cancelled"); + break; + } + } + } + } +} diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs new file mode 100644 index 
00000000..1ae10136 --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs @@ -0,0 +1,362 @@ +use std::{collections::HashSet, time::Duration}; + +use rand::Rng; +use simple_moving_average::{SumTreeSMA, SMA}; + +use crate::agent::http_transport::dynamic_routing::{ + health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot, +}; + +// Some big value implying that node is unhealthy, should be much bigger than node's latency. +const MAX_LATENCY: Duration = Duration::from_secs(500); + +const WINDOW_SIZE: usize = 15; + +// Algorithmic complexity: add sample - O(log(N)), get average - O(1). +// Space complexity: O(N) +type LatencyMovAvg = SumTreeSMA; + +/// A node, which stores health check latencies in the form of moving average. +#[derive(Clone, Debug)] +struct WeightedNode { + node: Node, + /// Moving mean of latencies measurements. + latency_mov_avg: LatencyMovAvg, + /// Weight of the node (invers of the average latency), used for stochastic weighted random sampling. + weight: f64, +} + +/// Routing snapshot for latency-based routing. +/// In this routing strategy, nodes are randomly selected based on their averaged latency of the last WINDOW_SIZE health checks. +/// Nodes with smaller average latencies are preferred for routing. +#[derive(Default, Debug, Clone)] +pub struct LatencyRoutingSnapshot { + weighted_nodes: Vec, + existing_nodes: HashSet, +} + +/// Implementation of the LatencyRoutingSnapshot. +impl LatencyRoutingSnapshot { + /// Creates a new LatencyRoutingSnapshot. + pub fn new() -> Self { + Self { + weighted_nodes: vec![], + existing_nodes: HashSet::new(), + } + } +} + +/// Helper function to sample nodes based on their weights. 
/// Here weight index is selected based on the input number in range [0, 1]
///
/// `number` is scaled by the sum of all weights, and the weights are then subtracted one by one;
/// the first index at which the remainder drops to zero or below is returned.
///
/// Returns `None` if `weights` is empty or `number` lies outside `[0, 1]` (this includes NaN).
#[inline]
fn weighted_sample(weights: &[f64], number: f64) -> Option<usize> {
    if !(0.0..=1.0).contains(&number) {
        return None;
    }
    let sum: f64 = weights.iter().sum();
    let mut weighted_number = number * sum;
    for (idx, weight) in weights.iter().enumerate() {
        weighted_number -= weight;
        if weighted_number <= 0.0 {
            return Some(idx);
        }
    }
    // Floating-point rounding can leave a tiny positive remainder after the last subtraction when
    // `number` is at (or very close to) 1.0. The draw then belongs to the last node that carries
    // any weight, rather than to no node at all.
    weights.iter().rposition(|weight| *weight > 0.0)
}
+ for node in nodes_removed.into_iter() { + self.existing_nodes.remove(&node); + let idx = self.weighted_nodes.iter().position(|x| x.node == node); + idx.map(|idx| self.weighted_nodes.swap_remove(idx)); + } + + has_added_nodes || has_removed_nodes + } + + fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool { + if !self.existing_nodes.contains(node) { + return false; + } + + // If latency is None (meaning Node is unhealthy), we assign some big value + let latency = health.latency().unwrap_or(MAX_LATENCY); + + if let Some(idx) = self.weighted_nodes.iter().position(|x| &x.node == node) { + // Node is already in the array (it is not the first update_node() call). + self.weighted_nodes[idx].latency_mov_avg.add_sample(latency); + let latency_avg = self.weighted_nodes[idx].latency_mov_avg.get_average(); + // As nodes with smaller average latencies are preferred for routing, we use inverted values for weights. + self.weighted_nodes[idx].weight = 1.0 / latency_avg.as_secs_f64(); + } else { + // Node is not yet in array (first update_node() call). 
+ let mut latency_mov_avg = LatencyMovAvg::from_zero(Duration::ZERO); + latency_mov_avg.add_sample(latency); + let weight = 1.0 / latency_mov_avg.get_average().as_secs_f64(); + self.weighted_nodes.push(WeightedNode { + latency_mov_avg, + node: node.clone(), + weight, + }) + } + + true + } +} + +#[cfg(test)] +mod tests { + use std::{collections::HashSet, time::Duration}; + + use simple_moving_average::SMA; + + use crate::agent::http_transport::dynamic_routing::{ + health_check::HealthCheckStatus, + node::Node, + snapshot::{ + latency_based_routing::{ + weighted_sample, LatencyMovAvg, LatencyRoutingSnapshot, WeightedNode, MAX_LATENCY, + }, + routing_snapshot::RoutingSnapshot, + }, + }; + + #[test] + fn test_snapshot_init() { + // Arrange + let snapshot = LatencyRoutingSnapshot::new(); + // Assert + assert!(snapshot.weighted_nodes.is_empty()); + assert!(snapshot.existing_nodes.is_empty()); + assert!(!snapshot.has_nodes()); + assert!(snapshot.next().is_none()); + } + + #[test] + fn test_update_for_non_existing_node_fails() { + // Arrange + let mut snapshot = LatencyRoutingSnapshot::new(); + let node = Node::new("api1.com").unwrap(); + let health = HealthCheckStatus::new(Some(Duration::from_secs(1))); + // Act + let is_updated = snapshot.update_node(&node, health); + // Assert + assert!(!is_updated); + assert!(snapshot.weighted_nodes.is_empty()); + assert!(!snapshot.has_nodes()); + assert!(snapshot.next().is_none()); + } + + #[test] + fn test_update_for_existing_node_succeeds() { + // Arrange + let mut snapshot = LatencyRoutingSnapshot::new(); + let node = Node::new("api1.com").unwrap(); + let health = HealthCheckStatus::new(Some(Duration::from_secs(1))); + snapshot.existing_nodes.insert(node.clone()); + // Check first update + let is_updated = snapshot.update_node(&node, health); + assert!(is_updated); + assert!(snapshot.has_nodes()); + let weighted_node = snapshot.weighted_nodes.first().unwrap(); + assert_eq!( + weighted_node.latency_mov_avg.get_average(), + 
Duration::from_secs(1) + ); + assert_eq!(weighted_node.weight, 1.0); + assert_eq!(snapshot.next().unwrap(), node); + // Check second update + let health = HealthCheckStatus::new(Some(Duration::from_secs(2))); + let is_updated = snapshot.update_node(&node, health); + assert!(is_updated); + let weighted_node = snapshot.weighted_nodes.first().unwrap(); + assert_eq!( + weighted_node.latency_mov_avg.get_average(), + Duration::from_millis(1500) + ); + assert_eq!(weighted_node.weight, 1.0 / 1.5); + // Check third update + let health = HealthCheckStatus::new(Some(Duration::from_secs(3))); + let is_updated = snapshot.update_node(&node, health); + assert!(is_updated); + let weighted_node = snapshot.weighted_nodes.first().unwrap(); + assert_eq!( + weighted_node.latency_mov_avg.get_average(), + Duration::from_millis(2000) + ); + assert_eq!(weighted_node.weight, 0.5); + // Check forth update with none + let health = HealthCheckStatus::new(None); + let is_updated = snapshot.update_node(&node, health); + assert!(is_updated); + let weighted_node = snapshot.weighted_nodes.first().unwrap(); + let avg_latency = Duration::from_secs_f64((MAX_LATENCY.as_secs() as f64 + 6.0) / 4.0); + assert_eq!(weighted_node.latency_mov_avg.get_average(), avg_latency); + assert_eq!(weighted_node.weight, 1.0 / avg_latency.as_secs_f64()); + assert_eq!(snapshot.weighted_nodes.len(), 1); + assert_eq!(snapshot.existing_nodes.len(), 1); + assert_eq!(snapshot.next().unwrap(), node); + } + + #[test] + fn test_sync_node_scenarios() { + // Arrange + let mut snapshot = LatencyRoutingSnapshot::new(); + let node_1 = Node::new("api1.com").unwrap(); + // Sync with node_1 + let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]); + assert!(nodes_changed); + assert!(snapshot.weighted_nodes.is_empty()); + assert_eq!( + snapshot.existing_nodes, + HashSet::from_iter(vec![node_1.clone()]) + ); + // Add node_1 to weighted_nodes manually + snapshot.weighted_nodes.push(WeightedNode { + node: node_1.clone(), + 
latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO), + weight: 0.0, + }); + // Sync with node_1 again + let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]); + assert!(!nodes_changed); + assert_eq!( + snapshot.existing_nodes, + HashSet::from_iter(vec![node_1.clone()]) + ); + assert_eq!(snapshot.weighted_nodes[0].node, node_1); + // Sync with node_2 + let node_2 = Node::new("api2.com").unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]); + assert!(nodes_changed); + assert_eq!( + snapshot.existing_nodes, + HashSet::from_iter(vec![node_2.clone()]) + ); + // Make sure node_1 was removed from weighted_nodes too + assert!(snapshot.weighted_nodes.is_empty()); + // Add node_2 to weighted_nodes manually + snapshot.weighted_nodes.push(WeightedNode { + node: node_2.clone(), + latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO), + weight: 0.0, + }); + // Sync with [node_2, node_3] + let node_3 = Node::new("api3.com").unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_3.clone(), node_2.clone()]); + assert!(nodes_changed); + assert_eq!( + snapshot.existing_nodes, + HashSet::from_iter(vec![node_3.clone(), node_2.clone()]) + ); + assert_eq!(snapshot.weighted_nodes[0].node, node_2); + // Add node_3 to weighted_nodes manually + snapshot.weighted_nodes.push(WeightedNode { + node: node_3, + latency_mov_avg: LatencyMovAvg::from_zero(Duration::ZERO), + weight: 0.0, + }); + // Sync with [] + let nodes_changed = snapshot.sync_nodes(&[]); + assert!(nodes_changed); + assert!(snapshot.existing_nodes.is_empty()); + // Make sure all nodes were removed from the healthy_nodes + assert!(snapshot.weighted_nodes.is_empty()); + // Sync with [] again + let nodes_changed = snapshot.sync_nodes(&[]); + assert!(!nodes_changed); + assert!(snapshot.existing_nodes.is_empty()); + } + + #[test] + fn test_weighted_sample() { + // Case 1: empty array + let arr: &[f64] = &[]; + let idx = weighted_sample(arr, 0.5); + assert_eq!(idx, None); + // Case 2: single element 
in array + let arr: &[f64] = &[1.0]; + let idx = weighted_sample(arr, 0.0); + assert_eq!(idx, Some(0)); + let idx = weighted_sample(arr, 1.0); + assert_eq!(idx, Some(0)); + // check bounds + let idx = weighted_sample(arr, -1.0); + assert_eq!(idx, None); + let idx = weighted_sample(arr, 1.1); + assert_eq!(idx, None); + // Case 3: two elements in array (second element has twice the weight of the first) + let arr: &[f64] = &[1.0, 2.0]; // prefixed_sum = [1.0, 3.0] + let idx = weighted_sample(arr, 0.0); // 0.0 * 3.0 < 1.0 + assert_eq!(idx, Some(0)); + let idx = weighted_sample(arr, 0.33); // 0.33 * 3.0 < 1.0 + assert_eq!(idx, Some(0)); // selection probability ~0.33 + let idx = weighted_sample(arr, 0.35); // 0.35 * 3.0 > 1.0 + assert_eq!(idx, Some(1)); // selection probability ~0.66 + let idx = weighted_sample(arr, 1.0); // 1.0 * 3.0 > 1.0 + assert_eq!(idx, Some(1)); + // check bounds + let idx = weighted_sample(arr, -1.0); + assert_eq!(idx, None); + let idx = weighted_sample(arr, 1.1); + assert_eq!(idx, None); + // Case 4: four elements in array + let arr: &[f64] = &[1.0, 2.0, 1.5, 2.5]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0] + let idx = weighted_sample(arr, 0.14); // 0.14 * 7 < 1.0 + assert_eq!(idx, Some(0)); // probability ~0.14 + let idx = weighted_sample(arr, 0.15); // 0.15 * 7 > 1.0 + assert_eq!(idx, Some(1)); + let idx = weighted_sample(arr, 0.42); // 0.42 * 7 < 3.0 + assert_eq!(idx, Some(1)); // probability ~0.28 + let idx = weighted_sample(arr, 0.43); // 0.43 * 7 > 3.0 + assert_eq!(idx, Some(2)); + let idx = weighted_sample(arr, 0.64); // 0.64 * 7 < 4.5 + assert_eq!(idx, Some(2)); // probability ~0.22 + let idx = weighted_sample(arr, 0.65); // 0.65 * 7 > 4.5 + assert_eq!(idx, Some(3)); + let idx = weighted_sample(arr, 0.99); + assert_eq!(idx, Some(3)); // probability ~0.35 + // check bounds + let idx = weighted_sample(arr, -1.0); + assert_eq!(idx, None); + let idx = weighted_sample(arr, 1.1); + assert_eq!(idx, None); + } +} diff --git 
a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs new file mode 100644 index 00000000..73df1537 --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs @@ -0,0 +1,6 @@ +/// Snapshot of the routing table. +pub mod latency_based_routing; +/// Node implementation. +pub mod round_robin_routing; +/// Routing snapshot implementation. +pub mod routing_snapshot; diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs new file mode 100644 index 00000000..149e49d2 --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs @@ -0,0 +1,220 @@ +use std::{ + collections::HashSet, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; + +use crate::agent::http_transport::dynamic_routing::{ + health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot, +}; + +/// Routing snapshot, which samples nodes in a round-robin fashion. +#[derive(Default, Debug, Clone)] +pub struct RoundRobinRoutingSnapshot { + current_idx: Arc, + healthy_nodes: HashSet, + existing_nodes: HashSet, +} + +impl RoundRobinRoutingSnapshot { + /// Creates a new instance of `RoundRobinRoutingSnapshot`. 
+ pub fn new() -> Self { + Self { + current_idx: Arc::new(AtomicUsize::new(0)), + healthy_nodes: HashSet::new(), + existing_nodes: HashSet::new(), + } + } +} + +impl RoutingSnapshot for RoundRobinRoutingSnapshot { + fn has_nodes(&self) -> bool { + !self.healthy_nodes.is_empty() + } + + fn next(&self) -> Option { + if self.healthy_nodes.is_empty() { + return None; + } + let prev_idx = self.current_idx.fetch_add(1, Ordering::Relaxed); + self.healthy_nodes + .iter() + .nth(prev_idx % self.healthy_nodes.len()) + .cloned() + } + + fn sync_nodes(&mut self, nodes: &[Node]) -> bool { + let new_nodes = HashSet::from_iter(nodes.iter().cloned()); + // Find nodes removed from topology. + let nodes_removed: Vec<_> = self + .existing_nodes + .difference(&new_nodes) + .cloned() + .collect(); + let has_removed_nodes = !nodes_removed.is_empty(); + // Find nodes added to topology. + let nodes_added: Vec<_> = new_nodes + .difference(&self.existing_nodes) + .cloned() + .collect(); + let has_added_nodes = !nodes_added.is_empty(); + // NOTE: newly added nodes will appear in the healthy_nodes later. + // This happens after the first node health check round and a consequent update_node() invocation. 
+ self.existing_nodes.extend(nodes_added); + nodes_removed.iter().for_each(|node| { + self.existing_nodes.remove(node); + self.healthy_nodes.remove(node); + }); + + has_added_nodes || has_removed_nodes + } + + fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool { + if !self.existing_nodes.contains(node) { + return false; + } + if health.is_healthy() { + self.healthy_nodes.insert(node.clone()) + } else { + self.healthy_nodes.remove(node) + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + use std::{collections::HashSet, sync::atomic::Ordering}; + + use crate::agent::http_transport::dynamic_routing::{ + health_check::HealthCheckStatus, + node::Node, + snapshot::{ + round_robin_routing::RoundRobinRoutingSnapshot, routing_snapshot::RoutingSnapshot, + }, + }; + + #[test] + fn test_snapshot_init() { + // Arrange + let snapshot = RoundRobinRoutingSnapshot::new(); + // Assert + assert!(snapshot.healthy_nodes.is_empty()); + assert!(snapshot.existing_nodes.is_empty()); + assert!(!snapshot.has_nodes()); + assert_eq!(snapshot.current_idx.load(Ordering::SeqCst), 0); + assert!(snapshot.next().is_none()); + } + + #[test] + fn test_update_of_non_existing_node_always_returns_false() { + // Arrange + let mut snapshot = RoundRobinRoutingSnapshot::new(); + // This node is not present in existing_nodes + let node = Node::new("api1.com").unwrap(); + let healthy = HealthCheckStatus::new(Some(Duration::from_secs(1))); + let unhealthy = HealthCheckStatus::new(None); + // Act 1 + let is_updated = snapshot.update_node(&node, healthy); + // Assert + assert!(!is_updated); + assert!(snapshot.existing_nodes.is_empty()); + assert!(snapshot.next().is_none()); + // Act 2 + let is_updated = snapshot.update_node(&node, unhealthy); + // Assert + assert!(!is_updated); + assert!(snapshot.existing_nodes.is_empty()); + assert!(snapshot.next().is_none()); + } + + #[test] + fn test_update_of_existing_unhealthy_node_with_healthy_node_returns_true() { + // Arrange + let 
mut snapshot = RoundRobinRoutingSnapshot::new(); + let node = Node::new("api1.com").unwrap(); + // node is present in existing_nodes, but not in healthy_nodes + snapshot.existing_nodes.insert(node.clone()); + let health = HealthCheckStatus::new(Some(Duration::from_secs(1))); + // Act + let is_updated = snapshot.update_node(&node, health); + assert!(is_updated); + assert!(snapshot.has_nodes()); + assert_eq!(snapshot.next().unwrap(), node); + assert_eq!(snapshot.current_idx.load(Ordering::SeqCst), 1); + } + + #[test] + fn test_update_of_existing_healthy_node_with_unhealthy_node_returns_true() { + // Arrange + let mut snapshot = RoundRobinRoutingSnapshot::new(); + let node = Node::new("api1.com").unwrap(); + snapshot.existing_nodes.insert(node.clone()); + snapshot.healthy_nodes.insert(node.clone()); + let unhealthy = HealthCheckStatus::new(None); + // Act + let is_updated = snapshot.update_node(&node, unhealthy); + assert!(is_updated); + assert!(!snapshot.has_nodes()); + assert!(snapshot.next().is_none()); + } + + #[test] + fn test_sync_node_scenarios() { + // Arrange + let mut snapshot = RoundRobinRoutingSnapshot::new(); + let node_1 = Node::new("api1.com").unwrap(); + // Sync with node_1 + let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]); + assert!(nodes_changed); + assert!(snapshot.healthy_nodes.is_empty()); + assert_eq!( + snapshot.existing_nodes, + HashSet::from_iter(vec![node_1.clone()]) + ); + // Add node_1 to healthy_nodes manually + snapshot.healthy_nodes.insert(node_1.clone()); + // Sync with node_1 again + let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]); + assert!(!nodes_changed); + assert_eq!( + snapshot.existing_nodes, + HashSet::from_iter(vec![node_1.clone()]) + ); + assert_eq!(snapshot.healthy_nodes, HashSet::from_iter(vec![node_1])); + // Sync with node_2 + let node_2 = Node::new("api2.com").unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]); + assert!(nodes_changed); + assert_eq!( + snapshot.existing_nodes, + 
HashSet::from_iter(vec![node_2.clone()]) + ); + // Make sure node_1 was removed from healthy nodes + assert!(snapshot.healthy_nodes.is_empty()); + // Add node_2 to healthy_nodes manually + snapshot.healthy_nodes.insert(node_2.clone()); + // Sync with [node_2, node_3] + let node_3 = Node::new("api3.com").unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_3.clone(), node_2.clone()]); + assert!(nodes_changed); + assert_eq!( + snapshot.existing_nodes, + HashSet::from_iter(vec![node_3.clone(), node_2.clone()]) + ); + assert_eq!(snapshot.healthy_nodes, HashSet::from_iter(vec![node_2])); + snapshot.healthy_nodes.insert(node_3); + // Sync with [] + let nodes_changed = snapshot.sync_nodes(&[]); + assert!(nodes_changed); + assert!(snapshot.existing_nodes.is_empty()); + // Make sure all nodes were removed from the healthy_nodes + assert!(snapshot.healthy_nodes.is_empty()); + // Sync with [] again + let nodes_changed = snapshot.sync_nodes(&[]); + assert!(!nodes_changed); + assert!(snapshot.existing_nodes.is_empty()); + } +} diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs new file mode 100644 index 00000000..155b8eac --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs @@ -0,0 +1,15 @@ +use std::fmt::Debug; + +use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node}; + +/// A trait for interacting with the snapshot of nodes (routing table). +pub trait RoutingSnapshot: Send + Sync + Clone + Debug { + /// Returns `true` if the snapshot has nodes. + fn has_nodes(&self) -> bool; + /// Get the next node in the snapshot. + fn next(&self) -> Option; + /// Syncs the nodes in the snapshot with the provided list of nodes, returning `true` if the snapshot was updated. 
+ fn sync_nodes(&mut self, nodes: &[Node]) -> bool; + /// Updates the health status of a specific node, returning `true` if the node was found and updated. + fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> bool; +} diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs b/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs new file mode 100644 index 00000000..60004d75 --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs @@ -0,0 +1,125 @@ +use std::collections::{HashMap, HashSet}; +use std::time::Duration; +use std::{fmt::Debug, hash::Hash, sync::Arc}; + +use arc_swap::ArcSwap; +use async_trait::async_trait; +use url::Url; + +use crate::agent::http_transport::{ + dynamic_routing::{ + dynamic_route_provider::DynamicRouteProviderError, + health_check::{HealthCheck, HealthCheckStatus}, + node::Node, + nodes_fetch::Fetch, + type_aliases::AtomicSwap, + }, + route_provider::RouteProvider, +}; + +pub(super) fn route_n_times(n: usize, f: Arc) -> Vec { + (0..n) + .map(|_| f.route().unwrap().domain().unwrap().to_string()) + .collect() +} + +pub(super) fn assert_routed_domains( + actual: Vec, + expected: Vec, + expected_repetitions: usize, +) where + T: AsRef + Eq + Hash + Debug + Ord, +{ + fn build_count_map(items: &[T]) -> HashMap<&T, usize> + where + T: Eq + Hash, + { + items.iter().fold(HashMap::new(), |mut map, item| { + *map.entry(item).or_insert(0) += 1; + map + }) + } + let count_actual = build_count_map(&actual); + let count_expected = build_count_map(&expected); + + let mut keys_actual = count_actual.keys().collect::>(); + keys_actual.sort(); + let mut keys_expected = count_expected.keys().collect::>(); + keys_expected.sort(); + // Assert all routed domains are present. + assert_eq!(keys_actual, keys_expected); + + // Assert the expected repetition count of each routed domain. 
+ let actual_repetitions = count_actual.values().collect::>(); + assert!(actual_repetitions + .iter() + .all(|&x| x == &expected_repetitions)); +} + +#[derive(Debug)] +pub(super) struct NodesFetcherMock { + // A set of nodes, existing in the topology. + pub nodes: AtomicSwap>, +} + +#[async_trait] +impl Fetch for NodesFetcherMock { + async fn fetch(&self, _url: Url) -> Result, DynamicRouteProviderError> { + let nodes = (*self.nodes.load_full()).clone(); + Ok(nodes) + } +} + +impl Default for NodesFetcherMock { + fn default() -> Self { + Self::new() + } +} + +impl NodesFetcherMock { + pub fn new() -> Self { + Self { + nodes: Arc::new(ArcSwap::from_pointee(vec![])), + } + } + + pub fn overwrite_nodes(&self, nodes: Vec) { + self.nodes.store(Arc::new(nodes)); + } +} + +#[derive(Debug)] +pub(super) struct NodeHealthCheckerMock { + healthy_nodes: Arc>>, +} + +impl Default for NodeHealthCheckerMock { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl HealthCheck for NodeHealthCheckerMock { + async fn check(&self, node: &Node) -> Result { + let nodes = self.healthy_nodes.load_full(); + let latency = match nodes.contains(node) { + true => Some(Duration::from_secs(1)), + false => None, + }; + Ok(HealthCheckStatus::new(latency)) + } +} + +impl NodeHealthCheckerMock { + pub fn new() -> Self { + Self { + healthy_nodes: Arc::new(ArcSwap::from_pointee(HashSet::new())), + } + } + + pub fn overwrite_healthy_nodes(&self, healthy_nodes: Vec) { + self.healthy_nodes + .store(Arc::new(HashSet::from_iter(healthy_nodes))); + } +} diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs b/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs new file mode 100644 index 00000000..6be931fb --- /dev/null +++ b/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs @@ -0,0 +1,18 @@ +use arc_swap::ArcSwap; +use std::sync::Arc; +use tokio::sync::{mpsc, watch}; + +/// A type alias for the sender end of a watch channel. 
+pub(super) type SenderWatch = watch::Sender>; + +/// A type alias for the receiver end of a watch channel. +pub(super) type ReceiverWatch = watch::Receiver>; + +/// A type alias for the sender end of a multi-producer, single-consumer channel. +pub(super) type SenderMpsc = mpsc::Sender; + +/// A type alias for the receiver end of a multi-producer, single-consumer channel. +pub(super) type ReceiverMpsc = mpsc::Receiver; + +/// A type alias for an atomic swap operation on a shared value. +pub(super) type AtomicSwap = Arc>; diff --git a/ic-agent/src/agent/http_transport/mod.rs b/ic-agent/src/agent/http_transport/mod.rs index 8f2220d6..7ffdd622 100644 --- a/ic-agent/src/agent/http_transport/mod.rs +++ b/ic-agent/src/agent/http_transport/mod.rs @@ -30,4 +30,6 @@ const ICP0_SUB_DOMAIN: &str = ".icp0.io"; const ICP_API_SUB_DOMAIN: &str = ".icp-api.io"; #[allow(dead_code)] const LOCALHOST_SUB_DOMAIN: &str = ".localhost"; +#[cfg(all(feature = "reqwest", not(target_family = "wasm")))] +pub mod dynamic_routing; pub mod route_provider;