feat: Add GenesisHash to AttestationStatusClient and reject unexpected updates (BFT-496) (#167)

## What ❔
- [x] Add the initial `GenesisHash` to `AttestationStatus` and change
`AttestationStatusWatch::update` to take `GenesisHash` and return an
error if the latest status from the client indicates a change in genesis. This
stops the runner and thus should stop the node itself when the error
bubbles up. On external nodes this is redundant with the routine that
monitors the main node API for changes. On the main node it shouldn't
happen.
- [x] Change `next_batch_to_attest() -> Option<BatchNumber>` into
`attestation_status() -> Option<AttestationStatus>` on
`AttestationStatusClient`, so it now includes the `GenesisHash` as well (the new shape is shown right after this list).
- [x] `LocalAttestationStatusClient` returns the `GenesisHash` it was
started with
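
Condensed from the diff below, the client-facing types now look roughly like this (doc comments trimmed):

```rust
/// Status of the attestation process as reported by the main node.
pub struct AttestationStatus {
    /// Next batch number where voting is expected.
    pub next_batch_to_attest: attester::BatchNumber,
    /// Hash of the genesis the L1 batches belong to; a change indicates a reorg.
    pub genesis: attester::GenesisHash,
}

#[async_trait::async_trait]
pub trait AttestationStatusClient: 'static + Send + Sync {
    /// Returns `None` while the main node has no batch to attest on yet.
    async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<AttestationStatus>>;
}
```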

## Why ❔

This came out of a discussion here:
matter-labs/zksync-era#2544 (comment)

Notably, the `ConsensusDal::attestation_status()` now returns the
`GenesisHash`, which I thought was there only to signal through the API from
the main node to the external nodes that a reorg has happened (which
they would eventually learn anyway through polling the genesis endpoint,
causing them to restart), and they should not attest for batches if they
are on a different fork. The comment raised the issue that on the main
node I discarded the `genesis` field from the response because I assumed
it can only be the same as the one we started the `Executor` with, and
second-guessing it in the `BatchStore` would be awkward: the original
genesis wasn't available to check against (it's cached in the
`BlockStore`) and running a query to fetch it (even if the
`PersistentBatchStore` supported it) would just compare it to what it
already is.

The way I handled this mismatch for external nodes was to just stop
updating the status by returning `None` and wait for the restart,
treating it like any other transient RPC error that just gets logged. It
does make sense, though, to elevate it higher and be able to stop the node
if it's in an inconsistent state.
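
Concretely, `AttestationStatusWatch::update` now takes the genesis hash and turns a mismatch into an error (condensed from the `attestation_status.rs` diff below):

```rust
pub async fn update(
    &self,
    genesis: attester::GenesisHash,
    next_batch_to_attest: attester::BatchNumber,
) -> anyhow::Result<()> {
    let this = self.0.lock().await;
    {
        let status = this.borrow();
        // A different genesis means the caller is on another fork; bail out so
        // the error bubbles up through the runner and stops the node.
        anyhow::ensure!(
            status.genesis == genesis,
            "the attestation status genesis changed: {:?} -> {:?}",
            status.genesis,
            genesis
        );
        // Moving backwards could get the voting stuck, so treat it the same way.
        anyhow::ensure!(
            status.next_batch_to_attest <= next_batch_to_attest,
            "next batch to attest moved backwards: {} -> {}",
            status.next_batch_to_attest,
            next_batch_to_attest
        );
    }
    this.send_if_modified(|status| {
        if status.next_batch_to_attest == next_batch_to_attest {
            return false;
        }
        status.next_batch_to_attest = next_batch_to_attest;
        true
    });
    Ok(())
}
```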

Now it's part of the `AttestationStatusWatch` because that is what we
consider to be our interface with the node (and not the
`AttestationStatusRunner`, which is a convenience construct).

### Reorgs without restart?

If an inconsistency happened on the main node, it wouldn't necessarily
have to be fatal: if the genesis was allowed to change, and the
components such as the `BlockStore` were able to handle it at all, then
ostensibly the `AttestationStatus*` stuff could recover as well, for
example by resetting the `BatchVotes` if they see a change in the
`genesis`. The problem currently is that they might have already
received and discarded votes for the new fork, which would not be
gossiped again until clients are reconnected.

Apparently we don't plan to handle regenesis on the fly, so
`AttestationStatus::genesis` is only present to reject any attempt at
changing it, by _causing_ a restart instead.
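
The restart happens because the runner no longer swallows internal errors; only cancellation is treated as a clean exit (condensed from the diff below):

```rust
/// Run the poll loop.
pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
    match self.poll_forever(ctx).await {
        // Cancellation is a normal shutdown.
        Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
        // Internal errors (e.g. the genesis mismatch from `update`) fail the
        // scope the runner was spawned in, which stops the node.
        Err(ctx::Error::Internal(err)) => Err(err),
    }
}
```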

### Atomicity

In the first version of this PR the `BatchStore` and
`PersistentBatchStore` were also changed to have an
`attestation_status()` method that returned a `GenesisHash` along with
the `BatchNumber`. The only way I could make sense of this was if 1)
changes in genesis while running the main node were allowed and 2) they
could happen between calls to `BlockStore::genesis()` and
`BatchStore::next_batch_to_attest()`, since those are not atomic
methods. We discussed that this will not be supported: the `Genesis`
cached in `BlockStore` will not change. Since we only start the
`Executor` and the `AttestationStatusRunner` [after
regenesis](https://github.com/matter-labs/zksync-era/blob/1d206c0af8f28eb00eb1498d6f2cdbb45ffef72a/core/node/consensus/src/mn.rs#L39)
in `zksync-era`, we don't have to worry about this changing. In that
light it would be counterintuitive to return the genesis hash from
`BatchStore`.
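
Instead, the main node's local client simply caches the hash it was constructed with and pairs it with whatever the `BatchStore` reports (condensed from the diff below):

```rust
struct LocalAttestationStatusClient {
    /// Genesis hash the Executor was started with; not expected to change at runtime.
    genesis: attester::GenesisHash,
    batch_store: Arc<BatchStore>,
}

#[async_trait::async_trait]
impl AttestationStatusClient for LocalAttestationStatusClient {
    async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<AttestationStatus>> {
        let batch_number = self.batch_store.next_batch_to_attest(ctx).await?;
        Ok(batch_number.map(|n| AttestationStatus {
            genesis: self.genesis,
            next_batch_to_attest: n,
        }))
    }
}
```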


(In the future we could consider using Software Transactional Memory to
have an in-memory representation of the state where we can do consistent
reads and writes between multiple Watch-like references. The DAL is
capable of transactional reads, and it would be nice if we could combine
reading, for example, the genesis and the last/next batch here, and not
have to worry about comparing hashes when all of these are local reads.)
aakoshh authored Aug 1, 2024
1 parent aa29e87 commit ecbabff
Showing 6 changed files with 152 additions and 49 deletions.
65 changes: 42 additions & 23 deletions node/actors/executor/src/attestation.rs
@@ -5,7 +5,7 @@ use anyhow::Context;
use std::sync::Arc;
use zksync_concurrency::{ctx, sync, time};
use zksync_consensus_network::gossip::{
AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
};
use zksync_consensus_roles::attester;
use zksync_consensus_storage::{BatchStore, BlockStore};
@@ -120,10 +120,10 @@ pub trait AttestationStatusClient: 'static + Send + Sync {
///
/// The API might return an error while genesis is being created, which we represent with `None`
/// here and mean that we'll have to try again later.
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>>;
///
/// The genesis hash is returned along with the new batch number to facilitate detecting reorgs
/// on the main node as soon as possible and prevent inconsistent state from entering the system.
async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<AttestationStatus>>;
}

/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch].
@@ -143,8 +143,12 @@ impl AttestationStatusRunner {
ctx: &ctx::Ctx,
client: Box<dyn AttestationStatusClient>,
poll_interval: time::Duration,
) -> ctx::OrCanceled<(Arc<AttestationStatusWatch>, Self)> {
let status = Arc::new(AttestationStatusWatch::new(attester::BatchNumber(0)));
genesis: attester::GenesisHash,
) -> ctx::Result<(Arc<AttestationStatusWatch>, Self)> {
let status = Arc::new(AttestationStatusWatch::new(
genesis,
attester::BatchNumber::default(),
));
let mut runner = Self {
status: status.clone(),
client,
@@ -157,37 +161,46 @@ impl AttestationStatusRunner {
/// Initialize an [AttestationStatusWatch] based on a [BatchStore] and return it along with the [AttestationStatusRunner].
pub async fn init_from_store(
ctx: &ctx::Ctx,
store: Arc<BatchStore>,
batch_store: Arc<BatchStore>,
poll_interval: time::Duration,
) -> ctx::OrCanceled<(Arc<AttestationStatusWatch>, Self)> {
genesis: attester::GenesisHash,
) -> ctx::Result<(Arc<AttestationStatusWatch>, Self)> {
Self::init(
ctx,
Box::new(LocalAttestationStatusClient(store)),
Box::new(LocalAttestationStatusClient {
genesis,
batch_store,
}),
poll_interval,
genesis,
)
.await
}

/// Run the poll loop.
pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let _ = self.poll_forever(ctx).await;
Ok(())
match self.poll_forever(ctx).await {
Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
Err(ctx::Error::Internal(err)) => Err(err),
}
}

/// Poll the client forever in a loop or until canceled.
async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> {
async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
loop {
self.poll_until_some(ctx).await?;
ctx.sleep(self.poll_interval).await?;
}
}

/// Poll the client until some data is returned and write it into the status.
async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> {
async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
loop {
match self.client.next_batch_to_attest(ctx).await {
Ok(Some(next_batch_to_attest)) => {
self.status.update(next_batch_to_attest).await;
match self.client.attestation_status(ctx).await {
Ok(Some(status)) => {
self.status
.update(status.genesis, status.next_batch_to_attest)
.await?;
return Ok(());
}
Ok(None) => {
@@ -206,14 +219,20 @@ impl AttestationStatusRunner {
}

/// Implement the attestation status for the main node by returning the next to vote on from the [BatchStore].
struct LocalAttestationStatusClient(Arc<BatchStore>);
struct LocalAttestationStatusClient {
/// We don't expect the genesis to change while the main node is running,
/// so we can just cache the genesis hash and return it for every request.
genesis: attester::GenesisHash,
batch_store: Arc<BatchStore>,
}

#[async_trait::async_trait]
impl AttestationStatusClient for LocalAttestationStatusClient {
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
self.0.next_batch_to_attest(ctx).await
async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<AttestationStatus>> {
let batch_number = self.batch_store.next_batch_to_attest(ctx).await?;
Ok(batch_number.map(|n| AttestationStatus {
genesis: self.genesis,
next_batch_to_attest: n,
}))
}
}
67 changes: 50 additions & 17 deletions node/actors/executor/src/tests.rs
@@ -1,13 +1,16 @@
//! High-level tests for `Executor`.
use std::sync::atomic::AtomicU64;
use std::sync::{atomic::AtomicU64, Mutex};

use super::*;
use attestation::{AttestationStatusClient, AttestationStatusRunner};
use rand::Rng as _;
use tracing::Instrument as _;
use zksync_concurrency::{sync, testonly::abort_on_panic};
use zksync_consensus_bft as bft;
use zksync_consensus_network::testonly::{new_configs, new_fullnode};
use zksync_consensus_network::{
gossip::AttestationStatus,
testonly::{new_configs, new_fullnode},
};
use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
use zksync_consensus_storage::{
testonly::{in_memory, TestMemoryStorage},
@@ -32,8 +35,11 @@ fn config(cfg: &network::Config) -> Config {

/// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch]
/// that will never be updated.
fn never_attest() -> Arc<AttestationStatusWatch> {
Arc::new(AttestationStatusWatch::new(attester::BatchNumber(0)))
fn never_attest(genesis: &validator::Genesis) -> Arc<AttestationStatusWatch> {
Arc::new(AttestationStatusWatch::new(
genesis.hash(),
attester::BatchNumber::default(),
))
}

fn validator(
@@ -42,6 +48,7 @@ fn validator(
batch_store: Arc<BatchStore>,
replica_store: impl ReplicaStore,
) -> Executor {
let attestation_status = never_attest(block_store.genesis());
Executor {
config: config(cfg),
block_store,
@@ -52,7 +59,7 @@ fn validator(
payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
}),
attester: None,
attestation_status: never_attest(),
attestation_status,
}
}

@@ -61,13 +68,14 @@ fn fullnode(
block_store: Arc<BlockStore>,
batch_store: Arc<BatchStore>,
) -> Executor {
let attestation_status = never_attest(block_store.genesis());
Executor {
config: config(cfg),
block_store,
batch_store,
validator: None,
attester: None,
attestation_status: never_attest(),
attestation_status,
}
}

@@ -322,18 +330,22 @@ async fn test_attestation_status_runner() {
abort_on_panic();
let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(5));
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
let rng = &mut ctx.rng();

let genesis: attester::GenesisHash = rng.gen();

#[derive(Default)]
#[derive(Clone)]
struct MockAttestationStatus {
batch_number: AtomicU64,
genesis: Arc<Mutex<attester::GenesisHash>>,
batch_number: Arc<AtomicU64>,
}

#[async_trait::async_trait]
impl AttestationStatusClient for MockAttestationStatus {
async fn next_batch_to_attest(
async fn attestation_status(
&self,
_ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
) -> ctx::Result<Option<AttestationStatus>> {
let curr = self
.batch_number
.fetch_add(1u64, std::sync::atomic::Ordering::Relaxed);
@@ -342,16 +354,25 @@ async fn test_attestation_status_runner() {
Ok(None)
} else {
// The first actual result will be 1 on the 2nd poll.
Ok(Some(attester::BatchNumber(curr)))
let status = AttestationStatus {
genesis: *self.genesis.lock().unwrap(),
next_batch_to_attest: attester::BatchNumber(curr),
};
Ok(Some(status))
}
}
}

scope::run!(ctx, |ctx, s| async {
let res = scope::run!(ctx, |ctx, s| async {
let client = MockAttestationStatus {
genesis: Arc::new(Mutex::new(genesis)),
batch_number: Arc::new(AtomicU64::default()),
};
let (status, runner) = AttestationStatusRunner::init(
ctx,
Box::new(MockAttestationStatus::default()),
Box::new(client.clone()),
time::Duration::milliseconds(100),
genesis,
)
.await
.unwrap();
@@ -364,15 +385,27 @@ async fn test_attestation_status_runner() {
let status = sync::changed(ctx, &mut recv_status).await?;
assert_eq!(status.next_batch_to_attest.0, 1);
}
// Now start polling for new values.
s.spawn_bg(runner.run(ctx));
// Now start polling for new values. Starting in the foreground because we want it to fail in the end.
s.spawn(runner.run(ctx));
// Check that polling sets the value.
{
let status = sync::changed(ctx, &mut recv_status).await?;
assert_eq!(status.next_batch_to_attest.0, 2);
}
// Change the genesis returned by the client. It should cause the scope to fail.
{
let mut genesis = client.genesis.lock().unwrap();
*genesis = rng.gen();
}
Ok(())
})
.await
.unwrap();
.await;

match res {
Ok(()) => panic!("expected to fail when the genesis changed"),
Err(e) => assert!(
e.to_string().contains("genesis changed"),
"only expect failures due to genesis change"
),
}
}
46 changes: 43 additions & 3 deletions node/actors/network/src/gossip/attestation_status.rs
@@ -6,13 +6,20 @@ use zksync_consensus_roles::attester;
use crate::watch::Watch;

/// Coordinate the attestation by showing the status as seen by the main node.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AttestationStatus {
/// Next batch number where voting is expected.
///
/// The node is expected to poll the main node during initialization until
/// the batch to start from is established.
pub next_batch_to_attest: attester::BatchNumber,
/// The hash of the genesis of the chain to which the L1 batches belong.
///
/// We don't expect to handle a regenesis on the fly without restarting the
/// node, so this value is not expected to change; it's here only to stop
/// any attempt at updating the status with a batch number that refers
/// to a different fork.
pub genesis: attester::GenesisHash,
}

/// The subscription over the attestation status which voters can monitor for change.
@@ -31,8 +38,12 @@ impl fmt::Debug for AttestationStatusWatch {

impl AttestationStatusWatch {
/// Create a new watch going from a specific batch number.
pub fn new(next_batch_to_attest: attester::BatchNumber) -> Self {
pub fn new(
genesis: attester::GenesisHash,
next_batch_to_attest: attester::BatchNumber,
) -> Self {
Self(Watch::new(AttestationStatus {
genesis,
next_batch_to_attest,
}))
}
@@ -43,14 +54,43 @@ impl AttestationStatusWatch {
}

/// Set the next batch number to attest on and notify subscribers it changed.
pub async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
///
/// Fails if the genesis we want to update to is not the same as the watch was started with,
/// because the rest of the system is not expected to be able to handle reorgs without a
/// restart of the node.
pub async fn update(
&self,
genesis: attester::GenesisHash,
next_batch_to_attest: attester::BatchNumber,
) -> anyhow::Result<()> {
let this = self.0.lock().await;
{
let status = this.borrow();
anyhow::ensure!(
status.genesis == genesis,
"the attestation status genesis changed: {:?} -> {:?}",
status.genesis,
genesis
);
// The next batch to attest moving backwards could cause the voting process
// to get stuck due to the way gossiping works and the BatchVotes discards
// votes below the expected minimum: even if we clear the votes, we might
// not get them again from any peer. By returning an error we can cause
// the node to be restarted and connections re-established for fresh gossip.
anyhow::ensure!(
status.next_batch_to_attest <= next_batch_to_attest,
"next batch to attest moved backwards: {} -> {}",
status.next_batch_to_attest,
next_batch_to_attest
);
}
this.send_if_modified(|status| {
if status.next_batch_to_attest == next_batch_to_attest {
return false;
}
status.next_batch_to_attest = next_batch_to_attest;
true
});
Ok(())
}
}
4 changes: 3 additions & 1 deletion node/actors/network/src/gossip/mod.rs
@@ -12,7 +12,9 @@
//! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to
//! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
//! network graph (minimize its diameter, increase connectedness).
pub use self::attestation_status::{AttestationStatusReceiver, AttestationStatusWatch};
pub use self::attestation_status::{
AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch,
};
pub use self::batch_votes::BatchVotesPublisher;
use self::batch_votes::BatchVotesWatch;
use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};