Skip to content

Commit

Permalink
made max_payload_size configurable (#60)
Browse files Browse the repository at this point in the history
The max_payload_size is checked in bft replica and also implies
buffering limits in the network actor (both gossip and consensus
networks). It turned out that hardcoding it was too optimistic strategy,
so I've extracted it to a config, so that it is settable in runtime.
Given the lesson learned, I'll be also deprecating the
`ProtoFmt::max_size()` function soon.
Additional changes:
* added some logging useful for debugging the zksync-era tests and
improved existing logging to always show error the cause, not just the
leaf message.
* removed `tracing::instrument` annotations which were logging
non-critical/not-a-bug errors on level ERROR
* made network crate depend on the storage crate to avoid the workaround
way of implementing the get_block rpc
* cleaned up the implementations of the rpcs to be more consistent
* deduplicated the testonly helpers and moved them to the crates that
they belong to
  • Loading branch information
pompon0 committed Jan 24, 2024
1 parent 57f037d commit 5b3d383
Show file tree
Hide file tree
Showing 75 changed files with 1,691 additions and 2,019 deletions.
2 changes: 2 additions & 0 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions node/actors/bft/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub struct Config {
pub secret_key: validator::SecretKey,
/// A vector of public keys for all the validators in the network.
pub validator_set: validator::ValidatorSet,
/// The maximum size of the payload of a block, in bytes. We will
/// reject blocks with payloads larger than this.
pub max_payload_size: usize,
/// Block store.
pub block_store: Arc<storage::BlockStore>,
/// Replica store.
Expand All @@ -21,10 +24,6 @@ pub struct Config {
}

impl Config {
/// The maximum size of the payload of a block, in bytes. We will
/// reject blocks with payloads larger than this.
pub(crate) const PAYLOAD_MAX_SIZE: usize = 500 * zksync_protobuf::kB;

/// Computes the validator for the given view.
#[instrument(level = "trace", ret)]
pub fn view_leader(&self, view_number: validator::ViewNumber) -> validator::PublicKey {
Expand Down
8 changes: 8 additions & 0 deletions node/actors/bft/src/leader/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ impl StateMachine {
.payload_manager
.propose(ctx, highest_qc.header().number.next())
.await?;
if payload.0.len() > cfg.max_payload_size {
return Err(anyhow::format_err!(
"proposed payload too large: got {}B, max {}B",
payload.0.len(),
cfg.max_payload_size
)
.into());
}
metrics::METRICS
.leader_proposal_payload_size
.observe(payload.0.len());
Expand Down
9 changes: 5 additions & 4 deletions node/actors/bft/src/replica/block.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::StateMachine;
use tracing::{info, instrument};
use tracing::instrument;
use zksync_concurrency::ctx;
use zksync_consensus_roles::validator;

Expand Down Expand Up @@ -30,9 +30,10 @@ impl StateMachine {
justification: commit_qc.clone(),
};

info!(
"Finalized a block!\nFinal block: {:#?}",
block.header().hash()
tracing::info!(
"Finalized block {}: {:#?}",
block.header().number,
block.header().hash(),
);
self.config
.block_store
Expand Down
3 changes: 1 addition & 2 deletions node/actors/bft/src/replica/leader_prepare.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use super::StateMachine;
use crate::Config;
use std::collections::HashMap;
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap};
Expand Down Expand Up @@ -229,7 +228,7 @@ impl StateMachine {
// The leader proposed a new block.
Some(payload) => {
// Check that the payload doesn't exceed the maximum size.
if payload.0.len() > Config::PAYLOAD_MAX_SIZE {
if payload.0.len() > self.config.max_payload_size {
return Err(Error::ProposalOversizedPayload {
payload_size: payload.0.len(),
header: message.proposal,
Expand Down
7 changes: 5 additions & 2 deletions node/actors/bft/src/replica/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use super::{leader_commit, leader_prepare};
use crate::{testonly, testonly::ut_harness::UTHarness, Config};
use crate::{
testonly,
testonly::ut_harness::{UTHarness, MAX_PAYLOAD_SIZE},
};
use assert_matches::assert_matches;
use rand::Rng;
use zksync_concurrency::{ctx, scope};
Expand Down Expand Up @@ -271,7 +274,7 @@ async fn leader_prepare_proposal_oversized_payload() {
let (mut util, runner) = UTHarness::new(ctx, 1).await;
s.spawn_bg(runner.run(ctx));

let payload_oversize = Config::PAYLOAD_MAX_SIZE + 1;
let payload_oversize = MAX_PAYLOAD_SIZE + 1;
let payload_vec = vec![0; payload_oversize];
let mut leader_prepare = util.new_leader_prepare(ctx).await.msg;
leader_prepare.proposal_payload = Some(Payload(payload_vec));
Expand Down
5 changes: 3 additions & 2 deletions node/actors/bft/src/testonly/fuzz.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::testonly::node::MAX_PAYLOAD_SIZE;
use rand::{seq::SliceRandom, Rng};
use zksync_consensus_roles::validator;

Expand Down Expand Up @@ -155,10 +156,10 @@ impl Fuzz for validator::Signers {
impl Fuzz for validator::Payload {
fn mutate(&mut self, rng: &mut impl Rng) {
// Push bytes into the payload until it exceeds the limit.
let num_bytes = crate::Config::PAYLOAD_MAX_SIZE + 1 - self.0.len();
let num_bytes = MAX_PAYLOAD_SIZE - self.0.len() + 1;
let bytes: Vec<u8> = (0..num_bytes).map(|_| rng.gen()).collect();
self.0.extend_from_slice(&bytes);
assert!(self.0.len() > crate::Config::PAYLOAD_MAX_SIZE);
assert!(self.0.len() > MAX_PAYLOAD_SIZE);
}
}

Expand Down
34 changes: 4 additions & 30 deletions node/actors/bft/src/testonly/make.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! This module contains utilities that are only meant for testing purposes.
use crate::{Config, PayloadManager};
use crate::PayloadManager;
use rand::Rng as _;
use zksync_concurrency::ctx;
use zksync_consensus_roles::validator;

/// Produces random payload.
/// Produces random payload of a given size.
#[derive(Debug)]
pub struct RandomPayload;
pub struct RandomPayload(pub usize);

#[async_trait::async_trait]
impl PayloadManager for RandomPayload {
Expand All @@ -15,7 +15,7 @@ impl PayloadManager for RandomPayload {
ctx: &ctx::Ctx,
_number: validator::BlockNumber,
) -> ctx::Result<validator::Payload> {
let mut payload = validator::Payload(vec![0; Config::PAYLOAD_MAX_SIZE]);
let mut payload = validator::Payload(vec![0; self.0]);
ctx.rng().fill(&mut payload.0[..]);
Ok(payload)
}
Expand Down Expand Up @@ -77,29 +77,3 @@ impl PayloadManager for RejectPayload {
Err(anyhow::anyhow!("invalid payload").into())
}
}

/// Creates a genesis block with the given payload
/// and a validator set for the chain.
pub fn make_genesis(
keys: &[validator::SecretKey],
payload: validator::Payload,
block_number: validator::BlockNumber,
) -> (validator::FinalBlock, validator::ValidatorSet) {
let header = validator::BlockHeader::genesis(payload.hash(), block_number);
let validator_set = validator::ValidatorSet::new(keys.iter().map(|k| k.public())).unwrap();
let signed_messages: Vec<_> = keys
.iter()
.map(|sk| {
sk.sign_msg(validator::ReplicaCommit {
protocol_version: validator::ProtocolVersion::EARLIEST,
view: validator::ViewNumber(0),
proposal: header,
})
})
.collect();
let final_block = validator::FinalBlock {
payload,
justification: validator::CommitQC::from(&signed_messages, &validator_set).unwrap(),
};
(final_block, validator_set)
}
38 changes: 30 additions & 8 deletions node/actors/bft/src/testonly/node.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use super::Fuzz;
use crate::{io, testonly, PayloadManager};
use anyhow::Context as _;
use rand::Rng;
use std::sync::Arc;
use zksync_concurrency::{ctx, scope};
use zksync_consensus_network as network;
use zksync_consensus_network::io::ConsensusInputMessage;
use zksync_consensus_storage as storage;
use zksync_consensus_utils::pipe::DispatcherPipe;
use zksync_consensus_storage::testonly::in_memory;
use zksync_consensus_utils::pipe;

pub(crate) const MAX_PAYLOAD_SIZE: usize = 1000;

/// Enum representing the behavior of the node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
Expand All @@ -29,32 +33,50 @@ impl Behavior {
pub(crate) fn payload_manager(&self) -> Box<dyn PayloadManager> {
match self {
Self::HonestNotProposing => Box::new(testonly::PendingPayload),
_ => Box::new(testonly::RandomPayload),
_ => Box::new(testonly::RandomPayload(MAX_PAYLOAD_SIZE)),
}
}
}

/// Struct representing a node.
pub(super) struct Node {
pub(crate) net: network::testonly::Instance,
pub(crate) net: network::Config,
pub(crate) behavior: Behavior,
pub(crate) block_store: Arc<storage::BlockStore>,
}

impl Node {
/// Runs a mock executor.
pub(crate) async fn run_executor(
pub(crate) async fn run(
&self,
ctx: &ctx::Ctx,
consensus_pipe: DispatcherPipe<io::InputMessage, io::OutputMessage>,
network_pipe: DispatcherPipe<network::io::InputMessage, network::io::OutputMessage>,
network_pipe: &mut pipe::DispatcherPipe<
network::io::InputMessage,
network::io::OutputMessage,
>,
) -> anyhow::Result<()> {
let rng = &mut ctx.rng();
let mut net_recv = network_pipe.recv;
let net_send = network_pipe.send;
let net_recv = &mut network_pipe.recv;
let net_send = &mut network_pipe.send;
let (consensus_actor_pipe, consensus_pipe) = pipe::new();
let mut con_recv = consensus_pipe.recv;
let con_send = consensus_pipe.send;
scope::run!(ctx, |ctx, s| async {
s.spawn(async {
let validator_key = self.net.consensus.as_ref().unwrap().key.clone();
let validator_set = self.net.validators.clone();
crate::Config {
secret_key: validator_key.clone(),
validator_set,
block_store: self.block_store.clone(),
replica_store: Box::new(in_memory::ReplicaStore::default()),
payload_manager: self.behavior.payload_manager(),
max_payload_size: MAX_PAYLOAD_SIZE,
}
.run(ctx, consensus_actor_pipe)
.await
.context("consensus.run()")
});
s.spawn(async {
while let Ok(network_message) = net_recv.recv(ctx).await {
match network_message {
Expand Down
Loading

0 comments on commit 5b3d383

Please sign in to comment.