diff --git a/libs/test-setup/src/test_api_args.rs b/libs/test-setup/src/test_api_args.rs index f104d2710f25..548136554b35 100644 --- a/libs/test-setup/src/test_api_args.rs +++ b/libs/test-setup/src/test_api_args.rs @@ -2,6 +2,7 @@ use crate::{logging, mssql, mysql, postgres, Capabilities, Tags}; use enumflags2::BitFlags; use once_cell::sync::Lazy; use quaint::single::Quaint; +use std::time::Duration; use std::{fmt::Display, io::Write as _}; #[derive(Debug)] @@ -11,6 +12,7 @@ pub(crate) struct DbUnderTest { shadow_database_url: Option, provider: &'static str, pub(crate) tags: BitFlags, + pub(crate) max_ddl_refresh_delay: Option, } const MISSING_TEST_DATABASE_URL_MSG: &str = r#" @@ -40,13 +42,22 @@ static DB_UNDER_TEST: Lazy> = Lazy::new(|| { capabilities: Capabilities::CreateDatabase.into(), provider: "sqlite", shadow_database_url, + max_ddl_refresh_delay: None, }), "mysql" => { let tags = mysql::get_mysql_tags(&database_url)?; let mut capabilities = Capabilities::Enums | Capabilities::Json; + let mut max_refresh_delay = None; if tags.contains(Tags::Vitess) { capabilities |= Capabilities::CreateDatabase; + // Vitess is an eventually consistent system that propagates schema changes + // from vttablet to vtgate asynchronously. We might need to wait for a while before + // start querying the database after a schema change is made. + // + // For schema changes that do not alter the table column names, the schema is not + // required to be reloaded. + max_refresh_delay = Some(Duration::from_millis(500)); } Ok(DbUnderTest { @@ -55,6 +66,7 @@ static DB_UNDER_TEST: Lazy> = Lazy::new(|| { capabilities, provider: "mysql", shadow_database_url, + max_ddl_refresh_delay: max_refresh_delay, }) } "postgresql" | "postgres" => Ok({ @@ -75,6 +87,7 @@ static DB_UNDER_TEST: Lazy> = Lazy::new(|| { | Capabilities::CreateDatabase, provider, shadow_database_url, + max_ddl_refresh_delay: None, } }), "sqlserver" => Ok(DbUnderTest { @@ -83,6 +96,7 @@ static DB_UNDER_TEST: Lazy> = Lazy::new(|| { capabilities: Capabilities::CreateDatabase.into(), provider: "sqlserver", shadow_database_url, + max_ddl_refresh_delay: None, }), _ => Err("Unknown database URL".into()), } @@ -191,6 +205,10 @@ impl TestApiArgs { pub fn tags(&self) -> BitFlags { self.db.tags } + + pub fn max_ddl_refresh_delay(&self) -> Option { + self.db.max_ddl_refresh_delay.clone() + } } pub struct DatasourceBlock<'a> { diff --git a/schema-engine/sql-migration-tests/src/commands/schema_push.rs b/schema-engine/sql-migration-tests/src/commands/schema_push.rs index 346b032f5781..541dba81797f 100644 --- a/schema-engine/sql-migration-tests/src/commands/schema_push.rs +++ b/schema-engine/sql-migration-tests/src/commands/schema_push.rs @@ -2,6 +2,7 @@ use colored::Colorize; use schema_core::{ commands::schema_push, json_rpc::types::*, schema_connector::SchemaConnector, CoreError, CoreResult, }; +use std::time::Duration; use std::{borrow::Cow, fmt::Debug}; use tracing_futures::Instrument; @@ -11,15 +12,18 @@ pub struct SchemaPush<'a> { force: bool, /// Purely for logging diagnostics. migration_id: Option<&'a str>, + // In eventually-consistent systems, we might need to wait for a while before the system refreshes + max_ddl_refresh_delay: Option, } impl<'a> SchemaPush<'a> { - pub fn new(api: &'a mut dyn SchemaConnector, schema: String) -> Self { + pub fn new(api: &'a mut dyn SchemaConnector, schema: String, max_refresh_delay: Option) -> Self { SchemaPush { api, schema, force: false, migration_id: None, + max_ddl_refresh_delay: max_refresh_delay, } } @@ -44,6 +48,10 @@ impl<'a> SchemaPush<'a> { let output = test_setup::runtime::run_with_thread_local_runtime(fut)?; + if let Some(delay) = self.max_ddl_refresh_delay { + std::thread::sleep(delay); + } + Ok(SchemaPushAssertion { result: output, context: None, diff --git a/schema-engine/sql-migration-tests/src/multi_engine_test_api.rs b/schema-engine/sql-migration-tests/src/multi_engine_test_api.rs index e4fff7f71502..4551a94e6445 100644 --- a/schema-engine/sql-migration-tests/src/multi_engine_test_api.rs +++ b/schema-engine/sql-migration-tests/src/multi_engine_test_api.rs @@ -3,6 +3,7 @@ //! A TestApi that is initialized without IO or async code and can instantiate //! multiple schema engines. +use std::time::Duration; pub use test_macros::test_connector; pub use test_setup::sqlite_test_url; pub use test_setup::{runtime::run_with_thread_local_runtime as tok, BitFlags, Capabilities, Tags}; @@ -158,6 +159,12 @@ impl TestApi { self.tags().contains(Tags::Vitess) } + /// Returns a duration that is guaranteed to be larger than the maximum refresh rate after a + /// DDL statement + pub(crate) fn max_ddl_refresh_delay(&self) -> Option { + self.args.max_ddl_refresh_delay() + } + /// Returns whether the database automatically lower-cases table names. pub fn lower_cases_table_names(&self) -> bool { self.tags().contains(Tags::LowerCasesTableNames) @@ -203,6 +210,7 @@ impl TestApi { connection_info, tags: self.args.tags(), namespaces: self.args.namespaces(), + max_ddl_refresh_delay: self.args.max_ddl_refresh_delay(), } } @@ -276,6 +284,7 @@ pub struct EngineTestApi { connection_info: ConnectionInfo, tags: BitFlags, namespaces: &'static [&'static str], + max_ddl_refresh_delay: Option, } impl EngineTestApi { @@ -320,7 +329,7 @@ impl EngineTestApi { /// Plan a `schemaPush` command pub fn schema_push(&mut self, dm: impl Into) -> SchemaPush<'_> { - SchemaPush::new(&mut self.connector, dm.into()) + SchemaPush::new(&mut self.connector, dm.into(), self.max_ddl_refresh_delay) } /// The schema name of the current connected database. diff --git a/schema-engine/sql-migration-tests/src/test_api.rs b/schema-engine/sql-migration-tests/src/test_api.rs index 4dab27d96ef9..bf0790cab770 100644 --- a/schema-engine/sql-migration-tests/src/test_api.rs +++ b/schema-engine/sql-migration-tests/src/test_api.rs @@ -21,6 +21,7 @@ use schema_core::{ }; use sql_schema_connector::SqlSchemaConnector; use sql_schema_describer::SqlSchema; +use std::time::Duration; use std::{ borrow::Cow, fmt::{Display, Write}, @@ -189,6 +190,12 @@ impl TestApi { self.root.is_vitess() } + /// Returns a duration that is guaranteed to be larger than the maximum refresh rate after a + /// DDL statement + pub fn max_ddl_refresh_delay(&self) -> Option { + self.root.max_ddl_refresh_delay() + } + /// Insert test values pub fn insert<'a>(&'a mut self, table_name: &'a str) -> SingleRowInsert<'a> { SingleRowInsert { @@ -316,12 +323,13 @@ impl TestApi { /// Plan a `schemaPush` command adding the datasource pub fn schema_push_w_datasource(&mut self, dm: impl Into) -> SchemaPush<'_> { let schema = self.datamodel_with_provider(&dm.into()); - SchemaPush::new(&mut self.connector, schema) + self.schema_push(schema) } /// Plan a `schemaPush` command pub fn schema_push(&mut self, dm: impl Into) -> SchemaPush<'_> { - SchemaPush::new(&mut self.connector, dm.into()) + let max_ddl_refresh_delay = self.max_ddl_refresh_delay(); + SchemaPush::new(&mut self.connector, dm.into(), max_ddl_refresh_delay) } pub fn tags(&self) -> BitFlags {