Skip to content

Commit

Permalink
Construct quorum certificate incrementally (BFT-386) (#51)
Browse files Browse the repository at this point in the history
Constructing quorum certificates from a batch of signed messages creates
redundant error checks which are hard to reason about.

Since this operation's failure is unacceptable and can only occur as a
result of a bug, it is preferred to construct incrementally upon
receiving each message.

However, this refactoring ended up with no error checks during the
incremental construction, delegating responsibility to the message
processing.

Consequently, this change can be viewed merely as code cleanup which
removes redundant error checks, and the incremental steps as a safer way
to do that.

### Notes
* Batch-mode functions throughout the stack are no longer used in
non-test code, and were moved to `testonly`. They can be removed
altogether if tests will construct incrementally as well.
    * `PrepareQC::from`
* `CommitQC::from` (also seemed to had a duplicated error check which
was removed)
    * `validator::AggregateSignature::aggregate`
    * `bn254::AggregateSignature::aggregate`
  • Loading branch information
moshababo authored Jan 4, 2024
1 parent 84cdd9e commit cf8340d
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 132 deletions.
33 changes: 20 additions & 13 deletions node/actors/bft/src/leader/replica_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;
use tracing::instrument;
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, ProtocolVersion};
use zksync_consensus_roles::validator::{self, CommitQC, ProtocolVersion};

/// Errors that can occur when processing a "replica commit" message.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -67,12 +67,13 @@ impl StateMachine {
}

