Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Poll the main node for the next batch to sign (BFT-496) #2544

Merged
merged 14 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 21 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,16 @@ zk_evm_1_4_1 = { package = "zk_evm", version = "0.141.0" }
zk_evm_1_5_0 = { package = "zk_evm", version = "0.150.0" }

# Consensus dependencies.
zksync_concurrency = "=0.1.0-rc.4"
zksync_consensus_bft = "=0.1.0-rc.4"
zksync_consensus_crypto = "=0.1.0-rc.4"
zksync_consensus_executor = "=0.1.0-rc.4"
zksync_consensus_network = "=0.1.0-rc.4"
zksync_consensus_roles = "=0.1.0-rc.4"
zksync_consensus_storage = "=0.1.0-rc.4"
zksync_consensus_utils = "=0.1.0-rc.4"
zksync_protobuf = "=0.1.0-rc.4"
zksync_protobuf_build = "=0.1.0-rc.4"
zksync_concurrency = "=0.1.0-rc.5"
zksync_consensus_bft = "=0.1.0-rc.5"
zksync_consensus_crypto = "=0.1.0-rc.5"
zksync_consensus_executor = "=0.1.0-rc.5"
zksync_consensus_network = "=0.1.0-rc.5"
zksync_consensus_roles = "=0.1.0-rc.5"
zksync_consensus_storage = "=0.1.0-rc.5"
zksync_consensus_utils = "=0.1.0-rc.5"
zksync_protobuf = "=0.1.0-rc.5"
zksync_protobuf_build = "=0.1.0-rc.5"

# "Local" dependencies
zksync_multivm = { version = "0.1.0", path = "core/lib/multivm" }
Expand Down
1 change: 1 addition & 0 deletions core/lib/dal/src/consensus_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ impl ConsensusDal<'_, '_> {
}
.await?
else {
tracing::info!(%genesis.first_block, "genesis block not found");
return Ok(None);
};
Ok(Some(AttestationStatus {
Expand Down
1 change: 1 addition & 0 deletions core/node/consensus/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,6 @@ pub(super) fn executor(
rpc,
// TODO: Add to configuration
debug_page: None,
batch_poll_interval: time::Duration::seconds(1),
})
}
54 changes: 53 additions & 1 deletion core/node/consensus/src/en.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use anyhow::Context as _;
use async_trait::async_trait;
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_consensus_executor as executor;
use zksync_consensus_executor::{
self as executor,
attestation::{AttestationStatusClient, AttestationStatusRunner},
};
use zksync_consensus_network::gossip;
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};
use zksync_dal::consensus_dal;
use zksync_node_sync::{
fetcher::FetchedBlock, sync_action::ActionQueueSender, MainNodeClient, SyncState,
};
Expand Down Expand Up @@ -47,6 +53,7 @@ impl EN {

// Initialize genesis.
let genesis = self.fetch_genesis(ctx).await.wrap("fetch_genesis()")?;
let genesis_hash = genesis.hash();
let mut conn = self.pool.connection(ctx).await.wrap("connection()")?;

conn.try_update_genesis(ctx, &genesis)
Expand Down Expand Up @@ -99,6 +106,18 @@ impl EN {
.wrap("BatchStore::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

let (attestation_status, runner) = {
AttestationStatusRunner::init(
ctx,
Box::new(MainNodeAttestationStatus(self.client.clone())),
time::Duration::seconds(5),
genesis_hash,
)
.await
.wrap("AttestationStatusRunner::init()")?
};
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
Expand All @@ -111,7 +130,9 @@ impl EN {
payload_manager: Box::new(store.clone()),
}),
attester,
attestation_status,
};
tracing::info!("running the external node executor");
executor.run(ctx).await?;

Ok(())
Expand Down Expand Up @@ -239,3 +260,34 @@ impl EN {
Ok(())
}
}

/// Wrapper to call [MainNodeClient::fetch_attestation_status] and adapt the return value to [AttestationStatusClient].
struct MainNodeAttestationStatus(Box<DynClient<L2>>);

#[async_trait]
impl AttestationStatusClient for MainNodeAttestationStatus {
async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<gossip::AttestationStatus>> {
match ctx.wait(self.0.fetch_attestation_status()).await? {
Ok(Some(status)) => {
// If this fails the AttestationStatusRunner will log it an retry it later,
// but it won't stop the whole node.
let status: consensus_dal::AttestationStatus =
zksync_protobuf::serde::deserialize(&status.0)
.context("deserialize(AttestationStatus")?;
let status = gossip::AttestationStatus {
genesis: status.genesis,
next_batch_to_attest: status.next_batch_to_attest,
};
Ok(Some(status))
}
Ok(None) => Ok(None),
Err(err) => {
tracing::warn!("AttestationStatus call to main node HTTP RPC failed: {err}");
Ok(None)
}
}
}
}
19 changes: 17 additions & 2 deletions core/node/consensus/src/mn.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
use zksync_consensus_executor::{self as executor, Attester};
use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner, Attester};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};

