diff --git a/Cargo.lock b/Cargo.lock
index 88398a676..a19d470b8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1118,7 +1118,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
 [[package]]
 name = "beacon"
 version = "0.1.0"
-source = "git+https://github.com/helium/proto?branch=master#065699f13438ab7aa148df8d5b68efbaa53d6369"
+source = "git+https://github.com/helium/proto?branch=master#d86b3e3394e8d9f014fcef3ee08740b3fe269e99"
 dependencies = [
  "base64 0.21.0",
  "byteorder",
@@ -1128,7 +1128,7 @@ dependencies = [
  "rand_chacha 0.3.0",
  "rust_decimal",
  "serde",
- "sha2 0.10.6",
+ "sha2 0.9.9",
  "thiserror",
 ]
 
@@ -2925,7 +2925,7 @@ dependencies = [
 [[package]]
 name = "helium-proto"
 version = "0.1.0"
-source = "git+https://github.com/helium/proto?branch=master#065699f13438ab7aa148df8d5b68efbaa53d6369"
+source = "git+https://github.com/helium/proto?branch=master#d86b3e3394e8d9f014fcef3ee08740b3fe269e99"
 dependencies = [
  "bytes",
  "prost",
diff --git a/mobile_packet_verifier/migrations/4_event_ids.sql b/mobile_packet_verifier/migrations/4_event_ids.sql
new file mode 100644
index 000000000..1f6415071
--- /dev/null
+++ b/mobile_packet_verifier/migrations/4_event_ids.sql
@@ -0,0 +1,4 @@
+CREATE TABLE event_ids (
+    event_id TEXT NOT NULL PRIMARY KEY,
+    received_timestamp TIMESTAMPTZ NOT NULL
+);
\ No newline at end of file
diff --git a/mobile_packet_verifier/src/accumulate.rs b/mobile_packet_verifier/src/accumulate.rs
index f968973dc..60bf1124b 100644
--- a/mobile_packet_verifier/src/accumulate.rs
+++ b/mobile_packet_verifier/src/accumulate.rs
@@ -1,7 +1,7 @@
 use chrono::{DateTime, Utc};
 use file_store::file_sink::FileSinkClient;
 use file_store::mobile_session::{
-    DataTransferSessionIngestReport, DataTransferSessionReq, InvalidDataTransferIngestReport,
+    DataTransferSessionIngestReport, InvalidDataTransferIngestReport,
 };
 use futures::{Stream, StreamExt};
 use helium_crypto::PublicKeyBinary;
@@ -11,22 +11,12 @@ use helium_proto::services::poc_mobile::{
     InvalidDataTransferIngestReportV1,
 };
 use mobile_config::{
-    client::{AuthorizationClient, ClientError, GatewayClient},
+    client::{AuthorizationClient, GatewayClient},
     gateway_info::GatewayInfoResolver,
 };
 use sqlx::{Postgres, Transaction};
 
-#[derive(thiserror::Error, Debug)]
-pub enum AccumulationError {
-    #[error("file store error: {0}")]
-    FileStoreError(#[from] file_store::Error),
-    #[error("sqlx error: {0}")]
-    SqlxError(#[from] sqlx::Error),
-    #[error("reports stream dropped")]
-    ReportsStreamDropped,
-    #[error("config client error: {0}")]
-    ConfigClientError(#[from] ClientError),
-}
+use crate::event_ids;
 
 pub async fn accumulate_sessions(
     gateway_client: &GatewayClient,
@@ -35,7 +25,7 @@
     invalid_data_session_report_sink: &FileSinkClient,
     curr_file_ts: DateTime<Utc>,
     reports: impl Stream<Item = DataTransferSessionIngestReport>,
-) -> Result<(), AccumulationError> {
+) -> anyhow::Result<()> {
     tokio::pin!(reports);
 
     while let Some(report) = reports.next().await {
@@ -51,7 +41,7 @@ pub async fn accumulate_sessions(
             continue;
         }
 
-        let report_validity = verify_report(gateway_client, auth_client, &report.report).await;
+        let report_validity = verify_report(conn, gateway_client, auth_client, &report).await?;
         if report_validity != DataTransferIngestReportStatus::Valid {
             write_invalid_report(invalid_data_session_report_sink, report_validity, report).await?;
             continue;
@@ -80,17 +70,22 @@
 }
 
 async fn verify_report(
+    txn: &mut Transaction<'_, Postgres>,
     gateway_client: &GatewayClient,
     auth_client: &AuthorizationClient,
-    report: &DataTransferSessionReq,
-) -> DataTransferIngestReportStatus {
-    if !verify_gateway(gateway_client, &report.data_transfer_usage.pub_key).await {
-        return DataTransferIngestReportStatus::InvalidGatewayKey;
+    report: &DataTransferSessionIngestReport,
+) -> anyhow::Result<DataTransferIngestReportStatus> {
+    if is_duplicate(txn, report).await? {
+        return Ok(DataTransferIngestReportStatus::Duplicate);
+    }
+
+    if !verify_gateway(gateway_client, &report.report.data_transfer_usage.pub_key).await {
+        return Ok(DataTransferIngestReportStatus::InvalidGatewayKey);
     };
-    if !verify_known_routing_key(auth_client, &report.pub_key).await {
-        return DataTransferIngestReportStatus::InvalidRoutingKey;
+    if !verify_known_routing_key(auth_client, &report.report.pub_key).await {
+        return Ok(DataTransferIngestReportStatus::InvalidRoutingKey);
     };
-    DataTransferIngestReportStatus::Valid
+    Ok(DataTransferIngestReportStatus::Valid)
 }
 
 async fn verify_gateway(gateway_client: &GatewayClient, public_key: &PublicKeyBinary) -> bool {
@@ -113,6 +108,18 @@ async fn verify_known_routing_key(
     }
 }
 
+async fn is_duplicate(
+    txn: &mut Transaction<'_, Postgres>,
+    report: &DataTransferSessionIngestReport,
+) -> anyhow::Result<bool> {
+    event_ids::is_duplicate(
+        txn,
+        report.report.data_transfer_usage.event_id.clone(),
+        report.received_timestamp,
+    )
+    .await
+}
+
 async fn write_invalid_report(
     invalid_data_session_report_sink: &FileSinkClient,
     reason: DataTransferIngestReportStatus,
diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs
index 22780e854..53742c5da 100644
--- a/mobile_packet_verifier/src/daemon.rs
+++ b/mobile_packet_verifier/src/daemon.rs
@@ -1,4 +1,4 @@
-use crate::{burner::Burner, settings::Settings};
+use crate::{burner::Burner, event_ids::EventIdPurger, settings::Settings};
 use anyhow::{bail, Error, Result};
 use chrono::{TimeZone, Utc};
 use file_store::{
@@ -173,7 +173,7 @@ impl Cmd {
 
         let daemon = Daemon::new(
             settings,
-            pool,
+            pool.clone(),
             reports,
             burner,
             gateway_client,
@@ -181,6 +181,8 @@
             invalid_sessions,
         );
 
+        let event_id_purger = EventIdPurger::from_settings(pool, settings);
+
         tokio::try_join!(
             source_join_handle.map_err(Error::from),
             valid_sessions_server.run().map_err(Error::from),
@@ -189,6 +191,7 @@
             daemon.run(&shutdown_listener).map_err(Error::from),
             conn_handler.map_err(Error::from),
             sol_balance_monitor.map_err(Error::from),
+            event_id_purger.run(shutdown_listener.clone()),
         )?;
 
         Ok(())
diff --git a/mobile_packet_verifier/src/event_ids.rs b/mobile_packet_verifier/src/event_ids.rs
new file mode 100644
index 000000000..be2caada0
--- /dev/null
+++ b/mobile_packet_verifier/src/event_ids.rs
@@ -0,0 +1,60 @@
+use chrono::{DateTime, Duration, Utc};
+use sqlx::{Pool, Postgres, Transaction};
+
+use crate::settings::Settings;
+
+pub async fn is_duplicate(
+    txn: &mut Transaction<'_, Postgres>,
+    event_id: String,
+    received_timestamp: DateTime<Utc>,
+) -> anyhow::Result<bool> {
+    sqlx::query("INSERT INTO event_ids(event_id, received_timestamp) VALUES($1, $2) ON CONFLICT (event_id) DO NOTHING")
+        .bind(event_id)
+        .bind(received_timestamp)
+        .execute(txn)
+        .await
+        .map(|result| result.rows_affected() == 0)
+        .map_err(anyhow::Error::from)
+}
+
+pub struct EventIdPurger {
+    conn: Pool<Postgres>,
+    interval: Duration,
+    max_age: Duration,
+}
+
+impl EventIdPurger {
+    pub fn from_settings(conn: Pool<Postgres>, settings: &Settings) -> Self {
+        Self {
+            conn,
+            interval: settings.purger_interval(),
+            max_age: settings.purger_max_age(),
+        }
+    }
+
+    pub async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> {
+        let mut timer = tokio::time::interval(self.interval.to_std()?);
+
+        loop {
+            tokio::select! {
+                _ = &mut shutdown => {
+                    return Ok(())
+                }
+                _ = timer.tick() => {
+                    purge(&self.conn, self.max_age).await?;
+                }
+            }
+        }
+    }
+}
+
+async fn purge(conn: &Pool<Postgres>, max_age: Duration) -> anyhow::Result<()> {
+    let timestamp = Utc::now() - max_age;
+
+    sqlx::query("DELETE FROM event_ids where received_timestamp < $1")
+        .bind(timestamp)
+        .execute(conn)
+        .await
+        .map(|_| ())
+        .map_err(anyhow::Error::from)
+}
diff --git a/mobile_packet_verifier/src/lib.rs b/mobile_packet_verifier/src/lib.rs
index ed66af217..4d6c71332 100644
--- a/mobile_packet_verifier/src/lib.rs
+++ b/mobile_packet_verifier/src/lib.rs
@@ -1,4 +1,5 @@
 pub mod accumulate;
 pub mod burner;
 pub mod daemon;
+pub mod event_ids;
 pub mod settings;
diff --git a/mobile_packet_verifier/src/settings.rs b/mobile_packet_verifier/src/settings.rs
index 0ee241cf4..0ae5e2ee4 100644
--- a/mobile_packet_verifier/src/settings.rs
+++ b/mobile_packet_verifier/src/settings.rs
@@ -1,4 +1,4 @@
-use chrono::{DateTime, TimeZone, Utc};
+use chrono::{DateTime, Duration, TimeZone, Utc};
 use config::{Config, ConfigError, Environment, File};
 use serde::Deserialize;
 use std::path::Path;
@@ -24,6 +24,18 @@ pub struct Settings {
     pub config_client: mobile_config::ClientSettings,
     #[serde(default = "default_start_after")]
     pub start_after: u64,
+    #[serde(default = "default_purger_interval_in_hours")]
+    pub purger_interval_in_hours: u64,
+    #[serde(default = "default_purger_max_age_in_hours")]
+    pub purger_max_age_in_hours: u64,
+}
+
+pub fn default_purger_interval_in_hours() -> u64 {
+    1
+}
+
+pub fn default_purger_max_age_in_hours() -> u64 {
+    24
 }
 
 pub fn default_start_after() -> u64 {
@@ -70,4 +82,12 @@
             .single()
             .unwrap()
     }
+
+    pub fn purger_interval(&self) -> Duration {
+        Duration::hours(self.purger_interval_in_hours as i64)
+    }
+
+    pub fn purger_max_age(&self) -> Duration {
+        Duration::hours(self.purger_max_age_in_hours as i64)
+    }
 }
diff --git a/mobile_verifier/src/heartbeats.rs b/mobile_verifier/src/heartbeats.rs
index 3bcc89353..01e1fd383 100644
--- a/mobile_verifier/src/heartbeats.rs
+++ b/mobile_verifier/src/heartbeats.rs
@@ -213,6 +213,8 @@ impl Heartbeat {
                 validity: self.validity as i32,
                 timestamp: self.timestamp.timestamp() as u64,
                 coverage_object: Vec::with_capacity(0), // Placeholder so the project compiles
+                lat: 0.0,
+                lon: 0.0,
             },
             &[("validity", self.validity.as_str_name())],
         )
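
Note on the dedup introduced in event_ids.rs: is_duplicate leans on Postgres upsert
semantics rather than a separate SELECT. INSERT ... ON CONFLICT (event_id) DO NOTHING
affects zero rows when the primary key already exists, so rows_affected() == 0 doubles
as the duplicate signal, and because the statement runs on the same transaction that
accumulates the session, a rolled-back batch also rolls back its event_ids rows. The
EventIdPurger then bounds table growth by deleting rows older than
purger_max_age_in_hours on a purger_interval_in_hours cadence. Below is a minimal
standalone sketch of the ON CONFLICT pattern; the connection string and event id are
illustrative placeholders, not part of this diff.

// dedup_sketch.rs -- hypothetical example, not part of the PR. Assumes the
// event_ids table from 4_event_ids.sql and the sqlx/tokio/chrono dependencies
// this workspace already uses.
use chrono::Utc;
use sqlx::postgres::PgPoolOptions;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let pool = PgPoolOptions::new()
        .connect("postgres://postgres@localhost/mobile_packet_verifier")
        .await?;

    for attempt in 1..=2 {
        // Zero rows affected means the event_id row already existed.
        let rows = sqlx::query(
            "INSERT INTO event_ids(event_id, received_timestamp) VALUES($1, $2) \
             ON CONFLICT (event_id) DO NOTHING",
        )
        .bind("example-event-id")
        .bind(Utc::now())
        .execute(&pool)
        .await?
        .rows_affected();

        // First pass inserts the row (rows == 1); the second hits the
        // primary-key conflict and is reported as a duplicate.
        println!("attempt {attempt}: duplicate = {}", rows == 0);
    }

    Ok(())
}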