Skip to content

Commit

Permalink
test(rep-collector): test submit measurements
Browse files Browse the repository at this point in the history
  • Loading branch information
matthias-wright committed Jul 4, 2023
1 parent aeb5990 commit 77f4486
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2304,6 +2304,7 @@ dependencies = [
"draco-interfaces",
"draco-notifier",
"draco-reputation",
"draco-signer",
"draco-test-utils",
"fleek-crypto",
"lru",
Expand Down
25 changes: 25 additions & 0 deletions core/application/src/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,28 @@ impl From<&GenesisCommittee> for NodeInfo {
}
}
}

impl GenesisCommittee {
#[allow(clippy::too_many_arguments)]
pub fn new(
owner: String,
primary_public_key: String,
primary_address: String,
network_key: String,
worker_address: String,
worker_public_key: String,
worker_mempool: String,
staking: Option<u64>,
) -> Self {
Self {
owner,
primary_public_key,
primary_address,
network_key,
worker_address,
worker_public_key,
worker_mempool,
staking,
}
}
}
1 change: 0 additions & 1 deletion core/application/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,6 @@ impl<B: Backend> State<B> {
}
}
let new_rep_scores = draco_reputation::calculate_reputation_scores(map);

// Store new scores in application state.
new_rep_scores.iter().for_each(|(node, new_score)| {
if let Some(old_score) = rep_scores.get(node) {
Expand Down
4 changes: 4 additions & 0 deletions core/rep-collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ scc.workspace = true

[dev-dependencies]
draco-test-utils = {path="../test-utils"}
draco-signer = {path="../signer"}
draco-application = {path="../application"}
draco-notifier = {path="../notifier"}
rand.workspace = true
fleek-crypto.workspace = true
19 changes: 14 additions & 5 deletions core/rep-collector/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ use tokio::sync::mpsc;

use crate::{buffered_mpsc, config::Config, measurement_manager::MeasurementManager};

#[cfg(not(test))]
const BEFORE_EPOCH_CHANGE: Duration = Duration::from_secs(3600);
#[cfg(test)]
const BEFORE_EPOCH_CHANGE: Duration = Duration::from_secs(2);

pub struct ReputationAggregator {
report_rx: buffered_mpsc::BufferedReceiver<ReportMessage>,
reporter: MyReputationReporter,
Expand All @@ -30,10 +35,9 @@ pub struct ReputationAggregator {

#[allow(dead_code)]
impl ReputationAggregator {
async fn start(mut self) -> anyhow::Result<()> {
let hour = Duration::from_secs(3600);
pub async fn start(mut self) -> anyhow::Result<()> {
self.notifier
.notify_before_epoch_change(hour, self.notify_tx.clone());
.notify_before_epoch_change(BEFORE_EPOCH_CHANGE, self.notify_tx.clone());
loop {
tokio::select! {
report_msg = self.report_rx.recv() => {
Expand All @@ -42,7 +46,11 @@ impl ReputationAggregator {
notification = self.notify_rx.recv() => {
if let Notification::BeforeEpochChange = notification.expect("Failed to receive notification.") {
self.submit_aggregation();
self.notifier.notify_before_epoch_change(hour, self.notify_tx.clone());
self.notifier
.notify_before_epoch_change(
BEFORE_EPOCH_CHANGE,
self.notify_tx.clone()
);
self.measurement_manager.clear_measurements();
}
}
Expand Down Expand Up @@ -101,7 +109,8 @@ impl ReputationAggregatorInterface for ReputationAggregator {
submit_tx: SubmitTxSocket,
notifier: Self::Notifier,
) -> anyhow::Result<Self> {
let (report_tx, report_rx) = buffered_mpsc::buffered_channel(100, 2048);
let (report_tx, report_rx) =
buffered_mpsc::buffered_channel(config.reporter_buffer_size, 2048);
let (notify_tx, notify_rx) = mpsc::channel(2048);
let measurement_manager = MeasurementManager::new();
let local_reputation_ref = measurement_manager.get_local_reputation_ref();
Expand Down
14 changes: 12 additions & 2 deletions core/rep-collector/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Default)]
pub struct Config {}
#[derive(Serialize, Deserialize)]
pub struct Config {
pub reporter_buffer_size: usize,
}

impl Default for Config {
fn default() -> Self {
Self {
reporter_buffer_size: 50,
}
}
}
2 changes: 2 additions & 0 deletions core/rep-collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ pub mod buffered_mpsc;
pub mod config;
pub(crate) mod measurement_manager;
pub use aggregator::{MyReputationQuery, MyReputationReporter, ReputationAggregator};
#[cfg(test)]
mod tests;
126 changes: 126 additions & 0 deletions core/rep-collector/src/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use std::time::{Duration, SystemTime};

use draco_application::{
app::Application,
config::{Config as AppConfig, Mode},
genesis::{Genesis, GenesisCommittee},
};
use draco_interfaces::{
application::ApplicationInterface,
common::WithStartAndShutdown,
consensus::ConsensusInterface,
notifier::NotifierInterface,
reputation::{ReputationAggregatorInterface, ReputationReporterInterface},
signer::SignerInterface,
SyncQueryRunnerInterface,
};
use draco_notifier::Notifier;
use draco_signer::{Config as SignerConfig, Signer};
use draco_test_utils::consensus::{Config as ConsensusConfig, MockConsensus};
use fleek_crypto::{AccountOwnerSecretKey, NodePublicKey, PublicKey, SecretKey};

use crate::{aggregator::ReputationAggregator, config::Config};

#[tokio::test]
async fn test_submit_measurements() {
let signer_config = SignerConfig::default();
let mut signer = Signer::init(signer_config).await.unwrap();

let mut genesis = Genesis::load().unwrap();

let (network_secret_key, secret_key) = signer.get_sk();
let public_key = secret_key.to_pk();

let network_public_key = network_secret_key.to_pk();

let owner_secret_key = AccountOwnerSecretKey::generate();
let owner_public_key = owner_secret_key.to_pk();

genesis.committee.push(GenesisCommittee::new(
owner_public_key.to_base64(),
public_key.to_base64(),
"/ip4/127.0.0.1/udp/48000".to_owned(),
network_public_key.to_base64(),
"/ip4/127.0.0.1/udp/48101/http".to_owned(),
network_public_key.to_base64(),
"/ip4/127.0.0.1/tcp/48102/http".to_owned(),
None,
));

let epoch_start = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
genesis.epoch_start = epoch_start;
genesis.epoch_time = 4000; // millis

let app = Application::init(AppConfig {
genesis: Some(genesis),
mode: Mode::Test,
})
.await
.unwrap();
app.start().await;

let (update_socket, query_runner) = (app.transaction_executor(), app.sync_query());

let consensus_config = ConsensusConfig {
min_ordering_time: 0,
max_ordering_time: 1,
probability_txn_lost: 0.0,
lose_every_n_txn: None,
};
let consensus = MockConsensus::init(
consensus_config,
&signer,
update_socket.clone(),
query_runner.clone(),
)
.await
.unwrap();

signer.provide_mempool(consensus.mempool());
signer.provide_query_runner(query_runner.clone());
signer.start().await;
consensus.start().await;

let notifier = Notifier::init(query_runner.clone());
let config = Config {
reporter_buffer_size: 1,
};
let rep_aggregator = ReputationAggregator::init(config, signer.get_socket(), notifier)
.await
.unwrap();

let rep_reporter = rep_aggregator.get_reporter();
let mut aggregator_handle = tokio::spawn(async move {
rep_aggregator.start().await.unwrap();
});

let peer = NodePublicKey([1; 96]);
rep_reporter.report_latency(&peer, Duration::from_millis(300));
rep_reporter.report_latency(&peer, Duration::from_millis(100));
rep_reporter.report_bytes_sent(&peer, 10_000, None);
rep_reporter.report_bytes_received(&peer, 20_000, None);

let mut interval = tokio::time::interval(Duration::from_millis(100));
loop {
tokio::select! {
_ = &mut aggregator_handle => {}
_ = interval.tick() => {
let measurements = query_runner.get_rep_measurements(peer);
if !measurements.is_empty() {
assert_eq!(measurements.len(), 1);
assert_eq!(measurements[0].reporting_node, public_key);
assert_eq!(
measurements[0].measurements.latency,
Some(Duration::from_millis(200))
);
assert_eq!(measurements[0].measurements.bytes_received, Some(20_000));
assert_eq!(measurements[0].measurements.bytes_sent, Some(10_000));
break;
}
}
}
}
}
6 changes: 3 additions & 3 deletions core/signer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
mod config;
#[cfg(test)]
mod tests;
mod utils;
use std::{
collections::VecDeque,
sync::{Arc, Mutex},
time::{Duration, SystemTime},
};
#[cfg(test)]
mod tests;

use affair::{Socket, Task};
use async_trait::async_trait;
use config::Config;
pub use config::Config;
use draco_application::query_runner::QueryRunner;
use draco_interfaces::{
common::WithStartAndShutdown,
Expand Down
1 change: 0 additions & 1 deletion core/test-utils/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ impl<Q: SyncQueryRunnerInterface> MockConsensusInner<Q> {
.await
.map_err(|r| anyhow::anyhow!(format!("{r:?}")))
.unwrap();

}
_ = shutdown_rx.recv() => break,
}
Expand Down

0 comments on commit 77f4486

Please sign in to comment.