fixed loadtest
pompon0 committed Dec 24, 2023
1 parent 2b5808c commit 93ab685
Showing 10 changed files with 67 additions and 88 deletions.
2 changes: 1 addition & 1 deletion node/actors/bft/src/replica/block.rs
@@ -7,7 +7,7 @@ impl StateMachine {
/// Tries to build a finalized block from the given CommitQC. We simply search our
/// block proposal cache for the matching block, and if we find it we build the block.
/// If this method succeeds, it sends the finalized block to the executor.
#[instrument(level = "trace", ret)]
#[instrument(level = "debug", skip(self), ret)]
pub(crate) async fn save_block(
&mut self,
ctx: &ctx::Ctx,
18 changes: 10 additions & 8 deletions node/libs/storage/src/block_store.rs
@@ -1,4 +1,5 @@
//! Defines storage layer for finalized blocks.
use anyhow::Context as _;
use std::collections::BTreeMap;
use std::fmt;
use zksync_concurrency::{ctx, sync};
@@ -25,17 +26,17 @@ impl BlockStoreState {
/// Implementations **must** propagate context cancellation using [`StorageError::Canceled`].
#[async_trait::async_trait]
pub trait PersistentBlockStore: fmt::Debug + Send + Sync {
/// Range of blocks available in storage.
/// Range of blocks available in storage. None iff storage is empty.
/// Consensus code calls this method only once and then tracks the
/// range of available blocks internally.
async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result<BlockStoreState>;
async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<BlockStoreState>>;

/// Gets a block by its number. Should return an error if block is missing.
/// Gets a block by its number. Returns None if block is missing.
async fn block(
&self,
ctx: &ctx::Ctx,
number: validator::BlockNumber,
) -> ctx::Result<validator::FinalBlock>;
) -> ctx::Result<Option<validator::FinalBlock>>;

/// Persistently store a block.
/// Implementations are only required to accept a block directly after the current last block,
@@ -73,9 +74,9 @@ impl BlockStore {
if cache_capacity < 1 {
return Err(anyhow::anyhow!("cache_capacity has to be >=1").into());
}
let state = persistent.state(ctx).await?;
if state.first > state.last {
return Err(anyhow::anyhow!("at least 1 block has to be available in storage").into());
let state = persistent.state(ctx).await?.context("storage empty, expected at least 1 block")?;
if state.first.header().number > state.last.header().number {
return Err(anyhow::anyhow!("invalid state").into());
}
Ok(Self {
persistent,
@@ -103,7 +104,7 @@ impl BlockStore {
return Ok(Some(block.clone()));
}
}
Ok(Some(self.persistent.block(ctx, number).await?))
Ok(Some(self.persistent.block(ctx, number).await?.context("block disappeared from storage")?))
}

pub async fn last_block(&self, ctx: &ctx::Ctx) -> ctx::Result<validator::FinalBlock> {
@@ -139,6 +140,7 @@ impl BlockStore {
Ok(())
}

#[tracing::instrument(level = "debug", skip(self))]
pub async fn store_block(
&self,
ctx: &ctx::Ctx,
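
Taken together, the block_store.rs changes above switch the PersistentBlockStore contract from "error on empty storage / missing block" to "return None and let the caller decide". A minimal standalone sketch of that calling convention, using plain std types as hypothetical stand-ins for BlockStoreState and FinalBlock (not the real crate API, which is async and takes a ctx::Ctx):

```rust
use std::collections::BTreeMap;

#[derive(Clone, Debug)]
struct Block {
    number: u64,
}

#[derive(Debug)]
struct State {
    first: u64,
    last: u64,
}

struct InMemoryStore(BTreeMap<u64, Block>);

impl InMemoryStore {
    // Mirrors the new `PersistentBlockStore::state`: `None` iff storage is empty.
    fn state(&self) -> Option<State> {
        Some(State {
            first: *self.0.keys().next()?,
            last: *self.0.keys().next_back()?,
        })
    }

    // Mirrors the new `PersistentBlockStore::block`: `None` if the block is missing.
    fn block(&self, number: u64) -> Option<Block> {
        self.0.get(&number).cloned()
    }
}

fn main() {
    let mut store = InMemoryStore(BTreeMap::new());
    assert!(store.state().is_none()); // an empty store is no longer an error
    store.0.insert(0, Block { number: 0 });
    store.0.insert(1, Block { number: 1 });
    let state = store.state().expect("store is non-empty");
    assert_eq!((state.first, state.last), (0, 1));
    assert!(store.block(7).is_none()); // a missing block is reported as `None`
}
```

BlockStore::new and get_block then turn an unexpected None back into an error via .context(...), which is exactly what the new lines in the hunks above do.
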
46 changes: 23 additions & 23 deletions node/libs/storage/src/rocksdb.rs
@@ -7,7 +7,7 @@ use anyhow::Context as _;
use rocksdb::{Direction, IteratorMode, ReadOptions};
use std::sync::Arc;
use std::{fmt, path::Path, sync::RwLock};
use zksync_concurrency::{ctx, scope};
use zksync_concurrency::{ctx, scope, error::Wrap as _};
use zksync_consensus_roles::validator;

/// Enum used to represent a key in the database. It also acts as a separator between different stores.
@@ -69,32 +69,33 @@ impl Store {
)))
}

fn state_blocking(&self) -> anyhow::Result<BlockStoreState> {
fn state_blocking(&self) -> anyhow::Result<Option<BlockStoreState>> {
let db = self.0.read().unwrap();

let mut options = ReadOptions::default();
options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..);
let Some(res) = db
.iterator_opt(IteratorMode::Start, options)
.next()
else { return Ok(None) };
let (_,first) = res.context("RocksDB error reading first stored block")?;
let first: validator::FinalBlock =
zksync_protobuf::decode(&first).context("Failed decoding first stored block bytes")?;

let mut options = ReadOptions::default();
options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..);
let (_, last) = db
.iterator_opt(DatabaseKey::BLOCK_HEAD_ITERATOR, options)
.next()
.context("Head block not found")?
.context("last block not found")?
.context("RocksDB error reading head block")?;
let last: validator::FinalBlock =
zksync_protobuf::decode(&last).context("Failed decoding head block bytes")?;

let mut options = ReadOptions::default();
options.set_iterate_range(DatabaseKey::BLOCKS_START_KEY..);
let (_, first) = db
.iterator_opt(IteratorMode::Start, options)
.next()
.context("First stored block not found")?
.context("RocksDB error reading first stored block")?;
let first: validator::FinalBlock =
zksync_protobuf::decode(&first).context("Failed decoding first stored block bytes")?;
Ok(BlockStoreState {
Ok(Some(BlockStoreState {
first: first.justification,
last: last.justification,
})
}))
}
}

@@ -106,27 +107,26 @@ impl fmt::Debug for Store {

#[async_trait::async_trait]
impl PersistentBlockStore for Arc<Store> {
async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result<BlockStoreState> {
async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result<Option<BlockStoreState>> {
Ok(scope::wait_blocking(|| self.state_blocking()).await?)
}

async fn block(
&self,
_ctx: &ctx::Ctx,
number: validator::BlockNumber,
) -> ctx::Result<validator::FinalBlock> {
Ok(scope::wait_blocking(|| {
) -> ctx::Result<Option<validator::FinalBlock>> {
scope::wait_blocking(|| {
let db = self.0.read().unwrap();
let block = db
let Some(block) = db
.get(DatabaseKey::Block(number).encode_key())
.context("RocksDB error")?
.context("not found")?;
zksync_protobuf::decode(&block).context("failed decoding block")
})
.await
.context(number)?)
else { return Ok(None) };
Ok(Some(zksync_protobuf::decode(&block).context("failed decoding block")?))
}).await.wrap(number)
}

#[tracing::instrument(level = "debug", skip(self))]
async fn store_next_block(
&self,
_ctx: &ctx::Ctx,
12 changes: 5 additions & 7 deletions node/libs/storage/src/testonly/in_memory.rs
@@ -1,6 +1,5 @@
//! In-memory storage implementation.
use crate::{BlockStoreState, PersistentBlockStore, ReplicaState};
use anyhow::Context as _;
use std::collections::BTreeMap;
use std::sync::Mutex;
use zksync_concurrency::ctx;
@@ -23,26 +22,25 @@ impl BlockStore {

#[async_trait::async_trait]
impl PersistentBlockStore for BlockStore {
async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result<BlockStoreState> {
async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result<Option<BlockStoreState>> {
let blocks = self.0.lock().unwrap();
Ok(BlockStoreState {
Ok(Some(BlockStoreState {
first: blocks.first_key_value().unwrap().1.justification.clone(),
last: blocks.last_key_value().unwrap().1.justification.clone(),
})
}))
}

async fn block(
&self,
_ctx: &ctx::Ctx,
number: validator::BlockNumber,
) -> ctx::Result<validator::FinalBlock> {
) -> ctx::Result<Option<validator::FinalBlock>> {
Ok(self
.0
.lock()
.unwrap()
.get(&number)
.context("not found")?
.clone())
.cloned())
}

async fn store_next_block(
6 changes: 3 additions & 3 deletions node/libs/storage/src/tests/mod.rs
@@ -40,15 +40,15 @@ fn make_block(rng: &mut impl Rng, parent: &validator::BlockHeader) -> validator:
}

async fn dump(ctx: &ctx::Ctx, store: &dyn PersistentBlockStore) -> Vec<validator::FinalBlock> {
let Some(range) = store.state(ctx).await.unwrap() else { return vec![] };
let mut blocks = vec![];
let range = store.state(ctx).await.unwrap();
for n in range.first.header().number.0..range.next().0 {
let n = validator::BlockNumber(n);
let block = store.block(ctx, n).await.unwrap();
let block = store.block(ctx, n).await.unwrap().unwrap();
assert_eq!(block.header().number, n);
blocks.push(block);
}
assert!(store.block(ctx, range.next()).await.is_err());
assert!(store.block(ctx, range.next()).await.unwrap().is_none());
blocks
}

2 changes: 0 additions & 2 deletions node/tools/src/bin/localnet_config.rs
@@ -81,11 +81,9 @@ fn main() -> anyhow::Result<()> {
public_addr: addrs[i],
metrics_server_addr,

validator_key: Some(validator_keys[i].public()),
validators: validator_set.clone(),
genesis_block: genesis.clone(),

node_key: node_keys[i].public(),
gossip_dynamic_inbound_limit: 0,
gossip_static_inbound: [].into(),
gossip_static_outbound: [].into(),
44 changes: 16 additions & 28 deletions node/tools/src/config.rs
@@ -31,11 +31,9 @@ pub struct AppConfig {
pub public_addr: std::net::SocketAddr,
pub metrics_server_addr: Option<std::net::SocketAddr>,

pub validator_key: Option<validator::PublicKey>,
pub validators: validator::ValidatorSet,
pub genesis_block: validator::FinalBlock,

pub node_key: node::PublicKey,
pub gossip_dynamic_inbound_limit: u64,
pub gossip_static_inbound: HashSet<node::PublicKey>,
pub gossip_static_outbound: HashMap<node::PublicKey, std::net::SocketAddr>,
@@ -76,11 +74,9 @@ impl ProtoFmt for AppConfig {
metrics_server_addr: read_optional_text(&r.metrics_server_addr)
.context("metrics_server_addr")?,

validator_key: read_optional_text(&r.validator_key).context("validator_key")?,
validators,
genesis_block: read_required_text(&r.genesis_block).context("genesis_block")?,

node_key: read_required_text(&r.node_key).context("node_key")?,
gossip_dynamic_inbound_limit: *required(&r.gossip_dynamic_inbound_limit)
.context("gossip_dynamic_inbound_limit")?,
gossip_static_inbound,
@@ -94,11 +90,9 @@ impl ProtoFmt for AppConfig {
public_addr: Some(self.public_addr.encode()),
metrics_server_addr: self.metrics_server_addr.as_ref().map(TextFmt::encode),

validator_key: self.validator_key.as_ref().map(TextFmt::encode),
validators: self.validators.iter().map(TextFmt::encode).collect(),
genesis_block: Some(self.genesis_block.encode()),

node_key: Some(self.node_key.encode()),
gossip_dynamic_inbound_limit: Some(self.gossip_dynamic_inbound_limit),
gossip_static_inbound: self
.gossip_static_inbound
@@ -171,35 +165,29 @@ impl<'a> ConfigPaths<'a> {
}

impl Configs {
pub async fn into_executor(self, ctx: &ctx::Ctx) -> anyhow::Result<executor::Executor> {
anyhow::ensure!(
self.app.node_key == self.node_key.public(),
"node secret key has to match the node public key in the app config",
);
anyhow::ensure!(
self.app.validator_key == self.validator_key.as_ref().map(|k| k.public()),
"validator secret key has to match the validator public key in the app config",
);
let storage = Arc::new(rocksdb::Store::new(&self.database).await?);
let block_store = Arc::new(BlockStore::new(ctx,Box::new(storage.clone()),1000).await?);
// TODO: figure out how to insert iff empty.
storage.store_next_block(ctx,&self.app.genesis_block,).await.context("store_next_block")?;
pub async fn make_executor(&self, ctx: &ctx::Ctx) -> anyhow::Result<executor::Executor> {
let store = Arc::new(rocksdb::Store::new(&self.database).await?);
// Store genesis if db is empty.
if store.state(ctx).await?.is_none() {
store.store_next_block(ctx,&self.app.genesis_block).await.context("store_next_block()")?;
}
let block_store = Arc::new(BlockStore::new(ctx,Box::new(store.clone()),1000).await?);
Ok(executor::Executor {
config: executor::Config {
server_addr: self.app.server_addr,
validators: self.app.validators,
node_key: self.node_key,
server_addr: self.app.server_addr.clone(),
validators: self.app.validators.clone(),
node_key: self.node_key.clone(),
gossip_dynamic_inbound_limit: self.app.gossip_dynamic_inbound_limit,
gossip_static_inbound: self.app.gossip_static_inbound,
gossip_static_outbound: self.app.gossip_static_outbound,
gossip_static_inbound: self.app.gossip_static_inbound.clone(),
gossip_static_outbound: self.app.gossip_static_outbound.clone(),
},
block_store: block_store,
validator: self.validator_key.map(|key| executor::Validator {
validator: self.validator_key.as_ref().map(|key| executor::Validator {
config: executor::ValidatorConfig {
key,
public_addr: self.app.public_addr,
key: key.clone(),
public_addr: self.app.public_addr.clone(),
},
replica_store: Box::new(storage),
replica_store: Box::new(store),
payload_manager: Box::new(bft::testonly::RandomPayload),
}),
})
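
The config.rs hunk above replaces the "TODO: figure out how to insert iff empty" hack with an explicit check: the genesis block is stored only when state() reports an empty database. A self-contained sketch of that idempotent-initialization pattern (synchronous, with hypothetical stand-in types rather than the real rocksdb::Store and validator::FinalBlock):

```rust
#[derive(Clone, Debug, PartialEq)]
struct Block(u64);

struct Store {
    blocks: Vec<Block>,
}

impl Store {
    // `None` iff nothing has ever been stored.
    fn state(&self) -> Option<(u64, u64)> {
        Some((self.blocks.first()?.0, self.blocks.last()?.0))
    }

    fn store_next_block(&mut self, block: &Block) {
        self.blocks.push(block.clone());
    }
}

// Idempotent initialization: restarting the node does not re-insert block 0.
fn ensure_genesis(store: &mut Store, genesis: &Block) {
    if store.state().is_none() {
        store.store_next_block(genesis);
    }
}

fn main() {
    let genesis = Block(0);
    let mut store = Store { blocks: vec![] };
    ensure_genesis(&mut store, &genesis);
    ensure_genesis(&mut store, &genesis); // second call is a no-op
    assert_eq!(store.blocks, vec![Block(0)]);
}
```

This only works because state() now distinguishes "empty" (None) from "corrupt" (error), which is what the PersistentBlockStore change enables.
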
13 changes: 8 additions & 5 deletions node/tools/src/main.rs
@@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> {
.with_ansi(std::env::var("NO_COLOR").is_err() && std::io::stdout().is_terminal())
.with_file(false)
.with_line_number(false)
.with_filter(LevelFilter::INFO);
.with_filter(tracing_subscriber::EnvFilter::from_default_env());

// Create the logger for the log file. This will produce machine-readable logs for
// all events of level DEBUG or higher.
@@ -84,6 +84,12 @@ async fn main() -> anyhow::Result<()> {
.load()
.context("config_paths().load()")?;

let executor = configs
.make_executor(ctx)
.await
.context("configs.into_executor()")?;
let block_store = executor.block_store.clone();

// Initialize the storage.
scope::run!(ctx, |ctx, s| async {
if let Some(addr) = configs.app.metrics_server_addr {
@@ -97,10 +103,7 @@ async fn main() -> anyhow::Result<()> {
Ok(())
});
}
let executor = configs
.into_executor(ctx)
.await
.context("configs.into_executor()")?;
s.spawn_bg(block_store.run_background_tasks(ctx));
s.spawn(executor.run(ctx));
Ok(())
})
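
The main.rs hunks above build the executor before entering the task scope, keep a clone of its block store, and then drive the store's background tasks alongside executor.run. A rough analogue of that structure, using tokio (an assumption for illustration only) instead of zksync_concurrency::scope, with hypothetical stand-in types; it assumes tokio and anyhow as dependencies:

```rust
use std::sync::Arc;

struct BlockStore;

impl BlockStore {
    // Stand-in for `BlockStore::run_background_tasks`, e.g. flushing queued
    // blocks to persistent storage.
    async fn run_background_tasks(self: Arc<Self>) -> anyhow::Result<()> {
        Ok(())
    }
}

struct Executor {
    block_store: Arc<BlockStore>,
}

impl Executor {
    // Stand-in for the consensus/gossip main loop.
    async fn run(self) -> anyhow::Result<()> {
        Ok(())
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Build the executor first so the block store handle can be cloned out.
    let executor = Executor { block_store: Arc::new(BlockStore) };
    let block_store = executor.block_store.clone();

    // Run both concurrently; an error from either aborts the whole node.
    tokio::try_join!(block_store.run_background_tasks(), executor.run())?;
    Ok(())
}
```

The real code gets the same shape from scope::run! with s.spawn_bg for the background tasks and s.spawn for the executor, with structured cancellation handled by the scope.
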
8 changes: 0 additions & 8 deletions node/tools/src/proto/mod.proto
@@ -61,11 +61,6 @@ message AppConfig {
optional string metrics_server_addr = 3; // [optional] IpAddr

// Consensus

// Public key of this validator.
// Should be set iff this node is a validator.
// It should match the secret key provided in the `validator_key` file.
optional string validator_key = 4; // [optional] ValidatorPublicKey

// Public keys of all validators.
repeated string validators = 5; // [required] ValidatorPublicKey
@@ -76,9 +71,6 @@ message AppConfig {

// Gossip network

// Public key of this node. It uniquely identifies the node.
// It should match the secret key provided in the `node_key` file.
optional string node_key = 7; // [required] NodePublicKey
// Limit on the number of gossip network inbound connections outside
// of the `gossip_static_inbound` set.
optional uint64 gossip_dynamic_inbound_limit = 8; // [required]
4 changes: 1 addition & 3 deletions node/tools/src/tests.rs
@@ -4,7 +4,7 @@ use rand::{
Rng,
};
use zksync_concurrency::ctx;
use zksync_consensus_roles::{node, validator};
use zksync_consensus_roles::{node};
use zksync_protobuf::testonly::test_encode_random;

fn make_addr<R: Rng + ?Sized>(rng: &mut R) -> std::net::SocketAddr {
@@ -18,11 +18,9 @@ impl Distribution<AppConfig> for Standard {
public_addr: make_addr(rng),
metrics_server_addr: Some(make_addr(rng)),

validator_key: Some(rng.gen::<validator::SecretKey>().public()),
validators: rng.gen(),
genesis_block: rng.gen(),

node_key: rng.gen::<node::SecretKey>().public(),
gossip_dynamic_inbound_limit: rng.gen(),
gossip_static_inbound: (0..5)
.map(|_| rng.gen::<node::SecretKey>().public())
