Skip to content

Commit

Permalink
BFT-496: Add GenesisHash to AttestationStatusClient and check it in A…
Browse files Browse the repository at this point in the history
…ttestationStatusRunner
  • Loading branch information
aakoshh committed Aug 1, 2024
1 parent aa29e87 commit d36057e
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 43 deletions.
49 changes: 34 additions & 15 deletions node/actors/executor/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,13 @@ pub trait AttestationStatusClient: 'static + Send + Sync {
///
/// The API might return an error while genesis is being created, which we represent with `None`
/// here and mean that we'll have to try again later.
async fn next_batch_to_attest(
///
/// The genesis hash is returned along with the new batch number to facilitate detecting reorgs
/// on the main node as soon as possible.
async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>>;
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>>;
}

/// Use an [AttestationStatusClient] to periodically poll the main node and update the [AttestationStatusWatch].
Expand All @@ -133,6 +136,7 @@ pub struct AttestationStatusRunner {
status: Arc<AttestationStatusWatch>,
client: Box<dyn AttestationStatusClient>,
poll_interval: time::Duration,
genesis: attester::GenesisHash,
}

impl AttestationStatusRunner {
Expand All @@ -143,12 +147,14 @@ impl AttestationStatusRunner {
ctx: &ctx::Ctx,
client: Box<dyn AttestationStatusClient>,
poll_interval: time::Duration,
) -> ctx::OrCanceled<(Arc<AttestationStatusWatch>, Self)> {
let status = Arc::new(AttestationStatusWatch::new(attester::BatchNumber(0)));
genesis: attester::GenesisHash,
) -> ctx::Result<(Arc<AttestationStatusWatch>, Self)> {
let status = Arc::new(AttestationStatusWatch::new(attester::BatchNumber::default()));
let mut runner = Self {
status: status.clone(),
client,
poll_interval,
genesis,
};
runner.poll_until_some(ctx).await?;
Ok((status, runner))
Expand All @@ -159,35 +165,48 @@ impl AttestationStatusRunner {
ctx: &ctx::Ctx,
store: Arc<BatchStore>,
poll_interval: time::Duration,
) -> ctx::OrCanceled<(Arc<AttestationStatusWatch>, Self)> {
genesis: attester::GenesisHash,
) -> ctx::Result<(Arc<AttestationStatusWatch>, Self)> {
Self::init(
ctx,
Box::new(LocalAttestationStatusClient(store)),
poll_interval,
genesis,
)
.await
}

/// Run the poll loop.
pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let _ = self.poll_forever(ctx).await;
Ok(())
match self.poll_forever(ctx).await {
Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()),
Err(ctx::Error::Internal(err)) => Err(err),
}
}

