diff --git a/mobile_verifier/migrations/15_speedtests_one_to_one.sql b/mobile_verifier/migrations/15_speedtests_one_to_one.sql
index 21e7e0e51..543219ee3 100644
--- a/mobile_verifier/migrations/15_speedtests_one_to_one.sql
+++ b/mobile_verifier/migrations/15_speedtests_one_to_one.sql
@@ -4,7 +4,8 @@ CREATE TABLE speedtests_migration (
     upload_speed bigint,
     download_speed bigint,
     latency integer,
-    timestamp timestamptz NOT NULL
+    timestamp timestamptz NOT NULL,
+    inserted_at timestamptz default now()
 );
 
 CREATE INDEX idx_speedtests_pubkey on speedtests_migration (pubkey);
diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs
index f5cc91e61..29bb678ba 100644
--- a/mobile_verifier/src/reward_shares.rs
+++ b/mobile_verifier/src/reward_shares.rs
@@ -330,6 +330,7 @@ mod test {
         subscriber_location::SubscriberValidatedLocations,
     };
     use chrono::{Duration, Utc};
+    use file_store::speedtest::CellSpeedtest;
    use futures::stream;
    use helium_proto::services::poc_mobile::mobile_reward_share::Reward as MobileReward;
    use prost::Message;
@@ -526,7 +527,7 @@ mod test {
         assert_eq!(data_transfer_rewards.reward_scale().round_dp(1), dec!(0.5));
     }
 
-    fn bytes_per_s(mbps: i64) -> i64 {
+    fn bytes_per_s(mbps: u64) -> u64 {
         mbps * 125000
     }
 
@@ -538,43 +539,55 @@ mod test {
     fn acceptable_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
         Speedtest {
+            report: CellSpeedtest {
             pubkey,
             timestamp,
             upload_speed: bytes_per_s(10),
             download_speed: bytes_per_s(100),
             latency: 25,
+            serial: "".to_string(),
         }
     }
+    }
 
     fn degraded_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
         Speedtest {
+            report: CellSpeedtest {
             pubkey,
             timestamp,
             upload_speed: bytes_per_s(5),
             download_speed: bytes_per_s(60),
             latency: 60,
+            serial: "".to_string(),
         }
     }
+    }
 
     fn failed_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
         Speedtest {
+            report: CellSpeedtest {
             pubkey,
             timestamp,
             upload_speed: bytes_per_s(1),
             download_speed: bytes_per_s(20),
             latency: 110,
+            serial: "".to_string(),
         }
     }
+    }
 
     fn poor_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
         Speedtest {
+            report: CellSpeedtest {
             pubkey,
             timestamp,
             upload_speed: bytes_per_s(2),
             download_speed: bytes_per_s(40),
             latency: 90,
+            serial: "".to_string(),
         }
     }
+    }
 
     #[tokio::test]
     async fn test_radio_weights() {
diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs
index 8101fbcdb..c89737894 100644
--- a/mobile_verifier/src/rewarder.rs
+++ b/mobile_verifier/src/rewarder.rs
@@ -232,7 +232,7 @@ impl Rewarder {
 
         // clear out the various db tables
         heartbeats::clear_heartbeats(&mut transaction, reward_period).await?;
-        speedtests::clear_speedtests(&mut transaction).await?;
+        speedtests::clear_speedtests(&mut transaction, reward_period).await?;
         data_session::clear_hotspot_data_sessions(&mut transaction, reward_period).await?;
         subscriber_location::clear_location_shares(&mut transaction, reward_period).await?;
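Note on the `rewarder.rs` change: `clear_speedtests` now receives the reward period and deletes only speedtests from before it (see the `speedtests.rs` hunk below), rather than everything older than a fixed `SPEEDTEST_LAPSE` cutoff. A minimal test sketch of the intended behavior; the test name, seeding, and `#[sqlx::test]` harness are hypothetical, not part of this diff:

```rust
// Hypothetical sketch: rows older than the reward period start are deleted,
// while rows inside the period survive to seed the next epoch's rolling average.
use chrono::{Duration, Utc};

#[sqlx::test]
async fn clears_only_pre_period_speedtests(pool: sqlx::PgPool) -> anyhow::Result<()> {
    let end = Utc::now();
    let reward_period = (end - Duration::hours(24))..end;

    let mut tx = pool.begin().await?;
    // deletes WHERE timestamp < reward_period.start, per clear_speedtests below
    speedtests::clear_speedtests(&mut tx, &reward_period).await?;
    tx.commit().await?;
    Ok(())
}
```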
diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs
index 25fa9c893..7da4e9154 100644
--- a/mobile_verifier/src/speedtests.rs
+++ b/mobile_verifier/src/speedtests.rs
@@ -1,46 +1,41 @@
-use crate::speedtests_average::{SpeedtestAverage, SPEEDTEST_LAPSE};
-use chrono::{DateTime, Duration, Utc};
+use crate::speedtests_average::SpeedtestAverage;
+use chrono::{DateTime, Utc};
 use file_store::{
-    file_info_poller::FileInfoStream, file_sink::FileSinkClient,
-    speedtest::CellSpeedtestIngestReport,
+    file_info_poller::FileInfoStream,
+    file_sink::FileSinkClient,
+    speedtest::{CellSpeedtest, CellSpeedtestIngestReport},
+};
+use futures::{
+    stream::{Stream, StreamExt, TryStreamExt},
+    TryFutureExt,
 };
-use futures::{StreamExt, TryFutureExt, TryStreamExt};
 use helium_crypto::PublicKeyBinary;
 use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient};
-use sqlx::{FromRow, Postgres, Transaction, Type};
-use std::collections::HashMap;
+use sqlx::{postgres::PgRow, FromRow, Postgres, Row, Transaction};
+use std::{collections::HashMap, ops::Range, pin::pin};
 use tokio::sync::mpsc::Receiver;
 
 const SPEEDTEST_AVG_MAX_DATA_POINTS: usize = 6;
 
 pub type EpochSpeedTests = HashMap<PublicKeyBinary, Vec<Speedtest>>;
 
-#[derive(Debug, Clone, Type, FromRow)]
-#[sqlx(type_name = "speedtest")]
+#[derive(Debug, Clone)]
 pub struct Speedtest {
-    pub pubkey: PublicKeyBinary,
-    pub upload_speed: i64,
-    pub download_speed: i64,
-    pub latency: i32,
-    pub timestamp: DateTime<Utc>,
+    pub report: CellSpeedtest,
 }
 
-impl Speedtest {
-    #[cfg(test)]
-    pub fn new(
-        pubkey: PublicKeyBinary,
-        timestamp: DateTime<Utc>,
-        upload_speed: i64,
-        download_speed: i64,
-        latency: i32,
-    ) -> Self {
-        Self {
-            pubkey,
-            timestamp,
-            upload_speed,
-            download_speed,
-            latency,
-        }
+impl FromRow<'_, PgRow> for Speedtest {
+    fn from_row(row: &PgRow) -> sqlx::Result<Self> {
+        Ok(Self {
+            report: CellSpeedtest {
+                pubkey: row.get::<PublicKeyBinary, &str>("pubkey"),
+                serial: row.get::<String, &str>("serial"),
+                upload_speed: row.get::<i64, &str>("upload_speed") as u64,
+                download_speed: row.get::<i64, &str>("download_speed") as u64,
+                timestamp: row.get::<DateTime<Utc>, &str>("timestamp"),
+                latency: row.get::<i32, &str>("latency") as u32,
+            },
+        })
     }
 }
 
@@ -87,46 +82,81 @@ impl SpeedtestDaemon {
 
     async fn process_file(
         &self,
-        file_info_stream: FileInfoStream<CellSpeedtestIngestReport>,
+        file: FileInfoStream<CellSpeedtestIngestReport>,
     ) -> anyhow::Result<()> {
-        tracing::info!(
-            "Processing speedtest file {}",
-            file_info_stream.file_info.key
-        );
+        tracing::info!("Processing speedtest file {}", file.file_info.key);
+
         let mut transaction = self.pool.begin().await?;
-        // process the speedtest reports from the file, if valid insert to the db
-        // and recalcuate a new average
-        file_info_stream
-            .into_stream(&mut transaction)
-            .await?
-            .map(anyhow::Ok)
-            .try_fold(transaction, |mut transaction, report| async {
-                let pubkey = report.report.pubkey.clone();
-                if self
-                    .gateway_client
+        let speedtests = file.into_stream(&mut transaction).await?;
+
+        let mut validated_speedtests =
+            pin!(Speedtest::validate_speedtests(&self.gateway_client, speedtests).await);
+
+        // TODO: the nested Option here is odd; is there a cleaner alternative?
+        while let Some(Some(speedtest)) = validated_speedtests.next().await {
+            let pubkey = speedtest.report.pubkey.clone();
+            speedtest.save(&mut transaction).await?;
+            let latest_speedtests =
+                get_latest_speedtests_for_pubkey(&pubkey, &mut transaction).await?;
+            let average = SpeedtestAverage::from(&latest_speedtests);
+            average.write(&self.file_sink, latest_speedtests).await?;
+        }
+        self.file_sink.commit().await?;
+        transaction.commit().await?;
+
+        Ok(())
+    }
+}
+
+impl Speedtest {
+    pub async fn validate_speedtests<'a>(
+        gateway_client: &'a GatewayClient,
+        speedtests: impl Stream<Item = CellSpeedtestIngestReport> + 'a,
+    ) -> impl Stream<Item = Option<Speedtest>> + 'a {
+        speedtests.then(move |report| {
+            let pubkey = report.report.pubkey.clone();
+            async move {
+                if gateway_client
                    .resolve_gateway_info(&pubkey.clone())
                    .await
                    .is_ok()
                {
-                    save_speedtest_to_db(report, &mut transaction).await?;
-                    let latest_speedtests: Vec<Speedtest> =
-                        get_latest_speedtests_for_pubkey(&pubkey, &mut transaction).await?;
-                    let average = SpeedtestAverage::from(&latest_speedtests);
-                    average.write(&self.file_sink, latest_speedtests).await?;
+                    Some(Speedtest {
+                        report: report.report.into(),
+                    })
                }
-                Ok(transaction)
-            })
-            .await?
-            .commit()
-            .await?;
-        // db work all done, commit the reports to s3
-        self.file_sink.commit().await?;
+                else {
+                    None
+                }
+            }
+        })
+    }
+
+    pub async fn save(
+        &self,
+        exec: &mut Transaction<'_, Postgres>,
+    ) -> Result<(), sqlx::Error> {
+        sqlx::query(
+            r#"
+            insert into speedtests (pubkey, upload_speed, download_speed, latency, serial, timestamp)
+            values ($1, $2, $3, $4, $5, $6)
+            "#,
+        )
+        .bind(self.report.pubkey.clone())
+        .bind(self.report.upload_speed as i64)
+        .bind(self.report.download_speed as i64)
+        .bind(self.report.latency as i64)
+        .bind(self.report.serial.clone())
+        .bind(self.report.timestamp)
+        .execute(exec)
+        .await?;
         Ok(())
     }
 }
 
-pub async fn get_latest_speedtests_for_pubkey<'a>(
+
+pub async fn get_latest_speedtests_for_pubkey(
     pubkey: &PublicKeyBinary,
     exec: &mut Transaction<'_, Postgres>,
 ) -> Result<Vec<Speedtest>, sqlx::Error> {
@@ -147,13 +177,13 @@
 
 pub async fn aggregate_epoch_speedtests<'a>(
     epoch_end: DateTime<Utc>,
-    exec: impl sqlx::PgExecutor<'a> + Copy + 'a,
+    exec: &sqlx::Pool<sqlx::Postgres>,
 ) -> Result<EpochSpeedTests, sqlx::Error> {
     let mut speedtests = EpochSpeedTests::new();
     // pull the last N most recent speedtests from prior to the epoch end for each pubkey
     let mut rows = sqlx::query_as::<_, Speedtest>(
         "select * from (
-            SELECT distinct(pubkey), upload_speed, download_speed, latency, timestamp, row_number()
+            SELECT distinct(pubkey), upload_speed, download_speed, latency, timestamp, serial, row_number()
             over (partition by pubkey order by timestamp desc) as count FROM speedtests where timestamp < $1
         ) as tmp where count < $2"
@@ -164,39 +194,19 @@ pub async fn aggregate_epoch_speedtests<'a>(
     // collate the returned speedtests based on pubkey
     while let Some(speedtest) = rows.try_next().await? {
         speedtests
-            .entry(speedtest.pubkey.clone())
+            .entry(speedtest.report.pubkey.clone())
             .or_default()
             .push(speedtest);
     }
     Ok(speedtests)
 }
 
-pub async fn save_speedtest_to_db(
-    report: CellSpeedtestIngestReport,
-    exec: &mut Transaction<'_, Postgres>,
-) -> Result<(), sqlx::Error> {
-    sqlx::query(
-        r#"
-        insert into speedtests (pubkey, upload_speed, download_speed, latency, timestamp)
-        values ($1, $2, $3, $4, $5)
-        "#,
-    )
-    .bind(report.report.pubkey)
-    .bind(report.report.upload_speed as i64)
-    .bind(report.report.download_speed as i64)
-    .bind(report.report.latency as i64)
-    .bind(report.report.timestamp)
-    .execute(exec)
-    .await?;
-    Ok(())
-}
-
-// Clear the speedtests table of tests older than hours defined by SPEEDTEST_LAPSE
 pub async fn clear_speedtests(
     tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
+    reward_period: &Range<DateTime<Utc>>,
 ) -> Result<(), sqlx::Error> {
-    sqlx::query("DELETE FROM speedtests where timestamp < $1")
-        .bind(Utc::now() - Duration::hours(SPEEDTEST_LAPSE))
+    sqlx::query("DELETE FROM speedtests WHERE timestamp < $1")
+        .bind(reward_period.start)
         .execute(&mut *tx)
         .await?;
     Ok(())
 }
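On the TODO in `process_file` about the nested `Option`: since `validate_speedtests` already decides which reports to keep, one alternative (a sketch only, not part of this diff) is to filter inside the stream with `futures::StreamExt::filter_map`, which also lets the outer `async` on the function be dropped; the caller then matches a single `Some`:

```rust
// Sketch: validation that skips unresolved gateways inside the stream, so the
// consumer writes `while let Some(speedtest) = validated.next().await { .. }`.
// Names mirror the diff; treat this as a suggestion, not committed code.
pub fn validate_speedtests<'a>(
    gateway_client: &'a GatewayClient,
    speedtests: impl Stream<Item = CellSpeedtestIngestReport> + 'a,
) -> impl Stream<Item = Speedtest> + 'a {
    speedtests.filter_map(move |report| async move {
        // keep only reports whose gateway resolves in mobile-config
        gateway_client
            .resolve_gateway_info(&report.report.pubkey)
            .await
            .ok()
            .map(|_| Speedtest {
                report: report.report,
            })
    })
}
```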
diff --git a/mobile_verifier/src/speedtests_average.rs b/mobile_verifier/src/speedtests_average.rs
index 12976de9c..33a608b9b 100644
--- a/mobile_verifier/src/speedtests_average.rs
+++ b/mobile_verifier/src/speedtests_average.rs
@@ -38,17 +38,14 @@ where
         let mut sum_latency = 0;
 
         for Speedtest {
-            pubkey, // eww!
-            upload_speed,
-            download_speed,
-            latency,
+            report,
             ..
         } in speedtests_without_lapsed(iter.into_iter(), Duration::hours(SPEEDTEST_LAPSE))
         {
-            id = pubkey.as_ref().to_vec(); // eww!
-            sum_upload += *upload_speed as u64;
-            sum_download += *download_speed as u64;
-            sum_latency += *latency as u32;
+            id = report.pubkey.as_ref().to_vec(); // eww!
+            sum_upload += report.upload_speed as u64;
+            sum_download += report.download_speed as u64;
+            sum_latency += report.latency as u32;
             window_size += 1;
         }
 
@@ -111,10 +108,10 @@ impl SpeedtestAverage {
                 Duration::hours(SPEEDTEST_LAPSE),
             )
             .map(|st| proto::Speedtest {
-                timestamp: st.timestamp.timestamp() as u64,
-                upload_speed_bps: st.upload_speed as u64,
-                download_speed_bps: st.download_speed as u64,
-                latency_ms: st.latency as u32,
+                timestamp: st.report.timestamp.timestamp() as u64,
+                upload_speed_bps: st.report.upload_speed as u64,
+                download_speed_bps: st.report.download_speed as u64,
+                latency_ms: st.report.latency as u32,
             })
             .collect(),
             validity: self.validity as i32,
@@ -221,11 +218,11 @@ impl SpeedtestAverages {
         self.averages.get(pub_key).cloned()
     }
 
-    pub async fn aggregate_epoch_averages<'a>(
+    pub async fn aggregate_epoch_averages(
         epoch_end: DateTime<Utc>,
-        exec: impl sqlx::PgExecutor<'a> + Copy + 'a,
+        pool: &sqlx::Pool<sqlx::Postgres>,
     ) -> Result<Self, sqlx::Error> {
-        let averages: EpochAverages = speedtests::aggregate_epoch_speedtests(epoch_end, exec)
+        let averages: EpochAverages = speedtests::aggregate_epoch_speedtests(epoch_end, pool)
             .await?
             .into_iter()
             .map(|(pub_key, speedtests)| {
@@ -280,9 +277,9 @@ fn speedtests_without_lapsed<'a>(
 ) -> impl Iterator<Item = &'a Speedtest> {
     let mut last_timestamp = None;
     iterable.take_while(move |speedtest| match last_timestamp {
-        Some(ts) if ts - speedtest.timestamp > lapse_cliff => false,
+        Some(ts) if ts - speedtest.report.timestamp > lapse_cliff => false,
         None | Some(_) => {
-            last_timestamp = Some(speedtest.timestamp);
+            last_timestamp = Some(speedtest.report.timestamp);
             true
         }
     })
@@ -296,13 +293,14 @@ const fn mbps(mbps: u64) -> u64 {
     mbps * 125000
 }
 
 mod test {
     use super::*;
     use chrono::TimeZone;
+    use file_store::speedtest::CellSpeedtest;
 
     fn parse_dt(dt: &str) -> DateTime<Utc> {
         Utc.datetime_from_str(dt, "%Y-%m-%d %H:%M:%S %z")
             .expect("unable_to_parse")
     }
 
-    fn bytes_per_s(mbps: i64) -> i64 {
+    fn bytes_per_s(mbps: u64) -> u64 {
         mbps * 125000
     }
 
@@ -369,50 +367,50 @@ mod test {
             .expect("failed gw1 parse");
 
         vec![
-            Speedtest::new(gw1.clone(), parse_dt("2022-08-02 18:00:00 +0000"), 0, 0, 0),
-            Speedtest::new(
+            default_cellspeedtest(gw1.clone(), parse_dt("2022-08-02 18:00:00 +0000"), 0, 0, 0),
+            default_cellspeedtest(
                 gw1.clone(),
                 parse_dt("2022-08-02 12:00:00 +0000"),
                 bytes_per_s(20),
                 bytes_per_s(150),
                 70,
             ),
-            Speedtest::new(
+            default_cellspeedtest(
                 gw1.clone(),
                 parse_dt("2022-08-02 6:00:00 +0000"),
                 bytes_per_s(10),
                 bytes_per_s(118),
                 50,
             ),
-            Speedtest::new(
+            default_cellspeedtest(
                 gw1.clone(),
                 parse_dt("2022-08-02 0:00:00 +0000"),
                 bytes_per_s(30),
                 bytes_per_s(112),
                 40,
             ),
-            Speedtest::new(
+            default_cellspeedtest(
                 gw1.clone(),
                 parse_dt("2022-08-02 0:00:00 +0000"),
                 bytes_per_s(15),
                 bytes_per_s(90),
                 10,
             ),
-            Speedtest::new(
+            default_cellspeedtest(
                 gw1.clone(),
                 parse_dt("2022-08-01 18:00:00 +0000"),
                 bytes_per_s(20),
                 bytes_per_s(130),
                 10,
             ),
-            Speedtest::new(
+            default_cellspeedtest(
                 gw1.clone(),
                 parse_dt("2022-08-01 12:00:00 +0000"),
                 bytes_per_s(10),
                 bytes_per_s(100),
                 30,
             ),
-            Speedtest::new(
+            default_cellspeedtest(
                 gw1,
                 parse_dt("2022-08-01 6:00:00 +0000"),
                 bytes_per_s(30),
@@ -432,21 +430,21 @@ mod test {
             .parse()
             .expect("failed owner parse");
         let disjoint_speedtests = vec![
-            Speedtest::new(
+            default_cellspeedtest(
                 pubkey.clone(),
                 parse_dt("2022-08-02 6:00:00 +0000"),
                 bytes_per_s(20),
                 bytes_per_s(150),
                 70,
             ),
-            Speedtest::new(
+            default_cellspeedtest(
                 pubkey.clone(),
                 parse_dt("2022-08-01 18:00:00 +0000"),
                 bytes_per_s(10),
                 bytes_per_s(118),
                 50,
             ),
-            Speedtest::new(
+            default_cellspeedtest(
                 pubkey,
                 parse_dt("2022-08-01 12:00:00 +0000"),
                 bytes_per_s(30),
@@ -460,4 +458,25 @@ mod test {
         assert_eq!(contiguous_speedtests.count(), 8);
         assert_eq!(disjoint_speedtests.count(), 1);
     }
+
+    fn default_cellspeedtest(
+        pubkey: PublicKeyBinary,
+        timestamp: DateTime<Utc>,
+        upload_speed: u64,
+        download_speed: u64,
+        latency: u32,
+    ) -> Speedtest {
+        Speedtest {
+            report: CellSpeedtest {
+                pubkey,
+                timestamp,
+                upload_speed,
+                download_speed,
+                latency,
+                serial: "".to_string(),
+            },
+        }
+    }
 }
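For reference, a small usage sketch of the reshaped types, building on the `default_cellspeedtest`, `parse_dt`, and `bytes_per_s` helpers above. It assumes `PublicKeyBinary` can be built from raw bytes and that `SPEEDTEST_LAPSE` exceeds the six-hour gap used here; both are assumptions for illustration, not facts from this diff:

```rust
// Hypothetical test: two speedtests six hours apart (newest first, as the
// cliff filter expects) fall inside the lapse window, so neither is dropped.
#[test]
fn six_hour_gap_stays_in_window() {
    let gw = PublicKeyBinary::from(vec![1]); // assumed From<Vec<u8>> impl
    let tests = vec![
        default_cellspeedtest(
            gw.clone(),
            parse_dt("2022-08-02 0:00:00 +0000"),
            bytes_per_s(10),
            bytes_per_s(100),
            25,
        ),
        default_cellspeedtest(
            gw,
            parse_dt("2022-08-01 18:00:00 +0000"),
            bytes_per_s(20),
            bytes_per_s(150),
            30,
        ),
    ];
    let kept = speedtests_without_lapsed(tests.iter(), Duration::hours(SPEEDTEST_LAPSE));
    assert_eq!(kept.count(), 2);
}
```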