Skip to content

Commit

Permalink
combine auto_commit and roll_time into 1 argument
Browse files Browse the repository at this point in the history
This is a specialized helper function, every crate using it was only
using these two options. We can provide the base functionality with a
single option.

Ideally the Builder would be updated to take some sort of enum for the
commit type as it doesn't make a lot of sense to auto_commit on every
write _and_ roll the file at a certain interval.
  • Loading branch information
michaeldjeffrey committed Aug 2, 2024
1 parent da594c7 commit 7fa21ba
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 136 deletions.
4 changes: 2 additions & 2 deletions boost_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ impl Server {
.await?;

// setup the writer for our updated hexes
let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink_opts(
let (updated_hexes_sink, updated_hexes_sink_server) = BoostedHexUpdateV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(Duration::from_secs(5 * 60)),
)
.await?;

Expand Down
36 changes: 19 additions & 17 deletions file_store/src/traits/file_sink_write.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use std::path::Path;
use std::{path::Path, time::Duration};

use crate::{
file_sink::FileSinkClient, file_upload::FileUpload, traits::msg_bytes::MsgBytes, FileSink,
FileSinkBuilder, FileType, Result,
file_sink::{FileSinkClient, DEFAULT_SINK_ROLL_SECS},
file_upload::FileUpload,
traits::msg_bytes::MsgBytes,
FileSink, FileSinkBuilder, FileType, Result,
};
use helium_proto::{
self as proto,
services::{packet_verifier, poc_lora, poc_mobile},
Message,
};

pub const DEFAULT_ROLL_TIME: Duration = Duration::from_secs(DEFAULT_SINK_ROLL_SECS);

#[async_trait::async_trait]
pub trait FileSinkWriteExt
where
Expand All @@ -18,32 +22,30 @@ where
const FILE_TYPE: FileType;
const METRIC_SUFFIX: &'static str;

// The `auto_commit` option and `roll_time` option are incompatible with
// each other. It doesn't make sense to roll a file every so often _and_
// commit it every time something is written. If a roll_time is provided,
// `auto_commit` is set to false.
async fn file_sink(
target_path: &Path,
file_upload: FileUpload,
roll_time: Option<Duration>,
metric_prefix: &str,
) -> Result<FileSinkBuilder> {
Self::file_sink_opts(target_path, file_upload, metric_prefix, |builder| {
builder.auto_commit(false)
})
.await
}

async fn file_sink_opts(
target_path: &Path,
file_upload: FileUpload,
metric_prefix: &str,
opts_fn: impl FnOnce(FileSinkBuilder) -> FileSinkBuilder + Send,
) -> Result<(FileSinkClient<Self>, FileSink<Self>)> {
let builder = FileSinkBuilder::new(
Self::FILE_TYPE.to_string(),
target_path,
file_upload,
format!("{}_{}", metric_prefix, Self::METRIC_SUFFIX),
);
let builder_opts = opts_fn(builder);

let file_sink = builder_opts.create().await?;
let builder = if let Some(duration) = roll_time {
builder.auto_commit(false).roll_time(duration)
} else {
builder.auto_commit(true)
};

let file_sink = builder.create().await?;
Ok(file_sink)
}
}
Expand Down
2 changes: 1 addition & 1 deletion file_store/src/traits/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod msg_timestamp;
mod msg_verify;
mod report_id;

pub use file_sink_write::FileSinkWriteExt;
pub use file_sink_write::{FileSinkWriteExt, DEFAULT_ROLL_TIME};
pub use msg_bytes::MsgBytes;
pub use msg_decode::MsgDecode;
pub use msg_timestamp::{MsgTimestamp, TimestampDecode, TimestampEncode};
Expand Down
19 changes: 9 additions & 10 deletions ingest/src/server_iot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,23 +362,22 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
let store_base_path = Path::new(&settings.cache);

// iot beacon reports
let (beacon_report_sink, beacon_report_sink_server) = LoraBeaconIngestReportV1::file_sink_opts(
let (beacon_report_sink, beacon_report_sink_server) = LoraBeaconIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(Duration::from_secs(5 * 60)),
)
.await?;

// iot witness reports
let (witness_report_sink, witness_report_sink_server) =
LoraWitnessIngestReportV1::file_sink_opts(
store_base_path,
file_upload.clone(),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(Duration::from_secs(5 * 60)),
)
.await?;
let (witness_report_sink, witness_report_sink_server) = LoraWitnessIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
)
.await?;

let grpc_server = GrpcServer {
beacon_report_sink,
Expand Down
47 changes: 23 additions & 24 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,84 +379,83 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
let store_base_path = Path::new(&settings.cache);

let (heartbeat_report_sink, heartbeat_report_sink_server) =
CellHeartbeatIngestReportV1::file_sink_opts(
CellHeartbeatIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(settings.roll_time),
)
.await?;

let (wifi_heartbeat_report_sink, wifi_heartbeat_report_sink_server) =
WifiHeartbeatIngestReportV1::file_sink_opts(
WifiHeartbeatIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(settings.roll_time),
)
.await?;

// speedtests
let (speedtest_report_sink, speedtest_report_sink_server) =
SpeedtestIngestReportV1::file_sink_opts(
store_base_path,
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_speedtest_report"),
|builder| builder.roll_time(settings.roll_time),
)
.await?;
let (speedtest_report_sink, speedtest_report_sink_server) = SpeedtestIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
env!("CARGO_PKG_NAME"),
)
.await?;

let (data_transfer_session_sink, data_transfer_session_sink_server) =
DataTransferSessionIngestReportV1::file_sink_opts(
DataTransferSessionIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(settings.roll_time),
)
.await?;

