Skip to content

Commit

Permalink
Move PartitionSearchMode into datafusion_physical_plan, rename to `…
Browse files Browse the repository at this point in the history
…InputOrderMode` (apache#8364)

* Move PartitionSearchMode into datafusion_physical_plan

* Improve  comments

* Rename to InputOrderMode

* Update prost
  • Loading branch information
alamb authored Dec 5, 2023
1 parent 4ceb2de commit e322839
Show file tree
Hide file tree
Showing 15 changed files with 188 additions and 175 deletions.
7 changes: 4 additions & 3 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::windows::{
get_best_fitting_window, BoundedWindowAggExec, WindowAggExec,
};
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
use crate::physical_plan::{
with_new_children_if_necessary, Distribution, ExecutionPlan, InputOrderMode,
};

use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};

use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::windows::PartitionSearchMode;
use itertools::izip;

/// This rule inspects [`SortExec`]'s in the given physical plan and removes the
Expand Down Expand Up @@ -611,7 +612,7 @@ fn analyze_window_sort_removal(
window_expr.to_vec(),
window_child,
partitionby_exprs.to_vec(),
PartitionSearchMode::Sorted,
InputOrderMode::Sorted,
)?) as _
} else {
Arc::new(WindowAggExec::try_new(
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/physical_optimizer/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::create_window_expr;
use crate::physical_plan::{ExecutionPlan, Partitioning};
use crate::physical_plan::{ExecutionPlan, InputOrderMode, Partitioning};
use crate::prelude::{CsvReadOptions, SessionContext};

use arrow_schema::{Schema, SchemaRef, SortOptions};
Expand All @@ -44,7 +44,6 @@ use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use datafusion_physical_plan::windows::PartitionSearchMode;

use async_trait::async_trait;

Expand Down Expand Up @@ -240,7 +239,7 @@ pub fn bounded_window_exec(
.unwrap()],
input.clone(),
vec![],
PartitionSearchMode::Sorted,
InputOrderMode::Sorted,
)
.unwrap(),
)
Expand Down
10 changes: 4 additions & 6 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,10 @@ use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::union::UnionExec;
use crate::physical_plan::unnest::UnnestExec;
use crate::physical_plan::values::ValuesExec;
use crate::physical_plan::windows::{
BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
};
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{
aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan, Partitioning,
PhysicalExpr, WindowExpr,
aggregates, displayable, udaf, windows, AggregateExpr, ExecutionPlan, InputOrderMode,
Partitioning, PhysicalExpr, WindowExpr,
};

use arrow::compute::SortOptions;
Expand Down Expand Up @@ -761,7 +759,7 @@ impl DefaultPhysicalPlanner {
window_expr,
input_exec,
physical_partition_keys,
PartitionSearchMode::Sorted,
InputOrderMode::Sorted,
)?)
} else {
Arc::new(WindowAggExec::try_new(
Expand Down
12 changes: 5 additions & 7 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use arrow::util::pretty::pretty_format_batches;
use datafusion::physical_plan::memory::MemoryExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::windows::{
create_window_expr, BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
create_window_expr, BoundedWindowAggExec, WindowAggExec,
};
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::physical_plan::{collect, ExecutionPlan, InputOrderMode};
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::type_coercion::aggregates::coerce_types;
Expand All @@ -43,9 +43,7 @@ use hashbrown::HashMap;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

use datafusion_physical_plan::windows::PartitionSearchMode::{
Linear, PartiallySorted, Sorted,
};
use datafusion_physical_plan::InputOrderMode::{Linear, PartiallySorted, Sorted};

#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
async fn window_bounded_window_random_comparison() -> Result<()> {
Expand Down Expand Up @@ -385,9 +383,9 @@ async fn run_window_test(
random_seed: u64,
partition_by_columns: Vec<&str>,
orderby_columns: Vec<&str>,
search_mode: PartitionSearchMode,
search_mode: InputOrderMode,
) -> Result<()> {
let is_linear = !matches!(search_mode, PartitionSearchMode::Sorted);
let is_linear = !matches!(search_mode, InputOrderMode::Sorted);
let mut rng = StdRng::seed_from_u64(random_seed);
let schema = input1[0].schema();
let session_config = SessionConfig::new().with_batch_size(50);
Expand Down
30 changes: 15 additions & 15 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@ use crate::aggregates::{
};

use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
get_ordered_partition_by_indices, get_window_mode, PartitionSearchMode,
};
use crate::windows::{get_ordered_partition_by_indices, get_window_mode};
use crate::{
DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
DisplayFormatType, Distribution, ExecutionPlan, InputOrderMode, Partitioning,
SendableRecordBatchStream, Statistics,
};

Expand Down Expand Up @@ -304,7 +302,9 @@ pub struct AggregateExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
required_input_ordering: Option<LexRequirement>,
partition_search_mode: PartitionSearchMode,
/// Describes how the input is ordered relative to the group by columns
input_order_mode: InputOrderMode,
/// Describe how the output is ordered
output_ordering: Option<LexOrdering>,
}

Expand Down Expand Up @@ -409,15 +409,15 @@ fn get_aggregate_search_mode(
aggr_expr: &mut [Arc<dyn AggregateExpr>],
order_by_expr: &mut [Option<LexOrdering>],
ordering_req: &mut Vec<PhysicalSortExpr>,
) -> PartitionSearchMode {
) -> InputOrderMode {
let groupby_exprs = group_by
.expr
.iter()
.map(|(item, _)| item.clone())
.collect::<Vec<_>>();
let mut partition_search_mode = PartitionSearchMode::Linear;
let mut input_order_mode = InputOrderMode::Linear;
if !group_by.is_single() || groupby_exprs.is_empty() {
return partition_search_mode;
return input_order_mode;
}

if let Some((should_reverse, mode)) =
Expand All @@ -439,9 +439,9 @@ fn get_aggregate_search_mode(
);
*ordering_req = reverse_order_bys(ordering_req);
}
partition_search_mode = mode;
input_order_mode = mode;
}
partition_search_mode
input_order_mode
}

/// Check whether group by expression contains all of the expression inside `requirement`
Expand Down Expand Up @@ -515,7 +515,7 @@ impl AggregateExec {
&input.equivalence_properties(),
)?;
let mut ordering_req = requirement.unwrap_or(vec![]);
let partition_search_mode = get_aggregate_search_mode(
let input_order_mode = get_aggregate_search_mode(
&group_by,
&input,
&mut aggr_expr,
Expand Down Expand Up @@ -567,7 +567,7 @@ impl AggregateExec {
metrics: ExecutionPlanMetricsSet::new(),
required_input_ordering,
limit: None,
partition_search_mode,
input_order_mode,
output_ordering,
})
}
Expand Down Expand Up @@ -767,8 +767,8 @@ impl DisplayAs for AggregateExec {
write!(f, ", lim=[{limit}]")?;
}

if self.partition_search_mode != PartitionSearchMode::Linear {
write!(f, ", ordering_mode={:?}", self.partition_search_mode)?;
if self.input_order_mode != InputOrderMode::Linear {
write!(f, ", ordering_mode={:?}", self.input_order_mode)?;
}
}
}
Expand Down Expand Up @@ -819,7 +819,7 @@ impl ExecutionPlan for AggregateExec {
/// infinite, returns an error to indicate this.
fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
if children[0] {
if self.partition_search_mode == PartitionSearchMode::Linear {
if self.input_order_mode == InputOrderMode::Linear {
// Cannot run without breaking pipeline.
plan_err!(
"Aggregate Error: `GROUP BY` clauses with columns without ordering and GROUPING SETS are not supported for unbounded inputs."
Expand Down
12 changes: 5 additions & 7 deletions datafusion/physical-plan/src/aggregates/order/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use datafusion_physical_expr::{EmitTo, PhysicalSortExpr};
mod full;
mod partial;

use crate::windows::PartitionSearchMode;
use crate::InputOrderMode;
pub(crate) use full::GroupOrderingFull;
pub(crate) use partial::GroupOrderingPartial;

Expand All @@ -42,18 +42,16 @@ impl GroupOrdering {
/// Create a `GroupOrdering` for the the specified ordering
pub fn try_new(
input_schema: &Schema,
mode: &PartitionSearchMode,
mode: &InputOrderMode,
ordering: &[PhysicalSortExpr],
) -> Result<Self> {
match mode {
PartitionSearchMode::Linear => Ok(GroupOrdering::None),
PartitionSearchMode::PartiallySorted(order_indices) => {
InputOrderMode::Linear => Ok(GroupOrdering::None),
InputOrderMode::PartiallySorted(order_indices) => {
GroupOrderingPartial::try_new(input_schema, order_indices, ordering)
.map(GroupOrdering::Partial)
}
PartitionSearchMode::Sorted => {
Ok(GroupOrdering::Full(GroupOrderingFull::new()))
}
InputOrderMode::Sorted => Ok(GroupOrdering::Full(GroupOrderingFull::new())),
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl GroupedHashAggregateStream {
.find_longest_permutation(&agg_group_by.output_exprs());
let group_ordering = GroupOrdering::try_new(
&group_schema,
&agg.partition_search_mode,
&agg.input_order_mode,
ordering.as_slice(),
)?;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ pub mod joins;
pub mod limit;
pub mod memory;
pub mod metrics;
mod ordering;
pub mod projection;
pub mod repartition;
pub mod sorts;
Expand All @@ -72,6 +73,7 @@ pub mod windows;

pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
pub use crate::metrics::Metric;
pub use crate::ordering::InputOrderMode;
pub use crate::topk::TopK;
pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};

Expand Down
51 changes: 51 additions & 0 deletions datafusion/physical-plan/src/ordering.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Specifies how the input to an aggregation or window operator is ordered
/// relative to their `GROUP BY` or `PARTITION BY` expressions.
///
/// For example, if the existing ordering is `[a ASC, b ASC, c ASC]`
///
/// ## Window Functions
/// - A `PARTITION BY b` clause can use `Linear` mode.
/// - A `PARTITION BY a, c` or a `PARTITION BY c, a` can use
/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively.
/// (The vector stores the index of `a` in the respective PARTITION BY expression.)
/// - A `PARTITION BY a, b` or a `PARTITION BY b, a` can use `Sorted` mode.
///
/// ## Aggregations
/// - A `GROUP BY b` clause can use `Linear` mode.
/// - A `GROUP BY a, c` or a `GROUP BY BY c, a` can use
/// `PartiallySorted([0])` or `PartiallySorted([1])` modes, respectively.
/// (The vector stores the index of `a` in the respective PARTITION BY expression.)
/// - A `GROUP BY a, b` or a `GROUP BY b, a` can use `Sorted` mode.
///
/// Note these are the same examples as above, but with `GROUP BY` instead of
/// `PARTITION BY` to make the examples easier to read.
#[derive(Debug, Clone, PartialEq)]
pub enum InputOrderMode {
/// There is no partial permutation of the expressions satisfying the
/// existing ordering.
Linear,
/// There is a partial permutation of the expressions satisfying the
/// existing ordering. Indices describing the longest partial permutation
/// are stored in the vector.
PartiallySorted(Vec<usize>),
/// There is a (full) permutation of the expressions satisfying the
/// existing ordering.
Sorted,
}
Loading

0 comments on commit e322839

Please sign in to comment.