Skip to content

Commit

Permalink
[BugFix] Fix TopN RuntimeFilter with nulls first crash with lowcardin…
Browse files Browse the repository at this point in the history
…ality optimize

Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Jan 17, 2025
1 parent 23e6b79 commit eb6e0d3
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 15 deletions.
20 changes: 11 additions & 9 deletions be/src/exec/olap_scan_prepare.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "exprs/binary_predicate.h"
#include "exprs/compound_predicate.h"
#include "exprs/dictmapping_expr.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "exprs/in_const_predicate.hpp"
#include "exprs/is_null_predicate.h"
Expand Down Expand Up @@ -577,21 +578,22 @@ Status ChunkPredicateBuilder<E, Type>::normalize_binary_predicate(const SlotDesc

template <BoxedExprType E, CompoundNodeType Type>
template <LogicalType SlotType, LogicalType MappingType, template <class> class Decoder, class... Args>
void ChunkPredicateBuilder<E, Type>::normalized_rf_with_null(const JoinRuntimeFilter* rf, Expr* col_ref,
Args&&... args) {
void ChunkPredicateBuilder<E, Type>::normalized_rf_with_null(const JoinRuntimeFilter* rf,
const SlotDescriptor* slot_desc, Args&&... args) {
DCHECK(Type == CompoundNodeType::AND);

using RFColumnPredicateBuilder = detail::RuntimeColumnPredicateBuilder;
ObjectPool* pool = _opts.obj_pool;

const auto* filter = down_cast<const RuntimeBloomFilter<MappingType>*>(rf);
using DecoderType = Decoder<typename RunTimeTypeTraits<MappingType>::CppType>;
DecoderType decoder(std::forward<Args>(args)...);
detail::RuntimeColumnPredicateBuilder::MinMaxParser<RuntimeBloomFilter<MappingType>, DecoderType> parser(filter,
&decoder);
const TypeDescriptor& col_type = col_ref->type();
RFColumnPredicateBuilder::MinMaxParser<RuntimeBloomFilter<MappingType>, DecoderType> parser(filter, &decoder);

const TypeDescriptor& col_type = slot_desc->type();
ColumnRef* col_ref = pool->add(new ColumnRef(slot_desc));
ColumnPtr const_min_col = parser.template min_const_column<SlotType>(col_type);
ColumnPtr const_max_col = parser.template max_const_column<SlotType>(col_type);

VectorizedLiteral* min_literal = pool->add(new VectorizedLiteral(std::move(const_min_col), col_type));
VectorizedLiteral* max_literal = pool->add(new VectorizedLiteral(std::move(const_max_col), col_type));

Expand Down Expand Up @@ -715,7 +717,7 @@ Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotD
if (rf->has_null()) {
normalized_rf_with_null<SlotType, LowCardDictType,
detail::RuntimeColumnPredicateBuilder::GlobalDictCodeDecoder>(
rf, desc->probe_expr_ctx()->root(), &iter->second.first);
rf, &slot, &iter->second.first);
} else {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, SlotType, LowCardDictType,
Expand All @@ -725,7 +727,7 @@ Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotD
} else {
if (rf->has_null()) {
normalized_rf_with_null<SlotType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(
rf, desc->probe_expr_ctx()->root(), nullptr);
rf, &slot, nullptr);
} else {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, SlotType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(
Expand All @@ -735,7 +737,7 @@ Status ChunkPredicateBuilder<E, Type>::normalize_join_runtime_filter(const SlotD
} else {
if (rf->has_null()) {
normalized_rf_with_null<SlotType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(
rf, desc->probe_expr_ctx()->root(), nullptr);
rf, &slot, nullptr);
} else {
detail::RuntimeColumnPredicateBuilder::build_minmax_range<
RangeType, SlotType, SlotType, detail::RuntimeColumnPredicateBuilder::DummyDecoder>(*range, rf,
Expand Down
11 changes: 6 additions & 5 deletions be/src/exec/olap_scan_prepare.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "exec/olap_common.h"
#include "exprs/expr.h"
#include "exprs/expr_context.h"
#include "runtime/descriptors.h"
#include "storage/predicate_tree/predicate_tree_fwd.h"
#include "storage/predicate_tree_params.h"
#include "storage/runtime_range_pruner.h"
Expand Down Expand Up @@ -91,7 +92,7 @@ class ChunkPredicateBuilder {
const UnarrivedRuntimeFilterList& unarrived_runtime_filters() { return rt_ranger_params; }

template <LogicalType SlotType, LogicalType MappingType, template <class> class Decoder, class... Args>
void normalized_rf_with_null(const JoinRuntimeFilter* rf, Expr* col_ref, Args&&... args);
void normalized_rf_with_null(const JoinRuntimeFilter* rf, const SlotDescriptor* slot_desc, Args&&... args);

private:
const ScanConjunctsManagerOptions& _opts;
Expand Down Expand Up @@ -138,11 +139,11 @@ class ChunkPredicateBuilder {
Status normalize_predicate(const SlotDescriptor& slot, ColumnValueRange<RangeValueType>* range);

template <LogicalType SlotType, typename RangeValueType, bool Negative>
requires(!lt_is_date<SlotType>) Status
normalize_in_or_equal_predicate(const SlotDescriptor& slot, ColumnValueRange<RangeValueType>* range);
requires(!lt_is_date<SlotType>)
Status normalize_in_or_equal_predicate(const SlotDescriptor& slot, ColumnValueRange<RangeValueType>* range);
template <LogicalType SlotType, typename RangeValueType, bool Negative>
requires lt_is_date<SlotType> Status normalize_in_or_equal_predicate(const SlotDescriptor& slot,
ColumnValueRange<RangeValueType>* range);
requires lt_is_date<SlotType>
Status normalize_in_or_equal_predicate(const SlotDescriptor& slot, ColumnValueRange<RangeValueType>* range);

template <LogicalType SlotType, typename RangeValueType, bool Negative>
Status normalize_binary_predicate(const SlotDescriptor& slot, ColumnValueRange<RangeValueType>* range);
Expand Down
54 changes: 54 additions & 0 deletions test/sql/test_sort/R/test_topn
Original file line number Diff line number Diff line change
Expand Up @@ -484,4 +484,58 @@ select c1,c0 from t5 order by c1,c0 desc limit 10;
11 8
12 9
13 10
-- !result
CREATE TABLE `tlow` (
`c0` int(11) NULL COMMENT "",
`c1` varchar(20) NULL COMMENT "",
`c2` varchar(200) NULL COMMENT "",
`c3` int(11) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`c0`, `c1`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`c0`, `c1`) BUCKETS 64
PROPERTIES (
"colocate_with" = "${uuid0}",
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "LZ4"
);
-- result:
-- !result
set io_tasks_per_scan_operator=1;
-- result:
-- !result
set group_execution_min_scan_rows=1;
-- result:
-- !result
insert into tlow SELECT generate_series, if (generate_series<10, null, generate_series%10), generate_series, generate_series FROM TABLE(generate_series(1, 40960));
-- result:
-- !result
analyze full table tlow;
-- result:
-- !result
select count(distinct c1) from tlow;
-- result:
10
-- !result
function: wait_global_dict_ready('c1', 'tlow')
-- result:

-- !result
select c1 from tlow group by c0, c1 order by c1 nulls first limit 10;
-- result:
None
None
None
None
None
None
None
None
None
0
-- !result
34 changes: 33 additions & 1 deletion test/sql/test_sort/T/test_topn
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,36 @@ select c1 from t5 order by c1 asc limit 10;
select c1,c0 from t5 where c1 < 10 order by c1,c0 asc limit 10;
select c0,c1 from t5 order by c0 desc limit 10;
select c1 from t5 order by c1 desc limit 10;
select c1,c0 from t5 order by c1,c0 desc limit 10;
select c1,c0 from t5 order by c1,c0 desc limit 10;

-- test has null topn rf with lowcardinality
CREATE TABLE `tlow` (
`c0` int(11) NULL COMMENT "",
`c1` varchar(20) NULL COMMENT "",
`c2` varchar(200) NULL COMMENT "",
`c3` int(11) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`c0`, `c1`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`c0`, `c1`) BUCKETS 64
PROPERTIES (
"colocate_with" = "${uuid0}",
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false",
"replicated_storage" = "true",
"compression" = "LZ4"
);

set io_tasks_per_scan_operator=1;
set group_execution_min_scan_rows=1;

insert into tlow SELECT generate_series, if (generate_series<10, null, generate_series%10), generate_series, generate_series FROM TABLE(generate_series(1, 40960));
analyze full table tlow;
select count(distinct c1) from tlow;
function: wait_global_dict_ready('c1', 'tlow')

select c1 from tlow group by c0, c1 order by c1 nulls first limit 10;


0 comments on commit eb6e0d3

Please sign in to comment.