From 66b398d068e36c73e7f739cf41011a2db97d8002 Mon Sep 17 00:00:00 2001 From: daltoncoder Date: Thu, 29 Jun 2023 20:12:06 -0400 Subject: [PATCH] Consensus forwarder --- Cargo.lock | 3 + Cargo.toml | 5 +- core/consensus/Cargo.toml | 6 +- core/consensus/src/forwarder.rs | 168 +++++++++++++++++++++++++------- 4 files changed, 142 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index da4ecdc3a..95843ce16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2156,6 +2156,7 @@ dependencies = [ "draco-interfaces", "fastcrypto", "fleek-crypto", + "futures", "multiaddr", "mysten-metrics", "mysten-network", @@ -2170,10 +2171,12 @@ dependencies = [ "narwhal-worker", "prometheus", "rand 0.8.5", + "rayon", "resolve-path", "serde", "sui-protocol-config", "tokio", + "tonic", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 055973861..cb0fb154b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ bincode = "1.3.3" num-bigint = "0.4.0" bytes = "1.4" bigdecimal = "0.3.1" +futures = "0.3" jsonrpc-v2 = "0.11.0" lru = "0.10.0" rand = "0.8.5" @@ -35,14 +36,14 @@ dirs = "5.0.1" affair = { path = "lib/affair" } atomo = { path = "lib/atomo" } fleek-crypto = { path = "lib/fleek-crypto" } -hp-float = { path= "lib/hp-float" } +hp-float = { path = "lib/hp-float" } # Since MystenLabs uses Hakari in the sui repository, and we bring narwhal in from there, this # results into all of their dependencies and packages to be listed in their workspace-hack crate. # That adds a huge amount of unwanted dependencies to our crate. The `empty-workspace-hack` is an # empty crate which only depends on `strum` which is the only dependency narwhal cared about. [patch.'https://github.com/MystenLabs/sui.git'] -workspace-hack = { git = "https://github.com/fleek-network/empty-workspace-hack.git", rev = "c07eb1e343a455d57a5481b50eada03c62b4f2c6"} +workspace-hack = { git = "https://github.com/fleek-network/empty-workspace-hack.git", rev = "c07eb1e343a455d57a5481b50eada03c62b4f2c6" } [profile.release] # 2 full, 0 nothing, 1 good enough. diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml index 28a39fafa..e6ea66716 100644 --- a/core/consensus/Cargo.toml +++ b/core/consensus/Cargo.toml @@ -11,14 +11,16 @@ fleek-crypto.workspace = true async-trait.workspace = true bincode.workspace = true draco-interfaces = { path = "../interfaces" } +futures.workspace = true anyhow.workspace = true serde.workspace = true tokio.workspace = true rand = "0.8.5" resolve-path = "0.1.0" +rayon = "1.7.0" -# Narwhal 65ae52bdc72c6f7f504510d83b73cf2404562f2f +# Narwhal b06ada015694890d7c46347b13fbc3e9a763513c narwhal-config = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "narwhal-config" } narwhal-consensus = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "narwhal-consensus" } narwhal-crypto = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "narwhal-crypto" } @@ -31,6 +33,8 @@ narwhal-network = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada mysten-metrics = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "mysten-metrics" } mysten-network = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "mysten-network" } sui-protocol-config = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "sui-protocol-config" } +tonic = { version = "0.8.2", features = ["transport", "tls"] } + prometheus = "0.13.3" multiaddr = "0.17.1" fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "c961a01596a87e76f590c7e43aca9d57106dbbb1" } diff --git a/core/consensus/src/forwarder.rs b/core/consensus/src/forwarder.rs index f808c11df..899bbd59a 100644 --- a/core/consensus/src/forwarder.rs +++ b/core/consensus/src/forwarder.rs @@ -3,15 +3,25 @@ will then forward the transaction to an active narwhal committee member, prefering its own worker if the node is currently on the committee. Retry logic should be done on the proccess that has the sender side of the socket. */ +use std::{ + cmp, + collections::{hash_map::Entry, HashMap}, +}; + use affair::AsyncWorker; +use anyhow::{bail, Result}; use async_trait::async_trait; use draco_interfaces::{ - types::{Epoch, NodeInfo, UpdateRequest}, + types::{Epoch, EpochInfo, NodeInfo, UpdateRequest}, SyncQueryRunnerInterface, }; use fastcrypto::bls12381::min_sig::BLS12381PublicKey; use fleek_crypto::NodePublicKey; use narwhal_types::{TransactionProto, TransactionsClient}; +use rand::seq::SliceRandom; +use tonic::transport::channel::Channel; + +const TARGETED_CONNECTION_NUM: usize = 10; pub struct Forwarder { /// Query runner used to read application state @@ -22,6 +32,15 @@ pub struct Forwarder { epoch: Epoch, /// List of the committee members committee: Vec, + /// Cursor that keeps track of where we are in the committee vec, so we try connecting to new + /// nodes and loop back if connections fail + cursor: usize, + /// maximum number of connections we will try and make to other workers, this epoch + max_connections: usize, + /// When our connections drop under this number we will try and connect to new workers + min_connections: usize, + /// Open connections to committee workers + active_connections: HashMap>, } impl Forwarder { @@ -31,58 +50,133 @@ impl Forwarder { primary_name: primary_name.into(), epoch: 0, committee: Vec::new(), + cursor: 0, + max_connections: 0, + min_connections: 0, + active_connections: HashMap::with_capacity(TARGETED_CONNECTION_NUM), } } -} - -#[async_trait] -impl AsyncWorker for Forwarder { - type Request = UpdateRequest; - type Response = (); - async fn handle(&mut self, req: Self::Request) -> Self::Response { + async fn handle_forward(&mut self, req: &UpdateRequest) -> Result<()> { // Grab the epoch let epoch = self.query_runner.get_epoch(); - // If the epoch is different then the last time we grabbed the committee, or if we dont have - // any committee info repull the committee info from application - if epoch != self.epoch || self.committee.is_empty() { - let committee = self.query_runner.get_epoch_info().committee; - - if committee.iter().any(|x| x.public_key == self.primary_name) { - self.committee = committee - .iter() - .filter(|x| x.public_key == self.primary_name) - .cloned() - .collect() - } else { - self.committee = committee; - } - self.epoch = epoch; + // If the epoch is different then the last time we grabbed the committee refresh + if epoch != self.epoch { + self.refresh_epoch(); } - if self.committee.is_empty() { - return; + // If there is no open connections try and make a connection with the targeted number of + // peers or the rest of committee we have not tried yet + if self.active_connections.len() < self.min_connections { + self.make_connections().await?; } - let mempool_address = self.committee[0].workers[0].mempool.to_string(); - // serialize transaction - let txn_bytes = match bincode::serialize(&req) { - Ok(bytes) => bytes, - _ => return, - }; + let txn_bytes = bincode::serialize(&req)?; let request = TransactionProto { transaction: txn_bytes.into(), }; - // Send to committee - let mut client = match TransactionsClient::connect(mempool_address).await { - Ok(client) => client, - _ => return, - }; + // Here we take ownership of self.active_connections by swapping it with an empty vec, so we + // can try and send the transaction to each worker we have a connection with. If the + // send was successful we keep the connection, if the send was not successful we assume it a + // bad connection and drop the client + let cap = self.active_connections.capacity(); + let active_connections = + std::mem::replace(&mut self.active_connections, HashMap::with_capacity(cap)); + + self.active_connections.extend( + futures::future::join_all(active_connections.into_iter().map(|mut e| async { + e.1.submit_transaction(request.clone()) + .await + .ok() + .map(|_| e) + })) + .await + .into_iter() + // Note that `Option: Iter`. So here we can use flatten. If the option is Some, + // then the inner iterator will yield one item of type `T` and if the option is None + // then it will be an empty iterator. + .flatten(), + ); + + if self.active_connections.is_empty() { + bail!("Failed sending transaction to any worker") + } + Ok(()) + } + + fn refresh_epoch(&mut self) { + let EpochInfo { + mut committee, + epoch, + .. + } = self.query_runner.get_epoch_info(); + + // If our node is on the committee, return a vec with just our node. Or return a vec of all + // the committee members + if let Some(item) = committee.iter().find(|x| x.public_key == self.primary_name) { + self.committee = vec![item.clone()]; + } else { + // shuffle the order with thread_range so nodes accross the network are not all + // connecting to the same workers + committee.shuffle(&mut rand::thread_rng()); + self.committee = committee; + } + + // Set the epoch info to the newest epoch + self.epoch = epoch; - let _ = client.submit_transaction(request).await; + self.max_connections = cmp::min(TARGETED_CONNECTION_NUM, self.committee.len()); + self.min_connections = self.max_connections / 3 * 2 + 1; + } + + async fn make_connections(&mut self) -> Result<()> { + let start_index = self.cursor; + + while self.active_connections.len() < self.min_connections { + // Only try to make a connection with this worker if we dont already have one + if let Entry::Vacant(e) = self.active_connections.entry(self.cursor) { + let mempool = self.committee[self.cursor].workers[0].mempool.to_string(); + if let Ok(client) = TransactionsClient::connect(mempool).await { + e.insert(client); + } + } + + // Increment cursor to modulo so we never try to access out of bounds index + self.cursor = (self.cursor + 1) % self.committee.len(); + + // If we have travelled the entire ring buffer break the loop...we tried our best + if self.cursor == start_index { + break; + } + } + + if self.active_connections.is_empty() { + bail!("Couldnt get a connection with a committee member") + } + + Ok(()) + } +} + +#[async_trait] +impl AsyncWorker for Forwarder { + type Request = UpdateRequest; + type Response = (); + + async fn handle(&mut self, req: Self::Request) -> Self::Response { + // if it fails we should retry once to cover all edge cases + let mut retried = 0; + while retried < 2 { + if let Err(e) = self.handle_forward(&req).await { + println!("Failed to send transaction to a worker: {e}"); + retried += 1; + } else { + break; + } + } } }