Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve: Do not initialise the AttestationStatus (BFT-496) #169

Merged
merged 2 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 24 additions & 21 deletions node/actors/executor/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::Context;
use std::sync::Arc;
use zksync_concurrency::{ctx, sync, time};
use zksync_consensus_network::gossip::{
AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
};
use zksync_consensus_roles::attester;
use zksync_consensus_storage::{BatchStore, BlockStore};
Expand Down Expand Up @@ -58,9 +58,12 @@ impl AttesterRunner {
self.status.mark_changed();

loop {
let batch_number = sync::changed(ctx, &mut self.status)
let Some(batch_number) = sync::changed(ctx, &mut self.status)
.await?
.next_batch_to_attest;
.next_batch_to_attest
else {
continue;
};

tracing::info!(%batch_number, "attestation status");

Expand Down Expand Up @@ -123,7 +126,10 @@ pub trait AttestationStatusClient: 'static + Send + Sync {
///
/// The genesis hash is returned along with the new batch number to facilitate detecting reorgs
/// on the main node as soon as possible and prevent inconsistent state from entering the system.
async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<AttestationStatus>>;
async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>>;
}

/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch].
Expand All @@ -140,21 +146,19 @@ impl AttestationStatusRunner {
///
/// It polls the [AttestationStatusClient] until it returns a value to initialize the status with.
pub async fn init(
ctx: &ctx::Ctx,
_ctx: &ctx::Ctx,
client: Box<dyn AttestationStatusClient>,
poll_interval: time::Duration,
genesis: attester::GenesisHash,
) -> ctx::Result<(Arc<AttestationStatusWatch>, Self)> {
let status = Arc::new(AttestationStatusWatch::new(
genesis,
attester::BatchNumber::default(),
));
let mut runner = Self {
let status = Arc::new(AttestationStatusWatch::new(genesis));
let runner = Self {
status: status.clone(),
client,
poll_interval,
};
runner.poll_until_some(ctx).await?;
// This would initialise the status to some value, however the EN was rolled out first without the main node API.
// runner.poll_until_some(ctx).await?;
Ok((status, runner))
}

Expand Down Expand Up @@ -197,10 +201,8 @@ impl AttestationStatusRunner {
async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
loop {
match self.client.attestation_status(ctx).await {
Ok(Some(status)) => {
self.status
.update(status.genesis, status.next_batch_to_attest)
.await?;
Ok(Some((genesis, next_batch_to_attest))) => {
self.status.update(genesis, next_batch_to_attest).await?;
return Ok(());
}
Ok(None) => {
Expand Down Expand Up @@ -228,11 +230,12 @@ struct LocalAttestationStatusClient {

#[async_trait::async_trait]
impl AttestationStatusClient for LocalAttestationStatusClient {
async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<AttestationStatus>> {
let batch_number = self.batch_store.next_batch_to_attest(ctx).await?;
Ok(batch_number.map(|n| AttestationStatus {
genesis: self.genesis,
next_batch_to_attest: n,
}))
async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>> {
let next_batch_to_attest = self.batch_store.next_batch_to_attest(ctx).await?;

Ok(next_batch_to_attest.map(|n| (self.genesis, n)))
}
}
29 changes: 11 additions & 18 deletions node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use rand::Rng as _;
use tracing::Instrument as _;
use zksync_concurrency::{sync, testonly::abort_on_panic};
use zksync_consensus_bft as bft;
use zksync_consensus_network::{
gossip::AttestationStatus,
testonly::{new_configs, new_fullnode},
};
use zksync_consensus_network::testonly::{new_configs, new_fullnode};
use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
use zksync_consensus_storage::{
testonly::{in_memory, TestMemoryStorage},
Expand All @@ -36,10 +33,7 @@ fn config(cfg: &network::Config) -> Config {
/// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch]
/// that will never be updated.
fn never_attest(genesis: &validator::Genesis) -> Arc<AttestationStatusWatch> {
Arc::new(AttestationStatusWatch::new(
genesis.hash(),
attester::BatchNumber::default(),
))
Arc::new(AttestationStatusWatch::new(genesis.hash()))
}

fn validator(
Expand Down Expand Up @@ -345,7 +339,7 @@ async fn test_attestation_status_runner() {
async fn attestation_status(
&self,
_ctx: &ctx::Ctx,
) -> ctx::Result<Option<AttestationStatus>> {
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>> {
let curr = self
.batch_number
.fetch_add(1u64, std::sync::atomic::Ordering::Relaxed);
Expand All @@ -354,11 +348,9 @@ async fn test_attestation_status_runner() {
Ok(None)
} else {
// The first actual result will be 1 on the 2nd poll.
let status = AttestationStatus {
genesis: *self.genesis.lock().unwrap(),
next_batch_to_attest: attester::BatchNumber(curr),
};
Ok(Some(status))
let genesis = *self.genesis.lock().unwrap();
let next_batch_to_attest = attester::BatchNumber(curr);
Ok(Some((genesis, next_batch_to_attest)))
}
}
}
Expand All @@ -380,17 +372,18 @@ async fn test_attestation_status_runner() {
let mut recv_status = status.subscribe();
recv_status.mark_changed();

// Check that the value has been initialised to a non-default value.
// Check that the value has *not* been initialised to a non-default value.
{
let status = sync::changed(ctx, &mut recv_status).await?;
assert_eq!(status.next_batch_to_attest.0, 1);
assert!(status.next_batch_to_attest.is_none());
}
// Now start polling for new values. Starting in the foreground because we want it to fail in the end.
s.spawn(runner.run(ctx));
// Check that polling sets the value.
{
let status = sync::changed(ctx, &mut recv_status).await?;
assert_eq!(status.next_batch_to_attest.0, 2);
assert!(status.next_batch_to_attest.is_some());
assert_eq!(status.next_batch_to_attest.unwrap().0, 1);
}
// Change the genesis returned by the client. It should cause the scope to fail.
{
Expand All @@ -405,7 +398,7 @@ async fn test_attestation_status_runner() {
Ok(()) => panic!("expected to fail when the genesis changed"),
Err(e) => assert!(
e.to_string().contains("genesis changed"),
"only expect failures due to genesis change"
"only expect failures due to genesis change; got: {e}"
),
}
}
33 changes: 16 additions & 17 deletions node/actors/network/src/gossip/attestation_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::watch::Watch;
pub struct AttestationStatus {
/// Next batch number where voting is expected.
///
/// The node is expected to poll the main node during initialization until
/// the batch to start from is established.
pub next_batch_to_attest: attester::BatchNumber,
/// The field is optional so that we can start an external node without the main node API
/// already deployed, which is how the initial rollout is.
pub next_batch_to_attest: Option<attester::BatchNumber>,
/// The hash of the genesis of the chain to which the L1 batches belong.
///
/// We don't expect to handle a regenesis on the fly without restarting the
Expand All @@ -37,14 +37,11 @@ impl fmt::Debug for AttestationStatusWatch {
}

impl AttestationStatusWatch {
/// Create a new watch going from a specific batch number.
pub fn new(
genesis: attester::GenesisHash,
next_batch_to_attest: attester::BatchNumber,
) -> Self {
/// Create a new watch with the current genesis, and a yet-to-be-determined batch number.
pub fn new(genesis: attester::GenesisHash) -> Self {
Self(Watch::new(AttestationStatus {
genesis,
next_batch_to_attest,
next_batch_to_attest: None,
}))
}

Expand Down Expand Up @@ -77,18 +74,20 @@ impl AttestationStatusWatch {
// votes below the expected minimum: even if we clear the votes, we might
// not get them again from any peer. By returning an error we can cause
// the node to be restarted and connections re-established for fresh gossip.
anyhow::ensure!(
status.next_batch_to_attest <= next_batch_to_attest,
"next batch to attest moved backwards: {} -> {}",
status.next_batch_to_attest,
next_batch_to_attest
);
if let Some(old_batch_to_attest) = status.next_batch_to_attest {
anyhow::ensure!(
old_batch_to_attest <= next_batch_to_attest,
"next batch to attest moved backwards: {} -> {}",
old_batch_to_attest,
next_batch_to_attest
);
}
}
this.send_if_modified(|status| {
if status.next_batch_to_attest == next_batch_to_attest {
if status.next_batch_to_attest == Some(next_batch_to_attest) {
return false;
}
status.next_batch_to_attest = next_batch_to_attest;
status.next_batch_to_attest = Some(next_batch_to_attest);
true
});
Ok(())
Expand Down
11 changes: 6 additions & 5 deletions node/actors/network/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,15 @@ impl Network {

loop {
// Wait until the status indicates that we're ready to sign the next batch.
let next_batch_number = sync::changed(ctx, &mut recv_status)
let Some(batch_number) = sync::changed(ctx, &mut recv_status)
.await?
.next_batch_to_attest;
.next_batch_to_attest
else {
continue;
};

// Get rid of all previous votes. We don't expect this to go backwards without regenesis, which will involve a restart.
self.batch_votes
.set_min_batch_number(next_batch_number)
.await;
self.batch_votes.set_min_batch_number(batch_number).await;

// Now wait until we find the next quorum, whatever it is:
// * on the main node, if attesters are honest, they will vote on the next batch number and the main node will not see gaps
Expand Down
8 changes: 3 additions & 5 deletions node/actors/network/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use zksync_concurrency::{
ctx::{self, channel},
io, limiter, net, scope, sync, time,
};
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore};
use zksync_consensus_utils::pipe;

Expand Down Expand Up @@ -204,10 +204,8 @@ impl Instance {
) -> (Self, InstanceRunner) {
// Semantically we'd want this to be created at the same level as the stores,
// but doing so would introduce a lot of extra cruft in setting up tests.
let attestation_status = Arc::new(AttestationStatusWatch::new(
block_store.genesis().hash(),
attester::BatchNumber::default(),
));
let attestation_status =
Arc::new(AttestationStatusWatch::new(block_store.genesis().hash()));

let (actor_pipe, dispatcher_pipe) = pipe::new();
let (net, net_runner) = Network::new(
Expand Down
Loading