From 0faa62536f6507f096788f691df55fd4059cbb43 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Thu, 27 Jul 2023 17:12:31 +0100 Subject: [PATCH] make it a lil less smelly? --- mobile_verifier/src/speedtests.rs | 53 +++++++++++------------ mobile_verifier/src/speedtests_average.rs | 1 - 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 7fcf5ed5d..da3578e6b 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -8,8 +8,8 @@ use futures::{stream, StreamExt, TryFutureExt, TryStreamExt}; use helium_crypto::PublicKeyBinary; use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient}; use sqlx::{FromRow, Postgres, Transaction, Type}; -use std::collections::HashMap; -use tokio::sync::mpsc::Receiver; +use std::{collections::HashMap, ops::DerefMut, sync::Arc}; +use tokio::sync::{mpsc::Receiver, Mutex}; const SPEEDTEST_AVG_MAX_DATA_POINTS: usize = 6; @@ -97,45 +97,44 @@ impl SpeedtestDaemon { // process the speedtest reports from the file, if valid insert to the db // collect a list of pubkeys from valid reports // and for each such pubkey, recalcuate a new average - - // TODO: remove `gateways_to_average` from fold accumulator - // let gateways_to_average = Vec::::new(); - let (gateways_to_average, transaction) = file_info_stream + let gateways_to_average = Arc::new(Mutex::new(Vec::::new())); + file_info_stream .into_stream(&mut transaction) .await? .map(anyhow::Ok) - .try_fold( - (Vec::::new(), transaction), - |(mut gateways_to_average, mut transaction), report| async move { - let pubkey = report.report.pubkey.clone(); - if self - .gateway_client - .resolve_gateway_info(&pubkey) + .try_fold(transaction, |mut transaction, report| async { + let pubkey = report.report.pubkey.clone(); + if self + .gateway_client + .resolve_gateway_info(&pubkey) + .await + .is_ok() + { + save_speedtest_to_db(report, &mut transaction).await?; + // below is an o(n) op but the vec size will be limited + if !gateways_to_average + .lock() .await - .is_ok() + .deref_mut() + .contains(&pubkey) { - save_speedtest_to_db(report, &mut transaction).await?; - // below is an o(n) op but the vec size will be limited - // todo: consider alternative - if !gateways_to_average.contains(&pubkey) { - gateways_to_average.push(pubkey) - } + gateways_to_average.lock().await.deref_mut().push(pubkey) } - Ok((gateways_to_average, transaction)) - }, - ) + } + Ok(transaction) + }) + .await? + .commit() .await?; - transaction.commit().await?; let averages_transaction = self.pool.begin().await?; - stream::iter(gateways_to_average) + stream::iter(gateways_to_average.lock().await.deref_mut()) .map(anyhow::Ok) .try_fold( averages_transaction, |mut averages_transaction, pubkey| async move { let latest_speedtests: Vec = - get_latest_speedtests_for_pubkey(&pubkey, &mut averages_transaction) - .await?; + get_latest_speedtests_for_pubkey(pubkey, &mut averages_transaction).await?; let average = SpeedtestAverage::from(&latest_speedtests); average.write(&self.file_sink, latest_speedtests).await?; Ok(averages_transaction) diff --git a/mobile_verifier/src/speedtests_average.rs b/mobile_verifier/src/speedtests_average.rs index d3c6151f8..b6b73dafa 100644 --- a/mobile_verifier/src/speedtests_average.rs +++ b/mobile_verifier/src/speedtests_average.rs @@ -219,7 +219,6 @@ pub struct SpeedtestAverages { } impl SpeedtestAverages { - pub fn get_average(&self, pub_key: &PublicKeyBinary) -> Option { self.averages.get(pub_key).cloned() }