Skip to content

Commit

Permalink
Fix roll_time and auto_commit settings for FileSink (#859)
Browse files Browse the repository at this point in the history
* Add FileSinkCommitStrategy

This hopefully clears up, at least for the use of the FileSinkWriteExt
trait the relationship between `auto_commit` and `roll_time`. There is
no default implementaion for this option, as all options have
implications that should be considered.

* Correct FileSink `auto_commit` and `roll_time` usage in Oracles

The PR that updated FileSink to use the Ext trait had an incorrect
understanding of the relationship between `roll_time` and `auto_commit`.

That understanding has since been corrected and codified in the
`FileSinkCommitStrategy` enum.

For this commit, https://github.com/helium/oracles/pull/849/files was
combed through to get all correct settings for FileSinks before they
were transitioned to use FileSinkWriteExt::file_sink().

* Change so auto_commit and roll_time are separated (#860)

* Change so auto_commit and roll_time are separated

* Create FileSinkRollTime enum

---------

Co-authored-by: Brian Balser <[email protected]>
  • Loading branch information
michaeldjeffrey and bbalser authored Aug 30, 2024
1 parent e6adffc commit f10606d
Show file tree
Hide file tree
Showing 20 changed files with 156 additions and 69 deletions.
10 changes: 7 additions & 3 deletions boost_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ use boost_manager::{
};
use clap::Parser;
use file_store::{
file_info_poller::LookbackBehavior, file_source, file_upload, reward_manifest::RewardManifest,
traits::FileSinkWriteExt, FileStore, FileType,
file_info_poller::LookbackBehavior,
file_source, file_upload,
reward_manifest::RewardManifest,
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt},
FileStore, FileType,
};
use helium_proto::BoostedHexUpdateV1;
use mobile_config::client::hex_boosting_client::HexBoostingClient;
Expand Down Expand Up @@ -103,7 +106,8 @@ impl Server {
let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
7 changes: 7 additions & 0 deletions file_store/src/file_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,19 @@ pub struct FileSink<T> {
target_path: PathBuf,
tmp_path: PathBuf,
prefix: String,
/// Maximum file size in bytes. If a single write would cause this limit to
/// be exceeded, the `active_sink` is rolled.
max_size: usize,
/// Window within which writes can occur to `active_sink`. `roll_time` is
/// not checked during writing, so a file may contain items exceeding the
/// window of `roll_time`.
roll_time: Duration,

messages: MessageReceiver<T>,
file_upload: FileUpload,
staged_files: Vec<PathBuf>,
/// 'commit' the file to s3 automatically when either the `roll_time` is
/// surpassed, or `max_size` would be exceeded by an incoming message.
auto_commit: bool,

active_sink: Option<ActiveSink>,
Expand Down
39 changes: 30 additions & 9 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@ use helium_proto::{

pub const DEFAULT_ROLL_TIME: Duration = Duration::from_secs(DEFAULT_SINK_ROLL_SECS);

#[derive(Copy, Clone, PartialEq, Eq)]
pub enum FileSinkCommitStrategy {
/// Writer must manually call [`FileSinkClient::commit()`] for files to be uploaded.
Manual,
/// Files will be automatically uploaded when
/// [`FileSinkBuilder::max_size()`] is exceeded, or [`DEFAULT_ROLL_TIME`] has elapsed.
Automatic,
}

#[derive(Copy, Clone, PartialEq, Eq)]
pub enum FileSinkRollTime {
/// Default is 3 minutes
Default,
Duration(Duration),
}

#[async_trait::async_trait]
pub trait FileSinkWriteExt
where
Expand All @@ -22,14 +38,11 @@ where
const FILE_PREFIX: &'static str;
const METRIC_SUFFIX: &'static str;

// The `auto_commit` option and `roll_time` option are incompatible with
// each other. It doesn't make sense to roll a file every so often _and_
// commit it every time something is written. If a roll_time is provided,
// `auto_commit` is set to false.
async fn file_sink(
target_path: &Path,
file_upload: FileUpload,
roll_time: Option<Duration>,
commit_strategy: FileSinkCommitStrategy,
roll_time: FileSinkRollTime,
metric_prefix: &str,
) -> Result<(FileSinkClient<Self>, FileSink<Self>)> {
let builder = FileSinkBuilder::new(
Expand All @@ -39,10 +52,18 @@ where
format!("{}_{}", metric_prefix, Self::METRIC_SUFFIX),
);

let builder = if let Some(duration) = roll_time {
builder.auto_commit(false).roll_time(duration)
} else {
builder.auto_commit(true)
let builder = match commit_strategy {
FileSinkCommitStrategy::Manual => {
builder.auto_commit(false).roll_time(DEFAULT_ROLL_TIME)
}
FileSinkCommitStrategy::Automatic => {
builder.auto_commit(true).roll_time(DEFAULT_ROLL_TIME)
}
};

let builder = match roll_time {
FileSinkRollTime::Duration(duration) => builder.roll_time(duration),
FileSinkRollTime::Default => builder.roll_time(DEFAULT_ROLL_TIME),
};

let file_sink = builder.create().await?;
Expand Down
4 changes: 3 additions & 1 deletion file_store/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ mod msg_timestamp;
mod msg_verify;
mod report_id;

pub use file_sink_write::{FileSinkWriteExt, DEFAULT_ROLL_TIME};
pub use file_sink_write::{
FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, DEFAULT_ROLL_TIME,
};
pub use msg_bytes::MsgBytes;
pub use msg_decode::MsgDecode;
pub use msg_timestamp::{MsgTimestamp, TimestampDecode, TimestampEncode};
Expand Down
8 changes: 5 additions & 3 deletions ingest/src/server_iot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::Utc;
use file_store::{
file_sink::FileSinkClient,
file_upload,
traits::{FileSinkWriteExt, MsgVerify},
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, MsgVerify},
};
use futures::{
future::{LocalBoxFuture, TryFutureExt},
Expand Down Expand Up @@ -365,7 +365,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
let (beacon_report_sink, beacon_report_sink_server) = LoraBeaconIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -374,7 +375,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
let (witness_report_sink, witness_report_sink_server) = LoraWitnessIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
32 changes: 21 additions & 11 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::Utc;
use file_store::{
file_sink::FileSinkClient,
file_upload,
traits::{FileSinkWriteExt, MsgVerify},
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt, MsgVerify},
};
use futures::future::LocalBoxFuture;
use futures_util::TryFutureExt;
Expand Down Expand Up @@ -450,7 +450,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
CellHeartbeatIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -459,7 +460,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
WifiHeartbeatIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -468,7 +470,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
let (speedtest_report_sink, speedtest_report_sink_server) = SpeedtestIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -477,7 +480,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
DataTransferSessionIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -486,7 +490,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
SubscriberLocationIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -495,7 +500,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
RadioThresholdIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -504,7 +510,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
InvalidatedRadioThresholdIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -513,7 +520,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
CoverageObjectIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -522,7 +530,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
ServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -531,7 +540,8 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
SubscriberVerifiedMappingEventIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
8 changes: 5 additions & 3 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use file_store::{
file_sink::FileSinkClient,
file_source, file_upload,
iot_packet::PacketRouterPacketReport,
traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME},
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt},
FileStore, FileType,
};
use futures_util::TryFutureExt;
Expand Down Expand Up @@ -141,15 +141,17 @@ impl Cmd {
let (valid_packets, valid_packets_server) = ValidPacket::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;

let (invalid_packets, invalid_packets_server) = InvalidPacket::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
26 changes: 17 additions & 9 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use file_store::{
file_info_poller::LookbackBehavior,
file_source, file_upload,
iot_packet::IotValidPacket,
traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME},
traits::{FileSinkCommitStrategy, FileSinkRollTime, FileSinkWriteExt},
FileStore, FileType,
};
use helium_proto::{
Expand Down Expand Up @@ -124,7 +124,8 @@ impl Server {
let (rewards_sink, gateway_rewards_sink_server) = IotRewardShare::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -133,7 +134,8 @@ impl Server {
let (reward_manifests_sink, reward_manifests_sink_server) = RewardManifest::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down Expand Up @@ -177,7 +179,8 @@ impl Server {
NonRewardablePacket::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down Expand Up @@ -210,7 +213,8 @@ impl Server {
LoraInvalidBeaconReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -219,7 +223,8 @@ impl Server {
LoraInvalidWitnessReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
FileSinkCommitStrategy::Manual,
FileSinkRollTime::Default,
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -243,7 +248,8 @@ impl Server {
LoraInvalidBeaconReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -252,15 +258,17 @@ impl Server {
LoraInvalidWitnessReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;

let (runner_poc_sink, runner_poc_sink_server) = LoraPocV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(2 * 60)),
FileSinkCommitStrategy::Automatic,
FileSinkRollTime::Duration(Duration::from_secs(2 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand Down
Loading

0 comments on commit f10606d

Please sign in to comment.