From 59595dc76d4a3741f46f238b8140ab96d9431b6f Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Tue, 8 Oct 2024 12:43:29 -0700 Subject: [PATCH] Verified data transfer (#872) * Replace invalid-data-transfer-session with verified-data-transfer-session Verified meaning we know about the entities in the report. Even data sessions that have no rewardable data are written. They will not, however, be put into the database for rewarding. * point at updated proto for new verified data report * More tests for data transfer reports * remove assert for extreme edgecase in service provider rewards --- Cargo.lock | 56 ++-- Cargo.toml | 4 +- file_store/src/file_info.rs | 4 + file_store/src/mobile_session.rs | 30 ++- file_store/src/traits/file_sink_write.rs | 5 + mobile_packet_verifier/src/accumulate.rs | 246 ++++++++++++------ mobile_packet_verifier/src/daemon.rs | 14 +- .../src/service_provider/reward.rs | 10 +- 8 files changed, 261 insertions(+), 108 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 983472175..93ea6d33e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,17 +1617,17 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "beacon" version = "0.1.0" -source = "git+https://github.com/helium/proto?branch=master#197ff9c6cde7dc0d8334d6b4e27c58779e6a7ce0" +source = "git+https://github.com/helium/proto?branch=mj/verified-data-transfer#d8562d29307abbfc9f92a8c8a2b493336981de3f" dependencies = [ "base64 0.21.7", "byteorder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "prost", "rand 0.8.5", "rand_chacha 0.3.0", "rust_decimal", "serde", - "sha2 0.10.8", + "sha2 0.9.9", "thiserror", ] @@ -1791,7 +1791,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "http 0.2.11", "http-serde", "humantime-serde", @@ -2642,7 +2642,7 @@ dependencies = [ "axum 0.7.4", "bs58 0.4.0", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "http 0.2.11", "notify", "serde", @@ -3242,7 +3242,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "hex-literal", "http 0.2.11", "lazy_static", @@ -3855,7 +3855,7 @@ dependencies = [ "h3o", "helium-anchor-gen 0.1.0 (git+https://github.com/helium/helium-anchor-gen.git?branch=main)", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=master)", "hex", "hex-literal", "itertools", @@ -3895,6 +3895,22 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "helium-proto" +version = "0.1.0" +source = "git+https://github.com/helium/proto?branch=mj/verified-data-transfer#d8562d29307abbfc9f92a8c8a2b493336981de3f" +dependencies = [ + "bytes", + "prost", + "prost-build", + "serde", + "serde_json", + "strum", + "strum_macros", + "tonic", + "tonic-build", +] + [[package]] name = "helium-sub-daos" version = "0.1.8" @@ -3945,7 +3961,7 @@ dependencies = [ "async-trait", "chrono", "derive_builder", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "hextree", "rust_decimal", "rust_decimal_macros", @@ -4370,7 +4386,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "http 0.2.11", "humantime-serde", "metrics", @@ -4439,7 +4455,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "hextree", "http 0.2.11", "http-serde", @@ -4481,7 +4497,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "http 0.2.11", "http-serde", "humantime-serde", @@ -4523,7 +4539,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "http-serde", "humantime-serde", "iot-config", @@ -5129,7 +5145,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "hextree", "http 0.2.11", "http-serde", @@ -5169,7 +5185,7 @@ dependencies = [ "futures", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "mobile-config", "prost", "rand 0.8.5", @@ -5214,7 +5230,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "http 0.2.11", "http-serde", "humantime-serde", @@ -5258,7 +5274,7 @@ dependencies = [ "futures-util", "h3o", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "hex-assignments", "hextree", "http-serde", @@ -5942,7 +5958,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "http 0.2.11", "hyper 0.14.28", "jsonrpsee", @@ -6025,7 +6041,7 @@ dependencies = [ "futures-util", "helium-anchor-gen 0.1.0 (git+https://github.com/helium/helium-anchor-gen.git)", "helium-lib", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "humantime-serde", "metrics", "metrics-exporter-prometheus", @@ -6150,7 +6166,7 @@ dependencies = [ "custom-tracing", "file-store", "futures", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "humantime-serde", "metrics", "metrics-exporter-prometheus", @@ -6702,7 +6718,7 @@ dependencies = [ "futures", "futures-util", "helium-crypto", - "helium-proto", + "helium-proto 0.1.0 (git+https://github.com/helium/proto?branch=mj/verified-data-transfer)", "humantime-serde", "lazy_static", "metrics", diff --git a/Cargo.toml b/Cargo.toml index d891570a3..f7ab85bc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,10 +71,10 @@ helium-lib = { git = "https://github.com/helium/helium-wallet-rs.git", branch = hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [ "disktree", ] } -helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [ +helium-proto = { git = "https://github.com/helium/proto", branch = "mj/verified-data-transfer", features = [ "services", ] } -beacon = { git = "https://github.com/helium/proto", branch = "master" } +beacon = { git = "https://github.com/helium/proto", branch = "mj/verified-data-transfer" } solana-client = "1.18" solana-sdk = "1.18" solana-program = "1.18" diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index ea330c0a3..d0f824432 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -146,6 +146,7 @@ pub const DATA_TRANSFER_SESSION_INGEST_REPORT: &str = "data_transfer_session_ing pub const INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT: &str = "invalid_data_transfer_session_ingest_report"; pub const VALID_DATA_TRANSFER_SESSION: &str = "valid_data_transfer_session"; +pub const VERIFIED_DATA_TRANSFER_SESSION: &str = "verified_data_transfer_session"; pub const PRICE_REPORT: &str = "price_report"; pub const MOBILE_REWARD_SHARE: &str = "mobile_reward_share"; pub const MAPPER_MSG: &str = "mapper_msg"; @@ -196,6 +197,7 @@ pub enum FileType { DataTransferSessionIngestReport, InvalidDataTransferSessionIngestReport, ValidDataTransferSession, + VerifiedDataTransferSession, PriceReport, MobileRewardShare, SubscriberLocationReq, @@ -276,6 +278,7 @@ impl fmt::Display for FileType { INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT } Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION, + Self::VerifiedDataTransferSession => VERIFIED_DATA_TRANSFER_SESSION, Self::PriceReport => PRICE_REPORT, Self::MobileRewardShare => MOBILE_REWARD_SHARE, Self::MapperMsg => MAPPER_MSG, @@ -353,6 +356,7 @@ impl FileType { INVALID_DATA_TRANSFER_SESSION_INGEST_REPORT } Self::ValidDataTransferSession => VALID_DATA_TRANSFER_SESSION, + Self::VerifiedDataTransferSession => VERIFIED_DATA_TRANSFER_SESSION, Self::PriceReport => PRICE_REPORT, Self::MobileRewardShare => MOBILE_REWARD_SHARE, Self::MapperMsg => MAPPER_MSG, diff --git a/file_store/src/mobile_session.rs b/file_store/src/mobile_session.rs index d5f1eda6c..e5b70e773 100644 --- a/file_store/src/mobile_session.rs +++ b/file_store/src/mobile_session.rs @@ -7,8 +7,9 @@ use chrono::{DateTime, Utc}; use helium_crypto::PublicKeyBinary; use helium_proto::services::poc_mobile::{ invalid_data_transfer_ingest_report_v1::DataTransferIngestReportStatus, - DataTransferEvent as DataTransferEventProto, DataTransferRadioAccessTechnology, - DataTransferSessionIngestReportV1, DataTransferSessionReqV1, InvalidDataTransferIngestReportV1, + verified_data_transfer_ingest_report_v1, DataTransferEvent as DataTransferEventProto, + DataTransferRadioAccessTechnology, DataTransferSessionIngestReportV1, DataTransferSessionReqV1, + InvalidDataTransferIngestReportV1, VerifiedDataTransferIngestReportV1, }; use serde::Serialize; @@ -112,6 +113,31 @@ impl From for InvalidDataTransferIngestReportV1 } } +#[derive(Serialize, Clone, Debug)] +pub struct VerifiedDataTransferIngestReport { + pub report: DataTransferSessionIngestReport, + pub status: verified_data_transfer_ingest_report_v1::ReportStatus, + pub timestamp: DateTime, +} + +impl MsgTimestamp for VerifiedDataTransferIngestReport { + fn timestamp(&self) -> u64 { + self.timestamp.encode_timestamp_millis() + } +} + +impl From for VerifiedDataTransferIngestReportV1 { + fn from(v: VerifiedDataTransferIngestReport) -> Self { + let timestamp = v.timestamp(); + let report: DataTransferSessionIngestReportV1 = v.report.into(); + Self { + report: Some(report), + status: v.status as i32, + timestamp, + } + } +} + #[derive(Serialize, Clone, Debug)] pub struct DataTransferEvent { pub pub_key: PublicKeyBinary, diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index 651ebe080..54ea53100 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -163,6 +163,11 @@ impl_file_sink!( FileType::InvalidDataTransferSessionIngestReport.to_str(), "invalid_data_transfer_session" ); +impl_file_sink!( + poc_mobile::VerifiedDataTransferIngestReportV1, + FileType::VerifiedDataTransferSession.to_str(), + "verified_data_transfer_session" +); impl_file_sink!( poc_mobile::InvalidatedRadioThresholdIngestReportV1, FileType::InvalidatedRadioThresholdIngestReport.to_str(), diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs index 68697b945..88947194a 100644 --- a/mobile_packet_verifier/src/accumulate.rs +++ b/mobile_packet_verifier/src/accumulate.rs @@ -1,12 +1,11 @@ use chrono::{DateTime, Utc}; use file_store::file_sink::FileSinkClient; use file_store::mobile_session::{ - DataTransferSessionIngestReport, InvalidDataTransferIngestReport, + DataTransferSessionIngestReport, VerifiedDataTransferIngestReport, }; use futures::{Stream, StreamExt}; use helium_proto::services::poc_mobile::{ - invalid_data_transfer_ingest_report_v1::DataTransferIngestReportStatus, - InvalidDataTransferIngestReportV1, + verified_data_transfer_ingest_report_v1::ReportStatus, VerifiedDataTransferIngestReportV1, }; use sqlx::{Postgres, Transaction}; @@ -15,7 +14,7 @@ use crate::{event_ids, MobileConfigResolverExt}; pub async fn accumulate_sessions( mobile_config: &impl MobileConfigResolverExt, conn: &mut Transaction<'_, Postgres>, - invalid_data_session_report_sink: &FileSinkClient, + verified_data_session_report_sink: &FileSinkClient, curr_file_ts: DateTime, reports: impl Stream, ) -> anyhow::Result<()> { @@ -23,12 +22,21 @@ pub async fn accumulate_sessions( while let Some(report) = reports.next().await { let report_validity = verify_report(conn, mobile_config, &report).await?; - if report_validity != DataTransferIngestReportStatus::Valid { - // If the reward has been cancelled or it fails verification checks then skip - // the report and write it out to s3 as invalid - write_invalid_report(invalid_data_session_report_sink, report_validity, report).await?; + write_verified_report( + verified_data_session_report_sink, + report_validity, + report.clone(), + ) + .await?; + + if report_validity != ReportStatus::Valid { continue; } + + if report.report.rewardable_bytes == 0 { + continue; + } + let event = report.report.data_transfer_usage; sqlx::query( r#" @@ -58,27 +66,23 @@ async fn verify_report( txn: &mut Transaction<'_, Postgres>, mobile_config: &impl MobileConfigResolverExt, report: &DataTransferSessionIngestReport, -) -> anyhow::Result { - if report.report.rewardable_bytes == 0 { - return Ok(DataTransferIngestReportStatus::Cancelled); - } - +) -> anyhow::Result { if is_duplicate(txn, report).await? { - return Ok(DataTransferIngestReportStatus::Duplicate); + return Ok(ReportStatus::Duplicate); } let gw_pub_key = &report.report.data_transfer_usage.pub_key; let routing_pub_key = &report.report.pub_key; if !mobile_config.is_gateway_known(gw_pub_key).await { - return Ok(DataTransferIngestReportStatus::InvalidGatewayKey); + return Ok(ReportStatus::InvalidGatewayKey); } if !mobile_config.is_routing_key_known(routing_pub_key).await { - return Ok(DataTransferIngestReportStatus::InvalidRoutingKey); + return Ok(ReportStatus::InvalidRoutingKey); } - Ok(DataTransferIngestReportStatus::Valid) + Ok(ReportStatus::Valid) } async fn is_duplicate( @@ -93,20 +97,20 @@ async fn is_duplicate( .await } -async fn write_invalid_report( - invalid_data_session_report_sink: &FileSinkClient, - reason: DataTransferIngestReportStatus, +async fn write_verified_report( + verified_data_session_report_sink: &FileSinkClient, + status: ReportStatus, report: DataTransferSessionIngestReport, ) -> Result<(), file_store::Error> { - let proto: InvalidDataTransferIngestReportV1 = InvalidDataTransferIngestReport { + let proto: VerifiedDataTransferIngestReportV1 = VerifiedDataTransferIngestReport { report, - reason, + status, timestamp: Utc::now(), } .into(); - invalid_data_session_report_sink - .write(proto, &[("reason", reason.as_str_name())]) + verified_data_session_report_sink + .write(proto, &[("status", status.as_str_name())]) .await?; Ok(()) } @@ -125,16 +129,41 @@ mod tests { use super::*; - struct MockResolver; + struct MockResolver { + gateway_known: bool, + routing_key_known: bool, + } + + impl MockResolver { + fn new() -> Self { + Self { + gateway_known: true, + routing_key_known: true, + } + } + + fn unknown_gateway(self) -> Self { + Self { + gateway_known: false, + routing_key_known: self.routing_key_known, + } + } + fn unknown_routing_key(self) -> Self { + Self { + gateway_known: self.gateway_known, + routing_key_known: false, + } + } + } #[async_trait::async_trait] impl MobileConfigResolverExt for MockResolver { async fn is_gateway_known(&self, _public_key: &PublicKeyBinary) -> bool { - true + self.gateway_known } async fn is_routing_key_known(&self, _public_key: &PublicKeyBinary) -> bool { - true + self.routing_key_known } } @@ -146,7 +175,7 @@ mod tests { let invalid_data_session_report_sink = FileSinkClient::new(tx, "test"); accumulate_sessions( - &MockResolver, + &MockResolver::new(), &mut txn, &invalid_data_session_report_sink, Utc::now(), @@ -169,33 +198,18 @@ mod tests { } #[sqlx::test] - async fn accumulate_writes_zero_data_event_as_invalid(pool: PgPool) -> anyhow::Result<()> { + async fn accumulate_writes_zero_data_event_as_verified_but_not_for_burning( + pool: PgPool, + ) -> anyhow::Result<()> { let mut txn = pool.begin().await?; let (tx, mut rx) = tokio::sync::mpsc::channel(10); let invalid_data_session_report_sink = FileSinkClient::new(tx, "test"); - let report = DataTransferSessionIngestReport { - report: DataTransferSessionReq { - data_transfer_usage: DataTransferEvent { - pub_key: vec![0].into(), - upload_bytes: 0, - download_bytes: 0, - radio_access_technology: DataTransferRadioAccessTechnology::Wlan, - event_id: "test".to_string(), - payer: vec![0].into(), - timestamp: Utc::now(), - signature: vec![], - }, - rewardable_bytes: 0, - pub_key: vec![0].into(), - signature: vec![], - }, - received_timestamp: Utc::now(), - }; + let report = make_data_transfer_with_rewardable_bytes(0); accumulate_sessions( - &MockResolver, + &MockResolver::new(), &mut txn, &invalid_data_session_report_sink, Utc::now(), @@ -205,11 +219,14 @@ mod tests { txn.commit().await?; - // single record written to invalid sink - match rx.try_recv() { - Ok(_) => (), - other => panic!("unexpected: {other:?}"), - } + // single record written to verified sink + rx.assert_not_empty()?; + + let sessions: Vec = + sqlx::query_as("SELECT * from data_transfer_sessions") + .fetch_all(&pool) + .await?; + assert!(sessions.is_empty()); Ok(()) } @@ -221,27 +238,10 @@ mod tests { let (tx, mut rx) = tokio::sync::mpsc::channel(10); let invalid_data_session_report_sink = FileSinkClient::new(tx, "test"); - let report = DataTransferSessionIngestReport { - report: DataTransferSessionReq { - data_transfer_usage: DataTransferEvent { - pub_key: vec![0].into(), - upload_bytes: 1, - download_bytes: 2, - radio_access_technology: DataTransferRadioAccessTechnology::Wlan, - event_id: "test".to_string(), - payer: vec![0].into(), - timestamp: Utc::now(), - signature: vec![], - }, - rewardable_bytes: 3, - pub_key: vec![0].into(), - signature: vec![], - }, - received_timestamp: Utc::now(), - }; + let report = make_data_transfer_with_rewardable_bytes(3); accumulate_sessions( - &MockResolver, + &MockResolver::new(), &mut txn, &invalid_data_session_report_sink, Utc::now(), @@ -251,8 +251,8 @@ mod tests { txn.commit().await?; - // no records written to invalid sink - rx.assert_is_empty()?; + // record written as verified + rx.assert_not_empty()?; let sessions: Vec = sqlx::query_as("SELECT * from data_transfer_sessions") @@ -263,15 +263,111 @@ mod tests { Ok(()) } + #[sqlx::test] + async fn unknown_gateway(pool: PgPool) -> anyhow::Result<()> { + let mut txn = pool.begin().await?; + + let (tx, mut rx) = tokio::sync::mpsc::channel(10); + let invalid_data_session_report_sink = FileSinkClient::new(tx, "test"); + + let report = make_data_transfer_with_rewardable_bytes(3); + + accumulate_sessions( + &MockResolver::new().unknown_gateway(), + &mut txn, + &invalid_data_session_report_sink, + Utc::now(), + futures::stream::iter(vec![report]), + ) + .await?; + + txn.commit().await?; + + // record written as verified + rx.assert_not_empty()?; + + let sessions: Vec = + sqlx::query_as("SELECT * from data_transfer_sessions") + .fetch_all(&pool) + .await?; + assert!(sessions.is_empty()); + + Ok(()) + } + + #[sqlx::test] + fn unknown_routing_key(pool: PgPool) -> anyhow::Result<()> { + let mut txn = pool.begin().await?; + + let (tx, mut rx) = tokio::sync::mpsc::channel(10); + let invalid_data_session_report_sink = FileSinkClient::new(tx, "test"); + + let report = make_data_transfer_with_rewardable_bytes(3); + + accumulate_sessions( + &MockResolver::new().unknown_routing_key(), + &mut txn, + &invalid_data_session_report_sink, + Utc::now(), + futures::stream::iter(vec![report]), + ) + .await?; + + txn.commit().await?; + + // record written as verified + rx.assert_not_empty()?; + + let sessions: Vec = + sqlx::query_as("SELECT * from data_transfer_sessions") + .fetch_all(&pool) + .await?; + assert!(sessions.is_empty()); + + Ok(()) + } + + fn make_data_transfer_with_rewardable_bytes( + rewardable_bytes: u64, + ) -> DataTransferSessionIngestReport { + DataTransferSessionIngestReport { + report: DataTransferSessionReq { + data_transfer_usage: DataTransferEvent { + pub_key: vec![0].into(), + upload_bytes: 1, + download_bytes: 2, + radio_access_technology: DataTransferRadioAccessTechnology::Wlan, + event_id: "test".to_string(), + payer: vec![0].into(), + timestamp: Utc::now(), + signature: vec![], + }, + rewardable_bytes, + pub_key: vec![0].into(), + signature: vec![], + }, + received_timestamp: Utc::now(), + } + } + trait ChannelExt { + fn assert_not_empty(&mut self) -> anyhow::Result<()>; fn assert_is_empty(&mut self) -> anyhow::Result<()>; } impl ChannelExt for tokio::sync::mpsc::Receiver { + fn assert_not_empty(&mut self) -> anyhow::Result<()> { + match self.try_recv() { + Ok(_) => (), + other => panic!("expected message in channel: {other:?}"), + } + Ok(()) + } + fn assert_is_empty(&mut self) -> anyhow::Result<()> { match self.try_recv() { Err(tokio::sync::mpsc::error::TryRecvError::Empty) => (), - other => panic!("unexpected message: {other:?}"), + other => panic!("expected channel to be empty: {other:?}"), } Ok(()) } diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 271dc8f7b..6a7a8853f 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -14,7 +14,7 @@ use file_store::{ }; use helium_proto::services::{ - packet_verifier::ValidDataTransferSession, poc_mobile::InvalidDataTransferIngestReportV1, + packet_verifier::ValidDataTransferSession, poc_mobile::VerifiedDataTransferIngestReportV1, }; use solana::burn::{SolanaNetwork, SolanaRpc}; use sqlx::{Pool, Postgres}; @@ -31,7 +31,7 @@ pub struct Daemon { burn_period: Duration, min_burn_period: Duration, mobile_config_resolver: MCR, - invalid_data_session_report_sink: FileSinkClient, + verified_data_session_report_sink: FileSinkClient, } impl Daemon { @@ -41,7 +41,7 @@ impl Daemon { reports: Receiver>, burner: Burner, mobile_config_resolver: MCR, - invalid_data_session_report_sink: FileSinkClient, + verified_data_session_report_sink: FileSinkClient, ) -> Self { Self { pool, @@ -50,7 +50,7 @@ impl Daemon { burn_period: settings.burn_period, min_burn_period: settings.min_burn_period, mobile_config_resolver, - invalid_data_session_report_sink, + verified_data_session_report_sink, } } } @@ -86,9 +86,9 @@ where let ts = file.file_info.timestamp; let mut transaction = self.pool.begin().await?; let reports = file.into_stream(&mut transaction).await?; - crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.invalid_data_session_report_sink, ts, reports).await?; + crate::accumulate::accumulate_sessions(&self.mobile_config_resolver, &mut transaction, &self.verified_data_session_report_sink, ts, reports).await?; transaction.commit().await?; - self.invalid_data_session_report_sink.commit().await?; + self.verified_data_session_report_sink.commit().await?; }, _ = sleep_until(burn_time) => { // It's time to burn @@ -145,7 +145,7 @@ impl Cmd { .await?; let (invalid_sessions, invalid_sessions_server) = - InvalidDataTransferIngestReportV1::file_sink( + VerifiedDataTransferIngestReportV1::file_sink( store_base_path, file_upload.clone(), FileSinkCommitStrategy::Manual, diff --git a/mobile_verifier/src/service_provider/reward.rs b/mobile_verifier/src/service_provider/reward.rs index da460d39b..ae7a85531 100644 --- a/mobile_verifier/src/service_provider/reward.rs +++ b/mobile_verifier/src/service_provider/reward.rs @@ -697,8 +697,14 @@ mod tests { epoch ); - let total_perc= sp_infos.total_percent(); - prop_assert!(total_perc <= dec!(1)); + // NOTE: This can be a sanity check when debugging. There are cases + // generated where the total percentage is + // 1.0000000000000000000000000001%, but as long as we don't + // allocated more than what is available, this is okay. + + // let total_perc = sp_infos.total_percent(); + // println!("total_perc: {}", total_perc); + // prop_assert!(total_perc <= dec!(1)); let mut allocated = dec!(0); for (amount, _) in sp_infos.iter_rewards() {