Skip to content

Commit

Permalink
Display: Support preserve_partitioning on SortExec physical plan. (a…
Browse files Browse the repository at this point in the history
…pache#10153)

* Display: Support `preserve_partitioning` on SortExec physical plan.

Fixes: apache#10138

`Display` of SortExec execution plan now support `preserve_partitioning` field

Signed-off-by: Kaviraj <[email protected]>

* Handle `None` case

Signed-off-by: Kaviraj <[email protected]>

* ran `cargo test --test sqllogictests -- --complete`

Signed-off-by: Kaviraj <[email protected]>

* Manual tests update

Signed-off-by: Kaviraj <[email protected]>

* fix `sort_spill_reservation` test case

Signed-off-by: Kaviraj <[email protected]>

* fix formating of test output

Signed-off-by: Kaviraj <[email protected]>

* INCLUDE_TPCH=true cargo test --test sqllogictests -- --complete

Signed-off-by: Kaviraj <[email protected]>

---------

Signed-off-by: Kaviraj <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
kavirajk and alamb authored Apr 30, 2024
1 parent c23a4e0 commit 369e201
Show file tree
Hide file tree
Showing 48 changed files with 404 additions and 404 deletions.
108 changes: 54 additions & 54 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs

Large diffs are not rendered by default.

204 changes: 102 additions & 102 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs

Large diffs are not rendered by default.

54 changes: 27 additions & 27 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2216,9 +2216,9 @@ mod tests {
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, a@5 as a_from_right, c@7 as c_from_right]",
" SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
"ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left, a@5 as a_from_right, c@7 as c_from_right]",
" SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(initial, expected_initial);
Expand All @@ -2227,10 +2227,10 @@ mod tests {
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;

let expected = [
"SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b_from_left@1, c_from_right@1)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" ProjectionExec: expr=[a@0 as a_from_right, c@2 as c_from_right]",
"SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b_from_left@1, c_from_right@1)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" ProjectionExec: expr=[c@2 as c_from_left, b@1 as b_from_left, a@0 as a_from_left]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" ProjectionExec: expr=[a@0 as a_from_right, c@2 as c_from_right]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(get_plan_string(&after_optimize), expected);
Expand Down Expand Up @@ -2335,9 +2335,9 @@ mod tests {
)?);
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]",
" SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
"ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]",
" SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(initial, expected_initial);
Expand All @@ -2346,9 +2346,9 @@ mod tests {
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;

let expected = [
"ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]",
" SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
"ProjectionExec: expr=[a@5 as a, b@6 as b, c@7 as c, d@8 as d, e@9 as e, a@0 as a, b@1 as b, c@2 as c, d@3 as d, e@4 as e]",
" SymmetricHashJoinExec: mode=SinglePartition, join_type=Inner, on=[(b@1, c@2)], filter=b_left_inter@0 - 1 + a_right_inter@1 <= a_right_inter@1 + c_left_inter@2",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(get_plan_string(&after_optimize), expected);
Expand Down Expand Up @@ -2502,8 +2502,8 @@ mod tests {
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;

let expected = [
"RepartitionExec: partitioning=Hash([a@1, b_new@0, d_new@2], 6), input_partitions=1",
" ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new]",
"RepartitionExec: partitioning=Hash([a@1, b_new@0, d_new@2], 6), input_partitions=1",
" ProjectionExec: expr=[b@1 as b_new, a@0 as a, d@3 as d_new]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
];
assert_eq!(get_plan_string(&after_optimize), expected);
Expand Down Expand Up @@ -2560,7 +2560,7 @@ mod tests {
let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" SortExec: expr=[b@1 ASC,c@2 + a@0 ASC]",
" SortExec: expr=[b@1 ASC,c@2 + a@0 ASC], preserve_partitioning=[false]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(initial, expected_initial);
Expand All @@ -2569,7 +2569,7 @@ mod tests {
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;

let expected = [
"SortExec: expr=[b@2 ASC,c@0 + new_a@1 ASC]",
"SortExec: expr=[b@2 ASC,c@0 + new_a@1 ASC], preserve_partitioning=[false]",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
Expand Down Expand Up @@ -2644,10 +2644,10 @@ mod tests {

let initial = get_plan_string(&projection);
let expected_initial = [
"ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" UnionExec",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
"ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" UnionExec",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(initial, expected_initial);
Expand All @@ -2656,12 +2656,12 @@ mod tests {
ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;

let expected = [
"UnionExec",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
"UnionExec",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
" ProjectionExec: expr=[c@2 as c, a@0 as new_a, b@1 as b]",
" CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false"
];
assert_eq!(get_plan_string(&after_optimize), expected);
Expand Down
Loading

0 comments on commit 369e201

Please sign in to comment.