Skip to content

Commit

Permalink
make it a lil less smelly?
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Jul 27, 2023
1 parent 6e3a727 commit 0faa625
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 28 deletions.
53 changes: 26 additions & 27 deletions mobile_verifier/src/speedtests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
use helium_crypto::PublicKeyBinary;
use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient};
use sqlx::{FromRow, Postgres, Transaction, Type};
use std::collections::HashMap;
use tokio::sync::mpsc::Receiver;
use std::{collections::HashMap, ops::DerefMut, sync::Arc};
use tokio::sync::{mpsc::Receiver, Mutex};

const SPEEDTEST_AVG_MAX_DATA_POINTS: usize = 6;

Expand Down Expand Up @@ -97,45 +97,44 @@ impl SpeedtestDaemon {
// process the speedtest reports from the file, if valid insert to the db
// collect a list of pubkeys from valid reports
// and for each such pubkey, recalcuate a new average

// TODO: remove `gateways_to_average` from fold accumulator
// let gateways_to_average = Vec::<PublicKeyBinary>::new();
let (gateways_to_average, transaction) = file_info_stream
let gateways_to_average = Arc::new(Mutex::new(Vec::<PublicKeyBinary>::new()));
file_info_stream
.into_stream(&mut transaction)
.await?
.map(anyhow::Ok)
.try_fold(
(Vec::<PublicKeyBinary>::new(), transaction),
|(mut gateways_to_average, mut transaction), report| async move {
let pubkey = report.report.pubkey.clone();
if self
.gateway_client
.resolve_gateway_info(&pubkey)
.try_fold(transaction, |mut transaction, report| async {
let pubkey = report.report.pubkey.clone();
if self
.gateway_client
.resolve_gateway_info(&pubkey)
.await
.is_ok()
{
save_speedtest_to_db(report, &mut transaction).await?;
// below is an o(n) op but the vec size will be limited
if !gateways_to_average
.lock()
.await
.is_ok()
.deref_mut()
.contains(&pubkey)
{
save_speedtest_to_db(report, &mut transaction).await?;
// below is an o(n) op but the vec size will be limited
// todo: consider alternative
if !gateways_to_average.contains(&pubkey) {
gateways_to_average.push(pubkey)
}
gateways_to_average.lock().await.deref_mut().push(pubkey)
}
Ok((gateways_to_average, transaction))
},
)
}
Ok(transaction)
})
.await?
.commit()
.await?;
transaction.commit().await?;

let averages_transaction = self.pool.begin().await?;
stream::iter(gateways_to_average)
stream::iter(gateways_to_average.lock().await.deref_mut())
.map(anyhow::Ok)
.try_fold(
averages_transaction,
|mut averages_transaction, pubkey| async move {
let latest_speedtests: Vec<Speedtest> =
get_latest_speedtests_for_pubkey(&pubkey, &mut averages_transaction)
.await?;
get_latest_speedtests_for_pubkey(pubkey, &mut averages_transaction).await?;
let average = SpeedtestAverage::from(&latest_speedtests);
average.write(&self.file_sink, latest_speedtests).await?;
Ok(averages_transaction)
Expand Down
1 change: 0 additions & 1 deletion mobile_verifier/src/speedtests_average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ pub struct SpeedtestAverages {
}

impl SpeedtestAverages {

pub fn get_average(&self, pub_key: &PublicKeyBinary) -> Option<SpeedtestAverage> {
self.averages.get(pub_key).cloned()
}
Expand Down

0 comments on commit 0faa625

Please sign in to comment.