Skip to content

Commit

Permalink
Add deduping to mobile packet verifier
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Aug 1, 2023
1 parent 46b9c71 commit c71a88a
Show file tree
Hide file tree
Showing 9 changed files with 135 additions and 30 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ sqlx = {version = "0", features = [
]}

helium-crypto = {version = "0.6.8", features=["sqlx-postgres", "multisig"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "bbalser/invalid-data-session-duplicate", features = ["services"]}
hextree = "*"
solana-client = "1.14"
solana-sdk = "1.14"
solana-program = "1.11"
spl-token = "3.5.0"
reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]}
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "bbalser/invalid-data-session-duplicate" }
humantime = "2"
metrics = "0"
metrics-exporter-prometheus = "0"
Expand Down
4 changes: 4 additions & 0 deletions mobile_packet_verifier/migrations/4_event_ids.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE TABLE event_ids (
event_id TEXT NOT NULL PRIMARY KEY,
received_timestamp TIMESTAMPTZ NOT NULL
);
46 changes: 24 additions & 22 deletions mobile_packet_verifier/src/accumulate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use chrono::{DateTime, Utc};
use file_store::file_sink::FileSinkClient;
use file_store::mobile_session::{
DataTransferSessionIngestReport, DataTransferSessionReq, InvalidDataTransferIngestReport,
DataTransferSessionIngestReport, InvalidDataTransferIngestReport,
};
use futures::{Stream, StreamExt};
use helium_crypto::PublicKeyBinary;
Expand All @@ -11,22 +11,12 @@ use helium_proto::services::poc_mobile::{
InvalidDataTransferIngestReportV1,
};
use mobile_config::{
client::{AuthorizationClient, ClientError, GatewayClient},
client::{AuthorizationClient, GatewayClient},
gateway_info::GatewayInfoResolver,
};
use sqlx::{Postgres, Transaction};

