From 0cced219afd21d7c8d6c9bbccf759f2c0d4a78f9 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Wed, 21 Aug 2024 10:02:57 -0700 Subject: [PATCH] `FileInfoPollerServer` lookback exceeds limit leads to reprocessing files (#855) * break out file poller store into trait for testing * fix reprocessing files when more than the limit arrives within the lookback offset When the cleaning process is triggered, it will get the timestamp of the 100th oldest file that has been processed. If that time is greater than the lookback offset, we will only remove files older than the lookback. Otherwise, we will remove any file older than the 100th entry. - add cache clean logging - break out getting FileInfo from s3 with `FileInfoPollerStore` trait * move file-store to the CI job that needs a postgres container for tests `FileInfoPollerServer` has a test that uses a database now. --- .github/workflows/CI.yml | 4 +- file_store/src/file_info_poller.rs | 314 ++++++++++++++++-- file_store/src/file_source.rs | 7 +- .../src/sp_boosted_rewards_bans.rs | 1 + 4 files changed, 286 insertions(+), 40 deletions(-) diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 3db75735f..21500e3a7 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -92,7 +92,7 @@ jobs: strategy: fail-fast: false matrix: - package: [boost-manager,iot-config,iot-packet-verifier,iot-verifier,mobile-config,mobile-verifier] + package: [boost-manager,file-store,iot-config,iot-packet-verifier,iot-verifier,mobile-config,mobile-verifier] concurrency: group: ${{ github.workflow }}-${{ github.ref }}-tests-postgres-${{ matrix.package }} cancel-in-progress: true @@ -142,7 +142,7 @@ jobs: strategy: fail-fast: false matrix: - package: [coverage-map,coverage-point-calculator,file-store,ingest,mobile-packet-verifier,reward-scheduler,task-manager] + package: [coverage-map,coverage-point-calculator,ingest,mobile-packet-verifier,reward-scheduler,task-manager] concurrency: group: ${{ github.workflow 
}}-${{ github.ref }}-tests-${{ matrix.package }} cancel-in-progress: true diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs index 7d80183bf..ecaa9bfdb 100644 --- a/file_store/src/file_info_poller.rs +++ b/file_store/src/file_info_poller.rs @@ -5,6 +5,7 @@ use derive_builder::Builder; use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt}; use futures_util::TryFutureExt; use retainer::Cache; +use sqlx::postgres::PgQueryResult; use std::{collections::VecDeque, marker::PhantomData, sync::Arc, time::Duration}; use task_manager::ManagedTask; use tokio::sync::mpsc::{Receiver, Sender}; @@ -27,7 +28,13 @@ pub trait FileInfoPollerState: Send + Sync + 'static { async fn exists(&self, process_name: &str, file_info: &FileInfo) -> Result; - async fn clean(&self, process_name: &str, file_type: &str) -> Result; + // Returns number of items cleaned + async fn clean( + &self, + process_name: &str, + file_type: &str, + offset: DateTime, + ) -> Result; } #[async_trait::async_trait] @@ -40,6 +47,19 @@ pub trait FileInfoPollerStateRecorder { async fn record(self, process_name: &str, file_info: &FileInfo) -> Result; } +#[async_trait::async_trait] +pub trait FileInfoPollerStore: Send + Sync + 'static { + async fn list_all(&self, file_type: &str, after: A, before: B) -> Result> + where + A: Into>> + Send + Sync + Copy, + B: Into>> + Send + Sync + Copy; + + async fn get_raw(&self, key: K) -> Result + where + K: Into + Send + Sync; +} + +#[derive(Debug)] pub struct FileInfoStream { pub file_info: FileInfo, process_name: String, @@ -84,13 +104,13 @@ pub enum LookbackBehavior { #[derive(Debug, Clone, Builder)] #[builder(pattern = "owned")] -pub struct FileInfoPollerConfig { +pub struct FileInfoPollerConfig { #[builder(default = "DEFAULT_POLL_DURATION")] poll_duration: Duration, - state: S, - store: FileStore, + state: State, + store: Store, prefix: String, - parser: P, + parser: Parser, lookback: LookbackBehavior, #[builder(default = 
"Duration::from_secs(10 * 60)")] offset: Duration, @@ -99,28 +119,33 @@ pub struct FileInfoPollerConfig { #[builder(default = r#""default".to_string()"#)] process_name: String, #[builder(setter(skip))] - p: PhantomData, + p: PhantomData, } #[derive(Clone)] -pub struct FileInfoPollerServer { - config: FileInfoPollerConfig, - sender: Sender>, +pub struct FileInfoPollerServer { + config: FileInfoPollerConfig, + sender: Sender>, file_queue: VecDeque, latest_file_timestamp: Option>, cache: MemoryFileCache, } type FileInfoStreamReceiver = Receiver>; -impl FileInfoPollerConfigBuilder + +impl FileInfoPollerConfigBuilder where - T: Clone, - S: FileInfoPollerState, - P: FileInfoPollerParser, + Message: Clone, + State: FileInfoPollerState, + Parser: FileInfoPollerParser, + Store: FileInfoPollerStore, { pub async fn create( self, - ) -> Result<(FileInfoStreamReceiver, FileInfoPollerServer)> { + ) -> Result<( + FileInfoStreamReceiver, + FileInfoPollerServer, + )> { let config = self.build()?; let (sender, receiver) = tokio::sync::mpsc::channel(config.queue_size); let latest_file_timestamp = config @@ -141,11 +166,11 @@ where } } -impl ManagedTask for FileInfoPollerServer +impl ManagedTask for FileInfoPollerServer where - T: Send + Sync + 'static, - S: FileInfoPollerState, - P: FileInfoPollerParser, + Message: Send + Sync + 'static, + State: FileInfoPollerState, + Parser: FileInfoPollerParser, { fn start_task( self: Box, @@ -161,11 +186,12 @@ where } } -impl FileInfoPollerServer +impl FileInfoPollerServer where - T: Send + Sync + 'static, - S: FileInfoPollerState, - P: FileInfoPollerParser, + Message: Send + Sync + 'static, + State: FileInfoPollerState, + Parser: FileInfoPollerParser, + Store: FileInfoPollerStore + Send + Sync + 'static, { pub async fn start( self, @@ -253,11 +279,26 @@ where } async fn clean(&self, cache: &MemoryFileCache) -> Result { + let cache_before = cache.len().await; cache.purge(4, 0.25).await; - self.config + let cache_after = cache.len().await; + + let 
db_removed = self + .config .state - .clean(&self.config.process_name, &self.config.prefix) + .clean( + &self.config.process_name, + &self.config.prefix, + self.after(self.latest_file_timestamp), + ) .await?; + + tracing::info!( + cache_removed = cache_before - cache_after, + db_removed, + "cache clean" + ); + Ok(()) } @@ -355,6 +396,24 @@ async fn cache_file(cache: &MemoryFileCache, file_info: &FileInfo) { cache.insert(file_info.key.clone(), true, CACHE_TTL).await; } +#[async_trait::async_trait] +impl FileInfoPollerStore for FileStore { + async fn list_all(&self, file_type: &str, after: A, before: B) -> Result> + where + A: Into>> + Send + Sync + Copy, + B: Into>> + Send + Sync + Copy, + { + self.list_all(file_type, after, before).await + } + + async fn get_raw(&self, key: K) -> Result + where + K: Into + Send + Sync, + { + self.get_raw(key).await + } +} + #[cfg(feature = "sqlx-postgres")] #[async_trait::async_trait] impl FileInfoPollerStateRecorder for &mut sqlx::Transaction<'_, sqlx::Postgres> { @@ -408,23 +467,208 @@ impl FileInfoPollerState for sqlx::Pool { .map_err(Error::from) } - async fn clean(&self, process_name: &str, file_type: &str) -> Result { - sqlx::query( + async fn clean( + &self, + process_name: &str, + file_type: &str, + offset: DateTime, + ) -> Result { + let t100_timestamp: Option> = sqlx::query_scalar( + r#" + SELECT file_timestamp + FROM files_processed + WHERE process_name = $1 + AND file_type = $2 + ORDER BY file_timestamp DESC + LIMIT 1 OFFSET 100; + "#, + ) + .bind(process_name) + .bind(file_type) + .fetch_optional(self) + .await?; + + let Some(t100) = t100_timestamp else { + // The cleaning limit has not been reached, remove nothing. + return Ok(0); + }; + + // To keep from reprocessing files, we need to make sure rows that exist + // within the offset window are not removed. 
+ let older_than_limit = t100.min(offset); + + let query_result: PgQueryResult = sqlx::query( r#" - DELETE FROM files_processed where file_name in ( - SELECT file_name - FROM files_processed - WHERE process_name = $1 and file_type = $2 - ORDER BY file_timestamp DESC - OFFSET 100 - ) + DELETE FROM files_processed + WHERE process_name = $1 + AND file_type = $2 + AND file_timestamp < $3 "#, ) .bind(process_name) .bind(file_type) + .bind(older_than_limit) .execute(self) .await - .map(|_| ()) - .map_err(Error::from) + .map_err(Error::from)?; + + Ok(query_result.rows_affected()) + } +} + +#[cfg(test)] +mod tests { + + use sqlx::{Executor, PgPool}; + use std::time::Duration; + use tokio::time::timeout; + + use super::*; + + struct TestParser; + struct TestStore(Vec); + + #[async_trait::async_trait] + impl FileInfoPollerParser for TestParser { + async fn parse(&self, _byte_stream: ByteStream) -> Result> { + Ok(vec![]) + } + } + + #[async_trait::async_trait] + impl FileInfoPollerStore for TestStore { + async fn list_all( + &self, + _file_type: &str, + after: A, + before: B, + ) -> Result> + where + A: Into>> + Send + Sync + Copy, + B: Into>> + Send + Sync + Copy, + { + let after = after.into(); + let before = before.into(); + + Ok(self + .0 + .clone() + .into_iter() + .filter(|file_info| after.map_or(true, |v| file_info.timestamp > v)) + .filter(|file_info| before.map_or(true, |v| file_info.timestamp <= v)) + .collect()) + } + + async fn get_raw(&self, _key: K) -> Result + where + K: Into + Send + Sync, + { + Ok(ByteStream::default()) + } + } + + #[sqlx::test] + async fn do_not_reprocess_files_when_offset_exceeds_earliest_file( + pool: PgPool, + ) -> anyhow::Result<()> { + // Cleaning the files_processed table should not cause files within the + // `FileInfoPoller.config.offset` window to be reprocessed. + + // There is no auto-migration for tests in this lib workspace. 
+ pool.execute( + r#" + CREATE TABLE files_processed ( + process_name TEXT NOT NULL DEFAULT 'default', + file_name VARCHAR PRIMARY KEY, + file_type VARCHAR NOT NULL, + file_timestamp TIMESTAMPTZ NOT NULL, + processed_at TIMESTAMPTZ NOT NULL + ); + "#, + ) + .await?; + + // The important aspect of this test is that all the files to be + // processed happen _within_ the lookback offset. + const EXPECTED_FILE_COUNT: i64 = 150; + let mut infos = vec![]; + for seconds in 0..EXPECTED_FILE_COUNT { + let file_info = FileInfo { + key: format!("key-{seconds}"), + prefix: "file_type".to_string(), + timestamp: Utc::now() - chrono::Duration::seconds(seconds), + size: 42, + }; + infos.push(file_info); + } + + // To simulate a restart, we're going to make a new FileInfoPoller. + // This closure is to ensure they have the same settings. + let file_info_builder = || { + let six_hours = chrono::Duration::hours(6).to_std().unwrap(); + FileInfoPollerConfigBuilder::::default() + .parser(TestParser) + .state(pool.clone()) + .store(TestStore(infos.clone())) + .lookback(LookbackBehavior::Max(six_hours)) + .prefix("file_type".to_string()) + .offset(six_hours) + .create() + }; + + // The first startup of the file info poller, there is nothing to clean. + // And all file_infos will be returned to be processed. + let (mut receiver, ingest_server) = file_info_builder().await?; + let (trigger, shutdown) = triggered::trigger(); + tokio::spawn(async move { + if let Err(status) = ingest_server.run(shutdown).await { + println!("ingest server went down unexpectedly: {status:?}"); + } + }); + + // "process" all the files. They are not recorded into the database + // until the file is consumed as a stream. + let mut processed = 0; + while processed < EXPECTED_FILE_COUNT { + match timeout(Duration::from_secs(1), receiver.recv()).await? 
{ + Some(msg) => { + processed += 1; + let mut txn = pool.begin().await?; + let _x = msg.into_stream(&mut txn).await?; + txn.commit().await?; + } + err => panic!("something went wrong: {err:?}"), + }; + } + + // Shutdown the ingest server, we're going to create a new one and start it. + trigger.trigger(); + + // The second startup of the file info poller, there are 100+ files that + // have been processed. The initial clean should not remove processed + // files in a way that causes us to re-receive any files within our + // offset for processing. + let (mut receiver, ingest_server) = file_info_builder().await?; + let (trigger, shutdown) = triggered::trigger(); + let _handle = tokio::spawn(async move { + if let Err(status) = ingest_server.run(shutdown).await { + println!("ingest server went down unexpectedly: {status:?}"); + } + }); + + // Attempting to receive files for processing. The timeout should fire, + // because all the files we have setup exist within the offset, and + // should still be in the database. 
+ match timeout(Duration::from_secs(1), receiver.recv()).await { + Err(_err) => (), + Ok(msg) => { + panic!("we got something when we expected nothing.: {msg:?}"); + } + } + + // Shut down for great good + trigger.trigger(); + + Ok(()) } } diff --git a/file_store/src/file_source.rs b/file_store/src/file_source.rs index ff77f1162..a028d2d8b 100644 --- a/file_store/src/file_source.rs +++ b/file_store/src/file_source.rs @@ -1,6 +1,6 @@ use crate::{ file_info_poller::{FileInfoPollerConfigBuilder, MsgDecodeFileInfoPollerParser}, - file_sink, BytesMutStream, Error, + file_sink, BytesMutStream, Error, FileStore, }; use async_compression::tokio::bufread::GzipDecoder; use futures::{ @@ -11,11 +11,12 @@ use std::path::{Path, PathBuf}; use tokio::{fs::File, io::BufReader}; use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedRead}; -pub fn continuous_source() -> FileInfoPollerConfigBuilder +pub fn continuous_source( +) -> FileInfoPollerConfigBuilder where T: Clone, { - FileInfoPollerConfigBuilder::::default() + FileInfoPollerConfigBuilder::::default() .parser(MsgDecodeFileInfoPollerParser) } diff --git a/mobile_verifier/src/sp_boosted_rewards_bans.rs b/mobile_verifier/src/sp_boosted_rewards_bans.rs index 697b12960..e3ea08cb9 100644 --- a/mobile_verifier/src/sp_boosted_rewards_bans.rs +++ b/mobile_verifier/src/sp_boosted_rewards_bans.rs @@ -166,6 +166,7 @@ where ServiceProviderBoostedRewardsBannedRadioIngestReportV1, _, _, + _, >::default() .parser(ProstFileInfoPollerParser) .state(pool.clone())