Skip to content

Commit

Permalink
write out verified status of speedtests to s3
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Aug 9, 2023
1 parent 226da1d commit 53f85bb
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 22 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ sqlx = {version = "0", features = [
]}

helium-crypto = {version = "0.6.8", features=["sqlx-postgres", "multisig"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "andymck/invalid-speed-test-proto", features = ["services"]}
hextree = "*"
solana-client = "1.14"
solana-sdk = "1.14"
solana-program = "1.11"
spl-token = "3.5.0"
reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]}
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "andymck/invalid-speed-test-proto" }
humantime = "2"
metrics = "0"
metrics-exporter-prometheus = "0"
Expand Down
5 changes: 5 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub const SUBSCRIBER_LOCATION_INGEST_REPORT: &str = "subscriber_location_report"
pub const VERIFIED_SUBSCRIBER_LOCATION_INGEST_REPORT: &str = "verified_subscriber_location_report";
pub const CELL_HEARTBEAT: &str = "cell_heartbeat";
pub const CELL_SPEEDTEST: &str = "cell_speedtest";
pub const VERIFIED_SPEEDTEST: &str = "verified_speedtest";
pub const CELL_HEARTBEAT_INGEST_REPORT: &str = "heartbeat_report";
pub const CELL_SPEEDTEST_INGEST_REPORT: &str = "speedtest_report";
pub const ENTROPY: &str = "entropy";
Expand Down Expand Up @@ -164,6 +165,7 @@ pub enum FileType {
VerifiedSubscriberLocationIngestReport,
MapperMsg,
CoverageObjectIngestReport,
VerifiedSpeedtest,
}

impl fmt::Display for FileType {
Expand All @@ -176,6 +178,7 @@ impl fmt::Display for FileType {
}
Self::CellHeartbeat => CELL_HEARTBEAT,
Self::CellSpeedtest => CELL_SPEEDTEST,
Self::VerifiedSpeedtest => VERIFIED_SPEEDTEST,
Self::CellHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT,
Self::CellSpeedtestIngestReport => CELL_SPEEDTEST_INGEST_REPORT,
Self::Entropy => ENTROPY,
Expand Down Expand Up @@ -220,6 +223,7 @@ impl FileType {
}
Self::CellHeartbeat => CELL_HEARTBEAT,
Self::CellSpeedtest => CELL_SPEEDTEST,
Self::VerifiedSpeedtest => VERIFIED_SPEEDTEST,
Self::CellHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT,
Self::CellSpeedtestIngestReport => CELL_SPEEDTEST_INGEST_REPORT,
Self::Entropy => ENTROPY,
Expand Down Expand Up @@ -264,6 +268,7 @@ impl FromStr for FileType {
}
CELL_HEARTBEAT => Self::CellHeartbeat,
CELL_SPEEDTEST => Self::CellSpeedtest,
VERIFIED_SPEEDTEST => Self::VerifiedSpeedtest,
CELL_HEARTBEAT_INGEST_REPORT => Self::CellHeartbeatIngestReport,
CELL_SPEEDTEST_INGEST_REPORT => Self::CellSpeedtestIngestReport,
ENTROPY => Self::Entropy,
Expand Down
11 changes: 11 additions & 0 deletions file_store/src/speedtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ impl TryFrom<SpeedtestIngestReportV1> for CellSpeedtestIngestReport {
}
}

