Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bugs explain with non-correlated query #13210

Merged
merged 6 commits into from
Nov 5, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use explicit enum for physical errors
  • Loading branch information
alamb committed Nov 3, 2024
commit 29252e3c4e9da4b7a54241aa1a0e3432fd585c98
5 changes: 4 additions & 1 deletion datafusion/common/src/display/mod.rs
Original file line number Diff line number Diff line change
@@ -62,6 +62,8 @@ pub enum PlanType {
FinalPhysicalPlanWithStats,
/// The final with schema, fully optimized physical plan which would be executed
FinalPhysicalPlanWithSchema,
/// An error creating the physical plan
PhysicalPlanError,
}

impl Display for PlanType {
@@ -91,6 +93,7 @@ impl Display for PlanType {
PlanType::FinalPhysicalPlanWithSchema => {
write!(f, "physical_plan_with_schema")
}
PlanType::PhysicalPlanError => write!(f, "physical_plan_error"),
}
}
}
@@ -118,7 +121,7 @@ impl StringifiedPlan {
/// `verbose_mode = true` will display all available plans
pub fn should_display(&self, verbose_mode: bool) -> bool {
match self.plan_type {
PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan | PlanType::PhysicalPlanError=> true,
_ => verbose_mode,
}
}
14 changes: 5 additions & 9 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
@@ -1798,18 +1798,14 @@ impl DefaultPhysicalPlanner {
}
}
Err(err) => {
return Ok(Some(Arc::new(ExplainExec::new(
SchemaRef::new(Schema::new(vec![arrow_schema::Field::new(
"Err",
arrow_schema::DataType::Utf8,
false,
)])),
vec![StringifiedPlan::new(FinalLogicalPlan, err.to_string())],
e.verbose,
))))
// use FinalLogicalPlan so the error appears in the final output by default
// Initial plans are only shown in verbose mode
stringified_plans
.push(StringifiedPlan::new(PhysicalPlanError, err.to_string()));
}
}
}

Ok(Some(Arc::new(ExplainExec::new(
SchemaRef::new(e.schema.as_ref().to_owned().into()),
stringified_plans,
17 changes: 0 additions & 17 deletions datafusion/physical-plan/src/explain.rs
Original file line number Diff line number Diff line change
@@ -72,11 +72,6 @@ impl ExplainExec {
self.verbose
}

/// check if current plan is a failed explain plan
pub fn is_failed_explain(&self) -> bool {
self.stringified_plans.len() == 1 && self.schema.fields().len() == 1
}

/// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(schema: SchemaRef) -> PlanProperties {
let eq_properties = EquivalenceProperties::new(schema);
@@ -137,18 +132,6 @@ impl ExecutionPlan for ExplainExec {
if 0 != partition {
return internal_err!("ExplainExec invalid partition {partition}");
}
if self.is_failed_explain() {
let mut err_builder = StringBuilder::with_capacity(1, 1024);
err_builder.append_value(&*self.stringified_plans[0].plan);
let record_batch = RecordBatch::try_new(
Arc::clone(&self.schema),
vec![Arc::new(err_builder.finish())],
)?;
return Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.schema),
futures::stream::iter(vec![Ok(record_batch)]),
)));
}
let mut type_builder =
StringBuilder::with_capacity(self.stringified_plans.len(), 1024);
let mut plan_builder =
1 change: 1 addition & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
@@ -655,6 +655,7 @@ message PlanType {
datafusion_common.EmptyMessage FinalPhysicalPlan = 6;
datafusion_common.EmptyMessage FinalPhysicalPlanWithStats = 10;
datafusion_common.EmptyMessage FinalPhysicalPlanWithSchema = 12;
datafusion_common.EmptyMessage PhysicalPlanError = 13;
}
}

13 changes: 13 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion datafusion/proto/src/logical_plan/from_proto.rs
Original file line number Diff line number Diff line change
@@ -44,7 +44,7 @@ use crate::protobuf::{
AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan,
FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan,
InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan,
OptimizedPhysicalPlan,
OptimizedPhysicalPlan, PhysicalPlanError
},
AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType,
OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
@@ -141,6 +141,7 @@ impl From<&protobuf::StringifiedPlan> for StringifiedPlan {
FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan,
FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats,
FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema,
PhysicalPlanError(_) => PlanType::PhysicalPlanError,
},
plan: Arc::new(stringified_plan.plan.clone()),
}
5 changes: 4 additions & 1 deletion datafusion/proto/src/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ use crate::protobuf::{
FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats,
InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema,
InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan,
PhysicalPlanError,
},
AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList,
OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode,
@@ -115,7 +116,9 @@ impl From<&StringifiedPlan> for protobuf::StringifiedPlan {
PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType {
plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})),
}),
},
PlanType::PhysicalPlanError => Some(protobuf::PlanType {
plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})),
}), },
plan: stringified_plan.plan.to_string(),
}
}
11 changes: 9 additions & 2 deletions datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
@@ -419,10 +419,17 @@ create table t1(a int);
statement ok
create table t2(b int);

query T
query TT
explain select a from t1 where exists (select count(*) from t2);
----
This feature is not implemented: Physical plan does not support logical expression Exists(Exists { subquery: <subquery>, negated: false })
logical_plan
01)Filter: EXISTS (<subquery>)
02)--Subquery:
03)----Projection: count(*)
04)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]]
05)--------TableScan: t2
06)--TableScan: t1 projection=[a]
physical_plan_error This feature is not implemented: Physical plan does not support logical expression Exists(Exists { subquery: <subquery>, negated: false })

