From d142f59b8d26edeae699ab79bdd20d3077061ca0 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Thu, 12 Sep 2024 15:40:19 +0200 Subject: [PATCH] add more env var config for postgres (#5365) * add more env var config for postgres * add tests for migrations * add documentation for new environment variables --- docs/reference/cli.md | 9 + ..._add-shard-update-timestamp-field.down.sql | 2 - ..._add-shard-update-timestamp-field.down.sql | 2 + .../src/metastore/postgres/metastore.rs | 13 +- .../src/metastore/postgres/migrator.rs | 217 ++++++++++++++++-- .../src/metastore/postgres/mod.rs | 4 + .../src/metastore/postgres/utils.rs | 14 +- 7 files changed, 238 insertions(+), 23 deletions(-) delete mode 100644 quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-update-timestamp-field.down.sql create mode 100644 quickwit/quickwit-metastore/migrations/postgresql/21_add-shard-update-timestamp-field.down.sql diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 614982e0fed..ae28d608d2c 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -838,6 +838,15 @@ Disables [telemetry](../telemetry.md) when set to any non-empty value. `QW_DISABLE_TELEMETRY=1 quickwit help` +### QW_POSTGRES_SKIP_MIGRATIONS + +Don't run database migrations (but verify that migrations were run successfully before, and that no unknown migration was run). + +### QW_POSTGRES_SKIP_MIGRATION_LOCKING + +Don't lock the database during migration. This may increase compatibility with alternative databases using the PostgreSQL wire protocol. However, it +is dangerous to use this if you can't guarantee that only one node will run the migrations. + ### RUST_LOG Configure quickwit log level. 
diff --git a/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-update-timestamp-field.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-update-timestamp-field.down.sql deleted file mode 100644 index 2bdbc180281..00000000000 --- a/quickwit/quickwit-metastore/migrations/postgresql/20_add-shard-update-timestamp-field.down.sql +++ /dev/null @@ -1,2 +0,0 @@ -ALTER TABLE shards - DROP IF EXISTS COLUMN update_timestamps; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/21_add-shard-update-timestamp-field.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/21_add-shard-update-timestamp-field.down.sql new file mode 100644 index 00000000000..8161c18ea80 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/21_add-shard-update-timestamp-field.down.sql @@ -0,0 +1,2 @@ +ALTER TABLE shards + DROP IF EXISTS update_timestamp; diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index c9f554875ba..d69f269b9ec 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -26,7 +26,7 @@ use futures::StreamExt; use itertools::Itertools; use quickwit_common::pretty::PrettySample; use quickwit_common::uri::Uri; -use quickwit_common::ServiceStream; +use quickwit_common::{get_bool_from_env, ServiceStream}; use quickwit_config::{ validate_index_id_pattern, IndexTemplate, IndexTemplateId, PostgresMetastoreConfig, }; @@ -63,6 +63,10 @@ use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Spl use super::pool::TrackedPool; use super::split_stream::SplitStream; use super::utils::{append_query_filters, establish_connection}; +use super::{ + QW_POSTGRES_READ_ONLY_ENV_KEY, QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, + QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, +}; use crate::checkpoint::{ IndexCheckpointDelta, PartitionId, 
SourceCheckpoint, SourceCheckpointDelta, }; @@ -111,6 +115,10 @@ impl PostgresqlMetastore { .max_connection_lifetime_opt() .expect("PostgreSQL metastore config should have been validated"); + let read_only = get_bool_from_env(QW_POSTGRES_READ_ONLY_ENV_KEY, false); + let skip_migrations = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false); + let skip_locking = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false); + let connection_pool = establish_connection( connection_uri, min_connections, @@ -118,10 +126,11 @@ impl PostgresqlMetastore { acquire_timeout, idle_timeout_opt, max_lifetime_opt, + read_only, ) .await?; - run_migrations(&connection_pool).await?; + run_migrations(&connection_pool, skip_migrations, skip_locking).await?; let metastore = PostgresqlMetastore { uri: connection_uri.clone(), diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs index 932937677c5..556c390ee38 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs @@ -17,34 +17,219 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::BTreeMap; + use quickwit_proto::metastore::{MetastoreError, MetastoreResult}; -use sqlx::migrate::Migrator; -use sqlx::{Acquire, Postgres}; +use sqlx::migrate::{Migrate, Migrator}; +use sqlx::{Acquire, PgConnection, Postgres}; use tracing::{error, instrument}; use super::pool::TrackedPool; -static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql"); +fn get_migrations() -> Migrator { + sqlx::migrate!("migrations/postgresql") +} /// Initializes the database and runs the SQL migrations stored in the /// `quickwit-metastore/migrations` directory. 
#[instrument(skip_all)] -pub(super) async fn run_migrations(pool: &TrackedPool) -> MetastoreResult<()> { +pub(super) async fn run_migrations( + pool: &TrackedPool, + skip_migrations: bool, + skip_locking: bool, +) -> MetastoreResult<()> { let mut tx = pool.begin().await?; let conn = tx.acquire().await?; - // this is an hidden function, made to get "around the annoying "implementation of `Acquire` - // is not general enough" error", which is the error we get otherwise. - let migrate_result = MIGRATOR.run_direct(conn).await; - let Err(migrate_error) = migrate_result else { - tx.commit().await?; - return Ok(()); + let mut migrator = get_migrations(); + + if skip_locking { + migrator.set_locking(false); + } + + if !skip_migrations { + // this is an hidden function, made to get "around the annoying "implementation of `Acquire` + // is not general enough" error", which is the error we get otherwise. + let migrate_result = migrator.run_direct(conn).await; + + let Err(migrate_error) = migrate_result else { + tx.commit().await?; + return Ok(()); + }; + tx.rollback().await?; + error!(error=%migrate_error, "failed to run PostgreSQL migrations"); + + Err(MetastoreError::Internal { + message: "failed to run PostgreSQL migrations".to_string(), + cause: migrate_error.to_string(), + }) + } else { + check_migrations(migrator, conn).await + } +} + +async fn check_migrations(migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> { + let dirty = match conn.dirty_version().await { + Ok(dirty) => dirty, + Err(migrate_error) => { + error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); + + return Err(MetastoreError::Internal { + message: "failed to validate PostgreSQL migrations".to_string(), + cause: migrate_error.to_string(), + }); + } + }; + if let Some(dirty) = dirty { + error!("migration {dirty} is dirty"); + + return Err(MetastoreError::Internal { + message: "failed to validate PostgreSQL migrations".to_string(), + cause: format!("migration {dirty} 
is dirty"), + }); }; - tx.rollback().await?; - error!(error=%migrate_error, "failed to run PostgreSQL migrations"); + let applied_migrations = match conn.list_applied_migrations().await { + Ok(applied_migrations) => applied_migrations, + Err(migrate_error) => { + error!(error=%migrate_error, "failed to validate PostgreSQL migrations"); + + return Err(MetastoreError::Internal { + message: "failed to validate PostgreSQL migrations".to_string(), + cause: migrate_error.to_string(), + }); + } + }; + let expected_migrations: BTreeMap<_, _> = migrator + .iter() + .filter(|migration| migration.migration_type.is_up_migration()) + .map(|migration| (migration.version, migration)) + .collect(); + if applied_migrations.len() < expected_migrations.len() { + error!( + "missing migrations, expected {} migrations, only {} present in database", + expected_migrations.len(), + applied_migrations.len() + ); + + return Err(MetastoreError::Internal { + message: "failed to validate PostgreSQL migrations".to_string(), + cause: format!( + "missing migrations, expected {} migrations, only {} present in database", + expected_migrations.len(), + applied_migrations.len() + ), + }); + } + for applied_migration in applied_migrations { + let Some(migration) = expected_migrations.get(&applied_migration.version) else { + error!( + "found unknown migration {} in database", + applied_migration.version + ); + + return Err(MetastoreError::Internal { + message: "failed to validate PostgreSQL migrations".to_string(), + cause: format!( + "found unknown migration {} in database", + applied_migration.version + ), + }); + }; + if migration.checksum != applied_migration.checksum { + error!( + "migration {} differ between database and expected value", + applied_migration.version + ); + + return Err(MetastoreError::Internal { + message: "failed to validate PostgreSQL migrations".to_string(), + cause: format!( + "migration {} differ between database and expected value", + applied_migration.version + ), + }); + } 
+ } + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use quickwit_common::uri::Uri; + use sqlx::migrate::Migrate; + use sqlx::Acquire; + + use super::{get_migrations, run_migrations}; + use crate::metastore::postgres::utils::establish_connection; + + #[tokio::test] + #[serial_test::file_serial] + async fn test_metastore_check_migration() { + let _ = tracing_subscriber::fmt::try_init(); + + dotenvy::dotenv().ok(); + let uri: Uri = std::env::var("QW_TEST_DATABASE_URL") + .expect("environment variable `QW_TEST_DATABASE_URL` should be set") + .parse() + .expect("environment variable `QW_TEST_DATABASE_URL` should be a valid URI"); + + { + let connection_pool = + establish_connection(&uri, 1, 5, Duration::from_secs(2), None, None, false) + .await + .unwrap(); + // make sure migrations are run + run_migrations(&connection_pool, false, false) + .await + .unwrap(); + + // we just ran migration, nothing else to run + run_migrations(&connection_pool, true, false).await.unwrap(); + + let migrations = get_migrations(); + let last_migration = migrations + .iter() + .map(|migration| migration.version) + .max() + .expect("no migration exists?"); + let up_migration = migrations + .iter() + .find(|migration| { + migration.version == last_migration + && migration.migration_type.is_up_migration() + }) + .unwrap(); + let down_migration = migrations + .iter() + .find(|migration| { + migration.version == last_migration + && migration.migration_type.is_down_migration() + }) + .unwrap(); + let mut conn = connection_pool.acquire().await.unwrap(); + + conn.revert(down_migration).await.unwrap(); + + run_migrations(&connection_pool, true, false) + .await + .unwrap_err(); + + conn.apply(up_migration).await.unwrap(); + } - Err(MetastoreError::Internal { - message: "failed to run PostgreSQL migrations".to_string(), - cause: migrate_error.to_string(), - }) + { + let connection_pool = + establish_connection(&uri, 1, 5, Duration::from_secs(2), None, None, true) + .await + 
.unwrap(); + // error because we are in read only mode, and we try to run migrations + run_migrations(&connection_pool, false, false) + .await + .unwrap_err(); + // okay because all migrations were already run before + run_migrations(&connection_pool, true, false).await.unwrap(); + } + } } diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs index c89deb740e7..0b5876509d6 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/mod.rs @@ -30,3 +30,7 @@ mod utils; pub use factory::PostgresqlMetastoreFactory; pub use metastore::PostgresqlMetastore; + +const QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY: &str = "QW_POSTGRES_SKIP_MIGRATIONS"; +const QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY: &str = "QW_POSTGRES_SKIP_MIGRATION_LOCKING"; +const QW_POSTGRES_READ_ONLY_ENV_KEY: &str = "QW_POSTGRES_READ_ONLY"; diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index c76a0cac673..e794ec0d2fe 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -44,6 +44,7 @@ pub(super) async fn establish_connection( acquire_timeout: Duration, idle_timeout_opt: Option, max_lifetime_opt: Option, + read_only: bool, ) -> MetastoreResult> { let pool_options = PgPoolOptions::new() .min_connections(min_connections as u32) @@ -51,9 +52,16 @@ pub(super) async fn establish_connection( .acquire_timeout(acquire_timeout) .idle_timeout(idle_timeout_opt) .max_lifetime(max_lifetime_opt); - let connect_options: PgConnectOptions = PgConnectOptions::from_str(connection_uri.as_str())? - .application_name("quickwit-metastore") - .log_statements(LevelFilter::Info); + + let mut connect_options: PgConnectOptions = + PgConnectOptions::from_str(connection_uri.as_str())? 
+ .application_name("quickwit-metastore") + .log_statements(LevelFilter::Info); + + if read_only { + // this isn't a security mechanism, only a safeguard against involuntary misuse + connect_options = connect_options.options([("default_transaction_read_only", "on")]); + } let sqlx_pool = pool_options .connect_with(connect_options) .await