diff --git a/.gitmodules b/.gitmodules index ec5d6208b8dd..037accdbe424 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,7 @@ [submodule "testing"] path = testing url = https://github.com/apache/arrow-testing +[submodule "datafusion-testing"] + path = datafusion-testing + url = https://github.com/apache/datafusion-testing.git + branch = main diff --git a/datafusion-testing b/datafusion-testing new file mode 160000 index 000000000000..e2e320c9477a --- /dev/null +++ b/datafusion-testing @@ -0,0 +1 @@ +Subproject commit e2e320c9477a6d8ab09662eae255887733c0e304 diff --git a/datafusion/sqllogictest/Cargo.toml b/datafusion/sqllogictest/Cargo.toml index 5c7d909d5c43..1bb88a8bd44f 100644 --- a/datafusion/sqllogictest/Cargo.toml +++ b/datafusion/sqllogictest/Cargo.toml @@ -45,9 +45,11 @@ datafusion-common = { workspace = true, default-features = true } datafusion-common-runtime = { workspace = true, default-features = true } futures = { workspace = true } half = { workspace = true, default-features = true } +indicatif = "0.17" itertools = { workspace = true } log = { workspace = true } object_store = { workspace = true } +once_cell = { version = "1.20", optional = true } postgres-protocol = { version = "0.6.7", optional = true } postgres-types = { version = "0.2.8", features = ["derive", "with-chrono-0_4"], optional = true } rust_decimal = { version = "1.36.0", features = ["tokio-pg"] } @@ -56,6 +58,8 @@ rust_decimal = { version = "1.36.0", features = ["tokio-pg"] } sqllogictest = "=0.24.0" sqlparser = { workspace = true } tempfile = { workspace = true } +testcontainers = { version = "0.23", features = ["default"], optional = true } +testcontainers-modules = { version = "0.11", features = ["postgres"], optional = true } thiserror = "2.0.0" tokio = { workspace = true } tokio-postgres = { version = "0.7.12", optional = true } @@ -65,9 +69,12 @@ avro = ["datafusion/avro"] postgres = [ "bytes", "chrono", - "tokio-postgres", + "once_cell", "postgres-types", "postgres-protocol", + "testcontainers", + "testcontainers-modules", + "tokio-postgres", ] [dev-dependencies] diff --git a/datafusion/sqllogictest/README.md b/datafusion/sqllogictest/README.md index 885e92fee270..4a7dc09d7dd1 100644 --- a/datafusion/sqllogictest/README.md +++ b/datafusion/sqllogictest/README.md @@ -28,13 +28,14 @@ This crate is a submodule of DataFusion that contains an implementation of [sqll ## Overview This crate uses [sqllogictest-rs](https://github.com/risinglightdb/sqllogictest-rs) to parse and run `.slt` files in the -[`test_files`](test_files) directory of this crate. +[`test_files`](test_files) directory of this crate or the [`data/sqlite`](sqlite) +directory of the datafusion-testing crate. ## Testing setup 1. `rustup update stable` DataFusion uses the latest stable release of rust 2. `git submodule init` -3. `git submodule update` +3. `git submodule update --init --remote --recursive` ## Running tests: TLDR Examples @@ -160,7 +161,7 @@ cargo test --test sqllogictests -- information Test files that start with prefix `pg_compat_` verify compatibility with Postgres by running the same script files both with DataFusion and with Postgres -In order to run the sqllogictests running against a previously running Postgres instance, do: +In order to have the sqllogictest run against an existing running Postgres instance, do: ```shell PG_COMPAT=true PG_URI="postgresql://postgres@127.0.0.1/postgres" cargo test --features=postgres --test sqllogictests @@ -172,7 +173,7 @@ The environment variables: 2. `PG_URI` contains a `libpq` style connection string, whose format is described in [the docs](https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url) -One way to create a suitable a posgres container in docker is to use +One way to create a suitable a postgres container in docker is to use the [Official Image](https://hub.docker.com/_/postgres) with a command such as the following. Note the collation **must** be set to `C` otherwise `ORDER BY` will not match DataFusion and the tests will diff. @@ -185,6 +186,15 @@ docker run \ postgres ``` +If you do not want to create a new postgres database and you have docker +installed you can skip providing a PG_URI env variable and the sqllogictest +runner will automatically create a temporary postgres docker container. +For example: + +```shell +PG_COMPAT=true cargo test --features=postgres --test sqllogictests +``` + ## Running Tests: `tpch` Test files in `tpch` directory runs against the `TPCH` data set (SF = @@ -205,6 +215,34 @@ Then you need to add `INCLUDE_TPCH=true` to run tpch tests: INCLUDE_TPCH=true cargo test --test sqllogictests ``` +## Running Tests: `sqlite` + +Test files in `data/sqlite` directory of the datafusion-testing crate were +sourced from the [sqlite test suite](https://www.sqlite.org/sqllogictest/dir?ci=tip) and have been cleansed and updated to +run within DataFusion's sqllogictest runner. + +To run the sqlite tests you need to increase the rust stack size and add +`INCLUDE_SQLITE=true` to run the sqlite tests: + +```shell +export RUST_MIN_STACK=30485760; +INCLUDE_SQLITE=true cargo test --test sqllogictests +``` + +Note that there are well over 5 million queries in these tests and running the +sqlite tests will take a long time. You may wish to run them in release-nonlto mode: + +```shell +INCLUDE_SQLITE=true cargo test --profile release-nonlto --test sqllogictests +``` + +The sqlite tests can also be run with the postgres runner to verify compatibility: + +```shell +export RUST_MIN_STACK=30485760; +PG_COMPAT=true INCLUDE_SQLITE=true cargo test --features=postgres --test sqllogictests +``` + ## Updating tests: Completion Mode In test script completion mode, `sqllogictests` reads a prototype script and runs the statements and queries against the diff --git a/datafusion/sqllogictest/bin/sqllogictests.rs b/datafusion/sqllogictest/bin/sqllogictests.rs index 066cc8ee9824..498539c1674a 100644 --- a/datafusion/sqllogictest/bin/sqllogictests.rs +++ b/datafusion/sqllogictest/bin/sqllogictests.rs @@ -16,57 +16,129 @@ // under the License. use clap::Parser; +use datafusion_common::instant::Instant; use datafusion_common::utils::get_available_parallelism; +use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result}; +use datafusion_common_runtime::SpawnedTask; use datafusion_sqllogictest::{DataFusion, TestContext}; use futures::stream::StreamExt; +use indicatif::{ + HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle, +}; use itertools::Itertools; -use log::info; -use sqllogictest::{strict_column_validator, Normalizer}; +use log::Level::{Info, Warn}; +use log::{info, log_enabled, warn}; +#[cfg(feature = "postgres")] +use once_cell::sync::Lazy; +use sqllogictest::{ + parse_file, strict_column_validator, AsyncDB, Condition, Normalizer, Record, + Validator, +}; +#[cfg(feature = "postgres")] +use std::env::set_var; use std::ffi::OsStr; use std::fs; +#[cfg(feature = "postgres")] +use std::future::Future; use std::path::{Path, PathBuf}; - -use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result}; -use datafusion_common_runtime::SpawnedTask; +#[cfg(feature = "postgres")] +use std::{env, thread}; +#[cfg(feature = "postgres")] +use testcontainers::core::IntoContainerPort; +#[cfg(feature = "postgres")] +use testcontainers::runners::AsyncRunner; +#[cfg(feature = "postgres")] +use testcontainers::ImageExt; +#[cfg(feature = "postgres")] +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +#[cfg(feature = "postgres")] +use tokio::sync::{mpsc, Mutex}; +#[cfg(feature = "postgres")] +use ContainerCommands::{FetchHost, FetchPort}; const TEST_DIRECTORY: &str = "test_files/"; +const DATAFUSION_TESTING_TEST_DIRECTORY: &str = "../../datafusion-testing/data/"; const PG_COMPAT_FILE_PREFIX: &str = "pg_compat_"; +const SQLITE_PREFIX: &str = "sqlite"; pub fn main() -> Result<()> { tokio::runtime::Builder::new_multi_thread() .enable_all() - .build() - .unwrap() + .build()? .block_on(run_tests()) } +// Trailing whitespace from lines in SLT will typically be removed, but do not fail if it is not +// If particular test wants to cover trailing whitespace on a value, +// it should project additional non-whitespace column on the right. #[allow(clippy::ptr_arg)] -fn normalizer(s: &String) -> String { - // Trailing whitespace from lines in SLT will typically be removed, but do not fail if it is not - // If particular test wants to cover trailing whitespace on a value, - // it should project additional non-whitespace column on the right. - s.trim_end().to_owned() +fn value_normalizer(s: &String) -> String { + s.trim_end().to_string() } -fn value_validator( +fn sqlite_value_validator( normalizer: Normalizer, actual: &[Vec], expected: &[String], ) -> bool { - let expected = expected.iter().map(normalizer).collect::>(); - let actual = actual + let normalized_expected = expected.iter().map(normalizer).collect::>(); + let normalized_actual = actual + .iter() + .map(|strs| strs.iter().map(normalizer).join(" ")) + .collect_vec(); + + if log_enabled!(Info) && normalized_actual != normalized_expected { + info!("sqlite validation failed. actual vs expected:"); + for i in 0..normalized_actual.len() { + info!("[{i}] {}", normalized_actual[i]); + info!( + "[{i}] {}", + if normalized_expected.len() >= i { + &normalized_expected[i] + } else { + "No more results" + } + ); + } + } + + normalized_actual == normalized_expected +} + +fn df_value_validator( + normalizer: Normalizer, + actual: &[Vec], + expected: &[String], +) -> bool { + let normalized_expected = expected.iter().map(normalizer).collect::>(); + let normalized_actual = actual .iter() .map(|strs| strs.iter().join(" ")) - // Editors do not preserve trailing whitespace, so expected may or may not lack it included - .map(|str| normalizer(&str)) - .collect::>(); - actual == expected + .map(|str| str.trim_end().to_string()) + .collect_vec(); + + if log_enabled!(Warn) && normalized_actual != normalized_expected { + warn!("df validation failed. actual vs expected:"); + for i in 0..normalized_actual.len() { + warn!("[{i}] {}", normalized_actual[i]); + warn!( + "[{i}] {}", + if normalized_expected.len() >= i { + &normalized_expected[i] + } else { + "No more results" + } + ); + } + } + + normalized_actual == normalized_expected } /// Sets up an empty directory at test_files/scratch/ /// creating it if needed and clearing any file contents if it exists /// This allows tests for inserting to external tables or copy to -/// to persist data to disk and have consistent state when running +/// persist data to disk and have consistent state when running /// a new test fn setup_scratch_dir(name: &Path) -> Result<()> { // go from copy.slt --> copy @@ -97,23 +169,89 @@ async fn run_tests() -> Result<()> { } options.warn_on_ignored(); + #[cfg(feature = "postgres")] + let start_pg_database = options.postgres_runner && !is_pg_uri_set(); + #[cfg(feature = "postgres")] + if start_pg_database { + info!("Starting postgres db ..."); + + thread::spawn(|| { + execute_blocking(start_postgres( + &POSTGRES_IN, + &POSTGRES_HOST, + &POSTGRES_PORT, + &POSTGRES_STOPPED, + )) + }); + + POSTGRES_IN.tx.send(FetchHost).unwrap(); + let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap(); + + POSTGRES_IN.tx.send(FetchPort).unwrap(); + let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap(); + + let pg_uri = format!("postgresql://postgres:postgres@{db_host}:{db_port}/test"); + info!("Postgres uri is {pg_uri}"); + + set_var("PG_URI", pg_uri); + } + // Run all tests in parallel, reporting failures at the end // // Doing so is safe because each slt file runs with its own // `SessionContext` and should not have side effects (like // modifying shared state like `/tmp/`) + let m = MultiProgress::with_draw_target(ProgressDrawTarget::stderr_with_hz(1)); + let m_style = ProgressStyle::with_template( + "[{elapsed_precise}] {bar:40.cyan/blue} {pos:>7}/{len:7} {msg}", + ) + .unwrap() + .progress_chars("##-"); + + let start = Instant::now(); + let errors: Vec<_> = futures::stream::iter(read_test_files(&options)?) .map(|test_file| { + let validator = if options.include_sqlite + && test_file.relative_path.starts_with(SQLITE_PREFIX) + { + sqlite_value_validator + } else { + df_value_validator + }; + + let m_clone = m.clone(); + let m_style_clone = m_style.clone(); + SpawnedTask::spawn(async move { - let file_path = test_file.relative_path.clone(); - let start = datafusion::common::instant::Instant::now(); match (options.postgres_runner, options.complete) { - (false, false) => run_test_file(test_file).await?, - (false, true) => run_complete_file(test_file).await?, - (true, false) => run_test_file_with_postgres(test_file).await?, - (true, true) => run_complete_file_with_postgres(test_file).await?, + (false, false) => { + run_test_file(test_file, validator, m_clone, m_style_clone) + .await? + } + (false, true) => { + run_complete_file(test_file, validator, m_clone, m_style_clone) + .await? + } + (true, false) => { + run_test_file_with_postgres( + test_file, + validator, + m_clone, + m_style_clone, + ) + .await? + } + (true, true) => { + run_complete_file_with_postgres( + test_file, + validator, + m_clone, + m_style_clone, + ) + .await? + } } - println!("Executed {:?}. Took {:?}", file_path, start.elapsed()); Ok(()) as Result<()> }) .join() @@ -136,6 +274,15 @@ async fn run_tests() -> Result<()> { .collect() .await; + m.println(format!("Completed in {}", HumanDuration(start.elapsed())))?; + + #[cfg(feature = "postgres")] + if start_pg_database { + println!("Stopping postgres db ..."); + POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(()); + POSTGRES_STOPPED.rx.lock().await.recv().await; + } + // report on any errors if !errors.is_empty() { for e in &errors { @@ -147,60 +294,148 @@ async fn run_tests() -> Result<()> { } } -async fn run_test_file(test_file: TestFile) -> Result<()> { +#[cfg(feature = "postgres")] +fn is_pg_uri_set() -> bool { + match env::var("PG_URI") { + Ok(_) => true, + Err(_) => false, + } +} + +async fn run_test_file( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { let TestFile { path, relative_path, } = test_file; - info!("Running with DataFusion runner: {}", path.display()); let Some(test_ctx) = TestContext::try_new_for_test_file(&relative_path).await else { info!("Skipping: {}", path.display()); return Ok(()); }; setup_scratch_dir(&relative_path)?; + + let count: u64 = get_record_count(&path, "Datafusion".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + let mut runner = sqllogictest::Runner::new(|| async { Ok(DataFusion::new( test_ctx.session_ctx().clone(), relative_path.clone(), + pb.clone(), )) }); + runner.add_label("Datafusion"); runner.with_column_validator(strict_column_validator); - runner.with_normalizer(normalizer); - runner.with_validator(value_validator); - runner + runner.with_normalizer(value_normalizer); + runner.with_validator(validator); + + let res = runner .run_file_async(path) .await - .map_err(|e| DataFusionError::External(Box::new(e))) + .map_err(|e| DataFusionError::External(Box::new(e))); + + pb.finish_and_clear(); + + res +} + +fn get_record_count(path: &PathBuf, label: String) -> u64 { + let records: Vec::ColumnType>> = + parse_file(path).unwrap(); + let mut count: u64 = 0; + + records.iter().for_each(|rec| match rec { + Record::Query { conditions, .. } => { + if conditions.is_empty() + || !conditions.contains(&Condition::SkipIf { + label: label.clone(), + }) + || conditions.contains(&Condition::OnlyIf { + label: label.clone(), + }) + { + count += 1; + } + } + Record::Statement { conditions, .. } => { + if conditions.is_empty() + || !conditions.contains(&Condition::SkipIf { + label: label.clone(), + }) + || conditions.contains(&Condition::OnlyIf { + label: label.clone(), + }) + { + count += 1; + } + } + _ => {} + }); + + count } #[cfg(feature = "postgres")] -async fn run_test_file_with_postgres(test_file: TestFile) -> Result<()> { +async fn run_test_file_with_postgres( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { path, relative_path, } = test_file; - info!("Running with Postgres runner: {}", path.display()); setup_scratch_dir(&relative_path)?; - let mut runner = - sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone())); + + let count: u64 = get_record_count(&path, "postgresql".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + + let mut runner = sqllogictest::Runner::new(|| { + Postgres::connect(relative_path.clone(), pb.clone()) + }); + runner.add_label("postgres"); runner.with_column_validator(strict_column_validator); - runner.with_normalizer(normalizer); - runner.with_validator(value_validator); + runner.with_normalizer(value_normalizer); + runner.with_validator(validator); runner .run_file_async(path) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; + + pb.finish_and_clear(); + Ok(()) } #[cfg(not(feature = "postgres"))] -async fn run_test_file_with_postgres(_test_file: TestFile) -> Result<()> { +async fn run_test_file_with_postgres( + _test_file: TestFile, + _validator: Validator, + _mp: MultiProgress, + _mp_style: ProgressStyle, +) -> Result<()> { use datafusion_common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") } -async fn run_complete_file(test_file: TestFile) -> Result<()> { +async fn run_complete_file( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { let TestFile { path, relative_path, @@ -213,30 +448,48 @@ async fn run_complete_file(test_file: TestFile) -> Result<()> { return Ok(()); }; setup_scratch_dir(&relative_path)?; + + let count: u64 = get_record_count(&path, "Datafusion".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + let mut runner = sqllogictest::Runner::new(|| async { Ok(DataFusion::new( test_ctx.session_ctx().clone(), relative_path.clone(), + pb.clone(), )) }); + let col_separator = " "; - runner + let res = runner .update_test_file( path, col_separator, - value_validator, - normalizer, + validator, + value_normalizer, strict_column_validator, ) .await // Can't use e directly because it isn't marked Send, so turn it into a string. .map_err(|e| { DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}")) - }) + }); + + pb.finish_and_clear(); + + res } #[cfg(feature = "postgres")] -async fn run_complete_file_with_postgres(test_file: TestFile) -> Result<()> { +async fn run_complete_file_with_postgres( + test_file: TestFile, + validator: Validator, + mp: MultiProgress, + mp_style: ProgressStyle, +) -> Result<()> { use datafusion_sqllogictest::Postgres; let TestFile { path, @@ -247,26 +500,48 @@ async fn run_complete_file_with_postgres(test_file: TestFile) -> Result<()> { path.display() ); setup_scratch_dir(&relative_path)?; - let mut runner = - sqllogictest::Runner::new(|| Postgres::connect(relative_path.clone())); + + let count: u64 = get_record_count(&path, "postgresql".to_string()); + let pb = mp.add(ProgressBar::new(count)); + + pb.set_style(mp_style); + pb.set_message(format!("{:?}", &relative_path)); + + let mut runner = sqllogictest::Runner::new(|| { + Postgres::connect(relative_path.clone(), pb.clone()) + }); + runner.add_label("postgres"); + runner.with_column_validator(strict_column_validator); + runner.with_normalizer(value_normalizer); + runner.with_validator(validator); + let col_separator = " "; - runner + let res = runner .update_test_file( path, col_separator, - value_validator, - normalizer, + validator, + value_normalizer, strict_column_validator, ) .await // Can't use e directly because it isn't marked Send, so turn it into a string. .map_err(|e| { DataFusionError::Execution(format!("Error completing {relative_path:?}: {e}")) - }) + }); + + pb.finish_and_clear(); + + res } #[cfg(not(feature = "postgres"))] -async fn run_complete_file_with_postgres(_test_file: TestFile) -> Result<()> { +async fn run_complete_file_with_postgres( + _test_file: TestFile, + _validator: Validator, + _mp: MultiProgress, + _mp_style: ProgressStyle, +) -> Result<()> { use datafusion_common::plan_err; plan_err!("Can not run with postgres as postgres feature is not enabled") } @@ -282,11 +557,14 @@ struct TestFile { impl TestFile { fn new(path: PathBuf) -> Self { - let relative_path = PathBuf::from( - path.to_string_lossy() - .strip_prefix(TEST_DIRECTORY) - .unwrap_or(""), - ); + let p = path.to_string_lossy(); + let relative_path = PathBuf::from(if p.starts_with(TEST_DIRECTORY) { + p.strip_prefix(TEST_DIRECTORY).unwrap() + } else if p.starts_with(DATAFUSION_TESTING_TEST_DIRECTORY) { + p.strip_prefix(DATAFUSION_TESTING_TEST_DIRECTORY).unwrap() + } else { + "" + }); Self { path, @@ -298,6 +576,14 @@ impl TestFile { self.path.extension() == Some(OsStr::new("slt")) } + fn check_sqlite(&self, options: &Options) -> bool { + if !self.relative_path.starts_with(SQLITE_PREFIX) { + return true; + } + + options.include_sqlite + } + fn check_tpch(&self, options: &Options) -> bool { if !self.relative_path.starts_with("tpch") { return true; @@ -310,15 +596,29 @@ impl TestFile { fn read_test_files<'a>( options: &'a Options, ) -> Result + 'a>> { - Ok(Box::new( - read_dir_recursive(TEST_DIRECTORY)? + let mut paths = read_dir_recursive(TEST_DIRECTORY)? + .into_iter() + .map(TestFile::new) + .filter(|f| options.check_test_file(&f.relative_path)) + .filter(|f| f.is_slt_file()) + .filter(|f| f.check_tpch(options)) + .filter(|f| f.check_sqlite(options)) + .filter(|f| options.check_pg_compat_file(f.path.as_path())) + .collect::>(); + if options.include_sqlite { + let mut sqlite_paths = read_dir_recursive(DATAFUSION_TESTING_TEST_DIRECTORY)? .into_iter() .map(TestFile::new) .filter(|f| options.check_test_file(&f.relative_path)) .filter(|f| f.is_slt_file()) - .filter(|f| f.check_tpch(options)) - .filter(|f| options.check_pg_compat_file(f.path.as_path())), - )) + .filter(|f| f.check_sqlite(options)) + .filter(|f| options.check_pg_compat_file(f.path.as_path())) + .collect::>(); + + paths.append(&mut sqlite_paths) + } + + Ok(Box::new(paths.into_iter())) } fn read_dir_recursive>(path: P) -> Result> { @@ -350,7 +650,7 @@ fn read_dir_recursive_impl(dst: &mut Vec, path: &Path) -> Result<()> { /// Parsed command line options /// -/// This structure attempts to mimic the command line options of the built in rust test runner +/// This structure attempts to mimic the command line options of the built-in rust test runner /// accepted by IDEs such as CLion that pass arguments /// /// See for more details @@ -367,6 +667,9 @@ struct Options { )] postgres_runner: bool, + #[clap(long, env = "INCLUDE_SQLITE", help = "Include sqlite files")] + include_sqlite: bool, + #[clap(long, env = "INCLUDE_TPCH", help = "Include tpch files")] include_tpch: bool, @@ -431,10 +734,13 @@ impl Options { .any(|filter| relative_path.to_string_lossy().contains(filter)) } - /// Postgres runner executes only tests in files with specific names + /// Postgres runner executes only tests in files with specific names or in + /// specific folders fn check_pg_compat_file(&self, path: &Path) -> bool { let file_name = path.file_name().unwrap().to_str().unwrap().to_string(); - !self.postgres_runner || file_name.starts_with(PG_COMPAT_FILE_PREFIX) + !self.postgres_runner + || file_name.starts_with(PG_COMPAT_FILE_PREFIX) + || (self.include_sqlite && path.to_string_lossy().contains(SQLITE_PREFIX)) } /// Logs warning messages to stdout if any ignored options are passed @@ -452,3 +758,87 @@ impl Options { } } } + +#[cfg(feature = "postgres")] +pub async fn start_postgres( + in_channel: &Channel, + host_channel: &Channel, + port_channel: &Channel, + stopped_channel: &Channel<()>, +) { + info!("Starting postgres test container with user postgres/postgres and db test"); + + let container = testcontainers_modules::postgres::Postgres::default() + .with_user("postgres") + .with_password("postgres") + .with_db_name("test") + .with_mapped_port(16432, 5432.tcp()) + .with_tag("17-alpine") + .start() + .await + .unwrap(); + // uncomment this if you are running docker in docker + // let host = "host.docker.internal".to_string(); + let host = container.get_host().await.unwrap().to_string(); + let port = container.get_host_port_ipv4(5432).await.unwrap(); + + let mut rx = in_channel.rx.lock().await; + while let Some(command) = rx.recv().await { + match command { + FetchHost => host_channel.tx.send(host.clone()).unwrap(), + FetchPort => port_channel.tx.send(port).unwrap(), + ContainerCommands::Stop => { + container.stop().await.unwrap(); + stopped_channel.tx.send(()).unwrap(); + rx.close(); + } + } + } +} + +#[cfg(feature = "postgres")] +#[derive(Debug)] +pub enum ContainerCommands { + FetchHost, + FetchPort, + Stop, +} + +#[cfg(feature = "postgres")] +pub struct Channel { + pub tx: UnboundedSender, + pub rx: Mutex>, +} + +#[cfg(feature = "postgres")] +pub fn channel() -> Channel { + let (tx, rx) = mpsc::unbounded_channel(); + Channel { + tx, + rx: Mutex::new(rx), + } +} + +#[cfg(feature = "postgres")] +pub fn execute_blocking(f: F) { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(f); +} + +#[cfg(feature = "postgres")] +pub struct HostPort { + pub host: String, + pub port: u16, +} + +#[cfg(feature = "postgres")] +static POSTGRES_IN: Lazy> = Lazy::new(channel); +#[cfg(feature = "postgres")] +static POSTGRES_HOST: Lazy> = Lazy::new(channel); +#[cfg(feature = "postgres")] +static POSTGRES_PORT: Lazy> = Lazy::new(channel); +#[cfg(feature = "postgres")] +static POSTGRES_STOPPED: Lazy> = Lazy::new(channel); diff --git a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs index 5c24b49cfe86..e696058484a9 100644 --- a/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs +++ b/datafusion/sqllogictest/src/engines/datafusion_engine/runner.rs @@ -18,26 +18,49 @@ use std::sync::Arc; use std::{path::PathBuf, time::Duration}; +use super::{error::Result, normalize, DFSqlLogicTestError}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion::physical_plan::common::collect; use datafusion::physical_plan::execute_stream; use datafusion::prelude::SessionContext; -use log::info; +use indicatif::ProgressBar; +use log::Level::{Debug, Info}; +use log::{debug, log_enabled, warn}; use sqllogictest::DBOutput; - -use super::{error::Result, normalize, DFSqlLogicTestError}; +use tokio::time::Instant; use crate::engines::output::{DFColumnType, DFOutput}; pub struct DataFusion { ctx: SessionContext, relative_path: PathBuf, + pb: ProgressBar, } impl DataFusion { - pub fn new(ctx: SessionContext, relative_path: PathBuf) -> Self { - Self { ctx, relative_path } + pub fn new(ctx: SessionContext, relative_path: PathBuf, pb: ProgressBar) -> Self { + Self { + ctx, + relative_path, + pb, + } + } + + fn update_slow_count(&self) { + let msg = self.pb.message(); + let split: Vec<&str> = msg.split(" ").collect(); + let mut current_count = 0; + + if split.len() > 2 { + // third match will be current slow count + current_count = split[2].parse::().unwrap(); + } + + current_count += 1; + + self.pb + .set_message(format!("{} - {} took > 500 ms", split[0], current_count)); } } @@ -47,12 +70,32 @@ impl sqllogictest::AsyncDB for DataFusion { type ColumnType = DFColumnType; async fn run(&mut self, sql: &str) -> Result { - info!( - "[{}] Running query: \"{}\"", - self.relative_path.display(), - sql - ); - run_query(&self.ctx, sql).await + if log_enabled!(Debug) { + debug!( + "[{}] Running query: \"{}\"", + self.relative_path.display(), + sql + ); + } + + let start = Instant::now(); + let result = run_query(&self.ctx, sql).await; + let duration = start.elapsed(); + + if duration.gt(&Duration::from_millis(500)) { + self.update_slow_count(); + } + + self.pb.inc(1); + + if log_enabled!(Info) && duration.gt(&Duration::from_secs(2)) { + warn!( + "[{}] Running query took more than 2 sec ({duration:?}): \"{sql}\"", + self.relative_path.display() + ); + } + + result } /// Engine name of current database. diff --git a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs index a490488cd764..1439695d62c6 100644 --- a/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs +++ b/datafusion/sqllogictest/src/engines/postgres_engine/mod.rs @@ -15,22 +15,24 @@ // specific language governing permissions and limitations // under the License. -/// Postgres engine implementation for sqllogictest. -use std::path::{Path, PathBuf}; -use std::str::FromStr; - use async_trait::async_trait; use bytes::Bytes; use futures::{SinkExt, StreamExt}; -use log::debug; +use log::{debug, info}; use sqllogictest::DBOutput; +/// Postgres engine implementation for sqllogictest. +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::time::Duration; use tokio::task::JoinHandle; use super::conversion::*; use crate::engines::output::{DFColumnType, DFOutput}; use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; +use indicatif::ProgressBar; use postgres_types::Type; use rust_decimal::Decimal; +use tokio::time::Instant; use tokio_postgres::{Column, Row}; use types::PgRegtype; @@ -55,6 +57,7 @@ pub struct Postgres { join_handle: JoinHandle<()>, /// Relative test file path relative_path: PathBuf, + pb: ProgressBar, } impl Postgres { @@ -71,11 +74,11 @@ impl Postgres { /// ``` /// /// See https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url for format - pub async fn connect(relative_path: PathBuf) -> Result { + pub async fn connect(relative_path: PathBuf, pb: ProgressBar) -> Result { let uri = std::env::var("PG_URI").map_or(PG_URI.to_string(), std::convert::identity); - debug!("Using postgres connection string: {uri}"); + info!("Using postgres connection string: {uri}"); let config = tokio_postgres::Config::from_str(&uri)?; @@ -113,6 +116,7 @@ impl Postgres { client, join_handle, relative_path, + pb, }) } @@ -181,6 +185,22 @@ impl Postgres { tx.commit().await?; Ok(DBOutput::StatementComplete(0)) } + + fn update_slow_count(&self) { + let msg = self.pb.message(); + let split: Vec<&str> = msg.split(" ").collect(); + let mut current_count = 0; + + if split.len() > 2 { + // second match will be current slow count + current_count += split[2].parse::().unwrap(); + } + + current_count += 1; + + self.pb + .set_message(format!("{} - {} took > 500 ms", split[0], current_count)); + } } /// remove single quotes from the start and end of the string @@ -194,16 +214,13 @@ fn no_quotes(t: &str) -> &str { /// return a schema name fn schema_name(relative_path: &Path) -> String { relative_path - .file_name() - .map(|name| { - name.to_string_lossy() - .chars() - .filter(|ch| ch.is_ascii_alphabetic()) - .collect::() - .trim_start_matches("pg_") - .to_string() - }) - .unwrap_or_else(|| "default_schema".to_string()) + .to_string_lossy() + .to_string() + .chars() + .filter(|ch| ch.is_ascii_alphanumeric()) + .collect::() + .trim_start_matches("pg_") + .to_string() } impl Drop for Postgres { @@ -221,7 +238,7 @@ impl sqllogictest::AsyncDB for Postgres { &mut self, sql: &str, ) -> Result, Self::Error> { - println!( + debug!( "[{}] Running query: \"{}\"", self.relative_path.display(), sql @@ -242,14 +259,24 @@ impl sqllogictest::AsyncDB for Postgres { }; if lower_sql.starts_with("copy") { + self.pb.inc(1); return self.run_copy_command(sql).await; } if !is_query_sql { self.client.execute(sql, &[]).await?; + self.pb.inc(1); return Ok(DBOutput::StatementComplete(0)); } + let start = Instant::now(); let rows = self.client.query(sql, &[]).await?; + let duration = start.elapsed(); + + if duration.gt(&Duration::from_millis(500)) { + self.update_slow_count(); + } + + self.pb.inc(1); let types: Vec = if rows.is_empty() { self.client diff --git a/docs/source/contributor-guide/getting_started.md b/docs/source/contributor-guide/getting_started.md index 696d6d3a0fe2..5d85e07f3eaa 100644 --- a/docs/source/contributor-guide/getting_started.md +++ b/docs/source/contributor-guide/getting_started.md @@ -74,7 +74,7 @@ Testing setup: - `rustup update stable` DataFusion uses the latest stable release of rust - `git submodule init` -- `git submodule update` +- `git submodule update --init --remote --recursive` Formatting instructions: