Skip to content

Commit

Permalink
exposing file filter for outside filestore commands
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffgrunewald committed Aug 4, 2023
1 parent 3d00a78 commit 8da434f
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 24 deletions.
24 changes: 12 additions & 12 deletions file_store/src/cli/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use serde::{ser::SerializeSeq, Serializer};
use std::{
io,
path::{Path, PathBuf},
str::FromStr,
};
use tokio::fs;

Expand Down Expand Up @@ -48,7 +49,7 @@ impl BucketCmd {
}

#[derive(Debug, clap::Args)]
struct FileFilter {
pub struct FileFilter {
/// Optional start time to look for (inclusive). Defaults to the oldest
/// timestamp in the bucket.
#[clap(long)]
Expand All @@ -57,15 +58,15 @@ struct FileFilter {
/// available timestamp in the bucket.
#[clap(long)]
before: Option<NaiveDateTime>,
/// The file type to search for
/// The file type prefix to search for
#[clap(long)]
file_type: FileType,
prefix: String,
}

impl FileFilter {
fn list(&self, store: &FileStore) -> FileInfoStream {
store.list(
self.file_type,
self.prefix.clone(),
self.after.as_ref().map(|dt| Utc.from_utc_datetime(dt)),
self.before.as_ref().map(|dt| Utc.from_utc_datetime(dt)),
)
Expand Down Expand Up @@ -180,12 +181,15 @@ impl Locate {
pub async fn run(&self, settings: &Settings) -> Result {
let store = FileStore::from_settings(settings).await?;
let file_infos = self.filter.list(&store);
let file_type = self.filter.file_type;
let prefix = self.filter.prefix.clone();
let gateway = &self.gateway.clone();
let mut events = store
.source(file_infos)
.map_ok(|buf| (buf, gateway))
.try_filter_map(|(buf, gateway)| async move { locate(file_type, gateway, &buf) })
.try_filter_map(|(buf, gateway)| {
let prefix = prefix.clone();
async move { locate(&prefix, gateway, &buf) }
})
.boxed();
let mut ser = serde_json::Serializer::new(io::stdout());
let mut seq = ser.serialize_seq(None)?;
Expand All @@ -197,13 +201,9 @@ impl Locate {
}
}

fn locate(
file_type: FileType,
gateway: &PublicKey,
buf: &[u8],
) -> Result<Option<serde_json::Value>> {
fn locate(prefix: &str, gateway: &PublicKey, buf: &[u8]) -> Result<Option<serde_json::Value>> {
let pub_key = gateway.to_vec();
match file_type {
match FileType::from_str(prefix)? {
FileType::CellHeartbeat => {
CellHeartbeat::decode(buf).and_then(|event| event.to_value_if(pub_key))
}
Expand Down
5 changes: 1 addition & 4 deletions file_store/src/cli/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ use helium_proto::{
EntropyReportV1, Message, PriceReportV1,
};
use serde_json::json;
use std::{
path::PathBuf,
str::FromStr,
};
use std::{path::PathBuf, str::FromStr};

/// Print information about a given store file.
#[derive(Debug, clap::Args)]
Expand Down
11 changes: 11 additions & 0 deletions file_store/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ impl From<(FileType, DateTime<Utc>)> for FileInfo {
}
}

impl From<(String, DateTime<Utc>)> for FileInfo {
fn from(v: (String, DateTime<Utc>)) -> Self {
Self {
key: format!("{}.{}.gz", &v.0, v.1.timestamp_millis()),
prefix: v.0,
timestamp: v.1,
size: 0,
}
}
}

impl TryFrom<&aws_sdk_s3::model::Object> for FileInfo {
type Error = Error;
fn try_from(value: &aws_sdk_s3::model::Object) -> Result<Self> {
Expand Down
3 changes: 2 additions & 1 deletion file_store/src/file_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,8 @@ mod tests {
.to_str()
.and_then(|file_name| FileInfo::from_str(file_name).ok())
.map_or(false, |file_info| {
file_info.file_type == FileType::EntropyReport
FileType::from_str(&file_info.prefix).expect("entropy report prefix")
== FileType::EntropyReport
})
}
}
14 changes: 7 additions & 7 deletions file_store/src/file_store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
error::DecodeError, BytesMutStream, Error, FileInfo, FileInfoStream, FileType, Result, Settings,
error::DecodeError, BytesMutStream, Error, FileInfo, FileInfoStream, Result, Settings,
};
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::{types::ByteStream, Client, Endpoint, Region};
Expand Down Expand Up @@ -57,27 +57,27 @@ impl FileStore {
})
}

pub async fn list_all<A, B, F>(
pub async fn list_all<A, B, P>(
&self,
file_type: F,
file_type: P,
after: A,
before: B,
) -> Result<Vec<FileInfo>>
where
F: Into<FileType> + Copy,
P: ToString + std::fmt::Display,
A: Into<Option<DateTime<Utc>>> + Copy,
B: Into<Option<DateTime<Utc>>> + Copy,
{
self.list(file_type, after, before).try_collect().await
}

pub fn list<A, B, F>(&self, file_type: F, after: A, before: B) -> FileInfoStream
pub fn list<A, B, P>(&self, prefix: P, after: A, before: B) -> FileInfoStream
where
F: Into<FileType> + Copy,
P: ToString,
A: Into<Option<DateTime<Utc>>> + Copy,
B: Into<Option<DateTime<Utc>>> + Copy,
{
let file_type = file_type.into();
let file_type = prefix.to_string();
let before = before.into();
let after = after.into();

Expand Down
1 change: 1 addition & 0 deletions file_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod speedtest;
pub mod traits;

pub use crate::file_store::FileStore;
pub use cli::bucket::FileFilter;
pub use error::{Error, Result};
pub use file_info::{FileInfo, FileType};
pub use file_sink::{FileSink, FileSinkBuilder};
Expand Down

0 comments on commit 8da434f

Please sign in to comment.