Skip to content

Commit

Permalink
pipeline-level rf
Browse files Browse the repository at this point in the history
Signed-off-by: satanson <[email protected]>
  • Loading branch information
satanson committed Oct 8, 2023
1 parent 8d4c683 commit c7691ac
Show file tree
Hide file tree
Showing 20 changed files with 846 additions and 201 deletions.
1 change: 0 additions & 1 deletion .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ PointerAlignment: Left
ReflowComments: false
SortUsingDeclarations: false
SpacesBeforeTrailingComments: 1
AllowShortFunctionsOnASingleLine: Inline
---
Language: Java
BasedOnStyle: Google
Expand Down
15 changes: 14 additions & 1 deletion be/src/exec/hash_joiner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,20 @@ Status HashJoiner::_create_runtime_bloom_filters(RuntimeState* state, int64_t li
int expr_order = rf_desc->build_expr_order();
ColumnPtr column = ht.get_key_columns()[expr_order];
bool eq_null = _is_null_safes[expr_order];
_runtime_bloom_filter_build_params.emplace_back(pipeline::RuntimeBloomFilterBuildParam(eq_null, column));
MutableJoinRuntimeFilterPtr filter = nullptr;
auto multi_partitioned = rf_desc->layout().pipeline_level_multi_partitioned();
if (multi_partitioned) {
LogicalType build_type = rf_desc->build_expr_type();
filter = std::shared_ptr<JoinRuntimeFilter>(
RuntimeFilterHelper::create_runtime_bloom_filter(nullptr, build_type));
if (filter == nullptr) continue;
filter->set_join_mode(rf_desc->join_mode());
filter->init(ht.get_row_count());
RETURN_IF_ERROR(RuntimeFilterHelper::fill_runtime_bloom_filter(column, build_type, filter.get(),
kHashJoinKeyColumnOffset, eq_null));
}
_runtime_bloom_filter_build_params.emplace_back(pipeline::RuntimeBloomFilterBuildParam(
multi_partitioned, eq_null, std::move(column), std::move(filter)));
}
return Status::OK();
}
Expand Down
9 changes: 7 additions & 2 deletions be/src/exec/pipeline/exchange/shuffler.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,13 @@ class Shuffler {
if constexpr (!two_level_shuffle) {
shuffle_id = channel_id;
} else {
uint32_t driver_sequence = ReduceOp()(HashUtil::xorshift32(hash_values[i]), _num_shuffles_per_channel);
shuffle_id = channel_id * _num_shuffles_per_channel + driver_sequence;
if (_num_shuffles_per_channel == 1) {
shuffle_id = channel_id;
} else {
uint32_t driver_sequence =
ReduceOp()(HashUtil::xorshift32(hash_values[i]), _num_shuffles_per_channel);
shuffle_id = channel_id * _num_shuffles_per_channel + driver_sequence;
}
}
shuffle_channel_ids[i] = shuffle_id;
}
Expand Down
91 changes: 90 additions & 1 deletion be/src/exec/pipeline/runtime_filter_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,16 @@ struct RuntimeBloomFilterBuildParam;
using OptRuntimeBloomFilterBuildParams = std::vector<std::optional<RuntimeBloomFilterBuildParam>>;
// Parameters used to build runtime bloom-filters.
struct RuntimeBloomFilterBuildParam {
RuntimeBloomFilterBuildParam(bool eq_null, ColumnPtr column) : eq_null(eq_null), column(std::move(column)) {}
RuntimeBloomFilterBuildParam(bool multi_partitioned, bool eq_null, ColumnPtr column,
MutableJoinRuntimeFilterPtr runtime_filter)
: multi_partitioned(multi_partitioned),
eq_null(eq_null),
column(std::move(column)),
runtime_filter(std::move(runtime_filter)) {}
bool multi_partitioned;
bool eq_null;
ColumnPtr column;
MutableJoinRuntimeFilterPtr runtime_filter;
};

// RuntimeFilterCollector contains runtime in-filters and bloom-filters; it is stored in RuntimeFilterHub
Expand Down Expand Up @@ -306,6 +313,18 @@ class PartialRuntimeFilterMerger {
}

