From 65ac93381bfdad9e6fdec477c1f548b261863847 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Wed, 26 Jul 2023 17:01:05 +0100 Subject: [PATCH 1/4] refactor speedtests handling, ensure rewards only include tests up until epoch end --- .../migrations/15_speedtests_one_to_one.sql | 22 + mobile_verifier/src/cli/reward_from_db.rs | 15 +- mobile_verifier/src/data_session.rs | 4 +- mobile_verifier/src/heartbeats.rs | 11 + mobile_verifier/src/lib.rs | 1 + mobile_verifier/src/reward_shares.rs | 155 ++-- mobile_verifier/src/rewarder.rs | 35 +- mobile_verifier/src/speedtests.rs | 722 +++--------------- mobile_verifier/src/speedtests_average.rs | 478 ++++++++++++ mobile_verifier/src/subscriber_location.rs | 4 +- 10 files changed, 751 insertions(+), 696 deletions(-) create mode 100644 mobile_verifier/migrations/15_speedtests_one_to_one.sql create mode 100644 mobile_verifier/src/speedtests_average.rs diff --git a/mobile_verifier/migrations/15_speedtests_one_to_one.sql b/mobile_verifier/migrations/15_speedtests_one_to_one.sql new file mode 100644 index 000000000..af6a185bc --- /dev/null +++ b/mobile_verifier/migrations/15_speedtests_one_to_one.sql @@ -0,0 +1,22 @@ + +CREATE TABLE speedtests_migration ( + pubkey text NOT NULL, + upload_speed bigint, + download_speed bigint, + latency integer, + serial text, + timestamp timestamptz NOT NULL, + inserted_at timestamptz default now(), + PRIMARY KEY(pubkey, timestamp) +); +CREATE INDEX idx_speedtests_pubkey on speedtests_migration (pubkey); + +INSERT INTO speedtests_migration (pubkey, upload_speed, download_speed, latency, serial, timestamp) +SELECT id, (st).upload_speed, (st).download_speed, (st).latency, '', (st).timestamp +FROM (select id, unnest(speedtests) as st from speedtests) as tmp +ON CONFLICT DO NOTHING; + +ALTER TABLE speedtests RENAME TO speedtests_old; +ALTER TABLE speedtests_migration RENAME TO speedtests; + + diff --git a/mobile_verifier/src/cli/reward_from_db.rs b/mobile_verifier/src/cli/reward_from_db.rs index 0246cc80f..d88cbcb27 100644 --- a/mobile_verifier/src/cli/reward_from_db.rs +++ b/mobile_verifier/src/cli/reward_from_db.rs @@ -1,7 +1,7 @@ use crate::{ heartbeats::HeartbeatReward, reward_shares::{get_scheduled_tokens_for_poc_and_dc, PocShares}, - speedtests::{Average, SpeedtestAverages}, + speedtests_average::SpeedtestAverages, Settings, }; use anyhow::Result; @@ -39,8 +39,9 @@ impl Cmd { .await?; let heartbeats = HeartbeatReward::validated(&pool, &epoch); - let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?; - let reward_shares = PocShares::aggregate(heartbeats, speedtests.clone()).await?; + let speedtest_averages = + SpeedtestAverages::aggregate_epoch_averages(epoch.end, &pool).await?; + let reward_shares = PocShares::aggregate(heartbeats, &speedtest_averages).await?; let mut total_rewards = 0_u64; let mut owner_rewards = HashMap::<_, u64>::new(); @@ -62,11 +63,11 @@ impl Cmd { } let rewards: Vec<_> = owner_rewards.into_iter().collect(); let mut multiplier_count = HashMap::<_, usize>::new(); - let speedtest_multipliers: Vec<_> = speedtests - .speedtests + let speedtest_multipliers: Vec<_> = speedtest_averages + .averages .into_iter() - .map(|(pub_key, avg)| { - let reward_multiplier = Average::from(&avg).reward_multiplier(); + .map(|(pub_key, average)| { + let reward_multiplier = average.reward_multiplier; *multiplier_count.entry(reward_multiplier).or_default() += 1; (pub_key, reward_multiplier) }) diff --git a/mobile_verifier/src/data_session.rs b/mobile_verifier/src/data_session.rs index 
49f1b6d0f..2353fd8c6 100644 --- a/mobile_verifier/src/data_session.rs +++ b/mobile_verifier/src/data_session.rs @@ -144,10 +144,10 @@ pub async fn data_sessions_to_dc<'a>( pub async fn clear_hotspot_data_sessions( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, - reward_period: &Range>, + timestamp: &DateTime, ) -> Result<(), sqlx::Error> { sqlx::query("delete from hotspot_data_transfer_sessions where received_timestamp < $1") - .bind(reward_period.end) + .bind(timestamp) .execute(&mut *tx) .await?; Ok(()) diff --git a/mobile_verifier/src/heartbeats.rs b/mobile_verifier/src/heartbeats.rs index 01e1fd383..715a7a5e2 100644 --- a/mobile_verifier/src/heartbeats.rs +++ b/mobile_verifier/src/heartbeats.rs @@ -289,3 +289,14 @@ async fn validate_heartbeat( Ok((cell_type, proto::HeartbeatValidity::Valid)) } + +pub async fn clear_heartbeats( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + timestamp: &DateTime, +) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM heartbeats WHERE truncated_timestamp < $1") + .bind(timestamp) + .execute(&mut *tx) + .await?; + Ok(()) +} diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index 84d891206..8f6252359 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -4,6 +4,7 @@ mod heartbeats; mod reward_shares; mod settings; mod speedtests; +mod speedtests_average; mod subscriber_location; mod telemetry; diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index acaa22195..67bcde84f 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -1,7 +1,6 @@ +use crate::speedtests_average::SpeedtestAverages; use crate::{ - data_session::HotspotMap, - heartbeats::HeartbeatReward, - speedtests::{Average, SpeedtestAverages}, + data_session::HotspotMap, heartbeats::HeartbeatReward, speedtests_average::SpeedtestAverage, subscriber_location::SubscriberValidatedLocations, }; @@ -221,15 +220,15 @@ pub struct PocShares { impl PocShares { pub async fn aggregate( heartbeats: impl Stream>, - speedtests: SpeedtestAverages, + speedtest_averages: &SpeedtestAverages, ) -> Result { let mut poc_shares = Self::default(); let mut heartbeats = std::pin::pin!(heartbeats); while let Some(heartbeat) = heartbeats.next().await.transpose()? 
{ - let speedmultiplier = speedtests + let speedmultiplier = speedtest_averages .get_average(&heartbeat.hotspot_key) .as_ref() - .map_or(Decimal::ZERO, Average::reward_multiplier); + .map_or(Decimal::ZERO, SpeedtestAverage::reward_multiplier); *poc_shares .hotspot_shares .entry(heartbeat.hotspot_key) @@ -326,14 +325,16 @@ mod test { data_session, data_session::HotspotDataSession, heartbeats::HeartbeatReward, - speedtests::{Speedtest, SpeedtestAverages}, + speedtests::Speedtest, + speedtests_average::{SpeedtestAverage, SpeedtestAverages}, subscriber_location::SubscriberValidatedLocations, }; use chrono::{Duration, Utc}; + use file_store::speedtest::CellSpeedtest; use futures::stream; use helium_proto::services::poc_mobile::mobile_reward_share::Reward as MobileReward; use prost::Message; - use std::collections::{HashMap, VecDeque}; + use std::collections::HashMap; fn valid_shares() -> RadioShares { let mut radio_shares: HashMap = Default::default(); @@ -526,7 +527,7 @@ mod test { assert_eq!(data_transfer_rewards.reward_scale().round_dp(1), dec!(0.5)); } - fn bytes_per_s(mbps: i64) -> i64 { + fn bytes_per_s(mbps: u64) -> u64 { mbps * 125000 } @@ -536,39 +537,55 @@ mod test { .reward_weight() } - fn acceptable_speedtest(timestamp: DateTime) -> Speedtest { + fn acceptable_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime) -> Speedtest { Speedtest { - timestamp, - upload_speed: bytes_per_s(10), - download_speed: bytes_per_s(100), - latency: 25, + report: CellSpeedtest { + pubkey, + timestamp, + upload_speed: bytes_per_s(10), + download_speed: bytes_per_s(100), + latency: 25, + serial: "".to_string(), + }, } } - fn degraded_speedtest(timestamp: DateTime) -> Speedtest { + fn degraded_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime) -> Speedtest { Speedtest { - timestamp, - upload_speed: bytes_per_s(5), - download_speed: bytes_per_s(60), - latency: 60, + report: CellSpeedtest { + pubkey, + timestamp, + upload_speed: bytes_per_s(5), + download_speed: bytes_per_s(60), + latency: 60, + serial: "".to_string(), + }, } } - fn failed_speedtest(timestamp: DateTime) -> Speedtest { + fn failed_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime) -> Speedtest { Speedtest { - timestamp, - upload_speed: bytes_per_s(1), - download_speed: bytes_per_s(20), - latency: 110, + report: CellSpeedtest { + pubkey, + timestamp, + upload_speed: bytes_per_s(1), + download_speed: bytes_per_s(20), + latency: 110, + serial: "".to_string(), + }, } } - fn poor_speedtest(timestamp: DateTime) -> Speedtest { + fn poor_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime) -> Speedtest { Speedtest { - timestamp, - upload_speed: bytes_per_s(2), - download_speed: bytes_per_s(40), - latency: 90, + report: CellSpeedtest { + pubkey, + timestamp, + upload_speed: bytes_per_s(2), + download_speed: bytes_per_s(40), + latency: 90, + serial: "".to_string(), + }, } } @@ -611,19 +628,21 @@ mod test { let last_timestamp = timestamp - Duration::hours(12); let g1_speedtests = vec![ - acceptable_speedtest(last_timestamp), - acceptable_speedtest(timestamp), + acceptable_speedtest(g1.clone(), last_timestamp), + acceptable_speedtest(g1.clone(), timestamp), ]; let g2_speedtests = vec![ - acceptable_speedtest(last_timestamp), - acceptable_speedtest(timestamp), + acceptable_speedtest(g2.clone(), last_timestamp), + acceptable_speedtest(g2.clone(), timestamp), ]; - let mut speedtests = HashMap::new(); - speedtests.insert(g1.clone(), VecDeque::from(g1_speedtests)); - speedtests.insert(g2.clone(), VecDeque::from(g2_speedtests)); - let 
speedtest_avgs = SpeedtestAverages { speedtests }; - - let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs) + let g1_average = SpeedtestAverage::from(&g1_speedtests); + let g2_average = SpeedtestAverage::from(&g2_speedtests); + let mut averages = HashMap::new(); + averages.insert(g1.clone(), g1_average); + averages.insert(g2.clone(), g2_average); + let speedtest_avgs = SpeedtestAverages { averages }; + + let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs) .await .unwrap(); @@ -779,44 +798,56 @@ mod test { // setup speedtests let last_speedtest = timestamp - Duration::hours(12); let gw1_speedtests = vec![ - acceptable_speedtest(last_speedtest), - acceptable_speedtest(timestamp), + acceptable_speedtest(gw1.clone(), last_speedtest), + acceptable_speedtest(gw1.clone(), timestamp), ]; let gw2_speedtests = vec![ - acceptable_speedtest(last_speedtest), - acceptable_speedtest(timestamp), + acceptable_speedtest(gw2.clone(), last_speedtest), + acceptable_speedtest(gw2.clone(), timestamp), ]; let gw3_speedtests = vec![ - acceptable_speedtest(last_speedtest), - acceptable_speedtest(timestamp), + acceptable_speedtest(gw3.clone(), last_speedtest), + acceptable_speedtest(gw3.clone(), timestamp), ]; let gw4_speedtests = vec![ - acceptable_speedtest(last_speedtest), - acceptable_speedtest(timestamp), + acceptable_speedtest(gw4.clone(), last_speedtest), + acceptable_speedtest(gw4.clone(), timestamp), ]; let gw5_speedtests = vec![ - degraded_speedtest(last_speedtest), - degraded_speedtest(timestamp), + degraded_speedtest(gw5.clone(), last_speedtest), + degraded_speedtest(gw5.clone(), timestamp), ]; let gw6_speedtests = vec![ - failed_speedtest(last_speedtest), - failed_speedtest(timestamp), + failed_speedtest(gw6.clone(), last_speedtest), + failed_speedtest(gw6.clone(), timestamp), + ]; + let gw7_speedtests = vec![ + poor_speedtest(gw7.clone(), last_speedtest), + poor_speedtest(gw7.clone(), timestamp), ]; - let gw7_speedtests = vec![poor_speedtest(last_speedtest), poor_speedtest(timestamp)]; - let mut speedtests = HashMap::new(); - speedtests.insert(gw1, VecDeque::from(gw1_speedtests)); - speedtests.insert(gw2, VecDeque::from(gw2_speedtests)); - speedtests.insert(gw3, VecDeque::from(gw3_speedtests)); - speedtests.insert(gw4, VecDeque::from(gw4_speedtests)); - speedtests.insert(gw5, VecDeque::from(gw5_speedtests)); - speedtests.insert(gw6, VecDeque::from(gw6_speedtests)); - speedtests.insert(gw7, VecDeque::from(gw7_speedtests)); - let speedtest_avgs = SpeedtestAverages { speedtests }; + + let gw1_average = SpeedtestAverage::from(&gw1_speedtests); + let gw2_average = SpeedtestAverage::from(&gw2_speedtests); + let gw3_average = SpeedtestAverage::from(&gw3_speedtests); + let gw4_average = SpeedtestAverage::from(&gw4_speedtests); + let gw5_average = SpeedtestAverage::from(&gw5_speedtests); + let gw6_average = SpeedtestAverage::from(&gw6_speedtests); + let gw7_average = SpeedtestAverage::from(&gw7_speedtests); + let mut averages = HashMap::new(); + averages.insert(gw1.clone(), gw1_average); + averages.insert(gw2.clone(), gw2_average); + averages.insert(gw3.clone(), gw3_average); + averages.insert(gw4.clone(), gw4_average); + averages.insert(gw5.clone(), gw5_average); + averages.insert(gw6.clone(), gw6_average); + averages.insert(gw7.clone(), gw7_average); + + let speedtest_avgs = SpeedtestAverages { averages }; // calculate the rewards for the sample group let mut owner_rewards = HashMap::::new(); let epoch = (now - Duration::hours(1))..now; - for 
mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs) + for mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs) .await .unwrap() .into_rewards(Decimal::ZERO, &epoch) diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 5ed8b9255..42678d293 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -1,8 +1,9 @@ use crate::{ data_session, - heartbeats::HeartbeatReward, + heartbeats::{self, HeartbeatReward}, reward_shares::{MapperShares, PocShares, TransferRewards}, - speedtests::SpeedtestAverages, + speedtests, + speedtests_average::SpeedtestAverages, subscriber_location, telemetry, }; use anyhow::bail; @@ -111,12 +112,10 @@ impl Rewarder { return Ok(false); } - if sqlx::query_scalar::<_, i64>( - "SELECT COUNT(*) FROM speedtests WHERE latest_timestamp >= $1", - ) - .bind(reward_period.end) - .fetch_one(&self.pool) - .await? + if sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM speedtests WHERE timestamp >= $1") + .bind(reward_period.end) + .fetch_one(&self.pool) + .await? == 0 { tracing::info!("No speedtests found past reward period"); @@ -139,9 +138,9 @@ impl Rewarder { ); let heartbeats = HeartbeatReward::validated(&self.pool, reward_period); - let speedtests = SpeedtestAverages::validated(&self.pool, reward_period.end).await?; - - let poc_rewards = PocShares::aggregate(heartbeats, speedtests).await?; + let speedtest_averages = + SpeedtestAverages::aggregate_epoch_averages(reward_period.end, &self.pool).await?; + let poc_rewards = PocShares::aggregate(heartbeats, &speedtest_averages).await?; let mobile_price = self .price_tracker .price(&helium_proto::BlockchainTokenTypeV1::Mobile) @@ -214,15 +213,11 @@ impl Rewarder { let mut transaction = self.pool.begin().await?; - // Clear the heartbeats table of old heartbeats: - sqlx::query("DELETE FROM heartbeats WHERE truncated_timestamp < $1") - .bind(reward_period.start) - .execute(&mut transaction) - .await?; - - // clear the db of data sessions data & subscriber location data for the epoch - data_session::clear_hotspot_data_sessions(&mut transaction, reward_period).await?; - // subscriber_location::clear_location_shares(&mut transaction, reward_period).await?; + // clear out the various db tables + heartbeats::clear_heartbeats(&mut transaction, &reward_period.start).await?; + speedtests::clear_speedtests(&mut transaction, &reward_period.end).await?; + data_session::clear_hotspot_data_sessions(&mut transaction, &reward_period.end).await?; + // subscriber_location::clear_location_shares(&mut transaction, &reward_period.end).await?; let next_reward_period = scheduler.next_reward_period(); save_last_rewarded_end_time(&mut transaction, &next_reward_period.start).await?; diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index db70e7fae..ae5b84456 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -1,55 +1,41 @@ +use crate::speedtests_average::{SpeedtestAverage, SPEEDTEST_LAPSE}; use chrono::{DateTime, Duration, Utc}; use file_store::{ file_info_poller::FileInfoStream, - file_sink::{self, FileSinkClient}, + file_sink::FileSinkClient, speedtest::{CellSpeedtest, CellSpeedtestIngestReport}, - traits::TimestampEncode, }; use futures::{ - stream::{Stream, StreamExt, TryStreamExt}, + stream::{StreamExt, TryStreamExt}, TryFutureExt, }; use helium_crypto::PublicKeyBinary; -use helium_proto::services::poc_mobile as proto; -use 
mobile_config::{client::ClientError, gateway_info::GatewayInfoResolver, GatewayClient}; -use rust_decimal::Decimal; -use rust_decimal_macros::dec; -use sqlx::{ - postgres::{types::PgHasArrayType, PgTypeInfo}, - FromRow, Postgres, Transaction, Type, -}; -use std::{ - collections::{HashMap, VecDeque}, - pin::pin, -}; +use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient}; +use sqlx::{postgres::PgRow, FromRow, Postgres, Row, Transaction}; +use std::collections::HashMap; use tokio::sync::mpsc::Receiver; const SPEEDTEST_AVG_MAX_DATA_POINTS: usize = 6; -const SPEEDTEST_LAPSE: i64 = 48; -#[derive(Debug, Clone, Type)] -#[sqlx(type_name = "speedtest")] -pub struct Speedtest { - pub timestamp: DateTime, - pub upload_speed: i64, - pub download_speed: i64, - pub latency: i32, -} +pub type EpochSpeedTests = HashMap>; -impl Speedtest { - #[cfg(test)] - pub fn new( - timestamp: DateTime, - upload_speed: i64, - download_speed: i64, - latency: i32, - ) -> Self { - Self { - timestamp, - upload_speed, - download_speed, - latency, - } +#[derive(Debug, Clone)] +pub struct Speedtest { + pub report: CellSpeedtest, +} + +impl FromRow<'_, PgRow> for Speedtest { + fn from_row(row: &PgRow) -> sqlx::Result { + Ok(Self { + report: CellSpeedtest { + pubkey: row.get::("pubkey"), + serial: row.get::("serial"), + upload_speed: row.get::("upload_speed") as u64, + download_speed: row.get::("download_speed") as u64, + timestamp: row.get::, &str>("timestamp"), + latency: row.get::("latency") as u32, + }, + }) } } @@ -99,577 +85,107 @@ impl SpeedtestDaemon { file: FileInfoStream, ) -> anyhow::Result<()> { tracing::info!("Processing speedtest file {}", file.file_info.key); - let mut transaction = self.pool.begin().await?; - let reports = file.into_stream(&mut transaction).await?; - - let mut validated_speedtests = pin!( - SpeedtestRollingAverage::validate_speedtests( - &self.gateway_client, - reports.map(|s| s.report), - &mut transaction, - ) - .await? - ); - while let Some(speedtest) = validated_speedtests.next().await.transpose()? { - speedtest.write(&self.file_sink).await?; - speedtest.save(&mut transaction).await?; + let mut speedtests = file.into_stream(&mut transaction).await?; + while let Some(speedtest_report) = speedtests.next().await { + let pubkey = speedtest_report.report.pubkey.clone(); + if self + .gateway_client + .resolve_gateway_info(&pubkey) + .await? 
+ .is_some() + { + save_speedtest(&speedtest_report.report, &mut transaction).await?; + let latest_speedtests = + get_latest_speedtests_for_pubkey(&pubkey, &mut transaction).await?; + let average = SpeedtestAverage::from(&latest_speedtests); + average.write(&self.file_sink, latest_speedtests).await?; + } } - self.file_sink.commit().await?; transaction.commit().await?; - Ok(()) } } -impl From for Speedtest { - fn from(cell_speedtest: CellSpeedtest) -> Self { - Self { - timestamp: cell_speedtest.timestamp, - upload_speed: cell_speedtest.upload_speed as i64, - download_speed: cell_speedtest.download_speed as i64, - latency: cell_speedtest.latency as i32, - } - } -} - -impl PgHasArrayType for Speedtest { - fn array_type_info() -> PgTypeInfo { - PgTypeInfo::with_name("_speedtest") - } -} - -#[derive(FromRow)] -pub struct SpeedtestRollingAverage { - pub id: PublicKeyBinary, - pub speedtests: Vec, - pub latest_timestamp: DateTime, -} - -impl SpeedtestRollingAverage { - pub fn new(id: PublicKeyBinary) -> Self { - Self { - id, - speedtests: Vec::new(), - latest_timestamp: DateTime::::default(), - } - } - - pub async fn validate_speedtests<'a>( - gateway_client: &'a GatewayClient, - speedtests: impl Stream + 'a, - exec: &mut Transaction<'_, Postgres>, - ) -> Result> + 'a, sqlx::Error> { - let tests_by_publickey = speedtests - .fold( - HashMap::>::new(), - |mut map, cell_speedtest| async move { - map.entry(cell_speedtest.pubkey.clone()) - .or_default() - .push(cell_speedtest); - map - }, - ) - .await; - - let mut speedtests = Vec::new(); - for (pubkey, cell_speedtests) in tests_by_publickey.into_iter() { - let rolling_average: SpeedtestRollingAverage = - sqlx::query_as::<_, SpeedtestRollingAverage>( - "SELECT * FROM speedtests WHERE id = $1", - ) - .bind(&pubkey) - .fetch_optional(&mut *exec) - .await? - .unwrap_or_else(|| SpeedtestRollingAverage::new(pubkey.clone())); - speedtests.push((rolling_average, cell_speedtests)); - } - - Ok(futures::stream::iter(speedtests.into_iter()) - .then(move |(rolling_average, cell_speedtests)| { - async move { - // If we get back some gateway info for the given address, it's a valid address - if gateway_client - .resolve_gateway_info(&rolling_average.id) - .await? 
- .is_none() - { - return Ok(None); - } - Ok(Some((rolling_average, cell_speedtests))) - } - }) - .filter_map(|item| async move { item.transpose() }) - .map_ok(|(rolling_average, cell_speedtests)| { - let speedtests = cell_speedtests - .into_iter() - .map(Speedtest::from) - .chain(rolling_average.speedtests.into_iter()) - .take(SPEEDTEST_AVG_MAX_DATA_POINTS) - .collect::>(); - - Self { - id: rolling_average.id, - latest_timestamp: speedtests[0].timestamp, - speedtests, - } - })) - } - - pub async fn save(self, exec: impl sqlx::PgExecutor<'_>) -> Result { - #[derive(FromRow)] - struct SaveResult { - inserted: bool, - } - - sqlx::query_as::<_, SaveResult>( - r#" - insert into speedtests (id, speedtests, latest_timestamp) - values ($1, $2, $3) - on conflict (id) do update set - speedtests = EXCLUDED.speedtests, latest_timestamp = EXCLUDED.latest_timestamp - returning (xmax = 0) as inserted - "#, - ) - .bind(self.id) - .bind(self.speedtests) - .bind(self.latest_timestamp) - .fetch_one(exec) - .await - .map(|result| result.inserted) - } - - pub async fn write(&self, averages: &file_sink::FileSinkClient) -> file_store::Result { - // Write out the speedtests to S3 - let average = Average::from(&self.speedtests); - let validity = average.validity(); - // this is guaratneed to safely convert and not panic as it can only be one of - // four possible decimal values based on the speedtest average tier - let reward_multiplier = average.reward_multiplier().try_into().unwrap(); - let Average { - upload_speed_avg_bps, - download_speed_avg_bps, - latency_avg_ms, - .. - } = average; - averages - .write( - proto::SpeedtestAvg { - pub_key: self.id.clone().into(), - upload_speed_avg_bps, - download_speed_avg_bps, - latency_avg_ms, - timestamp: Utc::now().encode_timestamp(), - speedtests: speedtests_without_lapsed( - self.speedtests.iter(), - Duration::hours(SPEEDTEST_LAPSE), - ) - .map(|st| proto::Speedtest { - timestamp: st.timestamp.timestamp() as u64, - upload_speed_bps: st.upload_speed as u64, - download_speed_bps: st.download_speed as u64, - latency_ms: st.latency as u32, - }) - .collect(), - validity: validity as i32, - reward_multiplier, - }, - &[("validity", validity.as_str_name())], - ) - .await?; - - Ok(()) - } -} - -#[derive(Clone, Default)] -pub struct SpeedtestAverages { - // I'm not sure that VecDeque is actually all that useful here, considering - // we have to constantly convert between the two. - // It does make me more confident in the implementation of validate_speedtests - // though. - pub speedtests: HashMap>, -} - -impl SpeedtestAverages { - #[allow(dead_code)] - pub fn into_iter(self) -> impl IntoIterator { - self.speedtests - .into_iter() - .map(|(id, window)| SpeedtestRollingAverage { - id, - // window is guaranteed to be non-empty. For safety, we set the - // latest timestamp to epoch. - latest_timestamp: window - .front() - .map_or_else(DateTime::::default, |st| st.timestamp), - speedtests: Vec::from(window), - }) - } - - pub fn get_average(&self, pub_key: &PublicKeyBinary) -> Option { - self.speedtests.get(pub_key).map(Average::from) - } - - pub async fn validated( - exec: impl sqlx::PgExecutor<'_> + Copy, - period_end: DateTime, - ) -> Result { - let mut speedtests = HashMap::new(); - - let mut rows = sqlx::query_as::<_, SpeedtestRollingAverage>( - "SELECT * FROM speedtests where latest_timestamp >= $1", - ) - .bind((period_end - Duration::hours(SPEEDTEST_LAPSE)).naive_utc()) - .fetch(exec); - - while let Some(SpeedtestRollingAverage { - id, - speedtests: window, - .. 
- }) = rows.try_next().await? - { - speedtests.insert(id, VecDeque::from(window)); - } - - Ok(Self { speedtests }) - } -} - -impl Extend for SpeedtestAverages { - fn extend(&mut self, iter: T) - where - T: IntoIterator, - { - for SpeedtestRollingAverage { id, speedtests, .. } in iter.into_iter() { - self.speedtests.insert(id, VecDeque::from(speedtests)); - } - } -} - -fn speedtests_without_lapsed<'a>( - iterable: impl Iterator, - lapse_cliff: Duration, -) -> impl Iterator { - let mut last_timestamp = None; - iterable.take_while(move |speedtest| match last_timestamp { - Some(ts) if ts - speedtest.timestamp > lapse_cliff => false, - None | Some(_) => { - last_timestamp = Some(speedtest.timestamp); - true - } - }) -} - -#[derive(Clone, Debug, Default)] -pub struct Average { - pub window_size: usize, - pub upload_speed_avg_bps: u64, - pub download_speed_avg_bps: u64, - pub latency_avg_ms: u32, -} - -impl<'a, I: ?Sized> From<&'a I> for Average -where - &'a I: IntoIterator, -{ - fn from(iter: &'a I) -> Self { - let mut window_size = 0; - let mut sum_upload = 0; - let mut sum_download = 0; - let mut sum_latency = 0; - - for Speedtest { - upload_speed, - download_speed, - latency, - .. - } in speedtests_without_lapsed(iter.into_iter(), Duration::hours(SPEEDTEST_LAPSE)) - { - sum_upload += *upload_speed as u64; - sum_download += *download_speed as u64; - sum_latency += *latency as u32; - window_size += 1; - } - - if window_size > 0 { - Average { - window_size, - upload_speed_avg_bps: sum_upload / window_size as u64, - download_speed_avg_bps: sum_download / window_size as u64, - latency_avg_ms: sum_latency / window_size as u32, - } - } else { - Average::default() - } - } -} - -const MIN_DOWNLOAD: u64 = mbps(30); -const MIN_UPLOAD: u64 = mbps(2); -const MAX_LATENCY: u32 = 100; -pub const MIN_REQUIRED_SAMPLES: usize = 2; - -impl Average { - // TODO: Change this to a multiplier - pub fn validity(&self) -> proto::SpeedtestAvgValidity { - if self.window_size < MIN_REQUIRED_SAMPLES { - return proto::SpeedtestAvgValidity::TooFewSamples; - } - if self.download_speed_avg_bps < MIN_DOWNLOAD { - return proto::SpeedtestAvgValidity::SlowDownloadSpeed; - } - if self.upload_speed_avg_bps < MIN_UPLOAD { - return proto::SpeedtestAvgValidity::SlowUploadSpeed; - } - if self.latency_avg_ms > MAX_LATENCY { - return proto::SpeedtestAvgValidity::HighLatency; - } - proto::SpeedtestAvgValidity::Valid - } - - pub fn tier(&self) -> SpeedtestTier { - if self.window_size < MIN_REQUIRED_SAMPLES { - SpeedtestTier::Failed - } else { - SpeedtestTier::from_download_speed(self.download_speed_avg_bps) - .min(SpeedtestTier::from_upload_speed(self.upload_speed_avg_bps)) - .min(SpeedtestTier::from_latency(self.latency_avg_ms)) - } - } - - pub fn reward_multiplier(&self) -> Decimal { - self.tier().into_multiplier() - } -} - -const fn mbps(mbps: u64) -> u64 { - mbps * 125000 -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub enum SpeedtestTier { - Failed = 0, - Poor = 1, - Degraded = 2, - Acceptable = 3, -} - -impl SpeedtestTier { - fn into_multiplier(self) -> Decimal { - match self { - Self::Acceptable => dec!(1.0), - Self::Degraded => dec!(0.5), - Self::Poor => dec!(0.25), - Self::Failed => dec!(0.0), - } - } - - fn from_download_speed(download_speed: u64) -> Self { - if download_speed >= mbps(100) { - Self::Acceptable - } else if download_speed >= mbps(50) { - Self::Degraded - } else if download_speed >= mbps(30) { - Self::Poor - } else { - Self::Failed - } - } - - fn from_upload_speed(upload_speed: u64) -> Self { 
- if upload_speed >= mbps(10) { - Self::Acceptable - } else if upload_speed >= mbps(5) { - Self::Degraded - } else if upload_speed >= mbps(2) { - Self::Poor - } else { - Self::Failed - } - } - - fn from_latency(latency: u32) -> Self { - if latency <= 50 { - Self::Acceptable - } else if latency <= 75 { - Self::Degraded - } else if latency <= 100 { - Self::Poor - } else { - Self::Failed - } - } -} - -#[cfg(test)] -mod test { - use super::*; - use chrono::TimeZone; - - fn parse_dt(dt: &str) -> DateTime { - Utc.datetime_from_str(dt, "%Y-%m-%d %H:%M:%S %z") - .expect("unable_to_parse") - } - - fn bytes_per_s(mbps: i64) -> i64 { - mbps * 125000 - } - - fn known_speedtests() -> Vec { - // This data is taken from the spreadsheet - // Timestamp DL UL Latency DL RA UL RA Latency RA Acceptable? - // 2022-08-02 18:00:00 70 30 40 103.33 19.17 30.00 TRUE - // 2022-08-02 12:00:00 100 10 30 116.67 17.50 35.00 TRUE - // 2022-08-02 6:00:00 130 20 10 100.00 15.83 30.00 TRUE - // 2022-08-02 0:00:00 90 15 10 94.00 15.00 34.00 FALSE - // 2022-08-01 18:00:00 112 30 40 95.00 15.00 40.00 FALSE - // 2022-08-01 12:00:00 118 10 50 89.33 10.00 40.00 FALSE - // 2022-08-01 6:00:00 150 20 70 75.00 10.00 35.00 FALSE - // 2022-08-01 0:00:00 0 0 0 0.00 0.00 0.00 FALSE* - vec![ - Speedtest::new(parse_dt("2022-08-02 18:00:00 +0000"), 0, 0, 0), - Speedtest::new( - parse_dt("2022-08-02 12:00:00 +0000"), - bytes_per_s(20), - bytes_per_s(150), - 70, - ), - Speedtest::new( - parse_dt("2022-08-02 6:00:00 +0000"), - bytes_per_s(10), - bytes_per_s(118), - 50, - ), - Speedtest::new( - parse_dt("2022-08-02 0:00:00 +0000"), - bytes_per_s(30), - bytes_per_s(112), - 40, - ), - Speedtest::new( - parse_dt("2022-08-02 0:00:00 +0000"), - bytes_per_s(15), - bytes_per_s(90), - 10, - ), - Speedtest::new( - parse_dt("2022-08-01 18:00:00 +0000"), - bytes_per_s(20), - bytes_per_s(130), - 10, - ), - Speedtest::new( - parse_dt("2022-08-01 12:00:00 +0000"), - bytes_per_s(10), - bytes_per_s(100), - 30, - ), - Speedtest::new( - parse_dt("2022-08-01 6:00:00 +0000"), - bytes_per_s(30), - bytes_per_s(70), - 40, - ), - ] - } - - #[test] - fn check_tier_cmp() { - assert_eq!( - SpeedtestTier::Acceptable.min(SpeedtestTier::Failed), - SpeedtestTier::Failed, - ); - } - - #[test] - fn check_known_valid() { - let speedtests = known_speedtests(); - assert_ne!( - Average::from(&speedtests[0..5]).tier(), - SpeedtestTier::Acceptable, - ); - assert_eq!( - Average::from(&speedtests[0..6]).tier(), - SpeedtestTier::Acceptable - ); - } - - #[test] - fn check_minimum_known_valid() { - let speedtests = known_speedtests(); - assert_ne!( - Average::from(&speedtests[4..4]).tier(), - SpeedtestTier::Acceptable - ); - assert_eq!( - Average::from(&speedtests[4..=5]).tier(), - SpeedtestTier::Acceptable - ); - assert_eq!( - Average::from(&speedtests[4..=6]).tier(), - SpeedtestTier::Acceptable - ); - } - - #[test] - fn check_minimum_known_invalid() { - let speedtests = known_speedtests(); - assert_ne!( - Average::from(&speedtests[5..6]).tier(), - SpeedtestTier::Acceptable - ); - } - - #[test] - fn check_speedtest_rolling_avg() { - let owner: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" - .parse() - .expect("failed owner parse"); - let speedtests = VecDeque::from(known_speedtests()); - let avgs = SpeedtestAverages { - speedtests: HashMap::from([(owner, speedtests)]), - } - .into_iter(); - for avg in avgs { - if let Some(first) = avg.speedtests.first() { - assert_eq!(avg.latest_timestamp, first.timestamp); - } - } - } - - #[test] - fn 
check_speedtest_without_lapsed() { - let speedtest_cutoff = Duration::hours(10); - let contiguos_speedtests = known_speedtests(); - let contiguous_speedtests = - speedtests_without_lapsed(contiguos_speedtests.iter(), speedtest_cutoff); - - let disjoint_speedtests = vec![ - Speedtest::new( - parse_dt("2022-08-02 6:00:00 +0000"), - bytes_per_s(20), - bytes_per_s(150), - 70, - ), - Speedtest::new( - parse_dt("2022-08-01 18:00:00 +0000"), - bytes_per_s(10), - bytes_per_s(118), - 50, - ), - Speedtest::new( - parse_dt("2022-08-01 12:00:00 +0000"), - bytes_per_s(30), - bytes_per_s(112), - 40, - ), - ]; - let disjoint_speedtests = - speedtests_without_lapsed(disjoint_speedtests.iter(), speedtest_cutoff); - - assert_eq!(contiguous_speedtests.count(), 8); - assert_eq!(disjoint_speedtests.count(), 1); - } +pub async fn save_speedtest( + speedtest: &CellSpeedtest, + exec: &mut Transaction<'_, Postgres>, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + insert into speedtests (pubkey, upload_speed, download_speed, latency, serial, timestamp) + values ($1, $2, $3, $4, $5, $6) + on conflict (pubkey, timestamp) do nothing + "#, + ) + .bind(speedtest.pubkey.clone()) + .bind(speedtest.upload_speed as i64) + .bind(speedtest.download_speed as i64) + .bind(speedtest.latency as i64) + .bind(speedtest.serial.clone()) + .bind(speedtest.timestamp) + .execute(exec) + .await?; + Ok(()) +} + +pub async fn get_latest_speedtests_for_pubkey( + pubkey: &PublicKeyBinary, + exec: &mut Transaction<'_, Postgres>, +) -> Result, sqlx::Error> { + let mut speedtests = Vec::new(); + + let mut rows = sqlx::query_as::<_, Speedtest>( + "SELECT * FROM speedtests where pubkey = $1 order by timestamp desc limit $2", + ) + .bind(pubkey) + .bind(SPEEDTEST_AVG_MAX_DATA_POINTS as i64) + .fetch(exec); + + while let Some(speedtest) = rows.try_next().await? { + speedtests.push(speedtest); + } + Ok(speedtests) +} + +pub async fn aggregate_epoch_speedtests<'a>( + epoch_end: DateTime, + exec: &sqlx::Pool, +) -> Result { + let mut speedtests = EpochSpeedTests::new(); + // use latest speedtest which are no older than hours defined by SPEEDTEST_LAPSE + let start = epoch_end - Duration::hours(SPEEDTEST_LAPSE); + // pull the last N most recent speedtests from prior to the epoch end for each pubkey + let mut rows = sqlx::query_as::<_, Speedtest>( + "select * from ( + SELECT distinct(pubkey), upload_speed, download_speed, latency, timestamp, serial, row_number() + over (partition by pubkey order by timestamp desc) as count FROM speedtests where timestamp >= $1 and timestamp < $2 + ) as tmp + where count <= $3" + ) + .bind(start) + .bind(epoch_end) + .bind(SPEEDTEST_AVG_MAX_DATA_POINTS as i64) + .fetch(exec); + // collate the returned speedtests based on pubkey + while let Some(speedtest) = rows.try_next().await? 
{ + speedtests + .entry(speedtest.report.pubkey.clone()) + .or_default() + .push(speedtest); + } + Ok(speedtests) +} + +pub async fn clear_speedtests( + tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, + epoch_end: &DateTime, +) -> Result<(), sqlx::Error> { + let oldest_ts = *epoch_end - Duration::hours(SPEEDTEST_LAPSE); + sqlx::query("DELETE FROM speedtests WHERE timestamp < $1") + .bind(oldest_ts) + .execute(&mut *tx) + .await?; + Ok(()) } diff --git a/mobile_verifier/src/speedtests_average.rs b/mobile_verifier/src/speedtests_average.rs new file mode 100644 index 000000000..23653b9d3 --- /dev/null +++ b/mobile_verifier/src/speedtests_average.rs @@ -0,0 +1,478 @@ +use crate::speedtests::{self, Speedtest}; +use chrono::{DateTime, Duration, Utc}; +use file_store::{file_sink::FileSinkClient, traits::TimestampEncode}; +use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile::{self as proto, SpeedtestAvgValidity}; +use rust_decimal::Decimal; +use rust_decimal_macros::dec; +use std::collections::HashMap; + +pub const SPEEDTEST_LAPSE: i64 = 48; +const MIN_DOWNLOAD: u64 = mbps(30); +const MIN_UPLOAD: u64 = mbps(2); +const MAX_LATENCY: u32 = 100; +pub const MIN_REQUIRED_SAMPLES: usize = 2; + +pub type EpochAverages = HashMap; + +#[derive(Debug, Clone)] +pub struct SpeedtestAverage { + pub pubkey: PublicKeyBinary, + pub window_size: usize, + pub upload_speed_avg_bps: u64, + pub download_speed_avg_bps: u64, + pub latency_avg_ms: u32, + pub validity: SpeedtestAvgValidity, + pub reward_multiplier: Decimal, +} + +impl<'a, I: ?Sized> From<&'a I> for SpeedtestAverage +where + &'a I: IntoIterator, +{ + fn from(iter: &'a I) -> Self { + let mut id = vec![]; // eww! + let mut window_size = 0; + let mut sum_upload = 0; + let mut sum_download = 0; + let mut sum_latency = 0; + + for Speedtest { report, .. } in + speedtests_without_lapsed(iter.into_iter(), Duration::hours(SPEEDTEST_LAPSE)) + { + id = report.pubkey.as_ref().to_vec(); // eww! 
+ sum_upload += report.upload_speed; + sum_download += report.download_speed; + sum_latency += report.latency; + window_size += 1; + } + + if window_size > 0 { + let upload_speed_avg_bps = sum_upload / window_size as u64; + let download_speed_avg_bps = sum_download / window_size as u64; + let latency_avg_ms = sum_latency / window_size as u32; + let validity = validity( + window_size as usize, + upload_speed_avg_bps, + download_speed_avg_bps, + latency_avg_ms, + ); + let tier = SpeedtestTier::new( + window_size as usize, + upload_speed_avg_bps, + download_speed_avg_bps, + latency_avg_ms, + ); + let reward_multiplier = tier.into_multiplier(); + SpeedtestAverage { + pubkey: id.into(), + window_size: window_size as usize, + upload_speed_avg_bps, + download_speed_avg_bps, + latency_avg_ms, + validity, + reward_multiplier, + } + } else { + SpeedtestAverage { + pubkey: id.into(), + window_size: 0, + upload_speed_avg_bps: sum_upload, + download_speed_avg_bps: sum_download, + latency_avg_ms: sum_latency, + validity: proto::SpeedtestAvgValidity::TooFewSamples, + reward_multiplier: Decimal::ZERO, + } + } + } +} + +impl SpeedtestAverage { + pub async fn write( + &self, + filesink: &FileSinkClient, + speedtests: Vec, + ) -> file_store::Result { + filesink + .write( + proto::SpeedtestAvg { + pub_key: self.pubkey.clone().into(), + upload_speed_avg_bps: self.upload_speed_avg_bps, + download_speed_avg_bps: self.download_speed_avg_bps, + latency_avg_ms: self.latency_avg_ms, + timestamp: Utc::now().encode_timestamp(), + speedtests: speedtests_without_lapsed( + speedtests.iter(), + Duration::hours(SPEEDTEST_LAPSE), + ) + .map(|st| proto::Speedtest { + timestamp: st.report.timestamp.timestamp() as u64, + upload_speed_bps: st.report.upload_speed, + download_speed_bps: st.report.download_speed, + latency_ms: st.report.latency, + }) + .collect(), + validity: self.validity as i32, + reward_multiplier: self.reward_multiplier.try_into().unwrap(), + }, + &[("validity", self.validity.as_str_name())], + ) + .await?; + Ok(()) + } + + pub fn reward_multiplier(&self) -> Decimal { + self.reward_multiplier + } + + #[allow(dead_code)] + // function used by tests only + pub fn tier(&self) -> SpeedtestTier { + calculate_tier( + self.window_size, + self.upload_speed_avg_bps, + self.download_speed_avg_bps, + self.latency_avg_ms, + ) + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum SpeedtestTier { + Failed = 0, + Poor = 1, + Degraded = 2, + Acceptable = 3, +} + +impl SpeedtestTier { + pub fn new( + window_size: usize, + upload_speed_avg_bps: u64, + download_speed_avg_bps: u64, + latency_avg_ms: u32, + ) -> SpeedtestTier { + calculate_tier( + window_size, + upload_speed_avg_bps, + download_speed_avg_bps, + latency_avg_ms, + ) + } + + fn into_multiplier(self) -> Decimal { + match self { + Self::Acceptable => dec!(1.0), + Self::Degraded => dec!(0.5), + Self::Poor => dec!(0.25), + Self::Failed => dec!(0.0), + } + } + + fn from_download_speed(download_speed: u64) -> Self { + if download_speed >= mbps(100) { + Self::Acceptable + } else if download_speed >= mbps(50) { + Self::Degraded + } else if download_speed >= mbps(30) { + Self::Poor + } else { + Self::Failed + } + } + + fn from_upload_speed(upload_speed: u64) -> Self { + if upload_speed >= mbps(10) { + Self::Acceptable + } else if upload_speed >= mbps(5) { + Self::Degraded + } else if upload_speed >= mbps(2) { + Self::Poor + } else { + Self::Failed + } + } + + fn from_latency(latency: u32) -> Self { + if latency <= 50 { + Self::Acceptable + } else 
if latency <= 75 { + Self::Degraded + } else if latency <= 100 { + Self::Poor + } else { + Self::Failed + } + } +} + +#[derive(Clone, Default)] +pub struct SpeedtestAverages { + pub averages: HashMap, +} + +impl SpeedtestAverages { + pub fn get_average(&self, pub_key: &PublicKeyBinary) -> Option { + self.averages.get(pub_key).cloned() + } + + pub async fn aggregate_epoch_averages( + epoch_end: DateTime, + pool: &sqlx::Pool, + ) -> Result { + let averages: EpochAverages = speedtests::aggregate_epoch_speedtests(epoch_end, pool) + .await? + .into_iter() + .map(|(pub_key, speedtests)| { + let average = SpeedtestAverage::from(&speedtests); + (pub_key, average) + }) + .collect(); + + Ok(Self { averages }) + } +} + +pub fn calculate_tier( + window_size: usize, + upload_speed_avg_bps: u64, + download_speed_avg_bps: u64, + latency_avg_ms: u32, +) -> SpeedtestTier { + if window_size < MIN_REQUIRED_SAMPLES { + SpeedtestTier::Failed + } else { + SpeedtestTier::from_download_speed(download_speed_avg_bps) + .min(SpeedtestTier::from_upload_speed(upload_speed_avg_bps)) + .min(SpeedtestTier::from_latency(latency_avg_ms)) + } +} + +pub fn validity( + window_size: usize, + upload_speed_avg_bps: u64, + download_speed_avg_bps: u64, + latency_avg_ms: u32, +) -> proto::SpeedtestAvgValidity { + if window_size < MIN_REQUIRED_SAMPLES { + return proto::SpeedtestAvgValidity::TooFewSamples; + } + if download_speed_avg_bps < MIN_DOWNLOAD { + return proto::SpeedtestAvgValidity::SlowDownloadSpeed; + } + if upload_speed_avg_bps < MIN_UPLOAD { + return proto::SpeedtestAvgValidity::SlowUploadSpeed; + } + if latency_avg_ms > MAX_LATENCY { + return proto::SpeedtestAvgValidity::HighLatency; + } + proto::SpeedtestAvgValidity::Valid +} + +fn speedtests_without_lapsed<'a>( + iterable: impl Iterator, + lapse_cliff: Duration, +) -> impl Iterator { + let mut last_timestamp = None; + iterable.take_while(move |speedtest| match last_timestamp { + Some(ts) if ts - speedtest.report.timestamp > lapse_cliff => false, + None | Some(_) => { + last_timestamp = Some(speedtest.report.timestamp); + true + } + }) +} + +const fn mbps(mbps: u64) -> u64 { + mbps * 125000 +} + +#[cfg(test)] +mod test { + use super::*; + use chrono::TimeZone; + use file_store::speedtest::CellSpeedtest; + + fn parse_dt(dt: &str) -> DateTime { + Utc.datetime_from_str(dt, "%Y-%m-%d %H:%M:%S %z") + .expect("unable_to_parse") + } + + fn bytes_per_s(mbps: u64) -> u64 { + mbps * 125000 + } + + #[test] + fn check_tier_cmp() { + assert_eq!( + SpeedtestTier::Acceptable.min(SpeedtestTier::Failed), + SpeedtestTier::Failed, + ); + } + + #[test] + fn check_known_valid() { + let speedtests = known_speedtests(); + assert_ne!( + SpeedtestAverage::from(&speedtests[0..5]).tier(), + SpeedtestTier::Acceptable, + ); + assert_eq!( + SpeedtestAverage::from(&speedtests[0..6]).tier(), + SpeedtestTier::Acceptable + ); + } + + #[test] + fn check_minimum_known_valid() { + let speedtests = known_speedtests(); + assert_ne!( + SpeedtestAverage::from(&speedtests[4..4]).tier(), + SpeedtestTier::Acceptable + ); + assert_eq!( + SpeedtestAverage::from(&speedtests[4..=5]).tier(), + SpeedtestTier::Acceptable + ); + assert_eq!( + SpeedtestAverage::from(&speedtests[4..=6]).tier(), + SpeedtestTier::Acceptable + ); + } + + #[test] + fn check_minimum_known_invalid() { + let speedtests = known_speedtests(); + assert_ne!( + SpeedtestAverage::from(&speedtests[5..6]).tier(), + SpeedtestTier::Acceptable + ); + } + + fn known_speedtests() -> Vec { + // This data is taken from the spreadsheet + // Timestamp DL UL 
Latency DL RA UL RA Latency RA Acceptable? + // 2022-08-02 18:00:00 70 30 40 103.33 19.17 30.00 TRUE + // 2022-08-02 12:00:00 100 10 30 116.67 17.50 35.00 TRUE + // 2022-08-02 6:00:00 130 20 10 100.00 15.83 30.00 TRUE + // 2022-08-02 0:00:00 90 15 10 94.00 15.00 34.00 FALSE + // 2022-08-01 18:00:00 112 30 40 95.00 15.00 40.00 FALSE + // 2022-08-01 12:00:00 118 10 50 89.33 10.00 40.00 FALSE + // 2022-08-01 6:00:00 150 20 70 75.00 10.00 35.00 FALSE + // 2022-08-01 0:00:00 0 0 0 0.00 0.00 0.00 FALSE* + let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .expect("failed gw1 parse"); + + vec![ + default_cellspeedtest(gw1.clone(), parse_dt("2022-08-02 18:00:00 +0000"), 0, 0, 0), + default_cellspeedtest( + gw1.clone(), + parse_dt("2022-08-02 12:00:00 +0000"), + bytes_per_s(20), + bytes_per_s(150), + 70, + ), + default_cellspeedtest( + gw1.clone(), + parse_dt("2022-08-02 6:00:00 +0000"), + bytes_per_s(10), + bytes_per_s(118), + 50, + ), + default_cellspeedtest( + gw1.clone(), + parse_dt("2022-08-02 0:00:00 +0000"), + bytes_per_s(30), + bytes_per_s(112), + 40, + ), + default_cellspeedtest( + gw1.clone(), + parse_dt("2022-08-02 0:00:00 +0000"), + bytes_per_s(15), + bytes_per_s(90), + 10, + ), + default_cellspeedtest( + gw1.clone(), + parse_dt("2022-08-01 18:00:00 +0000"), + bytes_per_s(20), + bytes_per_s(130), + 10, + ), + default_cellspeedtest( + gw1.clone(), + parse_dt("2022-08-01 12:00:00 +0000"), + bytes_per_s(10), + bytes_per_s(100), + 30, + ), + default_cellspeedtest( + gw1, + parse_dt("2022-08-01 6:00:00 +0000"), + bytes_per_s(30), + bytes_per_s(70), + 40, + ), + ] + } + + #[test] + fn check_speedtest_without_lapsed() { + let speedtest_cutoff = Duration::hours(10); + let contiguos_speedtests = known_speedtests(); + let contiguous_speedtests = + speedtests_without_lapsed(contiguos_speedtests.iter(), speedtest_cutoff); + let pubkey: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" + .parse() + .expect("failed owner parse"); + let disjoint_speedtests = vec![ + default_cellspeedtest( + pubkey.clone(), + parse_dt("2022-08-02 6:00:00 +0000"), + bytes_per_s(20), + bytes_per_s(150), + 70, + ), + default_cellspeedtest( + pubkey.clone(), + parse_dt("2022-08-01 18:00:00 +0000"), + bytes_per_s(10), + bytes_per_s(118), + 50, + ), + default_cellspeedtest( + pubkey, + parse_dt("2022-08-01 12:00:00 +0000"), + bytes_per_s(30), + bytes_per_s(112), + 40, + ), + ]; + let disjoint_speedtests = + speedtests_without_lapsed(disjoint_speedtests.iter(), speedtest_cutoff); + + assert_eq!(contiguous_speedtests.count(), 8); + assert_eq!(disjoint_speedtests.count(), 1); + } + + fn default_cellspeedtest( + pubkey: PublicKeyBinary, + timestamp: DateTime, + upload_speed: u64, + download_speed: u64, + latency: u32, + ) -> Speedtest { + Speedtest { + report: CellSpeedtest { + pubkey, + timestamp, + upload_speed, + download_speed, + latency, + serial: "".to_string(), + }, + } + } +} diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index 5a9aa9aab..c1fc11236 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -193,10 +193,10 @@ pub async fn aggregate_location_shares( #[allow(dead_code)] pub async fn clear_location_shares( tx: &mut sqlx::Transaction<'_, sqlx::Postgres>, - reward_period: &Range>, + timestamp: &DateTime, ) -> Result<(), sqlx::Error> { sqlx::query("delete from subscriber_loc_verified where received_timestamp < $1") - .bind(reward_period.end) + 
.bind(timestamp) .execute(&mut *tx) .await?; Ok(()) From 67effc3558d2cd2c56ef07956dba0b19b02ffd95 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Fri, 18 Aug 2023 11:25:40 +0100 Subject: [PATCH 2/4] address review comments --- .../migrations/15_speedtests_one_to_one.sql | 6 ++-- mobile_verifier/src/reward_shares.rs | 14 +++----- mobile_verifier/src/rewarder.rs | 2 +- mobile_verifier/src/speedtests.rs | 19 ++++------- mobile_verifier/src/speedtests_average.rs | 33 ++++++++++--------- 5 files changed, 34 insertions(+), 40 deletions(-) diff --git a/mobile_verifier/migrations/15_speedtests_one_to_one.sql b/mobile_verifier/migrations/15_speedtests_one_to_one.sql index af6a185bc..59e1c943a 100644 --- a/mobile_verifier/migrations/15_speedtests_one_to_one.sql +++ b/mobile_verifier/migrations/15_speedtests_one_to_one.sql @@ -4,16 +4,16 @@ CREATE TABLE speedtests_migration ( upload_speed bigint, download_speed bigint, latency integer, - serial text, + "serial" text, timestamp timestamptz NOT NULL, inserted_at timestamptz default now(), PRIMARY KEY(pubkey, timestamp) ); -CREATE INDEX idx_speedtests_pubkey on speedtests_migration (pubkey); +CREATE INDEX idx_speedtests2_pubkey on speedtests_migration (pubkey); INSERT INTO speedtests_migration (pubkey, upload_speed, download_speed, latency, serial, timestamp) SELECT id, (st).upload_speed, (st).download_speed, (st).latency, '', (st).timestamp -FROM (select id, unnest(speedtests) as st from speedtests) as tmp +FROM (select id, unnest(speedtests) as st from speedtests_old) as tmp ON CONFLICT DO NOTHING; ALTER TABLE speedtests RENAME TO speedtests_old; diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index 67bcde84f..24a7887fe 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -1,9 +1,9 @@ -use crate::speedtests_average::SpeedtestAverages; use crate::{ - data_session::HotspotMap, heartbeats::HeartbeatReward, speedtests_average::SpeedtestAverage, + data_session::HotspotMap, + heartbeats::HeartbeatReward, + speedtests_average::{SpeedtestAverage, SpeedtestAverages}, subscriber_location::SubscriberValidatedLocations, }; - use chrono::{DateTime, Duration, Utc}; use file_store::traits::TimestampEncode; use futures::{Stream, StreamExt}; @@ -321,12 +321,8 @@ pub fn get_scheduled_tokens_for_mappers(duration: Duration) -> Decimal { mod test { use super::*; use crate::{ - cell_type::CellType, - data_session, - data_session::HotspotDataSession, - heartbeats::HeartbeatReward, - speedtests::Speedtest, - speedtests_average::{SpeedtestAverage, SpeedtestAverages}, + cell_type::CellType, data_session, data_session::HotspotDataSession, + heartbeats::HeartbeatReward, speedtests::Speedtest, subscriber_location::SubscriberValidatedLocations, }; use chrono::{Duration, Utc}; diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 42678d293..836ec3fac 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -215,7 +215,7 @@ impl Rewarder { // clear out the various db tables heartbeats::clear_heartbeats(&mut transaction, &reward_period.start).await?; - speedtests::clear_speedtests(&mut transaction, &reward_period.end).await?; + speedtests::clear_speedtests(&mut transaction, &reward_period.start).await?; data_session::clear_hotspot_data_sessions(&mut transaction, &reward_period.end).await?; // subscriber_location::clear_location_shares(&mut transaction, &reward_period.end).await?; diff --git a/mobile_verifier/src/speedtests.rs 
b/mobile_verifier/src/speedtests.rs index ae5b84456..d845c4674 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -29,7 +29,7 @@ impl FromRow<'_, PgRow> for Speedtest { Ok(Self { report: CellSpeedtest { pubkey: row.get::("pubkey"), - serial: row.get::("serial"), + serial: row.get::("serial_num"), upload_speed: row.get::("upload_speed") as u64, download_speed: row.get::("download_speed") as u64, timestamp: row.get::, &str>("timestamp"), @@ -114,7 +114,7 @@ pub async fn save_speedtest( ) -> Result<(), sqlx::Error> { sqlx::query( r#" - insert into speedtests (pubkey, upload_speed, download_speed, latency, serial, timestamp) + insert into speedtests (pubkey, upload_speed, download_speed, latency, serial_num, timestamp) values ($1, $2, $3, $4, $5, $6) on conflict (pubkey, timestamp) do nothing "#, @@ -134,18 +134,13 @@ pub async fn get_latest_speedtests_for_pubkey( pubkey: &PublicKeyBinary, exec: &mut Transaction<'_, Postgres>, ) -> Result, sqlx::Error> { - let mut speedtests = Vec::new(); - - let mut rows = sqlx::query_as::<_, Speedtest>( + let speedtests = sqlx::query_as::<_, Speedtest>( "SELECT * FROM speedtests where pubkey = $1 order by timestamp desc limit $2", ) .bind(pubkey) .bind(SPEEDTEST_AVG_MAX_DATA_POINTS as i64) - .fetch(exec); - - while let Some(speedtest) = rows.try_next().await? { - speedtests.push(speedtest); - } + .fetch_all(exec) + .await?; Ok(speedtests) } @@ -154,12 +149,12 @@ pub async fn aggregate_epoch_speedtests<'a>( exec: &sqlx::Pool, ) -> Result { let mut speedtests = EpochSpeedTests::new(); - // use latest speedtest which are no older than hours defined by SPEEDTEST_LAPSE + // use latest speedtest which are no older than N hours, defined by SPEEDTEST_LAPSE let start = epoch_end - Duration::hours(SPEEDTEST_LAPSE); // pull the last N most recent speedtests from prior to the epoch end for each pubkey let mut rows = sqlx::query_as::<_, Speedtest>( "select * from ( - SELECT distinct(pubkey), upload_speed, download_speed, latency, timestamp, serial, row_number() + SELECT distinct(pubkey), upload_speed, download_speed, latency, timestamp, serial_num, row_number() over (partition by pubkey order by timestamp desc) as count FROM speedtests where timestamp >= $1 and timestamp < $2 ) as tmp where count <= $3" diff --git a/mobile_verifier/src/speedtests_average.rs b/mobile_verifier/src/speedtests_average.rs index 23653b9d3..c39debc3f 100644 --- a/mobile_verifier/src/speedtests_average.rs +++ b/mobile_verifier/src/speedtests_average.rs @@ -1,8 +1,11 @@ use crate::speedtests::{self, Speedtest}; use chrono::{DateTime, Duration, Utc}; -use file_store::{file_sink::FileSinkClient, traits::TimestampEncode}; +use file_store::{ + file_sink::FileSinkClient, + traits::{MsgTimestamp, TimestampEncode}, +}; use helium_crypto::PublicKeyBinary; -use helium_proto::services::poc_mobile::{self as proto, SpeedtestAvgValidity}; +use helium_proto::services::poc_mobile as proto; use rust_decimal::Decimal; use rust_decimal_macros::dec; use std::collections::HashMap; @@ -22,7 +25,7 @@ pub struct SpeedtestAverage { pub upload_speed_avg_bps: u64, pub download_speed_avg_bps: u64, pub latency_avg_ms: u32, - pub validity: SpeedtestAvgValidity, + pub validity: proto::SpeedtestAvgValidity, pub reward_multiplier: Decimal, } @@ -106,7 +109,7 @@ impl SpeedtestAverage { Duration::hours(SPEEDTEST_LAPSE), ) .map(|st| proto::Speedtest { - timestamp: st.report.timestamp.timestamp() as u64, + timestamp: st.report.timestamp(), upload_speed_bps: st.report.upload_speed, 
download_speed_bps: st.report.download_speed, latency_ms: st.report.latency, @@ -124,17 +127,6 @@ impl SpeedtestAverage { pub fn reward_multiplier(&self) -> Decimal { self.reward_multiplier } - - #[allow(dead_code)] - // function used by tests only - pub fn tier(&self) -> SpeedtestTier { - calculate_tier( - self.window_size, - self.upload_speed_avg_bps, - self.download_speed_avg_bps, - self.latency_avg_ms, - ) - } } #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] @@ -293,6 +285,17 @@ mod test { use chrono::TimeZone; use file_store::speedtest::CellSpeedtest; + impl SpeedtestAverage { + pub fn tier(&self) -> SpeedtestTier { + calculate_tier( + self.window_size, + self.upload_speed_avg_bps, + self.download_speed_avg_bps, + self.latency_avg_ms, + ) + } + } + fn parse_dt(dt: &str) -> DateTime { Utc.datetime_from_str(dt, "%Y-%m-%d %H:%M:%S %z") .expect("unable_to_parse") From 6986efdc98580ff00a69959698d806d6f202852d Mon Sep 17 00:00:00 2001 From: jeffgrunewald Date: Tue, 22 Aug 2023 12:00:15 -0400 Subject: [PATCH 3/4] pass by reference when possible --- mobile_verifier/src/speedtests.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index d845c4674..f0bded1f7 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -88,16 +88,16 @@ impl SpeedtestDaemon { let mut transaction = self.pool.begin().await?; let mut speedtests = file.into_stream(&mut transaction).await?; while let Some(speedtest_report) = speedtests.next().await { - let pubkey = speedtest_report.report.pubkey.clone(); + let pubkey = &speedtest_report.report.pubkey; if self .gateway_client - .resolve_gateway_info(&pubkey) + .resolve_gateway_info(pubkey) .await? 
                .is_some()
             {
                 save_speedtest(&speedtest_report.report, &mut transaction).await?;
                 let latest_speedtests =
-                    get_latest_speedtests_for_pubkey(&pubkey, &mut transaction).await?;
+                    get_latest_speedtests_for_pubkey(pubkey, &mut transaction).await?;
                 let average = SpeedtestAverage::from(&latest_speedtests);
                 average.write(&self.file_sink, latest_speedtests).await?;
             }
@@ -119,11 +119,11 @@ pub async fn save_speedtest(
         on conflict (pubkey, timestamp) do nothing
         "#,
     )
-    .bind(speedtest.pubkey.clone())
+    .bind(&speedtest.pubkey)
     .bind(speedtest.upload_speed as i64)
     .bind(speedtest.download_speed as i64)
     .bind(speedtest.latency as i64)
-    .bind(speedtest.serial.clone())
+    .bind(&speedtest.serial)
     .bind(speedtest.timestamp)
     .execute(exec)
     .await?;

From 28fc25980f6bdc2539e65f4025045159dbafad59 Mon Sep 17 00:00:00 2001
From: Andrew McKenzie
Date: Wed, 23 Aug 2023 15:55:35 +0100
Subject: [PATCH 4/4] fix accidentally pushed changes

---
 mobile_verifier/migrations/15_speedtests_one_to_one.sql | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/mobile_verifier/migrations/15_speedtests_one_to_one.sql b/mobile_verifier/migrations/15_speedtests_one_to_one.sql
index 59e1c943a..4ead44e03 100644
--- a/mobile_verifier/migrations/15_speedtests_one_to_one.sql
+++ b/mobile_verifier/migrations/15_speedtests_one_to_one.sql
@@ -4,16 +4,16 @@ CREATE TABLE speedtests_migration (
     upload_speed bigint,
     download_speed bigint,
     latency integer,
-    "serial" text,
+    serial_num text,
     timestamp timestamptz NOT NULL,
     inserted_at timestamptz default now(),
     PRIMARY KEY(pubkey, timestamp)
 );
-CREATE INDEX idx_speedtests2_pubkey on speedtests_migration (pubkey);
+CREATE INDEX idx_speedtests_pubkey on speedtests_migration (pubkey);

-INSERT INTO speedtests_migration (pubkey, upload_speed, download_speed, latency, serial, timestamp)
+INSERT INTO speedtests_migration (pubkey, upload_speed, download_speed, latency, serial_num, timestamp)
 SELECT id, (st).upload_speed, (st).download_speed, (st).latency, '', (st).timestamp
-FROM (select id, unnest(speedtests) as st from speedtests_old) as tmp
+FROM (select id, unnest(speedtests) as st from speedtests) as tmp
 ON CONFLICT DO NOTHING;

 ALTER TABLE speedtests RENAME TO speedtests_old;
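
Usage sketch (illustration only, not part of the patch series): with the one-row-per-speedtest model introduced above, per-hotspot Speedtest rows are folded into a SpeedtestAverage, collected into the SpeedtestAverages map, and that map is what PocShares::aggregate consults for each hotspot's reward multiplier. The sketch mirrors the patch's own test fixtures; it assumes it lives inside the mobile_verifier crate (e.g. in a test module), and the helper names multiplier_for_hotspot and acceptable_samples are hypothetical.

use std::collections::HashMap;

use chrono::{Duration, Utc};
use file_store::speedtest::CellSpeedtest;
use helium_crypto::PublicKeyBinary;
use rust_decimal::Decimal;

use crate::speedtests::Speedtest;
use crate::speedtests_average::{SpeedtestAverage, SpeedtestAverages};

// Hypothetical helper: fold one hotspot's recent speedtests into an average and
// return the multiplier that PocShares::aggregate would apply for that hotspot.
fn multiplier_for_hotspot(pubkey: PublicKeyBinary, speedtests: Vec<Speedtest>) -> Decimal {
    // SpeedtestAverage::from drops samples separated by more than
    // SPEEDTEST_LAPSE (48h) and needs MIN_REQUIRED_SAMPLES (2) to be valid.
    let average = SpeedtestAverage::from(&speedtests);

    let mut averages = HashMap::new();
    averages.insert(pubkey.clone(), average);
    let speedtest_averages = SpeedtestAverages { averages };

    speedtest_averages
        .get_average(&pubkey)
        .map_or(Decimal::ZERO, |avg| avg.reward_multiplier())
}

// Hypothetical fixture mirroring acceptable_speedtest in the tests: two samples at
// 100 Mbps down / 10 Mbps up / 25 ms latency, 12 hours apart, which earns a 1.0 multiplier.
fn acceptable_samples(pubkey: PublicKeyBinary) -> Vec<Speedtest> {
    let now = Utc::now();
    [now - Duration::hours(12), now]
        .into_iter()
        .map(|timestamp| Speedtest {
            report: CellSpeedtest {
                pubkey: pubkey.clone(),
                serial: "".to_string(),
                timestamp,
                upload_speed: 10 * 125_000,
                download_speed: 100 * 125_000,
                latency: 25,
            },
        })
        .collect()
}

Because the rewarder now recomputes averages on demand from at most SPEEDTEST_AVG_MAX_DATA_POINTS rows no older than SPEEDTEST_LAPSE hours, aggregate_epoch_speedtests can bound its query at the epoch end, which is what ensures rewards only count speedtests taken before the epoch closes.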