Skip to content

Commit

Permalink
Merge pull request #588 from helium/andymck/move-denylist-check-to-ve…
Browse files Browse the repository at this point in the history
…rifications

move denylist check from loader to the runner verification list
  • Loading branch information
andymck authored Aug 9, 2023
2 parents a253899 + 965f619 commit fd07b23
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 99 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions denylist/src/denylist.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{client::DenyListClient, models::metadata::Asset, Error, Result};
use bytes::Buf;
use helium_crypto::{PublicKey, Verify};
use helium_crypto::{PublicKey, PublicKeyBinary, Verify};
use serde::Serialize;
use std::{fs, hash::Hasher, path, str::FromStr};
use twox_hash::XxHash64;
Expand All @@ -25,6 +25,20 @@ pub struct DenyList {
pub filter: Xor32,
}

impl TryFrom<Vec<PublicKeyBinary>> for DenyList {
type Error = Error;
fn try_from(v: Vec<PublicKeyBinary>) -> Result<Self> {
let keys: Vec<u64> = v.into_iter().map(public_key_hash).collect();
let filter = Xor32::from(&keys);
let client = DenyListClient::new()?;
Ok(Self {
tag_name: 0,
client,
filter,
})
}
}

impl DenyList {
pub fn new() -> Result<Self> {
tracing::debug!("initializing new denylist");
Expand Down Expand Up @@ -89,7 +103,7 @@ impl DenyList {
Ok(())
}

pub async fn check_key<K: AsRef<[u8]>>(&self, pub_key: K) -> bool {
pub fn check_key<K: AsRef<[u8]>>(&self, pub_key: K) -> bool {
if self.filter.len() == 0 {
tracing::warn!("empty denylist filter, rejecting key");
return true;
Expand Down
12 changes: 12 additions & 0 deletions file_store/src/iot_invalid_poc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub struct IotInvalidBeaconReport {
pub received_timestamp: DateTime<Utc>,
pub reason: InvalidReason,
pub report: IotBeaconReport,
pub location: Option<u64>,
pub gain: i32,
pub elevation: i32,
}

#[derive(Serialize, Clone)]
Expand Down Expand Up @@ -75,6 +78,9 @@ impl TryFrom<LoraInvalidBeaconReportV1> for IotInvalidBeaconReport {
.report
.ok_or_else(|| Error::not_found("iot invalid beacon report v1"))?
.try_into()?,
location: v.location.parse().ok(),
gain: v.gain,
elevation: v.elevation,
})
}
}
Expand All @@ -87,6 +93,12 @@ impl From<IotInvalidBeaconReport> for LoraInvalidBeaconReportV1 {
received_timestamp,
reason: v.reason as i32,
report: Some(report),
location: v
.location
.map(|l| l.to_string())
.unwrap_or_else(String::new),
gain: v.gain,
elevation: v.elevation,
}
}
}
Expand Down
56 changes: 1 addition & 55 deletions iot_verifier/src/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{
};
use chrono::DateTime;
use chrono::{Duration as ChronoDuration, Utc};
use denylist::DenyList;
use file_store::{
iot_beacon_report::IotBeaconIngestReport,
iot_witness_report::IotWitnessIngestReport,
Expand All @@ -17,7 +16,7 @@ use file_store::{
use futures::{stream, StreamExt};
use helium_crypto::PublicKeyBinary;
use sqlx::PgPool;
use std::{hash::Hasher, ops::DerefMut, time::Duration};
use std::{hash::Hasher, ops::DerefMut};
use tokio::{
sync::Mutex,
time::{self, MissedTickBehavior},
Expand All @@ -34,9 +33,6 @@ pub struct Loader {
window_width: ChronoDuration,
ingestor_rollup_time: ChronoDuration,
max_lookback_age: ChronoDuration,
deny_list_latest_url: String,
deny_list_trigger_interval: Duration,
deny_list: DenyList,
}

#[derive(thiserror::Error, Debug)]
Expand All @@ -45,13 +41,10 @@ pub enum NewLoaderError {
FileStoreError(#[from] file_store::Error),
#[error("db_store error: {0}")]
DbStoreError(#[from] db_store::Error),
#[error("denylist error: {0}")]
DenyListError(#[from] denylist::Error),
}

pub enum ValidGatewayResult {
Valid,
Denied,
Unknown,
}

Expand All @@ -63,17 +56,13 @@ impl Loader {
let window_width = settings.poc_loader_window_width();
let ingestor_rollup_time = settings.ingestor_rollup_time();
let max_lookback_age = settings.loader_window_max_lookback_age();
let deny_list = DenyList::new()?;
Ok(Self {
pool,
ingest_store,
poll_time,
window_width,
ingestor_rollup_time,
max_lookback_age,
deny_list_latest_url: settings.denylist.denylist_url.clone(),
deny_list_trigger_interval: settings.denylist.trigger_interval(),
deny_list,
})
}

Expand All @@ -85,21 +74,12 @@ impl Loader {
tracing::info!("started verifier loader");
let mut report_timer = time::interval(self.poll_time);
report_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
let mut denylist_timer = time::interval(self.deny_list_trigger_interval);
denylist_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
if shutdown.is_triggered() {
break;
}
tokio::select! {
_ = shutdown.clone() => break,
_ = denylist_timer.tick() =>
match self.handle_denylist_tick().await {
Ok(()) => (),
Err(err) => {
tracing::error!("fatal loader error, denylist_tick triggered: {err:?}");
}
},
_ = report_timer.tick() => match self.handle_report_tick(gateway_cache).await {
Ok(()) => (),
Err(err) => {
Expand All @@ -112,23 +92,6 @@ impl Loader {
Ok(())
}

async fn handle_denylist_tick(&mut self) -> anyhow::Result<()> {
tracing::info!("handling denylist tick");
// sink any errors whilst updating the denylist
// the verifier should not stop just because github
// could not be reached for example
match self
.deny_list
.update_to_latest(&self.deny_list_latest_url)
.await
{
Ok(()) => (),
Err(e) => tracing::warn!("failed to update denylist: {e}"),
}
tracing::info!("completed handling denylist tick");
Ok(())
}

async fn handle_report_tick(&self, gateway_cache: &GatewayCache) -> anyhow::Result<()> {
tracing::info!("handling report tick");
let now = Utc::now();
Expand Down Expand Up @@ -375,11 +338,6 @@ impl Loader {
};
Ok(Some(res))
}
ValidGatewayResult::Denied => {
metrics.increment_beacons_denied();
Ok(None)
}

ValidGatewayResult::Unknown => {
metrics.increment_beacons_unknown();
Ok(None)
Expand Down Expand Up @@ -410,10 +368,6 @@ impl Loader {
metrics.increment_witnesses();
Ok(Some(res))
}
ValidGatewayResult::Denied => {
metrics.increment_witnesses_denied();
Ok(None)
}
ValidGatewayResult::Unknown => {
metrics.increment_witnesses_unknown();
Ok(None)
Expand Down Expand Up @@ -445,10 +399,6 @@ impl Loader {
pub_key: &PublicKeyBinary,
gateway_cache: &GatewayCache,
) -> ValidGatewayResult {
if self.check_gw_denied(pub_key).await {
tracing::debug!("dropping denied gateway : {:?}", &pub_key);
return ValidGatewayResult::Denied;
}
if self.check_unknown_gw(pub_key, gateway_cache).await {
tracing::debug!("dropping unknown gateway: {:?}", &pub_key);
return ValidGatewayResult::Unknown;
Expand All @@ -463,10 +413,6 @@ impl Loader {
) -> bool {
gateway_cache.resolve_gateway_info(pub_key).await.is_err()
}

async fn check_gw_denied(&self, pub_key: &PublicKeyBinary) -> bool {
self.deny_list.check_key(pub_key).await
}
}

fn filter_key_hash(data: &[u8]) -> u64 {
Expand Down
2 changes: 1 addition & 1 deletion iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl Server {

// init da processes
let mut loader = loader::Loader::from_settings(settings, pool.clone()).await?;
let mut runner = runner::Runner::from_settings(settings, pool.clone()).await?;
let mut runner = runner::Runner::new(settings, pool.clone()).await?;
let purger = purger::Purger::from_settings(settings, pool.clone()).await?;
let mut density_scaler =
DensityScaler::from_settings(settings, pool, gateway_updater_receiver.clone()).await?;
Expand Down
Loading

0 comments on commit fd07b23

Please sign in to comment.