From f8b014e2431b20b70e0f5040940e75a4297a4cb7 Mon Sep 17 00:00:00 2001 From: Andrew McKenzie Date: Mon, 21 Aug 2023 15:06:15 +0100 Subject: [PATCH] update oracles to use refactored filestore apis --- file_store/src/cli/dump.rs | 16 +++- file_store/src/error.rs | 2 +- file_store/src/file_info_poller.rs | 77 +++++++++++++----- file_store/src/file_sink.rs | 112 ++++++++++++++++----------- file_store/src/file_source.rs | 6 +- file_store/src/lib.rs | 3 - ingest/src/server_iot.rs | 14 ++-- ingest/src/server_mobile.rs | 65 ++++++++-------- iot_packet_verifier/src/daemon.rs | 14 ++-- iot_packet_verifier/src/verifier.rs | 2 +- iot_verifier/src/main.rs | 33 ++++---- iot_verifier/src/packet_loader.rs | 11 +-- iot_verifier/src/purger.rs | 56 +++++++------- iot_verifier/src/runner.rs | 19 +++-- mobile_packet_verifier/src/daemon.rs | 21 ++--- mobile_verifier/src/cli/server.rs | 65 ++++++++-------- poc_entropy/src/main.rs | 7 +- price/src/main.rs | 5 +- reward_index/src/main.rs | 7 +- 19 files changed, 302 insertions(+), 233 deletions(-) diff --git a/file_store/src/cli/dump.rs b/file_store/src/cli/dump.rs index 4d2f746ef..0622f3fea 100644 --- a/file_store/src/cli/dump.rs +++ b/file_store/src/cli/dump.rs @@ -16,7 +16,10 @@ use helium_crypto::PublicKey; use helium_proto::{ services::{ packet_verifier::ValidDataTransferSession as ValidDataTransferSessionProto, - poc_lora::{LoraBeaconIngestReportV1, LoraPocV1, LoraWitnessIngestReportV1}, + poc_lora::{ + LoraBeaconIngestReportV1, LoraInvalidWitnessReportV1, LoraPocV1, + LoraWitnessIngestReportV1, + }, poc_mobile::{ mobile_reward_share::Reward, CellHeartbeatIngestReportV1, CellHeartbeatReqV1, Heartbeat, InvalidDataTransferIngestReportV1, MobileRewardShare, RadioRewardShare, @@ -131,6 +134,17 @@ impl Cmd { print_json(&json)?; // wtr.serialize(IotWitnessIngestReport::try_from(dec_msg)?)?; } + FileType::IotInvalidWitnessReport => { + let dec_msg = LoraInvalidWitnessReportV1::decode(msg)?; + let json = json!({ + "received_timestamp": dec_msg.received_timestamp, + "reason": dec_msg.reason + }); + // TODO: tmp dump out as json + // printing to json here as csv serializing failing due to header generation from struct + print_json(&json)?; + // wtr.serialize(IotWitnessIngestReport::try_from(dec_msg)?)?; + } FileType::IotPoc => { let dec_msg = LoraPocV1::decode(msg)?; let json = json!({ diff --git a/file_store/src/error.rs b/file_store/src/error.rs index 8500c1d4e..9551fa1cf 100644 --- a/file_store/src/error.rs +++ b/file_store/src/error.rs @@ -33,7 +33,7 @@ pub enum Error { #[error("shutting down")] Shutdown, #[error("error building file info poller")] - FileInfoPollerError(#[from] crate::file_info_poller_tm::FileInfoPollerConfigBuilderError), + FileInfoPollerError(#[from] crate::file_info_poller::FileInfoPollerConfigBuilderError), } #[derive(Error, Debug)] diff --git a/file_store/src/file_info_poller.rs b/file_store/src/file_info_poller.rs index 92d2e1376..6643bf917 100644 --- a/file_store/src/file_info_poller.rs +++ b/file_store/src/file_info_poller.rs @@ -1,9 +1,10 @@ use crate::{traits::MsgDecode, Error, FileInfo, FileStore, FileType, Result}; use chrono::{DateTime, Duration, TimeZone, Utc}; use derive_builder::Builder; -use futures::{stream::BoxStream, StreamExt}; +use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt, TryFutureExt}; use retainer::Cache; use std::marker::PhantomData; +use task_manager::ManagedTask; use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender}; const DEFAULT_POLL_DURATION_SECS: i64 = 30; @@ -39,7 +40,8 @@ pub enum
LookbackBehavior { } #[derive(Debug, Clone, Builder)] -pub struct FileInfoPoller<T> { +#[builder(pattern = "owned")] +pub struct FileInfoPollerConfig<T> { #[builder(default = "Duration::seconds(DEFAULT_POLL_DURATION_SECS)")] poll_duration: Duration, db: sqlx::Pool<sqlx::Postgres>, @@ -54,35 +56,65 @@ pub struct FileInfoPoller<T> { p: PhantomData<T>, } -impl<T> FileInfoPoller<T> +#[derive(Debug, Clone)] +pub struct FileInfoPollerServer<T> { + config: FileInfoPollerConfig<T>, + sender: Sender<FileInfoStream<T>>, +} + +impl<T> FileInfoPollerConfigBuilder<T> +where + T: Clone, +{ + pub fn create(self) -> Result<(Receiver<FileInfoStream<T>>, FileInfoPollerServer<T>)> { + let config = self.build().unwrap(); // TODO: fix the unwrap + let (sender, receiver) = tokio::sync::mpsc::channel(config.queue_size); + Ok((receiver, FileInfoPollerServer { config, sender })) + } +} + +impl<T> ManagedTask for FileInfoPollerServer<T> +where + T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static, +{ + fn start_task( + self: Box<Self>, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + +impl<T> FileInfoPollerServer<T> where T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static, { pub async fn start( self, shutdown: triggered::Listener, - ) -> Result<( - Receiver<FileInfoStream<T>>, - impl std::future::Future<Output = Result>, - )> { - let (sender, receiver) = tokio::sync::mpsc::channel(self.queue_size); - let join_handle = tokio::spawn(async move { self.run(shutdown, sender).await }); - - Ok((receiver, async move { + ) -> Result<impl std::future::Future<Output = Result>> { + let join_handle = tokio::spawn(async move { self.run(shutdown).await }); + Ok(async move { match join_handle.await { Ok(Ok(())) => Ok(()), Ok(Err(err)) => Err(err), Err(err) => Err(Error::from(err)), } - })) + }) } - async fn run(self, shutdown: triggered::Listener, sender: Sender<FileInfoStream<T>>) -> Result { + async fn run(self, shutdown: triggered::Listener) -> Result { let cache = create_cache(); let mut poll_trigger = tokio::time::interval(self.poll_duration()); let mut cleanup_trigger = tokio::time::interval(CLEAN_DURATION); - let mut latest_ts = db::latest_ts(&self.db, self.file_type).await?; + let mut latest_ts = db::latest_ts(&self.config.db, self.config.file_type).await?; loop { let after = self.after(latest_ts); @@ -95,10 +127,10 @@ where } _ = cleanup_trigger.tick() => self.clean(&cache).await?, _ = poll_trigger.tick() => { - let files = self.store.list_all(self.file_type.to_str(), after, before).await?; + let files = self.config.store.list_all(self.config.file_type.to_str(), after, before).await?; for file in files { - if !is_already_processed(&self.db, &cache, &file).await? { - if send_stream(&sender, &self.store, file.clone()).await? { + if !is_already_processed(&self.config.db, &cache, &file).await? { + if send_stream(&self.sender, &self.config.store, file.clone()).await?
{ latest_ts = Some(file.timestamp); cache_file(&cache, &file).await; } else { @@ -114,8 +146,8 @@ where } fn after(&self, latest: Option<DateTime<Utc>>) -> DateTime<Utc> { - let latest_offset = latest.map(|lt| lt - self.offset); - match self.lookback { + let latest_offset = latest.map(|lt| lt - self.config.offset); + match self.config.lookback { LookbackBehavior::StartAfter(start_after) => latest_offset.unwrap_or(start_after), LookbackBehavior::Max(max_lookback) => { let max_ts = Utc::now() - max_lookback; @@ -126,12 +158,15 @@ where async fn clean(&self, cache: &MemoryFileCache) -> Result { cache.purge(4, 0.25).await; - db::clean(&self.db, &self.file_type).await?; + db::clean(&self.config.db, &self.config.file_type).await?; Ok(()) } fn poll_duration(&self) -> std::time::Duration { - self.poll_duration.to_std().unwrap_or(DEFAULT_POLL_DURATION) + self.config + .poll_duration + .to_std() + .unwrap_or(DEFAULT_POLL_DURATION) } } diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index dfc8de60a..7fa334863 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -1,13 +1,17 @@ -use crate::{file_upload, Error, Result}; +use crate::{ + file_upload::{self, FileUpload}, + Error, Result, +}; use async_compression::tokio::write::GzipEncoder; use bytes::Bytes; use chrono::{DateTime, Duration, Utc}; -use futures::SinkExt; +use futures::{future::LocalBoxFuture, SinkExt, TryFutureExt}; use metrics::Label; use std::{ io, mem, path::{Path, PathBuf}, }; +use task_manager::ManagedTask; use tokio::{ fs::{self, File, OpenOptions}, io::{AsyncWriteExt, BufWriter}, @@ -63,18 +67,13 @@ pub struct FileSinkBuilder { max_size: usize, roll_time: Duration, deposits: Option<file_upload::MessageSender>, + file_upload: Option<FileUpload>, auto_commit: bool, metric: &'static str, - shutdown_listener: triggered::Listener, } impl FileSinkBuilder { - pub fn new( - prefix: impl ToString, - target_path: &Path, - metric: &'static str, - shutdown_listener: triggered::Listener, - ) -> Self { + pub fn new(prefix: impl ToString, target_path: &Path, metric: &'static str) -> Self { Self { prefix: prefix.to_string(), target_path: target_path.to_path_buf(), @@ -82,9 +81,23 @@ impl FileSinkBuilder { max_size: 50_000_000, roll_time: Duration::minutes(DEFAULT_SINK_ROLL_MINS), deposits: None, + file_upload: None, + auto_commit: true, + metric, + } + } + + pub fn new_tm(prefix: impl ToString, target_path: &Path, metric: &'static str) -> Self { + Self { + prefix: prefix.to_string(), + target_path: target_path.to_path_buf(), + tmp_path: target_path.join("tmp"), + max_size: 50_000_000, + roll_time: Duration::minutes(DEFAULT_SINK_ROLL_MINS), + deposits: None, + file_upload: None, auto_commit: true, metric, - shutdown_listener, } } @@ -110,6 +123,13 @@ impl FileSinkBuilder { Self { deposits, ..self } } + pub fn file_upload(self, file_upload: Option<FileUpload>) -> Self { + Self { + file_upload, + ..self + } + } + pub fn auto_commit(self, auto_commit: bool) -> Self { Self { auto_commit, @@ -130,7 +150,6 @@ impl FileSinkBuilder { let client = FileSinkClient { sender: tx, metric: self.metric, - shutdown_listener: self.shutdown_listener.clone(), }; metrics::register_counter!(client.metric, vec![OK_LABEL]); @@ -141,12 +160,12 @@ impl FileSinkBuilder { prefix: self.prefix, max_size: self.max_size, deposits: self.deposits, + file_upload: self.file_upload, roll_time: self.roll_time, messages: rx, staged_files: Vec::new(), auto_commit: self.auto_commit, active_sink: None, - shutdown_listener: self.shutdown_listener, }; sink.init().await?; Ok((client, sink)) } } @@ -157,7 +176,6 @@ impl
FileSinkBuilder { pub struct FileSinkClient { sender: MessageSender, metric: &'static str, - shutdown_listener: triggered::Listener, } const OK_LABEL: Label = Label::from_static_parts("status", "ok"); @@ -173,11 +191,7 @@ impl FileSinkClient { let (on_write_tx, on_write_rx) = oneshot::channel(); let bytes = item.encode_to_vec(); let labels = labels.into_iter().map(Label::from); - tokio::select! { - _ = self.shutdown_listener.clone() => { - Err(Error::Shutdown) - } result = self.sender.send_timeout(Message::Data(on_write_tx, bytes), SEND_TIMEOUT) => match result { Ok(_) => { metrics::increment_counter!( @@ -242,11 +256,11 @@ pub struct FileSink { messages: MessageReceiver, deposits: Option<file_upload::MessageSender>, + file_upload: Option<FileUpload>, staged_files: Vec<PathBuf>, auto_commit: bool, active_sink: Option<ActiveSink>, - shutdown_listener: triggered::Listener, } #[derive(Debug)] @@ -263,6 +277,21 @@ impl ActiveSink { } } +impl ManagedTask for FileSink { + fn start_task( + self: Box<Self>, + shutdown: triggered::Listener, + ) -> LocalBoxFuture<'static, anyhow::Result<()>> { + let handle = tokio::spawn(self.run(shutdown)); + + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) + } +} + impl FileSink { async fn init(&mut self) -> Result { fs::create_dir_all(&self.target_path).await?; @@ -309,7 +338,7 @@ impl FileSink { Ok(()) } - pub async fn run(&mut self) -> Result { + pub async fn run(mut self, shutdown: triggered::Listener) -> Result { tracing::info!( "starting file sink {} in {}", self.prefix, @@ -325,7 +354,7 @@ impl FileSink { loop { tokio::select! { - _ = self.shutdown_listener.clone() => break, + _ = shutdown.clone() => break, _ = rollover_timer.tick() => self.maybe_roll().await?, msg = self.messages.recv() => match msg { Some(Message::Data(on_write_tx, bytes)) => { @@ -449,6 +478,9 @@ impl FileSink { if let Some(deposits) = &self.deposits { file_upload::upload_file(deposits, &target_path).await?; } + if let Some(file_upload) = &self.file_upload { + file_upload.upload_file(&target_path).await?; + }; Ok(()) } @@ -514,20 +546,16 @@ mod tests { let tmp_dir = TempDir::new().expect("Unable to create temp dir"); let (shutdown_trigger, shutdown_listener) = triggered::trigger(); - let (file_sink_client, mut file_sink_server) = FileSinkBuilder::new( - FileType::EntropyReport, - tmp_dir.path(), - "fake_metric", - shutdown_listener.clone(), - ) - .roll_time(chrono::Duration::milliseconds(100)) - .create() - .await - .expect("failed to create file sink"); + let (file_sink_client, file_sink_server) = + FileSinkBuilder::new(FileType::EntropyReport, tmp_dir.path(), "fake_metric") + .roll_time(chrono::Duration::milliseconds(100)) + .create() + .await + .expect("failed to create file sink"); let sink_thread = tokio::spawn(async move { file_sink_server - .run() + .run(shutdown_listener.clone()) .await .expect("failed to complete file sink"); }); @@ -559,22 +587,18 @@ mod tests { let (shutdown_trigger, shutdown_listener) = triggered::trigger(); let (file_upload_tx, mut file_upload_rx) = file_upload::message_channel(); - let (file_sink_client, mut file_sink_server) = FileSinkBuilder::new( - FileType::EntropyReport, - tmp_dir.path(), - "fake_metric", - shutdown_listener.clone(), - ) - .roll_time(chrono::Duration::milliseconds(100)) - .auto_commit(false) - .deposits(Some(file_upload_tx)) - .create() - .await - .expect("failed to create file sink"); + let (file_sink_client, file_sink_server) = + FileSinkBuilder::new(FileType::EntropyReport, tmp_dir.path(), "fake_metric") + 
.roll_time(chrono::Duration::milliseconds(100)) + .auto_commit(false) + .deposits(Some(file_upload_tx)) + .create() + .await + .expect("failed to create file sink"); let sink_thread = tokio::spawn(async move { file_sink_server - .run() + .run(shutdown_listener.clone()) .await .expect("failed to complete file sink"); }); diff --git a/file_store/src/file_source.rs b/file_store/src/file_source.rs index e1d75972d..59d7c7279 100644 --- a/file_store/src/file_source.rs +++ b/file_store/src/file_source.rs @@ -1,4 +1,4 @@ -use crate::{file_info_poller::FileInfoPollerBuilder, file_sink, BytesMutStream, Error}; +use crate::{file_info_poller::FileInfoPollerConfigBuilder, file_sink, BytesMutStream, Error}; use async_compression::tokio::bufread::GzipDecoder; use futures::{ stream::{self}, @@ -8,11 +8,11 @@ use std::path::{Path, PathBuf}; use tokio::{fs::File, io::BufReader}; use tokio_util::codec::{length_delimited::LengthDelimitedCodec, FramedRead}; -pub fn continuous_source<T>() -> FileInfoPollerBuilder<T> +pub fn continuous_source<T>() -> FileInfoPollerConfigBuilder<T> where T: Clone, { - FileInfoPollerBuilder::<T>::default() + FileInfoPollerConfigBuilder::<T>::default() } pub fn source<I, P>(paths: I) -> BytesMutStream diff --git a/file_store/src/lib.rs b/file_store/src/lib.rs index c456957b7..338ef9f68 100644 --- a/file_store/src/lib.rs +++ b/file_store/src/lib.rs @@ -3,11 +3,8 @@ pub mod entropy_report; mod error; mod file_info; pub mod file_info_poller; -pub mod file_info_poller_tm; pub mod file_sink; -pub mod file_sink_tm; pub mod file_source; -pub mod file_source_tm; pub mod file_store; pub mod file_upload; pub mod heartbeat; diff --git a/ingest/src/server_iot.rs b/ingest/src/server_iot.rs index 3c124c30b..4807d85e2 100644 --- a/ingest/src/server_iot.rs +++ b/ingest/src/server_iot.rs @@ -119,11 +119,10 @@ pub async fn grpc_server(shutdown: triggered::Listener, settings: &Settings) -> let store_base_path = Path::new(&settings.cache); // iot beacon reports - let (beacon_report_sink, mut beacon_report_sink_server) = file_sink::FileSinkBuilder::new( + let (beacon_report_sink, beacon_report_sink_server) = file_sink::FileSinkBuilder::new( FileType::IotBeaconIngestReport, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_beacon_report"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .roll_time(Duration::minutes(5)) .create() .await?; // iot witness reports - let (witness_report_sink, mut witness_report_sink_server) = file_sink::FileSinkBuilder::new( + let (witness_report_sink, witness_report_sink_server) = file_sink::FileSinkBuilder::new( FileType::IotWitnessIngestReport, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_witness_report"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .roll_time(Duration::minutes(5)) .create() .await?; @@ -157,8 +155,12 @@ pub async fn grpc_server(shutdown: triggered::Listener, settings: &Settings) -> tokio::try_join!( server, - beacon_report_sink_server.run().map_err(Error::from), - witness_report_sink_server.run().map_err(Error::from), + beacon_report_sink_server + .run(shutdown.clone()) + .map_err(Error::from), + witness_report_sink_server + .run(shutdown.clone()) + .map_err(Error::from), file_upload.run(&shutdown).map_err(Error::from), ) .map(|_| ()) diff --git a/ingest/src/server_mobile.rs b/ingest/src/server_mobile.rs index 7a527350e..931d98850 100644 --- a/ingest/src/server_mobile.rs +++ b/ingest/src/server_mobile.rs @@ -210,32 +210,28 @@ pub async fn grpc_server(shutdown:
triggered::Listener, settings: &Settings) -> let store_base_path = Path::new(&settings.cache); - let (heartbeat_report_sink, mut heartbeat_report_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::CellHeartbeatIngestReport, - store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_heartbeat_report"), - shutdown.clone(), - ) - .deposits(Some(file_upload_tx.clone())) - .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) - .create() - .await?; + let (heartbeat_report_sink, heartbeat_report_sink_server) = file_sink::FileSinkBuilder::new( + FileType::CellHeartbeatIngestReport, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_heartbeat_report"), + ) + .deposits(Some(file_upload_tx.clone())) + .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) + .create() + .await?; // speedtests - let (speedtest_report_sink, mut speedtest_report_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::CellSpeedtestIngestReport, - store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_speedtest_report"), - shutdown.clone(), - ) - .deposits(Some(file_upload_tx.clone())) - .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) - .create() - .await?; + let (speedtest_report_sink, speedtest_report_sink_server) = file_sink::FileSinkBuilder::new( + FileType::CellSpeedtestIngestReport, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_speedtest_report"), + ) + .deposits(Some(file_upload_tx.clone())) + .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) + .create() + .await?; - let (data_transfer_session_sink, mut data_transfer_session_sink_server) = + let (data_transfer_session_sink, data_transfer_session_sink_server) = file_sink::FileSinkBuilder::new( FileType::DataTransferSessionIngestReport, store_base_path, @@ -243,31 +239,28 @@ pub async fn grpc_server(shutdown: triggered::Listener, settings: &Settings) -> env!("CARGO_PKG_NAME"), "_mobile_data_transfer_session_report" ), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) .create() .await?; - let (subscriber_location_report_sink, mut subscriber_location_report_sink_server) = + let (subscriber_location_report_sink, subscriber_location_report_sink_server) = file_sink::FileSinkBuilder::new( FileType::SubscriberLocationIngestReport, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_subscriber_location_report"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) .create() .await?; - let (coverage_object_report_sink, mut coverage_object_report_sink_server) = + let (coverage_object_report_sink, coverage_object_report_sink_server) = file_sink::FileSinkBuilder::new( FileType::CoverageObjectIngestReport, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_coverage_object_report"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .roll_time(Duration::minutes(INGEST_WAIT_DURATION_MINUTES)) @@ -315,14 +308,20 @@ pub async fn grpc_server(shutdown: triggered::Listener, settings: &Settings) -> tokio::try_join!( server, - heartbeat_report_sink_server.run().map_err(Error::from), - speedtest_report_sink_server.run().map_err(Error::from), - data_transfer_session_sink_server.run().map_err(Error::from), + heartbeat_report_sink_server + .run(shutdown.clone()) + .map_err(Error::from), + speedtest_report_sink_server + .run(shutdown.clone()) + .map_err(Error::from), + data_transfer_session_sink_server + .run(shutdown.clone()) + .map_err(Error::from), subscriber_location_report_sink_server 
- .run() + .run(shutdown.clone()) .map_err(Error::from), coverage_object_report_sink_server - .run() + .run(shutdown.clone()) .map_err(Error::from), file_upload.run(&shutdown).map_err(Error::from), ) diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index b805e03fc..a320fc640 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -6,10 +6,10 @@ use crate::{ }; use anyhow::{bail, Result}; use file_store::{ - file_info_poller_tm::{FileInfoStream, LookbackBehavior}, - file_sink_tm::FileSinkBuilder, - file_sink_tm::FileSinkClient, - file_source_tm, file_upload, + file_info_poller::{FileInfoStream, LookbackBehavior}, + file_sink::FileSinkBuilder, + file_sink::FileSinkClient, + file_source, file_upload, iot_packet::PacketRouterPacketReport, FileStore, FileType, }; @@ -124,7 +124,7 @@ impl Cmd { let store_base_path = std::path::Path::new(&settings.cache); // Verified packets: - let (valid_packets, valid_packets_server) = FileSinkBuilder::new( + let (valid_packets, valid_packets_server) = FileSinkBuilder::new_tm( FileType::IotValidPacket, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_valid_packets"), @@ -134,7 +134,7 @@ impl Cmd { .create() .await?; - let (invalid_packets, invalid_packets_server) = FileSinkBuilder::new( + let (invalid_packets, invalid_packets_server) = FileSinkBuilder::new_tm( FileType::InvalidPacket, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_invalid_packets"), @@ -151,7 +151,7 @@ impl Cmd { let file_store = FileStore::from_settings(&settings.ingest).await?; let (report_files, report_files_server) = - file_source_tm::continuous_source::<PacketRouterPacketReport>() + file_source::continuous_source::<PacketRouterPacketReport>() .db(pool.clone()) .store(file_store) .lookback(LookbackBehavior::StartAfter(settings.start_after())) diff --git a/iot_packet_verifier/src/verifier.rs b/iot_packet_verifier/src/verifier.rs index 877295359..34f381442 100644 --- a/iot_packet_verifier/src/verifier.rs +++ b/iot_packet_verifier/src/verifier.rs @@ -1,7 +1,7 @@ use crate::pending_burns::PendingBurns; use async_trait::async_trait; use file_store::{ - file_sink_tm::FileSinkClient, iot_packet::PacketRouterPacketReport, traits::MsgTimestamp, + file_sink::FileSinkClient, iot_packet::PacketRouterPacketReport, traits::MsgTimestamp, }; use futures::{Stream, StreamExt}; use helium_crypto::PublicKeyBinary; diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index 97af307ab..1f0961ff7 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -95,11 +95,10 @@ impl Server { let store_base_path = std::path::Path::new(&settings.cache); // Gateway reward shares sink - let (rewards_sink, mut gateway_rewards_server) = file_sink::FileSinkBuilder::new( + let (rewards_sink, gateway_rewards_server) = file_sink::FileSinkBuilder::new( FileType::IotRewardShare, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_gateway_reward_shares"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .auto_commit(false) .create() .await?; // Reward manifest - let (reward_manifests_sink, mut reward_manifests_server) = file_sink::FileSinkBuilder::new( + let (reward_manifests_sink, reward_manifests_server) = file_sink::FileSinkBuilder::new( FileType::RewardManifest, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_iot_reward_manifest"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .auto_commit(false) .create() .await?; @@ -131,7 +129,7 @@ impl Server { let mut entropy_loader = EntropyLoader { pool: pool.clone() }; let entropy_store =
FileStore::from_settings(&settings.entropy).await?; let entropy_interval = settings.entropy_interval(); - let (entropy_loader_receiver, entropy_loader_source_join_handle) = + let (entropy_loader_receiver, entropy_loader_server) = file_source::continuous_source::<EntropyReport>() .db(pool.clone()) .store(entropy_store.clone()) .lookback(LookbackBehavior::Max(max_lookback_age)) .poll_duration(entropy_interval) .offset(entropy_interval * 2) - .build()? - .start(shutdown.clone()) - .await?; + .create()?; + let entropy_loader_source_join_handle = + entropy_loader_server.start(shutdown.clone()).await?; // setup the packet loader continious source let packet_loader = packet_loader::PacketLoader::from_settings(settings, pool.clone()); let packet_store = FileStore::from_settings(&settings.packet_ingest).await?; let packet_interval = settings.packet_interval(); - let (pk_loader_receiver, pk_loader_source_join_handle) = + let (pk_loader_receiver, pk_loader_server) = file_source::continuous_source::<IotValidPacket>() .db(pool.clone()) .store(packet_store.clone()) .lookback(LookbackBehavior::Max(max_lookback_age)) .poll_duration(packet_interval) .offset(packet_interval * 2) - .build()? - .start(shutdown.clone()) - .await?; + .create()?; + let pk_loader_source_join_handle = pk_loader_server.start(shutdown.clone()).await?; // init da processes let mut loader = loader::Loader::from_settings(settings, pool.clone()).await?; @@ -170,8 +167,12 @@ impl Server { tokio::try_join!( gateway_updater.run(&shutdown).map_err(Error::from), - gateway_rewards_server.run().map_err(Error::from), - reward_manifests_server.run().map_err(Error::from), + gateway_rewards_server + .run(shutdown.clone()) + .map_err(Error::from), + reward_manifests_server + .run(shutdown.clone()) + .map_err(Error::from), file_upload.run(&shutdown).map_err(Error::from), runner.run( file_upload_tx.clone(), @@ -184,11 +185,11 @@ impl Server { loader.run(&shutdown, &gateway_cache), packet_loader.run( pk_loader_receiver, - &shutdown, + shutdown.clone(), &gateway_cache, file_upload_tx.clone() ), - purger.run(&shutdown), + purger.run(shutdown.clone()), rewarder.run(price_tracker, &shutdown), density_scaler.run(&shutdown).map_err(Error::from), price_receiver.map_err(Error::from), diff --git a/iot_verifier/src/packet_loader.rs b/iot_verifier/src/packet_loader.rs index c0a30fe28..e48a25457 100644 --- a/iot_verifier/src/packet_loader.rs +++ b/iot_verifier/src/packet_loader.rs @@ -37,29 +37,26 @@ impl PacketLoader { pub async fn run( &self, mut receiver: Receiver<FileInfoStream<IotValidPacket>>, - shutdown: &triggered::Listener, + shutdown: triggered::Listener, gateway_cache: &GatewayCache, file_upload_tx: FileUploadSender, ) -> anyhow::Result<()> { tracing::info!("starting verifier iot packet loader"); let store_base_path = Path::new(&self.cache); - let (non_rewardable_packet_sink, mut non_rewardable_packet_server) = + let (non_rewardable_packet_sink, non_rewardable_packet_server) = file_sink::FileSinkBuilder::new( FileType::NonRewardablePacket, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_non_rewardable_packet"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .roll_time(ChronoDuration::minutes(5)) .create() .await?; - tokio::spawn(async move { non_rewardable_packet_server.run().await }); + let shutdown1 = shutdown.clone(); + tokio::spawn(async move { non_rewardable_packet_server.run(shutdown1).await }); loop { - if shutdown.is_triggered() { - break; - } tokio::select!
{ _ = shutdown.clone() => break, msg = receiver.recv() => if let Some(stream) = msg { diff --git a/iot_verifier/src/purger.rs b/iot_verifier/src/purger.rs index 42d408cd1..16612a7e2 100644 --- a/iot_verifier/src/purger.rs +++ b/iot_verifier/src/purger.rs @@ -58,7 +58,7 @@ impl Purger { }) } - pub async fn run(&self, shutdown: &triggered::Listener) -> anyhow::Result<()> { + pub async fn run(&self, shutdown: triggered::Listener) -> anyhow::Result<()> { tracing::info!("starting purger"); let mut db_timer = time::interval(DB_POLL_TIME); @@ -69,34 +69,32 @@ impl Purger { let file_upload = file_upload::FileUpload::from_settings(&self.output, file_upload_rx).await?; - let (invalid_beacon_sink, mut invalid_beacon_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::IotInvalidBeaconReport, - store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_invalid_beacon"), - shutdown.clone(), - ) - .deposits(Some(file_upload_tx.clone())) - .auto_commit(false) - .create() - .await?; - - let (invalid_witness_sink, mut invalid_witness_sink_server) = - file_sink::FileSinkBuilder::new( - FileType::IotInvalidWitnessReport, - store_base_path, - concat!(env!("CARGO_PKG_NAME"), "_invalid_witness_report"), - shutdown.clone(), - ) - .deposits(Some(file_upload_tx.clone())) - .auto_commit(false) - .create() - .await?; - - let upload_shutdown = shutdown.clone(); - tokio::spawn(async move { invalid_beacon_sink_server.run().await }); - tokio::spawn(async move { invalid_witness_sink_server.run().await }); - tokio::spawn(async move { file_upload.run(&upload_shutdown).await }); + let (invalid_beacon_sink, invalid_beacon_sink_server) = file_sink::FileSinkBuilder::new( + FileType::IotInvalidBeaconReport, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_invalid_beacon"), + ) + .deposits(Some(file_upload_tx.clone())) + .auto_commit(false) + .create() + .await?; + + let (invalid_witness_sink, invalid_witness_sink_server) = file_sink::FileSinkBuilder::new( + FileType::IotInvalidWitnessReport, + store_base_path, + concat!(env!("CARGO_PKG_NAME"), "_invalid_witness_report"), + ) + .deposits(Some(file_upload_tx.clone())) + .auto_commit(false) + .create() + .await?; + + let shutdown1 = shutdown.clone(); + let shutdown2 = shutdown.clone(); + let shutdown3 = shutdown.clone(); + tokio::spawn(async move { invalid_beacon_sink_server.run(shutdown1).await }); + tokio::spawn(async move { invalid_witness_sink_server.run(shutdown2).await }); + tokio::spawn(async move { file_upload.run(&shutdown3).await }); loop { if shutdown.is_triggered() { diff --git a/iot_verifier/src/runner.rs b/iot_verifier/src/runner.rs index 5f5063526..5526bb260 100644 --- a/iot_verifier/src/runner.rs +++ b/iot_verifier/src/runner.rs @@ -119,44 +119,43 @@ impl Runner { let store_base_path = Path::new(&self.cache); - let (iot_invalid_beacon_sink, mut iot_invalid_beacon_sink_server) = + let (iot_invalid_beacon_sink, iot_invalid_beacon_sink_server) = file_sink::FileSinkBuilder::new( FileType::IotInvalidBeaconReport, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_invalid_beacon_report"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .roll_time(ChronoDuration::minutes(5)) .create() .await?; - let (iot_invalid_witness_sink, mut iot_invalid_witness_sink_server) = + let (iot_invalid_witness_sink, iot_invalid_witness_sink_server) = file_sink::FileSinkBuilder::new( FileType::IotInvalidWitnessReport, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_invalid_witness_report"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) 
.roll_time(ChronoDuration::minutes(5)) .create() .await?; - let (iot_poc_sink, mut iot_poc_sink_server) = file_sink::FileSinkBuilder::new( + let (iot_poc_sink, iot_poc_sink_server) = file_sink::FileSinkBuilder::new( FileType::IotPoc, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_valid_poc"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .roll_time(ChronoDuration::minutes(2)) .create() .await?; - - tokio::spawn(async move { iot_invalid_beacon_sink_server.run().await }); - tokio::spawn(async move { iot_invalid_witness_sink_server.run().await }); - tokio::spawn(async move { iot_poc_sink_server.run().await }); + let shutdown1 = shutdown.clone(); + let shutdown2 = shutdown.clone(); + let shutdown3 = shutdown.clone(); + tokio::spawn(async move { iot_invalid_beacon_sink_server.run(shutdown1).await }); + tokio::spawn(async move { iot_invalid_witness_sink_server.run(shutdown2).await }); + tokio::spawn(async move { iot_poc_sink_server.run(shutdown3).await }); loop { if shutdown.is_triggered() { diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 0abc5a827..1dacf24a8 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -122,22 +122,20 @@ impl Cmd { let store_base_path = std::path::Path::new(&settings.cache); - let (valid_sessions, mut valid_sessions_server) = FileSinkBuilder::new( + let (valid_sessions, valid_sessions_server) = FileSinkBuilder::new( FileType::ValidDataTransferSession, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_valid_data_transfer_session"), - shutdown_listener.clone(), ) .deposits(Some(file_upload_tx.clone())) .auto_commit(true) .create() .await?; - let (invalid_sessions, mut invalid_sessions_server) = FileSinkBuilder::new( + let (invalid_sessions, invalid_sessions_server) = FileSinkBuilder::new( FileType::InvalidDataTransferSessionIngestReport, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_invalid_data_transfer_session"), - shutdown_listener.clone(), ) .deposits(Some(file_upload_tx.clone())) .auto_commit(false) .create() .await?; let file_store = FileStore::from_settings(&settings.ingest).await?; - let (reports, source_join_handle) = + let (reports, reports_server) = file_source::continuous_source::<DataTransferSessionIngestReport>() .db(pool.clone()) .store(file_store) .lookback(LookbackBehavior::StartAfter( )) .file_type(FileType::DataTransferSessionIngestReport) .lookback(LookbackBehavior::StartAfter(settings.start_after())) - .build()? 
- .start(shutdown_listener.clone()) - .await?; + .create()?; + let source_join_handle = reports_server.start(shutdown_listener.clone()).await?; let gateway_client = GatewayClient::from_settings(&settings.config_client)?; let auth_client = AuthorizationClient::from_settings(&settings.config_client)?; @@ -178,8 +175,12 @@ impl Cmd { tokio::try_join!( source_join_handle.map_err(Error::from), - valid_sessions_server.run().map_err(Error::from), - invalid_sessions_server.run().map_err(Error::from), + valid_sessions_server + .run(shutdown_listener.clone()) + .map_err(Error::from), + invalid_sessions_server + .run(shutdown_listener.clone()) + .map_err(Error::from), file_upload.run(&shutdown_listener).map_err(Error::from), daemon.run(&shutdown_listener).map_err(Error::from), sol_balance_monitor diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 5912ef838..a8fecea8b 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -57,21 +57,19 @@ impl Cmd { PriceTracker::start(&settings.price_tracker, shutdown_listener.clone()).await?; // Heartbeats - let (heartbeats, heartbeats_join_handle) = + let (heartbeats, heartbeats_server) = file_source::continuous_source::<CellHeartbeatIngestReport>() .db(pool.clone()) .store(report_ingest.clone()) .lookback(LookbackBehavior::StartAfter(settings.start_after())) .file_type(FileType::CellHeartbeatIngestReport) - .build()? - .start(shutdown_listener.clone()) - .await?; + .create()?; + let heartbeats_join_handle = heartbeats_server.start(shutdown_listener.clone()).await?; - let (valid_heartbeats, mut valid_heartbeats_server) = file_sink::FileSinkBuilder::new( + let (valid_heartbeats, valid_heartbeats_server) = file_sink::FileSinkBuilder::new( FileType::ValidatedHeartbeat, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_heartbeat"), - shutdown_listener.clone(), ) .deposits(Some(file_upload_tx.clone())) .auto_commit(false) .create() .await?; @@ -87,21 +85,19 @@ impl Cmd { ); // Speedtests - let (speedtests, speedtests_join_handle) = + let (speedtests, speedtests_server) = file_source::continuous_source::<CellSpeedtestIngestReport>() .db(pool.clone()) .store(report_ingest.clone()) .lookback(LookbackBehavior::StartAfter(settings.start_after())) .file_type(FileType::CellSpeedtestIngestReport) - .build()? 
- .start(shutdown_listener.clone()) - .await?; + .create()?; + let speedtests_join_handle = speedtests_server.start(shutdown_listener.clone()).await?; - let (valid_speedtests, mut valid_speedtests_server) = file_sink::FileSinkBuilder::new( + let (valid_speedtests, valid_speedtests_server) = file_sink::FileSinkBuilder::new( FileType::SpeedtestAvg, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_speedtest_average"), - shutdown_listener.clone(), ) .deposits(Some(file_upload_tx.clone())) .auto_commit(false) .create() .await?; @@ -118,22 +114,20 @@ impl Cmd { // Mobile rewards let reward_period_hours = settings.rewards; - let (mobile_rewards, mut mobile_rewards_server) = file_sink::FileSinkBuilder::new( + let (mobile_rewards, mobile_rewards_server) = file_sink::FileSinkBuilder::new( FileType::MobileRewardShare, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_radio_reward_shares"), - shutdown_listener.clone(), ) .deposits(Some(file_upload_tx.clone())) .auto_commit(false) .create() .await?; - let (reward_manifests, mut reward_manifests_server) = file_sink::FileSinkBuilder::new( + let (reward_manifests, reward_manifests_server) = file_sink::FileSinkBuilder::new( FileType::RewardManifest, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_reward_manifest"), - shutdown_listener.clone(), ) .deposits(Some(file_upload_tx.clone())) .auto_commit(false) .create() .await?; @@ -150,22 +144,22 @@ impl Cmd { ); // subscriber location - let (subscriber_location_ingest, subscriber_location_ingest_join_handle) = + let (subscriber_location_ingest, subscriber_location_ingest_server) = file_source::continuous_source::<SubscriberLocationIngestReport>() .db(pool.clone()) .store(report_ingest.clone()) .lookback(LookbackBehavior::StartAfter(settings.start_after())) .file_type(FileType::SubscriberLocationIngestReport) - .build()? - .start(shutdown_listener.clone()) - .await?; + .create()?; + let subscriber_location_ingest_join_handle = subscriber_location_ingest_server + .start(shutdown_listener.clone()) + .await?; - let (verified_subscriber_location, mut verified_subscriber_location_server) = + let (verified_subscriber_location, verified_subscriber_location_server) = file_sink::FileSinkBuilder::new( FileType::VerifiedSubscriberLocationIngestReport, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_verified_subscriber_location"), - shutdown_listener.clone(), ) .deposits(Some(file_upload_tx.clone())) .auto_commit(false) .create() .await?; @@ -181,26 +175,35 @@ impl Cmd { ); // data transfers - let (data_session_ingest, data_session_ingest_join_handle) = + let (data_session_ingest, data_session_ingest_server) = file_source::continuous_source::<ValidDataTransferSession>() .db(pool.clone()) .store(data_transfer_ingest.clone()) .lookback(LookbackBehavior::StartAfter(settings.start_after())) .file_type(FileType::ValidDataTransferSession) - .build()? 
- .start(shutdown_listener.clone()) - .await?; + .create()?; + let data_session_ingest_join_handle = data_session_ingest_server + .start(shutdown_listener.clone()) + .await?; let data_session_ingestor = DataSessionIngestor::new(pool.clone()); tokio::try_join!( - valid_heartbeats_server.run().map_err(Error::from), - valid_speedtests_server.run().map_err(Error::from), - mobile_rewards_server.run().map_err(Error::from), + valid_heartbeats_server + .run(shutdown_listener.clone()) + .map_err(Error::from), + valid_speedtests_server + .run(shutdown_listener.clone()) + .map_err(Error::from), + mobile_rewards_server + .run(shutdown_listener.clone()) + .map_err(Error::from), file_upload.run(&shutdown_listener).map_err(Error::from), - reward_manifests_server.run().map_err(Error::from), + reward_manifests_server + .run(shutdown_listener.clone()) + .map_err(Error::from), verified_subscriber_location_server - .run() + .run(shutdown_listener.clone()) .map_err(Error::from), subscriber_location_ingestor .run(&shutdown_listener) diff --git a/poc_entropy/src/main.rs b/poc_entropy/src/main.rs index 5843d933c..8e503454c 100644 --- a/poc_entropy/src/main.rs +++ b/poc_entropy/src/main.rs @@ -78,11 +78,10 @@ impl Server { let mut entropy_generator = EntropyGenerator::new(&settings.source).await?; let entropy_watch = entropy_generator.receiver(); - let (entropy_sink, mut entropy_sink_server) = file_sink::FileSinkBuilder::new( + let (entropy_sink, entropy_sink_server) = file_sink::FileSinkBuilder::new( FileType::EntropyReport, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_report_submission"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .roll_time(Duration::minutes(ENTROPY_SINK_ROLL_MINS)) .create() .await?; @@ -100,7 +99,9 @@ impl Server { entropy_generator .run(entropy_sink, &shutdown) .map_err(Error::from), - entropy_sink_server.run().map_err(Error::from), + entropy_sink_server + .run(shutdown.clone()) + .map_err(Error::from), file_upload.run(&shutdown).map_err(Error::from), ) .map(|_| ()) diff --git a/price/src/main.rs b/price/src/main.rs index b90a05572..c37d4a2b3 100644 --- a/price/src/main.rs +++ b/price/src/main.rs @@ -108,11 +108,10 @@ impl Server { let mut hst_price_generator = PriceGenerator::new(settings, BlockchainTokenTypeV1::Hst).await?; - let (price_sink, mut price_sink_server) = file_sink::FileSinkBuilder::new( + let (price_sink, price_sink_server) = file_sink::FileSinkBuilder::new( FileType::PriceReport, store_base_path, concat!(env!("CARGO_PKG_NAME"), "_report_submission"), - shutdown.clone(), ) .deposits(Some(file_upload_tx.clone())) .roll_time(Duration::minutes(PRICE_SINK_ROLL_MINS)) .create() .await?; @@ -132,7 +131,7 @@ impl Server { hst_price_generator .run(price_sink, &shutdown) .map_err(Error::from), - price_sink_server.run().map_err(Error::from), + price_sink_server.run(shutdown.clone()).map_err(Error::from), file_upload.run(&shutdown).map_err(Error::from), ) .map(|_| ()) diff --git a/reward_index/src/main.rs b/reward_index/src/main.rs index 3ddc907f4..42b39ff58 100644 --- a/reward_index/src/main.rs +++ b/reward_index/src/main.rs @@ -77,7 +77,7 @@ impl Server { let file_store = FileStore::from_settings(&settings.verifier).await?; - let (receiver, source_join_handle) = file_source::continuous_source::<RewardManifest>() + let (receiver, server) = file_source::continuous_source::<RewardManifest>() .db(pool.clone()) .store(file_store) .file_type(FileType::RewardManifest) @@ -88,9 +88,8 @@ impl Server { )) .poll_duration(settings.interval()) .offset(settings.interval() * 2) - .build()? 
- .start(shutdown_listener.clone()) - .await?; + .create()?; + let source_join_handle = server.start(shutdown_listener.clone()).await?; // Reward server let mut indexer = Indexer::new(settings, pool).await?;
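
Reviewer note (not part of the patch): every service above migrates to the same wiring pattern, so a single sketch may help review. The snippet below is illustrative only; it assumes the bounds shown in file_info_poller.rs, uses RewardManifest as a stand-in for any qualifying T, and pool, file_store, start_after, and shutdown are placeholders supplied by the caller:

    // continuous_source::<T>() returns a FileInfoPollerConfigBuilder<T>.
    // create() now hands back (receiver, server) directly, instead of
    // start() returning the receiver together with the join future.
    let (mut receiver, poller_server) =
        file_source::continuous_source::<RewardManifest>()
            .db(pool.clone())
            .store(file_store)
            .file_type(FileType::RewardManifest)
            .lookback(LookbackBehavior::StartAfter(start_after))
            .create()?;
    // Either drive the server directly and keep the join future...
    let source_join_handle = poller_server.start(shutdown.clone()).await?;
    // ...or, because FileInfoPollerServer<T> and FileSink now implement
    // ManagedTask, hand it to the task manager as Box::new(poller_server).
    while let Some(file_info_stream) = receiver.recv().await {
        // each item is a FileInfoStream<RewardManifest> for one ingest file
    }
    source_join_handle.await?;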