
Merge branch 'main' into k8s_node_communication_test
IAvecilla committed Feb 27, 2024
2 parents fb791b7 + f03b302, commit 5b29f13
Showing 105 changed files with 4,001 additions and 3,937 deletions.
269 changes: 131 additions & 138 deletions node/Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions node/Cargo.toml
@@ -159,3 +159,5 @@ needless_pass_by_ref_mut = "allow"
box_default = "allow"
# remove once fix to https://github.com/rust-lang/rust-clippy/issues/11764 is available on CI.
map_identity = "allow"
# &*x is not equivalent to x, because it affects borrowing in closures.
borrow_deref_ref = "allow"
26 changes: 4 additions & 22 deletions node/actors/bft/src/config.rs
@@ -1,7 +1,6 @@
//! The inner data of the consensus state machine. This is shared between the different roles.
use crate::{misc, PayloadManager};
use crate::PayloadManager;
use std::sync::Arc;
use tracing::instrument;
use zksync_consensus_roles::validator;
use zksync_consensus_storage as storage;

@@ -10,8 +9,6 @@ use zksync_consensus_storage as storage;
pub struct Config {
/// The validator's secret key.
pub secret_key: validator::SecretKey,
/// A vector of public keys for all the validators in the network.
pub validator_set: validator::ValidatorSet,
/// The maximum size of the payload of a block, in bytes. We will
/// reject blocks with payloads larger than this.
pub max_payload_size: usize,
@@ -24,23 +21,8 @@ pub struct Config {
}

impl Config {
/// Computes the validator for the given view.
#[instrument(level = "trace", ret)]
pub fn view_leader(&self, view_number: validator::ViewNumber) -> validator::PublicKey {
let index = view_number.0 as usize % self.validator_set.len();
self.validator_set.get(index).unwrap().clone()
}

/// Calculate the consensus threshold, the minimum number of votes for any consensus action to be valid,
/// for a given number of replicas.
#[instrument(level = "trace", ret)]
pub fn threshold(&self) -> usize {
misc::consensus_threshold(self.validator_set.len())
}

/// Calculate the maximum number of faulty replicas, for a given number of replicas.
#[instrument(level = "trace", ret)]
pub fn faulty_replicas(&self) -> usize {
misc::faulty_replicas(self.validator_set.len())
/// Genesis.
pub fn genesis(&self) -> &validator::Genesis {
self.block_store.genesis()
}
}
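
Taken together, the config.rs change drops the locally stored validator set along with the view_leader, threshold and faulty_replicas helpers; as the later files in this commit show, call sites now reach the committee through self.config.genesis().validators and call view_leader(...) and threshold() there. The sketch below is a stand-alone miniature of what those helpers provide: round-robin leader selection (view number modulo committee size, as in the removed view_leader) and a quorum threshold. Committee, its String keys, and the n - (n - 1) / 5 threshold formula are illustrative assumptions rather than the repository's actual types or arithmetic.

struct Committee {
    keys: Vec<String>, // stand-in for validator public keys
}

impl Committee {
    // Round-robin leader for a view, mirroring the removed Config::view_leader:
    // index = view number modulo committee size.
    fn view_leader(&self, view: u64) -> &str {
        &self.keys[(view as usize) % self.keys.len()]
    }

    // Quorum size; this formula is an illustrative assumption standing in for
    // the removed misc::consensus_threshold.
    fn threshold(&self) -> usize {
        let n = self.keys.len();
        n - (n - 1) / 5
    }
}

fn main() {
    let committee = Committee {
        keys: (0..6).map(|i| format!("validator-{i}")).collect(),
    };
    assert_eq!(committee.view_leader(7), "validator-1"); // 7 % 6 == 1
    assert_eq!(committee.threshold(), 5);
    println!("leader of view 7: {}", committee.view_leader(7));
}
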
8 changes: 2 additions & 6 deletions node/actors/bft/src/leader/mod.rs
@@ -2,14 +2,10 @@
//! and aggregates replica messages. It mainly acts as a central point of communication for the replicas. Note that
//! our consensus node will perform both the replica and leader roles simultaneously.

mod replica_commit;
mod replica_prepare;
pub(crate) mod replica_commit;
pub(crate) mod replica_prepare;
mod state_machine;
#[cfg(test)]
mod tests;

#[cfg(test)]
pub(crate) use self::replica_commit::Error as ReplicaCommitError;
#[cfg(test)]
pub(crate) use self::replica_prepare::Error as ReplicaPrepareError;
pub(crate) use self::state_machine::StateMachine;
75 changes: 45 additions & 30 deletions node/actors/bft/src/leader/replica_commit.rs
@@ -1,3 +1,4 @@
//! Handler of a ReplicaCommit message.
use super::StateMachine;
use crate::metrics;
use std::collections::HashMap;
@@ -34,6 +35,9 @@ pub(crate) enum Error {
/// The processing node is not a leader for this message's view.
#[error("we are not a leader for this message's view")]
NotLeaderInView,
/// Invalid message.
#[error("invalid message: {0:#}")]
InvalidMessage(anyhow::Error),
/// Duplicate message from a replica.
#[error("duplicate message from a replica (existing message: {existing_message:?}")]
DuplicateMessage {
@@ -55,47 +59,51 @@ impl StateMachine {
// ----------- Checking origin of the message --------------

// Unwrap message.
let message = signed_message.msg;
let message = &signed_message.msg;
let author = &signed_message.key;

// Check protocol version compatibility.
if !crate::PROTOCOL_VERSION.compatible(&message.protocol_version) {
if !crate::PROTOCOL_VERSION.compatible(&message.view.protocol_version) {
return Err(Error::IncompatibleProtocolVersion {
message_version: message.protocol_version,
message_version: message.view.protocol_version,
local_version: crate::PROTOCOL_VERSION,
});
}

// Check that the message signer is in the validator set.
let validator_index =
self.config
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;
if !self.config.genesis().validators.contains(author) {
return Err(Error::NonValidatorSigner {
signer: author.clone(),
});
}

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Commit) < (self.view, self.phase) {
if (message.view.number, validator::Phase::Commit) < (self.view, self.phase) {
return Err(Error::Old {
current_view: self.view,
current_phase: self.phase,
});
}

// If the message is for a view when we are not a leader, we discard it.
if self.config.view_leader(message.view) != self.config.secret_key.public() {
if self
.config
.genesis()
.validators
.view_leader(message.view.number)
!= self.config.secret_key.public()
{
return Err(Error::NotLeaderInView);
}

// If we already have a message from the same validator and for the same view, we discard it.
if let Some(existing_message) = self
.commit_message_cache
.get(&message.view)
.get(&message.view.number)
.and_then(|x| x.get(author))
{
return Err(Error::DuplicateMessage {
existing_message: existing_message.msg,
existing_message: existing_message.msg.clone(),
});
}
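
An aside on the "message from the past" test earlier in this hunk: it leans on Rust's lexicographic ordering of tuples, with the phase enum ordered by declaration so that Prepare sorts before Commit within the same view. A minimal stand-alone illustration follows; the Phase enum and the is_old helper are placeholders, not the validator crate's types.

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Phase {
    Prepare,
    Commit,
}

fn is_old(msg_view: u64, msg_phase: Phase, cur_view: u64, cur_phase: Phase) -> bool {
    // Tuples compare element by element, so the view number dominates and the
    // phase only breaks ties within the same view.
    (msg_view, msg_phase) < (cur_view, cur_phase)
}

fn main() {
    // An earlier view is old regardless of phase.
    assert!(is_old(3, Phase::Commit, 4, Phase::Prepare));
    // Same view: a Prepare-phase message is old once we have moved on to Commit.
    assert!(is_old(4, Phase::Prepare, 4, Phase::Commit));
    // Same view and phase is not "from the past".
    assert!(!is_old(4, Phase::Commit, 4, Phase::Commit));
    println!("ordering checks passed");
}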

@@ -104,60 +112,67 @@ impl StateMachine {
// Check the signature on the message.
signed_message.verify().map_err(Error::InvalidSignature)?;

message
.verify(self.config.genesis())
.map_err(Error::InvalidMessage)?;

// ----------- All checks finished. Now we process the message. --------------

// TODO: we have a bug here since we don't check whether replicas commit
// to the same proposal.

// We add the message to the incrementally-constructed QC.
self.commit_qcs
.entry(message.view)
.or_insert(CommitQC::new(message, &self.config.validator_set))
.add(&signed_message.sig, validator_index);
.entry(message.view.number)
.or_insert_with(|| CommitQC::new(message.clone(), self.config.genesis()))
.add(&signed_message, self.config.genesis());

// We store the message in our cache.
let cache_entry = self.commit_message_cache.entry(message.view).or_default();
cache_entry.insert(author.clone(), signed_message);
let cache_entry = self
.commit_message_cache
.entry(message.view.number)
.or_default();
cache_entry.insert(author.clone(), signed_message.clone());

// Now we check if we have enough messages to continue.
let mut by_proposal: HashMap<_, Vec<_>> = HashMap::new();
for msg in cache_entry.values() {
by_proposal.entry(msg.msg.proposal).or_default().push(msg);
}
let Some((_, replica_messages)) = by_proposal
.into_iter()
.find(|(_, v)| v.len() >= self.config.threshold())
let threshold = self.config.genesis().validators.threshold();
let Some((_, replica_messages)) =
by_proposal.into_iter().find(|(_, v)| v.len() >= threshold)
else {
return Ok(());
};
debug_assert_eq!(replica_messages.len(), self.config.threshold());
debug_assert_eq!(replica_messages.len(), threshold);

// ----------- Update the state machine --------------

let now = ctx.now();
metrics::METRICS
.leader_commit_phase_latency
.observe_latency(now - self.phase_start);
self.view = message.view.next();
self.view = message.view.number.next();
self.phase = validator::Phase::Prepare;
self.phase_start = now;

// ----------- Prepare our message and send it. --------------

// Remove replica commit messages for this view, so that we don't create a new leader commit
// for this same view if we receive another replica commit message after this.
self.commit_message_cache.remove(&message.view);
self.commit_message_cache.remove(&message.view.number);

// Consume the incrementally-constructed QC for this view.
let justification = self.commit_qcs.remove(&message.view).unwrap();
let justification = self.commit_qcs.remove(&message.view.number).unwrap();

// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
message: self
.config
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderCommit(
validator::LeaderCommit {
protocol_version: crate::PROTOCOL_VERSION,
justification,
},
validator::LeaderCommit { justification },
)),
recipient: Target::Broadcast,
};
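
For context on the second half of this handler: commit messages cached for a view are grouped by the proposal they vote for, and only a proposal that collects at least the committee threshold of matching votes yields a quorum. The stand-alone sketch below mirrors that grouping step; Proposal, Vote and find_quorum are illustrative stand-ins rather than the repository's types.

use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Proposal(&'static str);

#[derive(Clone, Debug)]
struct Vote {
    voter: &'static str,
    proposal: Proposal,
}

// Group votes by proposal and return the first proposal backed by at least
// `threshold` votes, together with those votes.
fn find_quorum(votes: &[Vote], threshold: usize) -> Option<(Proposal, Vec<&Vote>)> {
    let mut by_proposal: HashMap<Proposal, Vec<&Vote>> = HashMap::new();
    for vote in votes {
        by_proposal.entry(vote.proposal.clone()).or_default().push(vote);
    }
    by_proposal.into_iter().find(|(_, v)| v.len() >= threshold)
}

fn main() {
    let votes = vec![
        Vote { voter: "a", proposal: Proposal("block-1") },
        Vote { voter: "b", proposal: Proposal("block-1") },
        Vote { voter: "c", proposal: Proposal("block-2") },
        Vote { voter: "d", proposal: Proposal("block-1") },
    ];
    // With a threshold of 3, only "block-1" has a quorum.
    let (proposal, quorum) = find_quorum(&votes, 3).expect("no quorum");
    assert_eq!(quorum.len(), 3);
    println!(
        "quorum for {:?}: {:?}",
        proposal,
        quorum.iter().map(|v| v.voter).collect::<Vec<_>>()
    );
}
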
91 changes: 38 additions & 53 deletions node/actors/bft/src/leader/replica_prepare.rs
@@ -1,3 +1,4 @@
//! Handler of a ReplicaPrepare message.
use super::StateMachine;
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap};
@@ -37,22 +38,12 @@ pub(crate) enum Error {
/// Existing message from the same replica.
existing_message: validator::ReplicaPrepare,
},
/// High QC of a future view.
#[error(
"high QC of a future view (high QC view: {high_qc_view:?}, current view: {current_view:?}"
)]
HighQCOfFutureView {
/// Received high QC view.
high_qc_view: validator::ViewNumber,
/// Current view.
current_view: validator::ViewNumber,
},
/// Invalid message signature.
#[error("invalid signature: {0:#}")]
InvalidSignature(#[source] validator::Error),
/// Invalid `HighQC` message.
#[error("invalid high QC: {0:#}")]
InvalidHighQC(#[source] anyhow::Error),
/// Invalid message.
#[error(transparent)]
InvalidMessage(validator::ReplicaPrepareVerifyError),
/// Internal error. Unlike other error types, this one isn't supposed to be easily recoverable.
#[error(transparent)]
Internal(#[from] ctx::Error),
@@ -84,39 +75,43 @@ impl StateMachine {
let author = &signed_message.key;

// Check protocol version compatibility.
if !crate::PROTOCOL_VERSION.compatible(&message.protocol_version) {
if !crate::PROTOCOL_VERSION.compatible(&message.view.protocol_version) {
return Err(Error::IncompatibleProtocolVersion {
message_version: message.protocol_version,
message_version: message.view.protocol_version,
local_version: crate::PROTOCOL_VERSION,
});
}

// Check that the message signer is in the validator set.
let validator_index =
self.config
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;
if !self.config.genesis().validators.contains(author) {
return Err(Error::NonValidatorSigner {
signer: author.clone(),
});
}

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Prepare) < (self.view, self.phase) {
if (message.view.number, validator::Phase::Prepare) < (self.view, self.phase) {
return Err(Error::Old {
current_view: self.view,
current_phase: self.phase,
});
}

// If the message is for a view when we are not a leader, we discard it.
if self.config.view_leader(message.view) != self.config.secret_key.public() {
if self
.config
.genesis()
.validators
.view_leader(message.view.number)
!= self.config.secret_key.public()
{
return Err(Error::NotLeaderInView);
}

// If we already have a message from the same validator and for the same view, we discard it.
if let Some(existing_message) = self
.prepare_message_cache
.get(&message.view)
.get(&message.view.number)
.and_then(|x| x.get(author))
{
return Err(Error::Exists {
@@ -129,60 +124,50 @@ impl StateMachine {
// Check the signature on the message.
signed_message.verify().map_err(Error::InvalidSignature)?;

// ----------- Checking the contents of the message --------------

// Verify the high QC.
// Verify the message.
message
.high_qc
.verify(&self.config.validator_set, self.config.threshold())
.map_err(Error::InvalidHighQC)?;

// If the high QC is for a future view, we discard the message.
// This check is not necessary for correctness, but it's useful to
// guarantee that our proposals don't contain QCs from the future.
if message.high_qc.message.view >= message.view {
return Err(Error::HighQCOfFutureView {
high_qc_view: message.high_qc.message.view,
current_view: message.view,
});
}
.verify(self.config.genesis())
.map_err(Error::InvalidMessage)?;

// ----------- All checks finished. Now we process the message. --------------

// We add the message to the incrementally-constructed QC.
self.prepare_qcs.entry(message.view).or_default().add(
&signed_message,
validator_index,
&self.config.validator_set,
);
self.prepare_qcs
.entry(message.view.number)
.or_insert_with(|| validator::PrepareQC::new(message.view.clone()))
.add(&signed_message, self.config.genesis());

// We store the message in our cache.
self.prepare_message_cache
.entry(message.view)
.entry(message.view.number)
.or_default()
.insert(author.clone(), signed_message);

// Now we check if we have enough messages to continue.
let num_messages = self.prepare_message_cache.get(&message.view).unwrap().len();
let num_messages = self
.prepare_message_cache
.get(&message.view.number)
.unwrap()
.len();

if num_messages < self.config.threshold() {
if num_messages < self.config.genesis().validators.threshold() {
return Ok(());
}

// Remove replica prepare messages for this view, so that we don't create a new block proposal
// for this same view if we receive another replica prepare message after this.
self.prepare_message_cache.remove(&message.view);
self.prepare_message_cache.remove(&message.view.number);

debug_assert_eq!(num_messages, self.config.threshold());
debug_assert_eq!(num_messages, self.config.genesis().validators.threshold());

// ----------- Update the state machine --------------

self.view = message.view;
self.view = message.view.number;
self.phase = validator::Phase::Commit;
self.phase_start = ctx.now();

// Consume the incrementally-constructed QC for this view.
let justification = self.prepare_qcs.remove(&message.view).unwrap();
let justification = self.prepare_qcs.remove(&message.view.number).unwrap();

self.prepare_qc.send_replace(Some(justification));
Ok(())
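Both leader handlers in this commit share the same shape: a map keyed by view number holds a partially built QC, each verified message is added to it, and once the threshold is reached the entry is removed from the map and consumed as the justification. The stand-alone sketch below shows that accumulate-then-consume pattern; PartialQc, QcBuilder and the plain String signer keys are illustrative stand-ins for PrepareQC/CommitQC and validator public keys.

use std::collections::{BTreeMap, HashSet};

#[derive(Default, Debug)]
struct PartialQc {
    signers: HashSet<String>,
}

struct QcBuilder {
    threshold: usize,
    pending: BTreeMap<u64, PartialQc>, // view number -> partially built QC
}

impl QcBuilder {
    // Records one vote; returns the finished QC once `threshold` distinct
    // signers have contributed for that view, removing it from the map.
    fn add(&mut self, view: u64, signer: &str) -> Option<PartialQc> {
        let qc = self.pending.entry(view).or_default();
        qc.signers.insert(signer.to_string());
        if qc.signers.len() >= self.threshold {
            self.pending.remove(&view)
        } else {
            None
        }
    }
}

fn main() {
    let mut builder = QcBuilder {
        threshold: 3,
        pending: BTreeMap::new(),
    };
    assert!(builder.add(7, "validator-a").is_none());
    assert!(builder.add(7, "validator-b").is_none());
    assert!(builder.add(7, "validator-a").is_none()); // a duplicate signer does not count twice
    let qc = builder.add(7, "validator-c").expect("threshold reached");
    println!("QC for view 7 signed by {} validators", qc.signers.len());
}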
