Skip to content

Commit

Permalink
handle speedtests and average reports in same db txn
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Jul 27, 2023
1 parent 0faa625 commit c9911d8
Showing 1 changed file with 10 additions and 33 deletions.
43 changes: 10 additions & 33 deletions mobile_verifier/src/speedtests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use file_store::{
file_info_poller::FileInfoStream, file_sink::FileSinkClient,
speedtest::CellSpeedtestIngestReport,
};
use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use helium_crypto::PublicKeyBinary;
use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient};
use sqlx::{FromRow, Postgres, Transaction, Type};
use std::{collections::HashMap, ops::DerefMut, sync::Arc};
use tokio::sync::{mpsc::Receiver, Mutex};
use std::collections::HashMap;
use tokio::sync::mpsc::Receiver;

const SPEEDTEST_AVG_MAX_DATA_POINTS: usize = 6;

Expand Down Expand Up @@ -95,9 +95,7 @@ impl SpeedtestDaemon {
);
let mut transaction = self.pool.begin().await?;
// process the speedtest reports from the file, if valid insert to the db
// collect a list of pubkeys from valid reports
// and for each such pubkey, recalcuate a new average
let gateways_to_average = Arc::new(Mutex::new(Vec::<PublicKeyBinary>::new()));
// and recalcuate a new average
file_info_stream
.into_stream(&mut transaction)
.await?
Expand All @@ -106,43 +104,22 @@ impl SpeedtestDaemon {
let pubkey = report.report.pubkey.clone();
if self
.gateway_client
.resolve_gateway_info(&pubkey)
.resolve_gateway_info(&pubkey.clone())
.await
.is_ok()
{
save_speedtest_to_db(report, &mut transaction).await?;
// below is an o(n) op but the vec size will be limited
if !gateways_to_average
.lock()
.await
.deref_mut()
.contains(&pubkey)
{
gateways_to_average.lock().await.deref_mut().push(pubkey)
}
}
Ok(transaction)
})
.await?
.commit()
.await?;

let averages_transaction = self.pool.begin().await?;
stream::iter(gateways_to_average.lock().await.deref_mut())
.map(anyhow::Ok)
.try_fold(
averages_transaction,
|mut averages_transaction, pubkey| async move {
let latest_speedtests: Vec<Speedtest> =
get_latest_speedtests_for_pubkey(pubkey, &mut averages_transaction).await?;
get_latest_speedtests_for_pubkey(&pubkey, &mut transaction).await?;
let average = SpeedtestAverage::from(&latest_speedtests);
average.write(&self.file_sink, latest_speedtests).await?;
Ok(averages_transaction)
},
)
}
Ok(transaction)
})
.await?
.commit()
.await?;
// db work all done, commit the reports to s3
self.file_sink.commit().await?;

Ok(())
Expand Down

0 comments on commit c9911d8

Please sign in to comment.