diff --git a/iot_config/src/gateway_service.rs b/iot_config/src/gateway_service.rs
index 7b3b794dc..05956674a 100644
--- a/iot_config/src/gateway_service.rs
+++ b/iot_config/src/gateway_service.rs
@@ -35,6 +35,7 @@ pub struct GatewayService {
     region_map: RegionMapReader,
     signing_key: Arc,
     delegate_cache: watch::Receiver,
+    shutdown: triggered::Listener,
 }
 
 impl GatewayService {
@@ -44,6 +45,7 @@ impl GatewayService {
         region_map: RegionMapReader,
         auth_cache: AuthCache,
         delegate_cache: watch::Receiver,
+        shutdown: triggered::Listener,
     ) -> Result {
         let gateway_cache = Arc::new(Cache::new());
         let cache_clone = gateway_cache.clone();
@@ -56,6 +58,7 @@ impl GatewayService {
             region_map,
             signing_key: Arc::new(settings.signing_keypair()?),
             delegate_cache,
+            shutdown,
         })
     }
 
@@ -275,6 +278,7 @@ impl iot_config::Gateway for GatewayService {
         let signing_key = self.signing_key.clone();
         let batch_size = request.batch_size;
         let region_map = self.region_map.clone();
+        let shutdown_listener = self.shutdown.clone();
 
         let (tx, rx) = tokio::sync::mpsc::channel(20);
 
@@ -285,6 +289,7 @@ impl iot_config::Gateway for GatewayService {
                 &signing_key,
                 region_map.clone(),
                 batch_size,
+                shutdown_listener,
             )
             .await
         });
