From f831d96c74f46f1d265b9bd5e4960af5466fb180 Mon Sep 17 00:00:00 2001
From: Nikolay Komarevskiy
Date: Tue, 25 Jun 2024 17:20:16 +0200
Subject: [PATCH] feat: dynamic_route_provider

---
 ic-agent/Cargo.toml                           |   8 +
 .../dynamic_routing/dynamic_route_provider.rs | 596 ++++++++++++++++++
 .../dynamic_routing/health_check.rs           | 265 ++++++++
 .../dynamic_routing/messages.rs               |  11 +
 .../http_transport/dynamic_routing/mod.rs     |   9 +
 .../http_transport/dynamic_routing/node.rs    |  43 ++
 .../dynamic_routing/nodes_fetch.rs            | 143 +++++
 .../snapshot/latency_based_routing.rs         | 365 +++++++++++
 .../dynamic_routing/snapshot/mod.rs           |   3 +
 .../snapshot/round_robin_routing.rs           | 237 +++++++
 .../snapshot/routing_snapshot.rs              |  10 +
 .../dynamic_routing/test_utils.rs             | 121 ++++
 .../dynamic_routing/type_aliases.rs           |  12 +
 ic-agent/src/agent/http_transport/mod.rs      |   1 +
 14 files changed, 1824 insertions(+)
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/messages.rs
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/mod.rs
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/node.rs
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs
 create mode 100644 ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs

diff --git a/ic-agent/Cargo.toml b/ic-agent/Cargo.toml
index 1dcacdc7..5864f104 100644
--- a/ic-agent/Cargo.toml
+++ b/ic-agent/Cargo.toml
@@ -44,6 +44,14 @@ simple_asn1 = "0.6.1"
 thiserror = { workspace = true }
 time = { workspace = true }
 url = "2.1.0"
+tokio = { version = "1.24.2", features = ["full"] }
+async-trait = "0.1.80"
+tracing = "0.1.40"
+arc-swap = "1.7.1"
+anyhow = "1.0.86"
+simple_moving_average = "1.0.2"
+tracing-subscriber = "0.3.18"
+tokio-util = { version = "0.7.11", features = ["full"] }

 [dependencies.hyper]
 version = "1.0.1"
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs b/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs
new file mode 100644
index 00000000..261a37e0
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/dynamic_route_provider.rs
@@ -0,0 +1,596 @@
//! An implementation of the [`RouteProvider`](crate::agent::http_transport::route_provider::RouteProvider) for dynamic generation of routing URLs.
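//!
//! A minimal usage sketch (illustrative only, marked `ignore`; it assumes the
//! `run()`/`route()` flow defined below and the round-robin snapshot from this
//! crate):
//! ```ignore
//! use std::sync::Arc;
//!
//! use ic_agent::agent::http_transport::dynamic_routing::{
//!     dynamic_route_provider::{DynamicRouteProvider, IC0_SEED_DOMAIN},
//!     node::Node,
//!     snapshot::round_robin_routing::RoundRobinRoutingSnapshot,
//! };
//! use ic_agent::agent::http_transport::route_provider::RouteProvider;
//!
//! async fn example() -> anyhow::Result<()> {
//!     let seed = Node::new(IC0_SEED_DOMAIN)?;
//!     let provider = Arc::new(DynamicRouteProvider::new(
//!         RoundRobinRoutingSnapshot::new(),
//!         vec![seed],
//!         reqwest::Client::new(),
//!     ));
//!     // Spawns the fetch and health-check actors; errors if no seed becomes healthy.
//!     provider.run().await?;
//!     // Every call returns the URL of the next healthy API boundary node.
//!     let url = provider.route()?;
//!     Ok(())
//! }
//! ```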
use std::{
    sync::Arc,
    time::{Duration, Instant},
};

use anyhow::anyhow;
use arc_swap::ArcSwap;
use candid::Principal;
use reqwest::Client;
use tokio::{
    sync::{mpsc, watch},
    time::timeout,
};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{error, info, warn};
use url::Url;

use crate::{
    agent::http_transport::{
        dynamic_routing::{
            health_check::{HealthCheck, HealthChecker, HealthManagerActor},
            messages::FetchedNodes,
            node::Node,
            nodes_fetch::{Fetch, NodesFetchActor, NodesFetcher},
            snapshot::routing_snapshot::RoutingSnapshot,
            type_aliases::GlobalShared,
        },
        route_provider::RouteProvider,
    },
    AgentError,
};

/// Default seed domain used to bootstrap the routing table.
pub const IC0_SEED_DOMAIN: &str = "ic0.app";

const MAINNET_ROOT_SUBNET_ID: &str =
    "tdb26-jop6k-aogll-7ltgs-eruif-6kk7m-qpktf-gdiqx-mxtrf-vb5e6-eqe";

const FETCH_PERIOD: Duration = Duration::from_secs(5);
const FETCH_RETRY_INTERVAL: Duration = Duration::from_millis(250);
const TIMEOUT_AWAIT_HEALTHY_SEED: Duration = Duration::from_millis(1000);
const HEALTH_CHECK_TIMEOUT: Duration = Duration::from_secs(2);
const HEALTH_CHECK_PERIOD: Duration = Duration::from_secs(1);

const DYNAMIC_ROUTE_PROVIDER: &str = "DynamicRouteProvider";

/// A route provider that dynamically discovers API boundary nodes, health-checks
/// them, and routes requests only to healthy nodes via the routing strategy `S`.
#[derive(Debug)]
pub struct DynamicRouteProvider<S> {
    /// Fetcher used to periodically discover the API nodes in the topology.
    fetcher: Arc<dyn Fetch>,
    fetch_period: Duration,
    fetch_retry_interval: Duration,
    /// Checker used to probe the health of the discovered nodes.
    checker: Arc<dyn HealthCheck>,
    check_period: Duration,
    /// Shared snapshot of the nodes and their health, used for routing.
    snapshot: GlobalShared<S>,
    tracker: TaskTracker,
    seeds: Vec<Node>,
    token: CancellationToken,
}

impl<S> RouteProvider for DynamicRouteProvider<S>
where
    S: RoutingSnapshot + 'static,
{
    fn route(&self) -> Result<Url, AgentError> {
        let snapshot = self.snapshot.load();
        let node = snapshot.next().ok_or_else(|| {
            AgentError::RouteProviderError("No healthy API nodes found.".to_string())
        })?;
        Ok(node.to_routing_url())
    }
}

impl<S> DynamicRouteProvider<S>
where
    S: RoutingSnapshot + 'static,
{
    /// Creates a new provider with the given routing strategy, seed nodes and HTTP client.
    pub fn new(snapshot: S, seeds: Vec<Node>, http_client: Client) -> Self {
        let fetcher = Arc::new(NodesFetcher::new(
            http_client.clone(),
            Principal::from_text(MAINNET_ROOT_SUBNET_ID).unwrap(),
        ));
        let checker = Arc::new(HealthChecker::new(http_client, HEALTH_CHECK_TIMEOUT));
        Self {
            fetcher,
            fetch_period: FETCH_PERIOD,
            fetch_retry_interval: FETCH_RETRY_INTERVAL,
            checker,
            check_period: HEALTH_CHECK_PERIOD,
            seeds,
            snapshot: Arc::new(ArcSwap::from_pointee(snapshot)),
            tracker: TaskTracker::new(),
            token: CancellationToken::new(),
        }
    }

    /// Replaces the default nodes fetcher.
    pub fn with_fetcher(mut self, fetcher: Arc<dyn Fetch>) -> Self {
        self.fetcher = fetcher;
        self
    }

    /// Sets the nodes fetching period.
    pub fn with_fetch_period(mut self, period: Duration) -> Self {
        self.fetch_period = period;
        self
    }

    /// Replaces the default health checker.
    pub fn with_checker(mut self, checker: Arc<dyn HealthCheck>) -> Self {
        self.checker = checker;
        self
    }

    /// Sets the health checking period.
    pub fn with_check_period(mut self, period: Duration) -> Self {
        self.check_period = period;
        self
    }

    /// Starts two background tasks:
    /// - Task1: NodesFetchActor
    ///   - Periodically fetches existing API nodes (gets latest nodes topology) and sends discovered nodes to HealthManagerActor.
    /// - Task2: HealthManagerActor:
    ///   - Listens to the fetched nodes messages from the NodesFetchActor.
    ///   - Starts/stops health check tasks (HealthCheckActors) based on the newly added/removed nodes.
    ///   - These spawned health check tasks periodically update the snapshot with the latest node health info.
    pub async fn run(&self) -> anyhow::Result<()> {
        info!("{DYNAMIC_ROUTE_PROVIDER}: start run()");
        // Communication channel between NodesFetchActor and HealthManagerActor.
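        // A watch channel is used (rather than mpsc) so that only the most recent
        // topology is retained: the receiver reads the latest value via
        // borrow_and_update() and intermediate updates can safely be skipped.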
        let (fetch_sender, fetch_receiver) = watch::channel(None);

        // Communication channel with HealthManagerActor to receive info about healthy seeds.
        let (init_sender, mut init_receiver) = mpsc::channel(1);

        // Start the receiving part first.
        let health_manager_actor = HealthManagerActor::new(
            Arc::clone(&self.checker),
            self.check_period,
            Arc::clone(&self.snapshot),
            fetch_receiver,
            init_sender,
            self.token.clone(),
        );
        self.tracker
            .spawn(async move { health_manager_actor.run().await });

        // Dispatch all seed nodes for initial health checks.
        let start = Instant::now();
        if let Err(err) = fetch_sender.send(Some(FetchedNodes {
            nodes: self.seeds.clone(),
        })) {
            error!("{DYNAMIC_ROUTE_PROVIDER}: failed to send results to HealthManager: {err:?}");
        }

        // Try to await healthy seeds.
        let found_healthy_seeds =
            match timeout(TIMEOUT_AWAIT_HEALTHY_SEED, init_receiver.recv()).await {
                Ok(_) => {
                    info!(
                        "{DYNAMIC_ROUTE_PROVIDER}: found healthy seeds within {:?}",
                        start.elapsed()
                    );
                    true
                }
                Err(_) => {
                    warn!(
                        "{DYNAMIC_ROUTE_PROVIDER}: no healthy seeds found within {:?}",
                        start.elapsed()
                    );
                    false
                }
            };
        init_receiver.close();

        let fetch_actor = NodesFetchActor::new(
            Arc::clone(&self.fetcher),
            self.fetch_period,
            self.fetch_retry_interval,
            fetch_sender,
            Arc::clone(&self.snapshot),
            self.token.clone(),
        );
        self.tracker.spawn(async move { fetch_actor.run().await });
        info!(
            "{DYNAMIC_ROUTE_PROVIDER}: NodesFetchActor and HealthManagerActor started successfully"
        );

        found_healthy_seeds
            .then_some(())
            .ok_or_else(|| anyhow!("No healthy seeds found"))
    }

    /// Kills all the running tasks.
    pub async fn stop(&self) {
        self.token.cancel();
        self.tracker.close();
        self.tracker.wait().await;
        warn!("{DYNAMIC_ROUTE_PROVIDER}: gracefully stopped");
    }
}

#[cfg(test)]
mod tests {
    use reqwest::Client;
    use std::{
        sync::{Arc, Once},
        time::Duration,
    };
    use tracing::Level;
    use tracing_subscriber::FmtSubscriber;

    use crate::{
        agent::http_transport::{
            dynamic_routing::{
                dynamic_route_provider::{DynamicRouteProvider, IC0_SEED_DOMAIN},
                node::Node,
                snapshot::round_robin_routing::RoundRobinRoutingSnapshot,
                test_utils::{
                    assert_routed_domains, route_n_times, NodeHealthCheckerMock, NodesFetcherMock,
                },
            },
            route_provider::RouteProvider,
        },
        AgentError,
    };

    static TRACING_INIT: Once = Once::new();

    pub fn setup_tracing() {
        TRACING_INIT.call_once(|| {
            FmtSubscriber::builder().with_max_level(Level::TRACE).init();
        });
    }

    #[tokio::test]
    async fn test_routing_with_topology_and_node_health_updates() {
        // Setup.
        setup_tracing();
        let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap();
        // Set nodes fetching params: topology, fetching periodicity.
        let fetcher = Arc::new(NodesFetcherMock::new());
        let fetch_interval = Duration::from_secs(2);
        // Set health checking params: healthy nodes, checking periodicity.
        let checker = Arc::new(NodeHealthCheckerMock::new());
        let check_interval = Duration::from_secs(1);
        // A single healthy node exists in the topology. This node happens to be the seed node.
+ fetcher.overwrite_nodes(vec![node_1.clone()]); + checker.overwrite_healthy_nodes(vec![node_1.clone()]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = Arc::new( + DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client) + .with_fetcher(fetcher.clone()) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval), + ); + route_provider.run().await.expect("no healthy seeds found"); + + // This time span is required for the snapshot to be fully updated with the new nodes and their health info. + let snapshot_update_duration = fetch_interval + 2 * check_interval; + + // Test 1: multiple route() calls return a single domain=ic0.app. + // Only a single node exists, which is initially healthy. + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_1.domain.clone()], 6); + + // Test 2: multiple route() calls return 3 different domains with equal fairness (repetition). + // Two healthy nodes are added to the topology. + let node_2 = Node::new("api1.com").unwrap(); + let node_3 = Node::new("api2.com").unwrap(); + checker.overwrite_healthy_nodes(vec![node_1.clone(), node_2.clone(), node_3.clone()]); + fetcher.overwrite_nodes(vec![node_1.clone(), node_2.clone(), node_3.clone()]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains( + routed_domains, + vec![ + node_1.clone().domain, + node_2.clone().domain, + node_3.clone().domain, + ], + 2, + ); + + // Test 3: multiple route() calls return 2 different domains with equal fairness (repetition). + // One node is set to unhealthy. + checker.overwrite_healthy_nodes(vec![node_1.clone(), node_3.clone()]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains( + routed_domains, + vec![node_1.clone().domain, node_3.clone().domain], + 3, + ); + + // Test 4: multiple route() calls return 3 different domains with equal fairness (repetition). + // Unhealthy node is set back to healthy. + checker.overwrite_healthy_nodes(vec![node_1.clone(), node_2.clone(), node_3.clone()]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains( + routed_domains, + vec![ + node_1.clone().domain, + node_2.clone().domain, + node_3.clone().domain, + ], + 2, + ); + + // Test 5: multiple route() calls return 3 different domains with equal fairness (repetition). + // One healthy node is added, but another one goes unhealthy. + let node_4 = Node::new("api3.com").unwrap(); + checker.overwrite_healthy_nodes(vec![node_2.clone(), node_3.clone(), node_4.clone()]); + fetcher.overwrite_nodes(vec![ + node_1.clone(), + node_2.clone(), + node_3.clone(), + node_4.clone(), + ]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains( + routed_domains, + vec![ + node_2.clone().domain, + node_3.clone().domain, + node_4.clone().domain, + ], + 2, + ); + + // Test 6: multiple route() calls return a single domain=api1.com. + // One node is set to unhealthy and one is removed from the topology. 
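        // Here node_3 disappears from the fetched topology, node_1 is reported
        // unhealthy, and node_4 was never healthy, so node_2 is the only node
        // that is both present in the topology and healthy.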
+ checker.overwrite_healthy_nodes(vec![node_2.clone(), node_3.clone()]); + fetcher.overwrite_nodes(vec![node_1.clone(), node_2.clone(), node_4.clone()]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(3, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_2.clone().domain], 3); + + // Teardown. + route_provider.stop().await; + } + + #[tokio::test] + async fn test_route_with_initially_unhealthy_seeds_becoming_healthy() { + // Setup. + setup_tracing(); + let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); + let node_2 = Node::new("api1.com").unwrap(); + // Set nodes fetching params: topology, fetching periodicity. + let fetcher = Arc::new(NodesFetcherMock::new()); + let fetch_interval = Duration::from_secs(2); + // Set health checking params: healthy nodes, checking periodicity. + let checker = Arc::new(NodeHealthCheckerMock::new()); + let check_interval = Duration::from_secs(1); + // Two nodes exist, which are initially unhealthy. + fetcher.overwrite_nodes(vec![node_1.clone(), node_2.clone()]); + checker.overwrite_healthy_nodes(vec![]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = Arc::new( + DynamicRouteProvider::new(snapshot, vec![node_1.clone(), node_2.clone()], client) + .with_fetcher(fetcher.clone()) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval), + ); + assert!(route_provider + .run() + .await + .unwrap_err() + .to_string() + .contains("No healthy seeds found")); + + // Test 1: calls to route() return an error, as no healthy seeds exist. + for _ in 0..4 { + tokio::time::sleep(check_interval).await; + let result = route_provider.route(); + assert_eq!( + result.unwrap_err(), + AgentError::RouteProviderError("No healthy API nodes found.".to_string()) + ); + } + + // Test 2: calls to route() return both seeds, as they become healthy. + checker.overwrite_healthy_nodes(vec![node_1.clone(), node_2.clone()]); + tokio::time::sleep(3 * check_interval).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains( + routed_domains, + vec![node_1.domain.clone(), node_2.domain.clone()], + 3, + ); + + // Teardown. + route_provider.stop().await; + } + + #[tokio::test] + async fn test_routing_with_no_healthy_nodes_returns_an_error() { + // Setup. + setup_tracing(); + let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); + // Set nodes fetching params: topology, fetching periodicity. + let fetcher = Arc::new(NodesFetcherMock::new()); + let fetch_interval = Duration::from_secs(2); + // Set health checking params: healthy nodes, checking periodicity. + let checker = Arc::new(NodeHealthCheckerMock::new()); + let check_interval = Duration::from_secs(1); + // A single seed node which is initially healthy. + fetcher.overwrite_nodes(vec![node_1.clone()]); + checker.overwrite_healthy_nodes(vec![node_1.clone()]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = Arc::new( + DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client) + .with_fetcher(fetcher.clone()) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval), + ); + route_provider.run().await.expect("no healthy seeds found"); + + // Test 1: multiple route() calls return a single domain=ic0.app, as the seed is healthy. 
+ tokio::time::sleep(2 * check_interval).await; + let routed_domains = route_n_times(3, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_1.clone().domain], 3); + + // Test 2: calls to route() return an error, as no healthy nodes exist. + checker.overwrite_healthy_nodes(vec![]); + tokio::time::sleep(2 * check_interval).await; + for _ in 0..4 { + let result = route_provider.route(); + assert_eq!( + result.unwrap_err(), + AgentError::RouteProviderError("No healthy API nodes found.".to_string()) + ); + } + + // Teardown. + route_provider.stop().await; + } + + #[tokio::test] + async fn test_route_with_no_healthy_seeds_errors() { + // Setup. + setup_tracing(); + let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); + // Set nodes fetching params: topology, fetching periodicity. + let fetcher = Arc::new(NodesFetcherMock::new()); + let fetch_interval = Duration::from_secs(2); + // Set health checking params: healthy nodes, checking periodicity. + let checker = Arc::new(NodeHealthCheckerMock::new()); + let check_interval = Duration::from_secs(1); + // No healthy seed nodes present, this should lead to errors. + fetcher.overwrite_nodes(vec![]); + checker.overwrite_healthy_nodes(vec![]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = Arc::new( + DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client) + .with_fetcher(fetcher.clone()) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval), + ); + assert!(route_provider + .run() + .await + .unwrap_err() + .to_string() + .contains("No healthy seeds found")); + + // Test: calls to route() return an error, as no healthy seeds exist. + for _ in 0..4 { + tokio::time::sleep(check_interval).await; + let result = route_provider.route(); + assert_eq!( + result.unwrap_err(), + AgentError::RouteProviderError("No healthy API nodes found.".to_string()) + ); + } + + // Teardown. + route_provider.stop().await; + } + + #[tokio::test] + async fn test_route_with_one_healthy_and_one_unhealthy_seed() { + // Setup. + setup_tracing(); + let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); + let node_2 = Node::new("api1.com").unwrap(); + // Set nodes fetching params: topology, fetching periodicity. + let fetcher = Arc::new(NodesFetcherMock::new()); + let fetch_interval = Duration::from_secs(2); + // Set health checking params: healthy nodes, checking periodicity. + let checker = Arc::new(NodeHealthCheckerMock::new()); + let check_interval = Duration::from_secs(1); + // One healthy seed is present, it should be discovered during the initialization time. + fetcher.overwrite_nodes(vec![node_1.clone(), node_2.clone()]); + checker.overwrite_healthy_nodes(vec![node_1.clone()]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = Arc::new( + DynamicRouteProvider::new(snapshot, vec![node_1.clone(), node_2.clone()], client) + .with_fetcher(fetcher.clone()) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval), + ); + route_provider.run().await.expect("no healthy seeds found"); + + // Test 1: calls to route() return only a healthy seed ic0.app. 
+ let routed_domains = route_n_times(3, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_1.clone().domain], 3); + + // Test 2: calls to route() return two healthy seeds, as the unhealthy seed becomes healthy. + checker.overwrite_healthy_nodes(vec![node_1.clone(), node_2.clone()]); + tokio::time::sleep(2 * check_interval).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains( + routed_domains, + vec![node_1.clone().domain, node_2.clone().domain], + 3, + ); + + // Teardown. + route_provider.stop().await; + } + + #[tokio::test] + async fn test_routing_with_an_empty_fetched_list_of_api_nodes() { + // Check resiliency to an empty list of fetched API nodes (this should never happen in normal IC operation). + // Setup. + setup_tracing(); + let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap(); + let node_2 = Node::new("api1.com").unwrap(); + // Set nodes fetching params: topology, fetching periodicity. + let fetcher = Arc::new(NodesFetcherMock::new()); + let fetch_interval = Duration::from_secs(2); + // Set health checking params: healthy nodes, checking periodicity. + let checker = Arc::new(NodeHealthCheckerMock::new()); + let check_interval = Duration::from_secs(1); + // One healthy seed is initially present, but the topology has no node. + fetcher.overwrite_nodes(vec![]); + checker.overwrite_healthy_nodes(vec![node_1.clone()]); + // Configure RouteProvider + let snapshot = RoundRobinRoutingSnapshot::new(); + let client = Client::builder().build().unwrap(); + let route_provider = Arc::new( + DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client) + .with_fetcher(fetcher.clone()) + .with_checker(checker.clone()) + .with_fetch_period(fetch_interval) + .with_check_period(check_interval), + ); + route_provider.run().await.expect("no healthy seeds found"); + + // This time span is required for the snapshot to be fully updated with the new nodes topology and health info. + let snapshot_update_duration = fetch_interval + 2 * check_interval; + + // Test 1: multiple route() calls return a single domain=ic0.app. + // HealthManagerActor shouldn't update the snapshot, if the list of fetched nodes is empty, thus we observe the healthy seed. + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(3, Arc::clone(&route_provider)); + assert_routed_domains(routed_domains, vec![node_1.clone().domain], 3); + + // Test 2: multiple route() calls should now return 3 different domains with equal fairness (repetition). + // Three nodes are added to the topology, i.e. now the fetched nodes list is non-empty. + let node_2 = Node::new("api1.com").unwrap(); + let node_3 = Node::new("api2.com").unwrap(); + fetcher.overwrite_nodes(vec![node_1.clone(), node_2.clone(), node_3.clone()]); + checker.overwrite_healthy_nodes(vec![node_1.clone(), node_2.clone(), node_3.clone()]); + tokio::time::sleep(snapshot_update_duration).await; + let routed_domains = route_n_times(6, Arc::clone(&route_provider)); + assert_routed_domains( + routed_domains, + vec![ + node_1.clone().domain, + node_2.clone().domain, + node_3.clone().domain, + ], + 2, + ); + + // Teardown. 
        route_provider.stop().await;
    }
}
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs b/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs
new file mode 100644
index 00000000..a1565aa7
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs
@@ -0,0 +1,265 @@
use anyhow::bail;
use async_trait::async_trait;
use http::{Method, StatusCode};
use reqwest::{Client, Request};
use std::{
    fmt::Debug,
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::{sync::mpsc, time};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{debug, error, info, warn};
use url::Url;

use crate::agent::http_transport::dynamic_routing::{
    messages::{FetchedNodes, NodeHealthState},
    node::Node,
    snapshot::routing_snapshot::RoutingSnapshot,
    type_aliases::{GlobalShared, ReceiverMpsc, ReceiverWatch, SenderMpsc},
};

const CHANNEL_BUFFER: usize = 128;

/// A trait for checking the health of a node.
#[async_trait]
pub trait HealthCheck: Send + Sync + Debug {
    async fn check(&self, node: &Node) -> anyhow::Result<HealthCheckStatus>;
}

/// The health status of a node: `latency` is `Some` iff the node is healthy.
#[derive(Clone, PartialEq, Debug, Default)]
pub struct HealthCheckStatus {
    pub latency: Option<Duration>,
}

impl HealthCheckStatus {
    pub fn new(latency: Option<Duration>) -> Self {
        Self { latency }
    }

    pub fn is_healthy(&self) -> bool {
        self.latency.is_some()
    }
}

/// The default implementation of `HealthCheck`, probing the node's /health endpoint.
#[derive(Debug)]
pub struct HealthChecker {
    http_client: Client,
    timeout: Duration,
}

impl HealthChecker {
    pub fn new(http_client: Client, timeout: Duration) -> Self {
        Self {
            http_client,
            timeout,
        }
    }
}

const HEALTH_CHECKER: &str = "HealthChecker";

#[async_trait]
impl HealthCheck for HealthChecker {
    async fn check(&self, node: &Node) -> anyhow::Result<HealthCheckStatus> {
        // An API boundary node exposes the /health endpoint and should respond with 204 (No Content) if it's healthy.
        let url = Url::parse(&format!("https://{}/health", node.domain))?;

        let mut request = Request::new(Method::GET, url.clone());
        *request.timeout_mut() = Some(self.timeout);

        let start = Instant::now();
        let response = self.http_client.execute(request).await?;
        let latency = start.elapsed();

        if response.status() != StatusCode::NO_CONTENT {
            let err_msg = format!(
                "{HEALTH_CHECKER}: Unexpected http status code {} for url={url} received",
                response.status()
            );
            error!("{err_msg}");
            bail!(err_msg);
        }

        Ok(HealthCheckStatus::new(Some(latency)))
    }
}

const HEALTH_CHECK_ACTOR: &str = "HealthCheckActor";

/// An actor that periodically checks the health of a single node and reports it.
struct HealthCheckActor {
    checker: Arc<dyn HealthCheck>,
    period: Duration,
    node: Node,
    sender_channel: SenderMpsc<NodeHealthState>,
    token: CancellationToken,
}

impl HealthCheckActor {
    pub fn new(
        checker: Arc<dyn HealthCheck>,
        period: Duration,
        node: Node,
        sender_channel: SenderMpsc<NodeHealthState>,
        token: CancellationToken,
    ) -> Self {
        Self {
            checker,
            period,
            node,
            sender_channel,
            token,
        }
    }

    pub async fn run(self) {
        let mut interval = time::interval(self.period);
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    let health = self.checker.check(&self.node).await.unwrap_or_default();
                    let message = NodeHealthState {
                        node: self.node.clone(),
                        health,
                    };
                    // Inform the listener about node's health. It can only fail if the listener was closed/dropped.
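                    // The expect() below is safe in practice: HealthManagerActor only closes
                    // the receiving end after cancelling this actor's token and awaiting its task.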
                    self.sender_channel.send(message).await.expect("Failed to send node's health state");
                }
                _ = self.token.cancelled() => {
                    info!("{HEALTH_CHECK_ACTOR}: was gracefully cancelled for node {:?}", self.node);
                    break;
                }
            }
        }
    }
}

pub const HEALTH_MANAGER_ACTOR: &str = "HealthManagerActor";

/// An actor that manages the health checks of all nodes and keeps the routing snapshot up to date.
pub struct HealthManagerActor<S> {
    checker: Arc<dyn HealthCheck>,
    period: Duration,
    nodes_snapshot: GlobalShared<S>,
    fetch_receiver: ReceiverWatch<FetchedNodes>,
    check_sender: SenderMpsc<NodeHealthState>,
    check_receiver: ReceiverMpsc<NodeHealthState>,
    init_sender: SenderMpsc<bool>,
    token: CancellationToken,
    nodes_token: CancellationToken,
    nodes_tracker: TaskTracker,
    is_initialized: bool,
}

impl<S> HealthManagerActor<S>
where
    S: RoutingSnapshot,
{
    pub fn new(
        checker: Arc<dyn HealthCheck>,
        period: Duration,
        nodes_snapshot: GlobalShared<S>,
        fetch_receiver: ReceiverWatch<FetchedNodes>,
        init_sender: SenderMpsc<bool>,
        token: CancellationToken,
    ) -> Self {
        let (check_sender, check_receiver) = mpsc::channel(CHANNEL_BUFFER);

        Self {
            checker,
            period,
            nodes_snapshot,
            fetch_receiver,
            check_sender,
            check_receiver,
            init_sender,
            token,
            nodes_token: CancellationToken::new(),
            nodes_tracker: TaskTracker::new(),
            is_initialized: false,
        }
    }

    pub async fn run(mut self) {
        loop {
            tokio::select! {
                // Check if a new array of fetched nodes appeared in the channel from NodesFetchActor.
                result = self.fetch_receiver.changed() => {
                    if let Err(err) = result {
                        error!("{HEALTH_MANAGER_ACTOR}: nodes fetch sender has been dropped: {err:?}");
                        self.token.cancel();
                        continue;
                    }
                    // Get the latest value from the channel and mark it as seen.
                    let Some(FetchedNodes { nodes }) = self.fetch_receiver.borrow_and_update().clone() else { continue };
                    self.handle_fetch_update(nodes).await;
                }
                // Receive health check messages from all running HealthCheckActors.
                Some(msg) = self.check_receiver.recv() => {
                    self.handle_health_update(msg).await;
                }
                _ = self.token.cancelled() => {
                    self.stop_all_checks().await;
                    self.check_receiver.close();
                    warn!("{HEALTH_MANAGER_ACTOR}: was gracefully cancelled, all nodes health checks stopped");
                    break;
                }
            }
        }
    }

    async fn handle_health_update(&mut self, msg: NodeHealthState) {
        let current_snapshot = self.nodes_snapshot.load_full();
        let mut new_snapshot = (*current_snapshot).clone();
        if let Err(err) = new_snapshot.update_node(&msg.node, msg.health.clone()) {
            error!("{HEALTH_MANAGER_ACTOR}: failed to update snapshot: {err:?}");
            return;
        }
        self.nodes_snapshot.store(Arc::new(new_snapshot));
        if !self.is_initialized && msg.health.is_healthy() {
            self.is_initialized = true;
            // If TIMEOUT_AWAIT_HEALTHY_SEED has been exceeded, the receiver was dropped and send would thus fail. We ignore the failure.
            let _ = self.init_sender.send(true).await;
        }
    }

    async fn handle_fetch_update(&mut self, nodes: Vec<Node>) {
        if nodes.is_empty() {
            // This would be a bug in the IC registry: there should always be at least one API boundary node.
            // Updating the nodes snapshot with an empty array would lead to an irrecoverable error, as new nodes couldn't be fetched.
            // We avoid such updates and just wait for a non-empty list.
            error!("{HEALTH_MANAGER_ACTOR}: list of fetched nodes is empty");
            return;
        }
        debug!("{HEALTH_MANAGER_ACTOR}: fetched nodes received {:?}", nodes);
        let current_snapshot = self.nodes_snapshot.load_full();
        let mut new_snapshot = (*current_snapshot).clone();
        // If the snapshot has changed, store it and restart all nodes' health checks.
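        // The snapshot is copy-on-write: readers keep using the old Arc while a
        // cloned copy is mutated here and then atomically swapped in via ArcSwap.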
        if let Ok(true) = new_snapshot.sync_nodes(&nodes) {
            self.nodes_snapshot.store(Arc::new(new_snapshot));
            self.stop_all_checks().await;
            self.start_checks(nodes.to_vec());
        }
    }

    fn start_checks(&mut self, nodes: Vec<Node>) {
        // Create a single cancellation token for all started health checks.
        self.nodes_token = CancellationToken::new();
        for node in nodes {
            debug!("{HEALTH_MANAGER_ACTOR}: starting health check for node {node:?}");
            let actor = HealthCheckActor::new(
                Arc::clone(&self.checker),
                self.period,
                node,
                self.check_sender.clone(),
                self.nodes_token.clone(),
            );
            self.nodes_tracker.spawn(async move { actor.run().await });
        }
    }

    async fn stop_all_checks(&self) {
        warn!("{HEALTH_MANAGER_ACTOR}: stopping all running health checks");
        self.nodes_token.cancel();
        self.nodes_tracker.close();
        self.nodes_tracker.wait().await;
    }
}
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs b/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs
new file mode 100644
index 00000000..0e3d3935
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/messages.rs
@@ -0,0 +1,11 @@
use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node};

/// A message with fetched nodes, sent from NodesFetchActor to HealthManagerActor.
#[derive(Debug, Clone)]
pub struct FetchedNodes {
    pub nodes: Vec<Node>,
}

/// A message with a node's health state, sent from a HealthCheckActor to HealthManagerActor.
pub struct NodeHealthState {
    pub node: Node,
    pub health: HealthCheckStatus,
}
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs b/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs
new file mode 100644
index 00000000..42dc4e08
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/mod.rs
@@ -0,0 +1,9 @@
pub mod dynamic_route_provider;
pub mod health_check;
pub mod messages;
pub mod node;
pub mod nodes_fetch;
pub mod snapshot;
#[cfg(test)]
pub mod test_utils;
pub mod type_aliases;
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/node.rs b/ic-agent/src/agent/http_transport/dynamic_routing/node.rs
new file mode 100644
index 00000000..2ba2c989
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/node.rs
@@ -0,0 +1,43 @@
use url::Url;

use crate::agent::ApiBoundaryNode;
use anyhow::anyhow;

/// Represents an API boundary node in the topology, identified by its domain.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Node {
    pub domain: String,
}

impl Node {
    pub fn new(domain: &str) -> anyhow::Result<Self> {
        if !is_valid_domain(domain) {
            return Err(anyhow!("Invalid domain name {domain}"));
        }
        Ok(Self {
            domain: domain.to_string(),
        })
    }
}

impl Node {
    /// Returns the base URL of the IC API endpoints exposed by this node.
    pub fn to_routing_url(&self) -> Url {
        Url::parse(&format!("https://{}/api/v2/", self.domain)).expect("failed to parse URL")
    }
}

impl From<&Node> for Url {
    fn from(node: &Node) -> Self {
        Url::parse(&format!("https://{}", node.domain)).expect("failed to parse URL")
    }
}

impl From<&ApiBoundaryNode> for Node {
    fn from(api_bn: &ApiBoundaryNode) -> Self {
        Node::new(api_bn.domain.as_str()).unwrap()
    }
}

pub fn is_valid_domain<S: AsRef<str>>(domain: S) -> bool {
    // TODO
    true
}
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs b/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs
new file mode 100644
index 00000000..3ce8fec4
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/nodes_fetch.rs
@@ -0,0 +1,143 @@
use anyhow::Context;
use async_trait::async_trait;
use candid::Principal;
use reqwest::Client;
use std::{fmt::Debug, sync::Arc, time::Duration};
use tokio::time::{self, sleep};
use tokio_util::sync::CancellationToken;
use tracing::{error, warn};
use url::Url;

use crate::agent::{
    http_transport::{
        dynamic_routing::{
            health_check::HEALTH_MANAGER_ACTOR, messages::FetchedNodes, node::Node,
            snapshot::routing_snapshot::RoutingSnapshot, type_aliases::GlobalShared,
            type_aliases::SenderWatch,
        },
        reqwest_transport::ReqwestTransport,
    },
    Agent,
};

const NODES_FETCH_ACTOR: &str = "NodesFetchActor";

/// A trait for fetching the list of API nodes in the topology.
#[async_trait]
pub trait Fetch: Sync + Send + Debug {
    async fn fetch(&self, url: Url) -> anyhow::Result<Vec<Node>>;
}

/// The default implementation of `Fetch`, reading the API boundary nodes of a subnet via an agent.
#[derive(Debug)]
pub struct NodesFetcher {
    http_client: Client,
    subnet_id: Principal,
}

impl NodesFetcher {
    pub fn new(http_client: Client, subnet_id: Principal) -> Self {
        Self {
            http_client,
            subnet_id,
        }
    }
}

#[async_trait]
impl Fetch for NodesFetcher {
    async fn fetch(&self, url: Url) -> anyhow::Result<Vec<Node>> {
        let transport = ReqwestTransport::create_with_client(url, self.http_client.clone())
            .with_context(|| "Failed to build transport")?;
        let agent = Agent::builder()
            .with_transport(transport)
            .build()
            .with_context(|| "Failed to build an agent")?;
        agent
            .fetch_root_key()
            .await
            .with_context(|| "Failed to fetch root key")?;
        let api_bns = agent
            .fetch_api_boundary_nodes_by_subnet_id(self.subnet_id)
            .await?;
        let nodes: Vec<Node> = api_bns.iter().map(|node| node.into()).collect();
        Ok(nodes)
    }
}

/// An actor that periodically fetches the nodes topology and sends it to the listener.
pub struct NodesFetchActor<S> {
    fetcher: Arc<dyn Fetch>,
    period: Duration,
    fetch_retry_interval: Duration,
    fetch_sender: SenderWatch<FetchedNodes>,
    snapshot: GlobalShared<S>,
    token: CancellationToken,
}

impl<S> NodesFetchActor<S>
where
    S: RoutingSnapshot,
{
    pub fn new(
        fetcher: Arc<dyn Fetch>,
        period: Duration,
        retry_interval: Duration,
        fetch_sender: SenderWatch<FetchedNodes>,
        snapshot: GlobalShared<S>,
        token: CancellationToken,
    ) -> Self {
        Self {
            fetcher,
            period,
            fetch_retry_interval: retry_interval,
            fetch_sender,
            snapshot,
            token,
        }
    }

    pub async fn run(self) {
        let mut interval = time::interval(self.period);
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    // Retry until success:
                    // - try to get a healthy node from the snapshot
                    //   - if the snapshot is empty, break and wait for the next fetch cycle
                    // - using the healthy node, try to fetch new nodes from the topology
                    //   - on failure, sleep and retry
                    // - try to send the fetched nodes to the listener
                    //   - failure should never happen
                    loop {
                        let snapshot = self.snapshot.load();
                        if let Some(node) = snapshot.next() {
                            match self.fetcher.fetch((&node).into()).await {
                                Ok(nodes) => {
                                    let msg = Some(FetchedNodes { nodes });
                                    match self.fetch_sender.send(msg) {
                                        Ok(()) => break, // message sent successfully, exit the loop
                                        Err(err) => {
                                            error!("{NODES_FETCH_ACTOR}: failed to send results to {HEALTH_MANAGER_ACTOR}: {err:?}");
                                        }
                                    }
                                },
                                Err(err) => {
                                    error!("{NODES_FETCH_ACTOR}: failed to fetch nodes: {err:?}");
                                }
                            };
                        } else {
                            // No healthy nodes in the snapshot; break and wait for the next fetch cycle.
                            error!("{NODES_FETCH_ACTOR}: no nodes in the snapshot");
                            break;
                        };
                        warn!("Retrying to fetch the nodes in {:?}", self.fetch_retry_interval);
                        sleep(self.fetch_retry_interval).await;
                    }
                }
                _ = self.token.cancelled() => {
                    warn!("{NODES_FETCH_ACTOR}: was gracefully cancelled");
                    break;
                }
            }
        }
    }
}
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs
new file mode 100644
index 00000000..b8328724
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/latency_based_routing.rs
@@ -0,0 +1,365 @@
use std::{collections::HashSet, time::Duration};

use rand::Rng;
use simple_moving_average::{SumTreeSMA, SMA};

use crate::agent::http_transport::dynamic_routing::{
    health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot,
};

// Some big value implying that the node is unhealthy; it should be much bigger than any real node latency.
const MAX_LATENCY: Duration = Duration::from_secs(500);

const WINDOW_SIZE: usize = 15;

type LatencyMovAvg = SumTreeSMA<f64, f64, WINDOW_SIZE>;

/// A node, its moving-average latency and the resulting routing weight.
#[derive(Clone, Debug)]
struct WeightedNode {
    node: Node,
    latency_mov_avg: LatencyMovAvg,
    weight: f64,
}

/// A routing snapshot that selects nodes probabilistically, favoring lower-latency nodes.
#[derive(Default, Debug, Clone)]
pub struct LatencyRoutingSnapshot {
    weighted_nodes: Vec<WeightedNode>,
    existing_nodes: HashSet<Node>,
}

impl LatencyRoutingSnapshot {
    pub fn new() -> Self {
        Self {
            weighted_nodes: vec![],
            existing_nodes: HashSet::new(),
        }
    }
}

// Selects a weight index based on an input number in the range [0.0, 1.0].
#[inline(always)]
fn weighted_sample(weights: &[f64], number: f64) -> Option<usize> {
    if !(0.0..=1.0).contains(&number) {
        return None;
    }
    let sum: f64 = weights.iter().sum();
    let mut weighted_number = number * sum;
    for (idx, weight) in weights.iter().enumerate() {
        weighted_number -= weight;
        if weighted_number <= 0.0 {
            return Some(idx);
        }
    }
    None
}

impl RoutingSnapshot for LatencyRoutingSnapshot {
    fn has_nodes(&self) -> bool {
        !self.weighted_nodes.is_empty()
    }

    fn next(&self) -> Option<Node> {
        // We select a node based on its weight, using a stochastic weighted random sampling approach.
        let weights = self
            .weighted_nodes
            .iter()
            .map(|n| n.weight)
            .collect::<Vec<_>>();
        // Generate a random float in the range [0, 1)
        let mut rng = rand::thread_rng();
        let rand_num = rng.gen::<f64>();
        // Using this random float and an array of weights we get an index of the node.
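        // For example, with weights [1.0, 2.0] and rand_num = 0.5: sum = 3.0 and
        // 0.5 * 3.0 = 1.5; subtracting 1.0 leaves 0.5 (> 0), subtracting 2.0 leaves
        // -1.5 (<= 0), so index 1 is returned, i.e. the heavier node wins twice as often.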
        let idx = weighted_sample(weights.as_slice(), rand_num);
        idx.map(|idx| self.weighted_nodes[idx].node.clone())
    }

    fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result<bool> {
        let new_nodes = HashSet::from_iter(nodes.iter().cloned());
        // Find nodes removed from the snapshot.
        let nodes_removed: Vec<_> = self
            .existing_nodes
            .difference(&new_nodes)
            .cloned()
            .collect();
        let has_removed_nodes = !nodes_removed.is_empty();
        // Find nodes added to the topology.
        let nodes_added: Vec<_> = new_nodes
            .difference(&self.existing_nodes)
            .cloned()
            .collect();
        let has_added_nodes = !nodes_added.is_empty();
        self.existing_nodes.extend(nodes_added);
        // NOTE: newly added nodes will appear in the weighted_nodes later.
        // This happens after the first node health check round and a consequent update_node() invocation.
        for node in nodes_removed.into_iter() {
            self.existing_nodes.remove(&node);
            let idx = self.weighted_nodes.iter().position(|x| x.node == node);
            idx.map(|idx| self.weighted_nodes.swap_remove(idx));
        }
        Ok(has_added_nodes || has_removed_nodes)
    }

    fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result<bool> {
        if !self.existing_nodes.contains(node) {
            return Ok(false);
        }

        // If latency is None (meaning the node is unhealthy), we assign some big value.
        let latency = health.latency.unwrap_or(MAX_LATENCY).as_millis() as f64;

        if let Some(idx) = self.weighted_nodes.iter().position(|x| &x.node == node) {
            // Node is already in the array (it is not the first update_node() call).
            self.weighted_nodes[idx].latency_mov_avg.add_sample(latency);
            let latency_avg = self.weighted_nodes[idx].latency_mov_avg.get_average();
            // As nodes with smaller average latencies are preferred for routing, we use inverted values for weights.
            self.weighted_nodes[idx].weight = 1.0 / latency_avg;
        } else {
            // Node is not yet in array (first update_node() call).
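            // The moving average is seeded with this first sample; since the weight is
            // the inverse of the average latency, faster nodes get proportionally
            // higher selection probability in next().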
+ let mut latency_mov_avg = LatencyMovAvg::new(); + latency_mov_avg.add_sample(latency); + let weight = 1.0 / latency_mov_avg.get_average(); + self.weighted_nodes.push(WeightedNode { + latency_mov_avg, + node: node.clone(), + weight, + }) + } + Ok(true) + } +} + +#[cfg(test)] +mod tests { + use std::{collections::HashSet, time::Duration}; + + use simple_moving_average::SMA; + + use crate::agent::http_transport::dynamic_routing::{ + health_check::HealthCheckStatus, + node::Node, + snapshot::{ + latency_based_routing::{ + weighted_sample, LatencyMovAvg, LatencyRoutingSnapshot, WeightedNode, MAX_LATENCY, + }, + routing_snapshot::RoutingSnapshot, + }, + }; + + #[test] + fn test_snapshot_init() { + // Arrange + let snapshot = LatencyRoutingSnapshot::new(); + // Assert + assert!(snapshot.weighted_nodes.is_empty()); + assert!(snapshot.existing_nodes.is_empty()); + assert!(!snapshot.has_nodes()); + assert!(snapshot.next().is_none()); + } + + #[test] + fn test_update_for_non_existing_node_fails() { + // Arrange + let mut snapshot = LatencyRoutingSnapshot::new(); + let node = Node::new("api1.com").unwrap(); + let health = HealthCheckStatus { + latency: Some(Duration::from_secs(1)), + }; + // Act + let is_updated = snapshot + .update_node(&node, health) + .expect("node update failed"); + // Assert + assert!(!is_updated); + assert!(snapshot.weighted_nodes.is_empty()); + assert!(!snapshot.has_nodes()); + assert!(snapshot.next().is_none()); + } + + #[test] + fn test_update_for_existing_node_succeeds() { + // Arrange + let mut snapshot = LatencyRoutingSnapshot::new(); + let node = Node::new("api1.com").unwrap(); + let health = HealthCheckStatus { + latency: Some(Duration::from_secs(1)), + }; + snapshot.existing_nodes.insert(node.clone()); + // Check first update + let is_updated = snapshot + .update_node(&node, health) + .expect("node update failed"); + assert!(is_updated); + assert!(snapshot.has_nodes()); + let weighted_node = snapshot.weighted_nodes.first().unwrap(); + assert_eq!(weighted_node.latency_mov_avg.get_average(), 1000.0); + assert_eq!(weighted_node.weight, 0.001); + assert_eq!(snapshot.next().unwrap(), node); + // Check second update + let health = HealthCheckStatus { + latency: Some(Duration::from_secs(2)), + }; + let is_updated = snapshot + .update_node(&node, health) + .expect("node update failed"); + assert!(is_updated); + let weighted_node = snapshot.weighted_nodes.first().unwrap(); + assert_eq!(weighted_node.latency_mov_avg.get_average(), 1500.0); + assert_eq!(weighted_node.weight, 1.0 / 1500.0); + // Check third update + let health = HealthCheckStatus { + latency: Some(Duration::from_secs(3)), + }; + let is_updated = snapshot + .update_node(&node, health) + .expect("node update failed"); + assert!(is_updated); + let weighted_node = snapshot.weighted_nodes.first().unwrap(); + assert_eq!(weighted_node.latency_mov_avg.get_average(), 2000.0); + assert_eq!(weighted_node.weight, 1.0 / 2000.0); + // Check forth update with none + let health = HealthCheckStatus { latency: None }; + let is_updated = snapshot + .update_node(&node, health) + .expect("node update failed"); + assert!(is_updated); + let weighted_node = snapshot.weighted_nodes.first().unwrap(); + assert_eq!( + weighted_node.latency_mov_avg.get_average(), + (MAX_LATENCY.as_millis() as f64 + 6000.0) / 4.0 + ); + assert_eq!( + weighted_node.weight, + 4.0 / (MAX_LATENCY.as_millis() as f64 + 6000.0) + ); + assert_eq!(snapshot.weighted_nodes.len(), 1); + assert_eq!(snapshot.existing_nodes.len(), 1); + assert_eq!(snapshot.next().unwrap(), 
node); + } + + #[test] + fn test_sync_node_scenarios() { + // Arrange + let mut snapshot = LatencyRoutingSnapshot::new(); + let node_1 = Node::new("api1.com").unwrap(); + // Sync with node_1 + let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap(); + assert!(nodes_changed); + assert!(snapshot.weighted_nodes.is_empty()); + assert_eq!( + snapshot.existing_nodes, + HashSet::from_iter(vec![node_1.clone()]) + ); + // Add node_1 to weighted_nodes manually + snapshot.weighted_nodes.push(WeightedNode { + node: node_1.clone(), + latency_mov_avg: LatencyMovAvg::new(), + weight: 0.0, + }); + // Sync with node_1 again + let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap(); + assert!(!nodes_changed); + assert_eq!( + snapshot.existing_nodes, + HashSet::from_iter(vec![node_1.clone()]) + ); + assert_eq!(snapshot.weighted_nodes[0].node, node_1); + // Sync with node_2 + let node_2 = Node::new("api2.com").unwrap(); + let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]).unwrap(); + assert!(nodes_changed); + assert_eq!( + snapshot.existing_nodes, + HashSet::from_iter(vec![node_2.clone()]) + ); + // Make sure node_1 was removed from weighted_nodes too + assert!(snapshot.weighted_nodes.is_empty()); + // Add node_2 to weighted_nodes manually + snapshot.weighted_nodes.push(WeightedNode { + node: node_2.clone(), + latency_mov_avg: LatencyMovAvg::new(), + weight: 0.0, + }); + // Sync with [node_2, node_3] + let node_3 = Node::new("api3.com").unwrap(); + let nodes_changed = snapshot + .sync_nodes(&[node_3.clone(), node_2.clone()]) + .unwrap(); + assert!(nodes_changed); + assert_eq!( + snapshot.existing_nodes, + HashSet::from_iter(vec![node_3.clone(), node_2.clone()]) + ); + assert_eq!(snapshot.weighted_nodes[0].node, node_2); + // Add node_3 to weighted_nodes manually + snapshot.weighted_nodes.push(WeightedNode { + node: node_3.clone(), + latency_mov_avg: LatencyMovAvg::new(), + weight: 0.0, + }); + // Sync with [] + let nodes_changed = snapshot.sync_nodes(&[]).unwrap(); + assert!(nodes_changed); + assert!(snapshot.existing_nodes.is_empty()); + // Make sure all nodes were removed from the healthy_nodes + assert!(snapshot.weighted_nodes.is_empty()); + // Sync with [] again + let nodes_changed = snapshot.sync_nodes(&[]).unwrap(); + assert!(!nodes_changed); + assert!(snapshot.existing_nodes.is_empty()); + } + + #[test] + fn test_weighted_sample() { + // Case 1: empty array + let arr: &[f64] = &[]; + let idx = weighted_sample(arr, 0.5); + assert_eq!(idx, None); + // Case 2: single element in array + let arr: &[f64] = &[1.0]; + let idx = weighted_sample(arr, 0.0); + assert_eq!(idx, Some(0)); + let idx = weighted_sample(arr, 1.0); + assert_eq!(idx, Some(0)); + // check bounds + let idx = weighted_sample(arr, -1.0); + assert_eq!(idx, None); + let idx = weighted_sample(arr, 1.1); + assert_eq!(idx, None); + // Case 3: two elements in array (second element has twice the weight of the first) + let arr: &[f64] = &[1.0, 2.0]; // prefixed_sum = [1.0, 3.0] + let idx = weighted_sample(arr, 0.0); // 0.0 * 3.0 < 1.0 + assert_eq!(idx, Some(0)); + let idx = weighted_sample(arr, 0.33); // 0.33 * 3.0 < 1.0 + assert_eq!(idx, Some(0)); // selection probability ~0.33 + let idx = weighted_sample(arr, 0.35); // 0.35 * 3.0 > 1.0 + assert_eq!(idx, Some(1)); // selection probability ~0.66 + let idx = weighted_sample(arr, 1.0); // 1.0 * 3.0 > 1.0 + assert_eq!(idx, Some(1)); + // check bounds + let idx = weighted_sample(arr, -1.0); + assert_eq!(idx, None); + let idx = weighted_sample(arr, 1.1); + assert_eq!(idx, 
        None);
        // Case 4: four elements in array
        let arr: &[f64] = &[1.0, 2.0, 1.5, 2.5]; // prefixed_sum = [1.0, 3.0, 4.5, 7.0]
        let idx = weighted_sample(arr, 0.14); // 0.14 * 7 < 1.0
        assert_eq!(idx, Some(0)); // probability ~0.14
        let idx = weighted_sample(arr, 0.15); // 0.15 * 7 > 1.0
        assert_eq!(idx, Some(1));
        let idx = weighted_sample(arr, 0.42); // 0.42 * 7 < 3.0
        assert_eq!(idx, Some(1)); // probability ~0.28
        let idx = weighted_sample(arr, 0.43); // 0.43 * 7 > 3.0
        assert_eq!(idx, Some(2));
        let idx = weighted_sample(arr, 0.64); // 0.64 * 7 < 4.5
        assert_eq!(idx, Some(2)); // probability ~0.22
        let idx = weighted_sample(arr, 0.65); // 0.65 * 7 > 4.5
        assert_eq!(idx, Some(3));
        let idx = weighted_sample(arr, 0.99);
        assert_eq!(idx, Some(3)); // probability ~0.35
        // check bounds
        let idx = weighted_sample(arr, -1.0);
        assert_eq!(idx, None);
        let idx = weighted_sample(arr, 1.1);
        assert_eq!(idx, None);
    }
}
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs
new file mode 100644
index 00000000..1c63df8b
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/mod.rs
@@ -0,0 +1,3 @@
pub mod latency_based_routing;
pub mod round_robin_routing;
pub mod routing_snapshot;
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs
new file mode 100644
index 00000000..d7dc4995
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/round_robin_routing.rs
@@ -0,0 +1,237 @@
use std::{
    collections::HashSet,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

use crate::agent::http_transport::dynamic_routing::{
    health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot,
};

/// A routing snapshot that cycles through the healthy nodes in round-robin fashion.
#[derive(Default, Debug, Clone)]
pub struct RoundRobinRoutingSnapshot {
    current_idx: Arc<AtomicUsize>,
    healthy_nodes: HashSet<Node>,
    existing_nodes: HashSet<Node>,
}

impl RoundRobinRoutingSnapshot {
    pub fn new() -> Self {
        Self {
            current_idx: Arc::new(AtomicUsize::new(0)),
            healthy_nodes: HashSet::new(),
            existing_nodes: HashSet::new(),
        }
    }
}

impl RoutingSnapshot for RoundRobinRoutingSnapshot {
    fn has_nodes(&self) -> bool {
        !self.healthy_nodes.is_empty()
    }

    fn next(&self) -> Option<Node> {
        if self.healthy_nodes.is_empty() {
            return None;
        }
        let prev_idx = self.current_idx.fetch_add(1, Ordering::Relaxed);
        self.healthy_nodes
            .iter()
            .nth(prev_idx % self.healthy_nodes.len())
            .cloned()
    }

    fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result<bool> {
        let new_nodes = HashSet::from_iter(nodes.iter().cloned());
        // Find nodes removed from the snapshot.
        let nodes_removed: Vec<_> = self
            .existing_nodes
            .difference(&new_nodes)
            .cloned()
            .collect();
        let has_removed_nodes = !nodes_removed.is_empty();
        // Find nodes added to the snapshot.
        let nodes_added: Vec<_> = new_nodes
            .difference(&self.existing_nodes)
            .cloned()
            .collect();
        let has_added_nodes = !nodes_added.is_empty();
        // NOTE: newly added nodes will appear in the healthy_nodes later.
        // This happens after the first node health check round and a consequent update_node() invocation.
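        // Until then, such nodes are tracked in existing_nodes only and are not
        // eligible for routing.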
        self.existing_nodes.extend(nodes_added);
        nodes_removed.iter().for_each(|node| {
            self.existing_nodes.remove(node);
            self.healthy_nodes.remove(node);
        });
        Ok(has_added_nodes || has_removed_nodes)
    }

    fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result<bool> {
        if !self.existing_nodes.contains(node) {
            return Ok(false);
        }
        if health.latency.is_some() {
            Ok(self.healthy_nodes.insert(node.clone()))
        } else {
            Ok(self.healthy_nodes.remove(node))
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;
    use std::{collections::HashSet, sync::atomic::Ordering};

    use crate::agent::http_transport::dynamic_routing::{
        health_check::HealthCheckStatus,
        node::Node,
        snapshot::{
            round_robin_routing::RoundRobinRoutingSnapshot, routing_snapshot::RoutingSnapshot,
        },
    };

    #[test]
    fn test_snapshot_init() {
        // Arrange
        let snapshot = RoundRobinRoutingSnapshot::new();
        // Assert
        assert!(snapshot.healthy_nodes.is_empty());
        assert!(snapshot.existing_nodes.is_empty());
        assert!(!snapshot.has_nodes());
        assert_eq!(snapshot.current_idx.load(Ordering::SeqCst), 0);
        assert!(snapshot.next().is_none());
    }

    #[test]
    fn test_update_of_non_existing_node_always_returns_false() {
        // Arrange
        let mut snapshot = RoundRobinRoutingSnapshot::new();
        // This node is not present in existing_nodes
        let node = Node::new("api1.com").unwrap();
        let healthy = HealthCheckStatus {
            latency: Some(Duration::from_secs(1)),
        };
        let unhealthy = HealthCheckStatus { latency: None };
        // Act 1
        let is_updated = snapshot
            .update_node(&node, healthy)
            .expect("node update failed");
        // Assert
        assert!(!is_updated);
        assert!(snapshot.existing_nodes.is_empty());
        assert!(snapshot.next().is_none());
        // Act 2
        let is_updated = snapshot
            .update_node(&node, unhealthy)
            .expect("node update failed");
        // Assert
        assert!(!is_updated);
        assert!(snapshot.existing_nodes.is_empty());
        assert!(snapshot.next().is_none());
    }

    #[test]
    fn test_update_of_existing_unhealthy_node_with_healthy_node_returns_true() {
        // Arrange
        let mut snapshot = RoundRobinRoutingSnapshot::new();
        let node = Node::new("api1.com").unwrap();
        // node is present in existing_nodes, but not in healthy_nodes
        snapshot.existing_nodes.insert(node.clone());
        let health = HealthCheckStatus {
            latency: Some(Duration::from_secs(1)),
        };
        // Act
        let is_updated = snapshot
            .update_node(&node, health)
            .expect("node update failed");
        assert!(is_updated);
        assert!(snapshot.has_nodes());
        assert_eq!(snapshot.next().unwrap(), node);
        assert_eq!(snapshot.current_idx.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_update_of_existing_healthy_node_with_unhealthy_node_returns_true() {
        // Arrange
        let mut snapshot = RoundRobinRoutingSnapshot::new();
        let node = Node::new("api1.com").unwrap();
        snapshot.existing_nodes.insert(node.clone());
        snapshot.healthy_nodes.insert(node.clone());
        let unhealthy = HealthCheckStatus { latency: None };
        // Act
        let is_updated = snapshot
            .update_node(&node, unhealthy)
            .expect("node update failed");
        assert!(is_updated);
        assert!(!snapshot.has_nodes());
        assert!(snapshot.next().is_none());
    }

    #[test]
    fn test_sync_node_scenarios() {
        // Arrange
        let mut snapshot = RoundRobinRoutingSnapshot::new();
        let node_1 = Node::new("api1.com").unwrap();
        // Sync with node_1
        let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap();
        assert!(nodes_changed);
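        // The newly synced node is not yet routable: healthy_nodes stays empty until
        // a health check reports it via update_node().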
        assert!(snapshot.healthy_nodes.is_empty());
        assert_eq!(
            snapshot.existing_nodes,
            HashSet::from_iter(vec![node_1.clone()])
        );
        // Add node_1 to healthy_nodes manually
        snapshot.healthy_nodes.insert(node_1.clone());
        // Sync with node_1 again
        let nodes_changed = snapshot.sync_nodes(&[node_1.clone()]).unwrap();
        assert!(!nodes_changed);
        assert_eq!(
            snapshot.existing_nodes,
            HashSet::from_iter(vec![node_1.clone()])
        );
        assert_eq!(
            snapshot.healthy_nodes,
            HashSet::from_iter(vec![node_1.clone()])
        );
        // Sync with node_2
        let node_2 = Node::new("api2.com").unwrap();
        let nodes_changed = snapshot.sync_nodes(&[node_2.clone()]).unwrap();
        assert!(nodes_changed);
        assert_eq!(
            snapshot.existing_nodes,
            HashSet::from_iter(vec![node_2.clone()])
        );
        // Make sure node_1 was removed from healthy nodes
        assert!(snapshot.healthy_nodes.is_empty());
        // Add node_2 to healthy_nodes manually
        snapshot.healthy_nodes.insert(node_2.clone());
        // Sync with [node_2, node_3]
        let node_3 = Node::new("api3.com").unwrap();
        let nodes_changed = snapshot
            .sync_nodes(&[node_3.clone(), node_2.clone()])
            .unwrap();
        assert!(nodes_changed);
        assert_eq!(
            snapshot.existing_nodes,
            HashSet::from_iter(vec![node_3.clone(), node_2.clone()])
        );
        assert_eq!(
            snapshot.healthy_nodes,
            HashSet::from_iter(vec![node_2.clone()])
        );
        snapshot.healthy_nodes.insert(node_3.clone());
        // Sync with []
        let nodes_changed = snapshot.sync_nodes(&[]).unwrap();
        assert!(nodes_changed);
        assert!(snapshot.existing_nodes.is_empty());
        // Make sure all nodes were removed from the healthy_nodes
        assert!(snapshot.healthy_nodes.is_empty());
        // Sync with [] again
        let nodes_changed = snapshot.sync_nodes(&[]).unwrap();
        assert!(!nodes_changed);
        assert!(snapshot.existing_nodes.is_empty());
    }
}
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs
new file mode 100644
index 00000000..f4f2cc5f
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/snapshot/routing_snapshot.rs
@@ -0,0 +1,10 @@
use std::fmt::Debug;

use crate::agent::http_transport::dynamic_routing::{health_check::HealthCheckStatus, node::Node};

/// A snapshot of the routing table, encapsulating a routing strategy over the discovered nodes.
pub trait RoutingSnapshot: Send + Sync + Clone + Debug {
    /// Returns `true` if the snapshot contains at least one routable (healthy) node.
    fn has_nodes(&self) -> bool;
    /// Returns the next node to route to, according to the strategy.
    fn next(&self) -> Option<Node>;
    /// Synchronizes the snapshot with the given topology; returns `true` if the snapshot changed.
    fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result<bool>;
    /// Updates a node's health state; returns `true` if the node was updated.
    fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result<bool>;
}
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs b/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs
new file mode 100644
index 00000000..a1d659c9
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/test_utils.rs
@@ -0,0 +1,121 @@
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use std::{fmt::Debug, hash::Hash, sync::Arc};

use arc_swap::ArcSwap;
use async_trait::async_trait;
use url::Url;

use crate::agent::http_transport::{
    dynamic_routing::{
        health_check::{HealthCheck, HealthCheckStatus},
        node::Node,
        nodes_fetch::Fetch,
        type_aliases::GlobalShared,
    },
    route_provider::RouteProvider,
};

pub fn route_n_times(n: usize, f: Arc<impl RouteProvider + ?Sized>) -> Vec<String> {
    (0..n)
        .map(|_| f.route().unwrap().domain().unwrap().to_string())
        .collect()
}

pub fn assert_routed_domains<T>(actual: Vec<T>, expected: Vec<T>, expected_repetitions: usize)
where
    T: AsRef<str> + Eq + Hash + Debug + Ord,
{
    fn build_count_map<T>(items: &[T]) -> HashMap<&T, usize>
    where
        T: Eq + Hash,
    {
        items.iter().fold(HashMap::new(), |mut map, item| {
            *map.entry(item).or_insert(0) += 1;
            map
        })
    }
    let count_actual = build_count_map(&actual);
    let count_expected = build_count_map(&expected);

    let mut keys_actual = count_actual.keys().collect::<Vec<_>>();
    keys_actual.sort();
    let mut keys_expected = count_expected.keys().collect::<Vec<_>>();
    keys_expected.sort();
    // Assert all routed domains are present.
    assert_eq!(keys_actual, keys_expected);

    // Assert the expected repetition count of each routed domain.
    let actual_repetitions = count_actual.values().collect::<Vec<_>>();
    assert!(actual_repetitions
        .iter()
        .all(|&x| x == &expected_repetitions));
}

#[derive(Debug)]
pub struct NodesFetcherMock {
    // A mocked set of nodes existing in the topology.
    pub nodes: GlobalShared<Vec<Node>>,
}

#[async_trait]
impl Fetch for NodesFetcherMock {
    async fn fetch(&self, _url: Url) -> anyhow::Result<Vec<Node>> {
        let nodes = (*self.nodes.load_full()).clone();
        Ok(nodes)
    }
}

impl Default for NodesFetcherMock {
    fn default() -> Self {
        Self::new()
    }
}

impl NodesFetcherMock {
    pub fn new() -> Self {
        Self {
            nodes: Arc::new(ArcSwap::from_pointee(vec![])),
        }
    }

    pub fn overwrite_nodes(&self, nodes: Vec<Node>) {
        self.nodes.store(Arc::new(nodes));
    }
}

#[derive(Debug)]
pub struct NodeHealthCheckerMock {
    pub healthy_nodes: GlobalShared<HashSet<Node>>,
}

impl Default for NodeHealthCheckerMock {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl HealthCheck for NodeHealthCheckerMock {
    async fn check(&self, node: &Node) -> anyhow::Result<HealthCheckStatus> {
        let nodes = self.healthy_nodes.load_full();
        let latency = match nodes.contains(node) {
            true => Some(Duration::from_secs(1)),
            false => None,
        };
        Ok(HealthCheckStatus { latency })
    }
}

impl NodeHealthCheckerMock {
    pub fn new() -> Self {
        Self {
            healthy_nodes: Arc::new(ArcSwap::from_pointee(HashSet::new())),
        }
    }

    pub fn overwrite_healthy_nodes(&self, healthy_nodes: Vec<Node>) {
        self.healthy_nodes
            .store(Arc::new(HashSet::from_iter(healthy_nodes)));
    }
}
diff --git a/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs b/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs
new file mode 100644
index 00000000..92f922d4
--- /dev/null
+++ b/ic-agent/src/agent/http_transport/dynamic_routing/type_aliases.rs
@@ -0,0 +1,12 @@
use std::sync::Arc;

use arc_swap::ArcSwap;
use tokio::sync::{mpsc, watch};

/// A type alias for the sender end of a watch channel.
pub type SenderWatch<T> = watch::Sender<Option<T>>;
/// A type alias for the receiver end of a watch channel.
pub type ReceiverWatch<T> = watch::Receiver<Option<T>>;

/// A type alias for the sender end of an mpsc channel.
pub type SenderMpsc<T> = mpsc::Sender<T>;
/// A type alias for the receiver end of an mpsc channel.
pub type ReceiverMpsc<T> = mpsc::Receiver<T>;

/// A type alias for the global shared state, updated via copy-on-write.
pub type GlobalShared<T> = Arc<ArcSwap<T>>;
diff --git a/ic-agent/src/agent/http_transport/mod.rs b/ic-agent/src/agent/http_transport/mod.rs
index 8f2220d6..f938a90d 100644
--- a/ic-agent/src/agent/http_transport/mod.rs
+++ b/ic-agent/src/agent/http_transport/mod.rs
@@ -30,4 +30,5 @@ const ICP0_SUB_DOMAIN: &str = ".icp0.io";
 const ICP_API_SUB_DOMAIN: &str = ".icp-api.io";
 #[allow(dead_code)]
 const LOCALHOST_SUB_DOMAIN: &str = ".localhost";
+pub mod dynamic_routing;
 pub mod route_provider;
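
The routing strategy is pluggable through the `RoutingSnapshot` trait in `routing_snapshot.rs`. As an illustration of the trait contract, here is a minimal sketch of a hypothetical custom strategy (`PrimaryFallbackSnapshot` is not part of this patch) that always routes to the highest-priority healthy node in a fixed list:

```rust
use crate::agent::http_transport::dynamic_routing::{
    health_check::HealthCheckStatus, node::Node, snapshot::routing_snapshot::RoutingSnapshot,
};

/// Hypothetical primary/fallback strategy: route to the first healthy node
/// in priority order. Shown only to illustrate the RoutingSnapshot contract.
#[derive(Debug, Clone, Default)]
struct PrimaryFallbackSnapshot {
    // Nodes in priority order, each paired with its last known health flag.
    nodes: Vec<(Node, bool)>,
}

impl RoutingSnapshot for PrimaryFallbackSnapshot {
    fn has_nodes(&self) -> bool {
        self.nodes.iter().any(|(_, healthy)| *healthy)
    }

    fn next(&self) -> Option<Node> {
        // Pick the highest-priority node currently marked healthy.
        self.nodes
            .iter()
            .find(|(_, healthy)| *healthy)
            .map(|(node, _)| node.clone())
    }

    fn sync_nodes(&mut self, nodes: &[Node]) -> anyhow::Result<bool> {
        let mut changed = false;
        // Drop nodes that left the topology.
        let before = self.nodes.len();
        self.nodes.retain(|(node, _)| nodes.contains(node));
        changed |= self.nodes.len() != before;
        // Append newcomers as unhealthy until their first health check reports in.
        for node in nodes {
            if !self.nodes.iter().any(|(n, _)| n == node) {
                self.nodes.push((node.clone(), false));
                changed = true;
            }
        }
        Ok(changed)
    }

    fn update_node(&mut self, node: &Node, health: HealthCheckStatus) -> anyhow::Result<bool> {
        if let Some(entry) = self.nodes.iter_mut().find(|(n, _)| n == node) {
            entry.1 = health.is_healthy();
            return Ok(true);
        }
        Ok(false)
    }
}
```

Such a snapshot plugs into `DynamicRouteProvider::new` exactly like the built-in round-robin and latency-based ones, since the provider interacts with the strategy only through these four trait methods.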