Skip to content

Commit

Permalink
docs: add
Browse files Browse the repository at this point in the history
  • Loading branch information
nikolay-komarevskiy committed Jun 28, 2024
1 parent 973b377 commit 39567c9
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
node::Node,
nodes_fetch::{Fetch, NodesFetchActor, NodesFetcher},
snapshot::routing_snapshot::RoutingSnapshot,
type_aliases::GlobalShared,
type_aliases::AtomicSwap,
},
route_provider::RouteProvider,
},
Expand All @@ -46,17 +46,28 @@ const HEALTH_CHECK_PERIOD: Duration = Duration::from_secs(1);

const DYNAMIC_ROUTE_PROVIDER: &str = "DynamicRouteProvider";

///
/// A dynamic route provider.
/// It spawns the discovery service (`NodesFetchActor`) for fetching the latest nodes topology.
/// It also spawns the `HealthManagerActor`, which orchestrates the health check tasks for each node and updates routing snapshot.
#[derive(Debug)]
pub struct DynamicRouteProvider<S> {
/// Fetcher for fetching the latest nodes topology.
fetcher: Arc<dyn Fetch>,
/// Periodicity of fetching the latest nodes topology.
fetch_period: Duration,
/// Interval for retrying fetching the nodes in case of error.
fetch_retry_interval: Duration,
/// Health checker for checking the health of the nodes.
checker: Arc<dyn HealthCheck>,
/// Periodicity of checking the health of the nodes.
check_period: Duration,
snapshot: GlobalShared<S>,
/// Snapshot of the routing nodes.
routing_snapshot: AtomicSwap<S>,
/// Task tracker for managing the spawned tasks.
tracker: TaskTracker,
/// Initial seed nodes, which are used for the initial fetching of the nodes.
seeds: Vec<Node>,
/// Cancellation token for stopping the spawned tasks.
token: CancellationToken,
}

