diff --git a/Cargo.toml b/Cargo.toml index 38b5627bc187..e1bd0d7aa72b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,7 +116,7 @@ indexmap = "2.0.0" itertools = "0.12" log = "^0.4" num_cpus = "1.13.0" -object_store = { version = "0.10.1", default-features = false } +object_store = { version = "0.10.2", default-features = false } parking_lot = "0.12" parquet = { version = "52.2.0", default-features = false, features = [ "arrow", diff --git a/datafusion-cli/examples/cli-session-context.rs b/datafusion-cli/examples/cli-session-context.rs index 8da52ed84a5f..1a8f15c8731b 100644 --- a/datafusion-cli/examples/cli-session-context.rs +++ b/datafusion-cli/examples/cli-session-context.rs @@ -82,7 +82,7 @@ impl CliSessionContext for MyUnionerContext { #[tokio::main] /// Runs the example. pub async fn main() { - let mut my_ctx = MyUnionerContext::default(); + let my_ctx = MyUnionerContext::default(); let mut print_options = PrintOptions { format: datafusion_cli::print_format::PrintFormat::Automatic, @@ -91,7 +91,5 @@ pub async fn main() { color: true, }; - exec_from_repl(&mut my_ctx, &mut print_options) - .await - .unwrap(); + exec_from_repl(&my_ctx, &mut print_options).await.unwrap(); } diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs index 273eb30d3a71..c4636f1ce0e0 100644 --- a/datafusion-cli/src/catalog.rs +++ b/datafusion-cli/src/catalog.rs @@ -240,7 +240,7 @@ mod tests { use datafusion::prelude::SessionContext; fn setup_context() -> (SessionContext, Arc<DynamicFileSchemaProvider>) { - let mut ctx = SessionContext::new(); + let ctx = SessionContext::new(); ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new( ctx.state().catalog_list().clone(), ctx.state_weak_ref(), diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs index 1a6c023d3b50..05c00d634c94 100644 --- a/datafusion-cli/src/command.rs +++ b/datafusion-cli/src/command.rs @@ -55,7 +55,7 @@ pub enum OutputFormat { impl Command { pub async fn execute( &self, - ctx: &mut dyn CliSessionContext, + ctx: &dyn CliSessionContext, print_options: &mut PrintOptions, ) -> Result<()> { match self { diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs index b78f32e0ac48..178bce6f2fe6 100644 --- a/datafusion-cli/src/exec.rs +++ b/datafusion-cli/src/exec.rs @@ -49,7 +49,7 @@ use tokio::signal; /// run and execute SQL statements and commands, against a context with the given print options pub async fn exec_from_commands( - ctx: &mut dyn CliSessionContext, + ctx: &dyn CliSessionContext, commands: Vec<String>, print_options: &PrintOptions, ) -> Result<()> { @@ -62,7 +62,7 @@ pub async fn exec_from_commands( /// run and execute SQL statements and commands from a file, against a context with the given print options pub async fn exec_from_lines( - ctx: &mut dyn CliSessionContext, + ctx: &dyn CliSessionContext, reader: &mut BufReader<File>, print_options: &PrintOptions, ) -> Result<()> { @@ -102,7 +102,7 @@ pub async fn exec_from_lines( } pub async fn exec_from_files( - ctx: &mut dyn CliSessionContext, + ctx: &dyn CliSessionContext, files: Vec<String>, print_options: &PrintOptions, ) -> Result<()> { @@ -121,7 +121,7 @@ pub async fn exec_from_files( /// run and execute SQL statements and commands against a context with the given print options pub async fn exec_from_repl( - ctx: &mut dyn CliSessionContext, + ctx: &dyn CliSessionContext, print_options: &mut PrintOptions, ) -> rustyline::Result<()> { let mut rl = Editor::new()?; @@ -204,7 +204,7 @@ pub async fn exec_from_repl( } pub(super) async fn exec_and_print( - ctx: &mut dyn
CliSessionContext, + ctx: &dyn CliSessionContext, print_options: &PrintOptions, sql: String, ) -> Result<()> { @@ -300,7 +300,7 @@ fn config_file_type_from_str(ext: &str) -> Option<ConfigFileType> { } async fn create_plan( - ctx: &mut dyn CliSessionContext, + ctx: &dyn CliSessionContext, statement: Statement, ) -> Result<LogicalPlan> { let mut plan = ctx.session_state().statement_to_plan(statement).await?; @@ -473,7 +473,7 @@ mod tests { "cos://bucket/path/file.parquet", "gcs://bucket/path/file.parquet", ]; - let mut ctx = SessionContext::new(); + let ctx = SessionContext::new(); let task_ctx = ctx.task_ctx(); let dialect = &task_ctx.session_config().options().sql_parser.dialect; let dialect = dialect_from_str(dialect).ok_or_else(|| { @@ -488,7 +488,7 @@ mod tests { let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?; for statement in statements { //Should not fail - let mut plan = create_plan(&mut ctx, statement).await?; + let mut plan = create_plan(&ctx, statement).await?; if let LogicalPlan::Copy(copy_to) = &mut plan { assert_eq!(copy_to.output_url, location); assert_eq!(copy_to.file_type.get_ext(), "parquet".to_string()); diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 6266ae6f561a..1810d3cef57c 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -175,7 +175,7 @@ async fn main_inner() -> Result<()> { let runtime_env = create_runtime_env(rt_config.clone())?; - let mut ctx = + let ctx = SessionContext::new_with_config_rt(session_config.clone(), Arc::new(runtime_env)); ctx.refresh_catalogs().await?; // install dynamic catalog provider that knows how to open files @@ -212,20 +212,20 @@ async fn main_inner() -> Result<()> { if commands.is_empty() && files.is_empty() { if !rc.is_empty() { - exec::exec_from_files(&mut ctx, rc, &print_options).await?; + exec::exec_from_files(&ctx, rc, &print_options).await?; } // TODO maybe we can have thiserror for cli but for now let's keep it simple - return exec::exec_from_repl(&mut ctx, &mut print_options) + return exec::exec_from_repl(&ctx, &mut print_options) .await .map_err(|e| DataFusionError::External(Box::new(e))); } if !files.is_empty() { - exec::exec_from_files(&mut ctx, files, &print_options).await?; + exec::exec_from_files(&ctx, files, &print_options).await?; } if !commands.is_empty() { - exec::exec_from_commands(&mut ctx, commands, &print_options).await?; + exec::exec_from_commands(&ctx, commands, &print_options).await?; } Ok(()) diff --git a/datafusion-examples/examples/catalog.rs b/datafusion-examples/examples/catalog.rs index f9ead592c7ea..f770056026ed 100644 --- a/datafusion-examples/examples/catalog.rs +++ b/datafusion-examples/examples/catalog.rs @@ -44,7 +44,7 @@ async fn main() -> Result<()> { let dir_a = prepare_example_data()?; let dir_b = prepare_example_data()?; - let mut ctx = SessionContext::new(); + let ctx = SessionContext::new(); let state = ctx.state(); let catlist = Arc::new(CustomCatalogProviderList::new());
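The datafusion-cli changes above drop the `&mut` requirement from every entry point: `SessionContext` (and the `CliSessionContext` implementations built on it) use interior mutability, so a shared reference is enough. A minimal sketch of driving the CLI machinery under the new signatures — the `print_options`/`print_format` module paths and the `MaxRows` variant are assumptions based on the imports visible above:

```rust
use datafusion::prelude::SessionContext;
use datafusion_cli::exec::exec_from_commands;
use datafusion_cli::print_format::PrintFormat;
use datafusion_cli::print_options::{MaxRows, PrintOptions};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // No `mut` needed: registration and catalog updates go through the
    // RwLock-protected state inside the context.
    let ctx = SessionContext::new();
    let print_options = PrintOptions {
        format: PrintFormat::Automatic,
        quiet: false,
        maxrows: MaxRows::Unlimited, // assumed variant name
        color: true,
    };
    exec_from_commands(&ctx, vec!["SELECT 1".to_string()], &print_options).await
}
```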
diff --git a/datafusion-examples/examples/planner_api.rs b/datafusion-examples/examples/planner_api.rs index 92b58bcee197..35cf766ba1af 100644 --- a/datafusion-examples/examples/planner_api.rs +++ b/datafusion-examples/examples/planner_api.rs @@ -17,6 +17,7 @@ use datafusion::error::Result; use datafusion::physical_plan::displayable; +use datafusion::physical_planner::DefaultPhysicalPlanner; use datafusion::prelude::*; use datafusion_expr::{LogicalPlan, PlanType}; @@ -123,5 +124,20 @@ async fn to_physical_plan_step_by_step_demo( .plan ); + // Call the physical optimizer with an existing physical plan (in this + // case the plan is already optimized, but an unoptimized plan would + // typically be used in this context) + // Note that this is not part of the trait but a public method + // on DefaultPhysicalPlanner. Not all planners will provide this feature. + let planner = DefaultPhysicalPlanner::default(); + let physical_plan = + planner.optimize_physical_plan(physical_plan, &ctx.state(), |_, _| {})?; + println!( + "Optimized physical plan:\n\n{}\n\n", + displayable(physical_plan.as_ref()) + .to_stringified(false, PlanType::InitialPhysicalPlan) + .plan + ); + Ok(()) } diff --git a/datafusion/core/benches/filter_query_sql.rs b/datafusion/core/benches/filter_query_sql.rs index 01adc357b39a..0e09ae09d7c2 100644 --- a/datafusion/core/benches/filter_query_sql.rs +++ b/datafusion/core/benches/filter_query_sql.rs @@ -27,7 +27,7 @@ use futures::executor::block_on; use std::sync::Arc; use tokio::runtime::Runtime; -async fn query(ctx: &mut SessionContext, sql: &str) { +async fn query(ctx: &SessionContext, sql: &str) { let rt = Runtime::new().unwrap(); // execute the query @@ -70,25 +70,25 @@ fn criterion_benchmark(c: &mut Criterion) { let batch_size = 4096; // 2^12 c.bench_function("filter_array", |b| { - let mut ctx = create_context(array_len, batch_size).unwrap(); - b.iter(|| block_on(query(&mut ctx, "select f32, f64 from t where f32 >= f64"))) + let ctx = create_context(array_len, batch_size).unwrap(); + b.iter(|| block_on(query(&ctx, "select f32, f64 from t where f32 >= f64"))) }); c.bench_function("filter_scalar", |b| { - let mut ctx = create_context(array_len, batch_size).unwrap(); + let ctx = create_context(array_len, batch_size).unwrap(); b.iter(|| { block_on(query( - &mut ctx, + &ctx, "select f32, f64 from t where f32 >= 250 and f64 > 250", )) }) }); c.bench_function("filter_scalar in list", |b| { - let mut ctx = create_context(array_len, batch_size).unwrap(); + let ctx = create_context(array_len, batch_size).unwrap(); b.iter(|| { block_on(query( - &mut ctx, + &ctx, "select f32, f64 from t where f32 in (10, 20, 30, 40)", )) })
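The `DataFrame` and `SessionContext` hunks below make the same immutable-context switch in doc examples such as the `$1` placeholder query. For reference, a hedged sketch of running that kind of parameterized query against a shared context (csv path and bound value are illustrative):

```rust
use datafusion::error::Result;
use datafusion::prelude::{CsvReadOptions, SessionContext};
use datafusion_common::ScalarValue;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new())
        .await?;
    // Bind the `$1` placeholder to a concrete value, then execute.
    let results = ctx
        .sql("SELECT a FROM example WHERE b = $1")
        .await?
        .with_param_values(vec![ScalarValue::from(10i64)])?
        .collect()
        .await?;
    println!("{} batches", results.len());
    Ok(())
}
```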
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index c4c5a4aa0834..cc1a63cc05f7 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1550,7 +1550,7 @@ impl DataFrame { /// # #[tokio::main] /// # async fn main() -> Result<()> { /// # use datafusion_common::ScalarValue; - /// let mut ctx = SessionContext::new(); + /// let ctx = SessionContext::new(); /// # ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?; /// let results = ctx /// .sql("SELECT a FROM example WHERE b = $1") /// .await? @@ -2649,8 +2649,8 @@ mod tests { #[tokio::test] async fn registry() -> Result<()> { - let mut ctx = SessionContext::new(); - register_aggregate_csv(&mut ctx, "aggregate_test_100").await?; + let ctx = SessionContext::new(); + register_aggregate_csv(&ctx, "aggregate_test_100").await?; // declare the udf let my_fn: ScalarFunctionImplementation = @@ -2783,8 +2783,8 @@ mod tests { /// Create a logical plan from a SQL query async fn create_plan(sql: &str) -> Result<LogicalPlan> { - let mut ctx = SessionContext::new(); - register_aggregate_csv(&mut ctx, "aggregate_test_100").await?; + let ctx = SessionContext::new(); + register_aggregate_csv(&ctx, "aggregate_test_100").await?; Ok(ctx.sql(sql).await?.into_unoptimized_plan()) } @@ -3147,9 +3147,9 @@ mod tests { "datafusion.sql_parser.enable_ident_normalization".to_owned(), "false".to_owned(), )]))?; - let mut ctx = SessionContext::new_with_config(config); + let ctx = SessionContext::new_with_config(config); let name = "aggregate_test_100"; - register_aggregate_csv(&mut ctx, name).await?; + register_aggregate_csv(&ctx, name).await?; let df = ctx.table(name); let df = df diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 1abb550f5c98..2a23f045f3b2 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -189,14 +189,14 @@ mod tests { async fn write_parquet_with_small_rg_size() -> Result<()> { // This test verifies writing a parquet file with small rg size // relative to datafusion.execution.batch_size does not panic - let mut ctx = SessionContext::new_with_config( - SessionConfig::from_string_hash_map(HashMap::from_iter( + let ctx = SessionContext::new_with_config(SessionConfig::from_string_hash_map( + HashMap::from_iter( [("datafusion.execution.batch_size", "10")] .iter() .map(|(s1, s2)| (s1.to_string(), s2.to_string())), - ))?, - ); - register_aggregate_csv(&mut ctx, "aggregate_test_100").await?; + ), + )?); - register_aggregate_csv(&mut ctx, "aggregate_test_100").await?; + register_aggregate_csv(&ctx, "aggregate_test_100").await?; let test_df = ctx.table("aggregate_test_100").await?; let output_path = "file://local/test.parquet";
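The `split_files` rewrite in the next hunk replaces `chunks()`/`to_vec()`, which cloned every `PartitionedFile`, with a drain loop that moves files into pre-sized chunks. The same round-up chunking in a self-contained sketch (generic element type and the `split_into` name are illustrative):

```rust
use std::mem;

/// Split `items` into at most `n` chunks, moving (not cloning) the elements.
fn split_into<T>(mut items: Vec<T>, n: usize) -> Vec<Vec<T>> {
    if items.is_empty() {
        return vec![];
    }
    // div with rounding up, so no more than `n` chunks are produced
    let chunk_size = (items.len() + n - 1) / n;
    let mut chunks = Vec::with_capacity(n);
    let mut current = Vec::with_capacity(chunk_size);
    for item in items.drain(..) {
        current.push(item);
        if current.len() == chunk_size {
            chunks.push(mem::replace(&mut current, Vec::with_capacity(chunk_size)));
        }
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}

fn main() {
    // 5 items over 2 chunks -> chunk_size = 3
    assert_eq!(split_into((0..5).collect(), 2), vec![vec![0, 1, 2], vec![3, 4]]);
}
```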
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 29b593a70ca0..67af8ef12c8b 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -18,6 +18,7 @@ //! Helper functions for the table implementation use std::collections::HashMap; +use std::mem; use std::sync::Arc; use super::PartitionedFile; @@ -138,10 +139,22 @@ pub fn split_files( // effectively this is div with rounding up instead of truncating let chunk_size = (partitioned_files.len() + n - 1) / n; - partitioned_files .chunks(chunk_size) .map(|c| c.to_vec()) .collect() + let mut chunks = Vec::with_capacity(n); + let mut current_chunk = Vec::with_capacity(chunk_size); + for file in partitioned_files.drain(..) { + current_chunk.push(file); + if current_chunk.len() == chunk_size { + let full_chunk = + mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size)); + chunks.push(full_chunk); + } + } + + if !current_chunk.is_empty() { + chunks.push(current_chunk) + } + + chunks } struct Partition { diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index 44f92760908d..21a60614cff2 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -82,6 +82,7 @@ pub struct PartitionedFile { /// An optional field for user defined per object metadata pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>, } + impl PartitionedFile { /// Create a simple file without metadata or partition pub fn new(path: impl Into<String>, size: u64) -> Self { diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 80f49e4eb8e6..bb86ac3ae416 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -973,15 +973,16 @@ impl ListingTable { // collect the statistics if required by the config let files = file_list .map(|part_file| async { - let mut part_file = part_file?; + let part_file = part_file?; if self.options.collect_stat { let statistics = self.do_collect_statistics(ctx, &store, &part_file).await?; - part_file.statistics = Some(statistics.clone()); - Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)> + Ok((part_file, statistics)) } else { - Ok((part_file, Statistics::new_unknown(&self.file_schema))) - as Result<(PartitionedFile, Statistics)> + Ok(( + part_file, + Arc::new(Statistics::new_unknown(&self.file_schema)), + )) } }) .boxed() @@ -1011,12 +1012,12 @@ impl ListingTable { ctx: &SessionState, store: &Arc<dyn ObjectStore>, part_file: &PartitionedFile, - ) -> Result<Statistics> { - let statistics_cache = self.collected_statistics.clone(); - return match statistics_cache + ) -> Result<Arc<Statistics>> { + match self + .collected_statistics .get_with_extra(&part_file.object_meta.location, &part_file.object_meta) { - Some(statistics) => Ok(statistics.as_ref().clone()), + Some(statistics) => Ok(statistics.clone()), None => { let statistics = self .options .format .infer_stats( ctx, &store, self.file_schema.clone(), &part_file.object_meta, ) .await?; - statistics_cache.put_with_extra( + let statistics = Arc::new(statistics); + self.collected_statistics.put_with_extra( &part_file.object_meta.location, - statistics.clone().into(), + statistics.clone(), &part_file.object_meta, ); Ok(statistics) } - }; + } } } diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index e720b4efff6f..a1ee6fbe1341 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -331,11 +331,9 @@ impl FileOpener for ArrowOpener { .into_iter() .zip(recordbatch_results) .filter_map(move |(block, data)| { - match decoder.read_record_batch(&block, &data.into()) { - Ok(Some(record_batch)) => Some(Ok(record_batch)), - Ok(None) => None, - Err(err) => Some(Err(err)), - } + decoder .read_record_batch(&block, &data.into()) .transpose() }), ) .boxed())
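The `ArrowOpener` cleanup above leans on `Result::transpose`, which converts `Result<Option<T>, E>` into `Option<Result<T, E>>` so that `filter_map` silently drops exhausted blocks (`Ok(None)`) while passing batches and errors through. A minimal illustration:

```rust
fn main() {
    // `read_record_batch`-style return values: Ok(Some(..)), Ok(None), Err(..)
    let batch: Result<Option<i32>, String> = Ok(Some(1));
    let done: Result<Option<i32>, String> = Ok(None);
    let err: Result<Option<i32>, String> = Err("decode error".to_string());

    assert_eq!(batch.transpose(), Some(Ok(1)));
    // Ok(None) becomes None, so filter_map skips it entirely
    assert_eq!(done.transpose(), None);
    assert_eq!(err.transpose(), Some(Err("decode error".to_string())));
}
```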
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs index 8c789e461b08..9d031a6bbc85 100644 --- a/datafusion/core/src/datasource/statistics.rs +++ b/datafusion/core/src/datasource/statistics.rs @@ -15,6 +15,9 @@ // specific language governing permissions and limitations // under the License. +use std::mem; +use std::sync::Arc; + use super::listing::PartitionedFile; use crate::arrow::datatypes::{Schema, SchemaRef}; use crate::error::Result; @@ -26,8 +29,6 @@ use datafusion_common::stats::Precision; use datafusion_common::ScalarValue; use futures::{Stream, StreamExt}; -use itertools::izip; -use itertools::multiunzip; /// Get all files as well as the file level summary statistics (no statistic for partition columns). /// If the optional `limit` is provided, includes only sufficient files. Needed to read up to /// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive /// call to `multiunzip` for constructing file level summary statistics. pub async fn get_statistics_with_limit( - all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>, + all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>, file_schema: SchemaRef, limit: Option<usize>, collect_stats: bool, @@ -48,9 +49,7 @@ pub async fn get_statistics_with_limit( // - zero for summations, and // - neutral element for extreme points. let size = file_schema.fields().len(); - let mut null_counts: Vec<Precision<usize>> = vec![Precision::Absent; size]; - let mut max_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size]; - let mut min_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size]; + let mut col_stats_set = vec![ColumnStatistics::default(); size]; let mut num_rows = Precision::<usize>::Absent; let mut total_byte_size = Precision::<usize>::Absent; @@ -58,16 +57,19 @@ pub async fn get_statistics_with_limit( let mut all_files = Box::pin(all_files.fuse()); if let Some(first_file) = all_files.next().await { - let (file, file_stats) = first_file?; + let (mut file, file_stats) = first_file?; + file.statistics = Some(file_stats.as_ref().clone()); result_files.push(file); // First file, we set them directly from the file statistics. - num_rows = file_stats.num_rows; - total_byte_size = file_stats.total_byte_size; - for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() { - null_counts[index] = file_column.null_count; - max_values[index] = file_column.max_value; - min_values[index] = file_column.min_value; + num_rows = file_stats.num_rows.clone(); + total_byte_size = file_stats.total_byte_size.clone(); + for (index, file_column) in + file_stats.column_statistics.clone().into_iter().enumerate() + { + col_stats_set[index].null_count = file_column.null_count; + col_stats_set[index].max_value = file_column.max_value; + col_stats_set[index].min_value = file_column.min_value; } // If the number of rows exceeds the limit, we can stop processing @@ -80,7 +82,8 @@ pub async fn get_statistics_with_limit( }; if conservative_num_rows <= limit.unwrap_or(usize::MAX) { while let Some(current) = all_files.next().await { - let (file, file_stats) = current?; + let (mut file, file_stats) = current?; + file.statistics = Some(file_stats.as_ref().clone()); result_files.push(file); if !collect_stats { continue; } // We accumulate the number of rows, total byte size and null // counts across all the files in question. If any file does not // provide any information or provides an inexact value, we demote // the statistic precision to inexact.
- num_rows = add_row_stats(file_stats.num_rows, num_rows); + num_rows = add_row_stats(file_stats.num_rows.clone(), num_rows); total_byte_size = - add_row_stats(file_stats.total_byte_size, total_byte_size); + add_row_stats(file_stats.total_byte_size.clone(), total_byte_size); - (null_counts, max_values, min_values) = multiunzip( - izip!( - file_stats.column_statistics.into_iter(), - null_counts.into_iter(), - max_values.into_iter(), - min_values.into_iter() - ) - .map( - |( - ColumnStatistics { - null_count: file_nc, - max_value: file_max, - min_value: file_min, - distinct_count: _, - }, - null_count, - max_value, - min_value, - )| { - ( - add_row_stats(file_nc, null_count), - set_max_if_greater(file_max, max_value), - set_min_if_lesser(file_min, min_value), - ) - }, - ), - ); + for (file_col_stats, col_stats) in file_stats + .column_statistics + .iter() + .zip(col_stats_set.iter_mut()) + { + let ColumnStatistics { + null_count: file_nc, + max_value: file_max, + min_value: file_min, + distinct_count: _, + } = file_col_stats; + + col_stats.null_count = + add_row_stats(file_nc.clone(), col_stats.null_count.clone()); + set_max_if_greater(file_max, &mut col_stats.max_value); + set_min_if_lesser(file_min, &mut col_stats.min_value) + } // If the number of rows exceeds the limit, we can stop processing // files. This only applies when we know the number of rows. It also @@ -139,7 +132,7 @@ pub async fn get_statistics_with_limit( let mut statistics = Statistics { num_rows, total_byte_size, - column_statistics: get_col_stats_vec(null_counts, max_values, min_values), + column_statistics: col_stats_set, }; if all_files.next().await.is_some() { // If we still have files in the stream, it means that the limit kicked @@ -182,21 +175,6 @@ fn add_row_stats( } } -pub(crate) fn get_col_stats_vec( - null_counts: Vec<Precision<usize>>, - max_values: Vec<Precision<ScalarValue>>, - min_values: Vec<Precision<ScalarValue>>, -) -> Vec<ColumnStatistics> { - izip!(null_counts, max_values, min_values) - .map(|(null_count, max_value, min_value)| ColumnStatistics { - null_count, - max_value, - min_value, - distinct_count: Precision::Absent, - }) - .collect() -} - pub(crate) fn get_col_stats( schema: &Schema, null_counts: Vec<Precision<usize>>, @@ -238,45 +216,61 @@ fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType { /// If the given value is numerically greater than the original maximum value, /// return the new maximum value with appropriate exactness information.
fn set_max_if_greater( - max_nominee: Precision<ScalarValue>, - max_values: Precision<ScalarValue>, -) -> Precision<ScalarValue> { - match (&max_values, &max_nominee) { - (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => max_nominee, + max_nominee: &Precision<ScalarValue>, + max_value: &mut Precision<ScalarValue>, +) { + match (&max_value, max_nominee) { + (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => { + *max_value = max_nominee.clone(); + } (Precision::Exact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Exact(val2)) if val1 < val2 => { - max_nominee.to_inexact() + *max_value = max_nominee.clone().to_inexact(); + } + (Precision::Exact(_), Precision::Absent) => { + let exact_max = mem::take(max_value); + *max_value = exact_max.to_inexact(); + } + (Precision::Absent, Precision::Exact(_)) => { + *max_value = max_nominee.clone().to_inexact(); + } + (Precision::Absent, Precision::Inexact(_)) => { + *max_value = max_nominee.clone(); } - (Precision::Exact(_), Precision::Absent) => max_values.to_inexact(), - (Precision::Absent, Precision::Exact(_)) => max_nominee.to_inexact(), - (Precision::Absent, Precision::Inexact(_)) => max_nominee, - (Precision::Absent, Precision::Absent) => Precision::Absent, - _ => max_values, + _ => {} } } /// If the given value is numerically lesser than the original minimum value, /// return the new minimum value with appropriate exactness information. fn set_min_if_lesser( - min_nominee: Precision<ScalarValue>, - min_values: Precision<ScalarValue>, -) -> Precision<ScalarValue> { - match (&min_values, &min_nominee) { - (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => min_nominee, + min_nominee: &Precision<ScalarValue>, + min_value: &mut Precision<ScalarValue>, +) { + match (&min_value, min_nominee) { + (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => { + *min_value = min_nominee.clone(); + } (Precision::Exact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Inexact(val2)) | (Precision::Inexact(val1), Precision::Exact(val2)) if val1 > val2 => { - min_nominee.to_inexact() + *min_value = min_nominee.clone().to_inexact(); + } + (Precision::Exact(_), Precision::Absent) => { + let exact_min = mem::take(min_value); + *min_value = exact_min.to_inexact(); + } + (Precision::Absent, Precision::Exact(_)) => { + *min_value = min_nominee.clone().to_inexact(); + } + (Precision::Absent, Precision::Inexact(_)) => { + *min_value = min_nominee.clone(); } - (Precision::Exact(_), Precision::Absent) => min_values.to_inexact(), - (Precision::Absent, Precision::Exact(_)) => min_nominee.to_inexact(), - (Precision::Absent, Precision::Inexact(_)) => min_nominee, - (Precision::Absent, Precision::Absent) => Precision::Absent, - _ => min_values, + _ => {} } }
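The merge rules above demote exactness whenever information is incomplete: an exact extreme combined with an absent one survives only as an inexact value, via the `Precision::to_inexact` used throughout the hunk. A small demonstration (assuming `to_inexact` leaves `Absent` untouched, as the match arms above imply):

```rust
use datafusion_common::stats::Precision;

fn main() {
    let exact = Precision::Exact(100usize);
    // A known value merged with unknown data can no longer be trusted exactly.
    assert!(matches!(exact.to_inexact(), Precision::Inexact(100)));

    // Absent stays absent: there is nothing to demote.
    let absent: Precision<usize> = Precision::Absent;
    assert!(matches!(absent.to_inexact(), Precision::Absent));
}
```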
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index e6bb1483e256..c63ffddd81b3 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -179,7 +179,7 @@ where /// # use datafusion::{error::Result, assert_batches_eq}; /// # #[tokio::main] /// # async fn main() -> Result<()> { -/// let mut ctx = SessionContext::new(); +/// let ctx = SessionContext::new(); /// ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?; /// let results = ctx /// .sql("SELECT a, min(b) FROM example GROUP BY a LIMIT 100") @@ -369,7 +369,7 @@ impl SessionContext { /// # use datafusion_execution::object_store::ObjectStoreUrl; /// let object_store_url = ObjectStoreUrl::parse("file://").unwrap(); /// let object_store = object_store::local::LocalFileSystem::new(); - /// let mut ctx = SessionContext::new(); + /// let ctx = SessionContext::new(); /// // All files with the file:// url prefix will be read from the local file system /// ctx.register_object_store(object_store_url.as_ref(), Arc::new(object_store)); /// ``` @@ -452,7 +452,7 @@ impl SessionContext { /// # use datafusion::{error::Result, assert_batches_eq}; /// # #[tokio::main] /// # async fn main() -> Result<()> { - /// let mut ctx = SessionContext::new(); + /// let ctx = SessionContext::new(); /// ctx /// .sql("CREATE TABLE foo (x INTEGER)") /// .await? @@ -480,7 +480,7 @@ impl SessionContext { /// # use datafusion::physical_plan::collect; /// # #[tokio::main] /// # async fn main() -> Result<()> { - /// let mut ctx = SessionContext::new(); + /// let ctx = SessionContext::new(); /// let options = SQLOptions::new() /// .with_allow_ddl(false); /// let err = ctx.sql_with_options("CREATE TABLE foo (x INTEGER)", options) @@ -544,30 +544,35 @@ impl SessionContext { // stack overflows. match ddl { DdlStatement::CreateExternalTable(cmd) => { - Box::pin(async move { self.create_external_table(&cmd).await }) - as std::pin::Pin<Box<dyn futures::Future<Output = Result<DataFrame>> + Send>> + (Box::pin(async move { self.create_external_table(&cmd).await }) - as std::pin::Pin<Box<dyn futures::Future<Output = Result<DataFrame>> + Send>> + as std::pin::Pin<Box<dyn futures::Future<Output = Result<DataFrame>> + Send>>) + .await } DdlStatement::CreateMemoryTable(cmd) => { - Box::pin(self.create_memory_table(cmd)) + Box::pin(self.create_memory_table(cmd)).await + } + DdlStatement::CreateView(cmd) => { + Box::pin(self.create_view(cmd)).await } - DdlStatement::CreateView(cmd) => Box::pin(self.create_view(cmd)), DdlStatement::CreateCatalogSchema(cmd) => { - Box::pin(self.create_catalog_schema(cmd)) + Box::pin(self.create_catalog_schema(cmd)).await } DdlStatement::CreateCatalog(cmd) => { - Box::pin(self.create_catalog(cmd)) + Box::pin(self.create_catalog(cmd)).await } - DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)), - DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)), + DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)).await, + DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)).await, DdlStatement::DropCatalogSchema(cmd) => { - Box::pin(self.drop_schema(cmd)) + Box::pin(self.drop_schema(cmd)).await } DdlStatement::CreateFunction(cmd) => { - Box::pin(self.create_function(cmd)) + Box::pin(self.create_function(cmd)).await } - DdlStatement::DropFunction(cmd) => Box::pin(self.drop_function(cmd)), + DdlStatement::DropFunction(cmd) => { + Box::pin(self.drop_function(cmd)).await + } + ddl => Ok(DataFrame::new(self.state(), LogicalPlan::Ddl(ddl))), } - .await } // TODO what about the other statements (like TransactionStart and TransactionEnd) LogicalPlan::Statement(Statement::SetVariable(stmt)) => { @@ -1352,7 +1357,7 @@ impl SessionContext { } /// Register [`CatalogProviderList`] in [`SessionState`] - pub fn register_catalog_list(&mut self, catalog_list: Arc<dyn CatalogProviderList>) { + pub fn register_catalog_list(&self, catalog_list: Arc<dyn CatalogProviderList>) { self.state.write().register_catalog_list(catalog_list) } @@ -1381,15 +1386,18 @@ impl FunctionRegistry for SessionContext { fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> { self.state.read().udwf(name) } + fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> { self.state.write().register_udf(udf) } + fn register_udaf( &mut self, udaf: Arc<AggregateUDF>, ) -> Result<Option<Arc<AggregateUDF>>> { self.state.write().register_udaf(udaf) } + fn register_udwf(&mut self, udwf: Arc<WindowUDF>) -> Result<Option<Arc<WindowUDF>>> { self.state.write().register_udwf(udwf) }
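The DDL arms above pin-box each branch's future and `.await` it immediately; boxing keeps the enclosing future small and breaks the type cycle when executing a statement re-enters async planning code (hence the `// stack overflows` comment in context). The same idiom in miniature, with a hypothetical self-recursive `countdown`:

```rust
use std::future::Future;
use std::pin::Pin;

// An async fn that awaits itself must box the recursive call; otherwise its
// state machine would need infinite size and the compiler rejects it.
fn countdown(n: u32) -> Pin<Box<dyn Future<Output = u32> + Send>> {
    Box::pin(async move {
        if n == 0 {
            0
        } else {
            countdown(n - 1).await
        }
    })
}

#[tokio::main]
async fn main() {
    assert_eq!(countdown(3).await, 0);
}
```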
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index ccad0240fddb..0a057d6f1417 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -249,7 +249,7 @@ impl Session for SessionState { impl SessionState { /// Returns new [`SessionState`] using the provided /// [`SessionConfig`] and [`RuntimeEnv`]. - #[deprecated(since = "40.0.0", note = "Use SessionStateBuilder")] + #[deprecated(since = "41.0.0", note = "Use SessionStateBuilder")] pub fn new_with_config_rt(config: SessionConfig, runtime: Arc) -> Self { SessionStateBuilder::new() .with_config(config) diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs index 6f3274820c8c..843efcc7b0d2 100644 --- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs +++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs @@ -280,7 +280,7 @@ mod tests { ) -> Arc<dyn AggregateExpr> { AggregateExprBuilder::new(count_udaf(), vec![expr]) .schema(Arc::new(schema.clone())) - .name(name) + .alias(name) .build() .unwrap() } @@ -364,7 +364,7 @@ mod tests { vec![ AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) - .name("Sum(b)") + .alias("Sum(b)") .build() .unwrap(), ]; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 378a892111c5..58b02c08e34c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -19,7 +19,6 @@ use std::borrow::Cow; use std::collections::HashMap; -use std::fmt::Write; use std::sync::Arc; use crate::datasource::file_format::file_type_to_format; @@ -74,11 +73,9 @@ use datafusion_common::{ }; use datafusion_expr::dml::CopyTo; use datafusion_expr::expr::{ - self, AggregateFunction, Alias, Between, BinaryExpr, Cast, GroupingSet, InList, Like, - TryCast, WindowFunction, + self, physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction, }; use datafusion_expr::expr_rewriter::unnormalize_cols; -use datafusion_expr::expr_vec_fmt; use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary; use datafusion_expr::{ DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, StringifiedPlan, @@ -97,265 +94,6 @@ use log::{debug, trace}; use sqlparser::ast::NullTreatment; use tokio::sync::Mutex; -fn create_function_physical_name( - fun: &str, - distinct: bool, - args: &[Expr], - order_by: Option<&Vec<Expr>>, -) -> Result<String> { - let names: Vec<String> = args - .iter() - .map(|e| create_physical_name(e, false)) - .collect::<Result<_>>()?; - - let distinct_str = match distinct { - true => "DISTINCT ", - false => "", - }; - - let phys_name = format!("{}({}{})", fun, distinct_str, names.join(",")); - - Ok(order_by - .map(|order_by| format!("{} ORDER BY [{}]", phys_name, expr_vec_fmt!(order_by))) - .unwrap_or(phys_name)) -} - -fn physical_name(e: &Expr) -> Result<String> { - create_physical_name(e, true) -} - -fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> { - match e { - Expr::Unnest(_) => { - internal_err!( - "Expr::Unnest should have been converted to LogicalPlan::Unnest" - ) - } - Expr::Column(c) => { - if is_first_expr { - Ok(c.name.clone()) - } else { - Ok(c.flat_name()) - } - } - Expr::Alias(Alias { name, ..
}) => Ok(name.clone()), - Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")), - Expr::Literal(value) => Ok(format!("{value:?}")), - Expr::BinaryExpr(BinaryExpr { left, op, right }) => { - let left = create_physical_name(left, false)?; - let right = create_physical_name(right, false)?; - Ok(format!("{left} {op} {right}")) - } - Expr::Case(case) => { - let mut name = "CASE ".to_string(); - if let Some(e) = &case.expr { - let _ = write!(name, "{} ", create_physical_name(e, false)?); - } - for (w, t) in &case.when_then_expr { - let _ = write!( - name, - "WHEN {} THEN {} ", - create_physical_name(w, false)?, - create_physical_name(t, false)? - ); - } - if let Some(e) = &case.else_expr { - let _ = write!(name, "ELSE {} ", create_physical_name(e, false)?); - } - name += "END"; - Ok(name) - } - Expr::Cast(Cast { expr, .. }) => { - // CAST does not change the expression name - create_physical_name(expr, false) - } - Expr::TryCast(TryCast { expr, .. }) => { - // CAST does not change the expression name - create_physical_name(expr, false) - } - Expr::Not(expr) => { - let expr = create_physical_name(expr, false)?; - Ok(format!("NOT {expr}")) - } - Expr::Negative(expr) => { - let expr = create_physical_name(expr, false)?; - Ok(format!("(- {expr})")) - } - Expr::IsNull(expr) => { - let expr = create_physical_name(expr, false)?; - Ok(format!("{expr} IS NULL")) - } - Expr::IsNotNull(expr) => { - let expr = create_physical_name(expr, false)?; - Ok(format!("{expr} IS NOT NULL")) - } - Expr::IsTrue(expr) => { - let expr = create_physical_name(expr, false)?; - Ok(format!("{expr} IS TRUE")) - } - Expr::IsFalse(expr) => { - let expr = create_physical_name(expr, false)?; - Ok(format!("{expr} IS FALSE")) - } - Expr::IsUnknown(expr) => { - let expr = create_physical_name(expr, false)?; - Ok(format!("{expr} IS UNKNOWN")) - } - Expr::IsNotTrue(expr) => { - let expr = create_physical_name(expr, false)?; - Ok(format!("{expr} IS NOT TRUE")) - } - Expr::IsNotFalse(expr) => { - let expr = create_physical_name(expr, false)?; - Ok(format!("{expr} IS NOT FALSE")) - } - Expr::IsNotUnknown(expr) => { - let expr = create_physical_name(expr, false)?; - Ok(format!("{expr} IS NOT UNKNOWN")) - } - Expr::ScalarFunction(fun) => fun.func.display_name(&fun.args), - Expr::WindowFunction(WindowFunction { - fun, - args, - order_by, - .. - }) => { - create_function_physical_name(&fun.to_string(), false, args, Some(order_by)) - } - Expr::AggregateFunction(AggregateFunction { - func, - distinct, - args, - filter: _, - order_by, - null_treatment: _, - }) => { - create_function_physical_name(func.name(), *distinct, args, order_by.as_ref()) - } - Expr::GroupingSet(grouping_set) => match grouping_set { - GroupingSet::Rollup(exprs) => Ok(format!( - "ROLLUP ({})", - exprs - .iter() - .map(|e| create_physical_name(e, false)) - .collect::<Result<Vec<_>>>()? - .join(", ") - )), - GroupingSet::Cube(exprs) => Ok(format!( - "CUBE ({})", - exprs - .iter() - .map(|e| create_physical_name(e, false)) - .collect::<Result<Vec<_>>>()? - .join(", ") - )), - GroupingSet::GroupingSets(lists_of_exprs) => { - let mut strings = vec![]; - for exprs in lists_of_exprs { - let exprs_str = exprs - .iter() - .map(|e| create_physical_name(e, false)) - .collect::<Result<Vec<_>>>()?
- .join(", "); - strings.push(format!("({exprs_str})")); - } - Ok(format!("GROUPING SETS ({})", strings.join(", "))) - } - }, - - Expr::InList(InList { - expr, - list, - negated, - }) => { - let expr = create_physical_name(expr, false)?; - let list = list.iter().map(|expr| create_physical_name(expr, false)); - if *negated { - Ok(format!("{expr} NOT IN ({list:?})")) - } else { - Ok(format!("{expr} IN ({list:?})")) - } - } - Expr::Exists { .. } => { - not_impl_err!("EXISTS is not yet supported in the physical plan") - } - Expr::InSubquery(_) => { - not_impl_err!("IN subquery is not yet supported in the physical plan") - } - Expr::ScalarSubquery(_) => { - not_impl_err!("Scalar subqueries are not yet supported in the physical plan") - } - Expr::Between(Between { - expr, - negated, - low, - high, - }) => { - let expr = create_physical_name(expr, false)?; - let low = create_physical_name(low, false)?; - let high = create_physical_name(high, false)?; - if *negated { - Ok(format!("{expr} NOT BETWEEN {low} AND {high}")) - } else { - Ok(format!("{expr} BETWEEN {low} AND {high}")) - } - } - Expr::Like(Like { - negated, - expr, - pattern, - escape_char, - case_insensitive, - }) => { - let expr = create_physical_name(expr, false)?; - let pattern = create_physical_name(pattern, false)?; - let op_name = if *case_insensitive { "ILIKE" } else { "LIKE" }; - let escape = if let Some(char) = escape_char { - format!("CHAR '{char}'") - } else { - "".to_string() - }; - if *negated { - Ok(format!("{expr} NOT {op_name} {pattern}{escape}")) - } else { - Ok(format!("{expr} {op_name} {pattern}{escape}")) - } - } - Expr::SimilarTo(Like { - negated, - expr, - pattern, - escape_char, - case_insensitive: _, - }) => { - let expr = create_physical_name(expr, false)?; - let pattern = create_physical_name(pattern, false)?; - let escape = if let Some(char) = escape_char { - format!("CHAR '{char}'") - } else { - "".to_string() - }; - if *negated { - Ok(format!("{expr} NOT SIMILAR TO {pattern}{escape}")) - } else { - Ok(format!("{expr} SIMILAR TO {pattern}{escape}")) - } - } - Expr::Sort { .. } => { - internal_err!("Create physical name does not support sort expression") - } - Expr::Wildcard { .. } => { - internal_err!("Create physical name does not support wildcard") - } - Expr::Placeholder(_) => { - internal_err!("Create physical name does not support placeholder") - } - Expr::OuterReferenceColumn(_, _) => { - internal_err!("Create physical name does not support OuterReferenceColumn") - } - } -} - /// Physical query planner that converts a `LogicalPlan` to an /// `ExecutionPlan` suitable for execution. #[async_trait] @@ -442,7 +180,7 @@ impl PhysicalPlanner for DefaultPhysicalPlanner { .create_initial_plan(logical_plan, session_state) .await?; - self.optimize_internal(plan, session_state, |_, _| {}) + self.optimize_physical_plan(plan, session_state, |_, _| {}) } } } @@ -1807,7 +1545,7 @@ type AggregateExprWithOptionalArgs = ( /// Create an aggregate expression with a name from a logical expression pub fn create_aggregate_expr_with_name_and_maybe_filter( e: &Expr, - name: impl Into<String>, + name: Option<String>, logical_input_schema: &DFSchema, _physical_input_schema: &Schema, execution_props: &ExecutionProps, @@ -1881,9 +1619,9 @@ pub fn create_aggregate_expr_and_maybe_filter( ) -> Result { // unpack (nested) aliased logical expressions, e.g. "sum(col) as total" let (name, e) = match e { - Expr::Alias(Alias { expr, name, ..
}) => (name.clone(), expr.as_ref()), - Expr::AggregateFunction(_) => (e.display_name().unwrap_or(physical_name(e)?), e), - _ => (physical_name(e)?, e), + Expr::Alias(Alias { expr, name, .. }) => (Some(name.clone()), expr.as_ref()), + Expr::AggregateFunction(_) => (e.display_name().ok(), e), + _ => (None, e), }; create_aggregate_expr_with_name_and_maybe_filter( @@ -1994,7 +1732,7 @@ impl DefaultPhysicalPlanner { } } - let optimized_plan = self.optimize_internal( + let optimized_plan = self.optimize_physical_plan( input, session_state, |plan, optimizer| { @@ -2078,7 +1816,7 @@ impl DefaultPhysicalPlanner { /// Optimize a physical plan by applying each physical optimizer, /// calling observer(plan, optimizer after each one) - fn optimize_internal( + pub fn optimize_physical_plan( &self, plan: Arc<dyn ExecutionPlan>, session_state: &SessionState, diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs index 6eb82dece31c..937344ef5e4e 100644 --- a/datafusion/core/src/test_util/mod.rs +++ b/datafusion/core/src/test_util/mod.rs @@ -112,7 +112,7 @@ pub fn aggr_test_schema() -> SchemaRef { /// Register session context for the aggregate_test_100.csv file pub async fn register_aggregate_csv( - ctx: &mut SessionContext, + ctx: &SessionContext, table_name: &str, ) -> Result<()> { let schema = aggr_test_schema(); @@ -128,8 +128,8 @@ pub async fn register_aggregate_csv( /// Create a table from the aggregate_test_100.csv file with the specified name pub async fn test_table_with_name(name: &str) -> Result { - let mut ctx = SessionContext::new(); - register_aggregate_csv(&mut ctx, name).await?; + let ctx = SessionContext::new(); + register_aggregate_csv(&ctx, name).await?; ctx.table(name).await } @@ -432,7 +432,7 @@ impl TestAggregate { pub fn count_expr(&self, schema: &Schema) -> Arc<dyn AggregateExpr> { AggregateExprBuilder::new(count_udaf(), vec![self.column()]) .schema(Arc::new(schema.clone())) - .name(self.column_name()) + .alias(self.column_name()) .build() .unwrap() } diff --git a/datafusion/core/tests/core_integration.rs b/datafusion/core/tests/core_integration.rs index deb5280388f1..79e5056e3cf5 100644 --- a/datafusion/core/tests/core_integration.rs +++ b/datafusion/core/tests/core_integration.rs @@ -36,6 +36,12 @@ mod memory_limit; /// Run all tests that are found in the `custom_sources_cases` directory mod custom_sources_cases; +/// Run all tests that are found in the `optimizer` directory +mod optimizer; + +/// Run all tests that are found in the `physical_optimizer` directory +mod physical_optimizer; + #[cfg(test)] #[ctor::ctor] fn init() { diff --git a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs index 69fa1af1bc5d..e42203725272 100644 --- a/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs +++ b/datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs @@ -107,7 +107,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str> vec![ AggregateExprBuilder::new(sum_udaf(), vec![col("d", &schema).unwrap()]) .schema(Arc::clone(&schema)) - .name("sum1") + .alias("sum1") .build() .unwrap(), ];
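Several test hunks above switch `AggregateExprBuilder::name` to `alias`. A hedged sketch of building an aliased aggregate under the new builder (the import paths are assumptions — this builder has moved between DataFusion releases — and the schema is illustrative):

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::physical_expr::expressions::col;
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;

fn main() -> datafusion_common::Result<()> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
    let agg = AggregateExprBuilder::new(count_udaf(), vec![col("a", &schema)?])
        .schema(Arc::new(schema))
        .alias("count_a") // formerly `.name("count_a")`
        .build()?;
    println!("{}", agg.name());
    Ok(())
}
```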
diff --git a/datafusion/core/tests/optimizer_integration.rs b/datafusion/core/tests/optimizer/mod.rs similarity index 100% rename from datafusion/core/tests/optimizer_integration.rs rename to datafusion/core/tests/optimizer/mod.rs diff --git a/datafusion/core/tests/physical_optimizer_integration.rs b/datafusion/core/tests/physical_optimizer/aggregate_statistics.rs similarity index 100% rename from datafusion/core/tests/physical_optimizer_integration.rs rename to datafusion/core/tests/physical_optimizer/aggregate_statistics.rs diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs new file mode 100644 index 000000000000..0ee89a3d213c --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod aggregate_statistics; diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 995ce35c5bc2..dc9d04786021 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -60,6 +60,7 @@ pub mod aggregates; pub mod create_drop; pub mod explain_analyze; pub mod joins; +mod path_partition; pub mod select; mod sql_api; diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs similarity index 100% rename from datafusion/core/tests/path_partition.rs rename to datafusion/core/tests/sql/path_partition.rs diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index 47804b927e64..1aa33fc75e5d 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -103,7 +103,7 @@ use datafusion_optimizer::AnalyzerRule; /// Execute the specified sql and return the resulting record batches /// pretty printed as a String. -async fn exec_sql(ctx: &mut SessionContext, sql: &str) -> Result<String> { +async fn exec_sql(ctx: &SessionContext, sql: &str) -> Result<String> { let df = ctx.sql(sql).await?; let batches = df.collect().await?; pretty_format_batches(&batches) @@ -112,25 +112,25 @@ async fn exec_sql(ctx: &SessionContext, sql: &str) -> Result<String> { } /// Create a test table.
-async fn setup_table(mut ctx: SessionContext) -> Result<SessionContext> { +async fn setup_table(ctx: SessionContext) -> Result<SessionContext> { let sql = "CREATE EXTERNAL TABLE sales(customer_id VARCHAR, revenue BIGINT) STORED AS CSV location 'tests/data/customer.csv'"; let expected = vec!["++", "++"]; - let s = exec_sql(&mut ctx, sql).await?; + let s = exec_sql(&ctx, sql).await?; let actual = s.lines().collect::<Vec<_>>(); assert_eq!(expected, actual, "Creating table"); Ok(ctx) } -async fn setup_table_without_schemas(mut ctx: SessionContext) -> Result<SessionContext> { +async fn setup_table_without_schemas(ctx: SessionContext) -> Result<SessionContext> { let sql = "CREATE EXTERNAL TABLE sales STORED AS CSV location 'tests/data/customer.csv'"; let expected = vec!["++", "++"]; - let s = exec_sql(&mut ctx, sql).await?; + let s = exec_sql(&ctx, sql).await?; let actual = s.lines().collect::<Vec<_>>(); assert_eq!(expected, actual, "Creating table"); @@ -146,7 +146,7 @@ const QUERY2: &str = "SELECT 42, arrow_typeof(42)"; // Run the query using the specified execution context and compare it // to the known result -async fn run_and_compare_query(mut ctx: SessionContext, description: &str) -> Result<()> { +async fn run_and_compare_query(ctx: SessionContext, description: &str) -> Result<()> { let expected = vec![ "+-------------+---------+", "| customer_id | revenue |", "+-------------+---------+", "| paul | 300 |", "| jorge | 200 |", "| andy | 150 |", "+-------------+---------+", ]; - let s = exec_sql(&mut ctx, QUERY).await?; + let s = exec_sql(&ctx, QUERY).await?; let actual = s.lines().collect::<Vec<_>>(); assert_eq!( @@ -174,7 +174,7 @@ async fn run_and_compare_query(ctx: SessionContext, description: &str) -> Re // Run the query using the specified execution context and compare it // to the known result async fn run_and_compare_query_with_analyzer_rule( - mut ctx: SessionContext, + ctx: SessionContext, description: &str, ) -> Result<()> { let expected = vec![ "+------------+--------------------------+", "| UInt64(42) | arrow_typeof(UInt64(42)) |", "+------------+--------------------------+", "+------------+--------------------------+", ]; - let s = exec_sql(&mut ctx, QUERY2).await?; + let s = exec_sql(&ctx, QUERY2).await?; let actual = s.lines().collect::<Vec<_>>(); assert_eq!( @@ -202,7 +202,7 @@ async fn run_and_compare_query_with_analyzer_rule( // Run the query using the specified execution context and compare it // to the known result async fn run_and_compare_query_with_auto_schemas( - mut ctx: SessionContext, + ctx: SessionContext, description: &str, ) -> Result<()> { let expected = vec![ "+----------+----------+", "| column_1 | column_2 |", "+----------+----------+", "+----------+----------+", ]; - let s = exec_sql(&mut ctx, QUERY1).await?; + let s = exec_sql(&ctx, QUERY1).await?; let actual = s.lines().collect::<Vec<_>>(); assert_eq!( @@ -262,13 +262,13 @@ async fn topk_query() -> Result<()> { #[tokio::test] // Run EXPLAIN PLAN and show the plan was in fact rewritten async fn topk_plan() -> Result<()> { - let mut ctx = setup_table(make_topk_context()).await?; + let ctx = setup_table(make_topk_context()).await?; let mut expected = ["| logical_plan after topk | TopK: k=3 |", "| | TableScan: sales projection=[customer_id,revenue] |"].join("\n"); let explain_query = format!("EXPLAIN VERBOSE {QUERY}"); - let actual_output = exec_sql(&mut ctx, &explain_query).await?; + let actual_output = exec_sql(&ctx, &explain_query).await?; // normalize newlines (output on windows uses \r\n) let mut actual_output = actual_output.replace("\r\n", "\n"); diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 1a51c181f49f..edf45a244e1f
100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -38,7 +38,8 @@ use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, }; use datafusion_common::{ - internal_err, plan_err, Column, DFSchema, Result, ScalarValue, TableReference, + internal_err, not_impl_err, plan_err, Column, DFSchema, Result, ScalarValue, + TableReference, }; use sqlparser::ast::NullTreatment; @@ -2277,6 +2278,265 @@ fn write_names_join<W: Write>(w: &mut W, exprs: &[Expr], sep: &str) -> Result<() Ok(()) } +pub fn create_function_physical_name( + fun: &str, + distinct: bool, + args: &[Expr], + order_by: Option<&Vec<Expr>>, +) -> Result<String> { + let names: Vec<String> = args + .iter() + .map(|e| create_physical_name(e, false)) + .collect::<Result<_>>()?; + + let distinct_str = match distinct { + true => "DISTINCT ", + false => "", + }; + + let phys_name = format!("{}({}{})", fun, distinct_str, names.join(",")); + + Ok(order_by + .map(|order_by| format!("{} ORDER BY [{}]", phys_name, expr_vec_fmt!(order_by))) + .unwrap_or(phys_name)) +} + +pub fn physical_name(e: &Expr) -> Result<String> { + create_physical_name(e, true) +} + +fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> { + match e { + Expr::Unnest(_) => { + internal_err!( + "Expr::Unnest should have been converted to LogicalPlan::Unnest" + ) + } + Expr::Column(c) => { + if is_first_expr { + Ok(c.name.clone()) + } else { + Ok(c.flat_name()) + } + } + Expr::Alias(Alias { name, .. }) => Ok(name.clone()), + Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")), + Expr::Literal(value) => Ok(format!("{value:?}")), + Expr::BinaryExpr(BinaryExpr { left, op, right }) => { + let left = create_physical_name(left, false)?; + let right = create_physical_name(right, false)?; + Ok(format!("{left} {op} {right}")) + } + Expr::Case(case) => { + let mut name = "CASE ".to_string(); + if let Some(e) = &case.expr { + let _ = write!(name, "{} ", create_physical_name(e, false)?); + } + for (w, t) in &case.when_then_expr { + let _ = write!( + name, + "WHEN {} THEN {} ", + create_physical_name(w, false)?, + create_physical_name(t, false)? + ); + } + if let Some(e) = &case.else_expr { + let _ = write!(name, "ELSE {} ", create_physical_name(e, false)?); + } + name += "END"; + Ok(name) + } + Expr::Cast(Cast { expr, .. }) => { + // CAST does not change the expression name + create_physical_name(expr, false) + } + Expr::TryCast(TryCast { expr, ..
}) => { + // CAST does not change the expression name + create_physical_name(expr, false) + } + Expr::Not(expr) => { + let expr = create_physical_name(expr, false)?; + Ok(format!("NOT {expr}")) + } + Expr::Negative(expr) => { + let expr = create_physical_name(expr, false)?; + Ok(format!("(- {expr})")) + } + Expr::IsNull(expr) => { + let expr = create_physical_name(expr, false)?; + Ok(format!("{expr} IS NULL")) + } + Expr::IsNotNull(expr) => { + let expr = create_physical_name(expr, false)?; + Ok(format!("{expr} IS NOT NULL")) + } + Expr::IsTrue(expr) => { + let expr = create_physical_name(expr, false)?; + Ok(format!("{expr} IS TRUE")) + } + Expr::IsFalse(expr) => { + let expr = create_physical_name(expr, false)?; + Ok(format!("{expr} IS FALSE")) + } + Expr::IsUnknown(expr) => { + let expr = create_physical_name(expr, false)?; + Ok(format!("{expr} IS UNKNOWN")) + } + Expr::IsNotTrue(expr) => { + let expr = create_physical_name(expr, false)?; + Ok(format!("{expr} IS NOT TRUE")) + } + Expr::IsNotFalse(expr) => { + let expr = create_physical_name(expr, false)?; + Ok(format!("{expr} IS NOT FALSE")) + } + Expr::IsNotUnknown(expr) => { + let expr = create_physical_name(expr, false)?; + Ok(format!("{expr} IS NOT UNKNOWN")) + } + Expr::ScalarFunction(fun) => fun.func.display_name(&fun.args), + Expr::WindowFunction(WindowFunction { + fun, + args, + order_by, + .. + }) => { + create_function_physical_name(&fun.to_string(), false, args, Some(order_by)) + } + Expr::AggregateFunction(AggregateFunction { + func, + distinct, + args, + filter: _, + order_by, + null_treatment: _, + }) => { + create_function_physical_name(func.name(), *distinct, args, order_by.as_ref()) + } + Expr::GroupingSet(grouping_set) => match grouping_set { + GroupingSet::Rollup(exprs) => Ok(format!( + "ROLLUP ({})", + exprs + .iter() + .map(|e| create_physical_name(e, false)) + .collect::<Result<Vec<_>>>()? + .join(", ") + )), + GroupingSet::Cube(exprs) => Ok(format!( + "CUBE ({})", + exprs + .iter() + .map(|e| create_physical_name(e, false)) + .collect::<Result<Vec<_>>>()? + .join(", ") + )), + GroupingSet::GroupingSets(lists_of_exprs) => { + let mut strings = vec![]; + for exprs in lists_of_exprs { + let exprs_str = exprs + .iter() + .map(|e| create_physical_name(e, false)) + .collect::<Result<Vec<_>>>()? + .join(", "); + strings.push(format!("({exprs_str})")); + } + Ok(format!("GROUPING SETS ({})", strings.join(", "))) + } + }, + + Expr::InList(InList { + expr, + list, + negated, + }) => { + let expr = create_physical_name(expr, false)?; + let list = list.iter().map(|expr| create_physical_name(expr, false)); + if *negated { + Ok(format!("{expr} NOT IN ({list:?})")) + } else { + Ok(format!("{expr} IN ({list:?})")) + } + } + Expr::Exists { ..
} => { + not_impl_err!("EXISTS is not yet supported in the physical plan") + } + Expr::InSubquery(_) => { + not_impl_err!("IN subquery is not yet supported in the physical plan") + } + Expr::ScalarSubquery(_) => { + not_impl_err!("Scalar subqueries are not yet supported in the physical plan") + } + Expr::Between(Between { + expr, + negated, + low, + high, + }) => { + let expr = create_physical_name(expr, false)?; + let low = create_physical_name(low, false)?; + let high = create_physical_name(high, false)?; + if *negated { + Ok(format!("{expr} NOT BETWEEN {low} AND {high}")) + } else { + Ok(format!("{expr} BETWEEN {low} AND {high}")) + } + } + Expr::Like(Like { + negated, + expr, + pattern, + escape_char, + case_insensitive, + }) => { + let expr = create_physical_name(expr, false)?; + let pattern = create_physical_name(pattern, false)?; + let op_name = if *case_insensitive { "ILIKE" } else { "LIKE" }; + let escape = if let Some(char) = escape_char { + format!("CHAR '{char}'") + } else { + "".to_string() + }; + if *negated { + Ok(format!("{expr} NOT {op_name} {pattern}{escape}")) + } else { + Ok(format!("{expr} {op_name} {pattern}{escape}")) + } + } + Expr::SimilarTo(Like { + negated, + expr, + pattern, + escape_char, + case_insensitive: _, + }) => { + let expr = create_physical_name(expr, false)?; + let pattern = create_physical_name(pattern, false)?; + let escape = if let Some(char) = escape_char { + format!("CHAR '{char}'") + } else { + "".to_string() + }; + if *negated { + Ok(format!("{expr} NOT SIMILAR TO {pattern}{escape}")) + } else { + Ok(format!("{expr} SIMILAR TO {pattern}{escape}")) + } + } + Expr::Sort { .. } => { + internal_err!("Create physical name does not support sort expression") + } + Expr::Wildcard { .. } => { + internal_err!("Create physical name does not support wildcard") + } + Expr::Placeholder(_) => { + internal_err!("Create physical name does not support placeholder") + } + Expr::OuterReferenceColumn(_, _) => { + internal_err!("Create physical name does not support OuterReferenceColumn") + } + } +} + #[cfg(test)] mod test { use crate::expr_fn::col;
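`physical_name` is now exported from `datafusion_expr::expr` (the physical planner imports it above). It derives the output column name a physical plan uses for an expression, with aliases taking precedence; a short sketch of the behaviour encoded in `create_physical_name`:

```rust
use datafusion_expr::col;
use datafusion_expr::expr::physical_name;

fn main() -> datafusion_common::Result<()> {
    // A bare column keeps its own name...
    assert_eq!(physical_name(&col("a"))?, "a");
    // ...a binary expression gets a descriptive composite name...
    assert_eq!(physical_name(&(col("a") + col("b")))?, "a + b");
    // ...and an explicit alias wins over everything else.
    assert_eq!(physical_name(&(col("a") + col("b")).alias("total"))?, "total");
    Ok(())
}
```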
}) => schema, @@ -83,6 +86,7 @@ impl DdlStatement { DdlStatement::CreateView(_) => "CreateView", DdlStatement::CreateCatalogSchema(_) => "CreateCatalogSchema", DdlStatement::CreateCatalog(_) => "CreateCatalog", + DdlStatement::CreateIndex(_) => "CreateIndex", DdlStatement::DropTable(_) => "DropTable", DdlStatement::DropView(_) => "DropView", DdlStatement::DropCatalogSchema(_) => "DropCatalogSchema", @@ -101,6 +105,7 @@ impl DdlStatement { vec![input] } DdlStatement::CreateView(CreateView { input, .. }) => vec![input], + DdlStatement::CreateIndex(_) => vec![], DdlStatement::DropTable(_) => vec![], DdlStatement::DropView(_) => vec![], DdlStatement::DropCatalogSchema(_) => vec![], @@ -147,6 +152,9 @@ impl DdlStatement { }) => { write!(f, "CreateCatalog: {catalog_name:?}") } + DdlStatement::CreateIndex(CreateIndex { name, .. }) => { + write!(f, "CreateIndex: {name:?}") + } DdlStatement::DropTable(DropTable { name, if_exists, .. }) => { @@ -351,3 +359,14 @@ pub struct DropFunction { pub if_exists: bool, pub schema: DFSchemaRef, } + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub struct CreateIndex { + pub name: Option<String>, + pub table: TableReference, + pub using: Option<String>, + pub columns: Vec<Expr>, + pub unique: bool, + pub if_not_exists: bool, + pub schema: DFSchemaRef, +} diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 8928f70cd5d2..b58208591920 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -30,8 +30,8 @@ pub use builder::{ }; pub use ddl::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, - CreateFunctionBody, CreateMemoryTable, CreateView, DdlStatement, DropCatalogSchema, - DropFunction, DropTable, DropView, OperateFunctionArg, + CreateFunctionBody, CreateIndex, CreateMemoryTable, CreateView, DdlStatement, + DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg, }; pub use dml::{DmlStatement, WriteOp}; pub use plan::{ diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index a47906f20322..dbe43128fd38 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -303,6 +303,7 @@ impl TreeNode for LogicalPlan { DdlStatement::CreateExternalTable(_) | DdlStatement::CreateCatalogSchema(_) | DdlStatement::CreateCatalog(_) + | DdlStatement::CreateIndex(_) | DdlStatement::DropTable(_) | DdlStatement::DropView(_) | DdlStatement::DropCatalogSchema(_) diff --git a/datafusion/expr/src/planner.rs b/datafusion/expr/src/planner.rs index c775427df138..24f589c41582 100644 --- a/datafusion/expr/src/planner.rs +++ b/datafusion/expr/src/planner.rs @@ -197,6 +197,13 @@ pub trait ExprPlanner: Send + Sync { "Default planner compound identifier hasn't been implemented for ExprPlanner" ) } + + /// Plans `ANY` expression, e.g., `expr = ANY(array_expr)` + /// + /// Returns origin binary expression if not possible + fn plan_any(&self, expr: RawBinaryExpr) -> Result<PlannerResult<RawBinaryExpr>> { + Ok(PlannerResult::Original(expr)) + } } /// An operator with two arguments to plan diff --git a/datafusion/expr/src/type_coercion/binary.rs b/datafusion/expr/src/type_coercion/binary.rs index 17280289ed1b..8da33081d652 100644 --- a/datafusion/expr/src/type_coercion/binary.rs +++ b/datafusion/expr/src/type_coercion/binary.rs @@ -890,15 +890,22 @@ fn dictionary_coercion( /// 2.
Data type of the other side should be able to cast to string type fn string_concat_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> { use arrow::datatypes::DataType::*; - string_coercion(lhs_type, rhs_type).or(match (lhs_type, rhs_type) { - (Utf8, from_type) | (from_type, Utf8) => { - string_concat_internal_coercion(from_type, &Utf8) - } - (LargeUtf8, from_type) | (from_type, LargeUtf8) => { - string_concat_internal_coercion(from_type, &LargeUtf8) + match (lhs_type, rhs_type) { + // If Utf8View is in any side, we coerce to Utf8. + // Ref: https://github.com/apache/datafusion/pull/11796 + (Utf8View, Utf8View | Utf8 | LargeUtf8) | (Utf8 | LargeUtf8, Utf8View) => { + Some(Utf8) } - _ => None, - }) + _ => string_coercion(lhs_type, rhs_type).or(match (lhs_type, rhs_type) { + (Utf8, from_type) | (from_type, Utf8) => { + string_concat_internal_coercion(from_type, &Utf8) + } + (LargeUtf8, from_type) | (from_type, LargeUtf8) => { + string_concat_internal_coercion(from_type, &LargeUtf8) + } + _ => None, + }), + } } fn array_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> { diff --git a/datafusion/expr/src/type_coercion/functions.rs b/datafusion/expr/src/type_coercion/functions.rs index 66807c3f446c..4f2776516d3e 100644 --- a/datafusion/expr/src/type_coercion/functions.rs +++ b/datafusion/expr/src/type_coercion/functions.rs @@ -583,6 +583,10 @@ fn coerced_from<'a>( (Interval(_), _) if matches!(type_from, Utf8 | LargeUtf8) => { Some(type_into.clone()) } + // We can go into a Utf8View from a Utf8 or LargeUtf8 + (Utf8View, _) if matches!(type_from, Utf8 | LargeUtf8 | Null) => { + Some(type_into.clone()) + } // Any type can be coerced into strings (Utf8 | LargeUtf8, _) => Some(type_into.clone()), (Null, _) if can_cast_types(type_from, type_into) => Some(type_into.clone()), @@ -646,6 +650,18 @@ mod tests { use super::*; use arrow::datatypes::Field; + #[test] + fn test_string_conversion() { + let cases = vec![ + (DataType::Utf8View, DataType::Utf8, true), + (DataType::Utf8View, DataType::LargeUtf8, true), + ]; + + for case in cases { + assert_eq!(can_coerce_from(&case.0, &case.1), case.2); + } + } + #[test] fn test_maybe_data_types() { // this vec contains: arg1, arg2, expected result diff --git a/datafusion/functions-nested/src/array_has.rs b/datafusion/functions-nested/src/array_has.rs index bdda5a565947..fe1df2579932 100644 --- a/datafusion/functions-nested/src/array_has.rs +++ b/datafusion/functions-nested/src/array_has.rs @@ -34,19 +34,19 @@ use std::sync::Arc; // Create static instances of ScalarUDFs for each function make_udf_expr_and_func!(ArrayHas, array_has, - first_array second_array, // arg name + haystack_array element, // arg names "returns true, if the element appears in the first array, otherwise false.", // doc array_has_udf // internal function name ); make_udf_expr_and_func!(ArrayHasAll, array_has_all, - first_array second_array, // arg name + haystack_array needle_array, // arg names "returns true if each element of the second array appears in the first array; otherwise, it returns false.", // doc array_has_all_udf // internal function name ); make_udf_expr_and_func!(ArrayHasAny, array_has_any, - first_array second_array, // arg name + haystack_array needle_array, // arg names "returns true if at least one element of the second array appears in the first array; otherwise, it returns false.", // doc array_has_any_udf // internal function name ); @@ -262,26 +262,26 @@ enum ComparisonType { } fn general_array_has_dispatch<O: OffsetSizeTrait>( - array: &ArrayRef, - sub_array:
&ArrayRef, + haystack: &ArrayRef, + needle: &ArrayRef, comparison_type: ComparisonType, ) -> Result<ArrayRef> { let array = if comparison_type == ComparisonType::Single { - let arr = as_generic_list_array::<O>(array)?; - check_datatypes("array_has", &[arr.values(), sub_array])?; + let arr = as_generic_list_array::<O>(haystack)?; + check_datatypes("array_has", &[arr.values(), needle])?; arr } else { - check_datatypes("array_has", &[array, sub_array])?; - as_generic_list_array::<O>(array)? + check_datatypes("array_has", &[haystack, needle])?; + as_generic_list_array::<O>(haystack)? }; let mut boolean_builder = BooleanArray::builder(array.len()); let converter = RowConverter::new(vec![SortField::new(array.value_type())])?; - let element = Arc::clone(sub_array); + let element = Arc::clone(needle); let sub_array = if comparison_type != ComparisonType::Single { - as_generic_list_array::<O>(sub_array)? + as_generic_list_array::<O>(needle)? } else { array }; diff --git a/datafusion/functions-nested/src/planner.rs b/datafusion/functions-nested/src/planner.rs index f980362105a1..4cd8faa3ca98 100644 --- a/datafusion/functions-nested/src/planner.rs +++ b/datafusion/functions-nested/src/planner.rs @@ -17,7 +17,7 @@ //! SQL planning extensions like [`NestedFunctionPlanner`] and [`FieldAccessPlanner`] -use datafusion_common::{exec_err, utils::list_ndims, DFSchema, Result}; +use datafusion_common::{plan_err, utils::list_ndims, DFSchema, Result}; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::{ planner::{ExprPlanner, PlannerResult, RawBinaryExpr, RawFieldAccessExpr}, @@ -28,7 +28,7 @@ use datafusion_functions_aggregate::nth_value::nth_value_udaf; use crate::map::map_udf; use crate::{ - array_has::array_has_all, + array_has::{array_has_all, array_has_udf}, expr_fn::{array_append, array_concat, array_prepend}, extract::{array_element, array_slice}, make_array::make_array, @@ -102,7 +102,7 @@ impl ExprPlanner for NestedFunctionPlanner { fn plan_make_map(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> { if args.len() % 2 != 0 { - return exec_err!("make_map requires an even number of arguments"); + return plan_err!("make_map requires an even number of arguments"); } let (keys, values): (Vec<_>, Vec<_>) = @@ -114,6 +114,20 @@ impl ExprPlanner for NestedFunctionPlanner { ScalarFunction::new_udf(map_udf(), vec![keys, values]), ))) } + + fn plan_any(&self, expr: RawBinaryExpr) -> Result<PlannerResult<RawBinaryExpr>> { + if expr.op == sqlparser::ast::BinaryOperator::Eq { + Ok(PlannerResult::Planned(Expr::ScalarFunction( + ScalarFunction::new_udf( + array_has_udf(), + // left and right are reversed here so `needle=any(haystack)` -> `array_has(haystack, needle)` + vec![expr.right, expr.left], + ), + ))) + } else { + plan_err!("Unsupported AnyOp: '{}', only '=' is supported", expr.op) + } + } } pub struct FieldAccessPlanner; diff --git a/datafusion/functions/src/string/starts_with.rs b/datafusion/functions/src/string/starts_with.rs index 05bd960ff14b..8450697cbf30 100644 --- a/datafusion/functions/src/string/starts_with.rs +++ b/datafusion/functions/src/string/starts_with.rs @@ -18,10 +18,10 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{ArrayRef, OffsetSizeTrait}; +use arrow::array::ArrayRef; use arrow::datatypes::DataType; -use datafusion_common::{cast::as_generic_string_array, internal_err, Result}; +use datafusion_common::{internal_err, Result}; use datafusion_expr::ColumnarValue; use datafusion_expr::TypeSignature::*; use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; @@ -30,12 +30,8 @@ use crate::utils::make_scalar_function; /// Returns true if
string starts with prefix. /// starts_with('alphabet', 'alph') = 't' -pub fn starts_with<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> { - let left = as_generic_string_array::<T>(&args[0])?; - let right = as_generic_string_array::<T>(&args[1])?; - - let result = arrow::compute::kernels::comparison::starts_with(left, right)?; - +pub fn starts_with(args: &[ArrayRef]) -> Result<ArrayRef> { + let result = arrow::compute::kernels::comparison::starts_with(&args[0], &args[1])?; Ok(Arc::new(result) as ArrayRef) } @@ -52,14 +48,15 @@ impl StartsWithFunc { pub fn new() -> Self { - use DataType::*; Self { signature: Signature::one_of( vec![ - Exact(vec![Utf8, Utf8]), - Exact(vec![Utf8, LargeUtf8]), - Exact(vec![LargeUtf8, Utf8]), - Exact(vec![LargeUtf8, LargeUtf8]), + // Planner attempts coercion to the target type starting with the most preferred candidate. + // For example, given input `(Utf8View, Utf8)`, it first tries coercing to `(Utf8View, Utf8View)`. + // If that fails, it proceeds to `(Utf8, Utf8)`. + Exact(vec![DataType::Utf8View, DataType::Utf8View]), + Exact(vec![DataType::Utf8, DataType::Utf8]), + Exact(vec![DataType::LargeUtf8, DataType::LargeUtf8]), ], Volatility::Immutable, ), @@ -81,18 +78,73 @@ impl ScalarUDFImpl for StartsWithFunc { } fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { - use DataType::*; - - Ok(Boolean) + Ok(DataType::Boolean) } fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> { match args[0].data_type() { - DataType::Utf8 => make_scalar_function(starts_with::<i32>, vec![])(args), - DataType::LargeUtf8 => { - return make_scalar_function(starts_with::<i64>, vec![])(args); + DataType::Utf8View | DataType::Utf8 | DataType::LargeUtf8 => { + make_scalar_function(starts_with, vec![])(args) } - _ => internal_err!("Unsupported data type"), + _ => internal_err!("Unsupported data types for starts_with.
Expected Utf8, LargeUtf8 or Utf8View")?, } } } + +#[cfg(test)] +mod tests { + use crate::utils::test::test_function; + use arrow::array::{Array, BooleanArray}; + use arrow::datatypes::DataType::Boolean; + use datafusion_common::{Result, ScalarValue}; + use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; + + use super::*; + + #[test] + fn test_functions() -> Result<()> { + // Generate test cases for starts_with + let test_cases = vec![ + (Some("alphabet"), Some("alph"), Some(true)), + (Some("alphabet"), Some("bet"), Some(false)), + ( + Some("somewhat large string"), + Some("somewhat large"), + Some(true), + ), + (Some("somewhat large string"), Some("large"), Some(false)), + ] + .into_iter() + .flat_map(|(a, b, c)| { + let utf_8_args = vec![ + ColumnarValue::Scalar(ScalarValue::Utf8(a.map(|s| s.to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8(b.map(|s| s.to_string()))), + ]; + + let large_utf_8_args = vec![ + ColumnarValue::Scalar(ScalarValue::LargeUtf8(a.map(|s| s.to_string()))), + ColumnarValue::Scalar(ScalarValue::LargeUtf8(b.map(|s| s.to_string()))), + ]; + + let utf_8_view_args = vec![ + ColumnarValue::Scalar(ScalarValue::Utf8View(a.map(|s| s.to_string()))), + ColumnarValue::Scalar(ScalarValue::Utf8View(b.map(|s| s.to_string()))), + ]; + + vec![(utf_8_args, c), (large_utf_8_args, c), (utf_8_view_args, c)] + }); + + for (args, expected) in test_cases { + test_function!( + StartsWithFunc::new(), + &args, + Ok(expected), + bool, + Boolean, + BooleanArray + ); + } + + Ok(()) + } +} diff --git a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs index 8d69646bd422..acf1ae525c79 100644 --- a/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs +++ b/datafusion/physical-expr-common/src/aggregate/groups_accumulator/prim_op.rs @@ -184,7 +184,8 @@ where "initial_values underlying buffer must not be shared" ) })? - .map_err(DataFusionError::from)?; + .map_err(DataFusionError::from)? 
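// Editor's note (illustrative, not part of the patch): `with_data_type` below re-attaches the accumulator's declared Arrow type to the rebuilt state array. Without it, a state array reconstructed from raw native values only carries the primitive default type (e.g. a Decimal128 sum state would lose its declared precision and scale), which is the case the `decimal_table` sqllogictest added later in this diff exercises.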
+ .with_data_type(self.data_type.clone()); Ok(vec![Arc::new(state_values)]) } diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs index 665cdd708329..350023352b12 100644 --- a/datafusion/physical-expr-common/src/aggregate/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/mod.rs @@ -22,6 +22,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::exec_err; use datafusion_common::{internal_err, not_impl_err, DFSchema, Result}; +use datafusion_expr::expr::create_function_physical_name; use datafusion_expr::function::StateFieldsArgs; use datafusion_expr::type_coercion::aggregates::check_arg_count; use datafusion_expr::utils::AggregateOrderSensitivity; @@ -67,7 +68,7 @@ pub fn create_aggregate_expr( sort_exprs: &[Expr], ordering_req: &[PhysicalSortExpr], schema: &Schema, - name: impl Into<String>, + name: Option<String>, ignore_nulls: bool, is_distinct: bool, ) -> Result<Arc<dyn AggregateExpr>> { @@ -77,7 +78,9 @@ pub fn create_aggregate_expr( builder = builder.order_by(ordering_req.to_vec()); builder = builder.logical_exprs(input_exprs.to_vec()); builder = builder.schema(Arc::new(schema.clone())); - builder = builder.name(name); + if let Some(name) = name { + builder = builder.alias(name); + } if ignore_nulls { builder = builder.ignore_nulls(); @@ -98,7 +101,7 @@ pub fn create_aggregate_expr_with_dfschema( sort_exprs: &[Expr], ordering_req: &[PhysicalSortExpr], dfschema: &DFSchema, - name: impl Into<String>, + alias: Option<String>, ignore_nulls: bool, is_distinct: bool, is_reversed: bool, @@ -111,7 +114,9 @@ pub fn create_aggregate_expr_with_dfschema( builder = builder.dfschema(dfschema.clone()); let schema: Schema = dfschema.into(); builder = builder.schema(Arc::new(schema)); - builder = builder.name(name); + if let Some(alias) = alias { + builder = builder.alias(alias); + } if ignore_nulls { builder = builder.ignore_nulls(); @@ -137,7 +142,7 @@ pub struct AggregateExprBuilder { args: Vec<Arc<dyn PhysicalExpr>>, /// Logical expressions of the aggregate function, it will be deprecated in logical_args: Vec<Expr>, - name: String, + alias: Option<String>, /// Arrow Schema for the aggregate function schema: SchemaRef, /// Datafusion Schema for the aggregate function @@ -160,7 +165,7 @@ impl AggregateExprBuilder { fun, args, logical_args: vec![], - name: String::new(), + alias: None, schema: Arc::new(Schema::empty()), dfschema: DFSchema::empty(), sort_exprs: vec![], @@ -176,7 +181,7 @@ impl AggregateExprBuilder { fun, args, logical_args, - name, + alias, schema, dfschema, sort_exprs, @@ -213,6 +218,19 @@ impl AggregateExprBuilder { )?; let data_type = fun.return_type(&input_exprs_types)?; + let name = match alias { + None => create_function_physical_name( + fun.name(), + is_distinct, + &logical_args, + if sort_exprs.is_empty() { + None + } else { + Some(&sort_exprs) + }, + )?, + Some(alias) => alias, + }; Ok(Arc::new(AggregateFunctionExpr { fun: Arc::unwrap_or_clone(fun), @@ -232,8 +250,8 @@ impl AggregateExprBuilder { })) } - pub fn name(mut self, name: impl Into<String>) -> Self { - self.name = name.into(); + pub fn alias(mut self, alias: impl Into<String>) -> Self { + self.alias = Some(alias.into()); self } @@ -680,7 +698,7 @@ impl AggregateExpr for AggregateFunctionExpr { &self.sort_exprs, &self.ordering_req, &self.dfschema, - self.name(), + Some(self.name().to_string()), self.ignore_nulls, self.is_distinct, self.is_reversed, @@ -721,7 +739,7 @@ impl AggregateExpr for AggregateFunctionExpr { &reverse_sort_exprs, &reverse_ordering_req, &self.dfschema, - name, + Some(name), self.ignore_nulls,
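// Editor's note (illustrative, not part of the patch): the already-derived physical name is forwarded here as an explicit alias (`Some(name)`) so that rebuilding or reversing the aggregate does not re-derive a different display name from the rewritten expressions.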
self.is_distinct, !self.is_reversed, diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 6c4791b158c8..4c37db4849a7 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -17,9 +17,10 @@ mod guarantee; pub use guarantee::{Guarantee, LiteralGuarantee}; +use hashbrown::HashSet; use std::borrow::Borrow; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::Arc; use crate::expressions::{BinaryExpr, Column}; @@ -204,9 +205,7 @@ pub fn collect_columns(expr: &Arc<dyn PhysicalExpr>) -> HashSet<Column> { let mut columns = HashSet::<Column>::new(); expr.apply(|expr| { if let Some(column) = expr.as_any().downcast_ref::<Column>() { - if !columns.iter().any(|c| c.eq(column)) { - columns.insert(column.clone()); - } + columns.get_or_insert_owned(column); } Ok(TreeNodeRecursion::Continue) }) diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 0ce92df393aa..66b250c5063b 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -296,3 +296,5 @@ fn is_max(agg_expr: &dyn AggregateExpr) -> bool { } false } + +// See tests in datafusion/core/tests/physical_optimizer diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index 8941418c12e1..d72da9b30049 100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -1362,7 +1362,7 @@ mod tests { let aggregates = vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1i8)]) .schema(Arc::clone(&input_schema)) - .name("COUNT(1)") + .alias("COUNT(1)") .logical_exprs(vec![datafusion_expr::lit(1i8)]) .build()?]; @@ -1507,7 +1507,7 @@ mod tests { vec![ AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) .schema(Arc::clone(&input_schema)) - .name("AVG(b)") + .alias("AVG(b)") .build()?, ]; @@ -1803,7 +1803,7 @@ mod tests { fn test_median_agg_expr(schema: SchemaRef) -> Result<Arc<dyn AggregateExpr>> { AggregateExprBuilder::new(median_udaf(), vec![col("a", &schema)?]) .schema(schema) - .name("MEDIAN(a)") + .alias("MEDIAN(a)") .build() } @@ -1834,7 +1834,7 @@ mod tests { vec![ AggregateExprBuilder::new(avg_udaf(), vec![col("b", &input_schema)?]) .schema(Arc::clone(&input_schema)) - .name("AVG(b)") + .alias("AVG(b)") .build()?, ]; @@ -1894,7 +1894,7 @@ mod tests { vec![ AggregateExprBuilder::new(avg_udaf(), vec![col("a", &schema)?]) .schema(Arc::clone(&schema)) - .name("AVG(a)") + .alias("AVG(a)") .build()?, ]; @@ -1934,7 +1934,7 @@ mod tests { vec![ AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) - .name("AVG(b)") + .alias("AVG(b)") .build()?, ]; @@ -2002,7 +2002,7 @@ mod tests { &sort_exprs, &ordering_req, dfschema, - "FIRST_VALUE(b)", + None, false, false, false, @@ -2034,7 +2034,7 @@ mod tests { &sort_exprs, &ordering_req, dfschema, - "LAST_VALUE(b)", + None, false, false, false, @@ -2130,24 +2130,24 @@ mod tests { let result = crate::collect(aggregate_final, task_ctx).await?; if is_first_acc { let expected = [ - "+---+----------------+", - "| a | FIRST_VALUE(b) |", - "+---+----------------+", - "| 2 | 0.0 |", - "| 3 | 1.0 |", - "| 4 | 3.0 |", - "+---+----------------+", + "+---+--------------------------------------------+", + "| a | first_value(b) ORDER BY [b ASC NULLS LAST] |", + "+---+--------------------------------------------+", + "| 2 | 0.0 |", + "| 3 | 1.0 |",
+ "| 4 | 3.0 |", + "+---+--------------------------------------------+", ]; assert_batches_eq!(expected, &result); } else { let expected = [ - "+---+---------------+", - "| a | LAST_VALUE(b) |", - "+---+---------------+", - "| 2 | 3.0 |", - "| 3 | 5.0 |", - "| 4 | 6.0 |", - "+---+---------------+", + "+---+-------------------------------------------+", + "| a | last_value(b) ORDER BY [b ASC NULLS LAST] |", + "+---+-------------------------------------------+", + "| 2 | 3.0 |", + "| 3 | 5.0 |", + "| 4 | 6.0 |", + "+---+-------------------------------------------+", ]; assert_batches_eq!(expected, &result); }; @@ -2267,7 +2267,7 @@ mod tests { &sort_exprs, &ordering_req, &test_df_schema, - "array_agg", + None, false, false, false, @@ -2363,7 +2363,7 @@ mod tests { let aggregates: Vec> = vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1)]) .schema(Arc::clone(&schema)) - .name("1") + .alias("1") .build()?]; let input_batches = (0..4) @@ -2427,7 +2427,7 @@ mod tests { &[], &[], &df_schema, - "COUNT(val)", + Some("COUNT(val)".to_string()), false, false, false, @@ -2515,7 +2515,7 @@ mod tests { &[], &[], &df_schema, - "COUNT(val)", + Some("COUNT(val)".to_string()), false, false, false, diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index be980de8b498..0db594ac4f45 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -29,12 +29,12 @@ use crate::aggregates::{ PhysicalGroupBy, }; use crate::common::IPCWriter; -use crate::metrics::{BaselineMetrics, RecordOutput}; +use crate::metrics::{BaselineMetrics, MetricBuilder, RecordOutput}; use crate::sorts::sort::sort_batch; use crate::sorts::streaming_merge; use crate::spill::read_spill_as_stream; use crate::stream::RecordBatchStreamAdapter; -use crate::{aggregates, ExecutionPlan, PhysicalExpr}; +use crate::{aggregates, metrics, ExecutionPlan, PhysicalExpr}; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::*; @@ -118,10 +118,22 @@ struct SkipAggregationProbe { /// Flag indicating that further updates of `SkipAggregationProbe` /// state won't make any effect is_locked: bool, + + /// Number of rows where state was output without aggregation. 
+ /// + /// * If 0, all input rows were aggregated (should_skip was always false) + /// + /// * if greater than zero, the number of rows which were output directly + /// without aggregation + skipped_aggregation_rows: metrics::Count, } impl SkipAggregationProbe { - fn new(probe_rows_threshold: usize, probe_ratio_threshold: f64) -> Self { + fn new( + probe_rows_threshold: usize, + probe_ratio_threshold: f64, + skipped_aggregation_rows: metrics::Count, + ) -> Self { Self { input_rows: 0, num_groups: 0, @@ -129,6 +141,7 @@ impl SkipAggregationProbe { probe_ratio_threshold, should_skip: false, is_locked: false, + skipped_aggregation_rows, } } @@ -161,6 +174,11 @@ impl SkipAggregationProbe { self.should_skip = false; self.is_locked = true; } + + /// Record the number of rows that were output directly without aggregation + fn record_skipped(&mut self, batch: &RecordBatch) { + self.skipped_aggregation_rows.add(batch.num_rows()); + } } /// HashTable based Grouping Aggregator @@ -474,17 +492,17 @@ impl GroupedHashAggregateStream { .all(|acc| acc.supports_convert_to_state()) && agg_group_by.is_single() { + let options = &context.session_config().options().execution; + let probe_rows_threshold = + options.skip_partial_aggregation_probe_rows_threshold; + let probe_ratio_threshold = + options.skip_partial_aggregation_probe_ratio_threshold; + let skipped_aggregation_rows = MetricBuilder::new(&agg.metrics) + .counter("skipped_aggregation_rows", partition); Some(SkipAggregationProbe::new( - context - .session_config() - .options() - .execution - .skip_partial_aggregation_probe_rows_threshold, - context - .session_config() - .options() - .execution - .skip_partial_aggregation_probe_ratio_threshold, + probe_rows_threshold, + probe_ratio_threshold, + skipped_aggregation_rows, )) } else { None @@ -613,6 +631,9 @@ impl Stream for GroupedHashAggregateStream { match ready!(self.input.poll_next_unpin(cx)) { Some(Ok(batch)) => { let _timer = elapsed_compute.timer(); + if let Some(probe) = self.skip_aggregation_probe.as_mut() { + probe.record_skipped(&batch); + } let states = self.transform_to_states(batch)?; return Poll::Ready(Some(Ok( states.record_output(&self.baseline_metrics) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index b822ec2dafeb..de42a55ad350 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -492,8 +492,10 @@ fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch { if actual_buffer_size > (ideal_buffer_size * 2) { // We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches. // See https://github.com/apache/arrow-rs/issues/6094 for more details. 
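// Editor's aside, a minimal sketch (not part of the patch) of why the zero-size guard added below is needed: `ideal_buffer_size` is 0 when every value is at most 12 bytes and is therefore inlined directly into the views, so there is nothing to place in data blocks. Assuming the same arrow `StringViewBuilder` API used in this file:
//
//     let mut b = StringViewBuilder::with_capacity(2).with_block_size(8192);
//     b.append_value("inline");  // <= 12 bytes: stored in the view itself
//     b.append_value("a string longer than twelve bytes"); // one 8 KiB block
//     assert_eq!(b.finish().data_buffers().len(), 1);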
- let mut builder = StringViewBuilder::with_capacity(s.len()) - .with_block_size(ideal_buffer_size as u32); + let mut builder = StringViewBuilder::with_capacity(s.len()); + if ideal_buffer_size > 0 { + builder = builder.with_block_size(ideal_buffer_size as u32); + } for v in s.iter() { builder.append_option(v); @@ -802,7 +804,7 @@ mod tests { impl StringViewTest { /// Create a `StringViewArray` with the parameters specified in this struct fn build(self) -> StringViewArray { - let mut builder = StringViewBuilder::with_capacity(100); + let mut builder = StringViewBuilder::with_capacity(100).with_block_size(8192); loop { for &v in self.strings.iter() { builder.append_option(v); diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 063f35059fb8..14835f717ea3 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -1583,7 +1583,6 @@ mod tests { use rstest::*; use rstest_reuse::*; - #[cfg(not(feature = "force_hash_collisions"))] fn div_ceil(a: usize, b: usize) -> usize { (a + b - 1) / b } @@ -1931,9 +1930,6 @@ mod tests { Ok(()) } - // FIXME(#TODO) test fails with feature `force_hash_collisions` - // https://github.com/apache/datafusion/issues/11658 - #[cfg(not(feature = "force_hash_collisions"))] #[apply(batch_sizes)] #[tokio::test] async fn join_inner_two(batch_size: usize) -> Result<()> { @@ -1964,12 +1960,20 @@ mod tests { assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]); - // expected joined records = 3 - // in case batch_size is 1 - additional empty batch for remaining 3-2 row - let mut expected_batch_count = div_ceil(3, batch_size); - if batch_size == 1 { - expected_batch_count += 1; - } + let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) { + // Expected number of hash table matches = 3 + // in case batch_size is 1 - additional empty batch for remaining 3-2 row + let mut expected_batch_count = div_ceil(3, batch_size); + if batch_size == 1 { + expected_batch_count += 1; + } + expected_batch_count + } else { + // With hash collisions enabled, all records will match each other + // and filtered later. + div_ceil(9, batch_size) + }; + assert_eq!(batches.len(), expected_batch_count); let expected = [ @@ -1989,9 +1993,6 @@ mod tests { } /// Test where the left has 2 parts, the right with 1 part => 1 part - // FIXME(#TODO) test fails with feature `force_hash_collisions` - // https://github.com/apache/datafusion/issues/11658 - #[cfg(not(feature = "force_hash_collisions"))] #[apply(batch_sizes)] #[tokio::test] async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> { @@ -2029,12 +2030,20 @@ mod tests { assert_eq!(columns, vec!["a1", "b2", "c1", "a1", "b2", "c2"]); - // expected joined records = 3 - // in case batch_size is 1 - additional empty batch for remaining 3-2 row - let mut expected_batch_count = div_ceil(3, batch_size); - if batch_size == 1 { - expected_batch_count += 1; - } + let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) { + // Expected number of hash table matches = 3 + // in case batch_size is 1 - additional empty batch for remaining 3-2 row + let mut expected_batch_count = div_ceil(3, batch_size); + if batch_size == 1 { + expected_batch_count += 1; + } + expected_batch_count + } else { + // With hash collisions enabled, all records will match each other + // and filtered later. 
+ div_ceil(9, batch_size) + }; + assert_eq!(batches.len(), expected_batch_count); let expected = [ @@ -2104,9 +2113,6 @@ mod tests { } /// Test where the left has 1 part, the right has 2 parts => 2 parts - // FIXME(#TODO) test fails with feature `force_hash_collisions` - // https://github.com/apache/datafusion/issues/11658 - #[cfg(not(feature = "force_hash_collisions"))] #[apply(batch_sizes)] #[tokio::test] async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> { @@ -2143,12 +2149,19 @@ mod tests { let stream = join.execute(0, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; - // expected joined records = 1 (first right batch) - // and additional empty batch for non-joined 20-6-80 - let mut expected_batch_count = div_ceil(1, batch_size); - if batch_size == 1 { - expected_batch_count += 1; - } + let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) { + // Expected number of hash table matches for first right batch = 1 + // and additional empty batch for non-joined 20-6-80 + let mut expected_batch_count = div_ceil(1, batch_size); + if batch_size == 1 { + expected_batch_count += 1; + } + expected_batch_count + } else { + // With hash collisions enabled, all records will match each other + // and filtered later. + div_ceil(6, batch_size) + }; assert_eq!(batches.len(), expected_batch_count); let expected = [ @@ -2166,8 +2179,14 @@ mod tests { let stream = join.execute(1, Arc::clone(&task_ctx))?; let batches = common::collect(stream).await?; - // expected joined records = 2 (second right batch) - let expected_batch_count = div_ceil(2, batch_size); + let expected_batch_count = if cfg!(not(feature = "force_hash_collisions")) { + // Expected number of hash table matches for second right batch = 2 + div_ceil(2, batch_size) + } else { + // With hash collisions enabled, all records will match each other + // and filtered later. 
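// Editor's aside (illustrative): `div_ceil` is the ceiling-division helper defined earlier in this test module, so div_ceil(3, 2) == 2 and div_ceil(3, 3) == 1. Note that `cfg!(feature = ...)` is a compile-time boolean expression; unlike the removed `#[cfg(...)]` attributes, both branches remain compiled and type-checked.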
+ div_ceil(3, batch_size) + }; assert_eq!(batches.len(), expected_batch_count); let expected = [ @@ -3732,9 +3751,9 @@ mod tests { | JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => { - (expected_resultset_records + batch_size - 1) / batch_size + div_ceil(expected_resultset_records, batch_size) } - _ => (expected_resultset_records + batch_size - 1) / batch_size + 1, + _ => div_ceil(expected_resultset_records, batch_size) + 1, }; assert_eq!( batches.len(), diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 9f1465c2d7c1..d69d818331be 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -47,7 +47,7 @@ use arrow::compute::concat_batches; use arrow::datatypes::{Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use arrow::util::bit_util; -use datafusion_common::{exec_err, JoinSide, Result, Statistics}; +use datafusion_common::{exec_datafusion_err, JoinSide, Result, Statistics}; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_expr::JoinType; @@ -562,62 +562,54 @@ fn join_left_and_right_batch( schema: &Schema, visited_left_side: &SharedBitmapBuilder, ) -> Result<RecordBatch> { - let indices_result = (0..left_batch.num_rows()) + let indices = (0..left_batch.num_rows()) .map(|left_row_index| { build_join_indices(left_row_index, right_batch, left_batch, filter) }) - .collect::<Result<Vec<(UInt64Array, UInt32Array)>>>(); + .collect::<Result<Vec<(UInt64Array, UInt32Array)>>>() + .map_err(|e| { + exec_datafusion_err!( + "Fail to build join indices in NestedLoopJoinExec, error:{e}" + ) + })?; let mut left_indices_builder = UInt64Builder::new(); let mut right_indices_builder = UInt32Builder::new(); - let left_right_indices = match indices_result { - Err(err) => { - exec_err!("Fail to build join indices in NestedLoopJoinExec, error:{err}") - } - Ok(indices) => { - for (left_side, right_side) in indices { - left_indices_builder - .append_values(left_side.values(), &vec![true; left_side.len()]); - right_indices_builder - .append_values(right_side.values(), &vec![true; right_side.len()]); - } - Ok(( - left_indices_builder.finish(), - right_indices_builder.finish(), - )) - } - }; - match left_right_indices { - Ok((left_side, right_side)) => { - // set the left bitmap - // and only full join need the left bitmap - if need_produce_result_in_final(join_type) { - let mut bitmap = visited_left_side.lock(); - left_side.iter().flatten().for_each(|x| { - bitmap.set_bit(x as usize, true); - }); - } - // adjust the two side indices base on the join type - let (left_side, right_side) = adjust_indices_by_join_type( - left_side, - right_side, - 0..right_batch.num_rows(), - join_type, - false, - ); + for (left_side, right_side) in indices { + left_indices_builder + .append_values(left_side.values(), &vec![true; left_side.len()]); + right_indices_builder + .append_values(right_side.values(), &vec![true; right_side.len()]); + } - build_batch_from_indices( - schema, - left_batch, - right_batch, - &left_side, - &right_side, - column_indices, - JoinSide::Left, - ) - } - Err(e) => Err(e), + let left_side = left_indices_builder.finish(); + let right_side = right_indices_builder.finish(); + // set the left bitmap + // and only full join need the left bitmap + if need_produce_result_in_final(join_type) { + let mut bitmap = visited_left_side.lock(); + left_side.iter().flatten().for_each(|x| { + bitmap.set_bit(x as usize, true); + }); } + // adjust the two side
indices base on the join type + let (left_side, right_side) = adjust_indices_by_join_type( + left_side, + right_side, + 0..right_batch.num_rows(), + join_type, + false, + ); + + build_batch_from_indices( + schema, + left_batch, + right_batch, + &left_side, + &right_side, + column_indices, + JoinSide::Left, + ) } fn get_final_indices_from_shared_bitmap( diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs index 65cef28efc45..b41f3ad71bb8 100644 --- a/datafusion/physical-plan/src/windows/mod.rs +++ b/datafusion/physical-plan/src/windows/mod.rs @@ -128,7 +128,7 @@ pub fn create_window_expr( let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec()) .schema(Arc::new(input_schema.clone())) - .name(name) + .alias(name) .order_by(order_by.to_vec()) .sort_exprs(sort_exprs) .with_ignore_nulls(ignore_nulls) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 0a91babdfb60..bc019725f36c 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1645,6 +1645,9 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::Ddl(DdlStatement::CreateMemoryTable(_)) => Err(proto_error( "LogicalPlan serde is not yet implemented for CreateMemoryTable", )), + LogicalPlan::Ddl(DdlStatement::CreateIndex(_)) => Err(proto_error( + "LogicalPlan serde is not yet implemented for CreateIndex", + )), LogicalPlan::Ddl(DdlStatement::DropTable(_)) => Err(proto_error( "LogicalPlan serde is not yet implemented for DropTable", )), diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index aefa1d87a278..59db791c7595 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -492,7 +492,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode { // https://github.com/apache/datafusion/issues/11804 AggregateExprBuilder::new(agg_udf, input_phy_expr) .schema(Arc::clone(&physical_schema)) - .name(name) + .alias(name) .with_ignore_nulls(agg_node.ignore_nulls) .with_distinct(agg_node.distinct) .build() diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index b96398ef217f..e5c226418441 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -592,7 +592,9 @@ async fn roundtrip_logical_plan_copy_to_parquet() -> Result<()> { // Set specific Parquet format options let mut key_value_metadata = HashMap::new(); key_value_metadata.insert("test".to_string(), Some("test".to_string())); - parquet_format.key_value_metadata = key_value_metadata.clone(); + parquet_format + .key_value_metadata + .clone_from(&key_value_metadata); parquet_format.global.allow_single_file_parallelism = false; parquet_format.global.created_by = "test".to_string(); diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 0e2bc9cbb3e2..712182791b0b 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -296,7 +296,7 @@ fn roundtrip_window() -> Result<()> { vec![cast(col("b", &schema)?, &schema, DataType::Float64)?], ) .schema(Arc::clone(&schema)) - .name("avg(b)") + .alias("avg(b)") .build()?, &[], &[], @@ -312,7 +312,7 @@ fn roundtrip_window() -> Result<()> { let args = vec![cast(col("a", &schema)?, &schema, 
DataType::Float64)?]; let sum_expr = AggregateExprBuilder::new(sum_udaf(), args) .schema(Arc::clone(&schema)) - .name("SUM(a) RANGE BETWEEN CURRENT ROW AND UNBOUNDED PRECEEDING") + .alias("SUM(a) RANGE BETWEEN CURRENT ROW AND UNBOUNDED PRECEEDING") .build()?; let sliding_aggr_window_expr = Arc::new(SlidingAggregateWindowExpr::new( @@ -346,17 +346,17 @@ fn rountrip_aggregate() -> Result<()> { let avg_expr = AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) - .name("AVG(b)") + .alias("AVG(b)") .build()?; let nth_expr = AggregateExprBuilder::new(nth_value_udaf(), vec![col("b", &schema)?, lit(1u64)]) .schema(Arc::clone(&schema)) - .name("NTH_VALUE(b, 1)") + .alias("NTH_VALUE(b, 1)") .build()?; let str_agg_expr = AggregateExprBuilder::new(string_agg_udaf(), vec![col("b", &schema)?, lit(1u64)]) .schema(Arc::clone(&schema)) - .name("NTH_VALUE(b, 1)") + .alias("NTH_VALUE(b, 1)") .build()?; let test_cases: Vec<Vec<Arc<dyn AggregateExpr>>> = vec![ @@ -396,7 +396,7 @@ fn rountrip_aggregate_with_limit() -> Result<()> { vec![ AggregateExprBuilder::new(avg_udaf(), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) - .name("AVG(b)") + .alias("AVG(b)") .build()?, ]; @@ -463,7 +463,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> { vec![ AggregateExprBuilder::new(Arc::new(udaf), vec![col("b", &schema)?]) .schema(Arc::clone(&schema)) - .name("example_agg") + .alias("example_agg") .build()?, ]; @@ -914,7 +914,7 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> { vec![udf_expr.clone() as Arc<dyn PhysicalExpr>], ) .schema(schema.clone()) - .name("max") + .alias("max") .build()?; let window = Arc::new(WindowAggExec::try_new( @@ -965,7 +965,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> { let aggr_expr = AggregateExprBuilder::new(Arc::clone(&udaf), aggr_args.clone()) .schema(Arc::clone(&schema)) - .name("aggregate_udf") + .alias("aggregate_udf") .build()?; let filter = Arc::new(FilterExec::try_new( @@ -990,7 +990,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> { let aggr_expr = AggregateExprBuilder::new(udaf, aggr_args.clone()) .schema(Arc::clone(&schema)) - .name("aggregate_udf") + .alias("aggregate_udf") .distinct() .ignore_nulls() .build()?; diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index b80ffb6aed3f..edb0002842a8 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -17,12 +17,12 @@ use arrow_schema::DataType; use arrow_schema::TimeUnit; -use datafusion_expr::planner::PlannerResult; -use datafusion_expr::planner::RawDictionaryExpr; -use datafusion_expr::planner::RawFieldAccessExpr; +use datafusion_expr::planner::{ + PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr, +}; use sqlparser::ast::{ - CastKind, DictionaryField, Expr as SQLExpr, MapEntry, StructField, Subscript, - TrimWhereField, Value, + BinaryOperator, CastKind, DictionaryField, Expr as SQLExpr, MapEntry, StructField, + Subscript, TrimWhereField, Value, }; use datafusion_common::{ @@ -104,13 +104,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { fn build_logical_expr( &self, - op: sqlparser::ast::BinaryOperator, + op: BinaryOperator, left: Expr, right: Expr, schema: &DFSchema, ) -> Result<Expr> { // try extension planers - let mut binary_expr = datafusion_expr::planner::RawBinaryExpr { op, left, right }; + let mut binary_expr = RawBinaryExpr { op, left, right }; for planner in self.context_provider.get_expr_planners() { match planner.plan_binary_op(binary_expr, schema)?
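// Editor's note: expression planners are consulted in registration order; each one may either return `Planned(expr)` to finish, or hand back the unchanged `Original(expr)` so the next planner (or the default lowering) can try it, as the match below shows.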
{ PlannerResult::Planned(expr) => { @@ -122,7 +122,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } } - let datafusion_expr::planner::RawBinaryExpr { op, left, right } = binary_expr; + let RawBinaryExpr { op, left, right } = binary_expr; Ok(Expr::BinaryExpr(BinaryExpr::new( Box::new(left), self.parse_sql_binary_op(op)?, @@ -631,6 +631,36 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { SQLExpr::Map(map) => { self.try_plan_map_literal(map.entries, schema, planner_context) } + SQLExpr::AnyOp { + left, + compare_op, + right, + } => { + let mut binary_expr = RawBinaryExpr { + op: compare_op, + left: self.sql_expr_to_logical_expr( + *left, + schema, + planner_context, + )?, + right: self.sql_expr_to_logical_expr( + *right, + schema, + planner_context, + )?, + }; + for planner in self.context_provider.get_expr_planners() { + match planner.plan_any(binary_expr)? { + PlannerResult::Planned(expr) => { + return Ok(expr); + } + PlannerResult::Original(expr) => { + binary_expr = expr; + } + } + } + not_impl_err!("AnyOp not supported by ExprPlanner: {binary_expr:?}") + } _ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"), } } diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 3737e1adf8f3..6d47232ec270 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -45,20 +45,20 @@ use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{ cast, col, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody, - CreateMemoryTable, CreateView, DescribeTable, DmlStatement, DropCatalogSchema, - DropFunction, DropTable, DropView, EmptyRelation, Explain, Expr, ExprSchemable, - Filter, LogicalPlan, LogicalPlanBuilder, OperateFunctionArg, PlanType, Prepare, - SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode, - TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, - Volatility, WriteOp, + CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, DescribeTable, + DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView, EmptyRelation, + Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, + OperateFunctionArg, PlanType, Prepare, SetVariable, Statement as PlanStatement, + ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd, + TransactionIsolationLevel, TransactionStart, Volatility, WriteOp, }; use sqlparser::ast; use sqlparser::ast::{ - Assignment, AssignmentTarget, ColumnDef, CreateTable, CreateTableOptions, Delete, - DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert, ObjectName, ObjectType, - OneOrManyWithParens, Query, SchemaName, SetExpr, ShowCreateObject, - ShowStatementFilter, Statement, TableConstraint, TableFactor, TableWithJoins, - TransactionMode, UnaryOperator, Value, + Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable, + CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, Ident, Insert, + ObjectName, ObjectType, OneOrManyWithParens, Query, SchemaName, SetExpr, + ShowCreateObject, ShowStatementFilter, Statement, TableConstraint, TableFactor, + TableWithJoins, TransactionMode, UnaryOperator, Value, }; use sqlparser::parser::ParserError::ParserError; @@ -769,6 +769,42 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { exec_err!("Function name not provided") } } + Statement::CreateIndex(CreateIndex { + name, + table_name, + using, + columns, + unique, + if_not_exists, + .. 
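// Editor's note: the `..` above presumably discards the remaining sqlparser `CreateIndex` fields (such as an index predicate) that DataFusion's logical plan does not model.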
+ }) => { + let name: Option<String> = name.as_ref().map(object_name_to_string); + let table = self.object_name_to_table_reference(table_name)?; + let table_schema = self + .context_provider + .get_table_source(table.clone())? + .schema() + .to_dfschema_ref()?; + let using: Option<String> = using.as_ref().map(ident_to_string); + let columns = self.order_by_to_sort_expr( + columns, + &table_schema, + planner_context, + false, + None, + )?; + Ok(LogicalPlan::Ddl(DdlStatement::CreateIndex( + PlanCreateIndex { + name, + table, + using, + columns, + unique, + if_not_exists, + schema: DFSchemaRef::new(DFSchema::empty()), + }, + ))) + } _ => { not_impl_err!("Unsupported SQL statement: {sql:?}") } diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index 8a5510eb69f3..4d7e60805657 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -28,11 +28,12 @@ use datafusion_common::{ assert_contains, DataFusionError, ParamValues, Result, ScalarValue, }; use datafusion_expr::{ + col, dml::CopyTo, logical_plan::{LogicalPlan, Prepare}, test::function_stub::sum_udaf, - ColumnarValue, CreateExternalTable, DdlStatement, ScalarUDF, ScalarUDFImpl, - Signature, Volatility, + ColumnarValue, CreateExternalTable, CreateIndex, DdlStatement, ScalarUDF, + ScalarUDFImpl, Signature, Volatility, }; use datafusion_functions::{string, unicode}; use datafusion_sql::{ @@ -4426,6 +4427,35 @@ fn test_parse_escaped_string_literal_value() { ) } +#[test] +fn plan_create_index() { + let sql = + "CREATE UNIQUE INDEX IF NOT EXISTS idx_name ON test USING btree (name, age DESC)"; + let plan = logical_plan_with_options(sql, ParserOptions::default()).unwrap(); + match plan { + LogicalPlan::Ddl(DdlStatement::CreateIndex(CreateIndex { + name, + table, + using, + columns, + unique, + if_not_exists, + .. + })) => { + assert_eq!(name, Some("idx_name".to_string())); + assert_eq!(format!("{table}"), "test"); + assert_eq!(using, Some("btree".to_string())); + assert_eq!( + columns, + vec![col("name").sort(true, false), col("age").sort(false, true),] + ); + assert!(unique); + assert!(if_not_exists); + } + _ => panic!("wrong plan type"), + } +} + fn assert_field_not_found(err: DataFusionError, name: &str) { match err { DataFusionError::SchemaError { ..
} => { diff --git a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt index 65efc24ec037..6c0cf5f800d8 100644 --- a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt +++ b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt @@ -322,3 +322,29 @@ FROM aggregate_test_100_null GROUP BY c2 ORDER BY c2; 3 109 211 2.80575042963 2.80632930994 4 -171 56 2.10740506649 1.939846396446 5 -86 -76 1.8741710186 1.600569307804 + + +statement ok +DROP TABLE aggregate_test_100_null; + +# Test for aggregate functions with different intermediate types +# Need more than 10 values to trigger skipping +statement ok +CREATE TABLE decimal_table(i int, d decimal(10,3)) as +VALUES (1, 1.1), (2, 2.2), (3, 3.3), (2, 4.4), (1, 5.5); + +statement ok +CREATE TABLE t(id int) as values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10); + +query IR +SELECT i, sum(d) +FROM decimal_table CROSS JOIN t +GROUP BY i +ORDER BY i; +---- +1 66 +2 66 +3 33 + +statement ok +DROP TABLE decimal_table; diff --git a/datafusion/sqllogictest/test_files/array.slt b/datafusion/sqllogictest/test_files/array.slt index f2972e4c14c2..b71bc765ba37 100644 --- a/datafusion/sqllogictest/test_files/array.slt +++ b/datafusion/sqllogictest/test_files/array.slt @@ -5351,6 +5351,25 @@ true false true false false false true true false false true false true #---- #true false true false false false true true false false true false true +# any operator +query ? +select column3 from arrays where 'L'=any(column3); +---- +[L, o, r, e, m] + +query I +select count(*) from arrays where 'L'=any(column3); +---- +1 + +query I +select count(*) from arrays where 'X'=any(column3); +---- +0 + +query error DataFusion error: Error during planning: Unsupported AnyOp: '>', only '=' is supported +select count(*) from arrays where 'X'>any(column3); + ## array_distinct #TODO: https://github.com/apache/datafusion/issues/7142 diff --git a/datafusion/sqllogictest/test_files/string_view.slt b/datafusion/sqllogictest/test_files/string_view.slt index 763b4e99c614..4d3f72b1e8d4 100644 --- a/datafusion/sqllogictest/test_files/string_view.slt +++ b/datafusion/sqllogictest/test_files/string_view.slt @@ -355,6 +355,75 @@ logical_plan 01)Aggregate: groupBy=[[]], aggr=[[count(DISTINCT test.column1_utf8), count(DISTINCT test.column1_utf8view), count(DISTINCT test.column1_dict)]] 02)--TableScan: test projection=[column1_utf8, column1_utf8view, column1_dict] +### `STARTS_WITH` + +# Test STARTS_WITH with utf8view against utf8view, utf8, and largeutf8 +# (should be no casts) +query TT +EXPLAIN SELECT + STARTS_WITH(column1_utf8view, column2_utf8view) as c1, + STARTS_WITH(column1_utf8view, column2_utf8) as c2, + STARTS_WITH(column1_utf8view, column2_large_utf8) as c3 +FROM test; +---- +logical_plan +01)Projection: starts_with(test.column1_utf8view, test.column2_utf8view) AS c1, starts_with(test.column1_utf8view, CAST(test.column2_utf8 AS Utf8View)) AS c2, starts_with(test.column1_utf8view, CAST(test.column2_large_utf8 AS Utf8View)) AS c3 +02)--TableScan: test projection=[column2_utf8, column2_large_utf8, column1_utf8view, column2_utf8view] + +query BBB +SELECT + STARTS_WITH(column1_utf8view, column2_utf8view) as c1, + STARTS_WITH(column1_utf8view, column2_utf8) as c2, + STARTS_WITH(column1_utf8view, column2_large_utf8) as c3 +FROM test; +---- +false false false +true true true +true true true +NULL NULL NULL + +# Test STARTS_WITH with utf8 against utf8view, utf8, and largeutf8 +# Should work, but will 
have to cast to common types +# should cast utf8 -> utf8view and largeutf8 -> utf8view +query TT +EXPLAIN SELECT + STARTS_WITH(column1_utf8, column2_utf8view) as c1, + STARTS_WITH(column1_utf8, column2_utf8) as c3, + STARTS_WITH(column1_utf8, column2_large_utf8) as c4 +FROM test; +---- +logical_plan +01)Projection: starts_with(__common_expr_1, test.column2_utf8view) AS c1, starts_with(test.column1_utf8, test.column2_utf8) AS c3, starts_with(__common_expr_1, CAST(test.column2_large_utf8 AS Utf8View)) AS c4 +02)--Projection: CAST(test.column1_utf8 AS Utf8View) AS __common_expr_1, test.column1_utf8, test.column2_utf8, test.column2_large_utf8, test.column2_utf8view +03)----TableScan: test projection=[column1_utf8, column2_utf8, column2_large_utf8, column2_utf8view] + +query BBB + SELECT + STARTS_WITH(column1_utf8, column2_utf8view) as c1, + STARTS_WITH(column1_utf8, column2_utf8) as c3, + STARTS_WITH(column1_utf8, column2_large_utf8) as c4 +FROM test; +---- +false false false +true true true +true true true +NULL NULL NULL + + +# Test STARTS_WITH with utf8view against literals +# In this case, the literals should be cast to utf8view. The columns +# should not be cast to utf8. +query TT +EXPLAIN SELECT + STARTS_WITH(column1_utf8view, 'äöüß') as c1, + STARTS_WITH(column1_utf8view, '') as c2, + STARTS_WITH(column1_utf8view, NULL) as c3, + STARTS_WITH(NULL, column1_utf8view) as c4 +FROM test; +---- +logical_plan +01)Projection: starts_with(test.column1_utf8view, Utf8View("äöüß")) AS c1, starts_with(test.column1_utf8view, Utf8View("")) AS c2, starts_with(test.column1_utf8view, Utf8View(NULL)) AS c3, starts_with(Utf8View(NULL), test.column1_utf8view) AS c4 +02)--TableScan: test projection=[column1_utf8view] statement ok drop table test; @@ -376,6 +445,58 @@ select t.dt from dates t where arrow_cast('2024-01-01', 'Utf8View') < t.dt; ---- 2024-01-23 - statement ok drop table dates; + +statement ok +create table temp as values +('value1', arrow_cast('rust', 'Utf8View'), arrow_cast('fast', 'Utf8View')), +('value2', arrow_cast('datafusion', 'Utf8View'), arrow_cast('cool', 'Utf8View')); + +query T +select column2||' is fast' from temp; +---- +rust is fast +datafusion is fast + + +query T +select column2 || ' is ' || column3 from temp; +---- +rust is fast +datafusion is cool + +query TT +explain select column2 || 'is' || column3 from temp; +---- +logical_plan +01)Projection: CAST(temp.column2 AS Utf8) || Utf8("is") || CAST(temp.column3 AS Utf8) +02)--TableScan: temp projection=[column2, column3] + + +query TT +explain select column2||' is fast' from temp; +---- +logical_plan +01)Projection: CAST(temp.column2 AS Utf8) || Utf8(" is fast") +02)--TableScan: temp projection=[column2] + + +query T +select column2||column3 from temp; +---- +rustfast +datafusioncool + +query TT +explain select column2||column3 from temp; +---- +logical_plan +01)Projection: CAST(temp.column2 AS Utf8) || CAST(temp.column3 AS Utf8) +02)--TableScan: temp projection=[column2, column3] + +query T +select column2|| ' ' ||column3 from temp; +---- +rust fast +datafusion cool diff --git a/docs/source/library-user-guide/custom-table-providers.md b/docs/source/library-user-guide/custom-table-providers.md index a250e880913c..f86cea0bda95 100644 --- a/docs/source/library-user-guide/custom-table-providers.md +++ b/docs/source/library-user-guide/custom-table-providers.md @@ -146,7 +146,7 @@ For filters that can be pushed down, they'll be passed to the `scan` method as t In order to use the custom table provider, we need to register it with 
DataFusion. This is done by creating a `TableProvider` and registering it with the `SessionContext`. ```rust -let mut ctx = SessionContext::new(); +let ctx = SessionContext::new(); let custom_table_provider = CustomDataSource::new(); ctx.register_table("custom_table", Arc::new(custom_table_provider));
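// A short, hypothetical continuation (not part of the original guide): once
// the provider is registered, the table can be queried through the same
// context like any built-in source, e.g.
//
//     let df = ctx.sql("SELECT * FROM custom_table").await?;
//     df.show().await?;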