Skip to content

Commit

Permalink
Make acquire connection timeout configurable and increase default to …
Browse files Browse the repository at this point in the history
…10s (#4573)
  • Loading branch information
guilload authored Feb 13, 2024
1 parent 652453a commit e5fd9fe
Show file tree
Hide file tree
Showing 8 changed files with 271 additions and 33 deletions.
6 changes: 5 additions & 1 deletion config/quickwit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ version: 0.7
#
# metastore:
# postgres:
# max_num_connections: 20
# min_connections: 0
# max_connections: 10
# acquire_connection_timeout: 10s
# idle_connection_timeout: 10min
# max_connection_lifetime: 30min
#
# -------------------------------- Indexer settings --------------------------------

Expand Down
12 changes: 10 additions & 2 deletions docs/configuration/node-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,22 @@ metastore:

| Property | Description | Default value |
| --- | --- | --- |
| `max_num_connections` | Determines the maximum number of concurrent connections to the database server. | `10` |
| `min_connections` | Minimum number of connections to maintain in the pool at all times. | `0` |
| `max_connections` | Maximum number of connections to maintain in the pool. | `10` |
| `acquire_connection_timeout` | Maximum amount of time to spend waiting for an available connection before aborting a query. | `10s` |
| `idle_connection_timeout` | Maximum idle duration before closing individual connections. | `10min` |
| `max_connection_lifetime` | Maximum lifetime of individual connections. | `30min` |

Example of a metastore configuration for PostgreSQL in YAML format:

```yaml
metastore:
postgres:
max_num_connections: 50
min_connections: 10
max_connections: 50
acquire_connection_timeout: 30s
idle_connection_timeout: 1h
max_connection_lifetime: 1d
```

## Indexer configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@
},
"metastore": {
"postgres": {
"max_num_connections": 12
"min_connections": 1,
"max_num_connections": 12,
"acquire_connection_timeout": "30s",
"idle_connection_timeout": "30min",
"max_connection_lifetime": "1h"
}
},
"indexer": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ endpoint = "http://localhost:4566"
force_path_style_access = true

[metastore.postgres]
min_connections = 1
max_num_connections = 12
acquire_connection_timeout = "30s"
idle_connection_timeout = "30min"
max_connection_lifetime = "1h"

[indexer]
enable_otlp_endpoint = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ storage:

metastore:
postgres:
min_connections: 1
max_num_connections: 12
acquire_connection_timeout: 30s
idle_connection_timeout: 30min
max_connection_lifetime: 1h

indexer:
enable_otlp_endpoint: true
Expand Down
219 changes: 204 additions & 15 deletions quickwit/quickwit-config/src/metastore_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

use std::num::NonZeroUsize;
use std::ops::Deref;
use std::time::Duration;

