Skip to content

Commit

Permalink
Make roll_time a std::time::Duration (#820)
Browse files Browse the repository at this point in the history
* Make roll_time a `std::time::Duration`

* Update boost_manager/src/main.rs

Co-authored-by: Michael Jeffrey <[email protected]>

---------

Co-authored-by: Michael Jeffrey <[email protected]>
  • Loading branch information
macpie and michaeldjeffrey authored Jun 3, 2024
1 parent b3585cd commit c44d5d0
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 58 deletions.
8 changes: 5 additions & 3 deletions boost_manager/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ use boost_manager::{
activator::Activator, purger::Purger, settings::Settings, telemetry, updater::Updater,
watcher::Watcher,
};
use chrono::Duration;
use clap::Parser;
use file_store::{
file_info_poller::LookbackBehavior, file_sink, file_source, file_upload,
reward_manifest::RewardManifest, FileStore, FileType,
};
use mobile_config::client::hex_boosting_client::HexBoostingClient;
use solana::start_boost::SolanaRpc;
use std::path::{self, PathBuf};
use std::{
path::{self, PathBuf},
time::Duration,
};
use task_manager::TaskManager;

#[derive(Debug, clap::Parser)]
Expand Down Expand Up @@ -103,7 +105,7 @@ impl Server {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_boosted_hex_update"),
)
.roll_time(Duration::minutes(5))
.roll_time(Duration::from_secs(5 * 60))
.create()
.await?;

Expand Down
21 changes: 9 additions & 12 deletions file_store/src/file_sink.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::{file_upload::FileUpload, Error, Result};
use async_compression::tokio::write::GzipEncoder;
use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Utc};
use futures::{future::LocalBoxFuture, SinkExt, TryFutureExt};
use metrics::Label;
use std::time::Duration;
use std::{
io, mem,
path::{Path, PathBuf},
Expand All @@ -20,12 +21,12 @@ use tokio::{
};
use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedWrite};

pub const DEFAULT_SINK_ROLL_MINS: i64 = 3;
pub const DEFAULT_SINK_ROLL_SECS: u64 = 3 * 60;

#[cfg(not(test))]
pub const SINK_CHECK_MILLIS: i64 = 60_000;
pub const SINK_CHECK_MILLIS: u64 = 60_000;
#[cfg(test)]
pub const SINK_CHECK_MILLIS: i64 = 50;
pub const SINK_CHECK_MILLIS: u64 = 50;

pub const MAX_FRAME_LENGTH: usize = 15_000_000;

