From cf527c143dc25ce85d313cf4773022b95faacee0 Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Tue, 25 Jul 2023 14:34:42 -0400 Subject: [PATCH] Remove code that isn't part of phase 1 --- .../15_heartbeat_coverage_object.sql | 2 + .../migrations/15_modeled_coverage.sql | 27 - mobile_verifier/src/cli/reward_from_db.rs | 5 +- mobile_verifier/src/coverage.rs | 6 + mobile_verifier/src/heartbeats.rs | 52 +- mobile_verifier/src/reward_shares.rs | 516 ++++-------------- mobile_verifier/src/rewarder.rs | 10 +- 7 files changed, 145 insertions(+), 473 deletions(-) create mode 100644 mobile_verifier/migrations/15_heartbeat_coverage_object.sql delete mode 100644 mobile_verifier/migrations/15_modeled_coverage.sql diff --git a/mobile_verifier/migrations/15_heartbeat_coverage_object.sql b/mobile_verifier/migrations/15_heartbeat_coverage_object.sql new file mode 100644 index 000000000..f81b957e7 --- /dev/null +++ b/mobile_verifier/migrations/15_heartbeat_coverage_object.sql @@ -0,0 +1,2 @@ +-- Coverage object can be NULL +ALTER TABLE heartbeats ADD COLUMN coverage_object UUID; diff --git a/mobile_verifier/migrations/15_modeled_coverage.sql b/mobile_verifier/migrations/15_modeled_coverage.sql deleted file mode 100644 index b4aa717fe..000000000 --- a/mobile_verifier/migrations/15_modeled_coverage.sql +++ /dev/null @@ -1,27 +0,0 @@ -CREATE TYPE signal_level AS ENUM ( - 'none', - 'low', - 'medium', - 'high' -); - -CREATE TABLE hex_coverage ( - uuid UUID NOT NULL, - hex BIGINT NOT NULL, - indoor BOOLEAN NOT NULL, - cbsd_id TEXT NOT NULL, - signal_level signal_level NOT NULL, - coverage_claim_time TIMESTAMPTZ NOT NULL, - inserted_at TIMESTAMPTZ NOT NULL, - PRIMARY KEY (uuid, hex) -); - -CREATE TABLE seniority ( - cbsd_id TEXT NOT NULL, - seniority_ts TIMESTAMPTZ NOT NULL, - last_heartbeat TIMESTAMPTZ NOT NULL, - uuid UUID NOT NULL, - PRIMARY KEY (cbsd_id, seniority_ts) -); - -ALTER TABLE heartbeats ADD COLUMN coverage_object UUID; diff --git a/mobile_verifier/src/cli/reward_from_db.rs b/mobile_verifier/src/cli/reward_from_db.rs index bb7a1e3de..0246cc80f 100644 --- a/mobile_verifier/src/cli/reward_from_db.rs +++ b/mobile_verifier/src/cli/reward_from_db.rs @@ -1,6 +1,6 @@ use crate::{ heartbeats::HeartbeatReward, - reward_shares::{get_scheduled_tokens_for_poc_and_dc, CoveragePoints}, + reward_shares::{get_scheduled_tokens_for_poc_and_dc, PocShares}, speedtests::{Average, SpeedtestAverages}, Settings, }; @@ -40,8 +40,7 @@ impl Cmd { let heartbeats = HeartbeatReward::validated(&pool, &epoch); let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?; - let reward_shares = - CoveragePoints::aggregate_points(&pool, heartbeats, speedtests.clone(), end).await?; + let reward_shares = PocShares::aggregate(heartbeats, speedtests.clone()).await?; let mut total_rewards = 0_u64; let mut owner_rewards = HashMap::<_, u64>::new(); diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index b27900d57..2bfc30821 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -231,6 +231,7 @@ pub struct HexCoverage { } #[derive(Eq)] +#[allow(dead_code)] struct CoverageLevel { cbsd_id: String, coverage_claim_time: DateTime, @@ -258,6 +259,7 @@ impl Ord for CoverageLevel { } impl CoverageLevel { + #[allow(dead_code)] fn coverage_points(&self) -> anyhow::Result { Ok(match (self.indoor, self.signal_level) { (true, SignalLevel::High) => dec!(400), @@ -278,6 +280,7 @@ pub struct CoverageReward { pub hotspot: PublicKeyBinary, } +#[allow(dead_code)] pub const MAX_RADIOS_PER_HEX: usize = 5; #[async_trait::async_trait] @@ -363,11 +366,13 @@ impl CoveredHexStream for Pool { } #[derive(Default)] +#[allow(dead_code)] pub struct CoveredHexes { hexes: HashMap>; 2]>, } impl CoveredHexes { + #[allow(dead_code)] pub async fn aggregate_coverage( &mut self, hotspot: &PublicKeyBinary, @@ -402,6 +407,7 @@ impl CoveredHexes { } /// Returns the radios that should be rewarded for giving coverage. + #[allow(dead_code)] pub fn into_iter(self) -> impl Iterator { self.hexes .into_values() diff --git a/mobile_verifier/src/heartbeats.rs b/mobile_verifier/src/heartbeats.rs index 1cff04d4b..3c28a0101 100644 --- a/mobile_verifier/src/heartbeats.rs +++ b/mobile_verifier/src/heartbeats.rs @@ -26,21 +26,17 @@ use uuid::Uuid; #[derive(Debug, Clone, PartialEq, Eq, Hash, sqlx::FromRow)] pub struct HeartbeatKey { - coverage_object: Uuid, hotspot_key: PublicKeyBinary, cbsd_id: String, cell_type: CellType, - latest_timestamp: DateTime, } impl From for HeartbeatReward { fn from(value: HeartbeatKey) -> Self { Self { - coverage_object: value.coverage_object, hotspot_key: value.hotspot_key, cbsd_id: value.cbsd_id, reward_weight: value.cell_type.reward_weight(), - latest_timestamp: value.latest_timestamp, } } } @@ -135,18 +131,20 @@ impl HeartbeatDaemon { )); while let Some(heartbeat) = validated_heartbeats.next().await.transpose()? { - let coverage_claim_time = coverage_claim_time_cache - .fetch_coverage_claim_time( - &heartbeat.cbsd_id, - &heartbeat.coverage_object, - &mut transaction, - ) - .await?; + if heartbeat.coverage_object.is_some() { + let coverage_claim_time = coverage_claim_time_cache + .fetch_coverage_claim_time( + &heartbeat.cbsd_id, + &heartbeat.coverage_object, + &mut transaction, + ) + .await?; + heartbeat + .update_seniority(coverage_claim_time, &self.seniority_sink, &mut transaction) + .await?; + } heartbeat.write(&self.heartbeat_sink).await?; - heartbeat - .update_seniority(coverage_claim_time, &self.seniority_sink, &mut transaction) - .await?; let key = ( heartbeat.cbsd_id.clone(), @@ -170,11 +168,9 @@ impl HeartbeatDaemon { } pub struct HeartbeatReward { - pub coverage_object: Uuid, pub hotspot_key: PublicKeyBinary, pub cbsd_id: String, pub reward_weight: Decimal, - pub latest_timestamp: DateTime, } /// Minimum number of heartbeats required to give a reward to the hotspot. @@ -187,33 +183,18 @@ impl HeartbeatReward { ) -> impl Stream> + 'a { sqlx::query_as::<_, HeartbeatKey>( r#" - WITH coverage_objs AS ( - SELECT t1.cbsd_id, t1.coverage_object, t1.latest_timestamp - FROM heartbeats t1 - WHERE t1.latest_timestamp = ( - SELECT MAX(t2.latest_timestamp) - FROM heartbeats t2 - WHERE t2.cbsd_id = t1.cbsd_id - AND truncated_timestamp >= $1 - AND truncated_timestamp < $2 - ) - ) SELECT hotspot_key, heartbeats.cbsd_id, - cell_type, - coverage_objs.coverage_object, - coverage_objs.latest_timestamp, - FROM heartbeats - JOIN coverage_objs ON heartbeats.cbsd_id = coverage_objs.cbsd_id + cell_type + FROM + heartbeats WHERE truncated_timestamp >= $1 AND truncated_timestamp < $2 GROUP BY heartbeats.cbsd_id, hotspot_key, cell_type, - coverage_objs.coverage_object, - coverage_objs.latest_timestamp, HAVING count(*) >= $3 "#, ) @@ -459,7 +440,8 @@ async fn validate_heartbeat( } let Some(coverage_object) = heartbeat.report.coverage_object() else { - return Ok((cell_type, proto::HeartbeatValidity::BadCoverageObject)); + return Ok((cell_type, proto::HeartbeatValidity::Valid)); + // return Ok((cell_type, proto::HeartbeatValidity::BadCoverageObject)); }; let Some(coverage) = coverage_cache.fetch_coverage(&coverage_object).await? else { diff --git a/mobile_verifier/src/reward_shares.rs b/mobile_verifier/src/reward_shares.rs index eb0e65589..c59101e9c 100644 --- a/mobile_verifier/src/reward_shares.rs +++ b/mobile_verifier/src/reward_shares.rs @@ -1,5 +1,4 @@ use crate::{ - coverage::{CoverageReward, CoveredHexStream, CoveredHexes}, data_session::HotspotMap, heartbeats::HeartbeatReward, speedtests::{Average, SpeedtestAverages}, @@ -14,7 +13,8 @@ use helium_proto::services::poc_mobile as proto; use helium_proto::services::poc_mobile::mobile_reward_share::Reward as ProtoReward; use rust_decimal::prelude::*; use rust_decimal_macros::dec; -use std::{collections::HashMap, ops::Range}; +use std::collections::HashMap; +use std::ops::Range; /// Total tokens emissions pool per 365 days const TOTAL_EMISSIONS_POOL: Decimal = dec!(60_000_000_000_000_000); @@ -29,7 +29,7 @@ const DC_USD_PRICE: Decimal = dec!(0.00001); /// Default precision used for rounding const DEFAULT_PREC: u32 = 15; -/// Percent of total emissions allocated for mapper rewards +// Percent of total emissions allocated for mapper rewards const MAPPERS_REWARDS_PERCENT: Decimal = dec!(0.2); /// shares of the mappers pool allocated per eligble subscriber for discovery mapping @@ -58,7 +58,7 @@ impl TransferRewards { pub async fn from_transfer_sessions( mobile_bone_price: Decimal, transfer_sessions: HotspotMap, - hotspots: &CoveragePoints, + hotspots: &PocShares, epoch: &Range>, ) -> Self { let mut reward_sum = Decimal::ZERO; @@ -200,122 +200,60 @@ pub fn dc_to_mobile_bones(dc_amount: Decimal, mobile_bone_price: Decimal) -> Dec .round_dp_with_strategy(DEFAULT_PREC, RoundingStrategy::ToPositiveInfinity) } -#[derive(Debug)] -struct RadioPoints { - heartbeat_multiplier: Decimal, - points: Decimal, -} - -impl RadioPoints { - fn new(heartbeat_multiplier: Decimal) -> Self { - Self { - heartbeat_multiplier, - points: Decimal::ZERO, - } - } - - fn points(&self) -> Decimal { - (self.heartbeat_multiplier * self.points).max(Decimal::ZERO) - } -} - -#[derive(Debug, Default)] -struct HotspotPoints { - /// Points are multiplied by the multiplier to get shares. - /// Multiplier should never be zero. - speedtest_multiplier: Decimal, - radio_points: HashMap, -} - -impl HotspotPoints { - pub fn new(speedtest_multiplier: Decimal) -> Self { - Self { - speedtest_multiplier, - radio_points: HashMap::new(), - } - } +#[derive(Default)] +pub struct RadioShares { + radio_shares: HashMap, } -impl HotspotPoints { - pub fn total_points(&self) -> Decimal { - self.speedtest_multiplier - * self - .radio_points - .values() - .fold(Decimal::ZERO, |sum, radio| sum + radio.points()) +impl RadioShares { + fn total_shares(&self) -> Decimal { + self.radio_shares + .values() + .fold(Decimal::ZERO, |sum, amount| sum + amount) } } -#[derive(Debug)] -pub struct CoveragePoints { - coverage_points: HashMap, +#[derive(Default)] +pub struct PocShares { + pub hotspot_shares: HashMap, } -impl CoveragePoints { - pub async fn aggregate_points( - hex_streams: &impl CoveredHexStream, +impl PocShares { + pub async fn aggregate( heartbeats: impl Stream>, speedtests: SpeedtestAverages, - period_end: DateTime, ) -> Result { + let mut poc_shares = Self::default(); let mut heartbeats = std::pin::pin!(heartbeats); - let mut covered_hexes = CoveredHexes::default(); - let mut coverage_points = HashMap::new(); while let Some(heartbeat) = heartbeats.next().await.transpose()? { - let speedtest_multiplier = speedtests + let speedmultiplier = speedtests .get_average(&heartbeat.hotspot_key) .as_ref() .map_or(Decimal::ZERO, Average::reward_multiplier); - - if speedtest_multiplier.is_zero() { - continue; - } - - let covered_hex_stream = hex_streams - .covered_hex_stream(&heartbeat.cbsd_id, &heartbeat.coverage_object, period_end) - .await?; - covered_hexes - .aggregate_coverage(&heartbeat.hotspot_key, covered_hex_stream) - .await?; - coverage_points + *poc_shares + .hotspot_shares .entry(heartbeat.hotspot_key) - .or_insert_with(|| HotspotPoints::new(speedtest_multiplier)) - .radio_points - .insert(heartbeat.cbsd_id, RadioPoints::new(heartbeat.reward_weight)); + .or_default() + .radio_shares + .entry(heartbeat.cbsd_id) + .or_default() += heartbeat.reward_weight * speedmultiplier; } - - for CoverageReward { - cbsd_id, - points, - hotspot, - } in covered_hexes.into_iter() - { - // Guaranteed that points contains the given hotspot. - coverage_points - .get_mut(&hotspot) - .unwrap() - .radio_points - .get_mut(&cbsd_id) - .unwrap() - .points += points; - } - - Ok(Self { coverage_points }) + Ok(poc_shares) } pub fn is_valid(&self, hotspot: &PublicKeyBinary) -> bool { - if let Some(coverage_points) = self.coverage_points.get(hotspot) { - !coverage_points.total_points().is_zero() + if let Some(shares) = self.hotspot_shares.get(hotspot) { + !shares.total_shares().is_zero() } else { false } } pub fn total_shares(&self) -> Decimal { - self.coverage_points + self.hotspot_shares .values() - .fold(Decimal::ZERO, |sum, radio_points| { - sum + radio_points.total_points() + .fold(Decimal::ZERO, |sum, radio_shares| { + sum + radio_shares.total_shares() }) } @@ -327,65 +265,43 @@ impl CoveragePoints { let total_shares = self.total_shares(); let available_poc_rewards = get_scheduled_tokens_for_poc_and_dc(epoch.end - epoch.start) - transfer_rewards_sum; - available_poc_rewards - .checked_div(total_shares) - .map(|poc_rewards_per_share| { - let start_period = epoch.start.encode_timestamp(); - let end_period = epoch.end.encode_timestamp(); - self.coverage_points + if let Some(poc_rewards_per_share) = available_poc_rewards.checked_div(total_shares) { + let start_period = epoch.start.encode_timestamp(); + let end_period = epoch.end.encode_timestamp(); + Some( + self.hotspot_shares .into_iter() - .flat_map( - move |( - hotspot_key, - HotspotPoints { - speedtest_multiplier, - radio_points, - }, - )| { - radio_points.into_iter().map( - move |( - cbsd_id, - RadioPoints { - heartbeat_multiplier, - points, + .flat_map(move |(hotspot_key, RadioShares { radio_shares })| { + radio_shares.into_iter().map(move |(cbsd_id, amount)| { + let poc_reward = poc_rewards_per_share * amount; + let hotspot_key: Vec = hotspot_key.clone().into(); + proto::MobileRewardShare { + start_period, + end_period, + reward: Some(proto::mobile_reward_share::Reward::RadioReward( + proto::RadioReward { + hotspot_key, + cbsd_id, + poc_reward: poc_reward + .round_dp_with_strategy(0, RoundingStrategy::ToZero) + .to_u64() + .unwrap_or(0), + ..Default::default() }, - )| { - let poc_reward = poc_rewards_per_share - * speedtest_multiplier - * heartbeat_multiplier - * points; - let hotspot_key: Vec = hotspot_key.clone().into(); - proto::MobileRewardShare { - start_period, - end_period, - reward: Some( - proto::mobile_reward_share::Reward::RadioReward( - proto::RadioReward { - hotspot_key, - cbsd_id, - poc_reward: poc_reward - .round_dp_with_strategy( - 0, - RoundingStrategy::ToZero, - ) - .to_u64() - .unwrap_or(0), - ..Default::default() - }, - ), - ), - } - }, - ) - }, - ) + )), + } + }) + }) .filter(|mobile_reward| match mobile_reward.reward { Some(proto::mobile_reward_share::Reward::RadioReward(ref radio_reward)) => { radio_reward.poc_reward > 0 } _ => false, - }) - }) + }), + ) + } else { + None + } } } @@ -407,7 +323,6 @@ mod test { use super::*; use crate::{ cell_type::CellType, - coverage::{CoveredHexStream, HexCoverage}, data_session, data_session::HotspotDataSession, heartbeats::HeartbeatReward, @@ -415,14 +330,19 @@ mod test { subscriber_location::SubscriberValidatedLocations, }; use chrono::{Duration, Utc}; - use futures::stream::{self, BoxStream}; + use futures::stream; use helium_proto::services::poc_mobile::mobile_reward_share::Reward as MobileReward; use prost::Message; use std::collections::{HashMap, VecDeque}; - use uuid::Uuid; + + fn valid_shares() -> RadioShares { + let mut radio_shares: HashMap = Default::default(); + radio_shares.insert(String::new(), Decimal::ONE); + RadioShares { radio_shares } + } #[test] - fn ensure_correct_conversion_of_bytes_to_bones() { + fn bytes_to_bones() { assert_eq!( dc_to_mobile_bones(Decimal::from(1), dec!(1.0)), dec!(0.00001) @@ -433,21 +353,6 @@ mod test { ); } - fn valid_points() -> HotspotPoints { - let mut radio_points: HashMap = Default::default(); - radio_points.insert( - String::new(), - RadioPoints { - heartbeat_multiplier: Decimal::ONE, - points: Decimal::ONE, - }, - ); - HotspotPoints { - speedtest_multiplier: Decimal::ONE, - radio_points, - } - } - #[tokio::test] async fn discover_mapping_amount() { // test based on example defined at https://github.com/helium/oracles/issues/422 @@ -506,9 +411,8 @@ mod test { assert!(diff < NUM_SUBSCRIBERS); } - /// Test to ensure that the correct data transfer amount is rewarded. #[tokio::test] - async fn ensure_data_correct_transfer_reward_amount() { + async fn transfer_reward_amount() { let owner: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() .expect("failed owner parse"); @@ -531,9 +435,9 @@ mod test { data_transfer_session.num_dcs as u64, ); - let mut coverage_points = HashMap::default(); - coverage_points.insert(owner.clone(), valid_points()); - let coverage_points = CoveragePoints { coverage_points }; + let mut hotspot_shares = HashMap::default(); + hotspot_shares.insert(owner.clone(), valid_shares()); + let poc_shares = PocShares { hotspot_shares }; let now = Utc::now(); let epoch = (now - Duration::hours(1))..now; @@ -549,7 +453,7 @@ mod test { let data_transfer_rewards = TransferRewards::from_transfer_sessions( dec!(1.0), data_transfer_map, - &coverage_points, + &poc_shares, &epoch, ) .await; @@ -565,9 +469,8 @@ mod test { ); } - /// Test to ensure that excess transfer rewards are properly scaled down. #[tokio::test] - async fn ensure_excess_transfer_rewards_scale() { + async fn transfer_reward_scale() { let owner: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() .expect("failed owner parse"); @@ -596,14 +499,14 @@ mod test { let now = Utc::now(); let epoch = (now - Duration::hours(24))..now; - let mut coverage_points = HashMap::default(); - coverage_points.insert(owner.clone(), valid_points()); - let coverage_points = CoveragePoints { coverage_points }; + let mut hotspot_shares = HashMap::default(); + hotspot_shares.insert(owner.clone(), valid_shares()); + let poc_shares = PocShares { hotspot_shares }; let data_transfer_rewards = TransferRewards::from_transfer_sessions( dec!(1.0), aggregated_data_transfer_sessions, - &coverage_points, + &poc_shares, &epoch, ) .await; @@ -669,28 +572,8 @@ mod test { } } - #[async_trait::async_trait] - impl CoveredHexStream for HashMap<(String, Uuid), Vec> { - async fn covered_hex_stream<'a>( - &'a self, - cbsd_id: &'a str, - coverage_obj: &'a Uuid, - _period_end: DateTime, - ) -> Result>, sqlx::Error> { - Ok(stream::iter( - self.get(&(cbsd_id.to_string(), *coverage_obj)) - .unwrap() - .clone(), - ) - .map(Ok) - .boxed()) - } - } - - /// Test to ensure that a hotspot with radios that have higher heartbeat multipliers - /// will receive more rewards than a hotspot with a lower heartbeat multiplier. #[tokio::test] - async fn ensure_correct_radio_weights() { + async fn test_radio_weights() { let g1: PublicKeyBinary = "11eX55faMbqZB7jzN4p67m6w7ScPMH6ubnvCjCPLh72J49PaJEL" .parse() .expect("unable to construct pubkey"); @@ -700,13 +583,6 @@ mod test { let c1 = "P27-SCE4255W2107CW5000014".to_string(); let c2 = "2AG32PBS3101S1202000464223GY0153".to_string(); - let c3 = "P27-SCE4255W2107CW5000016".to_string(); - let c4 = "P27-SCE4255W2107CW5000018".to_string(); - - let cov_obj_1 = Uuid::new_v4(); - let cov_obj_2 = Uuid::new_v4(); - let cov_obj_3 = Uuid::new_v4(); - let cov_obj_4 = Uuid::new_v4(); let timestamp = Utc::now(); @@ -715,50 +591,24 @@ mod test { cbsd_id: c1.clone(), hotspot_key: g1.clone(), reward_weight: cell_type_weight(&c1), - coverage_object: cov_obj_1, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c2.clone(), hotspot_key: g1.clone(), reward_weight: cell_type_weight(&c2), - coverage_object: cov_obj_2, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { - cbsd_id: c3.clone(), + cbsd_id: c1.clone(), hotspot_key: g2.clone(), - reward_weight: cell_type_weight(&c3), - coverage_object: cov_obj_3, - latest_timestamp: DateTime::::MIN_UTC, + reward_weight: cell_type_weight(&c1), }, HeartbeatReward { - cbsd_id: c4.clone(), + cbsd_id: c1.clone(), hotspot_key: g2.clone(), - reward_weight: cell_type_weight(&c4), - coverage_object: cov_obj_4, - latest_timestamp: DateTime::::MIN_UTC, + reward_weight: cell_type_weight(&c1), }, ]; - let mut hex_coverage = HashMap::new(); - hex_coverage.insert( - (c1.clone(), cov_obj_1), - simple_hex_coverage(&c1, 0x8a1fb46692dffff), - ); - hex_coverage.insert( - (c2.clone(), cov_obj_2), - simple_hex_coverage(&c2, 0x8a1fb46522dffff), - ); - hex_coverage.insert( - (c3.clone(), cov_obj_3), - simple_hex_coverage(&c3, 0x8a1fb46622dffff), - ); - hex_coverage.insert( - (c4.clone(), cov_obj_4), - simple_hex_coverage(&c4, 0x8a1fb46632dffff), - ); - let last_timestamp = timestamp - Duration::hours(12); let g1_speedtests = vec![ acceptable_speedtest(last_timestamp), @@ -773,44 +623,27 @@ mod test { speedtests.insert(g2.clone(), VecDeque::from(g2_speedtests)); let speedtest_avgs = SpeedtestAverages { speedtests }; - let rewards = CoveragePoints::aggregate_points( - &hex_coverage, - stream::iter(heartbeats).map(Ok), - speedtest_avgs, - // Field isn't used: - DateTime::::MIN_UTC, - ) - .await - .unwrap(); + let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs) + .await + .unwrap(); + // The owner with two hotspots gets more rewards assert!( rewards - .coverage_points + .hotspot_shares .get(&g1) .expect("Could not fetch gateway1 shares") - .total_points() + .total_shares() > rewards - .coverage_points + .hotspot_shares .get(&g2) .expect("Could not fetch gateway2 shares") - .total_points() + .total_shares() ); } - fn simple_hex_coverage(cbsd_id: &str, hex: u64) -> Vec { - vec![HexCoverage { - uuid: Uuid::new_v4(), - hex: hex as i64, - indoor: false, - cbsd_id: cbsd_id.to_string(), - signal_level: crate::coverage::SignalLevel::Low, - coverage_claim_time: DateTime::::MIN_UTC, - }] - } - - /// Test to ensure that different speedtest averages correctly afferct reward shares. #[tokio::test] - async fn ensure_speedtest_averages_affect_reward_shares() { + async fn reward_shares_with_speed_multiplier() { // init owners let owner1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" .parse() @@ -876,19 +709,6 @@ mod test { let c13 = "P27-SCE4255W2107CW5000022".to_string(); let c14 = "2AG32PBS3101S1202000464223GY0157".to_string(); - let cov_obj_2 = Uuid::new_v4(); - let cov_obj_4 = Uuid::new_v4(); - let cov_obj_5 = Uuid::new_v4(); - let cov_obj_6 = Uuid::new_v4(); - let cov_obj_7 = Uuid::new_v4(); - let cov_obj_8 = Uuid::new_v4(); - let cov_obj_9 = Uuid::new_v4(); - let cov_obj_10 = Uuid::new_v4(); - let cov_obj_11 = Uuid::new_v4(); - let cov_obj_12 = Uuid::new_v4(); - let cov_obj_13 = Uuid::new_v4(); - let cov_obj_14 = Uuid::new_v4(); - let now = Utc::now(); let timestamp = now - Duration::minutes(20); @@ -898,139 +718,64 @@ mod test { cbsd_id: c2.clone(), hotspot_key: gw2.clone(), reward_weight: cell_type_weight(&c2), - coverage_object: cov_obj_2, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c4.clone(), hotspot_key: gw3.clone(), reward_weight: cell_type_weight(&c4), - coverage_object: cov_obj_4, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c5.clone(), hotspot_key: gw4.clone(), reward_weight: cell_type_weight(&c5), - coverage_object: cov_obj_5, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c6.clone(), hotspot_key: gw4.clone(), reward_weight: cell_type_weight(&c6), - coverage_object: cov_obj_6, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c7.clone(), hotspot_key: gw4.clone(), reward_weight: cell_type_weight(&c7), - coverage_object: cov_obj_7, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c8.clone(), hotspot_key: gw4.clone(), reward_weight: cell_type_weight(&c8), - coverage_object: cov_obj_8, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c9.clone(), hotspot_key: gw4.clone(), reward_weight: cell_type_weight(&c9), - coverage_object: cov_obj_9, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c10.clone(), hotspot_key: gw4.clone(), reward_weight: cell_type_weight(&c10), - coverage_object: cov_obj_10, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c11.clone(), hotspot_key: gw4.clone(), reward_weight: cell_type_weight(&c11), - coverage_object: cov_obj_11, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c12.clone(), hotspot_key: gw5.clone(), reward_weight: cell_type_weight(&c12), - coverage_object: cov_obj_12, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c13.clone(), hotspot_key: gw6.clone(), reward_weight: cell_type_weight(&c13), - coverage_object: cov_obj_13, - latest_timestamp: DateTime::::MIN_UTC, }, HeartbeatReward { cbsd_id: c14.clone(), hotspot_key: gw7.clone(), reward_weight: cell_type_weight(&c14), - coverage_object: cov_obj_14, - latest_timestamp: DateTime::::MIN_UTC, }, ]; - // Setup hex coverages - let mut hex_coverage = HashMap::new(); - hex_coverage.insert( - (c2.clone(), cov_obj_2), - simple_hex_coverage(&c2, 0x8a1fb46622dffff), - ); - hex_coverage.insert( - (c4.clone(), cov_obj_4), - simple_hex_coverage(&c4, 0x8a1fb46632dffff), - ); - hex_coverage.insert( - (c5.clone(), cov_obj_5), - simple_hex_coverage(&c5, 0x8a1fb46642dffff), - ); - hex_coverage.insert( - (c6.clone(), cov_obj_6), - simple_hex_coverage(&c6, 0x8a1fb46652dffff), - ); - hex_coverage.insert( - (c7.clone(), cov_obj_7), - simple_hex_coverage(&c7, 0x8a1fb46662dffff), - ); - hex_coverage.insert( - (c8.clone(), cov_obj_8), - simple_hex_coverage(&c8, 0x8a1fb46522dffff), - ); - hex_coverage.insert( - (c9.clone(), cov_obj_9), - simple_hex_coverage(&c9, 0x8a1fb46682dffff), - ); - hex_coverage.insert( - (c10.clone(), cov_obj_10), - simple_hex_coverage(&c10, 0x8a1fb46692dffff), - ); - hex_coverage.insert( - (c11.clone(), cov_obj_11), - simple_hex_coverage(&c11, 0x8a1fb466a2dffff), - ); - hex_coverage.insert( - (c12.clone(), cov_obj_12), - simple_hex_coverage(&c12, 0x8a1fb466b2dffff), - ); - hex_coverage.insert( - (c13.clone(), cov_obj_13), - simple_hex_coverage(&c13, 0x8a1fb466c2dffff), - ); - hex_coverage.insert( - (c14.clone(), cov_obj_14), - simple_hex_coverage(&c14, 0x8a1fb466d2dffff), - ); - // setup speedtests let last_speedtest = timestamp - Duration::hours(12); let gw1_speedtests = vec![ @@ -1071,17 +816,11 @@ mod test { // calculate the rewards for the sample group let mut owner_rewards = HashMap::::new(); let epoch = (now - Duration::hours(1))..now; - for mobile_reward in CoveragePoints::aggregate_points( - &hex_coverage, - stream::iter(heartbeats).map(Ok), - speedtest_avgs, - // Field isn't used: - DateTime::::MIN_UTC, - ) - .await - .unwrap() - .into_rewards(Decimal::ZERO, &epoch) - .unwrap() + for mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs) + .await + .unwrap() + .into_rewards(Decimal::ZERO, &epoch) + .unwrap() { let radio_reward = match mobile_reward.reward { Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward, @@ -1107,6 +846,7 @@ mod test { .expect("Could not fetch owner2 rewards"), 2_950_474_183_346 ); + assert_eq!( *owner_rewards .get(&owner3) @@ -1123,9 +863,8 @@ mod test { assert_eq!(total, 4_109_589_041_089); // total emissions for 1 hour } - /// Test to ensure that rewards that are zeroed are not written out. #[tokio::test] - async fn ensure_zeroed_rewards_are_not_written() { + async fn dont_write_zero_rewards() { use rust_decimal_macros::dec; let gw1: PublicKeyBinary = "112NqN2WWMwtK29PMzRby62fDydBJfsCLkCAf392stdok48ovNT6" @@ -1139,55 +878,30 @@ mod test { let c2 = "P27-SCE4255W2107CW5000015".to_string(); let c3 = "2AG32PBS3101S1202000464223GY0153".to_string(); - let mut coverage_points = HashMap::new(); + let mut hotspot_shares = HashMap::new(); - coverage_points.insert( + hotspot_shares.insert( gw1.clone(), - HotspotPoints { - speedtest_multiplier: dec!(1.0), - radio_points: vec![( - c1, - RadioPoints { - heartbeat_multiplier: dec!(1.0), - points: dec!(10.0), - }, - )] - .into_iter() - .collect(), + RadioShares { + radio_shares: vec![(c1, dec!(10.0))].into_iter().collect(), }, ); - coverage_points.insert( + hotspot_shares.insert( gw2, - HotspotPoints { - speedtest_multiplier: dec!(1.0), - radio_points: vec![ - ( - c2, - RadioPoints { - heartbeat_multiplier: dec!(1.0), - points: dec!(-1.0), - }, - ), - ( - c3, - RadioPoints { - heartbeat_multiplier: dec!(1.0), - points: dec!(0.0), - }, - ), - ] - .into_iter() - .collect(), + RadioShares { + radio_shares: vec![(c2, dec!(-1.0)), (c3, dec!(0.0))] + .into_iter() + .collect(), }, ); let now = Utc::now(); // We should never see any radio shares from owner2, since all of them are // less than or equal to zero. - let coverage_points = CoveragePoints { coverage_points }; + let owner_shares = PocShares { hotspot_shares }; let epoch = now - Duration::hours(1)..now; let expected_hotspot = gw1; - for mobile_reward in coverage_points.into_rewards(Decimal::ZERO, &epoch).unwrap() { + for mobile_reward in owner_shares.into_rewards(Decimal::ZERO, &epoch).unwrap() { let radio_reward = match mobile_reward.reward { Some(proto::mobile_reward_share::Reward::RadioReward(radio_reward)) => radio_reward, _ => unreachable!(), @@ -1199,15 +913,13 @@ mod test { #[tokio::test] async fn skip_empty_radio_rewards() { - let coverage_points = CoveragePoints { - coverage_points: HashMap::new(), + let owner_shares = PocShares { + hotspot_shares: HashMap::new(), }; let now = Utc::now(); let epoch = now - Duration::hours(1)..now; - assert!(coverage_points - .into_rewards(Decimal::ZERO, &epoch) - .is_none()); + assert!(owner_shares.into_rewards(Decimal::ZERO, &epoch).is_none()); } } diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index ecc645426..49ca11ef1 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -1,7 +1,7 @@ use crate::{ data_session, heartbeats::HeartbeatReward, - reward_shares::{CoveragePoints, MapperShares, TransferRewards}, + reward_shares::{MapperShares, PocShares, TransferRewards}, speedtests::SpeedtestAverages, subscriber_location, telemetry, }; @@ -145,9 +145,7 @@ impl Rewarder { let heartbeats = HeartbeatReward::validated(&self.pool, reward_period); let speedtests = SpeedtestAverages::validated(&self.pool, reward_period.end).await?; - let coverage_points = - CoveragePoints::aggregate_points(&self.pool, heartbeats, speedtests, reward_period.end) - .await?; + let poc_rewards = PocShares::aggregate(heartbeats, speedtests).await?; let mobile_price = self .price_tracker .price(&helium_proto::BlockchainTokenTypeV1::Mobile) @@ -160,7 +158,7 @@ impl Rewarder { let transfer_rewards = TransferRewards::from_transfer_sessions( mobile_bone_price, data_session::aggregate_hotspot_data_sessions_to_dc(&self.pool, reward_period).await?, - &coverage_points, + &poc_rewards, reward_period, ) .await; @@ -173,7 +171,7 @@ impl Rewarder { telemetry::data_transfer_rewards_scale(scale); if let Some(mobile_reward_shares) = - coverage_points.into_rewards(transfer_rewards.reward_sum(), reward_period) + poc_rewards.into_rewards(transfer_rewards.reward_sum(), reward_period) { for mobile_reward_share in mobile_reward_shares { self.mobile_rewards