Expand All @@ -65,7 +76,7 @@ where
S: RoutingSnapshot + 'static,
{
fn route(&self) -> Result<Url, AgentError> {
let snapshot = self.snapshot.load();
let snapshot = self.routing_snapshot.load();
let node = snapshot.next().ok_or_else(|| {
AgentError::RouteProviderError("No healthy API nodes found.".to_string())
})?;
Expand All @@ -77,7 +88,7 @@ impl<S> DynamicRouteProvider<S>
where
S: RoutingSnapshot + 'static,
{
///
/// Creates a new instance of `DynamicRouteProvider`.
pub fn new(snapshot: S, seeds: Vec<Node>, http_client: Client) -> Self {
let fetcher = Arc::new(NodesFetcher::new(
http_client.clone(),
Expand All @@ -91,31 +102,31 @@ where
checker,
check_period: HEALTH_CHECK_PERIOD,
seeds,
snapshot: Arc::new(ArcSwap::from_pointee(snapshot)),
routing_snapshot: Arc::new(ArcSwap::from_pointee(snapshot)),
tracker: TaskTracker::new(),
token: CancellationToken::new(),
}
}

///
/// Sets the fetcher for fetching the latest nodes topology.
pub fn with_fetcher(mut self, fetcher: Arc<dyn Fetch>) -> Self {
self.fetcher = fetcher;
self
}

///
/// Sets the periodicity of fetching the latest nodes topology.
pub fn with_fetch_period(mut self, period: Duration) -> Self {
self.fetch_period = period;
self
}

///
/// Sets the interval for retrying fetching the nodes in case of error.
pub fn with_checker(mut self, checker: Arc<dyn HealthCheck>) -> Self {
self.checker = checker;
self
}

///
/// Sets the periodicity of checking the health of the nodes.
pub fn with_check_period(mut self, period: Duration) -> Self {
self.check_period = period;
self
Expand All @@ -133,14 +144,14 @@ where
// Communication channel between NodesFetchActor and HealthManagerActor.
let (fetch_sender, fetch_receiver) = watch::channel(None);

// Communication channel with HealthManagerActor to receive info about healthy seeds.
// Communication channel with HealthManagerActor to receive info about healthy seed nodes (used only once).
let (init_sender, mut init_receiver) = mpsc::channel(1);

// Start the receiving part first.
let health_manager_actor = HealthManagerActor::new(
Arc::clone(&self.checker),
self.check_period,
Arc::clone(&self.snapshot),
Arc::clone(&self.routing_snapshot),
fetch_receiver,
init_sender,
self.token.clone(),
Expand All @@ -156,7 +167,7 @@ where
error!("{DYNAMIC_ROUTE_PROVIDER}: failed to send results to HealthManager: {err:?}");
}

// Try await healthy seeds.
// Try await for healthy seeds.
let found_healthy_seeds =
match timeout(TIMEOUT_AWAIT_HEALTHY_SEED, init_receiver.recv()).await {
Ok(_) => {
Expand All @@ -174,24 +185,25 @@ where
false
}
};
// We can close the channel now.
init_receiver.close();

let fetch_actor = NodesFetchActor::new(
Arc::clone(&self.fetcher),
self.fetch_period,
self.fetch_retry_interval,
fetch_sender,
Arc::clone(&self.snapshot),
Arc::clone(&self.routing_snapshot),
self.token.clone(),
);
self.tracker.spawn(async move { fetch_actor.run().await });
info!(
"{DYNAMIC_ROUTE_PROVIDER}: NodesFetchActor and HealthManagerActor started successfully"
);

(found_healthy_seeds)
.then_some(())
.ok_or(anyhow!("No healthy seeds found"))
(found_healthy_seeds).then_some(()).ok_or(anyhow!(
"No healthy seeds found, they may become healthy later ..."
))
}

/// Kill all running tasks.
Expand Down Expand Up @@ -364,7 +376,7 @@ mod tests {
.await
.unwrap_err()
.to_string()
.contains("No healthy seeds found"));
.contains("No healthy seeds found, they may become healthy later ..."));

// Test 1: calls to route() return an error, as no healthy seeds exist.
for _ in 0..4 {
Expand Down Expand Up @@ -461,7 +473,7 @@ mod tests {
.await
.unwrap_err()
.to_string()
.contains("No healthy seeds found"));
.contains("No healthy seeds found, they may become healthy later ..."));

// Test: calls to route() return an error, as no healthy seeds exist.
for _ in 0..4 {
Expand Down
76 changes: 49 additions & 27 deletions ic-agent/src/agent/http_transport/dynamic_routing/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,50 @@ use crate::agent::http_transport::dynamic_routing::{
messages::{FetchedNodes, NodeHealthState},
node::Node,
snapshot::routing_snapshot::RoutingSnapshot,
type_aliases::{GlobalShared, ReceiverMpsc, ReceiverWatch, SenderMpsc},
type_aliases::{AtomicSwap, ReceiverMpsc, ReceiverWatch, SenderMpsc},
};

const CHANNEL_BUFFER: usize = 128;

///
/// A trait representing a health check of the node.
#[async_trait]
pub trait HealthCheck: Send + Sync + Debug {
///
/// Checks the health of the node.
async fn check(&self, node: &Node) -> anyhow::Result<HealthCheckStatus>;
}

///
/// A struct representing the health check status of the node.
#[derive(Clone, PartialEq, Debug, Default)]
pub struct HealthCheckStatus {
///
pub latency: Option<Duration>,
latency: Option<Duration>,
}

///
impl HealthCheckStatus {
///
/// Creates a new `HealthCheckStatus` instance.
pub fn new(latency: Option<Duration>) -> Self {
Self { latency }
}

///
/// Checks if the node is healthy.
pub fn is_healthy(&self) -> bool {
self.latency.is_some()
}

/// Get the latency of the health check.
pub fn latency(&self) -> Option<Duration> {
self.latency
}
}

///
/// A struct implementing the `HealthCheck` for the nodes.
#[derive(Debug)]
pub struct HealthChecker {
http_client: Client,
timeout: Duration,
}

///
impl HealthChecker {
///
/// Creates a new `HealthChecker` instance.
pub fn new(http_client: Client, timeout: Duration) -> Self {
Self {
http_client,
Expand Down Expand Up @@ -96,16 +98,22 @@ impl HealthCheck for HealthChecker {

const HEALTH_CHECK_ACTOR: &str = "HealthCheckActor";

/// A struct performing the health check of the node and sending the health status to the listener.
struct HealthCheckActor {
/// The health checker.
checker: Arc<dyn HealthCheck>,
/// The period of the health check.
period: Duration,
/// The node to check.
node: Node,
/// The sender channel (listener) to send the health status.
sender_channel: SenderMpsc<NodeHealthState>,
/// The cancellation token of the actor.
token: CancellationToken,
}

impl HealthCheckActor {
pub fn new(
fn new(
checker: Arc<dyn HealthCheck>,
period: Duration,
node: Node,
Expand All @@ -121,7 +129,8 @@ impl HealthCheckActor {
}
}

pub async fn run(self) {
/// Runs the actor.
async fn run(self) {
let mut interval = time::interval(self.period);
loop {
tokio::select! {
Expand All @@ -143,33 +152,46 @@ impl HealthCheckActor {
}
}

///
/// The name of the health manager actor.
pub const HEALTH_MANAGER_ACTOR: &str = "HealthManagerActor";

///
/// A struct managing the health checks of the nodes.
/// It receives the fetched nodes from the `NodesFetchActor` and starts the health checks for them.
/// It also receives the health status of the nodes from the `HealthCheckActor/s` and updates the routing snapshot.
pub struct HealthManagerActor<S> {
/// The health checker.
checker: Arc<dyn HealthCheck>,
/// The period of the health check.
period: Duration,
nodes_snapshot: GlobalShared<S>,
/// The routing snapshot, storing the nodes.
routing_snapshot: AtomicSwap<S>,
/// The receiver channel to listen to the fetched nodes messages.
fetch_receiver: ReceiverWatch<FetchedNodes>,
/// The sender channel to send the health status of the nodes back to HealthManagerActor.
check_sender: SenderMpsc<NodeHealthState>,
/// The receiver channel to receive the health status of the nodes from the `HealthCheckActor/s`.
check_receiver: ReceiverMpsc<NodeHealthState>,
/// The sender channel to send the initialization status to DynamicRouteProvider (used only once in the init phase).
init_sender: SenderMpsc<bool>,
/// The cancellation token of the actor.
token: CancellationToken,
/// The cancellation token for all the health checks.
nodes_token: CancellationToken,
/// The task tracker of the health checks, waiting for the tasks to exit (graceful termination).
nodes_tracker: TaskTracker,
/// The flag indicating if this actor is initialized with healthy nodes.
is_initialized: bool,
}

impl<S> HealthManagerActor<S>
where
S: RoutingSnapshot,
{
///
/// Creates a new `HealthManagerActor` instance.
pub fn new(
checker: Arc<dyn HealthCheck>,
period: Duration,
nodes_snapshot: GlobalShared<S>,
routing_snapshot: AtomicSwap<S>,
fetch_receiver: ReceiverWatch<FetchedNodes>,
init_sender: SenderMpsc<bool>,
token: CancellationToken,
Expand All @@ -179,7 +201,7 @@ where
Self {
checker,
period,
nodes_snapshot,
routing_snapshot,
fetch_receiver,
check_sender,
check_receiver,
Expand All @@ -191,11 +213,11 @@ where
}
}

///
/// Runs the actor.
pub async fn run(mut self) {
loop {
tokio::select! {
// Check if a new array of fetched nodes appeared in the channel from NodesFetchService.
// Process a new array of fetched nodes from NodesFetchActor, if it appeared in the channel.
result = self.fetch_receiver.changed() => {
if let Err(err) = result {
error!("{HEALTH_MANAGER_ACTOR}: nodes fetch sender has been dropped: {err:?}");
Expand All @@ -206,7 +228,7 @@ where
let Some(FetchedNodes { nodes }) = self.fetch_receiver.borrow_and_update().clone() else { continue };
self.handle_fetch_update(nodes).await;
}
// Receive health check messages from all running NodeHealthChecker/s.
// Receive health check messages from all running HealthCheckActor/s.
Some(msg) = self.check_receiver.recv() => {
self.handle_health_update(msg).await;
}
Expand All @@ -221,13 +243,13 @@ where
}

async fn handle_health_update(&mut self, msg: NodeHealthState) {
let current_snapshot = self.nodes_snapshot.load_full();
let current_snapshot = self.routing_snapshot.load_full();
let mut new_snapshot = (*current_snapshot).clone();
if let Err(err) = new_snapshot.update_node(&msg.node, msg.health.clone()) {
error!("{HEALTH_MANAGER_ACTOR}: failed to update snapshot: {err:?}");
return;
}
self.nodes_snapshot.store(Arc::new(new_snapshot));
self.routing_snapshot.store(Arc::new(new_snapshot));
if !self.is_initialized && msg.health.is_healthy() {
self.is_initialized = true;
// If TIMEOUT_AWAIT_HEALTHY_SEED has been exceeded, the receiver was dropped and send would thus fail. We ignore the failure.
Expand All @@ -244,11 +266,11 @@ where
return;
}
debug!("{HEALTH_MANAGER_ACTOR}: fetched nodes received {:?}", nodes);
let current_snapshot = self.nodes_snapshot.load_full();
let current_snapshot = self.routing_snapshot.load_full();
let mut new_snapshot = (*current_snapshot).clone();
// If the snapshot has changed, store it and restart all node's health checks.
if let Ok(true) = new_snapshot.sync_nodes(&nodes) {
self.nodes_snapshot.store(Arc::new(new_snapshot));
self.routing_snapshot.store(Arc::new(new_snapshot));
self.stop_all_checks().await;
self.start_checks(nodes.to_vec());
}
Expand Down
Loading

0 comments on commit 39567c9

Please sign in to comment.