Skip to content
This repository has been archived by the owner on Jul 25, 2022. It is now read-only.

Commit

Permalink
Make db pool size configurable (#2158)
Browse files Browse the repository at this point in the history
* Make db pool size configurable

Allow override of POOL_LIMIT on a per service basis
Adjust most limits down
Adjust task-runner limit up to 10

Fixes #2157

Signed-off-by: Nathaniel Clark <[email protected]>

* Fix spelling and add LIMIT to task-runner

Signed-off-by: Nathaniel Clark <[email protected]>

* update sqlx-data.json

Signed-off-by: Joe Grund <[email protected]>

* Address review comments

Signed-off-by: Nathaniel Clark <[email protected]>

Co-authored-by: Joe Grund <[email protected]>
  • Loading branch information
utopiabound and jgrund authored Aug 17, 2020
1 parent bdb4a34 commit ca5f9e1
Show file tree
Hide file tree
Showing 13 changed files with 130 additions and 77 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion iml-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ mod command;
mod error;
mod task;

use iml_manager_env::get_pool_limit;
use iml_postgres::get_db_pool;
use iml_rabbit::{self, create_connection_filter};
use iml_wire_types::Conf;
use warp::Filter;

// Default pool limit if not overridden by POOL_LIMIT
const DEFAULT_POOL_LIMIT: u32 = 5;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
iml_tracing::init();
Expand All @@ -33,7 +37,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let conn_filter = create_connection_filter(pool);

let pool = get_db_pool(5).await?;
let pool = get_db_pool(get_pool_limit().unwrap_or(DEFAULT_POOL_LIMIT)).await?;
let db_pool_filter = warp::any().map(move || pool.clone());

let routes = warp::path("conf")
Expand Down
1 change: 1 addition & 0 deletions iml-mailbox/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ iml-manager-env = {path = "../iml-manager-env", version = "0.3"}
iml-postgres = {path = "../iml-postgres", version = "0.3"}
iml-tracing = {version = "0.2", path = "../iml-tracing"}
iml-wire-types = {path = "../iml-wire-types", version = "0.3", features = ["postgres-interop"]}
lazy_static = "1.4.0"
serde = {version = "1", features = ["derive"]}
serde_json = "1.0"
thiserror = "1.0"
Expand Down
11 changes: 9 additions & 2 deletions iml-mailbox/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,23 @@

use futures::{Stream, StreamExt};
use iml_mailbox::ingest_data;
use iml_manager_env::get_pool_limit;
use iml_postgres::{get_db_pool, sqlx::PgPool};
use iml_tracing::tracing;
use lazy_static::lazy_static;
use std::pin::Pin;
use warp::Filter as _;

// Default pool limit if not overridden by POOL_LIMIT
lazy_static! {
static ref POOL_LIMIT: u32 = get_pool_limit().unwrap_or(8);
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
iml_tracing::init();

let pool = get_db_pool(10).await?;
let pool = get_db_pool(*POOL_LIMIT).await?;
let db_pool_filter = warp::any().map(move || pool.clone());

let addr = iml_manager_env::get_mailbox_addr();
Expand All @@ -41,7 +48,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

s.filter_map(|l| async move { l.ok() })
.chunks(1000)
.for_each_concurrent(10, |lines| {
.for_each_concurrent(*POOL_LIMIT as usize, |lines| {
let pool = db_pool.clone();
let task_name = task_name.clone();

Expand Down
7 changes: 7 additions & 0 deletions iml-manager-env/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ pub fn get_db_password() -> Option<String> {
empty_str_to_none(get_var("DB_PASSWORD"))
}

pub fn get_pool_limit() -> Option<u32> {
env::var("POOL_LIMIT")
.ok()
.map(|l| l.parse().ok())
.flatten()
}

/// Get the report port from the env or panic
pub fn get_report_port() -> String {
get_var("REPORT_PORT")
Expand Down
6 changes: 5 additions & 1 deletion iml-services/iml-action-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use iml_action_runner::{
data::SessionToRpcs, local_actions::SharedLocalActionsInFlight, receiver::handle_agent_data,
sender::sender, Sessions, Shared,
};
use iml_manager_env::get_pool_limit;
use iml_postgres::get_db_pool;
use iml_rabbit::create_connection_filter;
use iml_service_queue::service_queue::{consume_service_queue, ImlServiceQueueError};
Expand All @@ -16,6 +17,9 @@ use warp::{self, Filter as _};

pub static AGENT_TX_RUST: &str = "agent_tx_rust";

// Default pool limit if not overridden by POOL_LIMIT
const POOL_LIMIT: u32 = 2;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
iml_tracing::init();
Expand All @@ -30,7 +34,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let pool = iml_rabbit::connect_to_rabbit(3);

let db_pool = get_db_pool(5).await?;
let db_pool = get_db_pool(get_pool_limit().unwrap_or(POOL_LIMIT)).await?;

let routes = sender(
AGENT_TX_RUST,
Expand Down
6 changes: 5 additions & 1 deletion iml-services/iml-device/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use iml_device::{
},
update_client_mounts, update_devices, Cache, ImlDeviceError,
};
use iml_manager_env::get_pool_limit;
use iml_postgres::get_db_pool;
use iml_service_queue::service_queue::consume_data;
use iml_tracing::tracing;
Expand All @@ -23,13 +24,16 @@ use std::{
};
use warp::Filter;

// Default pool limit if not overridden by POOL_LIMIT
const DEFAULT_POOL_LIMIT: u32 = 2;

#[tokio::main]
async fn main() -> Result<(), ImlDeviceError> {
iml_tracing::init();

let addr = iml_manager_env::get_device_aggregator_addr();

let pool = get_db_pool(5).await?;
let pool = get_db_pool(get_pool_limit().unwrap_or(DEFAULT_POOL_LIMIT)).await?;

let cache = create_cache(&pool).await?;

Expand Down
6 changes: 5 additions & 1 deletion iml-services/iml-journal/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use futures::TryStreamExt;
use iml_journal::{execute_handlers, get_message_class, ImlJournalError};
use iml_manager_env::get_pool_limit;
use iml_postgres::{
get_db_pool,
sqlx::{self, PgPool},
Expand All @@ -19,6 +20,9 @@ lazy_static! {
static ref DBLOG_LW: i64 = iml_manager_env::get_dblog_lw() as i64;
}

// Default pool limit if not overridden by POOL_LIMIT
const DEFAULT_POOL_LIMIT: u32 = 2;

async fn purge_excess(pool: &PgPool, num_rows: i64) -> Result<i64, ImlJournalError> {
if num_rows <= *DBLOG_HW {
return Ok(num_rows);
Expand Down Expand Up @@ -57,7 +61,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

tracing::info!("Starting");

let pool = get_db_pool(5).await?;
let pool = get_db_pool(get_pool_limit().unwrap_or(DEFAULT_POOL_LIMIT)).await?;

let rabbit_pool = iml_rabbit::connect_to_rabbit(1);

Expand Down
1 change: 1 addition & 0 deletions iml-services/iml-ntp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ version = "0.3.0"

[dependencies]
futures = "0.3"
iml-manager-env = {path = "../../iml-manager-env", version = "0.3"}
iml-postgres = {path = "../../iml-postgres", version = "0.3"}
iml-rabbit = {path = "../../iml-rabbit", version = "0.3"}
iml-service-queue = {path = "../iml-service-queue", version = "0.3"}
Expand Down
6 changes: 5 additions & 1 deletion iml-services/iml-ntp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
// license that can be found in the LICENSE file.

use futures::TryStreamExt;
use iml_manager_env::get_pool_limit;
use iml_postgres::{alert, get_db_pool, sqlx};
use iml_service_queue::service_queue::consume_data;
use iml_wire_types::{db::ManagedHostRecord, time::State, AlertRecordType, AlertSeverity};

// Default pool limit if not overridden by POOL_LIMIT
const DEFAULT_POOL_LIMIT: u32 = 2;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
iml_tracing::init();

let pool = get_db_pool(5).await?;
let pool = get_db_pool(get_pool_limit().unwrap_or(DEFAULT_POOL_LIMIT)).await?;

let rabbit_pool = iml_rabbit::connect_to_rabbit(1);

Expand Down
2 changes: 2 additions & 0 deletions iml-task-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ version = "0.1.0"
[dependencies]
futures = "0.3"
iml-action-client = {path = "../iml-action-client", version = "0.1"}
iml-manager-env = {path = "../iml-manager-env", version = "0.3"}
iml-postgres = {path = "../iml-postgres", version = "0.3"}
iml-tracing = {path = "../iml-tracing", version = "0.2"}
iml-wire-types = {path = "../iml-wire-types", version = "0.3", features = ["postgres-interop"]}
lazy_static = "1.4.0"
serde_json = "1.0"
thiserror = "1.0"
tokio = {version = "0.2", features = ["rt-threaded", "macros"]}
14 changes: 12 additions & 2 deletions iml-task-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use futures::{future::join_all, lock::Mutex, FutureExt, TryFutureExt};
use iml_action_client::invoke_rust_agent;
use iml_manager_env::get_pool_limit;
use iml_postgres::{
get_db_pool,
sqlx::{self, pool::PoolConnection, Done, Executor, PgPool, Postgres},
Expand All @@ -13,7 +14,9 @@ use iml_wire_types::{
db::{FidTaskQueue, LustreFid},
AgentResult, FidError, FidItem, LustreClient, Task, TaskAction,
};
use lazy_static::lazy_static;
use std::{
cmp::max,
collections::{HashMap, HashSet},
str::FromStr,
sync::Arc,
Expand All @@ -28,6 +31,11 @@ const FID_LIMIT: i64 = 2000;
// Number of seconds between cycles
const DELAY: Duration = Duration::from_secs(5);

// Default pool limit if not overridden by POOL_LIMIT
lazy_static! {
static ref POOL_LIMIT: u32 = get_pool_limit().unwrap_or(8);
}

async fn available_workers(
conn: &mut PoolConnection<Postgres>,
active: Arc<Mutex<HashSet<i32>>>,
Expand All @@ -43,8 +51,10 @@ async fn available_workers(
state = 'mounted'
AND not_deleted = 't'
AND id != ALL($1)
LIMIT $2
"#,
&ids
&ids,
max(*POOL_LIMIT as i64 - ids.len() as i64, 0),
)
.fetch_all(conn)
.await?;
Expand Down Expand Up @@ -306,7 +316,7 @@ async fn run_tasks(fqdn: &str, worker: &LustreClient, xs: Vec<Task>, pool: &PgPo
async fn main() -> Result<(), Box<dyn std::error::Error>> {
iml_tracing::init();

let pg_pool = get_db_pool(5).await?;
let pg_pool = get_db_pool(*POOL_LIMIT).await?;
let active_clients = Arc::new(Mutex::new(HashSet::new()));
let mut interval = time::interval(DELAY);

Expand Down
Loading

0 comments on commit ca5f9e1

Please sign in to comment.