Skip to content

Commit

Permalink
remove tm version from db_store and fix solana balance monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
bbalser committed Aug 18, 2023
1 parent 9af9dd2 commit fb7037f
Show file tree
Hide file tree
Showing 13 changed files with 51 additions and 213 deletions.
58 changes: 2 additions & 56 deletions db_store/src/iam_auth_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use aws_types::{
};
use std::time::{Duration, SystemTime, UNIX_EPOCH};

pub async fn connect(
settings: &Settings,
shutdown: triggered::Listener,
) -> Result<(Pool<Postgres>, futures::future::BoxFuture<'static, Result>)> {
pub async fn connect(settings: &Settings) -> Result<Pool<Postgres>> {
let aws_config = aws_config::load_from_env().await;
let client = aws_sdk_sts::Client::new(&aws_config);
let connect_parameters = ConnectParameters::try_from(settings)?;
Expand All @@ -28,34 +25,7 @@ pub async fn connect(
.await?;

let cloned_pool = pool.clone();
let join_handle =
tokio::spawn(async move { run(client, connect_parameters, cloned_pool, shutdown).await });

Ok((
pool,
Box::pin(async move {
match join_handle.await {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(err) => Err(Error::from(err)),
}
}),
))
}

pub async fn connect_tm(settings: &Settings) -> Result<Pool<Postgres>> {
let aws_config = aws_config::load_from_env().await;
let client = aws_sdk_sts::Client::new(&aws_config);
let connect_parameters = ConnectParameters::try_from(settings)?;
let connect_options = connect_parameters.connect_options(&client).await?;

let pool = settings
.pool_options()
.connect_with(connect_options)
.await?;

let cloned_pool = pool.clone();
tokio::spawn(async move { run_tm(client, connect_parameters, cloned_pool).await });
tokio::spawn(async move { run(client, connect_parameters, cloned_pool).await });

Ok(pool)
}
Expand All @@ -64,30 +34,6 @@ async fn run(
client: aws_sdk_sts::Client,
connect_parameters: ConnectParameters,
pool: Pool<Postgres>,
shutdown: triggered::Listener,
) -> Result {
let duration = std::time::Duration::from_secs(connect_parameters.iam_duration_seconds as u64)
- Duration::from_secs(120);

loop {
let shutdown = shutdown.clone();

tokio::select! {
_ = shutdown => break,
_ = tokio::time::sleep(duration) => {
let connect_options = connect_parameters.connect_options(&client).await?;
pool.set_connect_options(connect_options);
}
}
}

Ok(())
}

async fn run_tm(
client: aws_sdk_sts::Client,
connect_parameters: ConnectParameters,
pool: Pool<Postgres>,
) -> Result {
let duration = std::time::Duration::from_secs(connect_parameters.iam_duration_seconds as u64)
- Duration::from_secs(120);
Expand Down
50 changes: 3 additions & 47 deletions db_store/src/metric_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,14 @@
use std::time::Duration;

use crate::{Error, Result};

const DURATION: Duration = Duration::from_secs(300);

pub async fn start(
app_name: &str,
pool: sqlx::Pool<sqlx::Postgres>,
shutdown: triggered::Listener,
) -> Result<futures::future::BoxFuture<'static, Result>> {
pub async fn start(app_name: &str, pool: sqlx::Pool<sqlx::Postgres>) {
let pool_size_name = format!("{app_name}_db_pool_size");
let pool_idle_name = format!("{app_name}_db_pool_idle");
let join_handle =
tokio::spawn(async move { run(pool_size_name, pool_idle_name, pool, shutdown).await });

Ok(Box::pin(async move {
match join_handle.await {
Ok(()) => Ok(()),
Err(err) => Err(Error::from(err)),
}
}))
}

pub async fn start_tm(app_name: &str, pool: sqlx::Pool<sqlx::Postgres>) {
let pool_size_name = format!("{app_name}_db_pool_size");
let pool_idle_name = format!("{app_name}_db_pool_idle");
tokio::spawn(async move { run_tm(pool_size_name, pool_idle_name, pool).await });
}

async fn run(
size_name: String,
idle_name: String,
pool: sqlx::Pool<sqlx::Postgres>,
shutdown: triggered::Listener,
) {
let mut trigger = tokio::time::interval(DURATION);

loop {
let shutdown = shutdown.clone();

tokio::select! {
_ = shutdown => {
tracing::info!("db_store: MetricTracker shutting down");
break;
}
_ = trigger.tick() => {
metrics::gauge!(size_name.clone(), pool.size() as f64);
metrics::gauge!(idle_name.clone(), pool.num_idle() as f64);
}
}
}
tokio::spawn(async move { run(pool_size_name, pool_idle_name, pool).await });
}

async fn run_tm(size_name: String, idle_name: String, pool: sqlx::Pool<sqlx::Postgres>) {
async fn run(size_name: String, idle_name: String, pool: sqlx::Pool<sqlx::Postgres>) {
let mut trigger = tokio::time::interval(DURATION);

loop {
Expand Down
43 changes: 4 additions & 39 deletions db_store/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,53 +37,18 @@ fn default_auth_type() -> AuthType {
}

impl Settings {
pub async fn connect(
&self,
app_name: &str,
shutdown: triggered::Listener,
) -> Result<(Pool<Postgres>, futures::future::BoxFuture<'static, Result>)> {
match self.auth_type {
AuthType::Postgres => match self.simple_connect().await {
Ok(pool) => Ok((
pool.clone(),
metric_tracker::start(app_name, pool, shutdown).await?,
)),
Err(err) => Err(err),
},
AuthType::Iam => {
let (pool, iam_auth_handle) =
iam_auth_pool::connect(self, shutdown.clone()).await?;
let metric_handle = metric_tracker::start(app_name, pool.clone(), shutdown).await?;

let handle =
tokio::spawn(async move { tokio::try_join!(iam_auth_handle, metric_handle) });

Ok((
pool,
Box::pin(async move {
match handle.await {
Ok(Err(err)) => Err(err),
Err(err) => Err(Error::from(err)),
Ok(_) => Ok(()),
}
}),
))
}
}
}

pub async fn connect_tm(&self, app_name: &str) -> Result<Pool<Postgres>> {
pub async fn connect(&self, app_name: &str) -> Result<Pool<Postgres>> {
match self.auth_type {
AuthType::Postgres => match self.simple_connect().await {
Ok(pool) => {
metric_tracker::start_tm(app_name, pool.clone()).await;
metric_tracker::start(app_name, pool.clone()).await;
Ok(pool)
}
Err(err) => Err(err),
},
AuthType::Iam => {
let pool = iam_auth_pool::connect_tm(self).await?;
metric_tracker::start_tm(app_name, pool.clone()).await;
let pool = iam_auth_pool::connect(self).await?;
metric_tracker::start(app_name, pool.clone()).await;
Ok(pool)
}
}
Expand Down
7 changes: 5 additions & 2 deletions file_store/src/file_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ pub struct FileUploadServer {
}

impl FileUpload {
pub async fn from_settings(settings: &Settings, messages: MessageReceiver) -> Result<FileUploadServer> {
pub async fn from_settings(
settings: &Settings,
messages: MessageReceiver,
) -> Result<FileUploadServer> {
Ok(FileUploadServer {
messages: UnboundedReceiverStream::new(messages),
store: FileStore::from_settings(settings).await?,
Expand Down Expand Up @@ -72,7 +75,7 @@ impl ManagedTask for FileUploadServer {

impl FileUploadServer {
pub async fn run(self, shutdown: &triggered::Listener) -> Result {
self.run_tm( shutdown.clone()).await
self.run_tm(shutdown.clone()).await
}

pub async fn run_tm(self, shutdown: triggered::Listener) -> Result {
Expand Down
16 changes: 3 additions & 13 deletions iot_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,11 @@ impl Daemon {
});

// Create database pool
let (pool, db_join_handle) = settings
.database
.connect("iot-config-store", shutdown_listener.clone())
.await?;
let pool = settings.database.connect("iot-config-store").await?;
sqlx::migrate!().run(&pool).await?;

// Create on-chain metadata pool
let (metadata_pool, md_pool_handle) = settings
.metadata
.connect("iot-config-metadata", shutdown_listener.clone())
.await?;
let metadata_pool = settings.metadata.connect("iot-config-metadata").await?;

let listen_addr = settings.listen_addr()?;

Expand Down Expand Up @@ -136,11 +130,7 @@ impl Daemon {
.serve_with_shutdown(listen_addr, shutdown_listener)
.map_err(Error::from);

tokio::try_join!(
db_join_handle.map_err(Error::from),
md_pool_handle.map_err(Error::from),
server
)?;
tokio::try_join!(server)?;

Ok(())
}
Expand Down
7 changes: 3 additions & 4 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use crate::{
use anyhow::{bail, Result};
use file_store::{
file_info_poller_tm::{FileInfoStream, LookbackBehavior},
file_sink_tm::FileSinkBuilder,
file_sink_tm::FileSinkClient,
file_source_tm, file_upload,
iot_packet::PacketRouterPacketReport,
file_sink_tm::FileSinkBuilder, FileStore, FileType,
FileStore, FileType,
};
use futures_util::TryFutureExt;
use iot_config::client::OrgClient;
Expand Down Expand Up @@ -38,7 +39,6 @@ impl ManagedTask for Daemon {
}
}


impl Daemon {
pub async fn run(mut self, shutdown: triggered::Listener) -> Result<()> {
loop {
Expand Down Expand Up @@ -92,7 +92,7 @@ impl Cmd {
poc_metrics::start_metrics(&settings.metrics)?;

// Set up the postgres pool:
let pool = settings.database.connect_tm(env!("CARGO_PKG_NAME")).await?;
let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?;
sqlx::migrate!().run(&pool).await?;

let solana = if settings.enable_solana_integration {
Expand Down Expand Up @@ -196,6 +196,5 @@ impl Cmd {
.add_task(burner)
.start()
.await

}
}
6 changes: 1 addition & 5 deletions iot_verifier/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ impl Server {
});

// Create database pool and run migrations
let (pool, db_join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown.clone())
.await?;
let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?;
sqlx::migrate!().run(&pool).await?;

telemetry::initialize(&pool).await?;
Expand Down Expand Up @@ -172,7 +169,6 @@ impl Server {
PriceTracker::start(&settings.price_tracker, shutdown.clone()).await?;

tokio::try_join!(
db_join_handle.map_err(Error::from),
gateway_updater.run(&shutdown).map_err(Error::from),
gateway_rewards_server.run().map_err(Error::from),
reward_manifests_server.run().map_err(Error::from),
Expand Down
16 changes: 3 additions & 13 deletions mobile_config/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,11 @@ impl Daemon {
poc_metrics::start_metrics(&settings.metrics)?;

// Create database pool
let (pool, pool_handle) = settings
.database
.connect("mobile-config-store", shutdown_listener.clone())
.await?;
let pool = settings.database.connect("mobile-config-store").await?;
sqlx::migrate!().run(&pool).await?;

// Create on-chain metadata pool
let (metadata_pool, md_pool_handle) = settings
.metadata
.connect("mobile-config-metadata", shutdown_listener.clone())
.await?;
let metadata_pool = settings.metadata.connect("mobile-config-metadata").await?;

let listen_addr = settings.listen_addr()?;

Expand Down Expand Up @@ -112,11 +106,7 @@ impl Daemon {
.serve_with_shutdown(listen_addr, shutdown_listener)
.map_err(Error::from);

tokio::try_join!(
pool_handle.map_err(Error::from),
md_pool_handle.map_err(Error::from),
server,
)?;
tokio::try_join!(server)?;

Ok(())
}
Expand Down
18 changes: 6 additions & 12 deletions mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,7 @@ impl Cmd {
});

// Set up the postgres pool:
let (pool, conn_handler) = settings
.database
.connect("mobile-packet-verifier", shutdown_listener.clone())
.await?;
let pool = settings.database.connect("mobile-packet-verifier").await?;
sqlx::migrate!().run(&pool).await?;

// Set up the solana network:
Expand All @@ -116,12 +113,8 @@ impl Cmd {
None
};

let sol_balance_monitor = solana::balance_monitor::start(
env!("CARGO_PKG_NAME"),
solana.clone(),
shutdown_listener.clone(),
)
.await?;
let sol_balance_monitor =
solana::balance_monitor::BalanceMonitor::new(env!("CARGO_PKG_NAME"), solana.clone())?;

let (file_upload_tx, file_upload_rx) = file_upload::message_channel();
let file_upload =
Expand Down Expand Up @@ -189,8 +182,9 @@ impl Cmd {
invalid_sessions_server.run().map_err(Error::from),
file_upload.run(&shutdown_listener).map_err(Error::from),
daemon.run(&shutdown_listener).map_err(Error::from),
conn_handler.map_err(Error::from),
sol_balance_monitor.map_err(Error::from),
sol_balance_monitor
.run(shutdown_listener.clone())
.map_err(Error::from),
event_id_purger.run(shutdown_listener.clone()),
)?;

Expand Down
7 changes: 2 additions & 5 deletions mobile_verifier/src/cli/reward_from_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@ impl Cmd {
let epoch = start..end;
let expected_rewards = get_scheduled_tokens_for_poc_and_dc(epoch.end - epoch.start);

let (shutdown_trigger, shutdown_listener) = triggered::trigger();
let (pool, _join_handle) = settings
.database
.connect(env!("CARGO_PKG_NAME"), shutdown_listener)
.await?;
let (shutdown_trigger, _shutdown_listener) = triggered::trigger();
let pool = settings.database.connect(env!("CARGO_PKG_NAME")).await?;

let heartbeats = HeartbeatReward::validated(&pool, &epoch);
let speedtests = SpeedtestAverages::validated(&pool, epoch.end).await?;
Expand Down
Loading

0 comments on commit fb7037f

Please sign in to comment.