Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bugs explain with non-correlated query #13210

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
fix bugs explain with non-correlated query
  • Loading branch information
Lordworms committed Oct 31, 2024
commit 95e635fee7ecd8b47f735fca7b11ae2e17d81f86
12 changes: 6 additions & 6 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
@@ -880,18 +880,18 @@ impl TableProvider for ListingTable {
None => {} // no ordering required
};

let filters = conjunction(filters.to_vec())
.map(|expr| -> Result<_> {
// NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns.
let filters = match conjunction(filters.to_vec()) {
Some(expr) => {
let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
let filters = create_physical_expr(
&expr,
&table_df_schema,
state.execution_props(),
)?;
Ok(Some(filters))
})
.unwrap_or(Ok(None))?;
Some(filters)
}
None => None,
};

let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
14 changes: 11 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
@@ -1792,11 +1792,19 @@ impl DefaultPhysicalPlanner {
Err(e) => return Err(e),
}
}
Err(e) => stringified_plans
.push(StringifiedPlan::new(InitialPhysicalPlan, e.to_string())),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I played around with this some more this morning -- I think the reason the error isn't displayed is that InitialPhysicalPlan is not shown by default in explain plans.

Err(err) => {
return Ok(Some(Arc::new(ExplainExec::new(
SchemaRef::new(Schema::new(vec![arrow_schema::Field::new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this change means that the explain will not have the logical plan, but instead will have only the error message for creating the physical plan as the "Final Logical Plan"

"Err",
arrow_schema::DataType::Utf8,
false,
)])),
vec![StringifiedPlan::new(FinalLogicalPlan, err.to_string())],
e.verbose,
))))
}
}
}

Ok(Some(Arc::new(ExplainExec::new(
SchemaRef::new(e.schema.as_ref().to_owned().into()),
stringified_plans,
18 changes: 17 additions & 1 deletion datafusion/physical-plan/src/explain.rs
Original file line number Diff line number Diff line change
@@ -72,6 +72,11 @@ impl ExplainExec {
self.verbose
}

/// check if current plan is a failed explain plan
pub fn is_failed_explain(&self) -> bool {
self.stringified_plans.len() == 1 && self.schema.fields().len() == 1
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
@@ -132,7 +137,18 @@ impl ExecutionPlan for ExplainExec {
if 0 != partition {
return internal_err!("ExplainExec invalid partition {partition}");
}

if self.is_failed_explain() {
let mut err_builder = StringBuilder::with_capacity(1, 1024);
err_builder.append_value(&*self.stringified_plans[0].plan);
let record_batch = RecordBatch::try_new(
Arc::clone(&self.schema),
vec![Arc::new(err_builder.finish())],
)?;
return Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.schema),
futures::stream::iter(vec![Ok(record_batch)]),
)));
}
let mut type_builder =
StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
let mut plan_builder =
18 changes: 18 additions & 0 deletions datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
@@ -411,3 +411,21 @@ logical_plan
physical_plan
01)ProjectionExec: expr=[{c0:1,c1:2.3,c2:abc} as struct(Int64(1),Float64(2.3),Utf8("abc"))]
02)--PlaceholderRowExec


statement ok
create table t1(a int);

statement ok
create table t2(b int);

query T
explain select a from t1 where exists (select count(*) from t2);
----
This feature is not implemented: Physical plan does not support logical expression Exists(Exists { subquery: <subquery>, negated: false })

statement ok
drop table t1;

statement ok
drop table t2;
8 changes: 2 additions & 6 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
@@ -4070,18 +4070,14 @@ physical_plan
08)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], has_header=true

# we do not generate physical plan for Repartition yet (e.g Distribute By queries).
query TT
query T
EXPLAIN SELECT a, b, sum1
FROM (SELECT c, b, a, SUM(d) as sum1
FROM multiple_ordered_table_with_pk
GROUP BY c)
DISTRIBUTE BY a
----
logical_plan
01)Repartition: DistributeBy(multiple_ordered_table_with_pk.a)
02)--Projection: multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
03)----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
04)------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d]
This feature is not implemented: Physical plan does not support DistributeBy partitioning

