diff --git a/Cargo.lock b/Cargo.lock index 6958ec2d2..932ee6aa0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3452,8 +3452,10 @@ dependencies = [ "serde_json", "sha2 0.10.6", "sqlx", + "task-manager", "thiserror", "tokio", + "tokio-util", "tonic", "tracing", "tracing-subscriber", @@ -4786,8 +4788,10 @@ dependencies = [ "serde_json", "solana-client", "solana-sdk", + "task-manager", "thiserror", "tokio", + "tokio-util", "tracing", "tracing-subscriber", "triggered", diff --git a/iot_verifier/Cargo.toml b/iot_verifier/Cargo.toml index 6b96b75da..302020f5b 100644 --- a/iot_verifier/Cargo.toml +++ b/iot_verifier/Cargo.toml @@ -53,3 +53,5 @@ itertools = {workspace = true} rand = {workspace = true} beacon = {workspace = true} price = { path = "../price" } +tokio-util = { workspace = true } +task-manager = { path = "../task_manager" } diff --git a/iot_verifier/src/entropy_loader.rs b/iot_verifier/src/entropy_loader.rs index 04398418f..5f858cf22 100644 --- a/iot_verifier/src/entropy_loader.rs +++ b/iot_verifier/src/entropy_loader.rs @@ -1,12 +1,14 @@ use crate::entropy::Entropy; use blake3::hash; use file_store::{entropy_report::EntropyReport, file_info_poller::FileInfoStream}; -use futures::{StreamExt, TryStreamExt}; +use futures::{future::LocalBoxFuture, StreamExt, TryStreamExt}; use sqlx::PgPool; +use task_manager::ManagedTask; use tokio::sync::mpsc::Receiver; pub struct EntropyLoader { pub pool: PgPool, + pub file_receiver: Receiver>, } #[derive(thiserror::Error, Debug)] @@ -17,24 +19,28 @@ pub enum NewLoaderError { DbStoreError(#[from] db_store::Error), } +impl ManagedTask for EntropyLoader { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run(shutdown)) + } +} + impl EntropyLoader { - pub async fn run( - &mut self, - mut receiver: Receiver>, - shutdown: &triggered::Listener, - ) -> anyhow::Result<()> { + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + 
tracing::info!("starting entropy_loader"); loop { - if shutdown.is_triggered() { - break; - } tokio::select! { + biased; _ = shutdown.clone() => break, - msg = receiver.recv() => if let Some(stream) = msg { + msg = self.file_receiver.recv() => if let Some(stream) = msg { self.handle_report(stream).await?; } } } - tracing::info!("stopping verifier entropy_loader"); + tracing::info!("stopping entropy_loader"); Ok(()) } diff --git a/iot_verifier/src/gateway_cache.rs b/iot_verifier/src/gateway_cache.rs index fea190b32..4b76018e8 100644 --- a/iot_verifier/src/gateway_cache.rs +++ b/iot_verifier/src/gateway_cache.rs @@ -2,6 +2,7 @@ use crate::gateway_updater::MessageReceiver; use helium_crypto::PublicKeyBinary; use iot_config::gateway_info::GatewayInfo; +#[derive(Clone)] pub struct GatewayCache { gateway_cache_receiver: MessageReceiver, } diff --git a/iot_verifier/src/gateway_updater.rs b/iot_verifier/src/gateway_updater.rs index 955c5a699..1c5ef5719 100644 --- a/iot_verifier/src/gateway_updater.rs +++ b/iot_verifier/src/gateway_updater.rs @@ -1,12 +1,13 @@ use crate::Settings; use chrono::Duration; -use futures::stream::StreamExt; +use futures::{future::LocalBoxFuture, stream::StreamExt, TryFutureExt}; use helium_crypto::PublicKeyBinary; use iot_config::{ client::{Client as IotConfigClient, ClientError as IotConfigClientError}, gateway_info::{GatewayInfo, GatewayInfoResolver}, }; use std::collections::HashMap; +use task_manager::ManagedTask; use tokio::sync::watch; use tokio::time; @@ -28,6 +29,20 @@ pub enum GatewayUpdaterError { SendError(#[from] watch::error::SendError), } +impl ManagedTask for GatewayUpdater { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + impl GatewayUpdater { pub async fn from_settings( 
settings: &Settings, @@ -45,26 +60,22 @@ impl GatewayUpdater { )) } - pub async fn run(mut self, shutdown: &triggered::Listener) -> Result<(), GatewayUpdaterError> { + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { tracing::info!("starting gateway_updater"); - let mut trigger_timer = time::interval( self.refresh_interval .to_std() .expect("valid interval in seconds"), ); - loop { - if shutdown.is_triggered() { - tracing::info!("stopping gateway_updater"); - return Ok(()); - } - tokio::select! { + biased; + _ = shutdown.clone() => break, _ = trigger_timer.tick() => self.handle_refresh_tick().await?, - _ = shutdown.clone() => return Ok(()), } } + tracing::info!("stopping gateway_updater"); + Ok(()) } async fn handle_refresh_tick(&mut self) -> Result<(), GatewayUpdaterError> { diff --git a/iot_verifier/src/hex_density.rs b/iot_verifier/src/hex_density.rs index b0ba46ef8..f72457f8b 100644 --- a/iot_verifier/src/hex_density.rs +++ b/iot_verifier/src/hex_density.rs @@ -64,28 +64,25 @@ lazy_static! 
{ }; } -#[async_trait::async_trait] -pub trait HexDensityMap: Clone { - async fn get(&self, hex: u64) -> Option; - async fn swap(&self, new_map: HashMap); -} - #[derive(Debug, Clone)] -pub struct SharedHexDensityMap(Arc>>); +pub struct HexDensityMap(Arc>>); + +impl Default for HexDensityMap { + fn default() -> Self { + Self::new() + } +} -impl SharedHexDensityMap { +impl HexDensityMap { pub fn new() -> Self { Self(Arc::new(RwLock::new(HashMap::new()))) } -} -#[async_trait::async_trait] -impl HexDensityMap for SharedHexDensityMap { - async fn get(&self, hex: u64) -> Option { + pub async fn get(&self, hex: u64) -> Option { self.0.read().await.get(&hex).cloned() } - async fn swap(&self, new_map: HashMap) { + pub async fn swap(&self, new_map: HashMap) { *self.0.write().await = new_map; } } diff --git a/iot_verifier/src/loader.rs b/iot_verifier/src/loader.rs index 5cc539666..0be84f53d 100644 --- a/iot_verifier/src/loader.rs +++ b/iot_verifier/src/loader.rs @@ -13,10 +13,11 @@ use file_store::{ traits::{IngestId, MsgDecode}, FileInfo, FileStore, FileType, }; -use futures::{stream, StreamExt}; +use futures::{future::LocalBoxFuture, stream, StreamExt}; use helium_crypto::PublicKeyBinary; use sqlx::PgPool; use std::{hash::Hasher, ops::DerefMut, str::FromStr}; +use task_manager::ManagedTask; use tokio::{ sync::Mutex, time::{self, MissedTickBehavior}, @@ -33,6 +34,7 @@ pub struct Loader { window_width: ChronoDuration, ingestor_rollup_time: ChronoDuration, max_lookback_age: ChronoDuration, + gateway_cache: GatewayCache, } #[derive(thiserror::Error, Debug)] @@ -48,8 +50,21 @@ pub enum ValidGatewayResult { Unknown, } +impl ManagedTask for Loader { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run(shutdown)) + } +} + impl Loader { - pub async fn from_settings(settings: &Settings, pool: PgPool) -> Result { + pub async fn from_settings( + settings: &Settings, + pool: PgPool, + gateway_cache: 
GatewayCache, + ) -> Result { tracing::info!("from_settings verifier loader"); let ingest_store = FileStore::from_settings(&settings.ingest).await?; let poll_time = settings.poc_loader_poll_time(); @@ -63,24 +78,19 @@ impl Loader { window_width, ingestor_rollup_time, max_lookback_age, + gateway_cache, }) } - pub async fn run( - &mut self, - shutdown: &triggered::Listener, - gateway_cache: &GatewayCache, - ) -> anyhow::Result<()> { - tracing::info!("started verifier loader"); + pub async fn run(self, shutdown: triggered::Listener) -> anyhow::Result<()> { + tracing::info!("starting loader"); let mut report_timer = time::interval(self.poll_time); report_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { - if shutdown.is_triggered() { - break; - } tokio::select! { + biased; _ = shutdown.clone() => break, - _ = report_timer.tick() => match self.handle_report_tick(gateway_cache).await { + _ = report_timer.tick() => match self.handle_report_tick().await { Ok(()) => (), Err(err) => { tracing::error!("loader error, report_tick triggered: {err:?}"); @@ -88,11 +98,11 @@ impl Loader { } } } - tracing::info!("stopping verifier loader"); + tracing::info!("stopping loader"); Ok(()) } - async fn handle_report_tick(&self, gateway_cache: &GatewayCache) -> anyhow::Result<()> { + async fn handle_report_tick(&self) -> anyhow::Result<()> { tracing::info!("handling report tick"); let now = Utc::now(); // the loader loads files from s3 via a sliding window @@ -124,7 +134,7 @@ impl Loader { tracing::info!("current window width insufficient. 
completed handling poc_report tick"); return Ok(()); } - self.process_window(gateway_cache, after, before).await?; + self.process_window(after, before).await?; Meta::update_last_timestamp(&self.pool, REPORTS_META_NAME, Some(before)).await?; Report::pending_beacons_to_ready(&self.pool, now).await?; tracing::info!("completed handling poc_report tick"); @@ -133,7 +143,6 @@ impl Loader { async fn process_window( &self, - gateway_cache: &GatewayCache, after: DateTime, before: DateTime, ) -> anyhow::Result<()> { @@ -150,7 +159,6 @@ impl Loader { .process_events( FileType::IotBeaconIngestReport, &self.ingest_store, - gateway_cache, after, before, Some(&xor_data), @@ -191,7 +199,6 @@ impl Loader { .process_events( FileType::IotWitnessIngestReport, &self.ingest_store, - gateway_cache, after - (self.ingestor_rollup_time + ChronoDuration::seconds(120)), before + (self.ingestor_rollup_time + ChronoDuration::seconds(120)), None, @@ -213,7 +220,6 @@ impl Loader { &self, file_type: FileType, store: &FileStore, - gateway_cache: &GatewayCache, after: chrono::DateTime, before: chrono::DateTime, xor_data: Option<&Mutex>>, @@ -232,13 +238,7 @@ impl Loader { stream::iter(infos) .for_each_concurrent(10, |file_info| async move { match self - .process_file( - store, - file_info.clone(), - gateway_cache, - xor_data, - xor_filter, - ) + .process_file(store, file_info.clone(), xor_data, xor_filter) .await { Ok(()) => tracing::debug!( @@ -262,7 +262,6 @@ impl Loader { &self, store: &FileStore, file_info: FileInfo, - gateway_cache: &GatewayCache, xor_data: Option<&Mutex>>, xor_filter: Option<&Xor16>, ) -> anyhow::Result<()> { @@ -281,7 +280,7 @@ impl Loader { tracing::warn!("skipping report of type {file_type} due to error {err:?}") } Ok(buf) => { - match self.handle_report(&file_type, &buf, gateway_cache, xor_data, xor_filter, &metrics).await + match self.handle_report(&file_type, &buf, xor_data, xor_filter, &metrics).await { Ok(Some(bindings)) => inserts.push(bindings), Ok(None) => (), @@ 
-309,7 +308,6 @@ impl Loader { &self, file_type: &str, buf: &[u8], - gateway_cache: &GatewayCache, xor_data: Option<&Mutex>>, xor_filter: Option<&Xor16>, metrics: &LoaderMetricTracker, @@ -319,10 +317,7 @@ impl Loader { let beacon = IotBeaconIngestReport::decode(buf)?; tracing::debug!("beacon report from ingestor: {:?}", &beacon); let packet_data = beacon.report.data.clone(); - match self - .check_valid_gateway(&beacon.report.pub_key, gateway_cache) - .await - { + match self.check_valid_gateway(&beacon.report.pub_key).await { ValidGatewayResult::Valid => { let res = InsertBindings { id: beacon.ingest_id(), @@ -352,30 +347,25 @@ impl Loader { let packet_data = witness.report.data.clone(); if let Some(filter) = xor_filter { match verify_witness_packet_data(&packet_data, filter) { - true => { - match self - .check_valid_gateway(&witness.report.pub_key, gateway_cache) - .await - { - ValidGatewayResult::Valid => { - let res = InsertBindings { - id: witness.ingest_id(), - remote_entropy: Vec::::with_capacity(0), - packet_data, - buf: buf.to_vec(), - received_ts: witness.received_timestamp, - report_type: ReportType::Witness, - status: IotStatus::Ready, - }; - metrics.increment_witnesses(); - Ok(Some(res)) - } - ValidGatewayResult::Unknown => { - metrics.increment_witnesses_unknown(); - Ok(None) - } + true => match self.check_valid_gateway(&witness.report.pub_key).await { + ValidGatewayResult::Valid => { + let res = InsertBindings { + id: witness.ingest_id(), + remote_entropy: Vec::::with_capacity(0), + packet_data, + buf: buf.to_vec(), + received_ts: witness.received_timestamp, + report_type: ReportType::Witness, + status: IotStatus::Ready, + }; + metrics.increment_witnesses(); + Ok(Some(res)) } - } + ValidGatewayResult::Unknown => { + metrics.increment_witnesses_unknown(); + Ok(None) + } + }, false => { tracing::debug!( "dropping witness report as no associated beacon data: {:?}", @@ -396,24 +386,19 @@ impl Loader { } } - async fn check_valid_gateway( - &self, - 
pub_key: &PublicKeyBinary, - gateway_cache: &GatewayCache, - ) -> ValidGatewayResult { - if self.check_unknown_gw(pub_key, gateway_cache).await { + async fn check_valid_gateway(&self, pub_key: &PublicKeyBinary) -> ValidGatewayResult { + if self.check_unknown_gw(pub_key).await { tracing::debug!("dropping unknown gateway: {:?}", &pub_key); return ValidGatewayResult::Unknown; } ValidGatewayResult::Valid } - async fn check_unknown_gw( - &self, - pub_key: &PublicKeyBinary, - gateway_cache: &GatewayCache, - ) -> bool { - gateway_cache.resolve_gateway_info(pub_key).await.is_err() + async fn check_unknown_gw(&self, pub_key: &PublicKeyBinary) -> bool { + self.gateway_cache + .resolve_gateway_info(pub_key) + .await + .is_err() } } diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index 8c4f49f1f..66aadce57 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -1,20 +1,20 @@ use crate::entropy_loader::EntropyLoader; -use anyhow::{Error, Result}; +use anyhow::Result; +use chrono::Duration as ChronoDuration; use clap::Parser; use file_store::{ entropy_report::EntropyReport, file_info_poller::LookbackBehavior, file_sink, file_source, file_upload, iot_packet::IotValidPacket, FileStore, FileType, }; -use futures::TryFutureExt; use iot_config::client::Client as IotConfigClient; use iot_verifier::{ entropy_loader, gateway_cache::GatewayCache, gateway_updater::GatewayUpdater, loader, - packet_loader, purger, region_cache::RegionCache, rewarder::Rewarder, runner, telemetry, + packet_loader, purger, rewarder::Rewarder, runner, telemetry, tx_scaler::Server as DensityScaler, Settings, }; use price::PriceTracker; use std::path; -use tokio::signal; +use task_manager::TaskManager; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Debug, clap::Parser)] @@ -23,8 +23,8 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; pub struct Cli { /// Optional configuration file to use. 
If present the toml file at the - /// given path will be loaded. Environemnt variables can override the - /// settins in the given file. + /// given path will be loaded. Environment variables can override the + /// settings in the given file. #[clap(short = 'c')] config: Option, @@ -65,56 +65,68 @@ impl Server { // Install the prometheus metrics exporter poc_metrics::start_metrics(&settings.metrics)?; - // configure shutdown trigger - let (shutdown_trigger, shutdown) = triggered::trigger(); - let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?; - tokio::spawn(async move { - tokio::select! { - _ = sigterm.recv() => shutdown_trigger.trigger(), - _ = signal::ctrl_c() => shutdown_trigger.trigger(), - } - }); - // Create database pool and run migrations let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?; sqlx::migrate!().run(&pool).await?; telemetry::initialize(&pool).await?; + let (file_upload, file_upload_server) = + file_upload::FileUpload::from_settings_tm(&settings.output).await?; + let store_base_path = path::Path::new(&settings.cache); + let iot_config_client = IotConfigClient::from_settings(&settings.iot_config_client)?; - let (gateway_updater_receiver, gateway_updater) = + // * + // setup caches + // * + let (gateway_updater_receiver, gateway_updater_server) = GatewayUpdater::from_settings(settings, iot_config_client.clone()).await?; let gateway_cache = GatewayCache::new(gateway_updater_receiver.clone()); - let region_cache = RegionCache::from_settings(settings, iot_config_client.clone())?; + // * + // setup the price tracker requirements + // * + let (price_tracker, price_daemon) = PriceTracker::new_tm(&settings.price_tracker).await?; + + // * + // setup the loader requirements + // * + let loader = + loader::Loader::from_settings(settings, pool.clone(), gateway_cache.clone()).await?; + + // * + // setup the density scaler requirements + // * + let density_scaler = + DensityScaler::from_settings(settings, 
pool.clone(), gateway_updater_receiver).await?; - let (file_upload_tx, file_upload_rx) = file_upload::message_channel(); - let file_upload = - file_upload::FileUpload::from_settings(&settings.output, file_upload_rx).await?; + // * + // setup the rewarder requirements + // * - let store_base_path = std::path::Path::new(&settings.cache); // Gateway reward shares sink - let (rewards_sink, gateway_rewards_server) = file_sink::FileSinkBuilder::new( + let (rewards_sink, gateway_rewards_sink_server) = file_sink::FileSinkBuilder::new( FileType::IotRewardShare, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_gateway_reward_shares"), ) - .deposits(Some(file_upload_tx.clone())) + .file_upload(Some(file_upload.clone())) .auto_commit(false) .create() .await?; // Reward manifest - let (reward_manifests_sink, reward_manifests_server) = file_sink::FileSinkBuilder::new( - FileType::RewardManifest, - store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_iot_reward_manifest"), - ) - .deposits(Some(file_upload_tx.clone())) - .auto_commit(false) - .create() - .await?; + let (reward_manifests_sink, reward_manifests_sink_server) = + file_sink::FileSinkBuilder::new( + FileType::RewardManifest, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_iot_reward_manifest"), + ) + .file_upload(Some(file_upload.clone())) + .auto_commit(false) + .create() + .await?; let rewarder = Rewarder { pool: pool.clone(), @@ -122,27 +134,45 @@ impl Server { reward_manifests_sink, reward_period_hours: settings.rewards, reward_offset: settings.reward_offset_duration(), + price_tracker, }; - // setup the entropy loader continious source + // * + // setup entropy requirements + // * let max_lookback_age = settings.loader_window_max_lookback_age(); - let mut entropy_loader = EntropyLoader { pool: pool.clone() }; let entropy_store = FileStore::from_settings(&settings.entropy).await?; let entropy_interval = settings.entropy_interval(); let (entropy_loader_receiver, entropy_loader_server) = 
file_source::continuous_source::() .db(pool.clone()) - .store(entropy_store.clone()) + .store(entropy_store) .file_type(FileType::EntropyReport) .lookback(LookbackBehavior::Max(max_lookback_age)) .poll_duration(entropy_interval) .offset(entropy_interval * 2) .create()?; - let entropy_loader_source_join_handle = - entropy_loader_server.start(shutdown.clone()).await?; - // setup the packet loader continious source - let packet_loader = packet_loader::PacketLoader::from_settings(settings, pool.clone()); + let entropy_loader = EntropyLoader { + pool: pool.clone(), + file_receiver: entropy_loader_receiver, + }; + + // * + // setup the packet loader requirements + // * + + let (non_rewardable_packet_sink, non_rewardable_packet_sink_server) = + file_sink::FileSinkBuilder::new( + FileType::NonRewardablePacket, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_non_rewardable_packet"), + ) + .file_upload(Some(file_upload.clone())) + .roll_time(ChronoDuration::minutes(5)) + .create() + .await?; + let packet_store = FileStore::from_settings(&settings.packet_ingest).await?; let packet_interval = settings.packet_interval(); let (pk_loader_receiver, pk_loader_server) = @@ -154,49 +184,120 @@ impl Server { .poll_duration(packet_interval) .offset(packet_interval * 2) .create()?; - let pk_loader_source_join_handle = pk_loader_server.start(shutdown.clone()).await?; - - // init da processes - let mut loader = loader::Loader::from_settings(settings, pool.clone()).await?; - let mut runner = runner::Runner::new(settings, pool.clone()).await?; - let purger = purger::Purger::from_settings(settings, pool.clone()).await?; - let mut density_scaler = - DensityScaler::from_settings(settings, pool, gateway_updater_receiver.clone()).await?; - let (price_tracker, price_receiver) = - PriceTracker::start(&settings.price_tracker, shutdown.clone()).await?; - - tokio::try_join!( - gateway_updater.run(&shutdown).map_err(Error::from), - gateway_rewards_server - .run(shutdown.clone()) - 
.map_err(Error::from), - reward_manifests_server - .run(shutdown.clone()) - .map_err(Error::from), - file_upload.run(shutdown.clone()).map_err(Error::from), - runner.run( - file_upload_tx.clone(), - &gateway_cache, - ®ion_cache, - density_scaler.hex_density_map(), - &shutdown - ), - entropy_loader.run(entropy_loader_receiver, &shutdown), - loader.run(&shutdown, &gateway_cache), - packet_loader.run( - pk_loader_receiver, - shutdown.clone(), - &gateway_cache, - file_upload_tx.clone() - ), - purger.run(shutdown.clone()), - rewarder.run(price_tracker, &shutdown), - density_scaler.run(&shutdown).map_err(Error::from), - price_receiver.map_err(Error::from), - entropy_loader_source_join_handle.map_err(anyhow::Error::from), - pk_loader_source_join_handle.map_err(anyhow::Error::from), + + let packet_loader = packet_loader::PacketLoader::from_settings( + settings, + pool.clone(), + gateway_cache.clone(), + pk_loader_receiver, + non_rewardable_packet_sink, + ); + + // * + // setup the purger requirements + // * + + let (purger_invalid_beacon_sink, purger_invalid_beacon_sink_server) = + file_sink::FileSinkBuilder::new( + FileType::IotInvalidBeaconReport, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_invalid_beacon"), + ) + .file_upload(Some(file_upload.clone())) + .auto_commit(false) + .create() + .await?; + + let (purger_invalid_witness_sink, purger_invalid_witness_sink_server) = + file_sink::FileSinkBuilder::new( + FileType::IotInvalidWitnessReport, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_invalid_witness_report"), + ) + .file_upload(Some(file_upload.clone())) + .auto_commit(false) + .create() + .await?; + + let purger = purger::Purger::from_settings( + settings, + pool.clone(), + purger_invalid_beacon_sink, + purger_invalid_witness_sink, + ) + .await?; + + // * + // setup the runner requirements + // * + + let (runner_invalid_beacon_sink, runner_invalid_beacon_sink_server) = + file_sink::FileSinkBuilder::new( + FileType::IotInvalidBeaconReport, + 
store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_invalid_beacon_report"), + ) + .file_upload(Some(file_upload.clone())) + .roll_time(ChronoDuration::minutes(5)) + .create() + .await?; + + let (runner_invalid_witness_sink, runner_invalid_witness_sink_server) = + file_sink::FileSinkBuilder::new( + FileType::IotInvalidWitnessReport, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_invalid_witness_report"), + ) + .file_upload(Some(file_upload.clone())) + .roll_time(ChronoDuration::minutes(5)) + .create() + .await?; + + let (runner_poc_sink, runner_poc_sink_server) = file_sink::FileSinkBuilder::new( + FileType::IotPoc, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_valid_poc"), + ) + .file_upload(Some(file_upload.clone())) + .roll_time(ChronoDuration::minutes(2)) + .create() + .await?; + + let runner = runner::Runner::from_settings( + settings, + iot_config_client.clone(), + pool.clone(), + gateway_cache.clone(), + runner_invalid_beacon_sink, + runner_invalid_witness_sink, + runner_poc_sink, + density_scaler.hex_density_map.clone(), ) - .map(|_| ()) + .await?; + + TaskManager::builder() + .add_task(file_upload_server) + .add_task(gateway_rewards_sink_server) + .add_task(reward_manifests_sink_server) + .add_task(non_rewardable_packet_sink_server) + .add_task(purger_invalid_beacon_sink_server) + .add_task(purger_invalid_witness_sink_server) + .add_task(runner_invalid_beacon_sink_server) + .add_task(runner_invalid_witness_sink_server) + .add_task(runner_poc_sink_server) + .add_task(price_daemon) + .add_task(density_scaler) + .add_task(gateway_updater_server) + .add_task(purger) + .add_task(runner) + .add_task(entropy_loader) + .add_task(packet_loader) + .add_task(loader) + .add_task(pk_loader_server) + .add_task(entropy_loader_server) + .add_task(rewarder) + .start() + .await } } diff --git a/iot_verifier/src/packet_loader.rs b/iot_verifier/src/packet_loader.rs index e48a25457..e6a7f1f49 100644 --- a/iot_verifier/src/packet_loader.rs +++ 
b/iot_verifier/src/packet_loader.rs @@ -2,21 +2,21 @@ use crate::{ gateway_cache::GatewayCache, reward_share::GatewayDCShare, telemetry::LoaderMetricTracker, Settings, }; -use chrono::{Duration as ChronoDuration, Utc}; -use file_store::{ - file_info_poller::FileInfoStream, file_sink, file_sink::FileSinkClient, - file_upload::MessageSender as FileUploadSender, iot_packet::IotValidPacket, FileType, -}; -use futures::{StreamExt, TryStreamExt}; +use chrono::Utc; +use file_store::{file_info_poller::FileInfoStream, file_sink, iot_packet::IotValidPacket}; +use futures::{future::LocalBoxFuture, StreamExt, TryStreamExt}; use helium_proto::services::packet_verifier::ValidPacket; use helium_proto::services::poc_lora::{NonRewardablePacket, NonRewardablePacketReason}; use sqlx::PgPool; -use std::path::Path; +use task_manager::ManagedTask; use tokio::sync::mpsc::Receiver; pub struct PacketLoader { pub pool: PgPool, pub cache: String, + gateway_cache: GatewayCache, + file_receiver: Receiver>, + file_sink: file_sink::FileSinkClient, } #[derive(thiserror::Error, Debug)] @@ -27,46 +27,47 @@ pub enum NewLoaderError { DbStoreError(#[from] db_store::Error), } +impl ManagedTask for PacketLoader { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run(shutdown)) + } +} + impl PacketLoader { - pub fn from_settings(settings: &Settings, pool: PgPool) -> Self { + pub fn from_settings( + settings: &Settings, + pool: PgPool, + gateway_cache: GatewayCache, + file_receiver: Receiver>, + file_sink: file_sink::FileSinkClient, + ) -> Self { tracing::info!("from_settings packet loader"); let cache = settings.cache.clone(); - Self { pool, cache } + Self { + pool, + cache, + gateway_cache, + file_receiver, + file_sink, + } } - pub async fn run( - &self, - mut receiver: Receiver>, - shutdown: triggered::Listener, - gateway_cache: &GatewayCache, - file_upload_tx: FileUploadSender, - ) -> anyhow::Result<()> { - 
tracing::info!("starting verifier iot packet loader"); - let store_base_path = Path::new(&self.cache); - let (non_rewardable_packet_sink, non_rewardable_packet_server) = - file_sink::FileSinkBuilder::new( - FileType::NonRewardablePacket, - store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_non_rewardable_packet"), - ) - .deposits(Some(file_upload_tx.clone())) - .roll_time(ChronoDuration::minutes(5)) - .create() - .await?; - let shutdown1 = shutdown.clone(); - tokio::spawn(async move { non_rewardable_packet_server.run(shutdown1).await }); + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + tracing::info!("starting packet loader"); loop { tokio::select! { + biased; _ = shutdown.clone() => break, - msg = receiver.recv() => if let Some(stream) = msg { + msg = self.file_receiver.recv() => if let Some(stream) = msg { let metrics = LoaderMetricTracker::new(); - match self.handle_packet_file(stream, gateway_cache, &non_rewardable_packet_sink, &metrics).await { + match self.handle_packet_file(stream, &metrics).await { Ok(()) => { - // todo: maybe two actions below can occur in handle_packet - // but wasnt able to get it to work ? 
metrics.record_metrics(); - non_rewardable_packet_sink.commit().await?; + self.file_sink.commit().await?; }, Err(err) => { return Err(err)} @@ -74,15 +75,13 @@ impl PacketLoader { } } } - tracing::info!("stopping verifier iot packet loader"); + tracing::info!("stopping packet loader"); Ok(()) } async fn handle_packet_file( &self, file_info_stream: FileInfoStream, - gateway_cache: &GatewayCache, - non_rewardable_packet_sink: &FileSinkClient, metrics: &LoaderMetricTracker, ) -> anyhow::Result<()> { let mut transaction = self.pool.begin().await?; @@ -100,7 +99,8 @@ impl PacketLoader { .try_fold( transaction, |mut transaction, (valid_packet, reward_share)| async move { - if gateway_cache + if self + .gateway_cache .resolve_gateway_info(&reward_share.hotspot_key) .await .is_ok() @@ -117,7 +117,7 @@ impl PacketLoader { reason: reason as i32, timestamp, }; - non_rewardable_packet_sink + self.file_sink .write( non_rewardable_packet_proto, &[("reason", reason.as_str_name())], diff --git a/iot_verifier/src/poc.rs b/iot_verifier/src/poc.rs index cf028a8bd..a3fe3e77f 100644 --- a/iot_verifier/src/poc.rs +++ b/iot_verifier/src/poc.rs @@ -111,7 +111,7 @@ impl Poc { #[allow(clippy::too_many_arguments)] pub async fn verify_beacon( &mut self, - hex_density_map: impl HexDensityMap, + hex_density_map: &HexDensityMap, gateway_cache: &GatewayCache, region_cache: &RegionCache, pool: &PgPool, @@ -178,7 +178,7 @@ impl Poc { pub async fn verify_witnesses( &mut self, beacon_info: &GatewayInfo, - hex_density_map: impl HexDensityMap, + hex_density_map: &HexDensityMap, gateway_cache: &GatewayCache, deny_list: &DenyList, ) -> Result { @@ -198,7 +198,7 @@ impl Poc { &witness_report, beacon_info, gateway_cache, - &hex_density_map, + hex_density_map, ) .await { @@ -238,7 +238,7 @@ impl Poc { witness_report: &IotWitnessIngestReport, beaconer_info: &GatewayInfo, gateway_cache: &GatewayCache, - hex_density_map: &impl HexDensityMap, + hex_density_map: &HexDensityMap, ) -> Result { let witness = 
&witness_report.report; let witness_pub_key = witness.pub_key.clone(); @@ -277,7 +277,7 @@ impl Poc { } }; // to avoid assuming beaconer location is set and to avoid unwrap - // we explicity match location here again + // we explicitly match location here again let Some(ref beaconer_metadata) = beaconer_info.metadata else { return Ok(IotVerifiedWitnessReport::invalid( InvalidReason::NotAsserted, @@ -1320,7 +1320,7 @@ mod tests { // in order to assert the presence of each expected verification // by confirming the beacon report is rendered as invalid // asserting the presence of each will guard against - // one of more verifications being accidently removed + // one of more verifications being accidentally removed // from `do_beacon_verifications` // create default data structs @@ -1358,7 +1358,7 @@ mod tests { resp1 ); - // test entropy lifepsan verification is active in the beacon validation list + // test entropy lifespan verification is active in the beacon validation list let beacon_report1 = valid_beacon_report(PUBKEY1, entropy_start + Duration::minutes(4)); let resp1 = do_beacon_verifications( &deny_list, @@ -1498,7 +1498,7 @@ mod tests { // in order to assert the presence of each expected verification // by confirming the witness report is rendered as invalid // asserting the presence of each will guard against - // one of more verifications being accidently removed + // one of more verifications being accidentally removed // from `do_witness_verifications` // create default data structs @@ -1533,7 +1533,7 @@ mod tests { resp1 ); - // test entropy lifepsan verification is active in the witness validation list + // test entropy lifespan verification is active in the witness validation list let witness_report2 = valid_witness_report(PUBKEY2, entropy_start + Duration::minutes(5)); let resp2 = do_witness_verifications( &deny_list, diff --git a/iot_verifier/src/purger.rs b/iot_verifier/src/purger.rs index fa1376967..6f647a728 100644 --- 
a/iot_verifier/src/purger.rs +++ b/iot_verifier/src/purger.rs @@ -1,22 +1,24 @@ use crate::{entropy::Entropy, poc_report::Report, telemetry, Settings}; use chrono::Duration; use file_store::{ - file_sink::{self, FileSinkClient}, - file_upload, + file_sink::FileSinkClient, iot_beacon_report::IotBeaconIngestReport, iot_invalid_poc::IotInvalidBeaconReport, iot_invalid_poc::IotInvalidWitnessReport, iot_witness_report::IotWitnessIngestReport, traits::{IngestId, MsgDecode}, - FileType, }; -use futures::stream::{self, StreamExt}; +use futures::{ + future::LocalBoxFuture, + stream::{self, StreamExt}, +}; use helium_proto::services::poc_lora::{ InvalidParticipantSide, InvalidReason, LoraInvalidBeaconReportV1, LoraInvalidWitnessReportV1, }; use lazy_static::lazy_static; use sqlx::{PgPool, Postgres}; -use std::{ops::DerefMut, path::Path}; +use std::ops::DerefMut; +use task_manager::ManagedTask; use tokio::{ sync::Mutex, time::{self, MissedTickBehavior}, @@ -36,74 +38,52 @@ lazy_static! { pub struct Purger { pool: PgPool, - cache: String, - output: file_store::Settings, base_stale_period: Duration, + invalid_beacon_sink: FileSinkClient, + invalid_witness_sink: FileSinkClient, } #[derive(thiserror::Error, Debug)] #[error("error creating purger: {0}")] pub struct NewPurgerError(#[from] db_store::Error); +impl ManagedTask for Purger { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run(shutdown)) + } +} + impl Purger { - pub async fn from_settings(settings: &Settings, pool: PgPool) -> Result { - let cache = settings.cache.clone(); - let output = settings.output.clone(); + pub async fn from_settings( + settings: &Settings, + pool: PgPool, + invalid_beacon_sink: FileSinkClient, + invalid_witness_sink: FileSinkClient, + ) -> Result { let base_stale_period = settings.base_stale_period(); Ok(Self { pool, - cache, - output, base_stale_period, + invalid_beacon_sink, + invalid_witness_sink, }) } - pub 
async fn run(&self, shutdown: triggered::Listener) -> anyhow::Result<()> { + pub async fn run(self, shutdown: triggered::Listener) -> anyhow::Result<()> { tracing::info!("starting purger"); let mut db_timer = time::interval(DB_POLL_TIME); db_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); - let store_base_path = Path::new(&self.cache); - let (file_upload_tx, file_upload_rx) = file_upload::message_channel(); - let file_upload = - file_upload::FileUpload::from_settings(&self.output, file_upload_rx).await?; - - let (invalid_beacon_sink, invalid_beacon_sink_server) = file_sink::FileSinkBuilder::new( - FileType::IotInvalidBeaconReport, - store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_invalid_beacon"), - ) - .deposits(Some(file_upload_tx.clone())) - .auto_commit(false) - .create() - .await?; - - let (invalid_witness_sink, invalid_witness_sink_server) = file_sink::FileSinkBuilder::new( - FileType::IotInvalidWitnessReport, - store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_invalid_witness_report"), - ) - .deposits(Some(file_upload_tx.clone())) - .auto_commit(false) - .create() - .await?; - - let shutdown1 = shutdown.clone(); - let shutdown2 = shutdown.clone(); - let shutdown3 = shutdown.clone(); - tokio::spawn(async move { invalid_beacon_sink_server.run(shutdown1).await }); - tokio::spawn(async move { invalid_witness_sink_server.run(shutdown2).await }); - tokio::spawn(async move { file_upload.run(shutdown3).await }); - loop { - if shutdown.is_triggered() { - break; - } tokio::select! 
{ + biased; _ = shutdown.clone() => break, _ = db_timer.tick() => - match self.handle_db_tick(&invalid_beacon_sink, &invalid_witness_sink).await { + match self.handle_db_tick().await { Ok(()) => (), Err(err) => { tracing::error!("fatal purger error: {err:?}"); @@ -115,11 +95,7 @@ impl Purger { Ok(()) } - async fn handle_db_tick( - &self, - invalid_beacon_sink: &FileSinkClient, - invalid_witness_sink: &FileSinkClient, - ) -> anyhow::Result<()> { + async fn handle_db_tick(&self) -> anyhow::Result<()> { // pull stale beacons and witnesses // for each we have to write out an invalid report to S3 // as these wont have previously resulted in a file going to s3 @@ -135,10 +111,7 @@ impl Purger { let tx = Mutex::new(self.pool.begin().await?); stream::iter(stale_beacons) .for_each_concurrent(PURGER_WORKERS, |report| async { - match self - .handle_purged_beacon(&tx, report, invalid_beacon_sink) - .await - { + match self.handle_purged_beacon(&tx, report).await { Ok(()) => (), Err(err) => { tracing::warn!("failed to purge beacon: {err:?}") @@ -146,7 +119,7 @@ impl Purger { } }) .await; - invalid_beacon_sink.commit().await?; + self.invalid_beacon_sink.commit().await?; tx.into_inner().commit().await?; let witness_stale_period = self.base_stale_period + *WITNESS_STALE_PERIOD; @@ -161,10 +134,7 @@ impl Purger { let tx = Mutex::new(self.pool.begin().await?); stream::iter(stale_witnesses) .for_each_concurrent(PURGER_WORKERS, |report| async { - match self - .handle_purged_witness(&tx, report, invalid_witness_sink) - .await - { + match self.handle_purged_witness(&tx, report).await { Ok(()) => (), Err(err) => { tracing::warn!("failed to purge witness: {err:?}") @@ -172,7 +142,7 @@ impl Purger { } }) .await; - invalid_witness_sink.commit().await?; + self.invalid_witness_sink.commit().await?; tx.into_inner().commit().await?; tracing::info!("completed purging {num_stale_witnesses} stale witnesses"); @@ -185,7 +155,6 @@ impl Purger { &self, tx: &Mutex>, db_beacon: Report, - 
invalid_beacon_sink: &FileSinkClient, ) -> anyhow::Result<()> { let beacon_buf: &[u8] = &db_beacon.report_data; let beacon_report = IotBeaconIngestReport::decode(beacon_buf)?; @@ -203,7 +172,7 @@ impl Purger { } .into(); - invalid_beacon_sink + self.invalid_beacon_sink .write( invalid_beacon_proto, &[("reason", InvalidReason::Stale.as_str_name())], @@ -219,7 +188,6 @@ impl Purger { &self, tx: &Mutex>, db_witness: Report, - invalid_witness_sink: &FileSinkClient, ) -> anyhow::Result<()> { let witness_buf: &[u8] = &db_witness.report_data; let witness_report = IotWitnessIngestReport::decode(witness_buf)?; @@ -234,7 +202,7 @@ impl Purger { } .into(); - invalid_witness_sink + self.invalid_witness_sink .write( invalid_witness_report_proto, &[("reason", InvalidReason::Stale.as_str_name())], diff --git a/iot_verifier/src/region_cache.rs b/iot_verifier/src/region_cache.rs index 021102a17..0ca18ee29 100644 --- a/iot_verifier/src/region_cache.rs +++ b/iot_verifier/src/region_cache.rs @@ -10,6 +10,7 @@ use std::{sync::Arc, time::Duration}; /// how often to evict expired items from the cache ( every 5 mins) const CACHE_EVICTION_FREQUENCY: Duration = Duration::from_secs(60 * 5); +#[derive(Clone)] pub struct RegionCache { pub iot_config_client: IotConfigClient, pub cache: Arc>, diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index 80cd6cb92..fcd1002ff 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -5,12 +5,14 @@ use crate::{ use chrono::{DateTime, Duration, TimeZone, Utc}; use db_store::meta; use file_store::{file_sink, traits::TimestampEncode}; +use futures::future::LocalBoxFuture; use helium_proto::RewardManifest; use price::PriceTracker; use reward_scheduler::Scheduler; use rust_decimal::prelude::*; -use sqlx::{PgExecutor, Pool, Postgres}; +use sqlx::{PgExecutor, PgPool, Pool, Postgres}; use std::ops::Range; +use task_manager::ManagedTask; use tokio::time::sleep; const REWARDS_NOT_CURRENT_DELAY_PERIOD: i64 = 5; @@ -21,15 
+23,39 @@ pub struct Rewarder { pub reward_manifests_sink: file_sink::FileSinkClient, pub reward_period_hours: i64, pub reward_offset: Duration, + pub price_tracker: PriceTracker, +} + +impl ManagedTask for Rewarder { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run(shutdown)) + } } impl Rewarder { - pub async fn run( - mut self, + pub async fn new( + pool: PgPool, + rewards_sink: file_sink::FileSinkClient, + reward_manifests_sink: file_sink::FileSinkClient, + reward_period_hours: i64, + reward_offset: Duration, price_tracker: PriceTracker, - shutdown: &triggered::Listener, - ) -> anyhow::Result<()> { - tracing::info!("Starting iot verifier rewarder"); + ) -> Self { + Self { + pool, + rewards_sink, + reward_manifests_sink, + reward_period_hours, + reward_offset, + price_tracker, + } + } + + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + tracing::info!("Starting rewarder"); let reward_period_length = Duration::hours(self.reward_period_hours); @@ -44,7 +70,8 @@ impl Rewarder { ); let sleep_duration = if scheduler.should_reward(now) { - let iot_price = price_tracker + let iot_price = self + .price_tracker .price(&helium_proto::BlockchainTokenTypeV1::Iot) .await?; tracing::info!( @@ -64,17 +91,15 @@ impl Rewarder { scheduler.sleep_duration(Utc::now())? }; - tracing::info!( - "Sleeping for {}", - humantime::format_duration(sleep_duration) - ); - let shutdown = shutdown.clone(); tokio::select! 
{ - _ = shutdown => return Ok(()), + biased; + _ = shutdown => break, _ = sleep(sleep_duration) => (), } } + tracing::info!("stopping rewarder"); + Ok(()) } pub async fn reward( diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 375eaf674..07fe928e0 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -11,29 +11,29 @@ use crate::{ use chrono::{Duration as ChronoDuration, Utc}; use denylist::DenyList; use file_store::{ - file_sink, file_sink::FileSinkClient, - file_upload::MessageSender as FileUploadSender, iot_beacon_report::IotBeaconIngestReport, iot_invalid_poc::{IotInvalidBeaconReport, IotInvalidWitnessReport}, iot_valid_poc::{IotPoc, IotValidBeaconReport, IotVerifiedWitnessReport}, iot_witness_report::IotWitnessIngestReport, traits::{IngestId, MsgDecode, ReportId}, - FileType, SCALING_PRECISION, + SCALING_PRECISION, }; -use futures::stream::{self, StreamExt}; +use futures::{future::LocalBoxFuture, stream, StreamExt, TryFutureExt}; use helium_proto::services::poc_lora::{ InvalidParticipantSide, InvalidReason, LoraInvalidBeaconReportV1, LoraInvalidWitnessReportV1, LoraPocV1, VerificationStatus, }; +use iot_config::client::Client as IotConfigClient; use rust_decimal::{Decimal, MathematicalOps}; use rust_decimal_macros::dec; use sqlx::PgPool; -use std::{path::Path, time::Duration}; +use std::time::Duration; +use task_manager::ManagedTask; use tokio::time::{self, MissedTickBehavior}; /// the cadence in seconds at which the DB is polled for ready POCs -const DB_POLL_TIME: time::Duration = time::Duration::from_secs(30); +const DB_POLL_TIME: Duration = Duration::from_secs(30); const BEACON_WORKERS: usize = 100; const WITNESS_REDUNDANCY: u32 = 4; @@ -42,7 +42,6 @@ const HIP15_TX_REWARD_UNIT_CAP: Decimal = Decimal::TWO; pub struct Runner { pool: PgPool, - cache: String, beacon_interval: ChronoDuration, beacon_interval_tolerance: ChronoDuration, max_witnesses_per_poc: u64, @@ -51,6 +50,12 @@ pub struct Runner { 
deny_list_latest_url: String, deny_list_trigger_interval: Duration, deny_list: DenyList, + gateway_cache: GatewayCache, + region_cache: RegionCache, + invalid_beacon_sink: FileSinkClient, + invalid_witness_sink: FileSinkClient, + poc_sink: FileSinkClient, + hex_density_map: HexDensityMap, } #[derive(thiserror::Error, Debug)] @@ -64,9 +69,32 @@ pub enum FilterStatus { Include, } +impl ManagedTask for Runner { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + impl Runner { - pub async fn new(settings: &Settings, pool: PgPool) -> anyhow::Result { - let cache = settings.cache.clone(); + #[allow(clippy::too_many_arguments)] + pub async fn from_settings( + settings: &Settings, + iot_config_client: IotConfigClient, + pool: PgPool, + gateway_cache: GatewayCache, + invalid_beacon_sink: FileSinkClient, + invalid_witness_sink: FileSinkClient, + poc_sink: FileSinkClient, + hex_density_map: HexDensityMap, + ) -> anyhow::Result { let beacon_interval = settings.beacon_interval(); let beacon_interval_tolerance = settings.beacon_interval_tolerance(); let max_witnesses_per_poc = settings.max_witnesses_per_poc; @@ -74,6 +102,7 @@ impl Runner { let witness_max_retries = settings.witness_max_retries; let deny_list_latest_url = settings.denylist.denylist_url.clone(); let mut deny_list = DenyList::new(&settings.denylist)?; + let region_cache = RegionCache::from_settings(settings, iot_config_client)?; // force update to latest in order to update the tag name // when first run, the denylist will load the local filter // but we dont save the tag name so it defaults to 0 @@ -89,26 +118,24 @@ impl Runner { Ok(Self { pool, - cache, beacon_interval, beacon_interval_tolerance, max_witnesses_per_poc, + gateway_cache, + region_cache, 
beacon_max_retries, witness_max_retries, deny_list_latest_url, deny_list_trigger_interval: settings.denylist.trigger_interval(), deny_list, + invalid_beacon_sink, + invalid_witness_sink, + poc_sink, + hex_density_map, }) } - pub async fn run( - &mut self, - file_upload_tx: FileUploadSender, - gateway_cache: &GatewayCache, - region_cache: &RegionCache, - hex_density_map: impl HexDensityMap, - shutdown: &triggered::Listener, - ) -> anyhow::Result<()> { + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { tracing::info!("starting runner"); let mut db_timer = time::interval(DB_POLL_TIME); @@ -117,51 +144,9 @@ impl Runner { let mut denylist_timer = time::interval(self.deny_list_trigger_interval); denylist_timer.set_missed_tick_behavior(MissedTickBehavior::Skip); - let store_base_path = Path::new(&self.cache); - - let (iot_invalid_beacon_sink, iot_invalid_beacon_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::IotInvalidBeaconReport, - store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_invalid_beacon_report"), - ) - .deposits(Some(file_upload_tx.clone())) - .roll_time(ChronoDuration::minutes(5)) - .create() - .await?; - - let (iot_invalid_witness_sink, iot_invalid_witness_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::IotInvalidWitnessReport, - store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_invalid_witness_report"), - ) - .deposits(Some(file_upload_tx.clone())) - .roll_time(ChronoDuration::minutes(5)) - .create() - .await?; - - let (iot_poc_sink, iot_poc_sink_server) = file_sink::FileSinkBuilder::new( - FileType::IotPoc, - store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_valid_poc"), - ) - .deposits(Some(file_upload_tx.clone())) - .roll_time(ChronoDuration::minutes(2)) - .create() - .await?; - let shutdown1 = shutdown.clone(); - let shutdown2 = shutdown.clone(); - let shutdown3 = shutdown.clone(); - tokio::spawn(async move { iot_invalid_beacon_sink_server.run(shutdown1).await }); - tokio::spawn(async 
move { iot_invalid_witness_sink_server.run(shutdown2).await }); - tokio::spawn(async move { iot_poc_sink_server.run(shutdown3).await }); - loop { - if shutdown.is_triggered() { - break; - } tokio::select! { + biased; _ = shutdown.clone() => break, _ = denylist_timer.tick() => match self.handle_denylist_tick().await { @@ -171,13 +156,7 @@ impl Runner { } }, _ = db_timer.tick() => - match self.handle_db_tick( shutdown.clone(), - &iot_invalid_beacon_sink, - &iot_invalid_witness_sink, - &iot_poc_sink, - gateway_cache, - region_cache, - hex_density_map.clone()).await { + match self.handle_db_tick().await { Ok(()) => (), Err(err) => { tracing::error!("fatal db runner error: {err:?}"); @@ -206,17 +185,7 @@ impl Runner { Ok(()) } - #[allow(clippy::too_many_arguments)] - async fn handle_db_tick( - &self, - _shutdown: triggered::Listener, - iot_invalid_beacon_sink: &FileSinkClient, - iot_invalid_witness_sink: &FileSinkClient, - iot_poc_sink: &FileSinkClient, - gateway_cache: &GatewayCache, - region_cache: &RegionCache, - hex_density_map: impl HexDensityMap, - ) -> anyhow::Result<()> { + async fn handle_db_tick(&self) -> anyhow::Result<()> { tracing::info!("starting query get_next_beacons"); let db_beacon_reports = Report::get_next_beacons(&self.pool, self.beacon_max_retries).await?; @@ -234,27 +203,13 @@ impl Runner { tracing::info!("{beacon_len} beacons ready for verification"); stream::iter(db_beacon_reports) - .for_each_concurrent(BEACON_WORKERS, |db_beacon| { - let hdm = hex_density_map.clone(); - async move { - let beacon_id = db_beacon.id.clone(); - match self - .handle_beacon_report( - db_beacon, - iot_invalid_beacon_sink, - iot_invalid_witness_sink, - iot_poc_sink, - gateway_cache, - region_cache, - hdm, - ) - .await - { - Ok(()) => (), - Err(err) => { - tracing::warn!("failed to handle beacon: {err:?}"); - _ = Report::update_attempts(&self.pool, &beacon_id, Utc::now()).await; - } + .for_each_concurrent(BEACON_WORKERS, |db_beacon| async move { + let beacon_id = 
db_beacon.id.clone(); + match self.handle_beacon_report(db_beacon).await { + Ok(()) => (), + Err(err) => { + tracing::warn!("failed to handle beacon: {err:?}"); + _ = Report::update_attempts(&self.pool, &beacon_id, Utc::now()).await; } } }) @@ -263,17 +218,7 @@ impl Runner { Ok(()) } - #[allow(clippy::too_many_arguments)] - async fn handle_beacon_report( - &self, - db_beacon: Report, - iot_invalid_beacon_sink: &FileSinkClient, - iot_invalid_witness_sink: &FileSinkClient, - iot_poc_sink: &FileSinkClient, - gateway_cache: &GatewayCache, - region_cache: &RegionCache, - hex_density_map: impl HexDensityMap, - ) -> anyhow::Result<()> { + async fn handle_beacon_report(&self, db_beacon: Report) -> anyhow::Result<()> { let entropy_start_time = match db_beacon.timestamp { Some(v) => v, None => return Ok(()), @@ -314,9 +259,9 @@ impl Runner { // verify POC beacon let beacon_verify_result = poc .verify_beacon( - hex_density_map.clone(), - gateway_cache, - region_cache, + &self.hex_density_map, + &self.gateway_cache, + &self.region_cache, &self.pool, self.beacon_interval, self.beacon_interval_tolerance, @@ -330,8 +275,8 @@ impl Runner { let verified_witnesses_result = poc .verify_witnesses( &beacon_info, - hex_density_map, - gateway_cache, + &self.hex_density_map, + &self.gateway_cache, &self.deny_list, ) .await?; @@ -371,7 +316,7 @@ impl Runner { sort_and_split_witnesses(&mut selected_witnesses, max_witnesses_per_poc)?; // concat the unselected valid witnesses and the invalid witnesses - // these will then form the unseleted list on the poc + // these will then form the unselected list on the poc unselected_witnesses = [&unselected_witnesses[..], &invalid_witnesses[..]].concat(); @@ -414,21 +359,14 @@ impl Runner { valid_beacon_report, selected_witnesses, unselected_witnesses, - iot_poc_sink, ) .await?; } } VerificationStatus::Invalid => { // the beacon is invalid, which in turn renders all witnesses invalid - self.handle_invalid_poc( - beacon_verify_result, - &beacon_report, 
- witnesses, - iot_invalid_beacon_sink, - iot_invalid_witness_sink, - ) - .await?; + self.handle_invalid_poc(beacon_verify_result, &beacon_report, witnesses) + .await?; } } Ok(()) @@ -439,8 +377,6 @@ impl Runner { beacon_verify_result: VerifyBeaconResult, beacon_report: &IotBeaconIngestReport, witness_reports: Vec, - iot_invalid_beacon_sink: &FileSinkClient, - iot_invalid_witness_sink: &FileSinkClient, ) -> anyhow::Result<()> { // the beacon is invalid, which in turn renders all witnesses invalid let beacon = &beacon_report.report; @@ -469,7 +405,8 @@ impl Runner { let invalid_poc_proto: LoraInvalidBeaconReportV1 = invalid_poc.into(); // save invalid poc to s3, if write fails update attempts and go no further // allow the poc to be reprocessed next tick - match iot_invalid_beacon_sink + match self + .invalid_beacon_sink .write( invalid_poc_proto, &[("reason", beacon_verify_result.invalid_reason.as_str_name())], @@ -485,7 +422,7 @@ impl Runner { } // save invalid witnesses to s3, ignore any failed witness writes // taking the lossly approach here as if we re attempt the POC later - // we will have to clean out any sucessful writes of other witnesses + // we will have to clean out any successful writes of other witnesses // and also the invalid poc // so if a report fails from this point on, it shall be lost for ever more for witness_report in witness_reports { @@ -498,7 +435,8 @@ impl Runner { }; let invalid_witness_report_proto: LoraInvalidWitnessReportV1 = invalid_witness_report.into(); - match iot_invalid_witness_sink + match self + .invalid_witness_sink .write( invalid_witness_report_proto, &[("reason", beacon_verify_result.invalid_reason.as_str_name())], @@ -522,7 +460,6 @@ impl Runner { valid_beacon_report: IotValidBeaconReport, selected_witnesses: Vec, unselected_witnesses: Vec, - iot_poc_sink: &FileSinkClient, ) -> anyhow::Result<()> { let received_timestamp = valid_beacon_report.received_timestamp; let pub_key = valid_beacon_report.report.pub_key.clone(); 
@@ -546,7 +483,7 @@ impl Runner { let poc_proto: LoraPocV1 = iot_poc.into(); // save the poc to s3, if write fails update attempts and go no further // allow the poc to be reprocessed next tick - match iot_poc_sink.write(poc_proto, []).await { + match self.poc_sink.write(poc_proto, []).await { Ok(_) => (), Err(err) => { tracing::error!("failed to save invalid_witness_report to s3, {err}"); @@ -555,7 +492,7 @@ impl Runner { } } // write out metrics for any witness which failed verification - // TODO: work our approach that doesnt require the prior cloning of + // TODO: work our approach that doesn't require the prior cloning of // the selected and unselected witnesses vecs // tried to do this directly from the now discarded poc_proto // but could nae get it to get a way past the lack of COPY diff --git a/iot_verifier/src/tx_scaler.rs b/iot_verifier/src/tx_scaler.rs index 6289d5c04..b02f292c3 100644 --- a/iot_verifier/src/tx_scaler.rs +++ b/iot_verifier/src/tx_scaler.rs @@ -1,20 +1,22 @@ use crate::{ gateway_updater::MessageReceiver, - hex_density::{compute_hex_density_map, GlobalHexMap, HexDensityMap, SharedHexDensityMap}, + hex_density::{compute_hex_density_map, GlobalHexMap, HexDensityMap}, last_beacon::LastBeacon, Settings, }; use chrono::{DateTime, Duration, Utc}; +use futures::future::LocalBoxFuture; use helium_crypto::PublicKeyBinary; use sqlx::PgPool; use std::collections::HashMap; +use task_manager::ManagedTask; // The number in minutes within which the gateway has registered a beacon // to the oracle for inclusion in transmit scaling density calculations const HIP_17_INTERACTIVITY_LIMIT: i64 = 3600; pub struct Server { - hex_density_map: SharedHexDensityMap, + pub hex_density_map: HexDensityMap, pool: PgPool, refresh_offset: Duration, gateway_cache_receiver: MessageReceiver, @@ -28,14 +30,23 @@ pub enum TxScalerError { RecentActivity(#[from] sqlx::Error), } +impl ManagedTask for Server { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> 
LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run(shutdown)) + } +} + impl Server { pub async fn from_settings( settings: &Settings, pool: PgPool, gateway_cache_receiver: MessageReceiver, - ) -> Result { + ) -> anyhow::Result { let mut server = Self { - hex_density_map: SharedHexDensityMap::new(), + hex_density_map: HexDensityMap::new(), pool, refresh_offset: settings.loader_window_max_lookback_age(), gateway_cache_receiver, @@ -46,27 +57,22 @@ impl Server { Ok(server) } - pub fn hex_density_map(&self) -> impl HexDensityMap { - self.hex_density_map.clone() - } - - pub async fn run(&mut self, shutdown: &triggered::Listener) -> Result<(), TxScalerError> { - tracing::info!("density_scaler: starting transmit scaler process"); + pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + tracing::info!("starting tx scaler process"); loop { - if shutdown.is_triggered() { - tracing::info!("density_scaler: stopping transmit scaler"); - return Ok(()); - } - tokio::select! { + biased; + _ = shutdown.clone() => break, _ = self.gateway_cache_receiver.changed() => self.refresh_scaling_map().await?, - _ = shutdown.clone() => return Ok(()), } } + + tracing::info!("stopping tx scaler process"); + Ok(()) } - pub async fn refresh_scaling_map(&mut self) -> Result<(), TxScalerError> { + pub async fn refresh_scaling_map(&mut self) -> anyhow::Result<()> { let refresh_start = Utc::now() - self.refresh_offset; tracing::info!("density_scaler: generating hex scaling map, starting at {refresh_start:?}"); let mut global_map = GlobalHexMap::new(); diff --git a/mobile_verifier/src/data_session.rs b/mobile_verifier/src/data_session.rs index 2353fd8c6..e99fd1a58 100644 --- a/mobile_verifier/src/data_session.rs +++ b/mobile_verifier/src/data_session.rs @@ -29,6 +29,7 @@ impl DataSessionIngestor { tokio::spawn(async move { loop { tokio::select! 
{ + biased; _ = shutdown.clone() => { tracing::info!("DataSessionIngestor shutting down"); break; diff --git a/mobile_verifier/src/heartbeats.rs b/mobile_verifier/src/heartbeats.rs index e954b53e9..313a42a75 100644 --- a/mobile_verifier/src/heartbeats.rs +++ b/mobile_verifier/src/heartbeats.rs @@ -78,6 +78,7 @@ impl HeartbeatDaemon { loop { tokio::select! { + biased; _ = shutdown.clone() => { tracing::info!("HeartbeatDaemon shutting down"); break; diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 836ec3fac..407e96916 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -77,6 +77,7 @@ impl Rewarder { ); tokio::select! { + biased; _ = shutdown.clone() => break, _ = sleep(sleep_duration) => (), } diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index f0bded1f7..1d0e55aef 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -65,6 +65,7 @@ impl SpeedtestDaemon { tokio::spawn(async move { loop { tokio::select! { + biased; _ = shutdown.clone() => { tracing::info!("SpeedtestDaemon shutting down"); break; diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index 03b113d20..fbae70eb9 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -46,10 +46,8 @@ impl SubscriberLocationIngestor { } pub async fn run(mut self, shutdown: &triggered::Listener) -> anyhow::Result<()> { loop { - if shutdown.is_triggered() { - break; - } tokio::select! 
{ + biased; _ = shutdown.clone() => break, msg = self.reports_receiver.recv() => if let Some(stream) = msg { match self.process_file(stream).await { diff --git a/poc_entropy/src/entropy_generator.rs b/poc_entropy/src/entropy_generator.rs index aec84bd3d..dd78a5048 100644 --- a/poc_entropy/src/entropy_generator.rs +++ b/poc_entropy/src/entropy_generator.rs @@ -125,10 +125,8 @@ impl EntropyGenerator { entropy_timer.set_missed_tick_behavior(time::MissedTickBehavior::Delay); loop { - if shutdown.is_triggered() { - break; - } tokio::select! { + biased; _ = shutdown.clone() => break, _ = entropy_timer.tick() => match self.handle_entropy_tick(&file_sink).await { Ok(()) => (), diff --git a/price/Cargo.toml b/price/Cargo.toml index fde5da4dc..2b51b92fd 100644 --- a/price/Cargo.toml +++ b/price/Cargo.toml @@ -21,6 +21,7 @@ tracing-subscriber = { workspace = true } metrics = {workspace = true } metrics-exporter-prometheus = { workspace = true } tokio = { workspace = true } +tokio-util = { workspace = true } chrono = { workspace = true } helium-proto = { workspace = true } file-store = { path = "../file_store" } @@ -28,5 +29,6 @@ poc-metrics = { path = "../metrics" } triggered = {workspace = true} solana-client = {workspace = true} solana-sdk = {workspace = true} +task-manager = {path = "../task_manager"} price-oracle = {workspace = true} anchor-lang = {workspace = true} diff --git a/price/src/main.rs b/price/src/main.rs index 9e8564e94..37cdd3e28 100644 --- a/price/src/main.rs +++ b/price/src/main.rs @@ -6,6 +6,7 @@ use futures_util::TryFutureExt; use helium_proto::BlockchainTokenTypeV1; use price::{cli::check, PriceGenerator, Settings}; use std::path::{self, PathBuf}; +use task_manager::TaskManager; use tokio::{self, signal}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -47,6 +48,16 @@ impl Cmd { Self::Check(options) => check::run(options.into()).await, } } + + pub async fn run_tm(&self, config: Option) -> Result<()> { + match self { + 
Self::Server(cmd) => { + let settings = Settings::new(config)?; + cmd.run_tm(&settings).await + } + Self::Check(options) => check::run(options.into()).await, + } + } } #[derive(Debug, clap::Args)] @@ -136,6 +147,55 @@ impl Server { ) .map(|_| ()) } + + pub async fn run_tm(&self, settings: &Settings) -> Result<()> { + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::new(&settings.log)) + .with(tracing_subscriber::fmt::layer()) + .init(); + + // Install the prometheus metrics exporter + poc_metrics::start_metrics(&settings.metrics)?; + + // Initialize uploader + let (file_upload, file_upload_server) = + file_upload::FileUpload::from_settings_tm(&settings.output).await?; + + let store_base_path = path::Path::new(&settings.cache); + + let (price_sink, price_sink_server) = file_sink::FileSinkBuilder::new( + FileType::PriceReport, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_report_submission"), + ) + .file_upload(Some(file_upload.clone())) + .roll_time(Duration::minutes(PRICE_SINK_ROLL_MINS)) + .create() + .await?; + + // price generators + let hnt_price_generator = + PriceGenerator::new_tm(settings, BlockchainTokenTypeV1::Hnt, price_sink.clone()) + .await?; + let mobile_price_generator = + PriceGenerator::new_tm(settings, BlockchainTokenTypeV1::Mobile, price_sink.clone()) + .await?; + let iot_price_generator = + PriceGenerator::new_tm(settings, BlockchainTokenTypeV1::Iot, price_sink.clone()) + .await?; + let hst_price_generator = + PriceGenerator::new_tm(settings, BlockchainTokenTypeV1::Hst, price_sink).await?; + + TaskManager::builder() + .add_task(file_upload_server) + .add_task(price_sink_server) + .add_task(hnt_price_generator) + .add_task(mobile_price_generator) + .add_task(iot_price_generator) + .add_task(hst_price_generator) + .start() + .await + } } #[tokio::main] diff --git a/price/src/price_generator.rs b/price/src/price_generator.rs index c3a5bf805..3dcf1fc40 100644 --- a/price/src/price_generator.rs +++ 
b/price/src/price_generator.rs @@ -3,13 +3,14 @@ use anchor_lang::AccountDeserialize; use anyhow::{anyhow, Error, Result}; use chrono::{DateTime, Duration, TimeZone, Utc}; use file_store::file_sink; -use futures::TryFutureExt; +use futures::{future::LocalBoxFuture, TryFutureExt}; use helium_proto::{BlockchainTokenTypeV1, PriceReportV1}; use price_oracle::{calculate_current_price, PriceOracleV0}; use serde::{Deserialize, Serialize}; use solana_client::nonblocking::rpc_client::RpcClient; use solana_sdk::pubkey::Pubkey as SolPubkey; use std::{path::PathBuf, str::FromStr}; +use task_manager::ManagedTask; use tokio::{fs, time}; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -38,6 +39,16 @@ pub struct PriceGenerator { default_price: Option, stale_price_duration: Duration, latest_price_file: PathBuf, + file_sink: Option, +} + +impl ManagedTask for PriceGenerator { + fn start_task( + self: Box, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run_tm(shutdown)) + } } impl From for PriceReportV1 { @@ -80,6 +91,27 @@ impl PriceGenerator { stale_price_duration: settings.stale_price_duration(), latest_price_file: PathBuf::from_str(&settings.cache)? .join(format!("{token_type:?}.latest")), + file_sink: None, + }) + } + + pub async fn new_tm( + settings: &Settings, + token_type: BlockchainTokenTypeV1, + file_sink: file_sink::FileSinkClient, + ) -> Result { + let client = RpcClient::new(settings.source.clone()); + Ok(Self { + last_price_opt: None, + token_type, + client, + key: settings.price_key(token_type)?, + default_price: settings.default_price(token_type), + interval_duration: settings.interval().to_std()?, + stale_price_duration: settings.stale_price_duration(), + latest_price_file: PathBuf::from_str(&settings.cache)? 
+ .join(format!("{token_type:?}.latest")), + file_sink: Some(file_sink), }) } @@ -104,8 +136,25 @@ } } + pub async fn run_tm(mut self, shutdown: triggered::Listener) -> Result<()> { + match (self.key, self.default_price, self.file_sink.clone()) { + (Some(key), _, Some(file_sink)) => self.run_with_key(key, file_sink, &shutdown).await, + (None, Some(default_price), Some(file_sink)) => { + self.run_with_default(default_price, file_sink, &shutdown) + .await + } + _ => { + tracing::warn!( + "stopping price generator for {:?}, not configured", + self.token_type + ); + Ok(()) + } + } + } + async fn run_with_default( - &self, + &mut self, default_price: u64, file_sink: file_sink::FileSinkClient, shutdown: &triggered::Listener, @@ -118,6 +167,7 @@ loop { tokio::select! { + biased; _ = shutdown.clone() => break, _ = trigger.tick() => { let price = Price::new(Utc::now(), default_price, self.token_type); @@ -144,6 +194,7 @@ loop { tokio::select! 
{ + biased; _ = shutdown.clone() => break, _ = trigger.tick() => self.handle(&key, &file_sink).await?, } diff --git a/price/src/price_tracker.rs b/price/src/price_tracker.rs index df64f4b06..190c42098 100644 --- a/price/src/price_tracker.rs +++ b/price/src/price_tracker.rs @@ -1,9 +1,15 @@ +use anyhow::anyhow; use chrono::{DateTime, Duration, TimeZone, Utc}; use file_store::{FileInfo, FileStore, FileType}; -use futures::stream::{StreamExt, TryStreamExt}; +use futures::{ + future::LocalBoxFuture, + stream::{StreamExt, TryStreamExt}, +}; use helium_proto::{BlockchainTokenTypeV1, Message, PriceReportV1}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use task_manager::ManagedTask; +use tokio; use tokio::sync::{mpsc, watch}; #[derive(thiserror::Error, Debug)] @@ -27,7 +33,7 @@ pub enum PriceTrackerError { } #[derive(Clone)] -struct Price { +pub struct Price { price: u64, timestamp: DateTime, } @@ -111,6 +117,29 @@ impl PriceTracker { })) } + pub async fn new_tm(settings: &Settings) -> anyhow::Result<(Self, PriceTrackerDaemon)> { + let file_store = FileStore::from_settings(&settings.file_store).await?; + let price_duration = settings.price_duration(); + let (price_sender, price_receiver) = watch::channel(Prices::new()); + let (task_kill_sender, task_kill_receiver) = mpsc::channel(1); + let initial_timestamp = + calculate_initial_prices(&file_store, price_duration, &price_sender).await?; + + Ok(( + Self { + price_duration: settings.price_duration(), + price_receiver, + task_killer: task_kill_sender, + }, + PriceTrackerDaemon { + file_store, + price_sender, + task_killer: task_kill_receiver, + after: initial_timestamp, + }, + )) + } + pub async fn price( &self, token_type: &BlockchainTokenTypeV1, @@ -166,6 +195,44 @@ async fn run( Ok(()) } +pub struct PriceTrackerDaemon { + file_store: FileStore, + price_sender: watch::Sender, + task_killer: mpsc::Receiver, + after: DateTime, +} + +impl ManagedTask for PriceTrackerDaemon { + fn start_task( + self: 
Box, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + Box::pin(self.run(shutdown)) + } +} + +impl PriceTrackerDaemon { + async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { + tracing::info!("starting price tracker"); + let mut trigger = tokio::time::interval(std::time::Duration::from_secs(30)); + loop { + let shutdown = shutdown.clone(); + tokio::select! { + _ = shutdown => break, + _ = trigger.tick() => { + let timestamp = process_files(&self.file_store, &self.price_sender, self.after).await?; + self.after = timestamp.unwrap_or(self.after); + } + msg = self.task_killer.recv() => if let Some(error) = msg { + return Err(anyhow!(error)); + } + } + } + tracing::info!("stopping price tracker"); + Ok(()) + } +} + async fn calculate_initial_prices( file_store: &FileStore, price_duration: Duration, diff --git a/reward_index/src/indexer.rs b/reward_index/src/indexer.rs index a5b559fa7..e576cf3dc 100644 --- a/reward_index/src/indexer.rs +++ b/reward_index/src/indexer.rs @@ -62,6 +62,7 @@ impl Indexer { loop { tokio::select! { + biased; _ = shutdown.clone() => { tracing::info!("Indexer shutting down"); return Ok(()); diff --git a/task_manager/src/lib.rs b/task_manager/src/lib.rs index d509089c3..0f3de24c2 100644 --- a/task_manager/src/lib.rs +++ b/task_manager/src/lib.rs @@ -138,12 +138,14 @@ fn start_futures( async fn stop_all(futures: Vec) -> anyhow::Result<()> { #[allow(clippy::manual_try_fold)] futures::stream::iter(futures.into_iter().rev()) - .fold(Ok(()), |last_result, local| async move { + .then(|local| async move { local.shutdown_trigger.trigger(); - let result = local.future.await; - last_result.and(result) + local.future.await }) + .collect::>() .await + .into_iter() + .collect() } fn create_triggers(n: usize) -> Vec<(triggered::Trigger, triggered::Listener)> {