Skip to content

Commit

Permalink
Socket Addrs (#812)
Browse files Browse the repository at this point in the history
* Use SocketAddr for poc-metrics

we can turn a runtime `.parse()?` error into a start time config error.

* Use SocketAddr for ingest

* Use SocketAddr for iot_config

* Use SocketAddr for mobile-config

* Let OS assign available port in test

Binding to port `0` let's the OS assign a port. As unlikely as it is to hit a used port, it's still more likely than being given one.
  • Loading branch information
michaeldjeffrey authored May 14, 2024
1 parent a6d1430 commit d35d865
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 111 deletions.
3 changes: 1 addition & 2 deletions ingest/src/server_iot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,6 @@ impl poc_lora::PocLora for GrpcServer {
}

pub async fn grpc_server(settings: &Settings) -> Result<()> {
let grpc_addr = settings.listen_addr()?;

// Initialize uploader
let (file_upload, file_upload_server) =
file_upload::FileUpload::from_settings_tm(&settings.output).await?;
Expand Down Expand Up @@ -374,6 +372,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
.create()
.await?;

let grpc_addr = settings.listen;
let grpc_server = GrpcServer {
beacon_report_sink,
witness_report_sink,
Expand Down
3 changes: 1 addition & 2 deletions ingest/src/server_mobile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,8 +342,6 @@ impl poc_mobile::PocMobile for GrpcServer {
}

pub async fn grpc_server(settings: &Settings) -> Result<()> {
let grpc_addr = settings.listen_addr()?;

// Initialize uploader
let (file_upload, file_upload_server) =
file_upload::FileUpload::from_settings_tm(&settings.output).await?;
Expand Down Expand Up @@ -451,6 +449,7 @@ pub async fn grpc_server(settings: &Settings) -> Result<()> {
bail!("expected valid api token in settings");
};

let grpc_addr = settings.listen;
let grpc_server = GrpcServer {
heartbeat_report_sink,
wifi_heartbeat_report_sink,
Expand Down
16 changes: 4 additions & 12 deletions ingest/src/settings.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use config::{Config, Environment, File};
use helium_crypto::Network;
use serde::Deserialize;
use std::{
net::{AddrParseError, SocketAddr},
path::Path,
str::FromStr,
};
use std::{net::SocketAddr, path::Path};

#[derive(Debug, Deserialize)]
pub struct Settings {
Expand All @@ -20,7 +16,7 @@ pub struct Settings {
pub mode: Mode,
/// Listen address. Required. Default is 0.0.0.0:9081
#[serde(default = "default_listen_addr")]
pub listen: String,
pub listen: SocketAddr,
/// Local folder for storing intermediate files
pub cache: String,
/// Network required in all public keys: mainnet | testnet
Expand Down Expand Up @@ -49,8 +45,8 @@ pub fn default_session_key_offer_timeout() -> u64 {
5
}

pub fn default_listen_addr() -> String {
"0.0.0.0:9081".to_string()
pub fn default_listen_addr() -> SocketAddr {
"0.0.0.0:9081".parse().unwrap()
}

pub fn default_log() -> String {
Expand Down Expand Up @@ -97,10 +93,6 @@ impl Settings {
.and_then(|config| config.try_deserialize())
}

pub fn listen_addr(&self) -> Result<SocketAddr, AddrParseError> {
SocketAddr::from_str(&self.listen)
}

pub fn session_key_offer_timeout(&self) -> std::time::Duration {
std::time::Duration::from_secs(self.session_key_offer_timeout)
}
Expand Down
71 changes: 36 additions & 35 deletions ingest/tests/iot_ingest.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, str::FromStr};
use std::net::{SocketAddr, TcpListener};

use backon::{ExponentialBuilder, Retryable};
use file_store::file_sink::{FileSinkClient, Message as SinkMessage};
Expand All @@ -12,7 +12,7 @@ use helium_proto::services::poc_lora::{
};
use ingest::server_iot::GrpcServer;
use prost::Message;
use rand::{rngs::OsRng, Rng};
use rand::rngs::OsRng;
use task_manager::TaskManager;
use tokio::{sync::mpsc::error::TryRecvError, task::LocalSet, time::timeout};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
Expand All @@ -22,12 +22,12 @@ use tonic::{transport::Channel, Streaming};
async fn initialize_session_and_send_beacon_and_witness() {
let (beacon_client, mut beacons) = create_file_sink();
let (witness_client, mut witnesses) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -38,7 +38,7 @@ async fn initialize_session_and_send_beacon_and_witness() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand Down Expand Up @@ -75,12 +75,12 @@ async fn initialize_session_and_send_beacon_and_witness() {
async fn stream_stops_after_incorrectly_signed_init_request() {
let (beacon_client, _) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -91,7 +91,7 @@ async fn stream_stops_after_incorrectly_signed_init_request() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand All @@ -113,12 +113,12 @@ async fn stream_stops_after_incorrectly_signed_init_request() {
async fn stream_stops_after_incorrectly_signed_beacon() {
let (beacon_client, beacons) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -129,7 +129,7 @@ async fn stream_stops_after_incorrectly_signed_beacon() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand All @@ -154,12 +154,12 @@ async fn stream_stops_after_incorrectly_signed_beacon() {
async fn stream_stops_after_incorrect_beacon_pubkey() {
let (beacon_client, beacons) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -170,7 +170,7 @@ async fn stream_stops_after_incorrect_beacon_pubkey() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand Down Expand Up @@ -198,12 +198,12 @@ async fn stream_stops_after_incorrect_beacon_pubkey() {
async fn stream_stops_after_incorrectly_signed_witness() {
let (beacon_client, _) = create_file_sink();
let (witness_client, witnesses) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -214,7 +214,7 @@ async fn stream_stops_after_incorrectly_signed_witness() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand All @@ -239,12 +239,12 @@ async fn stream_stops_after_incorrectly_signed_witness() {
async fn stream_stops_after_incorrect_witness_pubkey() {
let (beacon_client, _) = create_file_sink();
let (witness_client, witnesses) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -255,7 +255,7 @@ async fn stream_stops_after_incorrect_witness_pubkey() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand Down Expand Up @@ -283,12 +283,12 @@ async fn stream_stops_after_incorrect_witness_pubkey() {
async fn stream_stop_if_client_attempts_to_initiliaze_2nd_session() {
let (beacon_client, mut beacons) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server = create_test_server(port, beacon_client, witness_client, None, None);
let server = create_test_server(addr, beacon_client, witness_client, None, None);
TaskManager::builder()
.add_task(server)
.build()
Expand All @@ -299,7 +299,7 @@ async fn stream_stop_if_client_attempts_to_initiliaze_2nd_session() {
let pub_key = generate_keypair();
let session_key = generate_keypair();

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

client
Expand Down Expand Up @@ -337,21 +337,21 @@ async fn stream_stop_if_client_attempts_to_initiliaze_2nd_session() {
async fn stream_stops_if_init_not_sent_within_timeout() {
let (beacon_client, _) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server =
create_test_server(port, beacon_client, witness_client, Some(500), None);
create_test_server(addr, beacon_client, witness_client, Some(500), None);
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let _offer = client.receive_offer().await;

client.assert_closed().await;
Expand All @@ -363,21 +363,21 @@ async fn stream_stops_if_init_not_sent_within_timeout() {
async fn stream_stops_on_session_timeout() {
let (beacon_client, mut beacons) = create_file_sink();
let (witness_client, _) = create_file_sink();
let port = get_port();
let addr = get_socket_addr().expect("socket addr");

LocalSet::new()
.run_until(async move {
tokio::task::spawn_local(async move {
let server =
create_test_server(port, beacon_client, witness_client, Some(500), Some(900));
create_test_server(addr, beacon_client, witness_client, Some(500), Some(900));
TaskManager::builder()
.add_task(server)
.build()
.start()
.await
});

let mut client = connect_and_stream(port).await;
let mut client = connect_and_stream(addr).await;
let offer = client.receive_offer().await;

let pub_key = generate_keypair();
Expand Down Expand Up @@ -449,8 +449,8 @@ fn create_file_sink() -> (FileSinkClient, MockFileSinkReceiver) {
)
}

async fn connect_and_stream(port: u64) -> TestClient {
let mut client = (|| PocLoraClient::connect(format!("http://127.0.0.1:{port}")))
async fn connect_and_stream(socket_addr: SocketAddr) -> TestClient {
let mut client = (|| PocLoraClient::connect(format!("http://{socket_addr}")))
.retry(&ExponentialBuilder::default())
.await
.expect("client connect");
Expand Down Expand Up @@ -572,7 +572,7 @@ impl TestClient {
}

fn create_test_server(
port: u64,
socket_addr: SocketAddr,
beacon_file_sink: FileSinkClient,
witness_file_sink: FileSinkClient,
offer_timeout: Option<u64>,
Expand All @@ -584,7 +584,7 @@ fn create_test_server(
beacon_report_sink: beacon_file_sink,
witness_report_sink: witness_file_sink,
required_network: Network::MainNet,
address: SocketAddr::from_str(&format!("127.0.0.1:{port}")).expect("socket address"),
address: socket_addr,
session_key_offer_timeout: std::time::Duration::from_millis(offer_timeout),
session_key_timeout: std::time::Duration::from_millis(timeout),
}
Expand All @@ -598,6 +598,7 @@ fn seconds(s: u64) -> std::time::Duration {
std::time::Duration::from_secs(s)
}

fn get_port() -> u64 {
rand::thread_rng().gen_range(6000..10000)
fn get_socket_addr() -> anyhow::Result<SocketAddr> {
let listener = TcpListener::bind("127.0.0.1:0")?;
Ok(listener.local_addr()?)
}
3 changes: 1 addition & 2 deletions iot_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ impl Daemon {
// Create on-chain metadata pool
let metadata_pool = settings.metadata.connect("iot-config-metadata").await?;

let listen_addr = settings.listen_addr()?;

let (auth_updater, auth_cache) = AuthCache::new(settings.admin_pubkey()?, &pool).await?;
let (region_updater, region_map) = RegionMapReader::new(&pool).await?;
let (delegate_key_updater, delegate_key_cache) = org::delegate_keys_cache(&pool).await?;
Expand Down Expand Up @@ -104,6 +102,7 @@ impl Daemon {
region_updater,
)?;

let listen_addr = settings.listen;
let pubkey = settings
.signing_keypair()
.map(|keypair| keypair.public_key().to_string())?;
Expand Down
Loading

0 comments on commit d35d865

Please sign in to comment.