Skip to content

Commit

Permalink
Introduce an artifical delay in Vitess after a DDL change is made.
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelff committed Nov 23, 2023
1 parent 82490da commit e4db9cc
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 4 deletions.
18 changes: 18 additions & 0 deletions libs/test-setup/src/test_api_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{logging, mssql, mysql, postgres, Capabilities, Tags};
use enumflags2::BitFlags;
use once_cell::sync::Lazy;
use quaint::single::Quaint;
use std::time::Duration;
use std::{fmt::Display, io::Write as _};

#[derive(Debug)]
Expand All @@ -11,6 +12,7 @@ pub(crate) struct DbUnderTest {
shadow_database_url: Option<String>,
provider: &'static str,
pub(crate) tags: BitFlags<Tags>,
pub(crate) max_ddl_refresh_delay: Option<std::time::Duration>,
}

const MISSING_TEST_DATABASE_URL_MSG: &str = r#"
Expand Down Expand Up @@ -40,13 +42,22 @@ static DB_UNDER_TEST: Lazy<Result<DbUnderTest, String>> = Lazy::new(|| {
capabilities: Capabilities::CreateDatabase.into(),
provider: "sqlite",
shadow_database_url,
max_ddl_refresh_delay: None,
}),
"mysql" => {
let tags = mysql::get_mysql_tags(&database_url)?;
let mut capabilities = Capabilities::Enums | Capabilities::Json;
let mut max_refresh_delay = None;

if tags.contains(Tags::Vitess) {
capabilities |= Capabilities::CreateDatabase;
// Vitess is an eventually consistent system that propagates schema changes
// from vttablet to vtgate asynchronously. We might need to wait for a while before
// start querying the database after a schema change is made.
//
// For schema changes that do not alter the table column names, the schema is not
// required to be reloaded.
max_refresh_delay = Some(Duration::from_millis(500));
}

Ok(DbUnderTest {
Expand All @@ -55,6 +66,7 @@ static DB_UNDER_TEST: Lazy<Result<DbUnderTest, String>> = Lazy::new(|| {
capabilities,
provider: "mysql",
shadow_database_url,
max_ddl_refresh_delay: max_refresh_delay,
})
}
"postgresql" | "postgres" => Ok({
Expand All @@ -75,6 +87,7 @@ static DB_UNDER_TEST: Lazy<Result<DbUnderTest, String>> = Lazy::new(|| {
| Capabilities::CreateDatabase,
provider,
shadow_database_url,
max_ddl_refresh_delay: None,
}
}),
"sqlserver" => Ok(DbUnderTest {
Expand All @@ -83,6 +96,7 @@ static DB_UNDER_TEST: Lazy<Result<DbUnderTest, String>> = Lazy::new(|| {
capabilities: Capabilities::CreateDatabase.into(),
provider: "sqlserver",
shadow_database_url,
max_ddl_refresh_delay: None,
}),
_ => Err("Unknown database URL".into()),
}
Expand Down Expand Up @@ -191,6 +205,10 @@ impl TestApiArgs {
pub fn tags(&self) -> BitFlags<Tags> {
self.db.tags
}

pub fn max_ddl_refresh_delay(&self) -> Option<Duration> {
self.db.max_ddl_refresh_delay.clone()
}
}

pub struct DatasourceBlock<'a> {
Expand Down
10 changes: 9 additions & 1 deletion schema-engine/sql-migration-tests/src/commands/schema_push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use colored::Colorize;
use schema_core::{
commands::schema_push, json_rpc::types::*, schema_connector::SchemaConnector, CoreError, CoreResult,
};
use std::time::Duration;
use std::{borrow::Cow, fmt::Debug};
use tracing_futures::Instrument;

Expand All @@ -11,15 +12,18 @@ pub struct SchemaPush<'a> {
force: bool,
/// Purely for logging diagnostics.
migration_id: Option<&'a str>,
// In eventually-consistent systems, we might need to wait for a while before the system refreshes
max_ddl_refresh_delay: Option<Duration>,
}

impl<'a> SchemaPush<'a> {
pub fn new(api: &'a mut dyn SchemaConnector, schema: String) -> Self {
pub fn new(api: &'a mut dyn SchemaConnector, schema: String, max_refresh_delay: Option<Duration>) -> Self {
SchemaPush {
api,
schema,
force: false,
migration_id: None,
max_ddl_refresh_delay: max_refresh_delay,
}
}

Expand All @@ -44,6 +48,10 @@ impl<'a> SchemaPush<'a> {

let output = test_setup::runtime::run_with_thread_local_runtime(fut)?;

if let Some(delay) = self.max_ddl_refresh_delay {
std::thread::sleep(delay);
}

Ok(SchemaPushAssertion {
result: output,
context: None,
Expand Down
11 changes: 10 additions & 1 deletion schema-engine/sql-migration-tests/src/multi_engine_test_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! A TestApi that is initialized without IO or async code and can instantiate
//! multiple schema engines.

use std::time::Duration;
pub use test_macros::test_connector;
pub use test_setup::sqlite_test_url;
pub use test_setup::{runtime::run_with_thread_local_runtime as tok, BitFlags, Capabilities, Tags};
Expand Down Expand Up @@ -158,6 +159,12 @@ impl TestApi {
self.tags().contains(Tags::Vitess)
}

/// Returns a duration that is guaranteed to be larger than the maximum refresh rate after a
/// DDL statement
pub(crate) fn max_ddl_refresh_delay(&self) -> Option<Duration> {
self.args.max_ddl_refresh_delay()
}

/// Returns whether the database automatically lower-cases table names.
pub fn lower_cases_table_names(&self) -> bool {
self.tags().contains(Tags::LowerCasesTableNames)
Expand Down Expand Up @@ -203,6 +210,7 @@ impl TestApi {
connection_info,
tags: self.args.tags(),
namespaces: self.args.namespaces(),
max_ddl_refresh_delay: self.args.max_ddl_refresh_delay(),
}
}

Expand Down Expand Up @@ -276,6 +284,7 @@ pub struct EngineTestApi {
connection_info: ConnectionInfo,
tags: BitFlags<Tags>,
namespaces: &'static [&'static str],
max_ddl_refresh_delay: Option<Duration>,
}

impl EngineTestApi {
Expand Down Expand Up @@ -320,7 +329,7 @@ impl EngineTestApi {

/// Plan a `schemaPush` command
pub fn schema_push(&mut self, dm: impl Into<String>) -> SchemaPush<'_> {
SchemaPush::new(&mut self.connector, dm.into())
SchemaPush::new(&mut self.connector, dm.into(), self.max_ddl_refresh_delay)
}

/// The schema name of the current connected database.
Expand Down
12 changes: 10 additions & 2 deletions schema-engine/sql-migration-tests/src/test_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use schema_core::{
};
use sql_schema_connector::SqlSchemaConnector;
use sql_schema_describer::SqlSchema;
use std::time::Duration;
use std::{
borrow::Cow,
fmt::{Display, Write},
Expand Down Expand Up @@ -189,6 +190,12 @@ impl TestApi {
self.root.is_vitess()
}

/// Returns a duration that is guaranteed to be larger than the maximum refresh rate after a
/// DDL statement
pub fn max_ddl_refresh_delay(&self) -> Option<Duration> {
self.root.max_ddl_refresh_delay()
}

/// Insert test values
pub fn insert<'a>(&'a mut self, table_name: &'a str) -> SingleRowInsert<'a> {
SingleRowInsert {
Expand Down Expand Up @@ -316,12 +323,13 @@ impl TestApi {
/// Plan a `schemaPush` command adding the datasource
pub fn schema_push_w_datasource(&mut self, dm: impl Into<String>) -> SchemaPush<'_> {
let schema = self.datamodel_with_provider(&dm.into());
SchemaPush::new(&mut self.connector, schema)
self.schema_push(schema)
}

/// Plan a `schemaPush` command
pub fn schema_push(&mut self, dm: impl Into<String>) -> SchemaPush<'_> {
SchemaPush::new(&mut self.connector, dm.into())
let max_ddl_refresh_delay = self.max_ddl_refresh_delay();
SchemaPush::new(&mut self.connector, dm.into(), max_ddl_refresh_delay)
}

pub fn tags(&self) -> BitFlags<Tags> {
Expand Down

0 comments on commit e4db9cc

Please sign in to comment.