Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make consensus state machine asynchronous (BFT-406) #56

Merged
merged 37 commits into main from bft_async
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4264d08
Making consensus state machine asynchronous
moshababo Jan 10, 2024
7ead989
Add sync::prunable_queue
moshababo Jan 14, 2024
91227d7
Add asynchronous flow
moshababo Jan 15, 2024
7d5bac1
Apply pruning predicate, improve async error propagation
moshababo Jan 16, 2024
8099e4d
Merge branch 'main' into bft_async
moshababo Jan 16, 2024
e5d1027
Don't import Ctx directly
moshababo Jan 18, 2024
ba644b8
Use Mutex re-export
moshababo Jan 18, 2024
bc15f5c
Box internally instead of expecting it
moshababo Jan 18, 2024
aac7624
Use channel naming conventions
moshababo Jan 18, 2024
10f971b
Use the infallible `send_replace`
moshababo Jan 18, 2024
0ac0707
Encapsulate queue construction
moshababo Jan 19, 2024
7c35b20
`prunable_queue` -> `prunable_mpsc`
moshababo Jan 19, 2024
61c3511
Move tests into a separate module in a separate file
moshababo Jan 19, 2024
d74bb6b
Add time complexity note
moshababo Jan 19, 2024
4dd7598
Add comment
moshababo Jan 19, 2024
3cd687d
Simplify match expression
moshababo Jan 19, 2024
020f25b
Upgrade test code
moshababo Jan 22, 2024
3aaba4f
cargo fmt
moshababo Jan 22, 2024
027d82f
cargo clippy
moshababo Jan 22, 2024
3620997
Move timeout handling under replica
moshababo Jan 22, 2024
fc46547
cargo clippy (unrelated)
moshababo Jan 22, 2024
ae28edc
Merge branch 'main' into bft_async
moshababo Jan 22, 2024
2209d5d
Remove bidirectional communication
moshababo Jan 22, 2024
5683762
Test module to use super::*;
moshababo Jan 23, 2024
7446f7d
Add abort_on_panic()
moshababo Jan 23, 2024
0c02bc4
Move inside closure
moshababo Jan 23, 2024
930d3ea
Remove #[allow(clippy::map_identity)]
moshababo Jan 23, 2024
500b17f
Document return value
moshababo Jan 23, 2024
677b467
Change pruning predicate to prune existing messages from the same val…
moshababo Jan 23, 2024
ee16c24
Merge branch 'main' into bft_async
moshababo Jan 23, 2024
d25c1c9
cargo clippy
moshababo Jan 23, 2024
0862153
cleanup
moshababo Jan 23, 2024
ba22e74
Use std Mutex
moshababo Jan 23, 2024
dd4f659
Remove secondary watch value
moshababo Jan 25, 2024
ab5f4cb
Refactor timeout assertion
moshababo Jan 25, 2024
8e4543c
Refactor docs
moshababo Jan 25, 2024
6edb3c9
Merge branch 'main' into bft_async
moshababo Jan 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions node/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion node/actors/bft/src/leader/replica_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl StateMachine {
)),
recipient: Target::Broadcast,
};
self.pipe.send(output_message.into());
self.outbound_pipe.send(output_message.into());

