Make file_store usable outside of oracles #587

Merged 5 commits on Aug 9, 2023
30 changes: 15 additions & 15 deletions file_store/src/cli/bucket.rs

@@ -10,6 +10,7 @@ use serde::{ser::SerializeSeq, Serializer};
 use std::{
     io,
     path::{Path, PathBuf},
+    str::FromStr,
 };
 use tokio::fs;

@@ -48,24 +49,24 @@ impl BucketCmd {
 }

 #[derive(Debug, clap::Args)]
-struct FileFilter {
+pub struct FileFilter {
     /// Optional start time to look for (inclusive). Defaults to the oldest
     /// timestamp in the bucket.
     #[clap(long)]
-    after: Option<NaiveDateTime>,
+    pub after: Option<NaiveDateTime>,
     /// Optional end time to look for (exclusive). Defaults to the latest
     /// available timestamp in the bucket.
     #[clap(long)]
-    before: Option<NaiveDateTime>,
-    /// The file type to search for
+    pub before: Option<NaiveDateTime>,
+    /// The file type prefix to search for
     #[clap(long)]
-    file_type: FileType,
+    pub prefix: String,
 }

 impl FileFilter {
-    fn list(&self, store: &FileStore) -> FileInfoStream {
+    pub fn list(&self, store: &FileStore) -> FileInfoStream {
         store.list(
-            self.file_type,
+            &self.prefix,
             self.after.as_ref().map(|dt| Utc.from_utc_datetime(dt)),
             self.before.as_ref().map(|dt| Utc.from_utc_datetime(dt)),
         )

@@ -180,12 +181,15 @@ impl Locate {
     pub async fn run(&self, settings: &Settings) -> Result {
         let store = FileStore::from_settings(settings).await?;
         let file_infos = self.filter.list(&store);
-        let file_type = self.filter.file_type;
+        let prefix = self.filter.prefix.clone();
         let gateway = &self.gateway.clone();
         let mut events = store
             .source(file_infos)
             .map_ok(|buf| (buf, gateway))
-            .try_filter_map(|(buf, gateway)| async move { locate(file_type, gateway, &buf) })
+            .try_filter_map(|(buf, gateway)| {
+                let prefix = prefix.clone();
+                async move { locate(&prefix, gateway, &buf) }
+            })
             .boxed();
         let mut ser = serde_json::Serializer::new(io::stdout());
         let mut seq = ser.serialize_seq(None)?;

@@ -197,13 +201,9 @@
     }
 }

-fn locate(
-    file_type: FileType,
-    gateway: &PublicKey,
-    buf: &[u8],
-) -> Result<Option<serde_json::Value>> {
+fn locate(prefix: &str, gateway: &PublicKey, buf: &[u8]) -> Result<Option<serde_json::Value>> {
     let pub_key = gateway.to_vec();
-    match file_type {
+    match FileType::from_str(prefix)? {
         FileType::CellHeartbeat => {
             CellHeartbeat::decode(buf).and_then(|event| event.to_value_if(pub_key))
         }
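Since locate now re-parses the prefix with FileType::from_str, a filter built from an arbitrary string still fails cleanly at decode time rather than at clap argument parsing. A minimal sketch of that round trip, assuming (as the ? in locate implies) that FileType's FromStr error converts into file_store::Error:

use std::str::FromStr;

use file_store::{FileType, Result};

// Known prefixes round-trip: FileType::from_str(ft.to_str()) yields ft back.
// Unknown prefixes surface as a decode error here instead of being rejected
// up front by the CLI's argument parser.
fn parse_prefix(prefix: &str) -> Result<FileType> {
    Ok(FileType::from_str(prefix)?)
}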
10 changes: 5 additions & 5 deletions file_store/src/cli/info.rs

@@ -17,7 +17,7 @@ use helium_proto::{
     EntropyReportV1, Message, PriceReportV1,
 };
 use serde_json::json;
-use std::path::PathBuf;
+use std::{path::PathBuf, str::FromStr};

 /// Print information about a given store file.
 #[derive(Debug, clap::Args)]

@@ -40,7 +40,7 @@ impl Cmd {
            }
        };

-        let first_timestamp = get_timestamp(&file_info.file_type, &buf)?;
+        let first_timestamp = get_timestamp(&file_info.prefix, &buf)?;
        {
            let mut last_buf: Option<BytesMut> = None;
            while let Some(result) = file_stream.next().await {

@@ -50,7 +50,7 @@
            }

            let last_timestamp = if let Some(buf) = last_buf {
-                Some(get_timestamp(&file_info.file_type, &buf)?)
+                Some(get_timestamp(&file_info.prefix, &buf)?)
            } else {
                None
            };

@@ -72,8 +72,8 @@ impl MsgTimestamp<Result<DateTime<Utc>>> for PriceReportV1 {
     }
 }

-fn get_timestamp(file_type: &FileType, buf: &[u8]) -> Result<DateTime<Utc>> {
-    let result = match file_type {
+fn get_timestamp(file_type: &str, buf: &[u8]) -> Result<DateTime<Utc>> {
+    let result = match FileType::from_str(file_type)? {
         FileType::CellHeartbeat => CellHeartbeatReqV1::decode(buf)
             .map_err(Error::from)
             .and_then(|entry| entry.timestamp())?,
19 changes: 15 additions & 4 deletions file_store/src/file_info.rs

@@ -8,7 +8,7 @@ use std::{fmt, io, os::unix::fs::MetadataExt, path::Path, str::FromStr};
 #[derive(Debug, Clone, Serialize)]
 pub struct FileInfo {
     pub key: String,
-    pub file_type: FileType,
+    pub prefix: String,
     pub timestamp: DateTime<Utc>,
     pub size: usize,
 }

@@ -24,13 +24,13 @@ impl FromStr for FileInfo {
         let cap = RE
             .captures(s)
             .ok_or_else(|| DecodeError::file_info("failed to decode file info"))?;
-        let file_type = FileType::from_str(&cap[1])?;
+        let prefix = cap[1].to_owned();
         let timestamp = u64::from_str(&cap[2])
             .map_err(|_| DecodeError::file_info("failed to decode timestamp"))?
             .to_timestamp_millis()?;
         Ok(Self {
             key,
-            file_type,
+            prefix,
             timestamp,
             size: 0,
         })

@@ -59,7 +59,18 @@ impl From<(FileType, DateTime<Utc>)> for FileInfo {
     fn from(v: (FileType, DateTime<Utc>)) -> Self {
         Self {
             key: format!("{}.{}.gz", &v.0, v.1.timestamp_millis()),
-            file_type: v.0,
+            prefix: v.0.to_string(),
             timestamp: v.1,
             size: 0,
         }
     }
 }
+
+impl From<(String, DateTime<Utc>)> for FileInfo {
+    fn from(v: (String, DateTime<Utc>)) -> Self {
+        Self {
+            key: format!("{}.{}.gz", &v.0, v.1.timestamp_millis()),
+            prefix: v.0,
+            timestamp: v.1,
+            size: 0,
+        }
+    }
+}
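With prefix stored as a plain String, a FileInfo can now be built for keys that no oracle FileType describes. A short sketch of the new From<(String, DateTime<Utc>)> impl in use; the my_custom_report prefix is purely illustrative:

use chrono::Utc;
use file_store::FileInfo;

fn main() {
    // Mirrors the FileType-based impl: key becomes "<prefix>.<millis>.gz".
    let info = FileInfo::from(("my_custom_report".to_string(), Utc::now()));
    assert_eq!(info.prefix, "my_custom_report");
    assert!(info.key.starts_with("my_custom_report."));
    assert!(info.key.ends_with(".gz"));
}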
4 changes: 2 additions & 2 deletions file_store/src/file_info_poller.rs

@@ -95,7 +95,7 @@ where
             }
             _ = cleanup_trigger.tick() => self.clean(&cache).await?,
             _ = poll_trigger.tick() => {
-                let files = self.store.list_all(self.file_type, after, before).await?;
+                let files = self.store.list_all(self.file_type.to_str(), after, before).await?;
                 for file in files {
                     if !is_already_processed(&self.db, &cache, &file).await? {
                         if send_stream(&sender, &self.store, file.clone()).await? {

@@ -246,7 +246,7 @@ mod db {
         INSERT INTO files_processed(file_name, file_type, file_timestamp, processed_at) VALUES($1, $2, $3, $4)
         "#)
         .bind(file_info.key)
-        .bind(file_info.file_type.to_str())
+        .bind(&file_info.prefix)
         .bind(file_info.timestamp)
         .bind(Utc::now())
         .execute(tx)
3 changes: 2 additions & 1 deletion file_store/src/file_sink.rs

@@ -638,7 +638,8 @@ mod tests {
             .to_str()
             .and_then(|file_name| FileInfo::from_str(file_name).ok())
             .map_or(false, |file_info| {
-                file_info.file_type == FileType::EntropyReport
+                FileType::from_str(&file_info.prefix).expect("entropy report prefix")
+                    == FileType::EntropyReport
             })
     }
 }
12 changes: 5 additions & 7 deletions file_store/src/file_store.rs

@@ -1,5 +1,5 @@
 use crate::{
-    error::DecodeError, BytesMutStream, Error, FileInfo, FileInfoStream, FileType, Result, Settings,
+    error::DecodeError, BytesMutStream, Error, FileInfo, FileInfoStream, Result, Settings,
 };
 use aws_config::meta::region::RegionProviderChain;
 use aws_sdk_s3::{types::ByteStream, Client, Endpoint, Region};

@@ -57,27 +57,25 @@ impl FileStore {
         })
     }

-    pub async fn list_all<A, B, F>(
+    pub async fn list_all<A, B>(
         &self,
-        file_type: F,
+        file_type: &str,
         after: A,
         before: B,
     ) -> Result<Vec<FileInfo>>
     where
-        F: Into<FileType> + Copy,
         A: Into<Option<DateTime<Utc>>> + Copy,
         B: Into<Option<DateTime<Utc>>> + Copy,
     {
         self.list(file_type, after, before).try_collect().await
     }

-    pub fn list<A, B, F>(&self, file_type: F, after: A, before: B) -> FileInfoStream
+    pub fn list<A, B>(&self, prefix: &str, after: A, before: B) -> FileInfoStream
     where
-        F: Into<FileType> + Copy,
         A: Into<Option<DateTime<Utc>>> + Copy,
         B: Into<Option<DateTime<Utc>>> + Copy,
     {
-        let file_type = file_type.into();
+        let file_type = prefix.to_string();
         let before = before.into();
         let after = after.into();
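Dropping the F: Into<FileType> bound is the heart of the change: list and list_all now accept any string prefix, so consumers outside the oracles can list their own file families. A sketch of both call styles against the new signatures; the custom prefix is hypothetical:

use chrono::{Duration, Utc};
use file_store::{FileStore, FileType, Result};

async fn list_recent(store: &FileStore) -> Result<()> {
    let after = Utc::now() - Duration::hours(1);

    // Oracle file types still work through their canonical string form...
    let prices = store
        .list_all(FileType::PriceReport.to_str(), after, None)
        .await?;

    // ...and so does a prefix that FileType has never heard of.
    let custom = store.list_all("my_custom_report", after, None).await?;

    println!("{} price files, {} custom files", prices.len(), custom.len());
    Ok(())
}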
1 change: 1 addition & 0 deletions file_store/src/lib.rs

@@ -22,6 +22,7 @@ pub mod speedtest;
 pub mod traits;

 pub use crate::file_store::FileStore;
+pub use cli::bucket::FileFilter;
 pub use error::{Error, Result};
 pub use file_info::{FileInfo, FileType};
 pub use file_sink::{FileSink, FileSinkBuilder};
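Re-exporting FileFilter lets a downstream binary embed the same bucket-filter flags in its own clap interface and drive a FileStore with them. A hypothetical external caller, assuming the usual #[clap(flatten)] pattern for an Args-deriving struct and a boxed (Unpin) FileInfoStream:

use clap::Parser;
use file_store::{FileFilter, FileStore, Result};
use futures::TryStreamExt;

#[derive(Debug, Parser)]
struct Args {
    #[clap(flatten)]
    filter: FileFilter,
}

// FileFilter::list is now pub, so the listing stream is usable outside the CLI.
async fn run(args: Args, store: FileStore) -> Result<()> {
    let mut infos = args.filter.list(&store);
    while let Some(info) = infos.try_next().await? {
        println!("{} ({} bytes)", info.key, info.size);
    }
    Ok(())
}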
12 changes: 6 additions & 6 deletions iot_verifier/src/loader.rs

@@ -16,7 +16,7 @@ use file_store::{
 use futures::{stream, StreamExt};
 use helium_crypto::PublicKeyBinary;
 use sqlx::PgPool;
-use std::{hash::Hasher, ops::DerefMut};
+use std::{hash::Hasher, ops::DerefMut, str::FromStr};
 use tokio::{
     sync::Mutex,
     time::{self, MissedTickBehavior},

@@ -220,7 +220,7 @@ impl Loader {
         tracing::info!(
             "checking for new ingest files of type {file_type} after {after} and before {before}"
         );
-        let infos = store.list_all(file_type, after, before).await?;
+        let infos = store.list_all(file_type.to_str(), after, before).await?;
         if infos.is_empty() {
             tracing::info!("no available ingest files of type {file_type}");
             return Ok(());

@@ -264,7 +264,7 @@ impl Loader {
         xor_data: Option<&Mutex<Vec<u64>>>,
         xor_filter: Option<&Xor16>,
     ) -> anyhow::Result<()> {
-        let file_type = file_info.file_type;
+        let file_type = file_info.prefix.clone();
         let tx = Mutex::new(self.pool.begin().await?);
         let metrics = LoaderMetricTracker::new();
         store

@@ -279,7 +279,7 @@
                     tracing::warn!("skipping report of type {file_type} due to error {err:?}")
                 }
                 Ok(buf) => {
-                    match self.handle_report(file_type, &buf, gateway_cache, xor_data, xor_filter, &metrics).await
+                    match self.handle_report(&file_type, &buf, gateway_cache, xor_data, xor_filter, &metrics).await
                     {
                         Ok(Some(bindings)) => inserts.push(bindings),
                         Ok(None) => (),

@@ -305,14 +305,14 @@

     async fn handle_report(
         &self,
-        file_type: FileType,
+        file_type: &str,
         buf: &[u8],
         gateway_cache: &GatewayCache,
         xor_data: Option<&Mutex<Vec<u64>>>,
         xor_filter: Option<&Xor16>,
         metrics: &LoaderMetricTracker,
     ) -> anyhow::Result<Option<InsertBindings>> {
-        match file_type {
+        match FileType::from_str(file_type)? {
             FileType::IotBeaconIngestReport => {
                 let beacon = IotBeaconIngestReport::decode(buf)?;
                 tracing::debug!("beacon report from ingestor: {:?}", &beacon);
2 changes: 1 addition & 1 deletion price/src/price_tracker.rs

@@ -183,7 +183,7 @@ async fn process_files(
     after: DateTime<Utc>,
 ) -> Result<Option<DateTime<Utc>>, PriceTrackerError> {
     file_store
-        .list(FileType::PriceReport, after, None)
+        .list(FileType::PriceReport.to_str(), after, None)
         .map_err(PriceTrackerError::from)
        .and_then(|file| process_file(file_store, file, sender))
        .try_fold(None, |_old, ts| async move { Ok(Some(ts)) })