let (subscriber_location_report_sink, subscriber_location_report_sink_server) =
SubscriberLocationIngestReportV1::file_sink_opts(
SubscriberLocationIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(settings.roll_time),
)
.await?;

let (radio_threshold_report_sink, radio_threshold_report_sink_server) =
RadioThresholdIngestReportV1::file_sink_opts(
RadioThresholdIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(settings.roll_time),
)
.await?;

let (invalidated_radio_threshold_report_sink, invalidated_radio_threshold_report_sink_server) =
InvalidatedRadioThresholdIngestReportV1::file_sink_opts(
InvalidatedRadioThresholdIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(settings.roll_time),
)
.await?;

let (coverage_object_report_sink, coverage_object_report_sink_server) =
CoverageObjectIngestReportV1::file_sink_opts(
CoverageObjectIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(settings.roll_time),
)
.await?;

let (sp_boosted_rewards_ban_sink, sp_boosted_rewards_ban_sink_server) =
ServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink_opts(
ServiceProviderBoostedRewardsBannedRadioIngestReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(settings.roll_time),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(settings.roll_time),
)
.await?;

Expand Down
24 changes: 16 additions & 8 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use file_store::{
file_sink::FileSinkClient,
file_source, file_upload,
iot_packet::PacketRouterPacketReport,
traits::FileSinkWriteExt,
traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME},
FileStore, FileType,
};
use futures_util::TryFutureExt;
Expand Down Expand Up @@ -138,13 +138,21 @@ impl Cmd {
let store_base_path = std::path::Path::new(&settings.cache);

// Verified packets:
let (valid_packets, valid_packets_server) =
ValidPacket::file_sink(store_base_path, file_upload.clone(), env!("CARGO_PKG_NAME"))
.await?;

let (invalid_packets, invalid_packets_server) =
InvalidPacket::file_sink(store_base_path, file_upload.clone(), env!("CARGO_PKG_NAME"))
.await?;
let (valid_packets, valid_packets_server) = ValidPacket::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
env!("CARGO_PKG_NAME"),
)
.await?;

let (invalid_packets, invalid_packets_server) = InvalidPacket::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
env!("CARGO_PKG_NAME"),
)
.await?;

let org_client = Arc::new(Mutex::new(CachedOrgClient::new(OrgClient::from_settings(
&settings.iot_config_client,
Expand Down
46 changes: 30 additions & 16 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ use crate::entropy_loader::EntropyLoader;
use anyhow::Result;
use clap::Parser;
use file_store::{
entropy_report::EntropyReport, file_info_poller::LookbackBehavior, file_source, file_upload,
iot_packet::IotValidPacket, traits::FileSinkWriteExt, FileStore, FileType,
entropy_report::EntropyReport,
file_info_poller::LookbackBehavior,
file_source, file_upload,
iot_packet::IotValidPacket,
traits::{FileSinkWriteExt, DEFAULT_ROLL_TIME},
FileStore, FileType,
};
use helium_proto::{
services::poc_lora::{
Expand Down Expand Up @@ -117,14 +121,22 @@ impl Server {
// *

// Gateway reward shares sink
let (rewards_sink, gateway_rewards_sink_server) =
IotRewardShare::file_sink(store_base_path, file_upload.clone(), env!("CARGO_PKG_NAME"))
.await?;
let (rewards_sink, gateway_rewards_sink_server) = IotRewardShare::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
env!("CARGO_PKG_NAME"),
)
.await?;

// Reward manifest
let (reward_manifests_sink, reward_manifests_sink_server) =
RewardManifest::file_sink(store_base_path, file_upload.clone(), env!("CARGO_PKG_NAME"))
.await?;
let (reward_manifests_sink, reward_manifests_sink_server) = RewardManifest::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
env!("CARGO_PKG_NAME"),
)
.await?;

let rewarder = Rewarder {
pool: pool.clone(),
Expand Down Expand Up @@ -162,11 +174,11 @@ impl Server {
// *

let (non_rewardable_packet_sink, non_rewardable_packet_sink_server) =
NonRewardablePacket::file_sink_opts(
NonRewardablePacket::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(Duration::from_secs(5 * 60)),
)
.await?;

Expand Down Expand Up @@ -198,6 +210,7 @@ impl Server {
LoraInvalidBeaconReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -206,6 +219,7 @@ impl Server {
LoraInvalidWitnessReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(DEFAULT_ROLL_TIME),
env!("CARGO_PKG_NAME"),
)
.await?;
Expand All @@ -226,28 +240,28 @@ impl Server {
// *

let (runner_invalid_beacon_sink, runner_invalid_beacon_sink_server) =
LoraInvalidBeaconReportV1::file_sink_opts(
LoraInvalidBeaconReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(Duration::from_secs(5 * 60)),
)
.await?;

let (runner_invalid_witness_sink, runner_invalid_witness_sink_server) =
LoraInvalidWitnessReportV1::file_sink_opts(
LoraInvalidWitnessReportV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(5 * 60)),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(Duration::from_secs(5 * 60)),
)
.await?;

let (runner_poc_sink, runner_poc_sink_server) = LoraPocV1::file_sink_opts(
let (runner_poc_sink, runner_poc_sink_server) = LoraPocV1::file_sink(
store_base_path,
file_upload.clone(),
Some(Duration::from_secs(2 * 60)),
env!("CARGO_PKG_NAME"),
|builder| builder.roll_time(Duration::from_secs(2 * 60)),
)
.await?;

Expand Down
Loading

0 comments on commit 7fa21ba

Please sign in to comment.