diff --git a/rust/lakesoul-datafusion/src/datasource/table_provider.rs b/rust/lakesoul-datafusion/src/datasource/table_provider.rs index 77f97bb03..0fb854d36 100644 --- a/rust/lakesoul-datafusion/src/datasource/table_provider.rs +++ b/rust/lakesoul-datafusion/src/datasource/table_provider.rs @@ -115,23 +115,11 @@ impl TableProvider for LakeSoulTableProvider { self.listing_table.scan(state, projection, filters, limit).await } - fn supports_filter_pushdown( + fn supports_filters_pushdown( &self, - filter: &Expr, - ) -> Result<TableProviderFilterPushDown> { - if self.primary_keys().is_empty() { - Ok(TableProviderFilterPushDown::Exact) - } else { - if let Ok(cols) = filter.to_columns() { - if cols.iter().all(|col| self.primary_keys().contains(&col.name)) { - Ok(TableProviderFilterPushDown::Inexact) - } else { - Ok(TableProviderFilterPushDown::Unsupported) - } - } else { - Ok(TableProviderFilterPushDown::Unsupported) - } - } + filters: &[&Expr], + ) -> Result<Vec<TableProviderFilterPushDown>> { + self.listing_table.supports_filters_pushdown(filters) } async fn insert_into( diff --git a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs index 153c2fe19..c64bd8bda 100644 --- a/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs +++ b/rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs @@ -4,9 +4,9 @@ use std::sync::Arc; -use arrow::{datatypes::Schema, record_batch::RecordBatch}; +use arrow::record_batch::RecordBatch; -use datafusion::{logical_expr::{Expr, col}, physical_expr::{create_physical_expr, PhysicalSortExpr}, physical_plan::Partitioning, execution::context::SessionState, physical_planner::create_physical_sort_expr, common::DFSchema, error::Result, scalar::ScalarValue}; +use datafusion::scalar::ScalarValue; use lakesoul_io::lakesoul_io_config::LakeSoulIOConfigBuilder; use proto::proto::entity::TableInfo; diff --git a/rust/lakesoul-io/src/datasource/listing.rs b/rust/lakesoul-io/src/datasource/listing.rs index acbebba36..e746104b9 100644 --- 
a/rust/lakesoul-io/src/datasource/listing.rs +++ b/rust/lakesoul-io/src/datasource/listing.rs @@ -128,13 +128,17 @@ impl TableProvider for LakeSoulListingTable { fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> { if self.lakesoul_io_config.primary_keys.is_empty() { - Ok(vec![TableProviderFilterPushDown::Exact; filters.len()]) + if self.lakesoul_io_config.parquet_filter_pushdown { + Ok(vec![TableProviderFilterPushDown::Exact; filters.len()]) + } else { + Ok(vec![TableProviderFilterPushDown::Unsupported; filters.len()]) + } } else { filters .iter() .map(|f| { if let Ok(cols) = f.to_columns() { - if cols.iter().all(|col| self.lakesoul_io_config.primary_keys.contains(&col.name)) { + if self.lakesoul_io_config.parquet_filter_pushdown && cols.iter().all(|col| self.lakesoul_io_config.primary_keys.contains(&col.name)) { Ok(TableProviderFilterPushDown::Inexact) } else { Ok(TableProviderFilterPushDown::Unsupported) diff --git a/rust/lakesoul-io/src/lakesoul_io_config.rs b/rust/lakesoul-io/src/lakesoul_io_config.rs index bce4c25c3..d2022ba95 100644 --- a/rust/lakesoul-io/src/lakesoul_io_config.rs +++ b/rust/lakesoul-io/src/lakesoul_io_config.rs @@ -60,6 +60,8 @@ pub struct LakeSoulIOConfig { pub(crate) max_row_group_size: usize, #[derivative(Default(value = "1"))] pub(crate) prefetch_size: usize, + #[derivative(Default(value = "false"))] + pub(crate) parquet_filter_pushdown: bool, // arrow schema pub(crate) schema: IOSchema, @@ -172,6 +174,11 @@ impl LakeSoulIOConfigBuilder { self } + pub fn with_parquet_filter_pushdown(mut self, enable: bool) -> Self { + self.config.parquet_filter_pushdown = enable; + self + } + pub fn with_columns(mut self, cols: Vec<String>) -> Self { self.config.columns = cols; self @@ -379,7 +386,7 @@ pub fn create_session_context_with_planner(config: &mut LakeSoulIOConfig, planne sess_conf.options_mut().optimizer.enable_round_robin_repartition = false; // if true, the record_batches poll from stream become unordered 
sess_conf.options_mut().optimizer.prefer_hash_join = false; //if true, panicked at 'range end out of bounds' - sess_conf.options_mut().execution.parquet.pushdown_filters = true; + sess_conf.options_mut().execution.parquet.pushdown_filters = config.parquet_filter_pushdown; // sess_conf.options_mut().execution.parquet.enable_page_index = true; // limit memory for sort writer