Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

taskman up iot config #615

Merged
merged 9 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions iot_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ sqlx = {workspace = true}
thiserror = {workspace = true}
tokio = {workspace = true}
tokio-stream = {workspace = true}
tokio-util = { workspace = true }
tonic = {workspace = true}
tower-http = {workspace = true}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
triggered = {workspace = true}
task-manager = { path = "../task_manager" }

andymck marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 0 additions & 5 deletions iot_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ pub struct GatewayService {
region_map: RegionMapReader,
signing_key: Arc<Keypair>,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
}

impl GatewayService {
Expand All @@ -45,7 +44,6 @@ impl GatewayService {
region_map: RegionMapReader,
auth_cache: AuthCache,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
) -> Result<Self> {
let gateway_cache = Arc::new(Cache::new());
let cache_clone = gateway_cache.clone();
Expand All @@ -58,7 +56,6 @@ impl GatewayService {
region_map,
signing_key: Arc::new(settings.signing_keypair()?),
delegate_cache,
shutdown,
})
}

Expand Down Expand Up @@ -278,13 +275,11 @@ impl iot_config::Gateway for GatewayService {
let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;
let region_map = self.region_map.clone();
let shutdown_listener = self.shutdown.clone();

let (tx, rx) = tokio::sync::mpsc::channel(20);

tokio::spawn(async move {
tokio::select! {
_ = shutdown_listener => (),
_ = stream_all_gateways_info(
&pool,
tx.clone(),
Expand Down
75 changes: 42 additions & 33 deletions iot_config/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use anyhow::{Error, Result};
use clap::Parser;
use futures::future::LocalBoxFuture;
use futures_util::TryFutureExt;
use helium_proto::services::iot_config::{AdminServer, GatewayServer, OrgServer, RouteServer};
use iot_config::{
admin::AuthCache, admin_service::AdminService, gateway_service::GatewayService, org,
org_service::OrgService, region_map::RegionMapReader, route_service::RouteService,
settings::Settings, telemetry,
};
use std::{path::PathBuf, time::Duration};
use tokio::signal;
use std::{net::SocketAddr, path::PathBuf, time::Duration};
use task_manager::{ManagedTask, TaskManager};
use tonic::transport;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand Down Expand Up @@ -60,16 +61,6 @@ impl Daemon {
poc_metrics::start_metrics(&settings.metrics)?;
telemetry::initialize();

// Configure shutdown trigger
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
let mut sigterm = signal::unix::signal(signal::unix::SignalKind::terminate())?;
tokio::spawn(async move {
tokio::select! {
_ = sigterm.recv() => shutdown_trigger.trigger(),
_ = signal::ctrl_c() => shutdown_trigger.trigger(),
}
});

// Create database pool
let pool = settings.database.connect("iot-config-store").await?;
sqlx::migrate!().run(&pool).await?;
Expand All @@ -89,21 +80,14 @@ impl Daemon {
region_map.clone(),
auth_cache.clone(),
delegate_key_cache,
shutdown_listener.clone(),
)?;
let route_svc = RouteService::new(
settings,
auth_cache.clone(),
pool.clone(),
shutdown_listener.clone(),
)?;
let route_svc = RouteService::new(settings, auth_cache.clone(), pool.clone())?;
let org_svc = OrgService::new(
settings,
auth_cache.clone(),
pool.clone(),
route_svc.clone_update_channel(),
delegate_key_updater,
shutdown_listener.clone(),
)?;
let admin_svc = AdminService::new(
settings,
Expand All @@ -120,19 +104,44 @@ impl Daemon {
tracing::debug!("listening on {listen_addr}");
tracing::debug!("signing as {pubkey}");

transport::Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(250)))
.http2_keepalive_timeout(Some(Duration::from_secs(60)))
.layer(tower_http::trace::TraceLayer::new_for_grpc())
.add_service(GatewayServer::new(gateway_svc))
.add_service(OrgServer::new(org_svc))
.add_service(RouteServer::new(route_svc))
.add_service(AdminServer::new(admin_svc))
.serve_with_shutdown(listen_addr, shutdown_listener)
.map_err(Error::from)
.await?;

Ok(())
let grpc_server = GrpcServer {
listen_addr,
gateway_svc,
route_svc,
org_svc,
admin_svc,
};

TaskManager::builder().add_task(grpc_server).start().await
}
}

pub struct GrpcServer {
listen_addr: SocketAddr,
gateway_svc: GatewayService,
route_svc: RouteService,
org_svc: OrgService,
admin_svc: AdminService,
}

impl ManagedTask for GrpcServer {
fn start_task(
self: Box<Self>,
shutdown: triggered::Listener,
) -> LocalBoxFuture<'static, anyhow::Result<()>> {
Box::pin(async move {
transport::Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(250)))
.http2_keepalive_timeout(Some(Duration::from_secs(60)))
.layer(tower_http::trace::TraceLayer::new_for_grpc())
.add_service(GatewayServer::new(self.gateway_svc))
.add_service(OrgServer::new(self.org_svc))
.add_service(RouteServer::new(self.route_svc))
.add_service(AdminServer::new(self.admin_svc))
.serve_with_shutdown(self.listen_addr, shutdown)
.map_err(Error::from)
.await
})
}
}

Expand Down
5 changes: 0 additions & 5 deletions iot_config/src/org_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ pub struct OrgService {
route_update_tx: broadcast::Sender<RouteStreamResV1>,
signing_key: Keypair,
delegate_updater: watch::Sender<org::DelegateCache>,
shutdown: triggered::Listener,
}

#[derive(Clone, Debug, PartialEq)]
Expand All @@ -42,15 +41,13 @@ impl OrgService {
pool: Pool<Postgres>,
route_update_tx: broadcast::Sender<RouteStreamResV1>,
delegate_updater: watch::Sender<org::DelegateCache>,
shutdown: triggered::Listener,
) -> Result<Self> {
Ok(Self {
auth_cache,
pool,
route_update_tx,
signing_key: settings.signing_keypair()?,
delegate_updater,
shutdown,
})
}

Expand Down Expand Up @@ -467,7 +464,6 @@ impl iot_config::Org for OrgService {
})?;

