Skip to content

Commit

Permalink
Enforce sorting handle fetchable operators, add option to repartition…
Browse files Browse the repository at this point in the history
… based on row count estimates (#11875)

* Tmp

* Minor changes

* Minor changes

* Minor changes

* Implement top down recursion with delete check

* Minor changes

* Minor changes

* Address reviews

* Update comments

* Minor changes

* Make test deterministic

* Add fetch info to the statistics

* Enforce distribution use inexact count estimate also.

* Minor changes

* Minor changes

* Minor changes

* Do not add unnecessary hash partitioning

* Minor changes

* Add config option to use inexact row number estimates during planning

* Update config

* Minor changes

* Minor changes

* Final review

* Address reviews

* Add handling for sort removal with fetch

* Fix linter errors

* Minor changes

* Update config

* Cleanup stats under fetch

* Update SLT comment

---------

Co-authored-by: Mehmet Ozan Kabak <[email protected]>
  • Loading branch information
mustafasrepo and ozankabak authored Aug 10, 2024
1 parent 12aa82c commit 79fa6f9
Show file tree
Hide file tree
Showing 21 changed files with 643 additions and 264 deletions.
8 changes: 8 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,14 @@ config_namespace! {
/// Number of input rows partial aggregation partition should process, before
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

/// Should DataFusion use row number estimates at the input to decide
/// whether increasing parallelism is beneficial or not. By default,
/// only exact row numbers (not estimates) are used for this decision.
/// Setting this flag to `true` will likely produce better plans.
/// if the source of statistics is accurate.
/// We plan to make this the default in the future.
pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
}
}

Expand Down
122 changes: 104 additions & 18 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
use std::fmt::{self, Debug, Display};

use crate::ScalarValue;
use crate::{Result, ScalarValue};

use arrow_schema::Schema;
use arrow_schema::{Schema, SchemaRef};

