Apply pruning predicate, improve async error propagation
moshababo committed Jan 16, 2024
1 parent 91227d7 commit 7d5bac1
Showing 5 changed files with 127 additions and 83 deletions.
30 changes: 24 additions & 6 deletions node/actors/bft/src/leader/state_machine.rs
@@ -7,7 +7,7 @@ use std::{
use tracing::instrument;
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator;
use zksync_consensus_roles::{validator, validator::ConsensusMsg};

/// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
/// replica messages and produce leader messages (including proposing blocks) when we reach the threshold for
@@ -61,12 +61,11 @@ impl StateMachine {
}
}

/// Process an input message (leaders don't time out waiting for a message). This is the
/// main entry point for the state machine. We need read-access to the inner consensus struct.
/// As a result, we can modify our state machine or send a message to the executor.
// #[instrument(level = "trace", skip(self), ret)]
/// Runs a loop to process incoming messages.
/// This is the main entry point for the state machine,
/// potentially triggering state modifications and message sending to the executor.
pub async fn run(
&mut self,
mut self,
ctx: &ctx::Ctx,
mut queue: sync::prunable_queue::Receiver<
validator::Signed<validator::ConsensusMsg>,
@@ -212,4 +211,23 @@ impl StateMachine {
);
Ok(())
}

pub fn queue_pruning_predicate(
existing_msg: &validator::Signed<ConsensusMsg>,
new_msg: &validator::Signed<ConsensusMsg>,
) -> bool {
if existing_msg.key != new_msg.key {
return false;
}

match (&existing_msg.msg, &new_msg.msg) {
(ConsensusMsg::ReplicaPrepare(existing_msg), ConsensusMsg::ReplicaPrepare(new_msg)) => {
new_msg.view > existing_msg.view
}
(ConsensusMsg::ReplicaCommit(existing_msg), ConsensusMsg::ReplicaCommit(new_msg)) => {
new_msg.view > existing_msg.view
}
_ => false,
}
}
}
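
For reference, a minimal standalone sketch of what this predicate expresses (using an illustrative DemoMsg type rather than the real validator::Signed<ConsensusMsg>): a ReplicaPrepare/ReplicaCommit with a strictly higher view supersedes an older message from the same signer, while messages from other signers are never pruned.

// Illustrative stand-in for validator::Signed<ConsensusMsg>; only the fields
// the predicate inspects (signer key and view number) are modeled.
#[derive(Clone, Copy)]
struct DemoMsg {
    key: u8,
    view: u64,
}

// Same rule as the predicate above: prune only when the signer matches and
// the new message carries a strictly higher view.
fn demo_pruning_predicate(existing: &DemoMsg, new: &DemoMsg) -> bool {
    existing.key == new.key && new.view > existing.view
}

fn main() {
    let old = DemoMsg { key: 1, view: 3 };
    assert!(demo_pruning_predicate(&old, &DemoMsg { key: 1, view: 4 })); // superseded
    assert!(!demo_pruning_predicate(&old, &DemoMsg { key: 1, view: 3 })); // same view: kept
    assert!(!demo_pruning_predicate(&old, &DemoMsg { key: 2, view: 9 })); // other signer: kept
}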
119 changes: 62 additions & 57 deletions node/actors/bft/src/lib.rs
@@ -18,8 +18,9 @@
use crate::io::{InputMessage, OutputMessage};
pub use config::Config;
use std::sync::Arc;
use zksync_concurrency::{ctx, scope, sync};
use zksync_consensus_roles::validator;
use tokio::sync::mpsc;
use zksync_concurrency::{ctx, oneshot::Receiver, scope, sync};
use zksync_consensus_roles::validator::{self, ConsensusMsg};
use zksync_consensus_utils::pipe::ActorPipe;

mod config;
@@ -65,28 +66,28 @@ impl Config {
mut pipe: ActorPipe<InputMessage, OutputMessage>,
) -> anyhow::Result<()> {
let cfg = Arc::new(self);
let mut leader = leader::StateMachine::new(ctx, cfg.clone(), pipe.send.clone());
let mut replica = replica::StateMachine::start(ctx, cfg.clone(), pipe.send.clone()).await?;

let (leader_sender, leader_receiver) = sync::prunable_queue::new(
Box::new(|_, _| true), // TODO: apply actual dedup predicate
);
let (replica_sender, replica_receiver) = sync::prunable_queue::new(
Box::new(|_, _| true), // TODO: apply actual dedup predicate
);
let leader = leader::StateMachine::new(ctx, cfg.clone(), pipe.send.clone());
let replica = replica::StateMachine::start(ctx, cfg.clone(), pipe.send.clone()).await?;

let (leader_send, leader_recv) =
sync::prunable_queue::new(Box::new(leader::StateMachine::queue_pruning_predicate));

let (replica_send, replica_recv) =
sync::prunable_queue::new(Box::new(replica::StateMachine::queue_pruning_predicate));

// mpsc channel for returning error asynchronously.
let (err_sender, mut err_receiver) = tokio::sync::mpsc::channel::<ctx::Result<()>>(1);
let (err_send, mut err_recv) = mpsc::channel::<ctx::Result<()>>(1);

let res = scope::run!(ctx, |ctx, s| async {
let prepare_qc_receiver = leader.prepare_qc.subscribe();
let timeout_deadline = replica.timeout_deadline.subscribe();
let prepare_qc_recv = leader.prepare_qc.subscribe();
let deadline_recv = replica.timeout_deadline.subscribe();

s.spawn_bg(replica.run(ctx, replica_receiver));
s.spawn_bg(leader.run(ctx, leader_receiver));
s.spawn_bg(replica.run(ctx, replica_recv));
s.spawn_bg(leader.run(ctx, leader_recv));
s.spawn_bg(leader::StateMachine::run_proposer(
ctx,
&cfg,
prepare_qc_receiver,
prepare_qc_recv,
&pipe.send,
));

@@ -95,7 +96,7 @@ impl Config {
// This is the infinite loop where the consensus actually runs. The validator waits for either
// a message from the network or for a timeout, and processes each accordingly.
loop {
let deadline = *timeout_deadline.borrow();
let deadline = *deadline_recv.borrow();
let input = pipe.recv.recv(&ctx.with_deadline(deadline)).await.ok();

// We check if the context is active before processing the input. If the context is not active,
@@ -104,54 +105,37 @@
return Ok(());
}

if let Ok(err) = err_receiver.try_recv() {
return Err(err.err().unwrap());
// Check if an asynchronous error was received.
if let Ok(res) = err_recv.try_recv() {
return Err(res.err().unwrap());
}

let Some(InputMessage::Network(req)) = input else {
let res = replica_sender.enqueue(None).await;
// Await for result before proceeding, allowing deadline value to be updated.
res.recv_or_disconnected(ctx)
.await
.unwrap()
.unwrap()
.unwrap();
let res_recv = replica_send.enqueue(None).await;
// Wait for result before proceeding, allowing timeout deadline value to get updated.
handle_result(ctx, res_recv, &err_send).await;
continue;
};

use validator::ConsensusMsg as Msg;
let res_recv;
match &req.msg.msg {
Msg::ReplicaPrepare(_) | Msg::ReplicaCommit(_) => {
let res = leader_sender.enqueue(req.msg).await;
s.spawn_bg(async {
let res = res.recv_or_disconnected(ctx).await;
// Notify network actor that the message has been processed.
// Ignore sending error.
let _ = req.ack.send(());
// Notify if result is Err.
if let Ok(Ok(Err(err))) = res {
err_sender.clone().send(Err(err)).await.unwrap();
}

Ok(())
});
ConsensusMsg::ReplicaPrepare(_) | ConsensusMsg::ReplicaCommit(_) => {
res_recv = leader_send.enqueue(req.msg).await;
}
Msg::LeaderPrepare(_) | Msg::LeaderCommit(_) => {
let res = replica_sender.enqueue(Some(req.msg)).await;
s.spawn_bg(async {
let res = res.recv_or_disconnected(ctx).await;
// Notify network actor that the message has been processed.
// Ignore sending error.
let _ = req.ack.send(());
// Notify if result is Err.
if let Ok(Ok(Err(err))) = res {
err_sender.clone().send(Err(err)).await.unwrap();
}

Ok(())
});
ConsensusMsg::LeaderPrepare(_) | ConsensusMsg::LeaderCommit(_) => {
res_recv = replica_send.enqueue(Some(req.msg)).await;
}
};
}

s.spawn_bg(async {
handle_result(ctx, res_recv, &err_send).await;

// Notify network actor that the message has been processed.
// Ignore sending error.
let _ = req.ack.send(());

Ok(())
});
}
})
.await;
@@ -161,3 +145,24 @@ impl Config {
}
}
}

async fn handle_result(
ctx: &ctx::Ctx,
res_recv: Receiver<ctx::Result<()>>,
err_send: &mpsc::Sender<ctx::Result<()>>,
) {
let res = res_recv.recv_or_disconnected(ctx).await;

// Ignore the outer `Canceled` (should be handled elsewhere)
// and `Disconnected` (expected in the case of inbound message queue pruning) errors.
if let Ok(Ok(Err(err))) = res {
match err {
// Ignore inner `Canceled` as well, for the same reason.
ctx::Error::Canceled(_) => {}
// Notify internal error.
ctx::Error::Internal(_) => {
err_send.send(Err(err)).await.unwrap();
}
}
}
}
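
The error propagation introduced here follows a common pattern: background tasks push failures into a bounded mpsc channel, and the main loop polls it with try_recv() on every iteration. A minimal, self-contained sketch of that pattern (plain tokio and anyhow, with illustrative names, not the actor code above):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Capacity 1 is enough: the first error ends the loop.
    let (err_send, mut err_recv) = mpsc::channel::<anyhow::Error>(1);

    // A background task that eventually hits an error.
    let sender = err_send.clone();
    tokio::spawn(async move {
        let _ = sender.send(anyhow::anyhow!("background failure")).await;
    });

    // Main loop: do the regular work, and bail out once a background error arrives.
    loop {
        // ... process one input message here ...
        if let Ok(err) = err_recv.try_recv() {
            eprintln!("stopping: {err}");
            break;
        }
        tokio::task::yield_now().await;
    }
}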
42 changes: 32 additions & 10 deletions node/actors/bft/src/replica/state_machine.rs
@@ -4,7 +4,7 @@ use std::{
sync::Arc,
};
use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
use zksync_consensus_roles::validator;
use zksync_consensus_roles::{validator, validator::ConsensusMsg};
use zksync_consensus_storage as storage;

/// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible
@@ -65,24 +65,22 @@ impl StateMachine {
Ok(this)
}

/// Process an input message (it will be None if the channel timed out waiting for a message). This is
/// the main entry point for the state machine. We need read-access to the inner consensus struct.
/// As a result, we can modify our state machine or send a message to the executor.
//#[instrument(level = "trace", ret)]
/// Runs a loop to process incoming messages (may be `None` if the channel times out while waiting for a message).
/// This is the main entry point for the state machine,
/// potentially triggering state modifications and message sending to the executor.
pub async fn run(
&mut self,
mut self,
ctx: &ctx::Ctx,
mut queue: sync::prunable_queue::Receiver<
Option<validator::Signed<validator::ConsensusMsg>>,
ctx::Result<()>,
>,
) -> ctx::Result<()> {
loop {
let (signed_message, res_sender) = queue.dequeue(ctx).await?;

let (signed_message, res_send) = queue.dequeue(ctx).await?;
let Some(signed_message) = signed_message else {
let res = self.start_new_view(ctx).await;
let _ = res_sender.send(res);
let _ = res_send.send(res);
continue;
};

@@ -122,7 +120,7 @@
};
metrics::METRICS.replica_processing_latency[&label].observe_latency(ctx.now() - now);

let _ = res_sender.send(Ok(()));
let _ = res_send.send(Ok(()));
}
}

@@ -149,4 +147,28 @@
.wrap("put_replica_state")?;
Ok(())
}

pub fn queue_pruning_predicate(
existing_msg: &Option<validator::Signed<ConsensusMsg>>,
new_msg: &Option<validator::Signed<ConsensusMsg>>,
) -> bool {
if existing_msg.is_none() || new_msg.is_none() {
return false;
}
let (existing_msg, new_msg) = (existing_msg.as_ref().unwrap(), new_msg.as_ref().unwrap());

if existing_msg.key != new_msg.key {
return false;
}

match (&existing_msg.msg, &new_msg.msg) {
(ConsensusMsg::LeaderPrepare(existing_msg), ConsensusMsg::LeaderPrepare(new_msg)) => {
new_msg.view > existing_msg.view
}
(ConsensusMsg::LeaderCommit(existing_msg), ConsensusMsg::LeaderCommit(new_msg)) => {
new_msg.justification.message.view > existing_msg.justification.message.view
}
_ => false,
}
}
}
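
Each dequeued item carries a oneshot sender on which the state machine reports the processing result, and the producer side can wait for that result before continuing (as the main loop does for timeout messages). A minimal sketch of that reply pattern with plain tokio primitives and illustrative names:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (res_send, res_recv) = oneshot::channel::<Result<(), String>>();

    // Consumer side: process the dequeued item, then reply with the outcome.
    tokio::spawn(async move {
        let outcome: Result<(), String> = Ok(()); // pretend processing succeeded
        let _ = res_send.send(outcome); // ignore the error if the caller went away
    });

    // Producer side: wait for the result before proceeding.
    match res_recv.await {
        Ok(Ok(())) => println!("processed"),
        Ok(Err(e)) => eprintln!("processing failed: {e}"),
        Err(_) => eprintln!("consumer dropped the reply channel"),
    }
}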
5 changes: 2 additions & 3 deletions node/actors/bft/src/replica/timer.rs
@@ -13,9 +13,8 @@ impl StateMachine {
pub(crate) fn reset_timer(&mut self, ctx: &ctx::Ctx) {
let timeout =
Self::BASE_DURATION * 2u32.pow((self.view.0 - self.high_qc.message.view.0) as u32);

metrics::METRICS.replica_view_timeout.set_latency(timeout);
let _ = self
.timeout_deadline
.send(time::Deadline::Finite(ctx.now() + timeout));
self.timeout_deadline.send_replace(time::Deadline::Finite(ctx.now() + timeout));
}
}
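
The switch from send to send_replace matters when no receiver is currently subscribed: assuming the deadline channel has the same semantics as tokio's watch channel, send fails once every receiver has been dropped, while send_replace always stores the new value. A small illustration with a plain tokio watch channel:

use tokio::sync::watch;

fn main() {
    let (tx, rx) = watch::channel(0u64);
    drop(rx); // no receivers left

    assert!(tx.send(1).is_err()); // send fails when the channel is closed
    let prev = tx.send_replace(2); // send_replace still stores the value
    assert_eq!(prev, 0);
    assert_eq!(*tx.borrow(), 2);
}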
14 changes: 7 additions & 7 deletions node/libs/concurrency/src/sync/prunable_queue.rs
@@ -17,7 +17,7 @@ use std::{collections::VecDeque, sync::Arc};
use tokio::sync::Mutex;

pub fn new<T, U>(
predicate: Box<dyn Sync + Send + Fn(&T, &T) -> bool>,
pruning_predicate: Box<dyn Sync + Send + Fn(&T, &T) -> bool>,
) -> (Sender<T, U>, Receiver<T, U>) {
let queue: Mutex<VecDeque<(T, oneshot::Sender<U>)>> = Mutex::new(VecDeque::new());
// Internal signaling, to enable waiting on the receiver side for new items.
@@ -30,7 +30,7 @@

let sender = Sender {
shared: shared.clone(),
predicate,
pruning_predicate: pruning_predicate,
};

let receiver = Receiver {
@@ -48,22 +48,22 @@ pub struct Shared<T, U> {

pub struct Sender<T, U> {
shared: Arc<Shared<T, U>>,
predicate: Box<dyn Sync + Send + Fn(&T, &T) -> bool>,
pruning_predicate: Box<dyn Sync + Send + Fn(&T, &T) -> bool>,
}

impl<T, U> Sender<T, U> {
pub async fn enqueue(&self, item: T) -> oneshot::Receiver<U> {
// Create oneshot channel for returning result asynchronously.
let (res_sender, res_receiver) = oneshot::channel();
let (res_send, res_recv) = oneshot::channel();

let mut queue = self.shared.queue.lock().await;
queue.retain(|existing_item| (self.predicate)(&existing_item.0, &item));
queue.push_back((item, res_sender));
queue.retain(|existing_item| !(self.pruning_predicate)(&existing_item.0, &item));
queue.push_back((item, res_send));

// Ignore sending error.
let _ = self.shared.has_items_send.send(true);

res_receiver
res_recv
}
}
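
The added negation reflects the renamed semantics: the predicate now answers 'should this existing item be pruned?', while VecDeque::retain keeps the elements for which its closure returns true. A small standalone illustration of that retain-with-negation pattern (plain std types, not the queue above):

use std::collections::VecDeque;

fn main() {
    // Prune any queued value that is lower than the incoming one.
    let prune = |existing: &u32, new: &u32| existing < new;

    let mut queue: VecDeque<u32> = VecDeque::from([1, 5, 3]);
    let incoming = 4;

    // retain keeps elements for which the closure is true, hence the negation.
    queue.retain(|existing| !prune(existing, &incoming));
    queue.push_back(incoming);

    assert_eq!(queue, VecDeque::from([5, 4])); // 1 and 3 were pruned, 5 survived
}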

Expand Down

0 comments on commit 7d5bac1

Please sign in to comment.