From 1daec32f89493d0d4cec556b5e5f5c3044363ec7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bruno=20Fran=C3=A7a?=
Date: Tue, 29 Oct 2024 04:45:25 +0000
Subject: [PATCH] Part of the unit tests.

---
 node/actors/bft/src/chonky_bft/commit.rs | 2 +-
 node/actors/bft/src/chonky_bft/mod.rs | 4 +-
 node/actors/bft/src/chonky_bft/proposer.rs | 26 +-
 node/actors/bft/src/chonky_bft/testonly.rs | 309 ++
 node/actors/bft/src/chonky_bft/tests.rs | 1532 -----------------
 .../actors/bft/src/chonky_bft/tests/commit.rs | 422 +++++
 node/actors/bft/src/chonky_bft/tests/mod.rs | 87 +
 .../bft/src/chonky_bft/tests/proposal.rs | 336 ++++
 .../bft/src/chonky_bft/tests/timeout.rs | 435 +++++
 node/actors/bft/src/chonky_bft/timeout.rs | 4 +-
 node/actors/bft/src/lib.rs | 4 +-
 node/actors/bft/src/testonly/make.rs | 16 +-
 node/actors/bft/src/testonly/mod.rs | 20 +-
 .../src/validator/messages/leader_proposal.rs | 4 +-
 14 files changed, 1637 insertions(+), 1564 deletions(-)
 create mode 100644 node/actors/bft/src/chonky_bft/testonly.rs
 delete mode 100644 node/actors/bft/src/chonky_bft/tests.rs
 create mode 100644 node/actors/bft/src/chonky_bft/tests/commit.rs
 create mode 100644 node/actors/bft/src/chonky_bft/tests/mod.rs
 create mode 100644 node/actors/bft/src/chonky_bft/tests/proposal.rs
 create mode 100644 node/actors/bft/src/chonky_bft/tests/timeout.rs

diff --git a/node/actors/bft/src/chonky_bft/commit.rs b/node/actors/bft/src/chonky_bft/commit.rs
index 78ca9282..afb6deb4 100644
--- a/node/actors/bft/src/chonky_bft/commit.rs
+++ b/node/actors/bft/src/chonky_bft/commit.rs
@@ -13,7 +13,7 @@ pub(crate) enum Error {
         /// Signer of the message.
         signer: Box<validator::PublicKey>,
     },
-    /// Past view or phase.
+    /// Past view.
     #[error("past view (current view: {current_view:?})")]
     Old {
         /// Current view.
diff --git a/node/actors/bft/src/chonky_bft/mod.rs b/node/actors/bft/src/chonky_bft/mod.rs
index c4a4eff1..37adaddc 100644
--- a/node/actors/bft/src/chonky_bft/mod.rs
+++ b/node/actors/bft/src/chonky_bft/mod.rs
@@ -20,6 +20,8 @@
 pub(crate) mod proposal;
 pub(crate) mod proposer;
 pub(crate) mod timeout;
 
+#[cfg(test)]
+mod testonly;
 #[cfg(test)]
 mod tests;
@@ -32,7 +34,7 @@ pub(crate) struct StateMachine {
     /// Pipe through which replica sends network messages.
     pub(super) outbound_pipe: OutputSender,
     /// Pipe through which replica receives network requests.
-    inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>,
+    pub(crate) inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>,
     /// The sender part of the justification watch. This is used to set the justification
     /// and notify the proposer loop.
     pub(crate) justification_watch: sync::watch::Sender<Option<validator::ProposalJustification>>,
diff --git a/node/actors/bft/src/chonky_bft/proposer.rs b/node/actors/bft/src/chonky_bft/proposer.rs
index b87f963c..7d564265 100644
--- a/node/actors/bft/src/chonky_bft/proposer.rs
+++ b/node/actors/bft/src/chonky_bft/proposer.rs
@@ -1,9 +1,14 @@
 use crate::{metrics, Config, OutputSender};
 use std::sync::Arc;
-use zksync_concurrency::{ctx, error::Wrap as _, sync};
+use zksync_concurrency::{ctx, error::Wrap as _, sync, time};
 use zksync_consensus_network::io::ConsensusInputMessage;
 use zksync_consensus_roles::validator;
 
+/// Timeout for creating a proposal. If the proposal is not created in this time, the proposer
+/// will quit trying to create a proposal for this view. This can be different from the replica
+/// timeout for the whole view.
+pub(crate) const PROPOSAL_CREATION_TIMEOUT: time::Duration = time::Duration::milliseconds(2000);
+
 /// The proposer loop is responsible for proposing new blocks to the network.
It watches for new /// justifications from the replica and if it is the leader for the view, it proposes a new block. pub(crate) async fn run_proposer( @@ -13,6 +18,7 @@ pub(crate) async fn run_proposer( mut justification_watch: sync::watch::Receiver>, ) -> ctx::Result<()> { loop { + // Wait for a new justification to be available. let Some(justification) = sync::changed(ctx, &mut justification_watch).await?.clone() else { continue; @@ -23,7 +29,20 @@ pub(crate) async fn run_proposer( continue; } - let proposal = create_proposal(ctx, cfg.clone(), justification).await?; + // Create a proposal for the given justification, within the timeout. + let proposal = match create_proposal( + &ctx.with_timeout(PROPOSAL_CREATION_TIMEOUT), + cfg.clone(), + justification, + ) + .await + { + Ok(proposal) => proposal, + Err(err) => { + tracing::error!("failed to create proposal: {}", err); + continue; + } + }; // Broadcast our proposal to all replicas (ourselves included). let msg = cfg @@ -50,9 +69,6 @@ pub(crate) async fn create_proposal( // The previous proposal was finalized, so we can propose a new block. None => { // Defensively assume that PayloadManager cannot propose until the previous block is stored. - // if we don't have the previous block, this call will halt until the other replicas timeout. - // This is fine as we can just not propose anything and let our turn end. Eventually, some other - // replica will produce some block with this block number and this function will unblock. if let Some(prev) = block_number.prev() { cfg.block_store.wait_until_persisted(ctx, prev).await?; } diff --git a/node/actors/bft/src/chonky_bft/testonly.rs b/node/actors/bft/src/chonky_bft/testonly.rs new file mode 100644 index 00000000..0822ef2e --- /dev/null +++ b/node/actors/bft/src/chonky_bft/testonly.rs @@ -0,0 +1,309 @@ +use crate::testonly::RandomPayload; +use crate::{ + chonky_bft::{self, commit, new_view, proposal, timeout, StateMachine}, + io::OutputMessage, + Config, PayloadManager, +}; +use assert_matches::assert_matches; +use std::sync::Arc; +use zksync_concurrency::ctx; +use zksync_concurrency::sync::prunable_mpsc; +use zksync_consensus_network as network; +use zksync_consensus_network::io::ConsensusReq; +use zksync_consensus_roles::validator; +use zksync_consensus_storage::{ + testonly::{in_memory, TestMemoryStorage}, + BlockStoreRunner, +}; +use zksync_consensus_utils::enum_util::Variant; + +pub(crate) const MAX_PAYLOAD_SIZE: usize = 1000; + +/// `UTHarness` provides various utilities for unit tests. +/// It is designed to simplify the setup and execution of test cases by encapsulating +/// common testing functionality. +/// +/// It should be instantiated once for every test case. +#[cfg(test)] +pub(crate) struct UTHarness { + pub(crate) replica: StateMachine, + pub(crate) keys: Vec, + output_pipe: ctx::channel::UnboundedReceiver, + input_pipe: prunable_mpsc::Sender, +} + +impl UTHarness { + /// Creates a new `UTHarness` with the specified validator set size. + pub(crate) async fn new( + ctx: &ctx::Ctx, + num_validators: usize, + ) -> (UTHarness, BlockStoreRunner) { + Self::new_with_payload_manager( + ctx, + num_validators, + Box::new(RandomPayload(MAX_PAYLOAD_SIZE)), + ) + .await + } + + /// Creates a new `UTHarness` with minimally-significant validator set size. 
+ pub(crate) async fn new_many(ctx: &ctx::Ctx) -> (UTHarness, BlockStoreRunner) { + let num_validators = 6; + let (util, runner) = UTHarness::new(ctx, num_validators).await; + assert!(util.genesis().validators.max_faulty_weight() > 0); + (util, runner) + } + + pub(crate) async fn new_with_payload_manager( + ctx: &ctx::Ctx, + num_validators: usize, + payload_manager: Box, + ) -> (UTHarness, BlockStoreRunner) { + let rng = &mut ctx.rng(); + let setup = validator::testonly::Setup::new(rng, num_validators); + let store = TestMemoryStorage::new(ctx, &setup).await; + let (send, recv) = ctx::channel::unbounded(); + + let cfg = Arc::new(Config { + secret_key: setup.validator_keys[0].clone(), + block_store: store.blocks.clone(), + replica_store: Box::new(in_memory::ReplicaStore::default()), + payload_manager, + max_payload_size: MAX_PAYLOAD_SIZE, + }); + let (replica, input_pipe) = StateMachine::start(ctx, cfg.clone(), send.clone()) + .await + .unwrap(); + let mut this = UTHarness { + replica, + keys: setup.validator_keys.clone(), + output_pipe: recv, + input_pipe, + }; + this.process_replica_timeout_all(ctx, this.new_replica_timeout()) + .await; + (this, store.runner) + } + + pub(crate) fn owner_key(&self) -> &validator::SecretKey { + &self.replica.config.secret_key + } + + pub(crate) fn leader_key(&self) -> validator::SecretKey { + let leader = self.view_leader(self.replica.view_number); + self.keys + .iter() + .find(|key| key.public() == leader) + .unwrap() + .clone() + } + + pub(crate) fn view(&self) -> validator::View { + validator::View { + genesis: self.genesis().hash(), + number: self.replica.view_number, + } + } + + pub(crate) fn view_leader(&self, view: validator::ViewNumber) -> validator::PublicKey { + self.genesis().view_leader(view) + } + + pub(crate) fn set_owner_as_view_leader(&mut self) { + let mut view = self.replica.view_number; + while self.view_leader(view) != self.owner_key().public() { + view = view.next(); + } + self.replica.view_number = view; + } + + pub(crate) fn genesis(&self) -> &validator::Genesis { + self.replica.config.genesis() + } + + pub(crate) async fn new_leader_proposal(&self, ctx: &ctx::Ctx) -> validator::LeaderProposal { + let justification = self.replica.get_justification(); + chonky_bft::proposer::create_proposal(ctx, self.replica.config.clone(), justification) + .await + .unwrap() + } + + pub(crate) async fn new_replica_commit(&mut self, ctx: &ctx::Ctx) -> validator::ReplicaCommit { + let proposal = self.new_leader_proposal(ctx).await; + + self.process_leader_proposal(ctx, self.leader_key().sign_msg(proposal)) + .await + .unwrap() + .msg + } + + pub(crate) fn new_replica_timeout(&self) -> validator::ReplicaTimeout { + validator::ReplicaTimeout { + view: self.view(), + high_vote: self.replica.high_vote.clone(), + high_qc: self.replica.high_commit_qc.clone(), + } + } + + pub(crate) async fn new_replica_new_view(&self) -> validator::ReplicaNewView { + let justification = self.replica.get_justification(); + validator::ReplicaNewView { justification } + } + + pub(crate) async fn new_commit_qc( + &mut self, + ctx: &ctx::Ctx, + mutate_fn: impl FnOnce(&mut validator::ReplicaCommit), + ) -> validator::CommitQC { + let mut msg = self.new_replica_commit(ctx).await; + mutate_fn(&mut msg); + let mut qc = validator::CommitQC::new(msg, self.genesis()); + for key in &self.keys { + qc.add(&key.sign_msg(qc.message.clone()), self.genesis()) + .unwrap(); + } + qc + } + + pub(crate) fn new_timeout_qc( + &mut self, + mutate_fn: impl FnOnce(&mut validator::ReplicaTimeout), + ) -> 
validator::TimeoutQC { + let mut msg = self.new_replica_timeout(); + mutate_fn(&mut msg); + let mut qc = validator::TimeoutQC::new(msg.view.clone()); + for key in &self.keys { + qc.add(&key.sign_msg(msg.clone()), self.genesis()).unwrap(); + } + qc + } + + pub(crate) async fn process_leader_proposal( + &mut self, + ctx: &ctx::Ctx, + msg: validator::Signed, + ) -> Result, proposal::Error> { + self.replica.on_proposal(ctx, msg).await?; + Ok(self.try_recv().unwrap()) + } + + pub(crate) async fn process_replica_commit( + &mut self, + ctx: &ctx::Ctx, + msg: validator::Signed, + ) -> Result>, commit::Error> { + self.replica.on_commit(ctx, msg).await?; + Ok(self.try_recv()) + } + + pub(crate) async fn process_replica_timeout( + &mut self, + ctx: &ctx::Ctx, + msg: validator::Signed, + ) -> Result>, timeout::Error> { + self.replica.on_timeout(ctx, msg).await?; + Ok(self.try_recv()) + } + + pub(crate) async fn process_replica_new_view( + &mut self, + ctx: &ctx::Ctx, + msg: validator::Signed, + ) -> Result>, new_view::Error> { + self.replica.on_new_view(ctx, msg).await?; + Ok(self.try_recv()) + } + + pub(crate) async fn process_replica_commit_all( + &mut self, + ctx: &ctx::Ctx, + msg: validator::ReplicaCommit, + ) -> validator::Signed { + let mut threshold_reached = false; + let mut cur_weight = 0; + + for key in self.keys.iter() { + let res = self.replica.on_commit(ctx, key.sign_msg(msg.clone())).await; + let val_index = self.genesis().validators.index(&key.public()).unwrap(); + + cur_weight += self.genesis().validators.get(val_index).unwrap().weight; + + if !threshold_reached { + res.unwrap(); + if cur_weight >= self.genesis().validators.quorum_threshold() { + threshold_reached = true; + } + } else { + assert_matches!(res, Err(commit::Error::Old { .. })); + } + } + + self.try_recv().unwrap() + } + + pub(crate) async fn process_replica_timeout_all( + &mut self, + ctx: &ctx::Ctx, + msg: validator::ReplicaTimeout, + ) -> validator::Signed { + let mut threshold_reached = false; + let mut cur_weight = 0; + + for key in self.keys.iter() { + let res = self + .replica + .on_timeout(ctx, key.sign_msg(msg.clone())) + .await; + let val_index = self.genesis().validators.index(&key.public()).unwrap(); + + cur_weight += self.genesis().validators.get(val_index).unwrap().weight; + + if !threshold_reached { + res.unwrap(); + if cur_weight >= self.genesis().validators.quorum_threshold() { + threshold_reached = true; + } + } else { + assert_matches!(res, Err(timeout::Error::Old { .. })); + } + } + + self.try_recv().unwrap() + } + + /// Produces a block, by executing the full view. + pub(crate) async fn produce_block(&mut self, ctx: &ctx::Ctx) { + let replica_commit = self.new_replica_commit(ctx).await; + self.process_replica_commit_all(ctx, replica_commit).await; + } + + /// Triggers replica timeout, processes the new validator::ReplicaTimeout + /// to start a new view, then executes the whole new view to make sure + /// that the consensus recovers after a timeout. 
+ pub(crate) async fn produce_block_after_timeout(&mut self, ctx: &ctx::Ctx) { + let cur_view = self.replica.view_number; + + self.replica.start_timeout(ctx).await.unwrap(); + let replica_timeout = self.try_recv().unwrap().msg; + self.process_replica_timeout_all(ctx, replica_timeout).await; + + assert_eq!(self.replica.view_number, cur_view.next()); + + self.produce_block(ctx).await; + } + + pub(crate) fn send(&self, msg: validator::Signed) { + self.input_pipe.send(ConsensusReq { + msg, + ack: zksync_concurrency::oneshot::channel().0, + }); + } + + fn try_recv>(&mut self) -> Option> { + self.output_pipe.try_recv().map(|message| match message { + OutputMessage::Network(network::io::ConsensusInputMessage { message, .. }) => { + message.cast().unwrap() + } + }) + } +} diff --git a/node/actors/bft/src/chonky_bft/tests.rs b/node/actors/bft/src/chonky_bft/tests.rs deleted file mode 100644 index f101985e..00000000 --- a/node/actors/bft/src/chonky_bft/tests.rs +++ /dev/null @@ -1,1532 +0,0 @@ -use super::{leader_commit, proposal}; -use crate::{ - testonly, - testonly::ut_harness::{UTHarness, MAX_PAYLOAD_SIZE}, -}; -use assert_matches::assert_matches; -use rand::Rng; -use zksync_concurrency::{ctx, scope}; -use zksync_consensus_roles::validator::{ - self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare, -}; - -/// Sanity check of the happy path. -#[tokio::test] -async fn block_production() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - util.produce_block(ctx).await; - Ok(()) - }) - .await - .unwrap(); -} - -/// Sanity check of block production with reproposal. -#[tokio::test] -async fn reproposal_block_production() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - util.new_leader_commit(ctx).await; - util.process_replica_timeout(ctx).await; - util.produce_block(ctx).await; - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_bad_chain() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let mut leader_prepare = util.new_leader_proposal(ctx).await; - leader_prepare.justification.view.genesis = rng.gen(); - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!( - res, - Err(proposal::Error::InvalidMessage( - validator::LeaderPrepareVerifyError::Justification( - validator::PrepareQCVerifyError::View(_) - ) - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_sanity_yield_replica_commit() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let leader_prepare = util.new_leader_proposal(ctx).await; - let replica_commit = util - .process_leader_proposal(ctx, util.sign(leader_prepare.clone())) - .await - .unwrap(); - assert_eq!( - replica_commit.msg, - ReplicaCommit { - view: leader_prepare.view().clone(), - proposal: leader_prepare.proposal, - } - ); - Ok(()) - }) 
- .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_invalid_leader() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - - let replica_prepare = util.new_replica_timeout(); - assert!(util - .process_replica_prepare(ctx, util.sign(replica_prepare.clone())) - .await - .unwrap() - .is_none()); - - let replica_prepare = util.keys[1].sign_msg(replica_prepare); - let mut leader_prepare = util - .process_replica_prepare(ctx, replica_prepare) - .await - .unwrap() - .unwrap() - .msg; - leader_prepare.justification.view.number = leader_prepare.justification.view.number.next(); - assert_ne!( - util.view_leader(leader_prepare.view().number), - util.keys[0].public() - ); - - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!( - res, - Err(proposal::Error::InvalidLeader { correct_leader, received_leader }) => { - assert_eq!(correct_leader, util.keys[1].public()); - assert_eq!(received_leader, util.keys[0].public()); - } - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_old_view() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let mut leader_prepare = util.new_leader_proposal(ctx).await; - leader_prepare.justification.view.number.0 = util.replica.view_number.0 - 1; - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!( - res, - Err(proposal::Error::Old { current_view, current_phase }) => { - assert_eq!(current_view, util.replica.view_number); - assert_eq!(current_phase, util.replica.phase); - } - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_pruned_block() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let mut leader_prepare = util.new_leader_proposal(ctx).await; - // We assume default replica state and nontrivial `genesis.fork.first_block` here. - leader_prepare.proposal.number = util - .replica - .config - .block_store - .queued() - .first - .prev() - .unwrap(); - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!(res, Err(proposal::Error::ProposalAlreadyPruned)); - Ok(()) - }) - .await - .unwrap(); -} - -/// Tests that `WriteBlockStore::verify_payload` is applied before signing a vote. -#[tokio::test] -async fn leader_prepare_invalid_payload() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = - UTHarness::new_with_payload(ctx, 1, Box::new(testonly::RejectPayload)).await; - s.spawn_bg(runner.run(ctx)); - - let leader_prepare = util.new_leader_proposal(ctx).await; - - // Insert a finalized block to the storage. 
- let mut justification = CommitQC::new( - ReplicaCommit { - view: util.replica_view(), - proposal: leader_prepare.proposal, - }, - util.genesis(), - ); - justification - .add(&util.sign(justification.message.clone()), util.genesis()) - .unwrap(); - let block = validator::FinalBlock { - payload: leader_prepare.proposal_payload.clone().unwrap(), - justification, - }; - util.replica - .config - .block_store - .queue_block(ctx, block.into()) - .await - .unwrap(); - - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!(res, Err(proposal::Error::InvalidPayload(..))); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_invalid_sig() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - let leader_prepare = util.new_leader_proposal(ctx).await; - let mut leader_prepare = util.sign(leader_prepare); - leader_prepare.sig = ctx.rng().gen(); - let res = util.process_leader_proposal(ctx, leader_prepare).await; - assert_matches!(res, Err(proposal::Error::InvalidSignature(..))); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_invalid_prepare_qc() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let mut leader_prepare = util.new_leader_proposal(ctx).await; - leader_prepare.justification.signature = ctx.rng().gen(); - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!( - res, - Err(proposal::Error::InvalidMessage( - validator::LeaderPrepareVerifyError::Justification(_) - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_proposal_oversized_payload() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let payload_oversize = MAX_PAYLOAD_SIZE + 1; - let payload = Payload(vec![0; payload_oversize]); - let mut leader_prepare = util.new_leader_proposal(ctx).await; - leader_prepare.proposal.payload = payload.hash(); - leader_prepare.proposal_payload = Some(payload); - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!( - res, - Err(proposal::Error::ProposalOversizedPayload{ payload_size }) => { - assert_eq!(payload_size, payload_oversize); - } - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_proposal_mismatched_payload() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let mut leader_prepare = util.new_leader_proposal(ctx).await; - leader_prepare.proposal_payload = Some(ctx.rng().gen()); - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!( - res, - Err(proposal::Error::InvalidMessage( - validator::LeaderPrepareVerifyError::ProposalMismatchedPayload - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_proposal_when_previous_not_finalized() { - 
zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - tracing::info!("Execute view without replicas receiving the LeaderCommit."); - util.new_leader_commit(ctx).await; - util.process_replica_timeout(ctx).await; - tracing::info!("Make leader repropose the block."); - let mut leader_prepare = util.new_leader_proposal(ctx).await; - tracing::info!("Modify the message to include a new proposal anyway."); - let payload: Payload = rng.gen(); - leader_prepare.proposal.payload = payload.hash(); - leader_prepare.proposal_payload = Some(payload); - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!( - res, - Err(proposal::Error::InvalidMessage( - validator::LeaderPrepareVerifyError::ProposalWhenPreviousNotFinalized - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_bad_block_number() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx,s| async { - let (mut util,runner) = UTHarness::new(ctx,1).await; - s.spawn_bg(runner.run(ctx)); - - tracing::info!("Produce initial block."); - util.produce_block(ctx).await; - tracing::info!("Make leader propose the next block."); - let mut leader_prepare = util.new_leader_proposal(ctx).await; - tracing::info!("Modify the proposal.number so that it doesn't match the previous block"); - leader_prepare.proposal.number = rng.gen(); - let res = util.process_leader_proposal(ctx, util.sign(leader_prepare.clone())).await; - assert_matches!(res, Err(proposal::Error::InvalidMessage( - validator::LeaderPrepareVerifyError::BadBlockNumber { got, want } - )) => { - assert_eq!(want, leader_prepare.justification.high_qc().unwrap().message.proposal.number.next()); - assert_eq!(got, leader_prepare.proposal.number); - }); - Ok(()) - }).await.unwrap(); -} - -#[tokio::test] -async fn leader_prepare_reproposal_without_quorum() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - - tracing::info!("make leader repropose a block"); - util.new_leader_commit(ctx).await; - util.process_replica_timeout(ctx).await; - let mut leader_prepare = util.new_leader_proposal(ctx).await; - tracing::info!("modify justification, to make reproposal unjustified"); - let mut replica_prepare: ReplicaPrepare = leader_prepare - .justification - .map - .keys() - .next() - .unwrap() - .clone(); - leader_prepare.justification = PrepareQC::new(leader_prepare.justification.view); - for key in &util.keys { - replica_prepare.high_vote.as_mut().unwrap().proposal.payload = rng.gen(); - leader_prepare - .justification - .add(&key.sign_msg(replica_prepare.clone()), util.genesis())?; - } - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!( - res, - Err(proposal::Error::InvalidMessage( - validator::LeaderPrepareVerifyError::ReproposalWithoutQuorum - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_reproposal_when_finalized() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - 
let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - tracing::info!("Make leader propose a new block"); - util.produce_block(ctx).await; - let mut leader_prepare = util.new_leader_proposal(ctx).await; - tracing::info!( - "Modify the message so that it is actually a reproposal of the previous block" - ); - leader_prepare.proposal = leader_prepare - .justification - .high_qc() - .unwrap() - .message - .proposal; - leader_prepare.proposal_payload = None; - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!( - res, - Err(proposal::Error::InvalidMessage( - validator::LeaderPrepareVerifyError::ReproposalWhenFinalized - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_prepare_reproposal_invalid_block() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - tracing::info!("Make leader repropose a block."); - util.new_leader_commit(ctx).await; - util.process_replica_timeout(ctx).await; - let mut leader_prepare = util.new_leader_proposal(ctx).await; - tracing::info!("Make the reproposal different than expected"); - leader_prepare.proposal.payload = rng.gen(); - let res = util - .process_leader_proposal(ctx, util.sign(leader_prepare)) - .await; - assert_matches!( - res, - Err(proposal::Error::InvalidMessage( - validator::LeaderPrepareVerifyError::ReproposalBadBlock - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -/// Check that replica provides expected high_vote and high_qc after finalizing a block. -#[tokio::test] -async fn leader_commit_sanity_yield_replica_prepare() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let leader_commit = util.new_leader_commit(ctx).await; - let replica_prepare = util - .process_leader_commit(ctx, util.sign(leader_commit.clone())) - .await - .unwrap(); - let mut view = leader_commit.justification.message.view.clone(); - view.number = view.number.next(); - assert_eq!( - replica_prepare.msg, - ReplicaPrepare { - view, - high_vote: Some(leader_commit.justification.message.clone()), - high_qc: Some(leader_commit.justification), - } - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_commit_bad_chain() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let mut leader_commit = util.new_leader_commit(ctx).await; - leader_commit.justification.message.view.genesis = rng.gen(); - let res = util - .process_leader_commit(ctx, util.sign(leader_commit)) - .await; - assert_matches!( - res, - Err(leader_commit::Error::InvalidMessage( - validator::CommitQCVerifyError::InvalidMessage( - validator::ReplicaCommitVerifyError::BadView(_) - ) - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_commit_bad_leader() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - let 
leader_commit = util.new_leader_commit(ctx).await; - // Sign the leader_prepare with a key of different validator. - let res = util - .process_leader_commit(ctx, util.keys[1].sign_msg(leader_commit)) - .await; - assert_matches!(res, Err(leader_commit::Error::BadLeader { .. })); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_commit_invalid_sig() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - let leader_commit = util.new_leader_commit(ctx).await; - let mut leader_commit = util.sign(leader_commit); - leader_commit.sig = rng.gen(); - let res = util.process_leader_commit(ctx, leader_commit).await; - assert_matches!(res, Err(leader_commit::Error::InvalidSignature { .. })); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn leader_commit_invalid_commit_qc() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let mut leader_commit = util.new_leader_commit(ctx).await; - leader_commit.justification.signature = rng.gen(); - let res = util - .process_leader_commit(ctx, util.sign(leader_commit)) - .await; - assert_matches!( - res, - Err(leader_commit::Error::InvalidMessage( - validator::CommitQCVerifyError::BadSignature(..) - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_sanity() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - tracing::info!("started"); - util.new_leader_prepare(ctx).await; - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_sanity_yield_leader_prepare() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - util.produce_block(ctx).await; - let replica_prepare = util.new_replica_prepare(); - let leader_prepare = util - .process_replica_prepare(ctx, util.sign(replica_prepare.clone())) - .await - .unwrap() - .unwrap(); - assert_eq!(leader_prepare.msg.view(), &replica_prepare.view); - assert_eq!( - leader_prepare.msg.justification, - util.new_prepare_qc(|msg| *msg = replica_prepare) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_sanity_yield_leader_prepare_reproposal() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - - util.new_replica_commit_from_proposal(ctx).await; - util.process_replica_timeout(ctx).await; - let replica_prepare = util.new_replica_prepare(); - let leader_prepare = util - .process_replica_timeout_all(ctx, replica_prepare.clone()) - .await; - - assert_eq!(leader_prepare.msg.view(), &replica_prepare.view); - assert_eq!( - Some(leader_prepare.msg.proposal), - replica_prepare.high_vote.as_ref().map(|v| v.proposal), - ); - assert_eq!(leader_prepare.msg.proposal_payload, None); - let map = 
leader_prepare.msg.justification.map; - assert_eq!(map.len(), 1); - assert_eq!(*map.first_key_value().unwrap().0, replica_prepare); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_bad_chain() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let mut replica_prepare = util.new_replica_prepare(); - replica_prepare.view.genesis = rng.gen(); - let res = util - .process_replica_prepare(ctx, util.sign(replica_prepare)) - .await; - assert_matches!( - res, - Err(replica_prepare::Error::InvalidMessage( - validator::ReplicaPrepareVerifyError::View(_) - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_non_validator_signer() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let replica_prepare = util.new_replica_prepare(); - let non_validator_key: validator::SecretKey = ctx.rng().gen(); - let res = util - .process_replica_prepare(ctx, non_validator_key.sign_msg(replica_prepare)) - .await; - assert_matches!( - res, - Err(replica_prepare::Error::NonValidatorSigner { signer }) => { - assert_eq!(signer, non_validator_key.public()); - } - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_old_view() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let replica_prepare = util.new_replica_prepare(); - util.leader.view = util.replica.view_number.next(); - util.leader.phase = Phase::Prepare; - let res = util - .process_replica_prepare(ctx, util.sign(replica_prepare)) - .await; - assert_matches!( - res, - Err(replica_prepare::Error::Old { - current_view: ViewNumber(2), - current_phase: Phase::Prepare, - }) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_during_commit() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let replica_prepare = util.new_replica_prepare(); - util.leader.view = util.replica.view_number; - util.leader.phase = Phase::Commit; - let res = util - .process_replica_prepare(ctx, util.sign(replica_prepare)) - .await; - assert_matches!( - res, - Err(replica_prepare::Error::Old { - current_view, - current_phase: Phase::Commit, - }) => { - assert_eq!(current_view, util.replica.view_number); - } - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_not_leader_in_view() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - - let mut replica_prepare = util.new_replica_prepare(); - replica_prepare.view.number = replica_prepare.view.number.next(); - let res = util - .process_replica_prepare(ctx, util.sign(replica_prepare)) - .await; - assert_matches!(res, Err(replica_prepare::Error::NotLeaderInView)); - Ok(()) - }) - .await - 
.unwrap(); -} - -#[tokio::test] -async fn replica_prepare_already_exists() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - - util.set_owner_as_view_leader(); - let replica_prepare = util.new_replica_prepare(); - let replica_prepare = util.sign(replica_prepare.clone()); - assert!(util - .process_replica_prepare(ctx, replica_prepare.clone()) - .await - .unwrap() - .is_none()); - let res = util - .process_replica_prepare(ctx, replica_prepare.clone()) - .await; - assert_matches!(res, Err(replica_prepare::Error::Old { .. })); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_num_received_below_threshold() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - - util.set_owner_as_view_leader(); - let replica_prepare = util.new_replica_prepare(); - assert!(util - .process_replica_prepare(ctx, util.sign(replica_prepare)) - .await - .unwrap() - .is_none()); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_invalid_sig() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let msg = util.new_replica_prepare(); - let mut replica_prepare = util.sign(msg); - replica_prepare.sig = ctx.rng().gen(); - let res = util.process_replica_prepare(ctx, replica_prepare).await; - assert_matches!(res, Err(replica_prepare::Error::InvalidSignature(_))); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_invalid_commit_qc() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - util.produce_block(ctx).await; - let mut replica_prepare = util.new_replica_prepare(); - replica_prepare.high_qc.as_mut().unwrap().signature = rng.gen(); - let res = util - .process_replica_prepare(ctx, util.sign(replica_prepare)) - .await; - assert_matches!( - res, - Err(replica_prepare::Error::InvalidMessage( - validator::ReplicaPrepareVerifyError::HighQC(_) - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -/// Check that leader behaves correctly in case receiving ReplicaPrepare -/// with high_qc with future views (which shouldn't be available yet). -#[tokio::test] -async fn replica_prepare_high_qc_of_future_view() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - util.produce_block(ctx).await; - let mut view = util.replica_view(); - let mut replica_prepare = util.new_replica_prepare(); - // Check both the current view and next view. 
- for _ in 0..2 { - let qc = util.new_commit_qc(|msg| msg.view = view.clone()); - replica_prepare.high_qc = Some(qc); - let res = util - .process_replica_prepare(ctx, util.sign(replica_prepare.clone())) - .await; - assert_matches!( - res, - Err(replica_prepare::Error::InvalidMessage( - validator::ReplicaPrepareVerifyError::HighQCFutureView - )) - ); - view.number = view.number.next(); - } - Ok(()) - }) - .await - .unwrap(); -} - -/// Check all ReplicaPrepare are included for weight calculation -/// even on different messages for the same view. -#[tokio::test] -async fn replica_prepare_different_messages() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - - util.produce_block(ctx).await; - - let view = util.replica_view(); - let replica_prepare = util.new_replica_prepare(); - - // Create a different proposal for the same view - let proposal = replica_prepare.clone().high_vote.unwrap().proposal; - let mut different_proposal = proposal; - different_proposal.number = different_proposal.number.next(); - - // Create a new ReplicaPrepare with the different proposal - let mut other_replica_prepare = replica_prepare.clone(); - let mut high_vote = other_replica_prepare.high_vote.clone().unwrap(); - high_vote.proposal = different_proposal; - let high_qc = util.new_commit_qc(|msg| { - msg.proposal = different_proposal; - msg.view = view.clone() - }); - - other_replica_prepare.high_vote = Some(high_vote); - other_replica_prepare.high_qc = Some(high_qc); - - let validators = util.keys.len(); - - // half of the validators sign replica_prepare - for i in 0..validators / 2 { - util.process_replica_prepare(ctx, util.keys[i].sign_msg(replica_prepare.clone())) - .await - .unwrap(); - } - - let mut replica_commit_result = None; - // The rest of the validators until threshold sign other_replica_prepare - for i in validators / 2..util.genesis().validators.quorum_threshold() as usize { - replica_commit_result = util - .process_replica_prepare(ctx, util.keys[i].sign_msg(other_replica_prepare.clone())) - .await - .unwrap(); - } - - // That should be enough for a proposal to be committed (even with different proposals) - assert_matches!(replica_commit_result, Some(_)); - - // Check the first proposal has been committed (as it has more votes) - let message = replica_commit_result.unwrap().msg; - assert_eq!(message.proposal, proposal); - Ok(()) - }) - .await - .unwrap(); -} - -/// Check that leader won't accumulate undefined amount of messages if -/// it's spammed with ReplicaPrepare messages for future views -#[tokio::test] -async fn replica_prepare_limit_messages_in_memory() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - - let mut replica_prepare = util.new_replica_prepare(); - let mut view = util.replica_view(); - // Spam it with 200 messages for different views - for _ in 0..200 { - replica_prepare.view = view.clone(); - let res = util - .process_replica_prepare(ctx, util.sign(replica_prepare.clone())) - .await; - assert_matches!(res, Ok(_)); - // Since we have 2 replicas, we have to send only even numbered views - // to hit the same leader (the other replica will be leader on odd numbered views) - view.number = view.number.next().next(); - } - // Ensure only 1 
prepare_qc is in memory, as the previous 199 were discarded each time - // new message is processed - assert_eq!(util.leader.prepare_qcs.len(), 1); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_prepare_filter_functions_test() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - - let replica_prepare = util.new_replica_prepare(); - let msg = util.sign(validator::ConsensusMsg::ReplicaPrepare( - replica_prepare.clone(), - )); - - // Send a msg with invalid signature - let mut invalid_msg = msg.clone(); - invalid_msg.sig = ctx.rng().gen(); - util.leader_send(invalid_msg); - - // Send a correct message - util.leader_send(msg.clone()); - - // Validate only correct message is received - assert_eq!(util.leader.inbound_pipe.recv(ctx).await.unwrap().msg, msg); - - // Send a msg with view number = 2 - let mut replica_commit_from_view_2 = replica_prepare.clone(); - replica_commit_from_view_2.view.number = ViewNumber(2); - let msg_from_view_2 = util.sign(validator::ConsensusMsg::ReplicaPrepare( - replica_commit_from_view_2, - )); - util.leader_send(msg_from_view_2); - - // Send a msg with view number = 4, will prune message from view 2 - let mut replica_commit_from_view_4 = replica_prepare.clone(); - replica_commit_from_view_4.view.number = ViewNumber(4); - let msg_from_view_4 = util.sign(validator::ConsensusMsg::ReplicaPrepare( - replica_commit_from_view_4, - )); - util.leader_send(msg_from_view_4.clone()); - - // Send a msg with view number = 3, will be discarded, as it is older than message from view 4 - let mut replica_commit_from_view_3 = replica_prepare.clone(); - replica_commit_from_view_3.view.number = ViewNumber(3); - let msg_from_view_3 = util.sign(validator::ConsensusMsg::ReplicaPrepare( - replica_commit_from_view_3, - )); - util.leader_send(msg_from_view_3); - - // Validate only message from view 4 is received - assert_eq!( - util.leader.inbound_pipe.recv(ctx).await.unwrap().msg, - msg_from_view_4 - ); - - // Send a msg from validator 0 - let msg_from_validator_0 = util.keys[0].sign_msg(validator::ConsensusMsg::ReplicaPrepare( - replica_prepare.clone(), - )); - util.leader_send(msg_from_validator_0.clone()); - - // Send a msg from validator 1 - let msg_from_validator_1 = util.keys[1].sign_msg(validator::ConsensusMsg::ReplicaPrepare( - replica_prepare.clone(), - )); - util.leader_send(msg_from_validator_1.clone()); - - //Validate both are present in the inbound_pipe - assert_eq!( - util.leader.inbound_pipe.recv(ctx).await.unwrap().msg, - msg_from_validator_0 - ); - assert_eq!( - util.leader.inbound_pipe.recv(ctx).await.unwrap().msg, - msg_from_validator_1 - ); - - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_commit_sanity() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - - util.new_leader_commit(ctx).await; - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_commit_sanity_yield_leader_commit() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - util.produce_block(ctx).await; - let replica_commit = 
util.new_replica_commit_from_proposal(ctx).await; - let leader_commit = util - .process_replica_commit(ctx, util.sign(replica_commit.clone())) - .await - .unwrap() - .unwrap(); - assert_eq!( - leader_commit.msg.justification, - util.new_commit_qc(|msg| *msg = replica_commit) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_commit_bad_chain() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - let rng = &mut ctx.rng(); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let mut replica_commit = util.new_replica_commit_from_proposal(ctx).await; - replica_commit.view.genesis = rng.gen(); - let res = util - .process_replica_commit(ctx, util.sign(replica_commit)) - .await; - assert_matches!( - res, - Err(replica_commit::Error::InvalidMessage( - validator::ReplicaCommitVerifyError::BadView(_) - )) - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_commit_non_validator_signer() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let replica_commit = util.new_replica_commit_from_proposal(ctx).await; - let non_validator_key: validator::SecretKey = ctx.rng().gen(); - let res = util - .process_replica_commit(ctx, non_validator_key.sign_msg(replica_commit)) - .await; - assert_matches!( - res, - Err(replica_commit::Error::NonValidatorSigner { signer }) => { - assert_eq!(*signer, non_validator_key.public()); - } - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_commit_old() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let mut replica_commit = util.new_replica_commit_from_proposal(ctx).await; - replica_commit.view.number = ViewNumber(util.replica.view_number.0 - 1); - let replica_commit = util.sign(replica_commit); - let res = util.process_replica_commit(ctx, replica_commit).await; - assert_matches!( - res, - Err(replica_commit::Error::Old { current_view, current_phase }) => { - assert_eq!(current_view, util.replica.view_number); - assert_eq!(current_phase, util.replica.phase); - } - ); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_commit_not_leader_in_view() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - - util.produce_block(ctx).await; - let current_view_leader = util.view_leader(util.replica.view_number); - assert_ne!(current_view_leader, util.owner_key().public()); - let replica_commit = util.new_replica_commit(); - let res = util - .process_replica_commit(ctx, util.sign(replica_commit)) - .await; - assert_matches!(res, Err(replica_commit::Error::NotLeaderInView)); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_commit_already_exists() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - - let replica_commit = 
util.new_replica_commit_from_proposal(ctx).await; - assert!(util - .process_replica_commit(ctx, util.sign(replica_commit.clone())) - .await - .unwrap() - .is_none()); - - // Processing twice same ReplicaCommit for same view gets DuplicateSignature error - let res = util - .process_replica_commit(ctx, util.sign(replica_commit.clone())) - .await; - assert_matches!(res, Err(replica_commit::Error::Old { .. })); - - // Processing twice different ReplicaCommit for same view gets DuplicateSignature error too - let mut different_replica_commit = replica_commit.clone(); - different_replica_commit.proposal.number = replica_commit.proposal.number.next(); - let res = util - .process_replica_commit(ctx, util.sign(different_replica_commit.clone())) - .await; - assert_matches!(res, Err(replica_commit::Error::Old { .. })); - - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_commit_num_received_below_threshold() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - - let replica_prepare = util.new_replica_prepare(); - assert!(util - .process_replica_prepare(ctx, util.sign(replica_prepare.clone())) - .await - .unwrap() - .is_none()); - let replica_prepare = util.keys[1].sign_msg(replica_prepare); - let leader_prepare = util - .process_replica_prepare(ctx, replica_prepare) - .await - .unwrap() - .unwrap(); - let replica_commit = util - .process_leader_prepare(ctx, leader_prepare) - .await - .unwrap(); - util.process_replica_commit(ctx, replica_commit.clone()) - .await - .unwrap(); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_commit_invalid_sig() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - let msg = util.new_replica_commit_from_proposal(ctx).await; - let mut replica_commit = util.sign(msg); - replica_commit.sig = ctx.rng().gen(); - let res = util.process_replica_commit(ctx, replica_commit).await; - assert_matches!(res, Err(replica_commit::Error::InvalidSignature(..))); - Ok(()) - }) - .await - .unwrap(); -} - -/// ReplicaCommit received before sending out LeaderPrepare. -/// Whether leader accepts the message or rejects doesn't matter. -/// It just shouldn't crash. -#[tokio::test] -async fn replica_commit_unexpected_proposal() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - util.produce_block(ctx).await; - let replica_commit = util.new_replica_commit(); - let _ = util - .process_replica_commit(ctx, util.sign(replica_commit)) - .await; - Ok(()) - }) - .await - .unwrap(); -} - -/// Proposal should be the same for every ReplicaCommit -/// Check it doesn't fail if one validator sends a different proposal in -/// the ReplicaCommit -#[tokio::test] -async fn replica_commit_different_proposals() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new_many(ctx).await; - s.spawn_bg(runner.run(ctx)); - - let replica_commit = util.new_replica_commit_from_proposal(ctx).await; - - // Process a modified replica_commit (ie. 
from a malicious or wrong node) - let mut bad_replica_commit = replica_commit.clone(); - bad_replica_commit.proposal.number = replica_commit.proposal.number.next(); - util.process_replica_commit(ctx, util.sign(bad_replica_commit)) - .await - .unwrap(); - - // The rest of the validators sign the correct one - let mut replica_commit_result = None; - for i in 1..util.keys.len() { - replica_commit_result = util - .process_replica_commit(ctx, util.keys[i].sign_msg(replica_commit.clone())) - .await - .unwrap(); - } - - // Check correct proposal has been committed - assert_matches!( - replica_commit_result, - Some(leader_commit) => { - assert_eq!( - leader_commit.msg.justification.message.proposal, - replica_commit.proposal - ); - } - ); - Ok(()) - }) - .await - .unwrap(); -} - -/// Check that leader won't accumulate undefined amount of messages if -/// it's spammed with ReplicaCommit messages for future views -#[tokio::test] -async fn replica_commit_limit_messages_in_memory() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - - let mut replica_commit = util.new_replica_commit_from_proposal(ctx).await; - let mut view = util.replica_view(); - // Spam it with 200 messages for different views - for _ in 0..200 { - replica_commit.view = view.clone(); - let res = util - .process_replica_commit(ctx, util.sign(replica_commit.clone())) - .await; - assert_matches!(res, Ok(_)); - // Since we have 2 replicas, we have to send only even numbered views - // to hit the same leader (the other replica will be leader on odd numbered views) - view.number = view.number.next().next(); - } - // Ensure only 1 commit_qc is in memory, as the previous 199 were discarded each time - // new message is processed - assert_eq!(util.leader.commit_qcs.len(), 1); - Ok(()) - }) - .await - .unwrap(); -} - -#[tokio::test] -async fn replica_commit_filter_functions_test() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 2).await; - s.spawn_bg(runner.run(ctx)); - - let replica_commit = util.new_replica_commit_from_proposal(ctx).await; - let msg = util.sign(validator::ConsensusMsg::ReplicaCommit( - replica_commit.clone(), - )); - - // Send a msg with invalid signature - let mut invalid_msg = msg.clone(); - invalid_msg.sig = ctx.rng().gen(); - util.leader_send(invalid_msg); - - // Send a correct message - util.leader_send(msg.clone()); - - // Validate only correct message is received - assert_eq!(util.leader.inbound_pipe.recv(ctx).await.unwrap().msg, msg); - - // Send a msg with view number = 2 - let mut replica_commit_from_view_2 = replica_commit.clone(); - replica_commit_from_view_2.view.number = ViewNumber(2); - let msg_from_view_2 = util.sign(validator::ConsensusMsg::ReplicaCommit( - replica_commit_from_view_2, - )); - util.leader_send(msg_from_view_2); - - // Send a msg with view number = 4, will prune message from view 2 - let mut replica_commit_from_view_4 = replica_commit.clone(); - replica_commit_from_view_4.view.number = ViewNumber(4); - let msg_from_view_4 = util.sign(validator::ConsensusMsg::ReplicaCommit( - replica_commit_from_view_4, - )); - util.leader_send(msg_from_view_4.clone()); - - // Send a msg with view number = 3, will be discarded, as it is older than message from view 4 - let mut replica_commit_from_view_3 = 
replica_commit.clone(); - replica_commit_from_view_3.view.number = ViewNumber(3); - let msg_from_view_3 = util.sign(validator::ConsensusMsg::ReplicaCommit( - replica_commit_from_view_3, - )); - util.leader_send(msg_from_view_3); - - // Validate only message from view 4 is received - assert_eq!( - util.leader.inbound_pipe.recv(ctx).await.unwrap().msg, - msg_from_view_4 - ); - - // Send a msg from validator 0 - let msg_from_validator_0 = util.keys[0].sign_msg(validator::ConsensusMsg::ReplicaCommit( - replica_commit.clone(), - )); - util.leader_send(msg_from_validator_0.clone()); - - // Send a msg from validator 1 - let msg_from_validator_1 = util.keys[1].sign_msg(validator::ConsensusMsg::ReplicaCommit( - replica_commit.clone(), - )); - util.leader_send(msg_from_validator_1.clone()); - - //Validate both are present in the inbound_pipe - assert_eq!( - util.leader.inbound_pipe.recv(ctx).await.unwrap().msg, - msg_from_validator_0 - ); - assert_eq!( - util.leader.inbound_pipe.recv(ctx).await.unwrap().msg, - msg_from_validator_1 - ); - - Ok(()) - }) - .await - .unwrap(); -} diff --git a/node/actors/bft/src/chonky_bft/tests/commit.rs b/node/actors/bft/src/chonky_bft/tests/commit.rs new file mode 100644 index 00000000..d02507e6 --- /dev/null +++ b/node/actors/bft/src/chonky_bft/tests/commit.rs @@ -0,0 +1,422 @@ +use crate::chonky_bft::{commit, testonly::UTHarness}; +use assert_matches::assert_matches; +use rand::Rng; +use zksync_concurrency::{ctx, scope}; +use zksync_consensus_roles::validator; + +#[tokio::test] +async fn commit_yield_new_view_sanity() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + let cur_view = util.replica.view_number; + let replica_commit = util.new_replica_commit(ctx).await; + let new_view = util + .process_replica_commit_all(ctx, replica_commit.clone()) + .await + .msg; + + assert_eq!(new_view.view().number, cur_view.next()); + assert_matches!(new_view.justification, validator::ProposalJustification::Commit(qc) => { + assert_eq!(qc.message.proposal, replica_commit.proposal); + }); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn commit_non_validator_signer() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let replica_commit = util.new_replica_commit(ctx).await; + let non_validator_key: validator::SecretKey = ctx.rng().gen(); + let res = util + .process_replica_commit(ctx, non_validator_key.sign_msg(replica_commit)) + .await; + + assert_matches!( + res, + Err(commit::Error::NonValidatorSigner { signer }) => { + assert_eq!(*signer, non_validator_key.public()); + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn replica_commit_old() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let mut replica_commit = util.new_replica_commit(ctx).await; + replica_commit.view.number = validator::ViewNumber(util.replica.view_number.0 - 1); + let replica_commit = util.owner_key().sign_msg(replica_commit); + let res = util.process_replica_commit(ctx, replica_commit).await; + + assert_matches!( + res, + Err(commit::Error::Old 
{ current_view }) => { + assert_eq!(current_view, util.replica.view_number); + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn commit_duplicate_signer() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; + s.spawn_bg(runner.run(ctx)); + + let mut replica_commit = util.new_replica_commit(ctx).await; + assert!(util + .process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit.clone())) + .await + .unwrap() + .is_none()); + + // Processing twice same ReplicaCommit for same view gets DuplicateSigner error + let res = util + .process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit.clone())) + .await; + assert_matches!( + res, + Err(commit::Error::DuplicateSigner { + message_view, + signer + })=> { + assert_eq!(message_view, util.replica.view_number); + assert_eq!(*signer, util.owner_key().public()); + } + ); + + // Processing twice different ReplicaCommit for same view gets DuplicateSigner error too + replica_commit.proposal.number = replica_commit.proposal.number.next(); + let res = util + .process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit.clone())) + .await; + assert_matches!( + res, + Err(commit::Error::DuplicateSigner { + message_view, + signer + })=> { + assert_eq!(message_view, util.replica.view_number); + assert_eq!(*signer, util.owner_key().public()); + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn commit_invalid_sig() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let msg = util.new_replica_commit(ctx).await; + let mut replica_commit = util.owner_key().sign_msg(msg); + replica_commit.sig = ctx.rng().gen(); + + let res = util.process_replica_commit(ctx, replica_commit).await; + assert_matches!(res, Err(commit::Error::InvalidSignature(..))); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn commit_invalid_message() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let mut replica_commit = util.new_replica_commit(ctx).await; + replica_commit.view.genesis = rng.gen(); + + let res = util + .process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit)) + .await; + assert_matches!(res, Err(commit::Error::InvalidMessage(_))); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn replica_commit_num_received_below_threshold() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + let replica_commit = util.new_replica_commit(ctx).await; + for i in 0..util.genesis().validators.quorum_threshold() as usize - 1 { + assert!(util + .process_replica_commit(ctx, util.keys[i].sign_msg(replica_commit.clone())) + .await + .unwrap() + .is_none()); + } + let res = util + .process_replica_commit( + ctx, + util.keys[util.genesis().validators.quorum_threshold() as usize - 1] + .sign_msg(replica_commit.clone()), + ) + .await + .unwrap() + .unwrap() + .msg; + 
assert_matches!(res.justification, validator::ProposalJustification::Commit(qc) => { + assert_eq!(qc.message.proposal, replica_commit.proposal); + }); + for i in util.genesis().validators.quorum_threshold() as usize..util.keys.len() { + let res = util + .process_replica_commit(ctx, util.keys[i].sign_msg(replica_commit.clone())) + .await; + assert_matches!(res, Err(commit::Error::Old { .. })); + } + + Ok(()) + }) + .await + .unwrap(); +} + +/// ReplicaCommit received before receiving LeaderProposal. +/// Whether replica accepts or rejects the message it doesn't matter. +/// It just shouldn't crash. +#[tokio::test] +async fn replica_commit_unexpected_proposal() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + util.produce_block(ctx).await; + let replica_commit = validator::ReplicaCommit { + view: util.view(), + proposal: validator::BlockHeader { + number: util + .replica + .high_commit_qc + .as_ref() + .unwrap() + .message + .proposal + .number + .next(), + payload: ctx.rng().gen(), + }, + }; + + let _ = util + .process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit)) + .await; + + Ok(()) + }) + .await + .unwrap(); +} + +/// Proposal should be the same for every ReplicaCommit +/// Check it doesn't fail if one validator sends a different proposal in +/// the ReplicaCommit +#[tokio::test] +async fn replica_commit_different_proposals() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + let replica_commit = util.new_replica_commit(ctx).await; + + // Process a modified replica_commit (ie. 
from a malicious or wrong node) + let mut bad_replica_commit = replica_commit.clone(); + bad_replica_commit.proposal.number = replica_commit.proposal.number.next(); + util.process_replica_commit(ctx, util.owner_key().sign_msg(bad_replica_commit)) + .await + .unwrap(); + + // The rest of the validators sign the correct one + let mut replica_commit_result = None; + for i in 1..util.keys.len() { + replica_commit_result = util + .process_replica_commit(ctx, util.keys[i].sign_msg(replica_commit.clone())) + .await + .unwrap(); + } + + // Check correct proposal has been committed + assert_matches!(replica_commit_result.unwrap().msg.justification, validator::ProposalJustification::Commit(qc) => { + assert_eq!(qc.message.proposal, replica_commit.proposal); + }); + + Ok(()) + }) + .await + .unwrap(); +} + +/// Check that leader won't accumulate undefined amount of messages if +/// it's spammed with ReplicaCommit messages for future views +#[tokio::test] +async fn replica_commit_limit_messages_in_memory() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; + s.spawn_bg(runner.run(ctx)); + + let mut replica_commit = util.new_replica_commit(ctx).await; + let mut view = util.view(); + // Spam it with 200 messages for different views + for _ in 0..200 { + replica_commit.view = view.clone(); + let res = util + .process_replica_commit(ctx, util.owner_key().sign_msg(replica_commit.clone())) + .await; + assert_matches!(res, Ok(_)); + view.number = view.number.next(); + } + + // Ensure only 1 commit_qc is in memory, as the previous 199 were discarded each time + // a new message was processed + assert_eq!(util.replica.commit_qcs_cache.len(), 1); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn replica_commit_filter_functions_test() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; + s.spawn_bg(runner.run(ctx)); + + let replica_commit = util.new_replica_commit(ctx).await; + let msg = util + .owner_key() + .sign_msg(validator::ConsensusMsg::ReplicaCommit( + replica_commit.clone(), + )); + + // Send a msg with invalid signature + let mut invalid_msg = msg.clone(); + invalid_msg.sig = ctx.rng().gen(); + util.send(invalid_msg); + + // Send a correct message + util.send(msg.clone()); + + // Validate only correct message is received + assert_eq!(util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, msg); + + // Send a msg with view number = 2 + let mut replica_commit_from_view_2 = replica_commit.clone(); + replica_commit_from_view_2.view.number = validator::ViewNumber(2); + let msg_from_view_2 = util + .owner_key() + .sign_msg(validator::ConsensusMsg::ReplicaCommit( + replica_commit_from_view_2, + )); + util.send(msg_from_view_2); + + // Send a msg with view number = 4, will prune message from view 2 + let mut replica_commit_from_view_4 = replica_commit.clone(); + replica_commit_from_view_4.view.number = validator::ViewNumber(4); + let msg_from_view_4 = util + .owner_key() + .sign_msg(validator::ConsensusMsg::ReplicaCommit( + replica_commit_from_view_4, + )); + util.send(msg_from_view_4.clone()); + + // Send a msg with view number = 3, will be discarded, as it is older than message from view 4 + let mut replica_commit_from_view_3 = replica_commit.clone(); + replica_commit_from_view_3.view.number = 
validator::ViewNumber(3); + let msg_from_view_3 = util + .owner_key() + .sign_msg(validator::ConsensusMsg::ReplicaCommit( + replica_commit_from_view_3, + )); + util.send(msg_from_view_3); + + // Validate only message from view 4 is received + assert_eq!( + util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + msg_from_view_4 + ); + + // Send a msg from validator 0 + let msg_from_validator_0 = util.keys[0].sign_msg(validator::ConsensusMsg::ReplicaCommit( + replica_commit.clone(), + )); + util.send(msg_from_validator_0.clone()); + + // Send a msg from validator 1 + let msg_from_validator_1 = util.keys[1].sign_msg(validator::ConsensusMsg::ReplicaCommit( + replica_commit.clone(), + )); + util.send(msg_from_validator_1.clone()); + + //Validate both are present in the inbound_pipe + assert_eq!( + util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + msg_from_validator_0 + ); + assert_eq!( + util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + msg_from_validator_1 + ); + + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/actors/bft/src/chonky_bft/tests/mod.rs b/node/actors/bft/src/chonky_bft/tests/mod.rs new file mode 100644 index 00000000..1cd17292 --- /dev/null +++ b/node/actors/bft/src/chonky_bft/tests/mod.rs @@ -0,0 +1,87 @@ +use crate::chonky_bft::testonly::UTHarness; +use zksync_concurrency::{ctx, scope}; +use zksync_consensus_roles::validator; + +mod commit; +mod proposal; +mod timeout; + +/// Sanity check of the happy path. +#[tokio::test] +async fn block_production() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + util.produce_block(ctx).await; + + Ok(()) + }) + .await + .unwrap(); +} + +/// Sanity check of block production after timeout +#[tokio::test] +async fn block_production_timeout() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + util.produce_block_after_timeout(ctx).await; + + Ok(()) + }) + .await + .unwrap(); +} + +/// Sanity check of block production with reproposal. 
+#[tokio::test] +async fn reproposal_block_production() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + let proposal = util.new_leader_proposal(ctx).await; + let replica_commit = util + .process_leader_proposal(ctx, util.leader_key().sign_msg(proposal.clone())) + .await + .unwrap() + .msg; + + let mut timeout = validator::ReplicaTimeout { + view: replica_commit.view.clone(), + high_vote: Some(replica_commit.clone()), + high_qc: util.replica.high_commit_qc.clone(), + }; + for i in 0..util.genesis().validators.subquorum_threshold() as usize { + util.process_replica_timeout(ctx, util.keys[i].sign_msg(timeout.clone())) + .await + .unwrap(); + } + timeout.high_vote = None; + for i in util.genesis().validators.subquorum_threshold() as usize..util.keys.len() { + let _ = util + .process_replica_timeout(ctx, util.keys[i].sign_msg(timeout.clone())) + .await; + } + + assert!(util.replica.high_commit_qc.is_none()); + util.produce_block(ctx).await; + assert_eq!( + util.replica.high_commit_qc.unwrap().message.proposal, + replica_commit.proposal + ); + + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/actors/bft/src/chonky_bft/tests/proposal.rs b/node/actors/bft/src/chonky_bft/tests/proposal.rs new file mode 100644 index 00000000..78177b5d --- /dev/null +++ b/node/actors/bft/src/chonky_bft/tests/proposal.rs @@ -0,0 +1,336 @@ +use crate::{ + chonky_bft::{ + proposal, + testonly::{UTHarness, MAX_PAYLOAD_SIZE}, + }, + testonly::RejectPayload, +}; +use assert_matches::assert_matches; +use rand::Rng; +use zksync_concurrency::{ctx, scope}; +use zksync_consensus_roles::validator; + +#[tokio::test] +async fn proposal_yield_replica_commit_sanity() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let proposal = util.new_leader_proposal(ctx).await; + let replica_commit = util + .process_leader_proposal(ctx, util.owner_key().sign_msg(proposal.clone())) + .await + .unwrap(); + + assert_eq!( + replica_commit.msg, + validator::ReplicaCommit { + view: proposal.view().clone(), + proposal: validator::BlockHeader { + number: proposal.justification.get_implied_block(util.genesis()).0, + payload: proposal.proposal_payload.unwrap().hash() + }, + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn proposal_old_view() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let proposal = util.new_leader_proposal(ctx).await; + + util.replica.phase = validator::Phase::Commit; + + let res = util + .process_leader_proposal(ctx, util.leader_key().sign_msg(proposal.clone())) + .await; + + assert_matches!( + res, + Err(proposal::Error::Old { current_view, current_phase }) => { + assert_eq!(current_view, util.replica.view_number); + assert_eq!(current_phase, util.replica.phase); + } + ); + + util.replica.phase = validator::Phase::Timeout; + + let res = util + .process_leader_proposal(ctx, util.leader_key().sign_msg(proposal.clone())) + .await; + + assert_matches!( + res, + Err(proposal::Error::Old { current_view, current_phase }) => { + assert_eq!(current_view, util.replica.view_number); 
+ assert_eq!(current_phase, util.replica.phase); + } + ); + + util.replica.phase = validator::Phase::Prepare; + util.replica.view_number = util.replica.view_number.next(); + + let res = util + .process_leader_proposal(ctx, util.leader_key().sign_msg(proposal)) + .await; + + assert_matches!( + res, + Err(proposal::Error::Old { current_view, current_phase }) => { + assert_eq!(current_view, util.replica.view_number); + assert_eq!(current_phase, util.replica.phase); + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn proposal_invalid_leader() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; + s.spawn_bg(runner.run(ctx)); + + let proposal = util.new_leader_proposal(ctx).await; + + assert_ne!( + util.view_leader(proposal.view().number), + util.owner_key().public() + ); + + let res = util + .process_leader_proposal(ctx, util.owner_key().sign_msg(proposal)) + .await; + + assert_matches!( + res, + Err(proposal::Error::InvalidLeader { correct_leader, received_leader }) => { + assert_eq!(correct_leader, util.keys[1].public()); + assert_eq!(received_leader, util.keys[0].public()); + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn proposal_invalid_signature() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; + s.spawn_bg(runner.run(ctx)); + + let proposal = util.new_leader_proposal(ctx).await; + let mut signed_proposal = util.leader_key().sign_msg(proposal); + signed_proposal.sig = ctx.rng().gen(); + + let res = util.process_leader_proposal(ctx, signed_proposal).await; + + assert_matches!(res, Err(proposal::Error::InvalidSignature(_))); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn proposal_invalid_message() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let mut proposal = util.new_leader_proposal(ctx).await; + proposal.justification = ctx.rng().gen(); + let res = util + .process_leader_proposal(ctx, util.leader_key().sign_msg(proposal)) + .await; + + assert_matches!(res, Err(proposal::Error::InvalidMessage(_))); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn proposal_pruned_block() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let fake_commit = validator::ReplicaCommit { + view: util.view(), + proposal: validator::BlockHeader { + number: util + .replica + .config + .block_store + .queued() + .first + .prev() + .unwrap() + .prev() + .unwrap(), + payload: ctx.rng().gen(), + }, + }; + + util.process_replica_commit_all(ctx, fake_commit).await; + + // The replica should now produce a proposal for an already pruned block number. 
+ let proposal = util.new_leader_proposal(ctx).await; + + let res = util + .process_leader_proposal(ctx, util.leader_key().sign_msg(proposal)) + .await; + + assert_matches!(res, Err(proposal::Error::ProposalAlreadyPruned)); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn proposal_missing_payload() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let mut proposal = util.new_leader_proposal(ctx).await; + proposal.proposal_payload = None; + + let res = util + .process_leader_proposal(ctx, util.leader_key().sign_msg(proposal)) + .await; + + assert_matches!(res, Err(proposal::Error::MissingPayload)); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn proposal_proposal_oversized_payload() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let payload = validator::Payload(vec![0; MAX_PAYLOAD_SIZE + 1]); + let mut proposal = util.new_leader_proposal(ctx).await; + proposal.proposal_payload = Some(payload); + + let res = util + .process_leader_proposal(ctx, util.owner_key().sign_msg(proposal)) + .await; + assert_matches!( + res, + Err(proposal::Error::ProposalOversizedPayload{ payload_size }) => { + assert_eq!(payload_size, MAX_PAYLOAD_SIZE + 1); + } + ); + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn proposal_missing_previous_payload() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let missing_payload_number = util.replica.config.block_store.queued().first.next(); + let fake_commit = validator::ReplicaCommit { + view: util.view(), + proposal: validator::BlockHeader { + number: missing_payload_number, + payload: ctx.rng().gen(), + }, + }; + + util.process_replica_commit_all(ctx, fake_commit).await; + + let proposal = validator::LeaderProposal { + proposal_payload: Some(ctx.rng().gen()), + justification: validator::ProposalJustification::Commit( + util.replica.high_commit_qc.clone().unwrap(), + ), + }; + + let res = util + .process_leader_proposal(ctx, util.leader_key().sign_msg(proposal)) + .await; + + assert_matches!( + res, + Err(proposal::Error::MissingPreviousPayload { prev_number } ) => { + assert_eq!(prev_number, missing_payload_number); + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn proposal_invalid_payload() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = + UTHarness::new_with_payload_manager(ctx, 1, Box::new(RejectPayload)).await; + s.spawn_bg(runner.run(ctx)); + + let proposal = util.new_leader_proposal(ctx).await; + + let res = util + .process_leader_proposal(ctx, util.leader_key().sign_msg(proposal)) + .await; + + assert_matches!(res, Err(proposal::Error::InvalidPayload(_))); + + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/actors/bft/src/chonky_bft/tests/timeout.rs b/node/actors/bft/src/chonky_bft/tests/timeout.rs new file mode 100644 index 00000000..c2b64ea3 --- /dev/null +++ b/node/actors/bft/src/chonky_bft/tests/timeout.rs @@ -0,0 +1,435 @@ +use 
crate::chonky_bft::{testonly::UTHarness, timeout}; +use assert_matches::assert_matches; +use rand::Rng; +use zksync_concurrency::{ctx, scope}; +use zksync_consensus_roles::validator; + +#[tokio::test] +async fn timeout_yield_new_view_sanity() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + let cur_view = util.replica.view_number; + let replica_timeout = util.new_replica_timeout(); + let new_view = util + .process_replica_timeout_all(ctx, replica_timeout.clone()) + .await + .msg; + + assert_eq!(new_view.view().number, cur_view.next()); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn timeout_non_validator_signer() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let replica_timeout = util.new_replica_timeout(); + let non_validator_key: validator::SecretKey = ctx.rng().gen(); + let res = util + .process_replica_timeout(ctx, non_validator_key.sign_msg(replica_timeout)) + .await; + + assert_matches!( + res, + Err(timeout::Error::NonValidatorSigner { signer }) => { + assert_eq!(*signer, non_validator_key.public()); + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn replica_timeout_old() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let mut replica_timeout = util.new_replica_timeout(); + replica_timeout.view.number = validator::ViewNumber(util.replica.view_number.0 - 1); + let replica_timeout = util.owner_key().sign_msg(replica_timeout); + let res = util.process_replica_timeout(ctx, replica_timeout).await; + + assert_matches!( + res, + Err(timeout::Error::Old { current_view }) => { + assert_eq!(current_view, util.replica.view_number); + } + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn timeout_duplicate_signer() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; + s.spawn_bg(runner.run(ctx)); + + util.produce_block(ctx).await; + + let replica_timeout = util.new_replica_timeout(); + assert!(util + .process_replica_timeout(ctx, util.owner_key().sign_msg(replica_timeout.clone())) + .await + .unwrap() + .is_none()); + + // Processing twice same ReplicaTimeout for same view gets DuplicateSigner error + let res = util + .process_replica_timeout(ctx, util.owner_key().sign_msg(replica_timeout.clone())) + .await; + assert_matches!( + res, + Err(timeout::Error::DuplicateSigner { + message_view, + signer + })=> { + assert_eq!(message_view, util.replica.view_number); + assert_eq!(*signer, util.owner_key().public()); + } + ); + + // Processing twice different ReplicaTimeout for same view gets DuplicateSigner error too + // replica_timeout.high_vote = None; + let res = util + .process_replica_timeout(ctx, util.owner_key().sign_msg(replica_timeout.clone())) + .await; + assert_matches!( + res, + Err(timeout::Error::DuplicateSigner { + message_view, + signer + })=> { + assert_eq!(message_view, util.replica.view_number); + assert_eq!(*signer, util.owner_key().public()); + } + 
); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn timeout_invalid_sig() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let msg = util.new_replica_timeout(); + let mut replica_timeout = util.owner_key().sign_msg(msg); + replica_timeout.sig = ctx.rng().gen(); + + let res = util.process_replica_timeout(ctx, replica_timeout).await; + assert_matches!(res, Err(timeout::Error::InvalidSignature(..))); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn timeout_invalid_message() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let replica_timeout = util.new_replica_timeout(); + + let mut bad_replica_timeout = replica_timeout.clone(); + bad_replica_timeout.view.genesis = rng.gen(); + let res = util + .process_replica_timeout(ctx, util.owner_key().sign_msg(bad_replica_timeout)) + .await; + assert_matches!( + res, + Err(timeout::Error::InvalidMessage( + validator::ReplicaTimeoutVerifyError::BadView(_) + )) + ); + + let mut bad_replica_timeout = replica_timeout.clone(); + bad_replica_timeout.high_vote = Some(rng.gen()); + let res = util + .process_replica_timeout(ctx, util.owner_key().sign_msg(bad_replica_timeout)) + .await; + assert_matches!( + res, + Err(timeout::Error::InvalidMessage( + validator::ReplicaTimeoutVerifyError::InvalidHighVote(_) + )) + ); + + let mut bad_replica_timeout = replica_timeout.clone(); + bad_replica_timeout.high_qc = Some(rng.gen()); + let res = util + .process_replica_timeout(ctx, util.owner_key().sign_msg(bad_replica_timeout)) + .await; + assert_matches!( + res, + Err(timeout::Error::InvalidMessage( + validator::ReplicaTimeoutVerifyError::InvalidHighQC(_) + )) + ); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn timeout_num_received_below_threshold() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + let replica_timeout = util.new_replica_timeout(); + for i in 0..util.genesis().validators.quorum_threshold() as usize - 1 { + assert!(util + .process_replica_timeout(ctx, util.keys[i].sign_msg(replica_timeout.clone())) + .await + .unwrap() + .is_none()); + } + let res = util + .process_replica_timeout( + ctx, + util.keys[util.genesis().validators.quorum_threshold() as usize - 1] + .sign_msg(replica_timeout.clone()), + ) + .await + .unwrap() + .unwrap() + .msg; + assert_matches!(res.justification, validator::ProposalJustification::Timeout(qc) => { + assert_eq!(qc.view, replica_timeout.view); + }); + for i in util.genesis().validators.quorum_threshold() as usize..util.keys.len() { + let res = util + .process_replica_timeout(ctx, util.keys[i].sign_msg(replica_timeout.clone())) + .await; + assert_matches!(res, Err(timeout::Error::Old { .. })); + } + + Ok(()) + }) + .await + .unwrap(); +} + +/// Check all ReplicaTimeout are included for weight calculation +/// even on different messages for the same view. 
+#[tokio::test] +async fn timeout_weight_different_messages() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new_many(ctx).await; + s.spawn_bg(runner.run(ctx)); + + let view = util.view(); + util.produce_block(ctx).await; + + let replica_timeout = util.new_replica_timeout(); + let proposal = replica_timeout.clone().high_vote.unwrap().proposal; + + // Create a different proposal for the same view + let mut different_proposal = proposal; + different_proposal.number = different_proposal.number.next(); + + // Create a new ReplicaTimeout with the different proposal + let mut other_replica_timeout = replica_timeout.clone(); + let mut high_vote = other_replica_timeout.high_vote.clone().unwrap(); + high_vote.proposal = different_proposal; + let high_qc = util + .new_commit_qc(ctx, |msg: &mut validator::ReplicaCommit| { + msg.proposal = different_proposal; + msg.view = view; + }) + .await; + other_replica_timeout.high_vote = Some(high_vote); + other_replica_timeout.high_qc = Some(high_qc); + + let validators = util.keys.len(); + + // half of the validators sign replica_timeout + for i in 0..validators / 2 { + util.process_replica_timeout(ctx, util.keys[i].sign_msg(replica_timeout.clone())) + .await + .unwrap(); + } + + let mut res = None; + // The rest of the validators until threshold sign other_replica_timeout + for i in validators / 2..util.genesis().validators.quorum_threshold() as usize { + res = util + .process_replica_timeout(ctx, util.keys[i].sign_msg(other_replica_timeout.clone())) + .await + .unwrap(); + } + + assert_matches!(res.unwrap().msg.justification, validator::ProposalJustification::Timeout(qc) => { + assert_eq!(qc.view, replica_timeout.view); + assert_eq!(qc.high_vote(util.genesis()).unwrap(), proposal); + }); + + Ok(()) + }) + .await + .unwrap(); +} + +/// Check that leader won't accumulate undefined amount of messages if +/// it's spammed with ReplicaTimeout messages for future views +#[tokio::test] +async fn replica_timeout_limit_messages_in_memory() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; + s.spawn_bg(runner.run(ctx)); + + let mut replica_timeout = util.new_replica_timeout(); + let mut view = util.view(); + // Spam it with 200 messages for different views + for _ in 0..200 { + replica_timeout.view = view.clone(); + let res = util + .process_replica_timeout(ctx, util.owner_key().sign_msg(replica_timeout.clone())) + .await; + assert_matches!(res, Ok(_)); + view.number = view.number.next(); + } + + // Ensure only 1 timeout_qc is in memory, as the previous 199 were discarded each time + // a new message was processed + assert_eq!(util.replica.timeout_qcs_cache.len(), 1); + + Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn replica_timeout_filter_functions_test() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 2).await; + s.spawn_bg(runner.run(ctx)); + + let replica_timeout = util.new_replica_timeout(); + let msg = util + .owner_key() + .sign_msg(validator::ConsensusMsg::ReplicaTimeout( + replica_timeout.clone(), + )); + + // Send a msg with invalid signature + let mut invalid_msg = msg.clone(); + invalid_msg.sig = ctx.rng().gen(); + util.send(invalid_msg); + + // 
Send a correct message + util.send(msg.clone()); + + // Validate only correct message is received + assert_eq!(util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, msg); + + // Send a msg with view number = 2 + let mut replica_timeout_from_view_2 = replica_timeout.clone(); + replica_timeout_from_view_2.view.number = validator::ViewNumber(2); + let msg_from_view_2 = util + .owner_key() + .sign_msg(validator::ConsensusMsg::ReplicaTimeout( + replica_timeout_from_view_2, + )); + util.send(msg_from_view_2); + + // Send a msg with view number = 4, will prune message from view 2 + let mut replica_timeout_from_view_4 = replica_timeout.clone(); + replica_timeout_from_view_4.view.number = validator::ViewNumber(4); + let msg_from_view_4 = util + .owner_key() + .sign_msg(validator::ConsensusMsg::ReplicaTimeout( + replica_timeout_from_view_4, + )); + util.send(msg_from_view_4.clone()); + + // Send a msg with view number = 3, will be discarded, as it is older than message from view 4 + let mut replica_timeout_from_view_3 = replica_timeout.clone(); + replica_timeout_from_view_3.view.number = validator::ViewNumber(3); + let msg_from_view_3 = util + .owner_key() + .sign_msg(validator::ConsensusMsg::ReplicaTimeout( + replica_timeout_from_view_3, + )); + util.send(msg_from_view_3); + + // Validate only message from view 4 is received + assert_eq!( + util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + msg_from_view_4 + ); + + // Send a msg from validator 0 + let msg_from_validator_0 = util.keys[0].sign_msg(validator::ConsensusMsg::ReplicaTimeout( + replica_timeout.clone(), + )); + util.send(msg_from_validator_0.clone()); + + // Send a msg from validator 1 + let msg_from_validator_1 = util.keys[1].sign_msg(validator::ConsensusMsg::ReplicaTimeout( + replica_timeout.clone(), + )); + util.send(msg_from_validator_1.clone()); + + // Validate both are present in the inbound_pipe + assert_eq!( + util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + msg_from_validator_0 + ); + assert_eq!( + util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + msg_from_validator_1 + ); + + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/actors/bft/src/chonky_bft/timeout.rs b/node/actors/bft/src/chonky_bft/timeout.rs index a1036780..3884e947 100644 --- a/node/actors/bft/src/chonky_bft/timeout.rs +++ b/node/actors/bft/src/chonky_bft/timeout.rs @@ -14,7 +14,7 @@ pub(crate) enum Error { /// Signer of the message. signer: Box, }, - /// Past view or phase. + /// Past view. #[error("past view (current view: {current_view:?})")] Old { /// Current view. @@ -79,7 +79,7 @@ impl StateMachine { } // If we already have a message from the same validator for the same or past view, ignore it. - if let Some(&view) = self.commit_views_cache.get(author) { + if let Some(&view) = self.timeout_views_cache.get(author) { if view >= message.view.number { return Err(Error::DuplicateSigner { message_view: message.view.number, diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs index 8a3fab22..b00f227a 100644 --- a/node/actors/bft/src/lib.rs +++ b/node/actors/bft/src/lib.rs @@ -14,8 +14,8 @@ mod config; pub mod io; mod metrics; pub mod testonly; -#[cfg(test)] -mod tests; +//#[cfg(test)] +//mod tests; /// Protocol version of this BFT implementation. 
pub const PROTOCOL_VERSION: validator::ProtocolVersion = validator::ProtocolVersion::CURRENT; diff --git a/node/actors/bft/src/testonly/make.rs b/node/actors/bft/src/testonly/make.rs index d2b49113..13382860 100644 --- a/node/actors/bft/src/testonly/make.rs +++ b/node/actors/bft/src/testonly/make.rs @@ -1,9 +1,23 @@ //! This module contains utilities that are only meant for testing purposes. +use crate::io::InputMessage; use crate::PayloadManager; -use rand::Rng as _; +use rand::{distributions::Standard, prelude::Distribution, Rng}; use zksync_concurrency::ctx; +use zksync_concurrency::oneshot; +use zksync_consensus_network::io::ConsensusReq; use zksync_consensus_roles::validator; +// Generates a random InputMessage. +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> InputMessage { + let (send, _) = oneshot::channel(); + InputMessage::Network(ConsensusReq { + msg: rng.gen(), + ack: send, + }) + } +} + /// Produces random payload of a given size. #[derive(Debug)] pub struct RandomPayload(pub usize); diff --git a/node/actors/bft/src/testonly/mod.rs b/node/actors/bft/src/testonly/mod.rs index 504bb149..03aed7c0 100644 --- a/node/actors/bft/src/testonly/mod.rs +++ b/node/actors/bft/src/testonly/mod.rs @@ -1,33 +1,15 @@ //! This module contains utilities that are only meant for testing purposes. -use crate::io::InputMessage; -use rand::{distributions::Standard, prelude::Distribution, Rng}; -use zksync_concurrency::oneshot; -use zksync_consensus_network::io::ConsensusReq; - mod make; #[cfg(test)] mod node; #[cfg(test)] mod run; #[cfg(test)] -pub(crate) mod ut_harness; +pub mod twins; pub use make::*; #[cfg(test)] pub(crate) use node::*; #[cfg(test)] pub(crate) use run::*; -#[cfg(test)] -pub mod twins; - -// Generates a random InputMessage. -impl Distribution for Standard { - fn sample(&self, rng: &mut R) -> InputMessage { - let (send, _) = oneshot::channel(); - InputMessage::Network(ConsensusReq { - msg: rng.gen(), - ack: send, - }) - } -} diff --git a/node/libs/roles/src/validator/messages/leader_proposal.rs b/node/libs/roles/src/validator/messages/leader_proposal.rs index b0ea5faa..e05c6668 100644 --- a/node/libs/roles/src/validator/messages/leader_proposal.rs +++ b/node/libs/roles/src/validator/messages/leader_proposal.rs @@ -112,9 +112,11 @@ impl ProposalJustification { // Either the previous proposal was finalized or we know for certain // that it couldn't have been finalized (because there is no high vote). // Either way, we can propose a new block. + + // If there is no high QC, then we must be at the start of the chain. let block_number = match high_qc { Some(qc) => qc.header().number.next(), - None => BlockNumber(0), + None => genesis.first_block, }; (block_number, None)
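The final hunk above replaces the hard-coded `BlockNumber(0)` with `genesis.first_block` when no high commit QC exists yet. A minimal sketch of that selection logic follows, assuming a hypothetical free-standing helper (`implied_block_number`) and the `validator::CommitQC` / `validator::Genesis` types from `zksync_consensus_roles`; only `qc.header().number.next()` and `genesis.first_block` are taken from the diff itself.

use zksync_consensus_roles::validator;

/// Hypothetical helper, not part of the patch: mirrors the updated block-number
/// selection inside `ProposalJustification::get_implied_block`.
fn implied_block_number(
    high_qc: Option<&validator::CommitQC>,
    genesis: &validator::Genesis,
) -> validator::BlockNumber {
    match high_qc {
        // The previous proposal was committed, so the next proposal builds on top of it.
        Some(qc) => qc.header().number.next(),
        // No commit QC at all: we are at the start of the chain, which is
        // `genesis.first_block` rather than a hard-coded block number 0.
        None => genesis.first_block,
    }
}

Presumably this matters for chains bootstrapped from a state where the first block is not block 0; in that case proposing `genesis.first_block` keeps the proposer consistent with what the block store actually contains.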