Commit a9b971c (1 parent: d1e4479)

address review comments

andymck committed Aug 2, 2023
Showing 5 changed files with 159 additions and 116 deletions.
mobile_verifier/migrations/15_speedtests_one_to_one.sql (3 changes: 2 additions & 1 deletion)
@@ -4,7 +4,8 @@ CREATE TABLE speedtests_migration (
    upload_speed bigint,
    download_speed bigint,
    latency integer,
-   timestamp timestamptz NOT NULL
+   timestamp timestamptz NOT NULL,
+   inserted_at timestamptz default now()
);
CREATE INDEX idx_speedtests_pubkey on speedtests_migration (pubkey);
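Since the new inserted_at column has a default of now(), insert statements that never name it keep working unchanged. A minimal sketch of that behaviour (a hypothetical snippet, assuming a sqlx PgPool named `pool` and a `pubkey` value, neither of which appears in this diff):

    // insert without naming inserted_at, then read back the value the DB filled in
    let inserted_at: chrono::DateTime<chrono::Utc> = sqlx::query_scalar(
        "insert into speedtests_migration (pubkey, upload_speed, download_speed, latency, timestamp) \
         values ($1, $2, $3, $4, now()) returning inserted_at",
    )
    .bind(&pubkey)
    .bind(10_i64) // upload_speed
    .bind(100_i64) // download_speed
    .bind(25_i32) // latency
    .fetch_one(&pool)
    .await?;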

mobile_verifier/src/reward_shares.rs (15 changes: 14 additions & 1 deletion)
@@ -330,6 +330,7 @@ mod test {
        subscriber_location::SubscriberValidatedLocations,
    };
    use chrono::{Duration, Utc};
+    use file_store::speedtest::CellSpeedtest;
    use futures::stream;
    use helium_proto::services::poc_mobile::mobile_reward_share::Reward as MobileReward;
    use prost::Message;
@@ -526,7 +527,7 @@ mod test {
assert_eq!(data_transfer_rewards.reward_scale().round_dp(1), dec!(0.5));
}

-    fn bytes_per_s(mbps: i64) -> i64 {
+    fn bytes_per_s(mbps: u64) -> u64 {
        mbps * 125000
    }
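(A quick sanity check on the constant: 1 Mbps is 1,000,000 bits/s, i.e. 125,000 bytes/s, so bytes_per_s(100) = 12,500,000 bytes/s for the 100 Mbps download speed used by acceptable_speedtest below.)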

@@ -538,43 +539,55 @@

    fn acceptable_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
        Speedtest {
+            report: CellSpeedtest {
                pubkey,
                timestamp,
                upload_speed: bytes_per_s(10),
                download_speed: bytes_per_s(100),
                latency: 25,
+                serial: "".to_string(),
+            }
        }
    }

    fn degraded_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
        Speedtest {
+            report: CellSpeedtest {
                pubkey,
                timestamp,
                upload_speed: bytes_per_s(5),
                download_speed: bytes_per_s(60),
                latency: 60,
+                serial: "".to_string(),
+            }
        }
    }

    fn failed_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
        Speedtest {
+            report: CellSpeedtest {
                pubkey,
                timestamp,
                upload_speed: bytes_per_s(1),
                download_speed: bytes_per_s(20),
                latency: 110,
+                serial: "".to_string(),
+            }
        }
    }

    fn poor_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
        Speedtest {
+            report: CellSpeedtest {
                pubkey,
                timestamp,
                upload_speed: bytes_per_s(2),
                download_speed: bytes_per_s(40),
                latency: 90,
+                serial: "".to_string(),
+            }
        }
    }

#[tokio::test]
async fn test_radio_weights() {
mobile_verifier/src/rewarder.rs (2 changes: 1 addition & 1 deletion)
@@ -232,7 +232,7 @@ impl Rewarder {

        // clear out the various db tables
        heartbeats::clear_heartbeats(&mut transaction, reward_period).await?;
-        speedtests::clear_speedtests(&mut transaction).await?;
+        speedtests::clear_speedtests(&mut transaction, reward_period).await?;
        data_session::clear_hotspot_data_sessions(&mut transaction, reward_period).await?;
        subscriber_location::clear_location_shares(&mut transaction, reward_period).await?;
mobile_verifier/src/speedtests.rs (178 changes: 94 additions & 84 deletions)
@@ -1,46 +1,41 @@
-use crate::speedtests_average::{SpeedtestAverage, SPEEDTEST_LAPSE};
-use chrono::{DateTime, Duration, Utc};
+use crate::speedtests_average::SpeedtestAverage;
+use chrono::{DateTime, Utc};
use file_store::{
-    file_info_poller::FileInfoStream, file_sink::FileSinkClient,
-    speedtest::CellSpeedtestIngestReport,
+    file_info_poller::FileInfoStream,
+    file_sink::FileSinkClient,
+    speedtest::{CellSpeedtest, CellSpeedtestIngestReport},
};
-use futures::{StreamExt, TryFutureExt, TryStreamExt};
+use futures::{
+    stream::{Stream, StreamExt, TryStreamExt},
+    TryFutureExt,
+};
use helium_crypto::PublicKeyBinary;
use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient};
-use sqlx::{FromRow, Postgres, Transaction, Type};
-use std::collections::HashMap;
+use sqlx::{postgres::PgRow, FromRow, Postgres, Row, Transaction};
+use std::{collections::HashMap, ops::Range, pin::pin};
use tokio::sync::mpsc::Receiver;

const SPEEDTEST_AVG_MAX_DATA_POINTS: usize = 6;

pub type EpochSpeedTests = HashMap<PublicKeyBinary, Vec<Speedtest>>;

-#[derive(Debug, Clone, Type, FromRow)]
-#[sqlx(type_name = "speedtest")]
+#[derive(Debug, Clone)]
pub struct Speedtest {
-    pub pubkey: PublicKeyBinary,
-    pub upload_speed: i64,
-    pub download_speed: i64,
-    pub latency: i32,
-    pub timestamp: DateTime<Utc>,
+    pub report: CellSpeedtest,
}

-impl Speedtest {
-    #[cfg(test)]
-    pub fn new(
-        pubkey: PublicKeyBinary,
-        timestamp: DateTime<Utc>,
-        upload_speed: i64,
-        download_speed: i64,
-        latency: i32,
-    ) -> Self {
-        Self {
-            pubkey,
-            timestamp,
-            upload_speed,
-            download_speed,
-            latency,
-        }
-    }
-}
+impl FromRow<'_, PgRow> for Speedtest {
+    fn from_row(row: &PgRow) -> sqlx::Result<Speedtest> {
+        Ok(Self {
+            report: CellSpeedtest {
+                pubkey: row.get::<PublicKeyBinary, &str>("pubkey"),
+                serial: row.get::<String, &str>("serial"),
+                upload_speed: row.get::<i64, &str>("upload_speed") as u64,
+                download_speed: row.get::<i64, &str>("download_speed") as u64,
+                timestamp: row.get::<DateTime<Utc>, &str>("timestamp"),
+                latency: row.get::<i32, &str>("latency") as u32,
+            },
+        })
+    }
+}
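A side note on the manual impl above: row.get panics when a column is absent or has the wrong type. A hedged alternative sketch (not what this commit does) is sqlx's fallible Row::try_get, which surfaces those cases through the sqlx::Result that from_row already returns:

    impl FromRow<'_, PgRow> for Speedtest {
        fn from_row(row: &PgRow) -> sqlx::Result<Speedtest> {
            Ok(Self {
                report: CellSpeedtest {
                    pubkey: row.try_get("pubkey")?,
                    serial: row.try_get("serial")?,
                    // the table stores signed integers; CellSpeedtest wants unsigned
                    upload_speed: row.try_get::<i64, _>("upload_speed")? as u64,
                    download_speed: row.try_get::<i64, _>("download_speed")? as u64,
                    timestamp: row.try_get("timestamp")?,
                    latency: row.try_get::<i32, _>("latency")? as u32,
                },
            })
        }
    }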

@@ -87,46 +82,81 @@ impl SpeedtestDaemon {

    async fn process_file(
        &self,
-        file_info_stream: FileInfoStream<CellSpeedtestIngestReport>,
+        file: FileInfoStream<CellSpeedtestIngestReport>,
    ) -> anyhow::Result<()> {
-        tracing::info!(
-            "Processing speedtest file {}",
-            file_info_stream.file_info.key
-        );
+        tracing::info!("Processing speedtest file {}", file.file_info.key);

        let mut transaction = self.pool.begin().await?;
        // process the speedtest reports from the file, if valid insert to the db
        // and recalculate a new average
-        file_info_stream
-            .into_stream(&mut transaction)
-            .await?
-            .map(anyhow::Ok)
-            .try_fold(transaction, |mut transaction, report| async {
-                let pubkey = report.report.pubkey.clone();
-                if self
-                    .gateway_client
-                    .resolve_gateway_info(&pubkey.clone())
-                    .await
-                    .is_ok()
-                {
-                    save_speedtest_to_db(report, &mut transaction).await?;
-                    let latest_speedtests: Vec<Speedtest> =
-                        get_latest_speedtests_for_pubkey(&pubkey, &mut transaction).await?;
-                    let average = SpeedtestAverage::from(&latest_speedtests);
-                    average.write(&self.file_sink, latest_speedtests).await?;
-                }
-                Ok(transaction)
-            })
-            .await?
-            .commit()
-            .await?;
-        // db work all done, commit the reports to s3
-        self.file_sink.commit().await?;
+        let speedtests = file.into_stream(&mut transaction).await?;
+
+        let mut validated_speedtests =
+            pin!(Speedtest::validate_speedtests(&self.gateway_client, speedtests).await);
+
+        // todo: the double Some is odd, what would be the alternative?
+        while let Some(Some(speedtest)) = validated_speedtests.next().await {
+            let pubkey = speedtest.report.pubkey.clone();
+            speedtest.save(&mut transaction).await?;
+            let latest_speedtests =
+                get_latest_speedtests_for_pubkey(&pubkey, &mut transaction).await?;
+            let average = SpeedtestAverage::from(&latest_speedtests);
+            average.write(&self.file_sink, latest_speedtests).await?;
+        }
+        self.file_sink.commit().await?;
+        transaction.commit().await?;

        Ok(())
    }
}

+impl Speedtest {
+    pub async fn validate_speedtests<'a>(
+        gateway_client: &'a GatewayClient,
+        speedtests: impl Stream<Item = CellSpeedtestIngestReport> + 'a,
+    ) -> impl Stream<Item = Option<Self>> + 'a {
+        speedtests.then(move |report| {
+            let pubkey = report.report.pubkey.clone();
+            async move {
+                if gateway_client
+                    .resolve_gateway_info(&pubkey.clone())
+                    .await
+                    .is_ok()
+                {
+                    Some(Speedtest {
+                        report: report.report.into(),
+                    })
+                } else {
+                    None
+                }
+            }
+        })
+    }
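On the todo above: with Item = Option<Self>, the consumer needs the double Some, and as written `while let Some(Some(..))` in process_file stops at the first report whose gateway fails to resolve rather than skipping it. One possible alternative, sketched here rather than taken from this commit, is StreamExt::filter_map, which drops unresolved reports inside the stream and yields plain Speedtest items:

    pub fn validate_speedtests<'a>(
        gateway_client: &'a GatewayClient,
        speedtests: impl Stream<Item = CellSpeedtestIngestReport> + 'a,
    ) -> impl Stream<Item = Speedtest> + 'a {
        speedtests.filter_map(move |report| async move {
            // keep only reports whose pubkey resolves to a known gateway
            gateway_client
                .resolve_gateway_info(&report.report.pubkey)
                .await
                .ok()
                .map(|_| Speedtest {
                    report: report.report,
                })
        })
    }

The caller would then write `while let Some(speedtest) = validated_speedtests.next().await`, and an invalid report no longer ends the loop early.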

+    pub async fn save(
+        &self,
+        exec: &mut Transaction<'_, Postgres>,
+    ) -> Result<(), sqlx::Error> {
+        sqlx::query(
+            r#"
+            insert into speedtests (pubkey, upload_speed, download_speed, latency, serial, timestamp)
+            values ($1, $2, $3, $4, $5, $6)
+            "#,
+        )
+        .bind(self.report.pubkey.clone())
+        .bind(self.report.upload_speed as i64)
+        .bind(self.report.download_speed as i64)
+        .bind(self.report.latency as i64)
+        .bind(self.report.serial.clone())
+        .bind(self.report.timestamp)
+        .execute(exec)
+        .await?;
+        Ok(())
+    }
+}

-pub async fn get_latest_speedtests_for_pubkey<'a>(
+pub async fn get_latest_speedtests_for_pubkey(
    pubkey: &PublicKeyBinary,
    exec: &mut Transaction<'_, Postgres>,
) -> Result<Vec<Speedtest>, sqlx::Error> {
@@ -147,13 +177,13 @@ pub async fn get_latest_speedtests_for_pubkey<'a>(

pub async fn aggregate_epoch_speedtests<'a>(
    epoch_end: DateTime<Utc>,
-    exec: impl sqlx::PgExecutor<'a> + Copy + 'a,
+    exec: &sqlx::Pool<sqlx::Postgres>,
) -> Result<EpochSpeedTests, sqlx::Error> {
    let mut speedtests = EpochSpeedTests::new();
    // pull the last N most recent speedtests from prior to the epoch end for each pubkey
    let mut rows = sqlx::query_as::<_, Speedtest>(
        "select * from (
-            SELECT distinct(pubkey), upload_speed, download_speed, latency, timestamp, row_number()
+            SELECT distinct(pubkey), upload_speed, download_speed, latency, timestamp, serial, row_number()
            over (partition by pubkey order by timestamp desc) as count FROM speedtests where timestamp < $1
        ) as tmp
        where count < $2"
@@ -164,39 +194,19 @@ pub async fn aggregate_epoch_speedtests<'a>(
    // collate the returned speedtests based on pubkey
    while let Some(speedtest) = rows.try_next().await? {
        speedtests
-            .entry(speedtest.pubkey.clone())
+            .entry(speedtest.report.pubkey.clone())
            .or_default()
            .push(speedtest);
    }
    Ok(speedtests)
}
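For context on the reworked signature (the pool and epoch bounds below are assumptions, not shown in this diff), the per-pubkey vectors returned here feed straight into SpeedtestAverage, mirroring what process_file does per report:

    // hypothetical call site at the close of a reward epoch
    let epoch_speedtests = aggregate_epoch_speedtests(epoch.end, &pool).await?;
    for (_pubkey, tests) in &epoch_speedtests {
        // same conversion the verifier applies per hotspot
        let average = SpeedtestAverage::from(tests);
        // ... fold `average` into that hotspot's reward shares
    }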

-pub async fn save_speedtest_to_db(
-    report: CellSpeedtestIngestReport,
-    exec: &mut Transaction<'_, Postgres>,
-) -> Result<(), sqlx::Error> {
-    sqlx::query(
-        r#"
-        insert into speedtests (pubkey, upload_speed, download_speed, latency, timestamp)
-        values ($1, $2, $3, $4, $5)
-        "#,
-    )
-    .bind(report.report.pubkey)
-    .bind(report.report.upload_speed as i64)
-    .bind(report.report.download_speed as i64)
-    .bind(report.report.latency as i64)
-    .bind(report.report.timestamp)
-    .execute(exec)
-    .await?;
-    Ok(())
-}

-// Clear the speedtests table of tests older than hours defined by SPEEDTEST_LAPSE
+// Clear the speedtests table of tests older than the start of the reward period
pub async fn clear_speedtests(
    tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
+    reward_period: &Range<DateTime<Utc>>,
) -> Result<(), sqlx::Error> {
-    sqlx::query("DELETE FROM speedtests where timestamp < $1")
-        .bind(Utc::now() - Duration::hours(SPEEDTEST_LAPSE))
+    sqlx::query("DELETE FROM speedtests WHERE timestamp < $1")
+        .bind(reward_period.start)
        .execute(&mut *tx)
        .await?;
    Ok(())
