Skip to content

Commit

Permalink
add parquet_filter_pushdown switches
Browse files Browse the repository at this point in the history
Signed-off-by: zenghua <[email protected]>
  • Loading branch information
zenghua committed Dec 26, 2023
1 parent cf91277 commit b1a766f
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 21 deletions.
20 changes: 4 additions & 16 deletions rust/lakesoul-datafusion/src/datasource/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,11 @@ impl TableProvider for LakeSoulTableProvider {
self.listing_table.scan(state, projection, filters, limit).await
}

fn supports_filter_pushdown(
fn supports_filters_pushdown(
&self,
filter: &Expr,
) -> Result<TableProviderFilterPushDown> {
if self.primary_keys().is_empty() {
Ok(TableProviderFilterPushDown::Exact)
} else {
if let Ok(cols) = filter.to_columns() {
if cols.iter().all(|col| self.primary_keys().contains(&col.name)) {
Ok(TableProviderFilterPushDown::Inexact)
} else {
Ok(TableProviderFilterPushDown::Unsupported)
}
} else {
Ok(TableProviderFilterPushDown::Unsupported)
}
}
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
self.listing_table.supports_filters_pushdown(filters)
}

async fn insert_into(
Expand Down
4 changes: 2 additions & 2 deletions rust/lakesoul-datafusion/src/lakesoul_table/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

use std::sync::Arc;

use arrow::{datatypes::Schema, record_batch::RecordBatch};
use arrow::record_batch::RecordBatch;

use datafusion::{logical_expr::{Expr, col}, physical_expr::{create_physical_expr, PhysicalSortExpr}, physical_plan::Partitioning, execution::context::SessionState, physical_planner::create_physical_sort_expr, common::DFSchema, error::Result, scalar::ScalarValue};
use datafusion::scalar::ScalarValue;

use lakesoul_io::lakesoul_io_config::LakeSoulIOConfigBuilder;
use proto::proto::entity::TableInfo;
Expand Down
8 changes: 6 additions & 2 deletions rust/lakesoul-io/src/datasource/listing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,17 @@ impl TableProvider for LakeSoulListingTable {

fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<TableProviderFilterPushDown>> {
if self.lakesoul_io_config.primary_keys.is_empty() {
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
if self.lakesoul_io_config.parquet_filter_pushdown {
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
} else {
Ok(vec![TableProviderFilterPushDown::Unsupported; filters.len()])
}
} else {
filters
.iter()
.map(|f| {
if let Ok(cols) = f.to_columns() {
if cols.iter().all(|col| self.lakesoul_io_config.primary_keys.contains(&col.name)) {
if self.lakesoul_io_config.parquet_filter_pushdown && cols.iter().all(|col| self.lakesoul_io_config.primary_keys.contains(&col.name)) {
Ok(TableProviderFilterPushDown::Inexact)
} else {
Ok(TableProviderFilterPushDown::Unsupported)
Expand Down
9 changes: 8 additions & 1 deletion rust/lakesoul-io/src/lakesoul_io_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub struct LakeSoulIOConfig {
pub(crate) max_row_group_size: usize,
#[derivative(Default(value = "1"))]
pub(crate) prefetch_size: usize,
#[derivative(Default(value = "false"))]
pub(crate) parquet_filter_pushdown: bool,

// arrow schema
pub(crate) schema: IOSchema,
Expand Down Expand Up @@ -172,6 +174,11 @@ impl LakeSoulIOConfigBuilder {
self
}

/// Enables or disables Parquet filter pushdown for the configuration being built.
///
/// Stores `enable` in the config's `parquet_filter_pushdown` field (which
/// defaults to `false`) and returns the builder so further `with_*` calls
/// can be chained.
pub fn with_parquet_filter_pushdown(mut self, enable: bool) -> Self {
self.config.parquet_filter_pushdown = enable;
self
}

pub fn with_columns(mut self, cols: Vec<String>) -> Self {
self.config.columns = cols;
self
Expand Down Expand Up @@ -379,7 +386,7 @@ pub fn create_session_context_with_planner(config: &mut LakeSoulIOConfig, planne

sess_conf.options_mut().optimizer.enable_round_robin_repartition = false; // if true, the record_batches poll from stream become unordered
sess_conf.options_mut().optimizer.prefer_hash_join = false; //if true, panicked at 'range end out of bounds'
sess_conf.options_mut().execution.parquet.pushdown_filters = true;
sess_conf.options_mut().execution.parquet.pushdown_filters = config.parquet_filter_pushdown;
// sess_conf.options_mut().execution.parquet.enable_page_index = true;

// limit memory for sort writer
Expand Down

0 comments on commit b1a766f

Please sign in to comment.