Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into map/modeled-coverage-…
Browse files Browse the repository at this point in the history
…phase-1
  • Loading branch information
maplant committed Aug 8, 2023
2 parents 49c2f1e + a253899 commit 3c8fb99
Show file tree
Hide file tree
Showing 19 changed files with 366 additions and 236 deletions.
76 changes: 55 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 33 additions & 6 deletions iot_config/src/route_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,21 @@ impl RouteService {
}
}

async fn verify_request_signature_or_stream<'a, R>(
&self,
signer: &PublicKey,
request: &R,
id: OrgId<'a>,
) -> Result<(), Status>
where
R: MsgVerify,
{
if let Ok(()) = self.verify_request_signature(signer, request, id).await {
return Ok(());
}
self.verify_stream_request_signature(signer, request)
}

fn sign_response(&self, response: &[u8]) -> Result<Vec<u8>, Status> {
self.signing_key
.sign(response)
Expand Down Expand Up @@ -418,8 +433,12 @@ impl iot_config::Route for RouteService {
telemetry::count_request("route", "get-euis");

let signer = verify_public_key(&request.signer)?;
self.verify_request_signature(&signer, &request, OrgId::RouteId(&request.route_id))
.await?;
self.verify_request_signature_or_stream(
&signer,
&request,
OrgId::RouteId(&request.route_id),
)
.await?;

let pool = self.pool.clone();
let (tx, rx) = tokio::sync::mpsc::channel(20);
Expand Down Expand Up @@ -575,8 +594,12 @@ impl iot_config::Route for RouteService {
telemetry::count_request("route", "get-devaddr-ranges");

let signer = verify_public_key(&request.signer)?;
self.verify_request_signature(&signer, &request, OrgId::RouteId(&request.route_id))
.await?;
self.verify_request_signature_or_stream(
&signer,
&request,
OrgId::RouteId(&request.route_id),
)
.await?;

let (tx, rx) = tokio::sync::mpsc::channel(20);
let pool = self.pool.clone();
Expand Down Expand Up @@ -739,8 +762,12 @@ impl iot_config::Route for RouteService {
telemetry::count_request("route", "list-skfs");

let signer = verify_public_key(&request.signer)?;
self.verify_request_signature(&signer, &request, OrgId::RouteId(&request.route_id))
.await?;
self.verify_request_signature_or_stream(
&signer,
&request,
OrgId::RouteId(&request.route_id),
)
.await?;

let pool = self.pool.clone();
let (tx, rx) = tokio::sync::mpsc::channel(20);
Expand Down
63 changes: 35 additions & 28 deletions iot_packet_verifier/src/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@ use crate::{
use futures_util::StreamExt;
use helium_crypto::PublicKeyBinary;
use solana::SolanaNetwork;
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
};
use tokio::sync::Mutex;

/// Caches balances fetched from the solana chain and debits made by the
/// packet verifier.
pub struct BalanceCache<S> {
balances: BalanceStore,
payer_accounts: BalanceStore,
solana: S,
}

pub type BalanceStore = Arc<Mutex<HashMap<PublicKeyBinary, Balance>>>;
pub type BalanceStore = Arc<Mutex<HashMap<PublicKeyBinary, PayerAccount>>>;

impl<S> BalanceCache<S>
where
Expand All @@ -40,23 +43,23 @@ where
let balance = solana.payer_balance(&payer).await?;
balances.insert(
payer,
Balance {
PayerAccount {
burned: burn_amount as u64,
balance,
},
);
}

Ok(Self {
balances: Arc::new(Mutex::new(balances)),
payer_accounts: Arc::new(Mutex::new(balances)),
solana,
})
}
}

impl<S> BalanceCache<S> {
pub fn balances(&self) -> BalanceStore {
self.balances.clone()
self.payer_accounts.clone()
}
}

Expand All @@ -73,40 +76,44 @@ where
&self,
payer: &PublicKeyBinary,
amount: u64,
trigger_balance_check_threshold: u64,
) -> Result<Option<u64>, S::Error> {
let mut balances = self.balances.lock().await;
let mut payer_accounts = self.payer_accounts.lock().await;

let balance = if !balances.contains_key(payer) {
let new_balance = self.solana.payer_balance(payer).await?;
balances.insert(payer.clone(), Balance::new(new_balance));
balances.get_mut(payer).unwrap()
} else {
let balance = balances.get_mut(payer).unwrap();
// Fetch the balance if we haven't seen the payer before
if let Entry::Vacant(payer_account) = payer_accounts.entry(payer.clone()) {
let payer_account =
payer_account.insert(PayerAccount::new(self.solana.payer_balance(payer).await?));
return Ok((payer_account.balance >= amount).then(|| {
payer_account.burned += amount;
payer_account.balance - amount
}));
}

// If the balance is not sufficient, check to see if it has been increased
if balance.balance < amount + balance.burned {
balance.balance = self.solana.payer_balance(payer).await?;
let payer_account = payer_accounts.get_mut(payer).unwrap();
match payer_account
.balance
.checked_sub(amount + payer_account.burned)
{
Some(remaining_balance) => {
if remaining_balance < trigger_balance_check_threshold {
payer_account.balance = self.solana.payer_balance(payer).await?;
}
payer_account.burned += amount;
Ok(Some(payer_account.balance - payer_account.burned))
}

balance
};

Ok(if balance.balance >= amount + balance.burned {
balance.burned += amount;
Some(balance.balance - balance.burned)
} else {
None
})
None => Ok(None),
}
}
}

#[derive(Copy, Clone, Debug, Default)]
pub struct Balance {
pub struct PayerAccount {
pub balance: u64,
pub burned: u64,
}

impl Balance {
impl PayerAccount {
pub fn new(balance: u64) -> Self {
Self { balance, burned: 0 }
}
Expand Down
16 changes: 10 additions & 6 deletions iot_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,22 @@ where
.await
.map_err(BurnError::SolanaError)?;

// Now that we have successfully executed the burn and are no long in
// sync land, we can remove the amount burned.
// Now that we have successfully executed the burn and are no longer in
// sync land, we can remove the amount burned:
self.pending_burns
.subtract_burned_amount(&payer, amount)
.await
.map_err(BurnError::SqlError)?;

let mut balance_lock = self.balances.lock().await;
let balances = balance_lock.get_mut(&payer).unwrap();
balances.burned -= amount;
// Zero the balance in order to force a reset:
balances.balance = 0;
let payer_account = balance_lock.get_mut(&payer).unwrap();
payer_account.burned -= amount;
// Reset the balance of the payer:
payer_account.balance = self
.solana
.payer_balance(&payer)
.await
.map_err(BurnError::SolanaError)?;

metrics::counter!("burned", amount, "payer" => payer.to_string());

Expand Down
Loading

0 comments on commit 3c8fb99

Please sign in to comment.