Skip to content

Commit

Permalink
Merge pull request #591 from helium/andymck/output-speedtest-validity
Browse files Browse the repository at this point in the history
write out verified status of speedtests to s3
  • Loading branch information
andymck authored Sep 14, 2023
2 parents b8e3169 + 4657c2a commit 3c653cd
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 23 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ sqlx = {version = "0", features = [
"macros",
"runtime-tokio-rustls"
]}

helium-crypto = {version = "0.8.0", features=["sqlx-postgres", "multisig"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]}
hextree = "*"
Expand Down
5 changes: 5 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub const SUBSCRIBER_LOCATION_INGEST_REPORT: &str = "subscriber_location_report"
pub const VERIFIED_SUBSCRIBER_LOCATION_INGEST_REPORT: &str = "verified_subscriber_location_report";
pub const CELL_HEARTBEAT: &str = "cell_heartbeat";
pub const CELL_SPEEDTEST: &str = "cell_speedtest";
pub const VERIFIED_SPEEDTEST: &str = "verified_speedtest";
pub const CELL_HEARTBEAT_INGEST_REPORT: &str = "heartbeat_report";
pub const CELL_SPEEDTEST_INGEST_REPORT: &str = "speedtest_report";
pub const ENTROPY: &str = "entropy";
Expand Down Expand Up @@ -175,6 +176,7 @@ pub enum FileType {
VerifiedSubscriberLocationIngestReport,
MapperMsg,
CoverageObjectIngestReport,
VerifiedSpeedtest,
}

impl fmt::Display for FileType {
Expand All @@ -187,6 +189,7 @@ impl fmt::Display for FileType {
}
Self::CellHeartbeat => CELL_HEARTBEAT,
Self::CellSpeedtest => CELL_SPEEDTEST,
Self::VerifiedSpeedtest => VERIFIED_SPEEDTEST,
Self::CellHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT,
Self::CellSpeedtestIngestReport => CELL_SPEEDTEST_INGEST_REPORT,
Self::Entropy => ENTROPY,
Expand Down Expand Up @@ -231,6 +234,7 @@ impl FileType {
}
Self::CellHeartbeat => CELL_HEARTBEAT,
Self::CellSpeedtest => CELL_SPEEDTEST,
Self::VerifiedSpeedtest => VERIFIED_SPEEDTEST,
Self::CellHeartbeatIngestReport => CELL_HEARTBEAT_INGEST_REPORT,
Self::CellSpeedtestIngestReport => CELL_SPEEDTEST_INGEST_REPORT,
Self::Entropy => ENTROPY,
Expand Down Expand Up @@ -275,6 +279,7 @@ impl FromStr for FileType {
}
CELL_HEARTBEAT => Self::CellHeartbeat,
CELL_SPEEDTEST => Self::CellSpeedtest,
VERIFIED_SPEEDTEST => Self::VerifiedSpeedtest,
CELL_HEARTBEAT_INGEST_REPORT => Self::CellHeartbeatIngestReport,
CELL_SPEEDTEST_INGEST_REPORT => Self::CellSpeedtestIngestReport,
ENTROPY => Self::Entropy,
Expand Down
11 changes: 11 additions & 0 deletions file_store/src/speedtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ impl TryFrom<SpeedtestIngestReportV1> for CellSpeedtestIngestReport {
}
}

