support for running a node from a state snapshot (BFT-418) (#75)
Support for a situation in which a node is started with a blockchain
state at block X > 0. Such a node doesn't want to fetch and store blocks
< X. I've adjusted the syncing logic and the validator logic (since a
validator might also start from a state snapshot).
Added tests for the new cases.
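
As a rough illustration of the gating this introduces (a minimal sketch with simplified, hypothetical stand-in types, not the crate's actual API), a snapshot-started node tracks the first block it still retains and rejects anything below it, mirroring the `ProposalAlreadyPruned` check added to `leader_prepare.rs` further down:

```rust
/// Hypothetical, simplified stand-ins for the crate's storage types (illustration only).
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct BlockNumber(u64);

/// State of a block store initialized from a state snapshot taken at `first`.
struct BlockStoreState {
    /// First block this node retains; blocks below it were never fetched or stored.
    first: BlockNumber,
    /// Last block persisted so far, if any.
    last: Option<BlockNumber>,
}

impl BlockStoreState {
    /// A replica must not fetch, store, or vote for proposals below `first`,
    /// because it couldn't persist and broadcast them once finalized.
    fn accepts(&self, proposal: BlockNumber) -> bool {
        proposal >= self.first
    }
}

fn main() {
    // Node started from a snapshot at block 100, nothing persisted yet.
    let state = BlockStoreState { first: BlockNumber(100), last: None };
    assert!(state.last.is_none());
    assert!(!state.accepts(BlockNumber(99))); // already pruned -> reject proposal
    assert!(state.accepts(BlockNumber(100))); // first retained block -> ok
}
```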

I've removed `parent` from the block struct, because the cost of
maintaining it exceeds the gains at this point (the zksync-era payload
implementation has a commitment to the chain history anyway).
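
For context, a sketch of the header shape implied by this change, before and after; the field types here are hypothetical placeholders rather than the real definitions in `zksync_consensus_roles`:

```rust
/// Hypothetical placeholder types standing in for the crate's real hash/number types.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
struct BlockHeaderHash([u8; 32]);
#[derive(Clone, Copy, Debug, PartialEq)]
struct PayloadHash([u8; 32]);
#[derive(Clone, Copy, Debug, PartialEq)]
struct BlockNumber(u64);

/// Before: the header committed to its parent, so proposing or verifying a block
/// required knowing the previous block's hash.
#[allow(dead_code)]
struct OldBlockHeader {
    parent: Option<BlockHeaderHash>,
    number: BlockNumber,
    payload: PayloadHash,
}

/// After: only the number and the payload hash remain; a node started from a
/// snapshot can propose and verify blocks without any ancestor headers.
struct NewBlockHeader {
    number: BlockNumber,
    payload: PayloadHash,
}

fn main() {
    let header = NewBlockHeader {
        number: BlockNumber(101),
        payload: PayloadHash([0; 32]),
    };
    println!("proposing block {:?} with payload {:?}", header.number, header.payload);
}
```

The trade-off, as stated above, is that dropping the parent hash removes the header-level chain commitment, which is acceptable here because the payload implementation already commits to the chain history.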
pompon0 authored Mar 18, 2024
1 parent e53872a commit 5329a80
Showing 32 changed files with 464 additions and 368 deletions.
7 changes: 3 additions & 4 deletions node/actors/bft/src/leader/state_machine.rs
@@ -167,9 +167,9 @@ impl StateMachine {
// The previous block was finalized, so we can propose a new block.
_ => {
let fork = &cfg.genesis().fork;
let (parent, number) = match high_qc {
Some(qc) => (Some(qc.header().hash()), qc.header().number.next()),
None => (fork.first_parent, fork.first_block),
let number = match high_qc {
Some(qc) => qc.header().number.next(),
None => fork.first_block,
};
// Defensively assume that PayloadManager cannot propose until the previous block is stored.
if let Some(prev) = number.prev() {
@@ -189,7 +189,6 @@
.observe(payload.0.len());
let proposal = validator::BlockHeader {
number,
parent,
payload: payload.hash(),
};
(proposal, Some(payload))
7 changes: 0 additions & 7 deletions node/actors/bft/src/leader/tests.rs
@@ -37,13 +37,6 @@ async fn replica_prepare_sanity_yield_leader_prepare() {
.unwrap()
.unwrap();
assert_eq!(leader_prepare.msg.view(), &replica_prepare.view);
assert_eq!(
leader_prepare.msg.proposal.parent,
replica_prepare
.high_vote
.as_ref()
.map(|v| v.proposal.hash()),
);
assert_eq!(
leader_prepare.msg.justification,
util.new_prepare_qc(|msg| *msg = replica_prepare)
2 changes: 1 addition & 1 deletion node/actors/bft/src/replica/block.rs
@@ -39,7 +39,7 @@ impl StateMachine {
tracing::info!(
"Finalized block {}: {:#?}",
block.header().number,
block.header().hash(),
block.header().payload,
);
self.config
.block_store
13 changes: 11 additions & 2 deletions node/actors/bft/src/replica/leader_prepare.rs
@@ -42,8 +42,9 @@ pub(crate) enum Error {
/// Invalid message.
#[error("invalid message: {0:#}")]
InvalidMessage(#[source] validator::LeaderPrepareVerifyError),
/// Previous proposal was not finalized.

/// Leader proposed a block that was already pruned from replica's storage.
#[error("leader proposed a block that was already pruned from replica's storage")]
ProposalAlreadyPruned,
/// Oversized payload.
#[error("block proposal with an oversized payload (payload size: {payload_size})")]
ProposalOversizedPayload {
@@ -110,6 +111,14 @@ impl StateMachine {
});
}

// Replica MUSTN'T vote for blocks that have already been pruned from storage,
// because it won't be able to persist and broadcast them once finalized.
// TODO(gprusak): it should never happen, we should add safety checks to prevent
// pruning blocks not known to be finalized.
if message.proposal.number < self.config.block_store.subscribe().borrow().first {
return Err(Error::ProposalAlreadyPruned);
}

// ----------- Checking the message --------------

signed_message.verify().map_err(Error::InvalidSignature)?;
62 changes: 30 additions & 32 deletions node/actors/bft/src/replica/tests.rs
@@ -7,7 +7,7 @@ use assert_matches::assert_matches;
use rand::Rng;
use zksync_concurrency::{ctx, scope};
use zksync_consensus_roles::validator::{
self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare, ViewNumber,
self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare,
};

/// Sanity check of the happy path.
@@ -101,10 +101,6 @@ async fn leader_prepare_invalid_leader() {
let (mut util, runner) = UTHarness::new(ctx, 2).await;
s.spawn_bg(runner.run(ctx));

let view = ViewNumber(2);
util.set_view(view);
assert_eq!(util.view_leader(view), util.keys[0].public());

let replica_prepare = util.new_replica_prepare();
assert!(util
.process_replica_prepare(ctx, util.sign(replica_prepare.clone()))
Expand Down Expand Up @@ -167,6 +163,35 @@ async fn leader_prepare_old_view() {
.unwrap();
}

#[tokio::test]
async fn leader_prepare_pruned_block() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new(ctx, 1).await;
s.spawn_bg(runner.run(ctx));

let mut leader_prepare = util.new_leader_prepare(ctx).await;
// We assume default replica state and nontrivial `genesis.fork.first_block` here.
leader_prepare.proposal.number = util
.replica
.config
.block_store
.subscribe()
.borrow()
.first
.prev()
.unwrap();
let res = util
.process_leader_prepare(ctx, util.sign(leader_prepare))
.await;
assert_matches!(res, Err(leader_prepare::Error::ProposalAlreadyPruned));
Ok(())
})
.await
.unwrap();
}

/// Tests that `WriteBlockStore::verify_payload` is applied before signing a vote.
#[tokio::test]
async fn leader_prepare_invalid_payload() {
Expand Down Expand Up @@ -338,33 +363,6 @@ async fn leader_prepare_proposal_when_previous_not_finalized() {
.unwrap();
}

#[tokio::test]
async fn leader_prepare_bad_parent_hash() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new(ctx, 1).await;
s.spawn_bg(runner.run(ctx));

tracing::info!("Produce initial block.");
util.produce_block(ctx).await;
tracing::info!("Make leader propose the next block.");
let mut leader_prepare = util.new_leader_prepare(ctx).await;
tracing::info!("Modify the proposal.parent so that it doesn't match the previous block");
leader_prepare.proposal.parent = Some(ctx.rng().gen());
let res = util.process_leader_prepare(ctx, util.sign(leader_prepare.clone())).await;
assert_matches!(res, Err(leader_prepare::Error::InvalidMessage(
validator::LeaderPrepareVerifyError::BadParentHash { got, want }
)) => {
assert_eq!(want, Some(leader_prepare.justification.high_qc().unwrap().message.proposal.hash()));
assert_eq!(got, leader_prepare.proposal.parent);
});
Ok(())
})
.await
.unwrap();
}

#[tokio::test]
async fn leader_prepare_bad_block_number() {
zksync_concurrency::testonly::abort_on_panic();
8 changes: 3 additions & 5 deletions node/actors/bft/src/testonly/fuzz.rs
@@ -174,11 +174,9 @@ impl Fuzz for validator::Payload {

impl Fuzz for validator::BlockHeader {
fn mutate(&mut self, rng: &mut impl Rng) {
match rng.gen_range(0..3) {
0 => self.parent = rng.gen(),
1 => self.number = rng.gen(),
2 => self.payload = rng.gen(),
_ => unreachable!(),
match rng.gen_range(0..2) {
0 => self.number = rng.gen(),
_ => self.payload = rng.gen(),
}
}
}
13 changes: 0 additions & 13 deletions node/actors/bft/src/testonly/ut_harness.rs
@@ -137,19 +137,6 @@ impl UTHarness {
self.replica.view = view;
}

pub(crate) fn set_view(&mut self, view: ViewNumber) {
self.set_replica_view(view);
self.set_leader_view(view);
}

pub(crate) fn set_leader_view(&mut self, view: ViewNumber) {
self.leader.view = view
}

pub(crate) fn set_replica_view(&mut self, view: ViewNumber) {
self.replica.view = view
}

pub(crate) fn replica_view(&self) -> validator::View {
validator::View {
protocol_version: self.protocol_version(),
113 changes: 93 additions & 20 deletions node/actors/executor/src/tests.rs
@@ -5,42 +5,59 @@ use zksync_consensus_bft as bft;
use zksync_consensus_network::testonly::{new_configs, new_fullnode};
use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
use zksync_consensus_storage::{
testonly::{in_memory, new_store},
testonly::{in_memory, new_store, new_store_with_first},
BlockStore,
};

fn make_executor(cfg: &network::Config, block_store: Arc<BlockStore>) -> Executor {
fn config(cfg: &network::Config) -> Config {
Config {
server_addr: *cfg.server_addr,
public_addr: cfg.public_addr,
max_payload_size: usize::MAX,
node_key: cfg.gossip.key.clone(),
gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
gossip_static_inbound: cfg.gossip.static_inbound.clone(),
gossip_static_outbound: cfg.gossip.static_outbound.clone(),
}
}

fn validator(
cfg: &network::Config,
block_store: Arc<BlockStore>,
replica_store: impl ReplicaStore,
) -> Executor {
Executor {
config: Config {
server_addr: *cfg.server_addr,
public_addr: cfg.public_addr,
max_payload_size: usize::MAX,
node_key: cfg.gossip.key.clone(),
gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
gossip_static_inbound: cfg.gossip.static_inbound.clone(),
gossip_static_outbound: cfg.gossip.static_outbound.clone(),
},
config: config(cfg),
block_store,
validator: cfg.validator_key.as_ref().map(|key| Validator {
key: key.clone(),
replica_store: Box::new(in_memory::ReplicaStore::default()),
validator: Some(Validator {
key: cfg.validator_key.clone().unwrap(),
replica_store: Box::new(replica_store),
payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
}),
}
}

fn fullnode(cfg: &network::Config, block_store: Arc<BlockStore>) -> Executor {
Executor {
config: config(cfg),
block_store,
validator: None,
}
}

#[tokio::test]
async fn executing_single_validator() {
async fn test_single_validator() {
abort_on_panic();
let ctx = &ctx::root();
let rng = &mut ctx.rng();

let setup = Setup::new(rng, 1);
let cfgs = new_configs(rng, &setup, 0);
scope::run!(ctx, |ctx, s| async {
let replica_store = in_memory::ReplicaStore::default();
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(make_executor(&cfgs[0], store.clone()).run(ctx));
s.spawn_bg(validator(&cfgs[0], store.clone(), replica_store).run(ctx));
store.wait_until_persisted(ctx, BlockNumber(5)).await?;
Ok(())
})
@@ -49,7 +66,7 @@ async fn executing_single_validator() {
}

#[tokio::test]
async fn executing_validator_and_full_node() {
async fn test_fullnode_syncing_from_validator() {
abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
let rng = &mut ctx.rng();
@@ -58,17 +75,73 @@
let cfgs = new_configs(rng, &setup, 0);
scope::run!(ctx, |ctx, s| async {
// Spawn validator.
let replica_store = in_memory::ReplicaStore::default();
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(make_executor(&cfgs[0], store).run(ctx));
s.spawn_bg(validator(&cfgs[0], store, replica_store).run(ctx));

// Spawn full node.
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(make_executor(&new_fullnode(rng, &cfgs[0]), store.clone()).run(ctx));
s.spawn_bg(fullnode(&new_fullnode(rng, &cfgs[0]), store.clone()).run(ctx));

// Wait for blocks in full node store.
store.wait_until_persisted(ctx, BlockNumber(5)).await?;
store
.wait_until_persisted(ctx, setup.genesis.fork.first_block + 5)
.await?;
Ok(())
})
.await
.unwrap();
}

/// Test in which validator is syncing missing blocks from a full node before producing blocks.
#[tokio::test]
async fn test_validator_syncing_from_fullnode() {
abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
let rng = &mut ctx.rng();

let setup = Setup::new(rng, 1);
let cfgs = new_configs(rng, &setup, 0);
scope::run!(ctx, |ctx, s| async {
// Spawn full node.
let (node_store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(fullnode(&new_fullnode(rng, &cfgs[0]), node_store.clone()).run(ctx));

// Run validator and produce some blocks.
// Wait for the blocks to be fetched by the full node.
let replica_store = in_memory::ReplicaStore::default();
scope::run!(ctx, |ctx, s| async {
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(validator(&cfgs[0], store, replica_store.clone()).run(ctx));
node_store
.wait_until_persisted(ctx, setup.genesis.fork.first_block + 4)
.await?;
Ok(())
})
.await
.unwrap();

// Restart the validator with empty store (but preserved replica state) and non-trivial
// `store.state.first`.
// Validator should fetch the past blocks from the full node before producing next blocks.
let last_block = node_store
.subscribe()
.borrow()
.last
.as_ref()
.unwrap()
.header()
.number;
let (store, runner) =
new_store_with_first(ctx, &setup.genesis, setup.genesis.fork.first_block + 2).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(validator(&cfgs[0], store, replica_store).run(ctx));
node_store.wait_until_persisted(ctx, last_block + 3).await?;

Ok(())
})
.await