Expand Down Expand Up @@ -80,7 +81,7 @@ impl FileSinkBuilder {
target_path: target_path.to_path_buf(),
tmp_path: target_path.join("tmp"),
max_size: 50_000_000,
roll_time: Duration::minutes(DEFAULT_SINK_ROLL_MINS),
roll_time: Duration::from_secs(DEFAULT_SINK_ROLL_SECS),
file_upload,
auto_commit: true,
metric,
Expand Down Expand Up @@ -340,11 +341,7 @@ impl FileSink {
self.target_path.display()
);

let mut rollover_timer = time::interval(
Duration::milliseconds(SINK_CHECK_MILLIS)
.to_std()
.expect("valid sink roll time"),
);
let mut rollover_timer = time::interval(Duration::from_millis(SINK_CHECK_MILLIS));
rollover_timer.set_missed_tick_behavior(time::MissedTickBehavior::Burst);

loop {
Expand Down Expand Up @@ -548,7 +545,7 @@ mod tests {
file_upload,
"fake_metric",
)
.roll_time(chrono::Duration::milliseconds(100))
.roll_time(Duration::from_millis(100))
.create()
.await
.expect("failed to create file sink");
Expand Down Expand Up @@ -596,7 +593,7 @@ mod tests {
file_upload,
"fake_metric",
)
.roll_time(chrono::Duration::milliseconds(100))
.roll_time(Duration::from_millis(100))
.auto_commit(false)
.create()
.await
Expand Down
8 changes: 4 additions & 4 deletions ingest/src/server_iot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::Settings;
use anyhow::{Error, Result};
use chrono::{Duration, Utc};
use chrono::Utc;
use file_store::{
file_sink::{self, FileSinkClient},
file_upload,
Expand All @@ -19,7 +19,7 @@ use helium_proto::services::poc_lora::{
LoraStreamSessionInitV1, LoraStreamSessionOfferV1, LoraWitnessIngestReportV1,
LoraWitnessReportReqV1, LoraWitnessReportRespV1,
};
use std::{convert::TryFrom, net::SocketAddr, path::Path};
use std::{convert::TryFrom, net::SocketAddr, path::Path, time::Duration};
use task_manager::{ManagedTask, TaskManager};
use tokio::{sync::mpsc::Sender, time::Instant};
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -369,7 +369,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_beacon_report"),
)
.roll_time(Duration::minutes(5))
.roll_time(Duration::from_secs(5 * 60))
.create()
.await?;

Expand All @@ -380,7 +380,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_witness_report"),
)
.roll_time(Duration::minutes(5))
.roll_time(Duration::from_secs(5 * 60))
.create()
.await?;

Expand Down
20 changes: 9 additions & 11 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::Settings;
use anyhow::{bail, Error, Result};
use chrono::{Duration, Utc};
use chrono::Utc;
use file_store::{
file_sink::{self, FileSinkClient},
file_upload,
Expand All @@ -27,8 +27,6 @@ use tonic::{
transport, Request, Response, Status,
};

const INGEST_WAIT_DURATION_MINUTES: i64 = 15;

pub type GrpcResult<T> = std::result::Result<Response<T>, Status>;
pub type VerifyResult<T> = std::result::Result<T, Status>;

Expand Down Expand Up @@ -354,7 +352,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_heartbeat_report"),
)
.roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES))
.roll_time(settings.roll_time)
.create()
.await?;

Expand All @@ -365,7 +363,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_wifi_heartbeat_report"),
)
.roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES))
.roll_time(settings.roll_time)
.create()
.await?;

Expand All @@ -376,7 +374,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_speedtest_report"),
)
.roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES))
.roll_time(settings.roll_time)
.create()
.await?;

Expand All @@ -390,7 +388,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
"_mobile_data_transfer_session_report"
),
)
.roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES))
.roll_time(settings.roll_time)
.create()
.await?;

Expand All @@ -401,7 +399,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_subscriber_location_report"),
)
.roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES))
.roll_time(settings.roll_time)
.create()
.await?;

Expand All @@ -412,7 +410,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_radio_threshold_ingest_report"),
)
.roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES))
.roll_time(settings.roll_time)
.create()
.await?;

Expand All @@ -426,7 +424,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
"_invalidated_radio_threshold_ingest_report"
),
)
.roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES))
.roll_time(settings.roll_time)
.create()
.await?;

Expand All @@ -437,7 +435,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_coverage_object_report"),
)
.roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES))
.roll_time(settings.roll_time)
.create()
.await?;

Expand Down
7 changes: 7 additions & 0 deletions ingest/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,20 @@ pub struct Settings {
/// Settings for exposed public API
/// Target bucket for uploads
pub output: file_store::Settings,
/// Timeout of session key session in seconds
#[serde(with = "humantime_serde", default = "default_roll_time")]
pub roll_time: Duration,
/// API token required as part of a Bearer authentication GRPC request
/// header. Used only by the mobile mode currently
pub token: Option<String>,
/// Target output bucket details Metrics settings
pub metrics: poc_metrics::Settings,
}

fn default_roll_time() -> Duration {
humantime::parse_duration("15 minutes").unwrap()
}

