Add deduping to mobile packet verifier #585

Merged · 3 commits · Aug 2, 2023
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -59,14 +59,14 @@ sqlx = {version = "0", features = [
]}

helium-crypto = {version = "0.6.8", features=["sqlx-postgres", "multisig"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "bbalser/invalid-data-session-duplicate", features = ["services"]}
hextree = "*"
solana-client = "1.14"
solana-sdk = "1.14"
solana-program = "1.11"
spl-token = "3.5.0"
reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]}
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "bbalser/invalid-data-session-duplicate" }
humantime = "2"
metrics = "0"
metrics-exporter-prometheus = "0"
4 changes: 4 additions & 0 deletions mobile_packet_verifier/migrations/4_event_ids.sql
@@ -0,0 +1,4 @@
CREATE TABLE event_ids (
event_id TEXT NOT NULL PRIMARY KEY,
Contributor


what are these event_ids constituted from? are we pretty sure they'll be unique for the life of a record in this database without aggregating to some other piece of data like a hotspot pubkey or cbsd_id?

Contributor Author


They are unique, according to Dmytro.

received_timestamp TIMESTAMPTZ NOT NULL
);
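The reviewer's question above is whether `event_id` on its own is a safe dedup key; the PR settles on it being unique as-is. If that assumption ever broke, the dedup key could be scoped to the reporting hotspot, as the reviewer suggests. A minimal sketch of that alternative (not part of this PR; the `pub_key` column and the composite primary key are hypothetical):

```rust
// Hypothetical alternative, not in this PR: assumes the migration instead
// declared PRIMARY KEY (pub_key, event_id) on event_ids.
use chrono::{DateTime, Utc};
use helium_crypto::PublicKeyBinary;
use sqlx::{Postgres, Transaction};

pub async fn is_duplicate_scoped(
    txn: &mut Transaction<'_, Postgres>,
    pub_key: &PublicKeyBinary,
    event_id: String,
    received_timestamp: DateTime<Utc>,
) -> anyhow::Result<bool> {
    sqlx::query(
        "INSERT INTO event_ids(pub_key, event_id, received_timestamp) VALUES($1, $2, $3) \
         ON CONFLICT (pub_key, event_id) DO NOTHING",
    )
    .bind(pub_key)
    .bind(event_id)
    .bind(received_timestamp)
    .execute(txn)
    .await
    // Zero rows affected means the (pub_key, event_id) pair was already recorded.
    .map(|result| result.rows_affected() == 0)
    .map_err(anyhow::Error::from)
}
```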
51 changes: 29 additions & 22 deletions mobile_packet_verifier/src/accumulate.rs
@@ -1,7 +1,7 @@
use chrono::{DateTime, Utc};
use file_store::file_sink::FileSinkClient;
use file_store::mobile_session::{
DataTransferSessionIngestReport, DataTransferSessionReq, InvalidDataTransferIngestReport,
DataTransferSessionIngestReport, InvalidDataTransferIngestReport,
};
use futures::{Stream, StreamExt};
use helium_crypto::PublicKeyBinary;
@@ -11,22 +11,12 @@ use helium_proto::services::poc_mobile::{
InvalidDataTransferIngestReportV1,
};
use mobile_config::{
client::{AuthorizationClient, ClientError, GatewayClient},
client::{AuthorizationClient, GatewayClient},
gateway_info::GatewayInfoResolver,
};
use sqlx::{Postgres, Transaction};

#[derive(thiserror::Error, Debug)]
pub enum AccumulationError {
#[error("file store error: {0}")]
FileStoreError(#[from] file_store::Error),
#[error("sqlx error: {0}")]
SqlxError(#[from] sqlx::Error),
#[error("reports stream dropped")]
ReportsStreamDropped,
#[error("config client error: {0}")]
ConfigClientError(#[from] ClientError),
}
use crate::event_ids;

pub async fn accumulate_sessions(
gateway_client: &GatewayClient,
@@ -35,7 +25,7 @@ pub async fn accumulate_sessions(
invalid_data_session_report_sink: &FileSinkClient,
curr_file_ts: DateTime<Utc>,
reports: impl Stream<Item = DataTransferSessionIngestReport>,
) -> Result<(), AccumulationError> {
) -> anyhow::Result<()> {
tokio::pin!(reports);

while let Some(report) = reports.next().await {
@@ -51,7 +41,7 @@ pub async fn accumulate_sessions(
continue;
}

let report_validity = verify_report(gateway_client, auth_client, &report.report).await;
let report_validity = verify_report(conn, gateway_client, auth_client, &report).await?;
if report_validity != DataTransferIngestReportStatus::Valid {
write_invalid_report(invalid_data_session_report_sink, report_validity, report).await?;
continue;
@@ -80,17 +70,22 @@ pub async fn accumulate_sessions(
}

async fn verify_report(
txn: &mut Transaction<'_, Postgres>,
gateway_client: &GatewayClient,
auth_client: &AuthorizationClient,
report: &DataTransferSessionReq,
) -> DataTransferIngestReportStatus {
if !verify_gateway(gateway_client, &report.data_transfer_usage.pub_key).await {
return DataTransferIngestReportStatus::InvalidGatewayKey;
report: &DataTransferSessionIngestReport,
) -> anyhow::Result<DataTransferIngestReportStatus> {
if is_duplicate(txn, report).await? {
return Ok(DataTransferIngestReportStatus::Duplicate);
}

if !verify_gateway(gateway_client, &report.report.data_transfer_usage.pub_key).await {
return Ok(DataTransferIngestReportStatus::InvalidGatewayKey);
};
if !verify_known_routing_key(auth_client, &report.pub_key).await {
return DataTransferIngestReportStatus::InvalidRoutingKey;
if !verify_known_routing_key(auth_client, &report.report.pub_key).await {
return Ok(DataTransferIngestReportStatus::InvalidRoutingKey);
};
DataTransferIngestReportStatus::Valid
Ok(DataTransferIngestReportStatus::Valid)
}

async fn verify_gateway(gateway_client: &GatewayClient, public_key: &PublicKeyBinary) -> bool {
@@ -113,6 +108,18 @@ async fn verify_known_routing_key(
}
}

async fn is_duplicate(
txn: &mut Transaction<'_, Postgres>,
report: &DataTransferSessionIngestReport,
) -> anyhow::Result<bool> {
event_ids::is_duplicate(
txn,
report.report.data_transfer_usage.event_id.clone(),
report.received_timestamp,
)
.await
}

async fn write_invalid_report(
invalid_data_session_report_sink: &FileSinkClient,
reason: DataTransferIngestReportStatus,
7 changes: 5 additions & 2 deletions mobile_packet_verifier/src/daemon.rs
@@ -1,4 +1,4 @@
use crate::{burner::Burner, settings::Settings};
use crate::{burner::Burner, event_ids::EventIdPurger, settings::Settings};
use anyhow::{bail, Error, Result};
use chrono::{TimeZone, Utc};
use file_store::{
@@ -173,14 +173,16 @@ impl Cmd {

let daemon = Daemon::new(
settings,
pool,
pool.clone(),
reports,
burner,
gateway_client,
auth_client,
invalid_sessions,
);

let event_id_purger = EventIdPurger::from_settings(pool, settings);

tokio::try_join!(
source_join_handle.map_err(Error::from),
valid_sessions_server.run().map_err(Error::from),
@@ -189,6 +191,7 @@ impl Cmd {
daemon.run(&shutdown_listener).map_err(Error::from),
conn_handler.map_err(Error::from),
sol_balance_monitor.map_err(Error::from),
event_id_purger.run(shutdown_listener.clone()),
)?;

Ok(())
60 changes: 60 additions & 0 deletions mobile_packet_verifier/src/event_ids.rs
@@ -0,0 +1,60 @@
use chrono::{DateTime, Duration, Utc};
use sqlx::{Pool, Postgres, Transaction};

use crate::settings::Settings;

pub async fn is_duplicate(
txn: &mut Transaction<'_, Postgres>,
event_id: String,
received_timestamp: DateTime<Utc>,
) -> anyhow::Result<bool> {
sqlx::query("INSERT INTO event_ids(event_id, received_timestamp) VALUES($1, $2) ON CONFLICT (event_id) DO NOTHING")
.bind(event_id)
.bind(received_timestamp)
.execute(txn)
.await
.map(|result| result.rows_affected() == 0)
.map_err(anyhow::Error::from)
}

pub struct EventIdPurger {
conn: Pool<Postgres>,
interval: Duration,
max_age: Duration,
}

impl EventIdPurger {
pub fn from_settings(conn: Pool<Postgres>, settings: &Settings) -> Self {
Self {
conn,
interval: settings.purger_interval(),
max_age: settings.purger_max_age(),
}
}

pub async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> {
let mut timer = tokio::time::interval(self.interval.to_std()?);

loop {
tokio::select! {
_ = &mut shutdown => {
return Ok(())
}
_ = timer.tick() => {
purge(&self.conn, self.max_age).await?;
}
}
}
}
}

async fn purge(conn: &Pool<Postgres>, max_age: Duration) -> anyhow::Result<()> {
let timestamp = Utc::now() - max_age;

sqlx::query("DELETE FROM event_ids where received_timestamp < $1")
.bind(timestamp)
.execute(conn)
.await
.map(|_| ())
.map_err(anyhow::Error::from)
}
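A usage sketch (not from the PR) of how the insert above doubles as the dedup check: the first call for a given `event_id` inserts a row and reports "not a duplicate", while a later call within the retention window hits `ON CONFLICT DO NOTHING` and reports "duplicate". The `pool` here is an assumed test `PgPool` with the `4_event_ids.sql` migration applied.

```rust
use chrono::Utc;
use mobile_packet_verifier::event_ids;

// Assumes `pool` points at a database with the event_ids migration applied.
async fn dedup_round_trip(pool: &sqlx::Pool<sqlx::Postgres>) -> anyhow::Result<()> {
    let mut txn = pool.begin().await?;
    let ts = Utc::now();

    // First sighting: the row is inserted, so this is not a duplicate.
    assert!(!event_ids::is_duplicate(&mut txn, "some-event-id".to_string(), ts).await?);
    // Second sighting: ON CONFLICT DO NOTHING inserts nothing, so it is a duplicate.
    assert!(event_ids::is_duplicate(&mut txn, "some-event-id".to_string(), ts).await?);

    txn.commit().await?;
    Ok(())
}
```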
1 change: 1 addition & 0 deletions mobile_packet_verifier/src/lib.rs
@@ -1,4 +1,5 @@
pub mod accumulate;
pub mod burner;
pub mod daemon;
pub mod event_ids;
pub mod settings;
22 changes: 21 additions & 1 deletion mobile_packet_verifier/src/settings.rs
@@ -1,4 +1,4 @@
use chrono::{DateTime, TimeZone, Utc};
use chrono::{DateTime, Duration, TimeZone, Utc};
use config::{Config, ConfigError, Environment, File};
use serde::Deserialize;
use std::path::Path;
@@ -24,6 +24,18 @@ pub struct Settings {
pub config_client: mobile_config::ClientSettings,
#[serde(default = "default_start_after")]
pub start_after: u64,
#[serde(default = "default_purger_interval_in_hours")]
pub purger_interval_in_hours: u64,
#[serde(default = "default_purger_max_age_in_hours")]
pub purger_max_age_in_hours: u64,
}

pub fn default_purger_interval_in_hours() -> u64 {
1
}

pub fn default_purger_max_age_in_hours() -> u64 {
24
}

pub fn default_start_after() -> u64 {
@@ -70,4 +82,12 @@ impl Settings {
.single()
.unwrap()
}

pub fn purger_interval(&self) -> Duration {
Duration::hours(self.purger_interval_in_hours as i64)
}

pub fn purger_max_age(&self) -> Duration {
Duration::hours(self.purger_max_age_in_hours as i64)
}
}
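A small sketch (not from the PR) of what the new settings mean operationally: with the defaults of `purger_interval_in_hours = 1` and `purger_max_age_in_hours = 24`, the purger wakes once an hour and deletes `event_ids` rows received more than 24 hours earlier, mirroring the cutoff computed in `event_ids::purge`.

```rust
use chrono::{DateTime, Duration, Utc};

// Mirrors the cutoff arithmetic in event_ids::purge(); 24 is the default max age above.
fn purge_cutoff(now: DateTime<Utc>, max_age_hours: u64) -> DateTime<Utc> {
    now - Duration::hours(max_age_hours as i64)
}

fn example() {
    let now = Utc::now();
    // Anything with received_timestamp older than this is deleted on the next tick.
    let cutoff = purge_cutoff(now, 24);
    assert_eq!(now - cutoff, Duration::hours(24));
}
```

This also bounds the dedup window: once a row is purged, a re-sent report with the same `event_id` would no longer be detected as a duplicate, so `purger_max_age_in_hours` effectively sets how long duplicates are caught.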
2 changes: 2 additions & 0 deletions mobile_verifier/src/heartbeats.rs
@@ -213,6 +213,8 @@ impl Heartbeat {
validity: self.validity as i32,
timestamp: self.timestamp.timestamp() as u64,
coverage_object: Vec::with_capacity(0), // Placeholder so the project compiles
lat: 0.0,
lon: 0.0,
},
&[("validity", self.validity.as_str_name())],
)