diff --git a/node/actors/bft/src/config.rs b/node/actors/bft/src/config.rs index 1399fc92..beeb32ea 100644 --- a/node/actors/bft/src/config.rs +++ b/node/actors/bft/src/config.rs @@ -1,9 +1,9 @@ //! The inner data of the consensus state machine. This is shared between the different roles. use crate::{misc, PayloadManager}; use std::sync::Arc; -use zksync_consensus_storage as storage; use tracing::instrument; use zksync_consensus_roles::validator; +use zksync_consensus_storage as storage; /// Configuration of the bft actor. #[derive(Debug)] diff --git a/node/actors/bft/src/leader/state_machine.rs b/node/actors/bft/src/leader/state_machine.rs index 80413b44..0d65ea6a 100644 --- a/node/actors/bft/src/leader/state_machine.rs +++ b/node/actors/bft/src/leader/state_machine.rs @@ -104,7 +104,7 @@ impl StateMachine { ctx: &ctx::Ctx, config: &Config, mut prepare_qc: sync::watch::Receiver>, - pipe: &OutputPipe, + pipe: &OutputPipe, ) -> ctx::Result<()> { let mut next_view = validator::ViewNumber(0); loop { @@ -158,7 +158,8 @@ impl StateMachine { Some(proposal) if proposal != highest_qc.message.proposal => (proposal, None), // The previous block was finalized, so we can propose a new block. _ => { - let payload = cfg.payload_manager + let payload = cfg + .payload_manager .propose(ctx, highest_qc.message.proposal.number.next()) .await?; metrics::METRICS @@ -173,7 +174,7 @@ impl StateMachine { // ----------- Prepare our message and send it -------------- // Broadcast the leader prepare message to all replicas (ourselves included). - let msg = cfg + let msg = cfg .secret_key .sign_msg(validator::ConsensusMsg::LeaderPrepare( validator::LeaderPrepare { diff --git a/node/actors/bft/src/leader/tests.rs b/node/actors/bft/src/leader/tests.rs index 43dc8f4f..41f1ee6f 100644 --- a/node/actors/bft/src/leader/tests.rs +++ b/node/actors/bft/src/leader/tests.rs @@ -5,30 +5,32 @@ use crate::testonly::ut_harness::UTHarness; use assert_matches::assert_matches; use pretty_assertions::assert_eq; use rand::Rng; -use zksync_concurrency::{ctx,scope}; +use zksync_concurrency::{ctx, scope}; use zksync_consensus_roles::validator::{self, LeaderCommit, Phase, ViewNumber}; #[tokio::test] async fn replica_prepare_sanity() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); - + util.new_leader_prepare(ctx).await; Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_prepare_sanity_yield_leader_prepare() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let replica_prepare = util.new_replica_prepare(|_| {}); let leader_prepare = util .process_replica_prepare(ctx, replica_prepare.clone()) @@ -49,17 +51,19 @@ async fn replica_prepare_sanity_yield_leader_prepare() { util.new_prepare_qc(|msg| *msg = replica_prepare.msg) ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_prepare_sanity_yield_leader_prepare_reproposal() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let 
(mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); - + util.new_replica_commit(ctx).await; util.process_replica_timeout(ctx).await; let replica_prepare = util.new_replica_prepare(|_| {}).msg; @@ -81,7 +85,9 @@ async fn replica_prepare_sanity_yield_leader_prepare_reproposal() { assert_eq!(map.len(), 1); assert_eq!(*map.first_key_value().unwrap().0, replica_prepare); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] @@ -112,8 +118,8 @@ async fn replica_prepare_incompatible_protocol_version() { async fn replica_prepare_non_validator_signer() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let replica_prepare = util.new_replica_prepare(|_| {}).msg; @@ -128,17 +134,19 @@ async fn replica_prepare_non_validator_signer() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_prepare_old_view() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let replica_prepare = util.new_replica_prepare(|_| {}); util.leader.view = util.replica.view.next(); util.leader.phase = Phase::Prepare; @@ -151,17 +159,19 @@ async fn replica_prepare_old_view() { }) ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_prepare_during_commit() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let replica_prepare = util.new_replica_prepare(|_| {}); util.leader.view = util.replica.view; util.leader.phase = Phase::Commit; @@ -176,15 +186,17 @@ async fn replica_prepare_during_commit() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_prepare_not_leader_in_view() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,2).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); let replica_prepare = util.new_replica_prepare(|msg| { @@ -194,15 +206,17 @@ async fn replica_prepare_not_leader_in_view() { let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!(res, Err(ReplicaPrepareError::NotLeaderInView)); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_prepare_already_exists() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,2).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); util.set_owner_as_view_leader(); @@ -222,15 +236,17 @@ async fn 
replica_prepare_already_exists() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_prepare_num_received_below_threshold() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,2).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); util.set_owner_as_view_leader(); @@ -241,15 +257,17 @@ async fn replica_prepare_num_received_below_threshold() { .unwrap() .is_none()); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_prepare_invalid_sig() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let mut replica_prepare = util.new_replica_prepare(|_| {}); @@ -257,32 +275,36 @@ async fn replica_prepare_invalid_sig() { let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!(res, Err(ReplicaPrepareError::InvalidSignature(_))); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_prepare_invalid_commit_qc() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let replica_prepare = util.new_replica_prepare(|msg| msg.high_qc = ctx.rng().gen()); let res = util.process_replica_prepare(ctx, replica_prepare).await; assert_matches!(res, Err(ReplicaPrepareError::InvalidHighQC(..))); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_prepare_high_qc_of_current_view() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let view = ViewNumber(1); let qc_view = ViewNumber(1); util.set_view(view); @@ -297,17 +319,19 @@ async fn replica_prepare_high_qc_of_current_view() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_prepare_high_qc_of_future_view() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let view = ViewNumber(1); let qc_view = ViewNumber(2); util.set_view(view); @@ -322,28 +346,32 @@ async fn replica_prepare_high_qc_of_future_view() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_commit_sanity() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); 
util.new_leader_commit(ctx).await; Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_commit_sanity_yield_leader_commit() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let replica_commit = util.new_replica_commit(ctx).await; @@ -363,7 +391,9 @@ async fn replica_commit_sanity_yield_leader_commit() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] @@ -373,7 +403,7 @@ async fn replica_commit_incompatible_protocol_version() { scope::run!(ctx, |ctx,s| async { let (mut util,runner) = UTHarness::new(ctx,1).await; s.spawn_bg(runner.run(ctx)); - + let incompatible_protocol_version = util.incompatible_protocol_version(); let mut replica_commit = util.new_replica_commit(ctx).await.msg; replica_commit.protocol_version = incompatible_protocol_version; @@ -395,8 +425,8 @@ async fn replica_commit_incompatible_protocol_version() { async fn replica_commit_non_validator_signer() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let replica_commit = util.new_replica_commit(ctx).await.msg; @@ -411,17 +441,19 @@ async fn replica_commit_non_validator_signer() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_commit_old() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let mut replica_commit = util.new_replica_commit(ctx).await.msg; replica_commit.view = util.replica.view.prev(); let replica_commit = util.owner_key().sign_msg(replica_commit); @@ -434,17 +466,19 @@ async fn replica_commit_old() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_commit_not_leader_in_view() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,2).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); - + let current_view_leader = util.view_leader(util.replica.view); assert_ne!(current_view_leader, util.owner_key().public()); @@ -452,17 +486,19 @@ async fn replica_commit_not_leader_in_view() { let res = util.process_replica_commit(ctx, replica_commit).await; assert_matches!(res, Err(ReplicaCommitError::NotLeaderInView)); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_commit_already_exists() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,2).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); - + let replica_commit = util.new_replica_commit(ctx).await; assert!(util 
.process_replica_commit(ctx, replica_commit.clone()) @@ -479,17 +515,19 @@ async fn replica_commit_already_exists() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_commit_num_received_below_threshold() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,2).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); - + let replica_prepare = util.new_replica_prepare(|_| {}); assert!(util .process_replica_prepare(ctx, replica_prepare.clone()) @@ -510,15 +548,17 @@ async fn replica_commit_num_received_below_threshold() { .await .unwrap(); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_commit_invalid_sig() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let mut replica_commit = util.new_current_replica_commit(|_| {}); @@ -526,15 +566,17 @@ async fn replica_commit_invalid_sig() { let res = util.process_replica_commit(ctx, replica_commit).await; assert_matches!(res, Err(ReplicaCommitError::InvalidSignature(..))); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn replica_commit_unexpected_proposal() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let replica_commit = util.new_current_replica_commit(|_| {}); @@ -542,5 +584,7 @@ async fn replica_commit_unexpected_proposal() { .await .unwrap(); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs index 6801aab4..45897508 100644 --- a/node/actors/bft/src/lib.rs +++ b/node/actors/bft/src/lib.rs @@ -37,11 +37,20 @@ pub const PROTOCOL_VERSION: validator::ProtocolVersion = validator::ProtocolVers /// Payload proposal and verification trait. #[async_trait::async_trait] -pub trait PayloadManager : std::fmt::Debug + Send + Sync { +pub trait PayloadManager: std::fmt::Debug + Send + Sync { /// Used by leader to propose a payload for the next block. - async fn propose(&self, ctx: &ctx::Ctx, number: validator::BlockNumber) -> ctx::Result; + async fn propose( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result; /// Used by replica to verify a payload for the next block proposed by the leader. 
- async fn verify(&self, ctx: &ctx::Ctx, number: validator::BlockNumber, payload: &validator::Payload) -> ctx::Result<()>; + async fn verify( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + payload: &validator::Payload, + ) -> ctx::Result<()>; } pub(crate) type OutputPipe = ctx::channel::UnboundedSender; @@ -56,7 +65,8 @@ impl Config { ) -> anyhow::Result<()> { let cfg = Arc::new(self); let res = scope::run!(ctx, |ctx, s| async { - let mut replica = replica::StateMachine::start(ctx, cfg.clone(), pipe.send.clone()).await?; + let mut replica = + replica::StateMachine::start(ctx, cfg.clone(), pipe.send.clone()).await?; let mut leader = leader::StateMachine::new(ctx, cfg.clone(), pipe.send.clone()); s.spawn_bg(leader::StateMachine::run_proposer( diff --git a/node/actors/bft/src/replica/state_machine.rs b/node/actors/bft/src/replica/state_machine.rs index 22882256..7feb3f69 100644 --- a/node/actors/bft/src/replica/state_machine.rs +++ b/node/actors/bft/src/replica/state_machine.rs @@ -1,4 +1,4 @@ -use crate::{Config, metrics, OutputPipe}; +use crate::{metrics, Config, OutputPipe}; use std::{ collections::{BTreeMap, HashMap}, sync::Arc, @@ -40,7 +40,12 @@ impl StateMachine { ) -> ctx::Result { let backup = match config.replica_store.state(ctx).await? { Some(backup) => backup, - None => config.block_store.last_block(ctx).await?.justification.into(), + None => config + .block_store + .last_block(ctx) + .await? + .justification + .into(), }; let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new(); for proposal in backup.proposals { @@ -128,7 +133,8 @@ impl StateMachine { high_qc: self.high_qc.clone(), proposals, }; - self.config.replica_store + self.config + .replica_store .set_state(ctx, &backup) .await .wrap("put_replica_state")?; diff --git a/node/actors/bft/src/replica/tests.rs b/node/actors/bft/src/replica/tests.rs index 9b3ab59b..43d7797a 100644 --- a/node/actors/bft/src/replica/tests.rs +++ b/node/actors/bft/src/replica/tests.rs @@ -1,8 +1,8 @@ use super::{leader_commit, leader_prepare}; -use crate::{Config, testonly, testonly::ut_harness::UTHarness}; +use crate::{testonly, testonly::ut_harness::UTHarness, Config}; use assert_matches::assert_matches; use rand::Rng; -use zksync_concurrency::{ctx,scope}; +use zksync_concurrency::{ctx, scope}; use zksync_consensus_roles::validator::{ self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare, ViewNumber, }; @@ -11,8 +11,8 @@ use zksync_consensus_roles::validator::{ async fn leader_prepare_sanity() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); let leader_prepare = util.new_leader_prepare(ctx).await; @@ -20,17 +20,19 @@ async fn leader_prepare_sanity() { .await .unwrap(); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test] async fn leader_prepare_reproposal_sanity() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); - + util.new_replica_commit(ctx).await; util.process_replica_timeout(ctx).await; let leader_prepare = util.new_leader_prepare(ctx).await; @@ -39,15 +41,16 @@ 
async fn leader_prepare_reproposal_sanity() { .await .unwrap(); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_incompatible_protocol_version() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { + scope::run!(ctx, |ctx,s| async { let (mut util,runner) = UTHarness::new(ctx,1).await; s.spawn_bg(runner.run(ctx)); @@ -68,13 +71,12 @@ async fn leader_prepare_incompatible_protocol_version() { }).await.unwrap(); } - #[tokio::test] async fn leader_prepare_sanity_yield_replica_commit() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let leader_prepare = util.new_leader_prepare(ctx).await; @@ -91,16 +93,17 @@ async fn leader_prepare_sanity_yield_replica_commit() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_invalid_leader() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,2).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); let view = ViewNumber(2); @@ -134,16 +137,17 @@ async fn leader_prepare_invalid_leader() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_old_view() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; @@ -158,17 +162,19 @@ async fn leader_prepare_old_view() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - /// Tests that `WriteBlockStore::verify_payload` is applied before signing a vote. 
#[tokio::test] async fn leader_prepare_invalid_payload() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_with_payload(ctx,1,Box::new(testonly::RejectPayload)).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = + UTHarness::new_with_payload(ctx, 1, Box::new(testonly::RejectPayload)).await; s.spawn_bg(runner.run(ctx)); let leader_prepare = util.new_leader_prepare(ctx).await; @@ -186,21 +192,27 @@ async fn leader_prepare_invalid_payload() { ) .unwrap(), }; - util.replica.config.block_store.store_block(ctx, block).await.unwrap(); + util.replica + .config + .block_store + .store_block(ctx, block) + .await + .unwrap(); let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(leader_prepare::Error::ProposalInvalidPayload(..))); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_invalid_sig() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let mut leader_prepare = util.new_leader_prepare(ctx).await; @@ -208,54 +220,57 @@ async fn leader_prepare_invalid_sig() { let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(leader_prepare::Error::InvalidSignature(..))); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_invalid_prepare_qc() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; leader_prepare.justification = ctx.rng().gen(); let leader_prepare = util.owner_key().sign_msg(leader_prepare); let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(leader_prepare::Error::InvalidPrepareQC(_))); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_invalid_high_qc() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; leader_prepare.justification = util.new_prepare_qc(|msg| msg.high_qc = ctx.rng().gen()); let leader_prepare = util.owner_key().sign_msg(leader_prepare); let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(leader_prepare::Error::InvalidHighQC(_))); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_proposal_oversized_payload() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let 
payload_oversize = Config::PAYLOAD_MAX_SIZE + 1; let payload_vec = vec![0; payload_oversize]; let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; @@ -272,36 +287,38 @@ async fn leader_prepare_proposal_oversized_payload() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_proposal_mismatched_payload() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; leader_prepare.proposal_payload = Some(ctx.rng().gen()); let leader_prepare = util.owner_key().sign_msg(leader_prepare); let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(leader_prepare::Error::ProposalMismatchedPayload)); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_proposal_when_previous_not_finalized() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let replica_prepare = util.new_replica_prepare(|_| {}); let mut leader_prepare = util .process_replica_prepare(ctx, replica_prepare) @@ -317,16 +334,17 @@ async fn leader_prepare_proposal_when_previous_not_finalized() { Err(leader_prepare::Error::ProposalWhenPreviousNotFinalized) ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_proposal_invalid_parent_hash() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let replica_prepare = util.new_replica_prepare(|_| {}); @@ -354,18 +372,19 @@ async fn leader_prepare_proposal_invalid_parent_hash() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_proposal_non_sequential_number() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { + scope::run!(ctx, |ctx,s| async { let (mut util,runner) = UTHarness::new(ctx,1).await; s.spawn_bg(runner.run(ctx)); - + let replica_prepare = util.new_replica_prepare(|_| {}); let mut leader_prepare = util .process_replica_prepare(ctx, replica_prepare.clone()) @@ -394,16 +413,15 @@ async fn leader_prepare_proposal_non_sequential_number() { }).await.unwrap(); } - #[tokio::test] async fn leader_prepare_reproposal_without_quorum() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); - + let replica_prepare = util.new_replica_prepare(|_| {}).msg; let mut leader_prepare = util .process_replica_prepare_all(ctx, replica_prepare.clone()) @@ -428,16 +446,17 @@ async fn 
leader_prepare_reproposal_without_quorum() { let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(leader_prepare::Error::ReproposalWithoutQuorum)); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_reproposal_when_finalized() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; @@ -446,16 +465,17 @@ async fn leader_prepare_reproposal_when_finalized() { let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(leader_prepare::Error::ReproposalWhenFinalized)); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_prepare_reproposal_invalid_block() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); let mut leader_prepare = util.new_leader_prepare(ctx).await.msg; @@ -465,16 +485,17 @@ async fn leader_prepare_reproposal_invalid_block() { let res = util.process_leader_prepare(ctx, leader_prepare).await; assert_matches!(res, Err(leader_prepare::Error::ReproposalInvalidBlock)); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_commit_sanity() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); let leader_commit = util.new_leader_commit(ctx).await; @@ -482,18 +503,19 @@ async fn leader_commit_sanity() { .await .unwrap(); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_commit_sanity_yield_replica_prepare() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let leader_commit = util.new_leader_commit(ctx).await; let replica_prepare = util .process_leader_commit(ctx, leader_commit.clone()) @@ -509,18 +531,19 @@ async fn leader_commit_sanity_yield_replica_prepare() { } ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_commit_incompatible_protocol_version() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { + scope::run!(ctx, |ctx,s| async { let (mut util,runner) = UTHarness::new(ctx,1).await; s.spawn_bg(runner.run(ctx)); - + let incompatible_protocol_version = util.incompatible_protocol_version(); let mut leader_commit = util.new_leader_commit(ctx).await.msg; leader_commit.protocol_version = incompatible_protocol_version; @@ -538,15 +561,14 @@ async fn leader_commit_incompatible_protocol_version() { }).await.unwrap(); } - #[tokio::test] async fn leader_commit_invalid_leader() { 
zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,2).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); - + let current_view_leader = util.view_leader(util.replica.view); assert_ne!(current_view_leader, util.owner_key().public()); @@ -556,37 +578,39 @@ async fn leader_commit_invalid_leader() { .await; assert_matches!(res, Err(leader_commit::Error::InvalidLeader { .. })); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_commit_invalid_sig() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let mut leader_commit = util.new_leader_commit(ctx).await; leader_commit.sig = rng.gen(); let res = util.process_leader_commit(ctx, leader_commit).await; assert_matches!(res, Err(leader_commit::Error::InvalidSignature { .. })); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - #[tokio::test] async fn leader_commit_invalid_commit_qc() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; s.spawn_bg(runner.run(ctx)); - + let mut leader_commit = util.new_leader_commit(ctx).await.msg; leader_commit.justification = rng.gen(); let res = util @@ -594,6 +618,7 @@ async fn leader_commit_invalid_commit_qc() { .await; assert_matches!(res, Err(leader_commit::Error::InvalidJustification { .. })); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - diff --git a/node/actors/bft/src/testonly/make.rs b/node/actors/bft/src/testonly/make.rs index 1395e9dd..61506568 100644 --- a/node/actors/bft/src/testonly/make.rs +++ b/node/actors/bft/src/testonly/make.rs @@ -1,8 +1,8 @@ //! This module contains utilities that are only meant for testing purposes. -use crate::{PayloadManager,Config}; +use crate::{Config, PayloadManager}; +use rand::Rng as _; use zksync_concurrency::ctx; use zksync_consensus_roles::validator; -use rand::Rng as _; /// Produces random payload. #[derive(Debug)] @@ -10,12 +10,23 @@ pub struct RandomPayload; #[async_trait::async_trait] impl PayloadManager for RandomPayload { - async fn propose(&self, ctx: &ctx::Ctx, _number: validator::BlockNumber) -> ctx::Result { - let mut payload = validator::Payload(vec![0;Config::PAYLOAD_MAX_SIZE]); + async fn propose( + &self, + ctx: &ctx::Ctx, + _number: validator::BlockNumber, + ) -> ctx::Result { + let mut payload = validator::Payload(vec![0; Config::PAYLOAD_MAX_SIZE]); ctx.rng().fill(&mut payload.0[..]); Ok(payload) } - async fn verify(&self, _ctx: &ctx::Ctx, _number: validator::BlockNumber, _payload: &validator::Payload) -> ctx::Result<()> { Ok(()) } + async fn verify( + &self, + _ctx: &ctx::Ctx, + _number: validator::BlockNumber, + _payload: &validator::Payload, + ) -> ctx::Result<()> { + Ok(()) + } } /// propose() blocks indefinitely. 
@@ -24,12 +35,23 @@ pub struct PendingPayload; #[async_trait::async_trait] impl PayloadManager for PendingPayload { - async fn propose(&self, ctx: &ctx::Ctx, _number: validator::BlockNumber) -> ctx::Result { + async fn propose( + &self, + ctx: &ctx::Ctx, + _number: validator::BlockNumber, + ) -> ctx::Result { ctx.canceled().await; Err(ctx::Canceled.into()) } - async fn verify(&self, _ctx: &ctx::Ctx, _number: validator::BlockNumber, _payload: &validator::Payload) -> ctx::Result<()> { Ok(()) } + async fn verify( + &self, + _ctx: &ctx::Ctx, + _number: validator::BlockNumber, + _payload: &validator::Payload, + ) -> ctx::Result<()> { + Ok(()) + } } /// verify() doesn't accept any payload. @@ -38,11 +60,20 @@ pub struct RejectPayload; #[async_trait::async_trait] impl PayloadManager for RejectPayload { - async fn propose(&self, _ctx: &ctx::Ctx, _number: validator::BlockNumber) -> ctx::Result { + async fn propose( + &self, + _ctx: &ctx::Ctx, + _number: validator::BlockNumber, + ) -> ctx::Result { Ok(validator::Payload(vec![])) } - async fn verify(&self, _ctx: &ctx::Ctx, _number: validator::BlockNumber, _payload: &validator::Payload) -> ctx::Result<()> { + async fn verify( + &self, + _ctx: &ctx::Ctx, + _number: validator::BlockNumber, + _payload: &validator::Payload, + ) -> ctx::Result<()> { Err(anyhow::anyhow!("invalid payload").into()) } } diff --git a/node/actors/bft/src/testonly/node.rs b/node/actors/bft/src/testonly/node.rs index 3b5fc1bc..20c08b5f 100644 --- a/node/actors/bft/src/testonly/node.rs +++ b/node/actors/bft/src/testonly/node.rs @@ -1,5 +1,5 @@ use super::Fuzz; -use crate::{io, testonly,PayloadManager}; +use crate::{io, testonly, PayloadManager}; use rand::Rng; use std::sync::Arc; use zksync_concurrency::{ctx, scope}; diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index d191dc2c..2d87a536 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -1,12 +1,12 @@ use super::{Behavior, Node}; -use crate::{Config,testonly}; +use crate::{testonly, Config}; use anyhow::Context; use std::{collections::HashMap, sync::Arc}; use tracing::Instrument as _; use zksync_concurrency::{ctx, oneshot, scope, signal}; use zksync_consensus_network as network; use zksync_consensus_roles::validator; -use zksync_consensus_storage::{BlockStore,testonly::in_memory}; +use zksync_consensus_storage::{testonly::in_memory, BlockStore}; use zksync_consensus_utils::pipe; #[derive(Clone, Copy)] @@ -28,16 +28,16 @@ impl Test { pub(crate) async fn run(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let rng = &mut ctx.rng(); let nets: Vec<_> = network::testonly::Instance::new(rng, self.nodes.len(), 1); - let keys: Vec<_> = nets + let keys: Vec<_> = nets .iter() .map(|node| node.consensus_config().key.clone()) .collect(); let (genesis_block, _) = testonly::make_genesis(&keys, validator::Payload(vec![]), validator::BlockNumber(0)); let mut nodes = vec![]; - for (i,net) in nets.into_iter().enumerate() { + for (i, net) in nets.into_iter().enumerate() { let block_store = Box::new(in_memory::BlockStore::new(genesis_block.clone())); - let block_store = Arc::new(BlockStore::new(ctx,block_store,10).await?); + let block_store = Arc::new(BlockStore::new(ctx, block_store, 10).await?); nodes.push(Node { net, behavior: self.nodes[i], @@ -57,7 +57,9 @@ impl Test { s.spawn_bg(run_nodes(ctx, self.network, &nodes)); for n in &honest { s.spawn(async { - n.block_store.wait_for_block(ctx,validator::BlockNumber(self.blocks_to_finalize as u64)).await?; + n.block_store 
+ .wait_for_block(ctx, validator::BlockNumber(self.blocks_to_finalize as u64)) + .await?; Ok(()) }); } diff --git a/node/actors/bft/src/testonly/ut_harness.rs b/node/actors/bft/src/testonly/ut_harness.rs index c837437c..cdd1bdff 100644 --- a/node/actors/bft/src/testonly/ut_harness.rs +++ b/node/actors/bft/src/testonly/ut_harness.rs @@ -1,12 +1,10 @@ use crate::{ - testonly, - PayloadManager, io::OutputMessage, leader, leader::{ReplicaCommitError, ReplicaPrepareError}, replica, replica::{LeaderCommitError, LeaderPrepareError}, - Config, + testonly, Config, PayloadManager, }; use assert_matches::assert_matches; use rand::Rng; @@ -43,11 +41,15 @@ impl Runner { impl UTHarness { /// Creates a new `UTHarness` with the specified validator set size. - pub(crate) async fn new(ctx: &ctx::Ctx, num_validators: usize) -> (UTHarness,Runner) { - Self::new_with_payload(ctx,num_validators,Box::new(testonly::RandomPayload)).await + pub(crate) async fn new(ctx: &ctx::Ctx, num_validators: usize) -> (UTHarness, Runner) { + Self::new_with_payload(ctx, num_validators, Box::new(testonly::RandomPayload)).await } - pub(crate) async fn new_with_payload(ctx: &ctx::Ctx, num_validators: usize, payload_manager: Box) -> (UTHarness,Runner) { + pub(crate) async fn new_with_payload( + ctx: &ctx::Ctx, + num_validators: usize, + payload_manager: Box, + ) -> (UTHarness, Runner) { let mut rng = ctx.rng(); let keys: Vec<_> = (0..num_validators).map(|_| rng.gen()).collect(); let (genesis, validator_set) = @@ -55,7 +57,7 @@ impl UTHarness { // Initialize the storage. let block_store = Box::new(in_memory::BlockStore::new(genesis)); - let block_store = Arc::new(BlockStore::new(ctx,block_store,10).await.unwrap()); + let block_store = Arc::new(BlockStore::new(ctx, block_store, 10).await.unwrap()); // Create the pipe. let (send, recv) = ctx::channel::unbounded(); @@ -67,7 +69,9 @@ impl UTHarness { payload_manager, }); let leader = leader::StateMachine::new(ctx, cfg.clone(), send.clone()); - let replica = replica::StateMachine::start(ctx, cfg.clone(), send.clone()).await.unwrap(); + let replica = replica::StateMachine::start(ctx, cfg.clone(), send.clone()) + .await + .unwrap(); let mut this = UTHarness { leader, replica, @@ -75,11 +79,11 @@ impl UTHarness { keys, }; let _: Signed = this.try_recv().unwrap(); - (this,Runner(block_store)) + (this, Runner(block_store)) } /// Creates a new `UTHarness` with minimally-significant validator set size. 
- pub(crate) async fn new_many(ctx: &ctx::Ctx) -> (UTHarness,Runner) { + pub(crate) async fn new_many(ctx: &ctx::Ctx) -> (UTHarness, Runner) { let num_validators = 6; assert!(crate::misc::faulty_replicas(num_validators) > 0); UTHarness::new(ctx, num_validators).await @@ -210,15 +214,10 @@ impl UTHarness { let prepare_qc = self.leader.prepare_qc.subscribe(); self.leader.process_replica_prepare(ctx, msg).await?; if prepare_qc.has_changed().unwrap() { - let prepare_qc = prepare_qc.borrow().clone().unwrap(); - leader::StateMachine::propose( - ctx, - &self.leader.config, - prepare_qc, - &self.leader.pipe, - ) - .await - .unwrap(); + let prepare_qc = prepare_qc.borrow().clone().unwrap(); + leader::StateMachine::propose(ctx, &self.leader.config, prepare_qc, &self.leader.pipe) + .await + .unwrap(); } Ok(self.try_recv()) } diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs index 2f9e827d..0ca95e7f 100644 --- a/node/actors/bft/src/tests.rs +++ b/node/actors/bft/src/tests.rs @@ -2,7 +2,7 @@ use crate::{ misc::consensus_threshold, testonly::{ut_harness::UTHarness, Behavior, Network, Test}, }; -use zksync_concurrency::{ctx,scope}; +use zksync_concurrency::{ctx, scope}; use zksync_consensus_roles::validator::Phase; async fn run_test(behavior: Behavior, network: Network) { @@ -69,26 +69,27 @@ async fn byzantine_real_network() { async fn timeout_leader_no_prepares() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); - + util.new_replica_prepare(|_| {}); util.produce_block_after_timeout(ctx).await; Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - /// Testing liveness after the network becomes idle with leader having some cached prepare messages for the current view. #[tokio::test] async fn timeout_leader_some_prepares() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); - + let replica_prepare = util.new_replica_prepare(|_| {}); assert!(util .process_replica_prepare(ctx, replica_prepare) @@ -97,17 +98,18 @@ async fn timeout_leader_some_prepares() { .is_none()); util.produce_block_after_timeout(ctx).await; Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - /// Testing liveness after the network becomes idle with leader in commit phase. #[tokio::test] async fn timeout_leader_in_commit() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); util.new_leader_prepare(ctx).await; @@ -115,17 +117,18 @@ async fn timeout_leader_in_commit() { assert_eq!(util.leader.phase, Phase::Commit); util.produce_block_after_timeout(ctx).await; Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - /// Testing liveness after the network becomes idle with replica in commit phase. 
#[tokio::test] async fn timeout_replica_in_commit() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); util.new_replica_commit(ctx).await; @@ -133,7 +136,9 @@ async fn timeout_replica_in_commit() { assert_eq!(util.leader.phase, Phase::Commit); util.produce_block_after_timeout(ctx).await; Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } /// Testing liveness after the network becomes idle with leader having some cached commit messages for the current view. @@ -141,8 +146,8 @@ async fn timeout_replica_in_commit() { async fn timeout_leader_some_commits() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); let replica_commit = util.new_replica_commit(ctx).await; @@ -155,26 +160,28 @@ async fn timeout_leader_some_commits() { assert_eq!(util.leader_phase(), Phase::Commit); util.produce_block_after_timeout(ctx).await; Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - /// Testing liveness after the network becomes idle with leader in a consecutive prepare phase. #[tokio::test] async fn timeout_leader_in_consecutive_prepare() { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new_many(ctx).await; + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; s.spawn_bg(runner.run(ctx)); util.new_leader_commit(ctx).await; util.produce_block_after_timeout(ctx).await; Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } - /// Not being able to propose a block shouldn't cause a deadlock. #[tokio::test] async fn non_proposing_leader() { diff --git a/node/libs/roles/src/validator/messages/consensus.rs b/node/libs/roles/src/validator/messages/consensus.rs index cf611eb4..e04516d6 100644 --- a/node/libs/roles/src/validator/messages/consensus.rs +++ b/node/libs/roles/src/validator/messages/consensus.rs @@ -308,7 +308,7 @@ pub struct CommitQC { } impl CommitQC { - /// Header of the certified block. + /// Header of the certified block. pub fn header(&self) -> &BlockHeader { &self.message.proposal } diff --git a/node/libs/storage/src/block_store.rs b/node/libs/storage/src/block_store.rs index 908ebe4b..81a42665 100644 --- a/node/libs/storage/src/block_store.rs +++ b/node/libs/storage/src/block_store.rs @@ -1,10 +1,10 @@ //! Defines storage layer for finalized blocks. 
+use std::collections::BTreeMap; use std::fmt; -use zksync_concurrency::{ctx,sync}; +use zksync_concurrency::{ctx, sync}; use zksync_consensus_roles::validator; -use std::collections::BTreeMap; -#[derive(Debug,Clone)] +#[derive(Debug, Clone)] pub struct BlockStoreState { pub first: validator::CommitQC, pub last: validator::CommitQC, @@ -12,7 +12,7 @@ pub struct BlockStoreState { impl BlockStoreState { pub fn contains(&self, number: validator::BlockNumber) -> bool { - self.first.header().number <= number && number <= self.last.header().number + self.first.header().number <= number && number <= self.last.header().number } pub fn next(&self) -> validator::BlockNumber { @@ -31,14 +31,22 @@ pub trait PersistentBlockStore: fmt::Debug + Send + Sync { async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result; /// Gets a block by its number. Should return an error if block is missing. - async fn block(&self, ctx: &ctx::Ctx, number: validator::BlockNumber) -> ctx::Result; + async fn block( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result; /// Persistently store a block. /// Implementations are only required to accept a block directly after the current last block, /// so that the stored blocks always constitute a continuous range. /// Implementation should return only after the block is stored PERSISTENTLY - /// consensus liveness property depends on this behavior. - async fn store_next_block(&self, ctx: &ctx::Ctx, block: &validator::FinalBlock) -> ctx::Result<()>; + async fn store_next_block( + &self, + ctx: &ctx::Ctx, + block: &validator::FinalBlock, + ) -> ctx::Result<()>; } #[derive(Debug)] @@ -46,7 +54,7 @@ struct Inner { inmem: sync::watch::Sender, persisted: BlockStoreState, // TODO: this data structure can be optimized to a VecDeque (or even just a cyclical buffer). 
- cache: BTreeMap, + cache: BTreeMap, cache_capacity: usize, } @@ -57,7 +65,11 @@ pub struct BlockStore { } impl BlockStore { - pub async fn new(ctx: &ctx::Ctx, persistent: Box, cache_capacity: usize) -> ctx::Result { + pub async fn new( + ctx: &ctx::Ctx, + persistent: Box, + cache_capacity: usize, + ) -> ctx::Result { if cache_capacity < 1 { return Err(anyhow::anyhow!("cache_capacity has to be >=1").into()); } @@ -67,16 +79,21 @@ impl BlockStore { } Ok(Self { persistent, - inner: sync::watch::channel(Inner{ + inner: sync::watch::channel(Inner { inmem: sync::watch::channel(state.clone()).0, persisted: state, cache: BTreeMap::new(), cache_capacity, - }).0, + }) + .0, }) } - pub async fn block(&self, ctx: &ctx::Ctx, number: validator::BlockNumber) -> ctx::Result> { + pub async fn block( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result> { { let inner = self.inner.borrow(); if !inner.inmem.borrow().contains(number) { @@ -86,18 +103,25 @@ impl BlockStore { return Ok(Some(block.clone())); } } - Ok(Some(self.persistent.block(ctx,number).await?)) + Ok(Some(self.persistent.block(ctx, number).await?)) } pub async fn last_block(&self, ctx: &ctx::Ctx) -> ctx::Result { let last = self.inner.borrow().inmem.borrow().last.header().number; - Ok(self.block(ctx,last).await?.unwrap()) + Ok(self.block(ctx, last).await?.unwrap()) } - pub async fn queue_block(&self, ctx: &ctx::Ctx, block: validator::FinalBlock) -> ctx::OrCanceled<()> { + pub async fn queue_block( + &self, + ctx: &ctx::Ctx, + block: validator::FinalBlock, + ) -> ctx::OrCanceled<()> { let number = block.header().number; sync::wait_for(ctx, &mut self.subscribe(), |inmem| inmem.next() >= number).await?; - sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| inner.cache.len() < inner.cache_capacity).await?; + sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| { + inner.cache.len() < inner.cache_capacity + }) + .await?; self.inner.send_if_modified(|inner| { if !inner.inmem.send_if_modified(|inmem| { // It may happen that the same block is queued by 2 calls. @@ -109,16 +133,23 @@ impl BlockStore { }) { return false; } - inner.cache.insert(number,block); + inner.cache.insert(number, block); true }); Ok(()) } - pub async fn store_block(&self, ctx: &ctx::Ctx, block: validator::FinalBlock) -> ctx::OrCanceled<()> { + pub async fn store_block( + &self, + ctx: &ctx::Ctx, + block: validator::FinalBlock, + ) -> ctx::OrCanceled<()> { let number = block.header().number; - self.queue_block(ctx,block).await?; - sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| inner.persisted.contains(number)).await?; + self.queue_block(ctx, block).await?; + sync::wait_for(ctx, &mut self.inner.subscribe(), |inner| { + inner.persisted.contains(number) + }) + .await?; Ok(()) } @@ -127,19 +158,25 @@ impl BlockStore { } pub async fn run_background_tasks(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - let res = async { + let res = async { let inner = &mut self.inner.subscribe(); loop { - let block = sync::wait_for(ctx, inner, |inner| !inner.cache.is_empty()).await? - .cache.first_key_value().unwrap().1.clone(); - self.persistent.store_next_block(ctx,&block).await?; + let block = sync::wait_for(ctx, inner, |inner| !inner.cache.is_empty()) + .await? 
+ .cache + .first_key_value() + .unwrap() + .1 + .clone(); + self.persistent.store_next_block(ctx, &block).await?; self.inner.send_modify(|inner| { - debug_assert!(inner.persisted.next()==block.header().number); + debug_assert!(inner.persisted.next() == block.header().number); inner.persisted.last = block.justification.clone(); inner.cache.remove(&block.header().number); }); } - }.await; + } + .await; match res { Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), Err(ctx::Error::Internal(err)) => Err(err), diff --git a/node/libs/storage/src/lib.rs b/node/libs/storage/src/lib.rs index c283d505..98ed3ec0 100644 --- a/node/libs/storage/src/lib.rs +++ b/node/libs/storage/src/lib.rs @@ -2,14 +2,14 @@ //! but this crate only exposes an abstraction of a database, so we can easily switch to a different storage engine in the future. #![allow(missing_docs)] +mod block_store; pub mod proto; +mod replica_store; #[cfg(feature = "rocksdb")] pub mod rocksdb; pub mod testonly; #[cfg(test)] mod tests; -mod block_store; -mod replica_store; -pub use crate::block_store::{BlockStoreState,PersistentBlockStore,BlockStore}; -pub use crate::replica_store::{ReplicaStore,ReplicaState,Proposal}; +pub use crate::block_store::{BlockStore, BlockStoreState, PersistentBlockStore}; +pub use crate::replica_store::{Proposal, ReplicaState, ReplicaStore}; diff --git a/node/libs/storage/src/replica_store.rs b/node/libs/storage/src/replica_store.rs index ecbdc380..6243cc72 100644 --- a/node/libs/storage/src/replica_store.rs +++ b/node/libs/storage/src/replica_store.rs @@ -1,7 +1,7 @@ //! Defines storage layer for persistent replica state. use crate::proto; -use std::fmt; use anyhow::Context as _; +use std::fmt; use zksync_concurrency::ctx; use zksync_consensus_roles::validator; use zksync_protobuf::{read_required, required, ProtoFmt}; @@ -104,5 +104,3 @@ impl ProtoFmt for ReplicaState { } } } - - diff --git a/node/libs/storage/src/rocksdb.rs b/node/libs/storage/src/rocksdb.rs index 1b791ef2..18a37399 100644 --- a/node/libs/storage/src/rocksdb.rs +++ b/node/libs/storage/src/rocksdb.rs @@ -2,15 +2,11 @@ //! chain of blocks, not a tree (assuming we have all blocks and not have any gap). It allows for basic functionality like inserting a block, //! getting a block, checking if a block is contained in the DB. We also store the head of the chain. Storing it explicitly allows us to fetch //! the current head quickly. 
-use crate::{ReplicaState,ReplicaStore,PersistentBlockStore,BlockStoreState}; -use std::sync::Arc; +use crate::{BlockStoreState, PersistentBlockStore, ReplicaState, ReplicaStore}; use anyhow::Context as _; use rocksdb::{Direction, IteratorMode, ReadOptions}; -use std::{ - fmt, - path::Path, - sync::RwLock, -}; +use std::sync::Arc; +use std::{fmt, path::Path, sync::RwLock}; use zksync_concurrency::{ctx, scope}; use zksync_consensus_roles::validator; @@ -65,30 +61,37 @@ impl Store { let mut options = rocksdb::Options::default(); options.create_missing_column_families(true); options.create_if_missing(true); - Ok(Self(RwLock::new(scope::wait_blocking(|| { - rocksdb::DB::open(&options, path).context("Failed opening RocksDB") - }).await?))) + Ok(Self(RwLock::new( + scope::wait_blocking(|| { + rocksdb::DB::open(&options, path).context("Failed opening RocksDB") + }) + .await?, + ))) } fn state_blocking(&self) -> anyhow::Result { let db = self.0.read().unwrap(); let mut options = ReadOptions::default(); - options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); - let (_,last) = db.iterator_opt(DatabaseKey::BLOCK_HEAD_ITERATOR, options) + options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); + let (_, last) = db + .iterator_opt(DatabaseKey::BLOCK_HEAD_ITERATOR, options) .next() .context("Head block not found")? .context("RocksDB error reading head block")?; - let last : validator::FinalBlock = zksync_protobuf::decode(&last).context("Failed decoding head block bytes")?; + let last: validator::FinalBlock = + zksync_protobuf::decode(&last).context("Failed decoding head block bytes")?; let mut options = ReadOptions::default(); - options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); - let (_, first) = db.iterator_opt(IteratorMode::Start, options) + options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); + let (_, first) = db + .iterator_opt(IteratorMode::Start, options) .next() .context("First stored block not found")? .context("RocksDB error reading first stored block")?; - let first : validator::FinalBlock = zksync_protobuf::decode(&first).context("Failed decoding first stored block bytes")?; - Ok(BlockStoreState{ + let first: validator::FinalBlock = + zksync_protobuf::decode(&first).context("Failed decoding first stored block bytes")?; + Ok(BlockStoreState { first: first.justification, last: last.justification, }) @@ -104,22 +107,31 @@ impl fmt::Debug for Store { #[async_trait::async_trait] impl PersistentBlockStore for Arc { async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result { - Ok(scope::wait_blocking(|| { self.state_blocking() }).await?) + Ok(scope::wait_blocking(|| self.state_blocking()).await?) } - async fn block(&self, _ctx: &ctx::Ctx, number: validator::BlockNumber) -> ctx::Result { + async fn block( + &self, + _ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result { Ok(scope::wait_blocking(|| { let db = self.0.read().unwrap(); let block = db .get(DatabaseKey::Block(number).encode_key()) .context("RocksDB error")? .context("not found")?; - zksync_protobuf::decode(&block) - .context("failed decoding block") - }).await.context(number)?) + zksync_protobuf::decode(&block).context("failed decoding block") + }) + .await + .context(number)?) 
     }
 
-    async fn store_next_block(&self, _ctx: &ctx::Ctx, block: &validator::FinalBlock) -> ctx::Result<()> {
+    async fn store_next_block(
+        &self,
+        _ctx: &ctx::Ctx,
+        block: &validator::FinalBlock,
+    ) -> ctx::Result<()> {
         Ok(scope::wait_blocking(|| {
             let db = self.0.write().unwrap();
             let block_number = block.header().number;
@@ -129,17 +141,20 @@ impl PersistentBlockStore for Arc<Store> {
                 zksync_protobuf::encode(block),
             );
             // Commit the transaction.
-            db.write(write_batch).context("Failed writing block to database")?;
+            db.write(write_batch)
+                .context("Failed writing block to database")?;
             anyhow::Ok(())
-        }).await?)
+        })
+        .await?)
     }
 }
 
 #[async_trait::async_trait]
 impl ReplicaStore for Arc<Store> {
     async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result<Option<ReplicaState>> {
-        Ok(scope::wait_blocking(|| {
-            let Some(raw_state) = self.0
+        Ok(scope::wait_blocking(|| {
+            let Some(raw_state) = self
+                .0
                 .read()
                 .unwrap()
                 .get(DatabaseKey::ReplicaState.encode_key())
@@ -150,17 +165,21 @@ impl ReplicaStore for Arc<Store> {
             zksync_protobuf::decode(&raw_state)
                 .map(Some)
                 .context("Failed to decode replica state!")
-        }).await?)
+        })
+        .await?)
     }
 
     async fn set_state(&self, _ctx: &ctx::Ctx, state: &ReplicaState) -> ctx::Result<()> {
         Ok(scope::wait_blocking(|| {
-            self.0.write().unwrap()
+            self.0
+                .write()
+                .unwrap()
                 .put(
                     DatabaseKey::ReplicaState.encode_key(),
                     zksync_protobuf::encode(state),
                 )
                 .context("Failed putting ReplicaState to RocksDB")
-        }).await?)
+        })
+        .await?)
     }
 }
diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs
index 3552ac8e..aed232ce 100644
--- a/node/libs/storage/src/testonly/in_memory.rs
+++ b/node/libs/storage/src/testonly/in_memory.rs
@@ -1,29 +1,29 @@
 //! In-memory storage implementation.
-use crate::{BlockStoreState,ReplicaState,PersistentBlockStore};
+use crate::{BlockStoreState, PersistentBlockStore, ReplicaState};
 use anyhow::Context as _;
-use std::{sync::Mutex};
 use std::collections::BTreeMap;
-use zksync_concurrency::{ctx};
+use std::sync::Mutex;
+use zksync_concurrency::ctx;
 use zksync_consensus_roles::validator;
 
 /// In-memory block store.
 #[derive(Debug)]
-pub struct BlockStore(Mutex<BTreeMap<validator::BlockNumber,validator::FinalBlock>>);
+pub struct BlockStore(Mutex<BTreeMap<validator::BlockNumber, validator::FinalBlock>>);
 
 /// In-memory replica store.
-#[derive(Debug,Default)]
+#[derive(Debug, Default)]
 pub struct ReplicaStore(Mutex<Option<ReplicaState>>);
 
 impl BlockStore {
     /// Creates a new store containing only the specified `genesis_block`.
     pub fn new(genesis: validator::FinalBlock) -> Self {
-        Self(Mutex::new([(genesis.header().number,genesis)].into()))
+        Self(Mutex::new([(genesis.header().number, genesis)].into()))
     }
 }
 
 #[async_trait::async_trait]
 impl PersistentBlockStore for BlockStore {
-    async fn state(&self, _ctx :&ctx::Ctx) -> ctx::Result<BlockStoreState> {
+    async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result<BlockStoreState> {
         let blocks = self.0.lock().unwrap();
         Ok(BlockStoreState {
             first: blocks.first_key_value().unwrap().1.justification.clone(),
@@ -31,20 +31,34 @@ impl PersistentBlockStore for BlockStore {
         })
     }
 
-    async fn block(&self, _ctx: &ctx::Ctx, number: validator::BlockNumber) -> ctx::Result<validator::FinalBlock> {
-        Ok(self.0.lock().unwrap().get(&number).context("not found")?.clone())
+    async fn block(
+        &self,
+        _ctx: &ctx::Ctx,
+        number: validator::BlockNumber,
+    ) -> ctx::Result<validator::FinalBlock> {
+        Ok(self
+            .0
+            .lock()
+            .unwrap()
+            .get(&number)
+            .context("not found")?
+            .clone())
     }
 
-    async fn store_next_block(&self, _ctx: &ctx::Ctx, block: &validator::FinalBlock) -> ctx::Result<()> {
+    async fn store_next_block(
+        &self,
+        _ctx: &ctx::Ctx,
+        block: &validator::FinalBlock,
+    ) -> ctx::Result<()> {
         let mut blocks = self.0.lock().unwrap();
         let got = block.header().number;
         let want = blocks.last_key_value().unwrap().0.next();
         if got != want {
             return Err(anyhow::anyhow!("got block {got:?}, while expected {want:?}").into());
         }
-        blocks.insert(got,block.clone());
+        blocks.insert(got, block.clone());
         Ok(())
-    }
+    }
 }
 
 #[async_trait::async_trait]
diff --git a/node/libs/storage/src/tests/mod.rs b/node/libs/storage/src/tests/mod.rs
index 0832ff0c..50fdd1db 100644
--- a/node/libs/storage/src/tests/mod.rs
+++ b/node/libs/storage/src/tests/mod.rs
@@ -1,9 +1,9 @@
 use super::*;
-use crate::{PersistentBlockStore,ReplicaState};
+use crate::{PersistentBlockStore, ReplicaState};
 use async_trait::async_trait;
+use rand::Rng;
 use zksync_concurrency::ctx;
 use zksync_consensus_roles::validator::{self};
-use rand::Rng;
 
 #[cfg(feature = "rocksdb")]
 mod rocksdb;
@@ -11,14 +11,22 @@ mod rocksdb;
 
 #[async_trait]
 trait InitStore {
     type Store: PersistentBlockStore;
-    async fn init_store(&self, ctx: &ctx::Ctx, genesis_block: &validator::FinalBlock) -> Self::Store;
+    async fn init_store(
+        &self,
+        ctx: &ctx::Ctx,
+        genesis_block: &validator::FinalBlock,
+    ) -> Self::Store;
 }
 
 #[async_trait]
 impl InitStore for () {
     type Store = testonly::in_memory::BlockStore;
-    async fn init_store(&self, _ctx: &ctx::Ctx, genesis_block: &validator::FinalBlock) -> Self::Store {
+    async fn init_store(
+        &self,
+        _ctx: &ctx::Ctx,
+        genesis_block: &validator::FinalBlock,
+    ) -> Self::Store {
         Self::Store::new(genesis_block.clone())
     }
 }
@@ -28,7 +36,7 @@ fn make_genesis(rng: &mut impl Rng) -> validator::FinalBlock {
 }
 
 fn make_block(rng: &mut impl Rng, parent: &validator::BlockHeader) -> validator::FinalBlock {
-    validator::testonly::make_block(rng,parent,validator::ProtocolVersion::EARLIEST)
+    validator::testonly::make_block(rng, parent, validator::ProtocolVersion::EARLIEST)
 }
 
 async fn dump(ctx: &ctx::Ctx, store: &dyn PersistentBlockStore) -> Vec<validator::FinalBlock> {
@@ -36,11 +44,11 @@ async fn dump(ctx: &ctx::Ctx, store: &dyn PersistentBlockStore) -> Vec<validator::FinalBlock> {
-    zksync_protobuf::testonly::test_encode_random::<_,ReplicaState>(rng);
+    zksync_protobuf::testonly::test_encode_random::<_, ReplicaState>(rng);
 }
diff --git a/node/libs/storage/src/tests/rocksdb.rs b/node/libs/storage/src/tests/rocksdb.rs
index 063c7668..226f94fa 100644
--- a/node/libs/storage/src/tests/rocksdb.rs
+++ b/node/libs/storage/src/tests/rocksdb.rs
@@ -7,9 +7,13 @@ use tempfile::TempDir;
 impl InitStore for TempDir {
     type Store = Arc<rocksdb::Store>;
-    async fn init_store(&self, ctx: &ctx::Ctx, genesis_block: &validator::FinalBlock) -> Self::Store {
+    async fn init_store(
+        &self,
+        ctx: &ctx::Ctx,
+        genesis_block: &validator::FinalBlock,
+    ) -> Self::Store {
         let db = Arc::new(rocksdb::Store::new(self.path()).await.unwrap());
-        db.store_next_block(ctx,genesis_block).await.unwrap();
+        db.store_next_block(ctx, genesis_block).await.unwrap();
         db
     }
 }
 
@@ -23,10 +27,10 @@ async fn initializing_store_twice() {
     let store = temp_dir.init_store(ctx, &blocks[0]).await;
     blocks.push(make_block(rng, blocks[0].header()));
     store.store_next_block(ctx, &blocks[1]).await.unwrap();
-    assert_eq!(dump(ctx,&store).await, blocks);
+    assert_eq!(dump(ctx, &store).await, blocks);
     drop(store);
     let store = temp_dir.init_store(ctx, &blocks[0]).await;
-    assert_eq!(dump(ctx,&store).await, blocks);
+    assert_eq!(dump(ctx, &store).await, blocks);
 }
 
 #[tokio::test]