-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add deduping to mobile packet verifier #585
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
CREATE TABLE event_ids ( | ||
event_id TEXT NOT NULL PRIMARY KEY, | ||
received_timestamp TIMESTAMPTZ NOT NULL | ||
); |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,7 +1,7 @@ | ||||||
use chrono::{DateTime, Utc}; | ||||||
use file_store::file_sink::FileSinkClient; | ||||||
use file_store::mobile_session::{ | ||||||
DataTransferSessionIngestReport, DataTransferSessionReq, InvalidDataTransferIngestReport, | ||||||
DataTransferSessionIngestReport, InvalidDataTransferIngestReport, | ||||||
}; | ||||||
use futures::{Stream, StreamExt}; | ||||||
use helium_crypto::PublicKeyBinary; | ||||||
|
@@ -11,22 +11,12 @@ use helium_proto::services::poc_mobile::{ | |||||
InvalidDataTransferIngestReportV1, | ||||||
}; | ||||||
use mobile_config::{ | ||||||
client::{AuthorizationClient, ClientError, GatewayClient}, | ||||||
client::{AuthorizationClient, GatewayClient}, | ||||||
gateway_info::GatewayInfoResolver, | ||||||
}; | ||||||
use sqlx::{Postgres, Transaction}; | ||||||
|
||||||
#[derive(thiserror::Error, Debug)] | ||||||
pub enum AccumulationError { | ||||||
#[error("file store error: {0}")] | ||||||
FileStoreError(#[from] file_store::Error), | ||||||
#[error("sqlx error: {0}")] | ||||||
SqlxError(#[from] sqlx::Error), | ||||||
#[error("reports stream dropped")] | ||||||
ReportsStreamDropped, | ||||||
#[error("config client error: {0}")] | ||||||
ConfigClientError(#[from] ClientError), | ||||||
} | ||||||
use crate::event_ids; | ||||||
|
||||||
pub async fn accumulate_sessions( | ||||||
gateway_client: &GatewayClient, | ||||||
|
@@ -35,7 +25,7 @@ pub async fn accumulate_sessions( | |||||
invalid_data_session_report_sink: &FileSinkClient, | ||||||
curr_file_ts: DateTime<Utc>, | ||||||
reports: impl Stream<Item = DataTransferSessionIngestReport>, | ||||||
) -> Result<(), AccumulationError> { | ||||||
) -> anyhow::Result<()> { | ||||||
tokio::pin!(reports); | ||||||
|
||||||
while let Some(report) = reports.next().await { | ||||||
|
@@ -51,7 +41,7 @@ pub async fn accumulate_sessions( | |||||
continue; | ||||||
} | ||||||
|
||||||
let report_validity = verify_report(gateway_client, auth_client, &report.report).await; | ||||||
let report_validity = verify_report(conn, gateway_client, auth_client, &report).await?; | ||||||
if report_validity != DataTransferIngestReportStatus::Valid { | ||||||
write_invalid_report(invalid_data_session_report_sink, report_validity, report).await?; | ||||||
continue; | ||||||
|
@@ -80,17 +70,29 @@ pub async fn accumulate_sessions( | |||||
} | ||||||
|
||||||
async fn verify_report( | ||||||
tx: &mut Transaction<'_, Postgres>, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
tx reads like a message channel sender |
||||||
gateway_client: &GatewayClient, | ||||||
auth_client: &AuthorizationClient, | ||||||
report: &DataTransferSessionReq, | ||||||
) -> DataTransferIngestReportStatus { | ||||||
if !verify_gateway(gateway_client, &report.data_transfer_usage.pub_key).await { | ||||||
return DataTransferIngestReportStatus::InvalidGatewayKey; | ||||||
report: &DataTransferSessionIngestReport, | ||||||
) -> anyhow::Result<DataTransferIngestReportStatus> { | ||||||
if event_ids::is_duplicate(tx, report.report.data_transfer_usage.event_id.clone()).await? { | ||||||
return Ok(DataTransferIngestReportStatus::Duplicate); | ||||||
} else { | ||||||
event_ids::record( | ||||||
tx, | ||||||
report.report.data_transfer_usage.event_id.clone(), | ||||||
report.received_timestamp, | ||||||
) | ||||||
.await?; | ||||||
} | ||||||
|
||||||
if !verify_gateway(gateway_client, &report.report.data_transfer_usage.pub_key).await { | ||||||
return Ok(DataTransferIngestReportStatus::InvalidGatewayKey); | ||||||
}; | ||||||
if !verify_known_routing_key(auth_client, &report.pub_key).await { | ||||||
return DataTransferIngestReportStatus::InvalidRoutingKey; | ||||||
if !verify_known_routing_key(auth_client, &report.report.pub_key).await { | ||||||
return Ok(DataTransferIngestReportStatus::InvalidRoutingKey); | ||||||
}; | ||||||
DataTransferIngestReportStatus::Valid | ||||||
Ok(DataTransferIngestReportStatus::Valid) | ||||||
} | ||||||
|
||||||
async fn verify_gateway(gateway_client: &GatewayClient, public_key: &PublicKeyBinary) -> bool { | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,73 @@ | ||||||
use chrono::{DateTime, Duration, Utc}; | ||||||
use sqlx::{Pool, Postgres, Transaction}; | ||||||
|
||||||
use crate::settings::Settings; | ||||||
|
||||||
pub async fn is_duplicate( | ||||||
tx: &mut Transaction<'_, Postgres>, | ||||||
event_id: String, | ||||||
) -> anyhow::Result<bool> { | ||||||
let result = | ||||||
sqlx::query_scalar::<_, bool>("SELECT EXISTS(SELECT 1 FROM event_ids WHERE event_id = $1)") | ||||||
.bind(event_id.clone()) | ||||||
.fetch_one(&mut *tx) | ||||||
.await?; | ||||||
|
||||||
Ok(result) | ||||||
} | ||||||
|
||||||
pub async fn record( | ||||||
tx: &mut Transaction<'_, Postgres>, | ||||||
event_id: String, | ||||||
received_timestamp: DateTime<Utc>, | ||||||
) -> anyhow::Result<()> { | ||||||
sqlx::query("INSERT INTO event_ids(event_id, received_timestamp) VALUES($1,$2)") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed 5c66469 |
||||||
.bind(event_id) | ||||||
.bind(received_timestamp) | ||||||
.execute(tx) | ||||||
.await | ||||||
.map(|_| ()) | ||||||
.map_err(anyhow::Error::from) | ||||||
} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Worth the speed boost to make this an upsert statement returning whether or not the insert happened, and then do it in a single query? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've done this in a different app which attempts to do an upsert and then returns a boolean indicating whether the row was inserted or not
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree with @jeffgrunewald here, this should be an upsert There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is what the query would look like: let is_duplicate = sqlx::query(
r#"
INSERT INTO event_ids (event_id, received_timestamp) VALUES ($1, $2)
ON CONFLICT DO NOTHING
"#
)
.bind(event_id)
.bind(received_timestamp)
.execute(tx)
.await?
.rows_affected() > 0; There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed, upsert is better 5c66469 |
||||||
|
||||||
pub struct EventIdPurger { | ||||||
conn: Pool<Postgres>, | ||||||
interval: Duration, | ||||||
max_age: Duration, | ||||||
} | ||||||
|
||||||
impl EventIdPurger { | ||||||
pub fn from_settings(conn: Pool<Postgres>, settings: &Settings) -> Self { | ||||||
Self { | ||||||
conn, | ||||||
interval: settings.purger_interval(), | ||||||
max_age: settings.purger_max_age(), | ||||||
} | ||||||
} | ||||||
|
||||||
pub async fn run(self, mut shutdown: triggered::Listener) -> anyhow::Result<()> { | ||||||
let mut timer = tokio::time::interval(self.interval.to_std()?); | ||||||
|
||||||
loop { | ||||||
tokio::select! { | ||||||
_ = &mut shutdown => { | ||||||
return Ok(()) | ||||||
} | ||||||
_ = timer.tick() => { | ||||||
purge(&self.conn, self.max_age).await?; | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
async fn purge(conn: &Pool<Postgres>, max_age: Duration) -> anyhow::Result<()> { | ||||||
let timestamp = Utc::now() - max_age; | ||||||
|
||||||
sqlx::query("DELETE FROM event_ids where received_timestamp < $1") | ||||||
.bind(timestamp) | ||||||
.execute(conn) | ||||||
.await | ||||||
.map(|_| ()) | ||||||
.map_err(anyhow::Error::from) | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
pub mod accumulate; | ||
pub mod burner; | ||
pub mod daemon; | ||
pub mod event_ids; | ||
pub mod settings; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what are these event_ids constituted from? are we pretty sure they'll be unique for the life of a record in this database without aggregating to some other piece of data like a hotspot pubkey or cbsd_id?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are unique according to Dmytro.