// Clean the caches.
self.prepare_message_cache.retain(|k, _| k >= &self.view);
Expand Down
131 changes: 80 additions & 51 deletions node/actors/bft/src/leader/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,19 @@ use std::{
};
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator;
use zksync_consensus_network::io::{ConsensusInputMessage, ConsensusReq, Target};
use zksync_consensus_roles::validator::{self, ConsensusMsg, Signed};

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
/// those messages. When participating in consensus we are not the leader most of the time.
pub(crate) struct StateMachine {
/// Consensus configuration and output channel.
pub(crate) config: Arc<Config>,
/// Pipe through with leader sends network messages.
pub(crate) pipe: OutputSender,
/// Pipe through which leader sends network messages.
pub(crate) outbound_pipe: OutputSender,
/// Pipe through which leader receives network requests.
inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>,
/// The current view number. This might not match the replica's view number, we only have this here
/// to make the leader advance monotonically in time and stop it from accepting messages from the past.
pub(crate) view: validator::ViewNumber,
Expand All @@ -28,7 +30,7 @@ pub(crate) struct StateMachine {
/// A cache of replica prepare messages indexed by view number and validator.
pub(crate) prepare_message_cache: BTreeMap<
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaPrepare>>,
HashMap<validator::PublicKey, Signed<validator::ReplicaPrepare>>,
>,
/// Prepare QCs indexed by view number.
pub(crate) prepare_qcs: BTreeMap<validator::ViewNumber, validator::PrepareQC>,
Expand All @@ -37,19 +39,29 @@ pub(crate) struct StateMachine {
/// A cache of replica commit messages indexed by view number and validator.
pub(crate) commit_message_cache: BTreeMap<
validator::ViewNumber,
HashMap<validator::PublicKey, validator::Signed<validator::ReplicaCommit>>,
HashMap<validator::PublicKey, Signed<validator::ReplicaCommit>>,
>,
/// Commit QCs indexed by view number.
pub(crate) commit_qcs: BTreeMap<validator::ViewNumber, validator::CommitQC>,
}

impl StateMachine {
/// Creates a new StateMachine struct.
/// Creates a new [`StateMachine`] instance.
///
/// Returns a tuple containing:
/// * The newly created [`StateMachine`] instance.
/// * A sender handle that should be used to send values to be processed by the instance, asynchronously.
#[instrument(level = "trace")]
pub fn new(ctx: &ctx::Ctx, config: Arc<Config>, pipe: OutputSender) -> Self {
StateMachine {
pub fn new(
ctx: &ctx::Ctx,
config: Arc<Config>,
outbound_pipe: OutputSender,
) -> (Self, sync::prunable_mpsc::Sender<ConsensusReq>) {
let (send, recv) = sync::prunable_mpsc::channel(StateMachine::inbound_pruning_predicate);

let this = StateMachine {
config,
pipe,
outbound_pipe,
view: validator::ViewNumber(0),
phase: validator::Phase::Prepare,
phase_start: ctx.now(),
Expand All @@ -58,49 +70,54 @@ impl StateMachine {
commit_message_cache: BTreeMap::new(),
prepare_qc: sync::watch::channel(None).0,
commit_qcs: BTreeMap::new(),
}
inbound_pipe: recv,
};

(this, send)
}

/// Process an input message (leaders don't time out waiting for a message). This is the
/// main entry point for the state machine. We need read-access to the inner consensus struct.
/// As a result, we can modify our state machine or send a message to the executor.
#[instrument(level = "trace", skip(self), ret)]
pub(crate) async fn process_input(
&mut self,
ctx: &ctx::Ctx,
input: validator::Signed<validator::ConsensusMsg>,
) -> ctx::Result<()> {
let now = ctx.now();
let label = match &input.msg {
validator::ConsensusMsg::ReplicaPrepare(_) => {
let res = match self
.process_replica_prepare(ctx, input.cast().unwrap())
.await
.wrap("process_replica_prepare()")
{
Ok(()) => Ok(()),
Err(super::replica_prepare::Error::Internal(err)) => {
return Err(err);
}
Err(err) => {
tracing::warn!("process_replica_prepare: {err:#}");
Err(())
}
};
metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res)
}
validator::ConsensusMsg::ReplicaCommit(_) => {
let res = self
.process_replica_commit(ctx, input.cast().unwrap())
.map_err(|err| {
tracing::warn!("process_replica_commit: {err:#}");
});
metrics::ConsensusMsgLabel::ReplicaCommit.with_result(&res)
}
_ => unreachable!(),
};
metrics::METRICS.leader_processing_latency[&label].observe_latency(ctx.now() - now);
Ok(())
/// Runs a loop to process incoming messages.
/// This is the main entry point for the state machine,
/// potentially triggering state modifications and message sending to the executor.
///
/// Each iteration receives one request from the inbound queue, dispatches it to
/// the matching `process_*` handler, records the processing latency, and acks
/// the request back to the network actor. The loop only exits on a receive
/// error (propagated by `?`, e.g. on context cancellation) or on an internal
/// processing error.
pub(crate) async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
loop {
let req = self.inbound_pipe.recv(ctx).await?;

// Start of the latency window for the metric observed after processing.
let now = ctx.now();
let label = match &req.msg.msg {
ConsensusMsg::ReplicaPrepare(_) => {
let res = match self
.process_replica_prepare(ctx, req.msg.cast().unwrap())
.await
.wrap("process_replica_prepare()")
{
Ok(()) => Ok(()),
// Internal errors are fatal: propagate them and stop the loop.
// NOTE(review): the ack below is skipped on this path — confirm
// the network actor tolerates a request that is never acked.
Err(super::replica_prepare::Error::Internal(err)) => {
return Err(err);
}
// Protocol-level errors are logged and counted as a failed
// attempt, but the loop keeps running.
Err(err) => {
tracing::warn!("process_replica_prepare: {err:#}");
Err(())
}
};
metrics::ConsensusMsgLabel::ReplicaPrepare.with_result(&res)
}
ConsensusMsg::ReplicaCommit(_) => {
let res = self
.process_replica_commit(ctx, req.msg.cast().unwrap())
.map_err(|err| {
tracing::warn!("process_replica_commit: {err:#}");
});
metrics::ConsensusMsgLabel::ReplicaCommit.with_result(&res)
}
// The leader only receives replica messages; the routing in lib.rs
// sends only ReplicaPrepare/ReplicaCommit to this queue.
_ => unreachable!(),
};
metrics::METRICS.leader_processing_latency[&label].observe_latency(ctx.now() - now);

// Notify network actor that the message has been processed.
// Ignore sending error.
let _ = req.ack.send(());
}
}

/// In a loop, receives a PrepareQC and sends a LeaderPrepare containing it.
Expand Down Expand Up @@ -213,4 +230,16 @@ impl StateMachine {
);
Ok(())
}

/// Pruning predicate for the inbound [`sync::prunable_mpsc`] queue.
///
/// Returns `true` when `pending_req` should be dropped in favor of `new_req`:
/// a newer message from the same validator key and of the same kind
/// (ReplicaPrepare supersedes a pending ReplicaPrepare, ReplicaCommit a
/// pending ReplicaCommit). Requests from different validators, or of
/// different kinds, never prune each other.
fn inbound_pruning_predicate(pending_req: &ConsensusReq, new_req: &ConsensusReq) -> bool {
    // Messages signed by different validators are always kept.
    if pending_req.msg.key != new_req.msg.key {
        return false;
    }
    // Same validator: only a message of the same kind supersedes the pending one.
    matches!(
        (&pending_req.msg.msg, &new_req.msg.msg),
        (ConsensusMsg::ReplicaPrepare(_), ConsensusMsg::ReplicaPrepare(_))
            | (ConsensusMsg::ReplicaCommit(_), ConsensusMsg::ReplicaCommit(_))
    )
}
}
47 changes: 19 additions & 28 deletions node/actors/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
//! - [Notes on modern consensus algorithms](https://timroughgarden.github.io/fob21/andy.pdf)
//! - [Blog post comparing several consensus algorithms](https://decentralizedthoughts.github.io/2023-04-01-hotstuff-2/)
//! - Blog posts explaining [safety](https://seafooler.com/2022/01/24/understanding-safety-hotstuff/) and [responsiveness](https://seafooler.com/2022/04/02/understanding-responsiveness-hotstuff/)

use crate::io::{InputMessage, OutputMessage};
pub use config::Config;
use std::sync::Arc;
use zksync_concurrency::{ctx, scope};
use zksync_consensus_roles::validator;
use zksync_consensus_roles::validator::{self, ConsensusMsg};
use zksync_consensus_utils::pipe::ActorPipe;

mod config;
Expand All @@ -30,8 +32,6 @@ pub mod testonly;
#[cfg(test)]
mod tests;

pub use config::Config;

/// Protocol version of this BFT implementation.
pub const PROTOCOL_VERSION: validator::ProtocolVersion = validator::ProtocolVersion::EARLIEST;

Expand Down Expand Up @@ -65,15 +65,19 @@ impl Config {
mut pipe: ActorPipe<InputMessage, OutputMessage>,
) -> anyhow::Result<()> {
let cfg = Arc::new(self);
let (leader, leader_send) = leader::StateMachine::new(ctx, cfg.clone(), pipe.send.clone());
let (replica, replica_send) =
replica::StateMachine::start(ctx, cfg.clone(), pipe.send.clone()).await?;

let res = scope::run!(ctx, |ctx, s| async {
let mut replica =
replica::StateMachine::start(ctx, cfg.clone(), pipe.send.clone()).await?;
let mut leader = leader::StateMachine::new(ctx, cfg.clone(), pipe.send.clone());
let prepare_qc_recv = leader.prepare_qc.subscribe();

s.spawn_bg(replica.run(ctx));
s.spawn_bg(leader.run(ctx));
s.spawn_bg(leader::StateMachine::run_proposer(
ctx,
&cfg,
leader.prepare_qc.subscribe(),
prepare_qc_recv,
&pipe.send,
));

Expand All @@ -82,36 +86,23 @@ impl Config {
// This is the infinite loop where the consensus actually runs. The validator waits for either
// a message from the network or for a timeout, and processes each accordingly.
loop {
let input = pipe
.recv
.recv(&ctx.with_deadline(replica.timeout_deadline))
.await
.ok();
let input = pipe.recv.recv(ctx).await;

// We check if the context is active before processing the input. If the context is not active,
// we stop.
if !ctx.is_active() {
return Ok(());
}

let Some(InputMessage::Network(req)) = input else {
replica.start_new_view(ctx).await?;
pompon0 marked this conversation as resolved.
Show resolved Hide resolved
continue;
};

use validator::ConsensusMsg as Msg;
let res = match &req.msg.msg {
Msg::ReplicaPrepare(_) | Msg::ReplicaCommit(_) => {
leader.process_input(ctx, req.msg).await
let InputMessage::Network(req) = input.unwrap();
match &req.msg.msg {
ConsensusMsg::ReplicaPrepare(_) | ConsensusMsg::ReplicaCommit(_) => {
leader_send.send(req);
}
Msg::LeaderPrepare(_) | Msg::LeaderCommit(_) => {
replica.process_input(ctx, req.msg).await
ConsensusMsg::LeaderPrepare(_) | ConsensusMsg::LeaderCommit(_) => {
replica_send.send(req);
}
};
// Notify network actor that the message has been processed.
// Ignore sending error.
let _ = req.ack.send(());
res?;
}
}
})
.await;
Expand Down
2 changes: 1 addition & 1 deletion node/actors/bft/src/replica/leader_prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl StateMachine {
.sign_msg(validator::ConsensusMsg::ReplicaCommit(commit_vote)),
recipient: Target::Validator(author.clone()),
};
self.pipe.send(output_message.into());
self.outbound_pipe.send(output_message.into());

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion node/actors/bft/src/replica/new_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl StateMachine {
)),
recipient: Target::Validator(self.config.view_leader(next_view)),
};
self.pipe.send(output_message.into());
self.outbound_pipe.send(output_message.into());

// Reset the timer.
self.reset_timer(ctx);
Expand Down
Loading
Loading