From 77f4486962096913951dd365236ec4991de6d26c Mon Sep 17 00:00:00 2001 From: Matthias Wright Date: Tue, 4 Jul 2023 13:51:22 +0200 Subject: [PATCH] test(rep-collector): test submit measurements --- Cargo.lock | 1 + core/application/src/genesis.rs | 25 ++++++ core/application/src/state.rs | 1 - core/rep-collector/Cargo.toml | 4 + core/rep-collector/src/aggregator.rs | 19 ++-- core/rep-collector/src/config.rs | 14 ++- core/rep-collector/src/lib.rs | 2 + core/rep-collector/src/tests.rs | 126 +++++++++++++++++++++++++++ core/signer/src/lib.rs | 6 +- core/test-utils/src/consensus.rs | 1 - 10 files changed, 187 insertions(+), 12 deletions(-) create mode 100644 core/rep-collector/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index ac06a74da..90d075595 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2304,6 +2304,7 @@ dependencies = [ "draco-interfaces", "draco-notifier", "draco-reputation", + "draco-signer", "draco-test-utils", "fleek-crypto", "lru", diff --git a/core/application/src/genesis.rs b/core/application/src/genesis.rs index 1dfacb652..6e6f5a656 100644 --- a/core/application/src/genesis.rs +++ b/core/application/src/genesis.rs @@ -116,3 +116,28 @@ impl From<&GenesisCommittee> for NodeInfo { } } } + +impl GenesisCommittee { + #[allow(clippy::too_many_arguments)] + pub fn new( + owner: String, + primary_public_key: String, + primary_address: String, + network_key: String, + worker_address: String, + worker_public_key: String, + worker_mempool: String, + staking: Option, + ) -> Self { + Self { + owner, + primary_public_key, + primary_address, + network_key, + worker_address, + worker_public_key, + worker_mempool, + staking, + } + } +} diff --git a/core/application/src/state.rs b/core/application/src/state.rs index eb675b3a3..714ae8851 100644 --- a/core/application/src/state.rs +++ b/core/application/src/state.rs @@ -685,7 +685,6 @@ impl State { } } let new_rep_scores = draco_reputation::calculate_reputation_scores(map); - // Store new scores in application state. new_rep_scores.iter().for_each(|(node, new_score)| { if let Some(old_score) = rep_scores.get(node) { diff --git a/core/rep-collector/Cargo.toml b/core/rep-collector/Cargo.toml index 419af8381..578ae3775 100644 --- a/core/rep-collector/Cargo.toml +++ b/core/rep-collector/Cargo.toml @@ -20,4 +20,8 @@ scc.workspace = true [dev-dependencies] draco-test-utils = {path="../test-utils"} +draco-signer = {path="../signer"} +draco-application = {path="../application"} +draco-notifier = {path="../notifier"} rand.workspace = true +fleek-crypto.workspace = true diff --git a/core/rep-collector/src/aggregator.rs b/core/rep-collector/src/aggregator.rs index 5346837f7..843a9e336 100644 --- a/core/rep-collector/src/aggregator.rs +++ b/core/rep-collector/src/aggregator.rs @@ -16,6 +16,11 @@ use tokio::sync::mpsc; use crate::{buffered_mpsc, config::Config, measurement_manager::MeasurementManager}; +#[cfg(not(test))] +const BEFORE_EPOCH_CHANGE: Duration = Duration::from_secs(3600); +#[cfg(test)] +const BEFORE_EPOCH_CHANGE: Duration = Duration::from_secs(2); + pub struct ReputationAggregator { report_rx: buffered_mpsc::BufferedReceiver, reporter: MyReputationReporter, @@ -30,10 +35,9 @@ pub struct ReputationAggregator { #[allow(dead_code)] impl ReputationAggregator { - async fn start(mut self) -> anyhow::Result<()> { - let hour = Duration::from_secs(3600); + pub async fn start(mut self) -> anyhow::Result<()> { self.notifier - .notify_before_epoch_change(hour, self.notify_tx.clone()); + .notify_before_epoch_change(BEFORE_EPOCH_CHANGE, self.notify_tx.clone()); loop { tokio::select! { report_msg = self.report_rx.recv() => { @@ -42,7 +46,11 @@ impl ReputationAggregator { notification = self.notify_rx.recv() => { if let Notification::BeforeEpochChange = notification.expect("Failed to receive notification.") { self.submit_aggregation(); - self.notifier.notify_before_epoch_change(hour, self.notify_tx.clone()); + self.notifier + .notify_before_epoch_change( + BEFORE_EPOCH_CHANGE, + self.notify_tx.clone() + ); self.measurement_manager.clear_measurements(); } } @@ -101,7 +109,8 @@ impl ReputationAggregatorInterface for ReputationAggregator { submit_tx: SubmitTxSocket, notifier: Self::Notifier, ) -> anyhow::Result { - let (report_tx, report_rx) = buffered_mpsc::buffered_channel(100, 2048); + let (report_tx, report_rx) = + buffered_mpsc::buffered_channel(config.reporter_buffer_size, 2048); let (notify_tx, notify_rx) = mpsc::channel(2048); let measurement_manager = MeasurementManager::new(); let local_reputation_ref = measurement_manager.get_local_reputation_ref(); diff --git a/core/rep-collector/src/config.rs b/core/rep-collector/src/config.rs index 72ef4ba67..8e362edff 100644 --- a/core/rep-collector/src/config.rs +++ b/core/rep-collector/src/config.rs @@ -1,4 +1,14 @@ use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Default)] -pub struct Config {} +#[derive(Serialize, Deserialize)] +pub struct Config { + pub reporter_buffer_size: usize, +} + +impl Default for Config { + fn default() -> Self { + Self { + reporter_buffer_size: 50, + } + } +} diff --git a/core/rep-collector/src/lib.rs b/core/rep-collector/src/lib.rs index 5f7d63817..c7244196a 100644 --- a/core/rep-collector/src/lib.rs +++ b/core/rep-collector/src/lib.rs @@ -3,3 +3,5 @@ pub mod buffered_mpsc; pub mod config; pub(crate) mod measurement_manager; pub use aggregator::{MyReputationQuery, MyReputationReporter, ReputationAggregator}; +#[cfg(test)] +mod tests; diff --git a/core/rep-collector/src/tests.rs b/core/rep-collector/src/tests.rs new file mode 100644 index 000000000..098d2183a --- /dev/null +++ b/core/rep-collector/src/tests.rs @@ -0,0 +1,126 @@ +use std::time::{Duration, SystemTime}; + +use draco_application::{ + app::Application, + config::{Config as AppConfig, Mode}, + genesis::{Genesis, GenesisCommittee}, +}; +use draco_interfaces::{ + application::ApplicationInterface, + common::WithStartAndShutdown, + consensus::ConsensusInterface, + notifier::NotifierInterface, + reputation::{ReputationAggregatorInterface, ReputationReporterInterface}, + signer::SignerInterface, + SyncQueryRunnerInterface, +}; +use draco_notifier::Notifier; +use draco_signer::{Config as SignerConfig, Signer}; +use draco_test_utils::consensus::{Config as ConsensusConfig, MockConsensus}; +use fleek_crypto::{AccountOwnerSecretKey, NodePublicKey, PublicKey, SecretKey}; + +use crate::{aggregator::ReputationAggregator, config::Config}; + +#[tokio::test] +async fn test_submit_measurements() { + let signer_config = SignerConfig::default(); + let mut signer = Signer::init(signer_config).await.unwrap(); + + let mut genesis = Genesis::load().unwrap(); + + let (network_secret_key, secret_key) = signer.get_sk(); + let public_key = secret_key.to_pk(); + + let network_public_key = network_secret_key.to_pk(); + + let owner_secret_key = AccountOwnerSecretKey::generate(); + let owner_public_key = owner_secret_key.to_pk(); + + genesis.committee.push(GenesisCommittee::new( + owner_public_key.to_base64(), + public_key.to_base64(), + "/ip4/127.0.0.1/udp/48000".to_owned(), + network_public_key.to_base64(), + "/ip4/127.0.0.1/udp/48101/http".to_owned(), + network_public_key.to_base64(), + "/ip4/127.0.0.1/tcp/48102/http".to_owned(), + None, + )); + + let epoch_start = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + genesis.epoch_start = epoch_start; + genesis.epoch_time = 4000; // millis + + let app = Application::init(AppConfig { + genesis: Some(genesis), + mode: Mode::Test, + }) + .await + .unwrap(); + app.start().await; + + let (update_socket, query_runner) = (app.transaction_executor(), app.sync_query()); + + let consensus_config = ConsensusConfig { + min_ordering_time: 0, + max_ordering_time: 1, + probability_txn_lost: 0.0, + lose_every_n_txn: None, + }; + let consensus = MockConsensus::init( + consensus_config, + &signer, + update_socket.clone(), + query_runner.clone(), + ) + .await + .unwrap(); + + signer.provide_mempool(consensus.mempool()); + signer.provide_query_runner(query_runner.clone()); + signer.start().await; + consensus.start().await; + + let notifier = Notifier::init(query_runner.clone()); + let config = Config { + reporter_buffer_size: 1, + }; + let rep_aggregator = ReputationAggregator::init(config, signer.get_socket(), notifier) + .await + .unwrap(); + + let rep_reporter = rep_aggregator.get_reporter(); + let mut aggregator_handle = tokio::spawn(async move { + rep_aggregator.start().await.unwrap(); + }); + + let peer = NodePublicKey([1; 96]); + rep_reporter.report_latency(&peer, Duration::from_millis(300)); + rep_reporter.report_latency(&peer, Duration::from_millis(100)); + rep_reporter.report_bytes_sent(&peer, 10_000, None); + rep_reporter.report_bytes_received(&peer, 20_000, None); + + let mut interval = tokio::time::interval(Duration::from_millis(100)); + loop { + tokio::select! { + _ = &mut aggregator_handle => {} + _ = interval.tick() => { + let measurements = query_runner.get_rep_measurements(peer); + if !measurements.is_empty() { + assert_eq!(measurements.len(), 1); + assert_eq!(measurements[0].reporting_node, public_key); + assert_eq!( + measurements[0].measurements.latency, + Some(Duration::from_millis(200)) + ); + assert_eq!(measurements[0].measurements.bytes_received, Some(20_000)); + assert_eq!(measurements[0].measurements.bytes_sent, Some(10_000)); + break; + } + } + } + } +} diff --git a/core/signer/src/lib.rs b/core/signer/src/lib.rs index 7e1213f53..4e127890e 100644 --- a/core/signer/src/lib.rs +++ b/core/signer/src/lib.rs @@ -1,16 +1,16 @@ mod config; -#[cfg(test)] -mod tests; mod utils; use std::{ collections::VecDeque, sync::{Arc, Mutex}, time::{Duration, SystemTime}, }; +#[cfg(test)] +mod tests; use affair::{Socket, Task}; use async_trait::async_trait; -use config::Config; +pub use config::Config; use draco_application::query_runner::QueryRunner; use draco_interfaces::{ common::WithStartAndShutdown, diff --git a/core/test-utils/src/consensus.rs b/core/test-utils/src/consensus.rs index 61df9266e..7a95f94ea 100644 --- a/core/test-utils/src/consensus.rs +++ b/core/test-utils/src/consensus.rs @@ -145,7 +145,6 @@ impl MockConsensusInner { .await .map_err(|r| anyhow::anyhow!(format!("{r:?}"))) .unwrap(); - } _ = shutdown_rx.recv() => break, }