Skip to content

Commit

Permalink
simplify node.rs and group generics
Browse files Browse the repository at this point in the history
  • Loading branch information
qti3e committed Jul 18, 2023
1 parent b44aa2e commit ed5cac9
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 225 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2253,6 +2253,7 @@ dependencies = [
"draco-rpc",
"draco-signer",
"fleek-crypto",
"mock",
"mockall",
"serde",
"tokio",
Expand Down
279 changes: 127 additions & 152 deletions core/interfaces/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,106 +22,82 @@ use crate::{
GossipInterface, TopologyInterface,
};

pub struct Node<
ConfigProvider: ConfigProviderInterface,
Consensus: ConsensusInterface<QueryRunner = Application::SyncExecutor, Gossip = Gossip>,
Application: ApplicationInterface,
BlockStore: BlockStoreInterface,
Indexer: IndexerInterface,
FileSystem: FileSystemInterface<BlockStore = BlockStore, Indexer = Indexer>,
Signer: SignerInterface,
Stream: tokio_stream::Stream<Item = bytes::BytesMut>,
DeliveryAcknowledgmentAggregator: DeliveryAcknowledgmentAggregatorInterface,
Notifier: NotifierInterface<SyncQuery = Application::SyncExecutor>,
ReputationAggregator: ReputationAggregatorInterface,
Rpc: RpcInterface<Application::SyncExecutor>,
Sdk: SdkInterface<
SyncQuery = Application::SyncExecutor,
ReputationReporter = ReputationAggregator::ReputationReporter,
FileSystem = FileSystem,
>,
Handshake: HandshakeInterface<Sdk = Sdk>,
Topology: TopologyInterface<SyncQuery = Application::SyncExecutor>,
Gossip: GossipInterface<Topology = Topology, Notifier = Notifier, Signer = Signer>,
> {
pub configuration: Arc<ConfigProvider>,
pub consensus: Consensus,
pub application: Application,
pub store: BlockStore,
pub indexer: Indexer,
pub fs: FileSystem,
pub signer: Signer,
pub origin_providers: HashMap<String, Box<dyn OriginProviderInterface<Stream>>>,
pub rpc: Rpc,
pub delivery_acknowledgment_aggregator: DeliveryAcknowledgmentAggregator,
pub reputation_aggregator: ReputationAggregator,
pub handshake: Handshake,
pub topology: Arc<Topology>,
pub gossip: Arc<Gossip>,
pub sdk: PhantomData<Sdk>,
pub notifier: PhantomData<Notifier>,
pub trait DracoTypes: Send + Sync {
type ConfigProvider: ConfigProviderInterface;
type Consensus: ConsensusInterface<
QueryRunner = <Self::Application as ApplicationInterface>::SyncExecutor,
Gossip = Self::Gossip,
>;
type Application: ApplicationInterface;
type BlockStore: BlockStoreInterface;
type Indexer: IndexerInterface;
type FileSystem: FileSystemInterface<BlockStore = Self::BlockStore, Indexer = Self::Indexer>;
type Signer: SignerInterface;
type Stream: tokio_stream::Stream<Item = bytes::BytesMut>;
type DeliveryAcknowledgmentAggregator: DeliveryAcknowledgmentAggregatorInterface;
type Notifier: NotifierInterface<
SyncQuery = <Self::Application as ApplicationInterface>::SyncExecutor,
>;
type ReputationAggregator: ReputationAggregatorInterface<Notifier = Self::Notifier>;
type Rpc: RpcInterface<<Self::Application as ApplicationInterface>::SyncExecutor>;
type Sdk: SdkInterface<
SyncQuery = <Self::Application as ApplicationInterface>::SyncExecutor,
ReputationReporter = <
Self::ReputationAggregator as ReputationAggregatorInterface
>::ReputationReporter,
FileSystem = Self::FileSystem,
>;
type Handshake: HandshakeInterface<Sdk = Self::Sdk>;
type Topology: TopologyInterface<
SyncQuery = <Self::Application as ApplicationInterface>::SyncExecutor,
>;
type Gossip: GossipInterface<
Topology = Self::Topology,
Notifier = Self::Notifier,
Signer = Self::Signer,
>;
}

impl<
ConfigProvider: ConfigProviderInterface,
Consensus: ConsensusInterface<QueryRunner = Application::SyncExecutor, Gossip = Gossip>,
Application: ApplicationInterface,
BlockStore: BlockStoreInterface,
Indexer: IndexerInterface,
FileSystem: FileSystemInterface<BlockStore = BlockStore, Indexer = Indexer>,
Signer: SignerInterface,
Stream: tokio_stream::Stream<Item = bytes::BytesMut>,
DeliveryAcknowledgmentAggregator: DeliveryAcknowledgmentAggregatorInterface,
Notifier: NotifierInterface<SyncQuery = Application::SyncExecutor>,
ReputationAggregator: ReputationAggregatorInterface<Notifier = Notifier>,
Rpc: RpcInterface<Application::SyncExecutor>,
Sdk: SdkInterface<
SyncQuery = Application::SyncExecutor,
ReputationReporter = ReputationAggregator::ReputationReporter,
FileSystem = FileSystem,
>,
Handshake: HandshakeInterface<Sdk = Sdk>,
Topology: TopologyInterface<SyncQuery = Application::SyncExecutor>,
Gossip: GossipInterface<Topology = Topology, Notifier = Notifier, Signer = Signer>,
>
Node<
ConfigProvider,
Consensus,
Application,
BlockStore,
Indexer,
FileSystem,
Signer,
Stream,
DeliveryAcknowledgmentAggregator,
Notifier,
ReputationAggregator,
Rpc,
Sdk,
Handshake,
Topology,
Gossip,
>
{
pub async fn init(configuration: Arc<ConfigProvider>) -> anyhow::Result<Self> {
let mut signer = Signer::init(configuration.get::<Signer>()).await?;
pub struct Node<T: DracoTypes> {
pub configuration: Arc<T::ConfigProvider>,
pub consensus: T::Consensus,
pub application: T::Application,
pub store: T::BlockStore,
pub indexer: T::Indexer,
pub fs: T::FileSystem,
pub signer: T::Signer,
pub origin_providers: HashMap<String, Box<dyn OriginProviderInterface<T::Stream>>>,
pub rpc: T::Rpc,
pub delivery_acknowledgment_aggregator: T::DeliveryAcknowledgmentAggregator,
pub reputation_aggregator: T::ReputationAggregator,
pub handshake: T::Handshake,
pub topology: Arc<T::Topology>,
pub gossip: Arc<T::Gossip>,
pub sdk: PhantomData<T::Sdk>,
pub notifier: PhantomData<T::Notifier>,
}

impl<T: DracoTypes> Node<T> {
pub async fn init(configuration: Arc<T::ConfigProvider>) -> anyhow::Result<Self> {
let mut signer = T::Signer::init(configuration.get::<T::Signer>()).await?;

let application = Application::init(configuration.get::<Application>()).await?;
let application = T::Application::init(configuration.get::<T::Application>()).await?;

let topology = Arc::new(
Topology::init(
configuration.get::<Topology>(),
T::Topology::init(
configuration.get::<T::Topology>(),
signer.get_bls_pk(),
application.sync_query(),
)
.await?,
);

let gossip =
Arc::new(Gossip::init(configuration.get::<Gossip>(), topology.clone(), &signer).await?);
let gossip = Arc::new(
T::Gossip::init(configuration.get::<T::Gossip>(), topology.clone(), &signer).await?,
);

let consensus = Consensus::init(
configuration.get::<Consensus>(),
let consensus = T::Consensus::init(
configuration.get::<T::Consensus>(),
&signer,
application.transaction_executor(),
application.sync_query(),
Expand All @@ -132,35 +108,35 @@ impl<
// Provide the mempool socket to the signer so it can use it to send messages to consensus.
signer.provide_mempool(consensus.mempool());

let store = BlockStore::init(configuration.get::<BlockStore>()).await?;
let store = T::BlockStore::init(configuration.get::<T::BlockStore>()).await?;

let indexer = Indexer::init(configuration.get::<Indexer>()).await?;
let indexer = T::Indexer::init(configuration.get::<T::Indexer>()).await?;

let fs = FileSystem::new(&store, &indexer);
let fs = T::FileSystem::new(&store, &indexer);

let delivery_acknowledgment_aggregator = DeliveryAcknowledgmentAggregator::init(
configuration.get::<DeliveryAcknowledgmentAggregator>(),
let delivery_acknowledgment_aggregator = T::DeliveryAcknowledgmentAggregator::init(
configuration.get::<T::DeliveryAcknowledgmentAggregator>(),
signer.get_socket(),
)
.await?;

let notifier = Notifier::init(application.sync_query());
let notifier = T::Notifier::init(application.sync_query());

let reputation_aggregator = ReputationAggregator::init(
configuration.get::<ReputationAggregator>(),
let reputation_aggregator = T::ReputationAggregator::init(
configuration.get::<T::ReputationAggregator>(),
signer.get_socket(),
notifier,
)
.await?;

let rpc = Rpc::init(
configuration.get::<Rpc>(),
let rpc = T::Rpc::init(
configuration.get::<T::Rpc>(),
consensus.mempool(),
application.sync_query(),
)
.await?;

let handshake = Handshake::init(configuration.get::<Handshake>()).await?;
let handshake = T::Handshake::init(configuration.get::<T::Handshake>()).await?;

Ok(Self {
configuration,
Expand All @@ -185,19 +161,19 @@ impl<
pub fn register_origin_provider(
&mut self,
name: String,
provider: Box<dyn OriginProviderInterface<Stream>>,
provider: Box<dyn OriginProviderInterface<T::Stream>>,
) {
if self.origin_providers.insert(name, provider).is_some() {
panic!("Duplicate origin provider.");
}
}

pub fn register_service<S: FnOnce(Sdk) -> HandlerFn<'static, Sdk>>(
pub fn register_service<S: FnOnce(T::Sdk) -> HandlerFn<'static, T::Sdk>>(
&mut self,
id: ServiceId,
setup: S,
) {
let sdk = Sdk::new(
let sdk = T::Sdk::new(
self.application.sync_query(),
self.reputation_aggregator.get_reporter(),
self.fs.clone(),
Expand All @@ -220,60 +196,21 @@ impl<
}

/// An associated function that consumes every
pub fn fill_configuration(configuration: &ConfigProvider) {
configuration.get::<Consensus>();
configuration.get::<Application>();
configuration.get::<BlockStore>();
configuration.get::<Indexer>();
configuration.get::<Signer>();
configuration.get::<DeliveryAcknowledgmentAggregator>();
configuration.get::<ReputationAggregator>();
configuration.get::<Rpc>();
configuration.get::<Handshake>();
pub fn fill_configuration(configuration: &T::ConfigProvider) {
configuration.get::<T::Consensus>();
configuration.get::<T::Application>();
configuration.get::<T::BlockStore>();
configuration.get::<T::Indexer>();
configuration.get::<T::Signer>();
configuration.get::<T::DeliveryAcknowledgmentAggregator>();
configuration.get::<T::ReputationAggregator>();
configuration.get::<T::Rpc>();
configuration.get::<T::Handshake>();
}
}

#[async_trait]
impl<
ConfigProvider: ConfigProviderInterface,
Consensus: ConsensusInterface<QueryRunner = Application::SyncExecutor, Gossip = Gossip>,
Application: ApplicationInterface,
BlockStore: BlockStoreInterface,
Indexer: IndexerInterface,
FileSystem: FileSystemInterface<BlockStore = BlockStore, Indexer = Indexer>,
Signer: SignerInterface,
Stream: tokio_stream::Stream<Item = bytes::BytesMut>,
DeliveryAcknowledgmentAggregator: DeliveryAcknowledgmentAggregatorInterface,
Notifier: NotifierInterface<SyncQuery = Application::SyncExecutor>,
ReputationAggregator: ReputationAggregatorInterface,
Rpc: RpcInterface<Application::SyncExecutor>,
Sdk: SdkInterface<
SyncQuery = Application::SyncExecutor,
ReputationReporter = ReputationAggregator::ReputationReporter,
FileSystem = FileSystem,
>,
Handshake: HandshakeInterface<Sdk = Sdk>,
Topology: TopologyInterface<SyncQuery = Application::SyncExecutor>,
Gossip: GossipInterface<Topology = Topology, Notifier = Notifier, Signer = Signer>,
> WithStartAndShutdown
for Node<
ConfigProvider,
Consensus,
Application,
BlockStore,
Indexer,
FileSystem,
Signer,
Stream,
DeliveryAcknowledgmentAggregator,
Notifier,
ReputationAggregator,
Rpc,
Sdk,
Handshake,
Topology,
Gossip,
>
impl<T: DracoTypes> WithStartAndShutdown for Node<T>
where
Self: Send,
for<'a> &'a Self: Send,
Expand Down Expand Up @@ -304,3 +241,41 @@ where
self.rpc.shutdown().await;
}
}

pub mod transformers {
use super::*;

pub struct WithConsensus<
T: DracoTypes,
Consensus: ConsensusInterface<
QueryRunner = <T::Application as ApplicationInterface>::SyncExecutor,
Gossip = T::Gossip,
>,
>(T, PhantomData<Consensus>);

impl<
T: DracoTypes,
Consensus: ConsensusInterface<
QueryRunner = <T::Application as ApplicationInterface>::SyncExecutor,
Gossip = T::Gossip,
>,
> DracoTypes for WithConsensus<T, Consensus>
{
type ConfigProvider = T::ConfigProvider;
type Consensus = Consensus;
type Application = T::Application;
type BlockStore = T::BlockStore;
type Indexer = T::Indexer;
type FileSystem = T::FileSystem;
type Signer = T::Signer;
type Stream = T::Stream;
type DeliveryAcknowledgmentAggregator = T::DeliveryAcknowledgmentAggregator;
type Notifier = T::Notifier;
type ReputationAggregator = T::ReputationAggregator;
type Rpc = T::Rpc;
type Sdk = T::Sdk;
type Handshake = T::Handshake;
type Topology = T::Topology;
type Gossip = T::Gossip;
}
}
1 change: 1 addition & 0 deletions core/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ draco-consensus = { path = "../consensus" }
draco-notifier = { path = "../notifier" }
draco-rpc = { path = "../rpc" }
draco-signer = { path = "../signer" }
mock = { path = "../mock" }
anyhow.workspace = true
clap = { version = "4.2", features = ["derive"] }
serde.workspace = true
Expand Down
28 changes: 28 additions & 0 deletions core/node/draco.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[application]
mode = "Dev"

[blockstore]

[consensus]
address = "/ip4/0.0.0.0/udp/8000"
mempool_address = "/ip4/0.0.0.0/udp/8002"
store_path = "~/.fleek/data/narwhal_store"
worker_address = "/ip4/0.0.0.0/udp/8001"

[handshake]
listen_addr = "0.0.0.0:6969"

[indexer]

[pod]

[rep-collector]
reporter_buffer_size = 50

[rpc]
addr = "127.0.0.1"
port = 4069

[signer]
network_key_path = "~/.draco/keystore/network.pem"
node_key_path = "~/.draco/keystore/node.pem"
Loading

0 comments on commit ed5cac9

Please sign in to comment.