Skip to content

Commit

Permalink
works
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Oct 25, 2023
1 parent aae7393 commit 586cace
Show file tree
Hide file tree
Showing 29 changed files with 277 additions and 177 deletions.
1 change: 1 addition & 0 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions node/actors/consensus/src/leader/replica_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ pub(crate) enum Error {
#[error("we are not a leader for this message's view")]
WhenNotLeaderInView,
#[error("duplicate message from a replica (existing message: {existing_message:?}")]
DuplicateMessage { existing_message: validator::ReplicaCommit },
DuplicateMessage {
existing_message: validator::ReplicaCommit,
},
#[error("number of received messages is below threshold. waiting for more (received: {num_messages:?}, threshold: {threshold:?}")]
NumReceivedBelowThreshold {
num_messages: usize,
Expand Down Expand Up @@ -61,15 +63,15 @@ impl StateMachine {
.get(&message.view)
.and_then(|x| x.get(author))
{
return Err(Error::DuplicateMessage { existing_message: existing_message.msg.clone() });
return Err(Error::DuplicateMessage {
existing_message: existing_message.msg,
});
}

// ----------- Checking the signed part of the message --------------

// Check the signature on the message.
signed_message
.verify()
.map_err(Error::InvalidSignature)?;
signed_message.verify().map_err(Error::InvalidSignature)?;

// ----------- Checking the contents of the message --------------

Expand Down
21 changes: 10 additions & 11 deletions node/actors/consensus/src/leader/replica_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ impl StateMachine {
// ----------- Checking the signed part of the message --------------

// Check the signature on the message.
signed_message
.verify()
.map_err(Error::InvalidSignature)?;
signed_message.verify().map_err(Error::InvalidSignature)?;

// ----------- Checking the contents of the message --------------

Expand Down Expand Up @@ -121,7 +119,7 @@ impl StateMachine {
// Get all the replica prepare messages for this view. Note that we consume the
// messages here. That's purposeful, so that we don't create a new block proposal
// for this same view if we receive another replica prepare message after this.
let replica_messages : Vec<_> = self
let replica_messages: Vec<_> = self
.prepare_message_cache
.remove(&message.view)
.unwrap()
Expand All @@ -136,27 +134,27 @@ impl StateMachine {

for vote in replica_messages.iter() {
*count
.entry(vote.msg.high_vote.proposal.clone())
.entry(vote.msg.high_vote.proposal)
.or_default() += 1;
}

let highest_vote : Option<validator::BlockHeader> = count
let highest_vote: Option<validator::BlockHeader> = count
.iter()
// We only take one value from the iterator because there can only be at most one block with a quorum of 2f+1 votes.
.find(|(_, v)| **v > 2 * consensus.faulty_replicas())
.map(|(h, _)| h)
.cloned();

// Get the highest CommitQC.
let highest_qc : &validator::CommitQC = replica_messages
let highest_qc: &validator::CommitQC = replica_messages
.iter()
.map(|s| &s.msg.high_qc)
.max_by_key(|qc| qc.message.view)
.unwrap();

// Create the block proposal to send to the replicas,
// and the commit vote to store in our block proposal cache.
let (proposal,payload) = match highest_vote {
let (proposal, payload) = match highest_vote {
// The previous block was not finalized, so we need to propose it again.
// For this we only need the header, since we are guaranteed that at least
// f+1 honest replicas have the block can broadcast when finalized
Expand All @@ -172,8 +170,9 @@ impl StateMachine {
metrics::METRICS
.leader_proposal_payload_size
.observe(payload.0.len());
let proposal = validator::BlockHeader::new(&highest_qc.message.proposal, payload.hash());
(proposal,Some(payload))
let proposal =
validator::BlockHeader::new(&highest_qc.message.proposal, payload.hash());
(proposal, Some(payload))
}
};

Expand All @@ -182,7 +181,7 @@ impl StateMachine {
self.view = message.view;
self.phase = validator::Phase::Commit;
self.phase_start = ctx.now();
self.block_proposal_cache = Some(proposal.clone());
self.block_proposal_cache = Some(proposal);

// ----------- Prepare our message and send it --------------

Expand Down
6 changes: 4 additions & 2 deletions node/actors/consensus/src/leader/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ impl StateMachine {
let (label, result) = match &input.msg {
validator::ConsensusMsg::ReplicaPrepare(_) => (
metrics::ConsensusMsgLabel::ReplicaPrepare,
self.process_replica_prepare(ctx, consensus, input.cast().unwrap()).map_err(Error::ReplicaPrepare),
self.process_replica_prepare(ctx, consensus, input.cast().unwrap())
.map_err(Error::ReplicaPrepare),
),
validator::ConsensusMsg::ReplicaCommit(_) => (
metrics::ConsensusMsgLabel::ReplicaCommit,
self.process_replica_commit(ctx, consensus, input.cast().unwrap()).map_err(Error::ReplicaCommit),
self.process_replica_commit(ctx, consensus, input.cast().unwrap())
.map_err(Error::ReplicaCommit),
),
_ => unreachable!(),
};
Expand Down
4 changes: 2 additions & 2 deletions node/actors/consensus/src/leader/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ async fn replica_commit() {
match consensus.leader.process_replica_commit(
ctx,
&consensus.inner,
test_replica_msg.cast().unwrap()
test_replica_msg.cast().unwrap(),
) {
Err(super::replica_commit::Error::UnexpectedProposal) => {},
Err(super::replica_commit::Error::UnexpectedProposal) => {}
res => panic!("unexpected result {res:?}"),
}
}
17 changes: 11 additions & 6 deletions node/actors/consensus/src/replica/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,17 @@ impl StateMachine {
// TODO(gprusak): for availability of finalized blocks,
// replicas should be able to broadcast highest quorums without
// the corresponding block (same goes for synchronization).
let Some(cache) = self.block_proposal_cache.get(&commit_qc.message.proposal.number) else { return };
let Some(payload) = cache.get(&commit_qc.message.proposal.payload) else { return };
let Some(cache) = self
.block_proposal_cache
.get(&commit_qc.message.proposal.number)
else {
return;
};
let Some(payload) = cache.get(&commit_qc.message.proposal.payload) else {
return;
};
let block = validator::FinalBlock {
header: commit_qc.message.proposal.clone(),
header: commit_qc.message.proposal,
payload: payload.clone(),
justification: commit_qc.clone(),
};
Expand All @@ -29,8 +36,6 @@ impl StateMachine {
block.header.hash()
);

consensus
.pipe
.send(OutputMessage::FinalizedBlock(block));
consensus.pipe.send(OutputMessage::FinalizedBlock(block));
}
}
18 changes: 12 additions & 6 deletions node/actors/consensus/src/replica/leader_commit.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use super::StateMachine;
use crate::{inner::ConsensusInner};
use crate::inner::ConsensusInner;
use anyhow::Context as _;
use concurrency::ctx;
use roles::validator;
use tracing::instrument;
use anyhow::Context as _;

#[derive(thiserror::Error, Debug)]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) enum Error {
#[error("bad protocol version")]
BadProtocolVersion,
#[error("invalid leader (correct leader: {correct_leader:?}, received leader: {received_leader:?})]")]
InvalidLeader {
correct_leader: validator::PublicKey,
Expand Down Expand Up @@ -43,6 +45,11 @@ impl StateMachine {
let author = &signed_message.key;
let view = message.justification.message.view;

// Check protocol version.
if message.justification.message.proposal.protocol_version != validator::CURRENT_VERSION {
return Err(Error::BadProtocolVersion);
}

// Check that it comes from the correct leader.
if author != &consensus.view_leader(view) {
return Err(Error::InvalidLeader {
Expand All @@ -62,9 +69,7 @@ impl StateMachine {
// ----------- Checking the signed part of the message --------------

// Check the signature on the message.
signed_message
.verify()
.map_err(Error::InvalidSignature)?;
signed_message.verify().map_err(Error::InvalidSignature)?;

// ----------- Checking the justification of the message --------------

Expand All @@ -87,7 +92,8 @@ impl StateMachine {

// Start a new view. But first we skip to the view of this message.
self.view = view;
self.start_new_view(ctx, consensus).context("start_new_view()")?;
self.start_new_view(ctx, consensus)
.context("start_new_view()")?;

Ok(())
}
Expand Down
43 changes: 26 additions & 17 deletions node/actors/consensus/src/replica/leader_prepare.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use super::StateMachine;
use crate::{inner::ConsensusInner};
use crate::inner::ConsensusInner;
use anyhow::Context as _;
use concurrency::ctx;
use network::io::{ConsensusInputMessage, Target};
use roles::validator;
use std::collections::HashMap;
use tracing::instrument;
use anyhow::Context as _;

#[derive(thiserror::Error, Debug)]
#[allow(clippy::missing_docs_in_private_items)]
pub(crate) enum Error {
#[error("bad protocol version")]
BadProtocolVersion,
#[error("invalid leader (correct leader: {correct_leader:?}, received leader: {received_leader:?}])")]
InvalidLeader {
correct_leader: validator::PublicKey,
Expand All @@ -26,7 +28,9 @@ pub(crate) enum Error {
InvalidPrepareQC(#[source] anyhow::Error),
#[error("invalid high QC")]
InvalidHighQC(#[source] anyhow::Error),
#[error("high QC of a future view (high QC view: {high_qc_view:?}, current view: {current_view:?}")]
#[error(
"high QC of a future view (high QC view: {high_qc_view:?}, current view: {current_view:?}"
)]
HighQCOfFutureView {
high_qc_view: validator::ViewNumber,
current_view: validator::ViewNumber,
Expand All @@ -47,7 +51,9 @@ pub(crate) enum Error {
},
#[error("block proposal with mismatched payload")]
ProposalMismatchedPayload,
#[error("block proposal with an oversized payload (payload size: {payload_size}, block: {header:?}")]
#[error(
"block proposal with an oversized payload (payload size: {payload_size}, block: {header:?}"
)]
ProposalOversizedPayload {
payload_size: usize,
header: validator::BlockHeader,
Expand Down Expand Up @@ -78,6 +84,11 @@ impl StateMachine {
let author = &signed_message.key;
let view = message.view;

// Check protocol version.
if message.proposal.protocol_version != validator::CURRENT_VERSION {
return Err(Error::BadProtocolVersion);
}

// Check that it comes from the correct leader.
if author != &consensus.view_leader(view) {
return Err(Error::InvalidLeader {
Expand All @@ -96,9 +107,7 @@ impl StateMachine {

// ----------- Checking the signed part of the message --------------

signed_message
.verify()
.map_err(Error::InvalidSignature)?;
signed_message.verify().map_err(Error::InvalidSignature)?;

// ----------- Checking the justification of the message --------------

Expand All @@ -113,19 +122,17 @@ impl StateMachine {
let mut vote_count: HashMap<_, usize> = HashMap::new();

for (msg, signers) in &message.justification.map {
*vote_count
.entry(msg.high_vote.proposal)
.or_default() += signers.len();
*vote_count.entry(msg.high_vote.proposal).or_default() += signers.len();
}

let highest_vote : Option<validator::BlockHeader> = vote_count
let highest_vote: Option<validator::BlockHeader> = vote_count
.into_iter()
// We only take one value from the iterator because there can only be at most one block with a quorum of 2f+1 votes.
.find(|(_, v)| *v > 2 * consensus.faulty_replicas())
.map(|(h, _)| h);

// Get the highest CommitQC and verify it.
let highest_qc : validator::CommitQC = message
let highest_qc: validator::CommitQC = message
.justification
.map
.keys()
Expand Down Expand Up @@ -162,7 +169,7 @@ impl StateMachine {
if payload.0.len() > ConsensusInner::PAYLOAD_MAX_SIZE {
return Err(Error::ProposalOversizedPayload {
payload_size: payload.0.len(),
header: message.proposal.clone(),
header: message.proposal,
});
}

Expand All @@ -172,7 +179,9 @@ impl StateMachine {
}

// Check that we finalized the previous block.
if highest_vote.is_some() && highest_vote.as_ref() != Some(&highest_qc.message.proposal) {
if highest_vote.is_some()
&& highest_vote.as_ref() != Some(&highest_qc.message.proposal)
{
return Err(Error::ProposalWhenPreviousNotFinalized);
}

Expand All @@ -181,7 +190,7 @@ impl StateMachine {
return Err(Error::ProposalInvalidParentHash {
correct_parent_hash: highest_qc.message.proposal.hash(),
received_parent_hash: message.proposal.parent,
header: message.proposal.clone(),
header: message.proposal,
});
}

Expand All @@ -190,7 +199,7 @@ impl StateMachine {
return Err(Error::ProposalNonSequentialNumber {
correct_number: highest_qc.message.proposal.number.next(),
received_number: message.proposal.number,
header: message.proposal.clone(),
header: message.proposal,
});
}
}
Expand Down Expand Up @@ -230,7 +239,7 @@ impl StateMachine {
self.block_proposal_cache
.entry(message.proposal.number)
.or_default()
.insert(payload.hash(),payload.clone());
.insert(payload.hash(), payload.clone());
}

// Backup our state.
Expand Down
4 changes: 2 additions & 2 deletions node/actors/consensus/src/replica/new_view.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::{StateMachine};
use super::StateMachine;
use crate::ConsensusInner;
use anyhow::Context as _;
use concurrency::ctx;
use network::io::{ConsensusInputMessage, Target};
use roles::validator;
use tracing::instrument;
use anyhow::Context as _;

impl StateMachine {
/// This blocking method is used whenever we start a new view.
Expand Down
Loading

0 comments on commit 586cace

Please sign in to comment.