Skip to content

Commit

Permalink
finish shutdown listening in long-running streaming responses
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffgrunewald committed Jul 11, 2023
1 parent d074c70 commit 33c9adf
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 23 deletions.
13 changes: 9 additions & 4 deletions iot_config/src/org_service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
admin::{AuthCache, KeyType},
broadcast_update,
helium_netids, lora_field, org,
broadcast_update, helium_netids, lora_field, org,
route::list_routes,
telemetry, verify_public_key, GrpcResult, Settings,
};
Expand Down Expand Up @@ -462,7 +461,10 @@ impl iot_config::Org for OrgService {
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if broadcast_update(update, self.route_update_tx.clone()).await.is_err() {
if broadcast_update(update, self.route_update_tx.clone())
.await
.is_err()
{
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route disable incomplete"
Expand Down Expand Up @@ -533,7 +535,10 @@ impl iot_config::Org for OrgService {
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if broadcast_update(update, self.route_update_tx.clone()).await.is_err() {
if broadcast_update(update, self.route_update_tx.clone())
.await
.is_err()
{
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route enable incomplete"
Expand Down
6 changes: 4 additions & 2 deletions iot_config/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ pub async fn create_route(
})
.and_then(|signature| {
update.signature = signature;
broadcast_update(update, update_tx).map_err(|_| anyhow!("failed broadcasting route create"))
broadcast_update(update, update_tx)
.map_err(|_| anyhow!("failed broadcasting route create"))
})
.await;

Expand Down Expand Up @@ -219,7 +220,8 @@ pub async fn update_route(
})
.and_then(|signature| {
update_res.signature = signature;
broadcast_update(update_res, update_tx).map_err(|_| anyhow!("failed broadcasting route update"))
broadcast_update(update_res, update_tx)
.map_err(|_| anyhow!("failed broadcasting route update"))
})
.await;

Expand Down
79 changes: 62 additions & 17 deletions iot_config/src/route_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,21 @@ impl iot_config::Route for RouteService {
let mut route_updates = self.subscribe_to_routes();

tokio::spawn(async move {
if stream_existing_routes(&pool, &signing_key, tx.clone())
.and_then(|_| stream_existing_euis(&pool, &signing_key, tx.clone()))
.and_then(|_| stream_existing_devaddrs(&pool, &signing_key, tx.clone()))
.and_then(|_| stream_existing_skfs(&pool, &signing_key, tx.clone()))
if stream_existing_routes(&pool, &signing_key, tx.clone(), shutdown_listener.clone())
.and_then(|_| {
stream_existing_euis(&pool, &signing_key, tx.clone(), shutdown_listener.clone())
})
.and_then(|_| {
stream_existing_devaddrs(
&pool,
&signing_key,
tx.clone(),
shutdown_listener.clone(),
)
})
.and_then(|_| {
stream_existing_skfs(&pool, &signing_key, tx.clone(), shutdown_listener.clone())
})
.await
.is_err()
{
Expand Down Expand Up @@ -449,8 +460,10 @@ impl iot_config::Route for RouteService {

while let Some(eui) = eui_stream.next().await {
if shutdown_listener.is_triggered() {
_ = tx.send(Err(Status::unavailable("service shutting down"))).await;
return
_ = tx
.send(Err(Status::unavailable("service shutting down")))
.await;
return;
}
let message = match eui {
Ok(eui) => Ok(eui.into()),
Expand Down Expand Up @@ -525,10 +538,12 @@ impl iot_config::Route for RouteService {
if shutdown_listener.is_triggered() {
return Err(Status::unavailable("service shutting down"));
}
let (to_add, to_remove): (Vec<(ActionV1, EuiPairV1)>, Vec<(ActionV1, EuiPairV1)>) =
batch
.into_iter()
.partition(|(action, _update)| action == &ActionV1::Add);
let (to_add, to_remove): (
Vec<(ActionV1, EuiPairV1)>,
Vec<(ActionV1, EuiPairV1)>,
) = batch
.into_iter()
.partition(|(action, _update)| action == &ActionV1::Add);
telemetry::count_eui_updates(to_add.len(), to_remove.len());
tracing::debug!(
adding = to_add.len(),
Expand Down Expand Up @@ -605,8 +620,10 @@ impl iot_config::Route for RouteService {

while let Some(devaddr) = devaddrs.next().await {
if shutdown_listener.is_triggered() {
_ = tx.send(Err(Status::unavailable("service shutting down"))).await;
return
_ = tx
.send(Err(Status::unavailable("service shutting down")))
.await;
return;
}
let message = match devaddr {
Ok(devaddr) => Ok(devaddr.into()),
Expand Down Expand Up @@ -686,7 +703,7 @@ impl iot_config::Route for RouteService {
let shutdown_listener = self.shutdown.clone();
async move {
if shutdown_listener.is_triggered() {
return Err(Status::unavailable("service shutting down"))
return Err(Status::unavailable("service shutting down"));
}
let (to_add, to_remove): (
Vec<(ActionV1, DevaddrRangeV1)>,
Expand Down Expand Up @@ -775,8 +792,10 @@ impl iot_config::Route for RouteService {

while let Some(skf) = skf_stream.next().await {
if shutdown_listener.is_triggered() {
_ = tx.send(Err(Status::unavailable("service shutting down")));
return
_ = tx
.send(Err(Status::unavailable("service shutting down")))
.await;
return;
}
let message = match skf {
Ok(skf) => Ok(skf.into()),
Expand Down Expand Up @@ -838,8 +857,10 @@ impl iot_config::Route for RouteService {

while let Some(skf) = skf_stream.next().await {
if shutdown_listener.is_triggered() {
_ = tx.send(Err(Status::unavailable("service shutting down"))).await;
return
_ = tx
.send(Err(Status::unavailable("service shutting down")))
.await;
return;
}
let message = match skf {
Ok(skf) => Ok(skf.into()),
Expand Down Expand Up @@ -1117,6 +1138,7 @@ async fn stream_existing_euis(
pool: &Pool<Postgres>,
signing_key: &Keypair,
tx: mpsc::Sender<Result<RouteStreamResV1, Status>>,
shutdown_listener: triggered::Listener,
) -> Result<()> {
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = signing_key.public_key().into();
Expand All @@ -1138,6 +1160,13 @@ async fn stream_existing_euis(
}
})
.map_err(|err| anyhow!(err))
.map(move |send_result| {
if shutdown_listener.is_triggered() {
Err(anyhow!("service shutting down"))
} else {
send_result
}
})
.try_fold((), |acc, _| async move { Ok(acc) })
.await
}
Expand All @@ -1146,6 +1175,7 @@ async fn stream_existing_devaddrs(
pool: &Pool<Postgres>,
signing_key: &Keypair,
tx: mpsc::Sender<Result<RouteStreamResV1, Status>>,
shutdown_listener: triggered::Listener,
) -> Result<()> {
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = signing_key.public_key().into();
Expand All @@ -1169,6 +1199,13 @@ async fn stream_existing_devaddrs(
}
})
.map_err(|err| anyhow!(err))
.map(move |send_result| {
if shutdown_listener.is_triggered() {
Err(anyhow!("service shutting down"))
} else {
send_result
}
})
.try_fold((), |acc, _| async move { Ok(acc) })
.await
}
Expand All @@ -1177,6 +1214,7 @@ async fn stream_existing_skfs(
pool: &Pool<Postgres>,
signing_key: &Keypair,
tx: mpsc::Sender<Result<RouteStreamResV1, Status>>,
shutdown_listener: triggered::Listener,
) -> Result<()> {
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = signing_key.public_key().into();
Expand All @@ -1197,6 +1235,13 @@ async fn stream_existing_skfs(
}
})
.map_err(|err| anyhow!(err))
.map(move |send_result| {
if shutdown_listener.is_triggered() {
Err(anyhow!("service shutting down"))
} else {
send_result
}
})
.try_fold((), |acc, _| async move { Ok(acc) })
.await
}

0 comments on commit 33c9adf

Please sign in to comment.