Commit

compiles
pompon0 committed Feb 14, 2024
1 parent ad5b1a5 commit 407575c
Showing 13 changed files with 82 additions and 182 deletions.
2 changes: 1 addition & 1 deletion node/actors/bft/src/testonly/run.rs
@@ -31,7 +31,7 @@ impl Test {
let mut honest = vec![];
scope::run!(ctx, |ctx, s| async {
for (i, net) in nets.into_iter().enumerate() {
-let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
+let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
if self.nodes[i] == Behavior::Honest {
honest.push(store.clone());
2 changes: 1 addition & 1 deletion node/actors/bft/src/testonly/ut_harness.rs
@@ -56,7 +56,7 @@ impl UTHarness {
) -> (UTHarness, BlockStoreRunner) {
let rng = &mut ctx.rng();
let setup = validator::testonly::GenesisSetup::new(rng, num_validators);
-let (block_store, runner) = new_store(ctx, &setup.blocks[0]).await;
+let (block_store, runner) = new_store(ctx, &setup.genesis).await;
let (send, recv) = ctx::channel::unbounded();

let cfg = Arc::new(Config {
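The two harness changes above swap `new_store(ctx, &setup.blocks[0])` for `new_store(ctx, &setup.genesis)`: test stores are now seeded with a genesis specification instead of a pre-built first block. A minimal sketch of the idea, using stand-in types rather than the real zksync_consensus_storage API:

```rust
// Stand-in types: the real crate uses validator::Genesis, FinalBlock and an
// async BlockStore; this only illustrates why a store can now start out empty.

#[derive(Clone, Copy, PartialEq, Debug)]
struct BlockNumber(u64);

#[derive(Clone, Debug)]
struct Genesis {
    // The real Genesis also carries the validator set and fork history.
    first_block: BlockNumber,
}

#[derive(Clone, Debug)]
struct FinalBlock {
    number: BlockNumber,
}

struct InMemoryStore {
    genesis: Genesis,
    blocks: Vec<FinalBlock>,
}

impl InMemoryStore {
    /// The store starts with zero blocks; it only knows which block it expects first.
    fn new(genesis: Genesis) -> Self {
        Self { genesis, blocks: vec![] }
    }

    /// Number of the next block this store will accept.
    fn next(&self) -> BlockNumber {
        match self.blocks.last() {
            Some(b) => BlockNumber(b.number.0 + 1),
            None => self.genesis.first_block,
        }
    }

    fn queue_block(&mut self, block: FinalBlock) -> Result<(), String> {
        if block.number != self.next() {
            return Err(format!("expected {:?}, got {:?}", self.next(), block.number));
        }
        self.blocks.push(block);
        Ok(())
    }
}

fn main() {
    let mut store = InMemoryStore::new(Genesis { first_block: BlockNumber(0) });
    store.queue_block(FinalBlock { number: BlockNumber(0) }).unwrap();
    // Gaps are still rejected, exactly as when the store was seeded with a block.
    assert!(store.queue_block(FinalBlock { number: BlockNumber(2) }).is_err());
}
```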
74 changes: 9 additions & 65 deletions node/actors/executor/src/tests.rs
@@ -2,7 +2,6 @@

use super::*;
use zksync_consensus_network::testonly::{new_configs,new_fullnode};
-use test_casing::test_casing;
use zksync_concurrency::{
testonly::{abort_on_panic, set_timeout},
time,
@@ -46,7 +45,7 @@ async fn executing_single_validator() {
let setup = GenesisSetup::new(rng,1);
let cfgs = new_configs(rng,&setup,0);
scope::run!(ctx, |ctx, s| async {
-let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
+let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(make_executor(&cfgs[0],store.clone()).run(ctx));
store.wait_until_persisted(ctx, BlockNumber(5)).await?;
@@ -66,12 +65,12 @@ async fn executing_validator_and_full_node() {
let cfgs = new_configs(rng,&setup,0);
scope::run!(ctx, |ctx, s| async {
// Spawn validator.
-let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
+let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(make_executor(&cfgs[0],store).run(ctx));

// Spawn full node.
-let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
+let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(make_executor(&new_fullnode(rng, &cfgs[0]),store.clone()).run(ctx));

@@ -83,62 +82,6 @@ async fn executing_validator_and_full_node() {
.unwrap();
}

-#[test_casing(2, [false, true])]
-#[tokio::test]
-async fn syncing_full_node_from_snapshot(delay_block_storage: bool) {
-abort_on_panic();
-let _guard = set_timeout(time::Duration::seconds(10));
-let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
-let rng = &mut ctx.rng();
-
-let mut setup = GenesisSetup::new(rng,2);
-setup.push_blocks(rng, 10);
-let mut cfgs = new_configs(rng,&setup,1);
-// Turn the nodes into non-validators - we will add the blocks to the storage manually.
-for cfg in &mut cfgs {
-cfg.validator_key = None;
-}
-
-scope::run!(ctx, |ctx, s| async {
-// Spawn stores.
-let (store1, runner) = new_store(ctx, &setup.blocks[0]).await;
-s.spawn_bg(runner.run(ctx));
-// Node2 will start from a snapshot.
-let (store2, runner) = new_store(ctx, &setup.blocks[4]).await;
-s.spawn_bg(runner.run(ctx));
-
-if !delay_block_storage {
-// Instead of running consensus on the validator, add the generated blocks manually.
-for block in &setup.blocks[1..] {
-store1.queue_block(ctx, block.clone()).await.unwrap();
-}
-}
-
-// Spawn nodes.
-s.spawn_bg(make_executor(&cfgs[0],store1.clone()).run(ctx));
-s.spawn_bg(make_executor(&cfgs[1],store2.clone()).run(ctx));
-
-if delay_block_storage {
-// Emulate the validator gradually adding new blocks to the storage.
-for block in &setup.blocks[1..] {
-ctx.sleep(time::Duration::milliseconds(500)).await?;
-store1.queue_block(ctx, block.clone()).await?;
-}
-}
-
-store2.wait_until_persisted(ctx, BlockNumber(10)).await?;
-
-// Check that the node didn't receive any blocks with number lesser than the initial snapshot block.
-for lesser_block_number in 0..3 {
-let block = store2.block(ctx, BlockNumber(lesser_block_number)).await?;
-assert!(block.is_none());
-}
-anyhow::Ok(())
-})
-.await
-.unwrap();
-}
-
/// * finalize some blocks
/// * revert bunch of blocks
/// * restart validators and make sure that new blocks get produced
@@ -154,7 +97,7 @@ async fn test_block_revert() {
let mut setup = GenesisSetup::new(rng, 2);
let mut cfgs = new_configs(rng, &setup, 1);
// Persistent stores for the validators.
-let ps : Vec<_> = cfgs.iter().map(|_|in_memory::BlockStore::new(setup.blocks[0].clone())).collect();
+let mut ps : Vec<_> = cfgs.iter().map(|_|in_memory::BlockStore::new(setup.genesis.clone())).collect();

// Make validators produce some blocks.
scope::run!(ctx, |ctx,s| async {
@@ -173,15 +116,16 @@

tracing::info!("Revert blocks");
let first = BlockNumber(3);
-setup.genesis.forks.push(validator::Fork {
+let fork = validator::Fork {
number: setup.genesis.forks.current().number.next(),
first_block: first,
first_parent: ps[0].block(ctx,first).await.unwrap().header().parent,
-}).unwrap();
+};
+setup.genesis.forks.push(fork.clone()).unwrap();
// Update configs and persistent storage.
for i in 0..cfgs.len() {
cfgs[i].genesis = setup.genesis.clone();
-ps[i].revert(first);
+ps[i] = ps[i].fork(fork.clone()).unwrap();
}

let last_block = BlockNumber(8);
@@ -196,7 +140,7 @@
}

tracing::info!("Spawn a new node with should fetch blocks from both new and old fork");
-let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
+let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(make_executor(&new_fullnode(rng,&cfgs[0]),store.clone()).run(ctx));
store.wait_until_persisted(ctx, last_block).await?;
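The revert test no longer calls `revert()` on the persistent store: it appends a new `validator::Fork` to the genesis fork set and re-creates the store for that fork. A rough sketch of that bookkeeping, assuming simplified stand-in types (the real `Fork` also records `first_parent`, and the fork-set validation is stricter):

```rust
// Illustrative stand-ins for validator::Fork / ForkSet and an in-memory store.

#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct BlockNumber(u64);

#[derive(Clone, Copy, PartialEq, Debug)]
struct ForkNumber(u64);

#[derive(Clone, Debug)]
struct Fork {
    number: ForkNumber,
    first_block: BlockNumber,
}

#[derive(Clone, Debug, Default)]
struct ForkSet(Vec<Fork>);

impl ForkSet {
    fn current(&self) -> Fork {
        self.0.last().cloned().unwrap_or(Fork { number: ForkNumber(0), first_block: BlockNumber(0) })
    }

    /// A new fork must directly follow the current one.
    fn push(&mut self, fork: Fork) -> Result<(), String> {
        if fork.number != ForkNumber(self.current().number.0 + 1) {
            return Err("fork number must be current + 1".into());
        }
        self.0.push(fork);
        Ok(())
    }
}

#[derive(Clone, Debug)]
struct BlockStore {
    blocks: Vec<BlockNumber>, // stand-in for the stored finalized blocks
}

impl BlockStore {
    /// Re-create the store for a new fork, dropping everything from
    /// `first_block` onwards so the new fork can re-finalize those heights.
    fn fork(&self, fork: &Fork) -> BlockStore {
        BlockStore {
            blocks: self.blocks.iter().copied().filter(|b| *b < fork.first_block).collect(),
        }
    }
}

fn main() {
    let mut forks = ForkSet::default();
    let store = BlockStore { blocks: (0..6).map(BlockNumber).collect() };

    // Revert everything from block 3 onwards by starting a new fork there.
    let fork = Fork { number: ForkNumber(forks.current().number.0 + 1), first_block: BlockNumber(3) };
    forks.push(fork.clone()).unwrap();

    let store = store.fork(&fork);
    assert_eq!(store.blocks.len(), 3); // blocks 0..=2 survive, 3..=5 are reverted
    println!("current fork: {:?}", forks.current());
}
```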
15 changes: 6 additions & 9 deletions node/actors/sync_blocks/src/peers/mod.rs
@@ -67,30 +67,27 @@ impl PeerStates {
state: BlockStoreState,
) -> anyhow::Result<()> {
use std::collections::hash_map::Entry;

-let last = state.last.header().number;
-anyhow::ensure!(state.first.header().number <= state.last.header().number);
-state
-.last
+let Some(last) = &state.last else { return Ok(()) };
+last
.verify(&self.config.genesis, /*allow_past_forks=*/true)
.context("state.last.verify()")?;
let mut peers = self.peers.lock().unwrap();
match peers.entry(peer.clone()) {
-Entry::Occupied(mut e) => e.get_mut().state = state,
+Entry::Occupied(mut e) => e.get_mut().state = state.clone(),
Entry::Vacant(e) => {
let permits = self.config.max_concurrent_blocks_per_peer;
e.insert(PeerState {
-state,
+state: state.clone(),
get_block_semaphore: Arc::new(sync::Semaphore::new(permits)),
});
}
}
self.highest_peer_block
.send_if_modified(|highest_peer_block| {
-if *highest_peer_block >= last {
+if *highest_peer_block >= last.header().number {
return false;
}
-*highest_peer_block = last;
+*highest_peer_block = last.header().number;
true
});
Ok(())
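In `PeerStates::update`, the advertised `state.last` is now optional, so a peer that has only a snapshot and no finalized blocks yet is accepted without touching the highest-block watermark. A simplified synchronous sketch of the new flow, with stand-in types (the real code verifies a `CommitQC` against the genesis and publishes `highest_peer_block` through a watch channel):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct BlockNumber(u64);

#[derive(Clone, Debug)]
struct BlockStoreState {
    first: BlockNumber,
    // None = the peer has no finalized blocks yet (e.g. fresh from a snapshot).
    last: Option<BlockNumber>,
}

struct PeerStates {
    peers: HashMap<String, BlockStoreState>,
    highest_peer_block: BlockNumber,
}

impl PeerStates {
    fn new() -> Self {
        Self { peers: HashMap::new(), highest_peer_block: BlockNumber(0) }
    }

    fn update(&mut self, peer: &str, state: BlockStoreState) -> Result<(), String> {
        // An empty store is no longer an error; there is just nothing to fetch yet.
        let Some(last) = state.last else { return Ok(()) };
        // Stand-in for `last.verify(&genesis)` in the real code.
        if last < state.first {
            return Err("advertised last block is below the first block".into());
        }
        self.peers.insert(peer.to_string(), state.clone());
        // Equivalent of send_if_modified() on the watch channel.
        if last > self.highest_peer_block {
            self.highest_peer_block = last;
        }
        Ok(())
    }
}

fn main() {
    let mut peers = PeerStates::new();
    // Peer restored from a snapshot, nothing finalized yet: accepted, watermark unchanged.
    peers.update("a", BlockStoreState { first: BlockNumber(4), last: None }).unwrap();
    assert_eq!(peers.highest_peer_block, BlockNumber(0));
    // Peer with blocks up to 7 bumps the watermark.
    peers.update("b", BlockStoreState { first: BlockNumber(0), last: Some(BlockNumber(7)) }).unwrap();
    assert_eq!(peers.highest_peer_block, BlockNumber(7));
}
```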
14 changes: 7 additions & 7 deletions node/actors/sync_blocks/src/peers/tests/fakes.rs
@@ -9,23 +9,23 @@ use zksync_consensus_storage::testonly::new_store;
async fn processing_invalid_sync_states() {
let ctx = &ctx::test_root(&ctx::RealClock);
let rng = &mut ctx.rng();
-let mut setup = GenesisSetup::empty(rng, 4);
+let mut setup = GenesisSetup::new(rng, 4);
setup.push_blocks(rng, 3);
-let (storage, _runner) = new_store(ctx, &setup.blocks[0]).await;
+let (storage, _runner) = new_store(ctx, &setup.genesis).await;

let (message_sender, _) = channel::unbounded();
let peer_states = PeerStates::new(Config::new(setup.genesis.clone()), storage, message_sender);

let peer = &rng.gen::<node::SecretKey>().public();
let mut invalid_sync_state = sync_state(&setup, 1);
-invalid_sync_state.first = setup.blocks[2].justification.clone();
+invalid_sync_state.first = setup.blocks[2].header().number;
assert!(peer_states.update(peer, invalid_sync_state).is_err());

let mut invalid_sync_state = sync_state(&setup, 1);
-invalid_sync_state.last.message.proposal.number = BlockNumber(5);
+invalid_sync_state.last.as_mut().unwrap().message.proposal.number = BlockNumber(5);
assert!(peer_states.update(peer, invalid_sync_state).is_err());

-let mut other_network = GenesisSetup::empty(rng, 4);
+let mut other_network = GenesisSetup::new(rng, 4);
other_network.push_blocks(rng, 2);
let invalid_sync_state = sync_state(&other_network, 1);
assert!(peer_states.update(peer, invalid_sync_state).is_err());
@@ -50,7 +50,7 @@ impl Test for PeerWithFakeSyncState {
let rng = &mut ctx.rng();
let peer_key = rng.gen::<node::SecretKey>().public();
let mut fake_sync_state = sync_state(&setup, 1);
-fake_sync_state.last.message.proposal.number = BlockNumber(42);
+fake_sync_state.last.as_mut().unwrap().message.proposal.number = BlockNumber(42);
assert!(peer_states.update(&peer_key, fake_sync_state).is_err());

clock.advance(BLOCK_SLEEP_INTERVAL);
@@ -94,7 +94,7 @@ impl Test for PeerWithFakeBlock {
setup.blocks[0].clone(),
// block with wrong validator set
{
-let mut setup = GenesisSetup::empty(rng, 4);
+let mut setup = GenesisSetup::new(rng, 4);
setup.push_blocks(rng, 2);
setup.blocks[1].clone()
},
4 changes: 2 additions & 2 deletions node/actors/sync_blocks/src/tests/end_to_end.rs
@@ -212,8 +212,8 @@ impl GossipNetworkTest for BasicSynchronization {
for node_handle in &mut node_handles {
node_handle.switch_on();
let state = node_handle.store.subscribe().borrow().clone();
-assert_eq!(state.first.header().number, BlockNumber(0));
-assert_eq!(state.last.header().number, BlockNumber(0));
+assert_eq!(state.first, BlockNumber(0));
+assert_eq!(state.last.as_ref().unwrap().header().number, BlockNumber(0));
}

for block_number in (1..5).map(BlockNumber) {
4 changes: 2 additions & 2 deletions node/actors/sync_blocks/src/tests/mod.rs
@@ -31,8 +31,8 @@ pub(crate) fn snapshot_sync_state(
) -> BlockStoreState {
assert!(!range.is_empty());
BlockStoreState {
-first: setup.blocks[*range.start()].justification.clone(),
-last: setup.blocks[*range.end()].justification.clone(),
+first: setup.blocks[*range.start()].header().number,
+last: Some(setup.blocks[*range.end()].justification.clone()),
}
}

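`BlockStoreState` itself changed shape: `first` is now a plain block number and `last` an optional certificate. A small sketch of what `snapshot_sync_state` builds for a range of pre-generated blocks, using block numbers as a stand-in for the real justification:

```rust
use std::ops::RangeInclusive;

#[derive(Clone, Copy, PartialEq, PartialOrd, Debug)]
struct BlockNumber(u64);

#[derive(Clone, Debug)]
struct BlockStoreState {
    first: BlockNumber,
    // In the real type this is Option<CommitQC>, not a number.
    last: Option<BlockNumber>,
}

/// Advertised state of a node restored from a snapshot covering `range`.
fn snapshot_sync_state(blocks: &[BlockNumber], range: RangeInclusive<usize>) -> BlockStoreState {
    assert!(!range.is_empty());
    BlockStoreState {
        first: blocks[*range.start()],
        last: Some(blocks[*range.end()]),
    }
}

fn main() {
    let blocks: Vec<BlockNumber> = (0..10).map(BlockNumber).collect();
    let state = snapshot_sync_state(&blocks, 4..=9);
    assert_eq!(state.first, BlockNumber(4));
    assert_eq!(state.last, Some(BlockNumber(9)));
}
```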
2 changes: 1 addition & 1 deletion node/tools/build.rs
@@ -3,7 +3,7 @@ fn main() {
zksync_protobuf_build::Config {
input_root: "src/proto".into(),
proto_root: "zksync/tools".into(),
-dependencies: vec![],
+dependencies: vec!["::zksync_consensus_roles::proto".parse().unwrap()],
protobuf_crate: "::zksync_protobuf".parse().unwrap(),
is_public: false,
}
9 changes: 2 additions & 7 deletions node/tools/src/bin/localnet_config.rs
@@ -63,11 +63,7 @@ fn main() -> anyhow::Result<()> {
// Generate the keys for all the replicas.
let rng = &mut rand::thread_rng();

-let mut setup = validator::GenesisSetup::empty(rng, addrs.len());
-setup
-.next_block()
-.payload(validator::Payload(vec![]))
-.push();
+let setup = validator::GenesisSetup::new(rng, addrs.len());
let validator_keys = setup.keys.clone();
let node_keys: Vec<node::SecretKey> = (0..addrs.len()).map(|_| rng.gen()).collect();

@@ -81,8 +77,7 @@ fn main() -> anyhow::Result<()> {
public_addr: addrs[i],
metrics_server_addr,

-validators: setup.genesis.validators.clone(),
-genesis_block: setup.blocks[0].clone(),
+genesis: setup.genesis.clone(),
max_payload_size: args.payload_size,

gossip_dynamic_inbound_limit: 0,
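The localnet generator now builds one complete setup with `GenesisSetup::new` and embeds the same `genesis` in every node's config, replacing the separate validator list and genesis block. A hedged sketch of that shape — the field names mirror the diff, but the types, keys, and addresses are illustrative stand-ins:

```rust
#[derive(Clone, Debug, PartialEq)]
struct Genesis {
    validators: Vec<String>, // stand-in for validator public keys
}

#[derive(Debug)]
struct AppConfig {
    public_addr: std::net::SocketAddr,
    genesis: Genesis,
    max_payload_size: usize,
}

fn make_configs(addrs: &[std::net::SocketAddr], payload_size: usize) -> Vec<AppConfig> {
    // In the real tool this comes from validator::GenesisSetup::new(rng, addrs.len()).
    let genesis = Genesis {
        validators: (0..addrs.len()).map(|i| format!("validator:{i}")).collect(),
    };
    addrs
        .iter()
        .map(|addr| AppConfig {
            public_addr: *addr,
            genesis: genesis.clone(), // every node carries the same genesis
            max_payload_size: payload_size,
        })
        .collect()
}

fn main() {
    let addrs: Vec<std::net::SocketAddr> =
        (0..3).map(|i| format!("127.0.0.1:{}", 3054 + i).parse().unwrap()).collect();
    let cfgs = make_configs(&addrs, 1 << 20);
    assert_eq!(cfgs.len(), 3);
    // Every node shares the same genesis.
    assert_eq!(cfgs[0].genesis, cfgs[2].genesis);
    println!("{:#?}", cfgs[0]);
}
```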
37 changes: 8 additions & 29 deletions node/tools/src/config.rs
@@ -11,8 +11,8 @@ use zksync_consensus_bft as bft;
use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt};
use zksync_consensus_executor as executor;
use zksync_consensus_roles::{node, validator};
-use zksync_consensus_storage::{BlockStore, BlockStoreRunner, PersistentBlockStore};
-use zksync_protobuf::{required, serde::Serde, ProtoFmt};
+use zksync_consensus_storage::{BlockStore, BlockStoreRunner};
+use zksync_protobuf::{required, read_required, serde::Serde, ProtoFmt};

/// Decodes a proto message from json for arbitrary ProtoFmt.
pub fn decode_json<T: serde::de::DeserializeOwned>(json: &str) -> anyhow::Result<T> {
@@ -54,8 +54,7 @@ pub struct AppConfig {
pub public_addr: std::net::SocketAddr,
pub metrics_server_addr: Option<std::net::SocketAddr>,

-pub validators: validator::ValidatorSet,
-pub genesis_block: validator::FinalBlock,
+pub genesis: validator::Genesis,
pub max_payload_size: usize,

pub gossip_dynamic_inbound_limit: usize,
@@ -67,14 +66,6 @@ impl ProtoFmt for AppConfig {
type Proto = proto::AppConfig;

fn read(r: &Self::Proto) -> anyhow::Result<Self> {
-let validators = r.validators.iter().enumerate().map(|(i, v)| {
-Text::new(v)
-.decode()
-.with_context(|| format!("validators[{i}]"))
-});
-let validators: anyhow::Result<Vec<_>> = validators.collect();
-let validators = validator::ValidatorSet::new(validators?).context("validators")?;

let mut gossip_static_inbound = HashSet::new();
for (i, v) in r.gossip_static_inbound.iter().enumerate() {
gossip_static_inbound.insert(
@@ -95,9 +86,8 @@ impl ProtoFmt for AppConfig {
public_addr: read_required_text(&r.public_addr).context("public_addr")?,
metrics_server_addr: read_optional_text(&r.metrics_server_addr)
.context("metrics_server_addr")?,

-validators,
-genesis_block: read_required_text(&r.genesis_block).context("genesis_block")?,

+genesis: read_required(&r.genesis).context("genesis")?,
max_payload_size: required(&r.max_payload_size)
.and_then(|x| Ok((*x).try_into()?))
.context("max_payload_size")?,
Expand All @@ -116,8 +106,7 @@ impl ProtoFmt for AppConfig {
public_addr: Some(self.public_addr.encode()),
metrics_server_addr: self.metrics_server_addr.as_ref().map(TextFmt::encode),

-validators: self.validators.iter().map(TextFmt::encode).collect(),
-genesis_block: Some(self.genesis_block.encode()),
+genesis: Some(self.genesis.build()),
max_payload_size: Some(self.max_payload_size.try_into().unwrap()),

gossip_dynamic_inbound_limit: Some(
@@ -199,23 +188,13 @@ impl Configs {
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<(executor::Executor, BlockStoreRunner)> {
-let store = store::RocksDB::open(&self.database).await?;
-// Store genesis if db is empty.
-if store.is_empty().await? {
-store
-.store_next_block(ctx, &self.app.genesis_block)
-.await
-.context("store_next_block()")?;
-}
+let store = store::RocksDB::open(self.app.genesis.clone(), &self.database).await?;
let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())).await?;
let e = executor::Executor {
config: executor::Config {
server_addr: self.app.server_addr,
public_addr: self.app.public_addr,
-genesis: validator::Genesis {
-forks: validator::ForkSet::new(vec![validator::Fork::default()]).unwrap(),
-validators: self.app.validators.clone(),
-},
+genesis: self.app.genesis.clone(),
node_key: self.node_key.clone(),
gossip_dynamic_inbound_limit: self.app.gossip_dynamic_inbound_limit,
gossip_static_inbound: self.app.gossip_static_inbound.clone(),
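The last hunks change how the node wires up its store: `store::RocksDB::open` now takes the genesis, so the caller no longer pushes a genesis block into an empty database, and the executor's genesis comes straight from the config. A guess at the intent, sketched against an in-memory stand-in for RocksDB (the real `open()` is async and its matching rules may differ):

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct Genesis {
    chain_id: u64, // stand-in for the validator set and fork history
}

#[derive(Default)]
struct Database {
    genesis: Option<Genesis>,
    blocks: HashMap<u64, Vec<u8>>,
}

struct Store {
    genesis: Genesis,
    blocks: HashMap<u64, Vec<u8>>,
}

impl Store {
    /// Equivalent in spirit to store::RocksDB::open(genesis, path).
    fn open(genesis: Genesis, db: &mut Database) -> Result<Store, String> {
        match db.genesis.clone() {
            // First run: persist the genesis spec in the database.
            None => db.genesis = Some(genesis.clone()),
            // Later runs: the stored spec must match what the caller expects.
            Some(stored) if stored == genesis => {}
            Some(_) => return Err("database belongs to a different chain".into()),
        }
        Ok(Store { genesis, blocks: db.blocks.clone() })
    }
}

fn main() {
    let mut db = Database::default();
    let genesis = Genesis { chain_id: 270 };
    let store = Store::open(genesis.clone(), &mut db).unwrap();
    // The store starts empty; no genesis *block* was ever written by the caller.
    assert!(store.blocks.is_empty());
    // Re-opening the same database with a different genesis is rejected.
    assert!(Store::open(Genesis { chain_id: 271 }, &mut db).is_err());
}
```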