Skip to content

Commit

Permalink
apply tom patch
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewgapp committed Apr 12, 2024
1 parent 118eecd commit 40c57f8
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 6 deletions.
60 changes: 54 additions & 6 deletions datafusion/optimizer/src/extract_equijoin_predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,36 @@ impl OptimizerRule for ExtractEquijoinPredicate {
right_schema,
)?;

// If there are no equijoin predicates and no existing on then try a nullable join
if equijoin_predicates.is_empty() && on.is_empty() {
let (equinullable_predicates, non_equinullable_expr) =
split_eq_and_noneq_join_predicate_nullable(
expr,
left_schema,
right_schema,
)?;
if !equinullable_predicates.is_empty() {
let optimized_plan = (!equinullable_predicates.is_empty())
.then(|| {
let mut new_on = on.clone();
new_on.extend(equinullable_predicates);

LogicalPlan::Join(Join {
left: left.clone(),
right: right.clone(),
on: new_on,
filter: non_equinullable_expr,
join_type: *join_type,
join_constraint: *join_constraint,
schema: schema.clone(),
null_equals_null: true,
})
});

return Ok(optimized_plan);
}
}

let optimized_plan = (!equijoin_predicates.is_empty()).then(|| {
let mut new_on = on.clone();
new_on.extend(equijoin_predicates);
Expand Down Expand Up @@ -108,22 +138,19 @@ impl OptimizerRule for ExtractEquijoinPredicate {
}
}

fn split_eq_and_noneq_join_predicate(
fn split_equality_join_predicate(
filter: &Expr,
left_schema: &Arc<DFSchema>,
right_schema: &Arc<DFSchema>,
match_bin_op: Operator,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
let exprs = split_conjunction(filter);

let mut accum_join_keys: Vec<(Expr, Expr)> = vec![];
let mut accum_filters: Vec<Expr> = vec![];
for expr in exprs {
match expr {
Expr::BinaryExpr(BinaryExpr {
left,
op: Operator::Eq,
right,
}) => {
Expr::BinaryExpr(BinaryExpr { left, op, right }) if op == &match_bin_op => {
let left = left.as_ref();
let right = right.as_ref();

Expand Down Expand Up @@ -155,6 +182,27 @@ fn split_eq_and_noneq_join_predicate(
Ok((accum_join_keys, result_filter))
}

fn split_eq_and_noneq_join_predicate(
filter: &Expr,
left_schema: &Arc<DFSchema>,
right_schema: &Arc<DFSchema>,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
split_equality_join_predicate(filter, left_schema, right_schema, Operator::Eq)
}

fn split_eq_and_noneq_join_predicate_nullable(
filter: &Expr,
left_schema: &Arc<DFSchema>,
right_schema: &Arc<DFSchema>,
) -> Result<(Vec<EquijoinPredicate>, Option<Expr>)> {
split_equality_join_predicate(
filter,
left_schema,
right_schema,
Operator::IsNotDistinctFrom,
)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,7 @@ async fn test_alias(sql_with_alias: &str, sql_no_alias: &str) -> Result<()> {
async fn roundtrip_with_ctx(sql: &str, ctx: SessionContext) -> Result<()> {
let df = ctx.sql(sql).await?;
let plan = df.into_optimized_plan()?;
println!("{plan:#?}");
let proto = to_substrait_plan(&plan, &ctx)?;
let plan2 = from_substrait_plan(&ctx, &proto).await?;
let plan2 = ctx.state().optimize(&plan2)?;
Expand Down

0 comments on commit 40c57f8

Please sign in to comment.