diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index 021d3293c8b51..aa917a5b05835 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -259,6 +259,10 @@ pipeline::OpFactories ExchangeNode::decompose_to_pipeline(pipeline::PipelineBuil auto exchange_merge_sort_source_operator = std::make_shared( context->next_operator_id(), id(), _num_senders, _input_row_desc, &_sort_exec_exprs, _is_asc_order, _nulls_first, _offset, _limit); + if (_texchange_node.__isset.parallel_merge_late_materialize_mode) { + exchange_merge_sort_source_operator->set_materialized_mode( + _texchange_node.parallel_merge_late_materialize_mode); + } exchange_merge_sort_source_operator->set_degree_of_parallelism(context->degree_of_parallelism()); operators.emplace_back(std::move(exchange_merge_sort_source_operator)); // This particular exchange source will be executed in a concurrent way, and finally we need to gather them into one diff --git a/be/src/exec/pipeline/exchange/exchange_parallel_merge_source_operator.cpp b/be/src/exec/pipeline/exchange/exchange_parallel_merge_source_operator.cpp index 619917f4475c8..09983152814b8 100644 --- a/be/src/exec/pipeline/exchange/exchange_parallel_merge_source_operator.cpp +++ b/be/src/exec/pipeline/exchange/exchange_parallel_merge_source_operator.cpp @@ -100,7 +100,8 @@ merge_path::MergePathCascadeMerger* ExchangeParallelMergeSourceOperatorFactory:: SortDescs sort_descs(_is_asc_order, _nulls_first); _merger = std::make_unique( state->chunk_size(), degree_of_parallelism(), _sort_exec_exprs->lhs_ordering_expr_ctxs(), sort_descs, - _row_desc.tuple_descriptors()[0], TTopNType::ROW_NUMBER, _offset, _limit, chunk_providers); + _row_desc.tuple_descriptors()[0], TTopNType::ROW_NUMBER, _offset, _limit, chunk_providers, + _late_materialize_mode); } return _merger.get(); } diff --git a/be/src/exec/pipeline/exchange/exchange_parallel_merge_source_operator.h 
b/be/src/exec/pipeline/exchange/exchange_parallel_merge_source_operator.h index b09e155e6a9b1..931c7238755f0 100644 --- a/be/src/exec/pipeline/exchange/exchange_parallel_merge_source_operator.h +++ b/be/src/exec/pipeline/exchange/exchange_parallel_merge_source_operator.h @@ -105,6 +105,7 @@ class ExchangeParallelMergeSourceOperatorFactory final : public SourceOperatorFa void close_stream_recvr(); SourceOperatorFactory::AdaptiveState adaptive_initial_state() const override { return AdaptiveState::ACTIVE; } + void set_materialized_mode(TLateMaterializeMode::type mode) { _late_materialize_mode = mode; } private: const int32_t _num_sender; @@ -114,6 +115,7 @@ class ExchangeParallelMergeSourceOperatorFactory final : public SourceOperatorFa const std::vector& _nulls_first; const int64_t _offset; const int64_t _limit; + TLateMaterializeMode::type _late_materialize_mode = TLateMaterializeMode::AUTO; std::shared_ptr _stream_recvr; std::atomic _stream_recvr_cnt = 0; diff --git a/be/src/exec/pipeline/sort/local_parallel_merge_sort_source_operator.cpp b/be/src/exec/pipeline/sort/local_parallel_merge_sort_source_operator.cpp index f412fe62dcebf..577a8fe453729 100644 --- a/be/src/exec/pipeline/sort/local_parallel_merge_sort_source_operator.cpp +++ b/be/src/exec/pipeline/sort/local_parallel_merge_sort_source_operator.cpp @@ -103,7 +103,7 @@ OperatorPtr LocalParallelMergeSortSourceOperatorFactory::create(int32_t degree_o _mergers.push_back(std::make_unique( _state->chunk_size(), degree_of_parallelism, sort_context->sort_exprs(), sort_context->sort_descs(), _tuple_desc, sort_context->topn_type(), sort_context->offset(), sort_context->limit(), - chunk_providers)); + chunk_providers, _late_materialize_mode)); } return std::make_shared( this, _id, _plan_node_id, driver_sequence, sort_context.get(), _is_gathered, _mergers[0].get()); diff --git a/be/src/exec/pipeline/sort/local_parallel_merge_sort_source_operator.h b/be/src/exec/pipeline/sort/local_parallel_merge_sort_source_operator.h 
index 01401510d4a89..069e4d5b5156c 100644 --- a/be/src/exec/pipeline/sort/local_parallel_merge_sort_source_operator.h +++ b/be/src/exec/pipeline/sort/local_parallel_merge_sort_source_operator.h @@ -93,11 +93,13 @@ class LocalParallelMergeSortSourceOperatorFactory final : public SourceOperatorF void set_tuple_desc(const TupleDescriptor* tuple_desc) { _tuple_desc = tuple_desc; } void set_is_gathered(const bool is_gathered) { _is_gathered = is_gathered; } + void set_materialized_mode(TLateMaterializeMode::type mode) { _late_materialize_mode = mode; } private: const TupleDescriptor* _tuple_desc; bool _is_gathered = true; RuntimeState* _state; + TLateMaterializeMode::type _late_materialize_mode = TLateMaterializeMode::AUTO; // share data with multiple partition sort sink opeartor through _sort_context. std::shared_ptr _sort_context_factory; diff --git a/be/src/exec/sorting/merge_path.cpp b/be/src/exec/sorting/merge_path.cpp index 6cf57aabd1be9..2cc477292995a 100644 --- a/be/src/exec/sorting/merge_path.cpp +++ b/be/src/exec/sorting/merge_path.cpp @@ -645,7 +645,8 @@ MergePathCascadeMerger::MergePathCascadeMerger(const size_t chunk_size, const in std::vector sort_exprs, const SortDescs& sort_descs, const TupleDescriptor* tuple_desc, const TTopNType::type topn_type, const int64_t offset, const int64_t limit, - std::vector chunk_providers) + std::vector chunk_providers, + TLateMaterializeMode::type mode) : _chunk_size(chunk_size > MAX_CHUNK_SIZE ? 
MAX_CHUNK_SIZE : chunk_size), _streaming_batch_size(4 * chunk_size * degree_of_parallelism), _degree_of_parallelism(degree_of_parallelism), @@ -657,7 +658,8 @@ MergePathCascadeMerger::MergePathCascadeMerger(const size_t chunk_size, const in _limit(limit), _chunk_providers(std::move(chunk_providers)), _process_cnts(degree_of_parallelism), - _output_chunks(degree_of_parallelism) { + _output_chunks(degree_of_parallelism), + _late_materialization_mode(mode) { _working_nodes.resize(_degree_of_parallelism); _metrics.resize(_degree_of_parallelism); @@ -1131,11 +1133,17 @@ void MergePathCascadeMerger::_init_late_materialization() { metrics.profile->add_info_string("LateMaterialization", _late_materialization ? "True" : "False"); }); }); - if (_chunk_providers.size() <= 2) { _late_materialization = false; return; } + if (_late_materialization_mode == TLateMaterializeMode::ALWAYS) { + _late_materialization = true; + return; + } else if (_late_materialization_mode == TLateMaterializeMode::NEVER) { + _late_materialization = false; + return; + } const auto level_size = static_cast(std::ceil(std::log2(_chunk_providers.size()))); std::unordered_set early_materialized_slots; diff --git a/be/src/exec/sorting/merge_path.h b/be/src/exec/sorting/merge_path.h index 78c63c4ff3333..4deae3020f437 100644 --- a/be/src/exec/sorting/merge_path.h +++ b/be/src/exec/sorting/merge_path.h @@ -371,7 +371,8 @@ class MergePathCascadeMerger { MergePathCascadeMerger(const size_t chunk_size, const int32_t degree_of_parallelism, std::vector sort_exprs, const SortDescs& sort_descs, const TupleDescriptor* tuple_desc, const TTopNType::type topn_type, const int64_t offset, - const int64_t limit, std::vector chunk_providers); + const int64_t limit, std::vector chunk_providers, + TLateMaterializeMode::type mode = TLateMaterializeMode::AUTO); const std::vector& sort_exprs() const { return _sort_exprs; } const SortDescs& sort_descs() const { return _sort_descs; } @@ -490,6 +491,8 @@ class MergePathCascadeMerger 
{ std::chrono::steady_clock::time_point _pending_start; // First pending should not be recorded, because it all comes from the operator dependency bool _is_first_pending = true; + + TLateMaterializeMode::type _late_materialization_mode; }; } // namespace starrocks::merge_path diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp index fba9a2b49b705..3ddf6f64f2492 100644 --- a/be/src/exec/topn_node.cpp +++ b/be/src/exec/topn_node.cpp @@ -335,6 +335,10 @@ std::vector> TopNNode::_decompose_to_ ->set_tuple_desc(_materialized_tuple_desc); down_cast(source_operator.get())->set_is_gathered(need_merge); } + if (enable_parallel_merge && _tnode.sort_node.__isset.parallel_merge_late_materialize_mode) { + down_cast(source_operator.get()) + ->set_materialized_mode(_tnode.sort_node.parallel_merge_late_materialize_mode); + } ops_sink_with_sort.emplace_back(std::move(sink_operator)); context->add_pipeline(ops_sink_with_sort); diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/ExchangeNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/ExchangeNode.java index d6952f6d9e9d7..84045fd13ee83 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/ExchangeNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/ExchangeNode.java @@ -50,6 +50,7 @@ import com.starrocks.sql.optimizer.operator.TopNType; import com.starrocks.thrift.TExchangeNode; import com.starrocks.thrift.TExplainLevel; +import com.starrocks.thrift.TLateMaterializeMode; import com.starrocks.thrift.TNormalExchangeNode; import com.starrocks.thrift.TNormalPlanNode; import com.starrocks.thrift.TNormalSortInfo; @@ -198,8 +199,10 @@ protected void toThrift(TPlanNode msg) { if (partitionType != null) { msg.exchange_node.setPartition_type(partitionType); } - SessionVariable 
sessionVariable = ConnectContext.get().getSessionVariable(); - msg.exchange_node.setEnable_parallel_merge(sessionVariable.isEnableParallelMerge()); + SessionVariable sv = ConnectContext.get().getSessionVariable(); + msg.exchange_node.setEnable_parallel_merge(sv.isEnableParallelMerge()); + TLateMaterializeMode mode = TLateMaterializeMode.valueOf(sv.getParallelMergeLateMaterializationMode().toUpperCase()); + msg.exchange_node.setParallel_merge_late_materialize_mode(mode); } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/SortNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/SortNode.java index fb82a01f60e23..44adf1fcdb016 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/SortNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/SortNode.java @@ -52,6 +52,7 @@ import com.starrocks.server.GlobalStateMgr; import com.starrocks.sql.optimizer.operator.TopNType; import com.starrocks.thrift.TExplainLevel; +import com.starrocks.thrift.TLateMaterializeMode; import com.starrocks.thrift.TNormalPlanNode; import com.starrocks.thrift.TNormalSortInfo; import com.starrocks.thrift.TNormalSortNode; @@ -215,6 +216,8 @@ protected void toThrift(TPlanNode msg) { msg.sort_node.setLate_materialization(sessionVariable.isFullSortLateMaterialization()); msg.sort_node.setEnable_parallel_merge(sessionVariable.isEnableParallelMerge()); + TLateMaterializeMode mode = TLateMaterializeMode.valueOf(sessionVariable.getParallelMergeLateMaterializationMode().toUpperCase()); + msg.sort_node.setParallel_merge_late_materialize_mode(mode); if (info.getPartitionExprs() != null) { msg.sort_node.setPartition_exprs(Expr.treesToThrift(info.getPartitionExprs())); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 7aeb6f841ebc9..1ba333f537b58 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ 
b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -481,6 +481,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable { public static final String GROUP_EXECUTION_MIN_SCAN_ROWS = "group_execution_min_scan_rows"; public static final String ENABLE_PARALLEL_MERGE = "enable_parallel_merge"; + public static final String PARALLEL_MERGE_LATE_MATERIALIZATION_MODE = "parallel_merge_late_materialization_mode"; public static final String ENABLE_QUERY_QUEUE = "enable_query_queue"; public static final String WINDOW_PARTITION_MODE = "window_partition_mode"; @@ -1502,6 +1503,10 @@ public static MaterializedViewRewriteMode parse(String str) { @VarAttr(name = ENABLE_PARALLEL_MERGE) private boolean enableParallelMerge = true; + // AUTO/ALWAYS/NEVER + @VarAttr(name = PARALLEL_MERGE_LATE_MATERIALIZATION_MODE) + private String parallelMergeLateMaterializationMode = SessionVariableConstants.AUTO; + @VarAttr(name = ENABLE_QUERY_QUEUE, flag = VariableMgr.INVISIBLE) private boolean enableQueryQueue = true; @@ -1685,6 +1690,10 @@ public boolean isEnableParallelMerge() { return enableParallelMerge; } + public String getParallelMergeLateMaterializationMode() { + return parallelMergeLateMaterializationMode; + } + public void setEnableParallelMerge(boolean enableParallelMerge) { this.enableParallelMerge = enableParallelMerge; } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariableConstants.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariableConstants.java index 5feab63a7cb3b..4b07f11a2aad5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariableConstants.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariableConstants.java @@ -36,6 +36,10 @@ private SessionVariableConstants() {} public static final String VARCHAR = "varchar"; + public static final String ALWAYS = "always"; + + public static final String NEVER = "never"; + public enum ChooseInstancesMode { // the number of chosen instances is the 
same as the max number of instances from its children fragments diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 7dc8e961ad6bf..ee17359f87cfd 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -740,18 +740,6 @@ enum TAggregationOp { PERCENT_RANK } -//struct TAggregateFunctionCall { - // The aggregate function to call. -// 1: required Types.TFunction fn - - // The input exprs to this aggregate function -// 2: required list input_exprs - - // If set, this aggregate function udf has varargs and this is the index for the - // first variable argument. -// 3: optional i32 vararg_start_idx -//} - struct TAggregationNode { 1: optional list grouping_exprs // aggregate exprs. The root of each expr is the aggregate function. The @@ -825,6 +813,12 @@ enum TTopNType { DENSE_RANK } +enum TLateMaterializeMode { + AUTO, + ALWAYS, + NEVER, +} + struct TSortNode { 1: required TSortInfo sort_info // Indicates whether the backend service should use topn vs. 
sorting @@ -861,6 +855,7 @@ struct TSortNode { 29: optional bool late_materialization; 30: optional bool enable_parallel_merge; 31: optional bool analytic_partition_skewed; + 40: optional TLateMaterializeMode parallel_merge_late_materialize_mode; } enum TAnalyticWindowType { @@ -1019,6 +1014,7 @@ struct TExchangeNode { // Sender's partition type 4: optional Partitions.TPartitionType partition_type; 5: optional bool enable_parallel_merge + 6: optional TLateMaterializeMode parallel_merge_late_materialize_mode; } // This contains all of the information computed by the plan as part of the resource diff --git a/test/sql/test_sort/R/test_parallel_merge_lazy_materialize b/test/sql/test_sort/R/test_parallel_merge_lazy_materialize new file mode 100644 index 0000000000000..3cddfd1963f69 --- /dev/null +++ b/test/sql/test_sort/R/test_parallel_merge_lazy_materialize @@ -0,0 +1,151 @@ +-- name: test_parallel_merge_lazy_materialize +CREATE TABLE `t0` ( + `c0` int(11) NULL COMMENT "", + `c1` varchar(20) NULL COMMENT "", + `c2` varchar(200) NULL COMMENT "", + `c3` int(11) NULL COMMENT "", + `c4` int(11) NULL COMMENT "", + `c5` int(11) NULL COMMENT "", + `c6` int(11) NULL COMMENT "", + `c7` int(11) NULL COMMENT "" +) ENGINE=OLAP +DUPLICATE KEY(`c0`, `c1`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`c0`, `c1`) BUCKETS 48 +PROPERTIES ( +"replication_num" = "1", +"in_memory" = "false", +"storage_format" = "DEFAULT", +"enable_persistent_index" = "false", +"replicated_storage" = "true", +"compression" = "LZ4" +); +-- result: +-- !result +insert into t0 SELECT generate_series, generate_series, generate_series, generate_series, generate_series, generate_series, generate_series, generate_series FROM 
TABLE(generate_series(1, 40960)); +-- result: +-- !result +select count(*) from t0; +-- result: +40960 +-- !result +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) OVER(ORDER BY c5) AS wv FROM t0) a; +-- result: +838881280 20480.5 1 40960 +-- !result +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +-- result: +40960 1.0 1 1 +-- !result +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv), SUM(c1), SUM(c2), SUM(c3), SUM(c4), SUM(c5), SUM(c6) FROM (SELECT c1,c2,c3,c4,c5,c6, COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +-- result: +40960 1.0 1 1 838881280.0 838881280.0 838881280 838881280 838881280 838881280 +-- !result +select * from t0 order by 1,2,3,4,5,6 limit 10; +-- result: +1 1 1 1 1 1 1 1 +2 2 2 2 2 2 2 2 +3 3 3 3 3 3 3 3 +4 4 4 4 4 4 4 4 +5 5 5 5 5 5 5 5 +6 6 6 6 6 6 6 6 +7 7 7 7 7 7 7 7 +8 8 8 8 8 8 8 8 +9 9 9 9 9 9 9 9 +10 10 10 10 10 10 10 10 +-- !result +select * from t0 order by 1,2,3,4,5,6 desc limit 10; +-- result: +1 1 1 1 1 1 1 1 +2 2 2 2 2 2 2 2 +3 3 3 3 3 3 3 3 +4 4 4 4 4 4 4 4 +5 5 5 5 5 5 5 5 +6 6 6 6 6 6 6 6 +7 7 7 7 7 7 7 7 +8 8 8 8 8 8 8 8 +9 9 9 9 9 9 9 9 +10 10 10 10 10 10 10 10 +-- !result +set parallel_merge_late_materialization_mode="always"; +-- result: +-- !result +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) OVER(ORDER BY c5) AS wv FROM t0) a; +-- result: +838881280 20480.5 1 40960 +-- !result +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +-- result: +40960 1.0 1 1 +-- !result +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv), SUM(c1), SUM(c2), SUM(c3), SUM(c4), SUM(c5), SUM(c6) FROM (SELECT c1,c2,c3,c4,c5,c6, COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +-- result: +40960 1.0 1 1 838881280.0 838881280.0 838881280 838881280 838881280 838881280 +-- !result +select * from t0 order by 1,2,3,4,5,6 limit 10; +-- result: +1 1 1 1 1 1 1 1 +2 2 2 2 2 
2 2 2 +3 3 3 3 3 3 3 3 +4 4 4 4 4 4 4 4 +5 5 5 5 5 5 5 5 +6 6 6 6 6 6 6 6 +7 7 7 7 7 7 7 7 +8 8 8 8 8 8 8 8 +9 9 9 9 9 9 9 9 +10 10 10 10 10 10 10 10 +-- !result +select * from t0 order by 1,2,3,4,5,6 desc limit 10; +-- result: +1 1 1 1 1 1 1 1 +2 2 2 2 2 2 2 2 +3 3 3 3 3 3 3 3 +4 4 4 4 4 4 4 4 +5 5 5 5 5 5 5 5 +6 6 6 6 6 6 6 6 +7 7 7 7 7 7 7 7 +8 8 8 8 8 8 8 8 +9 9 9 9 9 9 9 9 +10 10 10 10 10 10 10 10 +-- !result +set parallel_merge_late_materialization_mode="never"; +-- result: +-- !result +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) OVER(ORDER BY c5) AS wv FROM t0) a; +-- result: +838881280 20480.5 1 40960 +-- !result +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +-- result: +40960 1.0 1 1 +-- !result +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv), SUM(c1), SUM(c2), SUM(c3), SUM(c4), SUM(c5), SUM(c6) FROM (SELECT c1,c2,c3,c4,c5,c6, COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +-- result: +40960 1.0 1 1 838881280.0 838881280.0 838881280 838881280 838881280 838881280 +-- !result +select * from t0 order by 1,2,3,4,5,6 limit 10; +-- result: +1 1 1 1 1 1 1 1 +2 2 2 2 2 2 2 2 +3 3 3 3 3 3 3 3 +4 4 4 4 4 4 4 4 +5 5 5 5 5 5 5 5 +6 6 6 6 6 6 6 6 +7 7 7 7 7 7 7 7 +8 8 8 8 8 8 8 8 +9 9 9 9 9 9 9 9 +10 10 10 10 10 10 10 10 +-- !result +select * from t0 order by 1,2,3,4,5,6 desc limit 10; +-- result: +1 1 1 1 1 1 1 1 +2 2 2 2 2 2 2 2 +3 3 3 3 3 3 3 3 +4 4 4 4 4 4 4 4 +5 5 5 5 5 5 5 5 +6 6 6 6 6 6 6 6 +7 7 7 7 7 7 7 7 +8 8 8 8 8 8 8 8 +9 9 9 9 9 9 9 9 +10 10 10 10 10 10 10 10 +-- !result \ No newline at end of file diff --git a/test/sql/test_sort/T/test_parallel_merge_lazy_materialize b/test/sql/test_sort/T/test_parallel_merge_lazy_materialize new file mode 100644 index 0000000000000..79c50c22a66c2 --- /dev/null +++ b/test/sql/test_sort/T/test_parallel_merge_lazy_materialize @@ -0,0 +1,47 @@ +-- name: test_parallel_merge_lazy_materialize + +CREATE TABLE `t0` ( + `c0` 
int(11) NULL COMMENT "", + `c1` varchar(20) NULL COMMENT "", + `c2` varchar(200) NULL COMMENT "", + `c3` int(11) NULL COMMENT "", + `c4` int(11) NULL COMMENT "", + `c5` int(11) NULL COMMENT "", + `c6` int(11) NULL COMMENT "", + `c7` int(11) NULL COMMENT "" +) ENGINE=OLAP +DUPLICATE KEY(`c0`, `c1`) +COMMENT "OLAP" +DISTRIBUTED BY HASH(`c0`, `c1`) BUCKETS 48 +PROPERTIES ( +"replication_num" = "1", +"in_memory" = "false", +"storage_format" = "DEFAULT", +"enable_persistent_index" = "false", +"replicated_storage" = "true", +"compression" = "LZ4" +); + +insert into t0 SELECT generate_series, generate_series, generate_series, generate_series, generate_series, generate_series, generate_series, generate_series FROM TABLE(generate_series(1, 40960)); + +select count(*) from t0; +-- auto +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) OVER(ORDER BY c5) AS wv FROM t0) a; +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv), SUM(c1), SUM(c2), SUM(c3), SUM(c4), SUM(c5), SUM(c6) FROM (SELECT c1,c2,c3,c4,c5,c6, COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +select * from t0 order by 1,2,3,4,5,6 limit 10; +select * from t0 order by 1,2,3,4,5,6 desc limit 10; + +set parallel_merge_late_materialization_mode="always"; +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) OVER(ORDER BY c5) AS wv FROM t0) a; +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv), SUM(c1), SUM(c2), SUM(c3), SUM(c4), SUM(c5), SUM(c6) FROM (SELECT c1,c2,c3,c4,c5,c6, COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +select * from t0 order by 1,2,3,4,5,6 limit 10; +select * from t0 order by 1,2,3,4,5,6 desc limit 10; + +set parallel_merge_late_materialization_mode="never"; +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) 
OVER(ORDER BY c5) AS wv FROM t0) a; +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv) FROM (SELECT COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +SELECT SUM(wv), AVG(wv), MIN(wv), MAX(wv), SUM(c1), SUM(c2), SUM(c3), SUM(c4), SUM(c5), SUM(c6) FROM (SELECT c1,c2,c3,c4,c5,c6, COUNT(c2) OVER(PARTITION BY c1 ORDER BY c5) AS wv FROM t0) a; +select * from t0 order by 1,2,3,4,5,6 limit 10; +select * from t0 order by 1,2,3,4,5,6 desc limit 10;