From 9e76d72261da4aa1bc463a02c787656ae0b05af6 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Wed, 12 Jul 2023 14:16:05 -0400 Subject: [PATCH] Update iot-verifier to record last_rewarded_end_time as boot --- iot_verifier/src/lib.rs | 2 +- iot_verifier/src/loader.rs | 2 +- iot_verifier/src/main.rs | 7 +- iot_verifier/src/packet_loader.rs | 2 +- iot_verifier/src/purger.rs | 4 +- iot_verifier/src/rewarder.rs | 12 +-- iot_verifier/src/runner.rs | 12 +-- iot_verifier/src/{metrics.rs => telemetry.rs} | 101 ++++++++++-------- 8 files changed, 76 insertions(+), 66 deletions(-) rename iot_verifier/src/{metrics.rs => telemetry.rs} (62%) diff --git a/iot_verifier/src/lib.rs b/iot_verifier/src/lib.rs index e2e0fb7fe..412543d0d 100644 --- a/iot_verifier/src/lib.rs +++ b/iot_verifier/src/lib.rs @@ -6,7 +6,6 @@ mod hex_density; pub mod last_beacon; pub mod loader; pub mod meta; -pub mod metrics; pub mod packet_loader; pub mod poc; pub mod poc_report; @@ -16,5 +15,6 @@ pub mod reward_share; pub mod rewarder; pub mod runner; mod settings; +pub mod telemetry; pub mod tx_scaler; pub use settings::Settings; diff --git a/iot_verifier/src/loader.rs b/iot_verifier/src/loader.rs index 611627f6e..42f1a43ca 100644 --- a/iot_verifier/src/loader.rs +++ b/iot_verifier/src/loader.rs @@ -1,8 +1,8 @@ use crate::{ gateway_cache::GatewayCache, meta::Meta, - metrics::LoaderMetricTracker, poc_report::{InsertBindings, IotStatus, Report, ReportType}, + telemetry::LoaderMetricTracker, Settings, }; use chrono::DateTime; diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index e8c40f9d1..a80f820ef 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -9,8 +9,8 @@ use futures::TryFutureExt; use iot_config::client::Client as IotConfigClient; use iot_verifier::{ entropy_loader, gateway_cache::GatewayCache, gateway_updater::GatewayUpdater, loader, - metrics::Metrics, packet_loader, poc_report::Report, purger, region_cache::RegionCache, - rewarder::Rewarder, runner, tx_scaler::Server as DensityScaler, Settings, + packet_loader, purger, region_cache::RegionCache, rewarder::Rewarder, runner, telemetry, + tx_scaler::Server as DensityScaler, Settings, }; use price::PriceTracker; use std::path; @@ -82,8 +82,7 @@ impl Server { .await?; sqlx::migrate!().run(&pool).await?; - let count_all_beacons = Report::count_all_beacons(&pool).await?; - Metrics::num_beacons(count_all_beacons); + telemetry::initialize(&pool).await?; let iot_config_client = IotConfigClient::from_settings(&settings.iot_config_client)?; diff --git a/iot_verifier/src/packet_loader.rs b/iot_verifier/src/packet_loader.rs index 32f164993..c0a30fe28 100644 --- a/iot_verifier/src/packet_loader.rs +++ b/iot_verifier/src/packet_loader.rs @@ -1,5 +1,5 @@ use crate::{ - gateway_cache::GatewayCache, metrics::LoaderMetricTracker, reward_share::GatewayDCShare, + gateway_cache::GatewayCache, reward_share::GatewayDCShare, telemetry::LoaderMetricTracker, Settings, }; use chrono::{Duration as ChronoDuration, Utc}; diff --git a/iot_verifier/src/purger.rs b/iot_verifier/src/purger.rs index 80e2e7248..b565fbef4 100644 --- a/iot_verifier/src/purger.rs +++ b/iot_verifier/src/purger.rs @@ -1,4 +1,4 @@ -use crate::{entropy::Entropy, metrics::Metrics, poc_report::Report, Settings}; +use crate::{entropy::Entropy, poc_report::Report, telemetry, Settings}; use chrono::Duration; use file_store::{ file_sink::{self, FileSinkClient}, @@ -209,7 +209,7 @@ impl Purger { .await?; // delete the report from the DB Report::delete_report(tx.lock().await.deref_mut(), &beacon_id).await?; - Metrics::decrement_num_beacons(); + telemetry::decrement_num_beacons(); Ok(()) } diff --git a/iot_verifier/src/rewarder.rs b/iot_verifier/src/rewarder.rs index 5c90f013d..80cd6cb92 100644 --- a/iot_verifier/src/rewarder.rs +++ b/iot_verifier/src/rewarder.rs @@ -1,4 +1,7 @@ -use crate::reward_share::{operational_rewards, GatewayShares}; +use crate::{ + reward_share::{operational_rewards, GatewayShares}, + telemetry, +}; use chrono::{DateTime, Duration, TimeZone, Utc}; use db_store::meta; use file_store::{file_sink, traits::TimestampEncode}; @@ -129,10 +132,7 @@ impl Rewarder { .await? .await??; self.reward_manifests_sink.commit().await?; - metrics::gauge!( - "last_rewarded_end_time", - scheduler.reward_period.end.timestamp() as f64 - ); + telemetry::last_rewarded_end_time(scheduler.reward_period.end); Ok(()) } @@ -181,7 +181,7 @@ impl Rewarder { } } -async fn fetch_rewarded_timestamp( +pub async fn fetch_rewarded_timestamp( timestamp_key: &str, db: impl PgExecutor<'_>, ) -> db_store::Result> { diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 12baf7406..2a9b30995 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -1,7 +1,7 @@ use crate::{ - gateway_cache::GatewayCache, hex_density::HexDensityMap, last_beacon::LastBeacon, - metrics::Metrics, poc::Poc, poc_report::Report, region_cache::RegionCache, - reward_share::GatewayPocShare, Settings, + gateway_cache::GatewayCache, hex_density::HexDensityMap, last_beacon::LastBeacon, poc::Poc, + poc_report::Report, region_cache::RegionCache, reward_share::GatewayPocShare, telemetry, + Settings, }; use chrono::{Duration as ChronoDuration, Utc}; use file_store::{ @@ -447,7 +447,7 @@ impl Runner { } // done with these poc reports, purge em from the db Report::delete_poc(&self.pool, &beacon_id).await?; - Metrics::decrement_num_beacons(); + telemetry::decrement_num_beacons(); Ok(()) } @@ -498,7 +498,7 @@ impl Runner { // update timestamp of last beacon for the beaconer LastBeacon::update_last_timestamp(&self.pool, pub_key.as_ref(), received_timestamp).await?; Report::delete_poc(&self.pool, &packet_data).await?; - Metrics::decrement_num_beacons(); + telemetry::decrement_num_beacons(); Ok(()) } } @@ -592,7 +592,7 @@ fn fire_invalid_witness_metric(witnesses: &[IotVerifiedWitnessReport]) { .iter() .filter(|witness| !matches!(witness.invalid_reason, InvalidReason::ReasonNone)) .for_each(|witness| { - Metrics::increment_invalid_witnesses(&[( + telemetry::increment_invalid_witnesses(&[( "reason", witness.invalid_reason.clone().as_str_name(), )]) diff --git a/iot_verifier/src/metrics.rs b/iot_verifier/src/telemetry.rs similarity index 62% rename from iot_verifier/src/metrics.rs rename to iot_verifier/src/telemetry.rs index 94cef51cd..2629a6a7e 100644 --- a/iot_verifier/src/metrics.rs +++ b/iot_verifier/src/telemetry.rs @@ -1,5 +1,10 @@ use std::cell::RefCell; +use chrono::{DateTime, Utc}; +use sqlx::{Pool, Postgres}; + +use crate::{poc_report::Report, rewarder}; + const PACKET_COUNTER: &str = concat!(env!("CARGO_PKG_NAME"), "_", "packet"); const NON_REWARDABLE_PACKET_COUNTER: &str = concat!(env!("CARGO_PKG_NAME"), "_", "non_rewardable_packet"); @@ -11,48 +16,57 @@ const LOADER_DROPPED_WITNESS_COUNTER: &str = const BEACON_GUAGE: &str = concat!(env!("CARGO_PKG_NAME"), "_", "num_beacons"); const INVALID_WITNESS_COUNTER: &str = concat!(env!("CARGO_PKG_NAME"), "_", "invalid_witness_report"); +const LAST_REWARDED_END_TIME: &str = "last_rewarded_end_time"; -pub struct Metrics; +pub async fn initialize(db: &Pool) -> anyhow::Result<()> { + last_rewarded_end_time(rewarder::fetch_rewarded_timestamp(LAST_REWARDED_END_TIME, db).await?); + num_beacons(Report::count_all_beacons(db).await?); -impl Metrics { - pub fn count_packets(count: u64) { - metrics::counter!(PACKET_COUNTER, count); - } + Ok(()) +} - pub fn count_non_rewardable_packets(count: u64) { - metrics::counter!(NON_REWARDABLE_PACKET_COUNTER, count); - } +pub fn count_packets(count: u64) { + metrics::counter!(PACKET_COUNTER, count); +} - pub fn count_loader_beacons(count: u64) { - metrics::counter!(LOADER_BEACON_COUNTER, count); - } +pub fn count_non_rewardable_packets(count: u64) { + metrics::counter!(NON_REWARDABLE_PACKET_COUNTER, count); +} - pub fn count_loader_witnesses(count: u64) { - metrics::counter!(LOADER_WITNESS_COUNTER, count); - } +pub fn count_loader_beacons(count: u64) { + metrics::counter!(LOADER_BEACON_COUNTER, count); +} - pub fn count_loader_dropped_beacons(count: u64, labels: &[(&'static str, &'static str)]) { - metrics::counter!(LOADER_DROPPED_BEACON_COUNTER, count, labels); - } +pub fn count_loader_witnesses(count: u64) { + metrics::counter!(LOADER_WITNESS_COUNTER, count); +} - pub fn count_loader_dropped_witnesses(count: u64, labels: &[(&'static str, &'static str)]) { - metrics::counter!(LOADER_DROPPED_WITNESS_COUNTER, count, labels); - } +pub fn count_loader_dropped_beacons(count: u64, labels: &[(&'static str, &'static str)]) { + metrics::counter!(LOADER_DROPPED_BEACON_COUNTER, count, labels); +} - pub fn num_beacons(count: u64) { - metrics::gauge!(BEACON_GUAGE, count as f64); - } +pub fn count_loader_dropped_witnesses(count: u64, labels: &[(&'static str, &'static str)]) { + metrics::counter!(LOADER_DROPPED_WITNESS_COUNTER, count, labels); +} - pub fn increment_num_beacons_by(count: u64) { - metrics::increment_gauge!(BEACON_GUAGE, count as f64); - } +pub fn num_beacons(count: u64) { + metrics::gauge!(BEACON_GUAGE, count as f64); +} - pub fn decrement_num_beacons() { - metrics::decrement_gauge!(BEACON_GUAGE, 1.0) - } - pub fn increment_invalid_witnesses(labels: &[(&'static str, &'static str)]) { - metrics::increment_counter!(INVALID_WITNESS_COUNTER, labels); - } +pub fn increment_num_beacons_by(count: u64) { + metrics::increment_gauge!(BEACON_GUAGE, count as f64); +} + +pub fn decrement_num_beacons() { + metrics::decrement_gauge!(BEACON_GUAGE, 1.0) +} + +pub fn increment_invalid_witnesses(labels: &[(&'static str, &'static str)]) { + metrics::increment_counter!(INVALID_WITNESS_COUNTER, labels); +} + +pub fn last_rewarded_end_time(datetime: DateTime) { + metrics::gauge!(LAST_REWARDED_END_TIME, datetime.timestamp() as f64); } #[derive(Default)] @@ -123,52 +137,49 @@ impl LoaderMetricTracker { let non_rewardable_packets = self.non_rewardable_packets.into_inner(); if packets > 0 { - Metrics::count_packets(packets); + count_packets(packets); } if non_rewardable_packets > 0 { - Metrics::count_non_rewardable_packets(packets); + count_non_rewardable_packets(packets); } if beacons > 0 { - Metrics::count_loader_beacons(beacons); - Metrics::increment_num_beacons_by(beacons); + count_loader_beacons(beacons); + increment_num_beacons_by(beacons); } if beacons_denied > 0 { - Metrics::count_loader_dropped_beacons( - beacons_denied, - &[("status", "ok"), ("reason", "denied")], - ); + count_loader_dropped_beacons(beacons_denied, &[("status", "ok"), ("reason", "denied")]); } if beacons_unknown > 0 { - Metrics::count_loader_dropped_beacons( + count_loader_dropped_beacons( beacons_unknown, &[("status", "ok"), ("reason", "gateway_not_found")], ); } if witnesses > 0 { - Metrics::count_loader_witnesses(witnesses); + count_loader_witnesses(witnesses); } if witnesses_no_beacon > 0 { - Metrics::count_loader_dropped_witnesses( + count_loader_dropped_witnesses( witnesses_no_beacon, &[("status", "ok"), ("reason", "no_associated_beacon_data")], ); } if witnesses_denied > 0 { - Metrics::count_loader_dropped_witnesses( + count_loader_dropped_witnesses( witnesses_denied, &[("status", "ok"), ("reason", "denied")], ); } if witnesses_unknown > 0 { - Metrics::count_loader_dropped_witnesses( + count_loader_dropped_witnesses( witnesses_unknown, &[("status", "ok"), ("reason", "gateway_not_found")], );