Skip to content

Commit

Permalink
Update Last Location Cache to be valid for up to 12 hours from latest…
Browse files Browse the repository at this point in the history
… heartbeat with valid location (#822)

* Move LastLocation and LocationCache to own module

* Update LastLocation to be 12 hours from last heartbeat with validate location

* refactor last location tests
  • Loading branch information
bbalser authored Jun 4, 2024
1 parent c44d5d0 commit 6337f86
Show file tree
Hide file tree
Showing 8 changed files with 475 additions and 479 deletions.
3 changes: 2 additions & 1 deletion mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ pub fn new_coverage_object_notification_channel(
)
}

#[derive(Clone)]
pub struct CoverageObject {
pub coverage_object: file_store::coverage::CoverageObject,
pub validity: CoverageObjectValidity,
Expand Down Expand Up @@ -298,7 +299,7 @@ impl CoverageObject {
Ok(())
}

pub async fn save(self, transaction: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
pub async fn save(&self, transaction: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> {
let insertion_time = Utc::now();
let key = self.key();
let hb_type = key.hb_type();
Expand Down
124 changes: 124 additions & 0 deletions mobile_verifier/src/heartbeats/last_location.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::sync::Arc;

use chrono::{DateTime, Duration, Utc};
use helium_crypto::PublicKeyBinary;
use retainer::Cache;
use sqlx::PgPool;

#[derive(sqlx::FromRow, Copy, Clone)]
pub struct LastLocation {
pub location_validation_timestamp: DateTime<Utc>,
pub latest_timestamp: DateTime<Utc>,
pub lat: f64,
pub lon: f64,
}

impl LastLocation {
pub fn new(
location_validation_timestamp: DateTime<Utc>,
latest_timestamp: DateTime<Utc>,
lat: f64,
lon: f64,
) -> Self {
Self {
location_validation_timestamp,
latest_timestamp,
lat,
lon,
}
}

/// Calculates the duration from now in which last_valid_timestamp is 12 hours old
pub fn duration_to_expiration(&self) -> Duration {
((self.latest_timestamp + Duration::hours(12)) - Utc::now()).max(Duration::zero())
}
}

/// A cache for previous valid (or invalid) WiFi heartbeat locations
#[derive(Clone)]
pub struct LocationCache {
pool: PgPool,
locations: Arc<Cache<PublicKeyBinary, Option<LastLocation>>>,
}

impl LocationCache {
pub fn new(pool: &PgPool) -> Self {
let locations = Arc::new(Cache::new());
let locations_clone = locations.clone();
tokio::spawn(async move {
locations_clone
.monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 24))
.await
});
Self {
pool: pool.clone(),
locations,
}
}

async fn fetch_from_db_and_set(
&self,
hotspot: &PublicKeyBinary,
) -> anyhow::Result<Option<LastLocation>> {
let last_location: Option<LastLocation> = sqlx::query_as(
r#"
SELECT location_validation_timestamp, latest_timestamp, lat, lon
FROM wifi_heartbeats
WHERE location_validation_timestamp IS NOT NULL
AND latest_timestamp >= $1
AND hotspot_key = $2
ORDER BY latest_timestamp DESC
LIMIT 1
"#,
)
.bind(Utc::now() - Duration::hours(12))
.bind(hotspot)
.fetch_optional(&self.pool)
.await?;
self.locations
.insert(
hotspot.clone(),
last_location,
last_location
.map(|x| x.duration_to_expiration())
.unwrap_or_else(|| Duration::days(365))
.to_std()?,
)
.await;
Ok(last_location)
}

pub async fn fetch_last_location(
&self,
hotspot: &PublicKeyBinary,
) -> anyhow::Result<Option<LastLocation>> {
Ok(
if let Some(last_location) = self.locations.get(hotspot).await {
*last_location
} else {
self.fetch_from_db_and_set(hotspot).await?
},
)
}

pub async fn set_last_location(
&self,
hotspot: &PublicKeyBinary,
last_location: LastLocation,
) -> anyhow::Result<()> {
let duration_to_expiration = last_location.duration_to_expiration();
self.locations
.insert(
hotspot.clone(),
Some(last_location),
duration_to_expiration.to_std()?,
)
.await;
Ok(())
}

/// Only used for testing.
pub async fn delete_last_location(&self, hotspot: &PublicKeyBinary) {
self.locations.remove(hotspot).await;
}
}
120 changes: 6 additions & 114 deletions mobile_verifier/src/heartbeats/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod cbrs;
pub mod last_location;
pub mod wifi;

use crate::{
Expand All @@ -20,10 +21,12 @@ use helium_proto::services::poc_mobile as proto;
use retainer::Cache;
use rust_decimal::{prelude::ToPrimitive, Decimal};
use rust_decimal_macros::dec;
use sqlx::{postgres::PgTypeInfo, Decode, Encode, PgPool, Postgres, Transaction, Type};
use std::{ops::Range, pin::pin, sync::Arc, time};
use sqlx::{postgres::PgTypeInfo, Decode, Encode, Postgres, Transaction, Type};
use std::{ops::Range, pin::pin, time};
use uuid::Uuid;

use self::last_location::{LastLocation, LocationCache};

/// Minimum number of heartbeats required to give a reward to the hotspot.
const MINIMUM_HEARTBEAT_COUNT: i64 = 12;

Expand Down Expand Up @@ -541,6 +544,7 @@ impl ValidatedHeartbeat {
&heartbeat.hotspot_key,
LastLocation::new(
location_validation_timestamp,
heartbeat.timestamp,
heartbeat.lat,
heartbeat.lon,
),
Expand Down Expand Up @@ -780,118 +784,6 @@ pub async fn clear_heartbeats(
Ok(())
}

/// A cache for previous valid (or invalid) WiFi heartbeat locations
#[derive(Clone)]
pub struct LocationCache {
pool: PgPool,
locations: Arc<Cache<PublicKeyBinary, Option<LastLocation>>>,
}

impl LocationCache {
pub fn new(pool: &PgPool) -> Self {
let locations = Arc::new(Cache::new());
let locations_clone = locations.clone();
tokio::spawn(async move {
locations_clone
.monitor(4, 0.25, std::time::Duration::from_secs(60 * 60 * 24))
.await
});
Self {
pool: pool.clone(),
locations,
}
}

async fn fetch_from_db_and_set(
&self,
hotspot: &PublicKeyBinary,
) -> anyhow::Result<Option<LastLocation>> {
let last_location: Option<LastLocation> = sqlx::query_as(
r#"
SELECT location_validation_timestamp, lat, lon
FROM wifi_heartbeats
WHERE location_validation_timestamp IS NOT NULL
AND location_validation_timestamp >= $1
AND hotspot_key = $2
ORDER BY location_validation_timestamp DESC
LIMIT 1
"#,
)
.bind(Utc::now() - Duration::hours(12))
.bind(hotspot)
.fetch_optional(&self.pool)
.await?;
self.locations
.insert(
hotspot.clone(),
last_location,
last_location
.map(|x| x.duration_to_expiration())
.unwrap_or_else(|| Duration::days(365))
.to_std()?,
)
.await;
Ok(last_location)
}

pub async fn fetch_last_location(
&self,
hotspot: &PublicKeyBinary,
) -> anyhow::Result<Option<LastLocation>> {
Ok(
if let Some(last_location) = self.locations.get(hotspot).await {
*last_location
} else {
self.fetch_from_db_and_set(hotspot).await?
},
)
}

pub async fn set_last_location(
&self,
hotspot: &PublicKeyBinary,
last_location: LastLocation,
) -> anyhow::Result<()> {
let duration_to_expiration = last_location.duration_to_expiration();
self.locations
.insert(
hotspot.clone(),
Some(last_location),
duration_to_expiration.to_std()?,
)
.await;
Ok(())
}

/// Only used for testing.
pub async fn delete_last_location(&self, hotspot: &PublicKeyBinary) {
self.locations.remove(hotspot).await;
}
}

#[derive(sqlx::FromRow, Copy, Clone)]
pub struct LastLocation {
pub location_validation_timestamp: DateTime<Utc>,
pub lat: f64,
pub lon: f64,
}

impl LastLocation {
fn new(location_validation_timestamp: DateTime<Utc>, lat: f64, lon: f64) -> Self {
Self {
location_validation_timestamp,
lat,
lon,
}
}

/// Calculates the duration from now in which last_valid_timestamp is 12 hours old
fn duration_to_expiration(&self) -> Duration {
((self.location_validation_timestamp + Duration::hours(12)) - Utc::now())
.max(Duration::zero())
}
}

pub struct SeniorityUpdate<'a> {
heartbeat: &'a ValidatedHeartbeat,
action: SeniorityUpdateAction,
Expand Down
5 changes: 4 additions & 1 deletion mobile_verifier/tests/integrations/boosting_oracles.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use mobile_config::boosted_hex_info::BoostedHexes;
use mobile_verifier::{
coverage::{CoverageClaimTimeCache, CoverageObject, CoverageObjectCache, Seniority},
geofence::GeofenceValidator,
heartbeats::{Heartbeat, HeartbeatReward, LocationCache, SeniorityUpdate, ValidatedHeartbeat},
heartbeats::{
last_location::LocationCache, Heartbeat, HeartbeatReward, SeniorityUpdate,
ValidatedHeartbeat,
},
radio_threshold::VerifiedRadioThresholds,
reward_shares::CoveragePoints,
speedtests::Speedtest,
Expand Down
Loading

0 comments on commit 6337f86

Please sign in to comment.