Skip to content

Commit

Permalink
switch shutdown polling for select
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffgrunewald committed Jul 15, 2023
1 parent 33c9adf commit cc6886a
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 274 deletions.
23 changes: 10 additions & 13 deletions iot_config/src/gateway_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,16 @@ impl iot_config::Gateway for GatewayService {
let (tx, rx) = tokio::sync::mpsc::channel(20);

tokio::spawn(async move {
stream_all_gateways_info(
&pool,
tx.clone(),
&signing_key,
region_map.clone(),
batch_size,
shutdown_listener,
)
.await
tokio::select! {
_ = shutdown_listener => (),
_ = stream_all_gateways_info(
&pool,
tx.clone(),
&signing_key,
region_map.clone(),
batch_size,
) => (),
}
});

Ok(Response::new(GrpcStreamResult::new(rx)))
Expand All @@ -304,15 +305,11 @@ async fn stream_all_gateways_info(
signing_key: &Keypair,
region_map: RegionMapReader,
batch_size: u32,
shutdown_listener: triggered::Listener,
) -> anyhow::Result<()> {
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = signing_key.public_key().into();
let mut stream = gateway_info::db::all_info_stream(pool).chunks(batch_size as usize);
while let Some(infos) = stream.next().await {
if shutdown_listener.is_triggered() {
break;
}
let gateway_infos = infos
.into_iter()
.filter_map(|info| {
Expand Down
114 changes: 38 additions & 76 deletions iot_config/src/org_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,38 @@ impl OrgService {
.sign(response)
.map_err(|_| Status::internal("response signing error"))
}

async fn stream_org_routes_enable_disable(&self, oui: u64) -> Result<(), Status> {
let routes = list_routes(oui, &self.pool).await.map_err(|err| {
tracing::error!(org = oui, reason = ?err, "failed to list org routes for streaming update");
Status::internal(format!("error retrieving routes for updated org: {}", oui))
})?;
let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in routes {
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
data: Some(route_stream_res_v1::Data::Route(route.into())),
timestamp,
signer: signer.clone(),
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if broadcast_update(update, self.route_update_tx.clone())
.await
.is_err()
{
tracing::info!(
route_id,
"all subscribers disconnected; org routes update incomplete"
);
break;
};
tracing::debug!(route_id, "route updated");
}
Ok(())
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -434,44 +466,9 @@ impl iot_config::Org for OrgService {
Status::internal(format!("org disable failed for: {}", request.oui))
})?;

let org_routes = list_routes(request.oui, &self.pool).await.map_err(|err| {
tracing::error!(
org = request.oui,
reason = ?err,
"failed to list org routes for streaming disable update"
);
Status::internal(format!(
"error retrieving routes for disabled org: {}",
request.oui
))
})?;

let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in org_routes {
if self.shutdown.is_triggered() {
break;
}
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
data: Some(route_stream_res_v1::Data::Route(route.into())),
timestamp,
signer: signer.clone(),
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if broadcast_update(update, self.route_update_tx.clone())
.await
.is_err()
{
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route disable incomplete"
);
break;
};
tracing::debug!(route_id = route_id, "route disabled");
tokio::select! {
_ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")),
result = self.stream_org_routes_enable_disable(request.oui) => result?
}
}

Expand Down Expand Up @@ -508,44 +505,9 @@ impl iot_config::Org for OrgService {
Status::internal(format!("org enable failed for: {}", request.oui))
})?;

let org_routes = list_routes(request.oui, &self.pool).await.map_err(|err| {
tracing::error!(
org = request.oui,
reason = ?err,
"failed to list routes for streaming enable update"
);
Status::internal(format!(
"error retrieving routes for enabled org: {}",
request.oui
))
})?;

let timestamp = Utc::now().encode_timestamp();
let signer: Vec<u8> = self.signing_key.public_key().into();
for route in org_routes {
if self.shutdown.is_triggered() {
break;
}
let route_id = route.id.clone();
let mut update = RouteStreamResV1 {
action: ActionV1::Add.into(),
data: Some(route_stream_res_v1::Data::Route(route.into())),
timestamp,
signer: signer.clone(),
signature: vec![],
};
update.signature = self.sign_response(&update.encode_to_vec())?;
if broadcast_update(update, self.route_update_tx.clone())
.await
.is_err()
{
tracing::info!(
route_id = route_id,
"all subscribers disconnected; route enable incomplete"
);
break;
};
tracing::debug!(route_id = route_id, "route enabled");
tokio::select! {
_ = self.shutdown.clone() => return Err(Status::unavailable("service shutting down")),
result = self.stream_org_routes_enable_disable(request.oui) => result?
}
}

Expand Down
Loading

0 comments on commit cc6886a

Please sign in to comment.