Skip to content

Commit

Permalink
Merge pull request #615 from helium/andymck/taskman/iot-config
Browse files Browse the repository at this point in the history
taskman up iot config
  • Loading branch information
jeffgrunewald authored Oct 9, 2023
2 parents c3fe742 + 958725b commit 1d7058d
Show file tree
Hide file tree
Showing 10 changed files with 322 additions and 336 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions file_store/src/iot_invalid_poc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,7 @@ impl From<IotInvalidBeaconReport> for LoraInvalidBeaconReportV1 {
received_timestamp,
reason: v.reason as i32,
report: Some(report),
location: v
.location
.map(|l| l.to_string())
.unwrap_or_else(String::new),
location: v.location.map(|l| l.to_string()).unwrap_or_default(),
gain: v.gain,
elevation: v.elevation,
invalid_details: v.invalid_details,
Expand Down
10 changes: 2 additions & 8 deletions file_store/src/iot_valid_poc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,7 @@ impl From<IotValidBeaconReport> for LoraValidBeaconReportV1 {

Self {
received_timestamp,
location: v
.location
.map(|l| l.to_string())
.unwrap_or_else(String::new),
location: v.location.map(|l| l.to_string()).unwrap_or_default(),
gain: v.gain,
elevation: v.elevation,
hex_scale: (v.hex_scale * SCALE_MULTIPLIER).to_u32().unwrap_or(0),
Expand Down Expand Up @@ -214,10 +211,7 @@ impl From<IotVerifiedWitnessReport> for LoraVerifiedWitnessReportV1 {
received_timestamp,
status: v.status.into(),
report: Some(report),
location: v
.location
.map(|l| l.to_string())
.unwrap_or_else(String::new),
location: v.location.map(|l| l.to_string()).unwrap_or_default(),
gain: v.gain,
elevation: v.elevation,
hex_scale: (v.hex_scale * SCALE_MULTIPLIER).to_u32().unwrap_or(0),
Expand Down
2 changes: 2 additions & 0 deletions iot_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ sqlx = {workspace = true}
thiserror = {workspace = true}
tokio = {workspace = true}
tokio-stream = {workspace = true}
tokio-util = { workspace = true }
tonic = {workspace = true}
tower-http = {workspace = true}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
triggered = {workspace = true}
task-manager = { path = "../task_manager" }
12 changes: 8 additions & 4 deletions iot_config/src/admin_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,24 @@ impl iot_config::Admin for AdminService {
admin::insert_key(request.pubkey.clone().into(), key_type, &self.pool)
.and_then(|_| async move {
if self.auth_updater.send_if_modified(|cache| {
if let std::collections::hash_map::Entry::Vacant(key) = cache.entry(pubkey) {
if let std::collections::hash_map::Entry::Vacant(key) =
cache.entry(pubkey.clone())
{
key.insert(key_type);
true
} else {
false
}
}) {
tracing::info!(%pubkey, %key_type, "key authorized");
Ok(())
} else {
Err(anyhow!("key already registered"))
}
})
.map_err(|err| {
let pubkey: PublicKeyBinary = request.pubkey.into();
tracing::error!(pubkey = pubkey.to_string(), "pubkey add failed");
tracing::error!(%pubkey, "pubkey add failed");
Status::internal(format!("error saving requested key: {pubkey}, {err:?}"))
})
.await?;
Expand All @@ -135,18 +138,19 @@ impl iot_config::Admin for AdminService {
admin::remove_key(request.pubkey.clone().into(), &self.pool)
.and_then(|deleted| async move {
match deleted {
Some((pubkey, _key_type)) => {
Some((pubkey, key_type)) => {
self.auth_updater.send_modify(|cache| {
cache.remove(&pubkey);
});
tracing::info!(%pubkey, %key_type,"key de-authorized");
Ok(())
}
None => Ok(()),
}
})
.map_err(|_| {
let pubkey: PublicKeyBinary = request.pubkey.into();
tracing::error!(pubkey = pubkey.to_string(), "pubkey remove failed");
tracing::error!(%pubkey, "pubkey remove failed");
Status::internal(format!("error removing request key: {pubkey}"))
})
.await?;
Expand Down
5 changes: 0 additions & 5 deletions iot_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub struct GatewayService {
region_map: RegionMapReader,
signing_key: Arc<Keypair>,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
}

impl GatewayService {
Expand All @@ -45,7 +44,6 @@ impl GatewayService {
region_map: RegionMapReader,
auth_cache: AuthCache,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
) -> Result<Self> {
let gateway_cache = Arc::new(Cache::new());
let cache_clone = gateway_cache.clone();
Expand All @@ -58,7 +56,6 @@ impl GatewayService {
region_map,
signing_key: Arc::new(settings.signing_keypair()?),
delegate_cache,
shutdown,
})
}

Expand Down Expand Up @@ -278,13 +275,11 @@ impl iot_config::Gateway for GatewayService {
let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;
let region_map = self.region_map.clone();
let shutdown_listener = self.shutdown.clone();

let (tx, rx) = tokio::sync::mpsc::channel(20);

tokio::spawn(async move {
tokio::select! {
_ = shutdown_listener => (),
_ = stream_all_gateways_info(
&pool,
tx.clone(),
Expand Down
90 changes: 57 additions & 33 deletions iot_config/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use anyhow::{Error, Result};
use clap::Parser;
use futures::future::LocalBoxFuture;
use futures_util::TryFutureExt;
use helium_proto::services::iot_config::{AdminServer, GatewayServer, OrgServer, RouteServer};
use iot_config::{
admin::AuthCache, admin_service::AdminService, gateway_service::GatewayService, org,
org_service::OrgService, region_map::RegionMapReader, route_service::RouteService,
settings::Settings, telemetry,
};
use std::{path::PathBuf, time::Duration};
use tokio::signal;
use std::{net::SocketAddr, path::PathBuf, time::Duration};
use task_manager::{ManagedTask, TaskManager};
use tonic::transport;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand Down Expand Up @@ -60,16 +61,6 @@ impl Daemon {
poc_metrics::start_metrics(&settings.metrics)?;
telemetry::initialize();

// Configure shutdown trigger
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?;
tokio::spawn(async move {
tokio::select! {
_ = sigterm.recv() => shutdown_trigger.trigger(),
_ = signal::ctrl_c() => shutdown_trigger.trigger(),
}
});

// Create database pool
let pool = settings.database.connect("iot-config-store").await?;
sqlx::migrate!().run(&pool).await?;
Expand All @@ -89,21 +80,14 @@ impl Daemon {
region_map.clone(),
auth_cache.clone(),
delegate_key_cache,
shutdown_listener.clone(),
)?;
let route_svc = RouteService::new(
settings,
auth_cache.clone(),
pool.clone(),
shutdown_listener.clone(),
)?;
let route_svc = RouteService::new(settings, auth_cache.clone(), pool.clone())?;
let org_svc = OrgService::new(
settings,
auth_cache.clone(),
pool.clone(),
route_svc.clone_update_channel(),
delegate_key_updater,
shutdown_listener.clone(),
)?;
let admin_svc = AdminService::new(
settings,
Expand All @@ -120,19 +104,59 @@ impl Daemon {
tracing::debug!("listening on {listen_addr}");
tracing::debug!("signing as {pubkey}");

transport::Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(250)))
.http2_keepalive_timeout(Some(Duration::from_secs(60)))
.layer(tower_http::trace::TraceLayer::new_for_grpc())
.add_service(GatewayServer::new(gateway_svc))
.add_service(OrgServer::new(org_svc))
.add_service(RouteServer::new(route_svc))
.add_service(AdminServer::new(admin_svc))
.serve_with_shutdown(listen_addr, shutdown_listener)
.map_err(Error::from)
.await?;

Ok(())
let grpc_server = GrpcServer {
listen_addr,
gateway_svc,
route_svc,
org_svc,
admin_svc,
};

TaskManager::builder().add_task(grpc_server).start().await
}
}

pub struct GrpcServer {
listen_addr: SocketAddr,
gateway_svc: GatewayService,
route_svc: RouteService,
org_svc: OrgService,
admin_svc: AdminService,
}

impl ManagedTask for GrpcServer {
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
Box::pin(async move {
let grpc_server = transport::Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(250)))
.http2_keepalive_timeout(Some(Duration::from_secs(60)))
.layer(tower_http::trace::TraceLayer::new_for_grpc())
.add_service(GatewayServer::new(self.gateway_svc))
.add_service(OrgServer::new(self.org_svc))
.add_service(RouteServer::new(self.route_svc))
.add_service(AdminServer::new(self.admin_svc))
.serve(self.listen_addr)
.map_err(Error::from);

tokio::select! {
_ = shutdown => {
tracing::warn!("grpc server shutting down");
Ok(())
}
res = grpc_server => {
match res {
Ok(()) => Ok(()),
Err(err) => {
tracing::error!(?err, "grpc server failed with error");
Err(anyhow::anyhow!("grpc server exiting with error"))
}
}
}
}
})
}
}

Expand Down
Loading

0 comments on commit 1d7058d

Please sign in to comment.