Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Construct quorum certificate incrementally (BFT-386) #51

Merged
merged 11 commits into from
Jan 4, 2024
39 changes: 19 additions & 20 deletions node/actors/bft/src/leader/replica_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{inner::ConsensusInner, metrics, Consensus};
use tracing::instrument;
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, ProtocolVersion};
use zksync_consensus_roles::validator::{self, CommitQCBuilder, ProtocolVersion};

/// Errors that can occur when processing a "replica commit" message.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -81,12 +81,13 @@ impl StateMachine {
}

// Check that the message signer is in the validator set.
consensus
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;
let validator_index =
consensus
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Commit) < (self.view, self.phase) {
Expand Down Expand Up @@ -127,6 +128,12 @@ impl StateMachine {

// ----------- All checks finished. Now we process the message. --------------

// We add the message to the incrementally-constructed QC.
self.commit_qc
.entry(message.view)
.or_insert(CommitQCBuilder::new(message, consensus.validator_set.len()))
.add(&signed_message.sig, validator_index);

// We store the message in our cache.
self.commit_message_cache
.entry(message.view)
Expand Down Expand Up @@ -157,20 +164,12 @@ impl StateMachine {

// ----------- Prepare our message and send it. --------------

// Get all the replica commit messages for this view. Note that we consume the
// messages here. That's purposeful, so that we don't create a new leader commit
// Remove replica commit messages for this view, so that we don't create a new leader commit
// for this same view if we receive another replica commit message after this.
let replica_messages = self
.commit_message_cache
.remove(&message.view)
.unwrap()
.values()
.cloned()
.collect::<Vec<_>>();

// Create the justification for our message.
let justification = validator::CommitQC::from(&replica_messages, &consensus.validator_set)
.expect("Couldn't create justification from valid replica messages!");
self.commit_message_cache.remove(&message.view);

// Consume the incrementally-constructed QC for this view.
let justification = self.commit_qc.remove(&message.view).unwrap().take();

// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
Expand Down
26 changes: 16 additions & 10 deletions node/actors/bft/src/leader/replica_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, ProtocolVersion};
use zksync_consensus_roles::validator::{self, PrepareQCBuilder, ProtocolVersion};

/// Errors that can occur when processing a "replica prepare" message.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -107,12 +107,13 @@ impl StateMachine {
}

// Check that the message signer is in the validator set.
consensus
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;
let validator_index =
consensus
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Prepare) < (self.view, self.phase) {
Expand Down Expand Up @@ -163,6 +164,12 @@ impl StateMachine {

// ----------- All checks finished. Now we process the message. --------------

// We add the message to the incrementally-constructed QC.
self.prepare_qc
.entry(message.view)
.or_insert(PrepareQCBuilder::new(consensus.validator_set.len()))
.add(&signed_message, validator_index);

// We store the message in our cache.
self.prepare_message_cache
.entry(message.view)
Expand Down Expand Up @@ -247,9 +254,8 @@ impl StateMachine {

// ----------- Prepare our message and send it --------------

// Create the justification for our message.
let justification = validator::PrepareQC::from(&replica_messages, &consensus.validator_set)
.expect("Couldn't create justification from valid replica messages!");
// Consume the incrementally-constructed QC for this view.
let justification = self.prepare_qc.remove(&message.view).unwrap().take();

// Broadcast the leader prepare message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
Expand Down
11 changes: 10 additions & 1 deletion node/actors/bft/src/leader/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::{
};
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, time};
use zksync_consensus_roles::validator;
use zksync_consensus_roles::{
validator,
validator::{CommitQCBuilder, PrepareQCBuilder},
};

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
Expand All @@ -29,11 +32,15 @@ pub(crate) struct StateMachine {
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaPrepare>>,
>,
/// Prepare QC builders indexed by view number.
pub(crate) prepare_qc: BTreeMap<validator::ViewNumber, PrepareQCBuilder>,
/// A cache of replica commit messages indexed by view number and validator.
pub(crate) commit_message_cache: BTreeMap<
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaCommit>>,
>,
/// Commit QC builders indexed by view number.
pub(crate) commit_qc: BTreeMap<validator::ViewNumber, CommitQCBuilder>,
}
brunoffranca marked this conversation as resolved.
Show resolved Hide resolved

impl StateMachine {
Expand All @@ -47,7 +54,9 @@ impl StateMachine {
phase_start: ctx.now(),
block_proposal_cache: None,
prepare_message_cache: BTreeMap::new(),
prepare_qc: BTreeMap::new(),
commit_message_cache: BTreeMap::new(),
commit_qc: BTreeMap::new(),
}
}

Expand Down
17 changes: 9 additions & 8 deletions node/libs/crypto/src/bn254/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,16 @@ impl Ord for Signature {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AggregateSignature(G1);

impl AggregateSignature {
/// Generates an aggregate signature from a list of signatures.
pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
let mut agg = G1Affine::zero().into_projective();
for sig in sigs {
agg.add_assign(&sig.0)
}
impl Default for AggregateSignature {
fn default() -> Self {
Self(G1Affine::zero().into_projective())
}
}

AggregateSignature(agg)
impl AggregateSignature {
// Add a signature to the aggregation.
pub fn add(&mut self, sig: &Signature) {
self.0.add_assign(&sig.0)
}

/// This function is intentionally non-generic and disallow inlining to ensure that compilation optimizations can be effectively applied.
Expand Down
11 changes: 11 additions & 0 deletions node/libs/crypto/src/bn254/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,14 @@ impl Distribution<AggregateSignature> for Standard {
AggregateSignature(p)
}
}

impl AggregateSignature {
/// Generate a new aggregate signature from a list of signatures.
pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
let mut agg = Self::default();
for sig in sigs {
agg.add(sig);
}
agg
}
}
10 changes: 4 additions & 6 deletions node/libs/roles/src/validator/keys/aggregate_signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ use zksync_consensus_crypto::{bn254, ByteFmt, Text, TextFmt};
use zksync_consensus_utils::enum_util::Variant;

/// An aggregate signature from a validator.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Default)]
pub struct AggregateSignature(pub(crate) bn254::AggregateSignature);

impl AggregateSignature {
/// Generate a new aggregate signature from a list of signatures.
pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
Self(bn254::AggregateSignature::aggregate(
sigs.into_iter().map(|sig| &sig.0).collect::<Vec<_>>(),
))
// Add a signature to the aggregation.
pub fn add(&mut self, sig: &Signature) {
self.0.add(&sig.0)
}

/// Verify a list of messages against a list of public keys.
Expand Down
Loading
Loading