From fe576f45ee2e13d946a3313ee272dfcc202607fd Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 8 Nov 2023 17:14:38 +0300 Subject: [PATCH 1/4] Bug fix and code simplification --- datafusion/common/src/stats.rs | 22 +++++++- datafusion/core/src/datasource/statistics.rs | 6 +- datafusion/physical-plan/src/filter.rs | 34 ++++++++---- datafusion/physical-plan/src/limit.rs | 58 +++++++++++--------- 4 files changed, 82 insertions(+), 38 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index fbf639a32182..2bc625d2dfb2 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -110,6 +110,13 @@ impl Precision { _ => self, } } + + /// Mutates the precision state from exact to inexact (if present). + pub fn make_inexact(&mut self) { + if let Some(value) = self.get_value() { + *self = Precision::Inexact(value.clone()); + } + } } impl Precision { @@ -279,9 +286,11 @@ pub struct ColumnStatistics { impl ColumnStatistics { /// Column contains a single non null value (e.g constant). pub fn is_singleton(&self) -> bool { - match (self.min_value.get_value(), self.max_value.get_value()) { + match (&self.min_value, &self.max_value) { // Min and max values are the same and not infinity. - (Some(min), Some(max)) => !min.is_null() && !max.is_null() && (min == max), + (Precision::Exact(min), Precision::Exact(max)) => { + !min.is_null() && !max.is_null() && (min == max) + } (_, _) => false, } } @@ -295,6 +304,15 @@ impl ColumnStatistics { distinct_count: Precision::Absent, } } + + /// Demotes the precision state of all fields from exact to inexact (if present). + pub fn to_inexact(mut self) -> Self { + self.distinct_count.make_inexact(); + self.null_count.make_inexact(); + self.min_value.make_inexact(); + self.max_value.make_inexact(); + self + } } #[cfg(test)] diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs index 3d8248dfdeb2..695e139517cf 100644 --- a/datafusion/core/src/datasource/statistics.rs +++ b/datafusion/core/src/datasource/statistics.rs @@ -70,7 +70,11 @@ pub async fn get_statistics_with_limit( // files. This only applies when we know the number of rows. It also // currently ignores tables that have no statistics regarding the // number of rows. - if num_rows.get_value().unwrap_or(&usize::MIN) <= &limit.unwrap_or(usize::MAX) { + let conservative_num_rows = match num_rows { + Precision::Exact(nr) => nr, + _ => usize::MIN, + }; + if conservative_num_rows <= limit.unwrap_or(usize::MAX) { while let Some(current) = all_files.next().await { let (file, file_stats) = current?; result_files.push(file); diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index ce66d614721c..e10a3d40a6c4 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -251,18 +251,32 @@ fn collect_new_statistics( .. }, )| { + let null_count = match input_column_stats[idx].null_count.get_value() { + Some(nc) => Precision::Inexact(*nc), + None => Precision::Absent, + }; let closed_interval = interval.close_bounds(); + let (min_value, max_value) = + if closed_interval.lower.value.eq(&closed_interval.upper.value) { + ( + Precision::Exact(closed_interval.lower.value.clone()), + Precision::Exact(closed_interval.lower.value), + ) + } else { + ( + Precision::Inexact(closed_interval.lower.value), + Precision::Inexact(closed_interval.upper.value), + ) + }; + let distinct_count = match distinct_count.get_value() { + Some(dc) => Precision::Inexact(*dc), + None => Precision::Absent, + }; ColumnStatistics { - null_count: match input_column_stats[idx].null_count.get_value() { - Some(nc) => Precision::Inexact(*nc), - None => Precision::Absent, - }, - max_value: Precision::Inexact(closed_interval.upper.value), - min_value: Precision::Inexact(closed_interval.lower.value), - distinct_count: match distinct_count.get_value() { - Some(dc) => Precision::Inexact(*dc), - None => Precision::Absent, - }, + null_count, + max_value, + min_value, + distinct_count, } }, ) diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 945dad16b794..ddc40c063a1c 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -33,7 +33,7 @@ use arrow::array::ArrayRef; use arrow::datatypes::SchemaRef; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use datafusion_common::stats::Precision; -use datafusion_common::{internal_err, DataFusionError, Result}; +use datafusion_common::{internal_err, ColumnStatistics, DataFusionError, Result}; use datafusion_execution::TaskContext; use futures::stream::{Stream, StreamExt}; @@ -198,46 +198,54 @@ impl ExecutionPlan for GlobalLimitExec { fetch + skip } }) - .unwrap_or(usize::MAX); - let col_stats = Statistics::unknown_column(&self.schema()); + .unwrap_or(match input_stats.num_rows { + Precision::Exact(nr) => nr, + _ => usize::MAX, + }); - let fetched_row_number_stats = Statistics { - num_rows: Precision::Exact(max_row_num), - column_statistics: col_stats.clone(), - total_byte_size: Precision::Absent, + let to_inexact_cs = |cs: Vec| { + cs.into_iter().map(|cs| cs.to_inexact()).collect() }; - let stats = match input_stats { - Statistics { - num_rows: Precision::Exact(nr), - .. - } - | Statistics { - num_rows: Precision::Inexact(nr), - .. - } => { + match input_stats.num_rows { + Precision::Exact(nr) | Precision::Inexact(nr) => { if nr <= skip { // if all input data will be skipped, return 0 - Statistics { + Ok(Statistics { num_rows: Precision::Exact(0), - column_statistics: col_stats, - total_byte_size: Precision::Absent, - } + total_byte_size: Precision::Exact(0), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Absent, + min_value: Precision::Absent, + distinct_count: Precision::Exact(0) + }; + input_stats.column_statistics.len() + ], + }) } else if nr <= max_row_num { // if the input does not reach the "fetch" globally, return input stats - input_stats + Ok(input_stats) } else { // if the input is greater than the "fetch", the num_row will be the "fetch", // but we won't be able to predict the other statistics - fetched_row_number_stats + Ok(Statistics { + num_rows: Precision::Exact(max_row_num), + total_byte_size: input_stats.total_byte_size.to_inexact(), + column_statistics: to_inexact_cs(input_stats.column_statistics), + }) } } _ => { // the result output row number will always be no greater than the limit number - fetched_row_number_stats + Ok(Statistics { + num_rows: Precision::Exact(max_row_num), + total_byte_size: Precision::Absent, + column_statistics: to_inexact_cs(input_stats.column_statistics), + }) } - }; - Ok(stats) + } } } From d60f3e3dbe00454ec0fccfad337843d130c255c0 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Wed, 8 Nov 2023 19:28:12 +0300 Subject: [PATCH 2/4] Remove limit stats changes --- datafusion/common/src/stats.rs | 16 ---- datafusion/physical-plan/src/limit.rs | 106 +++++++++++++++----------- 2 files changed, 60 insertions(+), 62 deletions(-) diff --git a/datafusion/common/src/stats.rs b/datafusion/common/src/stats.rs index 2bc625d2dfb2..2e799c92bea7 100644 --- a/datafusion/common/src/stats.rs +++ b/datafusion/common/src/stats.rs @@ -110,13 +110,6 @@ impl Precision { _ => self, } } - - /// Mutates the precision state from exact to inexact (if present). - pub fn make_inexact(&mut self) { - if let Some(value) = self.get_value() { - *self = Precision::Inexact(value.clone()); - } - } } impl Precision { @@ -304,15 +297,6 @@ impl ColumnStatistics { distinct_count: Precision::Absent, } } - - /// Demotes the precision state of all fields from exact to inexact (if present). - pub fn to_inexact(mut self) -> Self { - self.distinct_count.make_inexact(); - self.null_count.make_inexact(); - self.min_value.make_inexact(); - self.max_value.make_inexact(); - self - } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs index 2d65893251b6..c8427f9bc2c6 100644 --- a/datafusion/physical-plan/src/limit.rs +++ b/datafusion/physical-plan/src/limit.rs @@ -33,7 +33,7 @@ use arrow::array::ArrayRef; use arrow::datatypes::SchemaRef; use arrow::record_batch::{RecordBatch, RecordBatchOptions}; use datafusion_common::stats::Precision; -use datafusion_common::{internal_err, ColumnStatistics, DataFusionError, Result}; +use datafusion_common::{internal_err, DataFusionError, Result}; use datafusion_execution::TaskContext; use futures::stream::{Stream, StreamExt}; @@ -188,64 +188,78 @@ impl ExecutionPlan for GlobalLimitExec { fn statistics(&self) -> Result { let input_stats = self.input.statistics()?; let skip = self.skip; - // the maximum row number needs to be fetched - let max_row_num = self - .fetch - .map(|fetch| { - if fetch >= usize::MAX - skip { - usize::MAX - } else { - fetch + skip - } - }) - .unwrap_or(match input_stats.num_rows { - Precision::Exact(nr) => nr, - _ => usize::MAX, - }); + let col_stats = Statistics::unknown_column(&self.schema()); + let fetch = self.fetch.unwrap_or(usize::MAX); - let to_inexact_cs = |cs: Vec| { - cs.into_iter().map(|cs| cs.to_inexact()).collect() + let mut fetched_row_number_stats = Statistics { + num_rows: Precision::Exact(fetch), + column_statistics: col_stats.clone(), + total_byte_size: Precision::Absent, }; - match input_stats.num_rows { - Precision::Exact(nr) | Precision::Inexact(nr) => { + let stats = match input_stats { + Statistics { + num_rows: Precision::Exact(nr), + .. + } + | Statistics { + num_rows: Precision::Inexact(nr), + .. + } => { if nr <= skip { // if all input data will be skipped, return 0 - Ok(Statistics { + let mut skip_all_rows_stats = Statistics { num_rows: Precision::Exact(0), - total_byte_size: Precision::Exact(0), - column_statistics: vec![ - ColumnStatistics { - null_count: Precision::Exact(0), - max_value: Precision::Absent, - min_value: Precision::Absent, - distinct_count: Precision::Exact(0) - }; - input_stats.column_statistics.len() - ], - }) - } else if nr <= max_row_num { - // if the input does not reach the "fetch" globally, return input stats - Ok(input_stats) + column_statistics: col_stats, + total_byte_size: Precision::Absent, + }; + if !input_stats.num_rows.is_exact().unwrap_or(false) { + // The input stats are inexact, so the output stats must be too. + skip_all_rows_stats = skip_all_rows_stats.into_inexact(); + } + skip_all_rows_stats + } else if nr <= fetch && self.skip == 0 { + // if the input does not reach the "fetch" globally, and "skip" is zero + // (meaning the input and output are identical), return input stats. + // Can input_stats still be used, but adjusted, in the "skip != 0" case? + input_stats + } else if nr - skip <= fetch { + // after "skip" input rows are skipped, the remaining rows are less than or equal to the + // "fetch" values, so `num_rows` must equal the remaining rows + let remaining_rows: usize = nr - skip; + let mut skip_some_rows_stats = Statistics { + num_rows: Precision::Exact(remaining_rows), + column_statistics: col_stats.clone(), + total_byte_size: Precision::Absent, + }; + if !input_stats.num_rows.is_exact().unwrap_or(false) { + // The input stats are inexact, so the output stats must be too. + skip_some_rows_stats = skip_some_rows_stats.into_inexact(); + } + skip_some_rows_stats } else { // if the input is greater than "fetch+skip", the num_rows will be the "fetch", // but we won't be able to predict the other statistics - Ok(Statistics { - num_rows: Precision::Exact(max_row_num), - total_byte_size: input_stats.total_byte_size.to_inexact(), - column_statistics: to_inexact_cs(input_stats.column_statistics), - }) + if !input_stats.num_rows.is_exact().unwrap_or(false) + || self.fetch.is_none() + { + // If the input stats are inexact, the output stats must be too. + // If the fetch value is `usize::MAX` because no LIMIT was specified, + // we also can't represent it as an exact value. + fetched_row_number_stats = + fetched_row_number_stats.into_inexact(); + } + fetched_row_number_stats } } _ => { - // the result output row number will always be no greater than the limit number - Ok(Statistics { - num_rows: Precision::Exact(max_row_num), - total_byte_size: Precision::Absent, - column_statistics: to_inexact_cs(input_stats.column_statistics), - }) + // The result output `num_rows` will always be no greater than the limit number. + // Should `num_rows` be marked as `Absent` here when the `fetch` value is large, + // as the actual `num_rows` may be far away from the `fetch` value? + fetched_row_number_stats.into_inexact() } - } + }; + Ok(stats) } } From 5b0e2af3e06bc177e72f0221fb25c2d4cbba87e9 Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Thu, 9 Nov 2023 09:43:46 +0300 Subject: [PATCH 3/4] Test added --- datafusion/physical-plan/src/filter.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index e10a3d40a6c4..0d5126aa3b7f 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -977,4 +977,26 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_statistics_with_constant_column() -> Result<()> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let input = Arc::new(StatisticsExec::new( + Statistics::new_unknown(&schema), + schema, + )); + // WHERE a = 10 + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))), + )); + let filter: Arc = + Arc::new(FilterExec::try_new(predicate, input)?); + let filter_statistics = filter.statistics()?; + // First column is "a", and it is a column with only one value after the filter. + assert!(filter_statistics.column_statistics[0].is_singleton()); + + Ok(()) + } } From b9807d2578e1561b2f6e756bb5b864bdd0df154f Mon Sep 17 00:00:00 2001 From: berkaysynnada Date: Thu, 9 Nov 2023 13:00:33 +0300 Subject: [PATCH 4/4] Reduce code diff --- datafusion/physical-plan/src/filter.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 0d5126aa3b7f..0c44b367e514 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -251,16 +251,12 @@ fn collect_new_statistics( .. }, )| { - let null_count = match input_column_stats[idx].null_count.get_value() { - Some(nc) => Precision::Inexact(*nc), - None => Precision::Absent, - }; let closed_interval = interval.close_bounds(); let (min_value, max_value) = if closed_interval.lower.value.eq(&closed_interval.upper.value) { ( - Precision::Exact(closed_interval.lower.value.clone()), Precision::Exact(closed_interval.lower.value), + Precision::Exact(closed_interval.upper.value), ) } else { ( @@ -268,15 +264,17 @@ fn collect_new_statistics( Precision::Inexact(closed_interval.upper.value), ) }; - let distinct_count = match distinct_count.get_value() { - Some(dc) => Precision::Inexact(*dc), - None => Precision::Absent, - }; ColumnStatistics { - null_count, + null_count: match input_column_stats[idx].null_count.get_value() { + Some(nc) => Precision::Inexact(*nc), + None => Precision::Absent, + }, max_value, min_value, - distinct_count, + distinct_count: match distinct_count.get_value() { + Some(dc) => Precision::Inexact(*dc), + None => Precision::Absent, + }, } }, )