Commit 058c00a

chore: enable runtime filter for native datasource (databendlabs#13976)
Co-authored-by: BohuTANG <[email protected]>
xudong963 and BohuTANG authored Dec 11, 2023
1 parent 9b7ae4e commit 058c00a
Showing 6 changed files with 64 additions and 14 deletions.
src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_build_state.rs

@@ -63,7 +63,7 @@ use crate::pipelines::processors::transforms::hash_join::SingleStringHashJoinHas
 use crate::pipelines::processors::HashJoinState;
 use crate::sessions::QueryContext;
 
-const INLIST_RUNTIME_FILTER_THRESHOLD: usize = 10_000;
+pub(crate) const INLIST_RUNTIME_FILTER_THRESHOLD: usize = 10_000;
 
 /// Define some shared states for all hash join build threads.
 pub struct HashJoinBuildState {
src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs

@@ -40,6 +40,7 @@ use ethnum::U256;
 use parking_lot::RwLock;
 
 use crate::pipelines::processors::transforms::hash_join::build_state::BuildState;
+use crate::pipelines::processors::transforms::hash_join::hash_join_build_state::INLIST_RUNTIME_FILTER_THRESHOLD;
 use crate::pipelines::processors::transforms::hash_join::row::RowSpace;
 use crate::pipelines::processors::transforms::hash_join::util::build_schema_wrap_nullable;
 use crate::pipelines::processors::transforms::hash_join::util::inlist_filter;
@@ -257,7 +258,7 @@ impl HashJoinState {
         let data_blocks = &mut build_state.build_chunks;
 
         let num_rows = build_state.generation_state.build_num_rows;
-        if num_rows > 10_000 {
+        if num_rows > INLIST_RUNTIME_FILTER_THRESHOLD {
             data_blocks.clear();
             return Ok(());
         }
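
Taken together, the two hash-join hunks replace a duplicated magic number with one crate-visible constant: the build side exports INLIST_RUNTIME_FILTER_THRESHOLD, and HashJoinState reuses it when deciding whether the build side is still small enough to justify an IN-list runtime filter. A minimal sketch of the guard, with Databend's DataBlock replaced by a stand-in type:

// Sketch of the shared-threshold pattern; the stand-in DataBlock and the
// free function are illustrative, only the constant matches the commit.
pub(crate) const INLIST_RUNTIME_FILTER_THRESHOLD: usize = 10_000;

/// Stand-in for common_expression::DataBlock.
type DataBlock = Vec<u64>;

/// Keep the cached build-side chunks only while an IN-list filter is
/// still worth building; past the threshold, release them and skip it.
fn retain_chunks_for_inlist_filter(num_rows: usize, data_blocks: &mut Vec<DataBlock>) {
    if num_rows > INLIST_RUNTIME_FILTER_THRESHOLD {
        data_blocks.clear();
    }
}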
5 changes: 5 additions & 0 deletions src/query/storages/fuse/src/operations/read/fuse_source.rs

@@ -43,6 +43,7 @@ use crate::operations::read::ReadParquetDataSource;
 #[allow(clippy::too_many_arguments)]
 pub fn build_fuse_native_source_pipeline(
     ctx: Arc<dyn TableContext>,
+    table_schema: Arc<TableSchema>,
     pipeline: &mut Pipeline,
     block_reader: Arc<BlockReader>,
     mut max_threads: usize,
@@ -77,7 +78,9 @@ pub fn build_fuse_native_source_pipeline(
                     output.clone(),
                     ReadNativeDataSource::<true>::create(
                         i,
+                        plan.table_index,
                         ctx.clone(),
+                        table_schema.clone(),
                         output,
                         block_reader.clone(),
                         partitions.clone(),
@@ -102,7 +105,9 @@ pub fn build_fuse_native_source_pipeline(
                     output.clone(),
                     ReadNativeDataSource::<false>::create(
                         i,
+                        plan.table_index,
                         ctx.clone(),
+                        table_schema.clone(),
                         output,
                         block_reader.clone(),
                         partitions.clone(),
src/query/storages/fuse/src/operations/read/native_data_source_reader.rs

@@ -22,22 +22,27 @@ use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::DataBlock;
+use common_expression::FunctionContext;
+use common_expression::TableSchema;
 use common_pipeline_core::processors::Event;
 use common_pipeline_core::processors::OutputPort;
 use common_pipeline_core::processors::Processor;
 use common_pipeline_core::processors::ProcessorPtr;
 use common_pipeline_sources::SyncSource;
 use common_pipeline_sources::SyncSourcer;
+use common_sql::IndexType;
 
 use super::native_data_source::DataSource;
 use crate::io::AggIndexReader;
 use crate::io::BlockReader;
 use crate::io::TableMetaLocationGenerator;
 use crate::io::VirtualColumnReader;
 use crate::operations::read::native_data_source::NativeDataSourceMeta;
+use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner;
 use crate::FusePartInfo;
 
 pub struct ReadNativeDataSource<const BLOCKING_IO: bool> {
+    func_ctx: FunctionContext,
     id: usize,
     finished: bool,
     batch_size: usize,
@@ -49,20 +54,27 @@ pub struct ReadNativeDataSource<const BLOCKING_IO: bool> {
 
     index_reader: Arc<Option<AggIndexReader>>,
     virtual_reader: Arc<Option<VirtualColumnReader>>,
+
+    table_schema: Arc<TableSchema>,
+    table_index: IndexType,
 }
 
 impl ReadNativeDataSource<true> {
     pub fn create(
         id: usize,
+        table_index: IndexType,
         ctx: Arc<dyn TableContext>,
+        table_schema: Arc<TableSchema>,
        output: Arc<OutputPort>,
         block_reader: Arc<BlockReader>,
         partitions: StealablePartitions,
         index_reader: Arc<Option<AggIndexReader>>,
         virtual_reader: Arc<Option<VirtualColumnReader>>,
     ) -> Result<ProcessorPtr> {
         let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize;
+        let func_ctx = ctx.get_function_context()?;
         SyncSourcer::create(ctx.clone(), output.clone(), ReadNativeDataSource::<true> {
+            func_ctx,
             id,
             output,
             batch_size,
@@ -72,24 +84,30 @@ impl ReadNativeDataSource<true> {
             partitions,
             index_reader,
             virtual_reader,
+            table_schema,
+            table_index,
         })
     }
 }
 
 impl ReadNativeDataSource<false> {
     pub fn create(
         id: usize,
+        table_index: IndexType,
         ctx: Arc<dyn TableContext>,
+        table_schema: Arc<TableSchema>,
         output: Arc<OutputPort>,
         block_reader: Arc<BlockReader>,
         partitions: StealablePartitions,
         index_reader: Arc<Option<AggIndexReader>>,
         virtual_reader: Arc<Option<VirtualColumnReader>>,
     ) -> Result<ProcessorPtr> {
         let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize;
+        let func_ctx = ctx.get_function_context()?;
         Ok(ProcessorPtr::create(Box::new(ReadNativeDataSource::<
             false,
         > {
+            func_ctx,
             id,
             output,
             batch_size,
@@ -99,6 +117,8 @@ impl ReadNativeDataSource<false> {
             partitions,
             index_reader,
             virtual_reader,
+            table_schema,
+            table_index,
         })))
     }
 }
@@ -110,6 +130,17 @@ impl SyncSource for ReadNativeDataSource<true> {
         match self.partitions.steal_one(self.id) {
             None => Ok(None),
             Some(part) => {
+                if runtime_filter_pruner(
+                    self.table_schema.clone(),
+                    &part,
+                    &self
+                        .partitions
+                        .ctx
+                        .get_runtime_filter_with_id(self.table_index),
+                    &self.func_ctx,
+                )? {
+                    return Ok(Some(DataBlock::empty()));
+                }
                 if let Some(index_reader) = self.index_reader.as_ref() {
                     let fuse_part = FusePartInfo::from_part(&part)?;
                     let loc =
@@ -198,7 +229,15 @@ impl Processor for ReadNativeDataSource<false> {
 
         if !parts.is_empty() {
             let mut chunks = Vec::with_capacity(parts.len());
+            let filters = self
+                .partitions
+                .ctx
+                .get_runtime_filter_with_id(self.table_index);
             for part in &parts {
+                if runtime_filter_pruner(self.table_schema.clone(), part, &filters, &self.func_ctx)?
+                {
+                    continue;
+                }
                 let part = part.clone();
                 let block_reader = self.block_reader.clone();
                 let index_reader = self.index_reader.clone();
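
The native source now mirrors what the parquet source already does: before a stolen partition is read, runtime_filter_pruner is consulted, and a pruned partition is answered with an empty block (sync path) or skipped (async path) instead of issuing any IO. A minimal sketch of that control flow, assuming stand-in types for the partition, the filters, and FunctionContext, with a placeholder pruner body:

// Sketch only: all types and the pruner body are stand-ins; the real
// pruner evaluates the join's runtime filters against partition stats.
struct PartInfo;
struct Filters;
struct FunctionContext;

struct DataBlock(Vec<u64>);

impl DataBlock {
    fn empty() -> Self {
        DataBlock(Vec::new())
    }
}

/// Returns Ok(true) when the filters prove the partition has no matches.
fn runtime_filter_pruner(
    _part: &PartInfo,
    _filters: &Filters,
    _func_ctx: &FunctionContext,
) -> Result<bool, String> {
    Ok(false) // placeholder
}

fn generate(
    part: Option<PartInfo>,
    filters: &Filters,
    func_ctx: &FunctionContext,
) -> Result<Option<DataBlock>, String> {
    match part {
        None => Ok(None),
        Some(part) => {
            // A pruned partition yields an empty block: downstream
            // operators see zero rows and no storage IO is issued.
            if runtime_filter_pruner(&part, filters, func_ctx)? {
                return Ok(Some(DataBlock::empty()));
            }
            // ...otherwise fall through to the normal read path.
            Ok(Some(DataBlock(vec![0])))
        }
    }
}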
src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs

@@ -22,6 +22,7 @@ use common_catalog::table_context::TableContext;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_expression::DataBlock;
+use common_expression::FunctionContext;
 use common_expression::TableSchema;
 use common_pipeline_core::processors::Event;
 use common_pipeline_core::processors::OutputPort;
@@ -42,7 +43,7 @@ use crate::operations::read::parquet_data_source::DataSourceMeta;
 use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner;
 
 pub struct ReadParquetDataSource<const BLOCKING_IO: bool> {
-    ctx: Arc<dyn TableContext>,
+    func_ctx: FunctionContext,
     id: usize,
     table_index: IndexType,
     finished: bool,
@@ -73,10 +74,10 @@ impl<const BLOCKING_IO: bool> ReadParquetDataSource<BLOCKING_IO> {
         virtual_reader: Arc<Option<VirtualColumnReader>>,
     ) -> Result<ProcessorPtr> {
         let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize;
-
+        let func_ctx = ctx.get_function_context()?;
         if BLOCKING_IO {
             SyncSourcer::create(ctx.clone(), output.clone(), ReadParquetDataSource::<true> {
-                ctx: ctx.clone(),
+                func_ctx,
                 id,
                 table_index,
                 output,
@@ -93,7 +94,7 @@ impl<const BLOCKING_IO: bool> ReadParquetDataSource<BLOCKING_IO> {
             Ok(ProcessorPtr::create(Box::new(ReadParquetDataSource::<
                 false,
             > {
-                ctx: ctx.clone(),
+                func_ctx,
                 id,
                 table_index,
                 output,
@@ -120,8 +121,11 @@ impl SyncSource for ReadParquetDataSource<true> {
                 if runtime_filter_pruner(
                     self.table_schema.clone(),
                     &part,
-                    &self.ctx.get_runtime_filter_with_id(self.table_index),
-                    &self.partitions.ctx.get_function_context()?,
+                    &self
+                        .partitions
+                        .ctx
+                        .get_runtime_filter_with_id(self.table_index),
+                    &self.func_ctx,
                 )? {
                     return Ok(Some(DataBlock::empty()));
                 }
@@ -220,13 +224,13 @@ impl Processor for ReadParquetDataSource<false> {
 
         if !parts.is_empty() {
             let mut chunks = Vec::with_capacity(parts.len());
+            let filters = self
+                .partitions
+                .ctx
+                .get_runtime_filter_with_id(self.table_index);
             for part in &parts {
-                if runtime_filter_pruner(
-                    self.table_schema.clone(),
-                    part,
-                    &self.ctx.get_runtime_filter_with_id(self.table_index),
-                    &self.partitions.ctx.get_function_context()?,
-                )? {
+                if runtime_filter_pruner(self.table_schema.clone(), part, &filters, &self.func_ctx)?
+                {
                     continue;
                 }
                 let part = part.clone();
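
Beyond wiring the native path, the parquet hunks carry two small refactors: the processor now caches a FunctionContext at construction instead of holding the whole TableContext, and the async path fetches the runtime filters once per stolen batch rather than once per partition. A sketch of the hoisted lookup, with stand-in types and a hypothetical registry in place of TableContext::get_runtime_filter_with_id:

// Sketch with stand-in types; only the shape of the hoisting matches
// the commit, none of the signatures are Databend's.
use std::collections::HashMap;

type IndexType = usize;

#[derive(Clone)]
struct Expr; // stand-in for a runtime-filter expression

struct Ctx {
    runtime_filters: HashMap<IndexType, Vec<Expr>>,
}

impl Ctx {
    /// Stand-in for TableContext::get_runtime_filter_with_id.
    fn get_runtime_filter_with_id(&self, id: IndexType) -> Vec<Expr> {
        self.runtime_filters.get(&id).cloned().unwrap_or_default()
    }
}

/// Placeholder for the real min/max check against partition stats.
fn pruned_by(_filters: &[Expr], _part: u32) -> bool {
    false
}

fn prune_batch(ctx: &Ctx, table_index: IndexType, parts: Vec<u32>) -> Vec<u32> {
    // One registry lookup for the whole batch...
    let filters = ctx.get_runtime_filter_with_id(table_index);
    // ...then every partition is tested against the same snapshot.
    parts
        .into_iter()
        .filter(|part| !pruned_by(&filters, *part))
        .collect()
}

Dropping the stored TableContext also narrows what the source keeps alive: as the old pruner call shows, the processor previously carried a second Arc<dyn TableContext> alongside partitions.ctx.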
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/operations/read_data.rs

@@ -276,6 +276,7 @@ impl FuseTable {
         match storage_format {
             FuseStorageFormat::Native => build_fuse_native_source_pipeline(
                 ctx,
+                table_schema,
                 pipeline,
                 block_reader,
                 max_threads,
