Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Jan 2, 2024
1 parent b7395a3 commit 506d161
Showing 1 changed file with 21 additions and 20 deletions.
41 changes: 21 additions & 20 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,26 +1202,14 @@ fn ensure_distribution(
true
};

if enable_round_robin
// Operator benefits from partitioning (e.g. filter):
&& (would_benefit && repartition_beneficial_stats)
// Unless partitioning doesn't increase the partition count, it is not beneficial:
&& child.plan.output_partitioning().partition_count() < target_partitions
{
// When `repartition_file_scans` is set, attempt to increase
// parallelism at the source.
if repartition_file_scans {
if let Some(new_child) =
child.plan.repartitioned(target_partitions, config)?
{
child.plan = new_child;
}
// When `repartition_file_scans` is set, attempt to increase
// parallelism at the source.
if repartition_file_scans {
if let Some(new_child) =
child.plan.repartitioned(target_partitions, config)?
{
child.plan = new_child;
}
// Increase parallelism by adding round-robin repartitioning
// on top of the operator. Note that we only do this if the
// partition count is not already equal to the desired partition
// count.
child = add_roundrobin_on_top(child, target_partitions)?;
}

// Satisfy the distribution requirement if it is unmet.
Expand All @@ -1232,7 +1220,20 @@ fn ensure_distribution(
Distribution::HashPartitioned(exprs) => {
child = add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
}
Distribution::UnspecifiedDistribution => {}
Distribution::UnspecifiedDistribution => {
if enable_round_robin
// Operator benefits from partitioning (e.g. filter):
&& (would_benefit && repartition_beneficial_stats)
// Repartitioning is only beneficial if it increases the partition count:
&& child.plan.output_partitioning().partition_count() < target_partitions
{
// Increase parallelism by adding round-robin repartitioning
// on top of the operator. Note that we only do this if the
// current partition count is still below the desired partition
// count.
child = add_roundrobin_on_top(child, target_partitions)?;
}
}
};

// There is an ordering requirement of the operator:
Expand Down

0 comments on commit 506d161

Please sign in to comment.