Skip to content

Commit

Permalink
chore: add error priority
Browse files Browse the repository at this point in the history
Signed-off-by: iGxnon <[email protected]>
  • Loading branch information
iGxnon committed Dec 29, 2023
1 parent ec60762 commit 79cb6a1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 26 deletions.
36 changes: 24 additions & 12 deletions curp/src/client_new/unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ use crate::{
rpc::{
self,
connect::{BypassedConnect, ConnectApi},
ConfChange, CurpError, FetchClusterRequest, FetchClusterResponse, FetchReadStateRequest,
Member, ProposeConfChangeRequest, ProposeId, ProposeRequest, Protocol, PublishRequest,
ReadState, ShutdownRequest, WaitSyncedRequest,
ConfChange, CurpError, CurpErrorPriority, FetchClusterRequest, FetchClusterResponse,
FetchReadStateRequest, Member, ProposeConfChangeRequest, ProposeId, ProposeRequest,
Protocol, PublishRequest, ReadState, ShutdownRequest, WaitSyncedRequest,
},
};

Expand Down Expand Up @@ -254,7 +254,7 @@ impl<C: Command> Unary<C> {
.await;
let super_quorum = super_quorum(size);

let mut err = None;
let mut err: Option<CurpError> = None;
let mut execute_result: Option<C::ER> = None;
let mut ok_cnt = 0;

Check warning on line 259 in curp/src/client_new/unary.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/unary.rs#L249-L259

Added lines #L249 - L259 were not covered by tests

Expand All @@ -263,10 +263,16 @@ impl<C: Command> Unary<C> {
Ok(resp) => resp.into_inner(),
Err(e) => {
warn!("propose cmd({propose_id}) to server({id}) error: {e:?}");
if e.return_early() {
if e.priority() == CurpErrorPriority::ReturnImmediately {
return Err(e);
}
err = Some(e);
if let Some(old_err) = err.as_ref() {
if old_err.priority() <= e.priority() {
err = Some(e);
}
} else {
err = Some(e);
}
continue;

Check warning on line 276 in curp/src/client_new/unary.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/unary.rs#L261-L276

Added lines #L261 - L276 were not covered by tests
}
};
Expand Down Expand Up @@ -384,7 +390,7 @@ impl<C: Command> ClientApi for Unary<C> {
futures::future::Either::Left((fast_result, slow_round)) => match fast_result {
Ok(er) => er.map(|e| (e, None)),
Err(err) => {
if err.return_early() {
if err.priority() > CurpErrorPriority::Low {
return Err(err);
}

Check warning on line 395 in curp/src/client_new/unary.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/unary.rs#L388-L395

Added lines #L388 - L395 were not covered by tests
// fallback to slow round if fast round failed
Expand All @@ -395,7 +401,7 @@ impl<C: Command> ClientApi for Unary<C> {
futures::future::Either::Right((slow_result, fast_round)) => match slow_result {
Ok(er) => er.map(|(asr, e)| (e, Some(asr))),
Err(err) => {
if err.return_early() {
if err.priority() > CurpErrorPriority::Low {
return Err(err);
}
let fr = fast_round.await?;
Expand All @@ -406,7 +412,7 @@ impl<C: Command> ClientApi for Unary<C> {
} else {
let (fr, sr) = futures::future::join(fast_round, slow_round).await;
if let Err(err) = fr {
if err.return_early() {
if err.priority() > CurpErrorPriority::Low {
return Err(err);
}
}
Expand Down Expand Up @@ -515,17 +521,23 @@ impl<C: Command> ClientApi for Unary<C> {
let mut max_term = 0;
let mut res = None;
let mut ok_cnt = 0;
let mut err = None;
let mut err: Option<CurpError> = None;

Check warning on line 524 in curp/src/client_new/unary.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/unary.rs#L509-L524

Added lines #L509 - L524 were not covered by tests

while let Some((id, resp)) = responses.next().await {
let inner = match resp {
Ok(r) => r,
Err(e) => {
warn!("fetch cluster from {} failed, {:?}", id, e);
if e.return_early() {
if e.priority() == CurpErrorPriority::ReturnImmediately {
return Err(e);
}
err = Some(e);
if let Some(old_err) = err.as_ref() {
if old_err.priority() <= e.priority() {
err = Some(e);
}
} else {
err = Some(e);
}
continue;

Check warning on line 541 in curp/src/client_new/unary.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/client_new/unary.rs#L526-L541

Added lines #L526 - L541 were not covered by tests
}
};
Expand Down
40 changes: 26 additions & 14 deletions curp/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,24 +635,36 @@ impl CurpError {
Self::Internal(reason.into())
}

/// Errors that should return early to the retry layer when we
/// got the error at propose stage
pub(crate) fn return_early(&self) -> bool {
matches!(
*self,
/// Get the priority of the error
pub(crate) fn priority(&self) -> CurpErrorPriority {
match *self {

Check warning on line 640 in curp/src/rpc/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/rpc/mod.rs#L639-L640

Added lines #L639 - L640 were not covered by tests
CurpError::Duplicated(_)
| CurpError::ShuttingDown(_)
| CurpError::InvalidConfig(_)
| CurpError::NodeAlreadyExists(_)
| CurpError::NodeNotExists(_)
| CurpError::LearnerNotCatchUp(_)
| CurpError::ExpiredClientId(_)
| CurpError::WrongClusterVersion(_)
| CurpError::Redirect(_)
)
| CurpError::ShuttingDown(_)
| CurpError::InvalidConfig(_)
| CurpError::NodeAlreadyExists(_)
| CurpError::NodeNotExists(_)
| CurpError::LearnerNotCatchUp(_)
| CurpError::ExpiredClientId(_)
| CurpError::Redirect(_) => CurpErrorPriority::ReturnImmediately,
CurpError::WrongClusterVersion(_) => CurpErrorPriority::High,

Check warning on line 649 in curp/src/rpc/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/rpc/mod.rs#L648-L649

Added lines #L648 - L649 were not covered by tests
CurpError::RpcTransport(_) | CurpError::Internal(_) | CurpError::KeyConflict(_) => {
CurpErrorPriority::Low

Check warning on line 651 in curp/src/rpc/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/rpc/mod.rs#L651

Added line #L651 was not covered by tests
}
}
}

Check warning on line 654 in curp/src/rpc/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/rpc/mod.rs#L654

Added line #L654 was not covered by tests
}

/// The priority of curp error, indicate which error should be handled in retry layer
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]

Check warning on line 658 in curp/src/rpc/mod.rs

View check run for this annotation

Codecov / codecov/patch

curp/src/rpc/mod.rs#L658

Added line #L658 was not covered by tests
pub(crate) enum CurpErrorPriority {
/// Low priority
Low = 1,
/// High priority
High = 2,
/// Should be returned immediately if any server response it
ReturnImmediately = 3,
}

impl<E: std::error::Error + 'static> From<E> for CurpError {
#[inline]
fn from(value: E) -> Self {
Expand Down

0 comments on commit 79cb6a1

Please sign in to comment.