// Check that the message signer is in the validator set.
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;
let validator_index =
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Commit) < (self.view, self.phase) {
Expand Down Expand Up @@ -105,6 +106,12 @@ impl StateMachine {

// ----------- All checks finished. Now we process the message. --------------

// We add the message to the incrementally-constructed QC.
self.commit_qcs
.entry(message.view)
.or_insert(CommitQC::new(message, &self.inner.validator_set))
.add(&signed_message.sig, validator_index);

// We store the message in our cache.
let cache_entry = self.commit_message_cache.entry(message.view).or_default();
cache_entry.insert(author.clone(), signed_message);
Expand Down Expand Up @@ -134,12 +141,12 @@ impl StateMachine {

// ----------- Prepare our message and send it. --------------

// Create the justification for our message.
let justification = validator::CommitQC::from(
&replica_messages.into_iter().cloned().collect::<Vec<_>>(),
&self.inner.validator_set,
)
.expect("Couldn't create justification from valid replica messages!");
// Remove replica commit messages for this view, so that we don't create a new leader commit
// for this same view if we receive another replica commit message after this.
self.commit_message_cache.remove(&message.view);

// Consume the incrementally-constructed QC for this view.
let justification = self.commit_qcs.remove(&message.view).unwrap();

// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
Expand Down
37 changes: 19 additions & 18 deletions node/actors/bft/src/leader/replica_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ impl StateMachine {
}

// Check that the message signer is in the validator set.
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;
let validator_index =
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
signer: author.clone(),
})?;

// If the message is from the "past", we discard it.
if (message.view, validator::Phase::Prepare) < (self.view, self.phase) {
Expand Down Expand Up @@ -148,6 +149,13 @@ impl StateMachine {

// ----------- All checks finished. Now we process the message. --------------

// We add the message to the incrementally-constructed QC.
self.prepare_qcs.entry(message.view).or_default().add(
&signed_message,
validator_index,
&self.inner.validator_set,
);

// We store the message in our cache.
self.prepare_message_cache
.entry(message.view)
Expand All @@ -161,15 +169,9 @@ impl StateMachine {
return Ok(());
}

// Get all the replica prepare messages for this view. Note that we consume the
// messages here. That's purposeful, so that we don't create a new block proposal
// Remove replica prepare messages for this view, so that we don't create a new block proposal
// for this same view if we receive another replica prepare message after this.
let replica_messages: Vec<_> = self
.prepare_message_cache
.remove(&message.view)
.unwrap()
.into_values()
.collect();
self.prepare_message_cache.remove(&message.view);

debug_assert_eq!(num_messages, self.inner.threshold());

Expand All @@ -179,10 +181,9 @@ impl StateMachine {
self.phase = validator::Phase::Commit;
self.phase_start = ctx.now();

// Create the justification for our message.
let justification =
validator::PrepareQC::from(&replica_messages, &self.inner.validator_set)
.expect("Couldn't create justification from valid replica messages!");
// Consume the incrementally-constructed QC for this view.
let justification = self.prepare_qcs.remove(&message.view).unwrap();

self.prepare_qc.send_replace(Some(justification));
Ok(())
}
Expand Down
18 changes: 12 additions & 6 deletions node/actors/bft/src/leader/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator;
use zksync_consensus_roles::validator::{self, CommitQC, PrepareQC};

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
Expand All @@ -28,13 +28,17 @@ pub(crate) struct StateMachine {
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaPrepare>>,
>,
/// Prepare QCs indexed by view number.
pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, PrepareQC>,
/// Newest prepare QC composed from the `ReplicaPrepare` messages.
pub(crate) prepare_qc: sync::watch::Sender<Option<PrepareQC>>,
/// A cache of replica commit messages indexed by view number and validator.
pub(crate) commit_message_cache: BTreeMap<
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaCommit>>,
>,
/// Newest quorum certificate composed from the `ReplicaPrepare` messages.
pub(crate) prepare_qc: sync::watch::Sender<Option<validator::PrepareQC>>,
/// Commit QCs indexed by view number.
pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, CommitQC>,
}

impl StateMachine {
Expand All @@ -47,8 +51,10 @@ impl StateMachine {
phase: validator::Phase::Prepare,
phase_start: ctx.now(),
prepare_message_cache: BTreeMap::new(),
prepare_qcs: BTreeMap::new(),
commit_message_cache: BTreeMap::new(),
prepare_qc: sync::watch::channel(None).0,
commit_qcs: BTreeMap::new(),
}
}

Expand Down Expand Up @@ -102,7 +108,7 @@ impl StateMachine {
ctx: &ctx::Ctx,
inner: &ConsensusInner,
payload_source: &dyn PayloadSource,
mut prepare_qc: sync::watch::Receiver<Option<validator::PrepareQC>>,
mut prepare_qc: sync::watch::Receiver<Option<PrepareQC>>,
) -> ctx::Result<()> {
let mut next_view = validator::ViewNumber(0);
loop {
Expand All @@ -123,7 +129,7 @@ impl StateMachine {
ctx: &ctx::Ctx,
inner: &ConsensusInner,
payload_source: &dyn PayloadSource,
justification: validator::PrepareQC,
justification: PrepareQC,
) -> ctx::Result<()> {
// Get the highest block voted for and check if there's a quorum of votes for it. To have a quorum
// in this situation, we require 2*f+1 votes, where f is the maximum number of faulty replicas.
Expand All @@ -139,7 +145,7 @@ impl StateMachine {
.cloned();

// Get the highest CommitQC.
let highest_qc: &validator::CommitQC = justification
let highest_qc: &CommitQC = justification
.map
.keys()
.map(|s| &s.high_qc)
Expand Down
17 changes: 9 additions & 8 deletions node/libs/crypto/src/bn254/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,16 @@ impl Ord for Signature {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AggregateSignature(G1);

impl AggregateSignature {
/// Generates an aggregate signature from a list of signatures.
pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
let mut agg = G1Affine::zero().into_projective();
for sig in sigs {
agg.add_assign(&sig.0)
}
impl Default for AggregateSignature {
fn default() -> Self {
Self(G1Affine::zero().into_projective())
}
}

AggregateSignature(agg)
impl AggregateSignature {
/// Add a signature to the aggregation.
pub fn add(&mut self, sig: &Signature) {
self.0.add_assign(&sig.0)
}

/// This function is intentionally non-generic and disallow inlining to ensure that compilation optimizations can be effectively applied.
Expand Down
11 changes: 11 additions & 0 deletions node/libs/crypto/src/bn254/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,14 @@ impl Distribution<AggregateSignature> for Standard {
AggregateSignature(p)
}
}

impl AggregateSignature {
/// Generate a new aggregate signature from a list of signatures.
pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
let mut agg = Self::default();
for sig in sigs {
agg.add(sig);
}
agg
}
}
10 changes: 4 additions & 6 deletions node/libs/roles/src/validator/keys/aggregate_signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,13 @@ use zksync_consensus_crypto::{bn254, ByteFmt, Text, TextFmt};
use zksync_consensus_utils::enum_util::Variant;

/// An aggregate signature from a validator.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Default)]
pub struct AggregateSignature(pub(crate) bn254::AggregateSignature);

impl AggregateSignature {
/// Generate a new aggregate signature from a list of signatures.
pub fn aggregate<'a>(sigs: impl IntoIterator<Item = &'a Signature>) -> Self {
Self(bn254::AggregateSignature::aggregate(
sigs.into_iter().map(|sig| &sig.0).collect::<Vec<_>>(),
))
/// Add a signature to the aggregation.
pub fn add(&mut self, sig: &Signature) {
self.0.add(&sig.0)
}

/// Verify a list of messages against a list of public keys.
Expand Down
114 changes: 33 additions & 81 deletions node/libs/roles/src/validator/messages/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! Messages related to the consensus protocol.

use super::{BlockHeader, Msg, Payload, Signed};
use crate::validator;
use anyhow::{bail, Context};
use crate::{validator, validator::Signature};
use anyhow::bail;
use bit_vec::BitVec;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::collections::{BTreeMap, BTreeSet};
use zksync_consensus_utils::enum_util::{BadVariantError, Variant};

/// Version of the consensus algorithm that the validator is using.
Expand Down Expand Up @@ -172,7 +172,7 @@ pub struct LeaderCommit {
/// A quorum certificate of replica Prepare messages. Since not all Prepare messages are
/// identical (they have different high blocks and high QCs), we need to keep the high blocks
/// and high QCs in a map. We can still aggregate the signatures though.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct PrepareQC {
/// Map from replica Prepare messages to the validators that signed them.
pub map: BTreeMap<ReplicaPrepare, Signers>,
Expand All @@ -190,46 +190,23 @@ impl PrepareQC {
.unwrap_or(ViewNumber(0))
}

/// Creates a new PrepareQC from a list of *signed* replica Prepare messages and the current validator set.
pub fn from(
signed_messages: &[Signed<ReplicaPrepare>],
validators: &ValidatorSet,
) -> anyhow::Result<Self> {
// Get the view number from the messages, they must all be equal.
let view = signed_messages
.get(0)
.context("Empty signed messages vector")?
.msg
.view;

// Create the messages map.
let mut map: BTreeMap<ReplicaPrepare, Signers> = BTreeMap::new();

for signed_message in signed_messages {
if signed_message.msg.view != view {
bail!("Signed messages aren't all for the same view.");
}

// Get index of the validator in the validator set.
let index = validators
.index(&signed_message.key)
.context("Message signer isn't in the validator set")?;

if map.contains_key(&signed_message.msg) {
map.get_mut(&signed_message.msg).unwrap().0.set(index, true);
} else {
let mut bit_vec = BitVec::from_elem(validators.len(), false);
bit_vec.set(index, true);

map.insert(signed_message.msg.clone(), Signers(bit_vec));
}
}

// Aggregate the signatures.
let signature =
validator::AggregateSignature::aggregate(signed_messages.iter().map(|v| &v.sig));
/// Add a validator's signed message.
/// * `signed_message` - A valid signed `ReplicaPrepare` message.
/// * `validator_index` - The signer index in the validator set.
/// * `validator_set` - The validator set.
pub fn add(
&mut self,
signed_message: &Signed<ReplicaPrepare>,
validator_index: usize,
validator_set: &ValidatorSet,
) {
self.map
.entry(signed_message.msg.clone())
.or_insert_with(|| Signers(BitVec::from_elem(validator_set.len(), false)))
.0
.set(validator_index, true);

Ok(Self { map, signature })
self.signature.add(&signed_message.sig);
}

/// Verifies the integrity of the PrepareQC.
Expand Down Expand Up @@ -308,46 +285,21 @@ pub struct CommitQC {
}

impl CommitQC {
/// Creates a new CommitQC from a list of *signed* replica Commit messages and the current validator set.
pub fn from(
signed_messages: &[Signed<ReplicaCommit>],
validators: &ValidatorSet,
) -> anyhow::Result<Self> {
// Store the signed messages in a Hashmap.
let message = signed_messages[0].msg;

for signed_message in signed_messages {
// Check that the votes are all for the same message.
if signed_message.msg != message {
bail!("CommitQC can only be created from votes for the same message.");
}
/// Create a new empty instance for a given `ReplicaCommit` message and a validator set size.
pub fn new(message: ReplicaCommit, validator_set: &ValidatorSet) -> Self {
Self {
message,
signers: Signers(BitVec::from_elem(validator_set.len(), false)),
signature: validator::AggregateSignature::default(),
}
}

// Store the signed messages in a Hashmap.
let msg_map: HashMap<_, _> = signed_messages
.iter()
.map(|signed_message| {
// Check that the votes are all for the same message.
if signed_message.msg != message {
bail!("QuorumCertificate can only be created from votes for the same message.");
}
Ok((&signed_message.key, &signed_message.sig))
})
.collect::<anyhow::Result<_>>()?;

// Create the signers bit map.
let bit_vec = validators
.iter()
.map(|validator| msg_map.contains_key(validator))
.collect();

// Aggregate the signatures.
let signature = validator::AggregateSignature::aggregate(msg_map.values().copied());
Ok(Self {
message,
signers: Signers(bit_vec),
signature,
})
/// Add a validator's signature.
/// * `sig` - A valid signature.
/// * `validator_index` - The signer index in the validator set.
pub fn add(&mut self, sig: &Signature, validator_index: usize) {
self.signers.0.set(validator_index, true);
self.signature.add(sig);
}

/// Verifies the signature of the CommitQC.
Expand Down
Loading

0 comments on commit cf8340d

Please sign in to comment.