fn default_session_key_timeout() -> Duration {
humantime::parse_duration("30 minutes").unwrap()
}
Expand Down
11 changes: 5 additions & 6 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::entropy_loader::EntropyLoader;
use anyhow::Result;
use chrono::Duration as ChronoDuration;
use clap::Parser;
use file_store::{
entropy_report::EntropyReport, file_info_poller::LookbackBehavior, file_sink, file_source,
Expand All @@ -13,7 +12,7 @@ use iot_verifier::{
tx_scaler::Server as DensityScaler, witness_updater::WitnessUpdater, Settings,
};
use price::PriceTracker;
use std::path;
use std::{path, time::Duration};
use task_manager::TaskManager;

#[derive(Debug, clap::Parser)]
Expand Down Expand Up @@ -175,7 +174,7 @@ impl Server {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_non_rewardable_packet"),
)
.roll_time(ChronoDuration::minutes(5))
.roll_time(Duration::from_secs(5 * 60))
.create()
.await?;

Expand Down Expand Up @@ -248,7 +247,7 @@ impl Server {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_invalid_beacon_report"),
)
.roll_time(ChronoDuration::minutes(5))
.roll_time(Duration::from_secs(5 * 60))
.create()
.await?;

Expand All @@ -259,7 +258,7 @@ impl Server {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_invalid_witness_report"),
)
.roll_time(ChronoDuration::minutes(5))
.roll_time(Duration::from_secs(5 * 60))
.create()
.await?;

Expand All @@ -269,7 +268,7 @@ impl Server {
file_upload.clone(),
concat!(env!("CARGO_PKG_NAME"), "_valid_poc"),
)
.roll_time(ChronoDuration::minutes(2))
.roll_time(Duration::from_secs(2 * 60))
.create()
.await?;

Expand Down
2 changes: 1 addition & 1 deletion mobile_verifier/src/boosting_oracles/data_sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl DataSetDownloaderDaemon<Footfall, Landtype, Urbanization> {
concat!(env!("CARGO_PKG_NAME"), "_oracle_boosting_report"),
)
.auto_commit(true)
.roll_time(chrono::Duration::minutes(15))
.roll_time(Duration::from_secs(15 * 60))
.create()
.await?;

Expand Down
9 changes: 5 additions & 4 deletions mobile_verifier/src/cli/server.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::time::Duration;

use crate::{
boosting_oracles::DataSetDownloaderDaemon,
coverage::{new_coverage_object_notification_channel, CoverageDaemon},
Expand All @@ -11,7 +13,6 @@ use crate::{
telemetry, Settings,
};
use anyhow::Result;
use chrono::Duration;
use file_store::{file_sink, file_upload, FileStore, FileType};
use mobile_config::client::{
entity_client::EntityClient, hex_boosting_client::HexBoostingClient, AuthorizationClient,
Expand Down Expand Up @@ -52,7 +53,7 @@ impl Cmd {
concat!(env!("CARGO_PKG_NAME"), "_heartbeat"),
)
.auto_commit(false)
.roll_time(Duration::minutes(15))
.roll_time(Duration::from_secs(15 * 60))
.create()
.await?;

Expand All @@ -64,7 +65,7 @@ impl Cmd {
concat!(env!("CARGO_PKG_NAME"), "_seniority_update"),
)
.auto_commit(false)
.roll_time(Duration::minutes(15))
.roll_time(Duration::from_secs(15 * 60))
.create()
.await?;

Expand All @@ -75,7 +76,7 @@ impl Cmd {
concat!(env!("CARGO_PKG_NAME"), "_speedtest_average"),
)
.auto_commit(false)
.roll_time(Duration::minutes(15))
.roll_time(Duration::from_secs(15 * 60))
.create()
.await?;

Expand Down
6 changes: 3 additions & 3 deletions mobile_verifier/src/coverage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
heartbeats::{HbType, KeyType, OwnedKeyType},
IsAuthorized, Settings,
};
use chrono::{DateTime, Duration, Utc};
use chrono::{DateTime, Utc};
use file_store::{
coverage::{self, CoverageObjectIngestReport},
file_info_poller::{FileInfoStream, LookbackBehavior},
Expand Down Expand Up @@ -38,7 +38,7 @@ use std::{
num::NonZeroU32,
pin::pin,
sync::Arc,
time::Instant,
time::{Duration, Instant},
};
use task_manager::{ManagedTask, TaskManager};
use tokio::sync::mpsc::{channel, Receiver, Sender};
Expand Down Expand Up @@ -89,7 +89,7 @@ impl CoverageDaemon {
concat!(env!("CARGO_PKG_NAME"), "_coverage_object"),
)
.auto_commit(false)
.roll_time(Duration::minutes(15))
.roll_time(Duration::from_secs(15 * 60))
.create()
.await?;

Expand Down
Loading

0 comments on commit c44d5d0

Please sign in to comment.