Add deduping to mobile packet verifier #585

Merged
merged 3 commits on Aug 2, 2023
Changes from 1 commit
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -59,14 +59,14 @@ sqlx = {version = "0", features = [
]}

helium-crypto = {version = "0.6.8", features=["sqlx-postgres", "multisig"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "bbalser/invalid-data-session-duplicate", features = ["services"]}
hextree = "*"
solana-client = "1.14"
solana-sdk = "1.14"
solana-program = "1.11"
spl-token = "3.5.0"
reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]}
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "bbalser/invalid-data-session-duplicate" }
humantime = "2"
metrics = "0"
metrics-exporter-prometheus = "0"
4 changes: 4 additions & 0 deletions mobile_packet_verifier/migrations/4_event_ids.sql
@@ -0,0 +1,4 @@
CREATE TABLE event_ids (
event_id TEXT NOT NULL PRIMARY KEY,
Contributor

what are these event_ids constituted from? are we pretty sure they'll be unique for the life of a record in this database without aggregating to some other piece of data like a hotspot pubkey or cbsd_id?

Contributor Author

They are unique according to Dmytro.

received_timestamp TIMESTAMPTZ NOT NULL
);
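
For context on the uniqueness question in the thread above: if event ids could not be trusted to be globally unique, the key would need to be scoped to another identifier such as the reporting hotspot's pubkey. A rough sketch of that alternative is below; it is illustrative only (the pub_key column name is assumed) and is not what this PR merges.

-- Hypothetical composite-key variant of the migration above; not part of this PR.
CREATE TABLE event_ids (
    event_id TEXT NOT NULL,
    pub_key TEXT NOT NULL,
    received_timestamp TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (event_id, pub_key)
);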
46 changes: 24 additions & 22 deletions mobile_packet_verifier/src/accumulate.rs
@@ -1,7 +1,7 @@
use chrono::{DateTime, Utc};
use file_store::file_sink::FileSinkClient;
use file_store::mobile_session::{
DataTransferSessionIngestReport, DataTransferSessionReq, InvalidDataTransferIngestReport,
DataTransferSessionIngestReport, InvalidDataTransferIngestReport,
};
use futures::{Stream, StreamExt};
use helium_crypto::PublicKeyBinary;
@@ -11,22 +11,12 @@ use helium_proto::services::poc_mobile::{
InvalidDataTransferIngestReportV1,
};
use mobile_config::{
client::{AuthorizationClient, ClientError, GatewayClient},
client::{AuthorizationClient, GatewayClient},
gateway_info::GatewayInfoResolver,
};
use sqlx::{Postgres, Transaction};

#[derive(thiserror::Error, Debug)]
pub enum AccumulationError {
#[error("file store error: {0}")]
FileStoreError(#[from] file_store::Error),
#[error("sqlx error: {0}")]
SqlxError(#[from] sqlx::Error),
#[error("reports stream dropped")]
ReportsStreamDropped,
#[error("config client error: {0}")]
ConfigClientError(#[from] ClientError),
}
use crate::event_ids;

pub async fn accumulate_sessions(
gateway_client: &GatewayClient,
@@ -35,7 +25,7 @@ pub async fn accumulate_sessions(
invalid_data_session_report_sink: &FileSinkClient,
curr_file_ts: DateTime<Utc>,
reports: impl Stream<Item = DataTransferSessionIngestReport>,
) -> Result<(), AccumulationError> {
) -> anyhow::Result<()> {
tokio::pin!(reports);

while let Some(report) = reports.next().await {
@@ -51,7 +41,7 @@ pub async fn accumulate_sessions(
continue;
}

let report_validity = verify_report(gateway_client, auth_client, &report.report).await;
let report_validity = verify_report(conn, gateway_client, auth_client, &report).await?;
if report_validity != DataTransferIngestReportStatus::Valid {
write_invalid_report(invalid_data_session_report_sink, report_validity, report).await?;
continue;
@@ -80,17 +70,29 @@ pub async fn accumulate_sessions(
}

async fn verify_report(
tx: &mut Transaction<'_, Postgres>,
Contributor

Suggested change
tx: &mut Transaction<'_, Postgres>,
txn: &mut Transaction<'_, Postgres>,

tx reads like a message channel sender

gateway_client: &GatewayClient,
auth_client: &AuthorizationClient,
report: &DataTransferSessionReq,
) -> DataTransferIngestReportStatus {
if !verify_gateway(gateway_client, &report.data_transfer_usage.pub_key).await {
return DataTransferIngestReportStatus::InvalidGatewayKey;
report: &DataTransferSessionIngestReport,
) -> anyhow::Result<DataTransferIngestReportStatus> {
if event_ids::is_duplicate(tx, report.report.data_transfer_usage.event_id.clone()).await? {
return Ok(DataTransferIngestReportStatus::Duplicate);
} else {
event_ids::record(
tx,
report.report.data_transfer_usage.event_id.clone(),
report.received_timestamp,
)
.await?;
}

if !verify_gateway(gateway_client, &report.report.data_transfer_usage.pub_key).await {
return Ok(DataTransferIngestReportStatus::InvalidGatewayKey);
};
if !verify_known_routing_key(auth_client, &report.pub_key).await {
return DataTransferIngestReportStatus::InvalidRoutingKey;
if !verify_known_routing_key(auth_client, &report.report.pub_key).await {
return Ok(DataTransferIngestReportStatus::InvalidRoutingKey);
};
DataTransferIngestReportStatus::Valid
Ok(DataTransferIngestReportStatus::Valid)
}

async fn verify_gateway(gateway_client: &GatewayClient, public_key: &PublicKeyBinary) -> bool {
7 changes: 5 additions & 2 deletions mobile_packet_verifier/src/daemon.rs
@@ -1,4 +1,4 @@
use crate::{burner::Burner, settings::Settings};
use crate::{burner::Burner, event_ids::EventIdPurger, settings::Settings};
use anyhow::{bail, Error, Result};
use chrono::{TimeZone, Utc};
use file_store::{
@@ -173,14 +173,16 @@ impl Cmd {

let daemon = Daemon::new(
settings,
pool,
pool.clone(),
reports,
burner,
gateway_client,
auth_client,
invalid_sessions,
);

let event_id_purger = EventIdPurger::from_settings(pool, &settings);

tokio::try_join!(
source_join_handle.map_err(Error::from),
valid_sessions_server.run().map_err(Error::from),
@@ -189,6 +191,7 @@
daemon.run(&shutdown_listener).map_err(Error::from),
conn_handler.map_err(Error::from),
sol_balance_monitor.map_err(Error::from),
event_id_purger.run(shutdown_listener.clone()),
)?;

Ok(())
73 changes: 73 additions & 0 deletions mobile_packet_verifier/src/event_ids.rs
@@ -0,0 +1,73 @@
use chrono::{DateTime, Duration, Utc};
use sqlx::{Pool, Postgres, Transaction};

use crate::settings::Settings;

pub async fn is_duplicate(
tx: &mut Transaction<'_, Postgres>,
event_id: String,
) -> anyhow::Result<bool> {
let result =
sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM event_ids WHERE event_id = $1)")
.bind(event_id.clone())
.fetch_one(&mut *tx)
.await?;

Ok(result)
}

pub async fn record(
tx: &mut Transaction<'_, Postgres>,
event_id: String,
received_timestamp: DateTime<Utc>,
) -> anyhow::Result<()> {
sqlx::query("INSERT INTO event_ids(event_id, received_timestamp) VALUES($1,$2)")
Contributor

Suggested change
sqlx::query("INSERT INTO event_ids(event_id, received_timestamp) VALUES($1,$2)")
sqlx::query("INSERT INTO event_ids(event_id, received_timestamp) VALUES ($1, $2)")

Contributor Author

Fixed 5c66469

.bind(event_id)
.bind(received_timestamp)
.execute(tx)
.await
.map(|_| ())
.map_err(anyhow::Error::from)
}
Contributor

worth the speed boost to make this an upsert statement returning whether or not the insert happened and then do it in a single query?

Contributor

I've done this in a different app; it attempts the upsert and then returns a boolean for whether or not the row was inserted

    query_with_bindings.execute(&mut tx).await?.rows_affected() > 0;

Contributor

I agree with @jeffgrunewald here, this should be an upsert

Contributor

This is what the query would look like:

let is_duplicate = sqlx::query(
    r#"
    INSERT INTO event_ids (event_id, received_timestamp) VALUES ($1, $2) 
    ON CONFLICT DO NOTHING
    "#
)
.bind(event_id)
.bind(received_timestamp)
.execute(tx)
.await?
.rows_affected() > 0;

Contributor Author

Agreed, upsert is better 5c66469
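
For reference, a minimal sketch of what the combined upsert-based check could look like (the actual change landed in 5c66469 and may differ in naming and shape). Note that with ON CONFLICT DO NOTHING the conflicting insert is skipped, so zero affected rows is what signals a duplicate.

use chrono::{DateTime, Utc};
use sqlx::{Postgres, Transaction};

// Hypothetical combined helper: records the event_id and reports whether it
// was already present, in a single round trip.
pub async fn is_duplicate(
    txn: &mut Transaction<'_, Postgres>,
    event_id: String,
    received_timestamp: DateTime<Utc>,
) -> anyhow::Result<bool> {
    let rows_affected = sqlx::query(
        r#"
        INSERT INTO event_ids (event_id, received_timestamp) VALUES ($1, $2)
        ON CONFLICT (event_id) DO NOTHING
        "#,
    )
    .bind(event_id)
    .bind(received_timestamp)
    .execute(&mut *txn)
    .await?
    .rows_affected();

    // The insert is skipped on conflict, so zero rows affected means duplicate.
    Ok(rows_affected == 0)
}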


pub struct EventIdPurger {
conn: Pool<Postgres>,
interval: Duration,
max_age: Duration,
}

impl EventIdPurger {
pub fn from_settings(conn: Pool<Postgres>, settings: &Settings) -> Self {
Self {
conn,
interval: settings.purger_interval(),
max_age: settings.purger_max_age(),
}
}

pub async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> {
let mut timer = tokio::time::interval(self.interval.to_std()?);

loop {
tokio::select! {
_ = &mut shutdown => {
return Ok(())
}
_ = timer.tick() => {
purge(&self.conn, self.max_age).await?;
}
}
}
}
}

async fn purge(conn: &Pool<Postgres>, max_age: Duration) -> anyhow::Result<()> {
let timestamp = Utc::now() - max_age;

sqlx::query("DELETE FROM event_ids where received_timestamp < $1")
.bind(timestamp)
.execute(conn)
.await
.map(|_| ())
.map_err(anyhow::Error::from)
}
1 change: 1 addition & 0 deletions mobile_packet_verifier/src/lib.rs
@@ -1,4 +1,5 @@
pub mod accumulate;
pub mod burner;
pub mod daemon;
pub mod event_ids;
pub mod settings;
22 changes: 21 additions & 1 deletion mobile_packet_verifier/src/settings.rs
@@ -1,4 +1,4 @@
use chrono::{DateTime, TimeZone, Utc};
use chrono::{DateTime, Duration, TimeZone, Utc};
use config::{Config, ConfigError, Environment, File};
use serde::Deserialize;
use std::path::Path;
@@ -24,6 +24,18 @@ pub struct Settings {
pub config_client: mobile_config::ClientSettings,
#[serde(default = "default_start_after")]
pub start_after: u64,
#[serde(default = "default_purger_interval_in_hours")]
pub purger_interval_in_hours: u64,
#[serde(default = "default_purger_max_age_in_hours")]
pub purger_max_age_in_hours: u64,
}

pub fn default_purger_interval_in_hours() -> u64 {
1
}

pub fn default_purger_max_age_in_hours() -> u64 {
24
}

pub fn default_start_after() -> u64 {
@@ -70,4 +82,12 @@ impl Settings {
.single()
.unwrap()
}

pub fn purger_interval(&self) -> Duration {
Duration::hours(self.purger_interval_in_hours as i64)
}

pub fn purger_max_age(&self) -> Duration {
Duration::hours(self.purger_max_age_in_hours as i64)
}
}
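
Assuming a TOML-format settings file (the on-disk format is not shown in this diff), the new purger knobs map to keys named after the struct fields above; the defaults are shown here.

# Hypothetical excerpt from a mobile_packet_verifier settings file.
# How often the event_ids purger runs, in hours (default 1).
purger_interval_in_hours = 1
# How long event_ids rows are retained before being purged, in hours (default 24).
purger_max_age_in_hours = 24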
2 changes: 2 additions & 0 deletions mobile_verifier/src/heartbeats.rs
@@ -213,6 +213,8 @@ impl Heartbeat {
validity: self.validity as i32,
timestamp: self.timestamp.timestamp() as u64,
coverage_object: Vec::with_capacity(0), // Placeholder so the project compiles
lat: 0.0,
lon: 0.0,
},
&[("validity", self.validity.as_str_name())],
)