Skip to content

Commit

Permalink
Make rewarder calculate distance
Browse files Browse the repository at this point in the history
  • Loading branch information
macpie committed Oct 16, 2024
1 parent df8d6b3 commit 9b43edb
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 37 deletions.
15 changes: 15 additions & 0 deletions mobile_verifier/src/heartbeats/location_cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chrono::{DateTime, Duration, Utc};
use file_store::radio_location_estimates::Entity;
use helium_crypto::PublicKeyBinary;
use sqlx::PgPool;
use std::{collections::HashMap, sync::Arc};
Expand Down Expand Up @@ -61,6 +62,7 @@ impl LocationCache {
info!("cleaned {}", size_before - size_after);
}
});
// TODO: We could spawn an hydrate from DB here?
Self {
pool: pool.clone(),
data,
Expand All @@ -71,6 +73,7 @@ impl LocationCache {
{
let data = self.data.lock().await;
if let Some(&value) = data.get(&key) {
// TODO: When we get it timestamp more than 12h old should we remove an try to fetch new one?
return Ok(Some(value));
}
}
Expand All @@ -82,6 +85,11 @@ impl LocationCache {
}
}

pub async fn get_all(&self) -> HashMap<LocationCacheKey, LocationCacheValue> {
let data = self.data.lock().await;
data.clone()
}

pub async fn insert(
&self,
key: LocationCacheKey,
Expand Down Expand Up @@ -158,3 +166,10 @@ impl LocationCache {
}
}
}

pub fn key_to_entity(entity: LocationCacheKey) -> Entity {
match entity {
LocationCacheKey::CbrsId(id) => Entity::CbrsId(id),
LocationCacheKey::WifiPubKey(pub_key) => Entity::WifiPubKey(pub_key),
}
}
58 changes: 28 additions & 30 deletions mobile_verifier/src/radio_location_estimates.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use std::str::FromStr;

