Remove bidirectional communication
moshababo committed Jan 22, 2024
1 parent ae28edc commit 2209d5d
Showing 5 changed files with 62 additions and 144 deletions.
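
This commit replaces the per-message result channel with a bare acknowledgment: the network actor no longer gets a `ctx::Result<()>` back from the bft actor, only a signal that the message was processed. The diff below consumes requests as `req.msg` / `req.ack`; here is a plausible sketch of the request type inferred from that usage (the actual definition lives in `zksync_consensus_network::io` and is not part of this commit):

use zksync_concurrency::oneshot;
use zksync_consensus_roles::validator::{ConsensusMsg, Signed};

/// Assumed shape of the network request type; fields inferred from how the
/// diff uses `req.msg` and `req.ack`, not taken from this commit.
pub struct ConsensusReq {
    /// The signed consensus message to be processed.
    pub msg: Signed<ConsensusMsg>,
    /// Fire-and-forget acknowledgment, sent once the message has been
    /// processed. Replaces the old `oneshot::Sender<ctx::Result<()>>`.
    pub ack: oneshot::Sender<()>,
}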
33 changes: 15 additions & 18 deletions node/actors/bft/src/leader/state_machine.rs
@@ -6,7 +6,7 @@ use std::{
 };
 use tracing::instrument;
 use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
-use zksync_consensus_network::io::{ConsensusInputMessage, Target};
+use zksync_consensus_network::io::{ConsensusInputMessage, ConsensusReq, Target};
 use zksync_consensus_roles::validator::{self, ConsensusMsg, Signed};

 /// The StateMachine struct contains the state of the leader. This is a simple state machine. We just store
@@ -17,8 +17,8 @@ pub(crate) struct StateMachine {
     pub(crate) config: Arc<Config>,
     /// Pipe through which leader sends network messages.
     pub(crate) outbound_pipe: OutputSender,
-    /// Pipe through which leader receives network messages.
-    inbound_pipe: sync::prunable_mpsc::Receiver<Signed<ConsensusMsg>, ctx::Result<()>>,
+    /// Pipe through which leader receives network requests.
+    inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>,
     /// The current view number. This might not match the replica's view number, we only have this here
     /// to make the leader advance monotonically in time and stop it from accepting messages from the past.
     pub(crate) view: validator::ViewNumber,
@@ -52,10 +52,7 @@ impl StateMachine {
         ctx: &ctx::Ctx,
         config: Arc<Config>,
         outbound_pipe: OutputSender,
-    ) -> (
-        Self,
-        sync::prunable_mpsc::Sender<Signed<ConsensusMsg>, ctx::Result<()>>,
-    ) {
+    ) -> (Self, sync::prunable_mpsc::Sender<ConsensusReq>) {
         let (send, recv) = sync::prunable_mpsc::channel(StateMachine::inbound_pruning_predicate);

         let this = StateMachine {
@@ -80,13 +77,13 @@
     /// potentially triggering state modifications and message sending to the executor.
     pub async fn run(mut self, ctx: &ctx::Ctx) -> ctx::Result<()> {
         loop {
-            let (signed_message, res_send) = self.inbound_pipe.recv(ctx).await?;
+            let req = self.inbound_pipe.recv(ctx).await?;

             let now = ctx.now();
-            let label = match &signed_message.msg {
+            let label = match &req.msg.msg {
                 ConsensusMsg::ReplicaPrepare(_) => {
                     let res = match self
-                        .process_replica_prepare(ctx, signed_message.cast().unwrap())
+                        .process_replica_prepare(ctx, req.msg.cast().unwrap())
                         .await
                         .wrap("process_replica_prepare()")
                     {
@@ -103,7 +100,7 @@
                 }
                 ConsensusMsg::ReplicaCommit(_) => {
                     let res = self
-                        .process_replica_commit(ctx, signed_message.cast().unwrap())
+                        .process_replica_commit(ctx, req.msg.cast().unwrap())
                         .map_err(|err| {
                             tracing::warn!("process_replica_commit: {err:#}");
                         });
@@ -112,7 +109,7 @@
                 _ => unreachable!(),
             };
             metrics::METRICS.leader_processing_latency[&label].observe_latency(ctx.now() - now);
-            let _ = res_send.send(Ok(()));
+
+            // Notify network actor that the message has been processed.
+            // Ignore sending error.
+            let _ = req.ack.send(());
         }
     }

Expand Down Expand Up @@ -219,15 +219,12 @@ impl StateMachine {
Ok(())
}

pub fn inbound_pruning_predicate(
pending_msg: &Signed<ConsensusMsg>,
new_msg: &Signed<ConsensusMsg>,
) -> bool {
if pending_msg.key != new_msg.key {
pub fn inbound_pruning_predicate(pending_req: &ConsensusReq, new_req: &ConsensusReq) -> bool {
if pending_req.msg.key != new_req.msg.key {
return false;
}

match (&pending_msg.msg, &new_msg.msg) {
match (&pending_req.msg.msg, &new_req.msg.msg) {
(ConsensusMsg::ReplicaPrepare(existing_msg), ConsensusMsg::ReplicaPrepare(new_msg)) => {
new_msg.view > existing_msg.view
}
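The pruning predicate above decides whether a pending request may be dropped when a new one arrives: requests from different signers never prune each other, and within the same signer a message of the same kind with a strictly newer view supersedes the older one. A standalone illustration of that contract with simplified stand-in types (hypothetical `Key`/`Req`, not the real `ConsensusReq`):

#[derive(Clone, Copy, PartialEq)]
struct Key(u64);

struct Req {
    key: Key,
    view: u64,
}

/// Prune the pending request iff the new one supersedes it:
/// same signer and a strictly newer view.
fn pruning_predicate(pending: &Req, new: &Req) -> bool {
    pending.key == new.key && new.view > pending.view
}

fn main() {
    let pending = Req { key: Key(1), view: 4 };
    let newer = Req { key: Key(1), view: 7 };
    let other = Req { key: Key(2), view: 9 };
    assert!(pruning_predicate(&pending, &newer)); // view 4 superseded by view 7
    assert!(!pruning_predicate(&pending, &other)); // different signer: keep both
}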
44 changes: 2 additions & 42 deletions node/actors/bft/src/lib.rs
@@ -70,9 +70,6 @@ impl Config {
         let (replica, replica_send) =
             replica::StateMachine::start(ctx, cfg.clone(), pipe.send.clone()).await?;

-        // mpsc channel for returning error asynchronously.
-        let (err_send, mut err_recv) = mpsc::channel::<ctx::Result<()>>(1);
-
         let res = scope::run!(ctx, |ctx, s| async {
             let prepare_qc_recv = leader.prepare_qc.subscribe();

@@ -98,31 +95,15 @@
                     return Ok(());
                 }

-                // Check if an asynchronous error was returned.
-                if let Ok(res) = err_recv.try_recv() {
-                    return Err(res.err().unwrap());
-                }
-
                 let InputMessage::Network(req) = input.unwrap();
-                let res_recv;
                 match &req.msg.msg {
                     ConsensusMsg::ReplicaPrepare(_) | ConsensusMsg::ReplicaCommit(_) => {
-                        res_recv = leader_send.send(req.msg).await;
+                        leader_send.send(req).await;
                     }
                     ConsensusMsg::LeaderPrepare(_) | ConsensusMsg::LeaderCommit(_) => {
-                        res_recv = replica_send.send(req.msg).await;
+                        replica_send.send(req).await;
                     }
                 }
-
-                s.spawn_bg(async {
-                    handle_result(ctx, res_recv, &err_send).await;
-
-                    // Notify network actor that the message has been processed.
-                    // Ignore sending error.
-                    let _ = req.ack.send(());
-
-                    Ok(())
-                });
             }
         })
         .await;
@@ -132,24 +113,3 @@
         }
     }
 }
-
-async fn handle_result(
-    ctx: &ctx::Ctx,
-    res_recv: Receiver<ctx::Result<()>>,
-    err_send: &mpsc::Sender<ctx::Result<()>>,
-) {
-    let res = res_recv.recv_or_disconnected(ctx).await;
-
-    // Ignore the outer `Canceled` (should be handled elsewhere)
-    // and `Disconnected` (expected in the case of inbound message queue pruning) errors.
-    if let Ok(Ok(Err(err))) = res {
-        match err {
-            // Ignore inner `Canceled` as well, for the same reason.
-            ctx::Error::Canceled(_) => {}
-            // Notify internal error.
-            ctx::Error::Internal(_) => {
-                err_send.send(Err(err)).await.unwrap();
-            }
-        }
-    }
-}
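With the error back-channel and `handle_result` gone, dispatch is fire-and-forget: `send()` returns nothing, so there is no per-message task to spawn, and processing errors are handled inside the state machines themselves. A minimal sketch of the resulting routing contract, using simplified stand-in types rather than the real actor wiring:

use zksync_concurrency::sync::prunable_mpsc;

// Simplified message/request types for illustration only.
enum Msg {
    ReplicaPrepare(u64),
    LeaderPrepare(u64),
}

struct Req {
    msg: Msg,
}

/// Route a request to the state machine that owns it; nothing comes back.
async fn dispatch(
    req: Req,
    leader_send: &prunable_mpsc::Sender<Req>,
    replica_send: &prunable_mpsc::Sender<Req>,
) {
    match &req.msg {
        // Replica-originated messages are handled by the leader.
        Msg::ReplicaPrepare(_) => leader_send.send(req).await,
        // Leader-originated messages are handled by the replica.
        Msg::LeaderPrepare(_) => replica_send.send(req).await,
    }
}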
33 changes: 15 additions & 18 deletions node/actors/bft/src/replica/state_machine.rs
@@ -4,6 +4,7 @@ use std::{
     sync::Arc,
 };
 use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time};
+use zksync_consensus_network::io::ConsensusReq;
 use zksync_consensus_roles::{
     validator,
     validator::{ConsensusMsg, Signed},
@@ -18,8 +19,8 @@ pub(crate) struct StateMachine {
     pub(crate) config: Arc<Config>,
     /// Pipe through which replica sends network messages.
     pub(super) outbound_pipe: OutputSender,
-    /// Pipe through which replica receives network messages.
-    inbound_pipe: sync::prunable_mpsc::Receiver<Signed<ConsensusMsg>, ctx::Result<()>>,
+    /// Pipe through which replica receives network requests.
+    inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>,
     /// The current view number.
     pub(crate) view: validator::ViewNumber,
     /// The current phase.
@@ -42,10 +43,7 @@ impl StateMachine {
         ctx: &ctx::Ctx,
         config: Arc<Config>,
         outbound_pipe: OutputSender,
-    ) -> ctx::Result<(
-        Self,
-        sync::prunable_mpsc::Sender<Signed<ConsensusMsg>, ctx::Result<()>>,
-    )> {
+    ) -> ctx::Result<(Self, sync::prunable_mpsc::Sender<ConsensusReq>)> {
         let backup = match config.replica_store.state(ctx).await? {
             Some(backup) => backup,
             None => config.block_store.subscribe().borrow().last.clone().into(),
Expand Down Expand Up @@ -94,16 +92,16 @@ impl StateMachine {
}

// Check for timeout.
let Some((signed_message, res_send)) = recv.ok() else {
let _ = self.start_new_view(ctx).await;
let Some(req) = recv.ok() else {
self.start_new_view(ctx).await?;
continue;
};

let now = ctx.now();
let label = match &signed_message.msg {
let label = match &req.msg.msg {
ConsensusMsg::LeaderPrepare(_) => {
let res = match self
.process_leader_prepare(ctx, signed_message.cast().unwrap())
.process_leader_prepare(ctx, req.msg.cast().unwrap())
.await
.wrap("process_leader_prepare()")
{
@@ -118,7 +116,7 @@
                 }
                 ConsensusMsg::LeaderCommit(_) => {
                     let res = match self
-                        .process_leader_commit(ctx, signed_message.cast().unwrap())
+                        .process_leader_commit(ctx, req.msg.cast().unwrap())
                         .await
                         .wrap("process_leader_commit()")
                     {
@@ -135,7 +133,9 @@
             };
             metrics::METRICS.replica_processing_latency[&label].observe_latency(ctx.now() - now);

-            let _ = res_send.send(Ok(()));
+            // Notify network actor that the message has been processed.
+            // Ignore sending error.
+            let _ = req.ack.send(());
         }
     }

@@ -163,14 +163,11 @@
         Ok(())
     }

-    pub fn inbound_pruning_predicate(
-        pending_msg: &Signed<ConsensusMsg>,
-        new_msg: &Signed<ConsensusMsg>,
-    ) -> bool {
-        if pending_msg.key != new_msg.key {
+    pub fn inbound_pruning_predicate(pending_req: &ConsensusReq, new_req: &ConsensusReq) -> bool {
+        if pending_req.msg.key != new_req.msg.key {
             return false;
         }
-        match (&pending_msg.msg, &new_msg.msg) {
+        match (&pending_req.msg.msg, &new_req.msg.msg) {
             (ConsensusMsg::LeaderPrepare(existing_msg), ConsensusMsg::LeaderPrepare(new_msg)) => {
                 new_msg.view > existing_msg.view
             }
43 changes: 18 additions & 25 deletions node/libs/concurrency/src/sync/prunable_mpsc/mod.rs
@@ -1,13 +1,12 @@
 //! Prunable, multi-producer, single-consumer, unbounded FIFO queue for communicating between asynchronous tasks.
 //! The pruning takes place whenever a new value is sent, based on a specified predicate.
-//! The channel also facilitates the asynchronous returning of result associated with the processing of values received from the channel.
 //!
 //! The separation of [`Sender`] and [`Receiver`] is employed primarily because [`Receiver`] requires
 //! a mutable reference to the signaling channel, unlike [`Sender`], hence making it undesirable to
 //! be used in conjunction.
 //!
 use crate::{
-    ctx, oneshot,
+    ctx,
     sync::{self, watch, Mutex},
 };
 use std::{collections::VecDeque, fmt, sync::Arc};
@@ -21,13 +20,12 @@ mod tests;
 /// The Sender can be cloned to send to the same channel from multiple code locations. Only one Receiver is supported.
 ///
 /// * [`T`]: The type of data that will be sent through the channel.
-/// * [`U`]: The type of the asynchronous returning of result associated with the processing of values received from the channel.
 /// * [`pruning_predicate`]: A function that determines whether an unreceived, pending value in the buffer should be pruned based on a newly sent value.
 ///
-pub fn channel<T, U>(
+pub fn channel<T>(
     pruning_predicate: impl 'static + Sync + Send + Fn(&T, &T) -> bool,
-) -> (Sender<T, U>, Receiver<T, U>) {
-    let queue: Mutex<VecDeque<(T, oneshot::Sender<U>)>> = Mutex::new(VecDeque::new());
+) -> (Sender<T>, Receiver<T>) {
+    let queue: Mutex<VecDeque<T>> = Mutex::new(VecDeque::new());
     // Internal watch, to enable waiting on the receiver side for new values.
     let (has_values_send, has_values_recv) = watch::channel(false);

@@ -49,54 +47,49 @@
     (send, recv)
 }

-struct Shared<T, U> {
-    buffer: Mutex<VecDeque<(T, oneshot::Sender<U>)>>,
+struct Shared<T> {
+    buffer: Mutex<VecDeque<T>>,
     has_values_send: watch::Sender<bool>,
 }

 /// Sends values to the associated [`Receiver`].
 /// Instances are created by the [`channel`] function.
 #[allow(clippy::type_complexity)]
-pub struct Sender<T, U> {
-    shared: Arc<Shared<T, U>>,
+pub struct Sender<T> {
+    shared: Arc<Shared<T>>,
     pruning_predicate: Box<dyn Sync + Send + Fn(&T, &T) -> bool>,
 }

-impl<T, U> Sender<T, U> {
+impl<T> Sender<T> {
     /// Sends a value.
     /// This initiates the pruning procedure which operates in O(N) time complexity
     /// on the buffer of pending values.
-    pub async fn send(&self, value: T) -> oneshot::Receiver<U> {
-        // Create oneshot channel for returning result asynchronously.
-        let (res_send, res_recv) = oneshot::channel();
-
+    pub async fn send(&self, value: T) {
         let mut buffer = self.shared.buffer.lock().await;
-        buffer.retain(|pending_value| !(self.pruning_predicate)(&pending_value.0, &value));
-        buffer.push_back((value, res_send));
+        buffer.retain(|pending_value| !(self.pruning_predicate)(&pending_value, &value));
+        buffer.push_back(value);

         self.shared.has_values_send.send_replace(true);
-
-        res_recv
     }
 }

-impl<T, U> fmt::Debug for Sender<T, U> {
+impl<T> fmt::Debug for Sender<T> {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
         fmt.debug_struct("Sender").finish()
     }
 }

 /// Receives values from the associated [`Sender`].
 /// Instances are created by the [`channel`] function.
-pub struct Receiver<T, U> {
-    shared: Arc<Shared<T, U>>,
+pub struct Receiver<T> {
+    shared: Arc<Shared<T>>,
     has_values_recv: watch::Receiver<bool>,
 }

-impl<T, U> Receiver<T, U> {
+impl<T> Receiver<T> {
     /// Receives the next value for this receiver.
     /// If there are no messages in the buffer, this method will hang until a message is sent.
-    pub async fn recv(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<(T, oneshot::Sender<U>)> {
+    pub async fn recv(&mut self, ctx: &ctx::Ctx) -> ctx::OrCanceled<T> {
         sync::wait_for(ctx, &mut self.has_values_recv, |has_values| *has_values).await?;
         let mut buffer = self.shared.buffer.lock().await;
         // `None` is unexpected because we waited for new values, and there's only a single receiver.
@@ -110,7 +103,7 @@ impl<T, U> Receiver<T, U> {
     }
 }

-impl<T, U> fmt::Debug for Receiver<T, U> {
+impl<T> fmt::Debug for Receiver<T> {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
         fmt.debug_struct("Receiver").finish()
     }
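
After this change the channel is a plain prunable FIFO over a single type parameter. A small usage sketch of the new API (illustrative `Update` type; the predicate keeps only the newest version per key):

use zksync_concurrency::{ctx, sync::prunable_mpsc};

#[derive(Debug, PartialEq)]
struct Update {
    key: u8,
    version: u32,
}

async fn example(ctx: &ctx::Ctx) -> ctx::OrCanceled<()> {
    // Prune a pending update when a newer version for the same key is sent.
    let (send, mut recv) = prunable_mpsc::channel(|pending: &Update, new: &Update| {
        pending.key == new.key && new.version > pending.version
    });

    send.send(Update { key: 1, version: 1 }).await;
    send.send(Update { key: 1, version: 2 }).await; // prunes version 1

    // Only the newest update for key 1 is left in the buffer.
    assert_eq!(recv.recv(ctx).await?, Update { key: 1, version: 2 });
    Ok(())
}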