From 53f85bb9a20a743e22a55983e71636c0d0a2b0c1 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Tue, 8 Aug 2023 16:01:46 +0100 Subject: [PATCH] write out verified status of speedtests to s3 --- Cargo.lock | 4 +- Cargo.toml | 4 +- file_store/src/file_info.rs | 5 +++ file_store/src/speedtest.rs | 11 +++++ mobile_verifier/src/cli/server.rs | 22 +++++++-- mobile_verifier/src/speedtests.rs | 74 +++++++++++++++++++++++++------ 6 files changed, 98 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f727a1c07..ede0b03ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1118,7 +1118,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#7d85f190b207a3ac8d6ad081e2f70e45eecd1a3a" +source = "git+https://github.com/helium/proto?branch=andymck/invalid-speed-test-proto#22166182793303548e7e2622738dc4b2faa4812f" dependencies = [ "base64 0.21.0", "byteorder", @@ -2944,7 +2944,7 @@ dependencies = [ [[package]] name = "helium-proto" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#7d85f190b207a3ac8d6ad081e2f70e45eecd1a3a" +source = "git+https://github.com/helium/proto?branch=andymck/invalid-speed-test-proto#22166182793303548e7e2622738dc4b2faa4812f" dependencies = [ "bytes", "prost", diff --git a/Cargo.toml b/Cargo.toml index fd747c1d0..3251fe545 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,14 +59,14 @@ sqlx = {version = "0", features = [ ]} helium-crypto = {version = "0.6.8", features=["sqlx-postgres", "multisig"]} -helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]} +helium-proto = {git = "https://github.com/helium/proto", branch = "andymck/invalid-speed-test-proto", features = ["services"]} hextree = "*" solana-client = "1.14" solana-sdk = "1.14" solana-program = "1.11" spl-token = "3.5.0" reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]} -beacon = { git = "https://github.com/helium/proto", branch = "master" } +beacon = { git = "https://github.com/helium/proto", branch = "andymck/invalid-speed-test-proto" } humantime = "2" metrics = "0" metrics-exporter-prometheus = "0" diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 8ccc8ef2e..0b14b00f8 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -100,6 +100,7 @@ pub const SUBSCRIBER_LOCATION_INGEST_REPORT: &str = "subscriber_location_report" pub const VERIFIED_SUBSCRIBER_LOCATION_INGEST_REPORT: &str = "verified_subscriber_location_report"; pub const CELL_HEARTBEAT: &str = "cell_heartbeat"; pub const CELL_SPEEDTEST: &str = "cell_speedtest"; +pub const VERIFIED_SPEEDTEST: &str = "verified_speedtest"; pub const CELL_HEARTBEAT_INGEST_REPORT: &str = "heartbeat_report"; pub const CELL_SPEEDTEST_INGEST_REPORT: &str = "speedtest_report"; pub const ENTROPY: &str = "entropy"; @@ -164,6 +165,7 @@ pub enum FileType { VerifiedSubscriberLocationIngestReport, MapperMsg, CoverageObjectIngestReport, + VerifiedSpeedtest, } impl fmt::Display for FileType { @@ -176,6 +178,7 @@ impl fmt::Display for FileType { } Self::CellHeartbeat => CELL_HEARTBEAT, Self::CellSpeedtest => CELL_SPEEDTEST, + Self::VerifiedSpeedtest => VERIFIED_SPEEDTEST, Self::CellHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT, Self::CellSpeedtestIngestReport => CELL_SPEEDTEST_INGEST_REPORT, Self::Entropy => ENTROPY, @@ -220,6 +223,7 @@ impl FileType { } Self::CellHeartbeat => CELL_HEARTBEAT, Self::CellSpeedtest => CELL_SPEEDTEST, + Self::VerifiedSpeedtest => VERIFIED_SPEEDTEST, Self::CellHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT, Self::CellSpeedtestIngestReport => CELL_SPEEDTEST_INGEST_REPORT, Self::Entropy => ENTROPY, @@ -264,6 +268,7 @@ impl FromStr for FileType { } CELL_HEARTBEAT => Self::CellHeartbeat, CELL_SPEEDTEST => Self::CellSpeedtest, + VERIFIED_SPEEDTEST => Self::VerifiedSpeedtest, CELL_HEARTBEAT_INGEST_REPORT => Self::CellHeartbeatIngestReport, CELL_SPEEDTEST_INGEST_REPORT => Self::CellSpeedtestIngestReport, ENTROPY => Self::Entropy, diff --git a/file_store/src/speedtest.rs b/file_store/src/speedtest.rs index 039e98503..29a64b558 100644 --- a/file_store/src/speedtest.rs +++ b/file_store/src/speedtest.rs @@ -98,6 +98,17 @@ impl TryFrom for CellSpeedtestIngestReport { } } +impl From for SpeedtestIngestReportV1 { + fn from(v: CellSpeedtestIngestReport) -> Self { + let received_timestamp = v.timestamp(); + let report: SpeedtestReqV1 = v.report.into(); + Self { + received_timestamp, + report: Some(report), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 51590b75d..de233a5ee 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -11,7 +11,6 @@ use file_store::{ mobile_transfer::ValidDataTransferSession, speedtest::CellSpeedtestIngestReport, FileStore, FileType, }; - use futures_util::TryFutureExt; use mobile_config::client::{AuthorizationClient, EntityClient, GatewayClient}; use price::PriceTracker; @@ -100,7 +99,7 @@ impl Cmd { .start(shutdown_listener.clone()) .await?; - let (valid_speedtests, mut valid_speedtests_server) = file_sink::FileSinkBuilder::new( + let (speedtests_avg, mut speedtests_avg_server) = file_sink::FileSinkBuilder::new( FileType::SpeedtestAvg, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_speedtest_average"), @@ -112,11 +111,25 @@ impl Cmd { .create() .await?; + let (speedtests_validity, mut speedtests_validity_server) = + file_sink::FileSinkBuilder::new( + FileType::VerifiedSpeedtest, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "verified_speedtest"), + shutdown_listener.clone(), + ) + .deposits(Some(file_upload_tx.clone())) + .auto_commit(false) + .roll_time(Duration::minutes(15)) + .create() + .await?; + let speedtest_daemon = SpeedtestDaemon::new( pool.clone(), gateway_client.clone(), speedtests, - valid_speedtests, + speedtests_avg, + speedtests_validity, ); // Mobile rewards @@ -199,7 +212,8 @@ impl Cmd { tokio::try_join!( db_join_handle.map_err(Error::from), valid_heartbeats_server.run().map_err(Error::from), - valid_speedtests_server.run().map_err(Error::from), + speedtests_avg_server.run().map_err(Error::from), + speedtests_validity_server.run().map_err(Error::from), mobile_rewards_server.run().map_err(Error::from), file_upload.run(&shutdown_listener).map_err(Error::from), reward_manifests_server.run().map_err(Error::from), diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 2cf7864d7..5d6c96921 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -10,6 +10,10 @@ use futures::{ TryFutureExt, }; use helium_crypto::PublicKeyBinary; +use helium_proto::services::poc_mobile::{ + SpeedtestIngestReportV1, SpeedtestVerificationResult, + VerifiedSpeedtest as VerifiedSpeedtestProto, +}; use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient}; use sqlx::{postgres::PgRow, FromRow, Postgres, Row, Transaction}; use std::collections::HashMap; @@ -43,7 +47,8 @@ pub struct SpeedtestDaemon { pool: sqlx::Pool, gateway_client: GatewayClient, speedtests: Receiver>, - file_sink: FileSinkClient, + speedtest_avg_file_sink: FileSinkClient, + verified_speedtest_file_sink: FileSinkClient, } impl SpeedtestDaemon { @@ -51,13 +56,15 @@ impl SpeedtestDaemon { pool: sqlx::Pool, gateway_client: GatewayClient, speedtests: Receiver>, - file_sink: FileSinkClient, + speedtest_avg_file_sink: FileSinkClient, + verified_speedtest_file_sink: FileSinkClient, ) -> Self { Self { pool, gateway_client, speedtests, - file_sink, + speedtest_avg_file_sink, + verified_speedtest_file_sink, } } @@ -88,24 +95,63 @@ impl SpeedtestDaemon { let mut transaction = self.pool.begin().await?; let mut speedtests = file.into_stream(&mut transaction).await?; while let Some(speedtest_report) = speedtests.next().await { - let pubkey = speedtest_report.report.pubkey.clone(); - if self - .gateway_client - .resolve_gateway_info(&pubkey) - .await? - .is_some() - { + let result = self.validate_speedtest(&speedtest_report).await?; + if result == SpeedtestVerificationResult::SpeedtestValid { save_speedtest(&speedtest_report.report, &mut transaction).await?; - let latest_speedtests = - get_latest_speedtests_for_pubkey(&pubkey, &mut transaction).await?; + let latest_speedtests = get_latest_speedtests_for_pubkey( + &speedtest_report.report.pubkey, + &mut transaction, + ) + .await?; let average = SpeedtestAverage::from(&latest_speedtests); - average.write(&self.file_sink, latest_speedtests).await?; + average + .write(&self.speedtest_avg_file_sink, latest_speedtests) + .await?; } + // write out paper trail of speedtest validity + self.write_speedtest_validity(speedtest_report, result) + .await?; } - self.file_sink.commit().await?; + self.speedtest_avg_file_sink.commit().await?; + self.verified_speedtest_file_sink.commit().await?; transaction.commit().await?; Ok(()) } + + pub async fn validate_speedtest( + &self, + speedtest: &CellSpeedtestIngestReport, + ) -> anyhow::Result { + let pubkey = speedtest.report.pubkey.clone(); + if self + .gateway_client + .resolve_gateway_info(&pubkey) + .await? + .is_some() + { + return Ok(SpeedtestVerificationResult::SpeedtestValid); + } else { + return Ok(SpeedtestVerificationResult::SpeedtestGatewayNotFound); + } + } + + pub async fn write_speedtest_validity( + &self, + speedtest_report: CellSpeedtestIngestReport, + result: SpeedtestVerificationResult, + ) -> anyhow::Result<()> { + let ingest_report: SpeedtestIngestReportV1 = speedtest_report.try_into()?; + let timestamp: u64 = Utc::now().timestamp_millis() as u64; + let proto = VerifiedSpeedtestProto { + report: Some(ingest_report), + result: result as i32, + timestamp, + }; + self.verified_speedtest_file_sink + .write(proto, &[("result", result.as_str_name())]) + .await?; + Ok(()) + } } pub async fn save_speedtest(