Skip to content

Commit

Permalink
Merge pull request #609 from helium/andymck/taskman/iot-verifier
Browse files Browse the repository at this point in the history
taskman up the iot verifier
  • Loading branch information
andymck authored Sep 6, 2023
2 parents 24e4849 + 2dcb6dd commit b8e3169
Show file tree
Hide file tree
Showing 27 changed files with 705 additions and 478 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions iot_verifier/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,5 @@ itertools = {workspace = true}
rand = {workspace = true}
beacon = {workspace = true}
price = { path = "../price" }
tokio-util = { workspace = true }
task-manager = { path = "../task_manager" }
28 changes: 17 additions & 11 deletions iot_verifier/src/entropy_loader.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::entropy::Entropy;
use blake3::hash;
use file_store::{entropy_report::EntropyReport, file_info_poller::FileInfoStream};
use futures::{StreamExt, TryStreamExt};
use futures::{future::LocalBoxFuture, StreamExt, TryStreamExt};
use sqlx::PgPool;
use task_manager::ManagedTask;
use tokio::sync::mpsc::Receiver;

pub struct EntropyLoader {
pub pool: PgPool,
pub file_receiver: Receiver<FileInfoStream<EntropyReport>>,
}

#[derive(thiserror::Error, Debug)]
Expand All @@ -17,24 +19,28 @@ pub enum NewLoaderError {
DbStoreError(#[from] db_store::Error),
}

impl ManagedTask for EntropyLoader {
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
Box::pin(self.run(shutdown))
}
}

impl EntropyLoader {
pub async fn run(
&mut self,
mut receiver: Receiver<FileInfoStream<EntropyReport>>,
shutdown: &triggered::Listener,
) -> anyhow::Result<()> {
pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> {
tracing::info!("starting entropy_loader");
loop {
if shutdown.is_triggered() {
break;
}
tokio::select! {
biased;
_ = shutdown.clone() => break,
msg = receiver.recv() => if let Some(stream) = msg {
msg = self.file_receiver.recv() => if let Some(stream) = msg {
self.handle_report(stream).await?;
}
}
}
tracing::info!("stopping verifier entropy_loader");
tracing::info!("stopping entropy_loader");
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions iot_verifier/src/gateway_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::gateway_updater::MessageReceiver;
use helium_crypto::PublicKeyBinary;
use iot_config::gateway_info::GatewayInfo;

#[derive(Clone)]
pub struct GatewayCache {
gateway_cache_receiver: MessageReceiver,
}
Expand Down
31 changes: 21 additions & 10 deletions iot_verifier/src/gateway_updater.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::Settings;
use chrono::Duration;
use futures::stream::StreamExt;
use futures::{future::LocalBoxFuture, stream::StreamExt, TryFutureExt};
use helium_crypto::PublicKeyBinary;
use iot_config::{
client::{Client as IotConfigClient, ClientError as IotConfigClientError},
gateway_info::{GatewayInfo, GatewayInfoResolver},
};
use std::collections::HashMap;
use task_manager::ManagedTask;
use tokio::sync::watch;
use tokio::time;

Expand All @@ -28,6 +29,20 @@ pub enum GatewayUpdaterError {
SendError(#[from] watch::error::SendError<GatewayMap>),
}

impl ManagedTask for GatewayUpdater {
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
let handle = tokio::spawn(self.run(shutdown));
Box::pin(
handle
.map_err(anyhow::Error::from)
.and_then(|result| async move { result.map_err(anyhow::Error::from) }),
)
}
}

impl GatewayUpdater {
pub async fn from_settings(
settings: &Settings,
Expand All @@ -45,26 +60,22 @@ impl GatewayUpdater {
))
}

pub async fn run(mut self, shutdown: &triggered::Listener) -> Result<(), GatewayUpdaterError> {
pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> {
tracing::info!("starting gateway_updater");

let mut trigger_timer = time::interval(
self.refresh_interval
.to_std()
.expect("valid interval in seconds"),
);

loop {
if shutdown.is_triggered() {
tracing::info!("stopping gateway_updater");
return Ok(());
}

tokio::select! {
biased;
_ = shutdown.clone() => break,
_ = trigger_timer.tick() => self.handle_refresh_tick().await?,
_ = shutdown.clone() => return Ok(()),
}
}
tracing::info!("stopping gateway_updater");
Ok(())
}

async fn handle_refresh_tick(&mut self) -> Result<(), GatewayUpdaterError> {
Expand Down
23 changes: 10 additions & 13 deletions iot_verifier/src/hex_density.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,25 @@ lazy_static! {
};
}

#[async_trait::async_trait]
pub trait HexDensityMap: Clone {
async fn get(&self, hex: u64) -> Option<Decimal>;
async fn swap(&self, new_map: HashMap<u64, Decimal>);
}

#[derive(Debug, Clone)]
pub struct SharedHexDensityMap(Arc<RwLock<HashMap<u64, Decimal>>>);
pub struct HexDensityMap(Arc<RwLock<HashMap<u64, Decimal>>>);

impl Default for HexDensityMap {
fn default() -> Self {
Self::new()
}
}

impl SharedHexDensityMap {
impl HexDensityMap {
pub fn new() -> Self {
Self(Arc::new(RwLock::new(HashMap::new())))
}
}

#[async_trait::async_trait]
impl HexDensityMap for SharedHexDensityMap {
async fn get(&self, hex: u64) -> Option<Decimal> {
pub async fn get(&self, hex: u64) -> Option<Decimal> {
self.0.read().await.get(&hex).cloned()
}

async fn swap(&self, new_map: HashMap<u64, Decimal>) {
pub async fn swap(&self, new_map: HashMap<u64, Decimal>) {
*self.0.write().await = new_map;
}
}
Expand Down
Loading

0 comments on commit b8e3169

Please sign in to comment.