Skip to content

Commit

Permalink
refactor speedtests handling, ensure rewards only include tests up un…
Browse files Browse the repository at this point in the history
…til epoch end
  • Loading branch information
andymck committed Jul 26, 2023
1 parent e82216d commit 6ef491b
Show file tree
Hide file tree
Showing 6 changed files with 677 additions and 644 deletions.
14 changes: 7 additions & 7 deletions mobile_verifier/src/cli/reward_from_db.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
heartbeats::HeartbeatReward,
reward_shares::{get_scheduled_tokens_for_poc_and_dc, PocShares},
speedtests::{Average, SpeedtestAverages},
speedtests_average::SpeedtestAverages,
Settings,
};
use anyhow::Result;
Expand Down Expand Up @@ -39,8 +39,8 @@ impl Cmd {
.await?;

let heartbeats = HeartbeatReward::validated(&pool, &epoch);
let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?;
let reward_shares = PocShares::aggregate(heartbeats, speedtests.clone()).await?;
let averages = SpeedtestAverages::aggregate_epoch_averages(epoch.end, &pool).await?;
let reward_shares = PocShares::aggregate(heartbeats, &averages).await?;

let mut total_rewards = 0_u64;
let mut owner_rewards = HashMap::<_, u64>::new();
Expand All @@ -62,11 +62,11 @@ impl Cmd {
}
let rewards: Vec<_> = owner_rewards.into_iter().collect();
let mut multiplier_count = HashMap::<_, usize>::new();
let speedtest_multipliers: Vec<_> = speedtests
.speedtests
let speedtest_multipliers: Vec<_> = averages
.averages
.into_iter()
.map(|(pub_key, avg)| {
let reward_multiplier = Average::from(&avg).reward_multiplier();
.map(|(pub_key, average)| {
let reward_multiplier = average.reward_multiplier;
*multiplier_count.entry(reward_multiplier).or_default() += 1;
(pub_key, reward_multiplier)
})
Expand Down
1 change: 1 addition & 0 deletions mobile_verifier/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod heartbeats;
mod reward_shares;
mod settings;
mod speedtests;
mod speedtests_average;
mod subscriber_location;
mod telemetry;

Expand Down
114 changes: 66 additions & 48 deletions mobile_verifier/src/reward_shares.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::speedtests_average::SpeedtestAverages;
use crate::{
data_session::HotspotMap,
heartbeats::HeartbeatReward,
speedtests::{Average, SpeedtestAverages},
data_session::HotspotMap, heartbeats::HeartbeatReward, speedtests_average::SpeedtestAverage,
subscriber_location::SubscriberValidatedLocations,
};

Expand Down Expand Up @@ -219,17 +218,17 @@ pub struct PocShares {
}

impl PocShares {
pub async fn aggregate(
pub async fn aggregate<'a>(
heartbeats: impl Stream<Item = Result<HeartbeatReward, sqlx::Error>>,
speedtests: SpeedtestAverages,
averages: &SpeedtestAverages,
) -> Result<Self, sqlx::Error> {
let mut poc_shares = Self::default();
let mut heartbeats = std::pin::pin!(heartbeats);
while let Some(heartbeat) = heartbeats.next().await.transpose()? {
let speedmultiplier = speedtests
.get_average(&heartbeat.hotspot_key)
.as_ref()
.map_or(Decimal::ZERO, Average::reward_multiplier);
let speedmultiplier = averages
.averages
.get(&heartbeat.hotspot_key)
.map_or(Decimal::ZERO, SpeedtestAverage::reward_multiplier);
*poc_shares
.hotspot_shares
.entry(heartbeat.hotspot_key)
Expand Down Expand Up @@ -326,14 +325,15 @@ mod test {
data_session,
data_session::HotspotDataSession,
heartbeats::HeartbeatReward,
speedtests::{Speedtest, SpeedtestAverages},
speedtests::Speedtest,
speedtests_average::{SpeedtestAverage, SpeedtestAverages},
subscriber_location::SubscriberValidatedLocations,
};
use chrono::{Duration, Utc};
use futures::stream;
use helium_proto::services::poc_mobile::mobile_reward_share::Reward as MobileReward;
use prost::Message;
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;

fn valid_shares() -> RadioShares {
let mut radio_shares: HashMap<String, Decimal> = Default::default();
Expand Down Expand Up @@ -536,35 +536,39 @@ mod test {
.reward_weight()
}

fn acceptable_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn acceptable_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(10),
download_speed: bytes_per_s(100),
latency: 25,
}
}

fn degraded_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn degraded_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(5),
download_speed: bytes_per_s(60),
latency: 60,
}
}

fn failed_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn failed_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(1),
download_speed: bytes_per_s(20),
latency: 110,
}
}

fn poor_speedtest(timestamp: DateTime<Utc>) -> Speedtest {
fn poor_speedtest(pubkey: PublicKeyBinary, timestamp: DateTime<Utc>) -> Speedtest {
Speedtest {
pubkey,
timestamp,
upload_speed: bytes_per_s(2),
download_speed: bytes_per_s(40),
Expand Down Expand Up @@ -611,19 +615,21 @@ mod test {

let last_timestamp = timestamp - Duration::hours(12);
let g1_speedtests = vec![
acceptable_speedtest(last_timestamp),
acceptable_speedtest(timestamp),
acceptable_speedtest(g1.clone(), last_timestamp),
acceptable_speedtest(g1.clone(), timestamp),
];
let g2_speedtests = vec![
acceptable_speedtest(last_timestamp),
acceptable_speedtest(timestamp),
acceptable_speedtest(g2.clone(), last_timestamp),
acceptable_speedtest(g2.clone(), timestamp),
];
let mut speedtests = HashMap::new();
speedtests.insert(g1.clone(), VecDeque::from(g1_speedtests));
speedtests.insert(g2.clone(), VecDeque::from(g2_speedtests));
let speedtest_avgs = SpeedtestAverages { speedtests };

let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs)
let g1_average = SpeedtestAverage::from(&g1_speedtests);
let g2_average = SpeedtestAverage::from(&g2_speedtests);
let mut averages = HashMap::new();
averages.insert(g1.clone(), g1_average);
averages.insert(g2.clone(), g2_average);
let speedtest_avgs = SpeedtestAverages { averages };

let rewards = PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs)
.await
.unwrap();

Expand Down Expand Up @@ -779,44 +785,56 @@ mod test {
// setup speedtests
let last_speedtest = timestamp - Duration::hours(12);
let gw1_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw1.clone(), last_speedtest),
acceptable_speedtest(gw1.clone(), timestamp),
];
let gw2_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw2.clone(), last_speedtest),
acceptable_speedtest(gw2.clone(), timestamp),
];
let gw3_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw3.clone(), last_speedtest),
acceptable_speedtest(gw3.clone(), timestamp),
];
let gw4_speedtests = vec![
acceptable_speedtest(last_speedtest),
acceptable_speedtest(timestamp),
acceptable_speedtest(gw4.clone(), last_speedtest),
acceptable_speedtest(gw4.clone(), timestamp),
];
let gw5_speedtests = vec![
degraded_speedtest(last_speedtest),
degraded_speedtest(timestamp),
degraded_speedtest(gw5.clone(), last_speedtest),
degraded_speedtest(gw5.clone(), timestamp),
];
let gw6_speedtests = vec![
failed_speedtest(last_speedtest),
failed_speedtest(timestamp),
failed_speedtest(gw6.clone(), last_speedtest),
failed_speedtest(gw6.clone(), timestamp),
];
let gw7_speedtests = vec![
poor_speedtest(gw7.clone(), last_speedtest),
poor_speedtest(gw7.clone(), timestamp),
];
let gw7_speedtests = vec![poor_speedtest(last_speedtest), poor_speedtest(timestamp)];
let mut speedtests = HashMap::new();
speedtests.insert(gw1, VecDeque::from(gw1_speedtests));
speedtests.insert(gw2, VecDeque::from(gw2_speedtests));
speedtests.insert(gw3, VecDeque::from(gw3_speedtests));
speedtests.insert(gw4, VecDeque::from(gw4_speedtests));
speedtests.insert(gw5, VecDeque::from(gw5_speedtests));
speedtests.insert(gw6, VecDeque::from(gw6_speedtests));
speedtests.insert(gw7, VecDeque::from(gw7_speedtests));
let speedtest_avgs = SpeedtestAverages { speedtests };

