Skip to content

Commit

Permalink
add more env var config for postgres (#5365)
Browse files Browse the repository at this point in the history
* add more env var config for postgres

* add tests for migrations

* add documentation for new environment variables
  • Loading branch information
trinity-1686a authored Sep 12, 2024
1 parent 8494d51 commit d142f59
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 23 deletions.
9 changes: 9 additions & 0 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,15 @@ Disables [telemetry](../telemetry.md) when set to any non-empty value.

`QW_DISABLE_TELEMETRY=1 quickwit help`

### QW_POSTGRES_SKIP_MIGRATIONS

Don't run database migrations (but verify that migrations were run successfully before, and no that unknown migration was run).

### QW_POSTGRES_SKIP_MIGRATION_LOCKING

Don't lock the database during migration. This may increase compatibility with alternative databases using the PostgreSQL wire protocol. However, it
is dangerous to use this if you can't guarantee that only one node will run the migrations.

### RUST_LOG

Configure quickwit log level.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE shards
DROP IF EXISTS update_timestamp;
13 changes: 11 additions & 2 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use futures::StreamExt;
use itertools::Itertools;
use quickwit_common::pretty::PrettySample;
use quickwit_common::uri::Uri;
use quickwit_common::ServiceStream;
use quickwit_common::{get_bool_from_env, ServiceStream};
use quickwit_config::{
validate_index_id_pattern, IndexTemplate, IndexTemplateId, PostgresMetastoreConfig,
};
Expand Down Expand Up @@ -63,6 +63,10 @@ use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Spl
use super::pool::TrackedPool;
use super::split_stream::SplitStream;
use super::utils::{append_query_filters, establish_connection};
use super::{
QW_POSTGRES_READ_ONLY_ENV_KEY, QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY,
QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY,
};
use crate::checkpoint::{
IndexCheckpointDelta, PartitionId, SourceCheckpoint, SourceCheckpointDelta,
};
Expand Down Expand Up @@ -111,17 +115,22 @@ impl PostgresqlMetastore {
.max_connection_lifetime_opt()
.expect("PostgreSQL metastore config should have been validated");

let read_only = get_bool_from_env(QW_POSTGRES_READ_ONLY_ENV_KEY, false);
let skip_migrations = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY, false);
let skip_locking = get_bool_from_env(QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY, false);

let connection_pool = establish_connection(
connection_uri,
min_connections,
max_connections,
acquire_timeout,
idle_timeout_opt,
max_lifetime_opt,
read_only,
)
.await?;

run_migrations(&connection_pool).await?;
run_migrations(&connection_pool, skip_migrations, skip_locking).await?;

let metastore = PostgresqlMetastore {
uri: connection_uri.clone(),
Expand Down
217 changes: 201 additions & 16 deletions quickwit/quickwit-metastore/src/metastore/postgres/migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,219 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeMap;

use quickwit_proto::metastore::{MetastoreError, MetastoreResult};
use sqlx::migrate::Migrator;
use sqlx::{Acquire, Postgres};
use sqlx::migrate::{Migrate, Migrator};
use sqlx::{Acquire, PgConnection, Postgres};
use tracing::{error, instrument};

use super::pool::TrackedPool;

static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql");
fn get_migrations() -> Migrator {
sqlx::migrate!("migrations/postgresql")
}

/// Initializes the database and runs the SQL migrations stored in the
/// `quickwit-metastore/migrations` directory.
#[instrument(skip_all)]
pub(super) async fn run_migrations(pool: &TrackedPool<Postgres>) -> MetastoreResult<()> {
pub(super) async fn run_migrations(
pool: &TrackedPool<Postgres>,
skip_migrations: bool,
skip_locking: bool,
) -> MetastoreResult<()> {
let mut tx = pool.begin().await?;
let conn = tx.acquire().await?;
// this is an hidden function, made to get "around the annoying "implementation of `Acquire`
// is not general enough" error", which is the error we get otherwise.
let migrate_result = MIGRATOR.run_direct(conn).await;

let Err(migrate_error) = migrate_result else {
tx.commit().await?;
return Ok(());
let mut migrator = get_migrations();

if skip_locking {
migrator.set_locking(false);
}

if !skip_migrations {
// this is an hidden function, made to get "around the annoying "implementation of `Acquire`
// is not general enough" error", which is the error we get otherwise.
let migrate_result = migrator.run_direct(conn).await;

let Err(migrate_error) = migrate_result else {
tx.commit().await?;
return Ok(());
};
tx.rollback().await?;
error!(error=%migrate_error, "failed to run PostgreSQL migrations");

Err(MetastoreError::Internal {
message: "failed to run PostgreSQL migrations".to_string(),
cause: migrate_error.to_string(),
})
} else {
check_migrations(migrator, conn).await
}
}

async fn check_migrations(migrator: Migrator, conn: &mut PgConnection) -> MetastoreResult<()> {
let dirty = match conn.dirty_version().await {
Ok(dirty) => dirty,
Err(migrate_error) => {
error!(error=%migrate_error, "failed to validate PostgreSQL migrations");

return Err(MetastoreError::Internal {
message: "failed to validate PostgreSQL migrations".to_string(),
cause: migrate_error.to_string(),
});
}
};
if let Some(dirty) = dirty {
error!("migration {dirty} is dirty");

return Err(MetastoreError::Internal {
message: "failed to validate PostgreSQL migrations".to_string(),
cause: format!("migration {dirty} is dirty"),
});
};
tx.rollback().await?;
error!(error=%migrate_error, "failed to run PostgreSQL migrations");
let applied_migrations = match conn.list_applied_migrations().await {
Ok(applied_migrations) => applied_migrations,
Err(migrate_error) => {
error!(error=%migrate_error, "failed to validate PostgreSQL migrations");

return Err(MetastoreError::Internal {
message: "failed to validate PostgreSQL migrations".to_string(),
cause: migrate_error.to_string(),
});
}
};
let expected_migrations: BTreeMap<_, _> = migrator
.iter()
.filter(|migration| migration.migration_type.is_up_migration())
.map(|migration| (migration.version, migration))
.collect();
if applied_migrations.len() < expected_migrations.len() {
error!(
"missing migrations, expected {} migrations, only {} present in database",
expected_migrations.len(),
applied_migrations.len()
);

return Err(MetastoreError::Internal {
message: "failed to validate PostgreSQL migrations".to_string(),
cause: format!(
"missing migrations, expected {} migrations, only {} present in database",
expected_migrations.len(),
applied_migrations.len()
),
});
}
for applied_migration in applied_migrations {
let Some(migration) = expected_migrations.get(&applied_migration.version) else {
error!(
"found unknown migration {} in database",
applied_migration.version
);

return Err(MetastoreError::Internal {
message: "failed to validate PostgreSQL migrations".to_string(),
cause: format!(
"found unknown migration {} in database",
applied_migration.version
),
});
};
if migration.checksum != applied_migration.checksum {
error!(
"migration {} differ between database and expected value",
applied_migration.version
);

return Err(MetastoreError::Internal {
message: "failed to validate PostgreSQL migrations".to_string(),
cause: format!(
"migration {} differ between database and expected value",
applied_migration.version
),
});
}
}
Ok(())
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use quickwit_common::uri::Uri;
use sqlx::migrate::Migrate;
use sqlx::Acquire;

use super::{get_migrations, run_migrations};
use crate::metastore::postgres::utils::establish_connection;

#[tokio::test]
#[serial_test::file_serial]
async fn test_metastore_check_migration() {
let _ = tracing_subscriber::fmt::try_init();

dotenvy::dotenv().ok();
let uri: Uri = std::env::var("QW_TEST_DATABASE_URL")
.expect("environment variable `QW_TEST_DATABASE_URL` should be set")
.parse()
.expect("environment variable `QW_TEST_DATABASE_URL` should be a valid URI");

{
let connection_pool =
establish_connection(&uri, 1, 5, Duration::from_secs(2), None, None, false)
.await
.unwrap();
// make sure migrations are run
run_migrations(&connection_pool, false, false)
.await
.unwrap();

// we just ran migration, nothing else to run
run_migrations(&connection_pool, true, false).await.unwrap();

let migrations = get_migrations();
let last_migration = migrations
.iter()
.map(|migration| migration.version)
.max()
.expect("no migration exists?");
let up_migration = migrations
.iter()
.find(|migration| {
migration.version == last_migration
&& migration.migration_type.is_up_migration()
})
.unwrap();
let down_migration = migrations
.iter()
.find(|migration| {
migration.version == last_migration
&& migration.migration_type.is_down_migration()
})
.unwrap();
let mut conn = connection_pool.acquire().await.unwrap();

conn.revert(down_migration).await.unwrap();

run_migrations(&connection_pool, true, false)
.await
.unwrap_err();

conn.apply(up_migration).await.unwrap();
}

Err(MetastoreError::Internal {
message: "failed to run PostgreSQL migrations".to_string(),
cause: migrate_error.to_string(),
})
{
let connection_pool =
establish_connection(&uri, 1, 5, Duration::from_secs(2), None, None, true)
.await
.unwrap();
// error because we are in read only mode, and we try to run migrations
run_migrations(&connection_pool, false, false)
.await
.unwrap_err();
// okay because all migrations were already run before
run_migrations(&connection_pool, true, false).await.unwrap();
}
}
}
4 changes: 4 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,7 @@ mod utils;

pub use factory::PostgresqlMetastoreFactory;
pub use metastore::PostgresqlMetastore;

const QW_POSTGRES_SKIP_MIGRATIONS_ENV_KEY: &str = "QW_POSTGRES_SKIP_MIGRATIONS";
const QW_POSTGRES_SKIP_MIGRATION_LOCKING_ENV_KEY: &str = "QW_POSTGRES_SKIP_MIGRATION_LOCKING";
const QW_POSTGRES_READ_ONLY_ENV_KEY: &str = "QW_POSTGRES_READ_ONLY";
14 changes: 11 additions & 3 deletions quickwit/quickwit-metastore/src/metastore/postgres/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,24 @@ pub(super) async fn establish_connection(
acquire_timeout: Duration,
idle_timeout_opt: Option<Duration>,
max_lifetime_opt: Option<Duration>,
read_only: bool,
) -> MetastoreResult<TrackedPool<Postgres>> {
let pool_options = PgPoolOptions::new()
.min_connections(min_connections as u32)
.max_connections(max_connections as u32)
.acquire_timeout(acquire_timeout)
.idle_timeout(idle_timeout_opt)
.max_lifetime(max_lifetime_opt);
let connect_options: PgConnectOptions = PgConnectOptions::from_str(connection_uri.as_str())?
.application_name("quickwit-metastore")
.log_statements(LevelFilter::Info);

let mut connect_options: PgConnectOptions =
PgConnectOptions::from_str(connection_uri.as_str())?
.application_name("quickwit-metastore")
.log_statements(LevelFilter::Info);

if read_only {
// this isn't a security mechanism, only a safeguard against involontary missuse
connect_options = connect_options.options([("default_transaction_read_only", "on")]);
}
let sqlx_pool = pool_options
.connect_with(connect_options)
.await
Expand Down

0 comments on commit d142f59

Please sign in to comment.