Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] TopN runtime filter for string support update #55122

Merged
merged 7 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions be/src/exec/olap_scan_prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,8 @@ void ChunkPredicateBuilder<E, Type>::normalized_rf_with_null(const JoinRuntimeFi
&decoder);
const TypeDescriptor& col_type = col_ref->type();

ColumnPtr const_min_col = parser.template min_const_column<SlotType>(col_type);
ColumnPtr const_max_col = parser.template max_const_column<SlotType>(col_type);
ColumnPtr const_min_col = parser.template min_const_column<SlotType>(col_type, pool);
ColumnPtr const_max_col = parser.template max_const_column<SlotType>(col_type, pool);
VectorizedLiteral* min_literal = pool->add(new VectorizedLiteral(std::move(const_min_col), col_type));
VectorizedLiteral* max_literal = pool->add(new VectorizedLiteral(std::move(const_max_col), col_type));

Expand Down Expand Up @@ -719,7 +719,7 @@ Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotD
} else {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, SlotType, LowCardDictType,
detail::RuntimeColumnPredicateBuilder::GlobalDictCodeDecoder>(*range, rf,
detail::RuntimeColumnPredicateBuilder::GlobalDictCodeDecoder>(*range, rf, _opts.obj_pool,
&iter->second.first);
}
} else {
Expand All @@ -729,7 +729,7 @@ Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotD
} else {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, SlotType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(
*range, rf, nullptr);
*range, rf, _opts.obj_pool, nullptr);
}
}
} else {
Expand All @@ -738,8 +738,8 @@ Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotD
rf, desc->probe_expr_ctx()->root(), nullptr);
} else {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, SlotType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(*range, rf,
nullptr);
RangeType, SlotType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(
*range, rf, _opts.obj_pool, nullptr);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/min_max_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ class MinMaxPredicateBuilder {
template <LogicalType ltype>
Expr* operator()() {
auto* bloom_filter = (RuntimeBloomFilter<ltype>*)(_filter);
return _pool->add(new MinMaxPredicate<ltype>(_slot_id, bloom_filter->min_value(), bloom_filter->max_value(),
bloom_filter->has_null()));
return _pool->add(new MinMaxPredicate<ltype>(_slot_id, bloom_filter->min_value(_pool),
bloom_filter->max_value(_pool), bloom_filter->has_null()));
}

private:
Expand Down
66 changes: 52 additions & 14 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,13 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter {
p->init(1);

if constexpr (IsSlice<CppType>) {
p->_slice_min = val.to_string();
val = Slice(p->_slice_min.data(), val.get_size());
if constexpr (is_min) {
p->_slice_min = val.to_string();
val = Slice(p->_slice_min.data(), p->_slice_min.size());
} else {
p->_slice_max = val.to_string();
val = Slice(p->_slice_max.data(), p->_slice_max.size());
}
}

if constexpr (is_min) {
Expand Down Expand Up @@ -431,18 +436,33 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter {
void update_min_max(CppType val) {
// now slice have not support update min/max
if constexpr (IsSlice<CppType>) {
return;
}

if constexpr (is_min) {
if (_min < val) {
_min = val;
_update_version();
std::lock_guard<std::mutex> lk(_slice_mutex);
if constexpr (is_min) {
if (_min < val) {
_slice_min = val.to_string();
_min.data = _slice_min.data();
_min.size = _slice_min.size();
_update_version();
}
} else {
if (_max > val) {
_slice_max = val.to_string();
_max.data = _slice_max.data();
_max.size = _slice_max.size();
_update_version();
}
}
} else {
if (_max > val) {
_max = val;
_update_version();
if constexpr (is_min) {
if (_min < val) {
_min = val;
_update_version();
}
} else {
if (_max > val) {
_max = val;
_update_version();
}
}
}
}
Expand Down Expand Up @@ -490,9 +510,25 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter {

void insert_null() { _has_null = true; }

CppType min_value() const { return _min; }
CppType min_value(ObjectPool* pool) const {
if constexpr (IsSlice<CppType>) {
std::lock_guard<std::mutex> lk(_slice_mutex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we have slice_mutex? to protect from race condiction of pool allocation?

if that, maybe you can name it as pool_mutex?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used to protect _slice_min/_slice_max. _min point to the std::string of _slice_min, so lock it first, copy the string, and then release it. Otherwise, when get, other threads will replace the std::string in _slice_min. ,

auto* str = pool->template add(new std::string(_min.get_data(), _min.get_size()));
return Slice(*str);
} else {
return _min;
}
}

CppType max_value() const { return _max; }
CppType max_value(ObjectPool* pool) const {
if constexpr (IsSlice<CppType>) {
std::lock_guard<std::mutex> lk(_slice_mutex);
auto* str = pool->template add(new std::string(_max.get_data(), _max.get_size()));
return Slice(*str);
} else {
return _max;
}
}

void set_left_close_interval(bool close_interval) { _left_close_interval = close_interval; }
void set_right_close_interval(bool close_interval) { _right_close_interval = close_interval; }
Expand Down Expand Up @@ -808,6 +844,7 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter {
_max = std::max(_max, bf->_max);

if constexpr (IsSlice<CppType>) {
std::lock_guard<std::mutex> lk(_slice_mutex);
// maybe we are refering to another runtime filter instance
// for security we have to copy that back to our instance.
if (_min.size != 0 && _min.data != _slice_min.data()) {
Expand Down Expand Up @@ -922,6 +959,7 @@ class RuntimeBloomFilter final : public JoinRuntimeFilter {
CppType _max;
std::string _slice_min;
std::string _slice_max;
mutable std::mutex _slice_mutex;
bool _has_min_max = true;
bool _left_close_interval = true;
bool _right_close_interval = true;
Expand Down
110 changes: 55 additions & 55 deletions be/src/formats/orc/orc_chunk_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1051,78 +1051,78 @@ Status OrcChunkReader::_add_conjunct(const Expr* conjunct,
case type: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<type>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = orc::Literal(bool(xrf->min_value())); \
auto upper = orc::Literal(bool(xrf->max_value())); \
auto lower = orc::Literal(bool(xrf->min_value(&_pool))); \
auto upper = orc::Literal(bool(xrf->max_value(&_pool))); \
ADD_RF_TO_BUILDER \
}

#define ADD_RF_INT_TYPE(type) \
case type: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<type>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = orc::Literal(int64_t(xrf->min_value())); \
auto upper = orc::Literal(int64_t(xrf->max_value())); \
auto lower = orc::Literal(int64_t(xrf->min_value(&_pool))); \
auto upper = orc::Literal(int64_t(xrf->max_value(&_pool))); \
ADD_RF_TO_BUILDER \
}

#define ADD_RF_DOUBLE_TYPE(type) \
case type: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<type>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = orc::Literal(double(xrf->min_value())); \
auto upper = orc::Literal(double(xrf->max_value())); \
auto lower = orc::Literal(double(xrf->min_value(&_pool))); \
auto upper = orc::Literal(double(xrf->max_value(&_pool))); \
ADD_RF_TO_BUILDER \
}

#define ADD_RF_STRING_TYPE(type) \
case type: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<type>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = orc::Literal(xrf->min_value().data, xrf->min_value().size); \
auto upper = orc::Literal(xrf->max_value().data, xrf->max_value().size); \
ADD_RF_TO_BUILDER \
}

#define ADD_RF_DATE_TYPE(type) \
case type: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<type>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = \
orc::Literal(orc::PredicateDataType::DATE, OrcDateHelper::native_date_to_orc_date(xrf->min_value())); \
auto upper = \
orc::Literal(orc::PredicateDataType::DATE, OrcDateHelper::native_date_to_orc_date(xrf->max_value())); \
ADD_RF_TO_BUILDER \
}

#define ADD_RF_DECIMALV2_TYPE(type) \
case type: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<type>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = \
orc::Literal(to_orc128(xrf->min_value().value()), xrf->min_value().PRECISION, xrf->min_value().SCALE); \
auto upper = \
orc::Literal(to_orc128(xrf->max_value().value()), xrf->max_value().PRECISION, xrf->max_value().SCALE); \
ADD_RF_TO_BUILDER \
}

#define ADD_RF_DECIMALV3_TYPE(xtype) \
case xtype: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<xtype>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = orc::Literal(orc::Int128(xrf->min_value()), slot->type().precision, slot->type().scale); \
auto upper = orc::Literal(orc::Int128(xrf->max_value()), slot->type().precision, slot->type().scale); \
ADD_RF_TO_BUILDER \
}

#define ADD_RF_DECIMAL128_TYPE(xtype) \
case xtype: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<xtype>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = orc::Literal(orc::Int128(xrf->min_value() >> 64, xrf->min_value()), slot->type().precision, \
slot->type().scale); \
auto upper = orc::Literal(orc::Int128(xrf->max_value() >> 64, xrf->max_value()), slot->type().precision, \
slot->type().scale); \
ADD_RF_TO_BUILDER \
#define ADD_RF_STRING_TYPE(type) \
case type: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<type>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = orc::Literal(xrf->min_value(&_pool).data, xrf->min_value(&_pool).size); \
auto upper = orc::Literal(xrf->max_value(&_pool).data, xrf->max_value(&_pool).size); \
ADD_RF_TO_BUILDER \
}

#define ADD_RF_DATE_TYPE(type) \
case type: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<type>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = orc::Literal(orc::PredicateDataType::DATE, \
OrcDateHelper::native_date_to_orc_date(xrf->min_value(&_pool))); \
auto upper = orc::Literal(orc::PredicateDataType::DATE, \
OrcDateHelper::native_date_to_orc_date(xrf->max_value(&_pool))); \
ADD_RF_TO_BUILDER \
}

#define ADD_RF_DECIMALV2_TYPE(type) \
case type: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<type>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = orc::Literal(to_orc128(xrf->min_value(&_pool).value()), xrf->min_value(&_pool).PRECISION, \
xrf->min_value(&_pool).SCALE); \
auto upper = orc::Literal(to_orc128(xrf->max_value(&_pool).value()), xrf->max_value(&_pool).PRECISION, \
xrf->max_value(&_pool).SCALE); \
ADD_RF_TO_BUILDER \
}

#define ADD_RF_DECIMALV3_TYPE(xtype) \
case xtype: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<xtype>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = orc::Literal(orc::Int128(xrf->min_value(&_pool)), slot->type().precision, slot->type().scale); \
auto upper = orc::Literal(orc::Int128(xrf->max_value(&_pool)), slot->type().precision, slot->type().scale); \
ADD_RF_TO_BUILDER \
}

#define ADD_RF_DECIMAL128_TYPE(xtype) \
case xtype: { \
auto* xrf = dynamic_cast<const RuntimeBloomFilter<xtype>*>(rf); \
if (xrf == nullptr) return false; \
auto lower = orc::Literal(orc::Int128(xrf->min_value(&_pool) >> 64, xrf->min_value(&_pool)), \
slot->type().precision, slot->type().scale); \
auto upper = orc::Literal(orc::Int128(xrf->max_value(&_pool) >> 64, xrf->max_value(&_pool)), \
slot->type().precision, slot->type().scale); \
ADD_RF_TO_BUILDER \
}

bool OrcChunkReader::_add_runtime_filter(const uint64_t column_id, const SlotDescriptor* slot,
Expand Down
Loading
Loading