From 156c347a37e2bfb815dd5df154ba395c099e87b0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 1 Jan 2024 22:11:22 -0800 Subject: [PATCH] Fix tests --- .../enforce_distribution.rs | 187 ++++++------------ 1 file changed, 63 insertions(+), 124 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 3db99cca9ea86..4cc8cf22b1247 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -2077,15 +2077,12 @@ pub(crate) mod tests { JoinType::Inner | JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => vec![ top_join_plan.as_str(), join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 4 RepartitionExecs @@ -2093,15 +2090,12 @@ pub(crate) mod tests { top_join_plan.as_str(), "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; @@ -2140,15 +2134,12 @@ pub(crate) mod tests { vec![ top_join_plan.as_str(), join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 4 RepartitionExecs @@ -2157,15 +2148,12 @@ pub(crate) mod tests { top_join_plan.as_str(), "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; @@ -2216,14 +2204,11 @@ pub(crate) mod tests { "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a1@0, c@2)]", "ProjectionExec: expr=[a@0 as a1, a@0 as a2]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; assert_optimized!(expected, top_join.clone(), true); @@ -2242,14 +2227,11 @@ pub(crate) mod tests { "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a2@1, c@2)]", "ProjectionExec: expr=[a@0 as a1, a@0 as a2]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; @@ -2297,14 +2279,11 @@ pub(crate) mod tests { "ProjectionExec: expr=[c1@0 as a]", "ProjectionExec: expr=[c@2 as c1]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@1)]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b@1], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; @@ -2499,19 +2478,15 @@ pub(crate) mod tests { "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(B@2, b1@6), (C@3, c@2), (AA@1, a1@5)]", "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@1, b1@1), (c@2, c1@2), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b@1, c@2, a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1, c1@2, a1@0], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; @@ -2627,19 +2602,15 @@ pub(crate) mod tests { top_join_plan.as_str(), "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1), (c@2, c1@2)]", - "RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0, b@1, c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a1@0, b1@1, c1@2], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; @@ -2752,19 +2723,15 @@ pub(crate) mod tests { top_join_plan.as_str(), "ProjectionExec: expr=[a@0 as A, a@0 as AA, b@1 as B, c@2 as C]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a1@0), (b@1, b1@1)]", - "RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0, b@1], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a1@0, b1@1], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@2, c1@2), (b@1, b1@1), (a@0, a1@0)]", - "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2, b@1, a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c1@2, b1@1, a1@0], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ]; @@ -2830,17 +2797,14 @@ pub(crate) mod tests { top_join_plan.as_str(), join_plan.as_str(), "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 7 RepartitionExecs (4 hash, 3 round-robin), 4 SortExecs @@ -2859,17 +2823,14 @@ pub(crate) mod tests { "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", join_plan.as_str(), "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], }; @@ -2881,17 +2842,14 @@ pub(crate) mod tests { vec![ top_join_plan.as_str(), join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], @@ -2907,22 +2865,18 @@ pub(crate) mod tests { _ => vec![ top_join_plan.as_str(), // Below 4 operators are differences introduced, when join mode is changed - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "SortExec: expr=[a@0 ASC]", "CoalescePartitionsExec", join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], @@ -2952,17 +2906,14 @@ pub(crate) mod tests { top_join_plan.as_str(), join_plan.as_str(), "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 7 RepartitionExecs (4 hash, 3 round-robin) and 4 SortExecs @@ -2972,17 +2923,14 @@ pub(crate) mod tests { "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10", join_plan.as_str(), "SortExec: expr=[a@0 ASC]", - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[b1@1 ASC]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", "SortExec: expr=[c@2 ASC]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // this match arm cannot be reached @@ -2995,39 +2943,32 @@ pub(crate) mod tests { JoinType::Inner | JoinType::Right => vec![ top_join_plan.as_str(), join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], // Should include 8 RepartitionExecs (4 of them preserves order) and 4 SortExecs JoinType::Left | JoinType::Full => vec![ top_join_plan.as_str(), - "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@6 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@6], 10), input_partitions=1", "SortExec: expr=[b1@6 ASC]", "CoalescePartitionsExec", join_plan.as_str(), - "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, preserve_order=true, sort_exprs=a@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=1", "SortExec: expr=[a@0 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=10, preserve_order=true, sort_exprs=b1@1 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b1@1], 10), input_partitions=1", "SortExec: expr=[b1@1 ASC]", "ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1, d@3 as d1, e@4 as e1]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=10, preserve_order=true, sort_exprs=c@2 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([c@2], 10), input_partitions=1", "SortExec: expr=[c@2 ASC]", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", ], @@ -3117,8 +3058,7 @@ pub(crate) mod tests { let expected_first_sort_enforcement = &[ "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]", - "RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b3@1 ASC,a3@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b3@1, a3@0], 10), input_partitions=1", "SortExec: expr=[b3@1 ASC,a3@0 ASC]", "CoalescePartitionsExec", "ProjectionExec: expr=[a1@0 as a3, b1@1 as b3]", @@ -3128,8 +3068,7 @@ pub(crate) mod tests { "AggregateExec: mode=Partial, gby=[b@1 as b1, a@0 as a1], aggr=[]", "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e]", - "RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=10, preserve_order=true, sort_exprs=b2@1 ASC,a2@0 ASC", - "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", + "RepartitionExec: partitioning=Hash([b2@1, a2@0], 10), input_partitions=1", "SortExec: expr=[b2@1 ASC,a2@0 ASC]", "CoalescePartitionsExec", "ProjectionExec: expr=[a@1 as a2, b@0 as b2]", @@ -3853,14 +3792,14 @@ pub(crate) mod tests { "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", // Plan already has two partitions - "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e]", + "ParquetExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e]", ]; let expected_csv = [ "AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]", "RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2", "AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", // Plan already has two partitions - "CsvExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], has_header=false", + "CsvExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], has_header=false", ]; assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10);