Make consensus state machine asynchronous (#56)
### Goals
* Prevent the `leader` from blocking the colocated `replica` (and vice versa).
* Provide an inbound message queue with the ability to
invalidate/remove pending messages from the same validator.

### Main changes
* Added the `sync::prunable_mpsc` module, which provides a channel-like
FIFO queue with support for removing specific pending values (see the
sketch after this list).
* Introduced one background task per state machine
(`replica`/`leader`), each consuming its messages from the
inbound queue. Top-level processing now consists of a single `send()`
call, without waiting for the processing result.
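
The pruning behavior can be illustrated with a minimal, dependency-free sketch. This is not the actual `sync::prunable_mpsc` implementation (which is an async, `ctx`-aware channel); `PrunableQueue`, `push`, and `pop` are hypothetical names used here only to show predicate-driven removal:

```rust
use std::collections::VecDeque;

/// A FIFO queue that, on every push, drops pending items that the
/// predicate marks as superseded by the newly pushed one.
struct PrunableQueue<T> {
    items: VecDeque<T>,
    /// Returns `true` when `pending` should be removed in favor of `new`.
    prune: fn(&T, &T) -> bool,
}

impl<T> PrunableQueue<T> {
    fn new(prune: fn(&T, &T) -> bool) -> Self {
        Self { items: VecDeque::new(), prune }
    }

    fn push(&mut self, new: T) {
        let prune = self.prune;
        // Remove any pending value that the new one supersedes, then enqueue.
        self.items.retain(|pending| !prune(pending, &new));
        self.items.push_back(new);
    }

    fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}

fn main() {
    // Simplified predicate: a newer message from the same sender
    // (the same numeric key) replaces the one still waiting in line.
    let mut q: PrunableQueue<(u8, &str)> = PrunableQueue::new(|p, n| p.0 == n.0);
    q.push((1, "ReplicaCommit for view 1"));
    q.push((2, "ReplicaCommit for view 1"));
    q.push((1, "ReplicaCommit for view 2")); // prunes sender 1's stale message
    assert_eq!(q.pop(), Some((2, "ReplicaCommit for view 1")));
    assert_eq!(q.pop(), Some((1, "ReplicaCommit for view 2")));
}
```

The real predicate added in this commit (`StateMachine::inbound_pruning_predicate` in `leader/state_machine.rs`, shown in the diff below) is stricter: it prunes only when the pending and new messages come from the same validator *and* are of the same kind.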

### Follow-up
* Consider additional asynchronous pipelines within each role
* Re-evaluate caching and transitioning within the state machines
moshababo authored Jan 25, 2024
1 parent 5b3d383 commit a766bd2
Showing 13 changed files with 381 additions and 142 deletions.
1 change: 1 addition & 0 deletions node/Cargo.lock

2 changes: 1 addition & 1 deletion node/actors/bft/src/leader/replica_commit.rs
@@ -161,7 +161,7 @@ impl StateMachine {
)),
recipient: Target::Broadcast,
};
self.pipe.send(output_message.into());
self.outbound_pipe.send(output_message.into());

// Clean the caches.
self.prepare_message_cache.retain(|k, _| k >= &self.view);
131 changes: 80 additions & 51 deletions node/actors/bft/src/leader/state_machine.rs
@@ -6,17 +6,19 @@ use std::{
};
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator;
use zksync_consensus_network::io::{ConsensusInputMessage, ConsensusReq, Target};
use zksync_consensus_roles::validator::{self, ConsensusMsg, Signed};

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
/// those messages. When participating in consensus we are not the leader most of the time.
pub(crate) struct StateMachine {
/// Consensus configuration and output channel.
pub(crate) config: Arc<Config>,
/// Pipe through with leader sends network messages.
pub(crate) pipe: OutputSender,
/// Pipe through which leader sends network messages.
pub(crate) outbound_pipe: OutputSender,
/// Pipe through which leader receives network requests.
inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>,
/// The current view number. This might not match the replica's view number, we only have this here
/// to make the leader advance monotonically in time and stop it from accepting messages from the past.
pub(crate) view: validator::ViewNumber,
@@ -28,7 +30,7 @@ pub(crate) struct StateMachine {
/// A cache of replica prepare messages indexed by view number and validator.
pub(crate) prepare_message_cache: BTreeMap<
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaPrepare>>,
HashMap<validator::PublicKey, Signed<validator::ReplicaPrepare>>,
>,
/// Prepare QCs indexed by view number.
pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, validator::PrepareQC>,
@@ -37,19 +39,29 @@
/// A cache of replica commit messages indexed by view number and validator.
pub(crate) commit_message_cache: BTreeMap<
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaCommit>>,
HashMap<validator::PublicKey, Signed<validator::ReplicaCommit>>,
>,
/// Commit QCs indexed by view number.
pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, validator::CommitQC>,
}

impl StateMachine {
/// Creates a new StateMachine struct.
/// Creates a new [`StateMachine`] instance.
///
/// Returns a tuple containing:
/// * The newly created [`StateMachine`] instance.
/// * A sender handle that should be used to send values to be processed by the instance, asynchronously.
#[instrument(level = "trace")]
pub fn new(ctx: &ctx::Ctx, config: Arc<Config>, pipe: OutputSender) -> Self {
StateMachine {
pub fn new(
ctx: &ctx::Ctx,
config: Arc<Config>,
outbound_pipe: OutputSender,
) -> (Self, sync::prunable_mpsc::Sender<ConsensusReq>) {
let (send, recv) = sync::prunable_mpsc::channel(StateMachine::inbound_pruning_predicate);

let this = StateMachine {
config,
pipe,
outbound_pipe,
view: validator::ViewNumber(0),
phase: validator::Phase::Prepare,
phase_start: ctx.now(),
@@ -58,49 +70,54 @@ impl StateMachine {
commit_message_cache: BTreeMap::new(),
prepare_qc: sync::watch::channel(None).0,
commit_qcs: BTreeMap::new(),
}
inbound_pipe: recv,
};

(this, send)
}

/// Process an input message (leaders don't time out waiting for a message). This is the
/// main entry point for the state machine. We need read-access to the inner consensus struct.
/// As a result, we can modify our state machine or send a message to the executor.
#[instrument(level = "trace", skip(self), ret)]
pub(crate) async fn process_input(
&mut self,
ctx: &ctx::Ctx,
input: validator::Signed<validator::ConsensusMsg>,
) -> ctx::Result<()> {
let now = ctx.now();
let label = match &input.msg {
validator::ConsensusMsg::ReplicaPrepare(_) => {
let res = match self
.process_replica_prepare(ctx, input.cast().unwrap())
.await
.wrap("process_replica_prepare()")
{
Ok(()) => Ok(()),
Err(super::replica_prepare::Error::Internal(err)) => {
return Err(err);
}
Err(err) => {
tracing::warn!("process_replica_prepare: {err:#}");
Err(())
}
};
metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res)
}
validator::ConsensusMsg::ReplicaCommit(_) => {
let res = self
.process_replica_commit(ctx, input.cast().unwrap())
.map_err(|err| {
tracing::warn!("process_replica_commit: {err:#}");
});
metrics::ConsensusMsgLabel::ReplicaCommit.with_result(&res)
}
_ => unreachable!(),
};
metrics::METRICS.leader_processing_latency[&label].observe_latency(ctx.now() - now);
Ok(())
/// Runs a loop to process incoming messages.
/// This is the main entry point for the state machine,
/// potentially triggering state modifications and message sending to the executor.
pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
loop {
let req = self.inbound_pipe.recv(ctx).await?;

let now = ctx.now();
let label = match &req.msg.msg {
ConsensusMsg::ReplicaPrepare(_) => {
let res = match self
.process_replica_prepare(ctx, req.msg.cast().unwrap())
.await
.wrap("process_replica_prepare()")
{
Ok(()) => Ok(()),
Err(super::replica_prepare::Error::Internal(err)) => {
return Err(err);
}
Err(err) => {
tracing::warn!("process_replica_prepare: {err:#}");
Err(())
}
};
metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res)
}
ConsensusMsg::ReplicaCommit(_) => {
let res = self
.process_replica_commit(ctx, req.msg.cast().unwrap())
.map_err(|err| {
tracing::warn!("process_replica_commit: {err:#}");
});
metrics::ConsensusMsgLabel::ReplicaCommit.with_result(&res)
}
_ => unreachable!(),
};
metrics::METRICS.leader_processing_latency[&label].observe_latency(ctx.now() - now);

// Notify network actor that the message has been processed.
// Ignore sending error.
let _ = req.ack.send(());
}
}

/// In a loop, receives a PrepareQC and sends a LeaderPrepare containing it.
@@ -213,4 +230,16 @@ impl StateMachine {
);
Ok(())
}

#[allow(clippy::match_like_matches_macro)]
fn inbound_pruning_predicate(pending_req: &ConsensusReq, new_req: &ConsensusReq) -> bool {
if pending_req.msg.key != new_req.msg.key {
return false;
}
match (&pending_req.msg.msg, &new_req.msg.msg) {
(ConsensusMsg::ReplicaPrepare(_), ConsensusMsg::ReplicaPrepare(_)) => true,
(ConsensusMsg::ReplicaCommit(_), ConsensusMsg::ReplicaCommit(_)) => true,
_ => false,
}
}
}
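
The `run` loop above ends each iteration with `req.ack.send(())`, so the network actor still learns when processing finished even though the top level no longer awaits it. Below is a minimal sketch of that request-plus-ack handshake, using tokio channels as stand-ins for the crate's `ctx`-aware pipes (the `Req` type here is hypothetical):

```rust
use tokio::sync::{mpsc, oneshot};

struct Req {
    msg: String,
    /// Fires once the consumer has finished processing the message.
    ack: oneshot::Sender<()>,
}

#[tokio::main]
async fn main() {
    let (send, mut recv) = mpsc::unbounded_channel::<Req>();

    // Background task standing in for a state machine's run-loop.
    let consumer = tokio::spawn(async move {
        while let Some(req) = recv.recv().await {
            println!("processing {}", req.msg);
            // Ignore the error: the requester may have stopped waiting.
            let _ = req.ack.send(());
        }
    });

    let (ack, acked) = oneshot::channel();
    send.send(Req { msg: "ReplicaCommit".into(), ack }).unwrap();
    acked.await.unwrap(); // resolves only after the consumer is done
    drop(send); // close the channel so the consumer loop exits
    consumer.await.unwrap();
}
```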
47 changes: 19 additions & 28 deletions node/actors/bft/src/lib.rs
@@ -14,10 +14,12 @@
//! - [Notes on modern consensus algorithms](https://timroughgarden.github.io/fob21/andy.pdf)
//! - [Blog post comparing several consensus algorithms](https://decentralizedthoughts.github.io/2023-04-01-hotstuff-2/)
//! - Blog posts explaining [safety](https://seafooler.com/2022/01/24/understanding-safety-hotstuff/) and [responsiveness](https://seafooler.com/2022/04/02/understanding-responsiveness-hotstuff/)

use crate::io::{InputMessage, OutputMessage};
pub use config::Config;
use std::sync::Arc;
use zksync_concurrency::{ctx, scope};
use zksync_consensus_roles::validator;
use zksync_consensus_roles::validator::{self, ConsensusMsg};
use zksync_consensus_utils::pipe::ActorPipe;

mod config;
@@ -30,8 +32,6 @@ pub mod testonly;
#[cfg(test)]
mod tests;

pub use config::Config;

/// Protocol version of this BFT implementation.
pub const PROTOCOL_VERSION: validator::ProtocolVersion = validator::ProtocolVersion::EARLIEST;

@@ -65,15 +65,19 @@ impl Config {
mut pipe: ActorPipe<InputMessage, OutputMessage>,
) -> anyhow::Result<()> {
let cfg = Arc::new(self);
let (leader, leader_send) = leader::StateMachine::new(ctx, cfg.clone(), pipe.send.clone());
let (replica, replica_send) =
replica::StateMachine::start(ctx, cfg.clone(), pipe.send.clone()).await?;

let res = scope::run!(ctx, |ctx, s| async {
let mut replica =
replica::StateMachine::start(ctx, cfg.clone(), pipe.send.clone()).await?;
let mut leader = leader::StateMachine::new(ctx, cfg.clone(), pipe.send.clone());
let prepare_qc_recv = leader.prepare_qc.subscribe();

s.spawn_bg(replica.run(ctx));
s.spawn_bg(leader.run(ctx));
s.spawn_bg(leader::StateMachine::run_proposer(
ctx,
&cfg,
leader.prepare_qc.subscribe(),
prepare_qc_recv,
&pipe.send,
));

@@ -82,36 +86,23 @@
// This is the infinite loop where the consensus actually runs. The validator waits for either
// a message from the network or for a timeout, and processes each accordingly.
loop {
let input = pipe
.recv
.recv(&ctx.with_deadline(replica.timeout_deadline))
.await
.ok();
let input = pipe.recv.recv(ctx).await;

// We check if the context is active before processing the input. If the context is not active,
// we stop.
if !ctx.is_active() {
return Ok(());
}

let Some(InputMessage::Network(req)) = input else {
replica.start_new_view(ctx).await?;
continue;
};

use validator::ConsensusMsg as Msg;
let res = match &req.msg.msg {
Msg::ReplicaPrepare(_) | Msg::ReplicaCommit(_) => {
leader.process_input(ctx, req.msg).await
let InputMessage::Network(req) = input.unwrap();
match &req.msg.msg {
ConsensusMsg::ReplicaPrepare(_) | ConsensusMsg::ReplicaCommit(_) => {
leader_send.send(req);
}
Msg::LeaderPrepare(_) | Msg::LeaderCommit(_) => {
replica.process_input(ctx, req.msg).await
ConsensusMsg::LeaderPrepare(_) | ConsensusMsg::LeaderCommit(_) => {
replica_send.send(req);
}
};
// Notify network actor that the message has been processed.
// Ignore sending error.
let _ = req.ack.send(());
res?;
}
}
})
.await;
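
With both state machines running in the background, the top-level loop above reduces to routing: replica-originated messages go to the leader's queue and leader-originated ones to the replica's, with no await on the outcome. A simplified model of that dispatch, again using tokio's plain mpsc as a stand-in for `prunable_mpsc` (all names here are illustrative):

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[derive(Debug)]
enum Msg {
    ReplicaPrepare(u64),
    LeaderPrepare(u64),
}

// Stand-in for a state machine's background run-loop: each role drains
// its own queue, so a slow leader no longer stalls the colocated replica.
async fn run_role(name: &'static str, mut inbound: mpsc::UnboundedReceiver<Msg>) {
    while let Some(msg) = inbound.recv().await {
        println!("{name} processing {msg:?}");
    }
}

#[tokio::main]
async fn main() {
    let (leader_send, leader_recv) = mpsc::unbounded_channel();
    let (replica_send, replica_recv) = mpsc::unbounded_channel();
    tokio::spawn(run_role("leader", leader_recv));
    tokio::spawn(run_role("replica", replica_recv));

    // Top-level dispatch: route each message to the owning role's queue
    // and move on immediately, without waiting for it to be processed.
    for msg in [Msg::ReplicaPrepare(1), Msg::LeaderPrepare(1)] {
        match msg {
            Msg::ReplicaPrepare(_) => leader_send.send(msg).unwrap(),
            Msg::LeaderPrepare(_) => replica_send.send(msg).unwrap(),
        }
    }
    tokio::time::sleep(Duration::from_millis(50)).await; // let tasks drain
}
```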
2 changes: 1 addition & 1 deletion node/actors/bft/src/replica/leader_prepare.rs
@@ -335,7 +335,7 @@ impl StateMachine {
.sign_msg(validator::ConsensusMsg::ReplicaCommit(commit_vote)),
recipient: Target::Validator(author.clone()),
};
self.pipe.send(output_message.into());
self.outbound_pipe.send(output_message.into());

Ok(())
}
2 changes: 1 addition & 1 deletion node/actors/bft/src/replica/new_view.rs
@@ -38,7 +38,7 @@ impl StateMachine {
)),
recipient: Target::Validator(self.config.view_leader(next_view)),
};
self.pipe.send(output_message.into());
self.outbound_pipe.send(output_message.into());

// Reset the timer.
self.reset_timer(ctx);
(Diffs for the remaining changed files are not shown.)
