diff --git a/iot_config/src/org_service.rs b/iot_config/src/org_service.rs index 0799302ce..2d5cb0e7f 100644 --- a/iot_config/src/org_service.rs +++ b/iot_config/src/org_service.rs @@ -1,7 +1,6 @@ use crate::{ admin::{AuthCache, KeyType}, - broadcast_update, - helium_netids, lora_field, org, + broadcast_update, helium_netids, lora_field, org, route::list_routes, telemetry, verify_public_key, GrpcResult, Settings, }; @@ -462,7 +461,10 @@ impl iot_config::Org for OrgService { signature: vec![], }; update.signature = self.sign_response(&update.encode_to_vec())?; - if broadcast_update(update, self.route_update_tx.clone()).await.is_err() { + if broadcast_update(update, self.route_update_tx.clone()) + .await + .is_err() + { tracing::info!( route_id = route_id, "all subscribers disconnected; route disable incomplete" @@ -533,7 +535,10 @@ impl iot_config::Org for OrgService { signature: vec![], }; update.signature = self.sign_response(&update.encode_to_vec())?; - if broadcast_update(update, self.route_update_tx.clone()).await.is_err() { + if broadcast_update(update, self.route_update_tx.clone()) + .await + .is_err() + { tracing::info!( route_id = route_id, "all subscribers disconnected; route enable incomplete" diff --git a/iot_config/src/route.rs b/iot_config/src/route.rs index 5a8f5af43..e3e34d6a3 100644 --- a/iot_config/src/route.rs +++ b/iot_config/src/route.rs @@ -155,7 +155,8 @@ pub async fn create_route( }) .and_then(|signature| { update.signature = signature; - broadcast_update(update, update_tx).map_err(|_| anyhow!("failed broadcasting route create")) + broadcast_update(update, update_tx) + .map_err(|_| anyhow!("failed broadcasting route create")) }) .await; @@ -219,7 +220,8 @@ pub async fn update_route( }) .and_then(|signature| { update_res.signature = signature; - broadcast_update(update_res, update_tx).map_err(|_| anyhow!("failed broadcasting route update")) + broadcast_update(update_res, update_tx) + .map_err(|_| anyhow!("failed broadcasting route update")) }) .await; diff --git a/iot_config/src/route_service.rs b/iot_config/src/route_service.rs index 366d701a8..beee8f48e 100644 --- a/iot_config/src/route_service.rs +++ b/iot_config/src/route_service.rs @@ -376,10 +376,21 @@ impl iot_config::Route for RouteService { let mut route_updates = self.subscribe_to_routes(); tokio::spawn(async move { - if stream_existing_routes(&pool, &signing_key, tx.clone()) - .and_then(|_| stream_existing_euis(&pool, &signing_key, tx.clone())) - .and_then(|_| stream_existing_devaddrs(&pool, &signing_key, tx.clone())) - .and_then(|_| stream_existing_skfs(&pool, &signing_key, tx.clone())) + if stream_existing_routes(&pool, &signing_key, tx.clone(), shutdown_listener.clone()) + .and_then(|_| { + stream_existing_euis(&pool, &signing_key, tx.clone(), shutdown_listener.clone()) + }) + .and_then(|_| { + stream_existing_devaddrs( + &pool, + &signing_key, + tx.clone(), + shutdown_listener.clone(), + ) + }) + .and_then(|_| { + stream_existing_skfs(&pool, &signing_key, tx.clone(), shutdown_listener.clone()) + }) .await .is_err() { @@ -449,8 +460,10 @@ impl iot_config::Route for RouteService { while let Some(eui) = eui_stream.next().await { if shutdown_listener.is_triggered() { - _ = tx.send(Err(Status::unavailable("service shutting down"))).await; - return + _ = tx + .send(Err(Status::unavailable("service shutting down"))) + .await; + return; } let message = match eui { Ok(eui) => Ok(eui.into()), @@ -525,10 +538,12 @@ impl iot_config::Route for RouteService { if shutdown_listener.is_triggered() { return Err(Status::unavailable("service shutting down")); } - let (to_add, to_remove): (Vec<(ActionV1, EuiPairV1)>, Vec<(ActionV1, EuiPairV1)>) = - batch - .into_iter() - .partition(|(action, _update)| action == &ActionV1::Add); + let (to_add, to_remove): ( + Vec<(ActionV1, EuiPairV1)>, + Vec<(ActionV1, EuiPairV1)>, + ) = batch + .into_iter() + .partition(|(action, _update)| action == &ActionV1::Add); telemetry::count_eui_updates(to_add.len(), to_remove.len()); tracing::debug!( adding = to_add.len(), @@ -605,8 +620,10 @@ impl iot_config::Route for RouteService { while let Some(devaddr) = devaddrs.next().await { if shutdown_listener.is_triggered() { - _ = tx.send(Err(Status::unavailable("service shutting down"))).await; - return + _ = tx + .send(Err(Status::unavailable("service shutting down"))) + .await; + return; } let message = match devaddr { Ok(devaddr) => Ok(devaddr.into()), @@ -686,7 +703,7 @@ impl iot_config::Route for RouteService { let shutdown_listener = self.shutdown.clone(); async move { if shutdown_listener.is_triggered() { - return Err(Status::unavailable("service shutting down")) + return Err(Status::unavailable("service shutting down")); } let (to_add, to_remove): ( Vec<(ActionV1, DevaddrRangeV1)>, @@ -775,8 +792,10 @@ impl iot_config::Route for RouteService { while let Some(skf) = skf_stream.next().await { if shutdown_listener.is_triggered() { - _ = tx.send(Err(Status::unavailable("service shutting down"))); - return + _ = tx + .send(Err(Status::unavailable("service shutting down"))) + .await; + return; } let message = match skf { Ok(skf) => Ok(skf.into()), @@ -838,8 +857,10 @@ impl iot_config::Route for RouteService { while let Some(skf) = skf_stream.next().await { if shutdown_listener.is_triggered() { - _ = tx.send(Err(Status::unavailable("service shutting down"))).await; - return + _ = tx + .send(Err(Status::unavailable("service shutting down"))) + .await; + return; } let message = match skf { Ok(skf) => Ok(skf.into()), @@ -1117,6 +1138,7 @@ async fn stream_existing_euis( pool: &Pool, signing_key: &Keypair, tx: mpsc::Sender>, + shutdown_listener: triggered::Listener, ) -> Result<()> { let timestamp = Utc::now().encode_timestamp(); let signer: Vec = signing_key.public_key().into(); @@ -1138,6 +1160,13 @@ async fn stream_existing_euis( } }) .map_err(|err| anyhow!(err)) + .map(move |send_result| { + if shutdown_listener.is_triggered() { + Err(anyhow!("service shutting down")) + } else { + send_result + } + }) .try_fold((), |acc, _| async move { Ok(acc) }) .await } @@ -1146,6 +1175,7 @@ async fn stream_existing_devaddrs( pool: &Pool, signing_key: &Keypair, tx: mpsc::Sender>, + shutdown_listener: triggered::Listener, ) -> Result<()> { let timestamp = Utc::now().encode_timestamp(); let signer: Vec = signing_key.public_key().into(); @@ -1169,6 +1199,13 @@ async fn stream_existing_devaddrs( } }) .map_err(|err| anyhow!(err)) + .map(move |send_result| { + if shutdown_listener.is_triggered() { + Err(anyhow!("service shutting down")) + } else { + send_result + } + }) .try_fold((), |acc, _| async move { Ok(acc) }) .await } @@ -1177,6 +1214,7 @@ async fn stream_existing_skfs( pool: &Pool, signing_key: &Keypair, tx: mpsc::Sender>, + shutdown_listener: triggered::Listener, ) -> Result<()> { let timestamp = Utc::now().encode_timestamp(); let signer: Vec = signing_key.public_key().into(); @@ -1197,6 +1235,13 @@ async fn stream_existing_skfs( } }) .map_err(|err| anyhow!(err)) + .map(move |send_result| { + if shutdown_listener.is_triggered() { + Err(anyhow!("service shutting down")) + } else { + send_result + } + }) .try_fold((), |acc, _| async move { Ok(acc) }) .await }