Skip to content

Commit

Permalink
more shutdown listeners in streams
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffgrunewald committed Jul 10, 2023
1 parent e253815 commit d074c70
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 96 deletions.
17 changes: 13 additions & 4 deletions iot_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct GatewayService {
region_map: RegionMapReader,
signing_key: Arc<Keypair>,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
}

impl GatewayService {
Expand All @@ -44,6 +45,7 @@ impl GatewayService {
region_map: RegionMapReader,
auth_cache: AuthCache,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
) -> Result<Self> {
let gateway_cache = Arc::new(Cache::new());
let cache_clone = gateway_cache.clone();
Expand All @@ -56,6 +58,7 @@ impl GatewayService {
region_map,
signing_key: Arc::new(settings.signing_keypair()?),
delegate_cache,
shutdown,
})
}

Expand Down Expand Up @@ -275,6 +278,7 @@ impl iot_config::Gateway for GatewayService {
let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;
let region_map = self.region_map.clone();
let shutdown_listener = self.shutdown.clone();

let (tx, rx) = tokio::sync::mpsc::channel(20);

Expand All @@ -285,6 +289,7 @@ impl iot_config::Gateway for GatewayService {
&signing_key,
region_map.clone(),
batch_size,
shutdown_listener,
)
.await
});
Expand All @@ -299,11 +304,15 @@ async fn stream_all_gateways_info(
signing_key: &Keypair,
region_map: RegionMapReader,
batch_size: u32,
shutdown_listener: triggered::Listener,
) -> anyhow::Result<()> {
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = signing_key.public_key().into();
let mut stream = gateway_info::db::all_info_stream(pool).chunks(batch_size as usize);
while let Some(infos) = stream.next().await {
if shutdown_listener.is_triggered() {
break;
}
let gateway_infos = infos
.into_iter()
.filter_map(|info| {
Expand All @@ -313,24 +322,24 @@ async fn stream_all_gateways_info(
})
.collect();

let mut response = GatewayInfoStreamResV1 {
let mut gateway = GatewayInfoStreamResV1 {
gateways: gateway_infos,
timestamp,
signer: signer.clone(),
signature: vec![],
};

response = match signing_key.sign(&response.encode_to_vec()) {
gateway = match signing_key.sign(&gateway.encode_to_vec()) {
Ok(signature) => GatewayInfoStreamResV1 {
signature,
..response
..gateway
},
Err(_) => {
continue;
}
};

tx.send(Ok(response)).await?;
tx.send(Ok(gateway)).await?;
}
Ok(())
}
7 changes: 5 additions & 2 deletions iot_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ pub fn update_channel<T: Clone>() -> broadcast::Sender<T> {
update_tx
}

pub async fn broadcast_update<T>(
pub async fn broadcast_update<T: std::fmt::Debug>(
message: T,
sender: broadcast::Sender<T>,
) -> Result<(), broadcast::error::SendError<T>> {
while !enqueue_update(sender.len()) {
tokio::time::sleep(tokio::time::Duration::from_millis(25)).await
}
sender.send(message).map(|_| ())
sender.send(message).map(|_| ()).map_err(|err| {
tracing::error!(error = ?err, "failed to broadcast routing update");
err
})
}

fn enqueue_update(queue_size: usize) -> bool {
Expand Down
2 changes: 2 additions & 0 deletions iot_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl Daemon {
region_map.clone(),
auth_cache.clone(),
delegate_key_cache,
shutdown_listener.clone(),
)?;
let route_svc = RouteService::new(
settings,
Expand All @@ -108,6 +109,7 @@ impl Daemon {
pool.clone(),
route_svc.clone_update_channel(),
delegate_key_updater,
shutdown_listener.clone(),
)?;
let admin_svc = AdminService::new(
settings,
Expand Down
14 changes: 12 additions & 2 deletions iot_config/src/org_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
admin::{AuthCache, KeyType},
broadcast_update,
helium_netids, lora_field, org,
route::list_routes,
telemetry, verify_public_key, GrpcResult, Settings,
Expand All @@ -26,6 +27,7 @@ pub struct OrgService {
route_update_tx: broadcast::Sender<RouteStreamResV1>,
signing_key: Keypair,
delegate_updater: watch::Sender<org::DelegateCache>,
shutdown: triggered::Listener,
}

#[derive(Clone, Debug, PartialEq)]
Expand All @@ -41,13 +43,15 @@ impl OrgService {
pool: Pool<Postgres>,
route_update_tx: broadcast::Sender<RouteStreamResV1>,
delegate_updater: watch::Sender<org::DelegateCache>,
shutdown: triggered::Listener,
) -> Result<Self> {
Ok(Self {
auth_cache,
pool,
route_update_tx,
signing_key: settings.signing_keypair()?,
delegate_updater,
shutdown,
})
}

Expand Down Expand Up @@ -446,6 +450,9 @@ impl iot_config::Org for OrgService {
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in org_routes {
if self.shutdown.is_triggered() {
break;
}
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
Expand All @@ -455,7 +462,7 @@ impl iot_config::Org for OrgService {
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if self.route_update_tx.send(update).is_err() {
if broadcast_update(update, self.route_update_tx.clone()).await.is_err() {
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route disable incomplete"
Expand Down Expand Up @@ -514,6 +521,9 @@ impl iot_config::Org for OrgService {
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in org_routes {
if self.shutdown.is_triggered() {
break;
}
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
Expand All @@ -523,7 +533,7 @@ impl iot_config::Org for OrgService {
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if self.route_update_tx.send(update).is_err() {
if broadcast_update(update, self.route_update_tx.clone()).await.is_err() {
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route enable incomplete"
Expand Down
57 changes: 28 additions & 29 deletions iot_config/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,28 +137,27 @@ pub async fn create_route(

transaction.commit().await?;

if new_route.active && !new_route.locked {
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = signing_key
.sign(&update.encode_to_vec())
.map_err(|err| tracing::error!("error signing route stream response: {err:?}"))
.and_then(|signature| {
update.signature = signature;
update_tx.send(update).map_err(|err| {
tracing::warn!("error broadcasting route stream response: {err:?}")
})
});
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = futures::future::ready(signing_key.sign(&update.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route create");
anyhow!("error signing route create")
})
.and_then(|signature| {
update.signature = signature;
broadcast_update(update, update_tx).map_err(|_| anyhow!("failed broadcasting route create"))
})
.await;

Ok(new_route)
}
Expand Down Expand Up @@ -213,15 +212,16 @@ pub async fn update_route(
signature: vec![],
};

_ = signing_key
.sign(&update_res.encode_to_vec())
.map_err(|err| tracing::error!("error signing route stream response: {err:?}"))
_ = futures::future::ready(signing_key.sign(&update_res.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route update");
anyhow!("error signing route update")
})
.and_then(|signature| {
update_res.signature = signature;
update_tx
.send(update_res)
.map_err(|err| tracing::warn!("error broadcasting route stream response: {err:?}"))
});
broadcast_update(update_res, update_tx).map_err(|_| anyhow!("failed broadcasting route update"))
})
.await;

Ok(updated_route)
}
Expand Down Expand Up @@ -523,7 +523,6 @@ pub fn active_route_stream<'a>(
select r.id, r.oui, r.net_id, r.max_copies, r.server_host, r.server_port, r.server_protocol_opts, r.active, r.ignore_empty_skf, o.locked
from routes r
join organizations o on r.oui = o.oui
where o.locked = false and r.active = true
group by r.id, o.locked
"#,
)
Expand Down
Loading

0 comments on commit d074c70

Please sign in to comment.