diff --git a/Cargo.lock b/Cargo.lock index 223bec2b5..da4ecdc3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2149,6 +2149,7 @@ dependencies = [ name = "draco-consensus" version = "0.0.0" dependencies = [ + "affair", "anyhow", "async-trait", "bincode", diff --git a/core/application/src/query_runner.rs b/core/application/src/query_runner.rs index 2e42c3b97..8120cb67a 100644 --- a/core/application/src/query_runner.rs +++ b/core/application/src/query_runner.rs @@ -192,6 +192,15 @@ impl SyncQueryRunnerInterface for QueryRunner { }) } + fn get_epoch(&self) -> Epoch { + self.inner.run( + |ctx| match self.metadata_table.get(ctx).get(&Metadata::Epoch) { + Some(Value::Epoch(epoch)) => epoch, + _ => 0, + }, + ) + } + fn get_epoch_info(&self) -> EpochInfo { self.inner.run(|ctx| { let node_table = self.node_table.get(ctx); diff --git a/core/consensus/Cargo.toml b/core/consensus/Cargo.toml index 3c1562c79..28a39fafa 100644 --- a/core/consensus/Cargo.toml +++ b/core/consensus/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +affair.workspace = true fleek-crypto.workspace = true async-trait.workspace = true bincode.workspace = true diff --git a/core/consensus/src/consensus.rs b/core/consensus/src/consensus.rs index 3852bd187..0892b4d80 100644 --- a/core/consensus/src/consensus.rs +++ b/core/consensus/src/consensus.rs @@ -5,6 +5,7 @@ use std::{ time::{Duration, SystemTime}, }; +use affair::{Executor, TokioSpawn}; use async_trait::async_trait; use draco_interfaces::{ application::ExecutionEngineSocket, @@ -33,6 +34,7 @@ use tokio::{ use crate::{ config::Config, execution::Execution, + forwarder::Forwarder, narwhal::{NarwhalArgs, NarwhalService}, }; @@ -50,6 +52,9 @@ pub struct Consensus { /// so its not always sending a transaction to its own mempool. The signer interface /// also takes care of nonce bookkeeping and retry logic txn_socket: SubmitTxSocket, + /// This socket recieves signed transactions and forwards them to an active committee member to + /// be ordered + mempool_socket: MempoolSocket, /// Narwhal execution state. execution_state: Arc, /// Timestamp of the narwhal certificate that caused an epoch change @@ -261,14 +266,14 @@ impl ConsensusInterface for Consensus { let reconfigure_notify = Arc::new(Notify::new()); let networking_keypair = NetworkKeyPair::from(networking_sk); - - // TODO(dalton): Give signer mempool socket + let primary_keypair = KeyPair::from(primary_sk); + let forwarder = Forwarder::new(query_runner.clone(), primary_keypair.public().clone()); Ok(Self { query_runner, store_path: config.store_path, narwhal_args: NarwhalArgs { - primary_keypair: KeyPair::from(primary_sk), + primary_keypair, primary_network_keypair: networking_keypair.copy(), worker_keypair: networking_keypair, primary_address: config.address, @@ -279,6 +284,7 @@ impl ConsensusInterface for Consensus { epoch_state: Mutex::new(None), execution_state: Arc::new(Execution::new(executor, reconfigure_notify.clone())), txn_socket: signer.get_socket(), + mempool_socket: TokioSpawn::spawn_async(forwarder), reconfigure_notify, shutdown_notify: Notify::new(), }) @@ -288,6 +294,6 @@ impl ConsensusInterface for Consensus { /// this can be used by any other systems that are interested in posting some /// transaction to the consensus. fn mempool(&self) -> MempoolSocket { - todo!() + self.mempool_socket.clone() } } diff --git a/core/consensus/src/forwarder.rs b/core/consensus/src/forwarder.rs new file mode 100644 index 000000000..f808c11df --- /dev/null +++ b/core/consensus/src/forwarder.rs @@ -0,0 +1,88 @@ +/* + The purpose of the forwarder is to provide a socket that other processess can send signed transactions too. The forwarder + will then forward the transaction to an active narwhal committee member, prefering its own worker if the node is currently + on the committee. Retry logic should be done on the proccess that has the sender side of the socket. +*/ +use affair::AsyncWorker; +use async_trait::async_trait; +use draco_interfaces::{ + types::{Epoch, NodeInfo, UpdateRequest}, + SyncQueryRunnerInterface, +}; +use fastcrypto::bls12381::min_sig::BLS12381PublicKey; +use fleek_crypto::NodePublicKey; +use narwhal_types::{TransactionProto, TransactionsClient}; + +pub struct Forwarder { + /// Query runner used to read application state + query_runner: Q, + /// The public key of this node + primary_name: NodePublicKey, + /// Current Epoch + epoch: Epoch, + /// List of the committee members + committee: Vec, +} + +impl Forwarder { + pub fn new(query_runner: Q, primary_name: BLS12381PublicKey) -> Self { + Self { + query_runner, + primary_name: primary_name.into(), + epoch: 0, + committee: Vec::new(), + } + } +} + +#[async_trait] +impl AsyncWorker for Forwarder { + type Request = UpdateRequest; + type Response = (); + + async fn handle(&mut self, req: Self::Request) -> Self::Response { + // Grab the epoch + let epoch = self.query_runner.get_epoch(); + + // If the epoch is different then the last time we grabbed the committee, or if we dont have + // any committee info repull the committee info from application + if epoch != self.epoch || self.committee.is_empty() { + let committee = self.query_runner.get_epoch_info().committee; + + if committee.iter().any(|x| x.public_key == self.primary_name) { + self.committee = committee + .iter() + .filter(|x| x.public_key == self.primary_name) + .cloned() + .collect() + } else { + self.committee = committee; + } + self.epoch = epoch; + } + + if self.committee.is_empty() { + return; + } + + let mempool_address = self.committee[0].workers[0].mempool.to_string(); + + // serialize transaction + let txn_bytes = match bincode::serialize(&req) { + Ok(bytes) => bytes, + _ => return, + }; + + let request = TransactionProto { + transaction: txn_bytes.into(), + }; + + // Send to committee + let mut client = match TransactionsClient::connect(mempool_address).await { + Ok(client) => client, + _ => return, + }; + + let _ = client.submit_transaction(request).await; + } +} diff --git a/core/consensus/src/lib.rs b/core/consensus/src/lib.rs index cf5dc9203..010cb28d8 100644 --- a/core/consensus/src/lib.rs +++ b/core/consensus/src/lib.rs @@ -1,5 +1,6 @@ pub mod config; pub mod consensus; pub mod execution; +pub mod forwarder; pub mod narwhal; pub mod validator; diff --git a/core/interfaces/src/application.rs b/core/interfaces/src/application.rs index de3fd2d02..e615d29f4 100644 --- a/core/interfaces/src/application.rs +++ b/core/interfaces/src/application.rs @@ -69,7 +69,7 @@ pub trait ApplicationInterface: fn sync_query(&self) -> Self::SyncExecutor; } -pub trait SyncQueryRunnerInterface: Clone + Send + Sync { +pub trait SyncQueryRunnerInterface: Clone + Send + Sync + 'static { /// Returns the latest bandwidth balance associated with the given account public key. fn get_account_balance(&self, account: &AccountOwnerPublicKey) -> u128; @@ -128,6 +128,8 @@ pub trait SyncQueryRunnerInterface: Clone + Send + Sync { /// Returns the committee members of the current epoch. fn get_committee_members(&self) -> Vec; + /// Returns just the current epoch + fn get_epoch(&self) -> Epoch; /// Returns all the information on the current epoch that Narwhal needs to run fn get_epoch_info(&self) -> EpochInfo;