diff --git a/node/Cargo.lock b/node/Cargo.lock index 7daf63bc..2484f909 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -2971,6 +2971,7 @@ name = "zksync_consensus_network" version = "0.1.0" dependencies = [ "anyhow", + "assert_matches", "async-trait", "im", "once_cell", diff --git a/node/actors/bft/src/replica/state_machine.rs b/node/actors/bft/src/replica/state_machine.rs index 355ce98f..f1dd946e 100644 --- a/node/actors/bft/src/replica/state_machine.rs +++ b/node/actors/bft/src/replica/state_machine.rs @@ -27,7 +27,6 @@ pub(crate) struct StateMachine { /// The highest commit quorum certificate known to the replica. pub(crate) high_qc: Option, /// A cache of the received block proposals. - // TODO: this should be invalidated per view, not block number which is no longer monotone pub(crate) block_proposal_cache: BTreeMap>, /// The deadline to receive an input message. diff --git a/node/actors/network/Cargo.toml b/node/actors/network/Cargo.toml index 0edd257c..e1e835a9 100644 --- a/node/actors/network/Cargo.toml +++ b/node/actors/network/Cargo.toml @@ -27,6 +27,7 @@ tracing.workspace = true vise.workspace = true [dev-dependencies] +assert_matches.workspace = true pretty_assertions.workspace = true test-casing.workspace = true tokio.workspace = true diff --git a/node/actors/network/src/consensus/handshake/tests.rs b/node/actors/network/src/consensus/handshake/tests.rs index 093d067b..2aca99c8 100644 --- a/node/actors/network/src/consensus/handshake/tests.rs +++ b/node/actors/network/src/consensus/handshake/tests.rs @@ -4,6 +4,7 @@ use rand::Rng; use zksync_concurrency::{ctx, io, scope, testonly::abort_on_panic}; use zksync_consensus_roles::validator; use zksync_protobuf::testonly::test_encode_random; +use assert_matches::assert_matches; #[test] fn test_schema_encode_decode() { @@ -113,6 +114,50 @@ async fn test_peer_mismatch() { .unwrap(); } +#[tokio::test] +async fn test_genesis_mismatch() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + + let key0: validator::SecretKey = rng.gen(); + let key1: validator::SecretKey = rng.gen(); + + tracing::info!("test that inbound handshake rejects mismatching genesis"); + scope::run!(ctx, |ctx, s| async { + let (s0, mut s1) = noise::testonly::pipe(ctx).await; + s.spawn(async { + let mut s0 = s0; + let res = outbound(ctx, &key0, ctx.rng().gen(), &mut s0, &key1.public()).await; + assert_matches!(res, Err(Error::Stream(_))); + Ok(()) + }); + let res = inbound(ctx, &key1, rng.gen(), &mut s1).await; + assert_matches!(res, Err(Error::GenesisMismatch)); + anyhow::Ok(()) + }) + .await + .unwrap(); + + tracing::info!("test that outbound handshake rejects mismatching genesis"); + scope::run!(ctx, |ctx, s| async { + let (s0, mut s1) = noise::testonly::pipe(ctx).await; + s.spawn(async { + let mut s0 = s0; + let res = outbound(ctx, &key0, ctx.rng().gen(), &mut s0, &key1.public()).await; + assert_matches!(res, Err(Error::GenesisMismatch)); + Ok(()) + }); + let session_id = node::SessionId(s1.id().encode()); + let _ : Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size()).await.unwrap(); + frame::send_proto(ctx, &mut s1, &Handshake { + session_id: key1.sign_msg(session_id), + genesis: rng.gen(), + }).await.unwrap(); + anyhow::Ok(()) + }).await.unwrap(); +} + #[tokio::test] async fn test_invalid_signature() { abort_on_panic(); diff --git a/node/actors/network/src/consensus/tests.rs b/node/actors/network/src/consensus/tests.rs index 7b887e82..f1d34e54 100644 --- a/node/actors/network/src/consensus/tests.rs +++ b/node/actors/network/src/consensus/tests.rs @@ -1,11 +1,12 @@ use super::*; -use crate::{io, preface, rpc, testonly}; +use crate::{io, preface, rpc, testonly, metrics}; use rand::Rng; use tracing::Instrument as _; use zksync_concurrency::{ctx, net, scope, testonly::abort_on_panic}; use zksync_consensus_roles::validator; use zksync_consensus_storage::testonly::new_store; use zksync_consensus_utils::no_copy::NoCopy; +use assert_matches::assert_matches; #[tokio::test] async fn test_one_connection_per_validator() { @@ -63,6 +64,50 @@ async fn test_one_connection_per_validator() { .unwrap(); } + +#[tokio::test] +async fn test_genesis_mismatch() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + let setup = validator::testonly::GenesisSetup::new(rng, 2); + let cfgs = testonly::new_configs(rng, &setup, /*gossip_peers=*/0); + + scope::run!(ctx, |ctx,s| async { + let mut listener = cfgs[1].server_addr.bind().context("server_addr.bind()")?; + + tracing::info!("Start one node, we will simulate the other one."); + let (store,runner) = new_store(ctx,&setup.blocks[0]).await; + s.spawn_bg(runner.run(ctx)); + let (node,runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node"))); + + tracing::info!("Populate the validator_addrs of the running node."); + node.net.gossip.validator_addrs.update(&setup.genesis.validators, &[Arc::new(setup.keys[1].sign_msg(validator::NetAddress{ + addr: cfgs[1].public_addr, + version: 0, + timestamp: ctx.now_utc(), + }))]).await.unwrap(); + + tracing::info!("Accept a connection with mismatching genesis."); + let stream = metrics::MeteredStream::listen(ctx, &mut listener).await?.context("listen()")?; + let (mut stream, endpoint) = preface::accept(ctx, stream).await.context("preface::accept()")?; + assert_eq!(endpoint, preface::Endpoint::ConsensusNet); + tracing::info!("Expect the handshake to fail"); + let res = handshake::inbound(ctx, &setup.keys[1], rng.gen(), &mut stream).await; + assert_matches!(res,Err(handshake::Error::GenesisMismatch)); + + tracing::info!("Try to connect to a node with a mismatching genesis."); + let mut stream = preface::connect(ctx, cfgs[0].public_addr, preface::Endpoint::ConsensusNet).await.context("preface::connect")?; + let res = handshake::outbound(ctx, &setup.keys[1], rng.gen(), &mut stream, &setup.keys[0].public()).await; + tracing::info!("Expect the peer to verify the mismatching Genesis and close the connection."); + assert_matches!(res,Err(handshake::Error::Stream(_))); + Ok(()) + }) + .await + .unwrap(); +} + #[tokio::test(flavor = "multi_thread")] async fn test_address_change() { abort_on_panic(); diff --git a/node/actors/network/src/gossip/handshake/mod.rs b/node/actors/network/src/gossip/handshake/mod.rs index 180d2eec..de412474 100644 --- a/node/actors/network/src/gossip/handshake/mod.rs +++ b/node/actors/network/src/gossip/handshake/mod.rs @@ -54,10 +54,10 @@ pub(super) enum Error { SessionIdMismatch, #[error("unexpected peer")] PeerMismatch, - #[error("validator signature")] + #[error(transparent)] Signature(#[from] node::InvalidSignatureError), - #[error("stream")] - Stream(#[source] anyhow::Error), + #[error(transparent)] + Stream(anyhow::Error), } pub(super) async fn outbound( diff --git a/node/actors/network/src/gossip/handshake/tests.rs b/node/actors/network/src/gossip/handshake/tests.rs index c1edd6d8..94746a0b 100644 --- a/node/actors/network/src/gossip/handshake/tests.rs +++ b/node/actors/network/src/gossip/handshake/tests.rs @@ -5,6 +5,7 @@ use std::collections::{HashMap, HashSet}; use zksync_concurrency::{ctx, io, scope, testonly::abort_on_panic}; use zksync_consensus_roles::node; use zksync_protobuf::testonly::test_encode_random; +use assert_matches::assert_matches; #[test] fn test_schema_encode_decode() { @@ -126,6 +127,51 @@ async fn test_peer_mismatch() { .unwrap(); } +#[tokio::test] +async fn test_genesis_mismatch() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + + let cfg0 = make_cfg(rng); + let cfg1 = make_cfg(rng); + + tracing::info!("test that inbound handshake rejects mismatching genesis"); + scope::run!(ctx, |ctx, s| async { + let (s0, mut s1) = noise::testonly::pipe(ctx).await; + s.spawn(async { + let mut s0 = s0; + let res = outbound(ctx, &cfg0, ctx.rng().gen(), &mut s0, &cfg1.key.public()).await; + assert_matches!(res, Err(Error::Stream(_))); + Ok(()) + }); + let res = inbound(ctx, &cfg1, rng.gen(), &mut s1).await; + assert_matches!(res, Err(Error::GenesisMismatch)); + anyhow::Ok(()) + }) + .await + .unwrap(); + + tracing::info!("test that outbound handshake rejects mismatching genesis"); + scope::run!(ctx, |ctx, s| async { + let (s0, mut s1) = noise::testonly::pipe(ctx).await; + s.spawn(async { + let mut s0 = s0; + let res = outbound(ctx, &cfg0, ctx.rng().gen(), &mut s0, &cfg1.key.public()).await; + assert_matches!(res, Err(Error::GenesisMismatch)); + Ok(()) + }); + let session_id = node::SessionId(s1.id().encode()); + let _ : Handshake = frame::recv_proto(ctx, &mut s1, Handshake::max_size()).await.unwrap(); + frame::send_proto(ctx, &mut s1, &Handshake { + session_id: cfg1.key.sign_msg(session_id), + genesis: rng.gen(), + is_static: false, + }).await.unwrap(); + anyhow::Ok(()) + }).await.unwrap(); +} + #[tokio::test] async fn test_invalid_signature() { abort_on_panic(); diff --git a/node/actors/network/src/gossip/tests.rs b/node/actors/network/src/gossip/tests.rs index 5b29666c..a9a42f8f 100644 --- a/node/actors/network/src/gossip/tests.rs +++ b/node/actors/network/src/gossip/tests.rs @@ -1,5 +1,5 @@ use super::*; -use crate::{io, preface, rpc, testonly}; +use crate::{metrics, io, preface, rpc, testonly}; use pretty_assertions::assert_eq; use rand::Rng; use std::{ @@ -16,6 +16,7 @@ use zksync_concurrency::{ }; use zksync_consensus_roles::validator::{self, BlockNumber, FinalBlock}; use zksync_consensus_storage::testonly::new_store; +use assert_matches::assert_matches; #[tokio::test] async fn test_one_connection_per_node() { @@ -260,6 +261,42 @@ async fn test_validator_addrs_propagation() { .unwrap(); } +#[tokio::test] +async fn test_genesis_mismatch() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + let setup = validator::testonly::GenesisSetup::new(rng, 2); + let cfgs = testonly::new_configs(rng, &setup, 1); + + scope::run!(ctx, |ctx,s| async { + let mut listener = cfgs[1].server_addr.bind().context("server_addr.bind()")?; + + tracing::info!("Start one node, we will simulate the other one."); + let (store,runner) = new_store(ctx,&setup.blocks[0]).await; + s.spawn_bg(runner.run(ctx)); + let (_node,runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone()); + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node"))); + + tracing::info!("Accept a connection with mismatching genesis."); + let stream = metrics::MeteredStream::listen(ctx, &mut listener).await?.context("listen()")?; + let (mut stream, endpoint) = preface::accept(ctx, stream).await.context("preface::accept()")?; + assert_eq!(endpoint, preface::Endpoint::GossipNet); + tracing::info!("Expect the handshake to fail"); + let res = handshake::inbound(ctx, &cfgs[1].gossip, rng.gen(), &mut stream).await; + assert_matches!(res,Err(handshake::Error::GenesisMismatch)); + + tracing::info!("Try to connect to a node with a mismatching genesis."); + let mut stream = preface::connect(ctx, cfgs[0].public_addr, preface::Endpoint::GossipNet).await.context("preface::connect")?; + let res = handshake::outbound(ctx, &cfgs[1].gossip, rng.gen(), &mut stream, &cfgs[0].gossip.key.public()).await; + tracing::info!("Expect the peer to verify the mismatching Genesis and close the connection."); + assert_matches!(res,Err(handshake::Error::Stream(_))); + Ok(()) + }) + .await + .unwrap(); +} + const EXCHANGED_STATE_COUNT: usize = 5; const NETWORK_CONNECTIVITY_CASES: [(usize, usize); 5] = [(2, 1), (3, 2), (5, 3), (10, 4), (10, 7)];