From da4daf71767379af7e21f1700cbabd988f21dd41 Mon Sep 17 00:00:00 2001 From: jeffgrunewald Date: Tue, 1 Aug 2023 17:32:08 -0400 Subject: [PATCH 1/5] replace file_type with prefix string --- file_store/src/cli/info.rs | 13 ++++++++----- file_store/src/file_info.rs | 8 ++++---- file_store/src/file_info_poller.rs | 2 +- iot_verifier/src/loader.rs | 10 +++++----- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/file_store/src/cli/info.rs b/file_store/src/cli/info.rs index f9c438037..84a6f4391 100644 --- a/file_store/src/cli/info.rs +++ b/file_store/src/cli/info.rs @@ -17,7 +17,10 @@ use helium_proto::{ EntropyReportV1, Message, PriceReportV1, }; use serde_json::json; -use std::path::PathBuf; +use std::{ + path::PathBuf, + str::FromStr, +}; /// Print information about a given store file. #[derive(Debug, clap::Args)] @@ -40,7 +43,7 @@ impl Cmd { } }; - let first_timestamp = get_timestamp(&file_info.file_type, &buf)?; + let first_timestamp = get_timestamp(&file_info.prefix, &buf)?; { let mut last_buf: Option = None; while let Some(result) = file_stream.next().await { @@ -50,7 +53,7 @@ impl Cmd { } let last_timestamp = if let Some(buf) = last_buf { - Some(get_timestamp(&file_info.file_type, &buf)?) + Some(get_timestamp(&file_info.prefix, &buf)?) } else { None }; @@ -72,8 +75,8 @@ impl MsgTimestamp>> for PriceReportV1 { } } -fn get_timestamp(file_type: &FileType, buf: &[u8]) -> Result> { - let result = match file_type { +fn get_timestamp(file_type: &str, buf: &[u8]) -> Result> { + let result = match FileType::from_str(file_type)? { FileType::CellHeartbeat => CellHeartbeatReqV1::decode(buf) .map_err(Error::from) .and_then(|entry| entry.timestamp())?, diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 8ccc8ef2e..31bb3765f 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -8,7 +8,7 @@ use std::{fmt, io, os::unix::fs::MetadataExt, path::Path, str::FromStr}; #[derive(Debug, Clone, Serialize)] pub struct FileInfo { pub key: String, - pub file_type: FileType, + pub prefix: String, pub timestamp: DateTime, pub size: usize, } @@ -24,13 +24,13 @@ impl FromStr for FileInfo { let cap = RE .captures(s) .ok_or_else(|| DecodeError::file_info("failed to decode file info"))?; - let file_type = FileType::from_str(&cap[1])?; + let prefix = cap[1].to_owned(); let timestamp = u64::from_str(&cap[2]) .map_err(|_| DecodeError::file_info("failed to decode timestamp"))? .to_timestamp_millis()?; Ok(Self { key, - file_type, + prefix, timestamp, size: 0, }) @@ -59,7 +59,7 @@ impl From<(FileType, DateTime)> for FileInfo { fn from(v: (FileType, DateTime)) -> Self { Self { key: format!("{}.{}.gz", &v.0, v.1.timestamp_millis()), - file_type: v.0, + prefix: v.0.to_string(), timestamp: v.1, size: 0, } diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs index cfdaa0296..23fff11bf 100644 --- a/file_store/src/file_info_poller.rs +++ b/file_store/src/file_info_poller.rs @@ -246,7 +246,7 @@ mod db { INSERT INTO files_processed(file_name, file_type, file_timestamp, processed_at) VALUES($1, $2, $3, $4) "#) .bind(file_info.key) - .bind(file_info.file_type.to_str()) + .bind(&file_info.prefix) .bind(file_info.timestamp) .bind(Utc::now()) .execute(tx) diff --git a/iot_verifier/src/loader.rs b/iot_verifier/src/loader.rs index 04ee62584..30ed43216 100644 --- a/iot_verifier/src/loader.rs +++ b/iot_verifier/src/loader.rs @@ -16,7 +16,7 @@ use file_store::{ use futures::{stream, StreamExt}; use helium_crypto::PublicKeyBinary; use sqlx::PgPool; -use std::{hash::Hasher, ops::DerefMut}; +use std::{hash::Hasher, ops::DerefMut, str::FromStr, time::Duration}; use tokio::{ sync::Mutex, time::{self, MissedTickBehavior}, @@ -264,7 +264,7 @@ impl Loader { xor_data: Option<&Mutex>>, xor_filter: Option<&Xor16>, ) -> anyhow::Result<()> { - let file_type = file_info.file_type; + let file_type = file_info.prefix.clone(); let tx = Mutex::new(self.pool.begin().await?); let metrics = LoaderMetricTracker::new(); store @@ -279,7 +279,7 @@ impl Loader { tracing::warn!("skipping report of type {file_type} due to error {err:?}") } Ok(buf) => { - match self.handle_report(file_type, &buf, gateway_cache, xor_data, xor_filter, &metrics).await + match self.handle_report(&file_type, &buf, gateway_cache, xor_data, xor_filter, &metrics).await { Ok(Some(bindings)) => inserts.push(bindings), Ok(None) => (), @@ -305,14 +305,14 @@ impl Loader { async fn handle_report( &self, - file_type: FileType, + file_type: &str, buf: &[u8], gateway_cache: &GatewayCache, xor_data: Option<&Mutex>>, xor_filter: Option<&Xor16>, metrics: &LoaderMetricTracker, ) -> anyhow::Result> { - match file_type { + match FileType::from_str(file_type)? { FileType::IotBeaconIngestReport => { let beacon = IotBeaconIngestReport::decode(buf)?; tracing::debug!("beacon report from ingestor: {:?}", &beacon); From d03e9e325a3ac6ad38e35ebde7138560f7cedbe3 Mon Sep 17 00:00:00 2001 From: jeffgrunewald Date: Wed, 2 Aug 2023 10:25:41 -0400 Subject: [PATCH 2/5] exposing file filter for outside filestore commands --- file_store/src/cli/bucket.rs | 24 ++++++++++++------------ file_store/src/cli/info.rs | 5 +---- file_store/src/file_info.rs | 11 +++++++++++ file_store/src/file_sink.rs | 3 ++- file_store/src/file_store.rs | 14 +++++++------- file_store/src/lib.rs | 1 + 6 files changed, 34 insertions(+), 24 deletions(-) diff --git a/file_store/src/cli/bucket.rs b/file_store/src/cli/bucket.rs index b603f6fbe..a54a5f8a2 100644 --- a/file_store/src/cli/bucket.rs +++ b/file_store/src/cli/bucket.rs @@ -10,6 +10,7 @@ use serde::{ser::SerializeSeq, Serializer}; use std::{ io, path::{Path, PathBuf}, + str::FromStr, }; use tokio::fs; @@ -48,7 +49,7 @@ impl BucketCmd { } #[derive(Debug, clap::Args)] -struct FileFilter { +pub struct FileFilter { /// Optional start time to look for (inclusive). Defaults to the oldest /// timestamp in the bucket. #[clap(long)] @@ -57,15 +58,15 @@ struct FileFilter { /// available timestamp in the bucket. #[clap(long)] before: Option, - /// The file type to search for + /// The file type prefix to search for #[clap(long)] - file_type: FileType, + prefix: String, } impl FileFilter { fn list(&self, store: &FileStore) -> FileInfoStream { store.list( - self.file_type, + self.prefix.clone(), self.after.as_ref().map(|dt| Utc.from_utc_datetime(dt)), self.before.as_ref().map(|dt| Utc.from_utc_datetime(dt)), ) @@ -180,12 +181,15 @@ impl Locate { pub async fn run(&self, settings: &Settings) -> Result { let store = FileStore::from_settings(settings).await?; let file_infos = self.filter.list(&store); - let file_type = self.filter.file_type; + let prefix = self.filter.prefix.clone(); let gateway = &self.gateway.clone(); let mut events = store .source(file_infos) .map_ok(|buf| (buf, gateway)) - .try_filter_map(|(buf, gateway)| async move { locate(file_type, gateway, &buf) }) + .try_filter_map(|(buf, gateway)| { + let prefix = prefix.clone(); + async move { locate(&prefix, gateway, &buf) } + }) .boxed(); let mut ser = serde_json::Serializer::new(io::stdout()); let mut seq = ser.serialize_seq(None)?; @@ -197,13 +201,9 @@ impl Locate { } } -fn locate( - file_type: FileType, - gateway: &PublicKey, - buf: &[u8], -) -> Result> { +fn locate(prefix: &str, gateway: &PublicKey, buf: &[u8]) -> Result> { let pub_key = gateway.to_vec(); - match file_type { + match FileType::from_str(prefix)? { FileType::CellHeartbeat => { CellHeartbeat::decode(buf).and_then(|event| event.to_value_if(pub_key)) } diff --git a/file_store/src/cli/info.rs b/file_store/src/cli/info.rs index 84a6f4391..ecf8d4769 100644 --- a/file_store/src/cli/info.rs +++ b/file_store/src/cli/info.rs @@ -17,10 +17,7 @@ use helium_proto::{ EntropyReportV1, Message, PriceReportV1, }; use serde_json::json; -use std::{ - path::PathBuf, - str::FromStr, -}; +use std::{path::PathBuf, str::FromStr}; /// Print information about a given store file. #[derive(Debug, clap::Args)] diff --git a/file_store/src/file_info.rs b/file_store/src/file_info.rs index 31bb3765f..20d0c94e5 100644 --- a/file_store/src/file_info.rs +++ b/file_store/src/file_info.rs @@ -66,6 +66,17 @@ impl From<(FileType, DateTime)> for FileInfo { } } +impl From<(String, DateTime)> for FileInfo { + fn from(v: (String, DateTime)) -> Self { + Self { + key: format!("{}.{}.gz", &v.0, v.1.timestamp_millis()), + prefix: v.0, + timestamp: v.1, + size: 0, + } + } +} + impl TryFrom<&aws_sdk_s3::model::Object> for FileInfo { type Error = Error; fn try_from(value: &aws_sdk_s3::model::Object) -> Result { diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 5fd0f7fbf..dfc8de60a 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -638,7 +638,8 @@ mod tests { .to_str() .and_then(|file_name| FileInfo::from_str(file_name).ok()) .map_or(false, |file_info| { - file_info.file_type == FileType::EntropyReport + FileType::from_str(&file_info.prefix).expect("entropy report prefix") + == FileType::EntropyReport }) } } diff --git a/file_store/src/file_store.rs b/file_store/src/file_store.rs index 64a04c837..ad4eafa71 100644 --- a/file_store/src/file_store.rs +++ b/file_store/src/file_store.rs @@ -1,5 +1,5 @@ use crate::{ - error::DecodeError, BytesMutStream, Error, FileInfo, FileInfoStream, FileType, Result, Settings, + error::DecodeError, BytesMutStream, Error, FileInfo, FileInfoStream, Result, Settings, }; use aws_config::meta::region::RegionProviderChain; use aws_sdk_s3::{types::ByteStream, Client, Endpoint, Region}; @@ -57,27 +57,27 @@ impl FileStore { }) } - pub async fn list_all( + pub async fn list_all( &self, - file_type: F, + file_type: P, after: A, before: B, ) -> Result> where - F: Into + Copy, + P: ToString + std::fmt::Display, A: Into>> + Copy, B: Into>> + Copy, { self.list(file_type, after, before).try_collect().await } - pub fn list(&self, file_type: F, after: A, before: B) -> FileInfoStream + pub fn list(&self, prefix: P, after: A, before: B) -> FileInfoStream where - F: Into + Copy, + P: ToString, A: Into>> + Copy, B: Into>> + Copy, { - let file_type = file_type.into(); + let file_type = prefix.to_string(); let before = before.into(); let after = after.into(); diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index 5e5c6dcad..338ef9f68 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -22,6 +22,7 @@ pub mod speedtest; pub mod traits; pub use crate::file_store::FileStore; +pub use cli::bucket::FileFilter; pub use error::{Error, Result}; pub use file_info::{FileInfo, FileType}; pub use file_sink::{FileSink, FileSinkBuilder}; From e6b935b5c0abe3dbc263bbf80cc01d67d6d8789c Mon Sep 17 00:00:00 2001 From: jeffgrunewald Date: Wed, 2 Aug 2023 11:21:17 -0400 Subject: [PATCH 3/5] publish filter things for extension --- file_store/src/cli/bucket.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/file_store/src/cli/bucket.rs b/file_store/src/cli/bucket.rs index a54a5f8a2..247d41718 100644 --- a/file_store/src/cli/bucket.rs +++ b/file_store/src/cli/bucket.rs @@ -53,18 +53,18 @@ pub struct FileFilter { /// Optional start time to look for (inclusive). Defaults to the oldest /// timestamp in the bucket. #[clap(long)] - after: Option, + pub after: Option, /// Optional end time to look for (exclusive). Defaults to the latest /// available timestamp in the bucket. #[clap(long)] - before: Option, + pub before: Option, /// The file type prefix to search for #[clap(long)] - prefix: String, + pub prefix: String, } impl FileFilter { - fn list(&self, store: &FileStore) -> FileInfoStream { + pub fn list(&self, store: &FileStore) -> FileInfoStream { store.list( self.prefix.clone(), self.after.as_ref().map(|dt| Utc.from_utc_datetime(dt)), From 21a7c809c72e00478fad7e8732cecb54d5018e48 Mon Sep 17 00:00:00 2001 From: jeffgrunewald Date: Wed, 9 Aug 2023 10:53:24 -0400 Subject: [PATCH 4/5] restrict list inputs to &str --- file_store/src/cli/bucket.rs | 2 +- file_store/src/file_info_poller.rs | 2 +- file_store/src/file_store.rs | 8 +++----- iot_verifier/src/loader.rs | 2 +- price/src/price_tracker.rs | 2 +- 5 files changed, 7 insertions(+), 9 deletions(-) diff --git a/file_store/src/cli/bucket.rs b/file_store/src/cli/bucket.rs index 247d41718..fbb0da4fa 100644 --- a/file_store/src/cli/bucket.rs +++ b/file_store/src/cli/bucket.rs @@ -66,7 +66,7 @@ pub struct FileFilter { impl FileFilter { pub fn list(&self, store: &FileStore) -> FileInfoStream { store.list( - self.prefix.clone(), + &self.prefix, self.after.as_ref().map(|dt| Utc.from_utc_datetime(dt)), self.before.as_ref().map(|dt| Utc.from_utc_datetime(dt)), ) diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs index 23fff11bf..889fcdc7e 100644 --- a/file_store/src/file_info_poller.rs +++ b/file_store/src/file_info_poller.rs @@ -95,7 +95,7 @@ where } _ = cleanup_trigger.tick() => self.clean(&cache).await?, _ = poll_trigger.tick() => { - let files = self.store.list_all(self.file_type, after, before).await?; + let files = self.store.list_all(self.file_type.to_str(), after, before).await?; for file in files { if !is_already_processed(&self.db, &cache, &file).await? { if send_stream(&sender, &self.store, file.clone()).await? { diff --git a/file_store/src/file_store.rs b/file_store/src/file_store.rs index ad4eafa71..a1ca50dfc 100644 --- a/file_store/src/file_store.rs +++ b/file_store/src/file_store.rs @@ -57,23 +57,21 @@ impl FileStore { }) } - pub async fn list_all( + pub async fn list_all( &self, - file_type: P, + file_type: &str, after: A, before: B, ) -> Result> where - P: ToString + std::fmt::Display, A: Into>> + Copy, B: Into>> + Copy, { self.list(file_type, after, before).try_collect().await } - pub fn list(&self, prefix: P, after: A, before: B) -> FileInfoStream + pub fn list(&self, prefix: &str, after: A, before: B) -> FileInfoStream where - P: ToString, A: Into>> + Copy, B: Into>> + Copy, { diff --git a/iot_verifier/src/loader.rs b/iot_verifier/src/loader.rs index 30ed43216..8fed0a73b 100644 --- a/iot_verifier/src/loader.rs +++ b/iot_verifier/src/loader.rs @@ -220,7 +220,7 @@ impl Loader { tracing::info!( "checking for new ingest files of type {file_type} after {after} and before {before}" ); - let infos = store.list_all(file_type, after, before).await?; + let infos = store.list_all(file_type.to_str(), after, before).await?; if infos.is_empty() { tracing::info!("no available ingest files of type {file_type}"); return Ok(()); diff --git a/price/src/price_tracker.rs b/price/src/price_tracker.rs index ccc00f331..df64f4b06 100644 --- a/price/src/price_tracker.rs +++ b/price/src/price_tracker.rs @@ -183,7 +183,7 @@ async fn process_files( after: DateTime, ) -> Result>, PriceTrackerError> { file_store - .list(FileType::PriceReport, after, None) + .list(FileType::PriceReport.to_str(), after, None) .map_err(PriceTrackerError::from) .and_then(|file| process_file(file_store, file, sender)) .try_fold(None, |_old, ts| async move { Ok(Some(ts)) }) From 97dfb6f291d6aad806ddc63120cf08932016d712 Mon Sep 17 00:00:00 2001 From: jeffgrunewald Date: Wed, 9 Aug 2023 11:15:19 -0400 Subject: [PATCH 5/5] remove unused import --- iot_verifier/src/loader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iot_verifier/src/loader.rs b/iot_verifier/src/loader.rs index 8fed0a73b..9652309a7 100644 --- a/iot_verifier/src/loader.rs +++ b/iot_verifier/src/loader.rs @@ -16,7 +16,7 @@ use file_store::{ use futures::{stream, StreamExt}; use helium_crypto::PublicKeyBinary; use sqlx::PgPool; -use std::{hash::Hasher, ops::DerefMut, str::FromStr, time::Duration}; +use std::{hash::Hasher, ops::DerefMut, str::FromStr}; use tokio::{ sync::Mutex, time::{self, MissedTickBehavior},