Skip to content

Commit

Permalink
Consensus forwarder
Browse files Browse the repository at this point in the history
  • Loading branch information
daltoncoder committed Jun 30, 2023
1 parent ab7e267 commit 66b398d
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 40 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2156,6 +2156,7 @@ dependencies = [
"draco-interfaces",
"fastcrypto",
"fleek-crypto",
"futures",
"multiaddr",
"mysten-metrics",
"mysten-network",
Expand All @@ -2170,10 +2171,12 @@ dependencies = [
"narwhal-worker",
"prometheus",
"rand 0.8.5",
"rayon",
"resolve-path",
"serde",
"sui-protocol-config",
"tokio",
"tonic",
]

[[package]]
Expand Down
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bincode = "1.3.3"
num-bigint = "0.4.0"
bytes = "1.4"
bigdecimal = "0.3.1"
futures = "0.3"
jsonrpc-v2 = "0.11.0"
lru = "0.10.0"
rand = "0.8.5"
Expand All @@ -35,14 +36,14 @@ dirs = "5.0.1"
affair = { path = "lib/affair" }
atomo = { path = "lib/atomo" }
fleek-crypto = { path = "lib/fleek-crypto" }
hp-float = { path= "lib/hp-float" }
hp-float = { path = "lib/hp-float" }

# Since MystenLabs uses Hakari in the sui repository, and we bring narwhal in from there, this
# results into all of their dependencies and packages to be listed in their workspace-hack crate.
# That adds a huge amount of unwanted dependencies to our crate. The `empty-workspace-hack` is an
# empty crate which only depends on `strum` which is the only dependency narwhal cared about.
[patch.'https://github.com/MystenLabs/sui.git']
workspace-hack = { git = "https://github.com/fleek-network/empty-workspace-hack.git", rev = "c07eb1e343a455d57a5481b50eada03c62b4f2c6"}
workspace-hack = { git = "https://github.com/fleek-network/empty-workspace-hack.git", rev = "c07eb1e343a455d57a5481b50eada03c62b4f2c6" }

[profile.release]
# 2 full, 0 nothing, 1 good enough.
Expand Down
6 changes: 5 additions & 1 deletion core/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ fleek-crypto.workspace = true
async-trait.workspace = true
bincode.workspace = true
draco-interfaces = { path = "../interfaces" }
futures.workspace = true
anyhow.workspace = true
serde.workspace = true
tokio.workspace = true

rand = "0.8.5"
resolve-path = "0.1.0"
rayon = "1.7.0"

# Narwhal 65ae52bdc72c6f7f504510d83b73cf2404562f2f
# Narwhal b06ada015694890d7c46347b13fbc3e9a763513c
narwhal-config = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "narwhal-config" }
narwhal-consensus = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "narwhal-consensus" }
narwhal-crypto = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "narwhal-crypto" }
Expand All @@ -31,6 +33,8 @@ narwhal-network = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada
mysten-metrics = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "mysten-metrics" }
mysten-network = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "mysten-network" }
sui-protocol-config = { git = "https://github.com/MystenLabs/sui.git", rev = "b06ada015694890d7c46347b13fbc3e9a763513c", package = "sui-protocol-config" }
tonic = { version = "0.8.2", features = ["transport", "tls"] }

prometheus = "0.13.3"
multiaddr = "0.17.1"
fastcrypto = { git = "https://github.com/MystenLabs/fastcrypto", rev = "c961a01596a87e76f590c7e43aca9d57106dbbb1" }
168 changes: 131 additions & 37 deletions core/consensus/src/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,25 @@
will then forward the transaction to an active narwhal committee member, prefering its own worker if the node is currently
on the committee. Retry logic should be done on the proccess that has the sender side of the socket.
*/
use std::{
cmp,
collections::{hash_map::Entry, HashMap},
};

use affair::AsyncWorker;
use anyhow::{bail, Result};
use async_trait::async_trait;
use draco_interfaces::{
types::{Epoch, NodeInfo, UpdateRequest},
types::{Epoch, EpochInfo, NodeInfo, UpdateRequest},
SyncQueryRunnerInterface,
};
use fastcrypto::bls12381::min_sig::BLS12381PublicKey;
use fleek_crypto::NodePublicKey;
use narwhal_types::{TransactionProto, TransactionsClient};
use rand::seq::SliceRandom;
use tonic::transport::channel::Channel;

const TARGETED_CONNECTION_NUM: usize = 10;

pub struct Forwarder<Q: SyncQueryRunnerInterface> {
/// Query runner used to read application state
Expand All @@ -22,6 +32,15 @@ pub struct Forwarder<Q: SyncQueryRunnerInterface> {
epoch: Epoch,
/// List of the committee members
committee: Vec<NodeInfo>,
/// Cursor that keeps track of where we are in the committee vec, so we try connecting to new
/// nodes and loop back if connections fail
cursor: usize,
/// maximum number of connections we will try and make to other workers, this epoch
max_connections: usize,
/// When our connections drop under this number we will try and connect to new workers
min_connections: usize,
/// Open connections to committee workers
active_connections: HashMap<usize, TransactionsClient<Channel>>,
}

impl<Q: SyncQueryRunnerInterface> Forwarder<Q> {
Expand All @@ -31,58 +50,133 @@ impl<Q: SyncQueryRunnerInterface> Forwarder<Q> {
primary_name: primary_name.into(),
epoch: 0,
committee: Vec::new(),
cursor: 0,
max_connections: 0,
min_connections: 0,
active_connections: HashMap::with_capacity(TARGETED_CONNECTION_NUM),
}
}
}

