Skip to content

Commit

Permalink
Consensus txn forwarder/mempool socket init
Browse files Browse the repository at this point in the history
  • Loading branch information
daltoncoder committed Jun 28, 2023
1 parent 4a874bb commit 6175da5
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2149,6 +2149,7 @@ dependencies = [
name = "draco-consensus"
version = "0.0.0"
dependencies = [
"affair",
"anyhow",
"async-trait",
"bincode",
Expand Down
9 changes: 9 additions & 0 deletions core/application/src/query_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,15 @@ impl SyncQueryRunnerInterface for QueryRunner {
})
}

fn get_epoch(&self) -> Epoch {
self.inner.run(
|ctx| match self.metadata_table.get(ctx).get(&Metadata::Epoch) {
Some(Value::Epoch(epoch)) => epoch,
_ => 0,
},
)
}

fn get_epoch_info(&self) -> EpochInfo {
self.inner.run(|ctx| {
let node_table = self.node_table.get(ctx);
Expand Down
1 change: 1 addition & 0 deletions core/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
affair.workspace = true
fleek-crypto.workspace = true
async-trait.workspace = true
bincode.workspace = true
Expand Down
14 changes: 10 additions & 4 deletions core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
time::{Duration, SystemTime},
};

use affair::{Executor, TokioSpawn};
use async_trait::async_trait;
use draco_interfaces::{
application::ExecutionEngineSocket,
Expand Down Expand Up @@ -33,6 +34,7 @@ use tokio::{
use crate::{
config::Config,
execution::Execution,
forwarder::Forwarder,
narwhal::{NarwhalArgs, NarwhalService},
};

Expand All @@ -50,6 +52,9 @@ pub struct Consensus<Q: SyncQueryRunnerInterface> {
/// so its not always sending a transaction to its own mempool. The signer interface
/// also takes care of nonce bookkeeping and retry logic
txn_socket: SubmitTxSocket,
/// This socket recieves signed transactions and forwards them to an active committee member to
/// be ordered
mempool_socket: MempoolSocket,
/// Narwhal execution state.
execution_state: Arc<Execution>,
/// Timestamp of the narwhal certificate that caused an epoch change
Expand Down Expand Up @@ -261,14 +266,14 @@ impl<Q: SyncQueryRunnerInterface> ConsensusInterface for Consensus<Q> {
let reconfigure_notify = Arc::new(Notify::new());

let networking_keypair = NetworkKeyPair::from(networking_sk);

// TODO(dalton): Give signer mempool socket
let primary_keypair = KeyPair::from(primary_sk);
let forwarder = Forwarder::new(query_runner.clone(), primary_keypair.public().clone());

Ok(Self {
query_runner,
store_path: config.store_path,
narwhal_args: NarwhalArgs {
primary_keypair: KeyPair::from(primary_sk),
primary_keypair,
primary_network_keypair: networking_keypair.copy(),
worker_keypair: networking_keypair,
primary_address: config.address,
Expand All @@ -279,6 +284,7 @@ impl<Q: SyncQueryRunnerInterface> ConsensusInterface for Consensus<Q> {
epoch_state: Mutex::new(None),
execution_state: Arc::new(Execution::new(executor, reconfigure_notify.clone())),
txn_socket: signer.get_socket(),
mempool_socket: TokioSpawn::spawn_async(forwarder),
reconfigure_notify,
shutdown_notify: Notify::new(),
})
Expand All @@ -288,6 +294,6 @@ impl<Q: SyncQueryRunnerInterface> ConsensusInterface for Consensus<Q> {
/// this can be used by any other systems that are interested in posting some
/// transaction to the consensus.
fn mempool(&self) -> MempoolSocket {
todo!()
self.mempool_socket.clone()
}
}
88 changes: 88 additions & 0 deletions core/consensus/src/forwarder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
The purpose of the forwarder is to provide a socket that other processess can send signed transactions too. The forwarder
will then forward the transaction to an active narwhal committee member, prefering its own worker if the node is currently
on the committee. Retry logic should be done on the proccess that has the sender side of the socket.
*/
use affair::AsyncWorker;
use async_trait::async_trait;
use draco_interfaces::{
types::{Epoch, NodeInfo, UpdateRequest},
SyncQueryRunnerInterface,
};
use fastcrypto::bls12381::min_sig::BLS12381PublicKey;
use fleek_crypto::NodePublicKey;
use narwhal_types::{TransactionProto, TransactionsClient};

pub struct Forwarder<Q: SyncQueryRunnerInterface> {
/// Query runner used to read application state
query_runner: Q,
/// The public key of this node
primary_name: NodePublicKey,
/// Current Epoch
epoch: Epoch,
/// List of the committee members
committee: Vec<NodeInfo>,
}

impl<Q: SyncQueryRunnerInterface> Forwarder<Q> {
pub fn new(query_runner: Q, primary_name: BLS12381PublicKey) -> Self {
Self {
query_runner,
primary_name: primary_name.into(),
epoch: 0,
committee: Vec::new(),
}
}
}

#[async_trait]
impl<Q: SyncQueryRunnerInterface + 'static> AsyncWorker for Forwarder<Q> {
type Request = UpdateRequest;
type Response = ();

async fn handle(&mut self, req: Self::Request) -> Self::Response {
// Grab the epoch
let epoch = self.query_runner.get_epoch();

// If the epoch is different then the last time we grabbed the committee, or if we dont have
// any committee info repull the committee info from application
if epoch != self.epoch || self.committee.is_empty() {
let committee = self.query_runner.get_epoch_info().committee;

if committee.iter().any(|x| x.public_key == self.primary_name) {
self.committee = committee
.iter()
.filter(|x| x.public_key == self.primary_name)
.cloned()
.collect()
} else {
self.committee = committee;
}
self.epoch = epoch;
}

if self.committee.is_empty() {
return;
}

let mempool_address = self.committee[0].workers[0].mempool.to_string();

// serialize transaction
let txn_bytes = match bincode::serialize(&req) {
Ok(bytes) => bytes,
_ => return,
};

let request = TransactionProto {
transaction: txn_bytes.into(),
};

// Send to committee
let mut client = match TransactionsClient::connect(mempool_address).await {
Ok(client) => client,
_ => return,
};

let _ = client.submit_transaction(request).await;
}
}
1 change: 1 addition & 0 deletions core/consensus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod config;
pub mod consensus;
pub mod execution;
pub mod forwarder;
pub mod narwhal;
pub mod validator;
4 changes: 3 additions & 1 deletion core/interfaces/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub trait ApplicationInterface:
fn sync_query(&self) -> Self::SyncExecutor;
}

pub trait SyncQueryRunnerInterface: Clone + Send + Sync {
pub trait SyncQueryRunnerInterface: Clone + Send + Sync + 'static {
/// Returns the latest bandwidth balance associated with the given account public key.
fn get_account_balance(&self, account: &AccountOwnerPublicKey) -> u128;

Expand Down Expand Up @@ -128,6 +128,8 @@ pub trait SyncQueryRunnerInterface: Clone + Send + Sync {
/// Returns the committee members of the current epoch.
fn get_committee_members(&self) -> Vec<NodePublicKey>;

/// Returns just the current epoch
fn get_epoch(&self) -> Epoch;
/// Returns all the information on the current epoch that Narwhal needs to run
fn get_epoch_info(&self) -> EpochInfo;

Expand Down

0 comments on commit 6175da5

Please sign in to comment.