Merge pull request #587 from helium/jg/file-store-refactor
Make file_store usable outside of oracles
jeffgrunewald authored Aug 9, 2023
2 parents fd07b23 + 97dfb6f commit 55d077f
Showing 9 changed files with 52 additions and 41 deletions.
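
The heart of the change: FileStore::list and list_all now take a plain string prefix rather than the crate-internal FileType enum, so code outside the oracles workspace can list files under arbitrary prefixes. A minimal sketch of the new call shape (the prefix string and the list_recent helper are illustrative, not part of this diff; paths assume the crate-root re-exports):

    use chrono::{Duration, Utc};
    use file_store::{FileStore, Result, Settings};

    // Hypothetical helper: list everything under a prefix from the last hour.
    async fn list_recent(settings: &Settings) -> Result<()> {
        let store = FileStore::from_settings(settings).await?;
        // Prefixes are plain strings now, so callers are no longer limited
        // to the FileType variants defined inside this crate.
        let infos = store
            .list_all("iot_beacon_ingest_report", Utc::now() - Duration::hours(1), None)
            .await?;
        for info in infos {
            println!("{} ({} bytes)", info.key, info.size);
        }
        Ok(())
    }
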
30 changes: 15 additions & 15 deletions file_store/src/cli/bucket.rs
@@ -10,6 +10,7 @@ use serde::{ser::SerializeSeq, Serializer};
 use std::{
     io,
     path::{Path, PathBuf},
+    str::FromStr,
 };
 use tokio::fs;

@@ -48,24 +49,24 @@ impl BucketCmd {
 }

 #[derive(Debug, clap::Args)]
-struct FileFilter {
+pub struct FileFilter {
     /// Optional start time to look for (inclusive). Defaults to the oldest
     /// timestamp in the bucket.
     #[clap(long)]
-    after: Option<NaiveDateTime>,
+    pub after: Option<NaiveDateTime>,
     /// Optional end time to look for (exclusive). Defaults to the latest
     /// available timestamp in the bucket.
     #[clap(long)]
-    before: Option<NaiveDateTime>,
-    /// The file type to search for
+    pub before: Option<NaiveDateTime>,
+    /// The file type prefix to search for
     #[clap(long)]
-    file_type: FileType,
+    pub prefix: String,
 }

 impl FileFilter {
-    fn list(&self, store: &FileStore) -> FileInfoStream {
+    pub fn list(&self, store: &FileStore) -> FileInfoStream {
         store.list(
-            self.file_type,
+            &self.prefix,
             self.after.as_ref().map(|dt| Utc.from_utc_datetime(dt)),
             self.before.as_ref().map(|dt| Utc.from_utc_datetime(dt)),
         )
@@ -180,12 +181,15 @@ impl Locate {
     pub async fn run(&self, settings: &Settings) -> Result {
         let store = FileStore::from_settings(settings).await?;
         let file_infos = self.filter.list(&store);
-        let file_type = self.filter.file_type;
+        let prefix = self.filter.prefix.clone();
         let gateway = &self.gateway.clone();
         let mut events = store
             .source(file_infos)
             .map_ok(|buf| (buf, gateway))
-            .try_filter_map(|(buf, gateway)| async move { locate(file_type, gateway, &buf) })
+            .try_filter_map(|(buf, gateway)| {
+                let prefix = prefix.clone();
+                async move { locate(&prefix, gateway, &buf) }
+            })
             .boxed();
         let mut ser = serde_json::Serializer::new(io::stdout());
         let mut seq = ser.serialize_seq(None)?;
@@ -197,13 +201,9 @@
     }
 }

-fn locate(
-    file_type: FileType,
-    gateway: &PublicKey,
-    buf: &[u8],
-) -> Result<Option<serde_json::Value>> {
+fn locate(prefix: &str, gateway: &PublicKey, buf: &[u8]) -> Result<Option<serde_json::Value>> {
     let pub_key = gateway.to_vec();
-    match file_type {
+    match FileType::from_str(prefix)? {
         FileType::CellHeartbeat => {
             CellHeartbeat::decode(buf).and_then(|event| event.to_value_if(pub_key))
         }

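With FileFilter and its fields now pub (and re-exported from the crate root later in this diff), an external binary can embed the same flags in its own clap CLI. A hedged sketch; the Args struct and run helper are hypothetical, and the Settings path assumes a crate-root re-export:

    use clap::Parser;
    use file_store::{FileFilter, FileStore, Result, Settings};

    // Hypothetical downstream CLI reusing the now-public filter flags.
    #[derive(Debug, Parser)]
    struct Args {
        #[clap(flatten)]
        filter: FileFilter,
    }

    async fn run(args: Args, settings: &Settings) -> Result<()> {
        let store = FileStore::from_settings(settings).await?;
        // list() yields a FileInfoStream for the caller to consume.
        let _files = args.filter.list(&store);
        Ok(())
    }
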
10 changes: 5 additions & 5 deletions file_store/src/cli/info.rs
@@ -17,7 +17,7 @@ use helium_proto::{
     EntropyReportV1, Message, PriceReportV1,
 };
 use serde_json::json;
-use std::path::PathBuf;
+use std::{path::PathBuf, str::FromStr};

 /// Print information about a given store file.
 #[derive(Debug, clap::Args)]
@@ -40,7 +40,7 @@ impl Cmd {
             }
         };

-        let first_timestamp = get_timestamp(&file_info.file_type, &buf)?;
+        let first_timestamp = get_timestamp(&file_info.prefix, &buf)?;
         {
             let mut last_buf: Option<BytesMut> = None;
             while let Some(result) = file_stream.next().await {
@@ -50,7 +50,7 @@
             }

             let last_timestamp = if let Some(buf) = last_buf {
-                Some(get_timestamp(&file_info.file_type, &buf)?)
+                Some(get_timestamp(&file_info.prefix, &buf)?)
             } else {
                 None
             };
@@ -72,8 +72,8 @@ impl MsgTimestamp<Result<DateTime<Utc>>> for PriceReportV1 {
     }
 }

-fn get_timestamp(file_type: &FileType, buf: &[u8]) -> Result<DateTime<Utc>> {
-    let result = match file_type {
+fn get_timestamp(file_type: &str, buf: &[u8]) -> Result<DateTime<Utc>> {
+    let result = match FileType::from_str(file_type)? {
         FileType::CellHeartbeat => CellHeartbeatReqV1::decode(buf)
             .map_err(Error::from)
             .and_then(|entry| entry.timestamp())?,

19 changes: 15 additions & 4 deletions file_store/src/file_info.rs
@@ -8,7 +8,7 @@ use std::{fmt, io, os::unix::fs::MetadataExt, path::Path, str::FromStr};
 #[derive(Debug, Clone, Serialize)]
 pub struct FileInfo {
     pub key: String,
-    pub file_type: FileType,
+    pub prefix: String,
     pub timestamp: DateTime<Utc>,
     pub size: usize,
 }
@@ -24,13 +24,13 @@ impl FromStr for FileInfo {
         let cap = RE
             .captures(s)
             .ok_or_else(|| DecodeError::file_info("failed to decode file info"))?;
-        let file_type = FileType::from_str(&cap[1])?;
+        let prefix = cap[1].to_owned();
         let timestamp = u64::from_str(&cap[2])
             .map_err(|_| DecodeError::file_info("failed to decode timestamp"))?
             .to_timestamp_millis()?;
         Ok(Self {
             key,
-            file_type,
+            prefix,
             timestamp,
             size: 0,
         })
@@ -59,7 +59,18 @@ impl From<(FileType, DateTime<Utc>)> for FileInfo {
     fn from(v: (FileType, DateTime<Utc>)) -> Self {
         Self {
             key: format!("{}.{}.gz", &v.0, v.1.timestamp_millis()),
-            file_type: v.0,
+            prefix: v.0.to_string(),
             timestamp: v.1,
             size: 0,
         }
+    }
+}
+
+impl From<(String, DateTime<Utc>)> for FileInfo {
+    fn from(v: (String, DateTime<Utc>)) -> Self {
+        Self {
+            key: format!("{}.{}.gz", &v.0, v.1.timestamp_millis()),
+            prefix: v.0,
+            timestamp: v.1,
+            size: 0,
+        }

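The new From<(String, DateTime<Utc>)> impl lets callers mint keys for prefixes that have no FileType variant. A small illustration (the custom_report prefix is made up):

    use chrono::Utc;
    use file_store::FileInfo;

    fn main() {
        // Keys come out as "<prefix>.<timestamp_millis>.gz", mirroring the
        // FileType-based From impl above.
        let info = FileInfo::from(("custom_report".to_string(), Utc::now()));
        assert_eq!(info.prefix, "custom_report");
        assert!(info.key.starts_with("custom_report."));
    }
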
4 changes: 2 additions & 2 deletions file_store/src/file_info_poller.rs
@@ -95,7 +95,7 @@ where
                 }
                 _ = cleanup_trigger.tick() => self.clean(&cache).await?,
                 _ = poll_trigger.tick() => {
-                    let files = self.store.list_all(self.file_type, after, before).await?;
+                    let files = self.store.list_all(self.file_type.to_str(), after, before).await?;
                     for file in files {
                         if !is_already_processed(&self.db, &cache, &file).await? {
                             if send_stream(&sender, &self.store, file.clone()).await? {
@@ -246,7 +246,7 @@ mod db {
             INSERT INTO files_processed(file_name, file_type, file_timestamp, processed_at) VALUES($1, $2, $3, $4)
         "#)
         .bind(file_info.key)
-        .bind(file_info.file_type.to_str())
+        .bind(&file_info.prefix)
         .bind(file_info.timestamp)
         .bind(Utc::now())
         .execute(tx)

3 changes: 2 additions & 1 deletion file_store/src/file_sink.rs
@@ -638,7 +638,8 @@ mod tests {
             .to_str()
             .and_then(|file_name| FileInfo::from_str(file_name).ok())
             .map_or(false, |file_info| {
-                file_info.file_type == FileType::EntropyReport
+                FileType::from_str(&file_info.prefix).expect("entropy report prefix")
+                    == FileType::EntropyReport
             })
     }
 }

12 changes: 5 additions & 7 deletions file_store/src/file_store.rs
@@ -1,5 +1,5 @@
 use crate::{
-    error::DecodeError, BytesMutStream, Error, FileInfo, FileInfoStream, FileType, Result, Settings,
+    error::DecodeError, BytesMutStream, Error, FileInfo, FileInfoStream, Result, Settings,
 };
 use aws_config::meta::region::RegionProviderChain;
 use aws_sdk_s3::{types::ByteStream, Client, Endpoint, Region};
@@ -57,27 +57,25 @@ impl FileStore {
         })
     }

-    pub async fn list_all<A, B, F>(
+    pub async fn list_all<A, B>(
         &self,
-        file_type: F,
+        file_type: &str,
         after: A,
         before: B,
     ) -> Result<Vec<FileInfo>>
     where
-        F: Into<FileType> + Copy,
         A: Into<Option<DateTime<Utc>>> + Copy,
         B: Into<Option<DateTime<Utc>>> + Copy,
     {
         self.list(file_type, after, before).try_collect().await
     }

-    pub fn list<A, B, F>(&self, file_type: F, after: A, before: B) -> FileInfoStream
+    pub fn list<A, B>(&self, prefix: &str, after: A, before: B) -> FileInfoStream
     where
-        F: Into<FileType> + Copy,
         A: Into<Option<DateTime<Utc>>> + Copy,
         B: Into<Option<DateTime<Utc>>> + Copy,
     {
-        let file_type = file_type.into();
+        let file_type = prefix.to_string();
         let before = before.into();
         let after = after.into();

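Existing enum-based callers migrate by passing FileType::X.to_str() (as the verifier and price-tracker changes below do); new callers can pass any prefix. A sketch of draining list as a stream instead of collecting it with list_all (the prefix is illustrative):

    use file_store::{FileStore, Result};
    use futures::TryStreamExt;

    async fn print_keys(store: &FileStore) -> Result<()> {
        // list() returns a FileInfoStream; consume it incrementally rather
        // than buffering the whole listing as list_all() does.
        let mut infos = store.list("entropy_report", None, None);
        while let Some(info) = infos.try_next().await? {
            println!("{}", info.key);
        }
        Ok(())
    }
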
1 change: 1 addition & 0 deletions file_store/src/lib.rs
@@ -22,6 +22,7 @@ pub mod speedtest;
 pub mod traits;

 pub use crate::file_store::FileStore;
+pub use cli::bucket::FileFilter;
 pub use error::{Error, Result};
 pub use file_info::{FileInfo, FileType};
 pub use file_sink::{FileSink, FileSinkBuilder};

12 changes: 6 additions & 6 deletions iot_verifier/src/loader.rs
@@ -16,7 +16,7 @@ use file_store::{
 use futures::{stream, StreamExt};
 use helium_crypto::PublicKeyBinary;
 use sqlx::PgPool;
-use std::{hash::Hasher, ops::DerefMut};
+use std::{hash::Hasher, ops::DerefMut, str::FromStr};
 use tokio::{
     sync::Mutex,
     time::{self, MissedTickBehavior},
@@ -220,7 +220,7 @@ impl Loader {
         tracing::info!(
             "checking for new ingest files of type {file_type} after {after} and before {before}"
         );
-        let infos = store.list_all(file_type, after, before).await?;
+        let infos = store.list_all(file_type.to_str(), after, before).await?;
         if infos.is_empty() {
             tracing::info!("no available ingest files of type {file_type}");
             return Ok(());
@@ -264,7 +264,7 @@ impl Loader {
         xor_data: Option<&Mutex<Vec<u64>>>,
         xor_filter: Option<&Xor16>,
     ) -> anyhow::Result<()> {
-        let file_type = file_info.file_type;
+        let file_type = file_info.prefix.clone();
         let tx = Mutex::new(self.pool.begin().await?);
         let metrics = LoaderMetricTracker::new();
         store
@@ -279,7 +279,7 @@ impl Loader {
                     tracing::warn!("skipping report of type {file_type} due to error {err:?}")
                 }
                 Ok(buf) => {
-                    match self.handle_report(file_type, &buf, gateway_cache, xor_data, xor_filter, &metrics).await
+                    match self.handle_report(&file_type, &buf, gateway_cache, xor_data, xor_filter, &metrics).await
                     {
                         Ok(Some(bindings)) => inserts.push(bindings),
                         Ok(None) => (),
@@ -305,14 +305,14 @@
     }

     async fn handle_report(
         &self,
-        file_type: FileType,
+        file_type: &str,
         buf: &[u8],
         gateway_cache: &GatewayCache,
         xor_data: Option<&Mutex<Vec<u64>>>,
         xor_filter: Option<&Xor16>,
         metrics: &LoaderMetricTracker,
     ) -> anyhow::Result<Option<InsertBindings>> {
-        match file_type {
+        match FileType::from_str(file_type)? {
             FileType::IotBeaconIngestReport => {
                 let beacon = IotBeaconIngestReport::decode(buf)?;
                 tracing::debug!("beacon report from ingestor: {:?}", &beacon);

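Since handle_report now receives the prefix as a string, dispatch parses it back into FileType, and an unrecognized prefix surfaces as a runtime decode error rather than being unrepresentable at the type level. A minimal sketch of that round trip (the dispatch helper is hypothetical):

    use std::str::FromStr;
    use file_store::FileType;

    fn dispatch(prefix: &str) -> anyhow::Result<()> {
        // Unknown prefixes fail here instead of at compile time.
        match FileType::from_str(prefix)? {
            FileType::IotBeaconIngestReport => { /* decode a beacon ingest report */ }
            _ => { /* other report types */ }
        }
        Ok(())
    }
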
2 changes: 1 addition & 1 deletion price/src/price_tracker.rs
@@ -183,7 +183,7 @@ async fn process_files(
     after: DateTime<Utc>,
 ) -> Result<Option<DateTime<Utc>>, PriceTrackerError> {
     file_store
-        .list(FileType::PriceReport, after, None)
+        .list(FileType::PriceReport.to_str(), after, None)
         .map_err(PriceTrackerError::from)
        .and_then(|file| process_file(file_store, file, sender))
        .try_fold(None, |_old, ts| async move { Ok(Some(ts)) })
