Skip to content

Commit

Permalink
refactor: define custom error values for replica messages
Browse files Browse the repository at this point in the history
  • Loading branch information
moshababo committed Oct 11, 2023
1 parent 1d943ae commit 075143d
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 53 deletions.
71 changes: 58 additions & 13 deletions node/actors/consensus/src/leader/error.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,66 @@
use thiserror::Error;
use roles::validator;

#[derive(Error, Debug)]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) enum Error {
#[error("Received replica commit message for a proposal that we don't have.")]
MissingProposal,
#[error(transparent)]
Other(#[from] anyhow::Error),
pub(crate) enum ReplicaMessageError {
#[error("commit for a missing proposal")]
CommitMissingProposal,
#[error("commit for a past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")]
CommitOld {
current_view: validator::ViewNumber,
current_phase: validator::Phase,
},
#[error("prepare for a past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")]
PrepareOld {
current_view: validator::ViewNumber,
current_phase: validator::Phase,
},
#[error("commit for a view when we are not a leader")]
CommitWhenNotLeaderInView,
#[error("prepare for a view when we are not a leader")]
PrepareWhenNotLeaderInView,
#[error("commit duplicated (existing message: {existing_message:?}")]
CommitDuplicated {
existing_message: String
},
#[error("prepare duplicated (existing message: {existing_message:?}")]
PrepareDuplicated {
existing_message: String
},
#[error("commit while number of received messages below threshold, waiting for more (received: {num_messages:?}, threshold: {threshold:?}")]
CommitNumReceivedBelowThreshold {
num_messages: usize,
threshold: usize,
},
#[error("prepare while number of received messages below threshold, waiting for more (received: {num_messages:?}, threshold: {threshold:?}")]
PrepareNumReceivedBelowThreshold {
num_messages: usize,
threshold: usize,
},
#[error("prepare with high QC of a future view (high QC view: {high_qc_view:?}, current view: {current_view:?}")]
PrepareHighQCOfFutureView {
high_qc_view: validator::ViewNumber,
current_view: validator::ViewNumber,
},
#[error("commit with invalid signature")]
CommitInvalidSignature {
inner_err: crypto::bls12_381::Error
},
#[error("prepare with invalid signature")]
PrepareInvalidSignature {
inner_err: crypto::bls12_381::Error
},
#[error("prepare with invalid high QC")]
PrepareInvalidHighQC {
inner_err: anyhow::Error
}
}

// TODO: This is only a temporary measure because anyhow::Error doesn't implement
// PartialEq. When we change all errors to thiserror, we can just derive PartialEq.
impl PartialEq for Error {

/// Needed due to inner errors.
impl PartialEq for ReplicaMessageError {
fn eq(&self, other: &Self) -> bool {
matches!(
(self, other),
(Error::MissingProposal, Error::MissingProposal) | (Error::Other(_), Error::Other(_))
)
self.to_string() == other.to_string()
}
}
}
35 changes: 18 additions & 17 deletions node/actors/consensus/src/leader/replica_commit.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::StateMachine;
use crate::{inner::ConsensusInner, leader::error::Error, metrics};
use anyhow::{anyhow, Context};
use crate::{inner::ConsensusInner, leader::error::ReplicaMessageError, metrics};
use concurrency::{ctx, metrics::LatencyHistogramExt as _};
use network::io::{ConsensusInputMessage, Target};
use roles::validator;
Expand All @@ -13,7 +12,7 @@ impl StateMachine {
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
signed_message: validator::Signed<validator::ReplicaCommit>,
) -> Result<(), Error> {
) -> Result<(), ReplicaMessageError> {
// ----------- Checking origin of the message --------------

// Unwrap message.
Expand All @@ -22,15 +21,15 @@ impl StateMachine {

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Commit) < (self.view, self.phase) {
return Err(Error::Other(anyhow!("Received replica commit message for a past view/phase.\nCurrent view: {:?}\nCurrent phase: {:?}",
self.view, self.phase)));
return Err(ReplicaMessageError::CommitOld {
current_view: self.view,
current_phase: self.phase
})
}

// If the message is for a view when we are not a leader, we discard it.
if consensus.view_leader(message.view) != consensus.secret_key.public() {
return Err(Error::Other(anyhow!(
"Received replica commit message for a view when we are not a leader."
)));
return Err(ReplicaMessageError::CommitWhenNotLeaderInView);
}

// If we already have a message from the same validator and for the same view, we discard it.
Expand All @@ -39,25 +38,26 @@ impl StateMachine {
.get(&message.view)
.and_then(|x| x.get(author))
{
return Err(Error::Other(anyhow!(
"Received replica commit message that we already have.\nExisting message: {:?}",
existing_message
)));
return Err(ReplicaMessageError::CommitDuplicated {
existing_message: format!("{:?}", existing_message),
})
}

// ----------- Checking the signed part of the message --------------

// Check the signature on the message.
signed_message
.verify()
.with_context(|| "Received replica commit message with invalid signature.")?;
.map_err(|err| ReplicaMessageError::CommitInvalidSignature {
inner_err: err
})?;

// ----------- Checking the contents of the message --------------

// We only accept replica commit messages for proposals that we have cached. That's so
// we don't need to store replica commit messages for different proposals.
if self.block_proposal_cache != Some(message) {
return Err(Error::MissingProposal);
return Err(ReplicaMessageError::CommitMissingProposal);
}

// ----------- All checks finished. Now we process the message. --------------
Expand All @@ -72,9 +72,10 @@ impl StateMachine {
let num_messages = self.commit_message_cache.get(&message.view).unwrap().len();

if num_messages < consensus.threshold() {
return Err(Error::Other(anyhow!("Received replica commit message. Waiting for more messages.\nCurrent number of messages: {}\nThreshold: {}",
num_messages,
consensus.threshold())));
return Err(ReplicaMessageError::CommitNumReceivedBelowThreshold {
num_messages,
threshold: consensus.threshold()
})
}

debug_assert!(num_messages == consensus.threshold());
Expand Down
45 changes: 24 additions & 21 deletions node/actors/consensus/src/leader/replica_prepare.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use super::StateMachine;
use crate::{inner::ConsensusInner, leader::error::Error, metrics};
use anyhow::{anyhow, Context as _};
use crate::{inner::ConsensusInner, leader::error::ReplicaMessageError, metrics};
use concurrency::ctx;
use network::io::{ConsensusInputMessage, Target};
use rand::Rng;
Expand All @@ -15,7 +14,7 @@ impl StateMachine {
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
signed_message: validator::Signed<validator::ReplicaPrepare>,
) -> Result<(), Error> {
) -> Result<(), ReplicaMessageError> {
// ----------- Checking origin of the message --------------

// Unwrap message.
Expand All @@ -24,15 +23,15 @@ impl StateMachine {

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Prepare) < (self.view, self.phase) {
return Err(Error::Other(anyhow!("Received replica prepare message for a past view/phase.\nCurrent view: {:?}\nCurrent phase: {:?}",
self.view, self.phase)));
return Err(ReplicaMessageError::PrepareOld {
current_view: self.view,
current_phase: self.phase,
})
}

// If the message is for a view when we are not a leader, we discard it.
if consensus.view_leader(message.view) != consensus.secret_key.public() {
return Err(Error::Other(anyhow!(
"Received replica prepare message for a view when we are not a leader."
)));
return Err(ReplicaMessageError::PrepareWhenNotLeaderInView)
}

// If we already have a message from the same validator and for the same view, we discard it.
Expand All @@ -41,35 +40,38 @@ impl StateMachine {
.get(&message.view)
.and_then(|x| x.get(author))
{
return Err(Error::Other(anyhow!(
"Received replica prepare message that we already have.\nExisting message: {:?}",
existing_message
)));
return Err(ReplicaMessageError::PrepareDuplicated {
existing_message: format!("{:?}", existing_message),
})
}

// ----------- Checking the signed part of the message --------------

// Check the signature on the message.
signed_message
.verify()
.with_context(|| "Received replica prepare message with invalid signature.")?;
.map_err(|err| ReplicaMessageError::PrepareInvalidSignature {
inner_err: err
})?;

// ----------- Checking the contents of the message --------------

// Verify the high QC.
message
.high_qc
.verify(&consensus.validator_set, consensus.threshold())
.with_context(|| "Received replica prepare message with invalid high QC.")?;
.map_err(|err| ReplicaMessageError::PrepareInvalidHighQC {
inner_err: err
})?;

// If the high QC is for a future view, we discard the message.
// This check is not necessary for correctness, but it's useful to
// guarantee that our proposals don't contain QCs from the future.
if message.high_qc.message.view >= message.view {
return Err(Error::Other(anyhow!(
"Received replica prepare message with a high QC from the future.\nHigh QC view: {:?}\nCurrent view: {:?}",
message.high_qc.message.view, message.view
)));
return Err(ReplicaMessageError::PrepareHighQCOfFutureView {
high_qc_view: message.high_qc.message.view,
current_view: message.view,
})
}

// ----------- All checks finished. Now we process the message. --------------
Expand All @@ -84,9 +86,10 @@ impl StateMachine {
let num_messages = self.prepare_message_cache.get(&message.view).unwrap().len();

if num_messages < consensus.threshold() {
return Err(Error::Other(anyhow!("Received replica prepare message. Waiting for more messages.\nCurrent number of messages: {}\nThreshold: {}",
num_messages,
consensus.threshold())));
return Err(ReplicaMessageError::PrepareNumReceivedBelowThreshold {
num_messages,
threshold: consensus.threshold()
})
}

// ----------- Creating the block proposal --------------
Expand Down
4 changes: 2 additions & 2 deletions node/actors/consensus/src/leader/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{leader::error::Error, testonly};
use crate::{leader::error::ReplicaMessageError, testonly};
use concurrency::ctx;
use rand::{rngs::StdRng, Rng, SeedableRng};
use roles::validator;
Expand Down Expand Up @@ -36,6 +36,6 @@ async fn replica_commit() {
&consensus.inner,
test_replica_msg.cast().unwrap()
),
Err(Error::MissingProposal)
Err(ReplicaMessageError::CommitMissingProposal)
);
}

0 comments on commit 075143d

Please sign in to comment.