From 407575c03410b815a90b26a42adb9c9ffdad8212 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 14 Feb 2024 14:38:02 +0100 Subject: [PATCH] compiles --- node/actors/bft/src/testonly/run.rs | 2 +- node/actors/bft/src/testonly/ut_harness.rs | 2 +- node/actors/executor/src/tests.rs | 74 +++--------------- node/actors/sync_blocks/src/peers/mod.rs | 15 ++-- .../sync_blocks/src/peers/tests/fakes.rs | 14 ++-- .../sync_blocks/src/tests/end_to_end.rs | 4 +- node/actors/sync_blocks/src/tests/mod.rs | 4 +- node/tools/build.rs | 2 +- node/tools/src/bin/localnet_config.rs | 9 +-- node/tools/src/config.rs | 37 ++------- node/tools/src/proto/mod.proto | 18 ++--- node/tools/src/store.rs | 76 ++++++++----------- node/tools/src/tests.rs | 7 +- 13 files changed, 82 insertions(+), 182 deletions(-) diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs index f149cc11..3e96989d 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/actors/bft/src/testonly/run.rs @@ -31,7 +31,7 @@ impl Test { let mut honest = vec![]; scope::run!(ctx, |ctx, s| async { for (i, net) in nets.into_iter().enumerate() { - let (store, runner) = new_store(ctx, &setup.blocks[0]).await; + let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); if self.nodes[i] == Behavior::Honest { honest.push(store.clone()); diff --git a/node/actors/bft/src/testonly/ut_harness.rs b/node/actors/bft/src/testonly/ut_harness.rs index c52821ac..390a8d42 100644 --- a/node/actors/bft/src/testonly/ut_harness.rs +++ b/node/actors/bft/src/testonly/ut_harness.rs @@ -56,7 +56,7 @@ impl UTHarness { ) -> (UTHarness, BlockStoreRunner) { let rng = &mut ctx.rng(); let setup = validator::testonly::GenesisSetup::new(rng, num_validators); - let (block_store, runner) = new_store(ctx, &setup.blocks[0]).await; + let (block_store, runner) = new_store(ctx, &setup.genesis).await; let (send, recv) = ctx::channel::unbounded(); let cfg = Arc::new(Config { diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index 1609db69..117f1b7f 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -2,7 +2,6 @@ use super::*; use zksync_consensus_network::testonly::{new_configs,new_fullnode}; -use test_casing::test_casing; use zksync_concurrency::{ testonly::{abort_on_panic, set_timeout}, time, @@ -46,7 +45,7 @@ async fn executing_single_validator() { let setup = GenesisSetup::new(rng,1); let cfgs = new_configs(rng,&setup,0); scope::run!(ctx, |ctx, s| async { - let (store, runner) = new_store(ctx, &setup.blocks[0]).await; + let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); s.spawn_bg(make_executor(&cfgs[0],store.clone()).run(ctx)); store.wait_until_persisted(ctx, BlockNumber(5)).await?; @@ -66,12 +65,12 @@ async fn executing_validator_and_full_node() { let cfgs = new_configs(rng,&setup,0); scope::run!(ctx, |ctx, s| async { // Spawn validator. - let (store, runner) = new_store(ctx, &setup.blocks[0]).await; + let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); s.spawn_bg(make_executor(&cfgs[0],store).run(ctx)); // Spawn full node. - let (store, runner) = new_store(ctx, &setup.blocks[0]).await; + let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); s.spawn_bg(make_executor(&new_fullnode(rng, &cfgs[0]),store.clone()).run(ctx)); @@ -83,62 +82,6 @@ async fn executing_validator_and_full_node() { .unwrap(); } -#[test_casing(2, [false, true])] -#[tokio::test] -async fn syncing_full_node_from_snapshot(delay_block_storage: bool) { - abort_on_panic(); - let _guard = set_timeout(time::Duration::seconds(10)); - let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0)); - let rng = &mut ctx.rng(); - - let mut setup = GenesisSetup::new(rng,2); - setup.push_blocks(rng, 10); - let mut cfgs = new_configs(rng,&setup,1); - // Turn the nodes into non-validators - we will add the blocks to the storage manually. - for cfg in &mut cfgs { - cfg.validator_key = None; - } - - scope::run!(ctx, |ctx, s| async { - // Spawn stores. - let (store1, runner) = new_store(ctx, &setup.blocks[0]).await; - s.spawn_bg(runner.run(ctx)); - // Node2 will start from a snapshot. - let (store2, runner) = new_store(ctx, &setup.blocks[4]).await; - s.spawn_bg(runner.run(ctx)); - - if !delay_block_storage { - // Instead of running consensus on the validator, add the generated blocks manually. - for block in &setup.blocks[1..] { - store1.queue_block(ctx, block.clone()).await.unwrap(); - } - } - - // Spawn nodes. - s.spawn_bg(make_executor(&cfgs[0],store1.clone()).run(ctx)); - s.spawn_bg(make_executor(&cfgs[1],store2.clone()).run(ctx)); - - if delay_block_storage { - // Emulate the validator gradually adding new blocks to the storage. - for block in &setup.blocks[1..] { - ctx.sleep(time::Duration::milliseconds(500)).await?; - store1.queue_block(ctx, block.clone()).await?; - } - } - - store2.wait_until_persisted(ctx, BlockNumber(10)).await?; - - // Check that the node didn't receive any blocks with number lesser than the initial snapshot block. - for lesser_block_number in 0..3 { - let block = store2.block(ctx, BlockNumber(lesser_block_number)).await?; - assert!(block.is_none()); - } - anyhow::Ok(()) - }) - .await - .unwrap(); -} - /// * finalize some blocks /// * revert bunch of blocks /// * restart validators and make sure that new blocks get produced @@ -154,7 +97,7 @@ async fn test_block_revert() { let mut setup = GenesisSetup::new(rng, 2); let mut cfgs = new_configs(rng, &setup, 1); // Persistent stores for the validators. - let ps : Vec<_> = cfgs.iter().map(|_|in_memory::BlockStore::new(setup.blocks[0].clone())).collect(); + let mut ps : Vec<_> = cfgs.iter().map(|_|in_memory::BlockStore::new(setup.genesis.clone())).collect(); // Make validators produce some blocks. scope::run!(ctx, |ctx,s| async { @@ -173,15 +116,16 @@ async fn test_block_revert() { tracing::info!("Revert blocks"); let first = BlockNumber(3); - setup.genesis.forks.push(validator::Fork { + let fork = validator::Fork { number: setup.genesis.forks.current().number.next(), first_block: first, first_parent: ps[0].block(ctx,first).await.unwrap().header().parent, - }).unwrap(); + }; + setup.genesis.forks.push(fork.clone()).unwrap(); // Update configs and persistent storage. for i in 0..cfgs.len() { cfgs[i].genesis = setup.genesis.clone(); - ps[i].revert(first); + ps[i] = ps[i].fork(fork.clone()).unwrap(); } let last_block = BlockNumber(8); @@ -196,7 +140,7 @@ async fn test_block_revert() { } tracing::info!("Spawn a new node with should fetch blocks from both new and old fork"); - let (store, runner) = new_store(ctx, &setup.blocks[0]).await; + let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); s.spawn_bg(make_executor(&new_fullnode(rng,&cfgs[0]),store.clone()).run(ctx)); store.wait_until_persisted(ctx, last_block).await?; diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index 7c2fa7c0..7bc0b475 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -67,30 +67,27 @@ impl PeerStates { state: BlockStoreState, ) -> anyhow::Result<()> { use std::collections::hash_map::Entry; - - let last = state.last.header().number; - anyhow::ensure!(state.first.header().number <= state.last.header().number); - state - .last + let Some(last) = &state.last else { return Ok(()) }; + last .verify(&self.config.genesis, /*allow_past_forks=*/true) .context("state.last.verify()")?; let mut peers = self.peers.lock().unwrap(); match peers.entry(peer.clone()) { - Entry::Occupied(mut e) => e.get_mut().state = state, + Entry::Occupied(mut e) => e.get_mut().state = state.clone(), Entry::Vacant(e) => { let permits = self.config.max_concurrent_blocks_per_peer; e.insert(PeerState { - state, + state: state.clone(), get_block_semaphore: Arc::new(sync::Semaphore::new(permits)), }); } } self.highest_peer_block .send_if_modified(|highest_peer_block| { - if *highest_peer_block >= last { + if *highest_peer_block >= last.header().number { return false; } - *highest_peer_block = last; + *highest_peer_block = last.header().number; true }); Ok(()) diff --git a/node/actors/sync_blocks/src/peers/tests/fakes.rs b/node/actors/sync_blocks/src/peers/tests/fakes.rs index 289dd1e8..0be63645 100644 --- a/node/actors/sync_blocks/src/peers/tests/fakes.rs +++ b/node/actors/sync_blocks/src/peers/tests/fakes.rs @@ -9,23 +9,23 @@ use zksync_consensus_storage::testonly::new_store; async fn processing_invalid_sync_states() { let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let mut setup = GenesisSetup::empty(rng, 4); + let mut setup = GenesisSetup::new(rng, 4); setup.push_blocks(rng, 3); - let (storage, _runner) = new_store(ctx, &setup.blocks[0]).await; + let (storage, _runner) = new_store(ctx, &setup.genesis).await; let (message_sender, _) = channel::unbounded(); let peer_states = PeerStates::new(Config::new(setup.genesis.clone()), storage, message_sender); let peer = &rng.gen::().public(); let mut invalid_sync_state = sync_state(&setup, 1); - invalid_sync_state.first = setup.blocks[2].justification.clone(); + invalid_sync_state.first = setup.blocks[2].header().number; assert!(peer_states.update(peer, invalid_sync_state).is_err()); let mut invalid_sync_state = sync_state(&setup, 1); - invalid_sync_state.last.message.proposal.number = BlockNumber(5); + invalid_sync_state.last.as_mut().unwrap().message.proposal.number = BlockNumber(5); assert!(peer_states.update(peer, invalid_sync_state).is_err()); - let mut other_network = GenesisSetup::empty(rng, 4); + let mut other_network = GenesisSetup::new(rng, 4); other_network.push_blocks(rng, 2); let invalid_sync_state = sync_state(&other_network, 1); assert!(peer_states.update(peer, invalid_sync_state).is_err()); @@ -50,7 +50,7 @@ impl Test for PeerWithFakeSyncState { let rng = &mut ctx.rng(); let peer_key = rng.gen::().public(); let mut fake_sync_state = sync_state(&setup, 1); - fake_sync_state.last.message.proposal.number = BlockNumber(42); + fake_sync_state.last.as_mut().unwrap().message.proposal.number = BlockNumber(42); assert!(peer_states.update(&peer_key, fake_sync_state).is_err()); clock.advance(BLOCK_SLEEP_INTERVAL); @@ -94,7 +94,7 @@ impl Test for PeerWithFakeBlock { setup.blocks[0].clone(), // block with wrong validator set { - let mut setup = GenesisSetup::empty(rng, 4); + let mut setup = GenesisSetup::new(rng, 4); setup.push_blocks(rng, 2); setup.blocks[1].clone() }, diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index 5661e078..49bce84b 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -212,8 +212,8 @@ impl GossipNetworkTest for BasicSynchronization { for node_handle in &mut node_handles { node_handle.switch_on(); let state = node_handle.store.subscribe().borrow().clone(); - assert_eq!(state.first.header().number, BlockNumber(0)); - assert_eq!(state.last.header().number, BlockNumber(0)); + assert_eq!(state.first, BlockNumber(0)); + assert_eq!(state.last.as_ref().unwrap().header().number, BlockNumber(0)); } for block_number in (1..5).map(BlockNumber) { diff --git a/node/actors/sync_blocks/src/tests/mod.rs b/node/actors/sync_blocks/src/tests/mod.rs index f42a8b2b..02987589 100644 --- a/node/actors/sync_blocks/src/tests/mod.rs +++ b/node/actors/sync_blocks/src/tests/mod.rs @@ -31,8 +31,8 @@ pub(crate) fn snapshot_sync_state( ) -> BlockStoreState { assert!(!range.is_empty()); BlockStoreState { - first: setup.blocks[*range.start()].justification.clone(), - last: setup.blocks[*range.end()].justification.clone(), + first: setup.blocks[*range.start()].header().number, + last: Some(setup.blocks[*range.end()].justification.clone()), } } diff --git a/node/tools/build.rs b/node/tools/build.rs index e4bba2bd..f4cfa5df 100644 --- a/node/tools/build.rs +++ b/node/tools/build.rs @@ -3,7 +3,7 @@ fn main() { zksync_protobuf_build::Config { input_root: "src/proto".into(), proto_root: "zksync/tools".into(), - dependencies: vec![], + dependencies: vec!["::zksync_consensus_roles::proto".parse().unwrap()], protobuf_crate: "::zksync_protobuf".parse().unwrap(), is_public: false, } diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs index 984dfe22..303a6b62 100644 --- a/node/tools/src/bin/localnet_config.rs +++ b/node/tools/src/bin/localnet_config.rs @@ -63,11 +63,7 @@ fn main() -> anyhow::Result<()> { // Generate the keys for all the replicas. let rng = &mut rand::thread_rng(); - let mut setup = validator::GenesisSetup::empty(rng, addrs.len()); - setup - .next_block() - .payload(validator::Payload(vec![])) - .push(); + let setup = validator::GenesisSetup::new(rng, addrs.len()); let validator_keys = setup.keys.clone(); let node_keys: Vec = (0..addrs.len()).map(|_| rng.gen()).collect(); @@ -81,8 +77,7 @@ fn main() -> anyhow::Result<()> { public_addr: addrs[i], metrics_server_addr, - validators: setup.genesis.validators.clone(), - genesis_block: setup.blocks[0].clone(), + genesis: setup.genesis.clone(), max_payload_size: args.payload_size, gossip_dynamic_inbound_limit: 0, diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 08950076..77524a44 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -11,8 +11,8 @@ use zksync_consensus_bft as bft; use zksync_consensus_crypto::{read_optional_text, read_required_text, Text, TextFmt}; use zksync_consensus_executor as executor; use zksync_consensus_roles::{node, validator}; -use zksync_consensus_storage::{BlockStore, BlockStoreRunner, PersistentBlockStore}; -use zksync_protobuf::{required, serde::Serde, ProtoFmt}; +use zksync_consensus_storage::{BlockStore, BlockStoreRunner}; +use zksync_protobuf::{required, read_required, serde::Serde, ProtoFmt}; /// Decodes a proto message from json for arbitrary ProtoFmt. pub fn decode_json(json: &str) -> anyhow::Result { @@ -54,8 +54,7 @@ pub struct AppConfig { pub public_addr: std::net::SocketAddr, pub metrics_server_addr: Option, - pub validators: validator::ValidatorSet, - pub genesis_block: validator::FinalBlock, + pub genesis: validator::Genesis, pub max_payload_size: usize, pub gossip_dynamic_inbound_limit: usize, @@ -67,14 +66,6 @@ impl ProtoFmt for AppConfig { type Proto = proto::AppConfig; fn read(r: &Self::Proto) -> anyhow::Result { - let validators = r.validators.iter().enumerate().map(|(i, v)| { - Text::new(v) - .decode() - .with_context(|| format!("validators[{i}]")) - }); - let validators: anyhow::Result> = validators.collect(); - let validators = validator::ValidatorSet::new(validators?).context("validators")?; - let mut gossip_static_inbound = HashSet::new(); for (i, v) in r.gossip_static_inbound.iter().enumerate() { gossip_static_inbound.insert( @@ -95,9 +86,8 @@ impl ProtoFmt for AppConfig { public_addr: read_required_text(&r.public_addr).context("public_addr")?, metrics_server_addr: read_optional_text(&r.metrics_server_addr) .context("metrics_server_addr")?, - - validators, - genesis_block: read_required_text(&r.genesis_block).context("genesis_block")?, + + genesis: read_required(&r.genesis).context("genesis")?, max_payload_size: required(&r.max_payload_size) .and_then(|x| Ok((*x).try_into()?)) .context("max_payload_size")?, @@ -116,8 +106,7 @@ impl ProtoFmt for AppConfig { public_addr: Some(self.public_addr.encode()), metrics_server_addr: self.metrics_server_addr.as_ref().map(TextFmt::encode), - validators: self.validators.iter().map(TextFmt::encode).collect(), - genesis_block: Some(self.genesis_block.encode()), + genesis: Some(self.genesis.build()), max_payload_size: Some(self.max_payload_size.try_into().unwrap()), gossip_dynamic_inbound_limit: Some( @@ -199,23 +188,13 @@ impl Configs { &self, ctx: &ctx::Ctx, ) -> ctx::Result<(executor::Executor, BlockStoreRunner)> { - let store = store::RocksDB::open(&self.database).await?; - // Store genesis if db is empty. - if store.is_empty().await? { - store - .store_next_block(ctx, &self.app.genesis_block) - .await - .context("store_next_block()")?; - } + let store = store::RocksDB::open(self.app.genesis.clone(), &self.database).await?; let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())).await?; let e = executor::Executor { config: executor::Config { server_addr: self.app.server_addr, public_addr: self.app.public_addr, - genesis: validator::Genesis { - forks: validator::ForkSet::new(vec![validator::Fork::default()]).unwrap(), - validators: self.app.validators.clone(), - }, + genesis: self.app.genesis.clone(), node_key: self.node_key.clone(), gossip_dynamic_inbound_limit: self.app.gossip_dynamic_inbound_limit, gossip_static_inbound: self.app.gossip_static_inbound.clone(), diff --git a/node/tools/src/proto/mod.proto b/node/tools/src/proto/mod.proto index 3cac756d..47badb76 100644 --- a/node/tools/src/proto/mod.proto +++ b/node/tools/src/proto/mod.proto @@ -38,6 +38,8 @@ syntax = "proto3"; package zksync.tools; +import "zksync/roles/validator.proto"; + // (public key, ip address) of a gossip network node. message NodeAddr { optional string key = 1; // [required] NodePublicKey @@ -62,24 +64,20 @@ message AppConfig { // Consensus - // Public keys of all validators. - repeated string validators = 5; // [required] ValidatorPublicKey - - // Genesis block of the blockchain. - // Will be inserted to storage if not already present. - optional string genesis_block = 6; // [required] FinalBlock + // Specification of the chain. + optional roles.validator.Genesis genesis = 4; // required // Maximal size of the block payload. - optional uint64 max_payload_size = 11; // [required] B + optional uint64 max_payload_size = 5; // [required] B // Gossip network // Limit on the number of gossip network inbound connections outside // of the `gossip_static_inbound` set. - optional uint64 gossip_dynamic_inbound_limit = 8; // [required] + optional uint64 gossip_dynamic_inbound_limit = 6; // [required] // Inbound connections that should be unconditionally accepted on the gossip network. - repeated string gossip_static_inbound = 9; // NodePublicKey + repeated string gossip_static_inbound = 7; // NodePublicKey // Outbound gossip network connections that the node should actively try to // establish and maintain. - repeated NodeAddr gossip_static_outbound = 10; + repeated NodeAddr gossip_static_outbound = 8; } diff --git a/node/tools/src/store.rs b/node/tools/src/store.rs index 014be674..f71a1df2 100644 --- a/node/tools/src/store.rs +++ b/node/tools/src/store.rs @@ -8,7 +8,7 @@ use std::{ }; use zksync_concurrency::{ctx, error::Wrap as _, scope}; use zksync_consensus_roles::validator; -use zksync_consensus_storage::{BlockStoreState, PersistentBlockStore, ReplicaState, ReplicaStore}; +use zksync_consensus_storage::{PersistentBlockStore, ReplicaState, ReplicaStore}; /// Enum used to represent a key in the database. It also acts as a separator between different stores. #[derive(Debug, Clone, PartialEq, Eq)] @@ -47,62 +47,46 @@ impl DatabaseKey { } } +struct Inner { + genesis: validator::Genesis, + db: RwLock, +} + /// Main struct for the Storage module, it just contains the database. Provides a set of high-level /// atomic operations on the database. It "contains" the following data: /// /// - An append-only database of finalized blocks. /// - A backup of the consensus replica state. #[derive(Clone)] -pub(crate) struct RocksDB(Arc>); +pub(crate) struct RocksDB(Arc); impl RocksDB { /// Create a new Storage. It first tries to open an existing database, and if that fails it just creates a /// a new one. We need the genesis block of the chain as input. - pub(crate) async fn open(path: &Path) -> ctx::Result { + pub(crate) async fn open(genesis: validator::Genesis, path: &Path) -> ctx::Result { let mut options = rocksdb::Options::default(); options.create_missing_column_families(true); options.create_if_missing(true); - Ok(Self(Arc::new(RwLock::new( - scope::wait_blocking(|| { - rocksdb::DB::open(&options, path).context("Failed opening RocksDB") - }) - .await?, - )))) + Ok(Self(Arc::new(Inner { + genesis, + db: RwLock::new( + scope::wait_blocking(|| { + rocksdb::DB::open(&options, path).context("Failed opening RocksDB") + }) + .await?, + ), + }))) } - fn state_blocking(&self) -> anyhow::Result> { - let db = self.0.read().unwrap(); - - let mut options = ReadOptions::default(); - options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); - let Some(res) = db.iterator_opt(IteratorMode::Start, options).next() else { - return Ok(None); - }; - let (_, first) = res.context("RocksDB error reading first stored block")?; - let first: validator::FinalBlock = - zksync_protobuf::decode(&first).context("Failed decoding first stored block bytes")?; - + fn last_blocking(&self) -> anyhow::Result> { + let db = self.0.db.read().unwrap(); let mut options = ReadOptions::default(); options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..); - let (_, last) = db - .iterator_opt(DatabaseKey::BLOCK_HEAD_ITERATOR, options) - .next() - .context("last block not found")? - .context("RocksDB error reading head block")?; + let Some(res) = db.iterator_opt(DatabaseKey::BLOCK_HEAD_ITERATOR, options).next() else { return Ok(None) }; + let (_, last) = res.context("RocksDB error reading head block")?; let last: validator::FinalBlock = zksync_protobuf::decode(&last).context("Failed decoding head block bytes")?; - - Ok(Some(BlockStoreState { - first: first.justification, - last: last.justification, - })) - } - - /// Checks if BlockStore is empty. - pub(crate) async fn is_empty(&self) -> anyhow::Result { - Ok(scope::wait_blocking(|| self.state_blocking()) - .await? - .is_none()) + Ok(Some(last.justification)) } } @@ -114,10 +98,12 @@ impl fmt::Debug for RocksDB { #[async_trait::async_trait] impl PersistentBlockStore for RocksDB { - async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result { - Ok(scope::wait_blocking(|| self.state_blocking()) - .await? - .context("storage is empty")?) + async fn genesis(&self, _ctx: &ctx::Ctx) -> ctx::Result { + Ok(self.0.genesis.clone()) + } + + async fn last(&self, _ctx: &ctx::Ctx) -> ctx::Result> { + Ok(scope::wait_blocking(|| self.last_blocking()).await?) } async fn block( @@ -126,7 +112,7 @@ impl PersistentBlockStore for RocksDB { number: validator::BlockNumber, ) -> ctx::Result { scope::wait_blocking(|| { - let db = self.0.read().unwrap(); + let db = self.0.db.read().unwrap(); let block = db .get(DatabaseKey::Block(number).encode_key()) .context("RocksDB error")? @@ -144,7 +130,7 @@ impl PersistentBlockStore for RocksDB { block: &validator::FinalBlock, ) -> ctx::Result<()> { scope::wait_blocking(|| { - let db = self.0.write().unwrap(); + let db = self.0.db.write().unwrap(); let block_number = block.header().number; let mut write_batch = rocksdb::WriteBatch::default(); write_batch.put( @@ -167,6 +153,7 @@ impl ReplicaStore for RocksDB { Ok(scope::wait_blocking(|| { let Some(raw_state) = self .0 + .db .read() .unwrap() .get(DatabaseKey::ReplicaState.encode_key()) @@ -182,6 +169,7 @@ impl ReplicaStore for RocksDB { async fn set_state(&self, _ctx: &ctx::Ctx, state: &ReplicaState) -> ctx::Result<()> { Ok(scope::wait_blocking(|| { self.0 + .db .write() .unwrap() .put( diff --git a/node/tools/src/tests.rs b/node/tools/src/tests.rs index 581f1ca2..f6b0a0f9 100644 --- a/node/tools/src/tests.rs +++ b/node/tools/src/tests.rs @@ -20,8 +20,7 @@ impl Distribution for Standard { public_addr: make_addr(rng), metrics_server_addr: Some(make_addr(rng)), - validators: rng.gen(), - genesis_block: rng.gen(), + genesis: rng.gen(), gossip_dynamic_inbound_limit: rng.gen(), gossip_static_inbound: (0..5) @@ -47,11 +46,11 @@ async fn test_reopen_rocksdb() { let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); let dir = TempDir::new().unwrap(); - let mut setup = GenesisSetup::empty(rng, 3); + let mut setup = GenesisSetup::new(rng, 3); setup.push_blocks(rng, 5); let mut want = vec![]; for b in &setup.blocks { - let store = store::RocksDB::open(dir.path()).await.unwrap(); + let store = store::RocksDB::open(setup.genesis.clone(),dir.path()).await.unwrap(); store.store_next_block(ctx, b).await.unwrap(); want.push(b.clone()); assert_eq!(want, testonly::dump(ctx, &store).await);