tokio::select! {
_ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")),
result = self.stream_org_routes_enable_disable(request.oui) => result?
}
}
Expand Down Expand Up @@ -506,7 +502,6 @@ impl iot_config::Org for OrgService {
})?;

tokio::select! {
_ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")),
result = self.stream_org_routes_enable_disable(request.oui) => result?
}
}
Expand Down
35 changes: 1 addition & 34 deletions iot_config/src/route_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ pub struct RouteService {
auth_cache: AuthCache,
pool: Pool<Postgres>,
update_channel: broadcast::Sender<RouteStreamResV1>,
shutdown: triggered::Listener,
signing_key: Arc<Keypair>,
}

Expand All @@ -48,17 +47,11 @@ enum OrgId<'a> {
}

impl RouteService {
pub fn new(
settings: &Settings,
auth_cache: AuthCache,
pool: Pool<Postgres>,
shutdown: triggered::Listener,
) -> Result<Self> {
pub fn new(settings: &Settings, auth_cache: AuthCache, pool: Pool<Postgres>) -> Result<Self> {
Ok(Self {
auth_cache,
pool,
update_channel: update_channel(),
shutdown,
signing_key: Arc::new(settings.signing_keypair()?),
})
}
Expand Down Expand Up @@ -390,15 +383,13 @@ impl iot_config::Route for RouteService {

tracing::info!("client subscribed to route stream");
let pool = self.pool.clone();
let shutdown_listener = self.shutdown.clone();
let (tx, rx) = tokio::sync::mpsc::channel(20);
let signing_key = self.signing_key.clone();

let mut route_updates = self.subscribe_to_routes();

tokio::spawn(async move {
tokio::select! {
_ = shutdown_listener.clone() => return,
jeffgrunewald marked this conversation as resolved.
Show resolved Hide resolved
result = stream_existing_routes(&pool, &signing_key, tx.clone())
.and_then(|_| stream_existing_euis(&pool, &signing_key, tx.clone()))
.and_then(|_| stream_existing_devaddrs(&pool, &signing_key, tx.clone()))
Expand All @@ -413,13 +404,7 @@ impl iot_config::Route for RouteService {
tracing::info!("existing routes sent; streaming updates as available");
telemetry::route_stream_subscribe();
loop {
let shutdown = shutdown_listener.clone();
jeffgrunewald marked this conversation as resolved.
Show resolved Hide resolved

tokio::select! {
_ = shutdown => {
telemetry::route_stream_unsubscribe();
return
}
msg = route_updates.recv() => {
match msg {
Ok(update) => {
Expand Down Expand Up @@ -459,7 +444,6 @@ impl iot_config::Route for RouteService {

let pool = self.pool.clone();
let (tx, rx) = tokio::sync::mpsc::channel(20);
let shutdown_listener = self.shutdown.clone();

tracing::debug!(route_id = request.route_id, "listing eui pairs");

Expand All @@ -484,9 +468,6 @@ impl iot_config::Route for RouteService {
};

tokio::select! {
_ = shutdown_listener => {
_ = tx.send(Err(Status::unavailable("service shutting down"))).await;
}
_ = async {
while let Some(eui) = eui_stream.next().await {
let message = match eui {
Expand Down Expand Up @@ -533,7 +514,6 @@ impl iot_config::Route for RouteService {
.await?;

tokio::select! {
_ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")),
result = incoming_stream
.map_ok(|update| match validator.validate_update(&update) {
Ok(()) => Ok(update),
Expand Down Expand Up @@ -620,7 +600,6 @@ impl iot_config::Route for RouteService {

let (tx, rx) = tokio::sync::mpsc::channel(20);
let pool = self.pool.clone();
let shutdown_listener = self.shutdown.clone();

tracing::debug!(route_id = request.route_id, "listing devaddr ranges");

Expand All @@ -643,9 +622,6 @@ impl iot_config::Route for RouteService {
};

tokio::select! {
_ = shutdown_listener => {
_ = tx.send(Err(Status::unavailable("service shutting down"))).await;
}
_ = async {
while let Some(devaddr) = devaddrs.next().await {
let message = match devaddr {
Expand Down Expand Up @@ -695,7 +671,6 @@ impl iot_config::Route for RouteService {
.await?;

tokio::select! {
_ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")),
result = incoming_stream
.map_ok(|update| match validator.validate_update(&update) {
Ok(()) => Ok(update),
Expand Down Expand Up @@ -788,7 +763,6 @@ impl iot_config::Route for RouteService {

let pool = self.pool.clone();
let (tx, rx) = tokio::sync::mpsc::channel(20);
let shutdown_listener = self.shutdown.clone();

tracing::debug!(
route_id = request.route_id,
Expand Down Expand Up @@ -816,9 +790,6 @@ impl iot_config::Route for RouteService {
};

tokio::select! {
_ = shutdown_listener => {
_ = tx.send(Err(Status::unavailable("service shutting down"))).await;
}
_ = async {
while let Some(skf) = skf_stream.next().await {
let message = match skf {
Expand Down Expand Up @@ -850,7 +821,6 @@ impl iot_config::Route for RouteService {

let pool = self.pool.clone();
let (tx, rx) = tokio::sync::mpsc::channel(20);
let shutdown_listener = self.shutdown.clone();

tracing::debug!(
route_id = request.route_id,
Expand Down Expand Up @@ -882,9 +852,6 @@ impl iot_config::Route for RouteService {
};

tokio::select! {
_ = shutdown_listener => {
_ = tx.send(Err(Status::unavailable("service shutting down"))).await;
}
_ = async {
while let Some(skf) = skf_stream.next().await {
let message = match skf {
Expand Down