Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IoT Config locked route updates and shutdown listeners #567

Merged
merged 3 commits into from
Jul 17, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions iot_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct GatewayService {
region_map: RegionMapReader,
signing_key: Arc<Keypair>,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
}

impl GatewayService {
Expand All @@ -44,6 +45,7 @@ impl GatewayService {
region_map: RegionMapReader,
auth_cache: AuthCache,
delegate_cache: watch::Receiver<org::DelegateCache>,
shutdown: triggered::Listener,
) -> Result<Self> {
let gateway_cache = Arc::new(Cache::new());
let cache_clone = gateway_cache.clone();
Expand All @@ -56,6 +58,7 @@ impl GatewayService {
region_map,
signing_key: Arc::new(settings.signing_keypair()?),
delegate_cache,
shutdown,
})
}

Expand Down Expand Up @@ -275,6 +278,7 @@ impl iot_config::Gateway for GatewayService {
let signing_key = self.signing_key.clone();
let batch_size = request.batch_size;
let region_map = self.region_map.clone();
let shutdown_listener = self.shutdown.clone();

let (tx, rx) = tokio::sync::mpsc::channel(20);

Expand All @@ -285,6 +289,7 @@ impl iot_config::Gateway for GatewayService {
&signing_key,
region_map.clone(),
batch_size,
shutdown_listener,
)
.await
});
Expand All @@ -299,11 +304,15 @@ async fn stream_all_gateways_info(
signing_key: &Keypair,
region_map: RegionMapReader,
batch_size: u32,
shutdown_listener: triggered::Listener,
) -> anyhow::Result<()> {
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = signing_key.public_key().into();
let mut stream = gateway_info::db::all_info_stream(pool).chunks(batch_size as usize);
while let Some(infos) = stream.next().await {
if shutdown_listener.is_triggered() {
break;
}
let gateway_infos = infos
.into_iter()
.filter_map(|info| {
Expand All @@ -313,24 +322,24 @@ async fn stream_all_gateways_info(
})
.collect();

let mut response = GatewayInfoStreamResV1 {
let mut gateway = GatewayInfoStreamResV1 {
gateways: gateway_infos,
timestamp,
signer: signer.clone(),
signature: vec![],
};

response = match signing_key.sign(&response.encode_to_vec()) {
gateway = match signing_key.sign(&gateway.encode_to_vec()) {
Ok(signature) => GatewayInfoStreamResV1 {
signature,
..response
..gateway
},
Err(_) => {
continue;
}
};

tx.send(Ok(response)).await?;
tx.send(Ok(gateway)).await?;
}
Ok(())
}
7 changes: 5 additions & 2 deletions iot_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ pub fn update_channel<T: Clone>() -> broadcast::Sender<T> {
update_tx
}

pub async fn broadcast_update<T>(
pub async fn broadcast_update<T: std::fmt::Debug>(
message: T,
sender: broadcast::Sender<T>,
) -> Result<(), broadcast::error::SendError<T>> {
while !enqueue_update(sender.len()) {
tokio::time::sleep(tokio::time::Duration::from_millis(25)).await
}
sender.send(message).map(|_| ())
sender.send(message).map(|_| ()).map_err(|err| {
tracing::error!(error = ?err, "failed to broadcast routing update");
err
})
}

fn enqueue_update(queue_size: usize) -> bool {
Expand Down
2 changes: 2 additions & 0 deletions iot_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl Daemon {
region_map.clone(),
auth_cache.clone(),
delegate_key_cache,
shutdown_listener.clone(),
)?;
let route_svc = RouteService::new(
settings,
Expand All @@ -108,6 +109,7 @@ impl Daemon {
pool.clone(),
route_svc.clone_update_channel(),
delegate_key_updater,
shutdown_listener.clone(),
)?;
let admin_svc = AdminService::new(
settings,
Expand Down
21 changes: 18 additions & 3 deletions iot_config/src/org_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
admin::{AuthCache, KeyType},
helium_netids, lora_field, org,
broadcast_update, helium_netids, lora_field, org,
route::list_routes,
telemetry, verify_public_key, GrpcResult, Settings,
};
Expand All @@ -26,6 +26,7 @@ pub struct OrgService {
route_update_tx: broadcast::Sender<RouteStreamResV1>,
signing_key: Keypair,
delegate_updater: watch::Sender<org::DelegateCache>,
shutdown: triggered::Listener,
}

#[derive(Clone, Debug, PartialEq)]
Expand All @@ -41,13 +42,15 @@ impl OrgService {
pool: Pool<Postgres>,
route_update_tx: broadcast::Sender<RouteStreamResV1>,
delegate_updater: watch::Sender<org::DelegateCache>,
shutdown: triggered::Listener,
) -> Result<Self> {
Ok(Self {
auth_cache,
pool,
route_update_tx,
signing_key: settings.signing_keypair()?,
delegate_updater,
shutdown,
})
}

Expand Down Expand Up @@ -446,6 +449,9 @@ impl iot_config::Org for OrgService {
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in org_routes {
if self.shutdown.is_triggered() {
break;
}
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
Expand All @@ -455,7 +461,10 @@ impl iot_config::Org for OrgService {
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if self.route_update_tx.send(update).is_err() {
if broadcast_update(update, self.route_update_tx.clone())
.await
.is_err()
{
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route disable incomplete"
Expand Down Expand Up @@ -514,6 +523,9 @@ impl iot_config::Org for OrgService {
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in org_routes {
if self.shutdown.is_triggered() {
break;
}
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
Expand All @@ -523,7 +535,10 @@ impl iot_config::Org for OrgService {
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if self.route_update_tx.send(update).is_err() {
if broadcast_update(update, self.route_update_tx.clone())
.await
.is_err()
{
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route enable incomplete"
Expand Down
59 changes: 30 additions & 29 deletions iot_config/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,28 +137,28 @@ pub async fn create_route(

transaction.commit().await?;

if new_route.active && !new_route.locked {
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = signing_key
.sign(&update.encode_to_vec())
.map_err(|err| tracing::error!("error signing route stream response: {err:?}"))
.and_then(|signature| {
update.signature = signature;
update_tx.send(update).map_err(|err| {
tracing::warn!("error broadcasting route stream response: {err:?}")
})
});
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = futures::future::ready(signing_key.sign(&update.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route create");
anyhow!("error signing route create")
})
.and_then(|signature| {
update.signature = signature;
broadcast_update(update, update_tx)
.map_err(|_| anyhow!("failed broadcasting route create"))
})
.await;

Ok(new_route)
}
Expand Down Expand Up @@ -213,15 +213,17 @@ pub async fn update_route(
signature: vec![],
};

_ = signing_key
.sign(&update_res.encode_to_vec())
.map_err(|err| tracing::error!("error signing route stream response: {err:?}"))
_ = futures::future::ready(signing_key.sign(&update_res.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route update");
anyhow!("error signing route update")
})
.and_then(|signature| {
update_res.signature = signature;
update_tx
.send(update_res)
.map_err(|err| tracing::warn!("error broadcasting route stream response: {err:?}"))
});
broadcast_update(update_res, update_tx)
.map_err(|_| anyhow!("failed broadcasting route update"))
})
.await;

Ok(updated_route)
}
Expand Down Expand Up @@ -523,7 +525,6 @@ pub fn active_route_stream<'a>(
select r.id, r.oui, r.net_id, r.max_copies, r.server_host, r.server_port, r.server_protocol_opts, r.active, r.ignore_empty_skf, o.locked
from routes r
join organizations o on r.oui = o.oui
where o.locked = false and r.active = true
group by r.id, o.locked
"#,
)
Expand Down
Loading