let gw1_average = SpeedtestAverage::from(&gw1_speedtests);
let gw2_average = SpeedtestAverage::from(&gw2_speedtests);
let gw3_average = SpeedtestAverage::from(&gw3_speedtests);
let gw4_average = SpeedtestAverage::from(&gw4_speedtests);
let gw5_average = SpeedtestAverage::from(&gw5_speedtests);
let gw6_average = SpeedtestAverage::from(&gw6_speedtests);
let gw7_average = SpeedtestAverage::from(&gw7_speedtests);
let mut averages = HashMap::new();
averages.insert(gw1.clone(), gw1_average);
averages.insert(gw2.clone(), gw2_average);
averages.insert(gw3.clone(), gw3_average);
averages.insert(gw4.clone(), gw4_average);
averages.insert(gw5.clone(), gw5_average);
averages.insert(gw6.clone(), gw6_average);
averages.insert(gw7.clone(), gw7_average);

let speedtest_avgs = SpeedtestAverages { averages };

// calculate the rewards for the sample group
let mut owner_rewards = HashMap::<PublicKeyBinary, u64>::new();
let epoch = (now - Duration::hours(1))..now;
for mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), speedtest_avgs)
for mobile_reward in PocShares::aggregate(stream::iter(heartbeats).map(Ok), &speedtest_avgs)
.await
.unwrap()
.into_rewards(Decimal::ZERO, &epoch)
Expand Down
8 changes: 4 additions & 4 deletions mobile_verifier/src/rewarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
data_session,
heartbeats::HeartbeatReward,
reward_shares::{MapperShares, PocShares, TransferRewards},
speedtests::SpeedtestAverages,
speedtests_average::SpeedtestAverages,
subscriber_location, telemetry,
};
use anyhow::bail;
Expand Down Expand Up @@ -143,9 +143,9 @@ impl Rewarder {
);

let heartbeats = HeartbeatReward::validated(&self.pool, reward_period);
let speedtests = SpeedtestAverages::validated(&self.pool, reward_period.end).await?;

let poc_rewards = PocShares::aggregate(heartbeats, speedtests).await?;
let averages =
SpeedtestAverages::aggregate_epoch_averages(reward_period.end, &self.pool).await?;
let poc_rewards = PocShares::aggregate(heartbeats, &averages).await?;
let mobile_price = self
.price_tracker
.price(&helium_proto::BlockchainTokenTypeV1::Mobile)
Expand Down
Loading

0 comments on commit 6ef491b

Please sign in to comment.