diff --git a/datafusion/common/src/display/mod.rs b/datafusion/common/src/display/mod.rs index c12e7419e4b6..bad51c45f8ee 100644 --- a/datafusion/common/src/display/mod.rs +++ b/datafusion/common/src/display/mod.rs @@ -62,6 +62,8 @@ pub enum PlanType { FinalPhysicalPlanWithStats, /// The final with schema, fully optimized physical plan which would be executed FinalPhysicalPlanWithSchema, + /// An error creating the physical plan + PhysicalPlanError, } impl Display for PlanType { @@ -91,6 +93,7 @@ impl Display for PlanType { PlanType::FinalPhysicalPlanWithSchema => { write!(f, "physical_plan_with_schema") } + PlanType::PhysicalPlanError => write!(f, "physical_plan_error"), } } } @@ -118,7 +121,9 @@ impl StringifiedPlan { /// `verbose_mode = true` will display all available plans pub fn should_display(&self, verbose_mode: bool) -> bool { match self.plan_type { - PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true, + PlanType::FinalLogicalPlan + | PlanType::FinalPhysicalPlan + | PlanType::PhysicalPlanError => true, _ => verbose_mode, } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 15125fe5a090..b937a28e9332 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -880,18 +880,18 @@ impl TableProvider for ListingTable { None => {} // no ordering required }; - let filters = conjunction(filters.to_vec()) - .map(|expr| -> Result<_> { - // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns. + let filters = match conjunction(filters.to_vec()) { + Some(expr) => { let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?; let filters = create_physical_expr( &expr, &table_df_schema, state.execution_props(), )?; - Ok(Some(filters)) - }) - .unwrap_or(Ok(None))?; + Some(filters) + } + None => None, + }; let Some(object_store_url) = self.table_paths.first().map(ListingTableUrl::object_store) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 2a96a2ad111f..7d475ad2e2a1 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1797,8 +1797,12 @@ impl DefaultPhysicalPlanner { Err(e) => return Err(e), } } - Err(e) => stringified_plans - .push(StringifiedPlan::new(InitialPhysicalPlan, e.to_string())), + Err(err) => { + stringified_plans.push(StringifiedPlan::new( + PhysicalPlanError, + err.strip_backtrace(), + )); + } } } diff --git a/datafusion/physical-plan/src/explain.rs b/datafusion/physical-plan/src/explain.rs index 96f55a1446b0..cc42e0587151 100644 --- a/datafusion/physical-plan/src/explain.rs +++ b/datafusion/physical-plan/src/explain.rs @@ -132,7 +132,6 @@ impl ExecutionPlan for ExplainExec { if 0 != partition { return internal_err!("ExplainExec invalid partition {partition}"); } - let mut type_builder = StringBuilder::with_capacity(self.stringified_plans.len(), 1024); let mut plan_builder = diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index b68c47c57eb9..d6fa129edc3f 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -655,6 +655,7 @@ message PlanType { datafusion_common.EmptyMessage FinalPhysicalPlan = 6; datafusion_common.EmptyMessage FinalPhysicalPlanWithStats = 10; datafusion_common.EmptyMessage FinalPhysicalPlanWithSchema = 12; + datafusion_common.EmptyMessage PhysicalPlanError = 13; } } diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index e54edb718808..16f14d9ddf61 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16683,6 +16683,9 @@ impl serde::Serialize for PlanType { plan_type::PlanTypeEnum::FinalPhysicalPlanWithSchema(v) => { struct_ser.serialize_field("FinalPhysicalPlanWithSchema", v)?; } + plan_type::PlanTypeEnum::PhysicalPlanError(v) => { + struct_ser.serialize_field("PhysicalPlanError", v)?; + } } } struct_ser.end() @@ -16707,6 +16710,7 @@ impl<'de> serde::Deserialize<'de> for PlanType { "FinalPhysicalPlan", "FinalPhysicalPlanWithStats", "FinalPhysicalPlanWithSchema", + "PhysicalPlanError", ]; #[allow(clippy::enum_variant_names)] @@ -16723,6 +16727,7 @@ impl<'de> serde::Deserialize<'de> for PlanType { FinalPhysicalPlan, FinalPhysicalPlanWithStats, FinalPhysicalPlanWithSchema, + PhysicalPlanError, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16756,6 +16761,7 @@ impl<'de> serde::Deserialize<'de> for PlanType { "FinalPhysicalPlan" => Ok(GeneratedField::FinalPhysicalPlan), "FinalPhysicalPlanWithStats" => Ok(GeneratedField::FinalPhysicalPlanWithStats), "FinalPhysicalPlanWithSchema" => Ok(GeneratedField::FinalPhysicalPlanWithSchema), + "PhysicalPlanError" => Ok(GeneratedField::PhysicalPlanError), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16860,6 +16866,13 @@ impl<'de> serde::Deserialize<'de> for PlanType { return Err(serde::de::Error::duplicate_field("FinalPhysicalPlanWithSchema")); } plan_type_enum__ = map_.next_value::<::std::option::Option<_>>()?.map(plan_type::PlanTypeEnum::FinalPhysicalPlanWithSchema) +; + } + GeneratedField::PhysicalPlanError => { + if plan_type_enum__.is_some() { + return Err(serde::de::Error::duplicate_field("PhysicalPlanError")); + } + plan_type_enum__ = map_.next_value::<::std::option::Option<_>>()?.map(plan_type::PlanTypeEnum::PhysicalPlanError) ; } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index dfc30e809108..59a90eb31ade 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -888,7 +888,7 @@ pub struct OptimizedPhysicalPlanType { pub struct PlanType { #[prost( oneof = "plan_type::PlanTypeEnum", - tags = "1, 7, 8, 2, 3, 4, 9, 11, 5, 6, 10, 12" + tags = "1, 7, 8, 2, 3, 4, 9, 11, 5, 6, 10, 12, 13" )] pub plan_type_enum: ::core::option::Option, } @@ -920,6 +920,8 @@ pub mod plan_type { FinalPhysicalPlanWithStats(super::super::datafusion_common::EmptyMessage), #[prost(message, tag = "12")] FinalPhysicalPlanWithSchema(super::super::datafusion_common::EmptyMessage), + #[prost(message, tag = "13")] + PhysicalPlanError(super::super::datafusion_common::EmptyMessage), } } #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index f25fb0bf2561..33b718558827 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -44,7 +44,7 @@ use crate::protobuf::{ AnalyzedLogicalPlan, FinalAnalyzedLogicalPlan, FinalLogicalPlan, FinalPhysicalPlan, FinalPhysicalPlanWithStats, InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithStats, OptimizedLogicalPlan, - OptimizedPhysicalPlan, + OptimizedPhysicalPlan, PhysicalPlanError, }, AnalyzedLogicalPlanType, CubeNode, GroupingSetNode, OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode, @@ -141,6 +141,7 @@ impl From<&protobuf::StringifiedPlan> for StringifiedPlan { FinalPhysicalPlan(_) => PlanType::FinalPhysicalPlan, FinalPhysicalPlanWithStats(_) => PlanType::FinalPhysicalPlanWithStats, FinalPhysicalPlanWithSchema(_) => PlanType::FinalPhysicalPlanWithSchema, + PhysicalPlanError(_) => PlanType::PhysicalPlanError, }, plan: Arc::new(stringified_plan.plan.clone()), } diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index 8af7b19d9091..a5497b2c15e1 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -38,6 +38,7 @@ use crate::protobuf::{ FinalPhysicalPlan, FinalPhysicalPlanWithSchema, FinalPhysicalPlanWithStats, InitialLogicalPlan, InitialPhysicalPlan, InitialPhysicalPlanWithSchema, InitialPhysicalPlanWithStats, OptimizedLogicalPlan, OptimizedPhysicalPlan, + PhysicalPlanError, }, AnalyzedLogicalPlanType, CubeNode, EmptyMessage, GroupingSetNode, LogicalExprList, OptimizedLogicalPlanType, OptimizedPhysicalPlanType, PlaceholderNode, RollupNode, @@ -115,6 +116,9 @@ impl From<&StringifiedPlan> for protobuf::StringifiedPlan { PlanType::FinalPhysicalPlanWithSchema => Some(protobuf::PlanType { plan_type_enum: Some(FinalPhysicalPlanWithSchema(EmptyMessage {})), }), + PlanType::PhysicalPlanError => Some(protobuf::PlanType { + plan_type_enum: Some(PhysicalPlanError(EmptyMessage {})), + }), }, plan: stringified_plan.plan.to_string(), } diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 54658f36ca14..f3fee4f1fca6 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -411,3 +411,28 @@ logical_plan physical_plan 01)ProjectionExec: expr=[{c0:1,c1:2.3,c2:abc} as struct(Int64(1),Float64(2.3),Utf8("abc"))] 02)--PlaceholderRowExec + + +statement ok +create table t1(a int); + +statement ok +create table t2(b int); + +query TT +explain select a from t1 where exists (select count(*) from t2); +---- +logical_plan +01)Filter: EXISTS () +02)--Subquery: +03)----Projection: count(*) +04)------Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] +05)--------TableScan: t2 +06)--TableScan: t1 projection=[a] +physical_plan_error This feature is not implemented: Physical plan does not support logical expression Exists(Exists { subquery: , negated: false }) + +statement ok +drop table t1; + +statement ok +drop table t2; diff --git a/datafusion/sqllogictest/test_files/group_by.slt b/datafusion/sqllogictest/test_files/group_by.slt index daf270190870..4b90ddf2ea5f 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -4082,6 +4082,7 @@ logical_plan 02)--Projection: multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b, sum(multiple_ordered_table_with_pk.d) AS sum1 03)----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], aggr=[[sum(CAST(multiple_ordered_table_with_pk.d AS Int64))]] 04)------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d] +physical_plan_error This feature is not implemented: Physical plan does not support DistributeBy partitioning # union with aggregate query TT diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 93bb1f1f548e..d45dbc7ee1ae 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -4053,7 +4053,7 @@ query TT explain select t1_id, t1_name, i from join_t1 t1 cross join lateral (select * from unnest(generate_series(1, t1_int))) as series(i); ---- logical_plan -01)Cross Join: +01)Cross Join: 02)--SubqueryAlias: t1 03)----TableScan: join_t1 projection=[t1_id, t1_name] 04)--SubqueryAlias: series @@ -4062,6 +4062,7 @@ logical_plan 07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int)))|depth=1] structs[] 08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t1.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t1.t1_int))) 09)------------EmptyRelation +physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t1" }), name: "t1_int" }) # Test CROSS JOIN LATERAL syntax (execution) @@ -4084,6 +4085,7 @@ logical_plan 07)--------Unnest: lists[unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int)))|depth=1] structs[] 08)----------Projection: generate_series(Int64(1), CAST(outer_ref(t2.t1_int) AS Int64)) AS unnest_placeholder(generate_series(Int64(1),outer_ref(t2.t1_int))) 09)------------EmptyRelation +physical_plan_error This feature is not implemented: Physical plan does not support logical expression OuterReferenceColumn(UInt32, Column { relation: Some(Bare { table: "t2" }), name: "t1_int" }) # Test INNER JOIN LATERAL syntax (execution) diff --git a/datafusion/sqllogictest/test_files/prepare.slt b/datafusion/sqllogictest/test_files/prepare.slt index e306ec7767c7..91b925efa26c 100644 --- a/datafusion/sqllogictest/test_files/prepare.slt +++ b/datafusion/sqllogictest/test_files/prepare.slt @@ -86,11 +86,13 @@ query TT EXPLAIN EXECUTE my_plan; ---- logical_plan Execute: my_plan params=[] +physical_plan_error This feature is not implemented: Unsupported logical plan: Execute query TT EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo'); ---- logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")] +physical_plan_error This feature is not implemented: Unsupported logical plan: Execute query error DataFusion error: Schema error: No field named a\. EXPLAIN EXECUTE my_plan(a); diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt index aaba6998ee63..0f9582b04c58 100644 --- a/datafusion/sqllogictest/test_files/update.slt +++ b/datafusion/sqllogictest/test_files/update.slt @@ -33,6 +33,7 @@ logical_plan 01)Dml: op=[Update] table=[t1] 02)--Projection: CAST(Int64(1) AS Int32) AS a, CAST(Int64(2) AS Utf8) AS b, Float64(3) AS c, CAST(NULL AS Int32) AS d 03)----TableScan: t1 +physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update) query TT explain update t1 set a=c+1, b=a, c=c+1.0, d=b; @@ -41,6 +42,7 @@ logical_plan 01)Dml: op=[Update] table=[t1] 02)--Projection: CAST(t1.c + CAST(Int64(1) AS Float64) AS Int32) AS a, CAST(t1.a AS Utf8) AS b, t1.c + Float64(1) AS c, CAST(t1.b AS Int32) AS d 03)----TableScan: t1 +physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update) statement ok create table t2(a int, b varchar, c double, d int); @@ -58,6 +60,7 @@ logical_plan 06)----------Filter: outer_ref(t1.a) = t2.a 07)------------TableScan: t2 08)----TableScan: t1 +physical_plan_error This feature is not implemented: Physical plan does not support logical expression ScalarSubquery() # set from other table query TT @@ -67,9 +70,10 @@ logical_plan 01)Dml: op=[Update] table=[t1] 02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d 03)----Filter: t1.a = t2.a AND t1.b > Utf8("foo") AND t2.c > Float64(1) -04)------Cross Join: +04)------Cross Join: 05)--------TableScan: t1 06)--------TableScan: t2 +physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update) statement ok create table t3(a int, b varchar, c double, d int); @@ -86,7 +90,8 @@ logical_plan 01)Dml: op=[Update] table=[t1] 02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d 03)----Filter: t.a = t2.a AND t.b > Utf8("foo") AND t2.c > Float64(1) -04)------Cross Join: +04)------Cross Join: 05)--------SubqueryAlias: t 06)----------TableScan: t1 07)--------TableScan: t2 +physical_plan_error This feature is not implemented: Unsupported logical plan: Dml(Update)