use crate::{heartbeats::HbType, sp_boosted_rewards_bans::BannedRadios, Settings};
use crate::{heartbeats::HbType, Settings};
use chrono::{DateTime, Utc};
use file_store::{
file_info_poller::{FileInfoStream, LookbackBehavior},
Expand All @@ -23,14 +21,11 @@ use helium_proto::services::{
};
use mobile_config::client::authorization_client::AuthorizationVerifier;
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use sha2::{Digest, Sha256};
use sqlx::{PgPool, Pool, Postgres, Row, Transaction};
use task_manager::{ManagedTask, TaskManager};
use tokio::sync::mpsc::Receiver;

const CONFIDENCE_THRESHOLD: Decimal = dec!(0.75);

pub struct RadioLocationEstimatesDaemon<AV> {
pool: Pool<Postgres>,
authorization_verifier: AV,
Expand Down Expand Up @@ -298,33 +293,36 @@ pub async fn clear_invalided(
Ok(())
}

// This is wrong should be a get estimates but will fix later
pub async fn get_banned_radios(pool: &PgPool) -> anyhow::Result<BannedRadios> {
// TODO: Do we still want to ban any radio that is NOT in this table?
// Might be multiple per radio
// check assertion in circle as well
sqlx::query(
pub async fn get_valid_estimates(
pool: &PgPool,
radio_key: &Entity,
threshold: Decimal,
) -> anyhow::Result<Vec<(Decimal, Decimal, Decimal)>> {
let rows = sqlx::query(
r#"
SELECT radio_type, radio_key
FROM radio_location_estimates
WHERE confidence < $1
AND invalided_at IS NULL
SELECT radius, lat, long
FROM radio_location_estimates
WHERE radio_key = $1
AND confidence >= $2
AND invalided_at IS NULL
"#,
)
.bind(CONFIDENCE_THRESHOLD)
.fetch(pool)
.map_err(anyhow::Error::from)
.try_fold(BannedRadios::default(), |mut set, row| async move {
let radio_type = row.get::<HbType, &str>("radio_type");
let radio_key = row.get::<String, &str>("radio_key");
match radio_type {
HbType::Wifi => set.insert_wifi(PublicKeyBinary::from_str(&radio_key)?),
HbType::Cbrs => set.insert_cbrs(radio_key),
};

Ok(set)
})
.await
.bind(radio_key.to_string())
.bind(threshold)
.fetch_all(pool)
.await?;

let results = rows
.into_iter()
.map(|row| {
let radius: Decimal = row.get("radius");
let lat: Decimal = row.get("lat");
let lon: Decimal = row.get("long");
(radius, lat, lon)
})
.collect();

Ok(results)
}

fn entity_to_radio_type(entity: &Entity) -> HbType {
Expand Down
72 changes: 71 additions & 1 deletion mobile_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@ use self::boosted_hex_eligibility::BoostedHexEligibility;
use crate::{
boosting_oracles::db::check_for_unprocessed_data_sets,
coverage, data_session,
heartbeats::{self, location_cache::LocationCache, HeartbeatReward},
heartbeats::{
self,
location_cache::{self, LocationCache},
HeartbeatReward,
},
radio_location_estimates, radio_threshold,
reward_shares::{
self, CalculatedPocRewardShares, CoverageShares, DataTransferAndPocAllocatedRewardBuckets,
Expand All @@ -21,6 +25,7 @@ use file_store::{
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode},
};
use futures_util::TryFutureExt;
use h3o::{LatLng, Resolution};
use helium_proto::{
reward_manifest::RewardData::MobileRewardData,
services::poc_mobile::{
Expand Down Expand Up @@ -268,6 +273,7 @@ where
&self.hex_service_client,
&self.mobile_rewards,
&self.speedtest_averages,
&self.location_cache,
reward_period,
mobile_bone_price,
)
Expand Down Expand Up @@ -362,6 +368,7 @@ pub async fn reward_poc_and_dc(
hex_service_client: &impl HexBoostingInfoResolver<Error = ClientError>,
mobile_rewards: &FileSinkClient<proto::MobileRewardShare>,
speedtest_avg_sink: &FileSinkClient<proto::SpeedtestAvg>,
location_cache: &LocationCache,
reward_period: &Range<DateTime<Utc>>,
mobile_bone_price: Decimal,
) -> anyhow::Result<CalculatedPocRewardShares> {
Expand Down Expand Up @@ -399,6 +406,7 @@ pub async fn reward_poc_and_dc(
speedtest_avg_sink,
reward_period,
reward_shares,
location_cache,
)
.await?;

Expand All @@ -425,6 +433,7 @@ async fn reward_poc(
speedtest_avg_sink: &FileSinkClient<proto::SpeedtestAvg>,
reward_period: &Range<DateTime<Utc>>,
reward_shares: DataTransferAndPocAllocatedRewardBuckets,
location_cache: &LocationCache,
) -> anyhow::Result<(Decimal, CalculatedPocRewardShares)> {
let heartbeats = HeartbeatReward::validated(pool, reward_period);
let speedtest_averages =
Expand All @@ -451,6 +460,27 @@ async fn reward_poc(
)
.await?;

{
let locations = location_cache.get_all().await;
for (key, value) in locations.iter() {
let entity = location_cache::key_to_entity(key.clone());
let estimates =
radio_location_estimates::get_valid_estimates(pool, &entity, dec!(0.75)).await?;
if estimates.is_empty() {
// TODO we ban that key
todo!()
} else {
match is_within_radius(value.lat, value.lon, estimates) {
Ok(true) => todo!(),
// TODO we ban that key
Ok(false) => todo!(),
// TODO we ban that key
Err(_) => todo!(),
}
}
}
}

let coverage_shares = CoverageShares::new(
pool,
heartbeats,
Expand Down Expand Up @@ -497,6 +527,46 @@ async fn reward_poc(
Ok((unallocated_poc_amount, calculated_poc_rewards_per_share))
}

fn is_within_radius(
lat_a: f64,
lon_a: f64,
comparators: Vec<(Decimal, Decimal, Decimal)>,
) -> anyhow::Result<bool> {
let resolution = Resolution::Twelve;

let point_a =
LatLng::new(lat_a, lon_a).map_err(|e| anyhow::anyhow!("Invalid LatLng for A: {}", e))?;
let h3_index_a = point_a.to_cell(resolution);

for (radius_meters, lat_b, lon_b) in comparators {
let lat_b_f64 = lat_b
.to_f64()
.ok_or_else(|| anyhow::anyhow!("Failed to convert lat_b to f64"))?;
let lon_b_f64 = lon_b
.to_f64()
.ok_or_else(|| anyhow::anyhow!("Failed to convert lon_b to f64"))?;
let radius_meters_f64 = radius_meters
.to_f64()
.ok_or_else(|| anyhow::anyhow!("Failed to convert radius_meters to f64"))?;

let point_b = LatLng::new(lat_b_f64, lon_b_f64)
.map_err(|e| anyhow::anyhow!("Invalid LatLng for B: {}", e))?;
let h3_index_b = point_b.to_cell(resolution);

let grid_distance = h3_index_a
.grid_distance(h3_index_b)
.map_err(|e| anyhow::anyhow!("Failed to calculate grid distance: {}", e))?;

let max_grid_distance = (radius_meters_f64 / 9.0).round() as i32;

if grid_distance <= max_grid_distance {
return Ok(true);
}
}

Ok(false)
}

pub async fn reward_dc(
mobile_rewards: &FileSinkClient<proto::MobileRewardShare>,
reward_period: &Range<DateTime<Utc>>,
Expand Down
22 changes: 18 additions & 4 deletions mobile_verifier/tests/integrations/hex_boosting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use mobile_config::boosted_hex_info::BoostedHexInfo;
use mobile_verifier::{
cell_type::CellType,
coverage::CoverageObject,
heartbeats::{HbType, Heartbeat, ValidatedHeartbeat},
heartbeats::{location_cache::LocationCache, HbType, Heartbeat, ValidatedHeartbeat},
radio_threshold, reward_shares, rewarder, speedtests,
};
use rust_decimal::prelude::*;
Expand Down Expand Up @@ -137,13 +137,16 @@ async fn test_poc_with_boosted_hexes(pool: PgPool) -> anyhow::Result<()> {
.to_u64()
.unwrap();

let location_cache = LocationCache::new(&pool);

let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
&pool,
&hex_boosting_client,
&mobile_rewards_client,
&speedtest_avg_client,
&location_cache,
&epoch,
dec!(0.0001)
),
Expand Down Expand Up @@ -320,14 +323,15 @@ async fn test_poc_boosted_hexes_thresholds_not_met(pool: PgPool) -> anyhow::Resu
];

let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes);

let location_cache = LocationCache::new(&pool);
let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
&pool,
&hex_boosting_client,
&mobile_rewards_client,
&speedtest_avg_client,
&location_cache,
&epoch,
dec!(0.0001)
),
Expand Down Expand Up @@ -483,13 +487,17 @@ async fn test_poc_with_multi_coverage_boosted_hexes(pool: PgPool) -> anyhow::Res
.unwrap();

let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes);

let location_cache = LocationCache::new(&pool);

let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
&pool,
&hex_boosting_client,
&mobile_rewards_client,
&speedtest_avg_client,
&location_cache,
&epoch,
dec!(0.0001)
),
Expand Down Expand Up @@ -657,14 +665,15 @@ async fn test_expired_boosted_hex(pool: PgPool) -> anyhow::Result<()> {
];

let hex_boosting_client = MockHexBoostingClient::new(boosted_hexes);

let location_cache = LocationCache::new(&pool);
let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
&pool,
&hex_boosting_client,
&mobile_rewards_client,
&speedtest_avg_client,
&location_cache,
&epoch,
dec!(0.0001)
),
Expand Down Expand Up @@ -790,13 +799,15 @@ async fn test_reduced_location_score_with_boosted_hexes(pool: PgPool) -> anyhow:
.to_u64()
.unwrap();

let location_cache = LocationCache::new(&pool);
let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
&pool,
&hex_boosting_client,
&mobile_rewards_client,
&speedtest_avg_client,
&location_cache,
&epoch,
dec!(0.0001)
),
Expand Down Expand Up @@ -969,14 +980,15 @@ async fn test_distance_from_asserted_removes_boosting_but_not_location_trust(
let total_poc_emissions = reward_shares::get_scheduled_tokens_for_poc(epoch_duration)
.to_u64()
.unwrap();

let location_cache = LocationCache::new(&pool);
let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
&pool,
&hex_boosting_client,
&mobile_rewards_client,
&speedtest_avg_client,
&location_cache,
&epoch,
dec!(0.0001)
),
Expand Down Expand Up @@ -1175,13 +1187,15 @@ async fn test_poc_with_cbrs_and_multi_coverage_boosted_hexes(pool: PgPool) -> an
.to_u64()
.unwrap();

let location_cache = LocationCache::new(&pool);
let (_, rewards) = tokio::join!(
// run rewards for poc and dc
rewarder::reward_poc_and_dc(
&pool,
&hex_boosting_client,
&mobile_rewards_client,
&speedtest_avg_client,
&location_cache,
&epoch,
dec!(0.0001)
),
Expand Down
Loading

0 comments on commit 9b43edb

Please sign in to comment.