diff --git a/ic-agent/src/agent/agent_config.rs b/ic-agent/src/agent/agent_config.rs
index 54bd51dfd..a946a284a 100644
--- a/ic-agent/src/agent/agent_config.rs
+++ b/ic-agent/src/agent/agent_config.rs
@@ -5,9 +5,10 @@ use crate::{
 use reqwest::Client;
 use std::{sync::Arc, time::Duration};
 
-use super::route_provider::RouteProvider;
+use super::{route_provider::RouteProvider, IC_ROOT_KEY};
 
 /// A configuration for an agent.
+#[derive(Clone)]
 #[non_exhaustive]
 pub struct AgentConfig {
     /// See [`with_nonce_factory`](super::AgentBuilder::with_nonce_factory).
@@ -31,6 +32,8 @@ pub struct AgentConfig {
     /// See [`with_call_v3_endpoint`](super::AgentBuilder::with_call_v3_endpoint).
     #[cfg(feature = "experimental_sync_call")]
     pub use_call_v3_endpoint: bool,
+    /// See [`with_preset_root_key`](super::AgentBuilder::with_preset_root_key).
+    pub root_key: Option<Vec<u8>>,
 }
 
 impl Default for AgentConfig {
@@ -47,6 +50,7 @@ impl Default for AgentConfig {
             max_tcp_error_retries: 0,
             #[cfg(feature = "experimental_sync_call")]
             use_call_v3_endpoint: false,
+            root_key: None,
         }
     }
 }
diff --git a/ic-agent/src/agent/builder.rs b/ic-agent/src/agent/builder.rs
index 429e0a152..38f9fdba0 100644
--- a/ic-agent/src/agent/builder.rs
+++ b/ic-agent/src/agent/builder.rs
@@ -1,13 +1,22 @@
 use url::Url;
 
 use crate::{
-    agent::{agent_config::AgentConfig, Agent},
+    agent::{
+        agent_config::AgentConfig,
+        route_provider::{
+            dynamic_routing::{
+                dynamic_route_provider::{DynamicRouteProviderBuilder, IC0_SEED_DOMAIN},
+                node::Node,
+                snapshot::latency_based_routing::LatencyRoutingSnapshot,
+            },
+            RouteProvider,
+        },
+        Agent,
+    },
     AgentError, Identity, NonceFactory, NonceGenerator,
 };
 use std::sync::Arc;
 
-use super::route_provider::RouteProvider;
-
 /// A builder for an [`Agent`].
 #[derive(Default)]
 pub struct AgentBuilder {
@@ -20,19 +29,8 @@ impl AgentBuilder {
         Agent::new(self.config)
     }
 
-    #[cfg(all(feature = "reqwest", not(target_family = "wasm")))]
     /// Set the dynamic transport layer for the [`Agent`], performing continuous discovery of the API boundary nodes and routing traffic via them based on the latencies.
     pub async fn with_discovery_transport(self, client: reqwest::Client) -> Self {
-        use crate::agent::http_transport::{
-            dynamic_routing::{
-                dynamic_route_provider::{DynamicRouteProviderBuilder, IC0_SEED_DOMAIN},
-                node::Node,
-                snapshot::latency_based_routing::LatencyRoutingSnapshot,
-            },
-            route_provider::RouteProvider,
-            ReqwestTransport,
-        };
-
         // TODO: This is a temporary solution to get the seed node.
         let seed = Node::new(IC0_SEED_DOMAIN).unwrap();
 
@@ -46,10 +44,8 @@ impl AgentBuilder {
 
         let route_provider = Arc::new(route_provider) as Arc<dyn RouteProvider>;
 
-        let transport = ReqwestTransport::create_with_client_route(route_provider, client)
-            .expect("failed to create transport");
-
-        self.with_transport(transport)
+        self.with_http_client(client)
+            .with_arc_route_provider(route_provider)
     }
 
     /// Set the URL of the [Agent].
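Reviewer note: taken together, the two builder-facing changes are meant to be driven like this. A minimal sketch, assuming the `with_preset_root_key` method that the new `AgentConfig::root_key` doc link points at (the method itself is outside these hunks), and a placeholder URL:

```rust
use ic_agent::Agent;

async fn demo(root_key: Vec<u8>) -> Result<(), ic_agent::AgentError> {
    // Pin a non-mainnet root key at construction time; Agent::new falls
    // back to IC_ROOT_KEY when the option is None.
    let agent = Agent::builder()
        .with_url("https://example-testnet.invalid") // placeholder URL
        .with_preset_root_key(root_key)
        .build()?;

    // Discovery transport is now plain client-plus-route-provider wiring:
    let discovering = Agent::builder()
        .with_discovery_transport(reqwest::Client::new())
        .await
        .build()?;

    let _ = (agent, discovering);
    Ok(())
}
```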
diff --git a/ic-agent/src/agent/mod.rs b/ic-agent/src/agent/mod.rs
index a739b9dba..7d3b1cf1d 100644
--- a/ic-agent/src/agent/mod.rs
+++ b/ic-agent/src/agent/mod.rs
@@ -30,6 +30,7 @@ use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
 use reqwest::{Body, Client, Request};
 use route_provider::RouteProvider;
 use time::OffsetDateTime;
+use url::Url;
 
 #[cfg(test)]
 mod agent_test;
@@ -144,6 +145,7 @@ type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
 /// This agent does not understand Candid, and only acts on byte buffers.
 #[derive(Clone)]
 pub struct Agent {
+    // If adding any BN-specific fields, exclude them from clone_with_url.
     nonce_factory: Arc<dyn NonceGenerator>,
     identity: Arc<dyn Identity>,
     ingress_expiry: Duration,
@@ -176,11 +178,13 @@ impl Agent {
     /// Create an instance of an [`Agent`].
     pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
-        Ok(Agent {
+        let agent = Agent {
             nonce_factory: config.nonce_factory,
             identity: config.identity,
             ingress_expiry: config.ingress_expiry.unwrap_or(DEFAULT_INGRESS_EXPIRY),
-            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
+            root_key: Arc::new(RwLock::new(
+                config.root_key.unwrap_or_else(|| IC_ROOT_KEY.to_vec()),
+            )),
             client: config.client.unwrap_or_else(|| {
                 #[cfg(not(target_family = "wasm"))]
                 {
@@ -213,7 +217,9 @@ impl Agent {
                     false
                 }
             },
-        })
+        };
+        agent.route_provider.notify_start(agent.clone());
+        Ok(agent)
     }
 
     /// Set the identity provider for signing messages.
@@ -1244,6 +1250,12 @@ impl Agent {
             Ok((status, body))
         }
     }
+
+    fn clone_with_url(&self, url: Url) -> Self {
+        let mut clone = self.clone();
+        clone.route_provider = Arc::new(url);
+        clone
+    }
 }
 
 const DEFAULT_INGRESS_EXPIRY: Duration = Duration::from_secs(240);
diff --git a/ic-agent/src/agent/route_provider.rs b/ic-agent/src/agent/route_provider.rs
index d958faaaa..013b19d51 100644
--- a/ic-agent/src/agent/route_provider.rs
+++ b/ic-agent/src/agent/route_provider.rs
@@ -5,7 +5,7 @@ use std::{
 };
 use url::Url;
 
-use crate::agent::AgentError;
+use crate::{agent::AgentError, Agent};
 
 pub mod dynamic_routing;
 
@@ -33,6 +33,9 @@ pub trait RouteProvider: std::fmt::Debug + Send + Sync {
     /// appearing first. The returned vector can contain fewer than `n` URLs if
     /// fewer are available.
     fn n_ordered_routes(&self, n: usize) -> Result<Vec<Url>, AgentError>;
+
+    /// Called when the [`Agent`] is constructed, handing the provider an agent to use for any background work. The default implementation does nothing.
+    fn notify_start(&self, _agent: Agent) {}
 }
 
 /// A simple implementation of the [`RouteProvider`] which produces an even distribution of the urls from the input ones.
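Because `notify_start` has a default empty body, third-party `RouteProvider` implementations keep compiling unchanged. A sketch of a trivial provider, assuming the pre-existing `route` method from the unchanged part of the trait:

```rust
use ic_agent::{agent::route_provider::RouteProvider, AgentError};
use url::Url;

#[derive(Debug)]
struct StaticRouter(Url);

impl RouteProvider for StaticRouter {
    fn route(&self) -> Result<Url, AgentError> {
        Ok(self.0.clone())
    }

    fn n_ordered_routes(&self, _n: usize) -> Result<Vec<Url>, AgentError> {
        Ok(vec![self.0.clone()])
    }

    // notify_start is inherited as a no-op; DynamicRouteProvider overrides
    // it below to spawn its fetch and health-check actors.
}
```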
diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/dynamic_route_provider.rs b/ic-agent/src/agent/route_provider/dynamic_routing/dynamic_route_provider.rs
index ec6022a5e..721a23cc4 100644
--- a/ic-agent/src/agent/route_provider/dynamic_routing/dynamic_route_provider.rs
+++ b/ic-agent/src/agent/route_provider/dynamic_routing/dynamic_route_provider.rs
@@ -17,16 +17,13 @@ use url::Url;
 use crate::{
     agent::route_provider::{
         dynamic_routing::{
-            health_check::{HealthCheck, HealthChecker, HealthManagerActor},
-            messages::FetchedNodes,
-            node::Node,
-            nodes_fetch::{Fetch, NodesFetchActor, NodesFetcher},
-            snapshot::routing_snapshot::RoutingSnapshot,
+            health_check::health_check_manager_actor, messages::FetchedNodes, node::Node,
+            nodes_fetch::nodes_fetch_actor, snapshot::routing_snapshot::RoutingSnapshot,
             type_aliases::AtomicSwap,
         },
         RouteProvider,
     },
-    AgentError,
+    Agent, AgentError,
 };
 
 ///
@@ -48,16 +45,14 @@ const DYNAMIC_ROUTE_PROVIDER: &str = "DynamicRouteProvider";
 /// It also spawns the `HealthManagerActor`, which orchestrates the health check tasks for each node and updates routing snapshot.
 #[derive(Debug)]
 pub struct DynamicRouteProvider<S> {
-    /// Fetcher for fetching the latest nodes topology.
-    fetcher: Arc<dyn Fetch>,
     /// Periodicity of fetching the latest nodes topology.
     fetch_period: Duration,
     /// Interval for retrying fetching the nodes in case of error.
     fetch_retry_interval: Duration,
-    /// Health checker for checking the health of the nodes.
-    checker: Arc<dyn HealthCheck>,
     /// Periodicity of checking the health of the nodes.
     check_period: Duration,
+    /// Timeout for checking the health of the nodes.
+    check_timeout: Duration,
     /// Snapshot of the routing nodes.
     routing_snapshot: AtomicSwap<S>,
     /// Initial seed nodes, which are used for the initial fetching of the nodes.
     seeds: Vec<Node>,
@@ -82,11 +77,10 @@ pub enum DynamicRouteProviderError {
 
 /// A builder for the `DynamicRouteProvider`.
 pub struct DynamicRouteProviderBuilder<S> {
-    fetcher: Arc<dyn Fetch>,
     fetch_period: Duration,
     fetch_retry_interval: Duration,
-    checker: Arc<dyn HealthCheck>,
     check_period: Duration,
+    check_timeout: Duration,
     routing_snapshot: AtomicSwap<S>,
     seeds: Vec<Node>,
 }
 
@@ -94,38 +88,25 @@ impl<S> DynamicRouteProviderBuilder<S> {
     /// Creates a new instance of the builder.
     pub fn new(snapshot: S, seeds: Vec<Node>, http_client: Client) -> Self {
-        let fetcher = Arc::new(NodesFetcher::new(
-            http_client.clone(),
-            Principal::from_text(MAINNET_ROOT_SUBNET_ID).unwrap(),
-            None,
-        ));
-        let checker = Arc::new(HealthChecker::new(http_client, HEALTH_CHECK_TIMEOUT));
         Self {
-            fetcher,
             fetch_period: FETCH_PERIOD,
             fetch_retry_interval: FETCH_RETRY_INTERVAL,
-            checker,
             check_period: HEALTH_CHECK_PERIOD,
+            check_timeout: HEALTH_CHECK_TIMEOUT,
             seeds,
             routing_snapshot: Arc::new(ArcSwap::from_pointee(snapshot)),
         }
     }
 
-    /// Sets the fetcher of the nodes in the topology.
-    pub fn with_fetcher(mut self, fetcher: Arc<dyn Fetch>) -> Self {
-        self.fetcher = fetcher;
-        self
-    }
-
     /// Sets the fetching periodicity.
     pub fn with_fetch_period(mut self, period: Duration) -> Self {
         self.fetch_period = period;
         self
     }
 
-    /// Sets the node health checker.
-    pub fn with_checker(mut self, checker: Arc<dyn HealthCheck>) -> Self {
-        self.checker = checker;
+    /// Sets the timeout for node health checking.
+    pub fn with_check_timeout(mut self, timeout: Duration) -> Self {
+        self.check_timeout = timeout;
         self
     }
 
@@ -140,20 +121,15 @@ impl<S> DynamicRouteProviderBuilder<S> {
     where
         S: RoutingSnapshot + 'static,
     {
-        let route_provider = DynamicRouteProvider {
-            fetcher: self.fetcher,
+        DynamicRouteProvider {
             fetch_period: self.fetch_period,
             fetch_retry_interval: self.fetch_retry_interval,
-            checker: self.checker,
             check_period: self.check_period,
+            check_timeout: self.check_timeout,
             routing_snapshot: self.routing_snapshot,
             seeds: self.seeds,
             stop: StopSource::new(),
-        };
-
-        route_provider.run().await;
-
-        route_provider
+        }
     }
 }
 
@@ -180,12 +156,7 @@ where
         let urls = nodes.iter().map(|n| n.to_routing_url()).collect();
         Ok(urls)
     }
-}
 
-impl<S> DynamicRouteProvider<S>
-where
-    S: RoutingSnapshot + 'static,
-{
     /// Starts two background tasks:
     ///   - Task1: NodesFetchActor
     ///     - Periodically fetches existing API nodes (gets latest nodes topology) and sends discovered nodes to HealthManagerActor.
@@ -193,7 +164,7 @@ where
     ///   - Task2: HealthManagerActor:
     ///     - Listens to the fetched nodes messages from the NodesFetchActor.
     ///     - Starts/stops health check tasks (HealthCheckActors) based on the newly added/removed nodes.
     ///     - These spawned health check tasks periodically update the snapshot with the latest node health info.
-    pub async fn run(&self) {
+    fn notify_start(&self, agent: Agent) {
         info!("{DYNAMIC_ROUTE_PROVIDER}: started ...");
         // Communication channel between NodesFetchActor and HealthManagerActor.
         let (fetch_sender, fetch_receiver) = async_watch::channel(None);
@@ -202,47 +173,54 @@
         let (init_sender, init_receiver) = async_channel::bounded(1);
 
         // Start the receiving part first.
-        let health_manager_actor = HealthManagerActor::new(
-            Arc::clone(&self.checker),
+        crate::util::spawn(health_check_manager_actor(
+            agent.client.clone(),
             self.check_period,
+            self.check_timeout,
             Arc::clone(&self.routing_snapshot),
             fetch_receiver,
             init_sender,
             self.stop.token(),
-        );
-        crate::util::spawn(async move { health_manager_actor.run().await });
-
-        // Dispatch all seed nodes for initial health checks
-        if let Err(err) = fetch_sender.send(Some(FetchedNodes {
-            nodes: self.seeds.clone(),
-        })) {
-            error!("{DYNAMIC_ROUTE_PROVIDER}: failed to send results to HealthManager: {err:?}");
-        }
+        ));
+        let seeds = self.seeds.clone();
+        let routing_snapshot = self.routing_snapshot.clone();
+        let fetch_period = self.fetch_period;
+        let fetch_retry_interval = self.fetch_retry_interval;
+        let stop_token = self.stop.token();
+
+        crate::util::spawn(async move {
+            // Dispatch all seed nodes for initial health checks
+            if let Err(err) = fetch_sender.send(Some(FetchedNodes { nodes: seeds })) {
+                error!(
+                    "{DYNAMIC_ROUTE_PROVIDER}: failed to send results to HealthManager: {err:?}"
+                );
+            }
 
-        // Try await for healthy seeds.
-        let start = Instant::now();
-        select! {
-            _ = crate::util::sleep(TIMEOUT_AWAIT_HEALTHY_SEED).fuse() => warn!(
-                "{DYNAMIC_ROUTE_PROVIDER}: no healthy seeds found within {:?}",
-                start.elapsed()
-            ),
-            _ = init_receiver.recv().fuse() => info!(
-                "{DYNAMIC_ROUTE_PROVIDER}: found healthy seeds within {:?}",
-                start.elapsed()
+            // Wait for healthy seeds.
+            let start = Instant::now();
+            select! {
+                _ = crate::util::sleep(TIMEOUT_AWAIT_HEALTHY_SEED).fuse() => warn!(
+                    "{DYNAMIC_ROUTE_PROVIDER}: no healthy seeds found within {:?}",
+                    start.elapsed()
+                ),
+                _ = init_receiver.recv().fuse() => info!(
+                    "{DYNAMIC_ROUTE_PROVIDER}: found healthy seeds within {:?}",
+                    start.elapsed()
+                )
+            }
+            // We can close the channel now.
+            init_receiver.close();
+            nodes_fetch_actor(
+                agent,
+                Principal::from_text(MAINNET_ROOT_SUBNET_ID).unwrap(),
+                fetch_period,
+                fetch_retry_interval,
+                fetch_sender,
+                routing_snapshot,
+                stop_token,
             )
-        }
-        // We can close the channel now.
-        init_receiver.close();
-
-        let fetch_actor = NodesFetchActor::new(
-            Arc::clone(&self.fetcher),
-            self.fetch_period,
-            self.fetch_retry_interval,
-            fetch_sender,
-            Arc::clone(&self.routing_snapshot),
-            self.stop.token(),
-        );
-        crate::util::spawn(async move { fetch_actor.run().await });
+            .await;
+        });
         info!(
             "{DYNAMIC_ROUTE_PROVIDER}: NodesFetchActor and HealthManagerActor started successfully"
        );
    }
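Net effect on callers: the mock-oriented `with_fetcher`/`with_checker` hooks are gone, tuning is plain `Duration`s, and the actors now start when the provider meets its `Agent` (via `notify_start`) rather than inside `build`. A usage sketch, assuming `build` stays `async` as the tests suggest:

```rust
use std::{sync::Arc, time::Duration};

use ic_agent::{
    agent::route_provider::dynamic_routing::{
        dynamic_route_provider::{DynamicRouteProviderBuilder, IC0_SEED_DOMAIN},
        node::Node,
        snapshot::latency_based_routing::LatencyRoutingSnapshot,
    },
    Agent, AgentError,
};

async fn dyn_routed_agent() -> Result<Agent, AgentError> {
    let client = reqwest::Client::new();
    let seed = Node::new(IC0_SEED_DOMAIN).unwrap();

    let route_provider = DynamicRouteProviderBuilder::new(
        LatencyRoutingSnapshot::new(),
        vec![seed],
        client.clone(),
    )
    .with_fetch_period(Duration::from_secs(10))
    .with_check_period(Duration::from_secs(1))
    .with_check_timeout(Duration::from_secs(2))
    .build()
    .await;

    // notify_start fires inside Agent::new and spawns the background actors.
    Agent::builder()
        .with_http_client(client)
        .with_arc_route_provider(Arc::new(route_provider))
        .build()
}
```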
@@ -252,32 +230,42 @@
 #[cfg(test)]
 mod tests {
     use candid::Principal;
+    use ic_certification::{empty, fork, label, labeled, leaf, Certificate, HashTree};
+    use k256::ecdsa::{
+        signature::{Keypair, Signer},
+        Signature, SigningKey,
+    };
+    use mockito::{Mock, Server, ServerOpts};
+    use rand::thread_rng;
     use reqwest::Client;
     use std::{
-        sync::{Arc, Once},
+        collections::HashMap,
+        sync::{Arc, Once, OnceLock},
         time::{Duration, Instant},
     };
     use tracing::Level;
     use tracing_subscriber::FmtSubscriber;
 
-    use crate::agent::{
-        route_provider::{
-            dynamic_routing::{
-                dynamic_route_provider::{
-                    DynamicRouteProviderBuilder, IC0_SEED_DOMAIN, MAINNET_ROOT_SUBNET_ID,
-                },
-                node::Node,
-                snapshot::{
-                    latency_based_routing::LatencyRoutingSnapshot,
-                    round_robin_routing::RoundRobinRoutingSnapshot,
-                },
-                test_utils::{
-                    assert_routed_domains, route_n_times, NodeHealthCheckerMock, NodesFetcherMock,
+    use crate::{
+        agent::{
+            route_provider::{
+                dynamic_routing::{
+                    dynamic_route_provider::{
+                        DynamicRouteProviderBuilder, IC0_SEED_DOMAIN, MAINNET_ROOT_SUBNET_ID,
+                    },
+                    node::Node,
+                    snapshot::{
+                        latency_based_routing::LatencyRoutingSnapshot,
+                        round_robin_routing::RoundRobinRoutingSnapshot,
+                    },
+                    test_utils::{assert_routed_domains, route_n_times},
                 },
+                RouteProvider,
             },
-            RouteProvider,
+            Agent, AgentError,
         },
-        Agent, AgentError,
+        identity::Secp256k1Identity,
+        Identity,
     };
 
     static TRACING_INIT: Once = Once::new();
@@ -288,6 +276,74 @@ mod tests {
         });
     }
 
+    static MOCK_SERVER: OnceLock<Server> = OnceLock::new();
+
+    pub fn mock_topology(nodes: Vec<(Node, bool)>, root_domain: &str) -> (Vec<Mock>, Agent) {
+        let server = MOCK_SERVER.get_or_init(|| {
+            Server::new_with_opts(ServerOpts {
+                port: 80,
+                ..<_>::default()
+            })
+        });
+        let mut node_tree = empty();
+        let mut mocks = vec![];
+        for (node, healthy) in &nodes {
+            let nk = SigningKey::random(&mut thread_rng());
+            let id = Secp256k1Identity::from_private_key(nk.into());
+            mocks.push(
+                server
+                    .mock("GET", "/health")
+                    .match_header("host", &*node.domain())
+                    .with_status(if *healthy { 204 } else { 418 })
+                    .create(),
+            );
+            node_tree = fork(
+                node_tree,
+                label(
+                    id.sender().unwrap().to_text(),
+                    fork(
+                        fork(
+                            label("domain", leaf(node.domain())),
+                            label("ipv4_address", leaf("127.0.0.1")),
+                        ),
+                        label("ipv6_address", leaf("::1")),
+                    ),
+                ),
+            );
+        }
+        let sk = SigningKey::random(&mut thread_rng());
+        let final_tree = label("api_boundary_nodes", node_tree);
+        let signature: Signature = sk.sign(&final_tree.digest());
+        let certificate = Certificate {
+            delegation: None,
+            tree: final_tree,
+            signature: signature.to_bytes().to_vec(),
+        };
+        mocks.push(
+            server
+                .mock(
+                    "POST",
+                    &*format!("/api/v2/subnet/{}/read_state", MAINNET_ROOT_SUBNET_ID),
+                )
+                .match_header("host", root_domain)
+                .with_body(serde_cbor::to_vec(&certificate).unwrap())
+                .create(),
+        );
+        mocks.push(
+            server
+                .mock("GET", "/health")
+                .match_header("host", root_domain)
+                .with_status(204)
+                .create(),
+        );
+        let agent = Agent::builder()
+            .with_url(&format!("http://{root_domain}"))
+            .with_preset_root_key(sk.verifying_key().to_sec1_bytes().into_vec())
+            .build()
+            .unwrap();
+        (mocks, agent)
+    }
+
     async fn assert_no_routing_via_domains(
         route_provider: Arc<dyn RouteProvider>,
         excluded_domains: Vec<&str>,
@@ -365,24 +421,18 @@
     async fn test_routing_with_topology_and_node_health_updates() {
         // Setup.
         setup_tracing();
-        let node_1 = Node::new(IC0_SEED_DOMAIN).unwrap();
+        let node_1 = Node::new("n1.routing_with_topology.localhost").unwrap();
         // Set nodes fetching params: topology, fetching periodicity.
-        let fetcher = Arc::new(NodesFetcherMock::new());
-        fetcher.overwrite_nodes(vec![node_1.clone()]);
+        // A single healthy node exists in the topology. This node happens to be the seed node.
+        let (_mocks, agent) = mock_topology(vec![], "routing_with_topology_1.localhost");
         let fetch_interval = Duration::from_secs(2);
         // Set health checking params: healthy nodes, checking periodicity.
-        let checker = Arc::new(NodeHealthCheckerMock::new());
         let check_interval = Duration::from_secs(1);
-        // A single healthy node exists in the topology. This node happens to be the seed node.
-        fetcher.overwrite_nodes(vec![node_1.clone()]);
-        checker.overwrite_healthy_nodes(vec![node_1.clone()]);
         // Configure RouteProvider
         let snapshot = RoundRobinRoutingSnapshot::new();
         let client = Client::builder().build().unwrap();
         let route_provider =
             DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client)
-                .with_fetcher(fetcher.clone())
-                .with_checker(checker.clone())
                 .with_fetch_period(fetch_interval)
                 .with_check_period(check_interval)
                 .build()
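For orientation when reading `mock_topology`: the certificate it fabricates mirrors the state-tree layout the fetcher consumes, one labeled subtree per API boundary node under `api_boundary_nodes`, keyed by the node's principal:

```text
api_boundary_nodes
└── <node principal, textual>
    ├── domain        (e.g. "ic0.app")
    ├── ipv4_address  (e.g. "127.0.0.1")
    └── ipv6_address  (e.g. "::1")
```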
diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/health_check.rs b/ic-agent/src/agent/route_provider/dynamic_routing/health_check.rs
index 512fda8a6..0ee46a49e 100644
--- a/ic-agent/src/agent/route_provider/dynamic_routing/health_check.rs
+++ b/ic-agent/src/agent/route_provider/dynamic_routing/health_check.rs
@@ -1,4 +1,3 @@
-use async_trait::async_trait;
 use futures_util::FutureExt;
 use http::{Method, StatusCode};
 use reqwest::{Client, Request};
@@ -16,18 +15,11 @@ use crate::agent::route_provider::dynamic_routing::{
     messages::{FetchedNodes, NodeHealthState},
     node::Node,
     snapshot::routing_snapshot::RoutingSnapshot,
-    type_aliases::{AtomicSwap, ReceiverMpsc, ReceiverWatch, SenderMpsc},
+    type_aliases::{AtomicSwap, ReceiverWatch, SenderMpsc},
 };
 
 const CHANNEL_BUFFER: usize = 128;
 
-/// A trait representing a health check of the node.
-#[async_trait]
-pub trait HealthCheck: Send + Sync + Debug {
-    /// Checks the health of the node.
-    async fn check(&self, node: &Node) -> Result<HealthCheckStatus, DynamicRouteProviderError>;
-}
-
 /// A struct representing the health check status of the node.
 #[derive(Clone, PartialEq, Debug, Default)]
 pub struct HealthCheckStatus {
@@ -51,244 +43,143 @@ impl HealthCheckStatus {
     }
 }
 
-/// A struct implementing the `HealthCheck` for the nodes.
-#[derive(Debug)]
-pub struct HealthChecker {
-    http_client: Client,
-    timeout: Duration,
-}
-
-impl HealthChecker {
-    /// Creates a new `HealthChecker` instance.
-    pub fn new(http_client: Client, timeout: Duration) -> Self {
-        Self {
-            http_client,
-            timeout,
-        }
-    }
-}
-
 const HEALTH_CHECKER: &str = "HealthChecker";
 
-#[async_trait]
-impl HealthCheck for HealthChecker {
-    async fn check(&self, node: &Node) -> Result<HealthCheckStatus, DynamicRouteProviderError> {
-        // API boundary node exposes /health endpoint and should respond with 204 (No Content) if it's healthy.
-        let url = Url::parse(&format!("https://{}/health", node.domain())).unwrap();
-
-        let mut request = Request::new(Method::GET, url.clone());
-        *request.timeout_mut() = Some(self.timeout);
-
-        let start = Instant::now();
-        let response = self.http_client.execute(request).await.map_err(|err| {
-            DynamicRouteProviderError::HealthCheckError(format!(
-                "Failed to execute GET request to {url}: {err}"
-            ))
-        })?;
-        let latency = start.elapsed();
-
-        if response.status() != StatusCode::NO_CONTENT {
-            let err_msg = format!(
-                "{HEALTH_CHECKER}: Unexpected http status code {} for url={url} received",
-                response.status()
-            );
-            error!(err_msg);
-            return Err(DynamicRouteProviderError::HealthCheckError(err_msg));
-        }
-
-        Ok(HealthCheckStatus::new(Some(latency)))
+pub(crate) async fn health_check(
+    client: &Client,
+    check_timeout: Duration,
+    node: &Node,
+) -> Result<HealthCheckStatus, DynamicRouteProviderError> {
+    // API boundary node exposes /health endpoint and should respond with 204 (No Content) if it's healthy.
+    let url = Url::parse(&format!("https://{}/health", node.domain())).unwrap();
+
+    let mut request = Request::new(Method::GET, url.clone());
+    *request.timeout_mut() = Some(check_timeout);
+
+    let start = Instant::now();
+    let response = client.execute(request).await.map_err(|err| {
+        DynamicRouteProviderError::HealthCheckError(format!(
+            "Failed to execute GET request to {url}: {err}"
+        ))
+    })?;
+    let latency = start.elapsed();
+
+    if response.status() != StatusCode::NO_CONTENT {
+        let err_msg = format!(
+            "{HEALTH_CHECKER}: Unexpected http status code {} for url={url} received",
+            response.status()
+        );
+        error!(err_msg);
+        return Err(DynamicRouteProviderError::HealthCheckError(err_msg));
    }
+
+    Ok(HealthCheckStatus::new(Some(latency)))
 }
 
 const HEALTH_CHECK_ACTOR: &str = "HealthCheckActor";
 
-/// A struct performing the health check of the node and sending the health status to the listener.
-struct HealthCheckActor {
-    /// The health checker.
-    checker: Arc<dyn HealthCheck>,
-    /// The period of the health check.
+/// The name of the health manager actor.
+pub(super) const HEALTH_MANAGER_ACTOR: &str = "HealthManagerActor";
+
+pub(crate) async fn health_check_actor(
+    client: Client,
     period: Duration,
-    /// The node to check.
+    timeout: Duration,
     node: Node,
-    /// The sender channel (listener) to send the health status.
     sender_channel: SenderMpsc<NodeHealthState>,
-    /// The cancellation token of the actor.
     token: StopToken,
-}
-
-impl HealthCheckActor {
-    fn new(
-        checker: Arc<dyn HealthCheck>,
-        period: Duration,
-        node: Node,
-        sender_channel: SenderMpsc<NodeHealthState>,
-        token: StopToken,
-    ) -> Self {
-        Self {
-            checker,
-            period,
-            node,
-            sender_channel,
-            token,
-        }
-    }
-
-    /// Runs the actor.
-    async fn run(self) {
-        loop {
-            futures_util::select! {
-                _ = crate::util::sleep(self.period).fuse() => {
-                    let health = self.checker.check(&self.node).await.unwrap_or_default();
-                    let message = NodeHealthState {
-                        node: self.node.clone(),
-                        health,
-                    };
-                    // Inform the listener about node's health. It can only fail if the listener was closed/dropped.
-                    self.sender_channel.send(message).await.expect("Failed to send node's health state");
-                }
-                _ = self.token.clone().fuse() => {
-                    info!("{HEALTH_CHECK_ACTOR}: was gracefully cancelled for node {:?}", self.node);
-                    break;
-                }
+) {
+    loop {
+        futures_util::select! {
+            _ = crate::util::sleep(period).fuse() => {
+                let health = health_check(&client, timeout, &node).await.unwrap_or_default();
+                let message = NodeHealthState {
+                    node: node.clone(),
+                    health,
+                };
+                // Inform the listener about node's health. It can only fail if the listener was closed/dropped.
+                sender_channel.send(message).await.expect("Failed to send node's health state");
+            }
+            _ = token.clone().fuse() => {
+                info!("{HEALTH_CHECK_ACTOR}: was gracefully cancelled for node {node:?}");
+                break;
             }
         }
     }
 }
 
-/// The name of the health manager actor.
-pub(super) const HEALTH_MANAGER_ACTOR: &str = "HealthManagerActor";
-
-/// A struct managing the health checks of the nodes.
-/// It receives the fetched nodes from the `NodesFetchActor` and starts the health checks for them.
-/// It also receives the health status of the nodes from the `HealthCheckActor/s` and updates the routing snapshot.
-pub(super) struct HealthManagerActor<S> {
-    /// The health checker.
-    checker: Arc<dyn HealthCheck>,
-    /// The period of the health check.
+pub(crate) async fn health_check_manager_actor<S: RoutingSnapshot>(
+    client: Client,
     period: Duration,
-    /// The routing snapshot, storing the nodes.
+    timeout: Duration,
     routing_snapshot: AtomicSwap<S>,
-    /// The receiver channel to listen to the fetched nodes messages.
-    fetch_receiver: ReceiverWatch<FetchedNodes>,
-    /// The sender channel to send the health status of the nodes back to HealthManagerActor.
-    check_sender: SenderMpsc<NodeHealthState>,
-    /// The receiver channel to receive the health status of the nodes from the `HealthCheckActor/s`.
-    check_receiver: ReceiverMpsc<NodeHealthState>,
-    /// The sender channel to send the initialization status to DynamicRouteProvider (used only once in the init phase).
+    mut fetch_receiver: ReceiverWatch<FetchedNodes>,
     init_sender: SenderMpsc<bool>,
-    /// The cancellation token of the actor.
     token: StopToken,
-    /// The cancellation source for all the health checks.
-    nodes_stop: StopSource,
-    /// The flag indicating if this actor is initialized with healthy nodes.
-    is_initialized: bool,
-}
-
-impl<S> HealthManagerActor<S>
-where
-    S: RoutingSnapshot,
-{
-    /// Creates a new `HealthManagerActor` instance.
-    pub fn new(
-        checker: Arc<dyn HealthCheck>,
-        period: Duration,
-        routing_snapshot: AtomicSwap<S>,
-        fetch_receiver: ReceiverWatch<FetchedNodes>,
-        init_sender: SenderMpsc<bool>,
-        token: StopToken,
-    ) -> Self {
-        let (check_sender, check_receiver) = async_channel::bounded(CHANNEL_BUFFER);
-
-        Self {
-            checker,
-            period,
-            routing_snapshot,
-            fetch_receiver,
-            check_sender,
-            check_receiver,
-            init_sender,
-            token,
-            nodes_stop: StopSource::new(),
-            is_initialized: false,
-        }
-    }
-
-    /// Runs the actor.
-    pub async fn run(mut self) {
-        loop {
-            futures_util::select_biased! {
-                _ = self.token.clone().fuse() => {
-                    self.check_receiver.close();
-                    trace!("{HEALTH_MANAGER_ACTOR}: was gracefully cancelled, all nodes health checks stopped");
-                    break;
-                }
-                // Process a new array of fetched nodes from NodesFetchActor, if it appeared in the channel.
-                result = self.fetch_receiver.recv().fuse() => {
-                    match result {
-                        Err(err) => {
-                            error!("{HEALTH_MANAGER_ACTOR}: nodes fetch sender has been dropped: {err:?}");
-                            continue; // will hit the stoptoken next
+) {
+    let (check_sender, check_receiver) = async_channel::bounded(CHANNEL_BUFFER);
+    let mut is_initialized = false;
+    let mut nodes_stop: StopSource;
+    loop {
+        futures_util::select_biased! {
+            _ = token.clone().fuse() => {
+                check_receiver.close();
+                trace!("{HEALTH_MANAGER_ACTOR}: was gracefully cancelled, all nodes health checks stopped");
+                break;
+            }
+            // Process a new array of fetched nodes from NodesFetchActor, if it appeared in the channel.
+            result = fetch_receiver.recv().fuse() => {
+                match result {
+                    Err(err) => {
+                        error!("{HEALTH_MANAGER_ACTOR}: nodes fetch sender has been dropped: {err:?}");
+                        continue; // will hit the stoptoken next
+                    }
+                    Ok(Some(FetchedNodes { nodes })) => {
+                        if nodes.is_empty() {
+                            // This is a bug in the IC registry. There should be at least one API Boundary Node in the registry.
+                            // Updating the nodes snapshot with an empty array would lead to an irrecoverable error, as new nodes couldn't be fetched.
+                            // We avoid such updates and just wait for a non-empty list.
+                            error!("{HEALTH_MANAGER_ACTOR}: list of fetched nodes is empty");
+                            continue;
                         }
-                        Ok(Some(FetchedNodes { nodes })) => {
-                            self.handle_fetch_update(nodes).await;
+                        debug!("{HEALTH_MANAGER_ACTOR}: fetched nodes received {:?}", nodes);
+                        let current_snapshot = routing_snapshot.load_full();
+                        let mut new_snapshot = (*current_snapshot).clone();
+                        // If the snapshot has changed, store it and restart all nodes' health checks.
+                        if new_snapshot.sync_nodes(&nodes) {
+                            routing_snapshot.store(Arc::new(new_snapshot));
+                            warn!("{HEALTH_MANAGER_ACTOR}: stopping all running health checks");
+                            nodes_stop = StopSource::new();
+                            for node in &nodes {
+                                debug!("{HEALTH_MANAGER_ACTOR}: starting health check for node {node:?}");
+                                crate::util::spawn(health_check_actor(
+                                    client.clone(),
+                                    period,
+                                    timeout,
+                                    node.clone(),
+                                    check_sender.clone(),
+                                    nodes_stop.token(),
+                                ));
+                            }
                         }
-                        Ok(None) => continue,
                     }
+                    Ok(None) => continue,
                 }
-                // Receive health check messages from all running HealthCheckActor/s.
-                msg = self.check_receiver.recv().fuse() => {
-                    if let Ok(msg) = msg {
-                        self.handle_health_update(msg).await;
+            }
+            // Receive health check messages from all running HealthCheckActor/s.
+            msg = check_receiver.recv().fuse() => {
+                if let Ok(msg) = msg {
+                    let current_snapshot = routing_snapshot.load_full();
+                    let mut new_snapshot = (*current_snapshot).clone();
+                    new_snapshot.update_node(&msg.node, msg.health.clone());
+                    routing_snapshot.store(Arc::new(new_snapshot));
+                    if !is_initialized && msg.health.is_healthy() {
+                        is_initialized = true;
+                        // If TIMEOUT_AWAIT_HEALTHY_SEED has been exceeded, the receiver was dropped and send would thus fail. We ignore the failure.
+                        let _ = init_sender.send(true).await;
                    }
                 }
             }
        }
    }
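The snapshot handling above is the usual `arc_swap` copy-on-write idiom: readers keep cheap lock-free access while the manager clones the current snapshot, mutates the copy, and atomically publishes it. The same pattern in isolation (illustrative types only):

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;

#[derive(Clone, Debug)]
struct Snapshot {
    nodes: Vec<String>,
}

fn main() {
    let shared = Arc::new(ArcSwap::from_pointee(Snapshot { nodes: vec![] }));

    // Writer: clone the current value, mutate the copy, publish atomically.
    let current = shared.load_full();
    let mut next = (*current).clone();
    next.nodes.push("ic0.app".to_string());
    shared.store(Arc::new(next));

    // Readers never block: they observe either the old or the new snapshot.
    assert_eq!(shared.load().nodes.len(), 1);
}
```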
diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/nodes_fetch.rs b/ic-agent/src/agent/route_provider/dynamic_routing/nodes_fetch.rs
index 67ff9677c..7d2dc5c40 100644
--- a/ic-agent/src/agent/route_provider/dynamic_routing/nodes_fetch.rs
+++ b/ic-agent/src/agent/route_provider/dynamic_routing/nodes_fetch.rs
@@ -1,167 +1,91 @@
-use async_trait::async_trait;
 use candid::Principal;
 use futures_util::FutureExt;
-use reqwest::Client;
-use std::{fmt::Debug, sync::Arc, time::Duration};
+use std::time::Duration;
 use stop_token::StopToken;
 use tracing::{error, warn};
 use url::Url;
 
-use crate::agent::{
-    route_provider::dynamic_routing::{
-        dynamic_route_provider::DynamicRouteProviderError,
-        health_check::HEALTH_MANAGER_ACTOR,
-        messages::FetchedNodes,
-        node::Node,
-        snapshot::routing_snapshot::RoutingSnapshot,
-        type_aliases::{AtomicSwap, SenderWatch},
+use crate::{
+    agent::{
+        route_provider::dynamic_routing::{
+            health_check::HEALTH_MANAGER_ACTOR,
+            messages::FetchedNodes,
+            node::Node,
+            snapshot::routing_snapshot::RoutingSnapshot,
+            type_aliases::{AtomicSwap, SenderWatch},
+        },
+        Agent,
     },
-    Agent,
+    AgentError,
 };
 
 const NODES_FETCH_ACTOR: &str = "NodesFetchActor";
 
-/// Fetcher of nodes in the topology.
-#[async_trait]
-pub trait Fetch: Sync + Send + Debug {
-    /// Fetches the nodes from the topology.
-    async fn fetch(&self, url: Url) -> Result<Vec<Node>, DynamicRouteProviderError>;
-}
-
-/// A struct representing the fetcher of the nodes from the topology.
-#[derive(Debug)]
-pub struct NodesFetcher {
-    http_client: Client,
+async fn fetch_subnet_nodes(
+    agent: &Agent,
     subnet_id: Principal,
-    // By default, the nodes fetcher is configured to talk to the mainnet of Internet Computer, and verifies responses using a hard-coded public key.
-    // However, for testnets one can set up a custom public key.
-    root_key: Option<Vec<u8>>,
+    url: Url,
+) -> Result<Vec<Node>, AgentError> {
+    let agent = agent.clone_with_url(url);
+    let api_bns = agent
+        .fetch_api_boundary_nodes_by_subnet_id(subnet_id)
+        .await?;
+    // If some API BNs have invalid domain names, they are discarded.
+    let nodes = api_bns
+        .iter()
+        .filter_map(|api_node| api_node.try_into().ok())
+        .collect();
+    Ok(nodes)
 }
 
-impl NodesFetcher {
-    /// Creates a new `NodesFetcher` instance.
-    pub fn new(http_client: Client, subnet_id: Principal, root_key: Option<Vec<u8>>) -> Self {
-        Self {
-            http_client,
-            subnet_id,
-            root_key,
-        }
-    }
-}
-
-#[async_trait]
-impl Fetch for NodesFetcher {
-    async fn fetch(&self, url: Url) -> Result<Vec<Node>, DynamicRouteProviderError> {
-        let agent = Agent::builder()
-            .with_http_client(self.http_client.clone())
-            .with_url(url)
-            .build()
-            .map_err(|err| {
-                DynamicRouteProviderError::NodesFetchError(format!(
-                    "Failed to build the agent: {err}"
-                ))
-            })?;
-        if let Some(key) = self.root_key.clone() {
-            agent.set_root_key(key);
-        }
-        let api_bns = agent
-            .fetch_api_boundary_nodes_by_subnet_id(self.subnet_id)
-            .await
-            .map_err(|err| {
-                DynamicRouteProviderError::NodesFetchError(format!(
-                    "Failed to fetch API nodes: {err}"
-                ))
-            })?;
-        // If some API BNs have invalid domain names, they are discarded.
-        let nodes = api_bns
-            .iter()
-            .filter_map(|api_node| api_node.try_into().ok())
-            .collect();
-        return Ok(nodes);
-    }
-}
-
-/// A struct representing the actor responsible for fetching existing nodes and communicating it with the listener.
-pub(super) struct NodesFetchActor<S> {
-    /// The fetcher object responsible for fetching the nodes.
-    fetcher: Arc<dyn Fetch>,
-    /// Time period between fetches.
+pub(crate) async fn nodes_fetch_actor<S: RoutingSnapshot>(
+    agent: Agent,
+    subnet_id: Principal,
     period: Duration,
-    /// The interval to wait before retrying to fetch the nodes in case of failures.
-    fetch_retry_interval: Duration,
-    /// Communication channel with the listener.
+    retry_interval: Duration,
     fetch_sender: SenderWatch<FetchedNodes>,
-    /// The snapshot of the routing table.
-    routing_snapshot: AtomicSwap<S>,
-    /// The token to cancel/stop the actor.
+    snapshot: AtomicSwap<S>,
     token: StopToken,
-}
-
-impl<S> NodesFetchActor<S>
-where
-    S: RoutingSnapshot,
-{
-    /// Creates a new `NodesFetchActor` instance.
-    pub fn new(
-        fetcher: Arc<dyn Fetch>,
-        period: Duration,
-        retry_interval: Duration,
-        fetch_sender: SenderWatch<FetchedNodes>,
-        snapshot: AtomicSwap<S>,
-        token: StopToken,
-    ) -> Self {
-        Self {
-            fetcher,
-            period,
-            fetch_retry_interval: retry_interval,
-            fetch_sender,
-            routing_snapshot: snapshot,
-            token,
-        }
-    }
-
-    /// Runs the actor.
-    pub async fn run(self) {
-        loop {
-            futures_util::select! {
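`fetch_subnet_nodes` is now a thin wrapper over the agent's public topology query, re-aimed at a healthy node via `clone_with_url`. The same query can be issued directly; a sketch, assuming `MAINNET_ROOT_SUBNET_ID` stays importable from this module and that `ApiBoundaryNode` exposes a `domain` field matching the tree layout shown earlier:

```rust
use candid::Principal;
use ic_agent::{
    agent::route_provider::dynamic_routing::dynamic_route_provider::MAINNET_ROOT_SUBNET_ID,
    Agent, AgentError,
};

async fn api_bn_domains() -> Result<Vec<String>, AgentError> {
    let agent = Agent::builder().with_url("https://ic0.app").build()?;
    let subnet_id = Principal::from_text(MAINNET_ROOT_SUBNET_ID).unwrap();
    // Query the root subnet for the current set of API boundary nodes.
    let api_bns = agent
        .fetch_api_boundary_nodes_by_subnet_id(subnet_id)
        .await?;
    Ok(api_bns.into_iter().map(|bn| bn.domain).collect())
}
```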
                _ = crate::util::sleep(self.period).fuse() => {
-                    // Retry until success:
-                    //  - try to get a healthy node from the routing snapshot
-                    //    - if snapshot is empty, break the cycle and wait for the next fetch cycle
-                    //  - using the healthy node, try to fetch nodes from topology
-                    //    - if failure, sleep and retry
-                    //  - try send fetched nodes to the listener
-                    //    - failure should never happen, but we trace it if it does
-                    loop {
-                        let snapshot = self.routing_snapshot.load();
-                        if let Some(node) = snapshot.next_node() {
-                            match self.fetcher.fetch((&node).into()).await {
-                                Ok(nodes) => {
-                                    let msg = Some(FetchedNodes {nodes});
-                                    match self.fetch_sender.send(msg) {
-                                        Ok(()) => break, // message sent successfully, exist the loop
-                                        Err(err) => {
-                                            error!("{NODES_FETCH_ACTOR}: failed to send results to {HEALTH_MANAGER_ACTOR}: {err:?}");
-                                        }
+) {
+    loop {
+        futures_util::select! {
+            _ = crate::util::sleep(period).fuse() => {
+                // Retry until success:
+                //  - try to get a healthy node from the routing snapshot
+                //    - if snapshot is empty, break the cycle and wait for the next fetch cycle
+                //  - using the healthy node, try to fetch nodes from topology
+                //    - if failure, sleep and retry
+                //  - try send fetched nodes to the listener
+                //    - failure should never happen, but we trace it if it does
+                loop {
+                    let snapshot = snapshot.load();
+                    if let Some(node) = snapshot.next_node() {
+                        match fetch_subnet_nodes(&agent, subnet_id, (&node).into()).await {
+                            Ok(nodes) => {
+                                let msg = Some(FetchedNodes {nodes});
+                                match fetch_sender.send(msg) {
+                                    Ok(()) => break, // message sent successfully, exit the loop
+                                    Err(err) => {
+                                        error!("{NODES_FETCH_ACTOR}: failed to send results to {HEALTH_MANAGER_ACTOR}: {err:?}");
                                     }
-                                },
-                                Err(err) => {
-                                    error!("{NODES_FETCH_ACTOR}: failed to fetch nodes: {err:?}");
                                 }
-                            };
-                        } else {
-                            // No healthy nodes in the snapshot, break the cycle and wait for the next fetch cycle
-                            error!("{NODES_FETCH_ACTOR}: no nodes in the snapshot");
-                            break;
+                            },
+                            Err(err) => {
+                                error!("{NODES_FETCH_ACTOR}: failed to fetch nodes: {err:?}");
+                            }
                         };
-                        warn!("Retrying to fetch the nodes in {:?}", self.fetch_retry_interval);
-                        crate::util::sleep(self.fetch_retry_interval).await;
-                    }
-                }
-                _ = self.token.clone().fuse() => {
-                    warn!("{NODES_FETCH_ACTOR}: was gracefully cancelled");
-                    break;
-                }
+                    } else {
+                        // No healthy nodes in the snapshot, break the cycle and wait for the next fetch cycle
+                        error!("{NODES_FETCH_ACTOR}: no nodes in the snapshot");
+                        break;
+                    };
+                    warn!("Retrying to fetch the nodes in {:?}", retry_interval);
+                    crate::util::sleep(retry_interval).await;
+                }
+            }
+            _ = token.clone().fuse() => {
+                warn!("{NODES_FETCH_ACTOR}: was gracefully cancelled");
+                break;
            }
         }
     }
 }
diff --git a/ic-agent/src/agent/route_provider/dynamic_routing/test_utils.rs b/ic-agent/src/agent/route_provider/dynamic_routing/test_utils.rs
index fea43cafe..a872393e2 100644
--- a/ic-agent/src/agent/route_provider/dynamic_routing/test_utils.rs
+++ b/ic-agent/src/agent/route_provider/dynamic_routing/test_utils.rs
@@ -1,21 +1,7 @@
-use std::collections::{HashMap, HashSet};
-use std::time::Duration;
+use std::collections::HashMap;
 use std::{fmt::Debug, hash::Hash, sync::Arc};
 
-use arc_swap::ArcSwap;
-use async_trait::async_trait;
-use url::Url;
-
-use crate::agent::route_provider::{
-    dynamic_routing::{
-        dynamic_route_provider::DynamicRouteProviderError,
-        health_check::{HealthCheck, HealthCheckStatus},
-        node::Node,
-        nodes_fetch::Fetch,
-        type_aliases::AtomicSwap,
-    },
-    RouteProvider,
-};
+use crate::agent::route_provider::RouteProvider;
 
 pub(super) fn route_n_times(n: usize, f: Arc<dyn RouteProvider>) -> Vec<String> {
     (0..n)
@@ -55,71 +41,3 @@ pub(super) fn assert_routed_domains(
         .iter()
         .all(|&x| x == &expected_repetitions));
 }
-
-#[derive(Debug)]
-pub(super) struct NodesFetcherMock {
-    // A set of nodes, existing in the topology.
-    pub nodes: AtomicSwap<Vec<Node>>,
-}
-
-#[async_trait]
-impl Fetch for NodesFetcherMock {
-    async fn fetch(&self, _url: Url) -> Result<Vec<Node>, DynamicRouteProviderError> {
-        let nodes = (*self.nodes.load_full()).clone();
-        Ok(nodes)
-    }
-}
-
-impl Default for NodesFetcherMock {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl NodesFetcherMock {
-    pub fn new() -> Self {
-        Self {
-            nodes: Arc::new(ArcSwap::from_pointee(vec![])),
-        }
-    }
-
-    pub fn overwrite_nodes(&self, nodes: Vec<Node>) {
-        self.nodes.store(Arc::new(nodes));
-    }
-}
-
-#[derive(Debug)]
-pub(super) struct NodeHealthCheckerMock {
-    healthy_nodes: Arc<ArcSwap<HashSet<Node>>>,
-}
-
-impl Default for NodeHealthCheckerMock {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-#[async_trait]
-impl HealthCheck for NodeHealthCheckerMock {
-    async fn check(&self, node: &Node) -> Result<HealthCheckStatus, DynamicRouteProviderError> {
-        let nodes = self.healthy_nodes.load_full();
-        let latency = match nodes.contains(node) {
-            true => Some(Duration::from_secs(1)),
-            false => None,
-        };
-        Ok(HealthCheckStatus::new(latency))
-    }
-}
-
-impl NodeHealthCheckerMock {
-    pub fn new() -> Self {
-        Self {
-            healthy_nodes: Arc::new(ArcSwap::from_pointee(HashSet::new())),
-        }
-    }
-
-    pub fn overwrite_healthy_nodes(&self, healthy_nodes: Vec<Node>) {
-        self.healthy_nodes
-            .store(Arc::new(HashSet::from_iter(healthy_nodes)));
-    }
-}
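One pattern worth keeping in mind across all three rewritten actors: cancellation is driven by `stop-token`, where dropping a `StopSource` fires every `StopToken` cloned from it. A self-contained sketch of that shutdown idiom, assuming a tokio runtime:

```rust
use std::time::Duration;

use futures_util::FutureExt;
use stop_token::StopSource;

#[tokio::main]
async fn main() {
    let stop = StopSource::new();
    let token = stop.token();

    let worker = tokio::spawn(async move {
        loop {
            futures_util::select! {
                _ = Box::pin(tokio::time::sleep(Duration::from_millis(100))).fuse() => {
                    // Periodic work, as in the fetch and health-check actors.
                }
                _ = token.clone().fuse() => {
                    // Graceful cancellation, mirroring the actors' stop arms.
                    break;
                }
            }
        }
    });

    // Dropping the StopSource cancels every token cloned from it; this is
    // how DynamicRouteProvider's StopSource field winds down its actors.
    drop(stop);
    worker.await.unwrap();
}
```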