use anyhow::ensure;
use anyhow::{ensure, Context};
use humantime::parse_duration;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, EnumMap};
Expand All @@ -41,20 +43,23 @@ pub enum MetastoreBackend {
/// polling_interval: 30s
///
/// postgres:
/// max_num_connections: 12
/// max_connections: 12
/// ```
#[serde_as]
#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
pub struct MetastoreConfigs(#[serde_as(as = "EnumMap")] Vec<MetastoreConfig>);

impl MetastoreConfigs {
pub fn redact(&mut self) {
for metastore_config in self.0.iter_mut() {
for metastore_config in &mut self.0 {
metastore_config.redact();
}
}

pub fn validate(&self) -> anyhow::Result<()> {
for metastore_config in &self.0 {
metastore_config.validate()?;
}
let backends: Vec<MetastoreBackend> = self
.0
.iter()
Expand Down Expand Up @@ -134,6 +139,14 @@ impl MetastoreConfig {
// TODO: Implement this method when we end up storing secrets in the
// metastore config.
}

pub fn validate(&self) -> anyhow::Result<()> {
match self {
Self::File(file_metastore_config) => file_metastore_config.validate()?,
Self::PostgreSQL(postgres_metastore_config) => postgres_metastore_config.validate()?,
}
Ok(())
}
}

impl From<FileMetastoreConfig> for MetastoreConfig {
Expand All @@ -151,28 +164,121 @@ impl From<PostgresMetastoreConfig> for MetastoreConfig {
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct PostgresMetastoreConfig {
#[serde(default = "PostgresMetastoreConfig::default_max_num_connections")]
pub max_num_connections: NonZeroUsize,
#[serde(default = "PostgresMetastoreConfig::default_min_connections")]
pub min_connections: usize,
#[serde(
alias = "max_num_connections",
default = "PostgresMetastoreConfig::default_max_connections"
)]
pub max_connections: NonZeroUsize,
#[serde(default = "PostgresMetastoreConfig::default_acquire_connection_timeout")]
pub acquire_connection_timeout: String,
#[serde(default = "PostgresMetastoreConfig::default_idle_connection_timeout")]
pub idle_connection_timeout: String,
#[serde(default = "PostgresMetastoreConfig::default_max_connection_lifetime")]
pub max_connection_lifetime: String,
}

impl Default for PostgresMetastoreConfig {
fn default() -> Self {
Self {
max_num_connections: Self::default_max_num_connections(),
min_connections: Self::default_min_connections(),
max_connections: Self::default_max_connections(),
acquire_connection_timeout: Self::default_acquire_connection_timeout(),
idle_connection_timeout: Self::default_idle_connection_timeout(),
max_connection_lifetime: Self::default_max_connection_lifetime(),
}
}
}

impl PostgresMetastoreConfig {
pub fn default_max_num_connections() -> NonZeroUsize {
NonZeroUsize::new(10).expect("10 is always non-zero.")
pub fn default_min_connections() -> usize {
0
}

pub fn default_max_connections() -> NonZeroUsize {
NonZeroUsize::new(10).unwrap()
}

pub fn default_acquire_connection_timeout() -> String {
"10s".to_string()
}

pub fn default_idle_connection_timeout() -> String {
"10min".to_string()
}

pub fn default_max_connection_lifetime() -> String {
"30min".to_string()
}

pub fn acquire_connection_timeout(&self) -> anyhow::Result<Duration> {
parse_duration(&self.acquire_connection_timeout).with_context(|| {
format!(
"failed to parse `acquire_connection_timeout` value `{}`",
self.acquire_connection_timeout
)
})
}

pub fn idle_connection_timeout_opt(&self) -> anyhow::Result<Option<Duration>> {
if self.idle_connection_timeout.is_empty() || self.idle_connection_timeout == "0" {
return Ok(None);
}
let idle_connection_timeout =
parse_duration(&self.idle_connection_timeout).with_context(|| {
format!(
"failed to parse `idle_connection_timeout` value `{}`",
self.idle_connection_timeout
)
})?;
if idle_connection_timeout.is_zero() {
Ok(None)
} else {
Ok(Some(idle_connection_timeout))
}
}

pub fn max_connection_lifetime_opt(&self) -> anyhow::Result<Option<Duration>> {
if self.max_connection_lifetime.is_empty() || self.max_connection_lifetime == "0" {
return Ok(None);
}
let max_connection_lifetime =
parse_duration(&self.max_connection_lifetime).with_context(|| {
format!(
"failed to parse `max_connection_lifetime` value `{}`",
self.max_connection_lifetime
)
})?;
if max_connection_lifetime.is_zero() {
Ok(None)
} else {
Ok(Some(max_connection_lifetime))
}
}

pub fn validate(&self) -> anyhow::Result<()> {
ensure!(
self.min_connections <= self.max_connections.get(),
"`min_connections` must be less than or equal to `max_connections`"
);
self.acquire_connection_timeout()?;
self.idle_connection_timeout_opt()?;
self.max_connection_lifetime_opt()?;
Ok(())
}
}

#[derive(Debug, Clone, Default, Eq, PartialEq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct FileMetastoreConfig;

impl FileMetastoreConfig {
pub fn validate(&self) -> anyhow::Result<()> {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -186,13 +292,14 @@ mod tests {

let metastore_configs_yaml = r#"
postgres:
max_num_connections: 12
max_connections: 12
"#;
let metastore_configs: MetastoreConfigs =
serde_yaml::from_str(metastore_configs_yaml).unwrap();

let expected_metastore_configs = MetastoreConfigs(vec![PostgresMetastoreConfig {
max_num_connections: NonZeroUsize::new(12).expect("12 is always non-zero."),
max_connections: NonZeroUsize::new(12).unwrap(),
..Default::default()
}
.into()]);
assert_eq!(metastore_configs, expected_metastore_configs);
Expand All @@ -202,30 +309,112 @@ mod tests {
fn test_metastore_configs_validate() {
let metastore_configs = MetastoreConfigs(vec![
PostgresMetastoreConfig {
max_num_connections: NonZeroUsize::new(12).expect("12 is always non-zero."),
max_connections: NonZeroUsize::new(12).unwrap(),
..Default::default()
}
.into(),
PostgresMetastoreConfig {
max_num_connections: NonZeroUsize::new(12).expect("12 is always non-zero."),
max_connections: NonZeroUsize::new(12).unwrap(),
..Default::default()
}
.into(),
]);
metastore_configs.validate().unwrap_err();
let error = metastore_configs.validate().unwrap_err();
assert!(error.to_string().contains("defined multiple times"));

let metastore_configs = MetastoreConfigs(vec![PostgresMetastoreConfig {
acquire_connection_timeout: "15".to_string(),
..Default::default()
}
.into()]);
let error = metastore_configs.validate().unwrap_err();
assert!(error.to_string().contains("`acquire_connection_timeout`"));
}

#[test]
fn test_pg_metastore_config_serde() {
{
let pg_metastore_config_yaml = "";
let pg_metastore_config: PostgresMetastoreConfig =
serde_yaml::from_str(pg_metastore_config_yaml).unwrap();
assert_eq!(pg_metastore_config, PostgresMetastoreConfig::default());
}
{
let pg_metastore_config_yaml = r#"
max_connections: 12
"#;
let pg_metastore_config: PostgresMetastoreConfig =
serde_yaml::from_str(pg_metastore_config_yaml).unwrap();

let expected_pg_metastore_config = PostgresMetastoreConfig {
max_connections: NonZeroUsize::new(12).unwrap(),
..Default::default()
};
assert_eq!(pg_metastore_config, expected_pg_metastore_config);
}
{
let pg_metastore_config_yaml = r#"
min_connections: 6
max_connections: 12
acquire_connection_timeout: 500ms
idle_connection_timeout: 1h
max_connection_lifetime: 1d
"#;
let pg_metastore_config: PostgresMetastoreConfig =
serde_yaml::from_str(pg_metastore_config_yaml).unwrap();

let expected_pg_metastore_config = PostgresMetastoreConfig {
min_connections: 6,
max_connections: NonZeroUsize::new(12).unwrap(),
acquire_connection_timeout: "500ms".to_string(),
idle_connection_timeout: "1h".to_string(),
max_connection_lifetime: "1d".to_string(),
};
assert_eq!(pg_metastore_config, expected_pg_metastore_config);
assert_eq!(
pg_metastore_config.acquire_connection_timeout().unwrap(),
Duration::from_millis(500)
);
assert_eq!(
pg_metastore_config.idle_connection_timeout_opt().unwrap(),
Some(Duration::from_secs(3600))
);
assert_eq!(
pg_metastore_config.max_connection_lifetime_opt().unwrap(),
Some(Duration::from_secs(24 * 3600))
);
}
{
let pg_metastore_config_yaml = r#"
max_num_connections: 12
min_connections: 6
max_connections: 12
acquire_connection_timeout: 15s
idle_connection_timeout: ""
max_connection_lifetime: 0
"#;
let pg_metastore_config: PostgresMetastoreConfig =
serde_yaml::from_str(pg_metastore_config_yaml).unwrap();

let expected_pg_metastore_config = PostgresMetastoreConfig {
max_num_connections: NonZeroUsize::new(12).expect("12 is always non-zero."),
min_connections: 6,
max_connections: NonZeroUsize::new(12).unwrap(),
acquire_connection_timeout: "15s".to_string(),
idle_connection_timeout: "".to_string(),
max_connection_lifetime: "0".to_string(),
};
assert_eq!(pg_metastore_config, expected_pg_metastore_config);
assert_eq!(
pg_metastore_config.acquire_connection_timeout().unwrap(),
Duration::from_secs(15)
);
assert!(pg_metastore_config
.idle_connection_timeout_opt()
.unwrap()
.is_none());
assert!(pg_metastore_config
.max_connection_lifetime_opt()
.unwrap()
.is_none(),);
}
}
}
Loading

0 comments on commit e5fd9fe

Please sign in to comment.