
Commit 2081bf2

Revert "revert: "feat: Poll the main node for the next batch to sign (BFT-496)" (#2574)"

This reverts commit 72d3be8.
aakoshh committed Aug 2, 2024
1 parent 262942b commit 2081bf2
Showing 12 changed files with 176 additions and 99 deletions.
41 changes: 21 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default.

20 changes: 10 additions & 10 deletions Cargo.toml
@@ -216,16 +216,16 @@ zk_evm_1_4_1 = { package = "zk_evm", version = "0.141.0" }
zk_evm_1_5_0 = { package = "zk_evm", version = "=0.150.0" }

# Consensus dependencies.
zksync_concurrency = "=0.1.0-rc.4"
zksync_consensus_bft = "=0.1.0-rc.4"
zksync_consensus_crypto = "=0.1.0-rc.4"
zksync_consensus_executor = "=0.1.0-rc.4"
zksync_consensus_network = "=0.1.0-rc.4"
zksync_consensus_roles = "=0.1.0-rc.4"
zksync_consensus_storage = "=0.1.0-rc.4"
zksync_consensus_utils = "=0.1.0-rc.4"
zksync_protobuf = "=0.1.0-rc.4"
zksync_protobuf_build = "=0.1.0-rc.4"
zksync_concurrency = "=0.1.0-rc.5"
zksync_consensus_bft = "=0.1.0-rc.5"
zksync_consensus_crypto = "=0.1.0-rc.5"
zksync_consensus_executor = "=0.1.0-rc.5"
zksync_consensus_network = "=0.1.0-rc.5"
zksync_consensus_roles = "=0.1.0-rc.5"
zksync_consensus_storage = "=0.1.0-rc.5"
zksync_consensus_utils = "=0.1.0-rc.5"
zksync_protobuf = "=0.1.0-rc.5"
zksync_protobuf_build = "=0.1.0-rc.5"

# "Local" dependencies
zksync_multivm = { version = "0.1.0", path = "core/lib/multivm" }
1 change: 1 addition & 0 deletions core/lib/dal/src/consensus_dal.rs
@@ -536,6 +536,7 @@ impl ConsensusDal<'_, '_> {
}
.await?
else {
tracing::info!(%genesis.first_block, "genesis block not found");
return Ok(None);
};
Ok(Some(AttestationStatus {
1 change: 1 addition & 0 deletions core/node/consensus/src/config.rs
@@ -147,5 +147,6 @@ pub(super) fn executor(
rpc,
// TODO: Add to configuration
debug_page: None,
batch_poll_interval: time::Duration::seconds(1),
})
}
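The one-second poll interval is hardcoded pending the TODO above. If it later moves into configuration, the wiring could look roughly like the sketch below; the `poll_interval_secs` parameter is hypothetical, not an actual ConsensusConfig field, and only `time::Duration::seconds` is taken from the diff itself.

use zksync_concurrency::time;

/// Hypothetical helper: `poll_interval_secs` stands in for a future config
/// field; today the interval is hardcoded to one second, as shown above.
fn batch_poll_interval(poll_interval_secs: Option<i64>) -> time::Duration {
    time::Duration::seconds(poll_interval_secs.unwrap_or(1))
}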
54 changes: 53 additions & 1 deletion core/node/consensus/src/en.rs
@@ -1,8 +1,14 @@
use anyhow::Context as _;
use async_trait::async_trait;
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_consensus_executor as executor;
use zksync_consensus_executor::{
self as executor,
attestation::{AttestationStatusClient, AttestationStatusRunner},
};
use zksync_consensus_network::gossip;
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};
use zksync_dal::consensus_dal;
use zksync_node_sync::{
fetcher::FetchedBlock, sync_action::ActionQueueSender, MainNodeClient, SyncState,
};
@@ -47,6 +53,7 @@ impl EN {

// Initialize genesis.
let genesis = self.fetch_genesis(ctx).await.wrap("fetch_genesis()")?;
let genesis_hash = genesis.hash();
let mut conn = self.pool.connection(ctx).await.wrap("connection()")?;

conn.try_update_genesis(ctx, &genesis)
@@ -99,6 +106,18 @@
.wrap("BatchStore::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

let (attestation_status, runner) = {
AttestationStatusRunner::init(
ctx,
Box::new(MainNodeAttestationStatus(self.client.clone())),
time::Duration::seconds(5),
genesis_hash,
)
.await
.wrap("AttestationStatusRunner::init()")?
};
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
@@ -111,7 +130,9 @@
payload_manager: Box::new(store.clone()),
}),
attester,
attestation_status,
};
tracing::info!("running the external node executor");
executor.run(ctx).await?;

Ok(())
@@ -239,3 +260,34 @@ impl EN {
Ok(())
}
}

/// Wrapper to call [MainNodeClient::fetch_attestation_status] and adapt the return value to [AttestationStatusClient].
struct MainNodeAttestationStatus(Box<DynClient<L2>>);

#[async_trait]
impl AttestationStatusClient for MainNodeAttestationStatus {
async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<gossip::AttestationStatus>> {
match ctx.wait(self.0.fetch_attestation_status()).await? {
Ok(Some(status)) => {
// If this fails, the AttestationStatusRunner will log it and retry later,
// but it won't stop the whole node.
let status: consensus_dal::AttestationStatus =
zksync_protobuf::serde::deserialize(&status.0)
.context("deserialize(AttestationStatus)")?;
let status = gossip::AttestationStatus {
genesis: status.genesis,
next_batch_to_attest: status.next_batch_to_attest,
};
Ok(Some(status))
}
Ok(None) => Ok(None),
Err(err) => {
tracing::warn!("AttestationStatus call to main node HTTP RPC failed: {err}");
Ok(None)
}
}
}
}
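For context on the pattern above: the runner periodically polls an AttestationStatusClient and publishes the latest status for the executor, treating a failed or empty poll as retryable rather than fatal. Below is a minimal sketch of that shape; all three types are simplified stand-ins written for illustration, not the real zksync_consensus_executor::attestation API, and the tokio watch channel is an assumption about how the status might be published.

use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::watch;

/// Stand-in for `gossip::AttestationStatus`: which batch to attest next.
#[derive(Debug, Clone)]
pub struct AttestationStatus {
    pub next_batch_to_attest: u64,
}

/// Stand-in for `AttestationStatusClient`. `None` means "no status yet";
/// implementations swallow transient errors and report `None`, as
/// `MainNodeAttestationStatus` does in the diff above.
#[async_trait]
pub trait AttestationStatusClient: Send + Sync {
    async fn attestation_status(&self) -> Option<AttestationStatus>;
}

/// Stand-in for `AttestationStatusRunner`: poll on a fixed interval and
/// publish the latest observed status to whoever holds the receiver.
pub struct AttestationStatusRunner {
    client: Box<dyn AttestationStatusClient>,
    poll_interval: Duration,
    tx: watch::Sender<Option<AttestationStatus>>,
}

impl AttestationStatusRunner {
    pub fn new(
        client: Box<dyn AttestationStatusClient>,
        poll_interval: Duration,
    ) -> (Self, watch::Receiver<Option<AttestationStatus>>) {
        let (tx, rx) = watch::channel(None);
        (Self { client, poll_interval, tx }, rx)
    }

    /// Runs forever; an empty or failed poll simply waits for the next tick.
    pub async fn run(self) {
        loop {
            if let Some(status) = self.client.attestation_status().await {
                self.tx.send_replace(Some(status));
            }
            tokio::time::sleep(self.poll_interval).await;
        }
    }
}

Both call sites in this commit also pass a genesis hash to the real runner, and the status fetched from the main node carries a `genesis` field, presumably so that statuses originating from a different chain can be rejected.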
19 changes: 17 additions & 2 deletions core/node/consensus/src/mn.rs
@@ -1,7 +1,7 @@
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
use zksync_consensus_executor::{self as executor, Attester};
use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner, Attester};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};

@@ -61,6 +61,18 @@ pub async fn run_main_node(
.wrap("BatchStore::new()")?;
s.spawn_bg(runner.run(ctx));

let (attestation_status, runner) = {
AttestationStatusRunner::init_from_store(
ctx,
batch_store.clone(),
time::Duration::seconds(1),
block_store.genesis().hash(),
)
.await
.wrap("AttestationStatusRunner::init_from_store()")?
};
s.spawn_bg(runner.run(ctx));

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
@@ -71,7 +83,10 @@
payload_manager: Box::new(store.clone()),
}),
attester,
attestation_status,
};

tracing::info!("running the main node executor");
executor.run(ctx).await
})
.await
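On the main node there is no RPC to poll, so the same runner is seeded from local storage via `init_from_store`. A rough sketch of that idea, reusing the stand-in `AttestationStatus`/`AttestationStatusClient` types from the sketch after the en.rs diff (the `LocalBatchStore` trait below is likewise illustrative, not the real `zksync_consensus_storage::BatchStore` API):

use async_trait::async_trait;

/// Stand-in for the slice of `BatchStore` relevant here; it mirrors
/// `PersistentBatchStore::next_batch_to_attest` from the store.rs diff below.
#[async_trait]
pub trait LocalBatchStore: Send + Sync {
    async fn next_batch_to_attest(&self) -> anyhow::Result<Option<u64>>;
}

/// Adapter turning a local store into an `AttestationStatusClient`, so the
/// same polling runner works on the main node without any RPC round-trip.
pub struct StoreBackedStatusClient<S>(pub S);

#[async_trait]
impl<S: LocalBatchStore> AttestationStatusClient for StoreBackedStatusClient<S> {
    async fn attestation_status(&self) -> Option<AttestationStatus> {
        // A storage error is treated like "no status yet" and retried on the
        // next poll, matching the tolerant behavior on the external-node side.
        match self.0.next_batch_to_attest().await {
            Ok(Some(n)) => Some(AttestationStatus { next_batch_to_attest: n }),
            _ => None,
        }
    }
}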
11 changes: 11 additions & 0 deletions core/node/consensus/src/storage/connection.rs
@@ -435,4 +435,15 @@ impl<'a> Connection<'a> {
last,
})
}

/// Wrapper for `consensus_dal().attestation_status()`.
pub async fn attestation_status(
&mut self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<consensus_dal::AttestationStatus>> {
Ok(ctx
.wait(self.0.consensus_dal().attestation_status())
.await?
.context("attestation_status()")?)
}
}
59 changes: 19 additions & 40 deletions core/node/consensus/src/storage/store.rs
@@ -523,44 +523,18 @@ impl storage::PersistentBatchStore for Store {
self.batches_persisted.clone()
}

/// Get the earliest L1 batch number which has to be signed by attesters.
async fn earliest_batch_number_to_sign(
/// Get the next L1 batch number which has to be signed by attesters.
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
// This is the rough roadmap of how this logic will evolve:
// 1. Make a best effort at gossiping and collecting votes; the `BatchVotes` in consensus only considers the last vote per attester.
// Still, we can re-sign more than the last batch, anticipating step 2.
// 2. Ask the Main Node what is the earliest batch number that it still expects votes for (i.e. what is the last submission + 1).
// 3. Change `BatchVotes` to handle multiple pending batch numbers, anticipating that batch intervals might decrease dramatically.
// 4. Once a QC is required to submit to L1, look at L1 to figure out what the last submission was, and sign after that.

// Originally this method returned all unsigned batch numbers by doing a DAL query, but we decided it should be okay and cheap
// to resend signatures for already signed batches, and we don't have to worry about skipping them. Because of that, we also
// didn't think it made sense to query the database for the earliest unsigned batch *after* the submission, because we might
// as well just re-sign everything. Until we have a way to reason about the "last submission", we just re-sign the last 10 to
// try to produce as many QCs as the voting register allows, within reason.

// The latest decision is not to store batches with gaps between them in the database *of the main node*.
// Once we have an API that serves external nodes the earliest number the main node wants them to sign,
// we can get rid of this method: on the main node we can sign from what `last_batch_qc` returns, and
// external nodes can go from whatever the API returns.

const NUM_BATCHES_TO_SIGN: u64 = 10;

let Some(last_batch_number) = self
Ok(self
.conn(ctx)
.await?
.get_last_batch_number(ctx)
.attestation_status(ctx)
.await
.wrap("get_last_batch_number")?
else {
return Ok(None);
};

Ok(Some(attester::BatchNumber(
last_batch_number.0.saturating_sub(NUM_BATCHES_TO_SIGN),
)))
.wrap("next_batch_to_attest")?
.map(|s| s.next_batch_to_attest))
}

/// Get the L1 batch QC from storage with the highest number.
@@ -603,16 +577,21 @@
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::Result<Option<attester::Batch>> {
let Some(hash) = self
.conn(ctx)
.await?
.batch_hash(ctx, number)
.await
.wrap("batch_hash()")?
else {
let mut conn = self.conn(ctx).await?;

let Some(hash) = conn.batch_hash(ctx, number).await.wrap("batch_hash()")? else {
return Ok(None);
};
Ok(Some(attester::Batch { number, hash }))

let Some(genesis) = conn.genesis(ctx).await.wrap("genesis()")? else {
return Ok(None);
};

Ok(Some(attester::Batch {
number,
hash,
genesis: genesis.hash(),
}))
}

/// Returns the QC of the batch with the given number.
