Skip to content

Commit

Permalink
Update iot-verifier to record last_rewarded_end_time as boot
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Jul 12, 2023
1 parent 4d8d0a0 commit 9e76d72
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 66 deletions.
2 changes: 1 addition & 1 deletion iot_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ mod hex_density;
pub mod last_beacon;
pub mod loader;
pub mod meta;
pub mod metrics;
pub mod packet_loader;
pub mod poc;
pub mod poc_report;
Expand All @@ -16,5 +15,6 @@ pub mod reward_share;
pub mod rewarder;
pub mod runner;
mod settings;
pub mod telemetry;
pub mod tx_scaler;
pub use settings::Settings;
2 changes: 1 addition & 1 deletion iot_verifier/src/loader.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
gateway_cache::GatewayCache,
meta::Meta,
metrics::LoaderMetricTracker,
poc_report::{InsertBindings, IotStatus, Report, ReportType},
telemetry::LoaderMetricTracker,
Settings,
};
use chrono::DateTime;
Expand Down
7 changes: 3 additions & 4 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use futures::TryFutureExt;
use iot_config::client::Client as IotConfigClient;
use iot_verifier::{
entropy_loader, gateway_cache::GatewayCache, gateway_updater::GatewayUpdater, loader,
metrics::Metrics, packet_loader, poc_report::Report, purger, region_cache::RegionCache,
rewarder::Rewarder, runner, tx_scaler::Server as DensityScaler, Settings,
packet_loader, purger, region_cache::RegionCache, rewarder::Rewarder, runner, telemetry,
tx_scaler::Server as DensityScaler, Settings,
};
use price::PriceTracker;
use std::path;
Expand Down Expand Up @@ -82,8 +82,7 @@ impl Server {
.await?;
sqlx::migrate!().run(&pool).await?;

let count_all_beacons = Report::count_all_beacons(&pool).await?;
Metrics::num_beacons(count_all_beacons);
telemetry::initialize(&pool).await?;

let iot_config_client = IotConfigClient::from_settings(&settings.iot_config_client)?;

Expand Down
2 changes: 1 addition & 1 deletion iot_verifier/src/packet_loader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
gateway_cache::GatewayCache, metrics::LoaderMetricTracker, reward_share::GatewayDCShare,
gateway_cache::GatewayCache, reward_share::GatewayDCShare, telemetry::LoaderMetricTracker,
Settings,
};
use chrono::{Duration as ChronoDuration, Utc};
Expand Down
4 changes: 2 additions & 2 deletions iot_verifier/src/purger.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{entropy::Entropy, metrics::Metrics, poc_report::Report, Settings};
use crate::{entropy::Entropy, poc_report::Report, telemetry, Settings};
use chrono::Duration;
use file_store::{
file_sink::{self, FileSinkClient},
Expand Down Expand Up @@ -209,7 +209,7 @@ impl Purger {
.await?;
// delete the report from the DB
Report::delete_report(tx.lock().await.deref_mut(), &beacon_id).await?;
Metrics::decrement_num_beacons();
telemetry::decrement_num_beacons();
Ok(())
}

Expand Down
12 changes: 6 additions & 6 deletions iot_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::reward_share::{operational_rewards, GatewayShares};
use crate::{
reward_share::{operational_rewards, GatewayShares},
telemetry,
};
use chrono::{DateTime, Duration, TimeZone, Utc};
use db_store::meta;
use file_store::{file_sink, traits::TimestampEncode};
Expand Down Expand Up @@ -129,10 +132,7 @@ impl Rewarder {
.await?
.await??;
self.reward_manifests_sink.commit().await?;
metrics::gauge!(
"last_rewarded_end_time",
scheduler.reward_period.end.timestamp() as f64
);
telemetry::last_rewarded_end_time(scheduler.reward_period.end);
Ok(())
}

Expand Down Expand Up @@ -181,7 +181,7 @@ impl Rewarder {
}
}

