From 6693a49662ff67c59cd9bf1c345156449164ab91 Mon Sep 17 00:00:00 2001 From: JasonLi-cn Date: Wed, 16 Oct 2024 16:52:12 +0800 Subject: [PATCH] feat: support push down limit when full join --- datafusion/optimizer/src/push_down_limit.rs | 1 + datafusion/sqllogictest/test_files/joins.slt | 88 +++++++++++++++++++- 2 files changed, 88 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 8b5e483001b3..47fce64ae00e 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -263,6 +263,7 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed { match join.join_type { Left => (Some(limit), None), Right => (None, Some(limit)), + Full => (Some(limit), Some(limit)), _ => (None, None), } }; diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index a7a252cc20d7..be9321ddb945 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4187,4 +4187,90 @@ physical_plan 02)--HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(b@1, y@1)], filter=a@0 < x@1 03)----MemoryExec: partitions=1, partition_sizes=[0] 04)----SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[false] -05)------MemoryExec: partitions=1, partition_sizes=[0] \ No newline at end of file +05)------MemoryExec: partitions=1, partition_sizes=[0] + +# Test full join with limit +statement ok +CREATE TABLE t0(c1 INT UNSIGNED, c2 INT UNSIGNED) +AS VALUES +(1, 1), +(2, 2), +(3, 3), +(4, 4); + +statement ok +CREATE TABLE t1(c1 INT UNSIGNED, c2 INT UNSIGNED, c3 BOOLEAN) +AS VALUES +(2, 2, true), +(2, 2, false), +(3, 3, true), +(3, 3, false); + +query IIIIB +SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 LIMIT 2; +---- +2 2 2 2 true +2 2 2 2 false + +query IIIIB +SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2; +---- +2 2 2 2 true +3 3 2 2 true + +query IIIIB +SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT 2; +---- +2 2 2 2 true +2 2 2 2 false + +## Test !join.on.is_empty() && join.filter.is_none() +query TT +EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Full Join: t0.c1 = t1.c1 +03)----Limit: skip=0, fetch=2 +04)------TableScan: t0 projection=[c1, c2], fetch=2 +05)----Limit: skip=0, fetch=2 +06)------TableScan: t1 projection=[c1, c2, c3], fetch=2 +physical_plan +01)CoalesceBatchesExec: target_batch_size=3, fetch=2 +02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)] +03)----MemoryExec: partitions=1, partition_sizes=[1] +04)----MemoryExec: partitions=1, partition_sizes=[1] + +## Test join.on.is_empty() && join.filter.is_some() +query TT +EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Full Join: Filter: t0.c2 >= t1.c2 +03)----Limit: skip=0, fetch=2 +04)------TableScan: t0 projection=[c1, c2], fetch=2 +05)----Limit: skip=0, fetch=2 +06)------TableScan: t1 projection=[c1, c2, c3], fetch=2 +physical_plan +01)GlobalLimitExec: skip=0, fetch=2 +02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1 +03)----MemoryExec: partitions=1, partition_sizes=[1] +04)----MemoryExec: partitions=1, partition_sizes=[1] + +## Test !join.on.is_empty() && join.filter.is_some() +query TT +EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT 2; +---- +logical_plan +01)Limit: skip=0, fetch=2 +02)--Full Join: t0.c1 = t1.c1 Filter: t0.c2 >= t1.c2 +03)----Limit: skip=0, fetch=2 +04)------TableScan: t0 projection=[c1, c2], fetch=2 +05)----Limit: skip=0, fetch=2 +06)------TableScan: t1 projection=[c1, c2, c3], fetch=2 +physical_plan +01)CoalesceBatchesExec: target_batch_size=3, fetch=2 +02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1 +03)----MemoryExec: partitions=1, partition_sizes=[1] +04)----MemoryExec: partitions=1, partition_sizes=[1]