Skip to content

Commit

Permalink
Make filter selectivity for statistics configurable (apache#8243)
Browse files Browse the repository at this point in the history
* Turning filter selectivity into a configurable parameter

* Renaming API to be more consistent with struct value

* Adding a filter with custom selectivity
  • Loading branch information
edmondop authored Dec 5, 2023
1 parent e322839 commit c7a6965
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 6 deletions.
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,11 @@ config_namespace! {
/// The maximum estimated size in bytes for one input side of a HashJoin
/// will be collected into a single partition
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

/// The default filter selectivity used by Filter Statistics
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
pub default_filter_selectivity: u8, default = 20
}
}

Expand Down Expand Up @@ -877,6 +882,7 @@ config_field!(String);
config_field!(bool);
config_field!(usize);
config_field!(f64);
config_field!(u8);
config_field!(u64);

/// An implementation trait used to recursively walk configuration
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ fn try_swapping_with_filter(
};

FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?)
.and_then(|e| {
let selectivity = filter.default_selectivity();
e.with_default_selectivity(selectivity)
})
.map(|e| Some(Arc::new(e) as _))
}

Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,9 @@ impl DefaultPhysicalPlanner {
&input_schema,
session_state,
)?;
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
let filter = FilterExec::try_new(runtime_expr, physical_input)?;
Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
}
LogicalPlan::Union(Union { inputs, .. }) => {
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;
Expand Down
78 changes: 74 additions & 4 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub struct FilterExec {
input: Arc<dyn ExecutionPlan>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Selectivity for statistics. 0 = no rows, 100 all rows
default_selectivity: u8,
}

impl FilterExec {
Expand All @@ -74,13 +76,25 @@ impl FilterExec {
predicate,
input: input.clone(),
metrics: ExecutionPlanMetricsSet::new(),
default_selectivity: 20,
}),
other => {
plan_err!("Filter predicate must return boolean values, not {other:?}")
}
}
}

/// Sets the default selectivity used when estimating statistics and
/// returns the updated `FilterExec`.
///
/// `default_selectivity` is a percentage in the range `0..=100`, where 0
/// means no rows are selected and 100 means all rows are selected.
///
/// # Errors
///
/// Returns a planning error if `default_selectivity` is greater than 100.
pub fn with_default_selectivity(
    mut self,
    default_selectivity: u8,
) -> Result<Self, DataFusionError> {
    // 100 itself is a valid value (all rows selected); only reject above it.
    if default_selectivity > 100 {
        return plan_err!(
            "Default filter selectivity value needs to be less than or equal to 100"
        );
    }
    self.default_selectivity = default_selectivity;
    Ok(self)
}

/// The expression to filter on. This expression must evaluate to a boolean value.
pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
&self.predicate
Expand All @@ -90,6 +104,11 @@ impl FilterExec {
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}

/// Returns the default selectivity as a percentage (0 = no rows,
/// 100 = all rows). The statistics estimate falls back to this value
/// when the predicate is not supported by the selectivity analysis.
pub fn default_selectivity(&self) -> u8 {
self.default_selectivity
}
}

