diff --git a/boost_manager/src/main.rs b/boost_manager/src/main.rs index f55e923ad..40c8b80fa 100644 --- a/boost_manager/src/main.rs +++ b/boost_manager/src/main.rs @@ -5,8 +5,11 @@ use boost_manager::{ }; use clap::Parser; use file_store::{ - file_info_poller::LookbackBehavior, file_source, file_upload, reward_manifest::RewardManifest, - traits::FileSinkWriteExt, FileStore, FileType, + file_info_poller::LookbackBehavior, + file_source, file_upload, + reward_manifest::RewardManifest, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, + FileStore, FileType, }; use helium_proto::BoostedHexUpdateV1; use mobile_config::client::hex_boosting_client::HexBoostingClient; @@ -103,7 +106,8 @@ impl Server { let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 81548daa1..771754e90 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -253,12 +253,19 @@ pub struct FileSink { target_path: PathBuf, tmp_path: PathBuf, prefix: String, + /// Maximum file size in bytes. If a single write would cause this limit to + /// be exceeded, the `active_sink` is rolled. max_size: usize, + /// Window within which writes can occur to `active_sink`. `roll_time` is + /// not checked during writing, so a file may contain items exceeding the + /// window of `roll_time`. roll_time: Duration, messages: MessageReceiver, file_upload: FileUpload, staged_files: Vec, + /// 'commit' the file to s3 automatically when either the `roll_time` is + /// surpassed, or `max_size` would be exceeded by an incoming message. auto_commit: bool, active_sink: Option, diff --git a/file_store/src/traits/file_sink_write.rs b/file_store/src/traits/file_sink_write.rs index 6aa8ea861..fae6108d1 100644 --- a/file_store/src/traits/file_sink_write.rs +++ b/file_store/src/traits/file_sink_write.rs @@ -14,6 +14,22 @@ use helium_proto::{ pub const DEFAULT_ROLL_TIME: Duration = Duration::from_secs(DEFAULT_SINK_ROLL_SECS); +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum FileSinkCommitStrategy { + /// Writer must manually call [`FileSinkClient::commit()`] for files to be uploaded. + Manual, + /// Files will be automatically uploaded when + /// [`FileSinkBuilder::max_size()`] is exceeded, or [`DEFAULT_ROLL_TIME`] has elapsed. + Automatic, +} + +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum FileSinkRollTime { + /// Default is 3 minutes + Default, + Duration(Duration), +} + #[async_trait::async_trait] pub trait FileSinkWriteExt where @@ -22,14 +38,11 @@ where const FILE_PREFIX: &'static str; const METRIC_SUFFIX: &'static str; - // The `auto_commit` option and `roll_time` option are incompatible with - // each other. It doesn't make sense to roll a file every so often _and_ - // commit it every time something is written. If a roll_time is provided, - // `auto_commit` is set to false. async fn file_sink( target_path: &Path, file_upload: FileUpload, - roll_time: Option, + commit_strategy: FileSinkCommitStrategy, + roll_time: FileSinkRollTime, metric_prefix: &str, ) -> Result<(FileSinkClient, FileSink)> { let builder = FileSinkBuilder::new( @@ -39,10 +52,18 @@ where format!("{}_{}", metric_prefix, Self::METRIC_SUFFIX), ); - let builder = if let Some(duration) = roll_time { - builder.auto_commit(false).roll_time(duration) - } else { - builder.auto_commit(true) + let builder = match commit_strategy { + FileSinkCommitStrategy::Manual => { + builder.auto_commit(false).roll_time(DEFAULT_ROLL_TIME) + } + FileSinkCommitStrategy::Automatic => { + builder.auto_commit(true).roll_time(DEFAULT_ROLL_TIME) + } + }; + + let builder = match roll_time { + FileSinkRollTime::Duration(duration) => builder.roll_time(duration), + FileSinkRollTime::Default => builder.roll_time(DEFAULT_ROLL_TIME), }; let file_sink = builder.create().await?; diff --git a/file_store/src/traits/mod.rs b/file_store/src/traits/mod.rs index ae37821c8..3a55973b0 100644 --- a/file_store/src/traits/mod.rs +++ b/file_store/src/traits/mod.rs @@ -5,7 +5,9 @@ mod msg_timestamp; mod msg_verify; mod report_id; -pub use file_sink_write::{FileSinkWriteExt, DEFAULT_ROLL_TIME}; +pub use file_sink_write::{ + FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, DEFAULT_ROLL_TIME, +}; pub use msg_bytes::MsgBytes; pub use msg_decode::MsgDecode; pub use msg_timestamp::{MsgTimestamp, TimestampDecode, TimestampEncode}; diff --git a/ingest/src/server_iot.rs b/ingest/src/server_iot.rs index f80034d69..eb82d73a1 100644 --- a/ingest/src/server_iot.rs +++ b/ingest/src/server_iot.rs @@ -4,7 +4,7 @@ use chrono::Utc; use file_store::{ file_sink::FileSinkClient, file_upload, - traits::{FileSinkWriteExt, MsgVerify}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, MsgVerify}, }; use futures::{ future::{LocalBoxFuture, TryFutureExt}, @@ -365,7 +365,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (beacon_report_sink, beacon_report_sink_server) = LoraBeaconIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -374,7 +375,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (witness_report_sink, witness_report_sink_server) = LoraWitnessIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index 636bd156d..c1e91ded1 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -4,7 +4,7 @@ use chrono::Utc; use file_store::{ file_sink::FileSinkClient, file_upload, - traits::{FileSinkWriteExt, MsgVerify}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, MsgVerify}, }; use futures::future::LocalBoxFuture; use futures_util::TryFutureExt; @@ -450,7 +450,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { CellHeartbeatIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -459,7 +460,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { WifiHeartbeatIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -468,7 +470,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { let (speedtest_report_sink, speedtest_report_sink_server) = SpeedtestIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -477,7 +480,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { DataTransferSessionIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -486,7 +490,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { SubscriberLocationIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -495,7 +500,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { RadioThresholdIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -504,7 +510,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { InvalidatedRadioThresholdIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -513,7 +520,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { CoverageObjectIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -522,7 +530,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { ServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; @@ -531,7 +540,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> { SubscriberVerifiedMappingEventIngestReportV1::file_sink( store_base_path, file_upload.clone(), - Some(settings.roll_time), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(settings.roll_time), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 77b1697dd..5b4b21710 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -11,7 +11,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload, iot_packet::PacketRouterPacketReport, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use futures_util::TryFutureExt; @@ -141,7 +141,8 @@ impl Cmd { let (valid_packets, valid_packets_server) = ValidPacket::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -149,7 +150,8 @@ impl Cmd { let (invalid_packets, invalid_packets_server) = InvalidPacket::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index 5afeb8866..a7e0192f0 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -6,7 +6,7 @@ use file_store::{ file_info_poller::LookbackBehavior, file_source, file_upload, iot_packet::IotValidPacket, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use helium_proto::{ @@ -124,7 +124,8 @@ impl Server { let (rewards_sink, gateway_rewards_sink_server) = IotRewardShare::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -133,7 +134,8 @@ impl Server { let (reward_manifests_sink, reward_manifests_sink_server) = RewardManifest::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -177,7 +179,8 @@ impl Server { NonRewardablePacket::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -210,7 +213,8 @@ impl Server { LoraInvalidBeaconReportV1::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -219,7 +223,8 @@ impl Server { LoraInvalidWitnessReportV1::file_sink( store_base_path, file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -243,7 +248,8 @@ impl Server { LoraInvalidBeaconReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -252,7 +258,8 @@ impl Server { LoraInvalidWitnessReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(5 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(5 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -260,7 +267,8 @@ impl Server { let (runner_poc_sink, runner_poc_sink_server) = LoraPocV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(2 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(2 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 8e9e95ccc..86d639a54 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -8,7 +8,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload, mobile_session::DataTransferSessionIngestReport, - traits::FileSinkWriteExt, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; @@ -174,7 +174,8 @@ impl Cmd { let (valid_sessions, valid_sessions_server) = ValidDataTransferSession::file_sink( store_base_path, file_upload.clone(), - None, + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -183,7 +184,8 @@ impl Cmd { InvalidDataTransferIngestReportV1::file_sink( store_base_path, file_upload.clone(), - None, + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -191,7 +193,8 @@ impl Cmd { let (pending_sessions, pending_sessions_server) = PendingDataTransferSessionV1::file_sink( store_base_path, file_upload.clone(), - None, + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/boosting_oracles/data_sets.rs b/mobile_verifier/src/boosting_oracles/data_sets.rs index 32819fd66..30da679b8 100644 --- a/mobile_verifier/src/boosting_oracles/data_sets.rs +++ b/mobile_verifier/src/boosting_oracles/data_sets.rs @@ -9,7 +9,10 @@ use chrono::{DateTime, Utc}; use file_store::{ file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkWriteExt, TimestampDecode, TimestampEncode}, + traits::{ + FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampDecode, + TimestampEncode, + }, FileStore, }; use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; @@ -262,7 +265,8 @@ impl OracleBoostingReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 0ce51d47c..193455c5c 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -15,7 +15,11 @@ use crate::{ telemetry, Settings, }; use anyhow::Result; -use file_store::{file_upload, traits::FileSinkWriteExt, FileStore}; +use file_store::{ + file_upload, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, + FileStore, +}; use helium_proto::services::poc_mobile::{Heartbeat, SeniorityUpdate, SpeedtestAvg}; use mobile_config::client::{ entity_client::EntityClient, hex_boosting_client::HexBoostingClient, AuthorizationClient, @@ -52,7 +56,8 @@ impl Cmd { let (valid_heartbeats, valid_heartbeats_server) = Heartbeat::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -61,7 +66,8 @@ impl Cmd { let (seniority_updates, seniority_updates_server) = SeniorityUpdate::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; @@ -69,7 +75,8 @@ impl Cmd { let (speedtests_avg, speedtests_avg_server) = SpeedtestAvg::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index d10aebe7a..e7178e177 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -10,7 +10,7 @@ use file_store::{ file_sink::FileSinkClient, file_source, file_upload::FileUpload, - traits::{FileSinkWriteExt, TimestampEncode}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode}, FileStore, FileType, }; use futures::{ @@ -89,7 +89,8 @@ impl CoverageDaemon { let (valid_coverage_objs, valid_coverage_objs_server) = proto::CoverageObjectV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/radio_threshold.rs b/mobile_verifier/src/radio_threshold.rs index c10453bff..158c31dd9 100644 --- a/mobile_verifier/src/radio_threshold.rs +++ b/mobile_verifier/src/radio_threshold.rs @@ -11,7 +11,7 @@ use file_store::{ mobile_radio_threshold::{ RadioThresholdIngestReport, RadioThresholdReportReq, VerifiedRadioThresholdIngestReport, }, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; @@ -73,7 +73,8 @@ where VerifiedRadioThresholdIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -82,7 +83,8 @@ where VerifiedInvalidatedRadioThresholdIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/rewarder.rs b/mobile_verifier/src/rewarder.rs index 8d5f81426..2befe03ac 100644 --- a/mobile_verifier/src/rewarder.rs +++ b/mobile_verifier/src/rewarder.rs @@ -17,7 +17,7 @@ use db_store::meta; use file_store::{ file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkWriteExt, TimestampEncode, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, TimestampEncode}, }; use futures_util::TryFutureExt; @@ -81,7 +81,8 @@ where let (mobile_rewards, mobile_rewards_server) = MobileRewardShare::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; @@ -89,7 +90,8 @@ where let (reward_manifests, reward_manifests_server) = RewardManifest::file_sink( settings.store_base_path(), file_upload, - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/sp_boosted_rewards_bans.rs b/mobile_verifier/src/sp_boosted_rewards_bans.rs index 54433e117..c4bafc051 100644 --- a/mobile_verifier/src/sp_boosted_rewards_bans.rs +++ b/mobile_verifier/src/sp_boosted_rewards_bans.rs @@ -7,7 +7,7 @@ use file_store::{ }, file_sink::FileSinkClient, file_upload::FileUpload, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use futures::{prelude::future::LocalBoxFuture, StreamExt, TryFutureExt, TryStreamExt}; @@ -158,7 +158,8 @@ where VerifiedServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink( settings.store_base_path(), file_upload, - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 97edf8e82..07a116dd3 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -9,7 +9,7 @@ use file_store::{ file_source, file_upload::FileUpload, speedtest::{CellSpeedtest, CellSpeedtestIngestReport}, - traits::FileSinkWriteExt, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use futures::{ @@ -77,7 +77,8 @@ where let (speedtests_validity, speedtests_validity_server) = VerifiedSpeedtestProto::file_sink( settings.store_base_path(), file_upload, - Some(Duration::from_secs(15 * 60)), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Duration(Duration::from_secs(15 * 60)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index 69a09a369..d7446a2df 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -8,7 +8,7 @@ use file_store::{ SubscriberLocationIngestReport, SubscriberLocationReq, VerifiedSubscriberLocationIngestReport, }, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, FileStore, FileType, }; use futures::{StreamExt, TryStreamExt}; @@ -57,7 +57,8 @@ where VerifiedSubscriberLocationIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/mobile_verifier/src/subscriber_verified_mapping_event.rs b/mobile_verifier/src/subscriber_verified_mapping_event.rs index 99ed76015..913ba9c51 100644 --- a/mobile_verifier/src/subscriber_verified_mapping_event.rs +++ b/mobile_verifier/src/subscriber_verified_mapping_event.rs @@ -7,7 +7,7 @@ use file_store::{ file_upload::FileUpload, subscriber_verified_mapping_event::SubscriberVerifiedMappingEvent, subscriber_verified_mapping_event_ingest_report::SubscriberVerifiedMappingEventIngestReport, - traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME}, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, verified_subscriber_verified_mapping_event_ingest_report::VerifiedSubscriberVerifiedMappingEventIngestReport, FileStore, FileType, }; @@ -78,7 +78,8 @@ where VerifiedSubscriberVerifiedMappingEventIngestReportV1::file_sink( settings.store_base_path(), file_upload.clone(), - Some(DEFAULT_ROLL_TIME), + FileSinkCommitStrategy::Manual, + FileSinkRollTime::Default, env!("CARGO_PKG_NAME"), ) .await?; diff --git a/poc_entropy/src/main.rs b/poc_entropy/src/main.rs index e13253438..ba0c5f171 100644 --- a/poc_entropy/src/main.rs +++ b/poc_entropy/src/main.rs @@ -1,6 +1,9 @@ use anyhow::{Error, Result}; use clap::Parser; -use file_store::{file_upload, traits::FileSinkWriteExt}; +use file_store::{ + file_upload, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, +}; use futures_util::TryFutureExt; use helium_proto::EntropyReportV1; use poc_entropy::{entropy_generator::EntropyGenerator, server::ApiServer, Settings}; @@ -74,7 +77,8 @@ impl Server { let (entropy_sink, entropy_sink_server) = EntropyReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(ENTROPY_SINK_ROLL_SECS)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(ENTROPY_SINK_ROLL_SECS)), env!("CARGO_PKG_NAME"), ) .await?; diff --git a/price/src/main.rs b/price/src/main.rs index 4695a9005..a1cf2c389 100644 --- a/price/src/main.rs +++ b/price/src/main.rs @@ -1,6 +1,9 @@ use anyhow::Result; use clap::Parser; -use file_store::{file_upload, traits::FileSinkWriteExt}; +use file_store::{ + file_upload, + traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt}, +}; use helium_proto::PriceReportV1; use price::{cli::check, PriceGenerator, Settings}; use std::{ @@ -85,7 +88,8 @@ impl Server { let (price_sink, price_sink_server) = PriceReportV1::file_sink( store_base_path, file_upload.clone(), - Some(Duration::from_secs(PRICE_SINK_ROLL_SECS)), + FileSinkCommitStrategy::Automatic, + FileSinkRollTime::Duration(Duration::from_secs(PRICE_SINK_ROLL_SECS)), env!("CARGO_PKG_NAME"), ) .await?;