Skip to content

Commit

Permalink
Merge branch 'main' into k8s_deploy_script
Browse files Browse the repository at this point in the history
  • Loading branch information
ElFantasma committed Feb 22, 2024
2 parents 7a817d7 + 842d4fd commit 4c230b3
Show file tree
Hide file tree
Showing 105 changed files with 3,851 additions and 3,773 deletions.
2 changes: 2 additions & 0 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,5 @@ needless_pass_by_ref_mut = "allow"
box_default = "allow"
# remove once fix to https://github.com/rust-lang/rust-clippy/issues/11764 is available on CI.
map_identity = "allow"
# &*x is not equivalent to x, because it affects borrowing in closures.
borrow_deref_ref = "allow"
26 changes: 4 additions & 22 deletions node/actors/bft/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! The inner data of the consensus state machine. This is shared between the different roles.
use crate::{misc, PayloadManager};
use crate::PayloadManager;
use std::sync::Arc;
use tracing::instrument;
use zksync_consensus_roles::validator;
use zksync_consensus_storage as storage;

Expand All @@ -10,8 +9,6 @@ use zksync_consensus_storage as storage;
pub struct Config {
/// The validator's secret key.
pub secret_key: validator::SecretKey,
/// A vector of public keys for all the validators in the network.
pub validator_set: validator::ValidatorSet,
/// The maximum size of the payload of a block, in bytes. We will
/// reject blocks with payloads larger than this.
pub max_payload_size: usize,
Expand All @@ -24,23 +21,8 @@ pub struct Config {
}

impl Config {
/// Computes the validator for the given view.
#[instrument(level = "trace", ret)]
pub fn view_leader(&self, view_number: validator::ViewNumber) -> validator::PublicKey {
let index = view_number.0 as usize % self.validator_set.len();
self.validator_set.get(index).unwrap().clone()
}

/// Calculate the consensus threshold, the minimum number of votes for any consensus action to be valid,
/// for a given number of replicas.
#[instrument(level = "trace", ret)]
pub fn threshold(&self) -> usize {
misc::consensus_threshold(self.validator_set.len())
}

/// Calculate the maximum number of faulty replicas, for a given number of replicas.
#[instrument(level = "trace", ret)]
pub fn faulty_replicas(&self) -> usize {
misc::faulty_replicas(self.validator_set.len())
/// Genesis.
pub fn genesis(&self) -> &validator::Genesis {
self.block_store.genesis()
}
}
8 changes: 2 additions & 6 deletions node/actors/bft/src/leader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@
//! and aggregates replica messages. It mainly acts as a central point of communication for the replicas. Note that
//! our consensus node will perform both the replica and leader roles simultaneously.

mod replica_commit;
mod replica_prepare;
pub(crate) mod replica_commit;
pub(crate) mod replica_prepare;
mod state_machine;
#[cfg(test)]
mod tests;

#[cfg(test)]
pub(crate) use self::replica_commit::Error as ReplicaCommitError;
#[cfg(test)]
pub(crate) use self::replica_prepare::Error as ReplicaPrepareError;
pub(crate) use self::state_machine::StateMachine;
75 changes: 45 additions & 30 deletions node/actors/bft/src/leader/replica_commit.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Handler of a ReplicaCommit message.
use super::StateMachine;
use crate::metrics;
use std::collections::HashMap;
Expand Down Expand Up @@ -34,6 +35,9 @@ pub(crate) enum Error {
/// The processing node is not a lead for this message's view.
#[error("we are not a leader for this message's view")]
NotLeaderInView,
/// Invalid message.
#[error("invalid message: {0:#}")]
InvalidMessage(anyhow::Error),
/// Duplicate message from a replica.
#[error("duplicate message from a replica (existing message: {existing_message:?}")]
DuplicateMessage {
Expand All @@ -55,47 +59,51 @@ impl StateMachine {
// ----------- Checking origin of the message --------------

// Unwrap message.
let message = signed_message.msg;
let message = &signed_message.msg;
let author = &signed_message.key;

// Check protocol version compatibility.
if !crate::PROTOCOL_VERSION.compatible(&message.protocol_version) {
if !crate::PROTOCOL_VERSION.compatible(&message.view.protocol_version) {
return Err(Error::IncompatibleProtocolVersion {
message_version: message.protocol_version,
message_version: message.view.protocol_version,
local_version: crate::PROTOCOL_VERSION,
});
}

// Check that the message signer is in the validator set.
let validator_index =
self.config
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;
if !self.config.genesis().validators.contains(author) {
return Err(Error::NonValidatorSigner {
signer: author.clone(),
});
}

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Commit) < (self.view, self.phase) {
if (message.view.number, validator::Phase::Commit) < (self.view, self.phase) {
return Err(Error::Old {
current_view: self.view,
current_phase: self.phase,
});
}

// If the message is for a view when we are not a leader, we discard it.
if self.config.view_leader(message.view) != self.config.secret_key.public() {
if self
.config
.genesis()
.validators
.view_leader(message.view.number)
!= self.config.secret_key.public()
{
return Err(Error::NotLeaderInView);
}

// If we already have a message from the same validator and for the same view, we discard it.
if let Some(existing_message) = self
.commit_message_cache
.get(&message.view)
.get(&message.view.number)
.and_then(|x| x.get(author))
{
return Err(Error::DuplicateMessage {
existing_message: existing_message.msg,
existing_message: existing_message.msg.clone(),
});
}

Expand All @@ -104,60 +112,67 @@ impl StateMachine {
// Check the signature on the message.
signed_message.verify().map_err(Error::InvalidSignature)?;

message
.verify(self.config.genesis())
.map_err(Error::InvalidMessage)?;

// ----------- All checks finished. Now we process the message. --------------

// TODO: we have a bug here since we don't check whether replicas commit
// to the same proposal.

// We add the message to the incrementally-constructed QC.
self.commit_qcs
.entry(message.view)
.or_insert(CommitQC::new(message, &self.config.validator_set))
.add(&signed_message.sig, validator_index);
.entry(message.view.number)
.or_insert_with(|| CommitQC::new(message.clone(), self.config.genesis()))
.add(&signed_message, self.config.genesis());

// We store the message in our cache.
let cache_entry = self.commit_message_cache.entry(message.view).or_default();
cache_entry.insert(author.clone(), signed_message);
let cache_entry = self
.commit_message_cache
.entry(message.view.number)
.or_default();
cache_entry.insert(author.clone(), signed_message.clone());

// Now we check if we have enough messages to continue.
let mut by_proposal: HashMap<_, Vec<_>> = HashMap::new();
for msg in cache_entry.values() {
by_proposal.entry(msg.msg.proposal).or_default().push(msg);
}
let Some((_, replica_messages)) = by_proposal
.into_iter()
.find(|(_, v)| v.len() >= self.config.threshold())
let threshold = self.config.genesis().validators.threshold();
let Some((_, replica_messages)) =
by_proposal.into_iter().find(|(_, v)| v.len() >= threshold)
else {
return Ok(());
};
debug_assert_eq!(replica_messages.len(), self.config.threshold());
debug_assert_eq!(replica_messages.len(), threshold);

// ----------- Update the state machine --------------

let now = ctx.now();
metrics::METRICS
.leader_commit_phase_latency
.observe_latency(now - self.phase_start);
self.view = message.view.next();
self.view = message.view.number.next();
self.phase = validator::Phase::Prepare;
self.phase_start = now;

// ----------- Prepare our message and send it. --------------

// Remove replica commit messages for this view, so that we don't create a new leader commit
// for this same view if we receive another replica commit message after this.
self.commit_message_cache.remove(&message.view);
self.commit_message_cache.remove(&message.view.number);

// Consume the incrementally-constructed QC for this view.
let justification = self.commit_qcs.remove(&message.view).unwrap();
let justification = self.commit_qcs.remove(&message.view.number).unwrap();

// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
message: self
.config
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderCommit(
validator::LeaderCommit {
protocol_version: crate::PROTOCOL_VERSION,
justification,
},
validator::LeaderCommit { justification },
)),
recipient: Target::Broadcast,
};
Expand Down
Loading

0 comments on commit 4c230b3

Please sign in to comment.