[Enhancement] add session variable parallel_merge_late_materialization_mode to control the parallel merge behaviour (#55082)

Signed-off-by: stdpain <[email protected]>
(cherry picked from commit 37ed864)

# Conflicts:
#	be/src/exec/sorting/merge_path.h
#	gensrc/thrift/PlanNodes.thrift
stdpain authored and mergify[bot] committed Jan 16, 2025
1 parent 34ec65d commit 4a2e176
Showing 15 changed files with 268 additions and 20 deletions.
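At a glance, the change threads the new session variable parallel_merge_late_materialization_mode (auto/always/never) from the FE planner into the serialized plan (the new TLateMaterializeMode field on TSortNode and TExchangeNode) and down to the BE's MergePathCascadeMerger, which consults it when deciding whether the parallel merge uses late materialization. Below is a minimal, hypothetical C++ sketch of that decision gate; the helper function, its signature, and the include path are illustrative assumptions, not code from this commit:

#include <cstddef>
#include <functional>

#include "gen_cpp/PlanNodes_types.h" // assumed location of the generated TLateMaterializeMode

namespace starrocks {

// Illustrative only: mirrors the gating added to MergePathCascadeMerger::_init_late_materialization.
inline bool decide_late_materialization(TLateMaterializeMode::type mode, size_t num_chunk_providers,
                                        const std::function<bool()>& auto_heuristic) {
    if (num_chunk_providers <= 2) {
        return false; // small merges never use late materialization, regardless of mode
    }
    switch (mode) {
    case TLateMaterializeMode::ALWAYS:
        return true;
    case TLateMaterializeMode::NEVER:
        return false;
    default: // AUTO: keep the pre-existing cost-based heuristic
        return auto_heuristic();
    }
}

} // namespace starrocks

On the FE side, the mode comes from the session (e.g. SET parallel_merge_late_materialization_mode = "never"), is read via SessionVariable.getParallelMergeLateMaterializationMode(), and is mapped to the thrift enum with TLateMaterializeMode.valueOf(...) as shown in ExchangeNode.java and SortNode.java below.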
4 changes: 4 additions & 0 deletions be/src/exec/exchange_node.cpp
@@ -259,6 +259,10 @@ pipeline::OpFactories ExchangeNode::decompose_to_pipeline(pipeline::PipelineBuil
auto exchange_merge_sort_source_operator = std::make_shared<ExchangeParallelMergeSourceOperatorFactory>(
context->next_operator_id(), id(), _num_senders, _input_row_desc, &_sort_exec_exprs, _is_asc_order,
_nulls_first, _offset, _limit);
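// Forward the FE-provided late-materialization mode for the parallel merge, if it was set.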
if (_texchange_node.__isset.parallel_merge_late_materialize_mode) {
exchange_merge_sort_source_operator->set_materialized_mode(
_texchange_node.parallel_merge_late_materialize_mode);
}
exchange_merge_sort_source_operator->set_degree_of_parallelism(context->degree_of_parallelism());
operators.emplace_back(std::move(exchange_merge_sort_source_operator));
// This particular exchange source will be executed in a concurrent way, and finally we need to gather them into one
@@ -100,7 +100,8 @@ merge_path::MergePathCascadeMerger* ExchangeParallelMergeSourceOperatorFactory::
SortDescs sort_descs(_is_asc_order, _nulls_first);
_merger = std::make_unique<merge_path::MergePathCascadeMerger>(
state->chunk_size(), degree_of_parallelism(), _sort_exec_exprs->lhs_ordering_expr_ctxs(), sort_descs,
_row_desc.tuple_descriptors()[0], TTopNType::ROW_NUMBER, _offset, _limit, chunk_providers);
_row_desc.tuple_descriptors()[0], TTopNType::ROW_NUMBER, _offset, _limit, chunk_providers,
_late_materialize_mode);
}
return _merger.get();
}
@@ -105,6 +105,7 @@ class ExchangeParallelMergeSourceOperatorFa
void close_stream_recvr();

SourceOperatorFactory::AdaptiveState adaptive_initial_state() const override { return AdaptiveState::ACTIVE; }
void set_materialized_mode(TLateMaterializeMode::type mode) { _late_materialize_mode = mode; }

private:
const int32_t _num_sender;
@@ -114,6 +115,7 @@ class ExchangeParallelMergeSourceOperatorFa
const std::vector<bool>& _nulls_first;
const int64_t _offset;
const int64_t _limit;
TLateMaterializeMode::type _late_materialize_mode = TLateMaterializeMode::AUTO;

std::shared_ptr<DataStreamRecvr> _stream_recvr;
std::atomic<int64_t> _stream_recvr_cnt = 0;
@@ -103,7 +103,7 @@ OperatorPtr LocalParallelMergeSortSourceOperatorFactory::create(int32_t degree_o
_mergers.push_back(std::make_unique<merge_path::MergePathCascadeMerger>(
_state->chunk_size(), degree_of_parallelism, sort_context->sort_exprs(), sort_context->sort_descs(),
_tuple_desc, sort_context->topn_type(), sort_context->offset(), sort_context->limit(),
chunk_providers));
chunk_providers, _late_materialize_mode));
}
return std::make_shared<LocalParallelMergeSortSourceOperator>(
this, _id, _plan_node_id, driver_sequence, sort_context.get(), _is_gathered, _mergers[0].get());
@@ -93,11 +93,13 @@ class LocalParallelMergeSortSourceOperatorF

void set_tuple_desc(const TupleDescriptor* tuple_desc) { _tuple_desc = tuple_desc; }
void set_is_gathered(const bool is_gathered) { _is_gathered = is_gathered; }
void set_materialized_mode(TLateMaterializeMode::type mode) { _late_materialize_mode = mode; }

private:
const TupleDescriptor* _tuple_desc;
bool _is_gathered = true;
RuntimeState* _state;
TLateMaterializeMode::type _late_materialize_mode = TLateMaterializeMode::AUTO;

// share data with multiple partition sort sink operators through _sort_context.
std::shared_ptr<SortContextFactory> _sort_context_factory;
14 changes: 11 additions & 3 deletions be/src/exec/sorting/merge_path.cpp
@@ -645,7 +645,8 @@ MergePathCascadeMerger::MergePathCascadeMerger(const size_t chunk_size, const in
std::vector<ExprContext*> sort_exprs, const SortDescs& sort_descs,
const TupleDescriptor* tuple_desc, const TTopNType::type topn_type,
const int64_t offset, const int64_t limit,
std::vector<MergePathChunkProvider> chunk_providers)
std::vector<MergePathChunkProvider> chunk_providers,
TLateMaterializeMode::type mode)
: _chunk_size(chunk_size > MAX_CHUNK_SIZE ? MAX_CHUNK_SIZE : chunk_size),
_streaming_batch_size(4 * chunk_size * degree_of_parallelism),
_degree_of_parallelism(degree_of_parallelism),
@@ -657,7 +658,8 @@ MergePathCascadeMerger::MergePathCascadeMerger(const size_t chunk_size, const in
_limit(limit),
_chunk_providers(std::move(chunk_providers)),
_process_cnts(degree_of_parallelism),
_output_chunks(degree_of_parallelism) {
_output_chunks(degree_of_parallelism),
_late_materialization_mode(mode) {
_working_nodes.resize(_degree_of_parallelism);
_metrics.resize(_degree_of_parallelism);

@@ -1131,11 +1133,17 @@ void MergePathCascadeMerger::_init_late_materialization() {
metrics.profile->add_info_string("LateMaterialization", _late_materialization ? "True" : "False");
});
});

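// With at most two chunk providers, late materialization is unconditionally disabled; this check runs before the mode is consulted.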
if (_chunk_providers.size() <= 2) {
_late_materialization = false;
return;
}
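// Session-controlled override: ALWAYS/NEVER force the decision, while AUTO falls through to the cost-based heuristic below.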
if (_late_materialization_mode == TLateMaterializeMode::ALWAYS) {
_late_materialization = true;
return;
} else if (_late_materialization_mode == TLateMaterializeMode::NEVER) {
_late_materialization = false;
return;
}

const auto level_size = static_cast<size_t>(std::ceil(std::log2(_chunk_providers.size())));
std::unordered_set<SlotId> early_materialized_slots;
10 changes: 9 additions & 1 deletion be/src/exec/sorting/merge_path.h
@@ -371,7 +371,8 @@ class MergePathCascadeMerger {
MergePathCascadeMerger(const size_t chunk_size, const int32_t degree_of_parallelism,
std::vector<ExprContext*> sort_exprs, const SortDescs& sort_descs,
const TupleDescriptor* tuple_desc, const TTopNType::type topn_type, const int64_t offset,
const int64_t limit, std::vector<MergePathChunkProvider> chunk_providers);
const int64_t limit, std::vector<MergePathChunkProvider> chunk_providers,
TLateMaterializeMode::type mode = TLateMaterializeMode::AUTO);
const std::vector<ExprContext*>& sort_exprs() const { return _sort_exprs; }
const SortDescs& sort_descs() const { return _sort_descs; }

@@ -490,6 +491,13 @@ class MergePathCascadeMerger {
std::chrono::steady_clock::time_point _pending_start;
// First pending should not be recorded, because it all comes from the operator dependency
bool _is_first_pending = true;

TLateMaterializeMode::type _late_materialization_mode;
};

} // namespace starrocks::merge_path
4 changes: 4 additions & 0 deletions be/src/exec/topn_node.cpp
@@ -335,6 +335,10 @@ std::vector<std::shared_ptr<pipeline::OperatorFactory>> TopNNode::_decompose_to_
->set_tuple_desc(_materialized_tuple_desc);
down_cast<LocalParallelMergeSortSourceOperatorFactory*>(source_operator.get())->set_is_gathered(need_merge);
}
if (enable_parallel_merge && _tnode.sort_node.__isset.parallel_merge_late_materialize_mode) {
down_cast<LocalParallelMergeSortSourceOperatorFactory*>(source_operator.get())
->set_materialized_mode(_tnode.sort_node.parallel_merge_late_materialize_mode);
}

ops_sink_with_sort.emplace_back(std::move(sink_operator));
context->add_pipeline(ops_sink_with_sort);
@@ -50,6 +50,7 @@
import com.starrocks.sql.optimizer.operator.TopNType;
import com.starrocks.thrift.TExchangeNode;
import com.starrocks.thrift.TExplainLevel;
import com.starrocks.thrift.TLateMaterializeMode;
import com.starrocks.thrift.TNormalExchangeNode;
import com.starrocks.thrift.TNormalPlanNode;
import com.starrocks.thrift.TNormalSortInfo;
@@ -198,8 +199,10 @@ protected void toThrift(TPlanNode msg) {
if (partitionType != null) {
msg.exchange_node.setPartition_type(partitionType);
}
SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
msg.exchange_node.setEnable_parallel_merge(sessionVariable.isEnableParallelMerge());
SessionVariable sv = ConnectContext.get().getSessionVariable();
msg.exchange_node.setEnable_parallel_merge(sv.isEnableParallelMerge());
TLateMaterializeMode mode = TLateMaterializeMode.valueOf(sv.getParallelMergeLateMaterializationMode().toUpperCase());
msg.exchange_node.setParallel_merge_late_materialize_mode(mode);
}

@Override
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/SortNode.java
@@ -52,6 +52,7 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.optimizer.operator.TopNType;
import com.starrocks.thrift.TExplainLevel;
import com.starrocks.thrift.TLateMaterializeMode;
import com.starrocks.thrift.TNormalPlanNode;
import com.starrocks.thrift.TNormalSortInfo;
import com.starrocks.thrift.TNormalSortNode;
@@ -215,6 +216,8 @@ protected void toThrift(TPlanNode msg) {

msg.sort_node.setLate_materialization(sessionVariable.isFullSortLateMaterialization());
msg.sort_node.setEnable_parallel_merge(sessionVariable.isEnableParallelMerge());
TLateMaterializeMode mode = TLateMaterializeMode.valueOf(sessionVariable.getParallelMergeLateMaterializationMode().toUpperCase());
msg.sort_node.setParallel_merge_late_materialize_mode(mode);

if (info.getPartitionExprs() != null) {
msg.sort_node.setPartition_exprs(Expr.treesToThrift(info.getPartitionExprs()));
@@ -481,6 +481,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String GROUP_EXECUTION_MIN_SCAN_ROWS = "group_execution_min_scan_rows";

public static final String ENABLE_PARALLEL_MERGE = "enable_parallel_merge";
public static final String PARALLEL_MERGE_LATE_MATERIALIZATION_MODE = "parallel_merge_late_materialization_mode";
public static final String ENABLE_QUERY_QUEUE = "enable_query_queue";

public static final String WINDOW_PARTITION_MODE = "window_partition_mode";
@@ -1502,6 +1503,10 @@ public static MaterializedViewRewriteMode parse(String str) {
@VarAttr(name = ENABLE_PARALLEL_MERGE)
private boolean enableParallelMerge = true;

// AUTO/ALWAYS/NEVER
@VarAttr(name = PARALLEL_MERGE_LATE_MATERIALIZATION_MODE)
private String parallelMergeLateMaterializationMode = SessionVariableConstants.AUTO;

@VarAttr(name = ENABLE_QUERY_QUEUE, flag = VariableMgr.INVISIBLE)
private boolean enableQueryQueue = true;

@@ -1685,6 +1690,10 @@ public boolean isEnableParallelMerge() {
return enableParallelMerge;
}

public String getParallelMergeLateMaterializationMode() {
return parallelMergeLateMaterializationMode;
}

public void setEnableParallelMerge(boolean enableParallelMerge) {
this.enableParallelMerge = enableParallelMerge;
}
@@ -36,6 +36,10 @@ private SessionVariableConstants() {}

public static final String VARCHAR = "varchar";

public static final String ALWAYS = "always";

public static final String NEVER = "never";

public enum ChooseInstancesMode {

// the number of chosen instances is the same as the max number of instances from its children fragments
26 changes: 14 additions & 12 deletions gensrc/thrift/PlanNodes.thrift
@@ -740,18 +740,6 @@ enum TAggregationOp {
PERCENT_RANK
}

//struct TAggregateFunctionCall {
// The aggregate function to call.
// 1: required Types.TFunction fn

// The input exprs to this aggregate function
// 2: required list<Exprs.TExpr> input_exprs

// If set, this aggregate function udf has varargs and this is the index for the
// first variable argument.
// 3: optional i32 vararg_start_idx
//}

struct TAggregationNode {
1: optional list<Exprs.TExpr> grouping_exprs
// aggregate exprs. The root of each expr is the aggregate function. The
@@ -825,6 +813,12 @@ enum TTopNType {
DENSE_RANK
}

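// Late-materialization policy for parallel merge: AUTO lets the BE's heuristic decide, ALWAYS forces it on, NEVER disables it.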
enum TLateMaterializeMode {
AUTO,
ALWAYS,
NEVER,
}

struct TSortNode {
1: required TSortInfo sort_info
// Indicates whether the backend service should use topn vs. sorting
@@ -861,6 +855,13 @@ struct TSortNode {
29: optional bool late_materialization;
30: optional bool enable_parallel_merge;
31: optional bool analytic_partition_skewed;
40: optional TLateMaterializeMode parallel_merge_late_materialize_mode;
}

enum TAnalyticWindowType {
@@ -1019,6 +1020,7 @@ struct TExchangeNode {
// Sender's partition type
4: optional Partitions.TPartitionType partition_type;
5: optional bool enable_parallel_merge
6: optional TLateMaterializeMode parallel_merge_late_materialize_mode;
}

// This contains all of the information computed by the plan as part of the resource