Expand Down Expand Up @@ -61,6 +61,18 @@ pub async fn run_main_node(
.wrap("BatchStore::new()")?;
s.spawn_bg(runner.run(ctx));

let (attestation_status, runner) = {
AttestationStatusRunner::init_from_store(
ctx,
batch_store.clone(),
time::Duration::seconds(1),
block_store.genesis().hash(),
)
.await
.wrap("AttestationStatusRunner::init_from_store()")?
};
s.spawn_bg(runner.run(ctx));

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
Expand All @@ -71,7 +83,10 @@ pub async fn run_main_node(
payload_manager: Box::new(store.clone()),
}),
attester,
attestation_status,
};

tracing::info!("running the main node executor");
executor.run(ctx).await
})
.await
Expand Down
11 changes: 11 additions & 0 deletions core/node/consensus/src/storage/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,4 +435,15 @@ impl<'a> Connection<'a> {
last,
})
}

/// Wrapper for `consensus_dal().attestation_status()`.
pub async fn attestation_status(
&mut self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<consensus_dal::AttestationStatus>> {
Ok(ctx
.wait(self.0.consensus_dal().attestation_status())
.await?
.context("attestation_status()")?)
}
}
59 changes: 19 additions & 40 deletions core/node/consensus/src/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,44 +523,18 @@ impl storage::PersistentBatchStore for Store {
self.batches_persisted.clone()
}

/// Get the earliest L1 batch number which has to be signed by attesters.
async fn earliest_batch_number_to_sign(
/// Get the next L1 batch number which has to be signed by attesters.
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
// This is the rough roadmap of how this logic will evolve:
// 1. Make best effort at gossiping and collecting votes; the `BatchVotes` in consensus only considers the last vote per attesters.
// Still, we can re-sign more than the last batch, anticipating step 2.
// 2. Ask the Main Node what is the earliest batch number that it still expects votes for (ie. what is the last submission + 1).
// 3. Change `BatchVotes` to handle multiple pending batch numbers, anticipating that batch intervals might decrease dramatically.
// 4. Once QC is required to submit to L1, Look at L1 to figure out what is the last submission, and sign after that.

// Originally this method returned all unsigned batch numbers by doing a DAL query, but we decided it should be okay and cheap
// to resend signatures for already signed batches, and we don't have to worry about skipping them. Because of that, we also
// didn't think it makes sense to query the database for the earliest unsigned batch *after* the submission, because we might
// as well just re-sign everything. Until we have a way to argue about the "last submission" we just re-sign the last 10 to
// try to produce as many QCs as the voting register allows, within reason.

// The latest decision is not to store batches with gaps between in the database *of the main node*.
// Once we have an API to serve to external nodes the earliest number the main node wants them to sign,
// we can get rid of this method: on the main node we can sign from what `last_batch_qc` returns, and
// while external nodes we can go from whatever the API returned.

const NUM_BATCHES_TO_SIGN: u64 = 10;

let Some(last_batch_number) = self
Ok(self
.conn(ctx)
.await?
.get_last_batch_number(ctx)
.attestation_status(ctx)
.await
.wrap("get_last_batch_number")?
else {
return Ok(None);
};

Ok(Some(attester::BatchNumber(
last_batch_number.0.saturating_sub(NUM_BATCHES_TO_SIGN),
)))
.wrap("next_batch_to_attest")?
.map(|s| s.next_batch_to_attest))
aakoshh marked this conversation as resolved.
Show resolved Hide resolved
}

/// Get the L1 batch QC from storage with the highest number.
Expand Down Expand Up @@ -603,16 +577,21 @@ impl storage::PersistentBatchStore for Store {
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::Result<Option<attester::Batch>> {
let Some(hash) = self
.conn(ctx)
.await?
.batch_hash(ctx, number)
.await
.wrap("batch_hash()")?
else {
let mut conn = self.conn(ctx).await?;

let Some(hash) = conn.batch_hash(ctx, number).await.wrap("batch_hash()")? else {
return Ok(None);
};
Ok(Some(attester::Batch { number, hash }))

let Some(genesis) = conn.genesis(ctx).await.wrap("genesis()")? else {
return Ok(None);
};

Ok(Some(attester::Batch {
number,
hash,
genesis: genesis.hash(),
}))
}

/// Returns the QC of the batch with the given number.
Expand Down
Loading
Loading