# union with aggregate
query TT
26 changes: 4 additions & 22 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
@@ -4049,19 +4049,10 @@ physical_plan


# Test CROSS JOIN LATERAL syntax (planning)
query TT
query T
explain select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnest(generate_series(1, t1_int))) as series(i);
----
logical_plan
01)Cross Join:
02)--SubqueryAlias: t1
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
05)----Subquery:
06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)),depth=1) AS i
07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))|depth=1] structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t1.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))
09)------------EmptyRelation
This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t1" }), name: "t1_int" })


# Test CROSS JOIN LATERAL syntax (execution)
@@ -4071,19 +4062,10 @@ select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnes


# Test INNER JOIN LATERAL syntax (planning)
query TT
query T
explain select t1_id, t1_name, i from join_t1 t2 inner join lateral (select * from unnest(generate_series(1, t1_int))) as series(i) on(t1_id > i);
----
logical_plan
01)Inner Join: Filter: CAST(t2.t1_id AS Int64) > series.i
02)--SubqueryAlias: t2
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
05)----Subquery:
06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)),depth=1) AS i
07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))|depth=1] structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t2.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))
09)------------EmptyRelation
This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t2" }), name: "t1_int" })


# Test INNER JOIN LATERAL syntax (execution)
47 changes: 10 additions & 37 deletions datafusion/sqllogictest/test_files/update.slt
Original file line number Diff line number Diff line change
@@ -26,50 +26,30 @@ create table t1(a int, b varchar, c double, d int);
statement ok
set datafusion.optimizer.max_passes = 0;

query TT
query T
explain update t1 set a=1, b=2, c=3.0, d=NULL;
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(Int64(1) AS Int32) AS a, CAST(Int64(2) AS Utf8) AS b, Float64(3) AS c, CAST(NULL AS Int32) AS d
03)----TableScan: t1
This feature is not implemented: Unsupported logical plan: Dml(Update)

query TT
query T
explain update t1 set a=c+1, b=a, c=c+1.0, d=b;
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(t1.c + CAST(Int64(1) AS Float64) AS Int32) AS a, CAST(t1.a AS Utf8) AS b, t1.c + Float64(1) AS c, CAST(t1.b AS Int32) AS d
03)----TableScan: t1
This feature is not implemented: Unsupported logical plan: Dml(Update)

statement ok
create table t2(a int, b varchar, c double, d int);

## set from subquery
query TT
query T
explain update t1 set b = (select max(b) from t2 where t1.a = t2.a)
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, (<subquery>) AS b, t1.c AS c, t1.d AS d
03)----Subquery:
04)------Projection: max(t2.b)
05)--------Aggregate: groupBy=[[]], aggr=[[max(t2.b)]]
06)----------Filter: outer_ref(t1.a) = t2.a
07)------------TableScan: t2
08)----TableScan: t1
This feature is not implemented: Physical plan does not support logical expression ScalarSubquery(<subquery>)

# set from other table
query TT
query T
explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0;
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t1.a = t2.a AND t1.b > Utf8("foo") AND t2.c > Float64(1)
04)------Cross Join:
05)--------TableScan: t1
06)--------TableScan: t2
This feature is not implemented: Unsupported logical plan: Dml(Update)

statement ok
create table t3(a int, b varchar, c double, d int);
@@ -79,14 +59,7 @@ query error DataFusion error: SQL error: ParserError\("Expected end of statement
explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a and t1.a = t3.a;

# test table alias
query TT
query T
explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0;
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t.a = t2.a AND t.b > Utf8("foo") AND t2.c > Float64(1)
04)------Cross Join:
05)--------SubqueryAlias: t
06)----------TableScan: t1
07)--------TableScan: t2
This feature is not implemented: Unsupported logical plan: Dml(Update)
Loading