
Commit

Merge remote-tracking branch 'origin/main' into map/remove-hb-non-determinism
maplant committed Aug 30, 2023
2 parents 7fa1d17 + 1a3fd57 commit 5d7f34a
Showing 21 changed files with 854 additions and 798 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -59,7 +59,6 @@ sqlx = {version = "0", features = [
"macros",
"runtime-tokio-rustls"
]}

helium-crypto = {version = "0.6.8", features=["sqlx-postgres", "multisig"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]}
hextree = "*"
@@ -102,6 +101,7 @@ data-credits = {git = "https://github.com/helium/helium-program-library.git", ta
helium-sub-daos = {git = "https://github.com/helium/helium-program-library.git", tag = "v0.1.0"}
price-oracle = {git = "https://github.com/helium/helium-program-library.git", tag = "v0.1.0"}
tokio-util = "0"
tower-http = {version = "0", features = ["trace"]}

[patch.crates-io]
sqlx = { git = "https://github.com/helium/sqlx.git", rev = "92a2268f02e0cac6fccb34d3e926347071dbb88d" }
1 change: 1 addition & 0 deletions iot_config/Cargo.toml
@@ -36,6 +36,7 @@ thiserror = {workspace = true}
tokio = {workspace = true}
tokio-stream = {workspace = true}
tonic = {workspace = true}
tower-http = {workspace = true}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
triggered = {workspace = true}
2 changes: 1 addition & 1 deletion iot_config/src/lib.rs
@@ -52,7 +52,7 @@ pub async fn broadcast_update<T: std::fmt::Debug>(
fn enqueue_update(queue_size: usize) -> bool {
// enqueue the message for broadcast if
// the current queue is <= 80% full
(queue_size * 100) / BROADCAST_CHANNEL_QUEUE <= 85
(queue_size * 100) / BROADCAST_CHANNEL_QUEUE <= 80
}

pub fn verify_public_key(bytes: &[u8]) -> Result<PublicKey, Status> {
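The lib.rs change brings the back-pressure check in line with its comment: updates are enqueued for broadcast only while the channel is at most 80% full, where the old code allowed up to 85%. A minimal sketch of the guard and its boundary behaviour; the channel capacity below is an assumption, not taken from the crate.

// Hedged sketch: BROADCAST_CHANNEL_QUEUE's real value lives in iot_config
// and is assumed to be 100 here purely for illustration.
const BROADCAST_CHANNEL_QUEUE: usize = 100;

fn enqueue_update(queue_size: usize) -> bool {
    // Enqueue only while the queue is at most 80% full, matching the comment.
    (queue_size * 100) / BROADCAST_CHANNEL_QUEUE <= 80
}

fn main() {
    assert!(enqueue_update(80));  // exactly 80% full: still enqueued
    assert!(!enqueue_update(81)); // just over the threshold: skipped
}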
1 change: 1 addition & 0 deletions iot_config/src/main.rs
@@ -123,6 +123,7 @@ impl Daemon {
transport::Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(250)))
.http2_keepalive_timeout(Some(Duration::from_secs(60)))
.layer(tower_http::trace::TraceLayer::new_for_grpc())
.add_service(GatewayServer::new(gateway_svc))
.add_service(OrgServer::new(org_svc))
.add_service(RouteServer::new(route_svc))
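Both config services now wrap their gRPC stack in tower_http's TraceLayer, which records a tracing span per request. The layer only produces output once a tracing subscriber is installed at startup; a minimal sketch of such an initialization, with the log level chosen here as an assumption rather than taken from the repo.

// Sketch only: TraceLayer logs through the `tracing` crate, so some
// subscriber must be registered; the DEBUG level below is illustrative.
fn init_tracing() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();
}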
160 changes: 74 additions & 86 deletions iot_config/src/route.rs
@@ -100,7 +100,7 @@ pub enum RouteStorageError {
pub async fn create_route(
route: Route,
db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy,
signing_key: Arc<Keypair>,
signing_key: &Keypair,
update_tx: Sender<proto::RouteStreamResV1>,
) -> anyhow::Result<Route> {
let net_id: i32 = route.net_id.into();
@@ -137,43 +137,38 @@ pub async fn create_route(

transaction.commit().await?;

tokio::spawn({
let new_route = new_route.clone();
async move {
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = futures::future::ready(signing_key.sign(&update.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route create");
anyhow!("error signing route create")
})
.and_then(|signature| {
update.signature = signature;
broadcast_update(update, update_tx).map_err(|_| {
tracing::error!("failed broadcasting route create");
anyhow!("failed broadcasting route create")
})
})
.await;
}
});
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
new_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};
_ = futures::future::ready(signing_key.sign(&update.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route create");
anyhow!("error signing route create")
})
.and_then(|signature| {
update.signature = signature;
broadcast_update(update, update_tx).map_err(|_| {
tracing::error!("failed broadcasting route create");
anyhow!("failed broadcasting route create")
})
})
.await;

Ok(new_route)
}

pub async fn update_route(
route: Route,
db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy,
signing_key: Arc<Keypair>,
signing_key: &Keypair,
update_tx: Sender<proto::RouteStreamResV1>,
) -> anyhow::Result<Route> {
let protocol_opts = route
@@ -208,36 +203,31 @@ pub async fn update_route(

transaction.commit().await?;

tokio::spawn({
let updated_route = updated_route.clone();
async move {
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update_res = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
updated_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};

_ = futures::future::ready(signing_key.sign(&update_res.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route update");
anyhow!("error signing route update")
})
.and_then(|signature| {
update_res.signature = signature;
broadcast_update(update_res, update_tx).map_err(|_| {
tracing::error!("failed broadcasting route update");
anyhow!("failed broadcasting route update")
})
})
.await;
}
});
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut update_res = proto::RouteStreamResV1 {
action: proto::ActionV1::Add.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
updated_route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};

_ = futures::future::ready(signing_key.sign(&update_res.encode_to_vec()))
.map_err(|err| {
tracing::error!(error = ?err, "error signing route update");
anyhow!("error signing route update")
})
.and_then(|signature| {
update_res.signature = signature;
broadcast_update(update_res, update_tx).map_err(|_| {
tracing::error!("failed broadcasting route update");
anyhow!("failed broadcasting route update")
})
})
.await;

Ok(updated_route)
}
@@ -636,7 +626,7 @@ pub async fn get_route(id: &str, db: impl sqlx::PgExecutor<'_>) -> anyhow::Resul
pub async fn delete_route(
id: &str,
db: impl sqlx::PgExecutor<'_> + sqlx::Acquire<'_, Database = sqlx::Postgres> + Copy,
signing_key: Arc<Keypair>,
signing_key: &Keypair,
update_tx: Sender<proto::RouteStreamResV1>,
) -> anyhow::Result<()> {
let uuid = Uuid::try_parse(id)?;
@@ -656,30 +646,28 @@ pub async fn delete_route(

transaction.commit().await?;

tokio::spawn(async move {
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut delete_res = proto::RouteStreamResV1 {
action: proto::ActionV1::Remove.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};

_ = signing_key
.sign(&delete_res.encode_to_vec())
.map_err(|_| anyhow!("failed to sign route delete update"))
.and_then(|signature| {
delete_res.signature = signature;
update_tx.send(delete_res).map_err(|_| {
tracing::error!("failed to broadcast route delete update");
anyhow!("failed to broadcast route delete update")
})
});
});
let timestamp = Utc::now().encode_timestamp();
let signer = signing_key.public_key().into();
let mut delete_res = proto::RouteStreamResV1 {
action: proto::ActionV1::Remove.into(),
data: Some(proto::route_stream_res_v1::Data::Route(
route.clone().into(),
)),
timestamp,
signer,
signature: vec![],
};

_ = signing_key
.sign(&delete_res.encode_to_vec())
.map_err(|_| anyhow!("failed to sign route delete update"))
.and_then(|signature| {
delete_res.signature = signature;
update_tx.send(delete_res).map_err(|_| {
tracing::error!("failed to broadcast route delete update");
anyhow!("failed to broadcast route delete update")
})
});

Ok(())
}
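The bulk of the route.rs diff drops the tokio::spawn wrappers around the sign-and-broadcast step in create_route, update_route, and delete_route: the update is now signed and sent before each function returns, so broadcasts follow the order of the calling requests, and a borrowed &Keypair suffices where the detached task previously required an owned Arc<Keypair>. A hedged sketch of that ordering difference, using illustrative names only:

// Illustrative only: contrasts a detached send with one awaited in place.
use tokio::sync::broadcast;

async fn notify_spawned(tx: broadcast::Sender<u32>, id: u32) {
    // The send races with whatever the caller does next.
    tokio::spawn(async move {
        let _ = tx.send(id);
    });
}

async fn notify_inline(tx: &broadcast::Sender<u32>, id: u32) {
    // The send completes before this function returns.
    let _ = tx.send(id);
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(16);
    notify_inline(&tx, 1).await;
    notify_spawned(tx.clone(), 2).await;
    // With the inline version, 1 is guaranteed to be the first message seen.
    assert_eq!(rx.recv().await.unwrap(), 1);
}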
27 changes: 19 additions & 8 deletions iot_config/src/route_service.rs
@@ -280,7 +280,7 @@ impl iot_config::Route for RouteService {
let new_route: Route = route::create_route(
route,
&self.pool,
self.signing_key.clone(),
&self.signing_key,
self.clone_update_channel(),
)
.await
@@ -323,7 +323,7 @@ impl iot_config::Route for RouteService {
let updated_route = route::update_route(
route,
&self.pool,
self.signing_key.clone(),
&self.signing_key,
self.clone_update_channel(),
)
.await
@@ -360,7 +360,7 @@ impl iot_config::Route for RouteService {
route::delete_route(
&request.id,
&self.pool,
self.signing_key.clone(),
&self.signing_key,
self.clone_update_channel(),
)
.await
@@ -403,7 +403,10 @@ impl iot_config::Route for RouteService {
.and_then(|_| stream_existing_euis(&pool, &signing_key, tx.clone()))
.and_then(|_| stream_existing_devaddrs(&pool, &signing_key, tx.clone()))
.and_then(|_| stream_existing_skfs(&pool, &signing_key, tx.clone())) => {
if result.is_err() { return; }
if let Err(error) = result {
tracing::error!(?error, "Error occurred streaming current routing configuration");
return;
}
}
}

@@ -417,10 +420,18 @@ impl iot_config::Route for RouteService {
telemetry::route_stream_unsubscribe();
return
}
msg = route_updates.recv() => if let Ok(update) = msg {
if tx.send(Ok(update)).await.is_err() {
telemetry::route_stream_unsubscribe();
return;
msg = route_updates.recv() => {
match msg {
Ok(update) => {
if tx.send(Ok(update)).await.is_err() {
tracing::info!("Client disconnected; shutting down stream");
telemetry::route_stream_unsubscribe();
return;
}
}
Err(error) => {
tracing::error!(?error, "Error occurred processing route stream update");
}
}
}
}
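route_service.rs also tightens the update stream loop: a failed send to the client now logs the disconnect before unsubscribing, and an error from the broadcast receiver is logged rather than silently dropped. For context, such an error is either a lagged receiver or a closed channel; a hedged sketch of telling them apart (the diff itself simply logs and keeps looping):

use tokio::sync::broadcast::error::RecvError;

// Sketch only: the two ways broadcast::Receiver::recv can fail.
fn describe(err: &RecvError) -> String {
    match err {
        RecvError::Lagged(missed) => format!("receiver fell behind; {missed} updates were dropped"),
        RecvError::Closed => "all senders dropped; the stream should end".to_string(),
    }
}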
6 changes: 3 additions & 3 deletions iot_verifier/src/poc.rs
@@ -320,9 +320,9 @@ impl Poc {
invalid_response.details,
&witness_report.report,
witness_report.received_timestamp,
Some(beaconer_metadata.location),
beaconer_metadata.gain,
beaconer_metadata.elevation,
Some(witness_metadata.location),
witness_metadata.gain,
witness_metadata.elevation,
InvalidParticipantSide::Witness,
)),
}
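The poc.rs fix attaches the witness's own asserted location, gain, and elevation to the invalid-witness response instead of reusing the beaconer's values, which matches the InvalidParticipantSide::Witness marker on that same response.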
1 change: 1 addition & 0 deletions mobile_config/Cargo.toml
@@ -35,6 +35,7 @@ thiserror = {workspace = true}
tokio = {workspace = true}
tokio-stream = {workspace = true}
tonic = {workspace = true}
tower-http = {workspace = true}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
triggered = {workspace = true}
1 change: 1 addition & 0 deletions mobile_config/src/main.rs
@@ -99,6 +99,7 @@ impl Daemon {
transport::Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(250)))
.http2_keepalive_timeout(Some(Duration::from_secs(60)))
.layer(tower_http::trace::TraceLayer::new_for_grpc())
.add_service(AdminServer::new(admin_svc))
.add_service(GatewayServer::new(gateway_svc))
.add_service(AuthorizationServer::new(auth_svc))
22 changes: 22 additions & 0 deletions mobile_verifier/migrations/15_speedtests_one_to_one.sql
@@ -0,0 +1,22 @@

CREATE TABLE speedtests_migration (
pubkey text NOT NULL,
upload_speed bigint,
download_speed bigint,
latency integer,
serial_num text,
timestamp timestamptz NOT NULL,
inserted_at timestamptz default now(),
PRIMARY KEY(pubkey, timestamp)
);
CREATE INDEX idx_speedtests_pubkey on speedtests_migration (pubkey);

INSERT INTO speedtests_migration (pubkey, upload_speed, download_speed, latency, serial_num, timestamp)
SELECT id, (st).upload_speed, (st).download_speed, (st).latency, '', (st).timestamp
FROM (select id, unnest(speedtests) as st from speedtests) as tmp
ON CONFLICT DO NOTHING;

ALTER TABLE speedtests RENAME TO speedtests_old;
ALTER TABLE speedtests_migration RENAME TO speedtests;
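The new migration flattens the old array-valued speedtests column into one row per measurement: each element of a gateway's speedtests array becomes its own row keyed by (pubkey, timestamp), the original table is kept as speedtests_old, and the flattened table takes over the speedtests name.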

