Skip to content

Commit

Permalink
snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Feb 14, 2024
1 parent 5b93025 commit ad5b1a5
Show file tree
Hide file tree
Showing 14 changed files with 106 additions and 102 deletions.
8 changes: 4 additions & 4 deletions node/actors/network/src/consensus/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn test_one_connection_per_validator() {
let nodes = testonly::new_configs(rng, &setup, 1);

scope::run!(ctx, |ctx,s| async {
let (store,runner) = new_store(ctx,&setup.blocks[0]).await;
let (store,runner) = new_store(ctx,&setup.genesis).await;
s.spawn_bg(runner.run(ctx));
let nodes : Vec<_> = nodes.into_iter().enumerate().map(|(i,node)| {
let (node,runner) = testonly::Instance::new(ctx, node, store.clone());
Expand Down Expand Up @@ -76,7 +76,7 @@ async fn test_genesis_mismatch() {
let mut listener = cfgs[1].server_addr.bind().context("server_addr.bind()")?;

tracing::info!("Start one node, we will simulate the other one.");
let (store,runner) = new_store(ctx,&setup.blocks[0]).await;
let (store,runner) = new_store(ctx,&setup.genesis).await;
s.spawn_bg(runner.run(ctx));
let (node,runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone());
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node")));
Expand Down Expand Up @@ -116,7 +116,7 @@ async fn test_address_change() {
let setup = validator::testonly::GenesisSetup::new(rng, 5);
let mut cfgs = testonly::new_configs(rng, &setup, 1);
scope::run!(ctx, |ctx, s| async {
let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
let mut nodes: Vec<_> = cfgs
.iter()
Expand Down Expand Up @@ -169,7 +169,7 @@ async fn test_transmission() {
let cfgs = testonly::new_configs(rng, &setup, 1);

scope::run!(ctx, |ctx, s| async {
let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
let mut nodes: Vec<_> = cfgs
.iter()
Expand Down
20 changes: 10 additions & 10 deletions node/actors/network/src/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn test_one_connection_per_node() {
let cfgs = testonly::new_configs(rng, &setup, 2);

scope::run!(ctx, |ctx,s| async {
let (store,runner) = new_store(ctx,&setup.blocks[0]).await;
let (store,runner) = new_store(ctx,&setup.genesis).await;
s.spawn_bg(runner.run(ctx));
let mut nodes : Vec<_> = cfgs.iter().enumerate().map(|(i,cfg)| {
let (node,runner) = testonly::Instance::new(ctx, cfg.clone(), store.clone());
Expand Down Expand Up @@ -230,7 +230,7 @@ async fn test_validator_addrs_propagation() {
let cfgs = testonly::new_configs(rng, &setup, 1);

scope::run!(ctx, |ctx, s| async {
let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
let nodes: Vec<_> = cfgs
.iter()
Expand Down Expand Up @@ -273,7 +273,7 @@ async fn test_genesis_mismatch() {
let mut listener = cfgs[1].server_addr.bind().context("server_addr.bind()")?;

tracing::info!("Start one node, we will simulate the other one.");
let (store,runner) = new_store(ctx,&setup.blocks[0]).await;
let (store,runner) = new_store(ctx,&setup.genesis).await;
s.spawn_bg(runner.run(ctx));
let (_node,runner) = testonly::Instance::new(ctx, cfgs[0].clone(), store.clone());
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node")));
Expand Down Expand Up @@ -316,7 +316,7 @@ async fn syncing_blocks(node_count: usize, gossip_peers: usize) {
scope::run!(ctx, |ctx, s| async {
let mut nodes = vec![];
for (i, cfg) in cfgs.into_iter().enumerate() {
let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
let (node, runner) = testonly::Instance::new(ctx, cfg, store);
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i)));
Expand Down Expand Up @@ -358,7 +358,7 @@ async fn wait_for_updates(
else {
continue;
};
if state.last == block.justification {
if state.last.as_ref() == Some(&block.justification) {
updates.insert(peer);
}
response.send(()).ok();
Expand All @@ -384,15 +384,15 @@ async fn uncoordinated_block_syncing(

let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0));
let rng = &mut ctx.rng();
let mut setup = validator::testonly::GenesisSetup::empty(rng, node_count);
let mut setup = validator::testonly::GenesisSetup::new(rng, node_count);
setup.push_blocks(rng, EXCHANGED_STATE_COUNT);
scope::run!(ctx, |ctx, s| async {
for (i, cfg) in testonly::new_configs(rng, &setup, gossip_peers)
.into_iter()
.enumerate()
{
let i = i;
let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
let (node, runner) = testonly::Instance::new(ctx, cfg, store.clone());
s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i)));
Expand Down Expand Up @@ -428,7 +428,7 @@ async fn getting_blocks_from_peers(node_count: usize, gossip_peers: usize) {
// All inbound and outbound peers should answer the request.
let expected_successful_responses = (2 * gossip_peers).min(node_count - 1);

let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
let (store, runner) = new_store(ctx, &setup.genesis).await;
scope::run!(ctx, |ctx, s| async {
s.spawn_bg(runner.run(ctx));
let mut nodes: Vec<_> = cfgs
Expand Down Expand Up @@ -515,7 +515,7 @@ async fn validator_node_restart() {
for cfg in &mut cfgs {
cfg.rpc.push_validator_addrs_rate.refresh = time::Duration::ZERO;
}
let (store, store_runner) = new_store(ctx, &setup.blocks[0]).await;
let (store, store_runner) = new_store(ctx, &setup.genesis).await;
let (node1, node1_runner) = testonly::Instance::new(ctx, cfgs[1].clone(), store.clone());
scope::run!(ctx, |ctx, s| async {
s.spawn_bg(store_runner.run(ctx));
Expand Down Expand Up @@ -598,7 +598,7 @@ async fn rate_limiting() {
}
let mut nodes = vec![];
scope::run!(ctx, |ctx, s| async {
let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
// Spawn the satellite nodes and wait until they register
// their own address.
Expand Down
4 changes: 2 additions & 2 deletions node/actors/network/src/proto/gossip.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ message PushValidatorAddrs {
// and actively fetch newest blocks.
message PushBlockStoreState {
// First L2 block that the node has locally.
optional roles.validator.CommitQC first = 1;
optional uint64 first = 1; // required; BlockNumber
// Last L2 block that the node has locally.
optional roles.validator.CommitQC last = 2;
optional roles.validator.CommitQC last = 2; // optional
}

// Asks the server to send an L2 block (including its transactions).
Expand Down
11 changes: 6 additions & 5 deletions node/actors/network/src/rpc/push_block_store_state.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! RPC for notifying peer about our BlockStore state.
use crate::{mux, proto::gossip as proto};
use anyhow::Context;
use zksync_consensus_roles::validator;
use zksync_consensus_storage::BlockStoreState;
use zksync_protobuf::{read_required, ProtoFmt};
use zksync_protobuf::{required, read_optional, ProtoFmt};

/// PushBlockStoreState RPC.
#[derive(Debug)]
Expand All @@ -26,15 +27,15 @@ impl ProtoFmt for Req {

fn read(message: &Self::Proto) -> anyhow::Result<Self> {
Ok(Self(BlockStoreState {
first: read_required(&message.first).context("first")?,
last: read_required(&message.last).context("last")?,
first: validator::BlockNumber(*required(&message.first).context("first")?),
last: read_optional(&message.last).context("last")?,
}))
}

fn build(&self) -> Self::Proto {
Self::Proto {
first: Some(self.0.first.build()),
last: Some(self.0.last.build()),
first: Some(self.0.first.0),
last: self.0.last.as_ref().map(|x|x.build()),
}
}
}
2 changes: 1 addition & 1 deletion node/actors/network/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn test_metrics() {
let setup = validator::testonly::GenesisSetup::new(rng, 3);
let cfgs = testonly::new_configs(rng, &setup, 1);
scope::run!(ctx, |ctx, s| async {
let (store, runner) = new_store(ctx, &setup.blocks[0]).await;
let (store, runner) = new_store(ctx, &setup.genesis).await;
s.spawn_bg(runner.run(ctx));
let nodes: Vec<_> = cfgs
.into_iter()
Expand Down
3 changes: 2 additions & 1 deletion node/actors/sync_blocks/src/peers/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ struct TestHandles {
#[async_trait]
trait Test: fmt::Debug + Send + Sync {
const BLOCK_COUNT: usize;
// TODO: move this to genesis
const GENESIS_BLOCK_NUMBER: usize = 0;

fn config(&self, setup: &validator::testonly::GenesisSetup) -> Config {
Expand Down Expand Up @@ -70,7 +71,7 @@ async fn test_peer_states<T: Test>(test: T) {
let rng = &mut ctx.rng();
let mut setup = validator::testonly::GenesisSetup::new(rng, 4);
setup.push_blocks(rng, T::BLOCK_COUNT);
let (store, store_run) = new_store(ctx, &setup.blocks[T::GENESIS_BLOCK_NUMBER]).await;
let (store, store_run) = new_store(ctx, &setup.genesis).await;
test.initialize_storage(ctx, store.as_ref(), &setup).await;

let (message_sender, message_receiver) = channel::unbounded();
Expand Down
2 changes: 1 addition & 1 deletion node/actors/sync_blocks/src/tests/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Node {
network: network::Config,
setup: Arc<GenesisSetup>,
) -> (Self, NodeRunner) {
let (store, store_runner) = new_store(ctx, &setup.blocks[0]).await;
let (store, store_runner) = new_store(ctx, &setup.genesis).await;
let (switch_on_sender, switch_on_receiver) = oneshot::channel();
let (terminate_send, terminate_recv) = channel::bounded(1);

Expand Down
9 changes: 0 additions & 9 deletions node/libs/roles/src/validator/messages/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,6 @@ impl BlockHeader {
BlockHeaderHash(Keccak256::new(&zksync_protobuf::canonical(self)))
}

/// Creates a first block of the chain
pub fn first(payload: PayloadHash, number: BlockNumber) -> Self {
Self {
parent: None,
number,
payload,
}
}

/// Creates a child block for the given parent.
pub fn next(parent: &BlockHeader, payload: PayloadHash) -> Self {
Self {
Expand Down
23 changes: 11 additions & 12 deletions node/libs/roles/src/validator/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ pub struct BlockBuilder<'a> {
}

impl GenesisSetup {
/// Constructs GenesisSetup with no blocks.
pub fn empty(rng: &mut impl Rng, validators: usize) -> Self {
/// Constructs GenesisSetup.
pub fn new(rng: &mut impl Rng, validators: usize) -> Self {
let keys: Vec<SecretKey> = (0..validators).map(|_| rng.gen()).collect();
let genesis = Genesis {
validators: ValidatorSet::new(keys.iter().map(|k| k.public())).unwrap(),
Expand All @@ -84,34 +84,33 @@ impl GenesisSetup {
}
}

/// Constructs GenesisSetup with genesis block.
pub fn new(rng: &mut impl Rng, validators: usize) -> Self {
let mut this = Self::empty(rng, validators);
this.push_block(rng.gen());
this
}

/// Returns a builder for the next block.
pub fn next_block(&mut self) -> BlockBuilder {
assert!(self.genesis.forks.root()==self.genesis.forks.current(), "constructing blocks for genesis with >1 fork is not supported yet");
let parent = self.blocks.last().map(|b| &b.justification.message);
let payload = Payload(vec![]);
let fork = self.genesis.forks.current();
BlockBuilder {
msg: match parent {
Some(p) => ReplicaCommit {
view: View {
protocol_version: p.view.protocol_version,
fork: p.view.fork,
fork: fork.number,
number: p.view.number.next(),
},
proposal: BlockHeader::next(&p.proposal, payload.hash()),
},
None => ReplicaCommit {
view: View {
protocol_version: ProtocolVersion::EARLIEST,
fork: self.genesis.forks.current().number,
fork: fork.number,
number: ViewNumber(0),
},
proposal: BlockHeader::first(payload.hash(), BlockNumber(0)),
proposal: BlockHeader {
parent: fork.first_parent,
number: fork.first_block,
payload: payload.hash(),
},
},
},
payload,
Expand Down
15 changes: 9 additions & 6 deletions node/libs/storage/src/block_store/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ use std::time;
#[derive(Debug, vise::Metrics)]
#[metrics(prefix = "zksync_consensus_storage_persistent_block_store")]
pub(super) struct PersistentBlockStore {
/// Latency of a successful `state()` call.
/// Latency of a successful `genesis()` call.
#[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
pub(super) state_latency: vise::Histogram<time::Duration>,
pub(super) genesis_latency: vise::Histogram<time::Duration>,
/// Latency of a successful `last()` call.
#[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
pub(super) last_latency: vise::Histogram<time::Duration>,
/// Latency of a successful `block()` call.
#[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)]
pub(super) block_latency: vise::Histogram<time::Duration>,
Expand All @@ -21,8 +24,8 @@ pub(super) static PERSISTENT_BLOCK_STORE: vise::Global<PersistentBlockStore> = v
#[derive(Debug, vise::Metrics)]
#[metrics(prefix = "zksync_consensus_storage_block_store")]
pub(super) struct BlockStore {
/// BlockNumber of the last queued block.
pub(super) last_queued_block: vise::Gauge<u64>,
/// BlockNumber of the last persisted block.
pub(super) last_persisted_block: vise::Gauge<u64>,
/// BlockNumber of the next block to queue.
pub(super) next_queued_block: vise::Gauge<u64>,
/// BlockNumber of the next block to persist.
pub(super) next_persisted_block: vise::Gauge<u64>,
}
Loading

0 comments on commit ad5b1a5

Please sign in to comment.