Skip to content

Commit

Permalink
revert #611
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffgrunewald committed Aug 29, 2023
1 parent 07c60e4 commit 3ae434d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 90 deletions.
2 changes: 1 addition & 1 deletion iot_config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub async fn broadcast_update<T: std::fmt::Debug>(
fn enqueue_update(queue_size: usize) -> bool {
// enqueue the message for broadcast if
// the current queue is <= 80% full
(queue_size * 100) / BROADCAST_CHANNEL_QUEUE <= 85
(queue_size * 100) / BROADCAST_CHANNEL_QUEUE <= 80
}

pub fn verify_public_key(bytes: &[u8]) -> Result<PublicKey, Status> {
Expand Down
160 changes: 74 additions & 86 deletions iot_config/src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub enum RouteStorageError {
pub async fn create_route(
route: Route,
db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy,
signing_key: Arc<Keypair>,
signing_key: &Keypair,
update_tx: Sender<proto::RouteStreamResV1>,
) -> anyhow::Result<Route> {
let net_id: i32 = route.net_id.into();
Expand Down Expand Up @@ -137,43 +137,38 @@ pub async fn create_route(

transaction.commit().await?;

tokio::spawn({
let new_route = new_route.clone();
async move {
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = futures::future::ready(signing_key.sign(&update.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route create");
anyhow!("error signing route create")
})
.and_then(|signature| {
update.signature = signature;
broadcast_update(update, update_tx).map_err(|_| {
tracing::error!("failed broadcasting route create");
anyhow!("failed broadcasting route create")
})
})
.await;
}
});
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = futures::future::ready(signing_key.sign(&update.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route create");
anyhow!("error signing route create")
})
.and_then(|signature| {
update.signature = signature;
broadcast_update(update, update_tx).map_err(|_| {
tracing::error!("failed broadcasting route create");
anyhow!("failed broadcasting route create")
})
})
.await;

Ok(new_route)
}

pub async fn update_route(
route: Route,
db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy,
signing_key: Arc<Keypair>,
signing_key: &Keypair,
update_tx: Sender<proto::RouteStreamResV1>,
) -> anyhow::Result<Route> {
let protocol_opts = route
Expand Down Expand Up @@ -208,36 +203,31 @@ pub async fn update_route(

transaction.commit().await?;

tokio::spawn({
let updated_route = updated_route.clone();
async move {
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update_res = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
updated_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};

_ = futures::future::ready(signing_key.sign(&update_res.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route update");
anyhow!("error signing route update")
})
.and_then(|signature| {
update_res.signature = signature;
broadcast_update(update_res, update_tx).map_err(|_| {
tracing::error!("failed broadcasting route update");
anyhow!("failed broadcasting route update")
})
})
.await;
}
});
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update_res = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
updated_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};

_ = futures::future::ready(signing_key.sign(&update_res.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route update");
anyhow!("error signing route update")
})
.and_then(|signature| {
update_res.signature = signature;
broadcast_update(update_res, update_tx).map_err(|_| {
tracing::error!("failed broadcasting route update");
anyhow!("failed broadcasting route update")
})
})
.await;

Ok(updated_route)
}
Expand Down Expand Up @@ -636,7 +626,7 @@ pub async fn get_route(id: &str, db: impl sqlx::PgExecutor<'_>) -> anyhow::Resul
pub async fn delete_route(
id: &str,
db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy,
signing_key: Arc<Keypair>,
signing_key: &Keypair,
update_tx: Sender<proto::RouteStreamResV1>,
) -> anyhow::Result<()> {
let uuid = Uuid::try_parse(id)?;
Expand All @@ -656,30 +646,28 @@ pub async fn delete_route(

transaction.commit().await?;

tokio::spawn(async move {
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut delete_res = proto::RouteStreamResV1 {
action: proto::ActionV1::Remove.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};

_ = signing_key
.sign(&delete_res.encode_to_vec())
.map_err(|_| anyhow!("failed to sign route delete update"))
.and_then(|signature| {
delete_res.signature = signature;
update_tx.send(delete_res).map_err(|_| {
tracing::error!("failed to broadcast route delete update");
anyhow!("failed to broadcast route delete update")
})
});
});
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut delete_res = proto::RouteStreamResV1 {
action: proto::ActionV1::Remove.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};

_ = signing_key
.sign(&delete_res.encode_to_vec())
.map_err(|_| anyhow!("failed to sign route delete update"))
.and_then(|signature| {
delete_res.signature = signature;
update_tx.send(delete_res).map_err(|_| {
tracing::error!("failed to broadcast route delete update");
anyhow!("failed to broadcast route delete update")
})
});

Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions iot_config/src/route_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl iot_config::Route for RouteService {
let new_route: Route = route::create_route(
route,
&self.pool,
self.signing_key.clone(),
&self.signing_key,
self.clone_update_channel(),
)
.await
Expand Down Expand Up @@ -323,7 +323,7 @@ impl iot_config::Route for RouteService {
let updated_route = route::update_route(
route,
&self.pool,
self.signing_key.clone(),
&self.signing_key,
self.clone_update_channel(),
)
.await
Expand Down Expand Up @@ -360,7 +360,7 @@ impl iot_config::Route for RouteService {
route::delete_route(
&request.id,
&self.pool,
self.signing_key.clone(),
&self.signing_key,
self.clone_update_channel(),
)
.await
Expand Down

0 comments on commit 3ae434d

Please sign in to comment.