From 8f6d6e8ce1e706574cf4c2e43de7aa24ed0a1aeb Mon Sep 17 00:00:00 2001 From: Macpie Date: Wed, 4 Sep 2024 15:32:33 -0700 Subject: [PATCH] Refactor validate to be simpler --- mobile_verifier/src/promotion_reward.rs | 171 ++++++++++-------------- 1 file changed, 72 insertions(+), 99 deletions(-) diff --git a/mobile_verifier/src/promotion_reward.rs b/mobile_verifier/src/promotion_reward.rs index d937e06f4..6f0fa259e 100644 --- a/mobile_verifier/src/promotion_reward.rs +++ b/mobile_verifier/src/promotion_reward.rs @@ -8,7 +8,7 @@ use file_store::{ traits::{FileSinkWriteExt, TimestampEncode}, FileType, }; -use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{StreamExt, TryFutureExt, TryStreamExt}; use helium_crypto::PublicKeyBinary; use helium_proto::services::{ mobile_config::NetworkKeyRole, @@ -30,7 +30,6 @@ use sqlx::{postgres::PgRow, PgPool, Postgres, Row, Transaction}; use std::{ collections::HashMap, ops::Range, - pin::pin, time::{Duration, Instant}, }; use task_manager::{ManagedTask, TaskManager}; @@ -117,21 +116,27 @@ impl PromotionRewardDaemon { tracing::info!("Processing promotion reward file {}", file.file_info.key); let mut transaction = self.pool.begin().await?; - let reports = file.into_stream(&mut transaction).await?; + let mut promotion_rewards = file.into_stream(&mut transaction).await?; - let mut verified_promotion_rewards = - pin!(ValidatedPromotionReward::validate_promotion_rewards( - reports, + while let Some(promotion_reward) = promotion_rewards.next().await { + let promotion_reward_status = validate_promotion_reward( + &promotion_reward, &self.authorization_verifier, &self.gateway_info_resolver, - &self.entity_verifier - )); + &self.entity_verifier, + ) + .await?; - while let Some(promotion_reward) = verified_promotion_rewards.try_next().await? { - promotion_reward.write(&self.promotion_rewards_sink).await?; - if promotion_reward.is_valid() { - promotion_reward.save(&mut transaction).await?; + if promotion_reward_status == PromotionRewardStatus::Valid { + save_promotion_reward(&mut transaction, &promotion_reward).await?; } + + write_promotion_reward( + &self.promotion_rewards_sink, + &promotion_reward, + promotion_reward_status, + ) + .await?; } self.promotion_rewards_sink.commit().await?; @@ -155,11 +160,6 @@ impl ManagedTask for PromotionRewardDaemon { } } -pub struct ValidatedPromotionReward { - validity: PromotionRewardStatus, - promotion_reward: PromotionReward, -} - async fn validate_promotion_reward( promotion_reward: &PromotionReward, authorization_verifier: &impl AuthorizationVerifier, @@ -197,93 +197,66 @@ async fn validate_promotion_reward( } } -impl ValidatedPromotionReward { - fn validate_promotion_rewards<'a>( - promotion_rewards: impl Stream + 'a, - authorization_verifier: &'a impl AuthorizationVerifier, - gateway_info_resolver: &'a impl GatewayResolver, - entity_verifier: &'a impl EntityVerifier, - ) -> impl Stream> + 'a { - promotion_rewards.then(move |promotion_reward| async move { - async move { - let validity = validate_promotion_reward( - &promotion_reward, - authorization_verifier, - gateway_info_resolver, - entity_verifier, - ) - .await?; - Ok(Self { - validity, - promotion_reward: promotion_reward, - }) - } - .await - }) - } - - fn is_valid(&self) -> bool { - matches!(self.validity, PromotionRewardStatus::Valid) - } +async fn write_promotion_reward( + file_sink: &FileSinkClient, + promotion_reward: &PromotionReward, + status: PromotionRewardStatus, +) -> anyhow::Result<()> { + file_sink + .write( + VerifiedPromotionRewardV1 { + report: Some(PromotionRewardIngestReportV1 { + received_timestamp: promotion_reward + .received_timestamp + .encode_timestamp_millis(), + report: Some(promotion_reward.clone().into()), + }), + status: status as i32, + timestamp: Utc::now().encode_timestamp_millis(), + }, + &[("validity", status.as_str_name())], + ) + .await?; + Ok(()) +} - async fn write( - &self, - promotion_rewards: &FileSinkClient, - ) -> anyhow::Result<()> { - promotion_rewards - .write( - VerifiedPromotionRewardV1 { - report: Some(PromotionRewardIngestReportV1 { - received_timestamp: self - .promotion_reward - .received_timestamp - .encode_timestamp_millis(), - report: Some(self.promotion_reward.clone().into()), - }), - status: self.validity as i32, - timestamp: Utc::now().encode_timestamp_millis(), - }, - &[("validity", self.validity.as_str_name())], +async fn save_promotion_reward( + transaction: &mut Transaction<'_, Postgres>, + promotion_reward: &PromotionReward, +) -> anyhow::Result<()> { + match &promotion_reward.entity { + Entity::SubscriberId(subscriber_id) => { + sqlx::query( + r#" + INSERT INTO subscriber_promotion_rewards (time_of_reward, subscriber_id, carrier_key, shares) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING + "# ) + .bind(promotion_reward.timestamp) + .bind(subscriber_id) + .bind(&promotion_reward.carrier_pub_key) + .bind(promotion_reward.shares as i64) + .execute(&mut *transaction) + .await?; + } + Entity::GatewayKey(gateway_key) => { + sqlx::query( + r#" + INSERT INTO gateway_promotion_rewards (time_of_reward, gateway_key, carrier_key, shares) + VALUES ($1, $2, $3, $4) + ON CONFLICT DO NOTHING + "# + ) + .bind(promotion_reward.timestamp) + .bind(gateway_key) + .bind(&promotion_reward.carrier_pub_key) + .bind(promotion_reward.shares as i64) + .execute(&mut *transaction) .await?; - Ok(()) - } - - async fn save(self, transaction: &mut Transaction<'_, Postgres>) -> anyhow::Result<()> { - match self.promotion_reward.entity { - Entity::SubscriberId(subscriber_id) => { - sqlx::query( - r#" - INSERT INTO subscriber_promotion_rewards (time_of_reward, subscriber_id, carrier_key, shares) - VALUES ($1, $2, $3, $4) - ON CONFLICT DO NOTHING - "# - ) - .bind(self.promotion_reward.timestamp) - .bind(subscriber_id) - .bind(self.promotion_reward.carrier_pub_key) - .bind(self.promotion_reward.shares as i64) - .execute(&mut *transaction) - .await?; - } - Entity::GatewayKey(gateway_key) => { - sqlx::query( - r#" - INSERT INTO gateway_promotion_rewards (time_of_reward, gateway_key, carrier_key, shares) - VALUES ($1, $2, $3, $4) - ON CONFLICT DO NOTHING - "# - ) - .bind(self.promotion_reward.timestamp) - .bind(gateway_key) - .bind(self.promotion_reward.carrier_pub_key) - .bind(self.promotion_reward.shares as i64) - .execute(&mut *transaction) - .await?; - } } - Ok(()) } + Ok(()) } pub async fn clear_promotion_rewards(