impl From<CellSpeedtestIngestReport> for SpeedtestIngestReportV1 {
fn from(v: CellSpeedtestIngestReport) -> Self {
let received_timestamp = v.timestamp();
let report: SpeedtestReqV1 = v.report.into();
Self {
received_timestamp,
report: Some(report),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
22 changes: 18 additions & 4 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use file_store::{
mobile_transfer::ValidDataTransferSession, speedtest::CellSpeedtestIngestReport, FileStore,
FileType,
};

use futures_util::TryFutureExt;
use mobile_config::client::{AuthorizationClient, EntityClient, GatewayClient};
use price::PriceTracker;
Expand Down Expand Up @@ -100,7 +99,7 @@ impl Cmd {
.start(shutdown_listener.clone())
.await?;

let (valid_speedtests, mut valid_speedtests_server) = file_sink::FileSinkBuilder::new(
let (speedtests_avg, mut speedtests_avg_server) = file_sink::FileSinkBuilder::new(
FileType::SpeedtestAvg,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_speedtest_average"),
Expand All @@ -112,11 +111,25 @@ impl Cmd {
.create()
.await?;

let (speedtests_validity, mut speedtests_validity_server) =
file_sink::FileSinkBuilder::new(
FileType::VerifiedSpeedtest,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "verified_speedtest"),
shutdown_listener.clone(),
)
.deposits(Some(file_upload_tx.clone()))
.auto_commit(false)
.roll_time(Duration::minutes(15))
.create()
.await?;

let speedtest_daemon = SpeedtestDaemon::new(
pool.clone(),
gateway_client.clone(),
speedtests,
valid_speedtests,
speedtests_avg,
speedtests_validity,
);

// Mobile rewards
Expand Down Expand Up @@ -199,7 +212,8 @@ impl Cmd {
tokio::try_join!(
db_join_handle.map_err(Error::from),
valid_heartbeats_server.run().map_err(Error::from),
valid_speedtests_server.run().map_err(Error::from),
speedtests_avg_server.run().map_err(Error::from),
speedtests_validity_server.run().map_err(Error::from),
mobile_rewards_server.run().map_err(Error::from),
file_upload.run(&shutdown_listener).map_err(Error::from),
reward_manifests_server.run().map_err(Error::from),
Expand Down
74 changes: 60 additions & 14 deletions mobile_verifier/src/speedtests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ use futures::{
TryFutureExt,
};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
SpeedtestIngestReportV1, SpeedtestVerificationResult,
VerifiedSpeedtest as VerifiedSpeedtestProto,
};
use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient};
use sqlx::{postgres::PgRow, FromRow, Postgres, Row, Transaction};
use std::collections::HashMap;
Expand Down Expand Up @@ -43,21 +47,24 @@ pub struct SpeedtestDaemon {
pool: sqlx::Pool<sqlx::Postgres>,
gateway_client: GatewayClient,
speedtests: Receiver<FileInfoStream<CellSpeedtestIngestReport>>,
file_sink: FileSinkClient,
speedtest_avg_file_sink: FileSinkClient,
verified_speedtest_file_sink: FileSinkClient,
}

impl SpeedtestDaemon {
pub fn new(
pool: sqlx::Pool<sqlx::Postgres>,
gateway_client: GatewayClient,
speedtests: Receiver<FileInfoStream<CellSpeedtestIngestReport>>,
file_sink: FileSinkClient,
speedtest_avg_file_sink: FileSinkClient,
verified_speedtest_file_sink: FileSinkClient,
) -> Self {
Self {
pool,
gateway_client,
speedtests,
file_sink,
speedtest_avg_file_sink,
verified_speedtest_file_sink,
}
}

Expand Down Expand Up @@ -88,24 +95,63 @@ impl SpeedtestDaemon {
let mut transaction = self.pool.begin().await?;
let mut speedtests = file.into_stream(&mut transaction).await?;
while let Some(speedtest_report) = speedtests.next().await {
let pubkey = speedtest_report.report.pubkey.clone();
if self
.gateway_client
.resolve_gateway_info(&pubkey)
.await?
.is_some()
{
let result = self.validate_speedtest(&speedtest_report).await?;
if result == SpeedtestVerificationResult::SpeedtestValid {
save_speedtest(&speedtest_report.report, &mut transaction).await?;
let latest_speedtests =
get_latest_speedtests_for_pubkey(&pubkey, &mut transaction).await?;
let latest_speedtests = get_latest_speedtests_for_pubkey(
&speedtest_report.report.pubkey,
&mut transaction,
)
.await?;
let average = SpeedtestAverage::from(&latest_speedtests);
average.write(&self.file_sink, latest_speedtests).await?;
average
.write(&self.speedtest_avg_file_sink, latest_speedtests)
.await?;
}
// write out paper trail of speedtest validity
self.write_speedtest_validity(speedtest_report, result)
.await?;
}
self.file_sink.commit().await?;
self.speedtest_avg_file_sink.commit().await?;
self.verified_speedtest_file_sink.commit().await?;
transaction.commit().await?;
Ok(())
}

pub async fn validate_speedtest(
&self,
speedtest: &CellSpeedtestIngestReport,
) -> anyhow::Result<SpeedtestVerificationResult> {
let pubkey = speedtest.report.pubkey.clone();
if self
.gateway_client
.resolve_gateway_info(&pubkey)
.await?
.is_some()
{
return Ok(SpeedtestVerificationResult::SpeedtestValid);
} else {
return Ok(SpeedtestVerificationResult::SpeedtestGatewayNotFound);
}
}

pub async fn write_speedtest_validity(
&self,
speedtest_report: CellSpeedtestIngestReport,
result: SpeedtestVerificationResult,
) -> anyhow::Result<()> {
let ingest_report: SpeedtestIngestReportV1 = speedtest_report.try_into()?;
let timestamp: u64 = Utc::now().timestamp_millis() as u64;
let proto = VerifiedSpeedtestProto {
report: Some(ingest_report),
result: result as i32,
timestamp,
};
self.verified_speedtest_file_sink
.write(proto, &[("result", result.as_str_name())])
.await?;
Ok(())
}
}

pub async fn save_speedtest(
Expand Down

0 comments on commit 53f85bb

Please sign in to comment.