diff --git a/iot_config/src/lib.rs b/iot_config/src/lib.rs index b2d6959d6..cbaaef6ba 100644 --- a/iot_config/src/lib.rs +++ b/iot_config/src/lib.rs @@ -52,7 +52,7 @@ pub async fn broadcast_update( fn enqueue_update(queue_size: usize) -> bool { // enqueue the message for broadcast if // the current queue is <= 80% full - (queue_size * 100) / BROADCAST_CHANNEL_QUEUE <= 85 + (queue_size * 100) / BROADCAST_CHANNEL_QUEUE <= 80 } pub fn verify_public_key(bytes: &[u8]) -> Result { diff --git a/iot_config/src/route.rs b/iot_config/src/route.rs index ecc7598af..3c4b2c7ab 100644 --- a/iot_config/src/route.rs +++ b/iot_config/src/route.rs @@ -100,7 +100,7 @@ pub enum RouteStorageError { pub async fn create_route( route: Route, db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy, - signing_key: Arc, + signing_key: &Keypair, update_tx: Sender, ) -> anyhow::Result { let net_id: i32 = route.net_id.into(); @@ -137,35 +137,30 @@ pub async fn create_route( transaction.commit().await?; - tokio::spawn({ - let new_route = new_route.clone(); - async move { - let timestamp = Utc::now().encode_timestamp(); - let signer = signing_key.public_key().into(); - let mut update = proto::RouteStreamResV1 { - action: proto::ActionV1::Add.into(), - data: Some(proto::route_stream_res_v1::Data::Route( - new_route.clone().into(), - )), - timestamp, - signer, - signature: vec![], - }; - _ = futures::future::ready(signing_key.sign(&update.encode_to_vec())) - .map_err(|err| { - tracing::error!(error = ?err, "error signing route create"); - anyhow!("error signing route create") - }) - .and_then(|signature| { - update.signature = signature; - broadcast_update(update, update_tx).map_err(|_| { - tracing::error!("failed broadcasting route create"); - anyhow!("failed broadcasting route create") - }) - }) - .await; - } - }); + let timestamp = Utc::now().encode_timestamp(); + let signer = signing_key.public_key().into(); + let mut update = proto::RouteStreamResV1 { + action: proto::ActionV1::Add.into(), + data: Some(proto::route_stream_res_v1::Data::Route( + new_route.clone().into(), + )), + timestamp, + signer, + signature: vec![], + }; + _ = futures::future::ready(signing_key.sign(&update.encode_to_vec())) + .map_err(|err| { + tracing::error!(error = ?err, "error signing route create"); + anyhow!("error signing route create") + }) + .and_then(|signature| { + update.signature = signature; + broadcast_update(update, update_tx).map_err(|_| { + tracing::error!("failed broadcasting route create"); + anyhow!("failed broadcasting route create") + }) + }) + .await; Ok(new_route) } @@ -173,7 +168,7 @@ pub async fn create_route( pub async fn update_route( route: Route, db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy, - signing_key: Arc, + signing_key: &Keypair, update_tx: Sender, ) -> anyhow::Result { let protocol_opts = route @@ -208,36 +203,31 @@ pub async fn update_route( transaction.commit().await?; - tokio::spawn({ - let updated_route = updated_route.clone(); - async move { - let timestamp = Utc::now().encode_timestamp(); - let signer = signing_key.public_key().into(); - let mut update_res = proto::RouteStreamResV1 { - action: proto::ActionV1::Add.into(), - data: Some(proto::route_stream_res_v1::Data::Route( - updated_route.clone().into(), - )), - timestamp, - signer, - signature: vec![], - }; - - _ = futures::future::ready(signing_key.sign(&update_res.encode_to_vec())) - .map_err(|err| { - tracing::error!(error = ?err, "error signing route update"); - anyhow!("error signing route update") - }) - .and_then(|signature| { - update_res.signature = signature; - broadcast_update(update_res, update_tx).map_err(|_| { - tracing::error!("failed broadcasting route update"); - anyhow!("failed broadcasting route update") - }) - }) - .await; - } - }); + let timestamp = Utc::now().encode_timestamp(); + let signer = signing_key.public_key().into(); + let mut update_res = proto::RouteStreamResV1 { + action: proto::ActionV1::Add.into(), + data: Some(proto::route_stream_res_v1::Data::Route( + updated_route.clone().into(), + )), + timestamp, + signer, + signature: vec![], + }; + + _ = futures::future::ready(signing_key.sign(&update_res.encode_to_vec())) + .map_err(|err| { + tracing::error!(error = ?err, "error signing route update"); + anyhow!("error signing route update") + }) + .and_then(|signature| { + update_res.signature = signature; + broadcast_update(update_res, update_tx).map_err(|_| { + tracing::error!("failed broadcasting route update"); + anyhow!("failed broadcasting route update") + }) + }) + .await; Ok(updated_route) } @@ -636,7 +626,7 @@ pub async fn get_route(id: &str, db: impl sqlx::PgExecutor<'_>) -> anyhow::Resul pub async fn delete_route( id: &str, db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy, - signing_key: Arc, + signing_key: &Keypair, update_tx: Sender, ) -> anyhow::Result<()> { let uuid = Uuid::try_parse(id)?; @@ -656,30 +646,28 @@ pub async fn delete_route( transaction.commit().await?; - tokio::spawn(async move { - let timestamp = Utc::now().encode_timestamp(); - let signer = signing_key.public_key().into(); - let mut delete_res = proto::RouteStreamResV1 { - action: proto::ActionV1::Remove.into(), - data: Some(proto::route_stream_res_v1::Data::Route( - route.clone().into(), - )), - timestamp, - signer, - signature: vec![], - }; - - _ = signing_key - .sign(&delete_res.encode_to_vec()) - .map_err(|_| anyhow!("failed to sign route delete update")) - .and_then(|signature| { - delete_res.signature = signature; - update_tx.send(delete_res).map_err(|_| { - tracing::error!("failed to broadcast route delete update"); - anyhow!("failed to broadcast route delete update") - }) - }); - }); + let timestamp = Utc::now().encode_timestamp(); + let signer = signing_key.public_key().into(); + let mut delete_res = proto::RouteStreamResV1 { + action: proto::ActionV1::Remove.into(), + data: Some(proto::route_stream_res_v1::Data::Route( + route.clone().into(), + )), + timestamp, + signer, + signature: vec![], + }; + + _ = signing_key + .sign(&delete_res.encode_to_vec()) + .map_err(|_| anyhow!("failed to sign route delete update")) + .and_then(|signature| { + delete_res.signature = signature; + update_tx.send(delete_res).map_err(|_| { + tracing::error!("failed to broadcast route delete update"); + anyhow!("failed to broadcast route delete update") + }) + }); Ok(()) } diff --git a/iot_config/src/route_service.rs b/iot_config/src/route_service.rs index 647a9c630..99baa6d95 100644 --- a/iot_config/src/route_service.rs +++ b/iot_config/src/route_service.rs @@ -280,7 +280,7 @@ impl iot_config::Route for RouteService { let new_route: Route = route::create_route( route, &self.pool, - self.signing_key.clone(), + &self.signing_key, self.clone_update_channel(), ) .await @@ -323,7 +323,7 @@ impl iot_config::Route for RouteService { let updated_route = route::update_route( route, &self.pool, - self.signing_key.clone(), + &self.signing_key, self.clone_update_channel(), ) .await @@ -360,7 +360,7 @@ impl iot_config::Route for RouteService { route::delete_route( &request.id, &self.pool, - self.signing_key.clone(), + &self.signing_key, self.clone_update_channel(), ) .await