Commit 7e02ae1
Merge remote-tracking branch 'apache/main' into alamb/push_down_filter
alamb committed May 16, 2024
2 parents a44fbc5 + 842f393 commit 7e02ae1
Showing 71 changed files with 1,225 additions and 1,644 deletions.
15 changes: 5 additions & 10 deletions datafusion-examples/examples/advanced_udaf.rs
@@ -31,8 +31,8 @@ use datafusion::error::Result;
 use datafusion::prelude::*;
 use datafusion_common::{cast::as_float64_array, ScalarValue};
 use datafusion_expr::{
-    function::AccumulatorArgs, Accumulator, AggregateUDF, AggregateUDFImpl,
-    GroupsAccumulator, Signature,
+    function::{AccumulatorArgs, StateFieldsArgs},
+    Accumulator, AggregateUDF, AggregateUDFImpl, GroupsAccumulator, Signature,
 };
 
 /// This example shows how to use the full AggregateUDFImpl API to implement a user
@@ -92,21 +92,16 @@ impl AggregateUDFImpl for GeoMeanUdaf {
     }
 
     /// This is the description of the state. accumulator's state() must match the types here.
-    fn state_fields(
-        &self,
-        _name: &str,
-        value_type: DataType,
-        _ordering_fields: Vec<arrow_schema::Field>,
-    ) -> Result<Vec<arrow_schema::Field>> {
+    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<arrow_schema::Field>> {
         Ok(vec![
-            Field::new("prod", value_type, true),
+            Field::new("prod", args.return_type.clone(), true),
             Field::new("n", DataType::UInt32, true),
         ])
     }
 
     /// Tell DataFusion that this aggregate supports the more performant `GroupsAccumulator`
     /// which is used for cases when there are grouping columns in the query
-    fn groups_accumulator_supported(&self) -> bool {
+    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
         true
     }
 
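For downstream crates, the migration is mechanical: the loose `name`/`value_type`/`ordering_fields` parameters become one `StateFieldsArgs` struct, and `groups_accumulator_supported` now receives the `AccumulatorArgs` for the call site. A minimal sketch against the new signatures (`MyUdaf`, its name, and its `Float64` types are illustrative, not from this commit):

```rust
use std::any::Any;

use arrow_schema::{DataType, Field};
use datafusion_common::Result;
use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
use datafusion_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};

/// Hypothetical UDAF, present only to show the migrated method signatures.
#[derive(Debug)]
struct MyUdaf {
    signature: Signature,
}

impl MyUdaf {
    fn new() -> Self {
        Self {
            signature: Signature::exact(vec![DataType::Float64], Volatility::Immutable),
        }
    }
}

impl AggregateUDFImpl for MyUdaf {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn name(&self) -> &str {
        "my_udaf"
    }
    fn signature(&self) -> &Signature {
        &self.signature
    }
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Float64)
    }
    fn accumulator(&self, _args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
        unimplemented!("omitted from this sketch")
    }
    // State is described through the bundled args; the aggregate's return type
    // is read from the struct instead of arriving as a bare DataType.
    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<Field>> {
        Ok(vec![
            Field::new("prod", args.return_type.clone(), true),
            Field::new("n", DataType::UInt32, true),
        ])
    }
    // The decision can now depend on the actual accumulator arguments.
    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
        true
    }
}
```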
11 changes: 3 additions & 8 deletions datafusion-examples/examples/simplify_udaf_expression.rs
@@ -17,7 +17,7 @@
 
 use arrow_schema::{Field, Schema};
 use datafusion::{arrow::datatypes::DataType, logical_expr::Volatility};
-use datafusion_expr::function::AggregateFunctionSimplification;
+use datafusion_expr::function::{AggregateFunctionSimplification, StateFieldsArgs};
 use datafusion_expr::simplify::SimplifyInfo;
 
 use std::{any::Any, sync::Arc};
@@ -70,16 +70,11 @@ impl AggregateUDFImpl for BetterAvgUdaf {
         unimplemented!("should not be invoked")
     }
 
-    fn state_fields(
-        &self,
-        _name: &str,
-        _value_type: DataType,
-        _ordering_fields: Vec<arrow_schema::Field>,
-    ) -> Result<Vec<arrow_schema::Field>> {
+    fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<arrow_schema::Field>> {
         unimplemented!("should not be invoked")
     }
 
-    fn groups_accumulator_supported(&self) -> bool {
+    fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool {
         true
     }
 
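The `unimplemented!` bodies above are never reached because this example's whole point is that the UDAF rewrites itself away during logical optimization. A hedged sketch of the `simplify` hook it relies on (`make_simplify` is a hypothetical helper; the rewrite in the real example is more elaborate, this only shows the shape of the API):

```rust
use datafusion_expr::expr::AggregateFunction;
use datafusion_expr::function::AggregateFunctionSimplification;
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::{count, sum};

/// Builds the boxed closure that rewrites avg(x) into sum(x) / count(x)
/// at plan time, so the accumulator is never instantiated.
fn make_simplify() -> AggregateFunctionSimplification {
    Box::new(|aggregate_function: AggregateFunction, _: &dyn SimplifyInfo| {
        let arg = aggregate_function.args[0].clone();
        Ok(sum(arg.clone()) / count(arg))
    })
}
```

`BetterAvgUdaf::simplify` would then return `Some(make_simplify())`, and the optimizer replaces every call to the UDAF before physical planning.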
6 changes: 1 addition & 5 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -40,7 +40,7 @@ use arrow::ipc::reader::FileReader;
 use arrow::ipc::writer::IpcWriteOptions;
 use arrow::ipc::{root_as_message, CompressionType};
 use arrow_schema::{ArrowError, Schema, SchemaRef};
-use datafusion_common::{not_impl_err, DataFusionError, FileType, Statistics};
+use datafusion_common::{not_impl_err, DataFusionError, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
@@ -136,10 +136,6 @@ impl FileFormat for ArrowFormat {
             order_requirements,
         )) as _)
     }
-
-    fn file_type(&self) -> FileType {
-        FileType::ARROW
-    }
 }
 
 /// Implements [`DataSink`] for writing to arrow_ipc files
5 changes: 0 additions & 5 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -23,7 +23,6 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
-use datafusion_common::FileType;
 use datafusion_physical_expr::PhysicalExpr;
 use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
 
@@ -89,10 +88,6 @@ impl FileFormat for AvroFormat {
         let exec = AvroExec::new(conf);
         Ok(Arc::new(exec))
     }
-
-    fn file_type(&self) -> FileType {
-        FileType::AVRO
-    }
 }
 
 #[cfg(test)]
9 changes: 3 additions & 6 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -42,7 +42,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::datatypes::{DataType, Field, Fields, Schema};
 use datafusion_common::config::CsvOptions;
 use datafusion_common::file_options::csv_writer::CsvWriterOptions;
-use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
+use datafusion_common::{exec_err, not_impl_err, DataFusionError};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use datafusion_physical_plan::metrics::MetricsSet;
@@ -280,10 +280,6 @@ impl FileFormat for CsvFormat {
             order_requirements,
         )) as _)
     }
-
-    fn file_type(&self) -> FileType {
-        FileType::CSV
-    }
 }
 
 impl CsvFormat {
@@ -549,8 +545,9 @@ mod tests {
 
     use arrow::compute::concat_batches;
     use datafusion_common::cast::as_string_array;
+    use datafusion_common::internal_err;
     use datafusion_common::stats::Precision;
-    use datafusion_common::{internal_err, GetExt};
+    use datafusion_common::{FileType, GetExt};
     use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion_expr::{col, lit};
 
6 changes: 1 addition & 5 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -43,7 +43,7 @@ use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
 use arrow_array::RecordBatch;
 use datafusion_common::config::JsonOptions;
 use datafusion_common::file_options::json_writer::JsonWriterOptions;
-use datafusion_common::{not_impl_err, FileType};
+use datafusion_common::not_impl_err;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use datafusion_physical_plan::metrics::MetricsSet;
@@ -184,10 +184,6 @@ impl FileFormat for JsonFormat {
             order_requirements,
         )) as _)
     }
-
-    fn file_type(&self) -> FileType {
-        FileType::JSON
-    }
 }
 
 impl Default for JsonSerializer {
5 changes: 1 addition & 4 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -41,7 +41,7 @@ use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::{ExecutionPlan, Statistics};
 
-use datafusion_common::{not_impl_err, FileType};
+use datafusion_common::not_impl_err;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 
 use async_trait::async_trait;
@@ -104,9 +104,6 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
     ) -> Result<Arc<dyn ExecutionPlan>> {
         not_impl_err!("Writer not implemented for this format")
     }
-
-    /// Returns the FileType corresponding to this FileFormat
-    fn file_type(&self) -> FileType;
 }
 
 #[cfg(test)]
6 changes: 1 addition & 5 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -47,7 +47,7 @@ use datafusion_common::config::TableParquetOptions;
 use datafusion_common::file_options::parquet_writer::ParquetWriterOptions;
 use datafusion_common::stats::Precision;
 use datafusion_common::{
-    exec_err, internal_datafusion_err, not_impl_err, DataFusionError, FileType,
+    exec_err, internal_datafusion_err, not_impl_err, DataFusionError,
 };
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::TaskContext;
@@ -286,10 +286,6 @@ impl FileFormat for ParquetFormat {
             order_requirements,
         )) as _)
     }
-
-    fn file_type(&self) -> FileType {
-        FileType::PARQUET
-    }
 }
 
 fn summarize_min_max(
76 changes: 44 additions & 32 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
@@ -30,6 +30,7 @@ use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_expr::utils::COUNT_STAR_EXPANSION;
 use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
+use datafusion_physical_plan::udaf::AggregateFunctionExpr;
 
 /// Optimizer that uses available statistics for aggregate functions
 #[derive(Default)]
@@ -57,13 +58,9 @@ impl PhysicalOptimizerRule for AggregateStatistics {
                 let mut projections = vec![];
                 for expr in partial_agg_exec.aggr_expr() {
                     if let Some((non_null_rows, name)) =
-                        take_optimizable_column_count(&**expr, &stats)
+                        take_optimizable_column_and_table_count(&**expr, &stats)
                     {
                         projections.push((expressions::lit(non_null_rows), name.to_owned()));
-                    } else if let Some((num_rows, name)) =
-                        take_optimizable_table_count(&**expr, &stats)
-                    {
-                        projections.push((expressions::lit(num_rows), name.to_owned()));
                     } else if let Some((min, name)) = take_optimizable_min(&**expr, &stats) {
                         projections.push((expressions::lit(min), name.to_owned()));
                     } else if let Some((max, name)) = take_optimizable_max(&**expr, &stats) {
@@ -137,43 +134,48 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>>
     None
 }
 
-/// If this agg_expr is a count that is exactly defined in the statistics, return it.
-fn take_optimizable_table_count(
+/// If this agg_expr is a count that can be exactly derived from the statistics, return it.
+fn take_optimizable_column_and_table_count(
     agg_expr: &dyn AggregateExpr,
     stats: &Statistics,
 ) -> Option<(ScalarValue, String)> {
-    if let (&Precision::Exact(num_rows), Some(casted_expr)) = (
-        &stats.num_rows,
-        agg_expr.as_any().downcast_ref::<expressions::Count>(),
-    ) {
-        // TODO implementing Eq on PhysicalExpr would help a lot here
-        if casted_expr.expressions().len() == 1 {
-            if let Some(lit_expr) = casted_expr.expressions()[0]
-                .as_any()
-                .downcast_ref::<expressions::Literal>()
-            {
-                if lit_expr.value() == &COUNT_STAR_EXPANSION {
-                    return Some((
-                        ScalarValue::Int64(Some(num_rows as i64)),
-                        casted_expr.name().to_owned(),
-                    ));
+    let col_stats = &stats.column_statistics;
+    if let Some(agg_expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
+        if agg_expr.fun().name() == "COUNT" && !agg_expr.is_distinct() {
+            if let Precision::Exact(num_rows) = stats.num_rows {
+                let exprs = agg_expr.expressions();
+                if exprs.len() == 1 {
+                    // TODO optimize with exprs other than Column
+                    if let Some(col_expr) =
+                        exprs[0].as_any().downcast_ref::<expressions::Column>()
+                    {
+                        let current_val = &col_stats[col_expr.index()].null_count;
+                        if let &Precision::Exact(val) = current_val {
+                            return Some((
+                                ScalarValue::Int64(Some((num_rows - val) as i64)),
+                                agg_expr.name().to_string(),
+                            ));
+                        }
+                    } else if let Some(lit_expr) =
+                        exprs[0].as_any().downcast_ref::<expressions::Literal>()
+                    {
+                        if lit_expr.value() == &COUNT_STAR_EXPANSION {
+                            return Some((
+                                ScalarValue::Int64(Some(num_rows as i64)),
+                                agg_expr.name().to_string(),
+                            ));
+                        }
+                    }
+                }
+            }
+        }
+    }
-    None
-}
-
-/// If this agg_expr is a count that can be exactly derived from the statistics, return it.
-fn take_optimizable_column_count(
-    agg_expr: &dyn AggregateExpr,
-    stats: &Statistics,
-) -> Option<(ScalarValue, String)> {
-    let col_stats = &stats.column_statistics;
-    if let (&Precision::Exact(num_rows), Some(casted_expr)) = (
+    // TODO: Remove this after revmoing Builtin Count
+    else if let (&Precision::Exact(num_rows), Some(casted_expr)) = (
         &stats.num_rows,
         agg_expr.as_any().downcast_ref::<expressions::Count>(),
     ) {
         // TODO implementing Eq on PhysicalExpr would help a lot here
         if casted_expr.expressions().len() == 1 {
             // TODO optimize with exprs other than Column
             if let Some(col_expr) = casted_expr.expressions()[0]
@@ -187,6 +189,16 @@ fn take_optimizable_column_count(
                         casted_expr.name().to_string(),
                     ));
                 }
+            } else if let Some(lit_expr) = casted_expr.expressions()[0]
+                .as_any()
+                .downcast_ref::<expressions::Literal>()
+            {
+                if lit_expr.value() == &COUNT_STAR_EXPANSION {
+                    return Some((
+                        ScalarValue::Int64(Some(num_rows as i64)),
+                        casted_expr.name().to_owned(),
+                    ));
+                }
             }
         }
     }
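The merged helper's arithmetic is worth stating on its own: with an exactly known row count, `COUNT(*)` is the row count itself, and `COUNT(col)` is the row count minus `col`'s exactly known null count, because SQL's COUNT skips NULLs. A standalone sketch of that shortcut (hypothetical helper, not code from this commit):

```rust
/// Which flavor of COUNT the aggregate is.
enum CountKind {
    /// COUNT(*) or COUNT(1): every row counts.
    Star,
    /// COUNT(col): NULLs in `col` are excluded; carries the column's
    /// null-count statistic when it is exactly known.
    Column { exact_nulls: Option<usize> },
}

/// Returns the COUNT result derivable from exact statistics, or None if the
/// required statistics are missing or inexact.
fn count_from_stats(exact_num_rows: Option<usize>, kind: CountKind) -> Option<i64> {
    let rows = exact_num_rows?; // no exact row count: cannot optimize
    match kind {
        CountKind::Star => Some(rows as i64),
        CountKind::Column { exact_nulls } => {
            let nulls = exact_nulls?; // null count must also be exact
            Some((rows - nulls) as i64)
        }
    }
}

fn main() {
    // 100 rows with 7 NULLs in the column: COUNT(col) = 93, no scan needed.
    let col = CountKind::Column { exact_nulls: Some(7) };
    assert_eq!(count_from_stats(Some(100), col), Some(93));
    // COUNT(*) only needs the exact row count.
    assert_eq!(count_from_stats(Some(100), CountKind::Star), Some(100));
}
```

Note the `!agg_expr.is_distinct()` guard in the diff: `COUNT(DISTINCT col)` cannot be answered from these two statistics, so it is left for execution.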
10 changes: 5 additions & 5 deletions datafusion/core/src/physical_optimizer/optimizer.rs
@@ -112,11 +112,6 @@ impl PhysicalOptimizer {
             // Remove the ancillary output requirement operator since we are done with the planning
             // phase.
             Arc::new(OutputRequirements::new_remove_mode()),
-            // The PipelineChecker rule will reject non-runnable query plans that use
-            // pipeline-breaking operators on infinite input(s). The rule generates a
-            // diagnostic error message when this happens. It makes no changes to the
-            // given query plan; i.e. it only acts as a final gatekeeping rule.
-            Arc::new(PipelineChecker::new()),
             // The aggregation limiter will try to find situations where the accumulator count
             // is not tied to the cardinality, i.e. when the output of the aggregation is passed
             // into an `order by max(x) limit y`. In this case it will copy the limit value down
@@ -129,6 +124,11 @@
             // are not present, the load of executors such as join or union will be
             // reduced by narrowing their input tables.
             Arc::new(ProjectionPushdown::new()),
+            // The PipelineChecker rule will reject non-runnable query plans that use
+            // pipeline-breaking operators on infinite input(s). The rule generates a
+            // diagnostic error message when this happens. It makes no changes to the
+            // given query plan; i.e. it only acts as a final gatekeeping rule.
+            Arc::new(PipelineChecker::new()),
         ];
 
         Self::with_rules(rules)
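The reordering matters because `PipelineChecker` only diagnoses and never rewrites: moving it after `ProjectionPushdown` means it vets the plan the engine will actually run. A sketch of assembling a custom rule list the same way (module paths and the public `rules` field are assumptions based on this era of the crate layout):

```rust
use std::sync::Arc;

use datafusion::physical_optimizer::optimizer::PhysicalOptimizer;
use datafusion::physical_optimizer::pipeline_checker::PipelineChecker;
use datafusion::physical_optimizer::projection_pushdown::ProjectionPushdown;

fn main() {
    let optimizer = PhysicalOptimizer::with_rules(vec![
        // Rewriting rules come first...
        Arc::new(ProjectionPushdown::new()),
        // ...and the gatekeeper stays last so it sees the final plan.
        Arc::new(PipelineChecker::new()),
    ]);
    println!("{} rules installed", optimizer.rules.len());
}
```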
31 changes: 8 additions & 23 deletions datafusion/core/src/physical_planner.rs
@@ -252,31 +252,15 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
             func_def,
             distinct,
             args,
-            filter,
+            filter: _,
             order_by,
             null_treatment: _,
-        }) => match func_def {
-            AggregateFunctionDefinition::BuiltIn(..) => create_function_physical_name(
-                func_def.name(),
-                *distinct,
-                args,
-                order_by.as_ref(),
-            ),
-            AggregateFunctionDefinition::UDF(fun) => {
-                // TODO: Add support for filter by in AggregateUDF
-                if filter.is_some() {
-                    return exec_err!(
-                        "aggregate expression with filter is not supported"
-                    );
-                }
-
-                let names = args
-                    .iter()
-                    .map(|e| create_physical_name(e, false))
-                    .collect::<Result<Vec<_>>>()?;
-                Ok(format!("{}({})", fun.name(), names.join(",")))
-            }
-        },
+        }) => create_function_physical_name(
+            func_def.name(),
+            *distinct,
+            args,
+            order_by.as_ref(),
+        ),
         Expr::GroupingSet(grouping_set) => match grouping_set {
             GroupingSet::Rollup(exprs) => Ok(format!(
                 "ROLLUP ({})",
@@ -1941,6 +1925,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
                 physical_input_schema,
                 name,
                 ignore_nulls,
+                *distinct,
             )?;
             (agg_expr, filter, physical_sort_exprs)
         }
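With the BuiltIn/UDF split gone, both paths share one display-name scheme, roughly `NAME(arg1,arg2)` with a `DISTINCT ` prefix when set (plus ordering when present). An illustrative sketch of that convention (hypothetical free function, not the private helper in this file):

```rust
/// Illustrative reimplementation of the naming convention; assumes the
/// arguments have already been rendered to strings.
fn aggregate_display_name(func: &str, distinct: bool, args: &[&str]) -> String {
    let distinct = if distinct { "DISTINCT " } else { "" };
    format!("{func}({distinct}{})", args.join(","))
}

fn main() {
    assert_eq!(aggregate_display_name("COUNT", false, &["c1"]), "COUNT(c1)");
    assert_eq!(
        aggregate_display_name("COUNT", true, &["c1"]),
        "COUNT(DISTINCT c1)"
    );
}
```

This lines up with the second hunk as well: the physical planner now threads `*distinct` through to UDAF aggregate creation, since DISTINCT handling is no longer special-cased to built-ins.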
