Skip to content

Commit

Permalink
traits for proposing/verifying payload (#42)
Browse files Browse the repository at this point in the history
We need a way to delegate proposing/verifying the block's payload to the
user.
See the related pr: matter-labs/zksync-era#554

Additionally introduced stronger typing for signed messages in bft
tests.

Fixes BFT-388.
  • Loading branch information
pompon0 committed Dec 7, 2023
1 parent f25f078 commit b06ae4b
Show file tree
Hide file tree
Showing 28 changed files with 959 additions and 1,253 deletions.
2 changes: 2 additions & 0 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion node/actors/bft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ zksync_consensus_utils.workspace = true
zksync_protobuf.workspace = true

anyhow.workspace = true
async-trait.workspace = true
once_cell.workspace = true
rand.workspace = true
thiserror.workspace = true
Expand All @@ -25,6 +26,7 @@ vise.workspace = true
[dev-dependencies]
tokio.workspace = true
assert_matches.workspace = true
pretty_assertions.workspace = true

[lints]
workspace = true
workspace = true
2 changes: 1 addition & 1 deletion node/actors/bft/src/leader/replica_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub(crate) enum Error {
}

impl StateMachine {
#[instrument(level = "trace", ret)]
#[instrument(level = "trace", skip(self), ret)]
pub(crate) fn process_replica_commit(
&mut self,
ctx: &ctx::Ctx,
Expand Down
31 changes: 22 additions & 9 deletions node/actors/bft/src/leader/replica_prepare.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use super::StateMachine;
use crate::{inner::ConsensusInner, metrics};
use rand::Rng;
use std::collections::HashMap;
use tracing::instrument;
use zksync_concurrency::ctx;
use zksync_concurrency::{ctx, error::Wrap};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator::{self, ProtocolVersion};

Expand Down Expand Up @@ -68,11 +67,26 @@ pub(crate) enum Error {
/// Invalid `HighQC` message.
#[error("invalid high QC: {0:#}")]
InvalidHighQC(#[source] anyhow::Error),
/// Internal error. Unlike other error types, this one isn't supposed to be easily recoverable.
#[error(transparent)]
Internal(#[from] ctx::Error),
}

impl Wrap for Error {
fn with_wrap<C: std::fmt::Display + Send + Sync + 'static, F: FnOnce() -> C>(
self,
f: F,
) -> Self {
match self {
Error::Internal(err) => Error::Internal(err.with_wrap(f)),
err => err,
}
}
}

impl StateMachine {
#[instrument(level = "trace", ret)]
pub(crate) fn process_replica_prepare(
#[instrument(level = "trace", skip(self), ret)]
pub(crate) async fn process_replica_prepare(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
Expand Down Expand Up @@ -214,11 +228,10 @@ impl StateMachine {
Some(proposal) if proposal != highest_qc.message.proposal => (proposal, None),
// The previous block was finalized, so we can propose a new block.
_ => {
// TODO(bruno): For now we just create a block with a random payload. After we integrate with
// the execution layer we should have a call here to the mempool to get a real payload.
let mut payload = validator::Payload(vec![0; ConsensusInner::PAYLOAD_MAX_SIZE]);
ctx.rng().fill(&mut payload.0[..]);

let payload = self
.payload_source
.propose(ctx, highest_qc.message.proposal.number.next())
.await?;
metrics::METRICS
.leader_proposal_payload_size
.observe(payload.0.len());
Expand Down
35 changes: 24 additions & 11 deletions node/actors/bft/src/leader/state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use crate::{metrics, ConsensusInner};
use crate::{metrics, ConsensusInner, PayloadSource};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
unreachable,
};
use tracing::instrument;
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _, time};
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, time};
use zksync_consensus_roles::validator;

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
/// those messages. When participating in consensus we are not the leader most of the time.
#[derive(Debug)]
pub(crate) struct StateMachine {
/// Payload provider for the new blocks.
pub(crate) payload_source: Arc<dyn PayloadSource>,
/// The current view number. This might not match the replica's view number, we only have this here
/// to make the leader advance monotonically in time and stop it from accepting messages from the past.
pub(crate) view: validator::ViewNumber,
Expand All @@ -36,9 +38,10 @@ pub(crate) struct StateMachine {

impl StateMachine {
/// Creates a new StateMachine struct.
#[instrument(level = "trace", ret)]
pub fn new(ctx: &ctx::Ctx) -> Self {
#[instrument(level = "trace", skip(payload_source))]
pub fn new(ctx: &ctx::Ctx, payload_source: Arc<dyn PayloadSource>) -> Self {
StateMachine {
payload_source,
view: validator::ViewNumber(0),
phase: validator::Phase::Prepare,
phase_start: ctx.now(),
Expand All @@ -51,21 +54,30 @@ impl StateMachine {
/// Process an input message (leaders don't time out waiting for a message). This is the
/// main entry point for the state machine. We need read-access to the inner consensus struct.
/// As a result, we can modify our state machine or send a message to the executor.
#[instrument(level = "trace", ret)]
pub(crate) fn process_input(
#[instrument(level = "trace", skip(self), ret)]
pub(crate) async fn process_input(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
input: validator::Signed<validator::ConsensusMsg>,
) {
) -> ctx::Result<()> {
let now = ctx.now();
let label = match &input.msg {
validator::ConsensusMsg::ReplicaPrepare(_) => {
let res = self
let res = match self
.process_replica_prepare(ctx, consensus, input.cast().unwrap())
.map_err(|err| {
.await
.wrap("process_replica_prepare()")
{
Ok(()) => Ok(()),
Err(super::replica_prepare::Error::Internal(err)) => {
return Err(err);
}
Err(err) => {
tracing::warn!("process_replica_prepare: {err:#}");
});
Err(())
}
};
metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res)
}
validator::ConsensusMsg::ReplicaCommit(_) => {
Expand All @@ -79,5 +91,6 @@ impl StateMachine {
_ => unreachable!(),
};
metrics::METRICS.leader_processing_latency[&label].observe_latency(ctx.now() - now);
Ok(())
}
}
Loading

0 comments on commit b06ae4b

Please sign in to comment.