feat: Add GenesisHash to AttestationStatusClient and reject unexpected updates (BFT-496) #167

Merged 5 commits on Aug 1, 2024

Changes from all commits
65 changes: 42 additions & 23 deletions node/actors/executor/src/attestation.rs
@@ -5,7 +5,7 @@ use anyhow::Context;
use std::sync::Arc;
use zksync_concurrency::{ctx, sync, time};
use zksync_consensus_network::gossip::{
AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch, BatchVotesPublisher,
};
use zksync_consensus_roles::attester;
use zksync_consensus_storage::{BatchStore, BlockStore};
@@ -120,10 +120,10 @@ pub trait AttestationStatusClient: 'static + Send + Sync {
///
/// The API might return an error while genesis is being created, which we represent with `None`
/// here, meaning that we'll have to try again later.
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>>;
///
/// The genesis hash is returned along with the new batch number to facilitate detecting reorgs
/// on the main node as soon as possible and to prevent inconsistent state from entering the system.
async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<AttestationStatus>>;
}

/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch].
@@ -143,8 +143,12 @@ impl AttestationStatusRunner {
ctx: &ctx::Ctx,
client: Box<dyn AttestationStatusClient>,
poll_interval: time::Duration,
) -> ctx::OrCanceled<(Arc<AttestationStatusWatch>, Self)> {
let status = Arc::new(AttestationStatusWatch::new(attester::BatchNumber(0)));
genesis: attester::GenesisHash,
) -> ctx::Result<(Arc<AttestationStatusWatch>, Self)> {
let status = Arc::new(AttestationStatusWatch::new(
genesis,
attester::BatchNumber::default(),
));
let mut runner = Self {
status: status.clone(),
client,
@@ -157,37 +161,46 @@ impl AttestationStatusRunner {
/// Initialize an [AttestationStatusWatch] based on a [BatchStore] and return it along with the [AttestationStatusRunner].
pub async fn init_from_store(
ctx: &ctx::Ctx,
store: Arc<BatchStore>,
batch_store: Arc<BatchStore>,
poll_interval: time::Duration,
) -> ctx::OrCanceled<(Arc<AttestationStatusWatch>, Self)> {
genesis: attester::GenesisHash,
) -> ctx::Result<(Arc<AttestationStatusWatch>, Self)> {
Self::init(
ctx,
Box::new(LocalAttestationStatusClient(store)),
Box::new(LocalAttestationStatusClient {
genesis,
batch_store,
}),
poll_interval,
genesis,
)
.await
}

/// Run the poll loop.
pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let _ = self.poll_forever(ctx).await;
Ok(())
match self.poll_forever(ctx).await {
Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
Err(ctx::Error::Internal(err)) => Err(err),
}
}

/// Poll the client forever in a loop or until canceled.
async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> {
async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
loop {
self.poll_until_some(ctx).await?;
ctx.sleep(self.poll_interval).await?;
}
}

/// Poll the client until some data is returned and write it into the status.
async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> {
async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
loop {
match self.client.next_batch_to_attest(ctx).await {
Ok(Some(next_batch_to_attest)) => {
self.status.update(next_batch_to_attest).await;
match self.client.attestation_status(ctx).await {
Ok(Some(status)) => {
self.status
.update(status.genesis, status.next_batch_to_attest)
.await?;
return Ok(());
}
Ok(None) => {
@@ -206,14 +219,20 @@ impl AttestationStatusRunner {
}

/// Implement the attestation status for the main node by returning the next to vote on from the [BatchStore].
struct LocalAttestationStatusClient(Arc<BatchStore>);
struct LocalAttestationStatusClient {
/// We don't expect the genesis to change while the main node is running,
/// so we can just cache the genesis hash and return it for every request.
genesis: attester::GenesisHash,
batch_store: Arc<BatchStore>,
}

#[async_trait::async_trait]
impl AttestationStatusClient for LocalAttestationStatusClient {
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
self.0.next_batch_to_attest(ctx).await
async fn attestation_status(&self, ctx: &ctx::Ctx) -> ctx::Result<Option<AttestationStatus>> {
let batch_number = self.batch_store.next_batch_to_attest(ctx).await?;
Ok(batch_number.map(|n| AttestationStatus {
genesis: self.genesis,
next_batch_to_attest: n,
}))
}
}
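
Note: for an external node, the same trait would be implemented against the main node's API rather than a local `BatchStore`. The sketch below is illustrative only and not part of this diff; `MainNodeApi` and its method are hypothetical stand-ins, and the import paths are assumed from the crate layout above.

```rust
use std::sync::Arc;

use zksync_concurrency::ctx;
use zksync_consensus_executor::attestation::AttestationStatusClient;
use zksync_consensus_network::gossip::AttestationStatus;
use zksync_consensus_roles::attester;

/// Hypothetical RPC handle to the main node; stubbed for illustration.
struct MainNodeApi;

impl MainNodeApi {
    /// Assumed to return `None` while the main node's genesis is still being created.
    async fn fetch_attestation_status(
        &self,
    ) -> anyhow::Result<Option<(attester::GenesisHash, attester::BatchNumber)>> {
        unimplemented!("illustrative stub")
    }
}

/// External-node counterpart of the `LocalAttestationStatusClient` above.
struct RemoteAttestationStatusClient {
    api: Arc<MainNodeApi>,
}

#[async_trait::async_trait]
impl AttestationStatusClient for RemoteAttestationStatusClient {
    async fn attestation_status(&self, _ctx: &ctx::Ctx) -> ctx::Result<Option<AttestationStatus>> {
        // Transient RPC failures are propagated; the runner logs and retries them.
        let status = self.api.fetch_attestation_status().await?;
        Ok(status.map(|(genesis, next_batch_to_attest)| AttestationStatus {
            genesis,
            next_batch_to_attest,
        }))
    }
}
```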
67 changes: 50 additions & 17 deletions node/actors/executor/src/tests.rs
@@ -1,13 +1,16 @@
//! High-level tests for `Executor`.
use std::sync::atomic::AtomicU64;
use std::sync::{atomic::AtomicU64, Mutex};

use super::*;
use attestation::{AttestationStatusClient, AttestationStatusRunner};
use rand::Rng as _;
use tracing::Instrument as _;
use zksync_concurrency::{sync, testonly::abort_on_panic};
use zksync_consensus_bft as bft;
use zksync_consensus_network::testonly::{new_configs, new_fullnode};
use zksync_consensus_network::{
gossip::AttestationStatus,
testonly::{new_configs, new_fullnode},
};
use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber};
use zksync_consensus_storage::{
testonly::{in_memory, TestMemoryStorage},
@@ -32,8 +35,11 @@ fn config(cfg: &network::Config) -> Config {

/// The test executors below are not running with attesters, so we just create an [AttestationStatusWatch]
/// that will never be updated.
fn never_attest() -> Arc<AttestationStatusWatch> {
Arc::new(AttestationStatusWatch::new(attester::BatchNumber(0)))
fn never_attest(genesis: &validator::Genesis) -> Arc<AttestationStatusWatch> {
Arc::new(AttestationStatusWatch::new(
genesis.hash(),
attester::BatchNumber::default(),
))
}

fn validator(
@@ -42,6 +48,7 @@ fn validator(
batch_store: Arc<BatchStore>,
replica_store: impl ReplicaStore,
) -> Executor {
let attestation_status = never_attest(block_store.genesis());
Executor {
config: config(cfg),
block_store,
@@ -52,7 +59,7 @@
payload_manager: Box::new(bft::testonly::RandomPayload(1000)),
}),
attester: None,
attestation_status: never_attest(),
attestation_status,
}
}

@@ -61,13 +68,14 @@ fn fullnode(
block_store: Arc<BlockStore>,
batch_store: Arc<BatchStore>,
) -> Executor {
let attestation_status = never_attest(block_store.genesis());
Executor {
config: config(cfg),
block_store,
batch_store,
validator: None,
attester: None,
attestation_status: never_attest(),
attestation_status,
}
}

@@ -322,18 +330,22 @@ async fn test_attestation_status_runner() {
abort_on_panic();
let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(5));
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
let rng = &mut ctx.rng();

let genesis: attester::GenesisHash = rng.gen();

#[derive(Default)]
#[derive(Clone)]
struct MockAttestationStatus {
batch_number: AtomicU64,
genesis: Arc<Mutex<attester::GenesisHash>>,
batch_number: Arc<AtomicU64>,
}

#[async_trait::async_trait]
impl AttestationStatusClient for MockAttestationStatus {
async fn next_batch_to_attest(
async fn attestation_status(
&self,
_ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
) -> ctx::Result<Option<AttestationStatus>> {
let curr = self
.batch_number
.fetch_add(1u64, std::sync::atomic::Ordering::Relaxed);
@@ -342,16 +354,25 @@ async fn test_attestation_status_runner() {
Ok(None)
} else {
// The first actual result will be 1 on the 2nd poll.
Ok(Some(attester::BatchNumber(curr)))
let status = AttestationStatus {
genesis: *self.genesis.lock().unwrap(),
next_batch_to_attest: attester::BatchNumber(curr),
};
Ok(Some(status))
}
}
}

scope::run!(ctx, |ctx, s| async {
let res = scope::run!(ctx, |ctx, s| async {
let client = MockAttestationStatus {
genesis: Arc::new(Mutex::new(genesis)),
batch_number: Arc::new(AtomicU64::default()),
};
let (status, runner) = AttestationStatusRunner::init(
ctx,
Box::new(MockAttestationStatus::default()),
Box::new(client.clone()),
time::Duration::milliseconds(100),
genesis,
)
.await
.unwrap();
@@ -364,15 +385,27 @@
let status = sync::changed(ctx, &mut recv_status).await?;
assert_eq!(status.next_batch_to_attest.0, 1);
}
// Now start polling for new values.
s.spawn_bg(runner.run(ctx));
// Now start polling for new values. Spawned in the foreground because we want its failure to fail the scope in the end.
s.spawn(runner.run(ctx));
// Check that polling sets the value.
{
let status = sync::changed(ctx, &mut recv_status).await?;
assert_eq!(status.next_batch_to_attest.0, 2);
}
// Change the genesis returned by the client. It should cause the scope to fail.
{
let mut genesis = client.genesis.lock().unwrap();
*genesis = rng.gen();
}
Ok(())
})
.await
.unwrap();
.await;

match res {
Ok(()) => panic!("expected to fail when the genesis changed"),
Err(e) => assert!(
e.to_string().contains("genesis changed"),
"only expect failures due to genesis change"
),
}
}
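
For reference, a main node would wire the runner roughly as below; this is a sketch under assumed import paths, not code from this PR, and the poll interval is an arbitrary choice. The new `genesis` argument pins the watch to the chain the batches belong to.

```rust
use std::sync::Arc;

use zksync_concurrency::{ctx, scope, time};
use zksync_consensus_executor::attestation::AttestationStatusRunner;
use zksync_consensus_storage::{BatchStore, BlockStore};

/// Hypothetical startup wiring; a sketch, not the executor's actual code.
async fn run_attestation_status(
    ctx: &ctx::Ctx,
    block_store: Arc<BlockStore>,
    batch_store: Arc<BatchStore>,
) -> anyhow::Result<()> {
    scope::run!(ctx, |ctx, s| async {
        // The watch now starts out pinned to the genesis hash; `init_from_store`
        // also performs the initial poll of the batch store.
        let (attestation_status, runner) = AttestationStatusRunner::init_from_store(
            ctx,
            batch_store,
            time::Duration::seconds(1),
            block_store.genesis().hash(),
        )
        .await?;

        // If the genesis hash reported by the store ever changes, `run`
        // returns an error, tearing the scope down so the node restarts.
        s.spawn_bg(runner.run(ctx));

        // ... hand `attestation_status` to the `Executor` here ...
        let _ = attestation_status;
        Ok(())
    })
    .await
}
```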
46 changes: 43 additions & 3 deletions node/actors/network/src/gossip/attestation_status.rs
@@ -6,13 +6,20 @@ use zksync_consensus_roles::attester;
use crate::watch::Watch;

/// Coordinate the attestation by showing the status as seen by the main node.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AttestationStatus {
/// Next batch number where voting is expected.
///
/// The node is expected to poll the main node during initialization until
/// the batch to start from is established.
pub next_batch_to_attest: attester::BatchNumber,
/// The hash of the genesis of the chain to which the L1 batches belong.
///
/// We don't expect to handle a regenesis on the fly without restarting the
/// node, so this value is not expected to change; it's here only to stop
/// any attempt at updating the status with a batch number that refers
/// to a different fork.
pub genesis: attester::GenesisHash,
}

/// The subscription over the attestation status which voters can monitor for change.
@@ -31,8 +38,12 @@ impl fmt::Debug for AttestationStatusWatch {

impl AttestationStatusWatch {
/// Create a new watch going from a specific batch number.
pub fn new(next_batch_to_attest: attester::BatchNumber) -> Self {
pub fn new(
genesis: attester::GenesisHash,
next_batch_to_attest: attester::BatchNumber,
) -> Self {
Self(Watch::new(AttestationStatus {
genesis,
next_batch_to_attest,
}))
}
@@ -43,14 +54,43 @@
}

/// Set the next batch number to attest on and notify subscribers it changed.
pub async fn update(&self, next_batch_to_attest: attester::BatchNumber) {
///
/// Fails if the genesis we want to update to is not the same as the one the watch was
/// started with, because the rest of the system is not expected to be able to handle
/// reorgs without a restart of the node.
pub async fn update(
&self,
genesis: attester::GenesisHash,
next_batch_to_attest: attester::BatchNumber,
) -> anyhow::Result<()> {
let this = self.0.lock().await;
{
let status = this.borrow();
anyhow::ensure!(
status.genesis == genesis,
"the attestation status genesis changed: {:?} -> {:?}",
status.genesis,
genesis
);
// The next batch to attest moving backwards could cause the voting process
// to get stuck, due to the way gossiping works: BatchVotes discards votes
// below the expected minimum, and even if we cleared the votes, we might
// not get them again from any peer. By returning an error we can cause
// the node to be restarted and connections re-established for fresh gossip.
anyhow::ensure!(
status.next_batch_to_attest <= next_batch_to_attest,
"next batch to attest moved backwards: {} -> {}",
status.next_batch_to_attest,
next_batch_to_attest
);
}
this.send_if_modified(|status| {
if status.next_batch_to_attest == next_batch_to_attest {
return false;
}
status.next_batch_to_attest = next_batch_to_attest;
true
});
Ok(())
}
}
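
To make the new guards concrete, here is a minimal sketch (ours, not part of the diff) of how `update` behaves; it assumes a test context and the rand-based `GenesisHash` sampling used by this repo's tests.

```rust
use rand::Rng as _;
use zksync_concurrency::ctx;
use zksync_consensus_network::gossip::AttestationStatusWatch;
use zksync_consensus_roles::attester;

/// Illustrative only: exercises the two `ensure` guards added to `update`.
async fn demo_update_guards(ctx: &ctx::Ctx) -> anyhow::Result<()> {
    let rng = &mut ctx.rng();
    let genesis: attester::GenesisHash = rng.gen();
    let watch = AttestationStatusWatch::new(genesis, attester::BatchNumber(0));

    // Same genesis, batch number moving forward: accepted, subscribers notified.
    watch.update(genesis, attester::BatchNumber(1)).await?;

    // A different genesis hash signals a reorg on the main node: rejected.
    let other: attester::GenesisHash = rng.gen();
    assert!(watch.update(other, attester::BatchNumber(2)).await.is_err());

    // The batch number moving backwards is rejected too, so gossip voting
    // cannot silently get stuck below the expected minimum vote.
    assert!(watch.update(genesis, attester::BatchNumber(0)).await.is_err());
    Ok(())
}
```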
4 changes: 3 additions & 1 deletion node/actors/network/src/gossip/mod.rs
@@ -12,7 +12,9 @@
//! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to
//! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
//! network graph (minimize its diameter, increase connectedness).
pub use self::attestation_status::{AttestationStatusReceiver, AttestationStatusWatch};
pub use self::attestation_status::{
AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch,
};
pub use self::batch_votes::BatchVotesPublisher;
use self::batch_votes::BatchVotesWatch;
use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};