Made consensus actor write blocks directly to storage (#38)
The consensus actor has access to storage anyway, and storage supports
observing its state. A dedicated channel for broadcasting final blocks is
therefore redundant in this setup.
pompon0 authored Nov 20, 2023
1 parent 467de55 commit 9f58e68
Showing 19 changed files with 172 additions and 262 deletions.
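
The design change is easiest to see outside the diff: consumers observe the
store's state instead of receiving finalized blocks over a dedicated channel.
The sketch below is a minimal, self-contained illustration of that pattern;
BlockStore, the watch-based head tracking, and every other name in it are
assumptions for illustration, not the actual zksync_consensus_storage API.

use std::collections::HashMap;
use tokio::sync::watch;

/// Toy stand-in for the store the consensus actor writes to.
struct BlockStore {
    blocks: HashMap<u64, Vec<u8>>,
    head: watch::Sender<u64>,
}

impl BlockStore {
    fn new() -> Self {
        let (head, _) = watch::channel(0);
        Self { blocks: HashMap::new(), head }
    }

    /// Writing a block advances the observable head; no extra broadcast
    /// channel to other actors is needed.
    fn put_block(&mut self, number: u64, payload: Vec<u8>) {
        self.blocks.insert(number, payload);
        self.head.send_if_modified(|n| {
            if number > *n { *n = number; true } else { false }
        });
    }

    /// Interested actors subscribe to the store instead of owning one end
    /// of a pipe; each receiver sees the latest finalized block number.
    fn subscribe(&self) -> watch::Receiver<u64> {
        self.head.subscribe()
    }
}

With storage observable this way, removing OutputMessage::FinalizedBlock (the
io.rs change below) loses no information: anything the Sync actor learned from
that message it can now read, or await, from the store itself.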
node/actors/bft/src/io.rs (3 changes: 0 additions & 3 deletions)
@@ -1,7 +1,6 @@
//! Input and output messages for the Consensus actor. These are processed by the executor actor.

use zksync_consensus_network::io::{ConsensusInputMessage, ConsensusReq};
use zksync_consensus_roles::validator;

/// All the messages that other actors can send to the Consensus actor.
#[derive(Debug)]
@@ -15,8 +14,6 @@ pub enum InputMessage {
pub enum OutputMessage {
/// Message types to the Network actor.
Network(ConsensusInputMessage),
/// Message types to the Sync actor.
FinalizedBlock(validator::FinalBlock),
}

impl From<ConsensusInputMessage> for OutputMessage {
node/actors/bft/src/lib.rs (14 changes: 8 additions & 6 deletions)
@@ -21,7 +21,7 @@ use inner::ConsensusInner;
use tracing::{info, instrument};
use zksync_concurrency::ctx;
use zksync_consensus_roles::validator;
use zksync_consensus_storage::FallbackReplicaStateStore;
use zksync_consensus_storage::ReplicaStore;
use zksync_consensus_utils::pipe::ActorPipe;

mod inner;
@@ -53,7 +53,7 @@ impl Consensus {
pipe: ActorPipe<InputMessage, OutputMessage>,
secret_key: validator::SecretKey,
validator_set: validator::ValidatorSet,
storage: FallbackReplicaStateStore,
storage: ReplicaStore,
) -> anyhow::Result<Self> {
Ok(Consensus {
inner: ConsensusInner {
@@ -69,7 +69,7 @@ impl Consensus {
/// Starts the Consensus actor. It will start running, processing incoming messages and
/// sending output messages. This is a blocking method.
#[instrument(level = "trace", ret)]
pub fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
info!(
"Starting consensus actor {:?}",
self.inner.secret_key.public()
@@ -78,6 +78,7 @@
// We need to start the replica before processing inputs.
self.replica
.start(ctx, &self.inner)
.await
.context("replica.start()")?;

// This is the infinite loop where the consensus actually runs. The validator waits for either
@@ -87,7 +88,7 @@
.inner
.pipe
.recv(&ctx.with_deadline(self.replica.timeout_deadline))
.block()
.await
.ok();

// We check if the context is active before processing the input. If the context is not active,
@@ -114,15 +115,16 @@
validator::ConsensusMsg::LeaderPrepare(_)
| validator::ConsensusMsg::LeaderCommit(_) => {
self.replica
.process_input(ctx, &self.inner, Some(req.msg))?;
.process_input(ctx, &self.inner, Some(req.msg))
.await?;
}
}
// Notify network actor that the message has been processed.
// Ignore sending error.
let _ = req.ack.send(());
}
None => {
self.replica.process_input(ctx, &self.inner, None)?;
self.replica.process_input(ctx, &self.inner, None).await?;
}
}
}
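
For context on the signature change above: run keeps the same
receive-or-timeout loop, it just awaits instead of blocking. Below is a
stripped-down model of that loop, assuming a plain tokio channel in place of
the real ActorPipe and String in place of the real message types:

use std::time::Duration;
use tokio::{select, sync::mpsc, time::{sleep_until, Instant}};

async fn actor_loop(mut inbox: mpsc::Receiver<String>) {
    let mut deadline = Instant::now() + Duration::from_secs(2);
    loop {
        select! {
            msg = inbox.recv() => match msg {
                // process_input(Some(msg)): dispatch on the message type.
                Some(msg) => println!("process_input(Some({msg}))"),
                None => break, // all senders dropped: shut down
            },
            _ = sleep_until(deadline) => {
                // process_input(None): the view timed out, start a new view
                // and arm a fresh deadline.
                println!("process_input(None) -> start_new_view()");
                deadline = Instant::now() + Duration::from_secs(2);
            }
        }
    }
}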
node/actors/bft/src/metrics.rs (2 changes: 2 additions & 0 deletions)
@@ -68,6 +68,8 @@ pub(crate) struct ConsensusMetrics {
/// Latency of processing messages by the leader.
#[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
pub(crate) leader_processing_latency: Family<ProcessingLatencyLabels, Histogram<Duration>>,
/// Number of the last finalized block observed by the node.
pub(crate) finalized_block_number: Gauge<u64>,
}

/// Global instance of [`ConsensusMetrics`].
node/actors/bft/src/replica/block.rs (22 changes: 16 additions & 6 deletions)
@@ -1,29 +1,32 @@
use super::StateMachine;
use crate::{inner::ConsensusInner, io::OutputMessage};
use crate::inner::ConsensusInner;
use anyhow::Context as _;
use tracing::{info, instrument};
use zksync_concurrency::ctx;
use zksync_consensus_roles::validator;

impl StateMachine {
/// Tries to build a finalized block from the given CommitQC. We simply search our
/// block proposal cache for the matching block, and if we find it we build the block.
/// If this method succeeds, it sends the finalized block to the executor.
#[instrument(level = "trace", ret)]
pub(crate) fn build_block(
pub(crate) async fn save_block(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
commit_qc: &validator::CommitQC,
) {
) -> anyhow::Result<()> {
// TODO(gprusak): for availability of finalized blocks,
// replicas should be able to broadcast highest quorums without
// the corresponding block (same goes for synchronization).
let Some(cache) = self
.block_proposal_cache
.get(&commit_qc.message.proposal.number)
else {
return;
return Ok(());
};
let Some(payload) = cache.get(&commit_qc.message.proposal.payload) else {
return;
return Ok(());
};
let block = validator::FinalBlock {
header: commit_qc.message.proposal,
@@ -35,7 +38,14 @@ impl StateMachine {
"Finalized a block!\nFinal block: {:#?}",
block.header.hash()
);
self.storage
.put_block(ctx, &block)
.await
.context("store.put_block()")?;

consensus.pipe.send(OutputMessage::FinalizedBlock(block));
let number_metric = &crate::metrics::METRICS.finalized_block_number;
let current_number = number_metric.get();
number_metric.set(current_number.max(block.header.number.0));
Ok(())
}
}
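
The gauge update in save_block above is a monotonic max: blocks may be saved
out of order or re-processed, and the metric must never move backwards. With
a plain atomic the same idea collapses to a single fetch_max; an illustrative
sketch, not the metrics API the crate actually uses:

use std::sync::atomic::{AtomicU64, Ordering};

static FINALIZED_BLOCK_NUMBER: AtomicU64 = AtomicU64::new(0);

fn record_finalized(number: u64) {
    // Only ever advances the recorded value; stale saves are no-ops.
    FINALIZED_BLOCK_NUMBER.fetch_max(number, Ordering::Relaxed);
}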
node/actors/bft/src/replica/leader_commit.rs (6 changes: 4 additions & 2 deletions)
@@ -41,7 +41,7 @@ impl StateMachine {
/// Processes a leader commit message. We can approve this leader message even if we
/// don't have the block proposal stored. It is enough to see the justification.
#[instrument(level = "trace", err)]
pub(crate) fn process_leader_commit(
pub(crate) async fn process_leader_commit(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
@@ -86,7 +86,8 @@
// ----------- All checks finished. Now we process the message. --------------

// Try to create a finalized block with this CommitQC and our block proposal cache.
self.build_block(consensus, &message.justification);
self.save_block(ctx, consensus, &message.justification)
.await?;

// Update the state machine. We don't update the view and phase (or backup our state) here
// because we will do it when we start the new view.
@@ -97,6 +98,7 @@
// Start a new view. But first we skip to the view of this message.
self.view = view;
self.start_new_view(ctx, consensus)
.await
.context("start_new_view()")?;

Ok(())
node/actors/bft/src/replica/leader_prepare.rs (6 changes: 3 additions & 3 deletions)
@@ -108,7 +108,7 @@ pub(crate) enum Error {
impl StateMachine {
/// Processes a leader prepare message.
#[instrument(level = "trace", ret)]
pub(crate) fn process_leader_prepare(
pub(crate) async fn process_leader_prepare(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
@@ -189,7 +189,7 @@

// Try to create a finalized block with this CommitQC and our block proposal cache.
// This gives us another chance to finalize a block that we may have missed before.
self.build_block(consensus, &highest_qc);
self.save_block(ctx, consensus, &highest_qc).await?;

// ----------- Checking the block proposal --------------

@@ -276,7 +276,7 @@
}

// Backup our state.
self.backup_state(ctx).context("backup_state()")?;
self.backup_state(ctx).await.context("backup_state()")?;

// Send the replica message to the leader.
let output_message = ConsensusInputMessage {
node/actors/bft/src/replica/new_view.rs (4 changes: 2 additions & 2 deletions)
@@ -9,7 +9,7 @@ use zksync_consensus_roles::validator
impl StateMachine {
/// This blocking method is used whenever we start a new view.
#[instrument(level = "trace", err)]
pub(crate) fn start_new_view(
pub(crate) async fn start_new_view(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
@@ -27,7 +27,7 @@
.retain(|k, _| k > &self.high_qc.message.proposal.number);

// Backup our state.
self.backup_state(ctx).context("backup_state")?;
self.backup_state(ctx).await.context("backup_state")?;

// Send the replica message to the next leader.
let output_message = ConsensusInputMessage {
node/actors/bft/src/replica/state_machine.rs (83 changes: 38 additions & 45 deletions)
@@ -2,10 +2,10 @@ use crate::{metrics, ConsensusInner};
use anyhow::Context as _;
use std::collections::{BTreeMap, HashMap};
use tracing::instrument;
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _, scope, time};
use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _, time};
use zksync_consensus_roles::validator;
use zksync_consensus_storage as storage;
use zksync_consensus_storage::{FallbackReplicaStateStore, StorageError};
use zksync_consensus_storage::ReplicaStore;

/// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible
/// for validating and voting on blocks. When participating in consensus we are always a replica.
@@ -24,17 +24,15 @@ pub(crate) struct StateMachine {
BTreeMap<validator::BlockNumber, HashMap<validator::PayloadHash, validator::Payload>>,
/// The deadline to receive an input message.
pub(crate) timeout_deadline: time::Deadline,
/// A reference to the storage module. We use it to backup the replica state.
pub(crate) storage: FallbackReplicaStateStore,
/// A reference to the storage module. We use it to backup the replica state and store
/// finalized blocks.
pub(crate) storage: ReplicaStore,
}

impl StateMachine {
/// Creates a new StateMachine struct. We try to recover a past state from the storage module,
/// otherwise we initialize the state machine with whatever head block we have.
pub(crate) async fn new(
ctx: &ctx::Ctx,
storage: FallbackReplicaStateStore,
) -> anyhow::Result<Self> {
pub(crate) async fn new(ctx: &ctx::Ctx, storage: ReplicaStore) -> anyhow::Result<Self> {
let backup = storage.replica_state(ctx).await?;
let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
for proposal in backup.proposals {
@@ -59,13 +57,14 @@
/// we are able to process inputs. If we are in the genesis block, then we start a new view,
/// this will kick start the consensus algorithm. Otherwise, we just start the timer.
#[instrument(level = "trace", ret)]
pub(crate) fn start(
pub(crate) async fn start(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
) -> anyhow::Result<()> {
if self.view == validator::ViewNumber(0) {
self.start_new_view(ctx, consensus)
.await
.context("start_new_view")
} else {
self.reset_timer(ctx);
@@ -77,7 +76,7 @@
/// the main entry point for the state machine. We need read-access to the inner consensus struct.
/// As a result, we can modify our state machine or send a message to the executor.
#[instrument(level = "trace", ret)]
pub(crate) fn process_input(
pub(crate) async fn process_input(
&mut self,
ctx: &ctx::Ctx,
consensus: &ConsensusInner,
@@ -86,38 +85,42 @@
let Some(signed_msg) = input else {
tracing::warn!("We timed out before receiving a message.");
// Start new view.
self.start_new_view(ctx, consensus)?;
self.start_new_view(ctx, consensus).await?;
return Ok(());
};

let now = ctx.now();
let label = match &signed_msg.msg {
validator::ConsensusMsg::LeaderPrepare(_) => {
let res =
match self.process_leader_prepare(ctx, consensus, signed_msg.cast().unwrap()) {
Err(super::leader_prepare::Error::Internal(err)) => {
return Err(err).context("process_leader_prepare()")
}
Err(err) => {
tracing::warn!("process_leader_prepare(): {err:#}");
Err(())
}
Ok(()) => Ok(()),
};
let res = match self
.process_leader_prepare(ctx, consensus, signed_msg.cast().unwrap())
.await
{
Err(super::leader_prepare::Error::Internal(err)) => {
return Err(err).context("process_leader_prepare()")
}
Err(err) => {
tracing::warn!("process_leader_prepare(): {err:#}");
Err(())
}
Ok(()) => Ok(()),
};
metrics::ConsensusMsgLabel::LeaderPrepare.with_result(&res)
}
validator::ConsensusMsg::LeaderCommit(_) => {
let res =
match self.process_leader_commit(ctx, consensus, signed_msg.cast().unwrap()) {
Err(super::leader_commit::Error::Internal(err)) => {
return Err(err).context("process_leader_commit()")
}
Err(err) => {
tracing::warn!("process_leader_commit(): {err:#}");
Err(())
}
Ok(()) => Ok(()),
};
let res = match self
.process_leader_commit(ctx, consensus, signed_msg.cast().unwrap())
.await
{
Err(super::leader_commit::Error::Internal(err)) => {
return Err(err).context("process_leader_commit()")
}
Err(err) => {
tracing::warn!("process_leader_commit(): {err:#}");
Err(())
}
Ok(()) => Ok(()),
};
metrics::ConsensusMsgLabel::LeaderCommit.with_result(&res)
}
_ => unreachable!(),
@@ -127,7 +130,7 @@
}

/// Backups the replica state to disk.
pub(crate) fn backup_state(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
pub(crate) async fn backup_state(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let mut proposals = vec![];
for (number, payloads) in &self.block_proposal_cache {
proposals.extend(payloads.values().map(|p| storage::Proposal {
Expand All @@ -142,17 +145,7 @@ impl StateMachine {
high_qc: self.high_qc.clone(),
proposals,
};

let store_result = scope::run_blocking!(ctx, |ctx, s| {
let backup_future = self.storage.put_replica_state(ctx, &backup);
s.spawn(backup_future).join(ctx).block()?;
Ok(())
});
match store_result {
Ok(()) => { /* Everything went fine */ }
Err(StorageError::Canceled(_)) => tracing::trace!("Storing replica state was canceled"),
Err(StorageError::Database(err)) => return Err(err),
}
self.storage.put_replica_state(ctx, &backup).await?;
Ok(())
}
}
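
The backup_state rewrite in the last hunk drops the scope::run_blocking!
bridge and the manual match on StorageError::{Canceled, Database}: an async
caller simply awaits the storage future and lets `?` surface any failure as
anyhow::Error. A minimal sketch of the resulting shape, with a hypothetical
stand-in for put_replica_state:

use anyhow::Context as _;

// Hypothetical stand-in for ReplicaStore::put_replica_state.
async fn put_replica_state(state: &[u8]) -> anyhow::Result<()> {
    let _ = state; // ... write to persistent storage ...
    Ok(())
}

async fn backup_state(state: &[u8]) -> anyhow::Result<()> {
    // Cancellation and database errors both propagate through the same
    // error path; no separate StorageError variants to unpack.
    put_replica_state(state).await.context("put_replica_state()")?;
    Ok(())
}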
node/actors/bft/src/replica/tests.rs (3 changes: 2 additions & 1 deletion)
@@ -21,10 +21,11 @@ async fn start_new_view_not_leader() {
consensus.replica.high_qc.message.view = ViewNumber(0);

scope::run!(ctx, |ctx, s| {
s.spawn_blocking(|| {
s.spawn(async {
consensus
.replica
.start_new_view(ctx, &consensus.inner)
.await
.unwrap();
Ok(())
})
node/actors/bft/src/testonly/make.rs (4 changes: 2 additions & 2 deletions)
@@ -7,7 +7,7 @@ use crate::{
use std::sync::Arc;
use zksync_concurrency::ctx;
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{FallbackReplicaStateStore, InMemoryStorage};
use zksync_consensus_storage::{InMemoryStorage, ReplicaStore};
use zksync_consensus_utils::pipe::{self, DispatcherPipe};

/// This creates a mock Consensus struct for unit tests.
@@ -27,7 +27,7 @@ pub async fn make_consensus(
consensus_pipe,
key.clone(),
validator_set.clone(),
FallbackReplicaStateStore::from_store(Arc::new(storage)),
ReplicaStore::from_store(Arc::new(storage)),
);
let consensus = consensus
.await
(The remaining 9 changed files in this commit are not shown.)