Skip to content

Commit

Permalink
make speedtests time validity more explicit
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Aug 10, 2023
1 parent bb9bca5 commit 57f8126
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
2 changes: 1 addition & 1 deletion mobile_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl Rewarder {

// clear out the various db tables
heartbeats::clear_heartbeats(&mut transaction, &reward_period.start).await?;
speedtests::clear_speedtests(&mut transaction, &reward_period.start).await?;
speedtests::clear_speedtests(&mut transaction, &reward_period.end).await?;
data_session::clear_hotspot_data_sessions(&mut transaction, &reward_period.end).await?;
// subscriber_location::clear_location_shares(&mut transaction, &reward_period.end).await?;

Expand Down
17 changes: 11 additions & 6 deletions mobile_verifier/src/speedtests.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::speedtests_average::SpeedtestAverage;
use chrono::{DateTime, Utc};
use crate::speedtests_average::{SpeedtestAverage, SPEEDTEST_LAPSE};
use chrono::{DateTime, Duration, Utc};
use file_store::{
file_info_poller::FileInfoStream,
file_sink::FileSinkClient,
Expand Down Expand Up @@ -154,14 +154,17 @@ pub async fn aggregate_epoch_speedtests<'a>(
exec: &sqlx::Pool<sqlx::Postgres>,
) -> Result<EpochSpeedTests, sqlx::Error> {
let mut speedtests = EpochSpeedTests::new();
// use latest speedtest which are no older than hours defined by SPEEDTEST_LAPSE
let start = epoch_end - Duration::hours(SPEEDTEST_LAPSE);
// pull the last N most recent speedtests from prior to the epoch end for each pubkey
let mut rows = sqlx::query_as::<_, Speedtest>(
"select * from (
SELECT distinct(pubkey), upload_speed, download_speed, latency, timestamp, serial, row_number()
over (partition by pubkey order by timestamp desc) as count FROM speedtests where timestamp < $1
over (partition by pubkey order by timestamp desc) as count FROM speedtests where timestamp >= $1 and timestamp < $2
) as tmp
where count <= $2"
where count <= $3"
)
.bind(start)
.bind(epoch_end)
.bind(SPEEDTEST_AVG_MAX_DATA_POINTS as i64)
.fetch(exec);
Expand All @@ -175,12 +178,14 @@ pub async fn aggregate_epoch_speedtests<'a>(
Ok(speedtests)
}


pub async fn clear_speedtests(
tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
timestamp: &DateTime<Utc>,
epoch_end: &DateTime<Utc>,
) -> Result<(), sqlx::Error> {
let oldest_ts = *epoch_end - Duration::hours(SPEEDTEST_LAPSE);
sqlx::query("DELETE FROM speedtests WHERE timestamp < $1")
.bind(timestamp)
.bind(oldest_ts)
.execute(&mut *tx)
.await?;
Ok(())
Expand Down

0 comments on commit 57f8126

Please sign in to comment.