From fb7037f39d8ae31a0e151f50a0fa058a27d7d3b1 Mon Sep 17 00:00:00 2001 From: Brian Balser Date: Fri, 18 Aug 2023 14:24:46 -0400 Subject: [PATCH] remove tm version from db_store and fix solana balance monitor --- db_store/src/iam_auth_pool.rs | 58 +---------------------- db_store/src/metric_tracker.rs | 50 ++----------------- db_store/src/settings.rs | 43 ++--------------- file_store/src/file_upload.rs | 7 ++- iot_config/src/main.rs | 16 ++----- iot_packet_verifier/src/daemon.rs | 7 ++- iot_verifier/src/main.rs | 6 +-- mobile_config/src/main.rs | 16 ++----- mobile_packet_verifier/src/daemon.rs | 18 +++---- mobile_verifier/src/cli/reward_from_db.rs | 7 +-- mobile_verifier/src/cli/server.rs | 6 +-- reward_index/src/main.rs | 6 +-- solana/src/balance_monitor.rs | 24 +++++++--- 13 files changed, 51 insertions(+), 213 deletions(-) diff --git a/db_store/src/iam_auth_pool.rs b/db_store/src/iam_auth_pool.rs index dbaaaaeac..a21c801f7 100644 --- a/db_store/src/iam_auth_pool.rs +++ b/db_store/src/iam_auth_pool.rs @@ -13,10 +13,7 @@ use aws_types::{ }; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -pub async fn connect( - settings: &Settings, - shutdown: triggered::Listener, -) -> Result<(Pool, futures::future::BoxFuture<'static, Result>)> { +pub async fn connect(settings: &Settings) -> Result> { let aws_config = aws_config::load_from_env().await; let client = aws_sdk_sts::Client::new(&aws_config); let connect_parameters = ConnectParameters::try_from(settings)?; @@ -28,34 +25,7 @@ pub async fn connect( .await?; let cloned_pool = pool.clone(); - let join_handle = - tokio::spawn(async move { run(client, connect_parameters, cloned_pool, shutdown).await }); - - Ok(( - pool, - Box::pin(async move { - match join_handle.await { - Ok(Ok(())) => Ok(()), - Ok(Err(err)) => Err(err), - Err(err) => Err(Error::from(err)), - } - }), - )) -} - -pub async fn connect_tm(settings: &Settings) -> Result> { - let aws_config = aws_config::load_from_env().await; - let client = aws_sdk_sts::Client::new(&aws_config); - let connect_parameters = ConnectParameters::try_from(settings)?; - let connect_options = connect_parameters.connect_options(&client).await?; - - let pool = settings - .pool_options() - .connect_with(connect_options) - .await?; - - let cloned_pool = pool.clone(); - tokio::spawn(async move { run_tm(client, connect_parameters, cloned_pool).await }); + tokio::spawn(async move { run(client, connect_parameters, cloned_pool).await }); Ok(pool) } @@ -64,30 +34,6 @@ async fn run( client: aws_sdk_sts::Client, connect_parameters: ConnectParameters, pool: Pool, - shutdown: triggered::Listener, -) -> Result { - let duration = std::time::Duration::from_secs(connect_parameters.iam_duration_seconds as u64) - - Duration::from_secs(120); - - loop { - let shutdown = shutdown.clone(); - - tokio::select! { - _ = shutdown => break, - _ = tokio::time::sleep(duration) => { - let connect_options = connect_parameters.connect_options(&client).await?; - pool.set_connect_options(connect_options); - } - } - } - - Ok(()) -} - -async fn run_tm( - client: aws_sdk_sts::Client, - connect_parameters: ConnectParameters, - pool: Pool, ) -> Result { let duration = std::time::Duration::from_secs(connect_parameters.iam_duration_seconds as u64) - Duration::from_secs(120); diff --git a/db_store/src/metric_tracker.rs b/db_store/src/metric_tracker.rs index ec5220cdc..19357c237 100644 --- a/db_store/src/metric_tracker.rs +++ b/db_store/src/metric_tracker.rs @@ -1,58 +1,14 @@ use std::time::Duration; -use crate::{Error, Result}; - const DURATION: Duration = Duration::from_secs(300); -pub async fn start( - app_name: &str, - pool: sqlx::Pool, - shutdown: triggered::Listener, -) -> Result> { +pub async fn start(app_name: &str, pool: sqlx::Pool) { let pool_size_name = format!("{app_name}_db_pool_size"); let pool_idle_name = format!("{app_name}_db_pool_idle"); - let join_handle = - tokio::spawn(async move { run(pool_size_name, pool_idle_name, pool, shutdown).await }); - - Ok(Box::pin(async move { - match join_handle.await { - Ok(()) => Ok(()), - Err(err) => Err(Error::from(err)), - } - })) -} - -pub async fn start_tm(app_name: &str, pool: sqlx::Pool) { - let pool_size_name = format!("{app_name}_db_pool_size"); - let pool_idle_name = format!("{app_name}_db_pool_idle"); - tokio::spawn(async move { run_tm(pool_size_name, pool_idle_name, pool).await }); -} - -async fn run( - size_name: String, - idle_name: String, - pool: sqlx::Pool, - shutdown: triggered::Listener, -) { - let mut trigger = tokio::time::interval(DURATION); - - loop { - let shutdown = shutdown.clone(); - - tokio::select! { - _ = shutdown => { - tracing::info!("db_store: MetricTracker shutting down"); - break; - } - _ = trigger.tick() => { - metrics::gauge!(size_name.clone(), pool.size() as f64); - metrics::gauge!(idle_name.clone(), pool.num_idle() as f64); - } - } - } + tokio::spawn(async move { run(pool_size_name, pool_idle_name, pool).await }); } -async fn run_tm(size_name: String, idle_name: String, pool: sqlx::Pool) { +async fn run(size_name: String, idle_name: String, pool: sqlx::Pool) { let mut trigger = tokio::time::interval(DURATION); loop { diff --git a/db_store/src/settings.rs b/db_store/src/settings.rs index c364bec66..9a46b7b74 100644 --- a/db_store/src/settings.rs +++ b/db_store/src/settings.rs @@ -37,53 +37,18 @@ fn default_auth_type() -> AuthType { } impl Settings { - pub async fn connect( - &self, - app_name: &str, - shutdown: triggered::Listener, - ) -> Result<(Pool, futures::future::BoxFuture<'static, Result>)> { - match self.auth_type { - AuthType::Postgres => match self.simple_connect().await { - Ok(pool) => Ok(( - pool.clone(), - metric_tracker::start(app_name, pool, shutdown).await?, - )), - Err(err) => Err(err), - }, - AuthType::Iam => { - let (pool, iam_auth_handle) = - iam_auth_pool::connect(self, shutdown.clone()).await?; - let metric_handle = metric_tracker::start(app_name, pool.clone(), shutdown).await?; - - let handle = - tokio::spawn(async move { tokio::try_join!(iam_auth_handle, metric_handle) }); - - Ok(( - pool, - Box::pin(async move { - match handle.await { - Ok(Err(err)) => Err(err), - Err(err) => Err(Error::from(err)), - Ok(_) => Ok(()), - } - }), - )) - } - } - } - - pub async fn connect_tm(&self, app_name: &str) -> Result> { + pub async fn connect(&self, app_name: &str) -> Result> { match self.auth_type { AuthType::Postgres => match self.simple_connect().await { Ok(pool) => { - metric_tracker::start_tm(app_name, pool.clone()).await; + metric_tracker::start(app_name, pool.clone()).await; Ok(pool) } Err(err) => Err(err), }, AuthType::Iam => { - let pool = iam_auth_pool::connect_tm(self).await?; - metric_tracker::start_tm(app_name, pool.clone()).await; + let pool = iam_auth_pool::connect(self).await?; + metric_tracker::start(app_name, pool.clone()).await; Ok(pool) } } diff --git a/file_store/src/file_upload.rs b/file_store/src/file_upload.rs index ead5fdf81..03e584d14 100644 --- a/file_store/src/file_upload.rs +++ b/file_store/src/file_upload.rs @@ -30,7 +30,10 @@ pub struct FileUploadServer { } impl FileUpload { - pub async fn from_settings(settings: &Settings, messages: MessageReceiver) -> Result { + pub async fn from_settings( + settings: &Settings, + messages: MessageReceiver, + ) -> Result { Ok(FileUploadServer { messages: UnboundedReceiverStream::new(messages), store: FileStore::from_settings(settings).await?, @@ -72,7 +75,7 @@ impl ManagedTask for FileUploadServer { impl FileUploadServer { pub async fn run(self, shutdown: &triggered::Listener) -> Result { - self.run_tm( shutdown.clone()).await + self.run_tm(shutdown.clone()).await } pub async fn run_tm(self, shutdown: triggered::Listener) -> Result { diff --git a/iot_config/src/main.rs b/iot_config/src/main.rs index 9a69b77e5..9aad8f83b 100644 --- a/iot_config/src/main.rs +++ b/iot_config/src/main.rs @@ -71,17 +71,11 @@ impl Daemon { }); // Create database pool - let (pool, db_join_handle) = settings - .database - .connect("iot-config-store", shutdown_listener.clone()) - .await?; + let pool = settings.database.connect("iot-config-store").await?; sqlx::migrate!().run(&pool).await?; // Create on-chain metadata pool - let (metadata_pool, md_pool_handle) = settings - .metadata - .connect("iot-config-metadata", shutdown_listener.clone()) - .await?; + let metadata_pool = settings.metadata.connect("iot-config-metadata").await?; let listen_addr = settings.listen_addr()?; @@ -136,11 +130,7 @@ impl Daemon { .serve_with_shutdown(listen_addr, shutdown_listener) .map_err(Error::from); - tokio::try_join!( - db_join_handle.map_err(Error::from), - md_pool_handle.map_err(Error::from), - server - )?; + tokio::try_join!(server)?; Ok(()) } diff --git a/iot_packet_verifier/src/daemon.rs b/iot_packet_verifier/src/daemon.rs index 9653739d5..b805e03fc 100644 --- a/iot_packet_verifier/src/daemon.rs +++ b/iot_packet_verifier/src/daemon.rs @@ -7,10 +7,11 @@ use crate::{ use anyhow::{bail, Result}; use file_store::{ file_info_poller_tm::{FileInfoStream, LookbackBehavior}, + file_sink_tm::FileSinkBuilder, file_sink_tm::FileSinkClient, file_source_tm, file_upload, iot_packet::PacketRouterPacketReport, - file_sink_tm::FileSinkBuilder, FileStore, FileType, + FileStore, FileType, }; use futures_util::TryFutureExt; use iot_config::client::OrgClient; @@ -38,7 +39,6 @@ impl ManagedTask for Daemon { } } - impl Daemon { pub async fn run(mut self, shutdown: triggered::Listener) -> Result<()> { loop { @@ -92,7 +92,7 @@ impl Cmd { poc_metrics::start_metrics(&settings.metrics)?; // Set up the postgres pool: - let pool = settings.database.connect_tm(env!("CARGO_PKG_NAME")).await?; + let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?; sqlx::migrate!().run(&pool).await?; let solana = if settings.enable_solana_integration { @@ -196,6 +196,5 @@ impl Cmd { .add_task(burner) .start() .await - } } diff --git a/iot_verifier/src/main.rs b/iot_verifier/src/main.rs index 8ad6e7189..97af307ab 100644 --- a/iot_verifier/src/main.rs +++ b/iot_verifier/src/main.rs @@ -76,10 +76,7 @@ impl Server { }); // Create database pool and run migrations - let (pool, db_join_handle) = settings - .database - .connect(env!("CARGO_PKG_NAME"), shutdown.clone()) - .await?; + let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?; sqlx::migrate!().run(&pool).await?; telemetry::initialize(&pool).await?; @@ -172,7 +169,6 @@ impl Server { PriceTracker::start(&settings.price_tracker, shutdown.clone()).await?; tokio::try_join!( - db_join_handle.map_err(Error::from), gateway_updater.run(&shutdown).map_err(Error::from), gateway_rewards_server.run().map_err(Error::from), reward_manifests_server.run().map_err(Error::from), diff --git a/mobile_config/src/main.rs b/mobile_config/src/main.rs index 81ad76729..4526e7515 100644 --- a/mobile_config/src/main.rs +++ b/mobile_config/src/main.rs @@ -72,17 +72,11 @@ impl Daemon { poc_metrics::start_metrics(&settings.metrics)?; // Create database pool - let (pool, pool_handle) = settings - .database - .connect("mobile-config-store", shutdown_listener.clone()) - .await?; + let pool = settings.database.connect("mobile-config-store").await?; sqlx::migrate!().run(&pool).await?; // Create on-chain metadata pool - let (metadata_pool, md_pool_handle) = settings - .metadata - .connect("mobile-config-metadata", shutdown_listener.clone()) - .await?; + let metadata_pool = settings.metadata.connect("mobile-config-metadata").await?; let listen_addr = settings.listen_addr()?; @@ -112,11 +106,7 @@ impl Daemon { .serve_with_shutdown(listen_addr, shutdown_listener) .map_err(Error::from); - tokio::try_join!( - pool_handle.map_err(Error::from), - md_pool_handle.map_err(Error::from), - server, - )?; + tokio::try_join!(server)?; Ok(()) } diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 53742c5da..0abc5a827 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -99,10 +99,7 @@ impl Cmd { }); // Set up the postgres pool: - let (pool, conn_handler) = settings - .database - .connect("mobile-packet-verifier", shutdown_listener.clone()) - .await?; + let pool = settings.database.connect("mobile-packet-verifier").await?; sqlx::migrate!().run(&pool).await?; // Set up the solana network: @@ -116,12 +113,8 @@ impl Cmd { None }; - let sol_balance_monitor = solana::balance_monitor::start( - env!("CARGO_PKG_NAME"), - solana.clone(), - shutdown_listener.clone(), - ) - .await?; + let sol_balance_monitor = + solana::balance_monitor::BalanceMonitor::new(env!("CARGO_PKG_NAME"), solana.clone())?; let (file_upload_tx, file_upload_rx) = file_upload::message_channel(); let file_upload = @@ -189,8 +182,9 @@ impl Cmd { invalid_sessions_server.run().map_err(Error::from), file_upload.run(&shutdown_listener).map_err(Error::from), daemon.run(&shutdown_listener).map_err(Error::from), - conn_handler.map_err(Error::from), - sol_balance_monitor.map_err(Error::from), + sol_balance_monitor + .run(shutdown_listener.clone()) + .map_err(Error::from), event_id_purger.run(shutdown_listener.clone()), )?; diff --git a/mobile_verifier/src/cli/reward_from_db.rs b/mobile_verifier/src/cli/reward_from_db.rs index 0246cc80f..3cfb86174 100644 --- a/mobile_verifier/src/cli/reward_from_db.rs +++ b/mobile_verifier/src/cli/reward_from_db.rs @@ -32,11 +32,8 @@ impl Cmd { let epoch = start..end; let expected_rewards = get_scheduled_tokens_for_poc_and_dc(epoch.end - epoch.start); - let (shutdown_trigger, shutdown_listener) = triggered::trigger(); - let (pool, _join_handle) = settings - .database - .connect(env!("CARGO_PKG_NAME"), shutdown_listener) - .await?; + let (shutdown_trigger, _shutdown_listener) = triggered::trigger(); + let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?; let heartbeats = HeartbeatReward::validated(&pool, &epoch); let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?; diff --git a/mobile_verifier/src/cli/server.rs b/mobile_verifier/src/cli/server.rs index 51590b75d..5912ef838 100644 --- a/mobile_verifier/src/cli/server.rs +++ b/mobile_verifier/src/cli/server.rs @@ -33,10 +33,7 @@ impl Cmd { } }); - let (pool, db_join_handle) = settings - .database - .connect(env!("CARGO_PKG_NAME"), shutdown_listener.clone()) - .await?; + let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?; sqlx::migrate!().run(&pool).await?; telemetry::initialize(&pool).await?; @@ -197,7 +194,6 @@ impl Cmd { let data_session_ingestor = DataSessionIngestor::new(pool.clone()); tokio::try_join!( - db_join_handle.map_err(Error::from), valid_heartbeats_server.run().map_err(Error::from), valid_speedtests_server.run().map_err(Error::from), mobile_rewards_server.run().map_err(Error::from), diff --git a/reward_index/src/main.rs b/reward_index/src/main.rs index 9c33a6849..3ddc907f4 100644 --- a/reward_index/src/main.rs +++ b/reward_index/src/main.rs @@ -70,10 +70,7 @@ impl Server { // Create database pool let app_name = format!("{}_{}", settings.mode, env!("CARGO_PKG_NAME")); - let (pool, db_join_handle) = settings - .database - .connect(&app_name, shutdown_listener.clone()) - .await?; + let pool = settings.database.connect(&app_name).await?; sqlx::migrate!().run(&pool).await?; telemetry::initialize(&pool).await?; @@ -99,7 +96,6 @@ impl Server { let mut indexer = Indexer::new(settings, pool).await?; tokio::try_join!( - db_join_handle.map_err(anyhow::Error::from), source_join_handle.map_err(anyhow::Error::from), indexer.run(shutdown_listener, receiver), )?; diff --git a/solana/src/balance_monitor.rs b/solana/src/balance_monitor.rs index 8e388ca4c..69327bfb0 100644 --- a/solana/src/balance_monitor.rs +++ b/solana/src/balance_monitor.rs @@ -34,6 +34,16 @@ impl BalanceMonitor { } } } + + pub async fn run(self, shutdown: triggered::Listener) -> anyhow::Result<()> { + match self { + Self::Noop => Ok(()), + Self::Solana(metric, solana, pubkey) => { + run(metric, solana, pubkey, shutdown).await; + Ok(()) + } + } + } } impl ManagedTask for BalanceMonitor { @@ -41,13 +51,13 @@ impl ManagedTask for BalanceMonitor { self: Box, shutdown: triggered::Listener, ) -> LocalBoxFuture<'static, anyhow::Result<()>> { - match *self { - Self::Noop => Box::pin(async move { Ok(()) }), - Self::Solana(metric, solana, pubkey) => { - let handle = tokio::spawn(run(metric, solana, pubkey, shutdown)); - Box::pin(handle.map_err(anyhow::Error::from)) - } - } + let handle = tokio::spawn(self.run(shutdown)); + + Box::pin( + handle + .map_err(anyhow::Error::from) + .and_then(|result| async move { result.map_err(anyhow::Error::from) }), + ) } }