impl DisplayAs for FilterExec {
Expand Down Expand Up @@ -166,6 +185,10 @@ impl ExecutionPlan for FilterExec {
mut children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
// Carry over the selectivity configured on this node: a freshly built
// `FilterExec` always starts from the built-in default, so reading the
// value back from the new node would silently discard the configured one.
FilterExec::try_new(self.predicate.clone(), children.swap_remove(0))
    .and_then(|e| e.with_default_selectivity(self.default_selectivity()))
    .map(|e| Arc::new(e) as _)
}

Expand Down Expand Up @@ -196,10 +219,7 @@ impl ExecutionPlan for FilterExec {
let input_stats = self.input.statistics()?;
let schema = self.schema();
if !check_support(predicate, &schema) {
// assume filter selects 20% of rows if we cannot do anything smarter
// tracking issue for making this configurable:
// https://github.com/apache/arrow-datafusion/issues/8133
let selectivity = 0.2_f64;
let selectivity = self.default_selectivity as f64 / 100.0;
let mut stats = input_stats.into_inexact();
stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
stats.total_byte_size = stats
Expand Down Expand Up @@ -987,4 +1007,54 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_validation_filter_selectivity() -> Result<()> {
    // Single non-nullable Int32 column with unknown statistics.
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let unknown_stats = Statistics::new_unknown(&schema);
    let input = Arc::new(StatisticsExec::new(unknown_stats, schema));

    // WHERE a = 10
    let column = Arc::new(Column::new("a", 0));
    let ten = Arc::new(Literal::new(ScalarValue::Int32(Some(10))));
    let predicate = Arc::new(BinaryExpr::new(column, Operator::Eq, ten));

    // 120 exceeds the upper bound of 100, so the builder must reject it.
    let rejected = FilterExec::try_new(predicate, input)?.with_default_selectivity(120);
    assert!(rejected.is_err());
    Ok(())
}

#[tokio::test]
async fn test_custom_filter_selectivity() -> Result<()> {
// Need a decimal to trigger inexact selectivity
let schema =
Schema::new(vec![Field::new("a", DataType::Decimal128(2, 3), false)]);
let input = Arc::new(StatisticsExec::new(
Statistics {
num_rows: Precision::Inexact(1000),
total_byte_size: Precision::Inexact(4000),
column_statistics: vec![ColumnStatistics {
..Default::default()
}],
},
schema,
));
// WHERE a = 10
let predicate = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Eq,
Arc::new(Literal::new(ScalarValue::Decimal128(Some(10), 10, 10))),
));
let filter = FilterExec::try_new(predicate, input)?;
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Inexact(200));
assert_eq!(statistics.total_byte_size, Precision::Inexact(800));
let filter = filter.with_default_selectivity(40)?;
let statistics = filter.statistics()?;
assert_eq!(statistics.num_rows, Precision::Inexact(400));
assert_eq!(statistics.total_byte_size, Precision::Inexact(1600));
Ok(())
}
}
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1368,6 +1368,7 @@ message PhysicalNegativeNode {
message FilterExecNode {
PhysicalPlanNode input = 1;
PhysicalExprNode expr = 2;
uint32 default_filter_selectivity = 3;
}

message FileGroup {
Expand Down
20 changes: 20 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,16 @@ impl AsExecutionPlan for PhysicalPlanNode {
.to_owned(),
)
})?;
Ok(Arc::new(FilterExec::try_new(predicate, input)?))
let filter_selectivity = filter.default_filter_selectivity.try_into();
let filter = FilterExec::try_new(predicate, input)?;
match filter_selectivity {
Ok(filter_selectivity) => Ok(Arc::new(
filter.with_default_selectivity(filter_selectivity)?,
)),
Err(_) => Err(DataFusionError::Internal(
"filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
)),
}
}
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
parse_protobuf_file_scan_config(
Expand Down Expand Up @@ -988,6 +997,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
protobuf::FilterExecNode {
input: Some(Box::new(input)),
expr: Some(exec.predicate().clone().try_into()?),
default_filter_selectivity: exec.default_selectivity() as u32,
},
))),
});
Expand Down
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ datafusion.explain.logical_plan_only false
datafusion.explain.physical_plan_only false
datafusion.explain.show_statistics false
datafusion.optimizer.allow_symmetric_joins_without_pruning true
datafusion.optimizer.default_filter_selectivity 20
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_topk_aggregation true
Expand Down Expand Up @@ -261,6 +262,7 @@ datafusion.explain.logical_plan_only false When set to true, the explain stateme
datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans
datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans
datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors.
datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected).
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus
| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |
| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory |
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
Expand Down

0 comments on commit c7a6965

Please sign in to comment.