diff --git a/.clang-format b/.clang-format index 39c6f8ead32ab5..977e4382e3cd16 100644 --- a/.clang-format +++ b/.clang-format @@ -13,7 +13,6 @@ PointerAlignment: Left ReflowComments: false SortUsingDeclarations: false SpacesBeforeTrailingComments: 1 -AllowShortFunctionsOnASingleLine: Inline --- Language: Java BasedOnStyle: Google diff --git a/be/src/exec/hash_joiner.cpp b/be/src/exec/hash_joiner.cpp index d1300b5642eab3..94c0822a236a96 100644 --- a/be/src/exec/hash_joiner.cpp +++ b/be/src/exec/hash_joiner.cpp @@ -575,7 +575,20 @@ Status HashJoiner::_create_runtime_bloom_filters(RuntimeState* state, int64_t li int expr_order = rf_desc->build_expr_order(); ColumnPtr column = ht.get_key_columns()[expr_order]; bool eq_null = _is_null_safes[expr_order]; - _runtime_bloom_filter_build_params.emplace_back(pipeline::RuntimeBloomFilterBuildParam(eq_null, column)); + MutableJoinRuntimeFilterPtr filter = nullptr; + auto multi_partitioned = rf_desc->layout().pipeline_level_multi_partitioned(); + if (multi_partitioned) { + LogicalType build_type = rf_desc->build_expr_type(); + filter = std::shared_ptr( + RuntimeFilterHelper::create_runtime_bloom_filter(nullptr, build_type)); + if (filter == nullptr) continue; + filter->set_join_mode(rf_desc->join_mode()); + filter->init(ht.get_row_count()); + RETURN_IF_ERROR(RuntimeFilterHelper::fill_runtime_bloom_filter(column, build_type, filter.get(), + kHashJoinKeyColumnOffset, eq_null)); + } + _runtime_bloom_filter_build_params.emplace_back(pipeline::RuntimeBloomFilterBuildParam( + multi_partitioned, eq_null, std::move(column), std::move(filter))); } return Status::OK(); } diff --git a/be/src/exec/pipeline/exchange/shuffler.h b/be/src/exec/pipeline/exchange/shuffler.h index b1c668f8ed532d..5ca3f130889c14 100644 --- a/be/src/exec/pipeline/exchange/shuffler.h +++ b/be/src/exec/pipeline/exchange/shuffler.h @@ -75,8 +75,13 @@ class Shuffler { if constexpr (!two_level_shuffle) { shuffle_id = channel_id; } else { - uint32_t driver_sequence = ReduceOp()(HashUtil::xorshift32(hash_values[i]), _num_shuffles_per_channel); - shuffle_id = channel_id * _num_shuffles_per_channel + driver_sequence; + if (_num_shuffles_per_channel == 1) { + shuffle_id = channel_id; + } else { + uint32_t driver_sequence = + ReduceOp()(HashUtil::xorshift32(hash_values[i]), _num_shuffles_per_channel); + shuffle_id = channel_id * _num_shuffles_per_channel + driver_sequence; + } } shuffle_channel_ids[i] = shuffle_id; } diff --git a/be/src/exec/pipeline/runtime_filter_types.h b/be/src/exec/pipeline/runtime_filter_types.h index 1d9671163a04aa..52b7d947eec20a 100644 --- a/be/src/exec/pipeline/runtime_filter_types.h +++ b/be/src/exec/pipeline/runtime_filter_types.h @@ -46,9 +46,16 @@ struct RuntimeBloomFilterBuildParam; using OptRuntimeBloomFilterBuildParams = std::vector>; // Parameters used to build runtime bloom-filters. struct RuntimeBloomFilterBuildParam { - RuntimeBloomFilterBuildParam(bool eq_null, ColumnPtr column) : eq_null(eq_null), column(std::move(column)) {} + RuntimeBloomFilterBuildParam(bool multi_partitioned, bool eq_null, ColumnPtr column, + MutableJoinRuntimeFilterPtr runtime_filter) + : multi_partitioned(multi_partitioned), + eq_null(eq_null), + column(std::move(column)), + runtime_filter(std::move(runtime_filter)) {} + bool multi_partitioned; bool eq_null; ColumnPtr column; + MutableJoinRuntimeFilterPtr runtime_filter; }; // RuntimeFilterCollector contains runtime in-filters and bloom-filters, it is stored in RuntimeFilerHub @@ -306,6 +313,18 @@ class PartialRuntimeFilterMerger { } Status merge_local_bloom_filters() { + if (_bloom_filter_descriptors.empty()) { + return Status::OK(); + } + auto multi_partitioned = _bloom_filter_descriptors[0]->layout().pipeline_level_multi_partitioned(); + if (multi_partitioned) { + return merge_multi_partitioned_local_bloom_filters(); + } else { + return merge_singleton_local_bloom_filters(); + } + } + + Status merge_singleton_local_bloom_filters() { if (_partial_bloom_filter_build_params.empty()) { return Status::OK(); } @@ -381,6 +400,76 @@ class PartialRuntimeFilterMerger { return Status::OK(); } + Status merge_multi_partitioned_local_bloom_filters() { + if (_partial_bloom_filter_build_params.empty()) { + return Status::OK(); + } + size_t row_count = 0; + for (auto count : _ht_row_counts) { + row_count += count; + } + for (auto& desc : _bloom_filter_descriptors) { + desc->set_is_pipeline(true); + // skip if it does not have consumer. + if (!desc->has_consumer()) continue; + // skip if ht.size() > limit, and it's only for local. + if (!desc->has_remote_targets() && row_count > _limit) continue; + LogicalType build_type = desc->build_expr_type(); + JoinRuntimeFilter* filter = RuntimeFilterHelper::create_runtime_bloom_filter(_pool, build_type); + if (filter == nullptr) continue; + filter->init(row_count); + desc->set_runtime_filter(filter); + } + + const auto& num_bloom_filters = _bloom_filter_descriptors.size(); + + // remove empty params that generated in two cases: + // 1. the corresponding HashJoinProbeOperator is finished in short-circuit style because HashJoinBuildOperator + // above this operator has constructed an empty hash table. + // 2. the HashJoinBuildOperator is finished in advance because the fragment instance is canceled + _partial_bloom_filter_build_params.erase( + std::remove_if(_partial_bloom_filter_build_params.begin(), _partial_bloom_filter_build_params.end(), + [](auto& opt_params) { return opt_params.empty(); }), + _partial_bloom_filter_build_params.end()); + + // there is no non-empty params, set all runtime filter to nullptr + if (_partial_bloom_filter_build_params.empty()) { + for (auto& desc : _bloom_filter_descriptors) { + desc->set_runtime_filter(nullptr); + } + return Status::OK(); + } + + // all params must have the same size as num_bloom_filters + DCHECK(std::all_of(_partial_bloom_filter_build_params.begin(), _partial_bloom_filter_build_params.end(), + [&num_bloom_filters](auto& opt_params) { return opt_params.size() == num_bloom_filters; })); + + for (auto i = 0; i < num_bloom_filters; ++i) { + auto& desc = _bloom_filter_descriptors[i]; + if (desc->runtime_filter() == nullptr) { + continue; + } + auto can_merge = + std::all_of(_partial_bloom_filter_build_params.begin(), _partial_bloom_filter_build_params.end(), + [i](auto& opt_params) { return opt_params[i].has_value(); }); + if (!can_merge) { + desc->set_runtime_filter(nullptr); + continue; + } + auto* rf = desc->runtime_filter(); + for (auto& opt_params : _partial_bloom_filter_build_params) { + auto& opt_param = opt_params[i]; + DCHECK(opt_param.has_value()); + auto& param = opt_param.value(); + if (param.column == nullptr || param.column->empty()) { + continue; + } + rf->concat(param.runtime_filter.get()); + } + } + return Status::OK(); + } + size_t limit() const { return _limit; } private: diff --git a/be/src/exprs/CMakeLists.txt b/be/src/exprs/CMakeLists.txt index fda81ffb51ed9b..b26006ab266cdf 100644 --- a/be/src/exprs/CMakeLists.txt +++ b/be/src/exprs/CMakeLists.txt @@ -92,6 +92,7 @@ set(EXPR_FILES subfield_expr.cpp map_apply_expr.cpp map_expr.cpp + runtime_filter_layout.cpp ) add_library(Exprs ${EXPR_FILES}) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index d728b38af0df40..4b1a04fe45cbef 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -142,8 +142,9 @@ bool SimdBlockFilter::check_equal(const SimdBlockFilter& bf) const { size_t JoinRuntimeFilter::max_serialized_size() const { // todo(yan): noted that it's not serialize compatible with 32-bit and 64-bit. - size_t size = sizeof(_has_null) + sizeof(_size) + sizeof(_num_hash_partitions) + sizeof(_join_mode); - if (_num_hash_partitions == 0) { + auto num_partitions = _hash_partition_bf.size(); + size_t size = sizeof(_has_null) + sizeof(_size) + sizeof(num_partitions) + sizeof(_join_mode); + if (num_partitions == 0) { size += _bf.max_serialized_size(); } else { for (const auto& bf : _hash_partition_bf) { @@ -155,16 +156,17 @@ size_t JoinRuntimeFilter::max_serialized_size() const { size_t JoinRuntimeFilter::serialize(int serialize_version, uint8_t* data) const { size_t offset = 0; + auto num_partitions = _hash_partition_bf.size(); #define JRF_COPY_FIELD(field) \ memcpy(data + offset, &field, sizeof(field)); \ offset += sizeof(field); JRF_COPY_FIELD(_has_null); JRF_COPY_FIELD(_size); - JRF_COPY_FIELD(_num_hash_partitions); + JRF_COPY_FIELD(num_partitions); JRF_COPY_FIELD(_join_mode); #undef JRF_COPY_FIELD - if (_num_hash_partitions == 0) { + if (num_partitions == 0) { offset += _bf.serialize(data + offset); } else { @@ -177,19 +179,20 @@ size_t JoinRuntimeFilter::serialize(int serialize_version, uint8_t* data) const size_t JoinRuntimeFilter::deserialize(int serialize_version, const uint8_t* data) { size_t offset = 0; + size_t num_partitions = 0; #define JRF_COPY_FIELD(field) \ memcpy(&field, data + offset, sizeof(field)); \ offset += sizeof(field); JRF_COPY_FIELD(_has_null); JRF_COPY_FIELD(_size); - JRF_COPY_FIELD(_num_hash_partitions); + JRF_COPY_FIELD(num_partitions); JRF_COPY_FIELD(_join_mode); #undef JRF_COPY_FIELD - if (_num_hash_partitions == 0) { + if (num_partitions == 0) { offset += _bf.deserialize(data + offset); } else { - for (size_t i = 0; i < _num_hash_partitions; i++) { + for (size_t i = 0; i < num_partitions; i++) { SimdBlockFilter bf; offset += bf.deserialize(data + offset); _hash_partition_bf.emplace_back(std::move(bf)); @@ -200,13 +203,15 @@ size_t JoinRuntimeFilter::deserialize(int serialize_version, const uint8_t* data } bool JoinRuntimeFilter::check_equal(const JoinRuntimeFilter& rf) const { - bool first = (_has_null == rf._has_null && _size == rf._size && _num_hash_partitions == rf._num_hash_partitions && + auto lhs_num_partitions = _hash_partition_bf.size(); + auto rhs_num_partitions = rf._hash_partition_bf.size(); + bool first = (_has_null == rf._has_null && _size == rf._size && lhs_num_partitions == rhs_num_partitions && _join_mode == rf._join_mode); if (!first) return false; - if (_num_hash_partitions == 0) { + if (lhs_num_partitions) { if (!_bf.check_equal(rf._bf)) return false; } else { - for (size_t i = 0; i < _num_hash_partitions; i++) { + for (size_t i = 0; i < lhs_num_partitions; ++i) { if (!_hash_partition_bf[i].check_equal(rf._hash_partition_bf[i])) { return false; } diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index 07db2a040c8c79..f6b127ed2f015b 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -22,6 +22,8 @@ #include "column/vectorized_fwd.h" #include "common/global_types.h" #include "common/object_pool.h" +#include "exec/pipeline/exchange/shuffler.h" +#include "exprs/runtime_filter_layout.h" #include "gen_cpp/PlanNodes_types.h" #include "gen_cpp/Types_types.h" #include "types/logical_type.h" @@ -271,6 +273,9 @@ class SmallHashSet { }; // The runtime filter generated by join right small table +class JoinRuntimeFilter; +using JoinRuntimeFilterPtr = std::shared_ptr; +using MutableJoinRuntimeFilterPtr = std::shared_ptr; class JoinRuntimeFilter { public: virtual ~JoinRuntimeFilter() = default; @@ -283,22 +288,21 @@ class JoinRuntimeFilter { Filter merged_selection; bool use_merged_selection; std::vector hash_values; - const std::vector* bucketseq_to_partition; bool compatibility = true; }; - virtual void compute_hash(const std::vector& columns, RunningContext* ctx) const = 0; + virtual void compute_partition_index(const RuntimeFilterLayout& layout, const std::vector& columns, + RunningContext* ctx) const = 0; virtual void evaluate(Column* input_column, RunningContext* ctx) const = 0; size_t size() const { return _size; } bool always_true() const { return _always_true; } - size_t num_hash_partitions() const { return _num_hash_partitions; } + size_t num_hash_partitions() const { return _hash_partition_bf.size(); } bool has_null() const { return _has_null; } virtual std::string debug_string() const = 0; - void set_join_mode(int8_t join_mode) { _join_mode = join_mode; } // RuntimeFilter version // if the RuntimeFilter is updated, the version will be updated as well, // (usually used for TopN Filter) @@ -317,33 +321,106 @@ class JoinRuntimeFilter { virtual void concat(JoinRuntimeFilter* rf) { _has_null |= rf->_has_null; - _hash_partition_bf.emplace_back(std::move(rf->_bf)); - _num_hash_partitions = _hash_partition_bf.size(); + if (rf->_hash_partition_bf.empty()) { + _hash_partition_bf.emplace_back(std::move(rf->_bf)); + } else { + for (auto&& bf : rf->_hash_partition_bf) { + _hash_partition_bf.emplace_back(std::move(bf)); + } + } _join_mode = rf->_join_mode; _size += rf->_size; } virtual bool check_equal(const JoinRuntimeFilter& rf) const; virtual JoinRuntimeFilter* create_empty(ObjectPool* pool) = 0; + void set_join_mode(int8_t join_mode) { _join_mode = join_mode; } + void set_global() { this->_global = true; } protected: void _update_version() { _rf_version++; } bool _has_null = false; + bool _global = false; size_t _size = 0; int8_t _join_mode = 0; SimdBlockFilter _bf; - size_t _num_hash_partitions = 0; std::vector _hash_partition_bf; bool _always_true = false; size_t _rf_version = 0; }; +template +struct WithModuloArg { + template + struct HashValueCompute { + void operator()(const RuntimeFilterLayout& layout, const std::vector& columns, size_t num_rows, + size_t real_num_partitions, std::vector& hash_values) const { + if constexpr (layout_is_singleton) { + hash_values.assign(num_rows, 0); + return; + } + + typedef void (Column::*HashFuncType)(uint32_t*, uint32_t, uint32_t) const; + auto compute_hash = [&columns, &num_rows, &hash_values](HashFuncType hash_func) { + for (Column* input_column : columns) { + (input_column->*hash_func)(hash_values.data(), 0, num_rows); + } + }; + + if constexpr (layout_is_shuffle) { + hash_values.assign(num_rows, HashUtil::FNV_SEED); + compute_hash(&Column::fnv_hash); + [[maybe_unused]] const auto num_instances = layout.num_instances(); + [[maybe_unused]] const auto num_drivers_per_instance = layout.num_drivers_per_instance(); + [[maybe_unused]] const auto num_partitions = num_instances * num_drivers_per_instance; + for (auto i = 0; i < num_rows; ++i) { + auto& hash_value = hash_values[i]; + if constexpr (layout_is_pipeline_shuffle) { + hash_value = ModuloFunc()(HashUtil::xorshift32(hash_value), num_drivers_per_instance); + } else if constexpr (layout_is_global_shuffle_1l) { + hash_value = ModuloFunc()(hash_value, real_num_partitions); + } else if constexpr (layout_is_global_shuffle_2l) { + hash_value = ModuloFunc()(HashUtil::xorshift32(hash_value), num_partitions); + } + } + } else if (layout_is_bucket) { + hash_values.assign(num_rows, 0); + compute_hash(&Column::crc32_hash); + [[maybe_unused]] const auto& bucketseq_to_instance = layout.bucketseq_to_instance(); + [[maybe_unused]] const auto& bucketseq_to_driverseq = layout.bucketseq_to_driverseq(); + [[maybe_unused]] const auto& bucketseq_to_partition = layout.bucketseq_to_partition(); + [[maybe_unused]] const auto num_buckets = bucketseq_to_instance.size(); + [[maybe_unused]] const auto num_drivers_per_instance = layout.num_drivers_per_instance(); + for (auto i = 0; i < num_rows; ++i) { + auto& hash_value = hash_values[i]; + if constexpr (layout_is_pipeline_bucket) { + hash_value = bucketseq_to_driverseq[ModuloFunc()(hash_value, num_buckets)]; + } else if constexpr (layout_is_pipeline_bucket_lx) { + hash_value = ModuloFunc()(HashUtil::xorshift32(hash_value), num_drivers_per_instance); + } else if constexpr (layout_is_global_bucket_1l) { + hash_value = bucketseq_to_instance[ModuloFunc()(hash_value, num_buckets)]; + } else if constexpr (layout_is_global_bucket_2l) { + hash_value = bucketseq_to_partition[ModuloFunc()(hash_value, num_buckets)]; + } else if constexpr (layout_is_global_bucket_2l_lx) { + const auto bucketseq = ModuloFunc()(hash_value, num_buckets); + const auto instance = bucketseq_to_instance[bucketseq]; + const auto driverseq = ModuloFunc()(HashUtil::xorshift32(hash_value), num_drivers_per_instance); + hash_value = (instance == BUCKET_ABSENT) ? BUCKET_ABSENT + : instance * num_drivers_per_instance + driverseq; + } + } + } + } + }; +}; + // The join runtime filter implement by bloom filter template class RuntimeBloomFilter final : public JoinRuntimeFilter { public: using CppType = RunTimeCppType; using ColumnType = RunTimeColumnType; + using SelfType = RuntimeBloomFilter; RuntimeBloomFilter() { _init_min_max(); } ~RuntimeBloomFilter() override = default; @@ -431,14 +508,68 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter { bool right_close_interval() const { return _right_close_interval; } void evaluate(Column* input_column, RunningContext* ctx) const override { - if (_num_hash_partitions != 0) { - return _t_evaluate(input_column, ctx); + if (!_hash_partition_bf.empty()) { + _t_evaluate(input_column, ctx); + } else { + _t_evaluate(input_column, ctx); + } + } + + // `hash_parittion` parameters means if this runtime filter has multiple `simd-block-filter` underneath. + // for local runtime filter, it only has once `simd-block-filter`, and `hash_partition` is false. + // and for global runtime filter, since it concates multiple runtime filters from partitions + // so it has multiple `simd-block-filter` and `hash_partition` is true. + // For more information, you can refers to doc `shuffle-aware runtime filter`. + + template + void _t_evaluate(Column* input_column, RunningContext* ctx) const { + size_t size = input_column->size(); + Filter& _selection_filter = ctx->use_merged_selection ? ctx->merged_selection : ctx->selection; + _selection_filter.resize(size); + uint8_t* _selection = _selection_filter.data(); + // reuse ctx's hash_values object. + HashValues& _hash_values = ctx->hash_values; + if constexpr (multi_partition) { + DCHECK_LE(size, _hash_values.size()); + } + if (input_column->is_constant()) { + const auto* const_column = down_cast(input_column); + if (const_column->only_null()) { + _selection[0] = _has_null; + } else { + auto* input_data = down_cast(const_column->data_column().get())->get_data().data(); + _evaluate_min_max(input_data, _selection, 1); + _rf_test_data(_selection, input_data, _hash_values, 0); + } + uint8_t sel = _selection[0]; + memset(_selection, sel, size); + } else if (input_column->is_nullable()) { + const auto* nullable_column = down_cast(input_column); + auto* input_data = down_cast(nullable_column->data_column().get())->get_data().data(); + _evaluate_min_max(input_data, _selection, size); + if (nullable_column->has_null()) { + const uint8_t* null_data = nullable_column->immutable_null_column_data().data(); + for (int i = 0; i < size; i++) { + if (null_data[i]) { + _selection[i] = _has_null; + } else { + _rf_test_data(_selection, input_data, _hash_values, i); + } + } + } else { + for (int i = 0; i < size; ++i) { + _rf_test_data(_selection, input_data, _hash_values, i); + } + } } else { - return _t_evaluate(input_column, ctx); + auto* input_data = down_cast(input_column)->get_data().data(); + _evaluate_min_max(input_data, _selection, size); + for (int i = 0; i < size; ++i) { + _rf_test_data(_selection, input_data, _hash_values, i); + } } } - // this->min = std::min(other->min, this->min) // this->max = std::max(other->max, this->max) void merge(const JoinRuntimeFilter* rf) override { JoinRuntimeFilter::merge(rf); @@ -607,31 +738,24 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter { return false; } - void compute_hash(const std::vector& columns, RunningContext* ctx) const override { + void compute_partition_index(const RuntimeFilterLayout& layout, const std::vector& columns, + RunningContext* ctx) const override { if (columns.empty() || _join_mode == TRuntimeFilterBuildJoinMode::NONE) return; size_t num_rows = columns[0]->size(); // initialize hash_values. // reuse ctx's hash_values object. std::vector& _hash_values = ctx->hash_values; - switch (_join_mode) { - case TRuntimeFilterBuildJoinMode::LOCAL_HASH_BUCKET: - case TRuntimeFilterBuildJoinMode::COLOCATE: - case TRuntimeFilterBuildJoinMode::BORADCAST: { - _hash_values.assign(num_rows, 0); - break; - } - case TRuntimeFilterBuildJoinMode::PARTITIONED: - case TRuntimeFilterBuildJoinMode::SHUFFLE_HASH_BUCKET: { - _hash_values.assign(num_rows, HashUtil::FNV_SEED); - break; - } - default: - DCHECK(false) << "unexpected join mode: " << _join_mode; - } - // compute hash_values - _compute_hash_values_for_multi_part(ctx, _join_mode, columns, num_rows, _hash_values); + auto use_reduce = !ctx->compatibility && (_join_mode == TRuntimeFilterBuildJoinMode::PARTITIONED || + _join_mode == TRuntimeFilterBuildJoinMode::SHUFFLE_HASH_BUCKET); + if (use_reduce) { + dispatch_layout::HashValueCompute>(_global, layout, columns, num_rows, + _hash_partition_bf.size(), _hash_values); + } else { + dispatch_layout::HashValueCompute>(_global, layout, columns, num_rows, + _hash_partition_bf.size(), _hash_values); + } } private: @@ -723,60 +847,12 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter { } } - void _compute_hash_values_for_multi_part(RunningContext* running_ctx, int8_t join_mode, - const std::vector& columns, size_t num_rows, - std::vector& hash_values) const { - typedef void (Column::*HashFuncType)(uint32_t*, uint32_t, uint32_t) const; - - auto compute_hash = [&columns, &num_rows, &hash_values](HashFuncType hash_func, size_t num_hash_partitions, - bool fast_reduce) { - for (Column* input_column : columns) { - (input_column->*hash_func)(hash_values.data(), 0, num_rows); - } - if (fast_reduce) { - for (size_t i = 0; i < num_rows; i++) { - hash_values[i] = ReduceOp()(hash_values[i], num_hash_partitions); - } - } else { - for (size_t i = 0; i < num_rows; i++) { - hash_values[i] %= num_hash_partitions; - } - } - }; - - switch (join_mode) { - case TRuntimeFilterBuildJoinMode::BORADCAST: { - break; - } - case TRuntimeFilterBuildJoinMode::PARTITIONED: - case TRuntimeFilterBuildJoinMode::SHUFFLE_HASH_BUCKET: { - compute_hash(&Column::fnv_hash, _num_hash_partitions, !running_ctx->compatibility); - break; - } - case TRuntimeFilterBuildJoinMode::LOCAL_HASH_BUCKET: - case TRuntimeFilterBuildJoinMode::COLOCATE: { - // shuffle-aware grf is partitioned into multiple parts the number of whom equals to the number of - // instances. we can use crc32_hash to compute out bucket_seq that the row belongs to, then use - // the bucketseq_to_partition array to translate bucket_seq into partition index of the grf. - const auto& bucketseq_to_partition = *running_ctx->bucketseq_to_partition; - compute_hash(&Column::crc32_hash, bucketseq_to_partition.size(), false); - for (auto i = 0; i < num_rows; ++i) { - hash_values[i] = bucketseq_to_partition[hash_values[i]]; - } - break; - } - default: - DCHECK(false) << "unexpected join mode: " << join_mode; - } - } - bool _test_data(CppType value) const { size_t hash = compute_hash(value); return _bf.test_hash(hash); } bool _test_data_with_hash(CppType value, const uint32_t shuffle_hash) const { - static constexpr uint32_t BUCKET_ABSENT = 2147483647; if (shuffle_hash == BUCKET_ABSENT) { return false; } @@ -798,61 +874,6 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter { } } - // `hash_parittion` parameters means if this runtime filter has multiple `simd-block-filter` underneath. - // for local runtime filter, it only has once `simd-block-filter`, and `hash_partition` is false. - // and for global runtime filter, since it concates multiple runtime filters from partitions - // so it has multiple `simd-block-filter` and `hash_partition` is true. - // For more information, you can refers to doc `shuffle-aware runtime filter`. - template - void _t_evaluate(Column* input_column, RunningContext* ctx) const { - size_t size = input_column->size(); - Filter& _selection_filter = ctx->use_merged_selection ? ctx->merged_selection : ctx->selection; - _selection_filter.resize(size); - uint8_t* _selection = _selection_filter.data(); - - // reuse ctx's hash_values object. - HashValues& _hash_values = ctx->hash_values; - if constexpr (hash_partition) { - DCHECK_LE(size, _hash_values.size()); - } - if (input_column->is_constant()) { - const auto* const_column = down_cast(input_column); - if (const_column->only_null()) { - _selection[0] = _has_null; - } else { - auto* input_data = down_cast(const_column->data_column().get())->get_data().data(); - _evaluate_min_max(input_data, _selection, 1); - _rf_test_data(_selection, input_data, _hash_values, 0); - } - uint8_t sel = _selection[0]; - memset(_selection, sel, size); - } else if (input_column->is_nullable()) { - const auto* nullable_column = down_cast(input_column); - auto* input_data = down_cast(nullable_column->data_column().get())->get_data().data(); - _evaluate_min_max(input_data, _selection, size); - if (nullable_column->has_null()) { - const uint8_t* null_data = nullable_column->immutable_null_column_data().data(); - for (int i = 0; i < size; i++) { - if (null_data[i]) { - _selection[i] = _has_null; - } else { - _rf_test_data(_selection, input_data, _hash_values, i); - } - } - } else { - for (int i = 0; i < size; ++i) { - _rf_test_data(_selection, input_data, _hash_values, i); - } - } - } else { - auto* input_data = down_cast(input_column)->get_data().data(); - _evaluate_min_max(input_data, _selection, size); - for (int i = 0; i < size; ++i) { - _rf_test_data(_selection, input_data, _hash_values, i); - } - } - } - private: CppType _min; CppType _max; diff --git a/be/src/exprs/runtime_filter_bank.cpp b/be/src/exprs/runtime_filter_bank.cpp index 476f7e88980852..1e3ff774e1ebc3 100644 --- a/be/src/exprs/runtime_filter_bank.cpp +++ b/be/src/exprs/runtime_filter_bank.cpp @@ -21,6 +21,7 @@ #include "exprs/in_const_predicate.hpp" #include "exprs/literal.h" #include "exprs/runtime_filter.h" +#include "exprs/runtime_filter_layout.h" #include "gen_cpp/RuntimeFilter_types.h" #include "gen_cpp/Types_types.h" #include "gutil/strings/substitute.h" @@ -205,12 +206,12 @@ Status RuntimeFilterBuildDescriptor::init(ObjectPool* pool, const TRuntimeFilter _filter_id = desc.filter_id; _build_expr_order = desc.expr_order; _has_remote_targets = desc.has_remote_targets; - _join_mode = desc.build_join_mode; if (desc.__isset.runtime_filter_merge_nodes) { _merge_nodes = desc.runtime_filter_merge_nodes; } _has_consumer = false; + _join_mode = desc.build_join_mode; if (desc.__isset.plan_node_id_to_target_expr && desc.plan_node_id_to_target_expr.size() != 0) { _has_consumer = true; } @@ -226,7 +227,7 @@ Status RuntimeFilterBuildDescriptor::init(ObjectPool* pool, const TRuntimeFilter if (desc.__isset.broadcast_grf_destinations) { _broadcast_grf_destinations = desc.broadcast_grf_destinations; } - + WithLayoutMixin::init(desc); RETURN_IF_ERROR(Expr::create_expr_tree(pool, desc.build_expr, &_build_expr_ctx, state)); return Status::OK(); } @@ -250,9 +251,7 @@ Status RuntimeFilterProbeDescriptor::init(ObjectPool* pool, const TRuntimeFilter } } - if (desc.__isset.bucketseq_to_instance) { - _bucketseq_to_partition = desc.bucketseq_to_instance; - } + WithLayoutMixin::init(desc); if (desc.__isset.plan_node_id_to_partition_by_exprs) { const auto& it = const_cast(desc).plan_node_id_to_partition_by_exprs.find(node_id); @@ -405,7 +404,6 @@ void RuntimeFilterProbeCollector::do_evaluate(Chunk* chunk, RuntimeBloomFilterEv auto* ctx = rf_desc->probe_expr_ctx(); ColumnPtr column = EVALUATE_NULL_IF_ERROR(ctx, ctx->root(), chunk); // for colocate grf - eval_context.running_context.bucketseq_to_partition = rf_desc->bucketseq_to_partition(); compute_hash_values(chunk, column.get(), rf_desc, eval_context); filter->evaluate(column.get(), &eval_context.running_context); @@ -466,14 +464,14 @@ void RuntimeFilterProbeCollector::compute_hash_values(Chunk* chunk, Column* colu return; } if (rf_desc->partition_by_expr_contexts()->empty()) { - filter->compute_hash({column}, &eval_context.running_context); + filter->compute_partition_index(rf_desc->layout(), {column}, &eval_context.running_context); } else { std::vector partition_by_columns; for (auto& partition_ctx : *(rf_desc->partition_by_expr_contexts())) { ColumnPtr partition_column = EVALUATE_NULL_IF_ERROR(partition_ctx, partition_ctx->root(), chunk); partition_by_columns.push_back(partition_column.get()); } - filter->compute_hash(partition_by_columns, &eval_context.running_context); + filter->compute_partition_index(rf_desc->layout(), partition_by_columns, &eval_context.running_context); } } @@ -499,7 +497,6 @@ void RuntimeFilterProbeCollector::update_selectivity(Chunk* chunk, RuntimeBloomF auto ctx = rf_desc->probe_expr_ctx(); ColumnPtr column = EVALUATE_NULL_IF_ERROR(ctx, ctx->root(), chunk); // for colocate grf - eval_context.running_context.bucketseq_to_partition = rf_desc->bucketseq_to_partition(); compute_hash_values(chunk, column.get(), rf_desc, eval_context); // true count is not accummulated, it is evaluated for each RF respectively filter->evaluate(column.get(), &eval_context.running_context); diff --git a/be/src/exprs/runtime_filter_bank.h b/be/src/exprs/runtime_filter_bank.h index 494ed5ed48f36e..f39f9dfe901621 100644 --- a/be/src/exprs/runtime_filter_bank.h +++ b/be/src/exprs/runtime_filter_bank.h @@ -25,6 +25,7 @@ #include "exprs/expr.h" #include "exprs/expr_context.h" #include "exprs/runtime_filter.h" +#include "exprs/runtime_filter_layout.h" #include "gen_cpp/InternalService_types.h" #include "gen_cpp/PlanNodes_types.h" #include "gen_cpp/RuntimeFilter_types.h" @@ -70,7 +71,7 @@ class RuntimeFilterHelper { // how to generate & publish this runtime filter // it only happens in hash join node. -class RuntimeFilterBuildDescriptor { +class RuntimeFilterBuildDescriptor : public WithLayoutMixin { public: RuntimeFilterBuildDescriptor() = default; Status init(ObjectPool* pool, const TRuntimeFilterDescription& desc, RuntimeState* state); @@ -85,7 +86,6 @@ class RuntimeFilterBuildDescriptor { } bool has_remote_targets() const { return _has_remote_targets; } bool has_consumer() const { return _has_consumer; } - int8_t join_mode() const { return _join_mode; } const std::vector& merge_nodes() const { return _merge_nodes; } void set_runtime_filter(JoinRuntimeFilter* rf) { _runtime_filter = rf; } void set_or_intersect_filter(JoinRuntimeFilter* rf) { @@ -100,6 +100,7 @@ class RuntimeFilterBuildDescriptor { JoinRuntimeFilter* runtime_filter() { return _runtime_filter; } void set_is_pipeline(bool flag) { _is_pipeline = flag; } bool is_pipeline() const { return _is_pipeline; } + int8_t join_mode() const { return _join_mode; }; private: friend class HashJoinNode; @@ -117,10 +118,11 @@ class RuntimeFilterBuildDescriptor { std::vector _merge_nodes; JoinRuntimeFilter* _runtime_filter = nullptr; bool _is_pipeline = false; + std::mutex _mutex; }; -class RuntimeFilterProbeDescriptor { +class RuntimeFilterProbeDescriptor : public WithLayoutMixin { public: RuntimeFilterProbeDescriptor() = default; Status init(ObjectPool* pool, const TRuntimeFilterDescription& desc, TPlanNodeId node_id, RuntimeState* state); @@ -158,8 +160,7 @@ class RuntimeFilterProbeDescriptor { TPlanNodeId build_plan_node_id() const { return _build_plan_node_id; } TPlanNodeId probe_plan_node_id() const { return _probe_plan_node_id; } void set_probe_plan_node_id(TPlanNodeId id) { _probe_plan_node_id = id; } - const TRuntimeFilterBuildJoinMode::type join_mode() const { return _join_mode; }; - const std::vector* bucketseq_to_partition() const { return &_bucketseq_to_partition; } + int8_t join_mode() const { return _join_mode; }; const std::vector* partition_by_expr_contexts() const { return &_partition_by_exprs_contexts; } private: @@ -178,10 +179,9 @@ class RuntimeFilterProbeDescriptor { RuntimeProfile::Counter* _latency_timer = nullptr; int64_t _open_timestamp = 0; int64_t _ready_timestamp = 0; - TRuntimeFilterBuildJoinMode::type _join_mode; + int8_t _join_mode; bool _is_topn_filter = false; bool _skip_wait = false; - std::vector _bucketseq_to_partition; std::vector _partition_by_exprs_contexts; }; diff --git a/be/src/exprs/runtime_filter_layout.cpp b/be/src/exprs/runtime_filter_layout.cpp new file mode 100644 index 00000000000000..5c68f117e5a7f1 --- /dev/null +++ b/be/src/exprs/runtime_filter_layout.cpp @@ -0,0 +1,47 @@ +#include "exprs/runtime_filter_layout.h" + +#include + +namespace starrocks { +void RuntimeFilterLayout::init(const TRuntimeFilterLayout& layout) { + this->_filter_id = layout.filter_id; + this->_local_layout = layout.local_layout; + this->_global_layout = layout.global_layout; + this->_pipeline_level_multi_partitioned = layout.pipeline_level_multi_partitioned; + this->_num_instances = layout.num_instances; + this->_num_drivers_per_instance = layout.num_drivers_per_instance; + if (layout.__isset.bucketseq_to_partition) { + this->_bucketseq_to_instance = layout.bucketseq_to_instance; + } + if (layout.__isset.bucketseq_to_driverseq) { + this->_bucketseq_to_driverseq = layout.bucketseq_to_driverseq; + } + if (layout.__isset.bucketseq_to_partition) { + this->_bucketseq_to_partition = layout.bucketseq_to_partition; + } +} + +void RuntimeFilterLayout::init(int filter_id, const std::vector& bucketseq_to_instance) { + this->_filter_id = filter_id; + this->_local_layout = TRuntimeFilterLayoutMode::SINGLETON; + this->_global_layout = bucketseq_to_instance.empty() ? TRuntimeFilterLayoutMode::SINGLETON + : TRuntimeFilterLayoutMode::GLOBAL_BUCKET_1L; + this->_pipeline_level_multi_partitioned = false; + this->_num_instances = 0; + this->_num_drivers_per_instance = 0; + this->_bucketseq_to_instance = bucketseq_to_instance; +} + +void WithLayoutMixin::init(const TRuntimeFilterDescription& desc) { + if (desc.__isset.layout) { + _layout.init(desc.layout); + } else { + if (desc.__isset.bucketseq_to_instance) { + _layout.init(desc.filter_id, desc.bucketseq_to_instance); + } else { + _layout.init(desc.filter_id, {}); + } + } +} + +} // namespace starrocks \ No newline at end of file diff --git a/be/src/exprs/runtime_filter_layout.h b/be/src/exprs/runtime_filter_layout.h new file mode 100644 index 00000000000000..1c40e848e65e1f --- /dev/null +++ b/be/src/exprs/runtime_filter_layout.h @@ -0,0 +1,116 @@ +#pragma once + +#include + +#include +#include + +#include "gen_cpp/RuntimeFilter_types.h" +#include "util/guard.h" + +namespace starrocks { +class RuntimeFilterLayout { +public: + void init(const TRuntimeFilterLayout& layout); + void init(int filter_id, const std::vector& bucketseq_to_instance); + int filter_id() const { return _filter_id; } + TRuntimeFilterLayoutMode::type local_layout() const { return _local_layout; } + TRuntimeFilterLayoutMode::type global_layout() const { return _global_layout; } + bool pipeline_level_multi_partitioned() const { return _pipeline_level_multi_partitioned; } + int num_instances() const { return _num_instances; } + int num_drivers_per_instance() const { return _num_drivers_per_instance; }; + const std::vector& bucketseq_to_instance() const { return _bucketseq_to_instance; } + + const std::vector& bucketseq_to_driverseq() const { return _bucketseq_to_driverseq; } + const std::vector& bucketseq_to_partition() const { return _bucketseq_to_partition; } + +protected: + int _filter_id; + TRuntimeFilterLayoutMode::type _local_layout; + TRuntimeFilterLayoutMode::type _global_layout; + bool _pipeline_level_multi_partitioned = false; + int _num_instances; + int _num_drivers_per_instance; + std::vector _bucketseq_to_instance; + std::vector _bucketseq_to_driverseq; + std::vector _bucketseq_to_partition; +}; + +class WithLayoutMixin { +public: + void init(const TRuntimeFilterDescription& desc); + const RuntimeFilterLayout& layout() const { return _layout; } + +protected: + RuntimeFilterLayout _layout; +}; +static constexpr uint32_t BUCKET_ABSENT = 2147483647; +VALUE_GUARD(TRuntimeFilterLayoutMode::type, LayoutSingletonGuard, layout_is_singleton, + TRuntimeFilterLayoutMode::SINGLETON); + +VALUE_GUARD(TRuntimeFilterLayoutMode::type, LayoutPipelineShuffleGuard, layout_is_pipeline_shuffle, + TRuntimeFilterLayoutMode::PIPELINE_SHUFFLE); + +VALUE_GUARD(TRuntimeFilterLayoutMode::type, LayoutGlobalShuffle2LGuard, layout_is_global_shuffle_2l, + TRuntimeFilterLayoutMode::GLOBAL_SHUFFLE_2L); + +VALUE_GUARD(TRuntimeFilterLayoutMode::type, LayoutGlobalShuffle1LGuard, layout_is_global_shuffle_1l, + TRuntimeFilterLayoutMode::GLOBAL_SHUFFLE_1L); + +VALUE_GUARD(TRuntimeFilterLayoutMode::type, LayoutShuffleGuard, layout_is_shuffle, + TRuntimeFilterLayoutMode::PIPELINE_SHUFFLE, TRuntimeFilterLayoutMode::GLOBAL_SHUFFLE_1L, + TRuntimeFilterLayoutMode::GLOBAL_SHUFFLE_2L); + +VALUE_GUARD(TRuntimeFilterLayoutMode::type, LayoutPipelineBucketLXGuard, layout_is_pipeline_bucket_lx, + TRuntimeFilterLayoutMode::PIPELINE_BUCKET_LX); + +VALUE_GUARD(TRuntimeFilterLayoutMode::type, LayoutPipelineBucketGuard, layout_is_pipeline_bucket, + TRuntimeFilterLayoutMode::PIPELINE_BUCKET); + +VALUE_GUARD(TRuntimeFilterLayoutMode::type, LayoutGlobalBucket1LGuard, layout_is_global_bucket_1l, + TRuntimeFilterLayoutMode::GLOBAL_BUCKET_1L); + +VALUE_GUARD(TRuntimeFilterLayoutMode::type, LayoutGlobalBucket2LLXlGuard, layout_is_global_bucket_2l_lx, + TRuntimeFilterLayoutMode::GLOBAL_BUCKET_2L_LX); + +VALUE_GUARD(TRuntimeFilterLayoutMode::type, LayoutGlobalBucket2LGuard, layout_is_global_bucket_2l, + TRuntimeFilterLayoutMode::GLOBAL_BUCKET_2L); + +VALUE_GUARD(TRuntimeFilterLayoutMode::type, LayoutBucketGuard, layout_is_bucket, + TRuntimeFilterLayoutMode::PIPELINE_BUCKET_LX, TRuntimeFilterLayoutMode::PIPELINE_BUCKET, + TRuntimeFilterLayoutMode::GLOBAL_BUCKET_1L, TRuntimeFilterLayoutMode::GLOBAL_BUCKET_2L_LX, + TRuntimeFilterLayoutMode::GLOBAL_BUCKET_2L) + +constexpr bool is_multi_partitioned(TRuntimeFilterLayoutMode::type mode) { + return mode != TRuntimeFilterLayoutMode::NONE && mode != TRuntimeFilterLayoutMode::SINGLETON; +} + +template