@@ -299,11 +304,15 @@ async fn stream_all_gateways_info(
     signing_key: &Keypair,
     region_map: RegionMapReader,
     batch_size: u32,
+    shutdown_listener: triggered::Listener,
 ) -> anyhow::Result<()> {
     let timestamp = Utc::now().encode_timestamp();
     let signer: Vec = signing_key.public_key().into();
     let mut stream = gateway_info::db::all_info_stream(pool).chunks(batch_size as usize);
     while let Some(infos) = stream.next().await {
+        if shutdown_listener.is_triggered() {
+            break;
+        }
         let gateway_infos = infos
             .into_iter()
             .filter_map(|info| {
@@ -313,24 +322,24 @@ async fn stream_all_gateways_info(
             })
             .collect();
 
-        let mut response = GatewayInfoStreamResV1 {
+        let mut gateway = GatewayInfoStreamResV1 {
             gateways: gateway_infos,
             timestamp,
             signer: signer.clone(),
             signature: vec![],
         };
-        response = match signing_key.sign(&response.encode_to_vec()) {
+        gateway = match signing_key.sign(&gateway.encode_to_vec()) {
             Ok(signature) => GatewayInfoStreamResV1 {
                 signature,
-                ..response
+                ..gateway
             },
             Err(_) => {
                 continue;
             }
         };
 
-        tx.send(Ok(response)).await?;
+        tx.send(Ok(gateway)).await?;
     }
 
     Ok(())
 }
diff --git a/iot_config/src/lib.rs b/iot_config/src/lib.rs
index 6d6d05c95..cbaaef6ba 100644
--- a/iot_config/src/lib.rs
+++ b/iot_config/src/lib.rs
@@ -36,14 +36,17 @@ pub fn update_channel() -> broadcast::Sender {
     update_tx
 }
 
-pub async fn broadcast_update(
+pub async fn broadcast_update(
     message: T,
     sender: broadcast::Sender,
 ) -> Result<(), broadcast::error::SendError> {
     while !enqueue_update(sender.len()) {
         tokio::time::sleep(tokio::time::Duration::from_millis(25)).await
     }
-    sender.send(message).map(|_| ())
+    sender.send(message).map(|_| ()).map_err(|err| {
+        tracing::error!(error = ?err, "failed to broadcast routing update");
+        err
+    })
 }
 
 fn enqueue_update(queue_size: usize) -> bool {
diff --git a/iot_config/src/main.rs b/iot_config/src/main.rs
index 4c8ebafe5..9a69b77e5 100644
--- a/iot_config/src/main.rs
+++ b/iot_config/src/main.rs
@@ -95,6 +95,7 @@ impl Daemon {
             region_map.clone(),
             auth_cache.clone(),
             delegate_key_cache,
+            shutdown_listener.clone(),
         )?;
         let route_svc = RouteService::new(
             settings,
@@ -108,6 +109,7 @@ impl Daemon {
             pool.clone(),
             route_svc.clone_update_channel(),
             delegate_key_updater,
+            shutdown_listener.clone(),
         )?;
         let admin_svc = AdminService::new(
             settings,
diff --git a/iot_config/src/org_service.rs b/iot_config/src/org_service.rs
index e095ba836..0799302ce 100644
--- a/iot_config/src/org_service.rs
+++ b/iot_config/src/org_service.rs
@@ -1,5 +1,6 @@
 use crate::{
     admin::{AuthCache, KeyType},
+    broadcast_update,
     helium_netids, lora_field, org,
     route::list_routes,
     telemetry, verify_public_key, GrpcResult, Settings,
@@ -26,6 +27,7 @@ pub struct OrgService {
     route_update_tx: broadcast::Sender,
     signing_key: Keypair,
     delegate_updater: watch::Sender,
+    shutdown: triggered::Listener,
 }
 
 #[derive(Clone, Debug, PartialEq)]
@@ -41,6 +43,7 @@ impl OrgService {
         pool: Pool,
         route_update_tx: broadcast::Sender,
         delegate_updater: watch::Sender,
+        shutdown: triggered::Listener,
     ) -> Result {
         Ok(Self {
             auth_cache,
@@ -48,6 +51,7 @@ impl OrgService {
             route_update_tx,
             signing_key: settings.signing_keypair()?,
             delegate_updater,
+            shutdown,
         })
     }
 
@@ -446,6 +450,9 @@ impl iot_config::Org for OrgService {
         let timestamp = Utc::now().encode_timestamp();
         let signer: Vec = self.signing_key.public_key().into();
         for route in org_routes {
+            if self.shutdown.is_triggered() {
+                break;
+            }
             let route_id = route.id.clone();
             let mut update = RouteStreamResV1 {
                 action: ActionV1::Add.into(),
@@ -455,7 +462,7 @@ impl iot_config::Org for OrgService {
                 signature: vec![],
             };
             update.signature = self.sign_response(&update.encode_to_vec())?;
-            if self.route_update_tx.send(update).is_err() {
+            if broadcast_update(update, self.route_update_tx.clone()).await.is_err() {
                 tracing::info!(
                     route_id = route_id,
                     "all subscribers disconnected; route disable incomplete"
@@ -514,6 +521,9 @@ impl iot_config::Org for OrgService {
         let timestamp = Utc::now().encode_timestamp();
         let signer: Vec = self.signing_key.public_key().into();
         for route in org_routes {
+            if self.shutdown.is_triggered() {
+                break;
+            }
             let route_id = route.id.clone();
             let mut update = RouteStreamResV1 {
                 action: ActionV1::Add.into(),
@@ -523,7 +533,7 @@ impl iot_config::Org for OrgService {
                 signature: vec![],
             };
             update.signature = self.sign_response(&update.encode_to_vec())?;
-            if self.route_update_tx.send(update).is_err() {
+            if broadcast_update(update, self.route_update_tx.clone()).await.is_err() {
                 tracing::info!(
                     route_id = route_id,
                     "all subscribers disconnected; route enable incomplete"
diff --git a/iot_config/src/route.rs b/iot_config/src/route.rs
index 24ea553ac..5a8f5af43 100644
--- a/iot_config/src/route.rs
+++ b/iot_config/src/route.rs
@@ -137,28 +137,27 @@ pub async fn create_route(
     transaction.commit().await?;
 
-    if new_route.active && !new_route.locked {
-        let timestamp = Utc::now().encode_timestamp();
-        let signer = signing_key.public_key().into();
-        let mut update = proto::RouteStreamResV1 {
-            action: proto::ActionV1::Add.into(),
-            data: Some(proto::route_stream_res_v1::Data::Route(
-                new_route.clone().into(),
-            )),
-            timestamp,
-            signer,
-            signature: vec![],
-        };
-        _ = signing_key
-            .sign(&update.encode_to_vec())
-            .map_err(|err| tracing::error!("error signing route stream response: {err:?}"))
-            .and_then(|signature| {
-                update.signature = signature;
-                update_tx.send(update).map_err(|err| {
-                    tracing::warn!("error broadcasting route stream response: {err:?}")
-                })
-            });
-    }
+    let timestamp = Utc::now().encode_timestamp();
+    let signer = signing_key.public_key().into();
+    let mut update = proto::RouteStreamResV1 {
+        action: proto::ActionV1::Add.into(),
+        data: Some(proto::route_stream_res_v1::Data::Route(
+            new_route.clone().into(),
+        )),
+        timestamp,
+        signer,
+        signature: vec![],
+    };
+    _ = futures::future::ready(signing_key.sign(&update.encode_to_vec()))
+        .map_err(|err| {
+            tracing::error!(error = ?err, "error signing route create");
+            anyhow!("error signing route create")
+        })
+        .and_then(|signature| {
+            update.signature = signature;
+            broadcast_update(update, update_tx).map_err(|_| anyhow!("failed broadcasting route create"))
+        })
+        .await;
 
     Ok(new_route)
 }
 
@@ -213,15 +212,16 @@ pub async fn update_route(
         signature: vec![],
     };
 
-    _ = signing_key
-        .sign(&update_res.encode_to_vec())
-        .map_err(|err| tracing::error!("error signing route stream response: {err:?}"))
+    _ = futures::future::ready(signing_key.sign(&update_res.encode_to_vec()))
+        .map_err(|err| {
+            tracing::error!(error = ?err, "error signing route update");
+            anyhow!("error signing route update")
+        })
         .and_then(|signature| {
             update_res.signature = signature;
-            update_tx
-                .send(update_res)
-                .map_err(|err| tracing::warn!("error broadcasting route stream response: {err:?}"))
-        });
+            broadcast_update(update_res, update_tx).map_err(|_| anyhow!("failed broadcasting route update"))
+        })
+        .await;
 
     Ok(updated_route)
 }
@@ -523,7 +523,6 @@ pub fn active_route_stream<'a>(
         select r.id, r.oui, r.net_id, r.max_copies, r.server_host, r.server_port, r.server_protocol_opts, r.active, r.ignore_empty_skf, o.locked
         from routes r
         join organizations o on r.oui = o.oui
-        where o.locked = false and r.active = true
         group by r.id, o.locked
         "#,
     )
diff --git a/iot_config/src/route_service.rs b/iot_config/src/route_service.rs
index d9df24142..366d701a8 100644
--- a/iot_config/src/route_service.rs
+++ b/iot_config/src/route_service.rs
@@ -423,6 +423,7 @@ impl iot_config::Route for RouteService {
         let pool = self.pool.clone();
 
         let (tx, rx) = tokio::sync::mpsc::channel(20);
+        let shutdown_listener = self.shutdown.clone();
 
         tracing::debug!(route_id = request.route_id, "listing eui pairs");
 
@@ -447,6 +448,10 @@ impl iot_config::Route for RouteService {
             };
 
             while let Some(eui) = eui_stream.next().await {
+                if shutdown_listener.is_triggered() {
+                    _ = tx.send(Err(Status::unavailable("service shutting down"))).await;
+                    return
+                }
                 let message = match eui {
                     Ok(eui) => Ok(eui.into()),
                     Err(bad_eui) => Err(Status::internal(format!("invalid eui: {:?}", bad_eui))),
@@ -514,35 +519,41 @@ impl iot_config::Route for RouteService {
                     )
                     .collect::, Status>>()
             })
-            .try_for_each(|batch: Vec<(ActionV1, EuiPairV1)>| async move {
-                let (to_add, to_remove): (Vec<(ActionV1, EuiPairV1)>, Vec<(ActionV1, EuiPairV1)>) =
-                    batch
+            .try_for_each(move |batch: Vec<(ActionV1, EuiPairV1)>| {
+                let shutdown_listener = self.shutdown.clone();
+                async move {
+                    if shutdown_listener.is_triggered() {
+                        return Err(Status::unavailable("service shutting down"));
+                    }
+                    let (to_add, to_remove): (Vec<(ActionV1, EuiPairV1)>, Vec<(ActionV1, EuiPairV1)>) =
+                        batch
+                            .into_iter()
+                            .partition(|(action, _update)| action == &ActionV1::Add);
+                    telemetry::count_eui_updates(to_add.len(), to_remove.len());
+                    tracing::debug!(
+                        adding = to_add.len(),
+                        removing = to_remove.len(),
+                        "updating eui pairs"
+                    );
+                    let adds_update: Vec =
+                        to_add.into_iter().map(|(_, add)| add.into()).collect();
+                    let removes_update: Vec = to_remove
                         .into_iter()
-                        .partition(|(action, _update)| action == &ActionV1::Add);
-                telemetry::count_eui_updates(to_add.len(), to_remove.len());
-                tracing::debug!(
-                    adding = to_add.len(),
-                    removing = to_remove.len(),
-                    "updating eui pairs"
-                );
-                let adds_update: Vec =
-                    to_add.into_iter().map(|(_, add)| add.into()).collect();
-                let removes_update: Vec = to_remove
-                    .into_iter()
-                    .map(|(_, remove)| remove.into())
-                    .collect();
-                route::update_euis(
-                    &adds_update,
-                    &removes_update,
-                    &self.pool,
-                    self.signing_key.clone(),
-                    self.clone_update_channel(),
-                )
-                .await
-                .map_err(|err| {
-                    tracing::error!("eui pair update failed: {err:?}");
-                    Status::internal(format!("eui pair update failed: {err:?}"))
-                })
+                        .map(|(_, remove)| remove.into())
+                        .collect();
+                    route::update_euis(
+                        &adds_update,
+                        &removes_update,
+                        &self.pool,
+                        self.signing_key.clone(),
+                        self.clone_update_channel(),
+                    )
+                    .await
+                    .map_err(|err| {
+                        tracing::error!("eui pair update failed: {err:?}");
+                        Status::internal(format!("eui pair update failed: {err:?}"))
+                    })
+                }
             })
             .await?;
 
@@ -570,6 +581,7 @@ impl iot_config::Route for RouteService {
         let (tx, rx) = tokio::sync::mpsc::channel(20);
 
         let pool = self.pool.clone();
+        let shutdown_listener = self.shutdown.clone();
 
         tracing::debug!(route_id = request.route_id, "listing devaddr ranges");
 
@@ -592,6 +604,10 @@ impl iot_config::Route for RouteService {
             };
 
             while let Some(devaddr) = devaddrs.next().await {
+                if shutdown_listener.is_triggered() {
+                    _ = tx.send(Err(Status::unavailable("service shutting down"))).await;
+                    return
+                }
                 let message = match devaddr {
                     Ok(devaddr) => Ok(devaddr.into()),
                     Err(bad_devaddr) => Err(Status::internal(format!(
@@ -666,37 +682,43 @@ impl iot_config::Route for RouteService {
                     })
                    .collect::, Status>>()
             })
-            .try_for_each(|batch: Vec<(ActionV1, DevaddrRangeV1)>| async move {
-                let (to_add, to_remove): (
-                    Vec<(ActionV1, DevaddrRangeV1)>,
-                    Vec<(ActionV1, DevaddrRangeV1)>,
-                ) = batch
-                    .into_iter()
-                    .partition(|(action, _update)| action == &ActionV1::Add);
-                telemetry::count_devaddr_updates(to_add.len(), to_remove.len());
-                tracing::debug!(
-                    adding = to_add.len(),
-                    removing = to_remove.len(),
-                    "updating devaddr ranges"
-                );
-                let adds_update: Vec =
-                    to_add.into_iter().map(|(_, add)| add.into()).collect();
-                let removes_update: Vec = to_remove
-                    .into_iter()
-                    .map(|(_, remove)| remove.into())
-                    .collect();
-                route::update_devaddr_ranges(
-                    &adds_update,
-                    &removes_update,
-                    &self.pool,
-                    self.signing_key.clone(),
-                    self.clone_update_channel(),
-                )
-                .await
-                .map_err(|err| {
-                    tracing::error!("devaddr range update failed: {err:?}");
-                    Status::internal("devaddr range update failed")
-                })
+            .try_for_each(move |batch: Vec<(ActionV1, DevaddrRangeV1)>| {
+                let shutdown_listener = self.shutdown.clone();
+                async move {
+                    if shutdown_listener.is_triggered() {
+                        return Err(Status::unavailable("service shutting down"))
+                    }
+                    let (to_add, to_remove): (
+                        Vec<(ActionV1, DevaddrRangeV1)>,
+                        Vec<(ActionV1, DevaddrRangeV1)>,
+                    ) = batch
+                        .into_iter()
+                        .partition(|(action, _update)| action == &ActionV1::Add);
+                    telemetry::count_devaddr_updates(to_add.len(), to_remove.len());
+                    tracing::debug!(
+                        adding = to_add.len(),
+                        removing = to_remove.len(),
+                        "updating devaddr ranges"
+                    );
+                    let adds_update: Vec =
+                        to_add.into_iter().map(|(_, add)| add.into()).collect();
+                    let removes_update: Vec = to_remove
+                        .into_iter()
+                        .map(|(_, remove)| remove.into())
+                        .collect();
+                    route::update_devaddr_ranges(
+                        &adds_update,
+                        &removes_update,
+                        &self.pool,
+                        self.signing_key.clone(),
+                        self.clone_update_channel(),
+                    )
+                    .await
+                    .map_err(|err| {
+                        tracing::error!("devaddr range update failed: {err:?}");
+                        Status::internal("devaddr range update failed")
+                    })
+                }
             })
             .await?;
 
@@ -724,6 +746,7 @@ impl iot_config::Route for RouteService {
         let pool = self.pool.clone();
 
         let (tx, rx) = tokio::sync::mpsc::channel(20);
+        let shutdown_listener = self.shutdown.clone();
 
         tracing::debug!(
             route_id = request.route_id,
@@ -751,6 +774,10 @@ impl iot_config::Route for RouteService {
             };
 
             while let Some(skf) = skf_stream.next().await {
+                if shutdown_listener.is_triggered() {
+                    _ = tx.send(Err(Status::unavailable("service shutting down"))).await;
+                    return
+                }
                 let message = match skf {
                     Ok(skf) => Ok(skf.into()),
                     Err(bad_skf) => Err(Status::internal(format!("invalid skf: {:?}", bad_skf))),
@@ -778,6 +805,7 @@ impl iot_config::Route for RouteService {
         let pool = self.pool.clone();
 
         let (tx, rx) = tokio::sync::mpsc::channel(20);
+        let shutdown_listener = self.shutdown.clone();
 
         tracing::debug!(
             route_id = request.route_id,
@@ -809,6 +837,10 @@ impl iot_config::Route for RouteService {
             };
 
             while let Some(skf) = skf_stream.next().await {
+                if shutdown_listener.is_triggered() {
+                    _ = tx.send(Err(Status::unavailable("service shutting down"))).await;
+                    return
+                }
                 let message = match skf {
                     Ok(skf) => Ok(skf.into()),
                     Err(bad_skf) => Err(Status::internal(format!("invalid skf: {:?}", bad_skf))),
@@ -1048,6 +1080,7 @@ async fn stream_existing_routes(
     pool: &Pool,
     signing_key: &Keypair,
     tx: mpsc::Sender>,
+    shutdown_listener: triggered::Listener,
 ) -> Result<()> {
     let timestamp = Utc::now().encode_timestamp();
     let signer: Vec = signing_key.public_key().into();
@@ -1069,6 +1102,13 @@ async fn stream_existing_routes(
             }
         })
         .map_err(|err| anyhow!(err))
+        .map(move |send_result| {
+            if shutdown_listener.is_triggered() {
+                Err(anyhow!("service shutting down"))
+            } else {
+                send_result
+            }
+        })
         .try_fold((), |acc, _| async move { Ok(acc) })
         .await
 }