statement ok
drop table t1;
9 changes: 7 additions & 2 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
@@ -4070,14 +4070,19 @@ physical_plan
08)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], output_ordering=[c@0 ASC NULLS LAST], has_header=true

# we do not generate physical plan for Repartition yet (e.g Distribute By queries).
query T
query TT
EXPLAIN SELECT a, b, sum1
FROM (SELECT c, b, a, SUM(d) as sum1
FROM multiple_ordered_table_with_pk
GROUP BY c)
DISTRIBUTE BY a
----
This feature is not implemented: Physical plan does not support DistributeBy partitioning
logical_plan
01)Repartition: DistributeBy(multiple_ordered_table_with_pk.a)
02)--Projection: multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1
03)----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
04)------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d]
physical_plan_error This feature is not implemented: Physical plan does not support DistributeBy partitioning

# union with aggregate
query TT
28 changes: 24 additions & 4 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
@@ -4049,10 +4049,20 @@ physical_plan


# Test CROSS JOIN LATERAL syntax (planning)
query T
query TT
explain select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnest(generate_series(1, t1_int))) as series(i);
----
This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t1" }), name: "t1_int" })
logical_plan
01)Cross Join:
02)--SubqueryAlias: t1
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
05)----Subquery:
06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)),depth=1) AS i
07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))|depth=1] structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t1.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))
09)------------EmptyRelation
physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t1" }), name: "t1_int" })


# Test CROSS JOIN LATERAL syntax (execution)
@@ -4062,10 +4072,20 @@ select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnes


# Test INNER JOIN LATERAL syntax (planning)
query T
query TT
explain select t1_id, t1_name, i from join_t1 t2 inner join lateral (select * from unnest(generate_series(1, t1_int))) as series(i) on(t1_id > i);
----
This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t2" }), name: "t1_int" })
logical_plan
01)Inner Join: Filter: CAST(t2.t1_id AS Int64) > series.i
02)--SubqueryAlias: t2
03)----TableScan: join_t1 projection=[t1_id, t1_name]
04)--SubqueryAlias: series
05)----Subquery:
06)------Projection: unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)),depth=1) AS i
07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))|depth=1] structs[]
08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t2.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))
09)------------EmptyRelation
physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t2" }), name: "t1_int" })


# Test INNER JOIN LATERAL syntax (execution)
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/prepare.slt
Original file line number Diff line number Diff line change
@@ -86,11 +86,13 @@ query TT
EXPLAIN EXECUTE my_plan;
----
logical_plan Execute: my_plan params=[]
physical_plan_error This feature is not implemented: Unsupported logical plan: Execute

query TT
EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo');
----
logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")]
physical_plan_error This feature is not implemented: Unsupported logical plan: Execute

query error DataFusion error: Schema error: No field named a\.
EXPLAIN EXECUTE my_plan(a);
52 changes: 42 additions & 10 deletions datafusion/sqllogictest/test_files/update.slt
Original file line number Diff line number Diff line change
@@ -26,30 +26,54 @@ create table t1(a int, b varchar, c double, d int);
statement ok
set datafusion.optimizer.max_passes = 0;

query T
query TT
explain update t1 set a=1, b=2, c=3.0, d=NULL;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(Int64(1) AS Int32) AS a, CAST(Int64(2) AS Utf8) AS b, Float64(3) AS c, CAST(NULL AS Int32) AS d
03)----TableScan: t1
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)

query T
query TT
explain update t1 set a=c+1, b=a, c=c+1.0, d=b;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: CAST(t1.c + CAST(Int64(1) AS Float64) AS Int32) AS a, CAST(t1.a AS Utf8) AS b, t1.c + Float64(1) AS c, CAST(t1.b AS Int32) AS d
03)----TableScan: t1
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)

statement ok
create table t2(a int, b varchar, c double, d int);

## set from subquery
query T
query TT
explain update t1 set b = (select max(b) from t2 where t1.a = t2.a)
----
This feature is not implemented: Physical plan does not support logical expression ScalarSubquery(<subquery>)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, (<subquery>) AS b, t1.c AS c, t1.d AS d
03)----Subquery:
04)------Projection: max(t2.b)
05)--------Aggregate: groupBy=[[]], aggr=[[max(t2.b)]]
06)----------Filter: outer_ref(t1.a) = t2.a
07)------------TableScan: t2
08)----TableScan: t1
physical_plan_error This feature is not implemented: Physical plan does not support logical expression ScalarSubquery(<subquery>)

# set from other table
query T
query TT
explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t1.a = t2.a AND t1.b > Utf8("foo") AND t2.c > Float64(1)
04)------Cross Join:
05)--------TableScan: t1
06)--------TableScan: t2
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)

statement ok
create table t3(a int, b varchar, c double, d int);
@@ -59,7 +83,15 @@ query error DataFusion error: SQL error: ParserError\("Expected end of statement
explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a and t1.a = t3.a;

# test table alias
query T
query TT
explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0;
----
This feature is not implemented: Unsupported logical plan: Dml(Update)
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t.a = t2.a AND t.b > Utf8("foo") AND t2.c > Float64(1)
04)------Cross Join:
05)--------SubqueryAlias: t
06)----------TableScan: t1
07)--------TableScan: t2
physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)