// Entry point for merging the local bloom filters. Returns OK immediately when no
// bloom-filter descriptor is registered; otherwise picks the merge routine that
// matches the layout reported by the first descriptor and returns its status.
Status merge_local_bloom_filters() {
    if (_bloom_filter_descriptors.empty()) {
        return Status::OK();
    }
    // Only the first descriptor's layout is consulted to choose the strategy.
    const bool use_partitioned_merge = _bloom_filter_descriptors[0]->layout().pipeline_level_multi_partitioned();
    return use_partitioned_merge ? merge_multi_partitioned_local_bloom_filters()
                                 : merge_singleton_local_bloom_filters();
}

Status merge_singleton_local_bloom_filters() {
if (_partial_bloom_filter_build_params.empty()) {
return Status::OK();
}
Expand Down Expand Up @@ -381,6 +400,76 @@ class PartialRuntimeFilterMerger {
return Status::OK();
}

// Merge routine for the multi-partitioned layout: every partial build parameter carries
// its own bloom filter, and the filters belonging to the same descriptor are concatenated
// into the descriptor's filter.
//
// Always returns Status::OK(). A descriptor ends up without a usable filter when it has
// no consumer, exceeds the local row limit, cannot get a filter created, or when some
// operator did not supply a parameter for it.
Status merge_multi_partitioned_local_bloom_filters() {
    if (_partial_bloom_filter_build_params.empty()) {
        return Status::OK();
    }

    // Total number of rows over all partial hash tables; used to size each merged filter.
    size_t row_count = 0;
    for (auto count : _ht_row_counts) {
        row_count += count;
    }

    // Create an empty, correctly sized filter for every descriptor that is worth building.
    for (auto& desc : _bloom_filter_descriptors) {
        desc->set_is_pipeline(true);
        // Skip descriptors that have no consumer.
        if (!desc->has_consumer()) continue;
        // Skip local-only filters whose total row count exceeds the limit.
        if (!desc->has_remote_targets() && row_count > _limit) continue;
        LogicalType build_type = desc->build_expr_type();
        JoinRuntimeFilter* filter = RuntimeFilterHelper::create_runtime_bloom_filter(_pool, build_type);
        if (filter == nullptr) continue;
        filter->init(row_count);
        desc->set_runtime_filter(filter);
    }

    // Held by value as size_t so the index loop below compares like-typed unsigned values.
    const size_t num_bloom_filters = _bloom_filter_descriptors.size();

    // Remove empty parameter lists, which are produced in two cases:
    // 1. the corresponding HashJoinProbeOperator finished in short-circuit style because the
    //    HashJoinBuildOperator above it constructed an empty hash table.
    // 2. the HashJoinBuildOperator finished in advance because the fragment instance was canceled.
    _partial_bloom_filter_build_params.erase(
            std::remove_if(_partial_bloom_filter_build_params.begin(), _partial_bloom_filter_build_params.end(),
                           [](auto& opt_params) { return opt_params.empty(); }),
            _partial_bloom_filter_build_params.end());

    // No non-empty parameter list is left: clear the filter of every descriptor.
    if (_partial_bloom_filter_build_params.empty()) {
        for (auto& desc : _bloom_filter_descriptors) {
            desc->set_runtime_filter(nullptr);
        }
        return Status::OK();
    }

    // Every remaining parameter list must hold exactly one slot per descriptor.
    DCHECK(std::all_of(_partial_bloom_filter_build_params.begin(), _partial_bloom_filter_build_params.end(),
                       [num_bloom_filters](auto& opt_params) { return opt_params.size() == num_bloom_filters; }));

    for (size_t i = 0; i < num_bloom_filters; ++i) {
        auto& desc = _bloom_filter_descriptors[i];
        // No filter was created for this descriptor above.
        if (desc->runtime_filter() == nullptr) {
            continue;
        }
        // The merged filter is only built if every operator supplied a parameter for slot i;
        // otherwise the descriptor's filter is cleared.
        auto can_merge =
                std::all_of(_partial_bloom_filter_build_params.begin(), _partial_bloom_filter_build_params.end(),
                            [i](auto& opt_params) { return opt_params[i].has_value(); });
        if (!can_merge) {
            desc->set_runtime_filter(nullptr);
            continue;
        }
        auto* rf = desc->runtime_filter();
        for (auto& opt_params : _partial_bloom_filter_build_params) {
            auto& opt_param = opt_params[i];
            DCHECK(opt_param.has_value());
            auto& param = opt_param.value();
            // Parameters with a missing or empty key column are not concatenated.
            if (param.column == nullptr || param.column->empty()) {
                continue;
            }
            // NOTE(review): param.runtime_filter is assumed to be non-null here — confirm
            // that every producer on the multi-partitioned path attaches a filter.
            rf->concat(param.runtime_filter.get());
        }
    }
    return Status::OK();
}

// Returns the stored row limit (_limit).
size_t limit() const {
    return _limit;
}

private:
Expand Down
1 change: 1 addition & 0 deletions be/src/exprs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ set(EXPR_FILES
subfield_expr.cpp
map_apply_expr.cpp
map_expr.cpp
runtime_filter_layout.cpp
)

add_library(Exprs ${EXPR_FILES})
25 changes: 15 additions & 10 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,9 @@ bool SimdBlockFilter::check_equal(const SimdBlockFilter& bf) const {

size_t JoinRuntimeFilter::max_serialized_size() const {
// todo(yan): noted that it's not serialize compatible with 32-bit and 64-bit.
size_t size = sizeof(_has_null) + sizeof(_size) + sizeof(_num_hash_partitions) + sizeof(_join_mode);
if (_num_hash_partitions == 0) {
auto num_partitions = _hash_partition_bf.size();
size_t size = sizeof(_has_null) + sizeof(_size) + sizeof(num_partitions) + sizeof(_join_mode);
if (num_partitions == 0) {
size += _bf.max_serialized_size();
} else {
for (const auto& bf : _hash_partition_bf) {
Expand All @@ -155,16 +156,17 @@ size_t JoinRuntimeFilter::max_serialized_size() const {

size_t JoinRuntimeFilter::serialize(int serialize_version, uint8_t* data) const {
size_t offset = 0;
auto num_partitions = _hash_partition_bf.size();
#define JRF_COPY_FIELD(field) \
memcpy(data + offset, &field, sizeof(field)); \
offset += sizeof(field);
JRF_COPY_FIELD(_has_null);
JRF_COPY_FIELD(_size);
JRF_COPY_FIELD(_num_hash_partitions);
JRF_COPY_FIELD(num_partitions);
JRF_COPY_FIELD(_join_mode);
#undef JRF_COPY_FIELD

if (_num_hash_partitions == 0) {
if (num_partitions == 0) {
offset += _bf.serialize(data + offset);

} else {
Expand All @@ -177,19 +179,20 @@ size_t JoinRuntimeFilter::serialize(int serialize_version, uint8_t* data) const

size_t JoinRuntimeFilter::deserialize(int serialize_version, const uint8_t* data) {
size_t offset = 0;
size_t num_partitions = 0;
#define JRF_COPY_FIELD(field) \
memcpy(&field, data + offset, sizeof(field)); \
offset += sizeof(field);
JRF_COPY_FIELD(_has_null);
JRF_COPY_FIELD(_size);
JRF_COPY_FIELD(_num_hash_partitions);
JRF_COPY_FIELD(num_partitions);
JRF_COPY_FIELD(_join_mode);
#undef JRF_COPY_FIELD

if (_num_hash_partitions == 0) {
if (num_partitions == 0) {
offset += _bf.deserialize(data + offset);
} else {
for (size_t i = 0; i < _num_hash_partitions; i++) {
for (size_t i = 0; i < num_partitions; i++) {
SimdBlockFilter bf;
offset += bf.deserialize(data + offset);
_hash_partition_bf.emplace_back(std::move(bf));
Expand All @@ -200,13 +203,15 @@ size_t JoinRuntimeFilter::deserialize(int serialize_version, const uint8_t* data
}

bool JoinRuntimeFilter::check_equal(const JoinRuntimeFilter& rf) const {
bool first = (_has_null == rf._has_null && _size == rf._size && _num_hash_partitions == rf._num_hash_partitions &&
auto lhs_num_partitions = _hash_partition_bf.size();
auto rhs_num_partitions = rf._hash_partition_bf.size();
bool first = (_has_null == rf._has_null && _size == rf._size && lhs_num_partitions == rhs_num_partitions &&
_join_mode == rf._join_mode);
if (!first) return false;
if (_num_hash_partitions == 0) {
if (lhs_num_partitions) {
if (!_bf.check_equal(rf._bf)) return false;
} else {
for (size_t i = 0; i < _num_hash_partitions; i++) {
for (size_t i = 0; i < lhs_num_partitions; ++i) {
if (!_hash_partition_bf[i].check_equal(rf._hash_partition_bf[i])) {
return false;
}
Expand Down
Loading

0 comments on commit c7691ac

Please sign in to comment.