#[derive(thiserror::Error, Debug)]
pub enum AccumulationError {
#[error("file store error: {0}")]
FileStoreError(#[from] file_store::Error),
#[error("sqlx error: {0}")]
SqlxError(#[from] sqlx::Error),
#[error("reports stream dropped")]
ReportsStreamDropped,
#[error("config client error: {0}")]
ConfigClientError(#[from] ClientError),
}
use crate::event_ids;

pub async fn accumulate_sessions(
gateway_client: &GatewayClient,
Expand All @@ -35,7 +25,7 @@ pub async fn accumulate_sessions(
invalid_data_session_report_sink: &FileSinkClient,
curr_file_ts: DateTime<Utc>,
reports: impl Stream<Item = DataTransferSessionIngestReport>,
) -> Result<(), AccumulationError> {
) -> anyhow::Result<()> {
tokio::pin!(reports);

while let Some(report) = reports.next().await {
Expand All @@ -51,7 +41,7 @@ pub async fn accumulate_sessions(
continue;
}

let report_validity = verify_report(gateway_client, auth_client, &report.report).await;
let report_validity = verify_report(conn, gateway_client, auth_client, &report).await?;
if report_validity != DataTransferIngestReportStatus::Valid {
write_invalid_report(invalid_data_session_report_sink, report_validity, report).await?;
continue;
Expand Down Expand Up @@ -80,17 +70,29 @@ pub async fn accumulate_sessions(
}

async fn verify_report(
tx: &mut Transaction<'_, Postgres>,
gateway_client: &GatewayClient,
auth_client: &AuthorizationClient,
report: &DataTransferSessionReq,
) -> DataTransferIngestReportStatus {
if !verify_gateway(gateway_client, &report.data_transfer_usage.pub_key).await {
return DataTransferIngestReportStatus::InvalidGatewayKey;
report: &DataTransferSessionIngestReport,
) -> anyhow::Result<DataTransferIngestReportStatus> {
if event_ids::is_duplicate(tx, report.report.data_transfer_usage.event_id.clone()).await? {
return Ok(DataTransferIngestReportStatus::Duplicate);
} else {
event_ids::record(
tx,
report.report.data_transfer_usage.event_id.clone(),
report.received_timestamp,
)
.await?;
}

if !verify_gateway(gateway_client, &report.report.data_transfer_usage.pub_key).await {
return Ok(DataTransferIngestReportStatus::InvalidGatewayKey);
};
if !verify_known_routing_key(auth_client, &report.pub_key).await {
return DataTransferIngestReportStatus::InvalidRoutingKey;
if !verify_known_routing_key(auth_client, &report.report.pub_key).await {
return Ok(DataTransferIngestReportStatus::InvalidRoutingKey);
};
DataTransferIngestReportStatus::Valid
Ok(DataTransferIngestReportStatus::Valid)
}

async fn verify_gateway(gateway_client: &GatewayClient, public_key: &PublicKeyBinary) -> bool {
Expand Down
7 changes: 5 additions & 2 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{burner::Burner, settings::Settings};
use crate::{burner::Burner, event_ids::EventIdPurger, settings::Settings};
use anyhow::{bail, Error, Result};
use chrono::{TimeZone, Utc};
use file_store::{
Expand Down Expand Up @@ -173,14 +173,16 @@ impl Cmd {

let daemon = Daemon::new(
settings,
pool,
pool.clone(),
reports,
burner,
gateway_client,
auth_client,
invalid_sessions,
);

let event_id_purger = EventIdPurger::from_settings(pool, &settings);

tokio::try_join!(
source_join_handle.map_err(Error::from),
valid_sessions_server.run().map_err(Error::from),
Expand All @@ -189,6 +191,7 @@ impl Cmd {
daemon.run(&shutdown_listener).map_err(Error::from),
conn_handler.map_err(Error::from),
sol_balance_monitor.map_err(Error::from),
event_id_purger.run(shutdown_listener.clone()),
)?;

Ok(())
Expand Down
73 changes: 73 additions & 0 deletions mobile_packet_verifier/src/event_ids.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use chrono::{DateTime, Duration, Utc};
use sqlx::{Pool, Postgres, Transaction};

use crate::settings::Settings;

pub async fn is_duplicate(
tx: &mut Transaction<'_, Postgres>,
event_id: String,
) -> anyhow::Result<bool> {
let result =
sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM event_ids WHERE event_id = $1)")
.bind(event_id.clone())
.fetch_one(&mut *tx)
.await?;

Ok(result)
}

pub async fn record(
tx: &mut Transaction<'_, Postgres>,
event_id: String,
received_timestamp: DateTime<Utc>,
) -> anyhow::Result<()> {
sqlx::query("INSERT INTO event_ids(event_id, received_timestamp) VALUES($1,$2)")
.bind(event_id)
.bind(received_timestamp)
.execute(tx)
.await
.map(|_| ())
.map_err(anyhow::Error::from)
}

pub struct EventIdPurger {
conn: Pool<Postgres>,
interval: Duration,
max_age: Duration,
}

impl EventIdPurger {
pub fn from_settings(conn: Pool<Postgres>, settings: &Settings) -> Self {
Self {
conn,
interval: settings.purger_interval(),
max_age: settings.purger_max_age(),
}
}

pub async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> {
let mut timer = tokio::time::interval(self.interval.to_std()?);

loop {
tokio::select! {
_ = &mut shutdown => {
return Ok(())
}
_ = timer.tick() => {
purge(&self.conn, self.max_age).await?;
}
}
}
}
}

async fn purge(conn: &Pool<Postgres>, max_age: Duration) -> anyhow::Result<()> {
let timestamp = Utc::now() - max_age;

sqlx::query("DELETE FROM event_ids where received_timestamp < $1")
.bind(timestamp)
.execute(conn)
.await
.map(|_| ())
.map_err(anyhow::Error::from)
}
1 change: 1 addition & 0 deletions mobile_packet_verifier/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod accumulate;
pub mod burner;
pub mod daemon;
pub mod event_ids;
pub mod settings;
22 changes: 21 additions & 1 deletion mobile_packet_verifier/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use chrono::{DateTime, TimeZone, Utc};
use chrono::{DateTime, Duration, TimeZone, Utc};
use config::{Config, ConfigError, Environment, File};
use serde::Deserialize;
use std::path::Path;
Expand All @@ -24,6 +24,18 @@ pub struct Settings {
pub config_client: mobile_config::ClientSettings,
#[serde(default = "default_start_after")]
pub start_after: u64,
#[serde(default = "default_purger_interval_in_hours")]
pub purger_interval_in_hours: u64,
#[serde(default = "default_purger_max_age_in_hours")]
pub purger_max_age_in_hours: u64,
}

pub fn default_purger_interval_in_hours() -> u64 {
1
}

pub fn default_purger_max_age_in_hours() -> u64 {
24
}

pub fn default_start_after() -> u64 {
Expand Down Expand Up @@ -70,4 +82,12 @@ impl Settings {
.single()
.unwrap()
}

pub fn purger_interval(&self) -> Duration {
Duration::hours(self.purger_interval_in_hours as i64)
}

pub fn purger_max_age(&self) -> Duration {
Duration::hours(self.purger_max_age_in_hours as i64)
}
}
2 changes: 2 additions & 0 deletions mobile_verifier/src/heartbeats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ impl Heartbeat {
validity: self.validity as i32,
timestamp: self.timestamp.timestamp() as u64,
coverage_object: Vec::with_capacity(0), // Placeholder so the project compiles
lat: 0.0,
lon: 0.0,
},
&[("validity", self.validity.as_str_name())],
)
Expand Down

0 comments on commit c71a88a

Please sign in to comment.