#[async_trait]
impl<Q: SyncQueryRunnerInterface + 'static> AsyncWorker for Forwarder<Q> {
type Request = UpdateRequest;
type Response = ();

async fn handle(&mut self, req: Self::Request) -> Self::Response {
async fn handle_forward(&mut self, req: &UpdateRequest) -> Result<()> {
// Grab the epoch
let epoch = self.query_runner.get_epoch();

// If the epoch is different then the last time we grabbed the committee, or if we dont have
// any committee info repull the committee info from application
if epoch != self.epoch || self.committee.is_empty() {
let committee = self.query_runner.get_epoch_info().committee;

if committee.iter().any(|x| x.public_key == self.primary_name) {
self.committee = committee
.iter()
.filter(|x| x.public_key == self.primary_name)
.cloned()
.collect()
} else {
self.committee = committee;
}
self.epoch = epoch;
// If the epoch is different then the last time we grabbed the committee refresh
if epoch != self.epoch {
self.refresh_epoch();
}

if self.committee.is_empty() {
return;
// If there is no open connections try and make a connection with the targeted number of
// peers or the rest of committee we have not tried yet
if self.active_connections.len() < self.min_connections {
self.make_connections().await?;
}

let mempool_address = self.committee[0].workers[0].mempool.to_string();

// serialize transaction
let txn_bytes = match bincode::serialize(&req) {
Ok(bytes) => bytes,
_ => return,
};
let txn_bytes = bincode::serialize(&req)?;

let request = TransactionProto {
transaction: txn_bytes.into(),
};

// Send to committee
let mut client = match TransactionsClient::connect(mempool_address).await {
Ok(client) => client,
_ => return,
};
// Here we take ownership of self.active_connections by swapping it with an empty vec, so we
// can try and send the transaction to each worker we have a connection with. If the
// send was successful we keep the connection, if the send was not successful we assume it a
// bad connection and drop the client
let cap = self.active_connections.capacity();
let active_connections =
std::mem::replace(&mut self.active_connections, HashMap::with_capacity(cap));

self.active_connections.extend(
futures::future::join_all(active_connections.into_iter().map(|mut e| async {
e.1.submit_transaction(request.clone())
.await
.ok()
.map(|_| e)
}))
.await
.into_iter()
// Note that `Option<T>: Iter<T>`. So here we can use flatten. If the option is Some,
// then the inner iterator will yield one item of type `T` and if the option is None
// then it will be an empty iterator.
.flatten(),
);

if self.active_connections.is_empty() {
bail!("Failed sending transaction to any worker")
}
Ok(())
}

fn refresh_epoch(&mut self) {
let EpochInfo {
mut committee,
epoch,
..
} = self.query_runner.get_epoch_info();

// If our node is on the committee, return a vec with just our node. Or return a vec of all
// the committee members
if let Some(item) = committee.iter().find(|x| x.public_key == self.primary_name) {
self.committee = vec![item.clone()];
} else {
// shuffle the order with thread_range so nodes accross the network are not all
// connecting to the same workers
committee.shuffle(&mut rand::thread_rng());
self.committee = committee;
}

// Set the epoch info to the newest epoch
self.epoch = epoch;

let _ = client.submit_transaction(request).await;
self.max_connections = cmp::min(TARGETED_CONNECTION_NUM, self.committee.len());
self.min_connections = self.max_connections / 3 * 2 + 1;
}

async fn make_connections(&mut self) -> Result<()> {
let start_index = self.cursor;

while self.active_connections.len() < self.min_connections {
// Only try to make a connection with this worker if we dont already have one
if let Entry::Vacant(e) = self.active_connections.entry(self.cursor) {
let mempool = self.committee[self.cursor].workers[0].mempool.to_string();
if let Ok(client) = TransactionsClient::connect(mempool).await {
e.insert(client);
}
}

// Increment cursor to modulo so we never try to access out of bounds index
self.cursor = (self.cursor + 1) % self.committee.len();

// If we have travelled the entire ring buffer break the loop...we tried our best
if self.cursor == start_index {
break;
}
}

if self.active_connections.is_empty() {
bail!("Couldnt get a connection with a committee member")
}

Ok(())
}
}

#[async_trait]
impl<Q: SyncQueryRunnerInterface + 'static> AsyncWorker for Forwarder<Q> {
type Request = UpdateRequest;
type Response = ();

async fn handle(&mut self, req: Self::Request) -> Self::Response {
// if it fails we should retry once to cover all edge cases
let mut retried = 0;
while retried < 2 {
if let Err(e) = self.handle_forward(&req).await {
println!("Failed to send transaction to a worker: {e}");
retried += 1;
} else {
break;
}
}
}
}

0 comments on commit 66b398d

Please sign in to comment.