From ecbabff2d0de26470da639622f7563cf84c79a1b Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Thu, 1 Aug 2024 14:24:35 +0100 Subject: [PATCH] feat: Add GenesisHash to AttestationStatusClient and reject unexpected updates (BFT-496) (#167) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ - [x] Add the initial `GenesisHash` to `AttestationStatus` and change `AttestationStatusWatch::update` to take `GenesisHash` and return an error if the latest from the client indicates a change in genesis. This stops the runner and thus should stop the node itself when the error bubbles up. On external nodes this is redundant with the routine that monitors the main node API for changes. On the main node I shouldn't happen. - [x] Change `next_batch_to_attest() -> Option` into `attestation_status() -> Option` on `AttestationStatusClient`, so it now includes the `GenesisHash` as well. - [x] `LocalAttestationStatusClient` returns the `GenesisHash` it was started with ## Why ❔ This came out of a discussion here: https://github.com/matter-labs/zksync-era/pull/2544#discussion_r1699598459 Notably the `ConsensusDal::attestation_status()` now returns the `GenesisHash`, which I thought was only to signal through the API from the main node to the external nodes that a reorg has happened (which they would eventually learn anyway through polling the genesis endpoint, causing them to restart), and they should not attest for batches if they are on a different fork. The comment raised the issue that on the main node I discarded the `genesis` field from the response because I assumed it can only be the same as the one we started the `Executor` with, and second guessing it in the `BatchStore` would be awkward: the original genesis wasn't available to check against (it's cached in the `BlockStore`) and running a query to fetch it (even if the `PersistentBatchStore` supported it) would just compare it to what it already is. The way I handled this mismatch for external nodes was to just stop updating the status by returning `None` and wait for the restart, treating it like any other transient RPC error, it was just logged. It does make sense though to elevate it higher and be able to stop the node if it's in an inconsistent state. Now it's part of the `AttestationStatusWatch` because that is what we consider to be our interface with the node, (and not the `AttestationStatusRunner` which is a convenience construct). ### Reorgs without restart? If an inconsistency happened on the main node it wouldn't necessarily have to be fatal: if the genesis was allowed to change, and the components such as the `BlockStore` were able to handle it at all, then ostensibly the `AttestationStatus*` stuff could recover as well, for example by resetting the `BatchVotes` if they see a change in the `genesis`. The problem currently is that they might have already received and discarded votes for the new fork, which would not be gossiped again until clients are reconnected. Apparently we don't plan to handle regenesis on the fly, so `AttestationStatus::genesis` is only present to prevent any attempt at changing it by _causing_ a restart instead. ### Atomicity In the first version of this PR the `BatchStore` and `PersistentBatchStore` were also changed to have an `attestation_status()` method that returned a `GenesisHash` along with the `BatchNumber`. The only way I could make sense of this was if 1) changes in genesis while running the main node were allowed and 2) they could happen between calls to `BlockStore::genesis()` and `BatchStore::next_batch_to_attest()`, since those are not atomic methods. We discussed that this will not be supported, the `Genesis` cached in `BlockStore` will not change. Since we only start the `Executor` and the `AttestationStatusRunner` [after regenesis](https://github.com/matter-labs/zksync-era/blob/1d206c0af8f28eb00eb1498d6f2cdbb45ffef72a/core/node/consensus/src/mn.rs#L39) in `zksync-era`, we don't have to worry about this changing. In that light it would be counter intuitive to return the genesis hash from `BatchStore`. (In the future we could consider using Software Transactional Memory to have an in-memory representation of the state where we can do consistent read and writes between multiple Watch-like references. The DAL is capable of transactional reads, and it would be nice if we could combine reading for example here the genesis and the last/next batch and not have to worry about having to compare hashes, when all of these are local reads.) --- node/actors/executor/src/attestation.rs | 65 +++++++++++------- node/actors/executor/src/tests.rs | 67 ++++++++++++++----- .../network/src/gossip/attestation_status.rs | 46 ++++++++++++- node/actors/network/src/gossip/mod.rs | 4 +- node/actors/network/src/testonly.rs | 18 +++-- node/tools/src/config.rs | 1 + 6 files changed, 152 insertions(+), 49 deletions(-) diff --git a/node/actors/executor/src/attestation.rs b/node/actors/executor/src/attestation.rs index db55131f..c949b9bd 100644 --- a/node/actors/executor/src/attestation.rs +++ b/node/actors/executor/src/attestation.rs @@ -5,7 +5,7 @@ use anyhow::Context; use std::sync::Arc; use zksync_concurrency::{ctx, sync, time}; use zksync_consensus_network::gossip::{ - AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher, + AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher, }; use zksync_consensus_roles::attester; use zksync_consensus_storage::{BatchStore, BlockStore}; @@ -120,10 +120,10 @@ pub trait AttestationStatusClient: 'static + Send + Sync { /// /// The API might return an error while genesis is being created, which we represent with `None` /// here and mean that we'll have to try again later. - async fn next_batch_to_attest( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result>; + /// + /// The genesis hash is returned along with the new batch number to facilitate detecting reorgs + /// on the main node as soon as possible and prevent inconsistent state from entering the system. + async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result>; } /// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch]. @@ -143,8 +143,12 @@ impl AttestationStatusRunner { ctx: &ctx::Ctx, client: Box, poll_interval: time::Duration, - ) -> ctx::OrCanceled<(Arc, Self)> { - let status = Arc::new(AttestationStatusWatch::new(attester::BatchNumber(0))); + genesis: attester::GenesisHash, + ) -> ctx::Result<(Arc, Self)> { + let status = Arc::new(AttestationStatusWatch::new( + genesis, + attester::BatchNumber::default(), + )); let mut runner = Self { status: status.clone(), client, @@ -157,25 +161,32 @@ impl AttestationStatusRunner { /// Initialize an [AttestationStatusWatch] based on a [BatchStore] and return it along with the [AttestationStatusRunner]. pub async fn init_from_store( ctx: &ctx::Ctx, - store: Arc, + batch_store: Arc, poll_interval: time::Duration, - ) -> ctx::OrCanceled<(Arc, Self)> { + genesis: attester::GenesisHash, + ) -> ctx::Result<(Arc, Self)> { Self::init( ctx, - Box::new(LocalAttestationStatusClient(store)), + Box::new(LocalAttestationStatusClient { + genesis, + batch_store, + }), poll_interval, + genesis, ) .await } /// Run the poll loop. pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - let _ = self.poll_forever(ctx).await; - Ok(()) + match self.poll_forever(ctx).await { + Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), + Err(ctx::Error::Internal(err)) => Err(err), + } } /// Poll the client forever in a loop or until canceled. - async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> { + async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> { loop { self.poll_until_some(ctx).await?; ctx.sleep(self.poll_interval).await?; @@ -183,11 +194,13 @@ impl AttestationStatusRunner { } /// Poll the client until some data is returned and write it into the status. - async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> { + async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> { loop { - match self.client.next_batch_to_attest(ctx).await { - Ok(Some(next_batch_to_attest)) => { - self.status.update(next_batch_to_attest).await; + match self.client.attestation_status(ctx).await { + Ok(Some(status)) => { + self.status + .update(status.genesis, status.next_batch_to_attest) + .await?; return Ok(()); } Ok(None) => { @@ -206,14 +219,20 @@ impl AttestationStatusRunner { } /// Implement the attestation status for the main node by returning the next to vote on from the [BatchStore]. -struct LocalAttestationStatusClient(Arc); +struct LocalAttestationStatusClient { + /// We don't expect the genesis to change while the main node is running, + /// so we can just cache the genesis hash and return it for every request. + genesis: attester::GenesisHash, + batch_store: Arc, +} #[async_trait::async_trait] impl AttestationStatusClient for LocalAttestationStatusClient { - async fn next_batch_to_attest( - &self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - self.0.next_batch_to_attest(ctx).await + async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result> { + let batch_number = self.batch_store.next_batch_to_attest(ctx).await?; + Ok(batch_number.map(|n| AttestationStatus { + genesis: self.genesis, + next_batch_to_attest: n, + })) } } diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index 92a09462..287a2717 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -1,5 +1,5 @@ //! High-level tests for `Executor`. -use std::sync::atomic::AtomicU64; +use std::sync::{atomic::AtomicU64, Mutex}; use super::*; use attestation::{AttestationStatusClient, AttestationStatusRunner}; @@ -7,7 +7,10 @@ use rand::Rng as _; use tracing::Instrument as _; use zksync_concurrency::{sync, testonly::abort_on_panic}; use zksync_consensus_bft as bft; -use zksync_consensus_network::testonly::{new_configs, new_fullnode}; +use zksync_consensus_network::{ + gossip::AttestationStatus, + testonly::{new_configs, new_fullnode}, +}; use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber}; use zksync_consensus_storage::{ testonly::{in_memory, TestMemoryStorage}, @@ -32,8 +35,11 @@ fn config(cfg: &network::Config) -> Config { /// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch] /// that will never be updated. -fn never_attest() -> Arc { - Arc::new(AttestationStatusWatch::new(attester::BatchNumber(0))) +fn never_attest(genesis: &validator::Genesis) -> Arc { + Arc::new(AttestationStatusWatch::new( + genesis.hash(), + attester::BatchNumber::default(), + )) } fn validator( @@ -42,6 +48,7 @@ fn validator( batch_store: Arc, replica_store: impl ReplicaStore, ) -> Executor { + let attestation_status = never_attest(block_store.genesis()); Executor { config: config(cfg), block_store, @@ -52,7 +59,7 @@ fn validator( payload_manager: Box::new(bft::testonly::RandomPayload(1000)), }), attester: None, - attestation_status: never_attest(), + attestation_status, } } @@ -61,13 +68,14 @@ fn fullnode( block_store: Arc, batch_store: Arc, ) -> Executor { + let attestation_status = never_attest(block_store.genesis()); Executor { config: config(cfg), block_store, batch_store, validator: None, attester: None, - attestation_status: never_attest(), + attestation_status, } } @@ -322,18 +330,22 @@ async fn test_attestation_status_runner() { abort_on_panic(); let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(5)); let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0)); + let rng = &mut ctx.rng(); + + let genesis: attester::GenesisHash = rng.gen(); - #[derive(Default)] + #[derive(Clone)] struct MockAttestationStatus { - batch_number: AtomicU64, + genesis: Arc>, + batch_number: Arc, } #[async_trait::async_trait] impl AttestationStatusClient for MockAttestationStatus { - async fn next_batch_to_attest( + async fn attestation_status( &self, _ctx: &ctx::Ctx, - ) -> ctx::Result> { + ) -> ctx::Result> { let curr = self .batch_number .fetch_add(1u64, std::sync::atomic::Ordering::Relaxed); @@ -342,16 +354,25 @@ async fn test_attestation_status_runner() { Ok(None) } else { // The first actual result will be 1 on the 2nd poll. - Ok(Some(attester::BatchNumber(curr))) + let status = AttestationStatus { + genesis: *self.genesis.lock().unwrap(), + next_batch_to_attest: attester::BatchNumber(curr), + }; + Ok(Some(status)) } } } - scope::run!(ctx, |ctx, s| async { + let res = scope::run!(ctx, |ctx, s| async { + let client = MockAttestationStatus { + genesis: Arc::new(Mutex::new(genesis)), + batch_number: Arc::new(AtomicU64::default()), + }; let (status, runner) = AttestationStatusRunner::init( ctx, - Box::new(MockAttestationStatus::default()), + Box::new(client.clone()), time::Duration::milliseconds(100), + genesis, ) .await .unwrap(); @@ -364,15 +385,27 @@ async fn test_attestation_status_runner() { let status = sync::changed(ctx, &mut recv_status).await?; assert_eq!(status.next_batch_to_attest.0, 1); } - // Now start polling for new values. - s.spawn_bg(runner.run(ctx)); + // Now start polling for new values. Starting in the foreground because we want it to fail in the end. + s.spawn(runner.run(ctx)); // Check that polling sets the value. { let status = sync::changed(ctx, &mut recv_status).await?; assert_eq!(status.next_batch_to_attest.0, 2); } + // Change the genesis returned by the client. It should cause the scope to fail. + { + let mut genesis = client.genesis.lock().unwrap(); + *genesis = rng.gen(); + } Ok(()) }) - .await - .unwrap(); + .await; + + match res { + Ok(()) => panic!("expected to fail when the genesis changed"), + Err(e) => assert!( + e.to_string().contains("genesis changed"), + "only expect failures due to genesis change" + ), + } } diff --git a/node/actors/network/src/gossip/attestation_status.rs b/node/actors/network/src/gossip/attestation_status.rs index b4e19f6d..514bb84b 100644 --- a/node/actors/network/src/gossip/attestation_status.rs +++ b/node/actors/network/src/gossip/attestation_status.rs @@ -6,13 +6,20 @@ use zksync_consensus_roles::attester; use crate::watch::Watch; /// Coordinate the attestation by showing the status as seen by the main node. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct AttestationStatus { /// Next batch number where voting is expected. /// /// The node is expected to poll the main node during initialization until /// the batch to start from is established. pub next_batch_to_attest: attester::BatchNumber, + /// The hash of the genesis of the chain to which the L1 batches belong. + /// + /// We don't expect to handle a regenesis on the fly without restarting the + /// node, so this value is not expected to change; it's here only to stop + /// any attempt at updating the status with a batch number that refers + /// to a different fork. + pub genesis: attester::GenesisHash, } /// The subscription over the attestation status which voters can monitor for change. @@ -31,8 +38,12 @@ impl fmt::Debug for AttestationStatusWatch { impl AttestationStatusWatch { /// Create a new watch going from a specific batch number. - pub fn new(next_batch_to_attest: attester::BatchNumber) -> Self { + pub fn new( + genesis: attester::GenesisHash, + next_batch_to_attest: attester::BatchNumber, + ) -> Self { Self(Watch::new(AttestationStatus { + genesis, next_batch_to_attest, })) } @@ -43,8 +54,36 @@ impl AttestationStatusWatch { } /// Set the next batch number to attest on and notify subscribers it changed. - pub async fn update(&self, next_batch_to_attest: attester::BatchNumber) { + /// + /// Fails if the genesis we want to update to is not the same as the watch was started with, + /// because the rest of the system is not expected to be able to handle reorgs without a + /// restart of the node. + pub async fn update( + &self, + genesis: attester::GenesisHash, + next_batch_to_attest: attester::BatchNumber, + ) -> anyhow::Result<()> { let this = self.0.lock().await; + { + let status = this.borrow(); + anyhow::ensure!( + status.genesis == genesis, + "the attestation status genesis changed: {:?} -> {:?}", + status.genesis, + genesis + ); + // The next batch to attest moving backwards could cause the voting process + // to get stuck due to the way gossiping works and the BatchVotes discards + // votes below the expected minimum: even if we clear the votes, we might + // not get them again from any peer. By returning an error we can cause + // the node to be restarted and connections re-established for fresh gossip. + anyhow::ensure!( + status.next_batch_to_attest <= next_batch_to_attest, + "next batch to attest moved backwards: {} -> {}", + status.next_batch_to_attest, + next_batch_to_attest + ); + } this.send_if_modified(|status| { if status.next_batch_to_attest == next_batch_to_attest { return false; @@ -52,5 +91,6 @@ impl AttestationStatusWatch { status.next_batch_to_attest = next_batch_to_attest; true }); + Ok(()) } } diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index a6a4e4e1..90e5a1d5 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -12,7 +12,9 @@ //! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to //! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip //! network graph (minimize its diameter, increase connectedness). -pub use self::attestation_status::{AttestationStatusReceiver, AttestationStatusWatch}; +pub use self::attestation_status::{ + AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch, +}; pub use self::batch_votes::BatchVotesPublisher; use self::batch_votes::BatchVotesWatch; use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats}; diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index c61b7658..00de2120 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -162,6 +162,7 @@ pub fn new_fullnode(rng: &mut impl Rng, peer: &Config) -> Config { pub struct InstanceRunner { net_runner: Runner, attestation_status: Arc, + block_store: Arc, batch_store: Arc, terminate: channel::Receiver<()>, } @@ -172,9 +173,13 @@ impl InstanceRunner { scope::run!(ctx, |ctx, s| async { s.spawn_bg(self.net_runner.run(ctx)); s.spawn_bg(async { + let genesis = self.block_store.genesis().hash(); loop { - if let Ok(Some(n)) = self.batch_store.next_batch_to_attest(ctx).await { - self.attestation_status.update(n).await; + if let Ok(Some(batch_number)) = self.batch_store.next_batch_to_attest(ctx).await + { + self.attestation_status + .update(genesis, batch_number) + .await?; } if ctx.sleep(time::Duration::seconds(1)).await.is_err() { return Ok(()); @@ -199,13 +204,15 @@ impl Instance { ) -> (Self, InstanceRunner) { // Semantically we'd want this to be created at the same level as the stores, // but doing so would introduce a lot of extra cruft in setting up tests. - let attestation_status = - Arc::new(AttestationStatusWatch::new(attester::BatchNumber::default())); + let attestation_status = Arc::new(AttestationStatusWatch::new( + block_store.genesis().hash(), + attester::BatchNumber::default(), + )); let (actor_pipe, dispatcher_pipe) = pipe::new(); let (net, net_runner) = Network::new( cfg, - block_store, + block_store.clone(), batch_store.clone(), actor_pipe, attestation_status.clone(), @@ -220,6 +227,7 @@ impl Instance { InstanceRunner { net_runner, attestation_status, + block_store, batch_store, terminate: terminate_recv, }, diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 30eed712..97d49fbf 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -269,6 +269,7 @@ impl Configs { ctx, store.batches.clone(), time::Duration::seconds(1), + self.app.genesis.hash(), ) .await?;