Skip to content

Commit

Permalink
add topology and gossip interface to node
Browse files Browse the repository at this point in the history
  • Loading branch information
qti3e committed Jul 4, 2023
1 parent 58d76fc commit aeb5990
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2228,6 +2228,7 @@ dependencies = [
name = "draco-node"
version = "0.1.0"
dependencies = [
"affair",
"anyhow",
"async-trait",
"bytes",
Expand Down
23 changes: 17 additions & 6 deletions core/interfaces/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use std::sync::Arc;

use affair::Socket;
use anyhow::Result;
use async_trait::async_trait;
use serde::de::DeserializeOwned;

use crate::{topology::TopologyInterface, ConfigConsumer, NotifierInterface, WithStartAndShutdown};
use crate::{
signer::SignerInterface, topology::TopologyInterface, ConfigConsumer, NotifierInterface,
WithStartAndShutdown,
};

/// Numerical value for different gossip topics used by Fleek Network.
// New topics can be added as the system grows.
Expand Down Expand Up @@ -33,11 +37,18 @@ pub trait GossipInterface: WithStartAndShutdown + ConfigConsumer + Sized + Send
/// The notifier that allows us to refresh the connections once the epoch changes.
type Notifier: NotifierInterface;

/// The signer that we can used to sign and submit messages.
type Signer: SignerInterface;

/// Subscriber implementation used for listening on a topic.
type Subscriber<T: DeserializeOwned>: GossipSubscriber<T>;
type Subscriber<T: DeserializeOwned + Send + Sync>: GossipSubscriberInterface<T>;

/// Initialize the gossip system with the config and the topology object..
async fn init(config: Self::Config, topology: Arc<Self::Topology>) -> Self;
async fn init(
config: Self::Config,
topology: Arc<Self::Topology>,
signer: &Self::Signer,
) -> Result<Self>;

/// Get a socket which can be used to broadcast a message globally under any topic.
fn broadcast_socket(&self) -> Socket<GossipMessage, ()>;
Expand All @@ -46,14 +57,14 @@ pub trait GossipInterface: WithStartAndShutdown + ConfigConsumer + Sized + Send
/// be deserialized as `T` are returned to the listener.
fn subscribe<T>(&self, topic: Topic) -> Self::Subscriber<T>
where
T: DeserializeOwned;
T: DeserializeOwned + Send + Sync;
}

/// A subscriber for the incoming messages under a topic.
#[async_trait]
pub trait GossipSubscriber<T>
pub trait GossipSubscriberInterface<T>: Send + Sync
where
T: DeserializeOwned,
T: DeserializeOwned + Send + Sync,
{
/// Await the next message in the topic, should only return `None` if there are
/// no longer any new messages coming. (indicating that the gossip instance is
Expand Down
26 changes: 26 additions & 0 deletions core/interfaces/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
sdk::{HandlerFn, SdkInterface},
signer::SignerInterface,
types::ServiceId,
GossipInterface, TopologyInterface,
};

pub struct Node<
Expand All @@ -40,6 +41,8 @@ pub struct Node<
FileSystem = FileSystem,
>,
Handshake: HandshakeInterface<Sdk = Sdk>,
Topology: TopologyInterface<SyncQuery = Application::SyncExecutor>,
Gossip: GossipInterface<Topology = Topology, Notifier = Notifier, Signer = Signer>,
> {
pub configuration: Arc<ConfigProvider>,
pub consensus: Consensus,
Expand All @@ -53,6 +56,8 @@ pub struct Node<
pub delivery_acknowledgment_aggregator: DeliveryAcknowledgmentAggregator,
pub reputation_aggregator: ReputationAggregator,
pub handshake: Handshake,
pub topology: Arc<Topology>,
pub gossip: Gossip,
pub sdk: PhantomData<Sdk>,
pub notifier: PhantomData<Notifier>,
}
Expand All @@ -76,6 +81,8 @@ impl<
FileSystem = FileSystem,
>,
Handshake: HandshakeInterface<Sdk = Sdk>,
Topology: TopologyInterface<SyncQuery = Application::SyncExecutor>,
Gossip: GossipInterface<Topology = Topology, Notifier = Notifier, Signer = Signer>,
>
Node<
ConfigProvider,
Expand All @@ -92,13 +99,26 @@ impl<
Rpc,
Sdk,
Handshake,
Topology,
Gossip,
>
{
pub async fn init(configuration: Arc<ConfigProvider>) -> anyhow::Result<Self> {
let mut signer = Signer::init(configuration.get::<Signer>()).await?;

let application = Application::init(configuration.get::<Application>()).await?;

let topology = Arc::new(
Topology::init(
configuration.get::<Topology>(),
signer.get_bls_pk(),
application.sync_query(),
)
.await?,
);

let gossip = Gossip::init(configuration.get::<Gossip>(), topology.clone(), &signer).await?;

let consensus = Consensus::init(
configuration.get::<Consensus>(),
&signer,
Expand Down Expand Up @@ -153,6 +173,8 @@ impl<
delivery_acknowledgment_aggregator,
reputation_aggregator,
handshake,
topology,
gossip,
sdk: PhantomData,
notifier: PhantomData,
})
Expand Down Expand Up @@ -229,6 +251,8 @@ impl<
FileSystem = FileSystem,
>,
Handshake: HandshakeInterface<Sdk = Sdk>,
Topology: TopologyInterface<SyncQuery = Application::SyncExecutor>,
Gossip: GossipInterface<Topology = Topology, Notifier = Notifier, Signer = Signer>,
> WithStartAndShutdown
for Node<
ConfigProvider,
Expand All @@ -245,6 +269,8 @@ impl<
Rpc,
Sdk,
Handshake,
Topology,
Gossip,
>
where
Self: Send,
Expand Down
6 changes: 5 additions & 1 deletion core/interfaces/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ pub trait TopologyInterface: ConfigConsumer + Sized + Send + Sync {
/// Due to that reason instead we just pass the public key here. For consistency and
/// correctness of the implementation it is required that this public key to be our
/// actual public key which is obtained from [get_bls_pk](crate::SignerInterface::get_bls_pk).
async fn init(config: Self::Config, our_public_key: NodePublicKey) -> anyhow::Result<Self>;
async fn init(
config: Self::Config,
our_public_key: NodePublicKey,
query_runner: Self::SyncQuery,
) -> anyhow::Result<Self>;

/// Suggest a list of connections that our current node must connect to. This should be
/// according to the `our_public_key` value passed during the initialization.
Expand Down
1 change: 1 addition & 0 deletions core/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ bytes.workspace = true
tokio-stream.workspace = true
fleek-crypto.workspace = true
tracing = "0.1"
affair.workspace = true
3 changes: 3 additions & 0 deletions core/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use draco_handshake::server::{StreamProvider, TcpHandshakeServer, TcpProvider};
use draco_interfaces::{common::WithStartAndShutdown as _, ConfigProviderInterface, Node};
use draco_notifier::Notifier;
use draco_rep_collector::ReputationAggregator;
use template::{gossip::Gossip, topology::Topology};

use crate::{
cli::{CliArgs, Command},
Expand Down Expand Up @@ -41,6 +42,8 @@ pub type ConcreteNode = Node<
TcpHandshakeServer<
Sdk<<TcpProvider as StreamProvider>::Reader, <TcpProvider as StreamProvider>::Writer>,
>,
Topology<QueryRunner>,
Gossip<Signer, Topology<QueryRunner>, Notifier>,
>;

#[tokio::main]
Expand Down
90 changes: 90 additions & 0 deletions core/node/src/template/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::{marker::PhantomData, sync::Arc};

use anyhow::Result;
use async_trait::async_trait;
use draco_interfaces::{
signer::SignerInterface, ConfigConsumer, GossipInterface, GossipSubscriberInterface,
NotifierInterface, TopologyInterface, WithStartAndShutdown,
};
use serde::de::DeserializeOwned;

use super::config::Config;

pub struct Gossip<S: SignerInterface, Topo: TopologyInterface, N: NotifierInterface> {
signer: PhantomData<S>,
topology: PhantomData<Topo>,
notifier: PhantomData<N>,
}

#[async_trait]
impl<S: SignerInterface, Topo: TopologyInterface, N: NotifierInterface + Send + Sync>
WithStartAndShutdown for Gossip<S, Topo, N>
{
/// Returns true if this system is running or not.
fn is_running(&self) -> bool {
todo!()
}

/// Start the system, should not do anything if the system is already
/// started.
async fn start(&self) {
todo!()
}

/// Send the shutdown signal to the system.
async fn shutdown(&self) {
todo!()
}
}

#[async_trait]
impl<S: SignerInterface, Topo: TopologyInterface, N: NotifierInterface + Send + Sync>
GossipInterface for Gossip<S, Topo, N>
{
type Signer = S;

type Topology = Topo;

type Notifier = N;

type Subscriber<T: DeserializeOwned + Send + Sync> = GossipSubscriber<T>;

async fn init(
_config: Self::Config,
_topology: Arc<Self::Topology>,
_signer: &Self::Signer,
) -> Result<Self> {
todo!()
}

fn subscribe<T>(&self, _topic: draco_interfaces::Topic) -> Self::Subscriber<T>
where
T: DeserializeOwned + Send + Sync,
{
todo!()
}

fn broadcast_socket(&self) -> affair::Socket<draco_interfaces::GossipMessage, ()> {
todo!()
}
}

impl<S: SignerInterface, Topo: TopologyInterface, N: NotifierInterface> ConfigConsumer
for Gossip<S, Topo, N>
{
type Config = Config;

const KEY: &'static str = "GOSSIP";
}

pub struct GossipSubscriber<T>(PhantomData<T>);

#[async_trait]
impl<T> GossipSubscriberInterface<T> for GossipSubscriber<T>
where
T: DeserializeOwned + Send + Sync,
{
async fn recv(&mut self) -> Option<T> {
todo!()
}
}
2 changes: 2 additions & 0 deletions core/node/src/template/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
pub mod blockstore;
pub mod config;
pub mod fs;
pub mod gossip;
pub mod indexer;
pub mod origin;
pub mod pod;
pub mod rpc;
pub mod sdk;
pub mod signer;
pub mod topology;
34 changes: 34 additions & 0 deletions core/node/src/template/topology.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::{marker::PhantomData, sync::Arc};

use async_trait::async_trait;
use draco_interfaces::{ConfigConsumer, SyncQueryRunnerInterface, TopologyInterface};
use fleek_crypto::NodePublicKey;

use super::config::Config;

pub struct Topology<Q: SyncQueryRunnerInterface> {
query: PhantomData<Q>,
}

#[async_trait]
impl<Q: SyncQueryRunnerInterface> TopologyInterface for Topology<Q> {
type SyncQuery = Q;

async fn init(
_config: Self::Config,
_our_public_key: NodePublicKey,
_query_runner: Self::SyncQuery,
) -> anyhow::Result<Self> {
todo!()
}

fn suggest_connections(&self) -> Arc<Vec<Vec<NodePublicKey>>> {
todo!()
}
}

impl<Q: SyncQueryRunnerInterface> ConfigConsumer for Topology<Q> {
type Config = Config;

const KEY: &'static str = "TOPOLOGY";
}

0 comments on commit aeb5990

Please sign in to comment.