cargo fmt
pompon0 committed Dec 13, 2023
1 parent b325cbe commit 72590fe
Showing 17 changed files with 207 additions and 221 deletions.
7 changes: 2 additions & 5 deletions node/actors/bft/src/inner.rs
@@ -1,11 +1,8 @@
//! The inner data of the consensus state machine. This is shared between the different roles.

use crate::{
io::{OutputMessage},
misc,
};
use zksync_concurrency::{ctx::channel};
use crate::{io::OutputMessage, misc};
use tracing::instrument;
use zksync_concurrency::ctx::channel;
use zksync_consensus_roles::validator;

/// The ConsensusInner struct, it contains data to be shared with the state machines. This is never supposed
42 changes: 19 additions & 23 deletions node/actors/bft/src/leader/replica_commit.rs
@@ -1,10 +1,10 @@
use super::StateMachine;
use crate::{metrics};
use crate::metrics;
use std::collections::HashMap;
use tracing::instrument;
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, ProtocolVersion};
use std::collections::HashMap;

/// Errors that can occur when processing a "replica commit" message.
#[derive(Debug, thiserror::Error)]
@@ -23,9 +23,6 @@ pub(crate) enum Error {
/// Signer of the message.
signer: validator::PublicKey,
},
/// Unexpected proposal.
//#[error("unexpected proposal")]
//UnexpectedProposal,
/// Past view or phase.
#[error("past view/phase (current view: {current_view:?}, current phase: {current_phase:?})")]
Old {
@@ -43,17 +40,6 @@ pub(crate) enum Error {
/// Existing message from the same replica.
existing_message: validator::ReplicaCommit,
},
/// Number of received messages is below threshold.
/*#[error(
"number of received messages is below threshold. waiting for more (received: {num_messages:?}, \
threshold: {threshold:?}"
)]
NumReceivedBelowThreshold {
/// Number of received messages.
num_messages: usize,
/// Threshold for message count.
threshold: usize,
},*/
/// Invalid message signature.
#[error("invalid signature: {0:#}")]
InvalidSignature(#[source] validator::Error),
@@ -81,7 +67,7 @@ impl StateMachine {
}

// Check that the message signer is in the validator set.
self.inner
self.inner
.validator_set
.index(author)
.ok_or(Error::NonValidatorSigner {
@@ -127,10 +113,18 @@ impl StateMachine {

// Now we check if we have enough messages to continue.
let mut by_proposal: HashMap<_, Vec<_>> = HashMap::new();
for msg in self.commit_message_cache.get(&message.view).unwrap().values() {
by_proposal.entry(msg.msg.proposal.clone()).or_default().push(msg);
for msg in self
.commit_message_cache
.get(&message.view)
.unwrap()
.values()
{
by_proposal.entry(msg.msg.proposal).or_default().push(msg);
}
let Some((_,replica_messages)) = by_proposal.into_iter().find(|(_,v)|v.len()>=self.inner.threshold()) else {
let Some((_, replica_messages)) = by_proposal
.into_iter()
.find(|(_, v)| v.len() >= self.inner.threshold())
else {
return Ok(());
};
debug_assert!(replica_messages.len() == self.inner.threshold());
@@ -150,12 +144,14 @@ impl StateMachine {
// Create the justification for our message.
let justification = validator::CommitQC::from(
&replica_messages.into_iter().cloned().collect::<Vec<_>>()[..],
&self.inner.validator_set
).expect("Couldn't create justification from valid replica messages!");
&self.inner.validator_set,
)
.expect("Couldn't create justification from valid replica messages!");

// Broadcast the leader commit message to all replicas (ourselves included).
let output_message = ConsensusInputMessage {
message: self.inner
message: self
.inner
.secret_key
.sign_msg(validator::ConsensusMsg::LeaderCommit(
validator::LeaderCommit {
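
A side note on the hunk above: after reformatting, the handler groups the cached ReplicaCommit messages by proposal and only proceeds once some proposal has gathered `threshold()` matching votes. A minimal, self-contained sketch of that grouping step, using hypothetical stand-in types instead of the real `validator` structs:

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the real validator types, for illustration only.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Proposal(u64);

#[derive(Clone, Copy, Debug)]
struct Vote {
    proposal: Proposal,
}

/// Groups votes by proposal and returns the votes behind the first proposal
/// that reaches `threshold`, mirroring the `by_proposal` logic above.
fn find_quorum(votes: &[Vote], threshold: usize) -> Option<Vec<Vote>> {
    let mut by_proposal: HashMap<Proposal, Vec<Vote>> = HashMap::new();
    for vote in votes {
        by_proposal.entry(vote.proposal).or_default().push(*vote);
    }
    by_proposal.into_values().find(|votes| votes.len() >= threshold)
}

fn main() {
    let votes = [
        Vote { proposal: Proposal(1) },
        Vote { proposal: Proposal(1) },
        Vote { proposal: Proposal(2) },
    ];
    assert!(find_quorum(&votes, 2).is_some());
    assert!(find_quorum(&votes, 3).is_none());
}
```

In the real handler, the winning set of messages is then turned into a CommitQC and broadcast as a LeaderCommit, as shown in the hunk above.
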
23 changes: 5 additions & 18 deletions node/actors/bft/src/leader/replica_prepare.rs
@@ -37,17 +37,6 @@ pub(crate) enum Error {
/// Existing message from the same replica.
existing_message: validator::ReplicaPrepare,
},
/// Number of received messages is below a threshold.
#[error(
"number of received messages below threshold. waiting for more (received: {num_messages:?}, \
threshold: {threshold:?}"
)]
NumReceivedBelowThreshold {
/// Number of received messages.
num_messages: usize,
/// Threshold for message count.
threshold: usize,
},
/// High QC of a future view.
#[error(
"high QC of a future view (high QC view: {high_qc_view:?}, current view: {current_view:?}"
@@ -169,10 +158,7 @@ impl StateMachine {
let num_messages = self.prepare_message_cache.get(&message.view).unwrap().len();

if num_messages < self.inner.threshold() {
return Err(Error::NumReceivedBelowThreshold {
num_messages,
threshold: self.inner.threshold(),
});
return Ok(());
}

// Get all the replica prepare messages for this view. Note that we consume the
@@ -186,16 +172,17 @@ impl StateMachine {
.collect();

debug_assert!(num_messages == self.inner.threshold());

// ----------- Update the state machine --------------

self.view = message.view;
self.phase = validator::Phase::Commit;
self.phase_start = ctx.now();

// Create the justification for our message.
let justification = validator::PrepareQC::from(&replica_messages, &self.inner.validator_set)
.expect("Couldn't create justification from valid replica messages!");
let justification =
validator::PrepareQC::from(&replica_messages, &self.inner.validator_set)
.expect("Couldn't create justification from valid replica messages!");
self.prepare_qc.send_replace(Some(justification));
Ok(())
}
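
Note that this hunk is more than formatting: the `NumReceivedBelowThreshold` error variant is removed, and a sub-threshold message count now results in a plain `Ok(())`, so the state machine keeps waiting. A rough sketch of the new control flow, with a simplified, hypothetical signature:

```rust
/// Before this commit, a sub-threshold message count was reported as a
/// `NumReceivedBelowThreshold` error; after it, the handler simply returns
/// Ok(()) and waits for more messages. Simplified, hypothetical signature.
fn process_replica_prepare(num_messages: usize, threshold: usize) -> Result<(), String> {
    if num_messages < threshold {
        // Not an error: the quorum is just not complete yet.
        return Ok(());
    }
    // ... build the PrepareQC and switch to the Commit phase here ...
    Ok(())
}

fn main() {
    // Below threshold is no longer an error.
    assert!(process_replica_prepare(1, 2).is_ok());
    assert!(process_replica_prepare(2, 2).is_ok());
}
```

This is also why the tests further down now assert `.unwrap().is_none()` instead of `.is_err()` for the below-threshold cases.
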
39 changes: 27 additions & 12 deletions node/actors/bft/src/leader/state_machine.rs
@@ -5,14 +5,15 @@ use std::{
unreachable,
};
use tracing::instrument;
use zksync_concurrency::{ctx, sync, error::Wrap as _, metrics::LatencyHistogramExt as _, time};
use zksync_consensus_roles::validator;
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator;

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
/// those messages. When participating in consensus we are not the leader most of the time.
pub(crate) struct StateMachine {
/// Consensus configuration and output channel.
pub(crate) inner: Arc<ConsensusInner>,
/// The current view number. This might not match the replica's view number, we only have this here
/// to make the leader advance monotonically in time and stop it from accepting messages from the past.
@@ -93,6 +94,10 @@ impl StateMachine {
Ok(())
}

/// In a loop, receives a PrepareQC and sends a LeaderPrepare containing it.
/// Every subsequent PrepareQC has to be for a higher view than the previous one (otherwise it
/// is skipped). In case payload generation takes too long, some PrepareQC may be elided, so
/// that the validator doesn't spend time on generating payloads for already expired views.
pub(crate) async fn run_proposer(
ctx: &ctx::Ctx,
inner: &ConsensusInner,
@@ -101,14 +106,19 @@
) -> ctx::Result<()> {
let mut next_view = validator::ViewNumber(0);
loop {
sync::changed(ctx, &mut prepare_qc).await?;
let Some(prepare_qc) = prepare_qc.borrow_and_update().clone() else { continue };
if next_view < prepare_qc.view() { continue };
let Some(prepare_qc) = sync::changed(ctx, &mut prepare_qc).await?.clone() else {
continue;
};
if prepare_qc.view() < next_view {
continue;
};
next_view = prepare_qc.view();
Self::propose(ctx,inner,payload_source, prepare_qc).await?;
Self::propose(ctx, inner, payload_source, prepare_qc).await?;
}
}

/// Sends a LeaderPrepare for the given PrepareQC.
/// Uses `payload_source` to generate a payload if needed.
pub(crate) async fn propose(
ctx: &ctx::Ctx,
inner: &ConsensusInner,
@@ -119,7 +129,7 @@
// in this situation, we require 2*f+1 votes, where f is the maximum number of faulty replicas.
let mut count: HashMap<_, usize> = HashMap::new();

for (vote,signers) in justification.map.iter() {
for (vote, signers) in justification.map.iter() {
*count.entry(vote.high_vote.proposal).or_default() += signers.len();
}

@@ -131,7 +141,9 @@
.cloned();

// Get the highest CommitQC.
let highest_qc: &validator::CommitQC = justification.map.keys()
let highest_qc: &validator::CommitQC = justification
.map
.keys()
.map(|s| &s.high_qc)
.max_by_key(|qc| qc.message.view)
.unwrap();
@@ -172,10 +184,13 @@
justification,
},
));
inner.pipe.send(ConsensusInputMessage {
message: msg,
recipient: Target::Broadcast,
}.into());
inner.pipe.send(
ConsensusInputMessage {
message: msg,
recipient: Target::Broadcast,
}
.into(),
);
Ok(())
}
}
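
The new doc comment on `run_proposer` describes a latest-value-wins loop over a watch-style channel, in which lower or stale views are skipped. The sketch below reproduces that pattern with `tokio::sync::watch` (an assumption made for illustration; the real code uses `zksync_concurrency::sync`, whose API differs) and assumes the `tokio` crate with its default runtime features:

```rust
use tokio::sync::watch;

/// Skips any received view that is lower than the last one proposed for,
/// mirroring the `run_proposer` loop above (simplified: views are plain u64s).
async fn run_proposer(mut rx: watch::Receiver<Option<u64>>) {
    let mut next_view = 0u64;
    while rx.changed().await.is_ok() {
        // Only the latest value is observed; intermediate views may be elided,
        // so no payload work is done for views that are already expired.
        let latest = *rx.borrow_and_update();
        let Some(view) = latest else { continue };
        if view < next_view {
            continue;
        }
        next_view = view;
        println!("proposing for view {view}");
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(None);
    let proposer = tokio::spawn(run_proposer(rx));
    tx.send(Some(1)).unwrap();
    tx.send(Some(3)).unwrap();
    drop(tx); // closing the channel ends the proposer loop
    proposer.await.unwrap();
}
```
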
41 changes: 20 additions & 21 deletions node/actors/bft/src/leader/tests.rs
@@ -172,7 +172,8 @@ async fn replica_prepare_already_exists() {
assert!(util
.process_replica_prepare(ctx, replica_prepare.clone())
.await
.is_err());
.unwrap()
.is_none());
let res = util
.process_replica_prepare(ctx, replica_prepare.clone())
.await;
@@ -192,14 +193,11 @@

util.set_owner_as_view_leader();
let replica_prepare = util.new_replica_prepare(|_| {});
let res = util.process_replica_prepare(ctx, replica_prepare).await;
assert_matches!(
res,
Err(ReplicaPrepareError::NumReceivedBelowThreshold {
num_messages: 1,
threshold: 2
})
);
assert!(util
.process_replica_prepare(ctx, replica_prepare)
.await
.unwrap()
.is_none());
}

#[tokio::test]
@@ -304,7 +302,9 @@ async fn replica_commit_incompatible_protocol_version() {
let incompatible_protocol_version = util.incompatible_protocol_version();
let mut replica_commit = util.new_replica_commit(ctx).await.msg;
replica_commit.protocol_version = incompatible_protocol_version;
let res = util.process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit)).await;
let res = util
.process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit))
.await;
assert_matches!(
res,
Err(ReplicaCommitError::IncompatibleProtocolVersion { message_version, local_version }) => {
@@ -368,16 +368,12 @@ async fn replica_commit_already_exists() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
let mut util = UTHarness::new(ctx, 2).await;

let view = ViewNumber(2);
util.set_replica_view(view);
util.set_leader_view(view);
assert_eq!(util.view_leader(view), util.owner_key().public());
let replica_commit = util.new_replica_commit(ctx).await;
assert!(util
.process_replica_commit(ctx, replica_commit.clone())
.await
.is_err());
.unwrap()
.is_none());
let res = util
.process_replica_commit(ctx, replica_commit.clone())
.await;
@@ -399,7 +395,8 @@ async fn replica_commit_num_received_below_threshold() {
assert!(util
.process_replica_prepare(ctx, replica_prepare.clone())
.await
.is_err());
.unwrap()
.is_none());
let replica_prepare = util.keys[1].sign_msg(replica_prepare.msg);
let leader_prepare = util
.process_replica_prepare(ctx, replica_prepare)
@@ -410,9 +407,9 @@
.process_leader_prepare(ctx, leader_prepare)
.await
.unwrap();
util
.process_replica_commit(ctx, replica_commit.clone())
.await.unwrap();
util.process_replica_commit(ctx, replica_commit.clone())
.await
.unwrap();
}

#[tokio::test]
@@ -432,5 +429,7 @@ async fn replica_commit_unexpected_proposal() {
let ctx = &ctx::test_root(&ctx::RealClock);
let mut util = UTHarness::new(ctx, 1).await;
let replica_commit = util.new_current_replica_commit(|_| {});
util.process_replica_commit(ctx, replica_commit).await.unwrap();
util.process_replica_commit(ctx, replica_commit)
.await
.unwrap();
}
20 changes: 11 additions & 9 deletions node/actors/bft/src/lib.rs
@@ -17,7 +17,7 @@
use crate::io::{InputMessage, OutputMessage};
use inner::ConsensusInner;
use std::sync::Arc;
use zksync_concurrency::{scope,ctx};
use zksync_concurrency::{ctx, scope};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::ReplicaStore;
use zksync_consensus_utils::pipe::ActorPipe;
@@ -55,23 +55,24 @@ pub async fn run(
validator_set: validator::ValidatorSet,
storage: ReplicaStore,
payload_source: &dyn PayloadSource,

) -> anyhow::Result<()> {
let inner = Arc::new(ConsensusInner {
pipe: pipe.send,
secret_key,
validator_set,
});
let res = scope::run!(ctx,|ctx,s| async {
let res = scope::run!(ctx, |ctx, s| async {
let mut replica = replica::StateMachine::start(ctx, inner.clone(), storage).await?;
let mut leader = leader::StateMachine::new(ctx, inner.clone());

s.spawn_bg(leader::StateMachine::run_proposer(ctx, &*inner, payload_source, leader.prepare_qc.subscribe()));
s.spawn_bg(leader::StateMachine::run_proposer(
ctx,
&inner,
payload_source,
leader.prepare_qc.subscribe(),
));

tracing::info!(
"Starting consensus actor {:?}",
inner.secret_key.public()
);
tracing::info!("Starting consensus actor {:?}", inner.secret_key.public());

// This is the infinite loop where the consensus actually runs. The validator waits for either
// a message from the network or for a timeout, and processes each accordingly.
@@ -107,7 +108,8 @@ pub async fn run(
let _ = req.ack.send(());
res?;
}
}).await;
})
.await;
match res {
Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
Err(ctx::Error::Internal(err)) => Err(err),
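
In `lib.rs` the proposer runs as a background task inside `scope::run!`, so it cannot outlive the actor's main message loop. A loose analogue of that shape in plain `tokio` (an assumption; `zksync_concurrency::scope` provides its own structured-concurrency primitives and error handling):

```rust
use tokio::sync::mpsc;

/// Runs a background "proposer" alongside the main inbox loop and tears it
/// down when the loop ends, loosely mirroring `scope::run!` + `spawn_bg`.
async fn run_actor(mut inbox: mpsc::Receiver<String>) {
    let proposer = tokio::spawn(async {
        loop {
            // Stand-in for the proposer's work.
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        }
    });

    // The "infinite" loop where the actor processes incoming messages.
    while let Some(msg) = inbox.recv().await {
        println!("processing {msg}");
    }

    // Scope exit: the background task must not outlive the actor.
    proposer.abort();
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    tx.send("ConsensusReq".to_string()).await.unwrap();
    drop(tx); // closing the channel ends the actor's main loop
    run_actor(rx).await;
}
```

In the real actor the scope also propagates errors and cancellation; the explicit abort here is only a stand-in for that behavior.
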
4 changes: 1 addition & 3 deletions node/actors/bft/src/replica/leader_commit.rs
@@ -123,9 +123,7 @@

// Start a new view. But first we skip to the view of this message.
self.view = view;
self.start_new_view(ctx)
.await
.wrap("start_new_view()")?;
self.start_new_view(ctx).await.wrap("start_new_view()")?;

Ok(())
}