impl From<CellSpeedtestIngestReport> for SpeedtestIngestReportV1 {
fn from(v: CellSpeedtestIngestReport) -> Self {
let received_timestamp = v.timestamp();
let report: SpeedtestReqV1 = v.report.into();
Self {
received_timestamp,
report: Some(report),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
4 changes: 3 additions & 1 deletion iot_verifier/src/poc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ fn verify_beacon_payload(
region: beacon_region,
params: region_params.to_owned(),
gain: Decimal::new(gain as i64, 1),
timestamp: 0,
};
// generate a gateway rs beacon from the generated entropy and the beaconers region data
let generated_beacon = generate_beacon(
Expand Down Expand Up @@ -996,7 +997,7 @@ mod tests {
let region: ProtoRegion = ProtoRegion::Eu868;

let region_params =
beacon::RegionParams::from_bytes(region.into(), gain as u64, EU868_PARAMS)
beacon::RegionParams::from_bytes(region.into(), gain as u64, EU868_PARAMS, 0)
.expect("region params");

let generated_beacon = generate_beacon(
Expand Down Expand Up @@ -1765,6 +1766,7 @@ mod tests {
ProtoRegion::Eu868.into(),
BEACONER_GAIN,
EU868_PARAMS,
0,
)
.unwrap();
region_params.params
Expand Down
22 changes: 18 additions & 4 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use file_store::{
mobile_transfer::ValidDataTransferSession, speedtest::CellSpeedtestIngestReport, FileStore,
FileType,
};

use futures_util::TryFutureExt;
use mobile_config::client::{AuthorizationClient, EntityClient, GatewayClient};
use price::PriceTracker;
Expand Down Expand Up @@ -94,7 +93,7 @@ impl Cmd {
.create()?;
let speedtests_join_handle = speedtests_server.start(shutdown_listener.clone()).await?;

let (valid_speedtests, valid_speedtests_server) = file_sink::FileSinkBuilder::new(
let (speedtests_avg, speedtests_avg_server) = file_sink::FileSinkBuilder::new(
FileType::SpeedtestAvg,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "_speedtest_average"),
Expand All @@ -105,11 +104,23 @@ impl Cmd {
.create()
.await?;

let (speedtests_validity, speedtests_validity_server) = file_sink::FileSinkBuilder::new(
FileType::VerifiedSpeedtest,
store_base_path,
concat!(env!("CARGO_PKG_NAME"), "verified_speedtest"),
)
.deposits(Some(file_upload_tx.clone()))
.auto_commit(false)
.roll_time(Duration::minutes(15))
.create()
.await?;

let speedtest_daemon = SpeedtestDaemon::new(
pool.clone(),
gateway_client.clone(),
speedtests,
valid_speedtests,
speedtests_avg,
speedtests_validity,
);

// Mobile rewards
Expand Down Expand Up @@ -192,7 +203,10 @@ impl Cmd {
valid_heartbeats_server
.run(shutdown_listener.clone())
.map_err(Error::from),
valid_speedtests_server
speedtests_avg_server
.run(shutdown_listener.clone())
.map_err(Error::from),
speedtests_validity_server
.run(shutdown_listener.clone())
.map_err(Error::from),
mobile_rewards_server
Expand Down
74 changes: 60 additions & 14 deletions mobile_verifier/src/speedtests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ use futures::{
TryFutureExt,
};
use helium_crypto::PublicKeyBinary;
use helium_proto::services::poc_mobile::{
SpeedtestIngestReportV1, SpeedtestVerificationResult,
VerifiedSpeedtest as VerifiedSpeedtestProto,
};
use mobile_config::{gateway_info::GatewayInfoResolver, GatewayClient};
use sqlx::{postgres::PgRow, FromRow, Postgres, Row, Transaction};
use std::collections::HashMap;
Expand Down Expand Up @@ -43,21 +47,24 @@ pub struct SpeedtestDaemon {
pool: sqlx::Pool<sqlx::Postgres>,
gateway_client: GatewayClient,
speedtests: Receiver<FileInfoStream<CellSpeedtestIngestReport>>,
file_sink: FileSinkClient,
speedtest_avg_file_sink: FileSinkClient,
verified_speedtest_file_sink: FileSinkClient,
}

impl SpeedtestDaemon {
pub fn new(
pool: sqlx::Pool<sqlx::Postgres>,
gateway_client: GatewayClient,
speedtests: Receiver<FileInfoStream<CellSpeedtestIngestReport>>,
file_sink: FileSinkClient,
speedtest_avg_file_sink: FileSinkClient,
verified_speedtest_file_sink: FileSinkClient,
) -> Self {
Self {
pool,
gateway_client,
speedtests,
file_sink,
speedtest_avg_file_sink,
verified_speedtest_file_sink,
}
}

Expand Down Expand Up @@ -89,24 +96,63 @@ impl SpeedtestDaemon {
let mut transaction = self.pool.begin().await?;
let mut speedtests = file.into_stream(&mut transaction).await?;
while let Some(speedtest_report) = speedtests.next().await {
let pubkey = &speedtest_report.report.pubkey;
if self
.gateway_client
.resolve_gateway_info(pubkey)
.await?
.is_some()
{
let result = self.validate_speedtest(&speedtest_report).await?;
if result == SpeedtestVerificationResult::SpeedtestValid {
save_speedtest(&speedtest_report.report, &mut transaction).await?;
let latest_speedtests =
get_latest_speedtests_for_pubkey(pubkey, &mut transaction).await?;
let latest_speedtests = get_latest_speedtests_for_pubkey(
&speedtest_report.report.pubkey,
&mut transaction,
)
.await?;
let average = SpeedtestAverage::from(&latest_speedtests);
average.write(&self.file_sink, latest_speedtests).await?;
average
.write(&self.speedtest_avg_file_sink, latest_speedtests)
.await?;
}
// write out paper trail of speedtest validity
self.write_verified_speedtest(speedtest_report, result)
.await?;
}
self.file_sink.commit().await?;
self.speedtest_avg_file_sink.commit().await?;
self.verified_speedtest_file_sink.commit().await?;
transaction.commit().await?;
Ok(())
}

pub async fn validate_speedtest(
&self,
speedtest: &CellSpeedtestIngestReport,
) -> anyhow::Result<SpeedtestVerificationResult> {
let pubkey = speedtest.report.pubkey.clone();
if self
.gateway_client
.resolve_gateway_info(&pubkey)
.await?
.is_some()
{
Ok(SpeedtestVerificationResult::SpeedtestValid)
} else {
Ok(SpeedtestVerificationResult::SpeedtestGatewayNotFound)
}
}

pub async fn write_verified_speedtest(
&self,
speedtest_report: CellSpeedtestIngestReport,
result: SpeedtestVerificationResult,
) -> anyhow::Result<()> {
let ingest_report: SpeedtestIngestReportV1 = speedtest_report.try_into()?;
let timestamp: u64 = Utc::now().timestamp_millis() as u64;
let proto = VerifiedSpeedtestProto {
report: Some(ingest_report),
result: result as i32,
timestamp,
};
self.verified_speedtest_file_sink
.write(proto, &[("result", result.as_str_name())])
.await?;
Ok(())
}
}

pub async fn save_speedtest(
Expand Down

0 comments on commit 3c653cd

Please sign in to comment.