From 40c57f899a5a74fd53c1cc9a93a1c90e4bc5d260 Mon Sep 17 00:00:00 2001 From: Matthew Gapp <61894094+matthewgapp@users.noreply.github.com> Date: Fri, 12 Apr 2024 08:24:31 -0700 Subject: [PATCH] apply tom patch --- .../src/extract_equijoin_predicate.rs | 60 +++++++++++++++++-- .../tests/cases/roundtrip_logical_plan.rs | 1 + 2 files changed, 55 insertions(+), 6 deletions(-) diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index 60b9ba3031a1..ce32c2603c98 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -76,6 +76,36 @@ impl OptimizerRule for ExtractEquijoinPredicate { right_schema, )?; + // If there are no equijoin predicates and no existing on then try a nullable join + if equijoin_predicates.is_empty() && on.is_empty() { + let (equinullable_predicates, non_equinullable_expr) = + split_eq_and_noneq_join_predicate_nullable( + expr, + left_schema, + right_schema, + )?; + if !equinullable_predicates.is_empty() { + let optimized_plan = (!equinullable_predicates.is_empty()) + .then(|| { + let mut new_on = on.clone(); + new_on.extend(equinullable_predicates); + + LogicalPlan::Join(Join { + left: left.clone(), + right: right.clone(), + on: new_on, + filter: non_equinullable_expr, + join_type: *join_type, + join_constraint: *join_constraint, + schema: schema.clone(), + null_equals_null: true, + }) + }); + + return Ok(optimized_plan); + } + } + let optimized_plan = (!equijoin_predicates.is_empty()).then(|| { let mut new_on = on.clone(); new_on.extend(equijoin_predicates); @@ -108,10 +138,11 @@ impl OptimizerRule for ExtractEquijoinPredicate { } } -fn split_eq_and_noneq_join_predicate( +fn split_equality_join_predicate( filter: &Expr, left_schema: &Arc, right_schema: &Arc, + match_bin_op: Operator, ) -> Result<(Vec, Option)> { let exprs = split_conjunction(filter); @@ -119,11 +150,7 @@ fn split_eq_and_noneq_join_predicate( let mut accum_filters: Vec = vec![]; for expr in exprs { match expr { - Expr::BinaryExpr(BinaryExpr { - left, - op: Operator::Eq, - right, - }) => { + Expr::BinaryExpr(BinaryExpr { left, op, right }) if op == &match_bin_op => { let left = left.as_ref(); let right = right.as_ref(); @@ -155,6 +182,27 @@ fn split_eq_and_noneq_join_predicate( Ok((accum_join_keys, result_filter)) } +fn split_eq_and_noneq_join_predicate( + filter: &Expr, + left_schema: &Arc, + right_schema: &Arc, +) -> Result<(Vec, Option)> { + split_equality_join_predicate(filter, left_schema, right_schema, Operator::Eq) +} + +fn split_eq_and_noneq_join_predicate_nullable( + filter: &Expr, + left_schema: &Arc, + right_schema: &Arc, +) -> Result<(Vec, Option)> { + split_equality_join_predicate( + filter, + left_schema, + right_schema, + Operator::IsNotDistinctFrom, + ) +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 28c0de1c9973..1d32f682572f 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -934,6 +934,7 @@ async fn test_alias(sql_with_alias: &str, sql_no_alias: &str) -> Result<()> { async fn roundtrip_with_ctx(sql: &str, ctx: SessionContext) -> Result<()> { let df = ctx.sql(sql).await?; let plan = df.into_optimized_plan()?; + println!("{plan:#?}"); let proto = to_substrait_plan(&plan, &ctx)?; let plan2 = from_substrait_plan(&ctx, &proto).await?; let plan2 = ctx.state().optimize(&plan2)?;