Skip to content

Commit

Permalink
Update mobile verifier to record last_reward_end_time metric on boot
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Jul 12, 2023
1 parent a76a917 commit 4d8d0a0
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 25 deletions.
5 changes: 4 additions & 1 deletion mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
data_session::DataSessionIngestor, heartbeats::HeartbeatDaemon, rewarder::Rewarder,
speedtests::SpeedtestDaemon, subscriber_location::SubscriberLocationIngestor, Settings,
speedtests::SpeedtestDaemon, subscriber_location::SubscriberLocationIngestor, telemetry,
Settings,
};
use anyhow::{Error, Result};
use chrono::Duration;
Expand Down Expand Up @@ -38,6 +39,8 @@ impl Cmd {
.await?;
sqlx::migrate!().run(&pool).await?;

telemetry::initialize(&pool).await?;

let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
let file_upload =
file_upload::FileUpload::from_settings(&settings.output, file_upload_rx).await?;
Expand Down
2 changes: 1 addition & 1 deletion mobile_verifier/src/heartbeats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl Heartbeat {
timestamp: self.timestamp.timestamp() as u64,
coverage_object: Vec::with_capacity(0), // Placeholder so the project compiles
},
[],
&[("validity", self.validity.as_str_name())],
)
.await?;
Ok(())
Expand Down
1 change: 1 addition & 0 deletions mobile_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod reward_shares;
mod settings;
mod speedtests;
mod subscriber_location;
mod telemetry;

pub mod cli;
pub mod rewarder;
Expand Down
37 changes: 17 additions & 20 deletions mobile_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
heartbeats::HeartbeatReward,
reward_shares::{MapperShares, PocShares, TransferRewards},
speedtests::SpeedtestAverages,
subscriber_location,
subscriber_location, telemetry,
};
use anyhow::bail;
use chrono::{DateTime, Duration, TimeZone, Utc};
Expand Down Expand Up @@ -54,8 +54,8 @@ impl Rewarder {

pub async fn run(self, shutdown: triggered::Listener) -> anyhow::Result<()> {
loop {
let last_rewarded_end_time = self.last_rewarded_end_time().await?;
let next_rewarded_end_time = self.next_rewarded_end_time().await?;
let last_rewarded_end_time = last_rewarded_end_time(&self.pool).await?;
let next_rewarded_end_time = next_rewarded_end_time(&self.pool).await?;
let scheduler = Scheduler::new(
self.reward_period_duration,
last_rewarded_end_time,
Expand Down Expand Up @@ -88,18 +88,6 @@ impl Rewarder {
Ok(())
}

async fn last_rewarded_end_time(&self) -> db_store::Result<DateTime<Utc>> {
Utc.timestamp_opt(meta::fetch(&self.pool, "last_rewarded_end_time").await?, 0)
.single()
.ok_or(db_store::Error::DecodeError)
}

async fn next_rewarded_end_time(&self) -> db_store::Result<DateTime<Utc>> {
Utc.timestamp_opt(meta::fetch(&self.pool, "next_rewarded_end_time").await?, 0)
.single()
.ok_or(db_store::Error::DecodeError)
}

async fn disable_complete_data_checks_until(&self) -> db_store::Result<DateTime<Utc>> {
Utc.timestamp_opt(
meta::fetch(&self.pool, "disable_complete_data_checks_until").await?,
Expand Down Expand Up @@ -180,7 +168,7 @@ impl Rewarder {
let Some(scale) = transfer_rewards.reward_scale().to_f64() else {
bail!("The data transfer rewards scale cannot be converted to a float");
};
metrics::gauge!("data_transfer_rewards_scale", scale);
telemetry::data_transfer_rewards_scale(scale);

for mobile_reward_share in
poc_rewards.into_rewards(transfer_rewards.reward_sum(), reward_period)
Expand Down Expand Up @@ -270,14 +258,23 @@ impl Rewarder {
.await??;

self.reward_manifests.commit().await?;
metrics::gauge!(
"last_rewarded_end_time",
next_reward_period.start.timestamp() as f64
);
telemetry::last_rewarded_end_time(next_reward_period.start);
Ok(())
}
}

pub async fn last_rewarded_end_time(db: &Pool<Postgres>) -> db_store::Result<DateTime<Utc>> {
Utc.timestamp_opt(meta::fetch(db, "last_rewarded_end_time").await?, 0)
.single()
.ok_or(db_store::Error::DecodeError)
}

async fn next_rewarded_end_time(db: &Pool<Postgres>) -> db_store::Result<DateTime<Utc>> {
Utc.timestamp_opt(meta::fetch(db, "next_rewarded_end_time").await?, 0)
.single()
.ok_or(db_store::Error::DecodeError)
}

async fn save_last_rewarded_end_time(
exec: impl PgExecutor<'_>,
value: &DateTime<Utc>,
Expand Down
6 changes: 3 additions & 3 deletions mobile_verifier/src/speedtests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl SpeedtestRollingAverage {
pub async fn write(&self, averages: &file_sink::FileSinkClient) -> file_store::Result {
// Write out the speedtests to S3
let average = Average::from(&self.speedtests);
let validity = average.validity() as i32;
let validity = average.validity();
// this is guaratneed to safely convert and not panic as it can only be one of
// four possible decimal values based on the speedtest average tier
let reward_multiplier = average.reward_multiplier().try_into().unwrap();
Expand Down Expand Up @@ -272,10 +272,10 @@ impl SpeedtestRollingAverage {
latency_ms: st.latency as u32,
})
.collect(),
validity,
validity: validity as i32,
reward_multiplier,
},
[],
&[("validity", validity.as_str_name())],
)
.await?;

Expand Down
21 changes: 21 additions & 0 deletions mobile_verifier/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use chrono::{DateTime, Utc};
use sqlx::{Pool, Postgres};

use crate::rewarder;

const LAST_REWARDED_END_TIME: &str = "last_rewarded_end_time";
const DATA_TRANSFER_REWARDS_SCALE: &str = "data_transfer_rewards_scale";

pub async fn initialize(db: &Pool<Postgres>) -> anyhow::Result<()> {
last_rewarded_end_time(rewarder::last_rewarded_end_time(db).await?);

Ok(())
}

pub fn last_rewarded_end_time(timestamp: DateTime<Utc>) {
metrics::gauge!(LAST_REWARDED_END_TIME, timestamp.timestamp() as f64);
}

pub fn data_transfer_rewards_scale(scale: f64) {
metrics::gauge!(DATA_TRANSFER_REWARDS_SCALE, scale);
}

0 comments on commit 4d8d0a0

Please sign in to comment.