Skip to content

Commit

Permalink
chore: add DynamicRouteProviderBuilder
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolay-komarevskiy committed Jul 16, 2024
1 parent 32bc62e commit 8bfe51b
Showing 1 changed file with 124 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use arc_swap::ArcSwap;
use candid::Principal;
use reqwest::Client;
use tokio::{
runtime::Handle,
sync::{mpsc, watch},
time::timeout,
};
Expand Down Expand Up @@ -71,24 +72,19 @@ pub struct DynamicRouteProvider<S> {
token: CancellationToken,
}

impl<S> RouteProvider for DynamicRouteProvider<S>
where
S: RoutingSnapshot + 'static,
{
fn route(&self) -> Result<Url, AgentError> {
let snapshot = self.routing_snapshot.load();
let node = snapshot.next().ok_or_else(|| {
AgentError::RouteProviderError("No healthy API nodes found.".to_string())
})?;
Ok(node.to_routing_url())
}
/// A builder for the `DynamicRouteProvider`.
pub struct DynamicRouteProviderBuilder<S> {
fetcher: Arc<dyn Fetch>,
fetch_period: Duration,
fetch_retry_interval: Duration,
checker: Arc<dyn HealthCheck>,
check_period: Duration,
routing_snapshot: AtomicSwap<S>,
seeds: Vec<Node>,
}

impl<S> DynamicRouteProvider<S>
where
S: RoutingSnapshot + 'static,
{
/// Creates a new instance of `DynamicRouteProvider`.
impl<S> DynamicRouteProviderBuilder<S> {
/// Creates a new instance of the builder.
pub fn new(snapshot: S, seeds: Vec<Node>, http_client: Client) -> Self {
let fetcher = Arc::new(NodesFetcher::new(
http_client.clone(),
Expand All @@ -103,35 +99,75 @@ where
check_period: HEALTH_CHECK_PERIOD,
seeds,
routing_snapshot: Arc::new(ArcSwap::from_pointee(snapshot)),
tracker: TaskTracker::new(),
token: CancellationToken::new(),
}
}

/// Sets the fetcher for fetching the latest nodes topology.
/// Sets the fetcher of the nodes in the topology.
pub fn with_fetcher(mut self, fetcher: Arc<dyn Fetch>) -> Self {
self.fetcher = fetcher;
self
}

/// Sets the periodicity of fetching the latest nodes topology.
/// Sets the fetching periodicity.
pub fn with_fetch_period(mut self, period: Duration) -> Self {
self.fetch_period = period;
self
}

/// Sets the interval for retrying fetching the nodes in case of error.
/// Sets the node health checker.
pub fn with_checker(mut self, checker: Arc<dyn HealthCheck>) -> Self {
self.checker = checker;
self
}

/// Sets the periodicity of checking the health of the nodes.
/// Sets the periodicity of node health checking.
pub fn with_check_period(mut self, period: Duration) -> Self {
self.check_period = period;
self
}

/// Builds an instance of the `DynamicRouteProvider`.
pub async fn build(self) -> DynamicRouteProvider<S>
where
S: RoutingSnapshot + 'static,
{
let route_provider = DynamicRouteProvider {
fetcher: self.fetcher,
fetch_period: self.fetch_period,
fetch_retry_interval: self.fetch_retry_interval,
checker: self.checker,
check_period: self.check_period,
routing_snapshot: self.routing_snapshot,
tracker: TaskTracker::new(),
seeds: self.seeds,
token: CancellationToken::new(),
};

if let Err(err) = route_provider.run().await {
error!("{DYNAMIC_ROUTE_PROVIDER}: started in unhealthy state: {err:?}");
}

route_provider
}
}

impl<S> RouteProvider for DynamicRouteProvider<S>
where
S: RoutingSnapshot + 'static,
{
fn route(&self) -> Result<Url, AgentError> {
let snapshot = self.routing_snapshot.load();
let node = snapshot.next().ok_or_else(|| {
AgentError::RouteProviderError("No healthy API nodes found.".to_string())
})?;
Ok(node.to_routing_url())
}
}

impl<S> DynamicRouteProvider<S>
where
S: RoutingSnapshot + 'static,
{
/// Starts two background tasks:
/// - Task1: NodesFetchActor
/// - Periodically fetches existing API nodes (gets latest nodes topology) and sends discovered nodes to HealthManagerActor.
Expand Down Expand Up @@ -202,16 +238,26 @@ where
);

(found_healthy_seeds).then_some(()).ok_or(anyhow!(
"No healthy seeds found, they may become healthy later ..."
"No healthy seeds found within {TIMEOUT_AWAIT_HEALTHY_SEED:?}, they may become healthy later ..."
))
}
}

/// Kill all running tasks.
pub async fn stop(&self) {
// Gracefully stop the inner spawned tasks running in the background.
impl<S> Drop for DynamicRouteProvider<S> {
fn drop(&mut self) {
self.token.cancel();
self.tracker.close();
self.tracker.wait().await;
warn!("{DYNAMIC_ROUTE_PROVIDER}: gracefully stopped");
let tracker = self.tracker.clone();
// If no runtime is available do nothing.
if let Ok(handle) = Handle::try_current() {
let _ = handle.spawn(async move {
tracker.wait().await;
warn!("{DYNAMIC_ROUTE_PROVIDER}: stopped gracefully");
});
} else {
error!("{DYNAMIC_ROUTE_PROVIDER}: no runtime available, cannot stop the spawned tasks");
}
}
}

Expand All @@ -228,7 +274,7 @@ mod tests {
use crate::{
agent::http_transport::{
dynamic_routing::{
dynamic_route_provider::{DynamicRouteProvider, IC0_SEED_DOMAIN},
dynamic_route_provider::{DynamicRouteProviderBuilder, IC0_SEED_DOMAIN},
node::Node,
snapshot::round_robin_routing::RoundRobinRoutingSnapshot,
test_utils::{
Expand Down Expand Up @@ -266,14 +312,15 @@ mod tests {
// Configure RouteProvider
let snapshot = RoundRobinRoutingSnapshot::new();
let client = Client::builder().build().unwrap();
let route_provider = Arc::new(
DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client)
let route_provider =
DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client)
.with_fetcher(fetcher.clone())
.with_checker(checker.clone())
.with_fetch_period(fetch_interval)
.with_check_period(check_interval),
);
route_provider.run().await.expect("no healthy seeds found");
.with_check_period(check_interval)
.build()
.await;
let route_provider = Arc::new(route_provider);

// This time span is required for the snapshot to be fully updated with the new nodes and their health info.
let snapshot_update_duration = fetch_interval + 2 * check_interval;
Expand Down Expand Up @@ -341,9 +388,6 @@ mod tests {
tokio::time::sleep(snapshot_update_duration).await;
let routed_domains = route_n_times(3, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_2.domain()], 3);

// Teardown.
route_provider.stop().await;
}

#[tokio::test]
Expand All @@ -364,19 +408,18 @@ mod tests {
// Configure RouteProvider
let snapshot = RoundRobinRoutingSnapshot::new();
let client = Client::builder().build().unwrap();
let route_provider = Arc::new(
DynamicRouteProvider::new(snapshot, vec![node_1.clone(), node_2.clone()], client)
.with_fetcher(fetcher.clone())
.with_checker(checker.clone())
.with_fetch_period(fetch_interval)
.with_check_period(check_interval),
);
assert!(route_provider
.run()
.await
.unwrap_err()
.to_string()
.contains("No healthy seeds found, they may become healthy later ..."));
let route_provider = DynamicRouteProviderBuilder::new(
snapshot,
vec![node_1.clone(), node_2.clone()],
client,
)
.with_fetcher(fetcher)
.with_checker(checker.clone())
.with_fetch_period(fetch_interval)
.with_check_period(check_interval)
.build()
.await;
let route_provider = Arc::new(route_provider);

// Test 1: calls to route() return an error, as no healthy seeds exist.
for _ in 0..4 {
Expand All @@ -393,9 +436,6 @@ mod tests {
tokio::time::sleep(3 * check_interval).await;
let routed_domains = route_n_times(6, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_1.domain(), node_2.domain()], 3);

// Teardown.
route_provider.stop().await;
}

#[tokio::test]
Expand All @@ -415,14 +455,15 @@ mod tests {
// Configure RouteProvider
let snapshot = RoundRobinRoutingSnapshot::new();
let client = Client::builder().build().unwrap();
let route_provider = Arc::new(
DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client)
.with_fetcher(fetcher.clone())
let route_provider =
DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client)
.with_fetcher(fetcher)
.with_checker(checker.clone())
.with_fetch_period(fetch_interval)
.with_check_period(check_interval),
);
route_provider.run().await.expect("no healthy seeds found");
.with_check_period(check_interval)
.build()
.await;
let route_provider = Arc::new(route_provider);

// Test 1: multiple route() calls return a single domain=ic0.app, as the seed is healthy.
tokio::time::sleep(2 * check_interval).await;
Expand All @@ -439,9 +480,6 @@ mod tests {
AgentError::RouteProviderError("No healthy API nodes found.".to_string())
);
}

// Teardown.
route_provider.stop().await;
}

#[tokio::test]
Expand All @@ -461,19 +499,14 @@ mod tests {
// Configure RouteProvider
let snapshot = RoundRobinRoutingSnapshot::new();
let client = Client::builder().build().unwrap();
let route_provider = Arc::new(
DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client)
.with_fetcher(fetcher.clone())
.with_checker(checker.clone())
let route_provider =
DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client)
.with_fetcher(fetcher)
.with_checker(checker)
.with_fetch_period(fetch_interval)
.with_check_period(check_interval),
);
assert!(route_provider
.run()
.await
.unwrap_err()
.to_string()
.contains("No healthy seeds found, they may become healthy later ..."));
.with_check_period(check_interval)
.build()
.await;

// Test: calls to route() return an error, as no healthy seeds exist.
for _ in 0..4 {
Expand All @@ -484,9 +517,6 @@ mod tests {
AgentError::RouteProviderError("No healthy API nodes found.".to_string())
);
}

// Teardown.
route_provider.stop().await;
}

#[tokio::test]
Expand All @@ -507,14 +537,18 @@ mod tests {
// Configure RouteProvider
let snapshot = RoundRobinRoutingSnapshot::new();
let client = Client::builder().build().unwrap();
let route_provider = Arc::new(
DynamicRouteProvider::new(snapshot, vec![node_1.clone(), node_2.clone()], client)
.with_fetcher(fetcher.clone())
.with_checker(checker.clone())
.with_fetch_period(fetch_interval)
.with_check_period(check_interval),
);
route_provider.run().await.expect("no healthy seeds found");
let route_provider = DynamicRouteProviderBuilder::new(
snapshot,
vec![node_1.clone(), node_2.clone()],
client,
)
.with_fetcher(fetcher)
.with_checker(checker.clone())
.with_fetch_period(fetch_interval)
.with_check_period(check_interval)
.build()
.await;
let route_provider = Arc::new(route_provider);

// Test 1: calls to route() return only a healthy seed ic0.app.
let routed_domains = route_n_times(3, Arc::clone(&route_provider));
Expand All @@ -525,9 +559,6 @@ mod tests {
tokio::time::sleep(2 * check_interval).await;
let routed_domains = route_n_times(6, Arc::clone(&route_provider));
assert_routed_domains(routed_domains, vec![node_1.domain(), node_2.domain()], 3);

// Teardown.
route_provider.stop().await;
}

#[tokio::test]
Expand All @@ -548,14 +579,15 @@ mod tests {
// Configure RouteProvider
let snapshot = RoundRobinRoutingSnapshot::new();
let client = Client::builder().build().unwrap();
let route_provider = Arc::new(
DynamicRouteProvider::new(snapshot, vec![node_1.clone()], client)
let route_provider =
DynamicRouteProviderBuilder::new(snapshot, vec![node_1.clone()], client)
.with_fetcher(fetcher.clone())
.with_checker(checker.clone())
.with_fetch_period(fetch_interval)
.with_check_period(check_interval),
);
route_provider.run().await.expect("no healthy seeds found");
.with_check_period(check_interval)
.build()
.await;
let route_provider = Arc::new(route_provider);

// This time span is required for the snapshot to be fully updated with the new nodes topology and health info.
let snapshot_update_duration = fetch_interval + 2 * check_interval;
Expand All @@ -579,8 +611,5 @@ mod tests {
vec![node_1.domain(), node_2.domain(), node_3.domain()],
2,
);

// Teardown.
route_provider.stop().await;
}
}

0 comments on commit 8bfe51b

Please sign in to comment.