support for running a node from a state snapshot (BFT-418) #75

Merged (9 commits) on Mar 18, 2024
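In short, this change lets a node start from a block store whose first block is later than the fork's first block, i.e. from a state snapshot: block headers no longer carry a parent hash, and a replica refuses to vote for proposals it has already pruned. A minimal usage sketch, reusing the `validator` and `new_store_with_first` test helpers from the executor tests in this diff; the surrounding scaffolding (`ctx`, `s`, `setup`, `cfgs`, and a peer that still serves the missing blocks) is assumed rather than shown:

// Sketch only: start a validator from a "snapshot", i.e. a store whose first
// persisted block is two blocks past the fork's first block.
let replica_store = in_memory::ReplicaStore::default();
let (store, runner) =
    new_store_with_first(ctx, &setup.genesis, setup.genesis.fork.first_block + 2).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(validator(&cfgs[0], store.clone(), replica_store).run(ctx));
// The node first fetches the blocks it is missing (from `first` onward) from its
// peers, then keeps finalizing new blocks past the snapshot point.
store
    .wait_until_persisted(ctx, setup.genesis.fork.first_block + 5)
    .await?;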
7 changes: 3 additions & 4 deletions node/actors/bft/src/leader/state_machine.rs
@@ -167,9 +167,9 @@ impl StateMachine {
// The previous block was finalized, so we can propose a new block.
_ => {
let fork = &cfg.genesis().fork;
let (parent, number) = match high_qc {
Some(qc) => (Some(qc.header().hash()), qc.header().number.next()),
None => (fork.first_parent, fork.first_block),
let number = match high_qc {
Some(qc) => qc.header().number.next(),
None => fork.first_block,
};
// Defensively assume that PayloadManager cannot propose until the previous block is stored.
if let Some(prev) = number.prev() {
@@ -189,7 +189,6 @@
.observe(payload.0.len());
let proposal = validator::BlockHeader {
number,
parent,
payload: payload.hash(),
};
(proposal, Some(payload))
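With the parent hash removed from `validator::BlockHeader`, a proposal is now identified by its block number and payload hash alone. A minimal sketch of the new header construction, using only the fields shown in the hunk above:

// Sketch: `number` comes from `high_qc.header().number.next()` when a high QC
// exists, otherwise from `cfg.genesis().fork.first_block`; there is no parent
// field to fill in anymore.
let proposal = validator::BlockHeader {
    number,
    payload: payload.hash(),
};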
7 changes: 0 additions & 7 deletions node/actors/bft/src/leader/tests.rs
@@ -37,13 +37,6 @@ async fn replica_prepare_sanity_yield_leader_prepare() {
.unwrap()
.unwrap();
assert_eq!(leader_prepare.msg.view(), &replica_prepare.view);
assert_eq!(
leader_prepare.msg.proposal.parent,
replica_prepare
.high_vote
.as_ref()
.map(|v| v.proposal.hash()),
);
assert_eq!(
leader_prepare.msg.justification,
util.new_prepare_qc(|msg| *msg = replica_prepare)
2 changes: 1 addition & 1 deletion node/actors/bft/src/replica/block.rs
@@ -39,7 +39,7 @@ impl StateMachine {
tracing::info!(
"Finalized block {}: {:#?}",
block.header().number,
block.header().hash(),
block.header().payload,
);
self.config
.block_store
13 changes: 11 additions & 2 deletions node/actors/bft/src/replica/leader_prepare.rs
@@ -42,8 +42,9 @@ pub(crate) enum Error {
/// Invalid message.
#[error("invalid message: {0:#}")]
InvalidMessage(#[source] validator::LeaderPrepareVerifyError),
/// Previous proposal was not finalized.

/// Leader proposed a block that was already pruned from replica's storage.
#[error("leader proposed a block that was already pruned from replica's storage")]
ProposalAlreadyPruned,
/// Oversized payload.
#[error("block proposal with an oversized payload (payload size: {payload_size})")]
ProposalOversizedPayload {
@@ -110,6 +111,14 @@ impl StateMachine {
});
}

// Replica MUSTN'T vote for blocks which have already been pruned from storage
// (because it won't be able to persist and broadcast them once finalized).
// TODO(gprusak): this should never happen; we should add safety checks to prevent
// pruning blocks not known to be finalized.
if message.proposal.number < self.config.block_store.subscribe().borrow().first {
return Err(Error::ProposalAlreadyPruned);
}

// ----------- Checking the message --------------

signed_message.verify().map_err(Error::InvalidSignature)?;
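The new `ProposalAlreadyPruned` error boils down to reading the store's persisted range and refusing to vote below it. A sketch of that guard, using the same calls as the hunk above and assuming it runs inside the replica's leader-prepare handler:

// Sketch: `subscribe().borrow()` exposes the range of blocks currently persisted;
// `first` is the oldest block still kept, and it is past `fork.first_block` when
// the node was initialized from a snapshot (or after pruning).
let first_kept = self.config.block_store.subscribe().borrow().first;
if message.proposal.number < first_kept {
    // The block could never be persisted or re-broadcast once finalized.
    return Err(Error::ProposalAlreadyPruned);
}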
62 changes: 30 additions & 32 deletions node/actors/bft/src/replica/tests.rs
@@ -7,7 +7,7 @@ use assert_matches::assert_matches;
use rand::Rng;
use zksync_concurrency::{ctx, scope};
use zksync_consensus_roles::validator::{
self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare, ViewNumber,
self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare,
};

/// Sanity check of the happy path.
@@ -101,10 +101,6 @@ async fn leader_prepare_invalid_leader() {
let (mut util, runner) = UTHarness::new(ctx, 2).await;
s.spawn_bg(runner.run(ctx));

let view = ViewNumber(2);
util.set_view(view);
assert_eq!(util.view_leader(view), util.keys[0].public());

let replica_prepare = util.new_replica_prepare();
assert!(util
.process_replica_prepare(ctx, util.sign(replica_prepare.clone()))
@@ -167,6 +163,35 @@ async fn leader_prepare_old_view() {
.unwrap();
}

#[tokio::test]
async fn leader_prepare_pruned_block() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new(ctx, 1).await;
s.spawn_bg(runner.run(ctx));

let mut leader_prepare = util.new_leader_prepare(ctx).await;
// We assume default replica state and nontrivial `genesis.fork.first_block` here.
leader_prepare.proposal.number = util
.replica
.config
.block_store
.subscribe()
.borrow()
.first
.prev()
.unwrap();
let res = util
.process_leader_prepare(ctx, util.sign(leader_prepare))
.await;
assert_matches!(res, Err(leader_prepare::Error::ProposalAlreadyPruned));
Ok(())
})
.await
.unwrap();
}

/// Tests that `WriteBlockStore::verify_payload` is applied before signing a vote.
#[tokio::test]
async fn leader_prepare_invalid_payload() {
@@ -338,33 +363,6 @@ async fn leader_prepare_proposal_when_previous_not_finalized() {
.unwrap();
}

#[tokio::test]
async fn leader_prepare_bad_parent_hash() {
zksync_concurrency::testonly::abort_on_panic();
let ctx = &ctx::test_root(&ctx::RealClock);
scope::run!(ctx, |ctx, s| async {
let (mut util, runner) = UTHarness::new(ctx, 1).await;
s.spawn_bg(runner.run(ctx));

tracing::info!("Produce initial block.");
util.produce_block(ctx).await;
tracing::info!("Make leader propose the next block.");
let mut leader_prepare = util.new_leader_prepare(ctx).await;
tracing::info!("Modify the proposal.parent so that it doesn't match the previous block");
leader_prepare.proposal.parent = Some(ctx.rng().gen());
let res = util.process_leader_prepare(ctx, util.sign(leader_prepare.clone())).await;
assert_matches!(res, Err(leader_prepare::Error::InvalidMessage(
validator::LeaderPrepareVerifyError::BadParentHash { got, want }
)) => {
assert_eq!(want, Some(leader_prepare.justification.high_qc().unwrap().message.proposal.hash()));
assert_eq!(got, leader_prepare.proposal.parent);
});
Ok(())
})
.await
.unwrap();
}

#[tokio::test]
async fn leader_prepare_bad_block_number() {
zksync_concurrency::testonly::abort_on_panic();
8 changes: 3 additions & 5 deletions node/actors/bft/src/testonly/fuzz.rs
@@ -174,11 +174,9 @@ impl Fuzz for validator::Payload {

impl Fuzz for validator::BlockHeader {
fn mutate(&mut self, rng: &mut impl Rng) {
match rng.gen_range(0..3) {
0 => self.parent = rng.gen(),
1 => self.number = rng.gen(),
2 => self.payload = rng.gen(),
_ => unreachable!(),
match rng.gen_range(0..2) {
0 => self.number = rng.gen(),
_ => self.payload = rng.gen(),
}
}
}
13 changes: 0 additions & 13 deletions node/actors/bft/src/testonly/ut_harness.rs
@@ -137,19 +137,6 @@ impl UTHarness {
self.replica.view = view;
}

pub(crate) fn set_view(&mut self, view: ViewNumber) {
self.set_replica_view(view);
self.set_leader_view(view);
}

pub(crate) fn set_leader_view(&mut self, view: ViewNumber) {
self.leader.view = view
}

pub(crate) fn set_replica_view(&mut self, view: ViewNumber) {
self.replica.view = view
}

pub(crate) fn replica_view(&self) -> validator::View {
validator::View {
protocol_version: self.protocol_version(),
113 changes: 93 additions & 20 deletions node/actors/executor/src/tests.rs
@@ -5,42 +5,59 @@ use zksync_consensus_bft as bft;
use zksync_consensus_network::testonly::{new_configs, new_fullnode};
use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
use zksync_consensus_storage::{
testonly::{in_memory, new_store},
testonly::{in_memory, new_store, new_store_with_first},
BlockStore,
};

fn make_executor(cfg: &network::Config, block_store: Arc<BlockStore>) -> Executor {
fn config(cfg: &network::Config) -> Config {
Config {
server_addr: *cfg.server_addr,
public_addr: cfg.public_addr,
max_payload_size: usize::MAX,
node_key: cfg.gossip.key.clone(),
gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
gossip_static_inbound: cfg.gossip.static_inbound.clone(),
gossip_static_outbound: cfg.gossip.static_outbound.clone(),
}
}

fn validator(
cfg: &network::Config,
block_store: Arc<BlockStore>,
replica_store: impl ReplicaStore,
) -> Executor {
Executor {
config: Config {
server_addr: *cfg.server_addr,
public_addr: cfg.public_addr,
max_payload_size: usize::MAX,
node_key: cfg.gossip.key.clone(),
gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit,
gossip_static_inbound: cfg.gossip.static_inbound.clone(),
gossip_static_outbound: cfg.gossip.static_outbound.clone(),
},
config: config(cfg),
block_store,
validator: cfg.validator_key.as_ref().map(|key| Validator {
key: key.clone(),
replica_store: Box::new(in_memory::ReplicaStore::default()),
validator: Some(Validator {
key: cfg.validator_key.clone().unwrap(),
replica_store: Box::new(replica_store),
payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
}),
}
}

fn fullnode(cfg: &network::Config, block_store: Arc<BlockStore>) -> Executor {
Executor {
config: config(cfg),
block_store,
validator: None,
}
}

#[tokio::test]
async fn executing_single_validator() {
async fn test_single_validator() {
abort_on_panic();
let ctx = &ctx::root();
let rng = &mut ctx.rng();

let setup = Setup::new(rng, 1);
let cfgs = new_configs(rng, &setup, 0);
scope::run!(ctx, |ctx, s| async {
let replica_store = in_memory::ReplicaStore::default();
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(make_executor(&cfgs[0], store.clone()).run(ctx));
s.spawn_bg(validator(&cfgs[0], store.clone(), replica_store).run(ctx));
store.wait_until_persisted(ctx, BlockNumber(5)).await?;
Ok(())
})
@@ -49,7 +66,7 @@ async fn executing_single_validator() {
}

#[tokio::test]
async fn executing_validator_and_full_node() {
async fn test_fullnode_syncing_from_validator() {
abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
let rng = &mut ctx.rng();
Expand All @@ -58,17 +75,73 @@ async fn executing_validator_and_full_node() {
let cfgs = new_configs(rng, &setup, 0);
scope::run!(ctx, |ctx, s| async {
// Spawn validator.
let replica_store = in_memory::ReplicaStore::default();
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(make_executor(&cfgs[0], store).run(ctx));
s.spawn_bg(validator(&cfgs[0], store, replica_store).run(ctx));

// Spawn full node.
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(make_executor(&new_fullnode(rng, &cfgs[0]), store.clone()).run(ctx));
s.spawn_bg(fullnode(&new_fullnode(rng, &cfgs[0]), store.clone()).run(ctx));

// Wait for blocks in full node store.
store.wait_until_persisted(ctx, BlockNumber(5)).await?;
store
.wait_until_persisted(ctx, setup.genesis.fork.first_block + 5)
.await?;
Ok(())
})
.await
.unwrap();
}

/// Test in which the validator syncs missing blocks from a full node before producing blocks.
#[tokio::test]
async fn test_validator_syncing_from_fullnode() {
abort_on_panic();
let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
let rng = &mut ctx.rng();

let setup = Setup::new(rng, 1);
let cfgs = new_configs(rng, &setup, 0);
scope::run!(ctx, |ctx, s| async {
// Spawn full node.
let (node_store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(fullnode(&new_fullnode(rng, &cfgs[0]), node_store.clone()).run(ctx));

// Run validator and produce some blocks.
// Wait for the blocks to be fetched by the full node.
let replica_store = in_memory::ReplicaStore::default();
scope::run!(ctx, |ctx, s| async {
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(validator(&cfgs[0], store, replica_store.clone()).run(ctx));
node_store
.wait_until_persisted(ctx, setup.genesis.fork.first_block + 4)
.await?;
Ok(())
})
.await
.unwrap();

// Restart the validator with empty store (but preserved replica state) and non-trivial
// `store.state.first`.
// Validator should fetch the past blocks from the full node before producing next blocks.
let last_block = node_store
.subscribe()
.borrow()
.last
.as_ref()
.unwrap()
.header()
.number;
let (store, runner) =
new_store_with_first(ctx, &setup.genesis, setup.genesis.fork.first_block + 2).await;
s.spawn_bg(runner.run(ctx));
s.spawn_bg(validator(&cfgs[0], store, replica_store).run(ctx));
node_store.wait_until_persisted(ctx, last_block + 3).await?;

Ok(())
})
.await