async fn fetch_rewarded_timestamp(
pub async fn fetch_rewarded_timestamp(
timestamp_key: &str,
db: impl PgExecutor<'_>,
) -> db_store::Result<DateTime<Utc>> {
Expand Down
12 changes: 6 additions & 6 deletions iot_verifier/src/runner.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
gateway_cache::GatewayCache, hex_density::HexDensityMap, last_beacon::LastBeacon,
metrics::Metrics, poc::Poc, poc_report::Report, region_cache::RegionCache,
reward_share::GatewayPocShare, Settings,
gateway_cache::GatewayCache, hex_density::HexDensityMap, last_beacon::LastBeacon, poc::Poc,
poc_report::Report, region_cache::RegionCache, reward_share::GatewayPocShare, telemetry,
Settings,
};
use chrono::{Duration as ChronoDuration, Utc};
use file_store::{
Expand Down Expand Up @@ -447,7 +447,7 @@ impl Runner {
}
// done with these poc reports, purge em from the db
Report::delete_poc(&self.pool, &beacon_id).await?;
Metrics::decrement_num_beacons();
telemetry::decrement_num_beacons();
Ok(())
}

Expand Down Expand Up @@ -498,7 +498,7 @@ impl Runner {
// update timestamp of last beacon for the beaconer
LastBeacon::update_last_timestamp(&self.pool, pub_key.as_ref(), received_timestamp).await?;
Report::delete_poc(&self.pool, &packet_data).await?;
Metrics::decrement_num_beacons();
telemetry::decrement_num_beacons();
Ok(())
}
}
Expand Down Expand Up @@ -592,7 +592,7 @@ fn fire_invalid_witness_metric(witnesses: &[IotVerifiedWitnessReport]) {
.iter()
.filter(|witness| !matches!(witness.invalid_reason, InvalidReason::ReasonNone))
.for_each(|witness| {
Metrics::increment_invalid_witnesses(&[(
telemetry::increment_invalid_witnesses(&[(
"reason",
witness.invalid_reason.clone().as_str_name(),
)])
Expand Down
101 changes: 56 additions & 45 deletions iot_verifier/src/metrics.rs → iot_verifier/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use std::cell::RefCell;

use chrono::{DateTime, Utc};
use sqlx::{Pool, Postgres};

use crate::{poc_report::Report, rewarder};

const PACKET_COUNTER: &str = concat!(env!("CARGO_PKG_NAME"), "_", "packet");
const NON_REWARDABLE_PACKET_COUNTER: &str =
concat!(env!("CARGO_PKG_NAME"), "_", "non_rewardable_packet");
Expand All @@ -11,48 +16,57 @@ const LOADER_DROPPED_WITNESS_COUNTER: &str =
const BEACON_GUAGE: &str = concat!(env!("CARGO_PKG_NAME"), "_", "num_beacons");
const INVALID_WITNESS_COUNTER: &str =
concat!(env!("CARGO_PKG_NAME"), "_", "invalid_witness_report");
const LAST_REWARDED_END_TIME: &str = "last_rewarded_end_time";

pub struct Metrics;
pub async fn initialize(db: &Pool<Postgres>) -> anyhow::Result<()> {
last_rewarded_end_time(rewarder::fetch_rewarded_timestamp(LAST_REWARDED_END_TIME, db).await?);
num_beacons(Report::count_all_beacons(db).await?);

impl Metrics {
pub fn count_packets(count: u64) {
metrics::counter!(PACKET_COUNTER, count);
}
Ok(())
}

pub fn count_non_rewardable_packets(count: u64) {
metrics::counter!(NON_REWARDABLE_PACKET_COUNTER, count);
}
pub fn count_packets(count: u64) {
metrics::counter!(PACKET_COUNTER, count);
}

pub fn count_loader_beacons(count: u64) {
metrics::counter!(LOADER_BEACON_COUNTER, count);
}
pub fn count_non_rewardable_packets(count: u64) {
metrics::counter!(NON_REWARDABLE_PACKET_COUNTER, count);
}

pub fn count_loader_witnesses(count: u64) {
metrics::counter!(LOADER_WITNESS_COUNTER, count);
}
pub fn count_loader_beacons(count: u64) {
metrics::counter!(LOADER_BEACON_COUNTER, count);
}

pub fn count_loader_dropped_beacons(count: u64, labels: &[(&'static str, &'static str)]) {
metrics::counter!(LOADER_DROPPED_BEACON_COUNTER, count, labels);
}
pub fn count_loader_witnesses(count: u64) {
metrics::counter!(LOADER_WITNESS_COUNTER, count);
}

pub fn count_loader_dropped_witnesses(count: u64, labels: &[(&'static str, &'static str)]) {
metrics::counter!(LOADER_DROPPED_WITNESS_COUNTER, count, labels);
}
pub fn count_loader_dropped_beacons(count: u64, labels: &[(&'static str, &'static str)]) {
metrics::counter!(LOADER_DROPPED_BEACON_COUNTER, count, labels);
}

pub fn num_beacons(count: u64) {
metrics::gauge!(BEACON_GUAGE, count as f64);
}
pub fn count_loader_dropped_witnesses(count: u64, labels: &[(&'static str, &'static str)]) {
metrics::counter!(LOADER_DROPPED_WITNESS_COUNTER, count, labels);
}

pub fn increment_num_beacons_by(count: u64) {
metrics::increment_gauge!(BEACON_GUAGE, count as f64);
}
pub fn num_beacons(count: u64) {
metrics::gauge!(BEACON_GUAGE, count as f64);
}

pub fn decrement_num_beacons() {
metrics::decrement_gauge!(BEACON_GUAGE, 1.0)
}
pub fn increment_invalid_witnesses(labels: &[(&'static str, &'static str)]) {
metrics::increment_counter!(INVALID_WITNESS_COUNTER, labels);
}
pub fn increment_num_beacons_by(count: u64) {
metrics::increment_gauge!(BEACON_GUAGE, count as f64);
}

pub fn decrement_num_beacons() {
metrics::decrement_gauge!(BEACON_GUAGE, 1.0)
}

pub fn increment_invalid_witnesses(labels: &[(&'static str, &'static str)]) {
metrics::increment_counter!(INVALID_WITNESS_COUNTER, labels);
}

pub fn last_rewarded_end_time(datetime: DateTime<Utc>) {
metrics::gauge!(LAST_REWARDED_END_TIME, datetime.timestamp() as f64);
}

#[derive(Default)]
Expand Down Expand Up @@ -123,52 +137,49 @@ impl LoaderMetricTracker {
let non_rewardable_packets = self.non_rewardable_packets.into_inner();

if packets > 0 {
Metrics::count_packets(packets);
count_packets(packets);
}

if non_rewardable_packets > 0 {
Metrics::count_non_rewardable_packets(packets);
count_non_rewardable_packets(packets);
}

if beacons > 0 {
Metrics::count_loader_beacons(beacons);
Metrics::increment_num_beacons_by(beacons);
count_loader_beacons(beacons);
increment_num_beacons_by(beacons);
}

if beacons_denied > 0 {
Metrics::count_loader_dropped_beacons(
beacons_denied,
&[("status", "ok"), ("reason", "denied")],
);
count_loader_dropped_beacons(beacons_denied, &[("status", "ok"), ("reason", "denied")]);
}

if beacons_unknown > 0 {
Metrics::count_loader_dropped_beacons(
count_loader_dropped_beacons(
beacons_unknown,
&[("status", "ok"), ("reason", "gateway_not_found")],
);
}

if witnesses > 0 {
Metrics::count_loader_witnesses(witnesses);
count_loader_witnesses(witnesses);
}

if witnesses_no_beacon > 0 {
Metrics::count_loader_dropped_witnesses(
count_loader_dropped_witnesses(
witnesses_no_beacon,
&[("status", "ok"), ("reason", "no_associated_beacon_data")],
);
}

if witnesses_denied > 0 {
Metrics::count_loader_dropped_witnesses(
count_loader_dropped_witnesses(
witnesses_denied,
&[("status", "ok"), ("reason", "denied")],
);
}

if witnesses_unknown > 0 {
Metrics::count_loader_dropped_witnesses(
count_loader_dropped_witnesses(
witnesses_unknown,
&[("status", "ok"), ("reason", "gateway_not_found")],
);
Expand Down

0 comments on commit 9e76d72

Please sign in to comment.