From 6ef491b27ae819d2ffe913bf0d12e571ddcdff42 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Wed, 26 Jul 2023 17:01:05 +0100 Subject: [PATCH] refactor speedtests handling, ensure rewards only include tests up until epoch end --- mobile_verifier/src/cli/reward_from_db.rs | 14 +- mobile_verifier/src/lib.rs | 1 + mobile_verifier/src/reward_shares.rs | 114 ++-- mobile_verifier/src/rewarder.rs | 8 +- mobile_verifier/src/speedtests.rs | 713 ++++------------------ mobile_verifier/src/speedtests_average.rs | 471 ++++++++++++++ 6 files changed, 677 insertions(+), 644 deletions(-) create mode 100644 mobile_verifier/src/speedtests_average.rs diff --git a/mobile_verifier/src/cli/reward_from_db.rs b/mobile_verifier/src/cli/reward_from_db.rs index 0246cc80f..85e8d5cce 100644 --- a/mobile_verifier/src/cli/reward_from_db.rs +++ b/mobile_verifier/src/cli/reward_from_db.rs @@ -1,7 +1,7 @@ use crate::{ heartbeats::HeartbeatReward, reward_shares::{get_scheduled_tokens_for_poc_and_dc, PocShares}, - speedtests::{Average, SpeedtestAverages}, + speedtests_average::SpeedtestAverages, Settings, }; use anyhow::Result; @@ -39,8 +39,8 @@ impl Cmd { .await?; let heartbeats = HeartbeatReward::validated(&pool, &epoch); - let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?; - let reward_shares = PocShares::aggregate(heartbeats, speedtests.clone()).await?; + let averages = SpeedtestAverages::aggregate_epoch_averages(epoch.end, &pool).await?; + let reward_shares = PocShares::aggregate(heartbeats, &averages).await?; let mut total_rewards = 0_u64; let mut owner_rewards = HashMap::<_, u64>::new(); @@ -62,11 +62,11 @@ impl Cmd { } let rewards: Vec<_> = owner_rewards.into_iter().collect(); let mut multiplier_count = HashMap::<_, usize>::new(); - let speedtest_multipliers: Vec<_> = speedtests - .speedtests + let speedtest_multipliers: Vec<_> = averages + .averages .into_iter() - .map(|(pub_key, avg)| { - let reward_multiplier = Average::from(&avg).reward_multiplier(); + .map(|(pub_key, average)| { + let reward_multiplier = average.reward_multiplier; *multiplier_count.entry(reward_multiplier).or_default() += 1; (pub_key, reward_multiplier) }) diff --git a/mobile_verifier/src/lib.rs b/mobile_verifier/src/lib.rs index 84d891206..8f6252359 100644 --- a/mobile_verifier/src/lib.rs +++ b/mobile_verifier/src/lib.rs @@ -4,6 +4,7 @@ mod heartbeats; mod reward_shares; mod settings; mod speedtests; +mod speedtests_average; mod subscriber_location; mod telemetry; diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index c59101e9c..5df86d7fb 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -1,7 +1,6 @@ +use crate::speedtests_average::SpeedtestAverages; use crate::{ - data_session::HotspotMap, - heartbeats::HeartbeatReward, - speedtests::{Average, SpeedtestAverages}, + data_session::HotspotMap, heartbeats::HeartbeatReward, speedtests_average::SpeedtestAverage, subscriber_location::SubscriberValidatedLocations, }; @@ -219,17 +218,17 @@ pub struct PocShares { } impl PocShares { - pub async fn aggregate( + pub async fn aggregate<'a>( heartbeats: impl Stream>, - speedtests: SpeedtestAverages, + averages: &SpeedtestAverages, ) -> Result { let mut poc_shares = Self::default(); let mut heartbeats = std::pin::pin!(heartbeats); while let Some(heartbeat) = heartbeats.next().await.transpose()? 
{ - let speedmultiplier = speedtests - .get_average(&heartbeat.hotspot_key) - .as_ref() - .map_or(Decimal::ZERO, Average::reward_multiplier); + let speedmultiplier = averages + .averages + .get(&heartbeat.hotspot_key) + .map_or(Decimal::ZERO, SpeedtestAverage::reward_multiplier); *poc_shares .hotspot_shares .entry(heartbeat.hotspot_key) @@ -326,14 +325,15 @@ mod test { data_session, data_session::HotspotDataSession, heartbeats::HeartbeatReward, - speedtests::{Speedtest, SpeedtestAverages}, + speedtests::Speedtest, + speedtests_average::{SpeedtestAverage, SpeedtestAverages}, subscriber_location::SubscriberValidatedLocations, }; use chrono::{Duration, Utc}; use futures::stream; use helium_proto::services::poc_mobile::mobile_reward_share::Reward as MobileReward; use prost::Message; - use std::collections::{HashMap, VecDeque}; + use std::collections::HashMap; fn valid_shares() -> RadioShares { let mut radio_shares: HashMap = Default::default(); @@ -536,8 +536,9 @@ mod test { .reward_weight() } - fn acceptable_speedtest(timestamp: DateTime) -> Speedtest { + fn acceptable_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime) -> Speedtest { Speedtest { + pubkey, timestamp, upload_speed: bytes_per_s(10), download_speed: bytes_per_s(100), @@ -545,8 +546,9 @@ mod test { } } - fn degraded_speedtest(timestamp: DateTime) -> Speedtest { + fn degraded_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime) -> Speedtest { Speedtest { + pubkey, timestamp, upload_speed: bytes_per_s(5), download_speed: bytes_per_s(60), @@ -554,8 +556,9 @@ mod test { } } - fn failed_speedtest(timestamp: DateTime) -> Speedtest { + fn failed_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime) -> Speedtest { Speedtest { + pubkey, timestamp, upload_speed: bytes_per_s(1), download_speed: bytes_per_s(20), @@ -563,8 +566,9 @@ mod test { } } - fn poor_speedtest(timestamp: DateTime) -> Speedtest { + fn poor_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime) -> Speedtest { Speedtest { + pubkey, timestamp, upload_speed: bytes_per_s(2), download_speed: bytes_per_s(40), @@ -611,19 +615,21 @@ mod test { let last_timestamp = timestamp - Duration::hours(12); let g1_speedtests = vec![ - acceptable_speedtest(last_timestamp), - acceptable_speedtest(timestamp), + acceptable_speedtest(g1.clone(), last_timestamp), + acceptable_speedtest(g1.clone(), timestamp), ]; let g2_speedtests = vec![ - acceptable_speedtest(last_timestamp), - acceptable_speedtest(timestamp), + acceptable_speedtest(g2.clone(), last_timestamp), + acceptable_speedtest(g2.clone(), timestamp), ]; - let mut speedtests = HashMap::new(); - speedtests.insert(g1.clone(), VecDeque::from(g1_speedtests)); - speedtests.insert(g2.clone(), VecDeque::from(g2_speedtests)); - let speedtest_avgs = SpeedtestAverages { speedtests }; - - let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs) + let g1_average = SpeedtestAverage::from(&g1_speedtests); + let g2_average = SpeedtestAverage::from(&g2_speedtests); + let mut averages = HashMap::new(); + averages.insert(g1.clone(), g1_average); + averages.insert(g2.clone(), g2_average); + let speedtest_avgs = SpeedtestAverages { averages }; + + let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs) .await .unwrap(); @@ -779,44 +785,56 @@ mod test { // setup speedtests let last_speedtest = timestamp - Duration::hours(12); let gw1_speedtests = vec![ - acceptable_speedtest(last_speedtest), - acceptable_speedtest(timestamp), + acceptable_speedtest(gw1.clone(), last_speedtest), + 
acceptable_speedtest(gw1.clone(), timestamp), ]; let gw2_speedtests = vec![ - acceptable_speedtest(last_speedtest), - acceptable_speedtest(timestamp), + acceptable_speedtest(gw2.clone(), last_speedtest), + acceptable_speedtest(gw2.clone(), timestamp), ]; let gw3_speedtests = vec![ - acceptable_speedtest(last_speedtest), - acceptable_speedtest(timestamp), + acceptable_speedtest(gw3.clone(), last_speedtest), + acceptable_speedtest(gw3.clone(), timestamp), ]; let gw4_speedtests = vec![ - acceptable_speedtest(last_speedtest), - acceptable_speedtest(timestamp), + acceptable_speedtest(gw4.clone(), last_speedtest), + acceptable_speedtest(gw4.clone(), timestamp), ]; let gw5_speedtests = vec![ - degraded_speedtest(last_speedtest), - degraded_speedtest(timestamp), + degraded_speedtest(gw5.clone(), last_speedtest), + degraded_speedtest(gw5.clone(), timestamp), ]; let gw6_speedtests = vec![ - failed_speedtest(last_speedtest), - failed_speedtest(timestamp), + failed_speedtest(gw6.clone(), last_speedtest), + failed_speedtest(gw6.clone(), timestamp), + ]; + let gw7_speedtests = vec![ + poor_speedtest(gw7.clone(), last_speedtest), + poor_speedtest(gw7.clone(), timestamp), ]; - let gw7_speedtests = vec![poor_speedtest(last_speedtest), poor_speedtest(timestamp)]; - let mut speedtests = HashMap::new(); - speedtests.insert(gw1, VecDeque::from(gw1_speedtests)); - speedtests.insert(gw2, VecDeque::from(gw2_speedtests)); - speedtests.insert(gw3, VecDeque::from(gw3_speedtests)); - speedtests.insert(gw4, VecDeque::from(gw4_speedtests)); - speedtests.insert(gw5, VecDeque::from(gw5_speedtests)); - speedtests.insert(gw6, VecDeque::from(gw6_speedtests)); - speedtests.insert(gw7, VecDeque::from(gw7_speedtests)); - let speedtest_avgs = SpeedtestAverages { speedtests }; + + let gw1_average = SpeedtestAverage::from(&gw1_speedtests); + let gw2_average = SpeedtestAverage::from(&gw2_speedtests); + let gw3_average = SpeedtestAverage::from(&gw3_speedtests); + let gw4_average = SpeedtestAverage::from(&gw4_speedtests); + let gw5_average = SpeedtestAverage::from(&gw5_speedtests); + let gw6_average = SpeedtestAverage::from(&gw6_speedtests); + let gw7_average = SpeedtestAverage::from(&gw7_speedtests); + let mut averages = HashMap::new(); + averages.insert(gw1.clone(), gw1_average); + averages.insert(gw2.clone(), gw2_average); + averages.insert(gw3.clone(), gw3_average); + averages.insert(gw4.clone(), gw4_average); + averages.insert(gw5.clone(), gw5_average); + averages.insert(gw6.clone(), gw6_average); + averages.insert(gw7.clone(), gw7_average); + + let speedtest_avgs = SpeedtestAverages { averages }; // calculate the rewards for the sample group let mut owner_rewards = HashMap::::new(); let epoch = (now - Duration::hours(1))..now; - for mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs) + for mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs) .await .unwrap() .into_rewards(Decimal::ZERO, &epoch) diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 49ca11ef1..8c4396a73 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -2,7 +2,7 @@ use crate::{ data_session, heartbeats::HeartbeatReward, reward_shares::{MapperShares, PocShares, TransferRewards}, - speedtests::SpeedtestAverages, + speedtests_average::SpeedtestAverages, subscriber_location, telemetry, }; use anyhow::bail; @@ -143,9 +143,9 @@ impl Rewarder { ); let heartbeats = HeartbeatReward::validated(&self.pool, reward_period); - let 
speedtests = SpeedtestAverages::validated(&self.pool, reward_period.end).await?;
-
-        let poc_rewards = PocShares::aggregate(heartbeats, speedtests).await?;
+        let averages =
+            SpeedtestAverages::aggregate_epoch_averages(reward_period.end, &self.pool).await?;
+        let poc_rewards = PocShares::aggregate(heartbeats, &averages).await?;
         let mobile_price = self
             .price_tracker
             .price(&helium_proto::BlockchainTokenTypeV1::Mobile)
diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs
index db70e7fae..96dd299de 100644
--- a/mobile_verifier/src/speedtests.rs
+++ b/mobile_verifier/src/speedtests.rs
@@ -1,35 +1,24 @@
-use chrono::{DateTime, Duration, Utc};
+use crate::speedtests_average::SpeedtestAverage;
+use chrono::{DateTime, Utc};
 use file_store::{
-    file_info_poller::FileInfoStream,
-    file_sink::{self, FileSinkClient},
-    speedtest::{CellSpeedtest, CellSpeedtestIngestReport},
-    traits::TimestampEncode,
-};
-use futures::{
-    stream::{Stream, StreamExt, TryStreamExt},
-    TryFutureExt,
+    file_info_poller::FileInfoStream, file_sink::FileSinkClient,
+    speedtest::CellSpeedtestIngestReport,
 };
+use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
 use helium_crypto::PublicKeyBinary;
-use helium_proto::services::poc_mobile as proto;
-use mobile_config::{client::ClientError, gateway_info::GatewayInfoResolver, GatewayClient};
-use rust_decimal::Decimal;
-use rust_decimal_macros::dec;
-use sqlx::{
-    postgres::{types::PgHasArrayType, PgTypeInfo},
-    FromRow, Postgres, Transaction, Type,
-};
-use std::{
-    collections::{HashMap, VecDeque},
-    pin::pin,
-};
+use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient};
+use sqlx::{FromRow, Postgres, Transaction, Type};
+use std::collections::HashMap;
 use tokio::sync::mpsc::Receiver;
 
 const SPEEDTEST_AVG_MAX_DATA_POINTS: usize = 6;
-const SPEEDTEST_LAPSE: i64 = 48;
 
-#[derive(Debug, Clone, Type)]
+pub type EpochSpeedTests = HashMap<PublicKeyBinary, Vec<Speedtest>>;
+
+#[derive(Debug, Clone, Type, FromRow)]
 #[sqlx(type_name = "speedtest")]
 pub struct Speedtest {
+    pub pubkey: PublicKeyBinary,
     pub timestamp: DateTime<Utc>,
     pub upload_speed: i64,
     pub download_speed: i64,
@@ -39,12 +28,14 @@ pub struct Speedtest {
 impl Speedtest {
     #[cfg(test)]
     pub fn new(
+        pubkey: PublicKeyBinary,
         timestamp: DateTime<Utc>,
         upload_speed: i64,
         download_speed: i64,
         latency: i32,
     ) -> Self {
         Self {
+            pubkey,
             timestamp,
             upload_speed,
             download_speed,
@@ -96,580 +87,132 @@ impl SpeedtestDaemon {
 
     async fn process_file(
         &self,
-        file: FileInfoStream<CellSpeedtestIngestReport>,
+        file_info_stream: FileInfoStream<CellSpeedtestIngestReport>,
     ) -> anyhow::Result<()> {
-        tracing::info!("Processing speedtest file {}", file.file_info.key);
         let mut transaction = self.pool.begin().await?;
-        let reports = file.into_stream(&mut transaction).await?;
-
-        let mut validated_speedtests = pin!(
-            SpeedtestRollingAverage::validate_speedtests(
-                &self.gateway_client,
-                reports.map(|s| s.report),
-                &mut transaction,
-            )
+        // process the speedtest reports from the file: insert valid reports
+        // into the db, collect the pubkeys of the valid reports, and then
+        // recalculate a new average for each such pubkey
+
+        // TODO: remove `gateways_to_average` from fold accumulator
+        // let gateways_to_average = Vec::<PublicKeyBinary>::new();
+        let (gateways_to_average, transaction) = file_info_stream
+            .into_stream(&mut transaction)
             .await?
-        );
-        while let Some(speedtest) = validated_speedtests.next().await.transpose()?
-        {
-            speedtest.write(&self.file_sink).await?;
-            speedtest.save(&mut transaction).await?;
-        }
-
-        self.file_sink.commit().await?;
-        transaction.commit().await?;
-
-        Ok(())
-    }
-}
-
-impl From<CellSpeedtest> for Speedtest {
-    fn from(cell_speedtest: CellSpeedtest) -> Self {
-        Self {
-            timestamp: cell_speedtest.timestamp,
-            upload_speed: cell_speedtest.upload_speed as i64,
-            download_speed: cell_speedtest.download_speed as i64,
-            latency: cell_speedtest.latency as i32,
-        }
-    }
-}
-
-impl PgHasArrayType for Speedtest {
-    fn array_type_info() -> PgTypeInfo {
-        PgTypeInfo::with_name("_speedtest")
-    }
-}
-
-#[derive(FromRow)]
-pub struct SpeedtestRollingAverage {
-    pub id: PublicKeyBinary,
-    pub speedtests: Vec<Speedtest>,
-    pub latest_timestamp: DateTime<Utc>,
-}
-
-impl SpeedtestRollingAverage {
-    pub fn new(id: PublicKeyBinary) -> Self {
-        Self {
-            id,
-            speedtests: Vec::new(),
-            latest_timestamp: DateTime::<Utc>::default(),
-        }
-    }
-
-    pub async fn validate_speedtests<'a>(
-        gateway_client: &'a GatewayClient,
-        speedtests: impl Stream<Item = CellSpeedtest> + 'a,
-        exec: &mut Transaction<'_, Postgres>,
-    ) -> Result<impl Stream<Item = Result<Self, ClientError>> + 'a, sqlx::Error> {
-        let tests_by_publickey = speedtests
-            .fold(
-                HashMap::<PublicKeyBinary, Vec<CellSpeedtest>>::new(),
-                |mut map, cell_speedtest| async move {
-                    map.entry(cell_speedtest.pubkey.clone())
-                        .or_default()
-                        .push(cell_speedtest);
-                    map
+            .map(anyhow::Ok)
+            .try_fold(
+                (Vec::<PublicKeyBinary>::new(), transaction),
+                |(mut gateways_to_average, mut transaction), report| async move {
+                    let pubkey = report.report.pubkey.clone();
+                    if self
+                        .gateway_client
+                        .resolve_gateway_info(&pubkey)
+                        .await
+                        .is_ok()
+                    {
+                        save_speedtest_to_db(report, &mut transaction).await?;
+                        // O(n) membership check, but the vec is bounded by the
+                        // number of distinct gateways seen in the file
+                        // TODO: consider a HashSet if this ever grows large
+                        if !gateways_to_average.contains(&pubkey) {
+                            gateways_to_average.push(pubkey);
+                        }
+                    }
+                    Ok((gateways_to_average, transaction))
                 },
             )
-            .await;
-
-        let mut speedtests = Vec::new();
-        for (pubkey, cell_speedtests) in tests_by_publickey.into_iter() {
-            let rolling_average: SpeedtestRollingAverage =
-                sqlx::query_as::<_, SpeedtestRollingAverage>(
-                    "SELECT * FROM speedtests WHERE id = $1",
-                )
-                .bind(&pubkey)
-                .fetch_optional(&mut *exec)
-                .await?
-                .unwrap_or_else(|| SpeedtestRollingAverage::new(pubkey.clone()));
-            speedtests.push((rolling_average, cell_speedtests));
-        }
-
-        Ok(futures::stream::iter(speedtests.into_iter())
-            .then(move |(rolling_average, cell_speedtests)| {
-                async move {
-                    // If we get back some gateway info for the given address, it's a valid address
-                    if gateway_client
-                        .resolve_gateway_info(&rolling_average.id)
-                        .await?
-                        .is_none()
-                    {
-                        return Ok(None);
-                    }
-                    Ok(Some((rolling_average, cell_speedtests)))
-                }
-            })
-            .filter_map(|item| async move { item.transpose() })
-            .map_ok(|(rolling_average, cell_speedtests)| {
-                let speedtests = cell_speedtests
-                    .into_iter()
-                    .map(Speedtest::from)
-                    .chain(rolling_average.speedtests.into_iter())
-                    .take(SPEEDTEST_AVG_MAX_DATA_POINTS)
-                    .collect::<Vec<Speedtest>>();
-
-                Self {
-                    id: rolling_average.id,
-                    latest_timestamp: speedtests[0].timestamp,
-                    speedtests,
-                }
-            }))
-    }
-
-    pub async fn save(self, exec: impl sqlx::PgExecutor<'_>) -> Result<bool, sqlx::Error> {
-        #[derive(FromRow)]
-        struct SaveResult {
-            inserted: bool,
-        }
-
-        sqlx::query_as::<_, SaveResult>(
-            r#"
-            insert into speedtests (id, speedtests, latest_timestamp)
-            values ($1, $2, $3)
-            on conflict (id) do update set
-            speedtests = EXCLUDED.speedtests, latest_timestamp = EXCLUDED.latest_timestamp
-            returning (xmax = 0) as inserted
-            "#,
-        )
-        .bind(self.id)
-        .bind(self.speedtests)
-        .bind(self.latest_timestamp)
-        .fetch_one(exec)
-        .await
-        .map(|result| result.inserted)
-    }
+            .await?;
+        // commit the speedtests to the db
+        transaction.commit().await?;
 
-    pub async fn write(&self, averages: &file_sink::FileSinkClient) -> file_store::Result {
-        // Write out the speedtests to S3
-        let average = Average::from(&self.speedtests);
-        let validity = average.validity();
-        // this is guaratneed to safely convert and not panic as it can only be one of
-        // four possible decimal values based on the speedtest average tier
-        let reward_multiplier = average.reward_multiplier().try_into().unwrap();
-        let Average {
-            upload_speed_avg_bps,
-            download_speed_avg_bps,
-            latency_avg_ms,
-            ..
-        } = average;
-        averages
-            .write(
-                proto::SpeedtestAvg {
-                    pub_key: self.id.clone().into(),
-                    upload_speed_avg_bps,
-                    download_speed_avg_bps,
-                    latency_avg_ms,
-                    timestamp: Utc::now().encode_timestamp(),
-                    speedtests: speedtests_without_lapsed(
-                        self.speedtests.iter(),
-                        Duration::hours(SPEEDTEST_LAPSE),
-                    )
-                    .map(|st| proto::Speedtest {
-                        timestamp: st.timestamp.timestamp() as u64,
-                        upload_speed_bps: st.upload_speed as u64,
-                        download_speed_bps: st.download_speed as u64,
-                        latency_ms: st.latency as u32,
-                    })
-                    .collect(),
-                    validity: validity as i32,
-                    reward_multiplier,
+        // the processed speedtests are committed to the DB, so now calculate
+        // the latest average for each gateway from which we received a new
+        // and valid speedtest
+        let averages_transaction = self.pool.begin().await?;
+        stream::iter(gateways_to_average)
+            .map(anyhow::Ok)
+            .try_fold(
+                averages_transaction,
+                |mut averages_transaction, pubkey| async move {
+                    let latest_speedtests: Vec<Speedtest> =
+                        get_latest_speedtests_for_pubkey(&pubkey, &mut averages_transaction)
+                            .await?;
+                    let average = SpeedtestAverage::from(&latest_speedtests);
+                    average.write(&self.file_sink, latest_speedtests).await?;
+                    Ok(averages_transaction)
                 },
-                &[("validity", validity.as_str_name())],
             )
+            .await?
+            .commit()
             .await?;
+        self.file_sink.commit().await?;
 
         Ok(())
     }
 }
 
-#[derive(Clone, Default)]
-pub struct SpeedtestAverages {
-    // I'm not sure that VecDeque is actually all that useful here, considering
-    // we have to constantly convert between the two.
-    // It does make me more confident in the implementation of validate_speedtests
-    // though.
-    pub speedtests: HashMap<PublicKeyBinary, VecDeque<Speedtest>>,
-}
-
-impl SpeedtestAverages {
-    #[allow(dead_code)]
-    pub fn into_iter(self) -> impl IntoIterator<Item = SpeedtestRollingAverage> {
-        self.speedtests
-            .into_iter()
-            .map(|(id, window)| SpeedtestRollingAverage {
-                id,
-                // window is guaranteed to be non-empty.
For safety, we set the - // latest timestamp to epoch. - latest_timestamp: window - .front() - .map_or_else(DateTime::::default, |st| st.timestamp), - speedtests: Vec::from(window), - }) - } - - pub fn get_average(&self, pub_key: &PublicKeyBinary) -> Option { - self.speedtests.get(pub_key).map(Average::from) - } - - pub async fn validated( - exec: impl sqlx::PgExecutor<'_> + Copy, - period_end: DateTime, - ) -> Result { - let mut speedtests = HashMap::new(); - - let mut rows = sqlx::query_as::<_, SpeedtestRollingAverage>( - "SELECT * FROM speedtests where latest_timestamp >= $1", - ) - .bind((period_end - Duration::hours(SPEEDTEST_LAPSE)).naive_utc()) - .fetch(exec); - - while let Some(SpeedtestRollingAverage { - id, - speedtests: window, - .. - }) = rows.try_next().await? - { - speedtests.insert(id, VecDeque::from(window)); - } - - Ok(Self { speedtests }) - } -} - -impl Extend for SpeedtestAverages { - fn extend(&mut self, iter: T) - where - T: IntoIterator, - { - for SpeedtestRollingAverage { id, speedtests, .. } in iter.into_iter() { - self.speedtests.insert(id, VecDeque::from(speedtests)); - } - } -} - -fn speedtests_without_lapsed<'a>( - iterable: impl Iterator, - lapse_cliff: Duration, -) -> impl Iterator { - let mut last_timestamp = None; - iterable.take_while(move |speedtest| match last_timestamp { - Some(ts) if ts - speedtest.timestamp > lapse_cliff => false, - None | Some(_) => { - last_timestamp = Some(speedtest.timestamp); - true - } - }) -} - -#[derive(Clone, Debug, Default)] -pub struct Average { - pub window_size: usize, - pub upload_speed_avg_bps: u64, - pub download_speed_avg_bps: u64, - pub latency_avg_ms: u32, -} - -impl<'a, I: ?Sized> From<&'a I> for Average -where - &'a I: IntoIterator, -{ - fn from(iter: &'a I) -> Self { - let mut window_size = 0; - let mut sum_upload = 0; - let mut sum_download = 0; - let mut sum_latency = 0; - - for Speedtest { - upload_speed, - download_speed, - latency, - .. 
- } in speedtests_without_lapsed(iter.into_iter(), Duration::hours(SPEEDTEST_LAPSE)) - { - sum_upload += *upload_speed as u64; - sum_download += *download_speed as u64; - sum_latency += *latency as u32; - window_size += 1; - } - - if window_size > 0 { - Average { - window_size, - upload_speed_avg_bps: sum_upload / window_size as u64, - download_speed_avg_bps: sum_download / window_size as u64, - latency_avg_ms: sum_latency / window_size as u32, - } - } else { - Average::default() - } - } -} - -const MIN_DOWNLOAD: u64 = mbps(30); -const MIN_UPLOAD: u64 = mbps(2); -const MAX_LATENCY: u32 = 100; -pub const MIN_REQUIRED_SAMPLES: usize = 2; - -impl Average { - // TODO: Change this to a multiplier - pub fn validity(&self) -> proto::SpeedtestAvgValidity { - if self.window_size < MIN_REQUIRED_SAMPLES { - return proto::SpeedtestAvgValidity::TooFewSamples; - } - if self.download_speed_avg_bps < MIN_DOWNLOAD { - return proto::SpeedtestAvgValidity::SlowDownloadSpeed; - } - if self.upload_speed_avg_bps < MIN_UPLOAD { - return proto::SpeedtestAvgValidity::SlowUploadSpeed; - } - if self.latency_avg_ms > MAX_LATENCY { - return proto::SpeedtestAvgValidity::HighLatency; - } - proto::SpeedtestAvgValidity::Valid - } - - pub fn tier(&self) -> SpeedtestTier { - if self.window_size < MIN_REQUIRED_SAMPLES { - SpeedtestTier::Failed - } else { - SpeedtestTier::from_download_speed(self.download_speed_avg_bps) - .min(SpeedtestTier::from_upload_speed(self.upload_speed_avg_bps)) - .min(SpeedtestTier::from_latency(self.latency_avg_ms)) - } - } - - pub fn reward_multiplier(&self) -> Decimal { - self.tier().into_multiplier() - } -} - -const fn mbps(mbps: u64) -> u64 { - mbps * 125000 -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub enum SpeedtestTier { - Failed = 0, - Poor = 1, - Degraded = 2, - Acceptable = 3, -} - -impl SpeedtestTier { - fn into_multiplier(self) -> Decimal { - match self { - Self::Acceptable => dec!(1.0), - Self::Degraded => dec!(0.5), - Self::Poor => dec!(0.25), - Self::Failed => dec!(0.0), - } - } - - fn from_download_speed(download_speed: u64) -> Self { - if download_speed >= mbps(100) { - Self::Acceptable - } else if download_speed >= mbps(50) { - Self::Degraded - } else if download_speed >= mbps(30) { - Self::Poor - } else { - Self::Failed - } - } - - fn from_upload_speed(upload_speed: u64) -> Self { - if upload_speed >= mbps(10) { - Self::Acceptable - } else if upload_speed >= mbps(5) { - Self::Degraded - } else if upload_speed >= mbps(2) { - Self::Poor - } else { - Self::Failed - } - } - - fn from_latency(latency: u32) -> Self { - if latency <= 50 { - Self::Acceptable - } else if latency <= 75 { - Self::Degraded - } else if latency <= 100 { - Self::Poor - } else { - Self::Failed - } - } -} - -#[cfg(test)] -mod test { - use super::*; - use chrono::TimeZone; - - fn parse_dt(dt: &str) -> DateTime { - Utc.datetime_from_str(dt, "%Y-%m-%d %H:%M:%S %z") - .expect("unable_to_parse") - } - - fn bytes_per_s(mbps: i64) -> i64 { - mbps * 125000 - } - - fn known_speedtests() -> Vec { - // This data is taken from the spreadsheet - // Timestamp DL UL Latency DL RA UL RA Latency RA Acceptable? 
- // 2022-08-02 18:00:00 70 30 40 103.33 19.17 30.00 TRUE - // 2022-08-02 12:00:00 100 10 30 116.67 17.50 35.00 TRUE - // 2022-08-02 6:00:00 130 20 10 100.00 15.83 30.00 TRUE - // 2022-08-02 0:00:00 90 15 10 94.00 15.00 34.00 FALSE - // 2022-08-01 18:00:00 112 30 40 95.00 15.00 40.00 FALSE - // 2022-08-01 12:00:00 118 10 50 89.33 10.00 40.00 FALSE - // 2022-08-01 6:00:00 150 20 70 75.00 10.00 35.00 FALSE - // 2022-08-01 0:00:00 0 0 0 0.00 0.00 0.00 FALSE* - vec![ - Speedtest::new(parse_dt("2022-08-02 18:00:00 +0000"), 0, 0, 0), - Speedtest::new( - parse_dt("2022-08-02 12:00:00 +0000"), - bytes_per_s(20), - bytes_per_s(150), - 70, - ), - Speedtest::new( - parse_dt("2022-08-02 6:00:00 +0000"), - bytes_per_s(10), - bytes_per_s(118), - 50, - ), - Speedtest::new( - parse_dt("2022-08-02 0:00:00 +0000"), - bytes_per_s(30), - bytes_per_s(112), - 40, - ), - Speedtest::new( - parse_dt("2022-08-02 0:00:00 +0000"), - bytes_per_s(15), - bytes_per_s(90), - 10, - ), - Speedtest::new( - parse_dt("2022-08-01 18:00:00 +0000"), - bytes_per_s(20), - bytes_per_s(130), - 10, - ), - Speedtest::new( - parse_dt("2022-08-01 12:00:00 +0000"), - bytes_per_s(10), - bytes_per_s(100), - 30, - ), - Speedtest::new( - parse_dt("2022-08-01 6:00:00 +0000"), - bytes_per_s(30), - bytes_per_s(70), - 40, - ), - ] - } - - #[test] - fn check_tier_cmp() { - assert_eq!( - SpeedtestTier::Acceptable.min(SpeedtestTier::Failed), - SpeedtestTier::Failed, - ); - } - - #[test] - fn check_known_valid() { - let speedtests = known_speedtests(); - assert_ne!( - Average::from(&speedtests[0..5]).tier(), - SpeedtestTier::Acceptable, - ); - assert_eq!( - Average::from(&speedtests[0..6]).tier(), - SpeedtestTier::Acceptable - ); - } - - #[test] - fn check_minimum_known_valid() { - let speedtests = known_speedtests(); - assert_ne!( - Average::from(&speedtests[4..4]).tier(), - SpeedtestTier::Acceptable - ); - assert_eq!( - Average::from(&speedtests[4..=5]).tier(), - SpeedtestTier::Acceptable - ); - assert_eq!( - Average::from(&speedtests[4..=6]).tier(), - SpeedtestTier::Acceptable - ); - } - - #[test] - fn check_minimum_known_invalid() { - let speedtests = known_speedtests(); - assert_ne!( - Average::from(&speedtests[5..6]).tier(), - SpeedtestTier::Acceptable - ); - } - - #[test] - fn check_speedtest_rolling_avg() { - let owner: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" - .parse() - .expect("failed owner parse"); - let speedtests = VecDeque::from(known_speedtests()); - let avgs = SpeedtestAverages { - speedtests: HashMap::from([(owner, speedtests)]), - } - .into_iter(); - for avg in avgs { - if let Some(first) = avg.speedtests.first() { - assert_eq!(avg.latest_timestamp, first.timestamp); - } - } - } - - #[test] - fn check_speedtest_without_lapsed() { - let speedtest_cutoff = Duration::hours(10); - let contiguos_speedtests = known_speedtests(); - let contiguous_speedtests = - speedtests_without_lapsed(contiguos_speedtests.iter(), speedtest_cutoff); - - let disjoint_speedtests = vec![ - Speedtest::new( - parse_dt("2022-08-02 6:00:00 +0000"), - bytes_per_s(20), - bytes_per_s(150), - 70, - ), - Speedtest::new( - parse_dt("2022-08-01 18:00:00 +0000"), - bytes_per_s(10), - bytes_per_s(118), - 50, - ), - Speedtest::new( - parse_dt("2022-08-01 12:00:00 +0000"), - bytes_per_s(30), - bytes_per_s(112), - 40, - ), - ]; - let disjoint_speedtests = - speedtests_without_lapsed(disjoint_speedtests.iter(), speedtest_cutoff); - - assert_eq!(contiguous_speedtests.count(), 8); - assert_eq!(disjoint_speedtests.count(), 1); - } +pub async fn 
get_latest_speedtests_for_pubkey<'a>(
+    pubkey: &PublicKeyBinary,
+    exec: &mut Transaction<'_, Postgres>,
+) -> Result<Vec<Speedtest>, sqlx::Error> {
+    let mut speedtests = Vec::new();
+
+    let mut rows = sqlx::query_as::<_, Speedtest>(
+        "SELECT * FROM speedtests where pubkey = $1 order by timestamp desc limit $2",
+    )
+    .bind(pubkey)
+    .bind(SPEEDTEST_AVG_MAX_DATA_POINTS as i64)
+    .fetch(exec);
+
+    while let Some(speedtest) = rows.try_next().await? {
+        speedtests.push(speedtest);
+    }
+    Ok(speedtests)
+}
+
+pub async fn aggregate_epoch_speedtests<'a>(
+    epoch_end: DateTime<Utc>,
+    exec: impl sqlx::PgExecutor<'a> + Copy + 'a,
+) -> Result<EpochSpeedTests, sqlx::Error> {
+    let mut speedtests = EpochSpeedTests::new();
+    // pull the most recent SPEEDTEST_AVG_MAX_DATA_POINTS speedtests for each
+    // gateway, considering only tests taken before the end of the epoch;
+    // row_number() is 1-based, hence the inclusive bound
+    let mut rows = sqlx::query_as::<_, Speedtest>(
+        "select * from (
+            SELECT distinct(pubkey), upload_speed, download_speed, latency, timestamp, row_number()
+            over (partition by pubkey order by timestamp desc) as count FROM speedtests where timestamp < $1
+        ) as tmp
+        where count <= $2",
+    )
+    .bind(epoch_end)
+    .bind(SPEEDTEST_AVG_MAX_DATA_POINTS as i64)
+    .fetch(exec);
+
+    while let Some(speedtest) = rows.try_next().await? {
+        speedtests
+            .entry(speedtest.pubkey.clone())
+            .or_default()
+            .push(speedtest);
+    }
+    Ok(speedtests)
+}
+
+pub async fn save_speedtest_to_db(
+    report: CellSpeedtestIngestReport,
+    exec: &mut Transaction<'_, Postgres>,
+) -> Result<(), sqlx::Error> {
+    sqlx::query(
+        r#"
+        insert into speedtests (pubkey, upload_speed, download_speed, latency, timestamp)
+        values ($1, $2, $3, $4, $5)
+        "#,
+    )
+    .bind(report.report.pubkey)
+    .bind(report.report.upload_speed as i64)
+    .bind(report.report.download_speed as i64)
+    .bind(report.report.latency as i32)
+    .bind(report.report.timestamp)
+    .execute(exec)
+    .await?;
+    Ok(())
+}
diff --git a/mobile_verifier/src/speedtests_average.rs b/mobile_verifier/src/speedtests_average.rs
new file mode 100644
index 000000000..c8a124634
--- /dev/null
+++ b/mobile_verifier/src/speedtests_average.rs
@@ -0,0 +1,471 @@
+use crate::speedtests::{self, Speedtest};
+use chrono::{DateTime, Duration, Utc};
+use file_store::{file_sink::FileSinkClient, traits::TimestampEncode};
+use helium_crypto::PublicKeyBinary;
+use helium_proto::services::poc_mobile::{self as proto, SpeedtestAvgValidity};
+use rust_decimal::Decimal;
+use rust_decimal_macros::dec;
+use std::collections::HashMap;
+
+const SPEEDTEST_LAPSE: i64 = 48;
+const MIN_DOWNLOAD: u64 = mbps(30);
+const MIN_UPLOAD: u64 = mbps(2);
+const MAX_LATENCY: u32 = 100;
+pub const MIN_REQUIRED_SAMPLES: usize = 2;
+
+pub type EpochAverages = HashMap<PublicKeyBinary, SpeedtestAverage>;
+
+#[derive(Debug, Clone)]
+pub struct SpeedtestAverage {
+    pub pubkey: PublicKeyBinary,
+    pub window_size: usize,
+    pub upload_speed_avg_bps: u64,
+    pub download_speed_avg_bps: u64,
+    pub latency_avg_ms: u32,
+    pub validity: SpeedtestAvgValidity,
+    pub reward_multiplier: Decimal,
+}
+
+impl<'a, I: ?Sized> From<&'a I> for SpeedtestAverage
+where
+    &'a I: IntoIterator<Item = &'a Speedtest>,
+{
+    fn from(iter: &'a I) -> Self {
+        // every speedtest in a window belongs to the same gateway, so the
+        // pubkey is lifted out of the samples as we fold over them
+        let mut id = vec![];
+        let mut window_size = 0;
+        let mut sum_upload = 0;
+        let mut sum_download = 0;
+        let mut sum_latency = 0;
+
+        for Speedtest {
+            pubkey,
+            upload_speed,
+            download_speed,
+            latency,
+            ..
+        } in speedtests_without_lapsed(iter.into_iter(), Duration::hours(SPEEDTEST_LAPSE))
+        {
+            id = pubkey.as_ref().to_vec();
+ sum_upload += *upload_speed as u64; + sum_download += *download_speed as u64; + sum_latency += *latency as u32; + window_size += 1; + } + + if window_size > 0 { + let upload_speed_avg_bps = sum_upload / window_size as u64; + let download_speed_avg_bps = sum_download / window_size as u64; + let latency_avg_ms = sum_latency / window_size as u32; + let validity = validity( + window_size as usize, + upload_speed_avg_bps, + download_speed_avg_bps, + latency_avg_ms, + ); + let tier = SpeedtestTier::new( + window_size as usize, + upload_speed_avg_bps, + download_speed_avg_bps, + latency_avg_ms, + ); + let reward_multiplier = tier.into_multiplier(); + SpeedtestAverage { + pubkey: id.into(), + window_size: window_size as usize, + upload_speed_avg_bps, + download_speed_avg_bps, + latency_avg_ms, + validity, + reward_multiplier, + } + } else { + SpeedtestAverage { + pubkey: id.into(), + window_size: 0, + upload_speed_avg_bps: sum_upload, + download_speed_avg_bps: sum_download, + latency_avg_ms: sum_latency, + validity: proto::SpeedtestAvgValidity::TooFewSamples, + reward_multiplier: Decimal::ZERO, + } + } + } +} + +impl SpeedtestAverage { + pub async fn write( + &self, + filesink: &FileSinkClient, + speedtests: Vec, + ) -> file_store::Result { + filesink + .write( + proto::SpeedtestAvg { + pub_key: self.pubkey.clone().into(), + upload_speed_avg_bps: self.upload_speed_avg_bps, + download_speed_avg_bps: self.download_speed_avg_bps, + latency_avg_ms: self.latency_avg_ms, + timestamp: Utc::now().encode_timestamp(), + speedtests: speedtests_without_lapsed( + speedtests.iter(), + Duration::hours(SPEEDTEST_LAPSE), + ) + .map(|st| proto::Speedtest { + timestamp: st.timestamp.timestamp() as u64, + upload_speed_bps: st.upload_speed as u64, + download_speed_bps: st.download_speed as u64, + latency_ms: st.latency as u32, + }) + .collect(), + validity: self.validity as i32, + reward_multiplier: self.reward_multiplier.try_into().unwrap(), + }, + &[("validity", self.validity.as_str_name())], + ) + .await?; + Ok(()) + } + + pub fn reward_multiplier(&self) -> Decimal { + self.reward_multiplier + } + + #[allow(dead_code)] + pub fn tier(&self) -> SpeedtestTier { + calculate_tier( + self.window_size, + self.upload_speed_avg_bps, + self.download_speed_avg_bps, + self.latency_avg_ms, + ) + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum SpeedtestTier { + Failed = 0, + Poor = 1, + Degraded = 2, + Acceptable = 3, +} + +impl SpeedtestTier { + pub fn new( + window_size: usize, + upload_speed_avg_bps: u64, + download_speed_avg_bps: u64, + latency_avg_ms: u32, + ) -> SpeedtestTier { + calculate_tier( + window_size, + upload_speed_avg_bps, + download_speed_avg_bps, + latency_avg_ms, + ) + } + + fn into_multiplier(self) -> Decimal { + match self { + Self::Acceptable => dec!(1.0), + Self::Degraded => dec!(0.5), + Self::Poor => dec!(0.25), + Self::Failed => dec!(0.0), + } + } + + fn from_download_speed(download_speed: u64) -> Self { + if download_speed >= mbps(100) { + Self::Acceptable + } else if download_speed >= mbps(50) { + Self::Degraded + } else if download_speed >= mbps(30) { + Self::Poor + } else { + Self::Failed + } + } + + fn from_upload_speed(upload_speed: u64) -> Self { + if upload_speed >= mbps(10) { + Self::Acceptable + } else if upload_speed >= mbps(5) { + Self::Degraded + } else if upload_speed >= mbps(2) { + Self::Poor + } else { + Self::Failed + } + } + + fn from_latency(latency: u32) -> Self { + if latency <= 50 { + Self::Acceptable + } else if latency <= 75 { + Self::Degraded + 
} else if latency <= 100 {
+            Self::Poor
+        } else {
+            Self::Failed
+        }
+    }
+}
+
+#[derive(Clone, Default)]
+pub struct SpeedtestAverages {
+    pub averages: HashMap<PublicKeyBinary, SpeedtestAverage>,
+}
+
+impl SpeedtestAverages {
+    #[allow(dead_code)]
+    pub fn into_iter(self) -> impl IntoIterator<Item = SpeedtestAverage> {
+        self.averages.into_values()
+    }
+
+    #[allow(dead_code)]
+    pub fn get_average(&self, pub_key: &PublicKeyBinary) -> Option<SpeedtestAverage> {
+        self.averages.get(pub_key).cloned()
+    }
+
+    pub async fn aggregate_epoch_averages<'a>(
+        epoch_end: DateTime<Utc>,
+        exec: impl sqlx::PgExecutor<'a> + Copy + 'a,
+    ) -> Result<Self, sqlx::Error> {
+        let averages: EpochAverages = speedtests::aggregate_epoch_speedtests(epoch_end, exec)
+            .await?
+            .into_iter()
+            .map(|(pub_key, speedtests)| {
+                let average = SpeedtestAverage::from(&speedtests);
+                (pub_key, average)
+            })
+            .collect();
+
+        Ok(Self { averages })
+    }
+}
+
+pub fn calculate_tier(
+    window_size: usize,
+    upload_speed_avg_bps: u64,
+    download_speed_avg_bps: u64,
+    latency_avg_ms: u32,
+) -> SpeedtestTier {
+    if window_size < MIN_REQUIRED_SAMPLES {
+        SpeedtestTier::Failed
+    } else {
+        SpeedtestTier::from_download_speed(download_speed_avg_bps)
+            .min(SpeedtestTier::from_upload_speed(upload_speed_avg_bps))
+            .min(SpeedtestTier::from_latency(latency_avg_ms))
+    }
+}
+
+pub fn validity(
+    window_size: usize,
+    upload_speed_avg_bps: u64,
+    download_speed_avg_bps: u64,
+    latency_avg_ms: u32,
+) -> proto::SpeedtestAvgValidity {
+    if window_size < MIN_REQUIRED_SAMPLES {
+        return proto::SpeedtestAvgValidity::TooFewSamples;
+    }
+    if download_speed_avg_bps < MIN_DOWNLOAD {
+        return proto::SpeedtestAvgValidity::SlowDownloadSpeed;
+    }
+    if upload_speed_avg_bps < MIN_UPLOAD {
+        return proto::SpeedtestAvgValidity::SlowUploadSpeed;
+    }
+    if latency_avg_ms > MAX_LATENCY {
+        return proto::SpeedtestAvgValidity::HighLatency;
+    }
+    proto::SpeedtestAvgValidity::Valid
+}
+
+fn speedtests_without_lapsed<'a>(
+    iterable: impl Iterator<Item = &'a Speedtest>,
+    lapse_cliff: Duration,
+) -> impl Iterator<Item = &'a Speedtest> {
+    let mut last_timestamp = None;
+    iterable.take_while(move |speedtest| match last_timestamp {
+        Some(ts) if ts - speedtest.timestamp > lapse_cliff => false,
+        None | Some(_) => {
+            last_timestamp = Some(speedtest.timestamp);
+            true
+        }
+    })
+}
+
+const fn mbps(mbps: u64) -> u64 {
+    mbps * 125000
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use chrono::TimeZone;
+
+    fn parse_dt(dt: &str) -> DateTime<Utc> {
+        Utc.datetime_from_str(dt, "%Y-%m-%d %H:%M:%S %z")
+            .expect("unable_to_parse")
+    }
+
+    fn bytes_per_s(mbps: i64) -> i64 {
+        mbps * 125000
+    }
+
+    #[test]
+    fn check_tier_cmp() {
+        assert_eq!(
+            SpeedtestTier::Acceptable.min(SpeedtestTier::Failed),
+            SpeedtestTier::Failed,
+        );
+    }
+
+    #[test]
+    fn check_known_valid() {
+        let speedtests = known_speedtests();
+        assert_ne!(
+            SpeedtestAverage::from(&speedtests[0..5]).tier(),
+            SpeedtestTier::Acceptable,
+        );
+        assert_eq!(
+            SpeedtestAverage::from(&speedtests[0..6]).tier(),
+            SpeedtestTier::Acceptable
+        );
+    }
+
+    #[test]
+    fn check_minimum_known_valid() {
+        let speedtests = known_speedtests();
+        assert_ne!(
+            SpeedtestAverage::from(&speedtests[4..4]).tier(),
+            SpeedtestTier::Acceptable
+        );
+        assert_eq!(
+            SpeedtestAverage::from(&speedtests[4..=5]).tier(),
+            SpeedtestTier::Acceptable
+        );
+        assert_eq!(
+            SpeedtestAverage::from(&speedtests[4..=6]).tier(),
+            SpeedtestTier::Acceptable
+        );
+    }
+
+    #[test]
+    fn check_minimum_known_invalid() {
+        let speedtests = known_speedtests();
+        assert_ne!(
+            SpeedtestAverage::from(&speedtests[5..6]).tier(),
+            SpeedtestTier::Acceptable
+        );
+    }
+
+    fn known_speedtests() -> Vec<Speedtest> {
+        // This data is taken from the spreadsheet
+        // Timestamp            DL   UL  Latency  DL RA   UL RA  Latency RA  Acceptable?
+        // 2022-08-02 18:00:00  70   30  40       103.33  19.17  30.00       TRUE
+        // 2022-08-02 12:00:00  100  10  30       116.67  17.50  35.00       TRUE
+        // 2022-08-02 6:00:00   130  20  10       100.00  15.83  30.00       TRUE
+        // 2022-08-02 0:00:00   90   15  10       94.00   15.00  34.00       FALSE
+        // 2022-08-01 18:00:00  112  30  40       95.00   15.00  40.00       FALSE
+        // 2022-08-01 12:00:00  118  10  50       89.33   10.00  40.00       FALSE
+        // 2022-08-01 6:00:00   150  20  70       75.00   10.00  35.00       FALSE
+        // 2022-08-01 0:00:00   0    0   0        0.00    0.00   0.00        FALSE*
+        let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"
+            .parse()
+            .expect("failed gw1 parse");
+
+        vec![
+            Speedtest::new(gw1.clone(), parse_dt("2022-08-02 18:00:00 +0000"), 0, 0, 0),
+            Speedtest::new(
+                gw1.clone(),
+                parse_dt("2022-08-02 12:00:00 +0000"),
+                bytes_per_s(20),
+                bytes_per_s(150),
+                70,
+            ),
+            Speedtest::new(
+                gw1.clone(),
+                parse_dt("2022-08-02 6:00:00 +0000"),
+                bytes_per_s(10),
+                bytes_per_s(118),
+                50,
+            ),
+            Speedtest::new(
+                gw1.clone(),
+                parse_dt("2022-08-02 0:00:00 +0000"),
+                bytes_per_s(30),
+                bytes_per_s(112),
+                40,
+            ),
+            Speedtest::new(
+                gw1.clone(),
+                parse_dt("2022-08-02 0:00:00 +0000"),
+                bytes_per_s(15),
+                bytes_per_s(90),
+                10,
+            ),
+            Speedtest::new(
+                gw1.clone(),
+                parse_dt("2022-08-01 18:00:00 +0000"),
+                bytes_per_s(20),
+                bytes_per_s(130),
+                10,
+            ),
+            Speedtest::new(
+                gw1.clone(),
+                parse_dt("2022-08-01 12:00:00 +0000"),
+                bytes_per_s(10),
+                bytes_per_s(100),
+                30,
+            ),
+            Speedtest::new(
+                gw1,
+                parse_dt("2022-08-01 6:00:00 +0000"),
+                bytes_per_s(30),
+                bytes_per_s(70),
+                40,
+            ),
+        ]
+    }
+
+    #[test]
+    fn check_speedtest_without_lapsed() {
+        let speedtest_cutoff = Duration::hours(10);
+        let contiguous_speedtests = known_speedtests();
+        let contiguous_speedtests =
+            speedtests_without_lapsed(contiguous_speedtests.iter(), speedtest_cutoff);
+        let pubkey: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6"
+            .parse()
+            .expect("failed owner parse");
+        let disjoint_speedtests = vec![
+            Speedtest::new(
+                pubkey.clone(),
+                parse_dt("2022-08-02 6:00:00 +0000"),
+                bytes_per_s(20),
+                bytes_per_s(150),
+                70,
+            ),
+            Speedtest::new(
+                pubkey.clone(),
+                parse_dt("2022-08-01 18:00:00 +0000"),
+                bytes_per_s(10),
+                bytes_per_s(118),
+                50,
+            ),
+            Speedtest::new(
+                pubkey,
+                parse_dt("2022-08-01 12:00:00 +0000"),
+                bytes_per_s(30),
+                bytes_per_s(112),
+                40,
+            ),
+        ];
+        let disjoint_speedtests =
+            speedtests_without_lapsed(disjoint_speedtests.iter(), speedtest_cutoff);
+
+        assert_eq!(contiguous_speedtests.count(), 8);
+        assert_eq!(disjoint_speedtests.count(), 1);
+    }
+}
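Note on the schema this refactor assumes: the patch replaces the old one-row-per-gateway speedtests table (id, a speedtest[] array column, latest_timestamp) with one row per speedtest report, but the companion DB migration is not part of this diff. A minimal sketch of the table shape implied by save_speedtest_to_db and the two read queries, with column types inferred from the binds and the Speedtest struct (the names and index here are illustrative; the actual migration may differ):

    CREATE TABLE speedtests (
        pubkey TEXT NOT NULL,
        upload_speed BIGINT NOT NULL,
        download_speed BIGINT NOT NULL,
        latency INTEGER NOT NULL,
        timestamp TIMESTAMPTZ NOT NULL
    );

    -- get_latest_speedtests_for_pubkey filters on pubkey and orders by
    -- timestamp desc, and aggregate_epoch_speedtests windows per pubkey by
    -- timestamp, so one composite index serves both access paths:
    CREATE INDEX speedtests_pubkey_timestamp_idx
        ON speedtests (pubkey, timestamp DESC);

Both readers cap the window at SPEEDTEST_AVG_MAX_DATA_POINTS (6) samples per gateway, and aggregate_epoch_speedtests additionally bounds the window with timestamp < epoch_end, which is what makes rewards ignore any speedtest received after the end of the epoch.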