Skip to content

Commit

Permalink
update oracles to use refactored filestore apis
Browse files Browse the repository at this point in the history
  • Loading branch information
andymck committed Aug 21, 2023
1 parent 85db1b5 commit f8b014e
Show file tree
Hide file tree
Showing 19 changed files with 302 additions and 233 deletions.
16 changes: 15 additions & 1 deletion file_store/src/cli/dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use helium_crypto::PublicKey;
use helium_proto::{
services::{
packet_verifier::ValidDataTransferSession as ValidDataTransferSessionProto,
poc_lora::{LoraBeaconIngestReportV1, LoraPocV1, LoraWitnessIngestReportV1},
poc_lora::{
LoraBeaconIngestReportV1, LoraInvalidWitnessReportV1, LoraPocV1,
LoraWitnessIngestReportV1,
},
poc_mobile::{
mobile_reward_share::Reward, CellHeartbeatIngestReportV1, CellHeartbeatReqV1,
Heartbeat, InvalidDataTransferIngestReportV1, MobileRewardShare, RadioRewardShare,
Expand Down Expand Up @@ -131,6 +134,17 @@ impl Cmd {
print_json(&json)?;
// wtr.serialize(IotWitnessIngestReport::try_from(dec_msg)?)?;
}
FileType::IotInvalidWitnessReport => {
let dec_msg = LoraInvalidWitnessReportV1::decode(msg)?;
let json = json!({
"received_timestamp": dec_msg.received_timestamp,
"reason": dec_msg.reason
});
// TODO: tmp dump out as json
// printing to json here as csv serializing failing due on header generation from struct
print_json(&json)?;
// wtr.serialize(IotWitnessIngestReport::try_from(dec_msg)?)?;
}
FileType::IotPoc => {
let dec_msg = LoraPocV1::decode(msg)?;
let json = json!({
Expand Down
2 changes: 1 addition & 1 deletion file_store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub enum Error {
#[error("shutting down")]
Shutdown,
#[error("error building file info poller")]
FileInfoPollerError(#[from] crate::file_info_poller_tm::FileInfoPollerConfigBuilderError),
FileInfoPollerError(#[from] crate::file_info_poller::FileInfoPollerConfigBuilderError),
}

#[derive(Error, Debug)]
Expand Down
77 changes: 56 additions & 21 deletions file_store/src/file_info_poller.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::{traits::MsgDecode, Error, FileInfo, FileStore, FileType, Result};
use chrono::{DateTime, Duration, TimeZone, Utc};
use derive_builder::Builder;
use futures::{stream::BoxStream, StreamExt};
use futures::{future::LocalBoxFuture, stream::BoxStream, StreamExt, TryFutureExt};
use retainer::Cache;
use std::marker::PhantomData;
use task_manager::ManagedTask;
use tokio::sync::mpsc::{error::TrySendError, Receiver, Sender};

const DEFAULT_POLL_DURATION_SECS: i64 = 30;
Expand Down Expand Up @@ -39,7 +40,8 @@ pub enum LookbackBehavior {
}

#[derive(Debug, Clone, Builder)]
pub struct FileInfoPoller<T> {
#[builder(pattern = "owned")]
pub struct FileInfoPollerConfig<T> {
#[builder(default = "Duration::seconds(DEFAULT_POLL_DURATION_SECS)")]
poll_duration: Duration,
db: sqlx::Pool<sqlx::Postgres>,
Expand All @@ -54,35 +56,65 @@ pub struct FileInfoPoller<T> {
p: PhantomData<T>,
}

impl<T> FileInfoPoller<T>
#[derive(Debug, Clone)]
pub struct FileInfoPollerServer<T> {
config: FileInfoPollerConfig<T>,
sender: Sender<FileInfoStream<T>>,
}

impl<T> FileInfoPollerConfigBuilder<T>
where
T: Clone,
{
pub fn create(self) -> Result<(Receiver<FileInfoStream<T>>, FileInfoPollerServer<T>)> {
let config = self.build().unwrap(); // TODO: fix the unwrap
let (sender, receiver) = tokio::sync::mpsc::channel(config.queue_size);
Ok((receiver, FileInfoPollerServer { config, sender }))
}
}

impl<T> ManagedTask for FileInfoPollerServer<T>
where
T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
{
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
let handle = tokio::spawn(self.run(shutdown));

Box::pin(
handle
.map_err(anyhow::Error::from)
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
)
}
}

impl<T> FileInfoPollerServer<T>
where
T: MsgDecode + TryFrom<T::Msg, Error = Error> + Send + Sync + 'static,
{
pub async fn start(
self,
shutdown: triggered::Listener,
) -> Result<(
Receiver<FileInfoStream<T>>,
impl std::future::Future<Output = Result>,
)> {
let (sender, receiver) = tokio::sync::mpsc::channel(self.queue_size);
let join_handle = tokio::spawn(async move { self.run(shutdown, sender).await });

Ok((receiver, async move {
) -> Result<impl std::future::Future<Output = Result>> {
let join_handle = tokio::spawn(async move { self.run(shutdown).await });
Ok(async move {
match join_handle.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(Error::from(err)),
}
}))
})
}

async fn run(self, shutdown: triggered::Listener, sender: Sender<FileInfoStream<T>>) -> Result {
async fn run(self, shutdown: triggered::Listener) -> Result {
let cache = create_cache();
let mut poll_trigger = tokio::time::interval(self.poll_duration());
let mut cleanup_trigger = tokio::time::interval(CLEAN_DURATION);

let mut latest_ts = db::latest_ts(&self.db, self.file_type).await?;
let mut latest_ts = db::latest_ts(&self.config.db, self.config.file_type).await?;

loop {
let after = self.after(latest_ts);
Expand All @@ -95,10 +127,10 @@ where
}
_ = cleanup_trigger.tick() => self.clean(&cache).await?,
_ = poll_trigger.tick() => {
let files = self.store.list_all(self.file_type.to_str(), after, before).await?;
let files = self.config.store.list_all(self.config.file_type.to_str(), after, before).await?;
for file in files {
if !is_already_processed(&self.db, &cache, &file).await? {
if send_stream(&sender, &self.store, file.clone()).await? {
if !is_already_processed(&self.config.db, &cache, &file).await? {
if send_stream(&self.sender, &self.config.store, file.clone()).await? {
latest_ts = Some(file.timestamp);
cache_file(&cache, &file).await;
} else {
Expand All @@ -114,8 +146,8 @@ where
}

fn after(&self, latest: Option<DateTime<Utc>>) -> DateTime<Utc> {
let latest_offset = latest.map(|lt| lt - self.offset);
match self.lookback {
let latest_offset = latest.map(|lt| lt - self.config.offset);
match self.config.lookback {
LookbackBehavior::StartAfter(start_after) => latest_offset.unwrap_or(start_after),
LookbackBehavior::Max(max_lookback) => {
let max_ts = Utc::now() - max_lookback;
Expand All @@ -126,12 +158,15 @@ where

async fn clean(&self, cache: &MemoryFileCache) -> Result {
cache.purge(4, 0.25).await;
db::clean(&self.db, &self.file_type).await?;
db::clean(&self.config.db, &self.config.file_type).await?;
Ok(())
}

fn poll_duration(&self) -> std::time::Duration {
self.poll_duration.to_std().unwrap_or(DEFAULT_POLL_DURATION)
self.config
.poll_duration
.to_std()
.unwrap_or(DEFAULT_POLL_DURATION)
}
}

Expand Down
Loading

0 comments on commit f8b014e

Please sign in to comment.