From 172f92940a60613a43bd8306626a1d1b741658cf Mon Sep 17 00:00:00 2001 From: Matthew Plant Date: Tue, 21 May 2024 15:45:43 -0400 Subject: [PATCH] Automatically download data sets (#760) --- Cargo.lock | 4 + Cargo.toml | 1 + file_store/Cargo.toml | 2 +- file_store/src/file_info.rs | 15 + mobile_verifier/Cargo.toml | 4 + mobile_verifier/migrations/33_data_sets.sql | 26 + .../src/boosting_oracles/assignment.rs | 55 +- .../src/boosting_oracles/data_sets.rs | 733 ++++++++++++++++++ .../src/boosting_oracles/footfall.rs | 70 ++ .../src/boosting_oracles/landtype.rs | 152 ++++ mobile_verifier/src/boosting_oracles/mod.rs | 205 ++--- .../src/boosting_oracles/urbanization.rs | 68 ++ mobile_verifier/src/cli/server.rs | 27 +- mobile_verifier/src/cli/verify_disktree.rs | 4 +- mobile_verifier/src/coverage.rs | 224 +----- mobile_verifier/src/geofence.rs | 20 +- mobile_verifier/src/heartbeats/cbrs.rs | 4 +- mobile_verifier/src/heartbeats/mod.rs | 4 +- mobile_verifier/src/heartbeats/wifi.rs | 4 +- mobile_verifier/src/rewarder.rs | 6 + mobile_verifier/src/settings.rs | 15 +- .../tests/integrations/boosting_oracles.rs | 38 +- .../tests/integrations/common/mod.rs | 94 ++- .../tests/integrations/heartbeats.rs | 8 +- .../tests/integrations/hex_boosting.rs | 8 +- .../tests/integrations/modeled_coverage.rs | 22 +- .../tests/integrations/rewarder_poc_dc.rs | 8 +- 27 files changed, 1369 insertions(+), 452 deletions(-) create mode 100644 mobile_verifier/migrations/33_data_sets.sql create mode 100644 mobile_verifier/src/boosting_oracles/data_sets.rs create mode 100644 mobile_verifier/src/boosting_oracles/footfall.rs create mode 100644 mobile_verifier/src/boosting_oracles/landtype.rs create mode 100644 mobile_verifier/src/boosting_oracles/urbanization.rs diff --git a/Cargo.lock b/Cargo.lock index 39e95b2ea..35c5bea7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4695,6 +4695,7 @@ name = "mobile-verifier" version = "0.1.0" dependencies = [ "anyhow", + "async-compression", "async-trait", "backon", "base64 0.21.7", @@ -4703,6 +4704,7 @@ dependencies = [ "config", "custom-tracing", "db-store", + "derive_builder", "file-store", "flate2", "futures", @@ -4723,6 +4725,7 @@ dependencies = [ "price", "prost", "rand 0.8.5", + "regex", "retainer", "reward-scheduler", "rust_decimal", @@ -4736,6 +4739,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", "tonic", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 41399f4b9..47649d3ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,6 +116,7 @@ itertools = "*" tokio-util = "0" uuid = { version = "1", features = ["v4", "serde"] } tower-http = { version = "0", features = ["trace"] } +derive_builder = "0" [patch.crates-io] sqlx = { git = "https://github.com/helium/sqlx.git", rev = "92a2268f02e0cac6fccb34d3e926347071dbb88d" } diff --git a/file_store/Cargo.toml b/file_store/Cargo.toml index 601143fa9..f5ffbef93 100644 --- a/file_store/Cargo.toml +++ b/file_store/Cargo.toml @@ -45,7 +45,7 @@ base64 = {workspace = true} beacon = {workspace = true} sqlx = {workspace = true, optional = true} async-trait = {workspace = true} -derive_builder = "0" +derive_builder = {workspace = true} retainer = {workspace = true} uuid = {workspace = true} h3o = {workspace = true} diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index f34fff2d0..1494af783 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -154,6 +154,9 @@ pub const COVERAGE_OBJECT_INGEST_REPORT: &str = "coverage_object_ingest_report"; pub const SENIORITY_UPDATE: &str = "seniority_update"; pub const BOOSTED_HEX_UPDATE: &str = "boosted_hex_update"; pub const ORACLE_BOOSTING_REPORT: &str = "oracle_boosting_report"; +pub const URBANIZATION_DATA_SET: &str = "urbanization"; +pub const FOOTFALL_DATA_SET: &str = "footfall"; +pub const LANDTYPE_DATA_SET: &str = "landtype"; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Copy, strum::EnumCount)] #[serde(rename_all = "snake_case")] @@ -200,6 +203,9 @@ pub enum FileType { RadioThresholdReq, RadioThresholdIngestReport, VerifiedRadioThresholdIngestReport, + UrbanizationDataSet, + FootfallDataSet, + LandtypeDataSet, InvalidatedRadioThresholdReq, InvalidatedRadioThresholdIngestReport, VerifiedInvalidatedRadioThresholdIngestReport, @@ -261,6 +267,9 @@ impl fmt::Display for FileType { Self::SeniorityUpdate => SENIORITY_UPDATE, Self::BoostedHexUpdate => BOOSTED_HEX_UPDATE, Self::OracleBoostingReport => ORACLE_BOOSTING_REPORT, + Self::UrbanizationDataSet => URBANIZATION_DATA_SET, + Self::FootfallDataSet => FOOTFALL_DATA_SET, + Self::LandtypeDataSet => LANDTYPE_DATA_SET, }; f.write_str(s) } @@ -322,6 +331,9 @@ impl FileType { Self::SeniorityUpdate => SENIORITY_UPDATE, Self::BoostedHexUpdate => BOOSTED_HEX_UPDATE, Self::OracleBoostingReport => ORACLE_BOOSTING_REPORT, + Self::UrbanizationDataSet => URBANIZATION_DATA_SET, + Self::FootfallDataSet => FOOTFALL_DATA_SET, + Self::LandtypeDataSet => LANDTYPE_DATA_SET, } } } @@ -383,6 +395,9 @@ impl FromStr for FileType { SENIORITY_UPDATE => Self::SeniorityUpdate, BOOSTED_HEX_UPDATE => Self::BoostedHexUpdate, ORACLE_BOOSTING_REPORT => Self::OracleBoostingReport, + URBANIZATION_DATA_SET => Self::UrbanizationDataSet, + FOOTFALL_DATA_SET => Self::FootfallDataSet, + LANDTYPE_DATA_SET => Self::LandtypeDataSet, _ => return Err(Error::from(io::Error::from(io::ErrorKind::InvalidInput))), }; Ok(result) diff --git a/mobile_verifier/Cargo.toml b/mobile_verifier/Cargo.toml index 100343a09..044256d9d 100644 --- a/mobile_verifier/Cargo.toml +++ b/mobile_verifier/Cargo.toml @@ -8,6 +8,7 @@ authors.workspace = true [dependencies] anyhow = { workspace = true } +async-compression = { version = "0", features = ["tokio", "gzip"] } config = { workspace = true } thiserror = { workspace = true } serde = { workspace = true } @@ -37,6 +38,7 @@ rust_decimal = { workspace = true } rust_decimal_macros = { workspace = true } tonic = { workspace = true } tokio-stream = { workspace = true } +tokio-util = { workspace = true } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } mobile-config = { path = "../mobile_config" } @@ -51,6 +53,8 @@ retainer = { workspace = true } uuid = { workspace = true } task-manager = { path = "../task_manager" } solana-sdk = { workspace = true } +derive_builder = { workspace = true } +regex = "1" humantime-serde = { workspace = true } custom-tracing = { path = "../custom_tracing" } diff --git a/mobile_verifier/migrations/33_data_sets.sql b/mobile_verifier/migrations/33_data_sets.sql new file mode 100644 index 000000000..9c726bf9b --- /dev/null +++ b/mobile_verifier/migrations/33_data_sets.sql @@ -0,0 +1,26 @@ +DO $$ BEGIN +CREATE TYPE data_set_status AS enum ( + 'pending', + 'downloaded', + 'processed' +); +EXCEPTION + WHEN duplicate_object THEN null; +END $$; + +DO $$ BEGIN +CREATE TYPE data_set_type AS enum ( + 'urbanization', + 'footfall', + 'landtype' +); +EXCEPTION + WHEN duplicate_object THEN null; +END $$; + +CREATE TABLE IF NOT EXISTS hex_assignment_data_set_status ( + filename TEXT PRIMARY KEY, + data_set data_set_type NOT NULL, + time_to_use TIMESTAMPTZ NOT NULL, + status data_set_status NOT NULL +); diff --git a/mobile_verifier/src/boosting_oracles/assignment.rs b/mobile_verifier/src/boosting_oracles/assignment.rs index b02f219ac..8d8a37321 100644 --- a/mobile_verifier/src/boosting_oracles/assignment.rs +++ b/mobile_verifier/src/boosting_oracles/assignment.rs @@ -1,8 +1,11 @@ +use anyhow::Result; use helium_proto::services::poc_mobile::oracle_boosting_hex_assignment::Assignment as ProtoAssignment; use rust_decimal::Decimal; use rust_decimal_macros::dec; use std::fmt; +use super::HexAssignment; + #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] pub struct HexAssignments { pub footfall: Assignment, @@ -58,11 +61,20 @@ impl fmt::Display for Assignment { } impl HexAssignments { + pub fn builder(cell: hextree::Cell) -> HexAssignmentsBuilder { + HexAssignmentsBuilder { + cell, + footfall: None, + landtype: None, + urbanized: None, + } + } + pub fn boosting_multiplier(&self) -> Decimal { let HexAssignments { footfall, - urbanized, landtype, + urbanized, } = self; use Assignment::*; @@ -97,6 +109,47 @@ impl HexAssignments { } } +pub struct HexAssignmentsBuilder { + cell: hextree::Cell, + footfall: Option>, + landtype: Option>, + urbanized: Option>, +} + +impl HexAssignmentsBuilder { + pub fn footfall(mut self, footfall: &impl HexAssignment) -> Self { + self.footfall = Some(footfall.assignment(self.cell)); + self + } + + pub fn landtype(mut self, landtype: &impl HexAssignment) -> Self { + self.landtype = Some(landtype.assignment(self.cell)); + self + } + + pub fn urbanized(mut self, urbanized: &impl HexAssignment) -> Self { + self.urbanized = Some(urbanized.assignment(self.cell)); + self + } + + pub fn build(self) -> anyhow::Result { + let Some(footfall) = self.footfall else { + anyhow::bail!("footfall assignment not set"); + }; + let Some(landtype) = self.landtype else { + anyhow::bail!("landtype assignment not set"); + }; + let Some(urbanized) = self.urbanized else { + anyhow::bail!("urbanized assignment not set"); + }; + Ok(HexAssignments { + footfall: footfall?, + urbanized: urbanized?, + landtype: landtype?, + }) + } +} + #[cfg(test)] impl HexAssignments { pub fn test_best() -> Self { diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs new file mode 100644 index 000000000..76eaec1a4 --- /dev/null +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -0,0 +1,733 @@ +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + pin::pin, + time::Duration, +}; + +use chrono::{DateTime, Utc}; +use file_store::{ + file_sink::{self, FileSinkClient}, + file_upload::FileUpload, + traits::{TimestampDecode, TimestampEncode}, + FileStore, FileType, +}; +use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use helium_proto::services::poc_mobile as proto; +use lazy_static::lazy_static; +use regex::Regex; +use rust_decimal::prelude::ToPrimitive; +use rust_decimal_macros::dec; +use sqlx::{FromRow, PgPool, QueryBuilder}; +use task_manager::{ManagedTask, TaskManager}; +use tokio::{fs::File, io::AsyncWriteExt, time::Instant}; + +use crate::{ + boosting_oracles::assignment::HexAssignments, + coverage::{NewCoverageObjectNotification, SignalLevel}, + Settings, +}; + +use super::{ + footfall::Footfall, landtype::Landtype, urbanization::Urbanization, HexAssignment, HexBoostData, +}; + +#[async_trait::async_trait] +pub trait DataSet: HexAssignment + Send + Sync + 'static { + const TYPE: DataSetType; + + fn timestamp(&self) -> Option>; + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()>; + + fn is_ready(&self) -> bool; + + async fn fetch_first_data_set( + &mut self, + pool: &PgPool, + data_set_directory: &Path, + ) -> anyhow::Result<()> { + let Some(first_data_set) = db::fetch_latest_processed_data_set(pool, Self::TYPE).await? + else { + return Ok(()); + }; + let path = get_data_set_path(data_set_directory, Self::TYPE, first_data_set.time_to_use); + self.update(Path::new(&path), first_data_set.time_to_use)?; + Ok(()) + } + + async fn check_for_available_data_sets( + &self, + store: &FileStore, + pool: &PgPool, + ) -> anyhow::Result<()> { + let mut new_data_sets = store.list(Self::TYPE.to_prefix(), self.timestamp(), None); + while let Some(new_data_set) = new_data_sets.next().await.transpose()? { + db::insert_new_data_set(pool, &new_data_set.key, Self::TYPE, new_data_set.timestamp) + .await?; + } + Ok(()) + } + + async fn fetch_next_available_data_set( + &mut self, + store: &FileStore, + pool: &PgPool, + data_set_directory: &Path, + ) -> anyhow::Result> { + self.check_for_available_data_sets(store, pool).await?; + + tracing::info!("Checking for new {} data sets", Self::TYPE.to_prefix()); + let latest_unprocessed_data_set = + db::fetch_latest_unprocessed_data_set(pool, Self::TYPE, self.timestamp()).await?; + + let Some(latest_unprocessed_data_set) = latest_unprocessed_data_set else { + return Ok(None); + }; + + let path = get_data_set_path( + data_set_directory, + Self::TYPE, + latest_unprocessed_data_set.time_to_use, + ); + + if !latest_unprocessed_data_set.status.is_downloaded() { + download_data_set(store, &latest_unprocessed_data_set.filename, &path).await?; + latest_unprocessed_data_set.mark_as_downloaded(pool).await?; + tracing::info!( + data_set = latest_unprocessed_data_set.filename, + "Data set download complete" + ); + } + + self.update(Path::new(&path), latest_unprocessed_data_set.time_to_use)?; + + Ok(Some(latest_unprocessed_data_set)) + } +} + +pub struct DataSetDownloaderDaemon { + pool: PgPool, + data_sets: HexBoostData, + store: FileStore, + oracle_boosting_sink: FileSinkClient, + data_set_directory: PathBuf, + new_coverage_object_notification: NewCoverageObjectNotification, + poll_duration: Duration, +} + +#[derive(FromRow)] +pub struct NewDataSet { + filename: String, + time_to_use: DateTime, + status: DataSetStatus, +} + +impl NewDataSet { + async fn mark_as_downloaded(&self, pool: &PgPool) -> anyhow::Result<()> { + db::set_data_set_status(pool, &self.filename, DataSetStatus::Downloaded).await?; + Ok(()) + } + + async fn mark_as_processed(&self, pool: &PgPool) -> anyhow::Result<()> { + db::set_data_set_status(pool, &self.filename, DataSetStatus::Processed).await?; + Ok(()) + } +} + +#[derive(Copy, Clone, sqlx::Type)] +#[sqlx(type_name = "data_set_status")] +#[sqlx(rename_all = "lowercase")] +pub enum DataSetStatus { + Pending, + Downloaded, + Processed, +} + +impl DataSetStatus { + pub fn is_downloaded(&self) -> bool { + matches!(self, Self::Downloaded) + } +} + +impl ManagedTask for DataSetDownloaderDaemon +where + A: DataSet, + B: DataSet, + C: DataSet, +{ + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> futures::prelude::future::LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(async move { + #[rustfmt::skip] + tokio::select! { + biased; + _ = shutdown.clone() => Ok(()), + result = self.run() => result, + } + }); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + +impl DataSetDownloaderDaemon { + pub async fn create_managed_task( + pool: PgPool, + settings: &Settings, + file_upload: FileUpload, + new_coverage_object_notification: NewCoverageObjectNotification, + ) -> anyhow::Result { + let (oracle_boosting_reports, oracle_boosting_reports_server) = + file_sink::FileSinkBuilder::new( + FileType::OracleBoostingReport, + settings.store_base_path(), + file_upload.clone(), + concat!(env!("CARGO_PKG_NAME"), "_oracle_boosting_report"), + ) + .auto_commit(true) + .roll_time(chrono::Duration::minutes(15)) + .create() + .await?; + + let urbanization = Urbanization::new(); + let footfall = Footfall::new(); + let landtype = Landtype::new(); + let hex_boost_data = HexBoostData::builder() + .footfall(footfall) + .landtype(landtype) + .urbanization(urbanization) + .build()?; + + let data_set_downloader = Self::new( + pool, + hex_boost_data, + FileStore::from_settings(&settings.data_sets).await?, + oracle_boosting_reports, + settings.data_sets_directory.clone(), + new_coverage_object_notification, + settings.data_sets_poll_duration, + ); + + Ok(TaskManager::builder() + .add_task(oracle_boosting_reports_server) + .add_task(data_set_downloader) + .build()) + } +} + +impl DataSetDownloaderDaemon +where + A: DataSet, + B: DataSet, + C: DataSet, +{ + pub fn new( + pool: PgPool, + data_sets: HexBoostData, + store: FileStore, + oracle_boosting_sink: FileSinkClient, + data_set_directory: PathBuf, + new_coverage_object_notification: NewCoverageObjectNotification, + poll_duration: Duration, + ) -> Self { + Self { + pool, + data_sets, + store, + oracle_boosting_sink, + data_set_directory, + new_coverage_object_notification, + poll_duration, + } + } + + async fn check_for_new_data_sets(&mut self) -> anyhow::Result<()> { + let new_urbanized = self + .data_sets + .urbanization + .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) + .await?; + let new_footfall = self + .data_sets + .footfall + .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) + .await?; + let new_landtype = self + .data_sets + .landtype + .fetch_next_available_data_set(&self.store, &self.pool, &self.data_set_directory) + .await?; + + // If all of the data sets are ready and there is at least one new one, re-process all + // hex assignments: + let new_data_set = + new_urbanized.is_some() || new_footfall.is_some() || new_landtype.is_some(); + if self.data_sets.is_ready() && new_data_set { + tracing::info!("Processing new data sets"); + set_all_oracle_boosting_assignments( + &self.pool, + &self.data_sets, + &self.oracle_boosting_sink, + ) + .await?; + } + + // Mark the new data sets as processed and delete the old ones + if let Some(new_urbanized) = new_urbanized { + new_urbanized.mark_as_processed(&self.pool).await?; + delete_old_data_sets( + &self.data_set_directory, + DataSetType::Urbanization, + new_urbanized.time_to_use, + ) + .await?; + } + if let Some(new_footfall) = new_footfall { + new_footfall.mark_as_processed(&self.pool).await?; + delete_old_data_sets( + &self.data_set_directory, + DataSetType::Footfall, + new_footfall.time_to_use, + ) + .await?; + } + if let Some(new_landtype) = new_landtype { + new_landtype.mark_as_processed(&self.pool).await?; + delete_old_data_sets( + &self.data_set_directory, + DataSetType::Landtype, + new_landtype.time_to_use, + ) + .await?; + } + + Ok(()) + } + + pub async fn run(mut self) -> anyhow::Result<()> { + self.data_sets + .urbanization + .fetch_first_data_set(&self.pool, &self.data_set_directory) + .await?; + self.data_sets + .footfall + .fetch_first_data_set(&self.pool, &self.data_set_directory) + .await?; + self.data_sets + .landtype + .fetch_first_data_set(&self.pool, &self.data_set_directory) + .await?; + + // Attempt to fill in any unassigned hexes. This is for the edge case in + // which we shutdown before a coverage object updates. + if self.data_sets.is_ready() { + set_unassigned_oracle_boosting_assignments( + &self.pool, + &self.data_sets, + &self.oracle_boosting_sink, + ) + .await?; + } + + let mut wakeup = Instant::now() + self.poll_duration; + loop { + #[rustfmt::skip] + tokio::select! { + _ = self.new_coverage_object_notification.await_new_coverage_object() => { + // If we see a new coverage object, we want to assign only those hexes + // that don't have an assignment + if self.data_sets.is_ready() { + set_unassigned_oracle_boosting_assignments( + &self.pool, + &self.data_sets, + &self.oracle_boosting_sink + ).await?; + } + }, + _ = tokio::time::sleep_until(wakeup) => { + self.check_for_new_data_sets().await?; + wakeup = Instant::now() + self.poll_duration; + } + } + } + } +} + +fn get_data_set_path( + data_set_directory: &Path, + data_set_type: DataSetType, + time_to_use: DateTime, +) -> PathBuf { + let path = PathBuf::from(format!( + "{}.{}.res10.h3tree", + data_set_type.to_prefix(), + time_to_use.timestamp_millis() + )); + let mut dir = data_set_directory.to_path_buf(); + dir.push(path); + dir +} + +lazy_static! { + static ref RE: Regex = Regex::new(r"([a-z,_]+).(\d+)(.res10.h3tree)?").unwrap(); +} + +async fn delete_old_data_sets( + data_set_directory: &Path, + data_set_type: DataSetType, + time_to_use: DateTime, +) -> anyhow::Result<()> { + let mut data_sets = tokio::fs::read_dir(data_set_directory).await?; + while let Some(data_set) = data_sets.next_entry().await? { + let file_name = data_set.file_name(); + let file_name = file_name.to_string_lossy(); + let Some(cap) = RE.captures(&file_name) else { + tracing::warn!("Could not determine data set file type: {}", file_name); + continue; + }; + let prefix = &cap[1]; + let timestamp = cap[2].parse::()?.to_timestamp_millis()?; + if prefix == data_set_type.to_prefix() && timestamp < time_to_use { + tracing::info!(data_set = &*file_name, "Deleting old data set file"); + tokio::fs::remove_file(data_set.path()).await?; + } + } + Ok(()) +} + +async fn download_data_set( + store: &FileStore, + in_file_name: &str, + out_path: &Path, +) -> anyhow::Result<()> { + tracing::info!("Downloading new data set: {}", out_path.to_string_lossy()); + let stream = store.get_raw(in_file_name).await?; + let mut bytes = tokio_util::codec::FramedRead::new( + async_compression::tokio::bufread::GzipDecoder::new(tokio_util::io::StreamReader::new( + stream, + )), + tokio_util::codec::BytesCodec::new(), + ); + let mut file = File::create(&out_path).await?; + while let Some(bytes) = bytes.next().await.transpose()? { + file.write_all(&bytes).await?; + } + Ok(()) +} + +#[derive(Copy, Clone, sqlx::Type)] +#[sqlx(type_name = "data_set_type")] +#[sqlx(rename_all = "lowercase")] +pub enum DataSetType { + Urbanization, + Footfall, + Landtype, +} + +impl DataSetType { + pub fn to_prefix(self) -> &'static str { + match self { + Self::Urbanization => "urbanization", + Self::Footfall => "footfall", + Self::Landtype => "landtype", + } + } +} + +pub mod db { + use super::*; + + pub async fn fetch_latest_file_date( + pool: &PgPool, + data_set_type: DataSetType, + ) -> sqlx::Result>> { + sqlx::query_scalar("SELECT time_to_use FROM hex_assignment_data_set_status WHERE data_set = $1 ORDER BY time_to_use DESC LIMIT 1") + .bind(data_set_type) + .fetch_optional(pool) + .await + } + + pub async fn insert_new_data_set( + pool: &PgPool, + filename: &str, + data_set_type: DataSetType, + time_to_use: DateTime, + ) -> sqlx::Result<()> { + sqlx::query( + r#" + INSERT INTO hex_assignment_data_set_status (filename, data_set, time_to_use, status) + VALUES ($1, $2, $3, 'pending') + ON CONFLICT DO NOTHING + "#, + ) + .bind(filename) + .bind(data_set_type) + .bind(time_to_use) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn fetch_latest_unprocessed_data_set( + pool: &PgPool, + data_set_type: DataSetType, + since: Option>, + ) -> sqlx::Result> { + sqlx::query_as( + "SELECT filename, time_to_use, status FROM hex_assignment_data_set_status WHERE status != 'processed' AND data_set = $1 AND COALESCE(time_to_use > $2, TRUE) AND time_to_use <= $3 ORDER BY time_to_use DESC LIMIT 1" + ) + .bind(data_set_type) + .bind(since) + .bind(Utc::now()) + .fetch_optional(pool) + .await + } + + pub async fn fetch_latest_processed_data_set( + pool: &PgPool, + data_set_type: DataSetType, + ) -> sqlx::Result> { + sqlx::query_as( + "SELECT filename, time_to_use, status FROM hex_assignment_data_set_status WHERE status = 'processed' AND data_set = $1 ORDER BY time_to_use DESC LIMIT 1" + ) + .bind(data_set_type) + .fetch_optional(pool) + .await + } + + pub async fn set_data_set_status( + pool: &PgPool, + filename: &str, + status: DataSetStatus, + ) -> sqlx::Result<()> { + sqlx::query("UPDATE hex_assignment_data_set_status SET status = $1 WHERE filename = $2") + .bind(status) + .bind(filename) + .execute(pool) + .await?; + Ok(()) + } + + pub async fn fetch_time_of_latest_processed_data_set( + pool: &PgPool, + data_set_type: DataSetType, + ) -> sqlx::Result>> { + sqlx::query_scalar( + "SELECT time_to_use FROM hex_assignment_data_set_status WHERE status = 'processed' AND data_set = $1 ORDER BY time_to_use DESC LIMIT 1" + ) + .bind(data_set_type) + .fetch_optional(pool) + .await + } + + /// Check if there are any pending or downloaded files prior to the given reward period + pub async fn check_for_unprocessed_data_sets( + pool: &PgPool, + period_end: DateTime, + ) -> sqlx::Result { + Ok(sqlx::query_scalar( + "SELECT COUNT(*) > 0 FROM hex_assignment_data_set_status WHERE time_to_use <= $1 AND status != 'processed'", + ) + .bind(period_end) + .fetch_one(pool) + .await? + || sqlx::query_scalar( + r#" + SELECT COUNT(*) > 0 FROM coverage_objects + WHERE inserted_at < $1 AND uuid IN ( + SELECT + DISTINCT uuid + FROM + hexes + WHERE + urbanized IS NULL + OR footfall IS NULL + OR landtype IS NULL + ) + "#, + ) + .bind(period_end) + .fetch_one(pool) + .await?) + } + + pub fn fetch_all_hexes(pool: &PgPool) -> impl Stream> + '_ { + sqlx::query_as("SELECT uuid, hex, signal_level, signal_power FROM hexes").fetch(pool) + } + + pub fn fetch_hexes_with_null_assignments( + pool: &PgPool, + ) -> impl Stream> + '_ { + sqlx::query_as( + "SELECT + uuid, hex, signal_level, signal_power + FROM + hexes + WHERE + urbanized IS NULL + OR footfall IS NULL + OR landtype IS NULL", + ) + .fetch(pool) + } +} + +pub struct AssignedCoverageObjects { + pub coverage_objs: HashMap>, +} + +impl AssignedCoverageObjects { + pub async fn assign_hex_stream( + stream: impl Stream>, + data_sets: &HexBoostData, + ) -> anyhow::Result { + let mut coverage_objs = HashMap::>::new(); + let mut stream = pin!(stream); + while let Some(hex) = stream.try_next().await? { + let hex = hex.assign(data_sets)?; + coverage_objs.entry(hex.uuid).or_default().push(hex); + } + Ok(Self { coverage_objs }) + } + + async fn write(&self, boosting_reports: &FileSinkClient) -> file_store::Result { + let timestamp = Utc::now().encode_timestamp(); + for (uuid, hexes) in self.coverage_objs.iter() { + let assignments: Vec<_> = hexes + .iter() + .map(|hex| { + let location = format!("{:x}", hex.hex); + let assignment_multiplier = (hex.assignments.boosting_multiplier() + * dec!(1000)) + .to_u32() + .unwrap_or(0); + proto::OracleBoostingHexAssignment { + location, + urbanized: hex.assignments.urbanized.into(), + footfall: hex.assignments.footfall.into(), + landtype: hex.assignments.landtype.into(), + assignment_multiplier, + } + }) + .collect(); + boosting_reports + .write( + proto::OracleBoostingReportV1 { + coverage_object: Vec::from(uuid.into_bytes()), + assignments, + timestamp, + }, + &[], + ) + .await?; + } + + Ok(()) + } + + pub async fn save(self, pool: &PgPool) -> anyhow::Result<()> { + const NUMBER_OF_FIELDS_IN_QUERY: u16 = 7; + const ASSIGNMENTS_MAX_BATCH_ENTRIES: usize = + (u16::MAX / NUMBER_OF_FIELDS_IN_QUERY) as usize; + + let assigned_hexes: Vec<_> = self.coverage_objs.into_values().flatten().collect(); + for assigned_hexes in assigned_hexes.chunks(ASSIGNMENTS_MAX_BATCH_ENTRIES) { + QueryBuilder::new( + "INSERT INTO hexes (uuid, hex, signal_level, signal_power, footfall, landtype, urbanized)", + ) + .push_values(assigned_hexes, |mut b, hex| { + b.push_bind(hex.uuid) + .push_bind(hex.hex as i64) + .push_bind(hex.signal_level) + .push_bind(hex.signal_power) + .push_bind(hex.assignments.footfall) + .push_bind(hex.assignments.landtype) + .push_bind(hex.assignments.urbanized); + }) + .push( + r#" + ON CONFLICT (uuid, hex) DO UPDATE SET + footfall = EXCLUDED.footfall, + landtype = EXCLUDED.landtype, + urbanized = EXCLUDED.urbanized + "#, + ) + .build() + .execute(pool) + .await?; + } + + Ok(()) + } +} + +#[derive(FromRow)] +pub struct UnassignedHex { + uuid: uuid::Uuid, + #[sqlx(try_from = "i64")] + hex: u64, + signal_level: SignalLevel, + signal_power: i32, +} + +impl UnassignedHex { + fn assign( + self, + data_sets: &HexBoostData, + ) -> anyhow::Result { + let cell = hextree::Cell::try_from(self.hex)?; + let assignments = HexAssignments::builder(cell) + .footfall(&data_sets.footfall) + .landtype(&data_sets.landtype) + .urbanized(&data_sets.urbanization) + .build()?; + Ok(AssignedHex { + uuid: self.uuid, + hex: self.hex, + signal_level: self.signal_level, + signal_power: self.signal_power, + assignments, + }) + } +} + +pub struct AssignedHex { + pub uuid: uuid::Uuid, + pub hex: u64, + pub signal_level: SignalLevel, + pub signal_power: i32, + pub assignments: HexAssignments, +} + +pub async fn set_all_oracle_boosting_assignments( + pool: &PgPool, + data_sets: &HexBoostData, + file_sink: &FileSinkClient, +) -> anyhow::Result<()> { + let assigned_coverage_objs = + AssignedCoverageObjects::assign_hex_stream(db::fetch_all_hexes(pool), data_sets).await?; + assigned_coverage_objs.write(file_sink).await?; + assigned_coverage_objs.save(pool).await?; + Ok(()) +} + +pub async fn set_unassigned_oracle_boosting_assignments( + pool: &PgPool, + data_sets: &HexBoostData, + file_sink: &FileSinkClient, +) -> anyhow::Result<()> { + let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream( + db::fetch_hexes_with_null_assignments(pool), + data_sets, + ) + .await?; + assigned_coverage_objs.write(file_sink).await?; + assigned_coverage_objs.save(pool).await?; + Ok(()) +} diff --git a/mobile_verifier/src/boosting_oracles/footfall.rs b/mobile_verifier/src/boosting_oracles/footfall.rs new file mode 100644 index 000000000..4af2ebaf5 --- /dev/null +++ b/mobile_verifier/src/boosting_oracles/footfall.rs @@ -0,0 +1,70 @@ +use std::path::Path; + +use chrono::{DateTime, Utc}; +use hextree::disktree::DiskTreeMap; + +use super::{Assignment, DataSet, DataSetType, HexAssignment}; + +pub struct Footfall { + footfall: Option, + timestamp: Option>, +} + +impl Footfall { + pub fn new() -> Self { + Self { + footfall: None, + timestamp: None, + } + } + + pub fn new_mock(footfall: DiskTreeMap) -> Self { + Self { + footfall: Some(footfall), + timestamp: None, + } + } +} + +impl Default for Footfall { + fn default() -> Self { + Self::new() + } +} + +#[async_trait::async_trait] +impl DataSet for Footfall { + const TYPE: DataSetType = DataSetType::Footfall; + + fn timestamp(&self) -> Option> { + self.timestamp + } + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { + self.footfall = Some(DiskTreeMap::open(path)?); + self.timestamp = Some(time_to_use); + Ok(()) + } + + fn is_ready(&self) -> bool { + self.footfall.is_some() + } +} + +impl HexAssignment for Footfall { + fn assignment(&self, cell: hextree::Cell) -> anyhow::Result { + let Some(ref footfall) = self.footfall else { + anyhow::bail!("No footfall data set has been loaded"); + }; + + // The footfall disktree maps hexes to a single byte, a value of one indicating + // assignment A and a value of zero indicating assignment B. If no value is present, + // assignment C is given. + match footfall.get(cell)? { + Some((_, &[x])) if x >= 1 => Ok(Assignment::A), + Some((_, &[0])) => Ok(Assignment::B), + None => Ok(Assignment::C), + Some((_, other)) => anyhow::bail!("Unexpected disktree data: {cell:?} {other:?}"), + } + } +} diff --git a/mobile_verifier/src/boosting_oracles/landtype.rs b/mobile_verifier/src/boosting_oracles/landtype.rs new file mode 100644 index 000000000..b6cda7aef --- /dev/null +++ b/mobile_verifier/src/boosting_oracles/landtype.rs @@ -0,0 +1,152 @@ +use std::path::Path; + +use chrono::{DateTime, Utc}; +use hextree::disktree::DiskTreeMap; + +use super::{Assignment, DataSet, DataSetType, HexAssignment}; + +pub struct Landtype { + landtype: Option, + timestamp: Option>, +} + +impl Landtype { + pub fn new() -> Self { + Self { + landtype: None, + timestamp: None, + } + } + + pub fn new_mock(landtype: DiskTreeMap) -> Self { + Self { + landtype: Some(landtype), + timestamp: None, + } + } +} + +impl Default for Landtype { + fn default() -> Self { + Self::new() + } +} + +#[async_trait::async_trait] +impl DataSet for Landtype { + const TYPE: DataSetType = DataSetType::Landtype; + + fn timestamp(&self) -> Option> { + self.timestamp + } + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { + self.landtype = Some(DiskTreeMap::open(path)?); + self.timestamp = Some(time_to_use); + Ok(()) + } + + fn is_ready(&self) -> bool { + self.landtype.is_some() + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum LandtypeValue { + Tree = 10, + Shrub = 20, + Grass = 30, + Crop = 40, + Built = 50, + Bare = 60, + Frozen = 70, + Water = 80, + Wet = 90, + Mangrove = 95, + Moss = 100, +} + +impl std::fmt::Display for LandtypeValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.to_str()) + } +} + +impl LandtypeValue { + pub(crate) fn to_str(self) -> &'static str { + match self { + LandtypeValue::Tree => "TreeCover", + LandtypeValue::Shrub => "Shrubland", + LandtypeValue::Grass => "Grassland", + LandtypeValue::Crop => "Cropland", + LandtypeValue::Built => "BuiltUp", + LandtypeValue::Bare => "BareOrSparseVeg", + LandtypeValue::Frozen => "SnowAndIce", + LandtypeValue::Water => "Water", + LandtypeValue::Wet => "HerbaceousWetland", + LandtypeValue::Mangrove => "Mangroves", + LandtypeValue::Moss => "MossAndLichen", + } + } +} + +impl TryFrom for LandtypeValue { + type Error = anyhow::Error; + fn try_from(other: u8) -> anyhow::Result { + let val = match other { + 10 => LandtypeValue::Tree, + 20 => LandtypeValue::Shrub, + 30 => LandtypeValue::Grass, + 40 => LandtypeValue::Crop, + 50 => LandtypeValue::Built, + 60 => LandtypeValue::Bare, + 70 => LandtypeValue::Frozen, + 80 => LandtypeValue::Water, + 90 => LandtypeValue::Wet, + 95 => LandtypeValue::Mangrove, + 100 => LandtypeValue::Moss, + other => anyhow::bail!("unexpected landtype disktree value: {other:?}"), + }; + Ok(val) + } +} + +impl From for Assignment { + fn from(value: LandtypeValue) -> Self { + match value { + LandtypeValue::Built => Assignment::A, + // + LandtypeValue::Tree => Assignment::B, + LandtypeValue::Shrub => Assignment::B, + LandtypeValue::Grass => Assignment::B, + // + LandtypeValue::Bare => Assignment::C, + LandtypeValue::Crop => Assignment::C, + LandtypeValue::Frozen => Assignment::C, + LandtypeValue::Water => Assignment::C, + LandtypeValue::Wet => Assignment::C, + LandtypeValue::Mangrove => Assignment::C, + LandtypeValue::Moss => Assignment::C, + } + } +} + +impl HexAssignment for Landtype { + fn assignment(&self, cell: hextree::Cell) -> anyhow::Result { + let Some(ref landtype) = self.landtype else { + anyhow::bail!("No landtype data set has been loaded"); + }; + + let Some((_, vals)) = landtype.get(cell)? else { + return Ok(Assignment::C); + }; + + anyhow::ensure!( + vals.len() == 1, + "unexpected landtype disktree data: {cell:?} {vals:?}" + ); + + let cover = LandtypeValue::try_from(vals[0])?; + Ok(cover.into()) + } +} diff --git a/mobile_verifier/src/boosting_oracles/mod.rs b/mobile_verifier/src/boosting_oracles/mod.rs index 5b13f10af..0f63845cb 100644 --- a/mobile_verifier/src/boosting_oracles/mod.rs +++ b/mobile_verifier/src/boosting_oracles/mod.rs @@ -1,162 +1,67 @@ pub mod assignment; +pub mod data_sets; +pub mod footfall; +pub mod landtype; +pub mod urbanization; -use crate::Settings; -pub use assignment::{Assignment, HexAssignments}; -use hextree::disktree::DiskTreeMap; +use std::collections::HashMap; -pub trait BoostedHexAssignments: Send + Sync { - fn assignments(&self, cell: hextree::Cell) -> anyhow::Result; -} +use crate::boosting_oracles::assignment::HexAssignments; +pub use assignment::Assignment; +pub use data_sets::*; -pub struct HexBoostData { - urbanized: DiskTreeMap, - footfall: DiskTreeMap, - landtype: DiskTreeMap, +pub trait HexAssignment: Send + Sync + 'static { + fn assignment(&self, cell: hextree::Cell) -> anyhow::Result; } -pub fn make_hex_boost_data(settings: &Settings) -> anyhow::Result { - let urban_disktree = DiskTreeMap::open(&settings.urbanization_data_set)?; - let footfall_disktree = DiskTreeMap::open(&settings.footfall_data_set)?; - let landtype_disktree = DiskTreeMap::open(&settings.landtype_data_set)?; - - let hex_boost_data = HexBoostData { - urbanized: urban_disktree, - footfall: footfall_disktree, - landtype: landtype_disktree, - }; - - Ok(hex_boost_data) -} -impl BoostedHexAssignments for HexBoostData { - fn assignments(&self, cell: hextree::Cell) -> anyhow::Result { - let footfall = self.footfall_assignment(cell)?; - let urbanized = self.urbanized_assignment(cell)?; - let landtype = self.landtype_assignment(cell)?; - - Ok(HexAssignments { - footfall, - urbanized, - landtype, - }) +impl HexAssignment for HashMap { + fn assignment(&self, cell: hextree::Cell) -> anyhow::Result { + Ok(*self.get(&cell).unwrap()) } } -impl HexBoostData { - fn urbanized_assignment(&self, cell: hextree::Cell) -> anyhow::Result { - match self.urbanized.get(cell)? { - Some((_, &[1])) => Ok(Assignment::A), - Some((_, &[0])) => Ok(Assignment::B), - None => Ok(Assignment::C), - Some((_, other)) => { - anyhow::bail!("unexpected urbanization disktree data: {cell:?} {other:?}") - } - } - } - - fn footfall_assignment(&self, cell: hextree::Cell) -> anyhow::Result { - let Some((_, vals)) = self.footfall.get(cell)? else { - return Ok(Assignment::C); - }; - - match vals { - &[x] if x >= 1 => Ok(Assignment::A), - &[0] => Ok(Assignment::B), - other => anyhow::bail!("unexpected footfall disktree data: {cell:?} {other:?}"), - } - } - - fn landtype_assignment(&self, cell: hextree::Cell) -> anyhow::Result { - let Some((_, vals)) = self.landtype.get(cell)? else { - return Ok(Assignment::C); - }; - - anyhow::ensure!( - vals.len() == 1, - "unexpected landtype disktree data: {cell:?} {vals:?}" - ); - - let cover = Landtype::try_from(vals[0])?; - Ok(cover.into()) - } -} - -impl From for Assignment { - fn from(value: Landtype) -> Self { - match value { - Landtype::Built => Assignment::A, - // - Landtype::Tree => Assignment::B, - Landtype::Shrub => Assignment::B, - Landtype::Grass => Assignment::B, - // - Landtype::Bare => Assignment::C, - Landtype::Crop => Assignment::C, - Landtype::Frozen => Assignment::C, - Landtype::Water => Assignment::C, - Landtype::Wet => Assignment::C, - Landtype::Mangrove => Assignment::C, - Landtype::Moss => Assignment::C, - } +impl HexAssignment for Assignment { + fn assignment(&self, _cell: hextree::Cell) -> anyhow::Result { + Ok(*self) } } -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum Landtype { - Tree = 10, - Shrub = 20, - Grass = 30, - Crop = 40, - Built = 50, - Bare = 60, - Frozen = 70, - Water = 80, - Wet = 90, - Mangrove = 95, - Moss = 100, +#[derive(derive_builder::Builder)] +#[builder(pattern = "owned")] +pub struct HexBoostData { + pub footfall: Foot, + pub landtype: Land, + pub urbanization: Urban, } - -impl std::fmt::Display for Landtype { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(self.to_str()) +impl HexBoostData { + pub fn builder() -> HexBoostDataBuilder { + HexBoostDataBuilder::default() } } -impl Landtype { - pub(crate) fn to_str(self) -> &'static str { - match self { - Landtype::Tree => "TreeCover", - Landtype::Shrub => "Shrubland", - Landtype::Grass => "Grassland", - Landtype::Crop => "Cropland", - Landtype::Built => "BuiltUp", - Landtype::Bare => "BareOrSparseVeg", - Landtype::Frozen => "SnowAndIce", - Landtype::Water => "Water", - Landtype::Wet => "HerbaceousWetland", - Landtype::Mangrove => "Mangroves", - Landtype::Moss => "MossAndLichen", - } +impl HexBoostData +where + Foot: DataSet, + Land: DataSet, + Urban: DataSet, +{ + pub fn is_ready(&self) -> bool { + self.urbanization.is_ready() && self.footfall.is_ready() && self.landtype.is_ready() } } -impl TryFrom for Landtype { - type Error = anyhow::Error; - fn try_from(other: u8) -> anyhow::Result { - let val = match other { - 10 => Landtype::Tree, - 20 => Landtype::Shrub, - 30 => Landtype::Grass, - 40 => Landtype::Crop, - 50 => Landtype::Built, - 60 => Landtype::Bare, - 70 => Landtype::Frozen, - 80 => Landtype::Water, - 90 => Landtype::Wet, - 95 => Landtype::Mangrove, - 100 => Landtype::Moss, - other => anyhow::bail!("unexpected landtype disktree value: {other:?}"), - }; - Ok(val) +impl HexBoostData +where + Foot: HexAssignment, + Land: HexAssignment, + Urban: HexAssignment, +{ + pub fn assignments(&self, cell: hextree::Cell) -> anyhow::Result { + HexAssignments::builder(cell) + .footfall(&self.footfall) + .landtype(&self.landtype) + .urbanized(&self.urbanization) + .build() } } @@ -165,7 +70,9 @@ mod tests { use std::io::Cursor; - use hextree::HexTreeMap; + use hextree::{disktree::DiskTreeMap, HexTreeMap}; + + use self::{footfall::Footfall, landtype::Landtype, urbanization::Urbanization}; use super::*; @@ -316,16 +223,16 @@ mod tests { footfall.to_disktree(Cursor::new(&mut footfall_buff), |w, v| w.write_all(&[*v]))?; landtype.to_disktree(Cursor::new(&mut landtype_buf), |w, v| w.write_all(&[*v]))?; - let urbanized = DiskTreeMap::with_buf(urbanized_buf)?; - let footfall = DiskTreeMap::with_buf(footfall_buff)?; - let landtype = DiskTreeMap::with_buf(landtype_buf)?; + let footfall = Footfall::new_mock(DiskTreeMap::with_buf(footfall_buff)?); + let landtype = Landtype::new_mock(DiskTreeMap::with_buf(landtype_buf)?); + let urbanization = Urbanization::new_mock(DiskTreeMap::with_buf(urbanized_buf)?); // Let the testing commence - let data = HexBoostData { - urbanized, - footfall, - landtype, - }; + let data = HexBoostData::builder() + .footfall(footfall) + .landtype(landtype) + .urbanization(urbanization) + .build()?; // NOTE(mj): formatting ignored to make it easier to see the expected change in assignments. // NOTE(mj): The semicolon at the end of the block is there to keep rust from diff --git a/mobile_verifier/src/boosting_oracles/urbanization.rs b/mobile_verifier/src/boosting_oracles/urbanization.rs new file mode 100644 index 000000000..8070c445c --- /dev/null +++ b/mobile_verifier/src/boosting_oracles/urbanization.rs @@ -0,0 +1,68 @@ +use std::path::Path; + +use chrono::{DateTime, Utc}; +use hextree::disktree::DiskTreeMap; + +use super::{Assignment, DataSet, DataSetType, HexAssignment}; + +pub struct Urbanization { + urbanized: Option, + timestamp: Option>, +} + +impl Urbanization { + pub fn new() -> Self { + Self { + urbanized: None, + timestamp: None, + } + } + + pub fn new_mock(urbanized: DiskTreeMap) -> Self { + Self { + urbanized: Some(urbanized), + timestamp: None, + } + } +} + +impl Default for Urbanization { + fn default() -> Self { + Self::new() + } +} + +#[async_trait::async_trait] +impl DataSet for Urbanization { + const TYPE: DataSetType = DataSetType::Urbanization; + + fn timestamp(&self) -> Option> { + self.timestamp + } + + fn update(&mut self, path: &Path, time_to_use: DateTime) -> anyhow::Result<()> { + self.urbanized = Some(DiskTreeMap::open(path)?); + self.timestamp = Some(time_to_use); + Ok(()) + } + + fn is_ready(&self) -> bool { + self.urbanized.is_some() + } +} + +impl HexAssignment for Urbanization { + fn assignment(&self, cell: hextree::Cell) -> anyhow::Result { + let Some(ref urbanized) = self.urbanized else { + anyhow::bail!("No urbanization data set has been loaded"); + }; + match urbanized.get(cell)? { + Some((_, &[1])) => Ok(Assignment::A), + Some((_, &[0])) => Ok(Assignment::B), + None => Ok(Assignment::C), + Some((_, other)) => { + anyhow::bail!("unexpected urbanization disktree data: {cell:?} {other:?}") + } + } + } +} diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 33381f2c8..c1ded037c 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -1,6 +1,6 @@ use crate::{ - boosting_oracles, - coverage::CoverageDaemon, + boosting_oracles::DataSetDownloaderDaemon, + coverage::{new_coverage_object_notification_channel, CoverageDaemon}, data_session::DataSessionIngestor, geofence::Geofence, heartbeats::{cbrs::CbrsHeartbeatDaemon, wifi::WifiHeartbeatDaemon}, @@ -12,12 +12,7 @@ use crate::{ }; use anyhow::Result; use chrono::Duration; -use file_store::{ - file_sink, - file_upload::{self}, - FileStore, FileType, -}; - +use file_store::{file_sink, file_upload, FileStore, FileType}; use mobile_config::client::{ entity_client::EntityClient, hex_boosting_client::HexBoostingClient, AuthorizationClient, CarrierServiceClient, GatewayClient, @@ -101,6 +96,9 @@ impl Cmd { settings.usa_and_mexico_fencing_resolution()?, )?; + let (new_coverage_obj_notifier, new_coverage_obj_notification) = + new_coverage_object_notification_channel(); + TaskManager::builder() .add_task(file_upload_server) .add_task(valid_heartbeats_server) @@ -114,7 +112,7 @@ impl Cmd { gateway_client.clone(), valid_heartbeats.clone(), seniority_updates.clone(), - usa_geofence.clone(), + usa_geofence, ) .await?, ) @@ -148,7 +146,16 @@ impl Cmd { file_upload.clone(), report_ingest.clone(), auth_client.clone(), - boosting_oracles::make_hex_boost_data(settings)?, + new_coverage_obj_notifier, + ) + .await?, + ) + .add_task( + DataSetDownloaderDaemon::create_managed_task( + pool.clone(), + settings, + file_upload.clone(), + new_coverage_obj_notification, ) .await?, ) diff --git a/mobile_verifier/src/cli/verify_disktree.rs b/mobile_verifier/src/cli/verify_disktree.rs index 4c2aa4529..621b24ad2 100644 --- a/mobile_verifier/src/cli/verify_disktree.rs +++ b/mobile_verifier/src/cli/verify_disktree.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, path::PathBuf}; use hextree::disktree::DiskTreeMap; use crate::{ - boosting_oracles::{Assignment, Landtype}, + boosting_oracles::{landtype::LandtypeValue, Assignment}, Settings, }; @@ -45,7 +45,7 @@ impl Cmd { match self.r#type { DisktreeType::Landtype => { for (key, count) in value_counts { - let landtype = Landtype::try_from(key); + let landtype = LandtypeValue::try_from(key); let assignment = landtype.as_ref().map(|lt| Assignment::from(*lt)); // cover is formatted twice to allow for padding a result println!( diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index 506d7e389..11dfe7d88 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -1,5 +1,5 @@ use crate::{ - boosting_oracles::{BoostedHexAssignments, HexAssignments, HexBoostData}, + boosting_oracles::assignment::HexAssignments, heartbeats::{HbType, KeyType, OwnedKeyType}, IsAuthorized, Settings, }; @@ -21,10 +21,7 @@ use h3o::{CellIndex, LatLng}; use helium_crypto::PublicKeyBinary; use helium_proto::services::{ mobile_config::NetworkKeyRole, - poc_mobile::{ - self as proto, CoverageObjectValidity, OracleBoostingReportV1, - SignalLevel as SignalLevelProto, - }, + poc_mobile::{self as proto, CoverageObjectValidity, SignalLevel as SignalLevelProto}, }; use hextree::Cell; use mobile_config::{ @@ -32,7 +29,7 @@ use mobile_config::{ client::AuthorizationClient, }; use retainer::{entry::CacheReadGuard, Cache}; -use rust_decimal::{prelude::ToPrimitive, Decimal}; +use rust_decimal::Decimal; use rust_decimal_macros::dec; use sqlx::{FromRow, PgPool, Pool, Postgres, QueryBuilder, Transaction, Type}; use std::{ @@ -44,7 +41,7 @@ use std::{ time::Instant, }; use task_manager::{ManagedTask, TaskManager}; -use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use uuid::Uuid; #[derive(Copy, Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Type)] @@ -71,11 +68,9 @@ impl From for SignalLevel { pub struct CoverageDaemon { pool: Pool, auth_client: AuthorizationClient, - hex_boost_data: HexBoostData, coverage_objs: Receiver>, - initial_boosting_reports: Option>, coverage_obj_sink: FileSinkClient, - oracle_boosting_sink: FileSinkClient, + new_coverage_object_notifier: NewCoverageObjectNotifier, } impl CoverageDaemon { @@ -85,7 +80,7 @@ impl CoverageDaemon { file_upload: FileUpload, file_store: FileStore, auth_client: AuthorizationClient, - hex_boost_data: HexBoostData, + new_coverage_object_notifier: NewCoverageObjectNotifier, ) -> anyhow::Result { let (valid_coverage_objs, valid_coverage_objs_server) = file_sink::FileSinkBuilder::new( FileType::CoverageObject, @@ -98,19 +93,6 @@ impl CoverageDaemon { .create() .await?; - // Oracle boosting reports - let (oracle_boosting_reports, oracle_boosting_reports_server) = - file_sink::FileSinkBuilder::new( - FileType::OracleBoostingReport, - settings.store_base_path(), - file_upload, - concat!(env!("CARGO_PKG_NAME"), "_oracle_boosting_report"), - ) - .auto_commit(false) - .roll_time(Duration::minutes(15)) - .create() - .await?; - let (coverage_objs, coverage_objs_server) = file_source::continuous_source::() .state(pool.clone()) @@ -124,59 +106,35 @@ impl CoverageDaemon { let coverage_daemon = CoverageDaemon::new( pool, auth_client, - hex_boost_data, coverage_objs, valid_coverage_objs, - oracle_boosting_reports, - ) - .await?; + new_coverage_object_notifier, + ); Ok(TaskManager::builder() .add_task(valid_coverage_objs_server) - .add_task(oracle_boosting_reports_server) .add_task(coverage_objs_server) .add_task(coverage_daemon) .build()) } - pub async fn new( + pub fn new( pool: PgPool, auth_client: AuthorizationClient, - hex_boost_data: HexBoostData, coverage_objs: Receiver>, coverage_obj_sink: FileSinkClient, - oracle_boosting_sink: FileSinkClient, - ) -> anyhow::Result { - tracing::info!("Setting initial values for the urbanization column"); - - let unassigned_hexes = UnassignedHex::fetch(&pool); - let initial_boosting_reports = Some( - set_oracle_boosting_assignments(unassigned_hexes, &hex_boost_data, &pool) - .await? - .collect(), - ); - - Ok(Self { + new_coverage_object_notifier: NewCoverageObjectNotifier, + ) -> Self { + Self { pool, auth_client, - hex_boost_data, coverage_objs, coverage_obj_sink, - oracle_boosting_sink, - initial_boosting_reports, - }) + new_coverage_object_notifier, + } } pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { - let Some(initial_boosting_reports) = std::mem::take(&mut self.initial_boosting_reports) - else { - anyhow::bail!("Initial boosting reports is None"); - }; - self.oracle_boosting_sink - .write_all(initial_boosting_reports) - .await?; - self.oracle_boosting_sink.commit().await?; - loop { tokio::select! { _ = shutdown.clone() => { @@ -219,138 +177,13 @@ impl CoverageDaemon { self.coverage_obj_sink.commit().await?; transaction.commit().await?; - // After writing all of the coverage objects, we set their oracle boosting assignments. This is - // done in two steps to improve the testability of the assignments. - let unassigned_hexes = UnassignedHex::fetch(&self.pool); - let boosting_reports = - set_oracle_boosting_assignments(unassigned_hexes, &self.hex_boost_data, &self.pool) - .await?; - self.oracle_boosting_sink - .write_all(boosting_reports) - .await?; - self.oracle_boosting_sink.commit().await?; + // Tell the data set manager to update the assignments. + self.new_coverage_object_notifier.notify(); Ok(()) } } -#[derive(FromRow)] -pub struct UnassignedHex { - uuid: Uuid, - #[sqlx(try_from = "i64")] - hex: u64, - signal_level: SignalLevel, - signal_power: i32, -} - -impl UnassignedHex { - pub fn fetch(pool: &PgPool) -> impl Stream> + '_ { - sqlx::query_as( - "SELECT - uuid, hex, signal_level, signal_power - FROM - hexes - WHERE - urbanized IS NULL - OR footfall IS NULL - OR landtype IS NULL", - ) - .fetch(pool) - } - - fn to_location_string(&self) -> String { - format!("{:x}", self.hex) - } -} - -pub async fn set_oracle_boosting_assignments( - unassigned_urbinization_hexes: impl Stream>, - hex_boost_data: &impl BoostedHexAssignments, - pool: &PgPool, -) -> anyhow::Result> { - let now = Utc::now(); - - let boost_results = - initialize_unassigned_hexes(unassigned_urbinization_hexes, hex_boost_data, pool).await?; - - Ok(boost_results - .into_iter() - .map( - move |(coverage_object, assignments)| proto::OracleBoostingReportV1 { - coverage_object: Vec::from(coverage_object.into_bytes()), - assignments, - timestamp: now.encode_timestamp(), - }, - )) -} - -async fn initialize_unassigned_hexes( - unassigned_urbinization_hexes: impl Stream>, - hex_boost_data: &impl BoostedHexAssignments, - pool: &Pool, -) -> Result>, anyhow::Error> { - const NUMBER_OF_FIELDS_IN_QUERY: u16 = 7; - const ASSIGNMENTS_MAX_BATCH_ENTRIES: usize = (u16::MAX / NUMBER_OF_FIELDS_IN_QUERY) as usize; - - let mut boost_results = HashMap::>::new(); - - let mut unassigned_hexes = - pin!(unassigned_urbinization_hexes.try_chunks(ASSIGNMENTS_MAX_BATCH_ENTRIES)); - - while let Some(hexes) = unassigned_hexes.try_next().await? { - let hexes: anyhow::Result> = hexes - .into_iter() - .map(|hex| { - let cell = hextree::Cell::from_raw(hex.hex)?; - let assignments = hex_boost_data.assignments(cell)?; - - let location = hex.to_location_string(); - let assignment_multiplier = (assignments.boosting_multiplier() * dec!(1000)) - .to_u32() - .unwrap_or(0); - - boost_results.entry(hex.uuid).or_default().push( - proto::OracleBoostingHexAssignment { - location, - urbanized: assignments.urbanized.into(), - footfall: assignments.footfall.into(), - landtype: assignments.landtype.into(), - assignment_multiplier, - }, - ); - - Ok((hex, assignments)) - }) - .collect(); - - QueryBuilder::new( - "INSERT INTO hexes (uuid, hex, signal_level, signal_power, urbanized, footfall, landtype)", - ) - .push_values(hexes?, |mut b, (hex, assignments)| { - b.push_bind(hex.uuid) - .push_bind(hex.hex as i64) - .push_bind(hex.signal_level) - .push_bind(hex.signal_power) - .push_bind(assignments.urbanized) - .push_bind(assignments.footfall) - .push_bind(assignments.landtype); - }) - .push( - r#" - ON CONFLICT (uuid, hex) DO UPDATE SET - urbanized = EXCLUDED.urbanized, - footfall = EXCLUDED.footfall, - landtype = EXCLUDED.landtype - "#, - ) - .build() - .execute(pool) - .await?; - } - - Ok(boost_results) -} - impl ManagedTask for CoverageDaemon { fn start_task( self: Box, @@ -365,6 +198,31 @@ impl ManagedTask for CoverageDaemon { } } +pub struct NewCoverageObjectNotifier(Sender<()>); + +impl NewCoverageObjectNotifier { + fn notify(&self) { + let _ = self.0.try_send(()); + } +} + +pub struct NewCoverageObjectNotification(Receiver<()>); + +impl NewCoverageObjectNotification { + pub async fn await_new_coverage_object(&mut self) { + let _ = self.0.recv().await; + } +} + +pub fn new_coverage_object_notification_channel( +) -> (NewCoverageObjectNotifier, NewCoverageObjectNotification) { + let (tx, rx) = channel(1); + ( + NewCoverageObjectNotifier(tx), + NewCoverageObjectNotification(rx), + ) +} + pub struct CoverageObject { pub coverage_object: file_store::coverage::CoverageObject, pub validity: CoverageObjectValidity, diff --git a/mobile_verifier/src/geofence.rs b/mobile_verifier/src/geofence.rs index 100aded3e..a944f2ef3 100644 --- a/mobile_verifier/src/geofence.rs +++ b/mobile_verifier/src/geofence.rs @@ -1,18 +1,12 @@ use base64::{engine::general_purpose, Engine as _}; use h3o::{LatLng, Resolution}; use hextree::{Cell, HexTreeSet}; -use std::{collections::HashSet, fs, io::Read, path, sync::Arc}; +use std::{fs, io::Read, path, sync::Arc}; use crate::heartbeats::Heartbeat; -pub trait GeofenceValidator: Clone + Send + Sync + 'static { - fn in_valid_region(&self, t: &T) -> bool; -} - -impl GeofenceValidator for HashSet { - fn in_valid_region(&self, cell: &hextree::Cell) -> bool { - self.contains(cell) - } +pub trait GeofenceValidator: Clone + Send + Sync + 'static { + fn in_valid_region(&self, t: &Heartbeat) -> bool; } #[derive(Clone)] @@ -38,7 +32,7 @@ impl Geofence { } } -impl GeofenceValidator for Geofence { +impl GeofenceValidator for Geofence { fn in_valid_region(&self, heartbeat: &Heartbeat) -> bool { let Ok(lat_lon) = LatLng::new(heartbeat.lat, heartbeat.lon) else { return false; @@ -50,12 +44,6 @@ impl GeofenceValidator for Geofence { } } -impl GeofenceValidator for Geofence { - fn in_valid_region(&self, cell: &hextree::Cell) -> bool { - self.regions.contains(*cell) - } -} - pub fn valid_mapping_regions(encoded_files: Vec) -> anyhow::Result { let mut combined_regions: Vec = Vec::new(); for file in encoded_files { diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs index 0275915b6..a95ef1a2a 100644 --- a/mobile_verifier/src/heartbeats/cbrs.rs +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -39,7 +39,7 @@ pub struct CbrsHeartbeatDaemon { impl CbrsHeartbeatDaemon where GIR: GatewayResolver, - GFV: GeofenceValidator, + GFV: GeofenceValidator, { #[allow(clippy::too_many_arguments)] pub async fn create_managed_task( @@ -197,7 +197,7 @@ where impl ManagedTask for CbrsHeartbeatDaemon where GIR: GatewayResolver, - GFV: GeofenceValidator, + GFV: GeofenceValidator, { fn start_task( self: Box, diff --git a/mobile_verifier/src/heartbeats/mod.rs b/mobile_verifier/src/heartbeats/mod.rs index 7cdebb18b..49d74339b 100644 --- a/mobile_verifier/src/heartbeats/mod.rs +++ b/mobile_verifier/src/heartbeats/mod.rs @@ -388,7 +388,7 @@ impl ValidatedHeartbeat { max_distance_to_asserted: u32, max_distance_to_coverage: u32, epoch: &Range>, - geofence: &impl GeofenceValidator, + geofence: &impl GeofenceValidator, ) -> anyhow::Result { let Some(coverage_object) = heartbeat.coverage_object else { return Ok(Self::new( @@ -589,7 +589,7 @@ impl ValidatedHeartbeat { max_distance_to_asserted: u32, max_distance_to_coverage: u32, epoch: &'a Range>, - geofence: &'a impl GeofenceValidator, + geofence: &'a impl GeofenceValidator, ) -> impl Stream> + 'a { heartbeats.then(move |heartbeat| async move { Self::validate( diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs index 51433e8e7..295a1c8df 100644 --- a/mobile_verifier/src/heartbeats/wifi.rs +++ b/mobile_verifier/src/heartbeats/wifi.rs @@ -38,7 +38,7 @@ pub struct WifiHeartbeatDaemon { impl WifiHeartbeatDaemon where GIR: GatewayResolver, - GFV: GeofenceValidator, + GFV: GeofenceValidator, { #[allow(clippy::too_many_arguments)] pub async fn create_managed_task( @@ -188,7 +188,7 @@ where impl ManagedTask for WifiHeartbeatDaemon where GIR: GatewayResolver, - GFV: GeofenceValidator, + GFV: GeofenceValidator, { fn start_task( self: Box, diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 8a0f7d839..c9c3792b3 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -1,4 +1,5 @@ use crate::{ + boosting_oracles::db::check_for_unprocessed_data_sets, coverage, data_session, heartbeats::{self, HeartbeatReward}, radio_threshold, @@ -217,6 +218,11 @@ where tracing::info!("No speedtests found past reward period"); return Ok(false); } + + if check_for_unprocessed_data_sets(&self.pool, reward_period.end).await? { + tracing::info!("Data sets still need to be processed"); + return Ok(false); + } } else { tracing::info!("Complete data checks are disabled for this reward period"); } diff --git a/mobile_verifier/src/settings.rs b/mobile_verifier/src/settings.rs index cfd5d91a6..4a5c681b5 100644 --- a/mobile_verifier/src/settings.rs +++ b/mobile_verifier/src/settings.rs @@ -26,6 +26,9 @@ pub struct Settings { pub ingest: file_store::Settings, pub data_transfer_ingest: file_store::Settings, pub output: file_store::Settings, + /// S3 bucket from which new data sets are downloaded for oracle boosting + /// assignments + pub data_sets: file_store::Settings, pub metrics: poc_metrics::Settings, pub price_tracker: price::price_tracker::Settings, pub config_client: mobile_config::ClientSettings, @@ -41,6 +44,11 @@ pub struct Settings { /// beyond which its location weight will be reduced #[serde(default = "default_max_asserted_distance_deviation")] pub max_asserted_distance_deviation: u32, + /// Directory in which new oracle boosting data sets are downloaded into + pub data_sets_directory: PathBuf, + /// Poll duration for new data sets + #[serde(with = "humantime_serde", default = "default_data_sets_poll_duration")] + pub data_sets_poll_duration: Duration, // Geofencing settings pub usa_and_mexico_geofence_regions: String, #[serde(default = "default_fencing_resolution")] @@ -48,9 +56,6 @@ pub struct Settings { pub usa_geofence_regions: String, #[serde(default = "default_fencing_resolution")] pub usa_fencing_resolution: u8, - pub urbanization_data_set: PathBuf, - pub footfall_data_set: PathBuf, - pub landtype_data_set: PathBuf, } fn default_fencing_resolution() -> u8 { @@ -82,6 +87,10 @@ fn default_reward_period_offset() -> Duration { humantime::parse_duration("30 minutes").unwrap() } +fn default_data_sets_poll_duration() -> Duration { + humantime::parse_duration("30 minutes").unwrap() +} + impl Settings { /// Load Settings from a given path. Settings are loaded from a given /// optional path and can be overriden with environment variables. diff --git a/mobile_verifier/tests/integrations/boosting_oracles.rs b/mobile_verifier/tests/integrations/boosting_oracles.rs index ddaae09f2..3f791d2cc 100644 --- a/mobile_verifier/tests/integrations/boosting_oracles.rs +++ b/mobile_verifier/tests/integrations/boosting_oracles.rs @@ -1,4 +1,3 @@ -use crate::common; use anyhow::Context; use chrono::{DateTime, Duration, Utc}; use file_store::{ @@ -14,11 +13,8 @@ use helium_proto::services::poc_mobile::{ }; use mobile_config::boosted_hex_info::BoostedHexes; use mobile_verifier::{ - boosting_oracles::Assignment, - coverage::{ - set_oracle_boosting_assignments, CoverageClaimTimeCache, CoverageObject, - CoverageObjectCache, Seniority, UnassignedHex, - }, + boosting_oracles::{Assignment, HexBoostData}, + coverage::{CoverageClaimTimeCache, CoverageObject, CoverageObjectCache, Seniority}, geofence::GeofenceValidator, heartbeats::{Heartbeat, HeartbeatReward, LocationCache, SeniorityUpdate, ValidatedHeartbeat}, radio_threshold::VerifiedRadioThresholds, @@ -33,21 +29,17 @@ use sqlx::PgPool; use std::{collections::HashMap, pin::pin}; use uuid::Uuid; +use crate::common; + #[derive(Clone)] struct MockGeofence; -impl GeofenceValidator for MockGeofence { +impl GeofenceValidator for MockGeofence { fn in_valid_region(&self, _heartbeat: &Heartbeat) -> bool { true } } -impl GeofenceValidator for MockGeofence { - fn in_valid_region(&self, _cell: &u64) -> bool { - true - } -} - #[derive(Copy, Clone)] struct AllOwnersValid; @@ -219,11 +211,12 @@ async fn test_footfall_and_urbanization_report(pool: PgPool) -> anyhow::Result<( .await?; transaction.commit().await?; - let unassigned_hexes = UnassignedHex::fetch(&pool); - let hex_boost_data = common::MockHexAssignments::new(footfall, urbanized, landtype); - let oba = set_oracle_boosting_assignments(unassigned_hexes, &hex_boost_data, &pool) - .await? - .collect::>(); + let hex_boost_data = HexBoostData::builder() + .footfall(footfall) + .landtype(landtype) + .urbanization(urbanized) + .build()?; + let oba = common::set_unassigned_oracle_boosting_assignments(&pool, &hex_boost_data).await?; assert_eq!(oba.len(), 1); assert_eq!(oba[0].assignments, hexes); @@ -346,9 +339,12 @@ async fn test_footfall_and_urbanization_and_landtype(pool: PgPool) -> anyhow::Re .await?; transaction.commit().await?; - let unassigned_hexes = UnassignedHex::fetch(&pool); - let hex_boost_data = common::MockHexAssignments::new(footfall, urbanized, landtype); - let _ = set_oracle_boosting_assignments(unassigned_hexes, &hex_boost_data, &pool).await?; + let hex_boost_data = HexBoostData::builder() + .footfall(footfall) + .landtype(landtype) + .urbanization(urbanized) + .build()?; + let _ = common::set_unassigned_oracle_boosting_assignments(&pool, &hex_boost_data).await?; let heartbeats = heartbeats(12, start, &owner, &cbsd_id, 0.0, 0.0, uuid); diff --git a/mobile_verifier/tests/integrations/common/mod.rs b/mobile_verifier/tests/integrations/common/mod.rs index 499e0c695..5722f6a7a 100644 --- a/mobile_verifier/tests/integrations/common/mod.rs +++ b/mobile_verifier/tests/integrations/common/mod.rs @@ -1,10 +1,14 @@ use chrono::{DateTime, Utc}; -use file_store::file_sink::{FileSinkClient, Message as SinkMessage}; +use file_store::{ + file_sink::{FileSinkClient, Message as SinkMessage}, + traits::TimestampEncode, +}; use futures::{stream, StreamExt}; use helium_proto::{ services::poc_mobile::{ - mobile_reward_share::Reward as MobileReward, GatewayReward, MobileRewardShare, RadioReward, - ServiceProviderReward, SpeedtestAvg, SubscriberReward, UnallocatedReward, + mobile_reward_share::Reward as MobileReward, GatewayReward, MobileRewardShare, + OracleBoostingHexAssignment, OracleBoostingReportV1, RadioReward, ServiceProviderReward, + SpeedtestAvg, SubscriberReward, UnallocatedReward, }, Message, }; @@ -12,11 +16,17 @@ use mobile_config::{ boosted_hex_info::{BoostedHexInfo, BoostedHexInfoStream}, client::{hex_boosting_client::HexBoostingInfoResolver, ClientError}, }; -use mobile_verifier::boosting_oracles::{Assignment, BoostedHexAssignments, HexAssignments}; +use mobile_verifier::boosting_oracles::{ + AssignedCoverageObjects, Assignment, HexAssignment, HexBoostData, +}; +use rust_decimal::prelude::ToPrimitive; +use rust_decimal_macros::dec; +use sqlx::PgPool; use std::collections::HashMap; use tokio::{sync::mpsc::error::TryRecvError, time::timeout}; #[derive(Debug, Clone)] +#[allow(dead_code)] pub struct MockHexBoostingClient { boosted_hexes: Vec, } @@ -179,35 +189,65 @@ pub fn seconds(s: u64) -> std::time::Duration { std::time::Duration::from_secs(s) } +pub fn mock_hex_boost_data_default() -> HexBoostData { + HexBoostData::builder() + .urbanization(Assignment::A) + .footfall(Assignment::A) + .landtype(Assignment::A) + .build() + .unwrap() +} + type MockAssignmentMap = HashMap; -#[derive(Default)] -pub struct MockHexAssignments { +#[allow(dead_code)] +pub fn mock_hex_boost_data( footfall: MockAssignmentMap, urbanized: MockAssignmentMap, landtype: MockAssignmentMap, +) -> HexBoostData { + HexBoostData::builder() + .footfall(footfall) + .urbanization(urbanized) + .landtype(landtype) + .build() + .unwrap() } -impl MockHexAssignments { - pub fn new( - footfall: MockAssignmentMap, - urbanized: MockAssignmentMap, - landtype: MockAssignmentMap, - ) -> Self { - Self { - footfall, - urbanized, - landtype, - } - } -} - -impl BoostedHexAssignments for MockHexAssignments { - fn assignments(&self, cell: hextree::Cell) -> anyhow::Result { - Ok(HexAssignments { - footfall: self.footfall.get(&cell).cloned().unwrap_or(Assignment::A), - urbanized: self.urbanized.get(&cell).cloned().unwrap_or(Assignment::A), - landtype: self.landtype.get(&cell).cloned().unwrap_or(Assignment::A), - }) +pub async fn set_unassigned_oracle_boosting_assignments( + pool: &PgPool, + data_sets: &HexBoostData, +) -> anyhow::Result> { + let assigned_coverage_objs = AssignedCoverageObjects::assign_hex_stream( + mobile_verifier::boosting_oracles::data_sets::db::fetch_hexes_with_null_assignments(pool), + data_sets, + ) + .await?; + let timestamp = Utc::now().encode_timestamp(); + let mut output = Vec::new(); + for (uuid, hexes) in assigned_coverage_objs.coverage_objs.iter() { + let assignments: Vec<_> = hexes + .iter() + .map(|hex| { + let location = format!("{:x}", hex.hex); + let assignment_multiplier = (hex.assignments.boosting_multiplier() * dec!(1000)) + .to_u32() + .unwrap_or(0); + OracleBoostingHexAssignment { + location, + urbanized: hex.assignments.urbanized.into(), + footfall: hex.assignments.footfall.into(), + landtype: hex.assignments.landtype.into(), + assignment_multiplier, + } + }) + .collect(); + output.push(OracleBoostingReportV1 { + coverage_object: Vec::from(uuid.into_bytes()), + assignments, + timestamp, + }); } + assigned_coverage_objs.save(pool).await?; + Ok(output) } diff --git a/mobile_verifier/tests/integrations/heartbeats.rs b/mobile_verifier/tests/integrations/heartbeats.rs index 7b759d45b..61601bb08 100644 --- a/mobile_verifier/tests/integrations/heartbeats.rs +++ b/mobile_verifier/tests/integrations/heartbeats.rs @@ -385,18 +385,12 @@ fn signal_level(hex: &str, signal_level: SignalLevel) -> anyhow::Result for MockGeofence { +impl GeofenceValidator for MockGeofence { fn in_valid_region(&self, _heartbeat: &Heartbeat) -> bool { true } } -impl GeofenceValidator for MockGeofence { - fn in_valid_region(&self, _cell: &u64) -> bool { - true - } -} - #[derive(Copy, Clone)] struct AllOwnersValid; diff --git a/mobile_verifier/tests/integrations/hex_boosting.rs b/mobile_verifier/tests/integrations/hex_boosting.rs index 4266824c1..6dd082915 100644 --- a/mobile_verifier/tests/integrations/hex_boosting.rs +++ b/mobile_verifier/tests/integrations/hex_boosting.rs @@ -14,7 +14,7 @@ use hextree::Cell; use mobile_config::boosted_hex_info::BoostedHexInfo; use mobile_verifier::{ cell_type::CellType, - coverage::{set_oracle_boosting_assignments, CoverageObject, UnassignedHex}, + coverage::CoverageObject, heartbeats::{HbType, Heartbeat, ValidatedHeartbeat}, radio_threshold, reward_shares, rewarder, speedtests, }; @@ -34,11 +34,9 @@ const BOOST_HEX_PUBKEY: &str = "J9JiLTpjaShxL8eMvUs8txVw6TZ36E38SiJ89NxnMbLU"; const BOOST_CONFIG_PUBKEY: &str = "BZM1QTud72B2cpTW7PhEnFmRX7ZWzvY7DpPpNJJuDrWG"; async fn update_assignments(pool: &PgPool) -> anyhow::Result<()> { - let unassigned_hexes = UnassignedHex::fetch(pool); - let _ = set_oracle_boosting_assignments( - unassigned_hexes, - &common::MockHexAssignments::default(), + let _ = common::set_unassigned_oracle_boosting_assignments( pool, + &common::mock_hex_boost_data_default(), ) .await?; Ok(()) diff --git a/mobile_verifier/tests/integrations/modeled_coverage.rs b/mobile_verifier/tests/integrations/modeled_coverage.rs index 8adc09246..1069f166f 100644 --- a/mobile_verifier/tests/integrations/modeled_coverage.rs +++ b/mobile_verifier/tests/integrations/modeled_coverage.rs @@ -1,4 +1,3 @@ -use crate::common; use chrono::{DateTime, Duration, Utc}; use file_store::{ coverage::{CoverageObjectIngestReport, RadioHexSignalLevel}, @@ -16,10 +15,7 @@ use hextree::Cell; use mobile_config::boosted_hex_info::{BoostedHexInfo, BoostedHexes}; use mobile_verifier::{ - coverage::{ - set_oracle_boosting_assignments, CoverageClaimTimeCache, CoverageObject, - CoverageObjectCache, Seniority, UnassignedHex, - }, + coverage::{CoverageClaimTimeCache, CoverageObject, CoverageObjectCache, Seniority}, geofence::GeofenceValidator, heartbeats::{ Heartbeat, HeartbeatReward, KeyType, LocationCache, SeniorityUpdate, ValidatedHeartbeat, @@ -36,21 +32,17 @@ use sqlx::PgPool; use std::{collections::HashMap, num::NonZeroU32, ops::Range, pin::pin, str::FromStr}; use uuid::Uuid; +use crate::common; + #[derive(Clone)] struct MockGeofence; -impl GeofenceValidator for MockGeofence { +impl GeofenceValidator for MockGeofence { fn in_valid_region(&self, _heartbeat: &Heartbeat) -> bool { true } } -impl GeofenceValidator for MockGeofence { - fn in_valid_region(&self, _cell: &hextree::Cell) -> bool { - true - } -} - const BOOST_HEX_PUBKEY: &str = "J9JiLTpjaShxL8eMvUs8txVw6TZ36E38SiJ89NxnMbLU"; const BOOST_CONFIG_PUBKEY: &str = "BZM1QTud72B2cpTW7PhEnFmRX7ZWzvY7DpPpNJJuDrWG"; @@ -413,11 +405,9 @@ async fn process_input( } transaction.commit().await?; - let unassigned_hexes = UnassignedHex::fetch(pool); - let _ = set_oracle_boosting_assignments( - unassigned_hexes, - &common::MockHexAssignments::default(), + let _ = common::set_unassigned_oracle_boosting_assignments( pool, + &common::mock_hex_boost_data_default(), ) .await?; diff --git a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs index 8909e4ca2..f3eaf1e4a 100644 --- a/mobile_verifier/tests/integrations/rewarder_poc_dc.rs +++ b/mobile_verifier/tests/integrations/rewarder_poc_dc.rs @@ -11,7 +11,7 @@ use helium_proto::services::poc_mobile::{ }; use mobile_verifier::{ cell_type::CellType, - coverage::{set_oracle_boosting_assignments, CoverageObject, UnassignedHex}, + coverage::CoverageObject, data_session, heartbeats::{HbType, Heartbeat, ValidatedHeartbeat}, reward_shares, rewarder, speedtests, @@ -265,11 +265,9 @@ async fn seed_heartbeats( } async fn update_assignments(pool: &PgPool) -> anyhow::Result<()> { - let unassigned_hexes = UnassignedHex::fetch(pool); - let _ = set_oracle_boosting_assignments( - unassigned_hexes, - &common::MockHexAssignments::default(), + let _ = common::set_unassigned_oracle_boosting_assignments( pool, + &common::mock_hex_boost_data_default(), ) .await?; Ok(())