From 9b43edb685d6d4e73ded02e015c36956b55a9247 Mon Sep 17 00:00:00 2001 From: Macpie Date: Wed, 16 Oct 2024 16:45:43 -0700 Subject: [PATCH] Make rewarder calculate distance --- .../src/heartbeats/location_cache.rs | 15 ++++ .../src/radio_location_estimates.rs | 58 ++++++++------- mobile_verifier/src/rewarder.rs | 72 ++++++++++++++++++- .../tests/integrations/hex_boosting.rs | 22 ++++-- .../tests/integrations/rewarder_poc_dc.rs | 5 +- 5 files changed, 135 insertions(+), 37 deletions(-) diff --git a/mobile_verifier/src/heartbeats/location_cache.rs b/mobile_verifier/src/heartbeats/location_cache.rs index cb9ea2422..76150bdb3 100644 --- a/mobile_verifier/src/heartbeats/location_cache.rs +++ b/mobile_verifier/src/heartbeats/location_cache.rs @@ -1,4 +1,5 @@ use chrono::{DateTime, Duration, Utc}; +use file_store::radio_location_estimates::Entity; use helium_crypto::PublicKeyBinary; use sqlx::PgPool; use std::{collections::HashMap, sync::Arc}; @@ -61,6 +62,7 @@ impl LocationCache { info!("cleaned {}", size_before - size_after); } }); + // TODO: We could spawn an hydrate from DB here? Self { pool: pool.clone(), data, @@ -71,6 +73,7 @@ impl LocationCache { { let data = self.data.lock().await; if let Some(&value) = data.get(&key) { + // TODO: When we get it timestamp more than 12h old should we remove an try to fetch new one? return Ok(Some(value)); } } @@ -82,6 +85,11 @@ impl LocationCache { } } + pub async fn get_all(&self) -> HashMap { + let data = self.data.lock().await; + data.clone() + } + pub async fn insert( &self, key: LocationCacheKey, @@ -158,3 +166,10 @@ impl LocationCache { } } } + +pub fn key_to_entity(entity: LocationCacheKey) -> Entity { + match entity { + LocationCacheKey::CbrsId(id) => Entity::CbrsId(id), + LocationCacheKey::WifiPubKey(pub_key) => Entity::WifiPubKey(pub_key), + } +} diff --git a/mobile_verifier/src/radio_location_estimates.rs b/mobile_verifier/src/radio_location_estimates.rs index 6a70f7e6d..8c8792c3e 100644 --- a/mobile_verifier/src/radio_location_estimates.rs +++ b/mobile_verifier/src/radio_location_estimates.rs @@ -1,6 +1,4 @@ -use std::str::FromStr; - -use crate::{heartbeats::HbType, sp_boosted_rewards_bans::BannedRadios, Settings}; +use crate::{heartbeats::HbType, Settings}; use chrono::{DateTime, Utc}; use file_store::{ file_info_poller::{FileInfoStream, LookbackBehavior}, @@ -23,14 +21,11 @@ use helium_proto::services::{ }; use mobile_config::client::authorization_client::AuthorizationVerifier; use rust_decimal::Decimal; -use rust_decimal_macros::dec; use sha2::{Digest, Sha256}; use sqlx::{PgPool, Pool, Postgres, Row, Transaction}; use task_manager::{ManagedTask, TaskManager}; use tokio::sync::mpsc::Receiver; -const CONFIDENCE_THRESHOLD: Decimal = dec!(0.75); - pub struct RadioLocationEstimatesDaemon { pool: Pool, authorization_verifier: AV, @@ -298,33 +293,36 @@ pub async fn clear_invalided( Ok(()) } -// This is wrong should be a get estimates but will fix later -pub async fn get_banned_radios(pool: &PgPool) -> anyhow::Result { - // TODO: Do we still want to ban any radio that is NOT in this table? - // Might be multiple per radio - // check assertion in circle as well - sqlx::query( +pub async fn get_valid_estimates( + pool: &PgPool, + radio_key: &Entity, + threshold: Decimal, +) -> anyhow::Result> { + let rows = sqlx::query( r#" - SELECT radio_type, radio_key - FROM radio_location_estimates - WHERE confidence < $1 - AND invalided_at IS NULL + SELECT radius, lat, long + FROM radio_location_estimates + WHERE radio_key = $1 + AND confidence >= $2 + AND invalided_at IS NULL "#, ) - .bind(CONFIDENCE_THRESHOLD) - .fetch(pool) - .map_err(anyhow::Error::from) - .try_fold(BannedRadios::default(), |mut set, row| async move { - let radio_type = row.get::("radio_type"); - let radio_key = row.get::("radio_key"); - match radio_type { - HbType::Wifi => set.insert_wifi(PublicKeyBinary::from_str(&radio_key)?), - HbType::Cbrs => set.insert_cbrs(radio_key), - }; - - Ok(set) - }) - .await + .bind(radio_key.to_string()) + .bind(threshold) + .fetch_all(pool) + .await?; + + let results = rows + .into_iter() + .map(|row| { + let radius: Decimal = row.get("radius"); + let lat: Decimal = row.get("lat"); + let lon: Decimal = row.get("long"); + (radius, lat, lon) + }) + .collect(); + + Ok(results) } fn entity_to_radio_type(entity: &Entity) -> HbType { diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 9da44171e..6d6b77657 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -2,7 +2,11 @@ use self::boosted_hex_eligibility::BoostedHexEligibility; use crate::{ boosting_oracles::db::check_for_unprocessed_data_sets, coverage, data_session, - heartbeats::{self, location_cache::LocationCache, HeartbeatReward}, + heartbeats::{ + self, + location_cache::{self, LocationCache}, + HeartbeatReward, + }, radio_location_estimates, radio_threshold, reward_shares::{ self, CalculatedPocRewardShares, CoverageShares, DataTransferAndPocAllocatedRewardBuckets, @@ -21,6 +25,7 @@ use file_store::{ traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode}, }; use futures_util::TryFutureExt; +use h3o::{LatLng, Resolution}; use helium_proto::{ reward_manifest::RewardData::MobileRewardData, services::poc_mobile::{ @@ -268,6 +273,7 @@ where &self.hex_service_client, &self.mobile_rewards, &self.speedtest_averages, + &self.location_cache, reward_period, mobile_bone_price, ) @@ -362,6 +368,7 @@ pub async fn reward_poc_and_dc( hex_service_client: &impl HexBoostingInfoResolver, mobile_rewards: &FileSinkClient, speedtest_avg_sink: &FileSinkClient, + location_cache: &LocationCache, reward_period: &Range>, mobile_bone_price: Decimal, ) -> anyhow::Result { @@ -399,6 +406,7 @@ pub async fn reward_poc_and_dc( speedtest_avg_sink, reward_period, reward_shares, + location_cache, ) .await?; @@ -425,6 +433,7 @@ async fn reward_poc( speedtest_avg_sink: &FileSinkClient, reward_period: &Range>, reward_shares: DataTransferAndPocAllocatedRewardBuckets, + location_cache: &LocationCache, ) -> anyhow::Result<(Decimal, CalculatedPocRewardShares)> { let heartbeats = HeartbeatReward::validated(pool, reward_period); let speedtest_averages = @@ -451,6 +460,27 @@ async fn reward_poc( ) .await?; + { + let locations = location_cache.get_all().await; + for (key, value) in locations.iter() { + let entity = location_cache::key_to_entity(key.clone()); + let estimates = + radio_location_estimates::get_valid_estimates(pool, &entity, dec!(0.75)).await?; + if estimates.is_empty() { + // TODO we ban that key + todo!() + } else { + match is_within_radius(value.lat, value.lon, estimates) { + Ok(true) => todo!(), + // TODO we ban that key + Ok(false) => todo!(), + // TODO we ban that key + Err(_) => todo!(), + } + } + } + } + let coverage_shares = CoverageShares::new( pool, heartbeats, @@ -497,6 +527,46 @@ async fn reward_poc( Ok((unallocated_poc_amount, calculated_poc_rewards_per_share)) } +fn is_within_radius( + lat_a: f64, + lon_a: f64, + comparators: Vec<(Decimal, Decimal, Decimal)>, +) -> anyhow::Result { + let resolution = Resolution::Twelve; + + let point_a = + LatLng::new(lat_a, lon_a).map_err(|e| anyhow::anyhow!("Invalid LatLng for A: {}", e))?; + let h3_index_a = point_a.to_cell(resolution); + + for (radius_meters, lat_b, lon_b) in comparators { + let lat_b_f64 = lat_b + .to_f64() + .ok_or_else(|| anyhow::anyhow!("Failed to convert lat_b to f64"))?; + let lon_b_f64 = lon_b + .to_f64() + .ok_or_else(|| anyhow::anyhow!("Failed to convert lon_b to f64"))?; + let radius_meters_f64 = radius_meters + .to_f64() + .ok_or_else(|| anyhow::anyhow!("Failed to convert radius_meters to f64"))?; + + let point_b = LatLng::new(lat_b_f64, lon_b_f64) + .map_err(|e| anyhow::anyhow!("Invalid LatLng for B: {}", e))?; + let h3_index_b = point_b.to_cell(resolution); + + let grid_distance = h3_index_a + .grid_distance(h3_index_b) + .map_err(|e| anyhow::anyhow!("Failed to calculate grid distance: {}", e))?; + + let max_grid_distance = (radius_meters_f64 / 9.0).round() as i32; + + if grid_distance <= max_grid_distance { + return Ok(true); + } + } + + Ok(false) +} + pub async fn reward_dc( mobile_rewards: &FileSinkClient, reward_period: &Range>, diff --git a/mobile_verifier/tests/integrations/hex_boosting.rs b/mobile_verifier/tests/integrations/hex_boosting.rs index ae11f4363..c9a9081c1 100644 --- a/mobile_verifier/tests/integrations/hex_boosting.rs +++ b/mobile_verifier/tests/integrations/hex_boosting.rs @@ -18,7 +18,7 @@ use mobile_config::boosted_hex_info::BoostedHexInfo; use mobile_verifier::{ cell_type::CellType, coverage::CoverageObject, - heartbeats::{HbType, Heartbeat, ValidatedHeartbeat}, + heartbeats::{location_cache::LocationCache, HbType, Heartbeat, ValidatedHeartbeat}, radio_threshold, reward_shares, rewarder, speedtests, }; use rust_decimal::prelude::*; @@ -137,6 +137,8 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { .to_u64() .unwrap(); + let location_cache = LocationCache::new(&pool); + let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -144,6 +146,7 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> { &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -320,7 +323,7 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu ]; let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - + let location_cache = LocationCache::new(&pool); let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -328,6 +331,7 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -483,6 +487,9 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res .unwrap(); let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); + + let location_cache = LocationCache::new(&pool); + let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -490,6 +497,7 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -657,7 +665,7 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { ]; let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - + let location_cache = LocationCache::new(&pool); let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -665,6 +673,7 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> { &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -790,6 +799,7 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: .to_u64() .unwrap(); + let location_cache = LocationCache::new(&pool); let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -797,6 +807,7 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow: &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -969,7 +980,7 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( let total_poc_emissions = reward_shares::get_scheduled_tokens_for_poc(epoch_duration) .to_u64() .unwrap(); - + let location_cache = LocationCache::new(&pool); let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -977,6 +988,7 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust( &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), @@ -1175,6 +1187,7 @@ async fn test_poc_with_cbrs_and_multi_coverage_boosted_hexes(pool: PgPool) -> an .to_u64() .unwrap(); + let location_cache = LocationCache::new(&pool); let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -1182,6 +1195,7 @@ async fn test_poc_with_cbrs_and_multi_coverage_boosted_hexes(pool: PgPool) -> an &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ), diff --git a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs index e26af88b8..ed9cc3c4f 100644 --- a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs @@ -13,7 +13,7 @@ use mobile_verifier::{ cell_type::CellType, coverage::CoverageObject, data_session, - heartbeats::{HbType, Heartbeat, ValidatedHeartbeat}, + heartbeats::{location_cache::LocationCache, HbType, Heartbeat, ValidatedHeartbeat}, reward_shares, rewarder, speedtests, }; use rust_decimal::prelude::*; @@ -44,7 +44,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { let boosted_hexes = vec![]; let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes); - + let location_cache = LocationCache::new(&pool); let (_, rewards) = tokio::join!( // run rewards for poc and dc rewarder::reward_poc_and_dc( @@ -52,6 +52,7 @@ async fn test_poc_and_dc_rewards(pool: PgPool) -> anyhow::Result<()> { &hex_boosting_client, &mobile_rewards_client, &speedtest_avg_client, + &location_cache, &epoch, dec!(0.0001) ),