Skip to content

Commit

Permalink
fix: add retry for client initial cluster node discovery
Browse files Browse the repository at this point in the history
Signed-off-by: bsbds <[email protected]>
  • Loading branch information
bsbds committed Sep 10, 2024
1 parent 0a04784 commit c7a6a6a
Showing 1 changed file with 33 additions and 3 deletions.
36 changes: 33 additions & 3 deletions crates/curp/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use parking_lot::RwLock;
use tokio::task::JoinHandle;
#[cfg(not(madsim))]
use tonic::transport::ClientTlsConfig;
use tracing::debug;
use tracing::{debug, warn};
#[cfg(madsim)]
use utils::ClientTlsConfig;
use utils::{build_endpoint, config::ClientConfig};
Expand Down Expand Up @@ -298,6 +298,27 @@ impl ClientBuilder {
/// Return `tonic::Status` for connection failure or some server errors.
#[inline]
pub async fn discover_from(mut self, addrs: Vec<String>) -> Result<Self, tonic::Status> {
    /// Sleep duration in secs when the cluster is unavailable
    const DISCOVER_SLEEP_DURATION: u64 = 1;

    // Keep polling the given endpoints until discovery either succeeds or
    // fails with something other than `Unavailable`.
    loop {
        let status = match self.try_discover_from(&addrs).await {
            Ok(()) => return Ok(self),
            Err(status) => status,
        };

        // Only `Unavailable` is treated as transient; every other status is
        // handed back to the caller untouched.
        if !matches!(status.code(), tonic::Code::Unavailable) {
            return Err(status);
        }

        warn!("cluster is starting, sleep for {DISCOVER_SLEEP_DURATION} secs");
        tokio::time::sleep(Duration::from_secs(DISCOVER_SLEEP_DURATION)).await;
    }
}

/// Discover the initial states from some endpoints
///
/// # Errors
///
/// Return `tonic::Status` for connection failure or some server errors.
#[inline]
pub async fn try_discover_from(&mut self, addrs: &[String]) -> Result<(), tonic::Status> {
let propose_timeout = *self.config.propose_timeout();
let mut futs: FuturesUnordered<_> = addrs
.iter()
Expand Down Expand Up @@ -330,16 +351,25 @@ impl ClientBuilder {
self.all_members = if self.is_raw_curp {
Some(r.into_peer_urls())
} else {
Some(r.into_client_urls())
Some(Self::ensure_no_empty_address(r.into_client_urls())?)
};
return Ok(self);
return Ok(());
}
Err(e) => err = e,
}
}
Err(err)
}

/// Ensures that no server has an empty list of addresses.
///
/// Returns the map unchanged when every entry holds at least one address
/// (an empty map trivially satisfies this).
///
/// # Errors
///
/// Returns `tonic::Status` with code `Unavailable` and the message
/// `"cluster not published"` when at least one server has no addresses.
fn ensure_no_empty_address(
    urls: HashMap<ServerId, Vec<String>>,
) -> Result<HashMap<ServerId, Vec<String>>, tonic::Status> {
    // Build the error status only on the failure path instead of eagerly
    // constructing it on every call.
    if urls.values().any(Vec::is_empty) {
        return Err(tonic::Status::unavailable("cluster not published"));
    }
    Ok(urls)
}

/// Init state builder
fn init_state_builder(&self) -> StateBuilder {
let mut builder = StateBuilder::new(
Expand Down

0 comments on commit c7a6a6a

Please sign in to comment.