Skip to content

Commit

Permalink
fixed PushBatchStoreStateReq message (#156)
Browse files Browse the repository at this point in the history
It was pushing the last batch, instead of just the batch number.
Additionally I've:
* tweaked names a bit
* replaced context() with wrap() to propagate cancellation
* fixed gossipnet msg size limits
* added Debug for the public type

---------

Co-authored-by: Bruno França <[email protected]>
  • Loading branch information
pompon0 and brunoffranca authored Jul 22, 2024
1 parent cf01b6a commit 99df300
Show file tree
Hide file tree
Showing 19 changed files with 145 additions and 181 deletions.
4 changes: 2 additions & 2 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions node/actors/executor/src/attestation.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
//! Module to publish attestations over batches.

use std::sync::Arc;

use crate::Attester;
use anyhow::Context;
use zksync_concurrency::ctx;
use zksync_concurrency::time;
use std::sync::Arc;
use zksync_concurrency::{ctx, time};
use zksync_consensus_network::gossip::BatchVotesPublisher;
use zksync_consensus_roles::attester;
use zksync_consensus_storage::{BatchStore, BlockStore};

use crate::Attester;

const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);

/// Polls the database for new batches to be signed and publishes them to the gossip channel.
Expand Down Expand Up @@ -56,7 +53,6 @@ impl AttesterRunner {
.await
.context("wait_until_persisted")?
.last
.map(|b| b.number)
.unwrap_or_default();

// Determine the batch to start signing from.
Expand Down
9 changes: 4 additions & 5 deletions node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network::{self as network};
use zksync_consensus_network as network;
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
Expand Down Expand Up @@ -50,7 +50,7 @@ pub struct Config {
/// Maximal size of the block payload.
pub max_payload_size: usize,
/// Maximal size of a batch, which includes `max_payload_size` per block in the batch,
/// plus the size of the Merkle proof of the commitment being included on L1 (should be ~1kB).
/// plus the size of the Merkle proof of the commitment being included on L1.
pub max_batch_size: usize,
/// Key of this node. It uniquely identifies the node.
/// It should match the secret key provided in the `node_key` file.
Expand Down Expand Up @@ -132,15 +132,14 @@ impl Executor {

tracing::debug!("Starting actors in separate threads.");
scope::run!(ctx, |ctx, s| async {
s.spawn(async { dispatcher.run(ctx).await.context("IO Dispatcher stopped") });
let (net, runner) = network::Network::new(
network_config,
self.block_store.clone(),
self.batch_store.clone(),
network_actor_pipe,
);
net.register_metrics();

s.spawn(async { dispatcher.run(ctx).await.context("IO Dispatcher stopped") });
s.spawn(async { runner.run(ctx).await.context("Network stopped") });

if let Some(attester) = self.attester {
Expand All @@ -151,7 +150,7 @@ impl Executor {
attester,
net.batch_vote_publisher(),
);
s.spawn::<()>(async {
s.spawn(async {
runner.run(ctx).await?;
Ok(())
});
Expand Down
30 changes: 18 additions & 12 deletions node/actors/network/src/gossip/batch_votes.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! Global state distributed by active attesters, observed by all the nodes in the network.
use crate::watch::Watch;
use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, fmt, sync::Arc};
use zksync_concurrency::sync;
use zksync_consensus_roles::attester::{self, Batch};
use zksync_consensus_roles::attester;

/// Represents the currents state of node's knowledge about the attester votes.
/// Represents the current state of node's knowledge about the attester votes.
///
/// Eventually this data structure will have to track voting potentially happening
/// simultaneously on multiple heights, if we decrease the batch interval to be
Expand All @@ -24,7 +24,7 @@ pub(crate) struct BatchVotes {
/// for now, hoping that with 1 minute batches there's plenty of time for
/// the quorum to be reached, but eventually we'll have to allow multiple
/// votes across different heights.
pub(crate) votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<Batch>>>,
pub(crate) votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<attester::Batch>>>,

/// Total weight of votes at different heights and hashes.
///
Expand All @@ -40,7 +40,7 @@ pub(crate) struct BatchVotes {

impl BatchVotes {
/// Returns a set of votes of `self` which are newer than the entries in `b`.
pub(super) fn get_newer(&self, b: &Self) -> Vec<Arc<attester::Signed<Batch>>> {
pub(super) fn get_newer(&self, b: &Self) -> Vec<Arc<attester::Signed<attester::Batch>>> {
let mut newer = vec![];
for (k, v) in &self.votes {
if let Some(bv) = b.votes.get(k) {
Expand All @@ -61,7 +61,7 @@ impl BatchVotes {
pub(super) fn update(
&mut self,
attesters: &attester::Committee,
data: &[Arc<attester::Signed<Batch>>],
data: &[Arc<attester::Signed<attester::Batch>>],
) -> anyhow::Result<bool> {
let mut changed = false;

Expand Down Expand Up @@ -133,7 +133,7 @@ impl BatchVotes {
sigs
});
attester::BatchQC {
message: Batch {
message: attester::Batch {
number: *number,
hash: *hash,
},
Expand All @@ -151,13 +151,12 @@ impl BatchVotes {
self.min_batch_number = min_batch_number;
self.votes.retain(|_, v| v.msg.number >= min_batch_number);
if let Some(prev) = min_batch_number.prev() {
let (_, support) = self.support.split(&prev);
self.support = support;
self.support = self.support.split(&prev).1;
}
}

/// Add an already validated vote from an attester into the register.
fn add(&mut self, vote: Arc<attester::Signed<Batch>>, weight: attester::Weight) {
fn add(&mut self, vote: Arc<attester::Signed<attester::Batch>>, weight: attester::Weight) {
self.remove(&vote.key, weight);

let batch = self.support.entry(vote.msg.number).or_default();
Expand Down Expand Up @@ -213,7 +212,7 @@ impl BatchVotesWatch {
pub(crate) async fn update(
&self,
attesters: &attester::Committee,
data: &[Arc<attester::Signed<Batch>>],
data: &[Arc<attester::Signed<attester::Batch>>],
) -> anyhow::Result<()> {
let this = self.0.lock().await;
let mut votes = this.borrow().clone();
Expand All @@ -233,13 +232,20 @@ impl BatchVotesWatch {
/// Wrapper around [BatchVotesWatch] to publish votes over batches signed by an attester key.
pub struct BatchVotesPublisher(pub(crate) Arc<BatchVotesWatch>);

impl fmt::Debug for BatchVotesPublisher {
    // Manual impl: renders an opaque `BatchVotesPublisher { .. }` without
    // exposing the inner watch state (which presumably lacks Debug — TODO confirm).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("BatchVotesPublisher").finish_non_exhaustive()
    }
}

impl BatchVotesPublisher {
/// Sign an L1 batch and push it into the batch votes, which should cause it to be gossiped by the network.
pub async fn publish(
&self,
attesters: &attester::Committee,
attester: &attester::SecretKey,
batch: Batch,
batch: attester::Batch,
) -> anyhow::Result<()> {
if !attesters.contains(&attester.public()) {
return Ok(());
Expand Down
7 changes: 3 additions & 4 deletions node/actors/network/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
pub use self::batch_votes::BatchVotesPublisher;
use self::batch_votes::BatchVotesWatch;
use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
use anyhow::Context as _;
use fetch::RequestItem;
use std::sync::{atomic::AtomicUsize, Arc};
pub(crate) use validator_addrs::*;
use zksync_concurrency::{ctx, ctx::channel, scope, sync};
use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore};

Expand Down Expand Up @@ -170,9 +169,9 @@ impl Network {
let next_batch_number = qc.message.number.next();

self.batch_store
.queue_batch_qc(ctx, qc)
.persist_batch_qc(ctx, qc)
.await
.context("queue_batch_qc")?;
.wrap("queue_batch_qc")?;

self.batch_votes
.set_min_batch_number(next_batch_number)
Expand Down
Loading

0 comments on commit 99df300

Please sign in to comment.