diff --git a/Cargo.lock b/Cargo.lock index 687490d42..ac06a74da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2228,6 +2228,7 @@ dependencies = [ name = "draco-node" version = "0.1.0" dependencies = [ + "affair", "anyhow", "async-trait", "bytes", diff --git a/core/interfaces/src/gossip.rs b/core/interfaces/src/gossip.rs index a4b919b39..63ee77ccd 100644 --- a/core/interfaces/src/gossip.rs +++ b/core/interfaces/src/gossip.rs @@ -1,10 +1,14 @@ use std::sync::Arc; use affair::Socket; +use anyhow::Result; use async_trait::async_trait; use serde::de::DeserializeOwned; -use crate::{topology::TopologyInterface, ConfigConsumer, NotifierInterface, WithStartAndShutdown}; +use crate::{ + signer::SignerInterface, topology::TopologyInterface, ConfigConsumer, NotifierInterface, + WithStartAndShutdown, +}; /// Numerical value for different gossip topics used by Fleek Network. // New topics can be added as the system grows. @@ -33,11 +37,18 @@ pub trait GossipInterface: WithStartAndShutdown + ConfigConsumer + Sized + Send /// The notifier that allows us to refresh the connections once the epoch changes. type Notifier: NotifierInterface; + /// The signer that we can used to sign and submit messages. + type Signer: SignerInterface; + /// Subscriber implementation used for listening on a topic. - type Subscriber: GossipSubscriber; + type Subscriber: GossipSubscriberInterface; /// Initialize the gossip system with the config and the topology object.. - async fn init(config: Self::Config, topology: Arc) -> Self; + async fn init( + config: Self::Config, + topology: Arc, + signer: &Self::Signer, + ) -> Result; /// Get a socket which can be used to broadcast a message globally under any topic. fn broadcast_socket(&self) -> Socket; @@ -46,14 +57,14 @@ pub trait GossipInterface: WithStartAndShutdown + ConfigConsumer + Sized + Send /// be deserialized as `T` are returned to the listener. fn subscribe(&self, topic: Topic) -> Self::Subscriber where - T: DeserializeOwned; + T: DeserializeOwned + Send + Sync; } /// A subscriber for the incoming messages under a topic. #[async_trait] -pub trait GossipSubscriber +pub trait GossipSubscriberInterface: Send + Sync where - T: DeserializeOwned, + T: DeserializeOwned + Send + Sync, { /// Await the next message in the topic, should only return `None` if there are /// no longer any new messages coming. (indicating that the gossip instance is diff --git a/core/interfaces/src/node.rs b/core/interfaces/src/node.rs index 2d86ebb3a..387691869 100644 --- a/core/interfaces/src/node.rs +++ b/core/interfaces/src/node.rs @@ -19,6 +19,7 @@ use crate::{ sdk::{HandlerFn, SdkInterface}, signer::SignerInterface, types::ServiceId, + GossipInterface, TopologyInterface, }; pub struct Node< @@ -40,6 +41,8 @@ pub struct Node< FileSystem = FileSystem, >, Handshake: HandshakeInterface, + Topology: TopologyInterface, + Gossip: GossipInterface, > { pub configuration: Arc, pub consensus: Consensus, @@ -53,6 +56,8 @@ pub struct Node< pub delivery_acknowledgment_aggregator: DeliveryAcknowledgmentAggregator, pub reputation_aggregator: ReputationAggregator, pub handshake: Handshake, + pub topology: Arc, + pub gossip: Gossip, pub sdk: PhantomData, pub notifier: PhantomData, } @@ -76,6 +81,8 @@ impl< FileSystem = FileSystem, >, Handshake: HandshakeInterface, + Topology: TopologyInterface, + Gossip: GossipInterface, > Node< ConfigProvider, @@ -92,6 +99,8 @@ impl< Rpc, Sdk, Handshake, + Topology, + Gossip, > { pub async fn init(configuration: Arc) -> anyhow::Result { @@ -99,6 +108,17 @@ impl< let application = Application::init(configuration.get::()).await?; + let topology = Arc::new( + Topology::init( + configuration.get::(), + signer.get_bls_pk(), + application.sync_query(), + ) + .await?, + ); + + let gossip = Gossip::init(configuration.get::(), topology.clone(), &signer).await?; + let consensus = Consensus::init( configuration.get::(), &signer, @@ -153,6 +173,8 @@ impl< delivery_acknowledgment_aggregator, reputation_aggregator, handshake, + topology, + gossip, sdk: PhantomData, notifier: PhantomData, }) @@ -229,6 +251,8 @@ impl< FileSystem = FileSystem, >, Handshake: HandshakeInterface, + Topology: TopologyInterface, + Gossip: GossipInterface, > WithStartAndShutdown for Node< ConfigProvider, @@ -245,6 +269,8 @@ impl< Rpc, Sdk, Handshake, + Topology, + Gossip, > where Self: Send, diff --git a/core/interfaces/src/topology.rs b/core/interfaces/src/topology.rs index 07545eb99..c615e82ba 100644 --- a/core/interfaces/src/topology.rs +++ b/core/interfaces/src/topology.rs @@ -23,7 +23,11 @@ pub trait TopologyInterface: ConfigConsumer + Sized + Send + Sync { /// Due to that reason instead we just pass the public key here. For consistency and /// correctness of the implementation it is required that this public key to be our /// actual public key which is obtained from [get_bls_pk](crate::SignerInterface::get_bls_pk). - async fn init(config: Self::Config, our_public_key: NodePublicKey) -> anyhow::Result; + async fn init( + config: Self::Config, + our_public_key: NodePublicKey, + query_runner: Self::SyncQuery, + ) -> anyhow::Result; /// Suggest a list of connections that our current node must connect to. This should be /// according to the `our_public_key` value passed during the initialization. diff --git a/core/node/Cargo.toml b/core/node/Cargo.toml index ffdc85295..0a699ea98 100644 --- a/core/node/Cargo.toml +++ b/core/node/Cargo.toml @@ -24,3 +24,4 @@ bytes.workspace = true tokio-stream.workspace = true fleek-crypto.workspace = true tracing = "0.1" +affair.workspace = true diff --git a/core/node/src/main.rs b/core/node/src/main.rs index 929dff6ac..5b737028f 100644 --- a/core/node/src/main.rs +++ b/core/node/src/main.rs @@ -13,6 +13,7 @@ use draco_handshake::server::{StreamProvider, TcpHandshakeServer, TcpProvider}; use draco_interfaces::{common::WithStartAndShutdown as _, ConfigProviderInterface, Node}; use draco_notifier::Notifier; use draco_rep_collector::ReputationAggregator; +use template::{gossip::Gossip, topology::Topology}; use crate::{ cli::{CliArgs, Command}, @@ -41,6 +42,8 @@ pub type ConcreteNode = Node< TcpHandshakeServer< Sdk<::Reader, ::Writer>, >, + Topology, + Gossip, Notifier>, >; #[tokio::main] diff --git a/core/node/src/template/gossip.rs b/core/node/src/template/gossip.rs new file mode 100644 index 000000000..0b02d70e4 --- /dev/null +++ b/core/node/src/template/gossip.rs @@ -0,0 +1,90 @@ +use std::{marker::PhantomData, sync::Arc}; + +use anyhow::Result; +use async_trait::async_trait; +use draco_interfaces::{ + signer::SignerInterface, ConfigConsumer, GossipInterface, GossipSubscriberInterface, + NotifierInterface, TopologyInterface, WithStartAndShutdown, +}; +use serde::de::DeserializeOwned; + +use super::config::Config; + +pub struct Gossip { + signer: PhantomData, + topology: PhantomData, + notifier: PhantomData, +} + +#[async_trait] +impl + WithStartAndShutdown for Gossip +{ + /// Returns true if this system is running or not. + fn is_running(&self) -> bool { + todo!() + } + + /// Start the system, should not do anything if the system is already + /// started. + async fn start(&self) { + todo!() + } + + /// Send the shutdown signal to the system. + async fn shutdown(&self) { + todo!() + } +} + +#[async_trait] +impl + GossipInterface for Gossip +{ + type Signer = S; + + type Topology = Topo; + + type Notifier = N; + + type Subscriber = GossipSubscriber; + + async fn init( + _config: Self::Config, + _topology: Arc, + _signer: &Self::Signer, + ) -> Result { + todo!() + } + + fn subscribe(&self, _topic: draco_interfaces::Topic) -> Self::Subscriber + where + T: DeserializeOwned + Send + Sync, + { + todo!() + } + + fn broadcast_socket(&self) -> affair::Socket { + todo!() + } +} + +impl ConfigConsumer + for Gossip +{ + type Config = Config; + + const KEY: &'static str = "GOSSIP"; +} + +pub struct GossipSubscriber(PhantomData); + +#[async_trait] +impl GossipSubscriberInterface for GossipSubscriber +where + T: DeserializeOwned + Send + Sync, +{ + async fn recv(&mut self) -> Option { + todo!() + } +} diff --git a/core/node/src/template/mod.rs b/core/node/src/template/mod.rs index 4e39e7c8a..0ccd37bd4 100644 --- a/core/node/src/template/mod.rs +++ b/core/node/src/template/mod.rs @@ -1,9 +1,11 @@ pub mod blockstore; pub mod config; pub mod fs; +pub mod gossip; pub mod indexer; pub mod origin; pub mod pod; pub mod rpc; pub mod sdk; pub mod signer; +pub mod topology; diff --git a/core/node/src/template/topology.rs b/core/node/src/template/topology.rs new file mode 100644 index 000000000..970b3951a --- /dev/null +++ b/core/node/src/template/topology.rs @@ -0,0 +1,34 @@ +use std::{marker::PhantomData, sync::Arc}; + +use async_trait::async_trait; +use draco_interfaces::{ConfigConsumer, SyncQueryRunnerInterface, TopologyInterface}; +use fleek_crypto::NodePublicKey; + +use super::config::Config; + +pub struct Topology { + query: PhantomData, +} + +#[async_trait] +impl TopologyInterface for Topology { + type SyncQuery = Q; + + async fn init( + _config: Self::Config, + _our_public_key: NodePublicKey, + _query_runner: Self::SyncQuery, + ) -> anyhow::Result { + todo!() + } + + fn suggest_connections(&self) -> Arc>> { + todo!() + } +} + +impl ConfigConsumer for Topology { + type Config = Config; + + const KEY: &'static str = "TOPOLOGY"; +}