/// Poll the client forever in a loop or until canceled.
async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> {
async fn poll_forever(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
loop {
self.poll_until_some(ctx).await?;
ctx.sleep(self.poll_interval).await?;
}
}

/// Poll the client until some data is returned and write it into the status.
async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<()> {
async fn poll_until_some(&mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
loop {
match self.client.next_batch_to_attest(ctx).await {
Ok(Some(next_batch_to_attest)) => {
self.status.update(next_batch_to_attest).await;
match self.client.attestation_status(ctx).await {
Ok(Some((genesis, batch_number))) => {
// A change in the genesis wouldn't necessarily be a problem, but at the moment
// the machinery that relies on the status notification cannot handle reorgs
// without restarting the application. On external nodes this happens by polling
// the main node API for any update in the genesis; on the main node it shouldn't
// happen at the moment. If the downstream expectations change we can remove this
// and let the updated status propagate through the system.
if self.genesis != genesis {
return Err(anyhow::format_err!("the attestation status genesis changed since we started the runner: {:?} -> {:?}", self.genesis, genesis).into());
}
self.status.update(batch_number).await;
return Ok(());
}
Ok(None) => {
Expand All @@ -210,10 +229,10 @@ struct LocalAttestationStatusClient(Arc<BatchStore>);

#[async_trait::async_trait]
impl AttestationStatusClient for LocalAttestationStatusClient {
async fn next_batch_to_attest(
async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
self.0.next_batch_to_attest(ctx).await
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>> {
self.0.attestation_status(ctx).await
}
}
48 changes: 36 additions & 12 deletions node/actors/executor/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! High-level tests for `Executor`.
use std::sync::atomic::AtomicU64;
use std::sync::{atomic::AtomicU64, Mutex};

use super::*;
use attestation::{AttestationStatusClient, AttestationStatusRunner};
Expand Down Expand Up @@ -322,18 +322,22 @@ async fn test_attestation_status_runner() {
abort_on_panic();
let _guard = zksync_concurrency::testonly::set_timeout(time::Duration::seconds(5));
let ctx = &ctx::test_root(&ctx::AffineClock::new(10.0));
let rng = &mut ctx.rng();

let genesis: attester::GenesisHash = rng.gen();

#[derive(Default)]
#[derive(Clone)]
struct MockAttestationStatus {
batch_number: AtomicU64,
genesis: Arc<Mutex<attester::GenesisHash>>,
batch_number: Arc<AtomicU64>,
}

#[async_trait::async_trait]
impl AttestationStatusClient for MockAttestationStatus {
async fn next_batch_to_attest(
async fn attestation_status(
&self,
_ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>> {
let curr = self
.batch_number
.fetch_add(1u64, std::sync::atomic::Ordering::Relaxed);
Expand All @@ -342,16 +346,24 @@ async fn test_attestation_status_runner() {
Ok(None)
} else {
// The first actual result will be 1 on the 2nd poll.
Ok(Some(attester::BatchNumber(curr)))
Ok(Some((
*self.genesis.lock().unwrap(),
attester::BatchNumber(curr),
)))
}
}
}

scope::run!(ctx, |ctx, s| async {
let res = scope::run!(ctx, |ctx, s| async {
let client = MockAttestationStatus {
genesis: Arc::new(Mutex::new(genesis)),
batch_number: Arc::new(AtomicU64::default()),
};
let (status, runner) = AttestationStatusRunner::init(
ctx,
Box::new(MockAttestationStatus::default()),
Box::new(client.clone()),
time::Duration::milliseconds(100),
genesis,
)
.await
.unwrap();
Expand All @@ -364,15 +376,27 @@ async fn test_attestation_status_runner() {
let status = sync::changed(ctx, &mut recv_status).await?;
assert_eq!(status.next_batch_to_attest.0, 1);
}
// Now start polling for new values.
s.spawn_bg(runner.run(ctx));
// Now start polling for new values. Starting in the foreground because we want it to fail in the end.
s.spawn(runner.run(ctx));
// Check that polling sets the value.
{
let status = sync::changed(ctx, &mut recv_status).await?;
assert_eq!(status.next_batch_to_attest.0, 2);
}
// Change the genesis returned by the client. It should cause the scope to fail.
{
let mut genesis = client.genesis.lock().unwrap();
*genesis = rng.gen();
}
Ok(())
})
.await
.unwrap();
.await;

match res {
Ok(()) => panic!("expected to fail when the genesis changed"),
Err(e) => assert!(
e.to_string().contains("genesis changed"),
"only expect failures due to genesis change"
),
}
}
16 changes: 15 additions & 1 deletion node/actors/network/src/gossip/attestation_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,27 @@ use zksync_consensus_roles::attester;
use crate::watch::Watch;

/// Coordinate the attestation by showing the status as seen by the main node.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AttestationStatus {
/// Next batch number where voting is expected.
///
/// The node is expected to poll the main node during initialization until
/// the batch to start from is established.
pub next_batch_to_attest: attester::BatchNumber,
// The hash of the genesis of the chain to which the L1 batches belong.
//
// A change in this value would indicate a reorg on the main node.
// On the main node itself this is not expected to change because
// a reorg involves a restart, and a regenesis happens before the
// executor is started. If it could happen, it could be used to
// signal to the `BatchVotes` that votes need to be cleared and
// potentially discarded votes received over gossip would need
// to be re-acquired (which doesn't happen at the moment unless
// the connection is re-established).
//
// It is not added yet as the system is not expected to be able
// to handle changes in the value.
//pub genesis: attester::GenesisHash,
}

/// The subscription over the attestation status which voters can monitor for change.
Expand Down
4 changes: 3 additions & 1 deletion node/actors/network/src/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
//! Static connections constitute a rigid "backbone" of the gossip network, which is insensitive to
//! eclipse attack. Dynamic connections are supposed to improve the properties of the gossip
//! network graph (minimize its diameter, increase connectedness).
pub use self::attestation_status::{AttestationStatusReceiver, AttestationStatusWatch};
pub use self::attestation_status::{
AttestationStatus, AttestationStatusReceiver, AttestationStatusWatch,
};
pub use self::batch_votes::BatchVotesPublisher;
use self::batch_votes::BatchVotesWatch;
use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
Expand Down
4 changes: 2 additions & 2 deletions node/actors/network/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ impl InstanceRunner {
s.spawn_bg(self.net_runner.run(ctx));
s.spawn_bg(async {
loop {
if let Ok(Some(n)) = self.batch_store.next_batch_to_attest(ctx).await {
self.attestation_status.update(n).await;
if let Ok(Some((_, bn))) = self.batch_store.attestation_status(ctx).await {
self.attestation_status.update(bn).await;
}
if ctx.sleep(time::Duration::seconds(1)).await.is_err() {
return Ok(());
Expand Down
21 changes: 13 additions & 8 deletions node/libs/storage/src/batch_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ pub trait PersistentBatchStore: 'static + fmt::Debug + Send + Sync {
/// An external node might never have a complete history of L1 batch QCs. Once the L1 batch is included on L1,
/// the external nodes might use the [attester::SyncBatch] route to obtain them, in which case they will not
/// have a QC and no reason to get them either. The main node, however, will want to have a QC for all batches.
async fn next_batch_to_attest(
async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>>;
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>>;

/// Get the L1 batch QC from storage with the highest number.
///
Expand Down Expand Up @@ -276,22 +276,27 @@ impl BatchStore {
}

/// Retrieve the next batch number that doesn't have a QC yet and will need to be signed.
pub async fn next_batch_to_attest(
///
/// The `GenesisHash` is returned just so that the results are atomic and consistent;
/// it is not expected to differ from `BlockStore::genesis()` unless it happened to
/// change between the calls. It is for future compatibility; currently `BlockStore::genesis()`
/// returns a `Genesis` instance cached when the `BlockStore` is created.
pub async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>> {
let t = metrics::PERSISTENT_BATCH_STORE
.next_batch_to_attest_latency
.start();

let batch = self
let status = self
.persistent
.next_batch_to_attest(ctx)
.attestation_status(ctx)
.await
.wrap("persistent.next_batch_to_attest()")?;
.wrap("persistent.attestation_status()")?;

t.observe();
Ok(batch)
Ok(status)
}

/// Retrieve a batch to be signed.
Expand Down
10 changes: 6 additions & 4 deletions node/libs/storage/src/testonly/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,19 @@ impl PersistentBatchStore for BatchStore {
Ok(certs.get(last_batch_number).cloned())
}

async fn next_batch_to_attest(
async fn attestation_status(
&self,
_ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>> {
let batches = self.0.batches.lock().unwrap();
let certs = self.0.certs.lock().unwrap();

Ok(batches
let bn = batches
.iter()
.map(|b| b.number)
.find(|n| !certs.contains_key(n)))
.find(|n| !certs.contains_key(n));

Ok(bn.map(|bn| (self.0.genesis.hash(), bn)))
}

async fn get_batch_to_sign(
Expand Down
1 change: 1 addition & 0 deletions node/tools/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ impl Configs {
ctx,
store.batches.clone(),
time::Duration::seconds(1),
self.app.genesis.hash(),
)
.await?;

Expand Down

0 comments on commit d36057e

Please sign in to comment.