Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed PushBatchStoreStateReq message #156

Merged
merged 5 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 3 additions & 7 deletions node/actors/executor/src/attestation.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
//! Module to publish attestations over batches.

use std::sync::Arc;

use crate::Attester;
use anyhow::Context;
use zksync_concurrency::ctx;
use zksync_concurrency::time;
use std::sync::Arc;
use zksync_concurrency::{ctx, time};
use zksync_consensus_network::gossip::BatchVotesPublisher;
use zksync_consensus_roles::attester;
use zksync_consensus_storage::{BatchStore, BlockStore};

use crate::Attester;

const POLL_INTERVAL: time::Duration = time::Duration::seconds(1);

/// Polls the database for new batches to be signed and publishes them to the gossip channel.
Expand Down Expand Up @@ -56,7 +53,6 @@ impl AttesterRunner {
.await
.context("wait_until_persisted")?
.last
.map(|b| b.number)
.unwrap_or_default();

// Determine the batch to start signing from.
Expand Down
9 changes: 4 additions & 5 deletions node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
};
use zksync_concurrency::{ctx, limiter, net, scope, time};
use zksync_consensus_bft as bft;
use zksync_consensus_network::{self as network};
use zksync_consensus_network as network;
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore, ReplicaStore};
use zksync_consensus_utils::pipe;
Expand Down Expand Up @@ -50,7 +50,7 @@ pub struct Config {
/// Maximal size of the block payload.
pub max_payload_size: usize,
/// Maximal size of a batch, which includes `max_payload_size` per block in the batch,
/// plus the size of the Merkle proof of the commitment being included on L1 (should be ~1kB).
/// plus the size of the Merkle proof of the commitment being included on L1.
pub max_batch_size: usize,
/// Key of this node. It uniquely identifies the node.
/// It should match the secret key provided in the `node_key` file.
Expand Down Expand Up @@ -132,15 +132,14 @@ impl Executor {

tracing::debug!("Starting actors in separate threads.");
scope::run!(ctx, |ctx, s| async {
s.spawn(async { dispatcher.run(ctx).await.context("IO Dispatcher stopped") });
let (net, runner) = network::Network::new(
network_config,
self.block_store.clone(),
self.batch_store.clone(),
network_actor_pipe,
);
net.register_metrics();

s.spawn(async { dispatcher.run(ctx).await.context("IO Dispatcher stopped") });
s.spawn(async { runner.run(ctx).await.context("Network stopped") });

if let Some(attester) = self.attester {
Expand All @@ -151,7 +150,7 @@ impl Executor {
attester,
net.batch_vote_publisher(),
);
s.spawn::<()>(async {
s.spawn(async {
runner.run(ctx).await?;
Ok(())
});
Expand Down
30 changes: 18 additions & 12 deletions node/actors/network/src/gossip/batch_votes.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! Global state distributed by active attesters, observed by all the nodes in the network.
use crate::watch::Watch;
use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, fmt, sync::Arc};
use zksync_concurrency::sync;
use zksync_consensus_roles::attester::{self, Batch};
use zksync_consensus_roles::attester;

/// Represents the currents state of node's knowledge about the attester votes.
/// Represents the current state of node's knowledge about the attester votes.
///
/// Eventually this data structure will have to track voting potentially happening
/// simultaneously on multiple heights, if we decrease the batch interval to be
Expand All @@ -24,7 +24,7 @@ pub(crate) struct BatchVotes {
/// for now, hoping that with 1 minute batches there's plenty of time for
/// the quorum to be reached, but eventually we'll have to allow multiple
/// votes across different heights.
pub(crate) votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<Batch>>>,
pub(crate) votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<attester::Batch>>>,

/// Total weight of votes at different heights and hashes.
///
Expand All @@ -40,7 +40,7 @@ pub(crate) struct BatchVotes {

impl BatchVotes {
/// Returns a set of votes of `self` which are newer than the entries in `b`.
pub(super) fn get_newer(&self, b: &Self) -> Vec<Arc<attester::Signed<Batch>>> {
pub(super) fn get_newer(&self, b: &Self) -> Vec<Arc<attester::Signed<attester::Batch>>> {
let mut newer = vec![];
for (k, v) in &self.votes {
if let Some(bv) = b.votes.get(k) {
Expand All @@ -61,7 +61,7 @@ impl BatchVotes {
pub(super) fn update(
&mut self,
attesters: &attester::Committee,
data: &[Arc<attester::Signed<Batch>>],
data: &[Arc<attester::Signed<attester::Batch>>],
) -> anyhow::Result<bool> {
let mut changed = false;

Expand Down Expand Up @@ -133,7 +133,7 @@ impl BatchVotes {
sigs
});
attester::BatchQC {
message: Batch {
message: attester::Batch {
number: *number,
hash: *hash,
},
Expand All @@ -151,13 +151,12 @@ impl BatchVotes {
self.min_batch_number = min_batch_number;
self.votes.retain(|_, v| v.msg.number >= min_batch_number);
if let Some(prev) = min_batch_number.prev() {
let (_, support) = self.support.split(&prev);
self.support = support;
self.support = self.support.split(&prev).1;
}
}

/// Add an already validated vote from an attester into the register.
fn add(&mut self, vote: Arc<attester::Signed<Batch>>, weight: attester::Weight) {
fn add(&mut self, vote: Arc<attester::Signed<attester::Batch>>, weight: attester::Weight) {
self.remove(&vote.key, weight);

let batch = self.support.entry(vote.msg.number).or_default();
Expand Down Expand Up @@ -213,7 +212,7 @@ impl BatchVotesWatch {
pub(crate) async fn update(
&self,
attesters: &attester::Committee,
data: &[Arc<attester::Signed<Batch>>],
data: &[Arc<attester::Signed<attester::Batch>>],
) -> anyhow::Result<()> {
let this = self.0.lock().await;
let mut votes = this.borrow().clone();
Expand All @@ -233,13 +232,20 @@ impl BatchVotesWatch {
/// Wrapper around [BatchVotesWatch] to publish votes over batches signed by an attester key.
pub struct BatchVotesPublisher(pub(crate) Arc<BatchVotesWatch>);

impl fmt::Debug for BatchVotesPublisher {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("BatchVotesPublisher")
.finish_non_exhaustive()
}
}
brunoffranca marked this conversation as resolved.
Show resolved Hide resolved

impl BatchVotesPublisher {
/// Sign an L1 batch and push it into the batch, which should cause it to be gossiped by the network.
pub async fn publish(
&self,
attesters: &attester::Committee,
attester: &attester::SecretKey,
batch: Batch,
batch: attester::Batch,
) -> anyhow::Result<()> {
if !attesters.contains(&attester.public()) {
return Ok(());
Expand Down
7 changes: 3 additions & 4 deletions node/actors/network/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
pub use self::batch_votes::BatchVotesPublisher;
use self::batch_votes::BatchVotesWatch;
use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
use anyhow::Context as _;
use fetch::RequestItem;
use std::sync::{atomic::AtomicUsize, Arc};
pub(crate) use validator_addrs::*;
use zksync_concurrency::{ctx, ctx::channel, scope, sync};
use zksync_concurrency::{ctx, ctx::channel, error::Wrap as _, scope, sync};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore};

Expand Down Expand Up @@ -170,9 +169,9 @@ impl Network {
let next_batch_number = qc.message.number.next();

self.batch_store
.queue_batch_qc(ctx, qc)
.persist_batch_qc(ctx, qc)
.await
.context("queue_batch_qc")?;
.wrap("queue_batch_qc")?;

self.batch_votes
.set_min_batch_number(next_batch_number)
Expand Down
Loading
Loading