Add deduping to mobile packet verifier (#585)
* Add deduping to mobile packet verifier

* Addressing feedback

* Change protobufs back to master
bbalser authored Aug 2, 2023
1 parent 021efe8 commit 76cd5ae
Showing 8 changed files with 125 additions and 28 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Generated lockfile; diff not rendered.

4 changes: 4 additions & 0 deletions mobile_packet_verifier/migrations/4_event_ids.sql
@@ -0,0 +1,4 @@
+CREATE TABLE event_ids (
+    event_id TEXT NOT NULL PRIMARY KEY,
+    received_timestamp TIMESTAMPTZ NOT NULL
+);
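
Note: the TEXT primary key on event_id is what the dedup logic in event_ids.rs (below) leans on: re-inserting an already-seen event_id conflicts instead of erroring, and the affected-row count tells the caller whether the event was new. A minimal sketch of that raw behavior, assuming a hypothetical helper and a pool with this migration applied (illustrative, not part of the commit):

    use sqlx::PgPool;

    // Hypothetical helper, not in the commit: demonstrates the conflict semantics.
    async fn demo(pool: &PgPool) -> anyhow::Result<()> {
        let insert = "INSERT INTO event_ids (event_id, received_timestamp) \
                      VALUES ($1, now()) ON CONFLICT (event_id) DO NOTHING";
        let first = sqlx::query(insert).bind("ev-42").execute(pool).await?;
        assert_eq!(first.rows_affected(), 1); // new event: row written
        let second = sqlx::query(insert).bind("ev-42").execute(pool).await?;
        assert_eq!(second.rows_affected(), 0); // replay: conflict, nothing written
        Ok(())
    }
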
51 changes: 29 additions & 22 deletions mobile_packet_verifier/src/accumulate.rs
@@ -1,7 +1,7 @@
 use chrono::{DateTime, Utc};
 use file_store::file_sink::FileSinkClient;
 use file_store::mobile_session::{
-    DataTransferSessionIngestReport, DataTransferSessionReq, InvalidDataTransferIngestReport,
+    DataTransferSessionIngestReport, InvalidDataTransferIngestReport,
 };
 use futures::{Stream, StreamExt};
 use helium_crypto::PublicKeyBinary;
@@ -11,22 +11,12 @@ use helium_proto::services::poc_mobile::{
     InvalidDataTransferIngestReportV1,
 };
 use mobile_config::{
-    client::{AuthorizationClient, ClientError, GatewayClient},
+    client::{AuthorizationClient, GatewayClient},
     gateway_info::GatewayInfoResolver,
 };
 use sqlx::{Postgres, Transaction};
 
-#[derive(thiserror::Error, Debug)]
-pub enum AccumulationError {
-    #[error("file store error: {0}")]
-    FileStoreError(#[from] file_store::Error),
-    #[error("sqlx error: {0}")]
-    SqlxError(#[from] sqlx::Error),
-    #[error("reports stream dropped")]
-    ReportsStreamDropped,
-    #[error("config client error: {0}")]
-    ConfigClientError(#[from] ClientError),
-}
+use crate::event_ids;
 
 pub async fn accumulate_sessions(
     gateway_client: &GatewayClient,
@@ -35,7 +25,7 @@ pub async fn accumulate_sessions(
     invalid_data_session_report_sink: &FileSinkClient,
     curr_file_ts: DateTime<Utc>,
     reports: impl Stream<Item = DataTransferSessionIngestReport>,
-) -> Result<(), AccumulationError> {
+) -> anyhow::Result<()> {
     tokio::pin!(reports);
 
     while let Some(report) = reports.next().await {
@@ -51,7 +41,7 @@ pub async fn accumulate_sessions(
             continue;
         }
 
-        let report_validity = verify_report(gateway_client, auth_client, &report.report).await;
+        let report_validity = verify_report(conn, gateway_client, auth_client, &report).await?;
         if report_validity != DataTransferIngestReportStatus::Valid {
             write_invalid_report(invalid_data_session_report_sink, report_validity, report).await?;
             continue;
@@ -80,17 +70,22 @@ pub async fn accumulate_sessions(
 }
 
 async fn verify_report(
+    txn: &mut Transaction<'_, Postgres>,
     gateway_client: &GatewayClient,
     auth_client: &AuthorizationClient,
-    report: &DataTransferSessionReq,
-) -> DataTransferIngestReportStatus {
-    if !verify_gateway(gateway_client, &report.data_transfer_usage.pub_key).await {
-        return DataTransferIngestReportStatus::InvalidGatewayKey;
+    report: &DataTransferSessionIngestReport,
+) -> anyhow::Result<DataTransferIngestReportStatus> {
+    if is_duplicate(txn, report).await? {
+        return Ok(DataTransferIngestReportStatus::Duplicate);
+    }
+
+    if !verify_gateway(gateway_client, &report.report.data_transfer_usage.pub_key).await {
+        return Ok(DataTransferIngestReportStatus::InvalidGatewayKey);
     };
-    if !verify_known_routing_key(auth_client, &report.pub_key).await {
-        return DataTransferIngestReportStatus::InvalidRoutingKey;
+    if !verify_known_routing_key(auth_client, &report.report.pub_key).await {
+        return Ok(DataTransferIngestReportStatus::InvalidRoutingKey);
     };
-    DataTransferIngestReportStatus::Valid
+    Ok(DataTransferIngestReportStatus::Valid)
 }
 
 async fn verify_gateway(gateway_client: &GatewayClient, public_key: &PublicKeyBinary) -> bool {
@@ -113,6 +108,18 @@ async fn verify_known_routing_key(
     }
 }
 
+async fn is_duplicate(
+    txn: &mut Transaction<'_, Postgres>,
+    report: &DataTransferSessionIngestReport,
+) -> anyhow::Result<bool> {
+    event_ids::is_duplicate(
+        txn,
+        report.report.data_transfer_usage.event_id.clone(),
+        report.received_timestamp,
+    )
+    .await
+}
+
 async fn write_invalid_report(
     invalid_data_session_report_sink: &FileSinkClient,
     reason: DataTransferIngestReportStatus,
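
Note: verify_report now takes the open transaction and runs the duplicate check first, so replayed reports are rejected before any gateway or routing-key lookup against mobile-config. And because the event_id marker rides on the same transaction that accumulates the session, a rollback discards the marker together with the session rows; a reprocessed file is re-verified rather than silently dropped. A sketch of that property as hypothetical test code (assumes a migrated test pool; not part of the commit):

    use chrono::Utc;
    use mobile_packet_verifier::event_ids;
    use sqlx::PgPool;

    // Hypothetical test: the dedup marker follows the transaction.
    async fn dedup_follows_the_transaction(pool: &PgPool) -> anyhow::Result<()> {
        let mut txn = pool.begin().await?;
        assert!(!event_ids::is_duplicate(&mut txn, "ev-1".to_string(), Utc::now()).await?);
        txn.rollback().await?; // marker discarded with the transaction

        let mut txn = pool.begin().await?;
        // After the rollback the same event is unseen again:
        assert!(!event_ids::is_duplicate(&mut txn, "ev-1".to_string(), Utc::now()).await?);
        txn.commit().await?;
        Ok(())
    }
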
7 changes: 5 additions & 2 deletions mobile_packet_verifier/src/daemon.rs
@@ -1,4 +1,4 @@
-use crate::{burner::Burner, settings::Settings};
+use crate::{burner::Burner, event_ids::EventIdPurger, settings::Settings};
 use anyhow::{bail, Error, Result};
 use chrono::{TimeZone, Utc};
 use file_store::{
@@ -173,14 +173,16 @@ impl Cmd {
 
         let daemon = Daemon::new(
             settings,
-            pool,
+            pool.clone(),
             reports,
             burner,
             gateway_client,
             auth_client,
             invalid_sessions,
         );
 
+        let event_id_purger = EventIdPurger::from_settings(pool, settings);
+
         tokio::try_join!(
             source_join_handle.map_err(Error::from),
             valid_sessions_server.run().map_err(Error::from),
@@ -189,6 +191,7 @@ impl Cmd {
             daemon.run(&shutdown_listener).map_err(Error::from),
             conn_handler.map_err(Error::from),
             sol_balance_monitor.map_err(Error::from),
+            event_id_purger.run(shutdown_listener.clone()),
         )?;
 
         Ok(())
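
Note: sqlx's Pool is a cheap reference-counted handle, so pool.clone() gives the daemon its own copy while the original moves into the purger. Within tokio::try_join!, event_id_purger.run needs no map_err adapter because it already returns anyhow::Result<()>, and the first branch to yield Err aborts the whole join, dropping the remaining futures. A standalone illustration of that short-circuiting (unrelated to the commit's types):

    use anyhow::anyhow;

    #[tokio::main]
    async fn main() {
        let ok = async { Ok::<(), anyhow::Error>(()) };
        let err = async { Err::<(), anyhow::Error>(anyhow!("boom")) };
        // try_join! polls both; the Err short-circuits the whole join.
        assert!(tokio::try_join!(ok, err).is_err());
    }
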
60 changes: 60 additions & 0 deletions mobile_packet_verifier/src/event_ids.rs
@@ -0,0 +1,60 @@
+use chrono::{DateTime, Duration, Utc};
+use sqlx::{Pool, Postgres, Transaction};
+
+use crate::settings::Settings;
+
+pub async fn is_duplicate(
+    txn: &mut Transaction<'_, Postgres>,
+    event_id: String,
+    received_timestamp: DateTime<Utc>,
+) -> anyhow::Result<bool> {
+    sqlx::query("INSERT INTO event_ids(event_id, received_timestamp) VALUES($1, $2) ON CONFLICT (event_id) DO NOTHING")
+        .bind(event_id)
+        .bind(received_timestamp)
+        .execute(txn)
+        .await
+        .map(|result| result.rows_affected() == 0)
+        .map_err(anyhow::Error::from)
+}
+
+pub struct EventIdPurger {
+    conn: Pool<Postgres>,
+    interval: Duration,
+    max_age: Duration,
+}
+
+impl EventIdPurger {
+    pub fn from_settings(conn: Pool<Postgres>, settings: &Settings) -> Self {
+        Self {
+            conn,
+            interval: settings.purger_interval(),
+            max_age: settings.purger_max_age(),
+        }
+    }
+
+    pub async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> {
+        let mut timer = tokio::time::interval(self.interval.to_std()?);
+
+        loop {
+            tokio::select! {
+                _ = &mut shutdown => {
+                    return Ok(())
+                }
+                _ = timer.tick() => {
+                    purge(&self.conn, self.max_age).await?;
+                }
+            }
+        }
+    }
+}
+
+async fn purge(conn: &Pool<Postgres>, max_age: Duration) -> anyhow::Result<()> {
+    let timestamp = Utc::now() - max_age;
+
+    sqlx::query("DELETE FROM event_ids where received_timestamp < $1")
+        .bind(timestamp)
+        .execute(conn)
+        .await
+        .map(|_| ())
+        .map_err(anyhow::Error::from)
+}
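
Note: is_duplicate collapses the existence check and the marker insert into a single upsert: rows_affected() == 0 means the INSERT was skipped because the event_id already exists, so the report is a replay. (A SELECT-then-INSERT pair would race under concurrent writers; the lone statement is atomic.) One caveat follows from the purger: once a marker ages past max_age and is deleted, a later replay of that event_id would no longer be caught. A hypothetical usage sketch, assuming a migrated test pool:

    use chrono::Utc;
    use mobile_packet_verifier::event_ids;
    use sqlx::PgPool;

    // Hypothetical usage, not in the commit.
    async fn replay_is_flagged(pool: &PgPool) -> anyhow::Result<()> {
        let mut txn = pool.begin().await?;
        // First sighting inserts the marker row: not a duplicate.
        assert!(!event_ids::is_duplicate(&mut txn, "ev-7".to_string(), Utc::now()).await?);
        // Same id again conflicts on the primary key: flagged as duplicate.
        assert!(event_ids::is_duplicate(&mut txn, "ev-7".to_string(), Utc::now()).await?);
        txn.commit().await?;
        Ok(())
    }
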
1 change: 1 addition & 0 deletions mobile_packet_verifier/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod accumulate;
 pub mod burner;
 pub mod daemon;
+pub mod event_ids;
 pub mod settings;
22 changes: 21 additions & 1 deletion mobile_packet_verifier/src/settings.rs
@@ -1,4 +1,4 @@
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::{DateTime, Duration, TimeZone, Utc};
 use config::{Config, ConfigError, Environment, File};
 use serde::Deserialize;
 use std::path::Path;
@@ -24,6 +24,18 @@ pub struct Settings {
     pub config_client: mobile_config::ClientSettings,
     #[serde(default = "default_start_after")]
     pub start_after: u64,
+    #[serde(default = "default_purger_interval_in_hours")]
+    pub purger_interval_in_hours: u64,
+    #[serde(default = "default_purger_max_age_in_hours")]
+    pub purger_max_age_in_hours: u64,
 }
 
+pub fn default_purger_interval_in_hours() -> u64 {
+    1
+}
+
+pub fn default_purger_max_age_in_hours() -> u64 {
+    24
+}
+
 pub fn default_start_after() -> u64 {
@@ -70,4 +82,12 @@ impl Settings {
             .single()
             .unwrap()
     }
+
+    pub fn purger_interval(&self) -> Duration {
+        Duration::hours(self.purger_interval_in_hours as i64)
+    }
+
+    pub fn purger_max_age(&self) -> Duration {
+        Duration::hours(self.purger_max_age_in_hours as i64)
+    }
 }
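
Note: the two new settings are plain hour counts, deserialized like every other field and defaulting to a 1-hour purge cadence with a 24-hour retention window. purger_interval() and purger_max_age() lift them into chrono::Duration, and EventIdPurger::run then calls .to_std()? because tokio's interval takes a std::time::Duration while chrono's signed Duration can be negative. A standalone sketch of that conversion chain:

    use chrono::Duration;

    fn main() -> anyhow::Result<()> {
        let interval = Duration::hours(1); // default_purger_interval_in_hours
        // to_std() errors on negative durations, hence the `?` in run().
        let std_interval: std::time::Duration = interval.to_std()?;
        assert_eq!(std_interval.as_secs(), 3600);
        Ok(())
    }
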
2 changes: 2 additions & 0 deletions mobile_verifier/src/heartbeats.rs
@@ -213,6 +213,8 @@ impl Heartbeat {
             validity: self.validity as i32,
             timestamp: self.timestamp.timestamp() as u64,
             coverage_object: Vec::with_capacity(0), // Placeholder so the project compiles
+            lat: 0.0,
+            lon: 0.0,
         },
         &[("validity", self.validity.as_str_name())],
     )
