From ed5cac95be5767f8b9d04ae73e44dc6052463774 Mon Sep 17 00:00:00 2001 From: Parsa Ghadimi Date: Tue, 18 Jul 2023 15:00:10 -0400 Subject: [PATCH] simplify node.rs and group generics --- Cargo.lock | 1 + core/interfaces/src/node.rs | 279 ++++++++++++++++-------------------- core/node/Cargo.toml | 1 + core/node/draco.toml | 28 ++++ core/node/src/cli.rs | 78 +++++++++- core/node/src/main.rs | 111 +++++--------- 6 files changed, 273 insertions(+), 225 deletions(-) create mode 100644 core/node/draco.toml diff --git a/Cargo.lock b/Cargo.lock index f9223b2ec..37fcc3462 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2253,6 +2253,7 @@ dependencies = [ "draco-rpc", "draco-signer", "fleek-crypto", + "mock", "mockall", "serde", "tokio", diff --git a/core/interfaces/src/node.rs b/core/interfaces/src/node.rs index c0b2f6245..655f19605 100644 --- a/core/interfaces/src/node.rs +++ b/core/interfaces/src/node.rs @@ -22,106 +22,82 @@ use crate::{ GossipInterface, TopologyInterface, }; -pub struct Node< - ConfigProvider: ConfigProviderInterface, - Consensus: ConsensusInterface, - Application: ApplicationInterface, - BlockStore: BlockStoreInterface, - Indexer: IndexerInterface, - FileSystem: FileSystemInterface, - Signer: SignerInterface, - Stream: tokio_stream::Stream, - DeliveryAcknowledgmentAggregator: DeliveryAcknowledgmentAggregatorInterface, - Notifier: NotifierInterface, - ReputationAggregator: ReputationAggregatorInterface, - Rpc: RpcInterface, - Sdk: SdkInterface< - SyncQuery = Application::SyncExecutor, - ReputationReporter = ReputationAggregator::ReputationReporter, - FileSystem = FileSystem, - >, - Handshake: HandshakeInterface, - Topology: TopologyInterface, - Gossip: GossipInterface, -> { - pub configuration: Arc, - pub consensus: Consensus, - pub application: Application, - pub store: BlockStore, - pub indexer: Indexer, - pub fs: FileSystem, - pub signer: Signer, - pub origin_providers: HashMap>>, - pub rpc: Rpc, - pub delivery_acknowledgment_aggregator: DeliveryAcknowledgmentAggregator, - pub reputation_aggregator: ReputationAggregator, - pub handshake: Handshake, - pub topology: Arc, - pub gossip: Arc, - pub sdk: PhantomData, - pub notifier: PhantomData, +pub trait DracoTypes: Send + Sync { + type ConfigProvider: ConfigProviderInterface; + type Consensus: ConsensusInterface< + QueryRunner = ::SyncExecutor, + Gossip = Self::Gossip, + >; + type Application: ApplicationInterface; + type BlockStore: BlockStoreInterface; + type Indexer: IndexerInterface; + type FileSystem: FileSystemInterface; + type Signer: SignerInterface; + type Stream: tokio_stream::Stream; + type DeliveryAcknowledgmentAggregator: DeliveryAcknowledgmentAggregatorInterface; + type Notifier: NotifierInterface< + SyncQuery = ::SyncExecutor, + >; + type ReputationAggregator: ReputationAggregatorInterface; + type Rpc: RpcInterface<::SyncExecutor>; + type Sdk: SdkInterface< + SyncQuery = ::SyncExecutor, + ReputationReporter = < + Self::ReputationAggregator as ReputationAggregatorInterface + >::ReputationReporter, + FileSystem = Self::FileSystem, + >; + type Handshake: HandshakeInterface; + type Topology: TopologyInterface< + SyncQuery = ::SyncExecutor, + >; + type Gossip: GossipInterface< + Topology = Self::Topology, + Notifier = Self::Notifier, + Signer = Self::Signer, + >; } -impl< - ConfigProvider: ConfigProviderInterface, - Consensus: ConsensusInterface, - Application: ApplicationInterface, - BlockStore: BlockStoreInterface, - Indexer: IndexerInterface, - FileSystem: FileSystemInterface, - Signer: SignerInterface, - Stream: tokio_stream::Stream, - DeliveryAcknowledgmentAggregator: DeliveryAcknowledgmentAggregatorInterface, - Notifier: NotifierInterface, - ReputationAggregator: ReputationAggregatorInterface, - Rpc: RpcInterface, - Sdk: SdkInterface< - SyncQuery = Application::SyncExecutor, - ReputationReporter = ReputationAggregator::ReputationReporter, - FileSystem = FileSystem, - >, - Handshake: HandshakeInterface, - Topology: TopologyInterface, - Gossip: GossipInterface, -> - Node< - ConfigProvider, - Consensus, - Application, - BlockStore, - Indexer, - FileSystem, - Signer, - Stream, - DeliveryAcknowledgmentAggregator, - Notifier, - ReputationAggregator, - Rpc, - Sdk, - Handshake, - Topology, - Gossip, - > -{ - pub async fn init(configuration: Arc) -> anyhow::Result { - let mut signer = Signer::init(configuration.get::()).await?; +pub struct Node { + pub configuration: Arc, + pub consensus: T::Consensus, + pub application: T::Application, + pub store: T::BlockStore, + pub indexer: T::Indexer, + pub fs: T::FileSystem, + pub signer: T::Signer, + pub origin_providers: HashMap>>, + pub rpc: T::Rpc, + pub delivery_acknowledgment_aggregator: T::DeliveryAcknowledgmentAggregator, + pub reputation_aggregator: T::ReputationAggregator, + pub handshake: T::Handshake, + pub topology: Arc, + pub gossip: Arc, + pub sdk: PhantomData, + pub notifier: PhantomData, +} + +impl Node { + pub async fn init(configuration: Arc) -> anyhow::Result { + let mut signer = T::Signer::init(configuration.get::()).await?; - let application = Application::init(configuration.get::()).await?; + let application = T::Application::init(configuration.get::()).await?; let topology = Arc::new( - Topology::init( - configuration.get::(), + T::Topology::init( + configuration.get::(), signer.get_bls_pk(), application.sync_query(), ) .await?, ); - let gossip = - Arc::new(Gossip::init(configuration.get::(), topology.clone(), &signer).await?); + let gossip = Arc::new( + T::Gossip::init(configuration.get::(), topology.clone(), &signer).await?, + ); - let consensus = Consensus::init( - configuration.get::(), + let consensus = T::Consensus::init( + configuration.get::(), &signer, application.transaction_executor(), application.sync_query(), @@ -132,35 +108,35 @@ impl< // Provide the mempool socket to the signer so it can use it to send messages to consensus. signer.provide_mempool(consensus.mempool()); - let store = BlockStore::init(configuration.get::()).await?; + let store = T::BlockStore::init(configuration.get::()).await?; - let indexer = Indexer::init(configuration.get::()).await?; + let indexer = T::Indexer::init(configuration.get::()).await?; - let fs = FileSystem::new(&store, &indexer); + let fs = T::FileSystem::new(&store, &indexer); - let delivery_acknowledgment_aggregator = DeliveryAcknowledgmentAggregator::init( - configuration.get::(), + let delivery_acknowledgment_aggregator = T::DeliveryAcknowledgmentAggregator::init( + configuration.get::(), signer.get_socket(), ) .await?; - let notifier = Notifier::init(application.sync_query()); + let notifier = T::Notifier::init(application.sync_query()); - let reputation_aggregator = ReputationAggregator::init( - configuration.get::(), + let reputation_aggregator = T::ReputationAggregator::init( + configuration.get::(), signer.get_socket(), notifier, ) .await?; - let rpc = Rpc::init( - configuration.get::(), + let rpc = T::Rpc::init( + configuration.get::(), consensus.mempool(), application.sync_query(), ) .await?; - let handshake = Handshake::init(configuration.get::()).await?; + let handshake = T::Handshake::init(configuration.get::()).await?; Ok(Self { configuration, @@ -185,19 +161,19 @@ impl< pub fn register_origin_provider( &mut self, name: String, - provider: Box>, + provider: Box>, ) { if self.origin_providers.insert(name, provider).is_some() { panic!("Duplicate origin provider."); } } - pub fn register_service HandlerFn<'static, Sdk>>( + pub fn register_service HandlerFn<'static, T::Sdk>>( &mut self, id: ServiceId, setup: S, ) { - let sdk = Sdk::new( + let sdk = T::Sdk::new( self.application.sync_query(), self.reputation_aggregator.get_reporter(), self.fs.clone(), @@ -220,60 +196,21 @@ impl< } /// An associated function that consumes every - pub fn fill_configuration(configuration: &ConfigProvider) { - configuration.get::(); - configuration.get::(); - configuration.get::(); - configuration.get::(); - configuration.get::(); - configuration.get::(); - configuration.get::(); - configuration.get::(); - configuration.get::(); + pub fn fill_configuration(configuration: &T::ConfigProvider) { + configuration.get::(); + configuration.get::(); + configuration.get::(); + configuration.get::(); + configuration.get::(); + configuration.get::(); + configuration.get::(); + configuration.get::(); + configuration.get::(); } } #[async_trait] -impl< - ConfigProvider: ConfigProviderInterface, - Consensus: ConsensusInterface, - Application: ApplicationInterface, - BlockStore: BlockStoreInterface, - Indexer: IndexerInterface, - FileSystem: FileSystemInterface, - Signer: SignerInterface, - Stream: tokio_stream::Stream, - DeliveryAcknowledgmentAggregator: DeliveryAcknowledgmentAggregatorInterface, - Notifier: NotifierInterface, - ReputationAggregator: ReputationAggregatorInterface, - Rpc: RpcInterface, - Sdk: SdkInterface< - SyncQuery = Application::SyncExecutor, - ReputationReporter = ReputationAggregator::ReputationReporter, - FileSystem = FileSystem, - >, - Handshake: HandshakeInterface, - Topology: TopologyInterface, - Gossip: GossipInterface, -> WithStartAndShutdown - for Node< - ConfigProvider, - Consensus, - Application, - BlockStore, - Indexer, - FileSystem, - Signer, - Stream, - DeliveryAcknowledgmentAggregator, - Notifier, - ReputationAggregator, - Rpc, - Sdk, - Handshake, - Topology, - Gossip, - > +impl WithStartAndShutdown for Node where Self: Send, for<'a> &'a Self: Send, @@ -304,3 +241,41 @@ where self.rpc.shutdown().await; } } + +pub mod transformers { + use super::*; + + pub struct WithConsensus< + T: DracoTypes, + Consensus: ConsensusInterface< + QueryRunner = ::SyncExecutor, + Gossip = T::Gossip, + >, + >(T, PhantomData); + + impl< + T: DracoTypes, + Consensus: ConsensusInterface< + QueryRunner = ::SyncExecutor, + Gossip = T::Gossip, + >, + > DracoTypes for WithConsensus + { + type ConfigProvider = T::ConfigProvider; + type Consensus = Consensus; + type Application = T::Application; + type BlockStore = T::BlockStore; + type Indexer = T::Indexer; + type FileSystem = T::FileSystem; + type Signer = T::Signer; + type Stream = T::Stream; + type DeliveryAcknowledgmentAggregator = T::DeliveryAcknowledgmentAggregator; + type Notifier = T::Notifier; + type ReputationAggregator = T::ReputationAggregator; + type Rpc = T::Rpc; + type Sdk = T::Sdk; + type Handshake = T::Handshake; + type Topology = T::Topology; + type Gossip = T::Gossip; + } +} diff --git a/core/node/Cargo.toml b/core/node/Cargo.toml index 765a59c7b..5a65b29fd 100644 --- a/core/node/Cargo.toml +++ b/core/node/Cargo.toml @@ -16,6 +16,7 @@ draco-consensus = { path = "../consensus" } draco-notifier = { path = "../notifier" } draco-rpc = { path = "../rpc" } draco-signer = { path = "../signer" } +mock = { path = "../mock" } anyhow.workspace = true clap = { version = "4.2", features = ["derive"] } serde.workspace = true diff --git a/core/node/draco.toml b/core/node/draco.toml new file mode 100644 index 000000000..c00203a24 --- /dev/null +++ b/core/node/draco.toml @@ -0,0 +1,28 @@ +[application] +mode = "Dev" + +[blockstore] + +[consensus] +address = "/ip4/0.0.0.0/udp/8000" +mempool_address = "/ip4/0.0.0.0/udp/8002" +store_path = "~/.fleek/data/narwhal_store" +worker_address = "/ip4/0.0.0.0/udp/8001" + +[handshake] +listen_addr = "0.0.0.0:6969" + +[indexer] + +[pod] + +[rep-collector] +reporter_buffer_size = 50 + +[rpc] +addr = "127.0.0.1" +port = 4069 + +[signer] +network_key_path = "~/.draco/keystore/network.pem" +node_key_path = "~/.draco/keystore/node.pem" diff --git a/core/node/src/cli.rs b/core/node/src/cli.rs index 52339b5b3..b81fd7548 100644 --- a/core/node/src/cli.rs +++ b/core/node/src/cli.rs @@ -1,6 +1,10 @@ -use std::path::PathBuf; +use std::{marker::PhantomData, path::PathBuf, sync::Arc}; +use anyhow::Result; use clap::{arg, Parser, Subcommand}; +use draco_interfaces::{ConfigProviderInterface, DracoTypes, Node, WithStartAndShutdown}; + +use crate::{config::TomlConfigProvider, shutdown::ShutdownController}; #[derive(Parser)] #[command(about, version)] @@ -8,6 +12,9 @@ pub struct CliArgs { /// Path to the toml configuration file #[arg(short, long, default_value = "draco.toml")] pub config: PathBuf, + /// Determines that we should be using the mock consensus backend. + #[arg(long)] + pub with_mock_consensus: bool, #[command(subcommand)] pub cmd: Command, } @@ -25,3 +32,72 @@ pub enum Command { default: bool, }, } + +/// Create a new command line application. +pub struct Cli(CliArgs, PhantomData); + +impl Cli { + pub fn new(args: CliArgs) -> Self { + Self(args, PhantomData) + } +} + +impl> Cli +where + Node: Send + Sync, +{ + /// Execute the application based on the provided command. + pub async fn exec(self) -> Result<()> { + let args = self.0; + + match args.cmd { + Command::Run {} => Self::run(args.config).await, + Command::PrintConfig { default } if default => Self::print_default_config().await, + Command::PrintConfig { .. } => Self::print_config(args.config).await, + } + } + + /// Run the node with the provided configuration path. + async fn run(config_path: PathBuf) -> Result<()> { + let shutdown_controller = ShutdownController::default(); + shutdown_controller.install_ctrl_c_handler(); + + let config = Arc::new(Self::load_or_write_config(config_path).await?); + let node = Node::::init(config).await?; + + node.start().await; + + shutdown_controller.wait_for_shutdown().await; + node.shutdown().await; + + Ok(()) + } + + /// Print the default configuration for the node, this function does not + /// create a new file. + async fn print_default_config() -> Result<()> { + let config = TomlConfigProvider::default(); + Node::::fill_configuration(&config); + println!("{}", config.serialize_config()); + Ok(()) + } + + /// Print the configuration from the given path. + async fn print_config(config_path: PathBuf) -> Result<()> { + let config = Self::load_or_write_config(config_path).await?; + println!("{}", config.serialize_config()); + Ok(()) + } + + /// Load the configuration file and write the default to the disk. + async fn load_or_write_config(config_path: PathBuf) -> Result { + let config = TomlConfigProvider::open(&config_path)?; + Node::::fill_configuration(&config); + + if !config_path.exists() { + std::fs::write(&config_path, config.serialize_config())?; + } + + Ok(config) + } +} diff --git a/core/node/src/main.rs b/core/node/src/main.rs index fde2ae5f3..67d84be70 100644 --- a/core/node/src/main.rs +++ b/core/node/src/main.rs @@ -3,103 +3,70 @@ mod config; mod shutdown; mod template; -use std::{path::PathBuf, sync::Arc}; - use anyhow::Result; use clap::Parser; +use cli::Cli; use draco_application::{app::Application, query_runner::QueryRunner}; use draco_blockstore::memory::MemoryBlockStore; use draco_consensus::consensus::Consensus; use draco_handshake::server::{StreamProvider, TcpHandshakeServer, TcpProvider}; -use draco_interfaces::{common::WithStartAndShutdown as _, ConfigProviderInterface, Node}; +use draco_interfaces::{transformers, ApplicationInterface, DracoTypes}; use draco_notifier::Notifier; use draco_rep_collector::ReputationAggregator; use draco_rpc::server::Rpc; use draco_signer::Signer; +use mock::consensus::MockConsensus; use template::{gossip::Gossip, topology::Topology}; use crate::{ - cli::{CliArgs, Command}, + cli::CliArgs, config::TomlConfigProvider, - shutdown::ShutdownController, template::{ fs::FileSystem, indexer::Indexer, origin::MyStream, pod::DeliveryAcknowledgmentAggregator, sdk::Sdk, }, }; -pub type ConcreteNode = Node< - TomlConfigProvider, - Consensus, Notifier>>, - Application, - MemoryBlockStore, - Indexer, - FileSystem, - Signer, - MyStream, - DeliveryAcknowledgmentAggregator, - Notifier, - ReputationAggregator, - Rpc, - Sdk<::Reader, ::Writer>, - TcpHandshakeServer< +/// Finalized type bindings for Draco. +pub struct FinalTypes; + +impl DracoTypes for FinalTypes { + type ConfigProvider = TomlConfigProvider; + type Consensus = Consensus, Notifier>>; + type Application = Application; + type BlockStore = MemoryBlockStore; + type Indexer = Indexer; + type FileSystem = FileSystem; + type Signer = Signer; + type Stream = MyStream; + type DeliveryAcknowledgmentAggregator = DeliveryAcknowledgmentAggregator; + type Notifier = Notifier; + type ReputationAggregator = ReputationAggregator; + type Rpc = Rpc; + type Sdk = + Sdk<::Reader, ::Writer>; + type Handshake = TcpHandshakeServer< Sdk<::Reader, ::Writer>, - >, - Topology, - Gossip, Notifier>, ->; + >; + type Topology = Topology; + type Gossip = Gossip, Notifier>; +} #[tokio::main] async fn main() -> Result<()> { let args = CliArgs::parse(); - match args.cmd { - Command::Run => run(args.config).await, - Command::PrintConfig { default } if default => print_default_config().await, - Command::PrintConfig { .. } => print_config(args.config).await, + if args.with_mock_consensus { + type Node = transformers::WithConsensus< + FinalTypes, + MockConsensus< + <::Application as ApplicationInterface>::SyncExecutor, + ::Gossip, + >, + >; + + Cli::::new(args).exec().await + } else { + Cli::::new(args).exec().await } } - -/// Run the node with the provided configuration path. -async fn run(config_path: PathBuf) -> Result<()> { - let shutdown_controller = ShutdownController::default(); - shutdown_controller.install_ctrl_c_handler(); - - let config = Arc::new(load_or_write_config(config_path).await?); - let node = ConcreteNode::init(config).await?; - - node.start().await; - - shutdown_controller.wait_for_shutdown().await; - node.shutdown().await; - - Ok(()) -} - -/// Print the default configuration for the node, this function does not -/// create a new file. -async fn print_default_config() -> Result<()> { - let config = TomlConfigProvider::default(); - ConcreteNode::fill_configuration(&config); - println!("{}", config.serialize_config()); - Ok(()) -} - -/// Print the configuration from the given path. -async fn print_config(config_path: PathBuf) -> Result<()> { - let config = load_or_write_config(config_path).await?; - println!("{}", config.serialize_config()); - Ok(()) -} - -/// Load the configuration file and write the default to the disk. -async fn load_or_write_config(config_path: PathBuf) -> Result { - let config = TomlConfigProvider::open(&config_path)?; - ConcreteNode::fill_configuration(&config); - - if !config_path.exists() { - std::fs::write(&config_path, config.serialize_config())?; - } - - Ok(config) -}