From f0cda7061cb1f645de58d5661830490a3e26bc2e Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 14 Jan 2025 14:55:48 +0800 Subject: [PATCH] feat(bin): abort connections before dropping temp databases in parallel run (#247) * feat(bin): abort connections before dropping temp databases in parallel run Signed-off-by: Bugen Zhao * fmt Signed-off-by: Bugen Zhao * refine comments Signed-off-by: Bugen Zhao * use one from `tokio-util` Signed-off-by: Bugen Zhao * bump version and add release notes Signed-off-by: Bugen Zhao --------- Signed-off-by: Bugen Zhao --- CHANGELOG.md | 4 ++++ Cargo.lock | 15 ++++++++++++--- Cargo.toml | 2 +- sqllogictest-bin/Cargo.toml | 1 + sqllogictest-bin/src/main.rs | 11 ++++++++--- 5 files changed, 26 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78b14ed..62c0a4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +## [0.26.3] - 2025-01-14 + +* bin: when `--fail-fast` is enabled, abort all remaining connections before dropping temporary databases. + ## [0.26.2] - 2025-01-08 * bin: support `--fail-fast`, and add env vars `SLT_FAIL_FAST` and `SLT_KEEP_DB_ON_FAILURE` diff --git a/Cargo.lock b/Cargo.lock index 61011e5..7d60aca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -779,6 +779,12 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -1884,7 +1890,7 @@ dependencies = [ [[package]] name = "sqllogictest" -version = "0.26.2" +version = "0.26.3" dependencies = [ "async-trait", "educe", @@ -1907,7 +1913,7 @@ dependencies = [ [[package]] name = "sqllogictest-bin" -version = "0.26.2" +version = "0.26.3" dependencies = [ "anyhow", "async-trait", @@ -1923,13 +1929,14 @@ dependencies = [ "sqllogictest", "sqllogictest-engines", "tokio", + "tokio-util", "tracing", "tracing-subscriber", ] [[package]] name = "sqllogictest-engines" -version = "0.26.2" +version = "0.26.3" dependencies = [ "async-trait", "bytes", @@ -2212,6 +2219,8 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index b8c8356..52e6c96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ resolver = "2" members = ["sqllogictest", "sqllogictest-bin", "sqllogictest-engines", "tests"] [workspace.package] -version = "0.26.2" +version = "0.26.3" edition = "2021" homepage = "https://github.com/risinglightdb/sqllogictest-rs" keywords = ["sql", "database", "parser", "cli"] diff --git a/sqllogictest-bin/Cargo.toml b/sqllogictest-bin/Cargo.toml index f8aa4b4..6587463 100644 --- a/sqllogictest-bin/Cargo.toml +++ b/sqllogictest-bin/Cargo.toml @@ -33,6 +33,7 @@ tokio = { version = "1", features = [ "fs", "process", ] } +tokio-util = { version = "0.7.12", features = ["rt"] } fs-err = "3.0.0" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing = "0.1" diff --git a/sqllogictest-bin/src/main.rs b/sqllogictest-bin/src/main.rs index f6e48e2..2c6dd46 100644 --- a/sqllogictest-bin/src/main.rs +++ b/sqllogictest-bin/src/main.rs @@ -20,6 +20,7 @@ use sqllogictest::{ default_column_validator, default_normalizer, default_validator, update_record_with_output, AsyncDB, Injected, MakeConnection, Record, Runner, }; +use tokio_util::task::AbortOnDropHandle; #[derive(Default, Copy, Clone, Debug, PartialEq, Eq, ValueEnum)] #[must_use] @@ -314,7 +315,7 @@ async fn run_parallel( } } - let mut stream = futures::stream::iter(create_databases.into_iter()) + let mut stream = futures::stream::iter(create_databases) .map(|(db_name, filename)| { let mut config = config.clone(); config.db.clone_from(&db_name); @@ -322,13 +323,13 @@ async fn run_parallel( let engine = engine.clone(); let labels = labels.to_vec(); async move { - let (buf, res) = tokio::spawn(async move { + let (buf, res) = AbortOnDropHandle::new(tokio::spawn(async move { let mut buf = vec![]; let res = connect_and_run_test_file(&mut buf, filename, &engine, config, &labels) .await; (buf, res) - }) + })) .await .unwrap(); (db_name, file, res, buf) @@ -396,6 +397,10 @@ async fn run_parallel( start.elapsed().as_millis() ); + // If `fail_fast`, there could be some ongoing cases (then active connections) + // in the stream. Abort them before dropping temporary databases. + drop(stream); + for db_name in db_names { if keep_db_on_failure && failed_db.contains(&db_name) { eprintln!(