diff --git a/file_store/src/file_info_poller_tm.rs b/file_store/src/file_info_poller_tm.rs
deleted file mode 100644
index 14e4833f1..000000000
--- a/file_store/src/file_info_poller_tm.rs
+++ /dev/null
@@ -1,297 +0,0 @@
-use crate::{traits::MsgDecode, Error, FileInfo, FileStore, FileType, Result};
-use chrono::{DateTime, Duration, TimeZone, Utc};
-use derive_builder::Builder;
-use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt, TryFutureExt};
-use retainer::Cache;
-use std::marker::PhantomData;
-use task_manager::ManagedTask;
-use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender};
-
-const DEFAULT_POLL_DURATION_SECS: i64 = 30;
-const DEFAULT_POLL_DURATION: std::time::Duration =
-    std::time::Duration::from_secs(DEFAULT_POLL_DURATION_SECS as u64);
-const CLEAN_DURATION: std::time::Duration = std::time::Duration::from_secs(12 * 60 * 60);
-const CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(3 * 60 * 60);
-
-type MemoryFileCache = Cache<String, bool>;
-
-pub struct FileInfoStream<T> {
-    pub file_info: FileInfo,
-    stream: BoxStream<'static, T>,
-}
-
-impl<T> FileInfoStream<T>
-where
-    T: Send,
-{
-    pub async fn into_stream(
-        self,
-        transaction: &mut sqlx::Transaction<'_, sqlx::Postgres>,
-    ) -> Result<BoxStream<'static, T>> {
-        db::insert(transaction, self.file_info).await?;
-        Ok(self.stream)
-    }
-}
-
-#[derive(Debug, Clone)]
-pub enum LookbackBehavior {
-    StartAfter(DateTime<Utc>),
-    Max(Duration),
-}
-
-#[derive(Debug, Clone, Builder)]
-#[builder(pattern = "owned")]
-pub struct FileInfoPollerConfig<T> {
-    #[builder(default = "Duration::seconds(DEFAULT_POLL_DURATION_SECS)")]
-    poll_duration: Duration,
-    db: sqlx::Pool<sqlx::Postgres>,
-    store: FileStore,
-    file_type: FileType,
-    lookback: LookbackBehavior,
-    #[builder(default = "Duration::minutes(10)")]
-    offset: Duration,
-    #[builder(default = "20")]
-    queue_size: usize,
-    #[builder(setter(skip))]
-    p: PhantomData<T>,
-}
-
-pub struct FileInfoPollerServer<T> {
-    config: FileInfoPollerConfig<T>,
-    sender: Sender<FileInfoStream<T>>,
-}
-
-impl<T> FileInfoPollerConfigBuilder<T>
-where
-    T: Clone,
-{
-    pub fn create(self) -> Result<(Receiver<FileInfoStream<T>>, FileInfoPollerServer<T>)> {
-        let config = self.build()?;
-        let (sender, receiver) = tokio::sync::mpsc::channel(config.queue_size);
-
-        Ok((receiver, FileInfoPollerServer { config, sender }))
-    }
-}
-
-impl<T> ManagedTask for FileInfoPollerServer<T>
-where
-    T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
-{
-    fn start_task(
-        self: Box<Self>,
-        shutdown: triggered::Listener,
-    ) -> LocalBoxFuture<'static, anyhow::Result<()>> {
-        let handle = tokio::spawn(self.run(shutdown));
-
-        Box::pin(
-            handle
-                .map_err(anyhow::Error::from)
-                .and_then(|result| async move { result.map_err(anyhow::Error::from) }),
-        )
-    }
-}
-
-impl<T> FileInfoPollerServer<T>
-where
-    T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
-{
-    async fn run(self, shutdown: triggered::Listener) -> Result {
-        let cache = create_cache();
-        let mut poll_trigger = tokio::time::interval(self.poll_duration());
-        let mut cleanup_trigger = tokio::time::interval(CLEAN_DURATION);
-
-        let mut latest_ts = db::latest_ts(&self.config.db, self.config.file_type).await?;
-
-        loop {
-            let after = self.after(latest_ts);
-            let before = Utc::now();
-
-            tokio::select! {
-                _ = shutdown.clone() => {
-                    tracing::info!("FileInfoPoller shutting down");
-                    break;
-                }
-                _ = cleanup_trigger.tick() => self.clean(&cache).await?,
-                _ = poll_trigger.tick() => {
-                    let files = self.config.store.list_all(self.config.file_type.to_str(), after, before).await?;
-                    for file in files {
-                        if !is_already_processed(&self.config.db, &cache, &file).await? {
-                            if send_stream(&self.sender, &self.config.store, file.clone()).await? {
-                                latest_ts = Some(file.timestamp);
-                                cache_file(&cache, &file).await;
-                            } else {
-                                tracing::info!("FileInfoPoller: channel full");
-                                break;
-                            }
-                        }
-                    }
-                }
-            }
-        }
-        Ok(())
-    }
-
-    fn after(&self, latest: Option<DateTime<Utc>>) -> DateTime<Utc> {
-        let latest_offset = latest.map(|lt| lt - self.config.offset);
-        match self.config.lookback {
-            LookbackBehavior::StartAfter(start_after) => latest_offset.unwrap_or(start_after),
-            LookbackBehavior::Max(max_lookback) => {
-                let max_ts = Utc::now() - max_lookback;
-                latest_offset.map(|lt| lt.max(max_ts)).unwrap_or(max_ts)
-            }
-        }
-    }
-
-    async fn clean(&self, cache: &MemoryFileCache) -> Result {
-        cache.purge(4, 0.25).await;
-        db::clean(&self.config.db, &self.config.file_type).await?;
-        Ok(())
-    }
-
-    fn poll_duration(&self) -> std::time::Duration {
-        self.config
-            .poll_duration
-            .to_std()
-            .unwrap_or(DEFAULT_POLL_DURATION)
-    }
-}
-
-async fn send_stream<T>(
-    sender: &Sender<FileInfoStream<T>>,
-    store: &FileStore,
-    file: FileInfo,
-) -> Result<bool>
-where
-    T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
-{
-    let stream = store
-        .stream_file(file.clone())
-        .await?
-        .filter_map(|msg| async {
-            msg.map_err(|err| {
-                tracing::error!(
-                    "Error streaming entry in file of type {}: {err:?}",
-                    std::any::type_name::<T>()
-                );
-                err
-            })
-            .ok()
-        })
-        .filter_map(|msg| async {
-            <T as MsgDecode>::decode(msg)
-                .map_err(|err| {
-                    tracing::error!(
-                        "Error in decoding message of type {}: {err:?}",
-                        std::any::type_name::<T>()
-                    );
-                    err
-                })
-                .ok()
-        })
-        .boxed();
-
-    let incoming_data_stream = FileInfoStream {
-        file_info: file,
-        stream,
-    };
-
-    match sender.try_send(incoming_data_stream) {
-        Ok(_) => Ok(true),
-        Err(TrySendError::Full(_)) => Ok(false),
-        Err(TrySendError::Closed(_)) => Err(Error::channel()),
-    }
-}
-
-fn create_cache() -> MemoryFileCache {
-    Cache::new()
-}
-
-async fn is_already_processed(
-    db: impl sqlx::PgExecutor<'_>,
-    cache: &MemoryFileCache,
-    file_info: &FileInfo,
-) -> Result<bool> {
-    if cache.get(&file_info.key).await.is_some() {
-        Ok(true)
-    } else {
-        db::exists(db, file_info).await
-    }
-}
-
-async fn cache_file(cache: &MemoryFileCache, file_info: &FileInfo) {
-    cache.insert(file_info.key.clone(), true, CACHE_TTL).await;
-}
-
-mod db {
-    use super::*;
-
-    pub async fn latest_ts(
-        db: impl sqlx::PgExecutor<'_>,
-        file_type: FileType,
-    ) -> Result<Option<DateTime<Utc>>> {
-        let default = Utc.timestamp_opt(0, 0).single().unwrap();
-
-        let result = sqlx::query_scalar::<_, DateTime<Utc>>(
-            r#"
-            SELECT COALESCE(MAX(file_timestamp), $1) FROM files_processed where file_type = $2
-            "#,
-        )
-        .bind(default)
-        .bind(file_type.to_str())
-        .fetch_one(db)
-        .await?;
-
-        if result == default {
-            Ok(None)
-        } else {
-            Ok(Some(result))
-        }
-    }
-
-    pub async fn exists(db: impl sqlx::PgExecutor<'_>, file_info: &FileInfo) -> Result<bool> {
-        Ok(sqlx::query_scalar::<_, bool>(
-            r#"
-            SELECT EXISTS(SELECT 1 from files_processed where file_name = $1)
-            "#,
-        )
-        .bind(file_info.key.clone())
-        .fetch_one(db)
-        .await?)
-    }
-
-    pub async fn insert(
-        tx: &mut sqlx::Transaction<'_, sqlx::Postgres>,
-        file_info: FileInfo,
-    ) -> Result {
-        sqlx::query(r#"
-            INSERT INTO files_processed(file_name, file_type, file_timestamp, processed_at) VALUES($1, $2, $3, $4)
-        "#)
-        .bind(file_info.key)
-        .bind(&file_info.prefix)
-        .bind(file_info.timestamp)
-        .bind(Utc::now())
-        .execute(tx)
-        .await?;
-
-        Ok(())
-    }
-
-    pub async fn clean(db: impl sqlx::PgExecutor<'_>, file_type: &FileType) -> Result {
-        sqlx::query(
-            r#"
-            DELETE FROM files_processed where file_name in (
-                SELECT file_name
-                FROM files_processed
-                WHERE file_type = $1
-                ORDER BY file_timestamp DESC
-                OFFSET 100
-            )
-            "#,
-        )
-        .bind(file_type.to_str())
-        .execute(db)
-        .await?;
-
-        Ok(())
-    }
-}
diff --git a/file_store/src/file_sink_tm.rs b/file_store/src/file_sink_tm.rs
deleted file mode 100644
index 5165a78db..000000000
--- a/file_store/src/file_sink_tm.rs
+++ /dev/null
@@ -1,650 +0,0 @@
-use crate::{file_upload::FileUpload, Error, Result};
-use async_compression::tokio::write::GzipEncoder;
-use bytes::Bytes;
-use chrono::{DateTime, Duration, Utc};
-use futures::{future::LocalBoxFuture, SinkExt, TryFutureExt};
-use metrics::Label;
-use std::{
-    io, mem,
-    path::{Path, PathBuf},
-};
-use task_manager::ManagedTask;
-use tokio::{
-    fs::{self, File, OpenOptions},
-    io::{AsyncWriteExt, BufWriter},
-    sync::{
-        mpsc::{self, error::SendTimeoutError},
-        oneshot,
-    },
-    time,
-};
-use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedWrite};
-
-pub const DEFAULT_SINK_ROLL_MINS: i64 = 3;
-
-#[cfg(not(test))]
-pub const SINK_CHECK_MILLIS: i64 = 60_000;
-#[cfg(test)]
-pub const SINK_CHECK_MILLIS: i64 = 50;
-
-pub const MAX_FRAME_LENGTH: usize = 15_000_000;
-
-type Sink = GzipEncoder<BufWriter<File>>;
-type Transport = FramedWrite<Sink, LengthDelimitedCodec>;
-pub type FileManifest = Vec<String>;
-
-fn new_transport(sink: Sink) -> Transport {
-    LengthDelimitedCodec::builder()
-        .max_frame_length(MAX_FRAME_LENGTH)
-        .new_write(sink)
-}
-
-fn transport_sink(transport: &mut Transport) -> &mut Sink {
-    transport.get_mut()
-}
-
-#[derive(Debug)]
-pub enum Message {
-    Data(oneshot::Sender<Result>, Vec<u8>),
-    Commit(oneshot::Sender<Result<FileManifest>>),
-    Rollback(oneshot::Sender<Result<FileManifest>>),
-}
-
-pub type MessageSender = mpsc::Sender<Message>;
-pub type MessageReceiver = mpsc::Receiver<Message>;
-
-fn message_channel(size: usize) -> (MessageSender, MessageReceiver) {
-    mpsc::channel(size)
-}
-
-pub struct FileSinkBuilder {
-    prefix: String,
-    target_path: PathBuf,
-    tmp_path: PathBuf,
-    max_size: usize,
-    roll_time: Duration,
-    file_upload: Option<FileUpload>,
-    auto_commit: bool,
-    metric: &'static str,
-}
-
-impl FileSinkBuilder {
-    pub fn new(prefix: impl ToString, target_path: &Path, metric: &'static str) -> Self {
-        Self {
-            prefix: prefix.to_string(),
-            target_path: target_path.to_path_buf(),
-            tmp_path: target_path.join("tmp"),
-            max_size: 50_000_000,
-            roll_time: Duration::minutes(DEFAULT_SINK_ROLL_MINS),
-            file_upload: None,
-            auto_commit: true,
-            metric,
-        }
-    }
-
-    pub fn max_size(self, max_size: usize) -> Self {
-        Self { max_size, ..self }
-    }
-
-    pub fn target_path(self, target_path: &Path) -> Self {
-        Self {
-            target_path: target_path.to_path_buf(),
-            ..self
-        }
-    }
-
-    pub fn tmp_path(self, path: &Path) -> Self {
-        Self {
-            tmp_path: path.to_path_buf(),
-            ..self
-        }
-    }
-
-    pub fn file_upload(self, file_upload: Option<FileUpload>) -> Self {
-        Self {
-            file_upload,
-            ..self
-        }
-    }
-
-    pub fn auto_commit(self, auto_commit: bool) -> Self {
-        Self {
-            auto_commit,
-            ..self
-        }
-    }
-
-    pub fn roll_time(self, duration: Duration) -> Self {
-        Self {
-            roll_time: duration,
-            ..self
-        }
-    }
-
-    pub async fn create(self) -> Result<(FileSinkClient, FileSink)> {
-        let (tx, rx) = message_channel(50);
-
-        let client = FileSinkClient {
-            sender: tx,
-            metric: self.metric,
-        };
-
-        metrics::register_counter!(client.metric, vec![OK_LABEL]);
-
-        let mut sink = FileSink {
-            target_path: self.target_path,
-            tmp_path: self.tmp_path,
-            prefix: self.prefix,
-            max_size: self.max_size,
-            file_upload: self.file_upload,
-            roll_time: self.roll_time,
-            messages: rx,
-            staged_files: Vec::new(),
-            auto_commit: self.auto_commit,
-            active_sink: None,
-        };
-        sink.init().await?;
-        Ok((client, sink))
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct FileSinkClient {
-    sender: MessageSender,
-    metric: &'static str,
-}
-
-const OK_LABEL: Label = Label::from_static_parts("status", "ok");
-const ERROR_LABEL: Label = Label::from_static_parts("status", "error");
-const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
-
-impl FileSinkClient {
-    pub async fn write<T: prost::Message>(
-        &self,
-        item: T,
-        labels: impl IntoIterator<Item = &(&'static str, &'static str)>,
-    ) -> Result<oneshot::Receiver<Result>> {
-        let (on_write_tx, on_write_rx) = oneshot::channel();
-        let bytes = item.encode_to_vec();
-        let labels = labels.into_iter().map(Label::from);
-
-        tokio::select! {
-            // TODO: check this again, do we need shutdown handling here?
-            // _ = self.shutdown_listener.clone() => {
-            //     Err(Error::Shutdown)
-            // }
-            result = self.sender.send_timeout(Message::Data(on_write_tx, bytes), SEND_TIMEOUT) => match result {
-                Ok(_) => {
-                    metrics::increment_counter!(
-                        self.metric,
-                        labels
-                            .chain(std::iter::once(OK_LABEL))
-                            .collect::<Vec<Label>>()
-                    );
-                    tracing::debug!("file_sink write succeeded for {:?}", self.metric);
-                    Ok(on_write_rx)
-                }
-                Err(SendTimeoutError::Closed(_)) => {
-                    metrics::increment_counter!(
-                        self.metric,
-                        labels
-                            .chain(std::iter::once(ERROR_LABEL))
-                            .collect::<Vec<Label>>()
-                    );
-                    tracing::error!("file_sink write failed for {:?} channel closed", self.metric);
-                    Err(Error::channel())
-                }
-                Err(SendTimeoutError::Timeout(_)) => {
-                    tracing::error!("file_sink write failed due to send timeout");
-                    Err(Error::SendTimeout)
-                }
-            },
-        }
-    }
-
-    pub async fn commit(&self) -> Result<oneshot::Receiver<Result<FileManifest>>> {
-        let (on_commit_tx, on_commit_rx) = oneshot::channel();
-        self.sender
-            .send(Message::Commit(on_commit_tx))
-            .await
-            .map_err(|e| {
-                tracing::error!("file_sink failed to commit with {e:?}");
-                Error::channel()
-            })
-            .map(|_| on_commit_rx)
-    }
-
-    pub async fn rollback(&self) -> Result<oneshot::Receiver<Result<FileManifest>>> {
-        let (on_rollback_tx, on_rollback_rx) = oneshot::channel();
-        self.sender
-            .send(Message::Rollback(on_rollback_tx))
-            .await
-            .map_err(|e| {
-                tracing::error!("file_sink failed to rollback with {e:?}");
-                Error::channel()
-            })
-            .map(|_| on_rollback_rx)
-    }
-}
-
-#[derive(Debug)]
-pub struct FileSink {
-    target_path: PathBuf,
-    tmp_path: PathBuf,
-    prefix: String,
-    max_size: usize,
-    roll_time: Duration,
-
-    messages: MessageReceiver,
-    file_upload: Option<FileUpload>,
-    staged_files: Vec<PathBuf>,
-    auto_commit: bool,
-
-    active_sink: Option<ActiveSink>,
-}
-
-#[derive(Debug)]
-struct ActiveSink {
-    size: usize,
-    time: DateTime<Utc>,
-    transport: Transport,
-}
-
-impl ActiveSink {
-    async fn shutdown(&mut self) -> Result {
-        transport_sink(&mut self.transport).shutdown().await?;
-        Ok(())
-    }
-}
-
-impl ManagedTask for FileSink {
-    fn start_task(
-        self: Box<Self>,
-        shutdown: triggered::Listener,
-    ) -> LocalBoxFuture<'static, anyhow::Result<()>> {
-        let handle = tokio::spawn(self.run(shutdown));
-
-        Box::pin(
-            handle
-                .map_err(anyhow::Error::from)
-                .and_then(|result| async move { result.map_err(anyhow::Error::from) }),
-        )
-    }
-}
-
-impl FileSink {
-    async fn init(&mut self) -> Result {
-        fs::create_dir_all(&self.target_path).await?;
-        fs::create_dir_all(&self.tmp_path).await?;
-        // Move any partial previous sink files to the target
-        let mut dir = fs::read_dir(&self.tmp_path).await?;
-        loop {
-            match dir.next_entry().await {
-                Ok(Some(entry))
-                    if entry
-                        .file_name()
-                        .to_string_lossy()
-                        .starts_with(&self.prefix) =>
-                {
-                    if self.auto_commit {
-                        let _ = self.deposit_sink(&entry.path()).await;
-                    } else {
-                        let _ = fs::remove_file(&entry.path()).await;
-                    }
-                }
-                Ok(None) => break,
-                _ => continue,
-            }
-        }
-
-        // Notify all existing completed sinks
-        if let Some(file_upload) = &self.file_upload {
-            let mut dir = fs::read_dir(&self.target_path).await?;
-            loop {
-                match dir.next_entry().await {
-                    Ok(Some(entry))
-                        if entry
-                            .file_name()
-                            .to_string_lossy()
-                            .starts_with(&self.prefix) =>
-                    {
-                        file_upload.upload_file(&entry.path()).await?;
-                    }
-                    Ok(None) => break,
-                    _ => continue,
-                }
-            }
-        }
-        Ok(())
-    }
-
-    pub async fn run(mut self, shutdown: triggered::Listener) -> Result {
-        tracing::info!(
-            "starting file sink {} in {}",
-            self.prefix,
-            self.target_path.display()
-        );
-
-        let mut rollover_timer = time::interval(
-            Duration::milliseconds(SINK_CHECK_MILLIS)
-                .to_std()
-                .expect("valid sink roll time"),
-        );
-        rollover_timer.set_missed_tick_behavior(time::MissedTickBehavior::Burst);
-
-        loop {
-            tokio::select! {
-                _ = shutdown.clone() => break,
-                _ = rollover_timer.tick() => self.maybe_roll().await?,
-                msg = self.messages.recv() => match msg {
-                    Some(Message::Data(on_write_tx, bytes)) => {
-                        let res = match self.write(Bytes::from(bytes)).await {
-                            Ok(_) => Ok(()),
-                            Err(err) => {
-                                tracing::error!("failed to store {}: {err:?}", &self.prefix);
-                                Err(err)
-                            }
-                        };
-                        let _ = on_write_tx.send(res);
-                    }
-                    Some(Message::Commit(on_commit_tx)) => {
-                        let res = self.commit().await;
-                        let _ = on_commit_tx.send(res);
-                    }
-                    Some(Message::Rollback(on_rollback_tx)) => {
-                        let res = self.rollback().await;
-                        let _ = on_rollback_tx.send(res);
-                    }
-                    None => {
-                        break
-                    }
-                }
-            }
-        }
-        tracing::info!("stopping file sink {}", &self.prefix);
-        if let Some(active_sink) = self.active_sink.as_mut() {
-            let _ = active_sink.shutdown().await;
-            self.active_sink = None;
-        }
-        Ok(())
-    }
-
-    async fn new_sink(&mut self) -> Result {
-        let sink_time = Utc::now();
-        let filename = format!("{}.{}.gz", self.prefix, sink_time.timestamp_millis());
-        let new_path = self.tmp_path.join(filename);
-        let writer = GzipEncoder::new(BufWriter::new(
-            OpenOptions::new()
-                .write(true)
-                .create(true)
-                .open(&new_path)
-                .await?,
-        ));
-
-        self.staged_files.push(new_path);
-
-        self.active_sink = Some(ActiveSink {
-            size: 0,
-            time: sink_time,
-            transport: new_transport(writer),
-        });
-
-        Ok(())
-    }
-
-    pub async fn commit(&mut self) -> Result<FileManifest> {
-        self.maybe_close_active_sink().await?;
-
-        let mut manifest: FileManifest = Vec::new();
-        let staged_files = mem::take(&mut self.staged_files);
-
-        for staged_file in staged_files.into_iter() {
-            self.deposit_sink(staged_file.as_path()).await?;
-            manifest.push(file_name(&staged_file)?);
-        }
-
-        Ok(manifest)
-    }
-
-    pub async fn rollback(&mut self) -> Result<FileManifest> {
-        self.maybe_close_active_sink().await?;
-
-        let mut manifest: FileManifest = Vec::new();
-        let staged_files = mem::take(&mut self.staged_files);
-
-        for staged_file in staged_files.into_iter() {
-            fs::remove_file(&staged_file).await?;
-            manifest.push(file_name(&staged_file)?);
-        }
-
-        Ok(manifest)
-    }
-
-    pub async fn maybe_roll(&mut self) -> Result {
-        if let Some(active_sink) = self.active_sink.as_mut() {
-            if (active_sink.time + self.roll_time) <= Utc::now() {
-                if self.auto_commit {
-                    self.commit().await?;
-                } else {
-                    self.maybe_close_active_sink().await?;
-                }
-            }
-        }
-        Ok(())
-    }
-
-    async fn maybe_close_active_sink(&mut self) -> Result {
-        if let Some(active_sink) = self.active_sink.as_mut() {
-            active_sink.shutdown().await?;
-            self.active_sink = None;
-        }
-
-        Ok(())
-    }
-
-    async fn deposit_sink(&mut self, sink_path: &Path) -> Result {
-        if !sink_path.exists() {
-            return Ok(());
-        }
-        let target_filename = sink_path.file_name().ok_or_else(|| {
-            Error::from(std::io::Error::new(
-                std::io::ErrorKind::InvalidInput,
-                "expected sink filename",
-            ))
-        })?;
-        let target_path = self.target_path.join(target_filename);
-
-        fs::rename(&sink_path, &target_path).await?;
-        if let Some(file_upload) = &self.file_upload {
-            file_upload.upload_file(&target_path).await?;
-        };
-
-        Ok(())
-    }
-
-    pub async fn write(&mut self, buf: Bytes) -> Result {
-        let buf_len = buf.len();
-
-        match self.active_sink.as_mut() {
-            // If there is an active sink check if the write would make it too
-            // large. if so deposit and make a new sink. Otherwise the current
-            // active sink is usable.
-            Some(active_sink) => {
-                if active_sink.size + buf_len >= self.max_size {
-                    active_sink.shutdown().await?;
-                    if self.auto_commit {
-                        self.commit().await?;
-                    }
-                    self.new_sink().await?;
-                }
-            }
-            // No sink, make a new one
-            None => {
-                self.new_sink().await?;
-            }
-        }
-
-        if let Some(active_sink) = self.active_sink.as_mut() {
-            active_sink.transport.send(buf).await?;
-            active_sink.size += buf_len;
-            Ok(())
-        } else {
-            Err(Error::from(io::Error::new(
-                io::ErrorKind::Other,
-                "sink not available",
-            )))
-        }
-    }
-}
-
-fn file_name(path_buf: &Path) -> Result<String> {
-    path_buf
-        .file_name()
-        .map(|os_str| os_str.to_string_lossy().to_string())
-        .ok_or_else(|| {
-            Error::from(std::io::Error::new(
-                std::io::ErrorKind::InvalidInput,
-                "expected sink filename",
-            ))
-        })
-}
-
-// #[cfg(test)]
-// mod tests {
-//     use super::*;
-//     use crate::{file_source, FileInfo, FileType};
-//     use futures::stream::StreamExt;
-//     use std::str::FromStr;
-//     use tempfile::TempDir;
-//     use tokio::fs::DirEntry;
-
-//     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
-//     async fn writes_a_framed_gzip_encoded_file() {
-//         let tmp_dir = TempDir::new().expect("Unable to create temp dir");
-//         let (shutdown_trigger, shutdown_listener) = triggered::trigger();
-
-//         let (file_sink_client, mut file_sink_server) =
-//             FileSinkBuilder::new(FileType::EntropyReport, tmp_dir.path(), "fake_metric")
-//                 .roll_time(chrono::Duration::milliseconds(100))
-//                 .create()
-//                 .await
-//                 .expect("failed to create file sink");
-
-//         let sink_thread = tokio::spawn(async move {
-//             file_sink_server
-//                 .run()
-//                 .await
-//                 .expect("failed to complete file sink");
-//         });
-
-//         let (on_write_tx, _on_write_rx) = oneshot::channel();
-
-//         file_sink_client
-//             .sender
-//             .try_send(Message::Data(
-//                 on_write_tx,
-//                 String::into_bytes("hello".to_string()),
-//             ))
-//             .expect("failed to send bytes to file sink");
-
-//         tokio::time::sleep(time::Duration::from_millis(200)).await;
-
-//         shutdown_trigger.trigger();
-//         sink_thread.await.expect("file sink did not complete");
-
-//         let entropy_file = get_entropy_file(&tmp_dir)
-//             .await
-//             .expect("no entropy available");
-//         assert_eq!("hello", read_file(&entropy_file).await);
-//     }
-
-//     #[tokio::test]
-//     async fn only_uploads_after_commit_when_auto_commit_is_false() {
-//         let tmp_dir = TempDir::new().expect("Unable to create temp dir");
-//         let (shutdown_trigger, shutdown_listener) = triggered::trigger();
-//         let (file_upload_tx, mut file_upload_rx) = file_upload::message_channel();
-
-//         let (file_sink_client, mut file_sink_server) = FileSinkBuilder::new(
-//             FileType::EntropyReport,
-//             tmp_dir.path(),
-//             "fake_metric",
-//             shutdown_listener.clone(),
-//         )
-//         .roll_time(chrono::Duration::milliseconds(100))
-//         .auto_commit(false)
-//         .file_upload(Some(file_upload_tx))
-//         .create()
-//         .await
-//         .expect("failed to create file sink");
-
-//         let sink_thread = tokio::spawn(async move {
-//             file_sink_server
-//                 .run()
-//                 .await
-//                 .expect("failed to complete file sink");
-//         });
-
-//         let (on_write_tx, _on_write_rx) = oneshot::channel();
-//         file_sink_client
-//             .sender
-//             .try_send(Message::Data(
-//                 on_write_tx,
-//                 String::into_bytes("hello".to_string()),
-//             ))
-//             .expect("failed to send bytes to file sink");
-
-//         tokio::time::sleep(time::Duration::from_millis(200)).await;
-
-//         assert!(get_entropy_file(&tmp_dir).await.is_err());
-//         assert_eq!(
-//             Err(tokio::sync::mpsc::error::TryRecvError::Empty),
-//             file_upload_rx.try_recv()
-//         );
-
-//         let receiver = file_sink_client.commit().await.expect("commit failed");
-//         let _ = receiver.await.expect("commit didn't complete completed");
-
-//         assert!(file_upload_rx.try_recv().is_ok());
-
-//         let entropy_file = get_entropy_file(&tmp_dir)
-//             .await
-//             .expect("no entropy available");
-//         assert_eq!("hello", read_file(&entropy_file).await);
-
-//         shutdown_trigger.trigger();
-//         sink_thread.await.expect("file sink did not complete");
-//     }
-
-//     async fn read_file(entry: &DirEntry) -> bytes::BytesMut {
-//         file_source::source([entry.path()])
-//             .next()
-//             .await
-//             .unwrap()
-//             .expect("invalid data in file")
-//     }
-
-//     async fn get_entropy_file(tmp_dir: &TempDir) -> std::result::Result<DirEntry, String> {
-//         let mut entries = fs::read_dir(tmp_dir.path())
-//             .await
-//             .expect("failed to read tmp dir");
-
-//         while let Some(entry) = entries.next_entry().await.unwrap() {
-//             if is_entropy_file(&entry) {
-//                 return Ok(entry);
-//             }
-//         }
-
-//         Err("no entropy available".to_string())
-//     }
-
-//     fn is_entropy_file(entry: &DirEntry) -> bool {
-//         entry
-//             .file_name()
-//             .to_str()
-//             .and_then(|file_name| FileInfo::from_str(file_name).ok())
-//             .map_or(false, |file_info| {
-//                 FileType::from_str(&file_info.prefix).expect("entropy report prefix")
-//                     == FileType::EntropyReport
-//             })
-//     }
-// }
diff --git a/file_store/src/file_source_tm.rs b/file_store/src/file_source_tm.rs
deleted file mode 100644
index 7b0fbc0ed..000000000
--- a/file_store/src/file_source_tm.rs
+++ /dev/null
@@ -1,93 +0,0 @@
-use crate::{file_info_poller_tm::FileInfoPollerConfigBuilder, file_sink, BytesMutStream, Error};
-use async_compression::tokio::bufread::GzipDecoder;
-use futures::{
-    stream::{self},
-    StreamExt, TryFutureExt, TryStreamExt,
-};
-use std::path::{Path, PathBuf};
-use tokio::{fs::File, io::BufReader};
-use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedRead};
-
-pub fn continuous_source<T>() -> FileInfoPollerConfigBuilder<T>
-where
-    T: Clone,
-{
-    FileInfoPollerConfigBuilder::<T>::default()
-}
-
-pub fn source<I, P>(paths: I) -> BytesMutStream
-where
-    I: IntoIterator<Item = P>,
-    P: AsRef<Path>,
-{
-    let paths: Vec<PathBuf> = paths
-        .into_iter()
-        .map(|path| path.as_ref().to_path_buf())
-        .collect();
-    stream::iter(paths)
-        .map(|path| File::open(path).map_err(Error::from))
-        .buffered(2)
-        .flat_map(|file| match file {
-            Ok(file) => {
-                let buf_reader = BufReader::new(file);
-                let codec = LengthDelimitedCodec::builder()
-                    .max_frame_length(file_sink::MAX_FRAME_LENGTH)
-                    .new_codec();
-
-                FramedRead::new(GzipDecoder::new(buf_reader), codec)
-                    .map_err(Error::from)
-                    .boxed()
-            }
-            Err(err) => stream::once(async { Err(err) }).boxed(),
-        })
-        .boxed()
-}
-
-#[cfg(test)]
-mod test {
-    use super::*;
-    use crate::{FileInfo, FileInfoStream, FileStore, Settings};
-    use std::str::FromStr;
-
-    fn infos(names: &'static [&str]) -> FileInfoStream {
-        futures::stream::iter(names.iter().map(|v| FileInfo::from_str(v))).boxed()
-    }
-
-    #[tokio::test]
-    #[ignore = "credentials required"]
-    async fn test_multi_read() {
-        //
-        // Run with `cargo test -- --include-ignored`
-        //
-        // Use FileStore::get. These two files exist in the devnet bucket:
-        //
-        //  aws s3 ls s3://devnet-poc5g-rewards
-        //  2022-08-05 15:35:55     240363 cell_heartbeat.1658832527866.gz
-        //  2022-08-05 15:36:08    6525274 cell_heartbeat.1658834120042.gz
-        //
-
-        let settings = Settings {
-            bucket: "devnet-poc5g-rewards".to_string(),
-            endpoint: None,
-            region: "us-east-1".to_string(),
-            access_key_id: None,
-            secret_access_key: None,
-        };
-
-        let file_store = FileStore::from_settings(&settings)
-            .await
-            .expect("file store");
-        let stream = file_store.source(infos(&[
-            "cell_heartbeat.1658832527866.gz",
-            "cell_heartbeat.1658834120042.gz",
-        ]));
-        let p1_stream = file_store.source(infos(&["cell_heartbeat.1658832527866.gz"]));
-        let p2_stream = file_store.source(infos(&["cell_heartbeat.1658834120042.gz"]));
-
-        let p1_count = p1_stream.count().await;
-        let p2_count = p2_stream.count().await;
-        let multi_count = stream.count().await;
-
-        assert_eq!(multi_count, p1_count + p2_count);
-    }
-}