diff --git a/node/Cargo.lock b/node/Cargo.lock
index db4a2653..2c487e91 100644
--- a/node/Cargo.lock
+++ b/node/Cargo.lock
@@ -2597,7 +2597,9 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "assert_matches",
+ "async-trait",
  "once_cell",
+ "pretty_assertions",
  "rand 0.8.5",
  "thiserror",
  "tokio",
diff --git a/node/actors/bft/Cargo.toml b/node/actors/bft/Cargo.toml
index 9bbb5b2d..06634d72 100644
--- a/node/actors/bft/Cargo.toml
+++ b/node/actors/bft/Cargo.toml
@@ -16,6 +16,7 @@ zksync_consensus_utils.workspace = true
 zksync_protobuf.workspace = true
 
 anyhow.workspace = true
+async-trait.workspace = true
 once_cell.workspace = true
 rand.workspace = true
 thiserror.workspace = true
@@ -25,6 +26,7 @@ vise.workspace = true
 [dev-dependencies]
 tokio.workspace = true
 assert_matches.workspace = true
+pretty_assertions.workspace = true
 
 [lints]
-workspace = true
\ No newline at end of file
+workspace = true
diff --git a/node/actors/bft/src/leader/replica_commit.rs b/node/actors/bft/src/leader/replica_commit.rs
index c6263409..f82e598a 100644
--- a/node/actors/bft/src/leader/replica_commit.rs
+++ b/node/actors/bft/src/leader/replica_commit.rs
@@ -59,7 +59,7 @@ pub(crate) enum Error {
 }
 
 impl StateMachine {
-    #[instrument(level = "trace", ret)]
+    #[instrument(level = "trace", skip(self), ret)]
     pub(crate) fn process_replica_commit(
         &mut self,
         ctx: &ctx::Ctx,
diff --git a/node/actors/bft/src/leader/replica_prepare.rs b/node/actors/bft/src/leader/replica_prepare.rs
index 2d06f7ab..9982aee6 100644
--- a/node/actors/bft/src/leader/replica_prepare.rs
+++ b/node/actors/bft/src/leader/replica_prepare.rs
@@ -1,9 +1,8 @@
 use super::StateMachine;
 use crate::{inner::ConsensusInner, metrics};
-use rand::Rng;
 use std::collections::HashMap;
 use tracing::instrument;
-use zksync_concurrency::ctx;
+use zksync_concurrency::{ctx, error::Wrap};
 use zksync_consensus_network::io::{ConsensusInputMessage, Target};
 use zksync_consensus_roles::validator::{self, ProtocolVersion};
 
@@ -68,11 +67,26 @@ pub(crate) enum Error {
     /// Invalid `HighQC` message.
     #[error("invalid high QC: {0:#}")]
     InvalidHighQC(#[source] anyhow::Error),
+    /// Internal error. Unlike other error types, this one isn't supposed to be easily recoverable.
+    #[error(transparent)]
+    Internal(#[from] ctx::Error),
+}
+
+impl Wrap for Error {
+    fn with_wrap<C: std::fmt::Display + Send + Sync + 'static, F: FnOnce() -> C>(
+        self,
+        f: F,
+    ) -> Self {
+        match self {
+            Error::Internal(err) => Error::Internal(err.with_wrap(f)),
+            err => err,
+        }
+    }
 }
 
 impl StateMachine {
-    #[instrument(level = "trace", ret)]
-    pub(crate) fn process_replica_prepare(
+    #[instrument(level = "trace", skip(self), ret)]
+    pub(crate) async fn process_replica_prepare(
         &mut self,
         ctx: &ctx::Ctx,
         consensus: &ConsensusInner,
@@ -214,11 +228,10 @@ impl StateMachine {
             Some(proposal) if proposal != highest_qc.message.proposal => (proposal, None),
             // The previous block was finalized, so we can propose a new block.
             _ => {
-                // TODO(bruno): For now we just create a block with a random payload. After we integrate with
-                // the execution layer we should have a call here to the mempool to get a real payload.
-                let mut payload = validator::Payload(vec![0; ConsensusInner::PAYLOAD_MAX_SIZE]);
-                ctx.rng().fill(&mut payload.0[..]);
-
+                let payload = self
+                    .payload_source
+                    .propose(ctx, highest_qc.message.proposal.number.next())
+                    .await?;
                 metrics::METRICS
                     .leader_proposal_payload_size
                     .observe(payload.0.len());
diff --git a/node/actors/bft/src/leader/state_machine.rs b/node/actors/bft/src/leader/state_machine.rs
index 4380de83..39d8b8e9 100644
--- a/node/actors/bft/src/leader/state_machine.rs
+++ b/node/actors/bft/src/leader/state_machine.rs
@@ -1,17 +1,19 @@
-use crate::{metrics, ConsensusInner};
+use crate::{metrics, ConsensusInner, PayloadSource};
 use std::{
     collections::{BTreeMap, HashMap},
+    sync::Arc,
     unreachable,
 };
 use tracing::instrument;
-use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _, time};
+use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, time};
 use zksync_consensus_roles::validator;
 
 /// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
 /// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
 /// those messages. When participating in consensus we are not the leader most of the time.
-#[derive(Debug)]
 pub(crate) struct StateMachine {
+    /// Payload provider for the new blocks.
+    pub(crate) payload_source: Arc<dyn PayloadSource>,
     /// The current view number. This might not match the replica's view number, we only have this here
     /// to make the leader advance monotonically in time and stop it from accepting messages from the past.
     pub(crate) view: validator::ViewNumber,
@@ -36,9 +38,10 @@ pub(crate) struct StateMachine {
 
 impl StateMachine {
     /// Creates a new StateMachine struct.
-    #[instrument(level = "trace", ret)]
-    pub fn new(ctx: &ctx::Ctx) -> Self {
+    #[instrument(level = "trace", skip(payload_source))]
+    pub fn new(ctx: &ctx::Ctx, payload_source: Arc<dyn PayloadSource>) -> Self {
         StateMachine {
+            payload_source,
             view: validator::ViewNumber(0),
             phase: validator::Phase::Prepare,
             phase_start: ctx.now(),
@@ -51,21 +54,30 @@ impl StateMachine {
     /// Process an input message (leaders don't time out waiting for a message). This is the
     /// main entry point for the state machine. We need read-access to the inner consensus struct.
     /// As a result, we can modify our state machine or send a message to the executor.
- #[instrument(level = "trace", ret)] - pub(crate) fn process_input( + #[instrument(level = "trace", skip(self), ret)] + pub(crate) async fn process_input( &mut self, ctx: &ctx::Ctx, consensus: &ConsensusInner, input: validator::Signed, - ) { + ) -> ctx::Result<()> { let now = ctx.now(); let label = match &input.msg { validator::ConsensusMsg::ReplicaPrepare(_) => { - let res = self + let res = match self .process_replica_prepare(ctx, consensus, input.cast().unwrap()) - .map_err(|err| { + .await + .wrap("process_replica_prepare()") + { + Ok(()) => Ok(()), + Err(super::replica_prepare::Error::Internal(err)) => { + return Err(err); + } + Err(err) => { tracing::warn!("process_replica_prepare: {err:#}"); - }); + Err(()) + } + }; metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res) } validator::ConsensusMsg::ReplicaCommit(_) => { @@ -79,5 +91,6 @@ impl StateMachine { _ => unreachable!(), }; metrics::METRICS.leader_processing_latency[&label].observe_latency(ctx.now() - now); + Ok(()) } } diff --git a/node/actors/bft/src/leader/tests.rs b/node/actors/bft/src/leader/tests.rs index f1a65be4..ca3386c3 100644 --- a/node/actors/bft/src/leader/tests.rs +++ b/node/actors/bft/src/leader/tests.rs @@ -3,109 +3,84 @@ use super::{ }; use crate::testonly::ut_harness::UTHarness; use assert_matches::assert_matches; +use pretty_assertions::assert_eq; use rand::Rng; -use zksync_consensus_roles::validator::{ - self, CommitQC, ConsensusMsg, LeaderCommit, LeaderPrepare, Phase, PrepareQC, ReplicaCommit, - ReplicaPrepare, ViewNumber, -}; +use zksync_concurrency::ctx; +use zksync_consensus_roles::validator::{self, LeaderCommit, Phase, ViewNumber}; #[tokio::test] async fn replica_prepare_sanity() { - let mut util = UTHarness::new_many().await; - - let replica_prepare = util.new_current_replica_prepare(|_| {}).cast().unwrap().msg; - util.dispatch_replica_prepare_many( - vec![replica_prepare; util.consensus_threshold()], - util.keys(), - ) - .unwrap(); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; + util.new_leader_prepare(ctx).await; } #[tokio::test] async fn replica_prepare_sanity_yield_leader_prepare() { - let mut util = UTHarness::new_one().await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; - let replica_prepare = util.new_current_replica_prepare(|_| {}); - util.dispatch_replica_prepare_one(replica_prepare.clone()) - .unwrap(); + let replica_prepare = util.new_replica_prepare(|_| {}); let leader_prepare = util - .recv_signed() + .process_replica_prepare(ctx, replica_prepare.clone()) .await .unwrap() - .cast::() - .unwrap() - .msg; - - let replica_prepare = replica_prepare.cast::().unwrap().msg; - assert_matches!( - leader_prepare, - LeaderPrepare { - protocol_version, - view, - proposal, - proposal_payload: _, - justification, - } => { - assert_eq!(protocol_version, replica_prepare.protocol_version); - assert_eq!(view, replica_prepare.view); - assert_eq!(proposal.parent, replica_prepare.high_vote.proposal.hash()); - assert_eq!(justification, util.new_prepare_qc(|msg| *msg = replica_prepare)); - } + .unwrap(); + assert_eq!( + leader_prepare.msg.protocol_version, + replica_prepare.msg.protocol_version + ); + assert_eq!(leader_prepare.msg.view, replica_prepare.msg.view); + assert_eq!( + leader_prepare.msg.proposal.parent, + replica_prepare.msg.high_vote.proposal.hash() + ); + assert_eq!( + 
leader_prepare.msg.justification, + util.new_prepare_qc(|msg| *msg = replica_prepare.msg) ); } #[tokio::test] async fn replica_prepare_sanity_yield_leader_prepare_reproposal() { - let mut util = UTHarness::new_many().await; - - let replica_prepare: ReplicaPrepare = - util.new_unfinalized_replica_prepare().cast().unwrap().msg; - util.dispatch_replica_prepare_many( - vec![replica_prepare.clone(); util.consensus_threshold()], - util.keys(), - ) - .unwrap(); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; + util.new_replica_commit(ctx).await; + util.process_replica_timeout(ctx).await; + let replica_prepare = util.new_replica_prepare(|_| {}).msg; let leader_prepare = util - .recv_signed() - .await - .unwrap() - .cast::() - .unwrap() - .msg; + .process_replica_prepare_all(ctx, replica_prepare.clone()) + .await; - assert_matches!( - leader_prepare, - LeaderPrepare { - protocol_version, - view, - proposal, - proposal_payload, - justification, - } => { - assert_eq!(protocol_version, replica_prepare.protocol_version); - assert_eq!(view, replica_prepare.view); - assert_eq!(proposal, replica_prepare.high_vote.proposal); - assert_eq!(proposal_payload, None); - assert_matches!( - justification, - PrepareQC { map, .. } => { - assert_eq!(map.len(), 1); - assert_eq!(*map.first_key_value().unwrap().0, replica_prepare); - } - ); - } + assert_eq!( + leader_prepare.msg.protocol_version, + replica_prepare.protocol_version + ); + assert_eq!(leader_prepare.msg.view, replica_prepare.view); + assert_eq!( + leader_prepare.msg.proposal, + replica_prepare.high_vote.proposal ); + assert_eq!(leader_prepare.msg.proposal_payload, None); + let map = leader_prepare.msg.justification.map; + assert_eq!(map.len(), 1); + assert_eq!(*map.first_key_value().unwrap().0, replica_prepare); } #[tokio::test] async fn replica_prepare_incompatible_protocol_version() { - let mut util = UTHarness::new_one().await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; let incompatible_protocol_version = util.incompatible_protocol_version(); - let replica_prepare = util.new_current_replica_prepare(|msg| { + let replica_prepare = util.new_replica_prepare(|msg| { msg.protocol_version = incompatible_protocol_version; }); - let res = util.dispatch_replica_prepare_one(replica_prepare); + let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!( res, Err(ReplicaPrepareError::IncompatibleProtocolVersion { message_version, local_version }) => { @@ -117,13 +92,15 @@ async fn replica_prepare_incompatible_protocol_version() { #[tokio::test] async fn replica_prepare_non_validator_signer() { - let mut util = UTHarness::new_one().await; - - let replica_prepare = util.new_current_replica_prepare(|_| {}).cast().unwrap().msg; - let non_validator_key: validator::SecretKey = util.rng().gen(); - let signed = non_validator_key.sign_msg(ConsensusMsg::ReplicaPrepare(replica_prepare)); - - let res = util.dispatch_replica_prepare_one(signed); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + + let replica_prepare = util.new_replica_prepare(|_| {}).msg; + let non_validator_key: validator::SecretKey = ctx.rng().gen(); + let res = util + .process_replica_prepare(ctx, non_validator_key.sign_msg(replica_prepare)) + .await; assert_matches!( res, 
Err(ReplicaPrepareError::NonValidatorSigner { signer }) => { @@ -134,14 +111,14 @@ async fn replica_prepare_non_validator_signer() { #[tokio::test] async fn replica_prepare_old_view() { - let mut util = UTHarness::new_one().await; - - util.set_replica_view(ViewNumber(1)); - util.set_leader_view(ViewNumber(2)); - util.set_leader_phase(Phase::Prepare); - - let replica_prepare = util.new_current_replica_prepare(|_| {}); - let res = util.dispatch_replica_prepare_one(replica_prepare); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + + let replica_prepare = util.new_replica_prepare(|_| {}); + util.consensus.leader.view = util.consensus.leader.view.next(); + util.consensus.leader.phase = Phase::Prepare; + let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!( res, Err(ReplicaPrepareError::Old { @@ -153,12 +130,13 @@ async fn replica_prepare_old_view() { #[tokio::test] async fn replica_prepare_during_commit() { - let mut util = UTHarness::new_one().await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + util.consensus.leader.phase = Phase::Commit; - util.set_leader_phase(Phase::Commit); - - let replica_prepare = util.new_current_replica_prepare(|_| {}); - let res = util.dispatch_replica_prepare_one(replica_prepare); + let replica_prepare = util.new_replica_prepare(|_| {}); + let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!( res, Err(ReplicaPrepareError::Old { @@ -170,47 +148,48 @@ async fn replica_prepare_during_commit() { #[tokio::test] async fn replica_prepare_not_leader_in_view() { - let mut util = UTHarness::new_with(2).await; - - let current_view_leader = util.view_leader(util.replica_view()); - assert_ne!(current_view_leader, util.owner_key().public()); - - let replica_prepare = util.new_current_replica_prepare(|_| {}); - let res = util.dispatch_replica_prepare_one(replica_prepare.clone()); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 2).await; + let replica_prepare = util.new_replica_prepare(|msg| { + // Moving to the next view changes the leader. 
+ msg.view = msg.view.next(); + }); + let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!(res, Err(ReplicaPrepareError::NotLeaderInView)); } #[tokio::test] async fn replica_prepare_already_exists() { - let mut util = UTHarness::new_with(2).await; + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 2).await; - let view = ViewNumber(2); - util.set_replica_view(view); - util.set_leader_view(view); - assert_eq!(util.view_leader(view), util.owner_key().public()); - - let replica_prepare = util.new_current_replica_prepare(|_| {}); - let _ = util.dispatch_replica_prepare_one(replica_prepare.clone()); - let res = util.dispatch_replica_prepare_one(replica_prepare.clone()); + util.set_owner_as_view_leader(); + let replica_prepare = util.new_replica_prepare(|_| {}); + assert!(util + .process_replica_prepare(ctx, replica_prepare.clone()) + .await + .is_err()); + let res = util + .process_replica_prepare(ctx, replica_prepare.clone()) + .await; assert_matches!( - res, - Err(ReplicaPrepareError::Exists { existing_message }) => { - assert_eq!(existing_message, replica_prepare.cast().unwrap().msg); + res, + Err(ReplicaPrepareError::Exists { existing_message }) => { + assert_eq!(existing_message, replica_prepare.msg); } ); } #[tokio::test] async fn replica_prepare_num_received_below_threshold() { - let mut util = UTHarness::new_with(2).await; - - let view = ViewNumber(2); - util.set_replica_view(view); - util.set_leader_view(view); - assert_eq!(util.view_leader(view), util.owner_key().public()); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 2).await; - let replica_prepare = util.new_current_replica_prepare(|_| {}); - let res = util.dispatch_replica_prepare_one(replica_prepare); + util.set_owner_as_view_leader(); + let replica_prepare = util.new_replica_prepare(|_| {}); + let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!( res, Err(ReplicaPrepareError::NumReceivedBelowThreshold { @@ -222,37 +201,36 @@ async fn replica_prepare_num_received_below_threshold() { #[tokio::test] async fn replica_prepare_invalid_sig() { - let mut util = UTHarness::new_one().await; - - let mut replica_prepare = util.new_current_replica_prepare(|_| {}); - replica_prepare.sig = util.rng().gen(); - let res = util.dispatch_replica_prepare_one(replica_prepare); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let mut replica_prepare = util.new_replica_prepare(|_| {}); + replica_prepare.sig = ctx.rng().gen(); + let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!(res, Err(ReplicaPrepareError::InvalidSignature(_))); } #[tokio::test] async fn replica_prepare_invalid_commit_qc() { - let mut util = UTHarness::new_one().await; - - let junk = util.rng().gen::(); - let replica_prepare = util.new_current_replica_prepare(|msg| msg.high_qc = junk); - let res = util.dispatch_replica_prepare_one(replica_prepare); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let replica_prepare = util.new_replica_prepare(|msg| msg.high_qc = ctx.rng().gen()); + let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!(res, Err(ReplicaPrepareError::InvalidHighQC(..))); } #[tokio::test] async fn replica_prepare_high_qc_of_current_view() { - 
let mut util = UTHarness::new_one().await; - + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; let view = ViewNumber(1); let qc_view = ViewNumber(1); - util.set_view(view); - let qc = util.new_commit_qc(|msg| { - msg.view = qc_view; - }); - let replica_prepare = util.new_current_replica_prepare(|msg| msg.high_qc = qc); - let res = util.dispatch_replica_prepare_one(replica_prepare); + let qc = util.new_commit_qc(|msg| msg.view = qc_view); + let replica_prepare = util.new_replica_prepare(|msg| msg.high_qc = qc); + let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!( res, Err(ReplicaPrepareError::HighQCOfFutureView { high_qc_view, current_view }) => { @@ -264,18 +242,16 @@ async fn replica_prepare_high_qc_of_current_view() { #[tokio::test] async fn replica_prepare_high_qc_of_future_view() { - let mut util = UTHarness::new_one().await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; let view = ViewNumber(1); let qc_view = ViewNumber(2); - util.set_view(view); - let qc = util.new_commit_qc(|msg| { - msg.view = qc_view; - }); - - let replica_prepare = util.new_current_replica_prepare(|msg| msg.high_qc = qc); - let res = util.dispatch_replica_prepare_one(replica_prepare); + let qc = util.new_commit_qc(|msg| msg.view = qc_view); + let replica_prepare = util.new_replica_prepare(|msg| msg.high_qc = qc); + let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!( res, Err(ReplicaPrepareError::HighQCOfFutureView{ high_qc_view, current_view }) => { @@ -287,58 +263,46 @@ async fn replica_prepare_high_qc_of_future_view() { #[tokio::test] async fn replica_commit_sanity() { - let mut util = UTHarness::new_many().await; - - let replica_commit = util - .new_procedural_replica_commit_many() - .await - .cast() - .unwrap() - .msg; - util.dispatch_replica_commit_many( - vec![replica_commit; util.consensus_threshold()], - util.keys(), - ) - .unwrap(); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; + util.new_leader_commit(ctx).await; } #[tokio::test] async fn replica_commit_sanity_yield_leader_commit() { - let mut util = UTHarness::new_one().await; - - let replica_commit = util.new_procedural_replica_commit_one().await; - util.dispatch_replica_commit_one(replica_commit.clone()) - .unwrap(); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let replica_commit = util.new_replica_commit(ctx).await; let leader_commit = util - .recv_signed() + .process_replica_commit(ctx, replica_commit.clone()) .await .unwrap() - .cast::() - .unwrap() - .msg; - - let replica_commit = replica_commit.cast::().unwrap().msg; + .unwrap(); assert_matches!( - leader_commit, + leader_commit.msg, LeaderCommit { protocol_version, justification, } => { - assert_eq!(protocol_version, replica_commit.protocol_version); - assert_eq!(justification, util.new_commit_qc(|msg| *msg = replica_commit)); + assert_eq!(protocol_version, replica_commit.msg.protocol_version); + assert_eq!(justification, util.new_commit_qc(|msg| *msg = replica_commit.msg)); } ); } #[tokio::test] async fn replica_commit_incompatible_protocol_version() { - let mut util = UTHarness::new_one().await; + zksync_concurrency::testonly::abort_on_panic(); 
+ let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; let incompatible_protocol_version = util.incompatible_protocol_version(); let replica_commit = util.new_current_replica_commit(|msg| { msg.protocol_version = incompatible_protocol_version; }); - let res = util.dispatch_replica_commit_one(replica_commit); + let res = util.process_replica_commit(ctx, replica_commit).await; assert_matches!( res, Err(ReplicaCommitError::IncompatibleProtocolVersion { message_version, local_version }) => { @@ -350,13 +314,14 @@ async fn replica_commit_incompatible_protocol_version() { #[tokio::test] async fn replica_commit_non_validator_signer() { - let mut util = UTHarness::new_one().await; - - let replica_commit = util.new_current_replica_commit(|_| {}).cast().unwrap().msg; - let non_validator_key: validator::SecretKey = util.rng().gen(); - let signed = non_validator_key.sign_msg(ConsensusMsg::ReplicaCommit(replica_commit)); - - let res = util.dispatch_replica_commit_one(signed); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let replica_commit = util.new_current_replica_commit(|_| {}).msg; + let non_validator_key: validator::SecretKey = ctx.rng().gen(); + let res = util + .process_replica_commit(ctx, non_validator_key.sign_msg(replica_commit)) + .await; assert_matches!( res, Err(ReplicaCommitError::NonValidatorSigner { signer }) => { @@ -367,90 +332,85 @@ async fn replica_commit_non_validator_signer() { #[tokio::test] async fn replica_commit_old() { - let mut util = UTHarness::new_one().await; - - let mut replica_commit = util - .new_procedural_replica_commit_one() - .await - .cast::() - .unwrap() - .msg; - replica_commit.view = util.replica_view().prev(); - let replica_commit = util - .owner_key() - .sign_msg(ConsensusMsg::ReplicaCommit(replica_commit)); - - let res = util.dispatch_replica_commit_one(replica_commit); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let mut replica_commit = util.new_replica_commit(ctx).await.msg; + replica_commit.view = util.consensus.replica.view.prev(); + let replica_commit = util.owner_key().sign_msg(replica_commit); + let res = util.process_replica_commit(ctx, replica_commit).await; assert_matches!( res, Err(ReplicaCommitError::Old { current_view, current_phase }) => { - assert_eq!(current_view, util.replica_view()); - assert_eq!(current_phase, util.replica_phase()); + assert_eq!(current_view, util.consensus.replica.view); + assert_eq!(current_phase, util.consensus.replica.phase); } ); } #[tokio::test] async fn replica_commit_not_leader_in_view() { - let mut util = UTHarness::new_with(2).await; + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 2).await; - let current_view_leader = util.view_leader(util.replica_view()); + let current_view_leader = util.view_leader(util.consensus.replica.view); assert_ne!(current_view_leader, util.owner_key().public()); let replica_commit = util.new_current_replica_commit(|_| {}); - let res = util.dispatch_replica_commit_one(replica_commit); + let res = util.process_replica_commit(ctx, replica_commit).await; assert_matches!(res, Err(ReplicaCommitError::NotLeaderInView)); } #[tokio::test] async fn replica_commit_already_exists() { - let mut util = UTHarness::new_with(2).await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let 
mut util = UTHarness::new(ctx, 2).await; let view = ViewNumber(2); util.set_replica_view(view); util.set_leader_view(view); assert_eq!(util.view_leader(view), util.owner_key().public()); - - let replica_prepare_one = util.new_current_replica_prepare(|_| {}); - let _ = util.dispatch_replica_prepare_one(replica_prepare_one.clone()); - let replica_prepare_two = util.key_at(1).sign_msg(replica_prepare_one.msg); - util.dispatch_replica_prepare_one(replica_prepare_two) - .unwrap(); - - let leader_prepare = util.recv_signed().await.unwrap(); - util.dispatch_leader_prepare(leader_prepare).await.unwrap(); - - let replica_commit = util.recv_signed().await.unwrap(); - let _ = util.dispatch_replica_commit_one(replica_commit.clone()); - let res = util.dispatch_replica_commit_one(replica_commit.clone()); + let replica_commit = util.new_replica_commit(ctx).await; + assert!(util + .process_replica_commit(ctx, replica_commit.clone()) + .await + .is_err()); + let res = util + .process_replica_commit(ctx, replica_commit.clone()) + .await; assert_matches!( res, Err(ReplicaCommitError::DuplicateMessage { existing_message }) => { - assert_eq!(existing_message, replica_commit.cast::().unwrap().msg) + assert_eq!(existing_message, replica_commit.msg) } ); } #[tokio::test] async fn replica_commit_num_received_below_threshold() { - let mut util = UTHarness::new_with(2).await; - - let view = ViewNumber(2); - util.set_replica_view(view); - util.set_leader_view(view); - assert_eq!(util.view_leader(view), util.owner_key().public()); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 2).await; - let replica_prepare_one = util.new_current_replica_prepare(|_| {}); - let _ = util.dispatch_replica_prepare_one(replica_prepare_one.clone()); - let replica_prepare_two = util.key_at(1).sign_msg(replica_prepare_one.msg); - util.dispatch_replica_prepare_one(replica_prepare_two) + let replica_prepare = util.new_replica_prepare(|_| {}); + assert!(util + .process_replica_prepare(ctx, replica_prepare.clone()) + .await + .is_err()); + let replica_prepare = util.keys[1].sign_msg(replica_prepare.msg); + let leader_prepare = util + .process_replica_prepare(ctx, replica_prepare) + .await + .unwrap() .unwrap(); - - let leader_prepare = util.recv_signed().await.unwrap(); - util.dispatch_leader_prepare(leader_prepare).await.unwrap(); - - let replica_commit = util.recv_signed().await.unwrap(); - let res = util.dispatch_replica_commit_one(replica_commit.clone()); + let replica_commit = util + .process_leader_prepare(ctx, leader_prepare) + .await + .unwrap(); + let res = util + .process_replica_commit(ctx, replica_commit.clone()) + .await; assert_matches!( res, Err(ReplicaCommitError::NumReceivedBelowThreshold { @@ -462,19 +422,21 @@ async fn replica_commit_num_received_below_threshold() { #[tokio::test] async fn replica_commit_invalid_sig() { - let mut util = UTHarness::new_one().await; - + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; let mut replica_commit = util.new_current_replica_commit(|_| {}); - replica_commit.sig = util.rng().gen(); - let res = util.dispatch_replica_commit_one(replica_commit); + replica_commit.sig = ctx.rng().gen(); + let res = util.process_replica_commit(ctx, replica_commit).await; assert_matches!(res, Err(ReplicaCommitError::InvalidSignature(..))); } #[tokio::test] async fn replica_commit_unexpected_proposal() { - let mut util = 
UTHarness::new_one().await;
-
+    zksync_concurrency::testonly::abort_on_panic();
+    let ctx = &ctx::test_root(&ctx::RealClock);
+    let mut util = UTHarness::new(ctx, 1).await;
     let replica_commit = util.new_current_replica_commit(|_| {});
-    let res = util.dispatch_replica_commit_one(replica_commit);
+    let res = util.process_replica_commit(ctx, replica_commit).await;
     assert_matches!(res, Err(ReplicaCommitError::UnexpectedProposal));
 }
diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs
index 52238b10..840f905a 100644
--- a/node/actors/bft/src/lib.rs
+++ b/node/actors/bft/src/lib.rs
@@ -14,10 +14,10 @@
 //! - [Notes on modern consensus algorithms](https://timroughgarden.github.io/fob21/andy.pdf)
 //! - [Blog post comparing several consensus algorithms](https://decentralizedthoughts.github.io/2023-04-01-hotstuff-2/)
 //! - Blog posts explaining [safety](https://seafooler.com/2022/01/24/understanding-safety-hotstuff/) and [responsiveness](https://seafooler.com/2022/04/02/understanding-responsiveness-hotstuff/)
-
 use crate::io::{InputMessage, OutputMessage};
 use anyhow::Context as _;
 use inner::ConsensusInner;
+use std::sync::Arc;
 use tracing::{info, instrument};
 use zksync_concurrency::ctx;
 use zksync_consensus_roles::validator;
@@ -34,8 +34,18 @@ pub mod testonly;
 #[cfg(test)]
 mod tests;
 
+/// Payload provider for the new blocks.
+#[async_trait::async_trait]
+pub trait PayloadSource: Send + Sync + 'static {
+    /// Propose a payload for the block `block_number`.
+    async fn propose(
+        &self,
+        ctx: &ctx::Ctx,
+        block_number: validator::BlockNumber,
+    ) -> ctx::Result<validator::Payload>;
+}
+
 /// The Consensus struct implements the consensus algorithm and is the main entry point for the consensus actor.
-#[derive(Debug)]
 pub struct Consensus {
     /// The inner struct contains the data that is shared between the consensus state machines.
     pub(crate) inner: ConsensusInner,
@@ -47,7 +57,7 @@ pub struct Consensus {
 
 impl Consensus {
     /// Creates a new Consensus struct.
-    #[instrument(level = "trace", ret)]
+    #[instrument(level = "trace", skip(payload_source))]
     pub async fn new(
         ctx: &ctx::Ctx,
         pipe: ActorPipe<InputMessage, OutputMessage>,
@@ -55,6 +65,7 @@ impl Consensus {
         secret_key: validator::SecretKey,
         validator_set: validator::ValidatorSet,
         storage: ReplicaStore,
+        payload_source: Arc<dyn PayloadSource>,
     ) -> anyhow::Result<Self> {
         Ok(Consensus {
             inner: ConsensusInner {
@@ -64,13 +75,13 @@ impl Consensus {
                 protocol_version,
             },
             replica: replica::StateMachine::new(ctx, storage).await?,
-            leader: leader::StateMachine::new(ctx),
+            leader: leader::StateMachine::new(ctx, payload_source),
         })
     }
 
     /// Starts the Consensus actor. It will start running, processing incoming messages and
     /// sending output messages. This is a blocking method.
-    #[instrument(level = "trace", ret)]
+    #[instrument(level = "trace", skip(self), err)]
     pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         info!(
             "Starting consensus actor {:?}",
@@ -99,27 +110,33 @@ impl Consensus {
                 return Ok(());
             }
 
-            match input {
+            let res = match input {
                 Some(InputMessage::Network(req)) => {
-                    match &req.msg.msg {
+                    let res = match &req.msg.msg {
                         validator::ConsensusMsg::ReplicaPrepare(_)
                         | validator::ConsensusMsg::ReplicaCommit(_) => {
-                            self.leader.process_input(ctx, &self.inner, req.msg);
+                            self.leader.process_input(ctx, &self.inner, req.msg).await
                         }
                         validator::ConsensusMsg::LeaderPrepare(_)
                        | validator::ConsensusMsg::LeaderCommit(_) => {
                             self.replica
                                 .process_input(ctx, &self.inner, Some(req.msg))
-                                .await?;
+                                .await
                         }
-                    }
+                    };
+                    // Notify network actor that the message has been processed.
// Ignore sending error. let _ = req.ack.send(()); + res } - None => { - self.replica.process_input(ctx, &self.inner, None).await?; - } + None => self.replica.process_input(ctx, &self.inner, None).await, + }; + if let Err(err) = res { + return match err { + ctx::Error::Canceled(_) => Ok(()), + ctx::Error::Internal(err) => Err(err), + }; } } } diff --git a/node/actors/bft/src/replica/leader_prepare.rs b/node/actors/bft/src/replica/leader_prepare.rs index 1605318b..82d273cf 100644 --- a/node/actors/bft/src/replica/leader_prepare.rs +++ b/node/actors/bft/src/replica/leader_prepare.rs @@ -98,6 +98,9 @@ pub(crate) enum Error { /// Proposal header corresponding to the payload. header: validator::BlockHeader, }, + /// Invalid payload. + #[error("invalid payload: {0:#}")] + ProposalInvalidPayload(#[source] anyhow::Error), /// Re-proposal without quorum. #[error("block re-proposal without quorum for the re-proposal")] ReproposalWithoutQuorum, @@ -266,6 +269,18 @@ impl StateMachine { header: message.proposal, }); } + + // Payload should be valid. + if let Err(err) = self + .storage + .verify_payload(ctx, message.proposal.number, payload) + .await + { + return Err(match err { + err @ ctx::Error::Canceled(_) => Error::Internal(err), + ctx::Error::Internal(err) => Error::ProposalInvalidPayload(err), + }); + } } // The leader is re-proposing a past block. None => { diff --git a/node/actors/bft/src/replica/tests.rs b/node/actors/bft/src/replica/tests.rs index c540a99f..00e61112 100644 --- a/node/actors/bft/src/replica/tests.rs +++ b/node/actors/bft/src/replica/tests.rs @@ -4,59 +4,47 @@ use super::{ use crate::{inner::ConsensusInner, leader::ReplicaPrepareError, testonly::ut_harness::UTHarness}; use assert_matches::assert_matches; use rand::Rng; -use std::cell::RefCell; +use zksync_concurrency::ctx; use zksync_consensus_roles::validator::{ - BlockHeaderHash, ConsensusMsg, LeaderCommit, LeaderPrepare, Payload, PrepareQC, ReplicaCommit, - ReplicaPrepare, ViewNumber, + self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare, ViewNumber, }; #[tokio::test] async fn leader_prepare_sanity() { - let mut util = UTHarness::new_many().await; - - let leader_prepare = util.new_procedural_leader_prepare_many().await; - util.dispatch_leader_prepare(leader_prepare).await.unwrap(); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; + let leader_prepare = util.new_leader_prepare(ctx).await; + util.process_leader_prepare(ctx, leader_prepare) + .await + .unwrap(); } #[tokio::test] async fn leader_prepare_reproposal_sanity() { - let mut util = UTHarness::new_many().await; - - let replica_prepare: ReplicaPrepare = - util.new_unfinalized_replica_prepare().cast().unwrap().msg; - util.dispatch_replica_prepare_many( - vec![replica_prepare.clone(); util.consensus_threshold()], - util.keys(), - ) - .unwrap(); - let leader_prepare_signed = util.recv_signed().await.unwrap(); - - let leader_prepare = leader_prepare_signed - .clone() - .cast::() - .unwrap() - .msg; - assert_matches!( - leader_prepare, - LeaderPrepare {proposal_payload, .. 
} => { - assert_eq!(proposal_payload, None); - } - ); - - util.dispatch_leader_prepare(leader_prepare_signed) + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; + util.new_replica_commit(ctx).await; + util.process_replica_timeout(ctx).await; + let leader_prepare = util.new_leader_prepare(ctx).await; + assert!(leader_prepare.msg.proposal_payload.is_none()); + util.process_leader_prepare(ctx, leader_prepare) .await .unwrap(); } #[tokio::test] async fn leader_prepare_incompatible_protocol_version() { - let mut util = UTHarness::new_one().await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; let incompatible_protocol_version = util.incompatible_protocol_version(); - let leader_prepare = util.new_rnd_leader_prepare(|msg| { + let leader_prepare = util.new_rnd_leader_prepare(&mut ctx.rng(), |msg| { msg.protocol_version = incompatible_protocol_version; }); - let res = util.dispatch_leader_prepare(leader_prepare).await; + let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!( res, Err(LeaderPrepareError::IncompatibleProtocolVersion { message_version, local_version }) => { @@ -68,45 +56,39 @@ async fn leader_prepare_incompatible_protocol_version() { #[tokio::test] async fn leader_prepare_sanity_yield_replica_commit() { - let mut util = UTHarness::new_one().await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; - let leader_prepare = util.new_procedural_leader_prepare_one().await; - util.dispatch_leader_prepare(leader_prepare.clone()) - .await - .unwrap(); + let leader_prepare = util.new_leader_prepare(ctx).await; let replica_commit = util - .recv_signed() + .process_leader_prepare(ctx, leader_prepare.clone()) .await - .unwrap() - .cast::() - .unwrap() - .msg; - - let leader_prepare = leader_prepare.cast::().unwrap().msg; - assert_matches!( - replica_commit, + .unwrap(); + assert_eq!( + replica_commit.msg, ReplicaCommit { - protocol_version, - view, - proposal, - } => { - assert_eq!(protocol_version, leader_prepare.protocol_version); - assert_eq!(view, leader_prepare.view); - assert_eq!(proposal, leader_prepare.proposal); + protocol_version: leader_prepare.msg.protocol_version, + view: leader_prepare.msg.view, + proposal: leader_prepare.msg.proposal, } ); } #[tokio::test] async fn leader_prepare_invalid_leader() { - let mut util = UTHarness::new_with(2).await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 2).await; let view = ViewNumber(2); util.set_view(view); - assert_eq!(util.view_leader(view), util.key_at(0).public()); + assert_eq!(util.view_leader(view), util.keys[0].public()); - let replica_prepare_one = util.new_current_replica_prepare(|_| {}); - let res = util.dispatch_replica_prepare_one(replica_prepare_one.clone()); + let replica_prepare = util.new_replica_prepare(|_| {}); + let res = util + .process_replica_prepare(ctx, replica_prepare.clone()) + .await; assert_matches!( res, Err(ReplicaPrepareError::NumReceivedBelowThreshold { @@ -115,189 +97,162 @@ async fn leader_prepare_invalid_leader() { }) ); - let replica_prepare_two = util.key_at(1).sign_msg(replica_prepare_one.msg); - util.dispatch_replica_prepare_one(replica_prepare_two) - .unwrap(); - let msg = util.recv_signed().await.unwrap(); - let 
mut leader_prepare = msg.cast::().unwrap().msg; - + let replica_prepare = util.keys[1].sign_msg(replica_prepare.msg); + let mut leader_prepare = util + .process_replica_prepare(ctx, replica_prepare) + .await + .unwrap() + .unwrap() + .msg; leader_prepare.view = leader_prepare.view.next(); - assert_ne!( - util.view_leader(leader_prepare.view), - util.key_at(0).public() - ); + assert_ne!(util.view_leader(leader_prepare.view), util.keys[0].public()); - let leader_prepare = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare)); - let res = util.dispatch_leader_prepare(leader_prepare).await; + let leader_prepare = util.owner_key().sign_msg(leader_prepare); + let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!( res, Err(LeaderPrepareError::InvalidLeader { correct_leader, received_leader }) => { - assert_eq!(correct_leader, util.key_at(1).public()); - assert_eq!(received_leader, util.key_at(0).public()); + assert_eq!(correct_leader, util.keys[1].public()); + assert_eq!(received_leader, util.keys[0].public()); } ); } #[tokio::test] async fn leader_prepare_old_view() { - let mut util = UTHarness::new_one().await; - - let mut leader_prepare = util - .new_procedural_leader_prepare_one() - .await - .cast::() - .unwrap() - .msg; - leader_prepare.view = util.replica_view().prev(); - let leader_prepare = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare)); - - let res = util.dispatch_leader_prepare(leader_prepare).await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; + leader_prepare.view = util.consensus.replica.view.prev(); + let leader_prepare = util.owner_key().sign_msg(leader_prepare); + let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!( res, Err(LeaderPrepareError::Old { current_view, current_phase }) => { - assert_eq!(current_view, util.replica_view()); - assert_eq!(current_phase, util.replica_phase()); + assert_eq!(current_view, util.consensus.replica.view); + assert_eq!(current_phase, util.consensus.replica.phase); } ); } +/// Tests that `WriteBlockStore::verify_payload` is applied before signing a vote. #[tokio::test] -async fn leader_prepare_invalid_sig() { - let mut util = UTHarness::new_one().await; +async fn leader_prepare_invalid_payload() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let leader_prepare = util.new_leader_prepare(ctx).await; + + // Insert a finalized block to the storage. + // Default implementation of verify_payload() fails if + // head block number >= proposal block number. 
+ let block = validator::FinalBlock { + header: leader_prepare.msg.proposal, + payload: leader_prepare.msg.proposal_payload.clone().unwrap(), + justification: CommitQC::from( + &[util.keys[0].sign_msg(ReplicaCommit { + protocol_version: util.protocol_version(), + view: util.consensus.replica.view, + proposal: leader_prepare.msg.proposal, + })], + &util.validator_set(), + ) + .unwrap(), + }; + util.consensus + .replica + .storage + .put_block(ctx, &block) + .await + .unwrap(); - let mut leader_prepare = util.new_rnd_leader_prepare(|_| {}); - leader_prepare.sig = util.rng().gen(); + let res = util.process_leader_prepare(ctx, leader_prepare).await; + assert_matches!(res, Err(LeaderPrepareError::ProposalInvalidPayload(..))); +} - let res = util.dispatch_leader_prepare(leader_prepare).await; +#[tokio::test] +async fn leader_prepare_invalid_sig() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let mut leader_prepare = util.new_rnd_leader_prepare(&mut ctx.rng(), |_| {}); + leader_prepare.sig = ctx.rng().gen(); + let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(LeaderPrepareError::InvalidSignature(..))); } #[tokio::test] async fn leader_prepare_invalid_prepare_qc() { - let mut util = UTHarness::new_one().await; - - let mut leader_prepare = util - .new_procedural_leader_prepare_one() - .await - .cast::() - .unwrap() - .msg; - leader_prepare.justification = util.rng().gen::(); - let leader_prepare = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare)); - - let res = util.dispatch_leader_prepare(leader_prepare).await; - assert_matches!( - res, - Err(LeaderPrepareError::InvalidPrepareQC(err)) => { - assert_eq!(err.to_string(), "PrepareQC contains messages for different views!") - } - ); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; + leader_prepare.justification = ctx.rng().gen(); + let leader_prepare = util.owner_key().sign_msg(leader_prepare); + let res = util.process_leader_prepare(ctx, leader_prepare).await; + assert_matches!(res, Err(LeaderPrepareError::InvalidPrepareQC(_))); } #[tokio::test] async fn leader_prepare_invalid_high_qc() { - let mut util = UTHarness::new_one().await; - - let mut replica_prepare = util - .new_current_replica_prepare(|_| {}) - .cast::() - .unwrap() - .msg; - replica_prepare.high_qc = util.rng().gen(); - - let mut leader_prepare = util - .new_procedural_leader_prepare_one() - .await - .cast::() - .unwrap() - .msg; - - let high_qc = util.rng().gen(); - leader_prepare.justification = util.new_prepare_qc(|msg| msg.high_qc = high_qc); - let leader_prepare = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare)); - - let res = util.dispatch_leader_prepare(leader_prepare).await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; + leader_prepare.justification = util.new_prepare_qc(|msg| msg.high_qc = ctx.rng().gen()); + let leader_prepare = util.owner_key().sign_msg(leader_prepare); + let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(LeaderPrepareError::InvalidHighQC(_))); } #[tokio::test] async fn leader_prepare_proposal_oversized_payload() { - let mut util = 
UTHarness::new_one().await; + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; let payload_oversize = ConsensusInner::PAYLOAD_MAX_SIZE + 1; let payload_vec = vec![0; payload_oversize]; - - let mut leader_prepare = util - .new_procedural_leader_prepare_one() - .await - .cast::() - .unwrap() - .msg; + let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; leader_prepare.proposal_payload = Some(Payload(payload_vec)); - let leader_prepare_signed = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare.clone())); - - let res = util.dispatch_leader_prepare(leader_prepare_signed).await; + let leader_prepare = util.owner_key().sign_msg(leader_prepare); + let res = util + .process_leader_prepare(ctx, leader_prepare.clone()) + .await; assert_matches!( res, Err(LeaderPrepareError::ProposalOversizedPayload{ payload_size, header }) => { assert_eq!(payload_size, payload_oversize); - assert_eq!(header, leader_prepare.proposal); + assert_eq!(header, leader_prepare.msg.proposal); } ); } #[tokio::test] async fn leader_prepare_proposal_mismatched_payload() { - let mut util = UTHarness::new_one().await; - - let mut leader_prepare = util - .new_procedural_leader_prepare_one() - .await - .cast::() - .unwrap() - .msg; - leader_prepare.proposal_payload = Some(util.rng().gen()); - let leader_prepare_signed = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare.clone())); - - let res = util.dispatch_leader_prepare(leader_prepare_signed).await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; + leader_prepare.proposal_payload = Some(ctx.rng().gen()); + let leader_prepare = util.owner_key().sign_msg(leader_prepare); + let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(LeaderPrepareError::ProposalMismatchedPayload)); } #[tokio::test] async fn leader_prepare_proposal_when_previous_not_finalized() { - let mut util = UTHarness::new_one().await; - - let replica_prepare = util.new_current_replica_prepare(|_| {}); - util.dispatch_replica_prepare_one(replica_prepare.clone()) - .unwrap(); - + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let replica_prepare = util.new_replica_prepare(|_| {}); let mut leader_prepare = util - .recv_signed() + .process_replica_prepare(ctx, replica_prepare) .await .unwrap() - .cast::() .unwrap() .msg; - - let high_vote = util.rng().gen(); - leader_prepare.justification = util.new_prepare_qc(|msg| msg.high_vote = high_vote); - - let leader_prepare_signed = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare.clone())); - - let res = util.dispatch_leader_prepare(leader_prepare_signed).await; + leader_prepare.justification = util.new_prepare_qc(|msg| msg.high_vote = ctx.rng().gen()); + let leader_prepare = util.owner_key().sign_msg(leader_prepare); + let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!( res, Err(LeaderPrepareError::ProposalWhenPreviousNotFinalized) @@ -306,31 +261,20 @@ async fn leader_prepare_proposal_when_previous_not_finalized() { #[tokio::test] async fn leader_prepare_proposal_invalid_parent_hash() { - let mut util = UTHarness::new_one().await; - - let replica_prepare_signed = util.new_current_replica_prepare(|_| {}); - let replica_prepare = 
replica_prepare_signed - .clone() - .cast::() - .unwrap() - .msg; - util.dispatch_replica_prepare_one(replica_prepare_signed.clone()) - .unwrap(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let replica_prepare = util.new_replica_prepare(|_| {}); let mut leader_prepare = util - .recv_signed() + .process_replica_prepare(ctx, replica_prepare.clone()) .await .unwrap() - .cast::() .unwrap() .msg; - - let junk: BlockHeaderHash = util.rng().gen(); - leader_prepare.proposal.parent = junk; - let leader_prepare_signed = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare.clone())); - - let res = util.dispatch_leader_prepare(leader_prepare_signed).await; + leader_prepare.proposal.parent = ctx.rng().gen(); + let leader_prepare = util.owner_key().sign_msg(leader_prepare); + let res = util + .process_leader_prepare(ctx, leader_prepare.clone()) + .await; assert_matches!( res, Err(LeaderPrepareError::ProposalInvalidParentHash { @@ -338,172 +282,141 @@ async fn leader_prepare_proposal_invalid_parent_hash() { received_parent_hash, header }) => { - assert_eq!(correct_parent_hash, replica_prepare.high_vote.proposal.hash()); - assert_eq!(received_parent_hash, junk); - assert_eq!(header, leader_prepare.proposal); + assert_eq!(correct_parent_hash, replica_prepare.msg.high_vote.proposal.hash()); + assert_eq!(received_parent_hash, leader_prepare.msg.proposal.parent); + assert_eq!(header, leader_prepare.msg.proposal); } ); } #[tokio::test] async fn leader_prepare_proposal_non_sequential_number() { - let mut util = UTHarness::new_one().await; - - let replica_prepare_signed = util.new_current_replica_prepare(|_| {}); - let replica_prepare = replica_prepare_signed - .clone() - .cast::() - .unwrap() - .msg; - util.dispatch_replica_prepare_one(replica_prepare_signed) - .unwrap(); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let replica_prepare = util.new_replica_prepare(|_| {}); let mut leader_prepare = util - .recv_signed() + .process_replica_prepare(ctx, replica_prepare.clone()) .await .unwrap() - .cast::() .unwrap() .msg; - - let correct_num = replica_prepare.high_vote.proposal.number.next(); + let correct_num = replica_prepare.msg.high_vote.proposal.number.next(); assert_eq!(correct_num, leader_prepare.proposal.number); let non_seq_num = correct_num.next(); leader_prepare.proposal.number = non_seq_num; - let leader_prepare_signed = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare.clone())); - - let res = util.dispatch_leader_prepare(leader_prepare_signed).await; + let leader_prepare = util.owner_key().sign_msg(leader_prepare); + let res = util + .process_leader_prepare(ctx, leader_prepare.clone()) + .await; assert_matches!( res, Err(LeaderPrepareError::ProposalNonSequentialNumber { correct_number, received_number, header }) => { assert_eq!(correct_number, correct_num); assert_eq!(received_number, non_seq_num); - assert_eq!(header, leader_prepare.proposal); + assert_eq!(header, leader_prepare.msg.proposal); } ); } #[tokio::test] async fn leader_prepare_reproposal_without_quorum() { - let mut util = UTHarness::new_many().await; - + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + let mut util = UTHarness::new_many(ctx).await; + let replica_prepare = util.new_replica_prepare(|_| {}).msg; let mut leader_prepare = util - 
.new_procedural_leader_prepare_many() + .process_replica_prepare_all(ctx, replica_prepare.clone()) .await - .cast::() - .unwrap() .msg; - let rng = RefCell::new(util.new_rng()); - leader_prepare.justification = util.new_prepare_qc_many(&|msg: &mut ReplicaPrepare| { - let mut rng = rng.borrow_mut(); - msg.high_vote = rng.gen(); - }); + // Turn leader_prepare into an unjustified reproposal. + let replica_prepares: Vec<_> = util + .keys + .iter() + .map(|k| { + let mut msg = replica_prepare.clone(); + msg.high_vote = rng.gen(); + k.sign_msg(msg) + }) + .collect(); + leader_prepare.justification = + PrepareQC::from(&replica_prepares, &util.validator_set()).unwrap(); leader_prepare.proposal_payload = None; - let leader_prepare = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare)); - let res = util.dispatch_leader_prepare(leader_prepare).await; + let leader_prepare = util.keys[0].sign_msg(leader_prepare); + let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(LeaderPrepareError::ReproposalWithoutQuorum)); } #[tokio::test] async fn leader_prepare_reproposal_when_finalized() { - let mut util = UTHarness::new_one().await; - - let mut leader_prepare = util - .new_procedural_leader_prepare_one() - .await - .cast::() - .unwrap() - .msg; + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; leader_prepare.proposal_payload = None; - let leader_prepare_signed = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare.clone())); - - let res = util.dispatch_leader_prepare(leader_prepare_signed).await; + let leader_prepare = util.owner_key().sign_msg(leader_prepare); + let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(LeaderPrepareError::ReproposalWhenFinalized)); } #[tokio::test] async fn leader_prepare_reproposal_invalid_block() { - let mut util = UTHarness::new_one().await; - - let mut leader_prepare: LeaderPrepare = util - .new_procedural_leader_prepare_one() - .await - .cast() - .unwrap() - .msg; - - let high_vote = util.rng().gen(); - leader_prepare.justification = util.new_prepare_qc(|msg: &mut ReplicaPrepare| { - msg.high_vote = high_vote; - }); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; + leader_prepare.justification = util.new_prepare_qc(|msg| msg.high_vote = ctx.rng().gen()); leader_prepare.proposal_payload = None; - - let leader_prepare = util - .owner_key() - .sign_msg(ConsensusMsg::LeaderPrepare(leader_prepare)); - - let res = util.dispatch_leader_prepare(leader_prepare).await; + let leader_prepare = util.owner_key().sign_msg(leader_prepare); + let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(LeaderPrepareError::ReproposalInvalidBlock)); } #[tokio::test] async fn leader_commit_sanity() { - let mut util = UTHarness::new_many().await; - - let leader_commit = util.new_procedural_leader_commit_many().await; - util.dispatch_leader_commit(leader_commit).await.unwrap(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; + let leader_commit = util.new_leader_commit(ctx).await; + util.process_leader_commit(ctx, leader_commit) + .await + .unwrap(); } #[tokio::test] async fn leader_commit_sanity_yield_replica_prepare() { - let mut 
util = UTHarness::new_one().await; - - let leader_commit = util.new_procedural_leader_commit_one().await; - util.dispatch_leader_commit(leader_commit.clone()) - .await - .unwrap(); + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; + let leader_commit = util.new_leader_commit(ctx).await; let replica_prepare = util - .recv_signed() + .process_leader_commit(ctx, leader_commit.clone()) .await - .unwrap() - .cast::() - .unwrap() - .msg; - - let leader_commit = leader_commit.cast::().unwrap().msg; - assert_matches!( - replica_prepare, + .unwrap(); + assert_eq!( + replica_prepare.msg, ReplicaPrepare { - protocol_version, - view, - high_vote, - high_qc, - } => { - assert_eq!(protocol_version, leader_commit.protocol_version); - assert_eq!(view, leader_commit.justification.message.view.next()); - assert_eq!(high_vote, leader_commit.justification.message); - assert_eq!(high_qc, leader_commit.justification) + protocol_version: leader_commit.msg.protocol_version, + view: leader_commit.msg.justification.message.view.next(), + high_vote: leader_commit.msg.justification.message, + high_qc: leader_commit.msg.justification, } ); } #[tokio::test] async fn leader_commit_incompatible_protocol_version() { - let mut util = UTHarness::new_one().await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 1).await; let incompatible_protocol_version = util.incompatible_protocol_version(); - let leader_commit = util.new_rnd_leader_commit(|msg| { + let leader_commit = util.new_rnd_leader_commit(&mut ctx.rng(), |msg| { msg.protocol_version = incompatible_protocol_version; }); - let res = util.dispatch_leader_commit(leader_commit).await; + let res = util.process_leader_commit(ctx, leader_commit).await; assert_matches!( res, Err(LeaderCommitError::IncompatibleProtocolVersion { message_version, local_version }) => { @@ -515,31 +428,34 @@ async fn leader_commit_incompatible_protocol_version() { #[tokio::test] async fn leader_commit_invalid_leader() { - let mut util = UTHarness::new_with(2).await; - - let current_view_leader = util.view_leader(util.replica_view()); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new(ctx, 2).await; + let current_view_leader = util.view_leader(util.consensus.replica.view); assert_ne!(current_view_leader, util.owner_key().public()); - let leader_commit = util.new_rnd_leader_commit(|_| {}); - let res = util.dispatch_leader_commit(leader_commit).await; + let leader_commit = util.new_rnd_leader_commit(&mut ctx.rng(), |_| {}); + let res = util.process_leader_commit(ctx, leader_commit).await; assert_matches!(res, Err(LeaderCommitError::InvalidLeader { .. })); } #[tokio::test] async fn leader_commit_invalid_sig() { - let mut util = UTHarness::new_one().await; - - let mut leader_commit = util.new_rnd_leader_commit(|_| {}); - leader_commit.sig = util.rng().gen(); - let res = util.dispatch_leader_commit(leader_commit).await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + let mut util = UTHarness::new(ctx, 1).await; + let mut leader_commit = util.new_rnd_leader_commit(rng, |_| {}); + leader_commit.sig = rng.gen(); + let res = util.process_leader_commit(ctx, leader_commit).await; assert_matches!(res, Err(LeaderCommitError::InvalidSignature { .. 
})); } #[tokio::test] async fn leader_commit_invalid_commit_qc() { - let mut util = UTHarness::new_one().await; - - let leader_commit = util.new_rnd_leader_commit(|_| {}); - let res = util.dispatch_leader_commit(leader_commit).await; + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + let mut util = UTHarness::new(ctx, 1).await; + let leader_commit = util.new_rnd_leader_commit(rng, |_| {}); + let res = util.process_leader_commit(ctx, leader_commit).await; assert_matches!(res, Err(LeaderCommitError::InvalidJustification { .. })); } diff --git a/node/actors/bft/src/testonly/make.rs b/node/actors/bft/src/testonly/make.rs index dda37078..c87fbd78 100644 --- a/node/actors/bft/src/testonly/make.rs +++ b/node/actors/bft/src/testonly/make.rs @@ -1,15 +1,31 @@ //! This module contains utilities that are only meant for testing purposes. - use crate::{ io::{InputMessage, OutputMessage}, - Consensus, + Consensus, ConsensusInner, PayloadSource, }; +use rand::Rng as _; use std::sync::Arc; use zksync_concurrency::ctx; use zksync_consensus_roles::validator; use zksync_consensus_storage::{InMemoryStorage, ReplicaStore}; use zksync_consensus_utils::pipe::{self, DispatcherPipe}; +/// Provides payload consisting of random bytes. +pub struct RandomPayloadSource; + +#[async_trait::async_trait] +impl PayloadSource for RandomPayloadSource { + async fn propose( + &self, + ctx: &ctx::Ctx, + _block_number: validator::BlockNumber, + ) -> ctx::Result { + let mut payload = validator::Payload(vec![0; ConsensusInner::PAYLOAD_MAX_SIZE]); + ctx.rng().fill(&mut payload.0[..]); + Ok(payload) + } +} + /// This creates a mock Consensus struct for unit tests. pub async fn make_consensus( ctx: &ctx::Ctx, @@ -29,6 +45,7 @@ pub async fn make_consensus( key.clone(), validator_set.clone(), ReplicaStore::from_store(Arc::new(storage)), + Arc::new(RandomPayloadSource), ); let consensus = consensus .await @@ -42,8 +59,9 @@ pub fn make_genesis( keys: &[validator::SecretKey], protocol_version: validator::ProtocolVersion, payload: validator::Payload, + block_number: validator::BlockNumber, ) -> (validator::FinalBlock, validator::ValidatorSet) { - let header = validator::BlockHeader::genesis(payload.hash()); + let header = validator::BlockHeader::genesis(payload.hash(), block_number); let validator_set = validator::ValidatorSet::new(keys.iter().map(|k| k.public())).unwrap(); let signed_messages: Vec<_> = keys .iter() diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index 849e7f9b..63384362 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -1,4 +1,4 @@ -use super::{Behavior, Node}; +use super::{Behavior, Node, RandomPayloadSource}; use crate::{testonly, Consensus}; use anyhow::Context; use std::{collections::HashMap, sync::Arc}; @@ -36,6 +36,7 @@ impl Test { &keys, validator::ProtocolVersion::EARLIEST, validator::Payload(vec![]), + validator::BlockNumber(0), ); let nodes: Vec<_> = nodes .into_iter() @@ -110,6 +111,7 @@ async fn run_nodes(ctx: &ctx::Ctx, network: Network, nodes: &[Node]) -> anyhow:: node.net.consensus_config().key.clone(), validator_set, storage, + Arc::new(RandomPayloadSource), ) .await .context("consensus")?; diff --git a/node/actors/bft/src/testonly/ut_harness.rs b/node/actors/bft/src/testonly/ut_harness.rs index 2a5a7e5c..741b3088 100644 --- a/node/actors/bft/src/testonly/ut_harness.rs +++ b/node/actors/bft/src/testonly/ut_harness.rs @@ -5,18 +5,15 @@ use crate::{ Consensus, }; use assert_matches::assert_matches; -use 
rand::{rngs::StdRng, Rng}; -use zksync_concurrency::{ - ctx, - ctx::{Canceled, Ctx}, - scope, -}; +use rand::Rng; +use std::cmp::Ordering; +use zksync_concurrency::ctx; use zksync_consensus_network::io::ConsensusInputMessage; use zksync_consensus_roles::validator::{ - self, BlockHeader, CommitQC, ConsensusMsg, LeaderCommit, LeaderPrepare, Payload, Phase, - PrepareQC, ProtocolVersion, ReplicaCommit, ReplicaPrepare, SecretKey, Signed, ViewNumber, + self, BlockHeader, CommitQC, LeaderCommit, LeaderPrepare, Payload, Phase, PrepareQC, + ReplicaCommit, ReplicaPrepare, SecretKey, Signed, ViewNumber, }; -use zksync_consensus_utils::pipe::DispatcherPipe; +use zksync_consensus_utils::{enum_util::Variant, pipe::DispatcherPipe}; /// `UTHarness` provides various utilities for unit tests. /// It is designed to simplify the setup and execution of test cases by encapsulating @@ -25,128 +22,82 @@ use zksync_consensus_utils::pipe::DispatcherPipe; /// It should be instantiated once for every test case. #[cfg(test)] pub(crate) struct UTHarness { - ctx: Ctx, - rng: StdRng, - consensus: Consensus, + pub(crate) consensus: Consensus, + pub(crate) keys: Vec, pipe: DispatcherPipe, - keys: Vec, } impl UTHarness { - /// Creates a new `UTHarness` with one validator. - pub(crate) async fn new_one() -> UTHarness { - UTHarness::new_with(1).await - } - /// Creates a new `UTHarness` with minimally-significant validator set size. - pub(crate) async fn new_many() -> UTHarness { - let num_validators = 6; - assert_matches!(crate::misc::faulty_replicas(num_validators), res if res > 0); - let mut util = UTHarness::new_with(num_validators).await; - util.set_view(util.owner_as_view_leader_current_or_next()); - util - } - /// Creates a new `UTHarness` with the specified validator set size. - pub(crate) async fn new_with(num_validators: usize) -> UTHarness { - let ctx = ctx::test_root(&ctx::RealClock); + pub(crate) async fn new(ctx: &ctx::Ctx, num_validators: usize) -> UTHarness { let mut rng = ctx.rng(); let keys: Vec<_> = (0..num_validators).map(|_| rng.gen()).collect(); - let (genesis, val_set) = - crate::testonly::make_genesis(&keys, ProtocolVersion::EARLIEST, Payload(vec![])); + let (genesis, val_set) = crate::testonly::make_genesis( + &keys, + validator::ProtocolVersion::EARLIEST, + Payload(vec![]), + validator::BlockNumber(0), + ); let (mut consensus, pipe) = - crate::testonly::make_consensus(&ctx, &keys[0], &val_set, &genesis).await; + crate::testonly::make_consensus(ctx, &keys[0], &val_set, &genesis).await; consensus.leader.view = ViewNumber(1); consensus.replica.view = ViewNumber(1); - UTHarness { - ctx, - rng, consensus, pipe, keys, } } - pub(crate) async fn iterate_next(&mut self) { - let leader_commit = self.new_procedural_leader_commit_many().await; - self.dispatch_leader_commit(leader_commit).await.unwrap(); - self.recv_signed() - .await - .unwrap() - .cast::() - .unwrap(); + /// Creates a new `UTHarness` with minimally-significant validator set size. + pub(crate) async fn new_many(ctx: &ctx::Ctx) -> UTHarness { + let num_validators = 6; + assert!(crate::misc::faulty_replicas(num_validators) > 0); + UTHarness::new(ctx, num_validators).await } - /// Validate protocol liveness in the aftermath of a timeout. - /// - /// Params refer to the expected values in the next produced `ReplicaPrepare` - /// messages for the new iteration. - /// - /// * `view` - the expected view of the next `ReplicaPrepare`. - /// * `high_vote_view` - the expected view of the high vote of the next `ReplicaPrepare`. 
- /// * `high_qc_view` - the expected view of the high qc of the next `ReplicaPrepare`. - /// - pub(crate) async fn check_recovery_after_timeout( - &mut self, - view: ViewNumber, - high_vote_view: ViewNumber, - high_qc_view: ViewNumber, - ) { - let replica_prepare = self - .recv_signed() - .await - .unwrap() - .cast::() - .unwrap() - .msg; - - assert_eq!(replica_prepare.view, view); - assert_eq!(replica_prepare.high_vote.view, high_vote_view); - assert_eq!(replica_prepare.high_qc.message.view, high_qc_view); + /// Triggers replica timeout, validates the new ReplicaPrepare + /// then executes the whole new view to make sure that the consensus + /// recovers after a timeout. + pub(crate) async fn produce_block_after_timeout(&mut self, ctx: &ctx::Ctx) { + let want = ReplicaPrepare { + protocol_version: self.consensus.inner.protocol_version, + view: self.consensus.replica.view.next(), + high_qc: self.consensus.replica.high_qc.clone(), + high_vote: self.consensus.replica.high_vote, + }; + let replica_prepare = self.process_replica_timeout(ctx).await; + assert_eq!(want, replica_prepare.msg); - self.set_replica_view(self.owner_as_view_leader_current_or_next()); - self.iterate_next().await; + let leader_commit = self.new_leader_commit(ctx).await; + self.process_leader_commit(ctx, leader_commit) + .await + .unwrap(); } pub(crate) fn consensus_threshold(&self) -> usize { crate::misc::consensus_threshold(self.keys.len()) } - pub(crate) fn protocol_version(&self) -> ProtocolVersion { + pub(crate) fn protocol_version(&self) -> validator::ProtocolVersion { self.consensus.inner.protocol_version } - pub(crate) fn incompatible_protocol_version(&self) -> ProtocolVersion { - ProtocolVersion(self.protocol_version().0 + 1) + pub(crate) fn incompatible_protocol_version(&self) -> validator::ProtocolVersion { + validator::ProtocolVersion(self.protocol_version().0 + 1) } pub(crate) fn owner_key(&self) -> &SecretKey { &self.consensus.inner.secret_key } - pub(crate) fn owner_as_view_leader_current_or_next(&self) -> ViewNumber { - let mut view = self.replica_view(); + pub(crate) fn set_owner_as_view_leader(&mut self) { + let mut view = self.consensus.replica.view; while self.view_leader(view) != self.owner_key().public() { view = view.next(); } - view - } - - pub(crate) fn key_at(&self, index: usize) -> &SecretKey { - &self.keys[index] - } - - pub(crate) fn keys(&self) -> Vec { - self.keys.clone() - } - - pub(crate) fn rng(&mut self) -> &mut StdRng { - &mut self.rng - } - - pub(crate) fn new_rng(&self) -> StdRng { - self.ctx.rng() + self.consensus.replica.view = view; } pub(crate) fn set_view(&mut self, view: ViewNumber) { @@ -158,53 +109,31 @@ impl UTHarness { self.consensus.leader.view = view } - pub(crate) fn set_leader_phase(&mut self, phase: Phase) { - self.consensus.leader.phase = phase - } - pub(crate) fn set_replica_view(&mut self, view: ViewNumber) { self.consensus.replica.view = view } - pub(crate) fn new_unfinalized_replica_prepare(&self) -> Signed { - self.new_current_replica_prepare(|msg| { - let mut high_vote = ReplicaCommit { - protocol_version: self.protocol_version(), - view: self.consensus.replica.view.next(), - proposal: self.consensus.replica.high_qc.message.proposal, - }; - - high_vote.proposal.parent = high_vote.proposal.hash(); - high_vote.proposal.number = high_vote.proposal.number.next(); - - msg.high_vote = high_vote; - }) - } - - pub(crate) fn new_current_replica_prepare( - &self, + pub(crate) fn new_replica_prepare( + &mut self, mutate_fn: impl FnOnce(&mut ReplicaPrepare), - ) -> Signed { 
+ ) -> Signed { + self.set_owner_as_view_leader(); let mut msg = ReplicaPrepare { protocol_version: self.protocol_version(), view: self.consensus.replica.view, high_vote: self.consensus.replica.high_vote, high_qc: self.consensus.replica.high_qc.clone(), }; - mutate_fn(&mut msg); - - self.consensus - .inner - .secret_key - .sign_msg(ConsensusMsg::ReplicaPrepare(msg)) + self.consensus.inner.secret_key.sign_msg(msg) } pub(crate) fn new_rnd_leader_prepare( &mut self, + rng: &mut impl Rng, mutate_fn: impl FnOnce(&mut LeaderPrepare), - ) -> Signed { - let payload: Payload = self.rng().gen(); + ) -> Signed { + let payload: Payload = rng.gen(); let mut msg = LeaderPrepare { protocol_version: self.protocol_version(), view: self.consensus.leader.view, @@ -214,267 +143,182 @@ impl UTHarness { payload: payload.hash(), }, proposal_payload: Some(payload), - justification: self.rng().gen(), + justification: rng.gen(), }; mutate_fn(&mut msg); - self.consensus - .inner - .secret_key - .sign_msg(ConsensusMsg::LeaderPrepare(msg)) + self.consensus.inner.secret_key.sign_msg(msg) } pub(crate) fn new_current_replica_commit( &self, mutate_fn: impl FnOnce(&mut ReplicaCommit), - ) -> Signed { + ) -> Signed { let mut msg = ReplicaCommit { protocol_version: self.protocol_version(), view: self.consensus.replica.view, proposal: self.consensus.replica.high_qc.message.proposal, }; - mutate_fn(&mut msg); - - self.consensus - .inner - .secret_key - .sign_msg(ConsensusMsg::ReplicaCommit(msg)) + self.consensus.inner.secret_key.sign_msg(msg) } pub(crate) fn new_rnd_leader_commit( &mut self, + rng: &mut impl Rng, mutate_fn: impl FnOnce(&mut LeaderCommit), - ) -> Signed { + ) -> Signed { let mut msg = LeaderCommit { protocol_version: self.protocol_version(), - justification: self.rng().gen(), + justification: rng.gen(), }; - mutate_fn(&mut msg); - - self.consensus - .inner - .secret_key - .sign_msg(ConsensusMsg::LeaderCommit(msg)) - } - - pub(crate) async fn new_procedural_leader_prepare_one(&mut self) -> Signed { - let replica_prepare = self.new_current_replica_prepare(|_| {}); - self.dispatch_replica_prepare_one(replica_prepare.clone()) - .unwrap(); - self.recv_signed().await.unwrap() - } - - pub(crate) async fn new_procedural_leader_prepare_many(&mut self) -> Signed { - let replica_prepare = self.new_current_replica_prepare(|_| {}).cast().unwrap().msg; - self.dispatch_replica_prepare_many( - vec![replica_prepare; self.consensus_threshold()], - self.keys(), - ) - .unwrap(); - self.recv_signed().await.unwrap() - } - - pub(crate) async fn new_procedural_replica_commit_one(&mut self) -> Signed { - let replica_prepare = self.new_current_replica_prepare(|_| {}); - self.dispatch_replica_prepare_one(replica_prepare.clone()) - .unwrap(); - let leader_prepare = self.recv_signed().await.unwrap(); - self.dispatch_leader_prepare(leader_prepare).await.unwrap(); - self.recv_signed().await.unwrap() + self.consensus.inner.secret_key.sign_msg(msg) } - pub(crate) async fn new_procedural_replica_commit_many(&mut self) -> Signed { - let leader_prepare = self.new_procedural_leader_prepare_many().await; - self.dispatch_leader_prepare(leader_prepare).await.unwrap(); - self.recv_signed().await.unwrap() + pub(crate) async fn new_leader_prepare(&mut self, ctx: &ctx::Ctx) -> Signed { + let replica_prepare = self.new_replica_prepare(|_| {}).msg; + self.process_replica_prepare_all(ctx, replica_prepare).await } - pub(crate) async fn new_procedural_leader_commit_one(&mut self) -> Signed { - let replica_prepare = self.new_current_replica_prepare(|_| {}); - 
self.dispatch_replica_prepare_one(replica_prepare.clone()) - .unwrap(); - let leader_prepare = self.recv_signed().await.unwrap(); - self.dispatch_leader_prepare(leader_prepare).await.unwrap(); - let replica_commit = self.recv_signed().await.unwrap(); - self.dispatch_replica_commit_one(replica_commit).unwrap(); - self.recv_signed().await.unwrap() - } - - pub(crate) async fn new_procedural_leader_commit_many(&mut self) -> Signed { - let replica_commit = self - .new_procedural_replica_commit_many() + pub(crate) async fn new_replica_commit(&mut self, ctx: &ctx::Ctx) -> Signed { + let leader_prepare = self.new_leader_prepare(ctx).await; + self.process_leader_prepare(ctx, leader_prepare) .await - .cast() .unwrap() - .msg; - self.dispatch_replica_commit_many( - vec![replica_commit; self.consensus_threshold()], - self.keys(), - ) - .unwrap(); - self.recv_signed().await.unwrap() } - #[allow(clippy::result_large_err)] - pub(crate) fn dispatch_replica_prepare_one( - &mut self, - msg: Signed, - ) -> Result<(), ReplicaPrepareError> { - self.consensus.leader.process_replica_prepare( - &self.ctx, - &self.consensus.inner, - msg.cast().unwrap(), - ) + pub(crate) async fn new_leader_commit(&mut self, ctx: &ctx::Ctx) -> Signed { + let replica_commit = self.new_replica_commit(ctx).await; + self.process_replica_commit_all(ctx, replica_commit.msg) + .await } - #[allow(clippy::result_large_err)] - pub(crate) fn dispatch_replica_prepare_many( + pub(crate) async fn process_leader_prepare( &mut self, - messages: Vec, - keys: Vec, - ) -> Result<(), ReplicaPrepareError> { - let len = messages.len(); - let consensus_threshold = self.consensus_threshold(); - messages - .into_iter() - .zip(keys) - .map(|(msg, key)| { - let signed = key.sign_msg(ConsensusMsg::ReplicaPrepare(msg)); - self.dispatch_replica_prepare_one(signed) - }) - .fold((0, None), |(i, _), res| { - let i = i + 1; - if i < len { - assert_matches!( - res, - Err(ReplicaPrepareError::NumReceivedBelowThreshold { - num_messages, - threshold, - }) => { - assert_eq!(num_messages, i); - assert_eq!(threshold, consensus_threshold) - } - ); - } - (i, Some(res)) - }) - .1 - .unwrap() + ctx: &ctx::Ctx, + msg: Signed, + ) -> Result, LeaderPrepareError> { + self.consensus + .replica + .process_leader_prepare(ctx, &self.consensus.inner, msg) + .await?; + Ok(self.try_recv().unwrap()) } - #[allow(clippy::result_large_err)] - pub(crate) fn dispatch_replica_commit_one( + pub(crate) async fn process_leader_commit( &mut self, - msg: Signed, - ) -> Result<(), ReplicaCommitError> { - self.consensus.leader.process_replica_commit( - &self.ctx, - &self.consensus.inner, - msg.cast().unwrap(), - ) + ctx: &ctx::Ctx, + msg: Signed, + ) -> Result, LeaderCommitError> { + self.consensus + .replica + .process_leader_commit(ctx, &self.consensus.inner, msg) + .await?; + Ok(self.try_recv().unwrap()) } #[allow(clippy::result_large_err)] - pub(crate) fn dispatch_replica_commit_many( + pub(crate) async fn process_replica_prepare( &mut self, - messages: Vec, - keys: Vec, - ) -> Result<(), ReplicaCommitError> { - let len = messages.len(); - let consensus_threshold = self.consensus_threshold(); - messages - .into_iter() - .zip(keys) - .map(|(msg, key)| { - let signed = key.sign_msg(ConsensusMsg::ReplicaCommit(msg)); - self.dispatch_replica_commit_one(signed) - }) - .fold((0, None), |(i, _), res| { - let i = i + 1; - if i < len { - assert_matches!( - res, - Err(ReplicaCommitError::NumReceivedBelowThreshold { - num_messages, - threshold, - }) => { - assert_eq!(num_messages, i); - 
assert_eq!(threshold, consensus_threshold) - } - ); - } - (i, Some(res)) - }) - .1 - .unwrap() + ctx: &ctx::Ctx, + msg: Signed, + ) -> Result>, ReplicaPrepareError> { + self.consensus + .leader + .process_replica_prepare(ctx, &self.consensus.inner, msg) + .await?; + Ok(self.try_recv()) } - pub(crate) async fn dispatch_leader_prepare( + pub(crate) async fn process_replica_prepare_all( &mut self, - msg: Signed, - ) -> Result<(), LeaderPrepareError> { - scope::run!(&self.ctx, |ctx, s| { - s.spawn(async { - let res = self - .consensus - .replica - .process_leader_prepare(ctx, &self.consensus.inner, msg.cast().unwrap()) - .await; - Ok(res) - }) - .join(ctx) - }) - .await - .unwrap() + ctx: &ctx::Ctx, + msg: ReplicaPrepare, + ) -> Signed { + for (i, key) in self.keys.iter().enumerate() { + let res = self + .consensus + .leader + .process_replica_prepare(ctx, &self.consensus.inner, key.sign_msg(msg.clone())) + .await; + match (i + 1).cmp(&self.consensus_threshold()) { + Ordering::Equal => res.unwrap(), + Ordering::Less => assert_matches!( + res, + Err(ReplicaPrepareError::NumReceivedBelowThreshold { + num_messages, + threshold, + }) => { + assert_eq!(num_messages, i+1); + assert_eq!(threshold, self.consensus_threshold()) + } + ), + Ordering::Greater => assert_matches!(res, Err(ReplicaPrepareError::Old { .. })), + } + } + self.try_recv().unwrap() } - pub(crate) async fn sim_timeout(&mut self) { + pub(crate) async fn process_replica_commit( + &mut self, + ctx: &ctx::Ctx, + msg: Signed, + ) -> Result>, ReplicaCommitError> { self.consensus - .replica - .process_input(&self.ctx, &self.consensus.inner, None) - .await - .unwrap() + .leader + .process_replica_commit(ctx, &self.consensus.inner, msg)?; + Ok(self.try_recv()) } - pub(crate) async fn dispatch_leader_commit( + async fn process_replica_commit_all( &mut self, - msg: Signed, - ) -> Result<(), LeaderCommitError> { - scope::run!(&self.ctx, |ctx, s| { - s.spawn(async { - let res = self - .consensus - .replica - .process_leader_commit(ctx, &self.consensus.inner, msg.cast().unwrap()) - .await; - Ok(res) - }) - .join(ctx) - }) - .await - .unwrap() - } - - pub(crate) async fn recv_signed(&mut self) -> Result, Canceled> { - self.pipe - .recv(&self.ctx) - .await - .map(|output_message| match output_message { - OutputMessage::Network(ConsensusInputMessage { - message: signed, .. - }) => signed, - }) + ctx: &ctx::Ctx, + msg: ReplicaCommit, + ) -> Signed { + for (i, key) in self.keys.iter().enumerate() { + let res = self.consensus.leader.process_replica_commit( + ctx, + &self.consensus.inner, + key.sign_msg(msg), + ); + match (i + 1).cmp(&self.consensus_threshold()) { + Ordering::Equal => res.unwrap(), + Ordering::Less => assert_matches!( + res, + Err(ReplicaCommitError::NumReceivedBelowThreshold { + num_messages, + threshold, + }) => { + assert_eq!(num_messages, i+1); + assert_eq!(threshold, self.consensus_threshold()) + } + ), + Ordering::Greater => assert_matches!(res, Err(ReplicaCommitError::Old { .. })), + } + } + self.try_recv().unwrap() } - pub(crate) fn replica_view(&self) -> ViewNumber { - self.consensus.replica.view + fn try_recv>(&mut self) -> Option> { + self.pipe.try_recv().map(|message| match message { + OutputMessage::Network(ConsensusInputMessage { message, .. 
}) => { + message.cast().unwrap() + } + }) } - pub(crate) fn replica_phase(&self) -> Phase { - self.consensus.replica.phase + pub(crate) async fn process_replica_timeout( + &mut self, + ctx: &ctx::Ctx, + ) -> Signed { + self.consensus + .replica + .process_input(ctx, &self.consensus.inner, None) + .await + .unwrap(); + self.try_recv().unwrap() } pub(crate) fn leader_phase(&self) -> Phase { @@ -485,60 +329,22 @@ impl UTHarness { self.consensus.inner.view_leader(view) } - pub(crate) fn new_commit_qc(&self, mutate_fn: impl FnOnce(&mut ReplicaCommit)) -> CommitQC { - let validator_set = - validator::ValidatorSet::new(self.keys.iter().map(|k| k.public())).unwrap(); - - let msg = self - .new_current_replica_commit(mutate_fn) - .cast() - .unwrap() - .msg; - - let signed_messages: Vec<_> = self.keys.iter().map(|sk| sk.sign_msg(msg)).collect(); - - CommitQC::from(&signed_messages, &validator_set).unwrap() + pub(crate) fn validator_set(&self) -> validator::ValidatorSet { + validator::ValidatorSet::new(self.keys.iter().map(|k| k.public())).unwrap() } - pub(crate) fn new_prepare_qc(&self, mutate_fn: impl FnOnce(&mut ReplicaPrepare)) -> PrepareQC { - let validator_set = - validator::ValidatorSet::new(self.keys.iter().map(|k| k.public())).unwrap(); - - let msg: ReplicaPrepare = self - .new_current_replica_prepare(mutate_fn) - .cast() - .unwrap() - .msg; - - let signed_messages: Vec<_> = self - .keys - .iter() - .map(|sk| sk.sign_msg(msg.clone())) - .collect(); - - PrepareQC::from(&signed_messages, &validator_set).unwrap() + pub(crate) fn new_commit_qc(&self, mutate_fn: impl FnOnce(&mut ReplicaCommit)) -> CommitQC { + let msg = self.new_current_replica_commit(mutate_fn).msg; + let msgs: Vec<_> = self.keys.iter().map(|k| k.sign_msg(msg)).collect(); + CommitQC::from(&msgs, &self.validator_set()).unwrap() } - pub(crate) fn new_prepare_qc_many( + pub(crate) fn new_prepare_qc( &mut self, - mutate_fn: &dyn Fn(&mut ReplicaPrepare), + mutate_fn: impl FnOnce(&mut ReplicaPrepare), ) -> PrepareQC { - let validator_set = - validator::ValidatorSet::new(self.keys.iter().map(|k| k.public())).unwrap(); - - let signed_messages: Vec<_> = self - .keys - .iter() - .map(|sk| { - let msg: ReplicaPrepare = self - .new_current_replica_prepare(|msg| mutate_fn(msg)) - .cast() - .unwrap() - .msg; - sk.sign_msg(msg.clone()) - }) - .collect(); - - PrepareQC::from(&signed_messages, &validator_set).unwrap() + let msg = self.new_replica_prepare(mutate_fn).msg; + let msgs: Vec<_> = self.keys.iter().map(|k| k.sign_msg(msg.clone())).collect(); + PrepareQC::from(&msgs, &self.validator_set()).unwrap() } } diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 50cac4e2..35e600e3 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -1,16 +1,12 @@ use crate::{ - leader::{ReplicaCommitError, ReplicaPrepareError}, misc::consensus_threshold, testonly::{ut_harness::UTHarness, Behavior, Network, Test}, }; -use assert_matches::assert_matches; -use zksync_concurrency::{ctx, testonly::abort_on_panic}; -use zksync_consensus_roles::validator::{ - LeaderCommit, LeaderPrepare, Phase, ReplicaCommit, ReplicaPrepare, -}; +use zksync_concurrency::ctx; +use zksync_consensus_roles::validator::Phase; async fn run_test(behavior: Behavior, network: Network) { - abort_on_panic(); + zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::AffineClock::new(1.)); const NODES: usize = 11; @@ -71,190 +67,79 @@ async fn byzantine_real_network() { // Testing liveness after the network becomes idle 
with leader having no cached prepare messages for the current view. #[tokio::test] async fn timeout_leader_no_prepares() { - let mut util = UTHarness::new_many().await; - - let base_rp = util - .new_current_replica_prepare(|_| {}) - .cast::() - .unwrap() - .msg; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; - util.sim_timeout().await; - - util.check_recovery_after_timeout( - base_rp.view.next(), - base_rp.high_vote.view, - base_rp.high_qc.message.view, - ) - .await; + util.new_replica_prepare(|_| {}); + util.produce_block_after_timeout(ctx).await; } /// Testing liveness after the network becomes idle with leader having some cached prepare messages for the current view. #[tokio::test] async fn timeout_leader_some_prepares() { - let mut util = UTHarness::new_many().await; - - let replica_prepare = util.new_current_replica_prepare(|_| {}); - let res = util.dispatch_replica_prepare_one(replica_prepare.clone()); - assert_matches!( - res, - Err(ReplicaPrepareError::NumReceivedBelowThreshold { - num_messages, - threshold, - }) => { - assert_eq!(num_messages, 1); - assert_eq!(threshold, util.consensus_threshold()) - } - ); - let base_rp = replica_prepare.cast::().unwrap().msg; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; - util.sim_timeout().await; - - util.check_recovery_after_timeout( - base_rp.view.next(), - base_rp.high_vote.view, - base_rp.high_qc.message.view, - ) - .await; + let replica_prepare = util.new_replica_prepare(|_| {}); + assert!(util + .process_replica_prepare(ctx, replica_prepare) + .await + .is_err()); + util.produce_block_after_timeout(ctx).await; } /// Testing liveness after the network becomes idle with leader in commit phase. #[tokio::test] async fn timeout_leader_in_commit() { - let mut util = UTHarness::new_many().await; - - let base_rp = util - .new_current_replica_prepare(|_| {}) - .cast::() - .unwrap() - .msg; - util.dispatch_replica_prepare_many( - vec![base_rp.clone(); util.consensus_threshold()], - util.keys(), - ) - .unwrap(); - util.recv_signed() - .await - .unwrap() - .cast::() - .unwrap(); - - util.sim_timeout().await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; + util.new_leader_prepare(ctx).await; // Leader is in `Phase::Commit`, but should still accept prepares from newer views. - assert_eq!(util.leader_phase(), Phase::Commit); - - util.check_recovery_after_timeout( - base_rp.view.next(), - base_rp.high_vote.view, - base_rp.high_qc.message.view, - ) - .await; + assert_eq!(util.consensus.leader.phase, Phase::Commit); + util.produce_block_after_timeout(ctx).await; } /// Testing liveness after the network becomes idle with replica in commit phase. 
#[tokio::test] async fn timeout_replica_in_commit() { - let mut util = UTHarness::new_many().await; - - let base_rp = util - .new_current_replica_prepare(|_| {}) - .cast::() - .unwrap() - .msg; - - let leader_prepare = util.new_procedural_leader_prepare_many().await; - util.dispatch_leader_prepare(leader_prepare).await.unwrap(); - util.recv_signed() - .await - .unwrap() - .cast::() - .unwrap(); - - util.sim_timeout().await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; + util.new_replica_commit(ctx).await; // Leader is in `Phase::Commit`, but should still accept prepares from newer views. - assert_eq!(util.leader_phase(), Phase::Commit); - - util.check_recovery_after_timeout( - base_rp.view.next(), - base_rp.view, - base_rp.high_qc.message.view, - ) - .await; + assert_eq!(util.consensus.leader.phase, Phase::Commit); + util.produce_block_after_timeout(ctx).await; } /// Testing liveness after the network becomes idle with leader having some cached commit messages for the current view. #[tokio::test] async fn timeout_leader_some_commits() { - let mut util = UTHarness::new_many().await; - - let base_rp = util - .new_current_replica_prepare(|_| {}) - .cast::() - .unwrap() - .msg; - - let replica_commit = util.new_procedural_replica_commit_many().await; - let res = util.dispatch_replica_commit_one(replica_commit); - assert_matches!( - res, - Err(ReplicaCommitError::NumReceivedBelowThreshold { - num_messages, - threshold, - }) => { - assert_eq!(num_messages, 1); - assert_eq!(threshold, util.consensus_threshold()) - } - ); - - util.sim_timeout().await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; + let replica_commit = util.new_replica_commit(ctx).await; + assert!(util + .process_replica_commit(ctx, replica_commit) + .await + .is_err()); // Leader is in `Phase::Commit`, but should still accept prepares from newer views. assert_eq!(util.leader_phase(), Phase::Commit); - - util.check_recovery_after_timeout( - base_rp.view.next(), - base_rp.view, - base_rp.high_qc.message.view, - ) - .await; + util.produce_block_after_timeout(ctx).await; } /// Testing liveness after the network becomes idle with leader in a consecutive prepare phase. #[tokio::test] async fn timeout_leader_in_consecutive_prepare() { - let mut util = UTHarness::new_many().await; - - let base_rp = util - .new_current_replica_prepare(|_| {}) - .cast::() - .unwrap() - .msg; - - let replica_commit = util - .new_procedural_replica_commit_many() - .await - .cast() - .unwrap() - .msg; - util.dispatch_replica_commit_many( - vec![replica_commit; util.consensus_threshold()], - util.keys(), - ) - .unwrap(); - util.recv_signed() - .await - .unwrap() - .cast::() - .unwrap(); - - util.sim_timeout().await; + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let mut util = UTHarness::new_many(ctx).await; - util.check_recovery_after_timeout( - base_rp.view.next(), - base_rp.view, - base_rp.high_qc.message.view, - ) - .await; + util.new_leader_commit(ctx).await; + util.produce_block_after_timeout(ctx).await; } diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs index 02d385e0..a65857d6 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/actors/executor/src/lib.rs @@ -1,13 +1,12 @@ //! Library files for the executor. 
We have it separate from the binary so that we can use these files in the tools crate. - use crate::io::Dispatcher; use anyhow::Context as _; -use std::{any, sync::Arc}; +use std::{any, fmt, sync::Arc}; use zksync_concurrency::{ctx, net, scope}; -use zksync_consensus_bft::{misc::consensus_threshold, Consensus}; +use zksync_consensus_bft::{misc::consensus_threshold, Consensus, PayloadSource}; use zksync_consensus_network as network; use zksync_consensus_roles::{node, validator}; -use zksync_consensus_storage::{BlockStore, ReplicaStateStore, ReplicaStore, WriteBlockStore}; +use zksync_consensus_storage::{ReplicaStateStore, ReplicaStore, WriteBlockStore}; use zksync_consensus_sync_blocks::SyncBlocks; use zksync_consensus_utils::pipe; @@ -20,7 +19,6 @@ mod tests; pub use self::config::{proto, ConsensusConfig, ExecutorConfig, GossipConfig}; /// Validator-related part of [`Executor`]. -#[derive(Debug)] struct ValidatorExecutor { /// Consensus network configuration. config: ConsensusConfig, @@ -28,6 +26,16 @@ struct ValidatorExecutor { key: validator::SecretKey, /// Store for replica state. replica_state_store: Arc, + /// Payload proposer for new blocks. + payload_source: Arc, +} + +impl fmt::Debug for ValidatorExecutor { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("ValidatorExecutor") + .field("config", &self.config) + .finish() + } } impl ValidatorExecutor { @@ -94,6 +102,7 @@ impl Executor { config: ConsensusConfig, key: validator::SecretKey, replica_state_store: Arc, + payload_source: Arc, ) -> anyhow::Result<()> { let public = &config.key; anyhow::ensure!( @@ -112,6 +121,7 @@ impl Executor { config, key, replica_state_store, + payload_source, }); } else { tracing::info!( @@ -175,6 +185,7 @@ impl Executor { validator.key.clone(), validator_set.clone(), consensus_storage, + validator.payload_source, ) .await .context("consensus")?; diff --git a/node/actors/executor/src/testonly.rs b/node/actors/executor/src/testonly.rs index 3ee0a2eb..d88264cd 100644 --- a/node/actors/executor/src/testonly.rs +++ b/node/actors/executor/src/testonly.rs @@ -5,10 +5,7 @@ use std::collections::HashMap; use zksync_concurrency::net; use zksync_consensus_bft::testonly::make_genesis; use zksync_consensus_network::{consensus, testonly::Instance}; -use zksync_consensus_roles::{ - node, - validator::{self, Payload}, -}; +use zksync_consensus_roles::{node, validator}; impl ConsensusConfig { fn from_network_config( @@ -44,7 +41,8 @@ impl FullValidatorConfig { pub fn for_single_validator( rng: &mut impl Rng, protocol_version: validator::ProtocolVersion, - genesis_block_payload: Payload, + genesis_block_payload: validator::Payload, + genesis_block_number: validator::BlockNumber, ) -> Self { let mut net_configs = Instance::new_configs(rng, 1, 0); assert_eq!(net_configs.len(), 1); @@ -58,6 +56,7 @@ impl FullValidatorConfig { &[validator_key.clone()], protocol_version, genesis_block_payload, + genesis_block_number, ); let node_key = net_config.gossip.key.clone(); let node_config = ExecutorConfig { diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index 1ed5cbff..0e035355 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -6,6 +6,7 @@ use rand::{thread_rng, Rng}; use std::iter; use test_casing::test_casing; use zksync_concurrency::{sync, testonly::abort_on_panic, time}; +use zksync_consensus_bft::testonly::RandomPayloadSource; use zksync_consensus_roles::validator::{BlockNumber, FinalBlock, Payload}; use 
zksync_consensus_storage::{BlockStore, InMemoryStorage}; @@ -40,7 +41,12 @@ impl FullValidatorConfig { .await .unwrap(); executor - .set_validator(self.consensus_config, self.validator_key, storage) + .set_validator( + self.consensus_config, + self.validator_key, + storage, + Arc::new(RandomPayloadSource), + ) .unwrap(); executor } @@ -71,13 +77,15 @@ async fn executor_misconfiguration(name: &str, mutation: fn(&mut FinalBlock)) { rng, validator::ProtocolVersion::EARLIEST, Payload(vec![]), + BlockNumber(0), ); let genesis_block = &mut validator.node_config.genesis_block; mutation(genesis_block); let storage = Arc::new(InMemoryStorage::new(genesis_block.clone())); let err = Executor::new(ctx, validator.node_config, validator.node_key, storage) .await - .unwrap_err(); + .err() + .unwrap(); tracing::info!(%err, "received expected validation error"); } @@ -91,13 +99,15 @@ async fn genesis_block_mismatch() { rng, validator::ProtocolVersion::EARLIEST, Payload(vec![]), + BlockNumber(0), ); let mut genesis_block = validator.node_config.genesis_block.clone(); genesis_block.header.number = BlockNumber(1); let storage = Arc::new(InMemoryStorage::new(genesis_block.clone())); let err = Executor::new(ctx, validator.node_config, validator.node_key, storage) .await - .unwrap_err(); + .err() + .unwrap(); tracing::info!(%err, "received expected validation error"); } @@ -111,6 +121,7 @@ async fn executing_single_validator() { rng, validator::ProtocolVersion::EARLIEST, Payload(vec![]), + BlockNumber(0), ); let genesis_block = &validator.node_config.genesis_block; let storage = InMemoryStorage::new(genesis_block.clone()); @@ -140,6 +151,7 @@ async fn executing_validator_and_full_node() { rng, validator::ProtocolVersion::EARLIEST, Payload(vec![]), + BlockNumber(0), ); let full_node = validator.connect_full_node(rng); @@ -186,6 +198,7 @@ async fn syncing_full_node_from_snapshot(delay_block_storage: bool) { rng, validator::ProtocolVersion::EARLIEST, Payload(vec![]), + BlockNumber(0), ); let mut full_node = validator.connect_full_node(rng); diff --git a/node/actors/sync_blocks/src/peers/tests/mod.rs b/node/actors/sync_blocks/src/peers/tests/mod.rs index 06e50b97..cf902834 100644 --- a/node/actors/sync_blocks/src/peers/tests/mod.rs +++ b/node/actors/sync_blocks/src/peers/tests/mod.rs @@ -7,7 +7,7 @@ use std::{collections::HashSet, fmt}; use test_casing::{test_casing, Product}; use zksync_concurrency::{testonly::abort_on_panic, time}; use zksync_consensus_roles::validator; -use zksync_consensus_storage::{BlockStore, InMemoryStorage}; +use zksync_consensus_storage::InMemoryStorage; mod basics; mod fakes; diff --git a/node/actors/sync_blocks/src/tests/mod.rs b/node/actors/sync_blocks/src/tests/mod.rs index 154c2e0b..f88007b2 100644 --- a/node/actors/sync_blocks/src/tests/mod.rs +++ b/node/actors/sync_blocks/src/tests/mod.rs @@ -48,7 +48,7 @@ impl TestValidators { }; let payload = Payload(vec![]); - let mut latest_block = BlockHeader::genesis(payload.hash()); + let mut latest_block = BlockHeader::genesis(payload.hash(), BlockNumber(0)); let final_blocks = (0..block_count).map(|_| { let final_block = FinalBlock { header: latest_block, diff --git a/node/deny.toml b/node/deny.toml index 8759a166..ea2300b2 100644 --- a/node/deny.toml +++ b/node/deny.toml @@ -58,6 +58,9 @@ skip = [ # Old versions required by pairing_ce & ff_ce. { name = "rand", version = "0.4" }, { name = "syn", version = "1.0" }, + + # Old versions required by criterion. 
+ { name = "itertools", version = "0.10.5" }, ] [sources] diff --git a/node/libs/roles/src/validator/messages/block.rs b/node/libs/roles/src/validator/messages/block.rs index 5fae9ec4..adc3fe7b 100644 --- a/node/libs/roles/src/validator/messages/block.rs +++ b/node/libs/roles/src/validator/messages/block.rs @@ -131,10 +131,10 @@ impl BlockHeader { } /// Creates a genesis block. - pub fn genesis(payload: PayloadHash) -> Self { + pub fn genesis(payload: PayloadHash, number: BlockNumber) -> Self { Self { parent: BlockHeaderHash(Keccak256::default()), - number: BlockNumber(0), + number, payload, } } diff --git a/node/libs/roles/src/validator/testonly.rs b/node/libs/roles/src/validator/testonly.rs index 038aae2e..dfe1e0f4 100644 --- a/node/libs/roles/src/validator/testonly.rs +++ b/node/libs/roles/src/validator/testonly.rs @@ -36,7 +36,7 @@ pub fn make_justification( /// WARNING: it is not a fully correct FinalBlock. pub fn make_genesis_block(rng: &mut R, protocol_version: ProtocolVersion) -> FinalBlock { let payload: Payload = rng.gen(); - let header = BlockHeader::genesis(payload.hash()); + let header = BlockHeader::genesis(payload.hash(), BlockNumber(0)); let justification = make_justification(rng, &header, protocol_version); FinalBlock { header, diff --git a/node/libs/storage/src/in_memory.rs b/node/libs/storage/src/in_memory.rs index 425dd3bf..7bbb1394 100644 --- a/node/libs/storage/src/in_memory.rs +++ b/node/libs/storage/src/in_memory.rs @@ -10,30 +10,33 @@ use zksync_concurrency::{ ctx, sync::{watch, Mutex}, }; -use zksync_consensus_roles::validator::{BlockNumber, FinalBlock}; +use zksync_consensus_roles::validator; #[derive(Debug)] struct BlocksInMemoryStore { - blocks: BTreeMap, - last_contiguous_block_number: BlockNumber, + blocks: BTreeMap, + last_contiguous_block_number: validator::BlockNumber, } impl BlocksInMemoryStore { - fn head_block(&self) -> &FinalBlock { + fn head_block(&self) -> &validator::FinalBlock { self.blocks.values().next_back().unwrap() // ^ `unwrap()` is safe by construction; the storage contains at least the genesis block } - fn first_block(&self) -> &FinalBlock { + fn first_block(&self) -> &validator::FinalBlock { self.blocks.values().next().unwrap() // ^ `unwrap()` is safe by construction; the storage contains at least the genesis block } - fn block(&self, number: BlockNumber) -> Option<&FinalBlock> { + fn block(&self, number: validator::BlockNumber) -> Option<&validator::FinalBlock> { self.blocks.get(&number) } - fn missing_block_numbers(&self, range: ops::Range) -> Vec { + fn missing_block_numbers( + &self, + range: ops::Range, + ) -> Vec { let existing_numbers = self .blocks .range(range.clone()) @@ -43,7 +46,7 @@ impl BlocksInMemoryStore { .collect() } - fn put_block(&mut self, block: FinalBlock) { + fn put_block(&mut self, block: validator::FinalBlock) { let block_number = block.header.number; tracing::debug!("Inserting block #{block_number} into database"); if let Some(prev_block) = self.blocks.insert(block_number, block) { @@ -69,12 +72,12 @@ impl BlocksInMemoryStore { pub struct InMemoryStorage { blocks: Mutex, replica_state: Mutex>, - blocks_sender: watch::Sender, + blocks_sender: watch::Sender, } impl InMemoryStorage { /// Creates a new store containing only the specified `genesis_block`. 
- pub fn new(genesis_block: FinalBlock) -> Self { + pub fn new(genesis_block: validator::FinalBlock) -> Self { let genesis_block_number = genesis_block.header.number; Self { blocks: Mutex::new(BlocksInMemoryStore { @@ -89,38 +92,62 @@ impl InMemoryStorage { #[async_trait] impl BlockStore for InMemoryStorage { - async fn head_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn head_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { Ok(self.blocks.lock().await.head_block().clone()) } - async fn first_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn first_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { Ok(self.blocks.lock().await.first_block().clone()) } - async fn last_contiguous_block_number(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn last_contiguous_block_number( + &self, + _ctx: &ctx::Ctx, + ) -> ctx::Result { Ok(self.blocks.lock().await.last_contiguous_block_number) } - async fn block(&self, _ctx: &ctx::Ctx, number: BlockNumber) -> ctx::Result> { + async fn block( + &self, + _ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result> { Ok(self.blocks.lock().await.block(number).cloned()) } async fn missing_block_numbers( &self, _ctx: &ctx::Ctx, - range: ops::Range, - ) -> ctx::Result> { + range: ops::Range, + ) -> ctx::Result> { Ok(self.blocks.lock().await.missing_block_numbers(range)) } - fn subscribe_to_block_writes(&self) -> watch::Receiver { + fn subscribe_to_block_writes(&self) -> watch::Receiver { self.blocks_sender.subscribe() } } #[async_trait] impl WriteBlockStore for InMemoryStorage { - async fn put_block(&self, _ctx: &ctx::Ctx, block: &FinalBlock) -> ctx::Result<()> { + /// Just verifies that the payload is for the successor of the current head. + async fn verify_payload( + &self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + _payload: &validator::Payload, + ) -> ctx::Result<()> { + let head_number = self.head_block(ctx).await?.header.number; + if head_number >= block_number { + return Err(anyhow::anyhow!( + "received proposal for block {block_number:?}, while head is at {head_number:?}" + ) + .into()); + } + Ok(()) + } + + async fn put_block(&self, _ctx: &ctx::Ctx, block: &validator::FinalBlock) -> ctx::Result<()> { self.blocks.lock().await.put_block(block.clone()); self.blocks_sender.send_replace(block.header.number); Ok(()) diff --git a/node/libs/storage/src/replica_state.rs b/node/libs/storage/src/replica_state.rs index 23413fc1..5daf2a82 100644 --- a/node/libs/storage/src/replica_state.rs +++ b/node/libs/storage/src/replica_state.rs @@ -20,7 +20,7 @@ impl From for ReplicaState { } } -/// [`ReplicaStateStore`] wrapper that falls back to a specified block store. +/// Storage combining [`ReplicaStateStore`] and [`WriteBlockStore`]. #[derive(Debug, Clone)] pub struct ReplicaStore { state: Arc, @@ -64,6 +64,16 @@ impl ReplicaStore { self.state.put_replica_state(ctx, replica_state).await } + /// Verify that `payload` is a correct proposal for the block `block_number`. + pub async fn verify_payload( + &self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + payload: &validator::Payload, + ) -> ctx::Result<()> { + self.blocks.verify_payload(ctx, block_number, payload).await + } + /// Puts a block into this storage. 
pub async fn put_block( &self, diff --git a/node/libs/storage/src/rocksdb.rs b/node/libs/storage/src/rocksdb.rs index a5f7b121..6366e313 100644 --- a/node/libs/storage/src/rocksdb.rs +++ b/node/libs/storage/src/rocksdb.rs @@ -18,7 +18,7 @@ use std::{ }, }; use zksync_concurrency::{ctx, scope, sync::watch}; -use zksync_consensus_roles::validator::{BlockNumber, FinalBlock}; +use zksync_consensus_roles::validator; /// Enum used to represent a key in the database. It also acts as a separator between different stores. #[derive(Debug, Clone, PartialEq, Eq)] @@ -27,8 +27,8 @@ enum DatabaseKey { /// ReplicaState -> ReplicaState ReplicaState, /// Key used to store the finalized blocks. - /// Block(BlockNumber) -> FinalBlock - Block(BlockNumber), + /// Block(validator::BlockNumber) -> validator::FinalBlock + Block(validator::BlockNumber), } impl DatabaseKey { @@ -57,11 +57,11 @@ impl DatabaseKey { } /// Parses the specified bytes as a `Self::Block(_)` key. - pub(crate) fn parse_block_key(raw_key: &[u8]) -> anyhow::Result { + pub(crate) fn parse_block_key(raw_key: &[u8]) -> anyhow::Result { let raw_key = raw_key .try_into() .context("Invalid encoding for block key")?; - Ok(BlockNumber(u64::from_be_bytes(raw_key))) + Ok(validator::BlockNumber(u64::from_be_bytes(raw_key))) } } @@ -79,7 +79,7 @@ pub struct RocksdbStorage { /// that blocks are never removed from the DB. cached_last_contiguous_block_number: AtomicU64, /// Sender of numbers of written blocks. - block_writes_sender: watch::Sender, + block_writes_sender: watch::Sender, } impl RocksdbStorage { @@ -87,7 +87,11 @@ impl RocksdbStorage { /// a new one. We need the genesis block of the chain as input. // TODO(bruno): we want to eventually start pruning old blocks, so having the genesis // block might be unnecessary. - pub async fn new(ctx: &ctx::Ctx, genesis_block: &FinalBlock, path: &Path) -> ctx::Result { + pub async fn new( + ctx: &ctx::Ctx, + genesis_block: &validator::FinalBlock, + path: &Path, + ) -> ctx::Result { let mut options = rocksdb::Options::default(); options.create_missing_column_families(true); options.create_if_missing(true); @@ -127,7 +131,7 @@ impl RocksdbStorage { self.inner.write().expect("DB lock is poisoned") } - fn head_block_blocking(&self) -> anyhow::Result { + fn head_block_blocking(&self) -> anyhow::Result { let db = self.read(); let mut options = ReadOptions::default(); @@ -141,7 +145,7 @@ impl RocksdbStorage { } /// Returns a block with the least number stored in this database. - fn first_block_blocking(&self) -> anyhow::Result { + fn first_block_blocking(&self) -> anyhow::Result { let db = self.read(); let mut options = ReadOptions::default(); @@ -154,11 +158,11 @@ impl RocksdbStorage { zksync_protobuf::decode(&first_block).context("Failed decoding first stored block bytes") } - fn last_contiguous_block_number_blocking(&self) -> anyhow::Result { + fn last_contiguous_block_number_blocking(&self) -> anyhow::Result { let last_contiguous_block_number = self .cached_last_contiguous_block_number .load(Ordering::Relaxed); - let last_contiguous_block_number = BlockNumber(last_contiguous_block_number); + let last_contiguous_block_number = validator::BlockNumber(last_contiguous_block_number); let last_contiguous_block_number = self.last_contiguous_block_number_impl(last_contiguous_block_number)?; @@ -175,8 +179,8 @@ impl RocksdbStorage { // is for the `cached_last_contiguous_block_number` to be present in the database. 
fn last_contiguous_block_number_impl( &self, - cached_last_contiguous_block_number: BlockNumber, - ) -> anyhow::Result { + cached_last_contiguous_block_number: validator::BlockNumber, + ) -> anyhow::Result { let db = self.read(); let mut options = ReadOptions::default(); @@ -202,7 +206,10 @@ impl RocksdbStorage { } /// Gets a block by its number. - fn block_blocking(&self, number: BlockNumber) -> anyhow::Result> { + fn block_blocking( + &self, + number: validator::BlockNumber, + ) -> anyhow::Result> { let db = self.read(); let Some(raw_block) = db @@ -219,8 +226,8 @@ impl RocksdbStorage { /// Iterates over block numbers in the specified `range` that the DB *does not* have. fn missing_block_numbers_blocking( &self, - range: ops::Range, - ) -> anyhow::Result> { + range: ops::Range, + ) -> anyhow::Result> { let db = self.read(); let mut options = ReadOptions::default(); @@ -242,7 +249,7 @@ impl RocksdbStorage { // ---------------- Write methods ---------------- /// Insert a new block into the database. - fn put_block_blocking(&self, finalized_block: &FinalBlock) -> anyhow::Result<()> { + fn put_block_blocking(&self, finalized_block: &validator::FinalBlock) -> anyhow::Result<()> { let db = self.write(); let block_number = finalized_block.header.number; tracing::debug!("Inserting new block #{block_number} into the database."); @@ -292,38 +299,62 @@ impl fmt::Debug for RocksdbStorage { #[async_trait] impl BlockStore for RocksdbStorage { - async fn head_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn head_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { Ok(scope::wait_blocking(|| self.head_block_blocking()).await?) } - async fn first_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn first_block(&self, _ctx: &ctx::Ctx) -> ctx::Result { Ok(scope::wait_blocking(|| self.first_block_blocking()).await?) } - async fn last_contiguous_block_number(&self, _ctx: &ctx::Ctx) -> ctx::Result { + async fn last_contiguous_block_number( + &self, + _ctx: &ctx::Ctx, + ) -> ctx::Result { Ok(scope::wait_blocking(|| self.last_contiguous_block_number_blocking()).await?) } - async fn block(&self, _ctx: &ctx::Ctx, number: BlockNumber) -> ctx::Result> { + async fn block( + &self, + _ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result> { Ok(scope::wait_blocking(|| self.block_blocking(number)).await?) } async fn missing_block_numbers( &self, _ctx: &ctx::Ctx, - range: ops::Range, - ) -> ctx::Result> { + range: ops::Range, + ) -> ctx::Result> { Ok(scope::wait_blocking(|| self.missing_block_numbers_blocking(range)).await?) } - fn subscribe_to_block_writes(&self) -> watch::Receiver { + fn subscribe_to_block_writes(&self) -> watch::Receiver { self.block_writes_sender.subscribe() } } #[async_trait] impl WriteBlockStore for RocksdbStorage { - async fn put_block(&self, _ctx: &ctx::Ctx, block: &FinalBlock) -> ctx::Result<()> { + /// Just verifies that the payload is for the successor of the current head. + async fn verify_payload( + &self, + ctx: &ctx::Ctx, + block_number: validator::BlockNumber, + _payload: &validator::Payload, + ) -> ctx::Result<()> { + let head_number = self.head_block(ctx).await?.header.number; + if head_number >= block_number { + return Err(anyhow::anyhow!( + "received proposal for block {block_number:?}, while head is at {head_number:?}" + ) + .into()); + } + Ok(()) + } + + async fn put_block(&self, _ctx: &ctx::Ctx, block: &validator::FinalBlock) -> ctx::Result<()> { Ok(scope::wait_blocking(|| self.put_block_blocking(block)).await?) 
} } diff --git a/node/libs/storage/src/tests/mod.rs b/node/libs/storage/src/tests/mod.rs index 229209d1..ffcb4744 100644 --- a/node/libs/storage/src/tests/mod.rs +++ b/node/libs/storage/src/tests/mod.rs @@ -31,7 +31,7 @@ impl InitStore for () { fn genesis_block(rng: &mut impl Rng) -> FinalBlock { let payload = Payload(vec![]); FinalBlock { - header: BlockHeader::genesis(payload.hash()), + header: BlockHeader::genesis(payload.hash(), BlockNumber(0)), payload, justification: rng.gen(), } diff --git a/node/libs/storage/src/traits.rs b/node/libs/storage/src/traits.rs index b5f90c73..83e069f6 100644 --- a/node/libs/storage/src/traits.rs +++ b/node/libs/storage/src/traits.rs @@ -1,9 +1,9 @@ //! Traits for storage. use crate::types::ReplicaState; use async_trait::async_trait; -use std::{fmt, ops, sync::Arc}; +use std::{fmt, ops}; use zksync_concurrency::{ctx, sync::watch}; -use zksync_consensus_roles::validator::{BlockNumber, FinalBlock}; +use zksync_consensus_roles::validator::{BlockNumber, FinalBlock, Payload}; /// Storage of L2 blocks. /// @@ -45,53 +45,23 @@ pub trait BlockStore: fmt::Debug + Send + Sync { fn subscribe_to_block_writes(&self) -> watch::Receiver; } -#[async_trait] -impl BlockStore for Arc { - async fn head_block(&self, ctx: &ctx::Ctx) -> ctx::Result { - (**self).head_block(ctx).await - } - - async fn first_block(&self, ctx: &ctx::Ctx) -> ctx::Result { - (**self).first_block(ctx).await - } - - async fn last_contiguous_block_number(&self, ctx: &ctx::Ctx) -> ctx::Result { - (**self).last_contiguous_block_number(ctx).await - } - - async fn block(&self, ctx: &ctx::Ctx, number: BlockNumber) -> ctx::Result> { - (**self).block(ctx, number).await - } - - async fn missing_block_numbers( - &self, - ctx: &ctx::Ctx, - range: ops::Range, - ) -> ctx::Result> { - (**self).missing_block_numbers(ctx, range).await - } - - fn subscribe_to_block_writes(&self) -> watch::Receiver { - (**self).subscribe_to_block_writes() - } -} - /// Mutable storage of L2 blocks. /// -/// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. +/// Implementations **must** propagate context cancellation using [`ctx::Error::Canceled`]. #[async_trait] pub trait WriteBlockStore: BlockStore { + /// Verify that `payload` is a correct proposal for the block `block_number`. + async fn verify_payload( + &self, + ctx: &ctx::Ctx, + block_number: BlockNumber, + _payload: &Payload, + ) -> ctx::Result<()>; + /// Puts a block into this storage. async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> ctx::Result<()>; } -#[async_trait] -impl WriteBlockStore for Arc { - async fn put_block(&self, ctx: &ctx::Ctx, block: &FinalBlock) -> ctx::Result<()> { - (**self).put_block(ctx, block).await - } -} - /// Storage for [`ReplicaState`]. /// /// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. 
@@ -107,18 +77,3 @@ pub trait ReplicaStateStore: fmt::Debug + Send + Sync { replica_state: &ReplicaState, ) -> ctx::Result<()>; } - -#[async_trait] -impl ReplicaStateStore for Arc { - async fn replica_state(&self, ctx: &ctx::Ctx) -> ctx::Result> { - (**self).replica_state(ctx).await - } - - async fn put_replica_state( - &self, - ctx: &ctx::Ctx, - replica_state: &ReplicaState, - ) -> ctx::Result<()> { - (**self).put_replica_state(ctx, replica_state).await - } -} diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs index 3b5461ed..bbeffebf 100644 --- a/node/tools/src/bin/localnet_config.rs +++ b/node/tools/src/bin/localnet_config.rs @@ -71,6 +71,7 @@ fn main() -> anyhow::Result<()> { &validator_keys, protocol_version, validator::Payload(vec![]), + validator::BlockNumber(0), ); // Each node will have `gossip_peers` outbound peers. diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index d4e8dfd6..b869f613 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -111,7 +111,12 @@ async fn main() -> anyhow::Result<()> { .context("Executor::new()")?; if let Some((consensus_config, validator_key)) = configs.consensus { executor - .set_validator(consensus_config, validator_key, storage.clone()) + .set_validator( + consensus_config, + validator_key, + storage.clone(), + Arc::new(zksync_consensus_bft::testonly::RandomPayloadSource), + ) .context("Executor::set_validator()")?; }
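Note on the new `PayloadSource` hook: the leader no longer fills proposals with random bytes in `process_replica_prepare`; it asks an injected payload source, and `RandomPayloadSource` above is only the testonly implementation. Below is a minimal sketch of a non-random implementation, assuming `propose` returns `ctx::Result<validator::Payload>` as implied by the `RandomPayloadSource` impl above; the name `EmptyPayloadSource` is purely illustrative.

    use zksync_concurrency::ctx;
    use zksync_consensus_bft::PayloadSource;
    use zksync_consensus_roles::validator;

    /// Illustrative payload source that always proposes an empty payload.
    struct EmptyPayloadSource;

    #[async_trait::async_trait]
    impl PayloadSource for EmptyPayloadSource {
        async fn propose(
            &self,
            _ctx: &ctx::Ctx,
            _block_number: validator::BlockNumber,
        ) -> ctx::Result<validator::Payload> {
            // A production implementation would query the execution layer / mempool here.
            Ok(validator::Payload(vec![]))
        }
    }

Wiring it into the executor then mirrors the `tools/src/main.rs` hunk above (same variable names as in that hunk):

    executor
        .set_validator(
            consensus_config,
            validator_key,
            storage.clone(),
            std::sync::Arc::new(EmptyPayloadSource),
        )
        .context("Executor::set_validator()")?;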
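`BlockHeader::genesis` now takes the starting block number explicitly instead of hard-coding `BlockNumber(0)`, which is why `make_genesis`, the executor test configs and `localnet_config` all grow an extra `BlockNumber(0)` argument. A small illustration of the new signature, e.g. for a chain whose genesis header starts at a snapshot height (`4242` is an arbitrary example value, and `snapshot_genesis_header` is a hypothetical helper):

    use zksync_consensus_roles::validator::{BlockHeader, BlockNumber, Payload};

    fn snapshot_genesis_header() -> BlockHeader {
        let payload = Payload(vec![]);
        // Only the starting number changes; the parent hash is still the default one.
        BlockHeader::genesis(payload.hash(), BlockNumber(4242))
    }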