/// Represents a value with a degree of certainty. `Precision` is used to
/// propagate information the precision of statistical values.
Expand Down Expand Up @@ -247,21 +247,96 @@ impl Statistics {

/// If the exactness of a [`Statistics`] instance is lost, this function relaxes
/// the exactness of all information by converting them [`Precision::Inexact`].
pub fn into_inexact(self) -> Self {
Statistics {
num_rows: self.num_rows.to_inexact(),
total_byte_size: self.total_byte_size.to_inexact(),
column_statistics: self
.column_statistics
.into_iter()
.map(|cs| ColumnStatistics {
null_count: cs.null_count.to_inexact(),
max_value: cs.max_value.to_inexact(),
min_value: cs.min_value.to_inexact(),
distinct_count: cs.distinct_count.to_inexact(),
})
.collect::<Vec<_>>(),
pub fn to_inexact(mut self) -> Self {
self.num_rows = self.num_rows.to_inexact();
self.total_byte_size = self.total_byte_size.to_inexact();
self.column_statistics = self
.column_statistics
.into_iter()
.map(|s| s.to_inexact())
.collect();
self
}

/// Calculates the statistics after `fetch` and `skip` operations apply.
/// Here, `self` denotes per-partition statistics. Use the `n_partitions`
/// parameter to compute global statistics in a multi-partition setting.
pub fn with_fetch(
mut self,
schema: SchemaRef,
fetch: Option<usize>,
skip: usize,
n_partitions: usize,
) -> Result<Self> {
let fetch_val = fetch.unwrap_or(usize::MAX);

self.num_rows = match self {
Statistics {
num_rows: Precision::Exact(nr),
..
}
| Statistics {
num_rows: Precision::Inexact(nr),
..
} => {
// Here, the inexact case gives us an upper bound on the number of rows.
if nr <= skip {
// All input data will be skipped:
Precision::Exact(0)
} else if nr <= fetch_val && skip == 0 {
// If the input does not reach the `fetch` globally, and `skip`
// is zero (meaning the input and output are identical), return
// input stats as is.
// TODO: Can input stats still be used, but adjusted, when `skip`
// is non-zero?
return Ok(self);
} else if nr - skip <= fetch_val {
// After `skip` input rows are skipped, the remaining rows are
// less than or equal to the `fetch` values, so `num_rows` must
// equal the remaining rows.
check_num_rows(
(nr - skip).checked_mul(n_partitions),
// We know that we have an estimate for the number of rows:
self.num_rows.is_exact().unwrap(),
)
} else {
// At this point we know that we were given a `fetch` value
// as the `None` case would go into the branch above. Since
// the input has more rows than `fetch + skip`, the number
// of rows will be the `fetch`, but we won't be able to
// predict the other statistics.
check_num_rows(
fetch_val.checked_mul(n_partitions),
// We know that we have an estimate for the number of rows:
self.num_rows.is_exact().unwrap(),
)
}
}
Statistics {
num_rows: Precision::Absent,
..
} => check_num_rows(fetch.and_then(|v| v.checked_mul(n_partitions)), false),
};
self.column_statistics = Statistics::unknown_column(&schema);
self.total_byte_size = Precision::Absent;
Ok(self)
}
}

/// Creates an estimate of the number of rows in the output using the given
/// optional value and exactness flag.
fn check_num_rows(value: Option<usize>, is_exact: bool) -> Precision<usize> {
if let Some(value) = value {
if is_exact {
Precision::Exact(value)
} else {
// If the input stats are inexact, so are the output stats.
Precision::Inexact(value)
}
} else {
// If the estimate is not available (e.g. due to an overflow), we can
// not produce a reliable estimate.
Precision::Absent
}
}

Expand Down Expand Up @@ -336,14 +411,25 @@ impl ColumnStatistics {
}

/// Returns a [`ColumnStatistics`] instance having all [`Precision::Absent`] parameters.
pub fn new_unknown() -> ColumnStatistics {
ColumnStatistics {
pub fn new_unknown() -> Self {
Self {
null_count: Precision::Absent,
max_value: Precision::Absent,
min_value: Precision::Absent,
distinct_count: Precision::Absent,
}
}

/// If the exactness of a [`ColumnStatistics`] instance is lost, this
/// function relaxes the exactness of all information by converting them
/// [`Precision::Inexact`].
pub fn to_inexact(mut self) -> Self {
self.null_count = self.null_count.to_inexact();
self.max_value = self.max_value.to_inexact();
self.min_value = self.min_value.to_inexact();
self.distinct_count = self.distinct_count.to_inexact();
self
}
}

#[cfg(test)]
Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3000,13 +3000,13 @@ mod tests {
.await?
.select_columns(&["c1", "c2", "c3"])?
.filter(col("c2").eq(lit(3)).and(col("c1").eq(lit("a"))))?
.limit(0, Some(1))?
.sort(vec![
// make the test deterministic
col("c1").sort(true, true),
col("c2").sort(true, true),
col("c3").sort(true, true),
])?
.limit(0, Some(1))?
.with_column("sum", col("c2") + col("c3"))?;

let df_sum_renamed = df
Expand All @@ -3022,11 +3022,11 @@ mod tests {

assert_batches_sorted_eq!(
[
"+-----+-----+----+-------+",
"| one | two | c3 | total |",
"+-----+-----+----+-------+",
"| a | 3 | 13 | 16 |",
"+-----+-----+----+-------+"
"+-----+-----+-----+-------+",
"| one | two | c3 | total |",
"+-----+-----+-----+-------+",
"| a | 3 | -72 | -69 |",
"+-----+-----+-----+-------+",
],
&df_sum_renamed
);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ pub async fn get_statistics_with_limit(
// If we still have files in the stream, it means that the limit kicked
// in, and the statistic could have been different had we processed the
// files in a different order.
statistics = statistics.into_inexact()
statistics = statistics.to_inexact()
}

Ok((result_files, statistics))
Expand Down
Loading

0 comments on commit 79fa6f9

Please sign in to comment.