diff --git a/be/src/exec/pipeline/fragment_executor.cpp b/be/src/exec/pipeline/fragment_executor.cpp index fa6e7edbd9b4d..579343815b2f9 100644 --- a/be/src/exec/pipeline/fragment_executor.cpp +++ b/be/src/exec/pipeline/fragment_executor.cpp @@ -131,6 +131,9 @@ Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExec if (query_options.__isset.enable_profile && query_options.enable_profile) { _query_ctx->set_report_profile(); } + if (query_options.__isset.big_query_profile_second_threshold) { + _query_ctx->set_big_query_profile_threshold(query_options.big_query_profile_second_threshold); + } if (query_options.__isset.pipeline_profile_level) { _query_ctx->set_profile_level(query_options.pipeline_profile_level); } diff --git a/be/src/exec/pipeline/query_context.h b/be/src/exec/pipeline/query_context.h index 27abe74355e76..84a0cfc4aaa32 100644 --- a/be/src/exec/pipeline/query_context.h +++ b/be/src/exec/pipeline/query_context.h @@ -73,8 +73,28 @@ class QueryContext : public std::enable_shared_from_this { _query_deadline = duration_cast(steady_clock::now().time_since_epoch() + _query_expire_seconds).count(); } +<<<<<<< HEAD void set_report_profile() { _is_report_profile = true; } bool is_report_profile() { return _is_report_profile; } +======= + void set_enable_profile() { _enable_profile = true; } + bool enable_profile() { + if (_enable_profile) { + return true; + } + if (_big_query_profile_threshold_ns <= 0) { + return false; + } + return MonotonicNanos() - _query_begin_time > _big_query_profile_threshold_ns; + } + void set_big_query_profile_threshold(int64_t big_query_profile_threshold_s) { + _big_query_profile_threshold_ns = 1'000'000'000L * big_query_profile_threshold_s; + } + void set_runtime_profile_report_interval(int64_t runtime_profile_report_interval_s) { + _runtime_profile_report_interval_ns = 1'000'000'000L * runtime_profile_report_interval_s; + } + int64_t get_runtime_profile_report_interval_ns() { return _runtime_profile_report_interval_ns; } +>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825)) void set_profile_level(const TPipelineProfileLevel::type& profile_level) { _profile_level = profile_level; } const TPipelineProfileLevel::type& profile_level() { return _profile_level; } @@ -173,7 +193,13 @@ class QueryContext : public std::enable_shared_from_this { bool _is_runtime_filter_coordinator = false; std::once_flag _init_mem_tracker_once; std::shared_ptr _profile; +<<<<<<< HEAD bool _is_report_profile = false; +======= + bool _enable_profile = false; + int64_t _big_query_profile_threshold_ns = 0; + int64_t _runtime_profile_report_interval_ns = std::numeric_limits::max(); +>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825)) TPipelineProfileLevel::type _profile_level; std::shared_ptr _mem_tracker; ObjectPool _object_pool; diff --git a/fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java b/fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java new file mode 100644 index 0000000000000..3f7cc3bb8e17a --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java @@ -0,0 +1,198 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.starrocks.common.profile;
+
+import com.google.common.base.Stopwatch;
+import com.starrocks.common.util.RuntimeProfile;
+import com.starrocks.qe.ConnectContext;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.function.Function;
+
+public class Tracers {
+    public enum Mode {
+        NONE, LOGS, VARS, TIMER, TIMING,
+    }
+
+    public enum Module {
+        NONE, ALL, BASE, OPTIMIZER, SCHEDULER, ANALYZE, MV, EXTERNAL,
+    }
+
+    private static final Tracer EMPTY_TRACER = new Tracer() {
+    };
+
+    private static final ThreadLocal<Tracers> THREAD_LOCAL = ThreadLocal.withInitial(Tracers::new);
+
+    // [empty tracer, real tracer]
+    private final Tracer[] allTracer = new Tracer[] {EMPTY_TRACER, EMPTY_TRACER};
+
+    // bitmask of the enabled modules
+    private int moduleMask = 0;
+
+    // bitmask of the enabled modes
+    private int modeMask = 0;
+
+    private boolean isCommandLog = false;
+
+    private Tracer tracer(Module module, Mode mode) {
+        // Return the real tracer only when both the mode and the module are enabled.
+        // A mode is enabled via `modeMask |= 1 << mode.ordinal()` and checked via
+        // `(modeMask >> mode.ordinal()) & 1`, so an enabled combination returns
+        // allTracer[1] and a disabled one returns allTracer[0].
+        return allTracer[(modeMask >> mode.ordinal()) & (moduleMask >> module.ordinal() & 1)];
+    }
+
+    public static void register(ConnectContext context) {
+        Tracers tracers = THREAD_LOCAL.get();
+        tracers.isCommandLog = StringUtils.equalsIgnoreCase("command", context.getSessionVariable().getTraceLogMode());
+        LogTracer logTracer = tracers.isCommandLog ?
new CommandLogTracer() : new FileLogTracer(); + tracers.allTracer[0] = EMPTY_TRACER; + tracers.allTracer[1] = new TracerImpl(Stopwatch.createStarted(), new TimeWatcher(), new VarTracer(), logTracer); + } + + public static void register() { + // default register FileLogTracer + Tracers tracers = THREAD_LOCAL.get(); + LogTracer logTracer = new FileLogTracer(); + tracers.allTracer[0] = EMPTY_TRACER; + tracers.allTracer[1] = new TracerImpl(Stopwatch.createStarted(), new TimeWatcher(), new VarTracer(), logTracer); + } + + public static void init(ConnectContext context, Mode mode, String moduleStr) { + Tracers tracers = THREAD_LOCAL.get(); + boolean enableProfile = + context.getSessionVariable().isEnableProfile() || context.getSessionVariable().isEnableBigQueryProfile(); + boolean checkMV = context.getSessionVariable().isEnableMaterializedViewRewriteOrError(); + + Module module = getTraceModule(moduleStr); + if (Module.NONE == module || null == module) { + tracers.moduleMask = 0; + } + if (Mode.NONE == mode || null == mode) { + tracers.modeMask = 0; + } + if (enableProfile) { + tracers.moduleMask |= 1 << Module.BASE.ordinal(); + tracers.moduleMask |= 1 << Module.EXTERNAL.ordinal(); + tracers.moduleMask |= 1 << Module.SCHEDULER.ordinal(); + + tracers.modeMask |= 1 << Mode.TIMER.ordinal(); + tracers.modeMask |= 1 << Mode.VARS.ordinal(); + } + if (checkMV) { + tracers.moduleMask |= 1 << Module.MV.ordinal(); + + tracers.modeMask |= 1 << Mode.VARS.ordinal(); + } + if (Module.ALL == module) { + tracers.moduleMask = Integer.MAX_VALUE; + } else if (Module.NONE != module && null != module) { + tracers.moduleMask |= 1 << Module.BASE.ordinal(); + tracers.moduleMask |= 1 << module.ordinal(); + } + + if (Mode.TIMING == mode) { + tracers.modeMask = Integer.MAX_VALUE; + } else if (Mode.NONE != mode && null != mode) { + tracers.modeMask |= 1 << mode.ordinal(); + } + } + + public static void close() { + THREAD_LOCAL.remove(); + } + + private static Module getTraceModule(String str) { + try { + if (str != null) { + return Module.valueOf(str.toUpperCase()); + } + } catch (Exception e) { + return Module.NONE; + } + return Module.NONE; + } + + public static Timer watchScope(String name) { + Tracers tracers = THREAD_LOCAL.get(); + return tracers.tracer(Module.BASE, Mode.TIMER).watchScope(name); + } + + public static Timer watchScope(Module module, String name) { + Tracers tracers = THREAD_LOCAL.get(); + return tracers.tracer(module, Mode.TIMER).watchScope(name); + } + + public static void log(Module module, String log) { + Tracers tracers = THREAD_LOCAL.get(); + tracers.tracer(module, Mode.LOGS).log(log); + } + + public static void log(Module module, String log, Object... args) { + Tracers tracers = THREAD_LOCAL.get(); + tracers.tracer(module, Mode.LOGS).log(log, args); + } + + // lazy log, use it if you want to avoid construct log string when log is disabled + public static void log(Module module, Function func, Object... args) { + Tracers tracers = THREAD_LOCAL.get(); + tracers.tracer(module, Mode.LOGS).log(func, args); + } + + public static void log(String log, Object... 
args) {
+        Tracers tracers = THREAD_LOCAL.get();
+        tracers.tracer(Module.BASE, Mode.TIMER).log(log, args);
+    }
+
+    public static void record(Module module, String name, String value) {
+        Tracers tracers = THREAD_LOCAL.get();
+        tracers.tracer(module, Mode.VARS).record(name, value);
+    }
+
+    public static void count(Module module, String name, int count) {
+        Tracers tracers = THREAD_LOCAL.get();
+        tracers.tracer(module, Mode.VARS).count(name, count);
+    }
+
+    public static List> getAllVars() {
+        Tracers tracers = THREAD_LOCAL.get();
+        return tracers.allTracer[1].getAllVars();
+    }
+
+    public static String printScopeTimer() {
+        Tracers tracers = THREAD_LOCAL.get();
+        return tracers.allTracer[1].printScopeTimer();
+    }
+
+    public static String printTiming() {
+        Tracers tracers = THREAD_LOCAL.get();
+        return tracers.allTracer[1].printTiming();
+    }
+
+    public static String printVars() {
+        Tracers tracers = THREAD_LOCAL.get();
+        return tracers.allTracer[1].printVars();
+    }
+
+    public static String printLogs() {
+        Tracers tracers = THREAD_LOCAL.get();
+        return tracers.allTracer[1].printLogs();
+    }
+
+    public static void toRuntimeProfile(RuntimeProfile profile) {
+        Tracers tracers = THREAD_LOCAL.get();
+        tracers.allTracer[1].toRuntimeProfile(profile);
+    }
+}
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadLoadingTask.java
index 4093155b3abf8..0d13b8217acab 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadLoadingTask.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadLoadingTask.java
@@ -219,9 +219,14 @@ private void executeOnce() throws Exception {
 
         curCoordinator.getQueryProfile().getCounterTotalTime()
                 .setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
+<<<<<<< HEAD
         curCoordinator.endProfile();
         curCoordinator.mergeIsomorphicProfiles();
         profile.addChild(curCoordinator.getQueryProfile());
+=======
+        curCoordinator.collectProfileSync();
+        profile.addChild(curCoordinator.buildQueryProfile(context.needMergeProfile()));
+>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825))
 
         StringBuilder builder = new StringBuilder();
         profile.prettyPrint(builder, "");
diff --git a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java
index 663eb4072df99..56a6f76bc4e86 100644
--- a/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java
@@ -952,6 +952,74 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
         }
     }
 
+<<<<<<< HEAD
+=======
+    public void collectProfile() {
+        long currentTimestamp = System.currentTimeMillis();
+        long totalTimeMs = currentTimestamp - createTimeMs;
+
+        // For usage scenarios such as Flink CDC or Routine Load,
+        // the frequency of stream loads may be very high, resulting in many profiles,
+        // but we may only care about the long-duration stream load profiles.
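+        // Config.stream_load_profile_collect_second is in seconds, while totalTimeMs
+        // is in milliseconds, hence the "* 1000" in the check below.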
+ if (totalTimeMs < Config.stream_load_profile_collect_second * 1000) { + LOG.info(String.format("Load %s, totalTimeMs %d < Config.stream_load_profile_collect_second %d)", + label, totalTimeMs, Config.stream_load_profile_collect_second)); + return; + } + + RuntimeProfile profile = new RuntimeProfile("Load"); + RuntimeProfile summaryProfile = new RuntimeProfile("Summary"); + summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(loadId)); + summaryProfile.addInfoString(ProfileManager.START_TIME, + TimeUtils.longToTimeString(createTimeMs)); + + summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(System.currentTimeMillis())); + summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); + + summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Load"); + summaryProfile.addInfoString("StarRocks Version", + String.format("%s-%s", Version.STARROCKS_VERSION, Version.STARROCKS_COMMIT_HASH)); + summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, dbName); + + Map loadCounters = coord.getLoadCounters(); + if (loadCounters != null && loadCounters.size() != 0) { + summaryProfile.addInfoString("NumRowsNormal", loadCounters.get(LoadEtlTask.DPP_NORMAL_ALL)); + summaryProfile.addInfoString("NumLoadBytesTotal", loadCounters.get(LoadJob.LOADED_BYTES)); + summaryProfile.addInfoString("NumRowsAbnormal", loadCounters.get(LoadEtlTask.DPP_ABNORMAL_ALL)); + summaryProfile.addInfoString("numRowsUnselected", loadCounters.get(LoadJob.UNSELECTED_ROWS)); + } + ConnectContext session = ConnectContext.get(); + if (session != null) { + SessionVariable variables = session.getSessionVariable(); + if (variables != null) { + summaryProfile.addInfoString("NonDefaultSessionVariables", variables.getNonDefaultVariablesJson()); + } + } + + profile.addChild(summaryProfile); + if (coord.getQueryProfile() != null) { + if (!isSyncStreamLoad()) { + coord.collectProfileSync(); + profile.addChild(coord.buildQueryProfile(session == null || session.needMergeProfile())); + } else { + profile.addChild(coord.getQueryProfile()); + } + } + + ProfileManager.getInstance().pushLoadProfile(profile); + } + + public void setLoadState(long loadBytes, long loadRows, long filteredRows, long unselectedRows, + String errorLogUrl, String errorMsg) { + this.numRowsNormal = loadRows; + this.numRowsAbnormal = filteredRows; + this.numRowsUnselected = unselectedRows; + this.numLoadBytesTotal = loadBytes; + this.trackingUrl = errorLogUrl; + this.errorMsg = errorMsg; + } + +>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825)) @Override public void replayOnCommitted(TransactionState txnState) { writeLock(); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java index 58dda7a80d49a..783aabb693acb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java @@ -45,6 +45,11 @@ import com.starrocks.sql.ast.UserVariable; import com.starrocks.sql.optimizer.dump.DumpInfo; import com.starrocks.sql.optimizer.dump.QueryDumpInfo; +<<<<<<< HEAD +======= +import com.starrocks.sql.parser.SqlParser; +import com.starrocks.thrift.TPipelineProfileLevel; +>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825)) import com.starrocks.thrift.TUniqueId; import com.starrocks.thrift.TWorkGroup; import org.apache.logging.log4j.LogManager; @@ -505,6 +510,25 @@ public void setLastQueryId(UUID 
queryId) { this.lastQueryId = queryId; } + public boolean isProfileEnabled() { + if (sessionVariable == null) { + return false; + } + if (sessionVariable.isEnableProfile()) { + return true; + } + if (!sessionVariable.isEnableBigQueryProfile()) { + return false; + } + return System.currentTimeMillis() - getStartTime() > + 1000L * sessionVariable.getBigQueryProfileSecondThreshold(); + } + + public boolean needMergeProfile() { + return isProfileEnabled() && + sessionVariable.getPipelineProfileLevel() < TPipelineProfileLevel.DETAIL.getValue(); + } + public byte[] getAuthDataSalt() { return authDataSalt; } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java new file mode 100644 index 0000000000000..7b09bcdb9dacb --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java @@ -0,0 +1,1077 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file is based on code available under the Apache license here: +// https://github.com/apache/incubator-doris/blob/master/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
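Taken together, the two `ConnectContext` methods above define the gate for big-query profiling: a profile is produced either because `enable_profile` is set explicitly, or because `big_query_profile_second_threshold` is positive and the query has already been running longer than that threshold. A minimal, self-contained sketch of the rule (the concrete values here are hypothetical, not part of the patch):

```java
public class BigQueryProfileGateSketch {
    public static void main(String[] args) {
        // Hypothetical session state: explicit profiling off, 30s big-query threshold.
        boolean enableProfile = false;
        int bigQueryProfileSecondThreshold = 30;
        long queryStartMs = System.currentTimeMillis() - 45_000; // query has run for 45s

        boolean profiled = enableProfile
                || (bigQueryProfileSecondThreshold > 0
                        && System.currentTimeMillis() - queryStartMs
                                > 1000L * bigQueryProfileSecondThreshold);

        System.out.println(profiled); // true: 45s elapsed > 30s threshold
    }
}
```

The BE-side `QueryContext::enable_profile()` in the first hunk applies the same rule, using `MonotonicNanos()` against the nanosecond-scaled threshold.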
+ +package com.starrocks.qe; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.starrocks.analysis.DescriptorTable; +import com.starrocks.authentication.AuthenticationMgr; +import com.starrocks.catalog.FsBroker; +import com.starrocks.common.AnalysisException; +import com.starrocks.common.Config; +import com.starrocks.common.FeConstants; +import com.starrocks.common.Status; +import com.starrocks.common.ThriftServer; +import com.starrocks.common.UserException; +import com.starrocks.common.profile.Timer; +import com.starrocks.common.profile.Tracers; +import com.starrocks.common.util.AuditStatisticsUtil; +import com.starrocks.common.util.DebugUtil; +import com.starrocks.common.util.RuntimeProfile; +import com.starrocks.connector.exception.RemoteFileNotFoundException; +import com.starrocks.planner.PlanFragment; +import com.starrocks.planner.ResultSink; +import com.starrocks.planner.RuntimeFilterDescription; +import com.starrocks.planner.ScanNode; +import com.starrocks.planner.StreamLoadPlanner; +import com.starrocks.privilege.PrivilegeBuiltinConstants; +import com.starrocks.proto.PPlanFragmentCancelReason; +import com.starrocks.proto.PQueryStatistics; +import com.starrocks.qe.scheduler.Coordinator; +import com.starrocks.qe.scheduler.Deployer; +import com.starrocks.qe.scheduler.QueryRuntimeProfile; +import com.starrocks.qe.scheduler.dag.ExecutionDAG; +import com.starrocks.qe.scheduler.dag.ExecutionFragment; +import com.starrocks.qe.scheduler.dag.FragmentInstance; +import com.starrocks.qe.scheduler.dag.FragmentInstanceExecState; +import com.starrocks.qe.scheduler.dag.JobSpec; +import com.starrocks.qe.scheduler.slot.LogicalSlot; +import com.starrocks.rpc.RpcException; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.sql.LoadPlanner; +import com.starrocks.sql.ast.UserIdentity; +import com.starrocks.sql.plan.ExecPlan; +import com.starrocks.system.ComputeNode; +import com.starrocks.thrift.TDescriptorTable; +import com.starrocks.thrift.TLoadJobType; +import com.starrocks.thrift.TNetworkAddress; +import com.starrocks.thrift.TQueryType; +import com.starrocks.thrift.TReportAuditStatisticsParams; +import com.starrocks.thrift.TReportExecStatusParams; +import com.starrocks.thrift.TRuntimeFilterDestination; +import com.starrocks.thrift.TRuntimeFilterProberParams; +import com.starrocks.thrift.TSinkCommitInfo; +import com.starrocks.thrift.TStatusCode; +import com.starrocks.thrift.TTabletCommitInfo; +import com.starrocks.thrift.TTabletFailInfo; +import com.starrocks.thrift.TUniqueId; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class DefaultCoordinator extends Coordinator { + private static final Logger LOG = LogManager.getLogger(DefaultCoordinator.class); + + private static final int DEFAULT_PROFILE_TIMEOUT_SECOND = 2; + + private final JobSpec jobSpec; + private final ExecutionDAG executionDAG; + + private final ConnectContext connectContext; + + private 
final CoordinatorPreprocessor coordinatorPreprocessor; + + /** + * Protects all the fields below. + */ + private final Lock lock = new ReentrantLock(); + + /** + * Overall status of the entire query. + *

Set to the first reported fragment error status or to CANCELLED, if {@link #cancel(String cancelledMessage)} is called. + */ + private Status queryStatus = new Status(); + + private PQueryStatistics auditStatistics; + + private final QueryRuntimeProfile queryProfile; + + private ResultReceiver receiver; + private int numReceivedRows = 0; + + /** + * True indicates the query is done returning all results. + *

It is possible that the coordinator still needs to wait for cleanup on remote fragments (e.g. queries with limit) + * Once this is set to true, errors from remote fragments are ignored. + */ + private boolean returnedAllResults; + + private boolean thriftServerHighLoad; + + private LogicalSlot slot = null; + + public static class Factory implements Coordinator.Factory { + + @Override + public DefaultCoordinator createQueryScheduler(ConnectContext context, List fragments, + List scanNodes, + TDescriptorTable descTable) { + JobSpec jobSpec = + JobSpec.Factory.fromQuerySpec(context, fragments, scanNodes, descTable, TQueryType.SELECT); + return new DefaultCoordinator(context, jobSpec); + } + + @Override + public DefaultCoordinator createInsertScheduler(ConnectContext context, List fragments, + List scanNodes, + TDescriptorTable descTable) { + JobSpec jobSpec = JobSpec.Factory.fromQuerySpec(context, fragments, scanNodes, descTable, TQueryType.LOAD); + return new DefaultCoordinator(context, jobSpec); + } + + @Override + public DefaultCoordinator createBrokerLoadScheduler(LoadPlanner loadPlanner) { + ConnectContext context = loadPlanner.getContext(); + JobSpec jobSpec = JobSpec.Factory.fromBrokerLoadJobSpec(loadPlanner); + + return new DefaultCoordinator(context, jobSpec); + } + + @Override + public DefaultCoordinator createStreamLoadScheduler(LoadPlanner loadPlanner) { + ConnectContext context = loadPlanner.getContext(); + JobSpec jobSpec = JobSpec.Factory.fromStreamLoadJobSpec(loadPlanner); + + return new DefaultCoordinator(context, jobSpec); + } + + @Override + public DefaultCoordinator createSyncStreamLoadScheduler(StreamLoadPlanner planner, TNetworkAddress address) { + JobSpec jobSpec = JobSpec.Factory.fromSyncStreamLoadSpec(planner); + return new DefaultCoordinator(jobSpec, planner, address); + } + + @Override + public DefaultCoordinator createBrokerExportScheduler(Long jobId, TUniqueId queryId, DescriptorTable descTable, + List fragments, List scanNodes, + String timezone, + long startTime, Map sessionVariables, + long execMemLimit) { + ConnectContext context = new ConnectContext(); + context.setQualifiedUser(AuthenticationMgr.ROOT_USER); + context.setCurrentUserIdentity(UserIdentity.ROOT); + context.setCurrentRoleIds(Sets.newHashSet(PrivilegeBuiltinConstants.ROOT_ROLE_ID)); + context.getSessionVariable().setEnablePipelineEngine(true); + context.getSessionVariable().setPipelineDop(0); + + JobSpec jobSpec = JobSpec.Factory.fromBrokerExportSpec(context, jobId, queryId, descTable, + fragments, scanNodes, timezone, + startTime, sessionVariables, execMemLimit); + + return new DefaultCoordinator(context, jobSpec); + } + + @Override + public DefaultCoordinator createNonPipelineBrokerLoadScheduler(Long jobId, TUniqueId queryId, + DescriptorTable descTable, + List fragments, + List scanNodes, + String timezone, + long startTime, + Map sessionVariables, + ConnectContext context, long execMemLimit) { + JobSpec jobSpec = JobSpec.Factory.fromNonPipelineBrokerLoadJobSpec(context, jobId, queryId, descTable, + fragments, scanNodes, timezone, + startTime, sessionVariables, execMemLimit); + + return new DefaultCoordinator(context, jobSpec); + } + } + + /** + * Only used for sync stream load profile, and only init relative data structure. 
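+     * No CoordinatorPreprocessor is created on this path; a fake execution is attached
+     * so that the profile machinery has exactly one fragment instance to track.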
+ */ + public DefaultCoordinator(JobSpec jobSpec, StreamLoadPlanner planner, TNetworkAddress address) { + this.connectContext = planner.getConnectContext(); + this.jobSpec = jobSpec; + this.executionDAG = ExecutionDAG.build(jobSpec); + + TUniqueId queryId = jobSpec.getQueryId(); + + LOG.info("Execution Profile: {}", DebugUtil.printId(queryId)); + + FragmentInstanceExecState execState = FragmentInstanceExecState.createFakeExecution(queryId, address); + executionDAG.addExecution(execState); + + this.queryProfile = new QueryRuntimeProfile(connectContext, jobSpec, 1); + queryProfile.attachInstances(Collections.singletonList(queryId)); + queryProfile.attachExecutionProfiles(executionDAG.getExecutions()); + + this.coordinatorPreprocessor = null; + } + + DefaultCoordinator(ConnectContext context, JobSpec jobSpec) { + this.connectContext = context; + this.jobSpec = jobSpec; + this.returnedAllResults = false; + + this.coordinatorPreprocessor = new CoordinatorPreprocessor(context, jobSpec); + this.executionDAG = coordinatorPreprocessor.getExecutionDAG(); + + this.queryProfile = + new QueryRuntimeProfile(connectContext, jobSpec, executionDAG.getFragmentsInCreatedOrder().size()); + } + + @Override + public LogicalSlot getSlot() { + return slot; + } + + public void setSlot(LogicalSlot slot) { + this.slot = slot; + } + + @Override + public long getLoadJobId() { + return jobSpec.getLoadJobId(); + } + + @Override + public void setLoadJobId(Long jobId) { + jobSpec.setLoadJobId(jobId); + } + + @Override + public TUniqueId getQueryId() { + return jobSpec.getQueryId(); + } + + @Override + public void setQueryId(TUniqueId queryId) { + jobSpec.setQueryId(queryId); + } + + @Override + public void setLoadJobType(TLoadJobType type) { + jobSpec.setLoadJobType(type); + } + + @Override + public Status getExecStatus() { + return queryStatus; + } + + @Override + public RuntimeProfile getQueryProfile() { + return queryProfile.getQueryProfile(); + } + + @Override + public List getDeltaUrls() { + return queryProfile.getDeltaUrls(); + } + + @Override + public Map getLoadCounters() { + return queryProfile.getLoadCounters(); + } + + @Override + public String getTrackingUrl() { + return queryProfile.getTrackingUrl(); + } + + @Override + public List getRejectedRecordPaths() { + return queryProfile.getRejectedRecordPaths(); + } + + @Override + public long getStartTimeMs() { + return jobSpec.getStartTimeMs(); + } + + public JobSpec getJobSpec() { + return jobSpec; + } + + @Override + public void setTimeoutSecond(int timeoutSecond) { + jobSpec.setQueryTimeout(timeoutSecond); + } + + @Override + public void clearExportStatus() { + lock.lock(); + try { + executionDAG.resetExecutions(); + queryStatus.setStatus(new Status()); + queryProfile.clearExportStatus(); + } finally { + lock.unlock(); + } + } + + @Override + public List getCommitInfos() { + return queryProfile.getCommitInfos(); + } + + @Override + public List getFailInfos() { + return queryProfile.getFailInfos(); + } + + @Override + public List getSinkCommitInfos() { + return queryProfile.getSinkCommitInfos(); + } + + @Override + public List getExportFiles() { + return queryProfile.getExportFiles(); + } + + @Override + public void setTopProfileSupplier(Supplier topProfileSupplier) { + queryProfile.setTopProfileSupplier(topProfileSupplier); + } + + @Override + public void setExecPlan(ExecPlan execPlan) { + queryProfile.setExecPlan(execPlan); + } + + @Override + public boolean isUsingBackend(Long backendID) { + return 
coordinatorPreprocessor.getWorkerProvider().isWorkerSelected(backendID); + } + + private void lock() { + lock.lock(); + } + + private void unlock() { + lock.unlock(); + } + + public ExecutionDAG getExecutionDAG() { + return executionDAG; + } + + // Initiate asynchronous execState of query. Returns as soon as all plan fragments + // have started executing at their respective backends. + // 'Request' must contain at least a coordinator plan fragment (ie, can't + // be for a query like 'SELECT 1'). + // A call to Exec() must precede all other member function calls. + public void prepareExec() throws Exception { + if (LOG.isDebugEnabled()) { + if (!jobSpec.getScanNodes().isEmpty()) { + LOG.debug("debug: in Coordinator::exec. query id: {}, planNode: {}", + DebugUtil.printId(jobSpec.getQueryId()), + jobSpec.getScanNodes().get(0).treeToThrift()); + } + if (!jobSpec.getFragments().isEmpty()) { + LOG.debug("debug: in Coordinator::exec. query id: {}, fragment: {}", + DebugUtil.printId(jobSpec.getQueryId()), + jobSpec.getFragments().get(0).toThrift()); + } + LOG.debug("debug: in Coordinator::exec. query id: {}, desc table: {}", + DebugUtil.printId(jobSpec.getQueryId()), jobSpec.getDescTable()); + } + + if (slot != null && slot.getPipelineDop() > 0 && + slot.getPipelineDop() != jobSpec.getQueryOptions().getPipeline_dop()) { + jobSpec.getFragments().forEach(fragment -> fragment.limitMaxPipelineDop(slot.getPipelineDop())); + } + + coordinatorPreprocessor.prepareExec(); + + prepareResultSink(); + + prepareProfile(); + } + + @Override + public void onFinished() { + GlobalStateMgr.getCurrentState().getSlotProvider().cancelSlotRequirement(slot); + GlobalStateMgr.getCurrentState().getSlotProvider().releaseSlot(slot); + } + + public CoordinatorPreprocessor getPrepareInfo() { + return coordinatorPreprocessor; + } + + public List getFragments() { + return jobSpec.getFragments(); + } + + public boolean isLoadType() { + return jobSpec.isLoadType(); + } + + @Override + public List getScanNodes() { + return jobSpec.getScanNodes(); + } + + @Override + public void startScheduling(boolean needDeploy) throws Exception { + try (Timer timer = Tracers.watchScope(Tracers.Module.SCHEDULER, "Pending")) { + QueryQueueManager.getInstance().maybeWait(connectContext, this); + } + + try (Timer timer = Tracers.watchScope(Tracers.Module.SCHEDULER, "Prepare")) { + prepareExec(); + } + + try (Timer timer = Tracers.watchScope(Tracers.Module.SCHEDULER, "Deploy")) { + deliverExecFragments(needDeploy); + } + + // Prevent `explain scheduler` from waiting until the profile timeout. 
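+        // Nothing is dispatched to the BEs when needDeploy is false, so no profile
+        // reports can ever arrive; finishing all instances up front unblocks join().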
+ if (!needDeploy) { + queryProfile.finishAllInstances(Status.OK); + } + } + + @Override + public String getSchedulerExplain() { + return executionDAG.getFragmentsInPreorder().stream() + .map(ExecutionFragment::getExplainString) + .collect(Collectors.joining("\n")); + } + + private void prepareProfile() { + ExecutionFragment rootExecFragment = executionDAG.getRootFragment(); + boolean isLoadType = !(rootExecFragment.getPlanFragment().getSink() instanceof ResultSink); + if (isLoadType) { + jobSpec.getQueryOptions().setEnable_profile(true); + List relatedBackendIds = coordinatorPreprocessor.getWorkerProvider().getSelectedWorkerIds(); + GlobalStateMgr.getCurrentState().getLoadMgr().initJobProgress( + jobSpec.getLoadJobId(), jobSpec.getQueryId(), executionDAG.getInstanceIds(), relatedBackendIds); + LOG.info("dispatch load job: {} to {}", DebugUtil.printId(jobSpec.getQueryId()), + coordinatorPreprocessor.getWorkerProvider().getSelectedWorkerIds()); + } + + queryProfile.attachInstances(executionDAG.getInstanceIds()); + } + + private void prepareResultSink() throws AnalysisException { + ExecutionFragment rootExecFragment = executionDAG.getRootFragment(); + long workerId = rootExecFragment.getInstances().get(0).getWorkerId(); + ComputeNode worker = coordinatorPreprocessor.getWorkerProvider().getWorkerById(workerId); + // Select top fragment as global runtime filter merge address + setGlobalRuntimeFilterParams(rootExecFragment, worker.getBrpcIpAddress()); + boolean isLoadType = !(rootExecFragment.getPlanFragment().getSink() instanceof ResultSink); + if (isLoadType) { + // TODO (by satanson): Other DataSink except ResultSink can not support global + // runtime filter merging at present, we should support it in future. + // pipeline-level runtime filter needs to derive RuntimeFilterLayout, so we collect + // RuntimeFilterDescription + for (ExecutionFragment execFragment : executionDAG.getFragmentsInPreorder()) { + PlanFragment fragment = execFragment.getPlanFragment(); + fragment.collectBuildRuntimeFilters(fragment.getPlanRoot()); + } + return; + } + + TNetworkAddress execBeAddr = worker.getAddress(); + receiver = new ResultReceiver( + rootExecFragment.getInstances().get(0).getInstanceId(), + workerId, + worker.getBrpcAddress(), + jobSpec.getQueryOptions().query_timeout * 1000); + + if (LOG.isDebugEnabled()) { + LOG.debug("dispatch query job: {} to {}", DebugUtil.printId(jobSpec.getQueryId()), execBeAddr); + } + + // set the broker address for OUTFILE sink + ResultSink resultSink = (ResultSink) rootExecFragment.getPlanFragment().getSink(); + if (resultSink.isOutputFileSink() && resultSink.needBroker()) { + FsBroker broker = GlobalStateMgr.getCurrentState().getBrokerMgr().getBroker(resultSink.getBrokerName(), + execBeAddr.getHostname()); + resultSink.setBrokerAddr(broker.ip, broker.port); + LOG.info("OUTFILE through broker: {}:{}", broker.ip, broker.port); + } + } + + private void deliverExecFragments(boolean needDeploy) throws RpcException, UserException { + lock(); + try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployLockInternalTime")) { + Deployer deployer = + new Deployer(connectContext, jobSpec, executionDAG, coordinatorPreprocessor.getCoordAddress(), + this::handleErrorExecution); + for (List concurrentFragments : executionDAG.getFragmentsInTopologicalOrderFromRoot()) { + deployer.deployFragments(concurrentFragments, needDeploy); + } + + queryProfile.attachExecutionProfiles(executionDAG.getExecutions()); + } finally { + unlock(); + } + } + + private void 
handleErrorExecution(Status status, FragmentInstanceExecState execution, Throwable failure) + throws UserException, RpcException { + cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR); + switch (Objects.requireNonNull(status.getErrorCode())) { + case TIMEOUT: + throw new UserException("query timeout. backend id: " + execution.getWorker().getId()); + case THRIFT_RPC_ERROR: + SimpleScheduler.addToBlacklist(execution.getWorker().getId()); + throw new RpcException(execution.getWorker().getHost(), "rpc failed"); + default: + throw new UserException(status.getErrorMsg()); + } + } + + // choose at most num FInstances on difference BEs + private List pickupFInstancesOnDifferentHosts(List instances, int num) { + if (instances.size() <= num) { + return instances; + } + + Map> workerId2instances = Maps.newHashMap(); + for (FragmentInstance instance : instances) { + workerId2instances.putIfAbsent(instance.getWorkerId(), Lists.newLinkedList()); + workerId2instances.get(instance.getWorkerId()).add(instance); + } + List picked = Lists.newArrayList(); + while (picked.size() < num) { + for (List instancesPerHost : workerId2instances.values()) { + if (instancesPerHost.isEmpty()) { + continue; + } + picked.add(instancesPerHost.remove(0)); + } + } + return picked; + } + + private List mergeGRFProbers(List probers) { + Map> host2probers = Maps.newHashMap(); + for (TRuntimeFilterProberParams prober : probers) { + host2probers.putIfAbsent(prober.fragment_instance_address, Lists.newArrayList()); + host2probers.get(prober.fragment_instance_address).add(prober.fragment_instance_id); + } + return host2probers.entrySet().stream().map( + e -> new TRuntimeFilterDestination().setAddress(e.getKey()).setFinstance_ids(e.getValue()) + ).collect(Collectors.toList()); + } + + private void setGlobalRuntimeFilterParams(ExecutionFragment topParams, + TNetworkAddress mergeHost) { + + Map> broadcastGRFProbersMap = Maps.newHashMap(); + List broadcastGRFList = Lists.newArrayList(); + Map> idToProbePrams = new HashMap<>(); + + for (ExecutionFragment execFragment : executionDAG.getFragmentsInPreorder()) { + PlanFragment fragment = execFragment.getPlanFragment(); + fragment.collectBuildRuntimeFilters(fragment.getPlanRoot()); + fragment.collectProbeRuntimeFilters(fragment.getPlanRoot()); + for (Map.Entry kv : fragment.getProbeRuntimeFilters().entrySet()) { + List probeParamList = Lists.newArrayList(); + for (final FragmentInstance instance : execFragment.getInstances()) { + TRuntimeFilterProberParams probeParam = new TRuntimeFilterProberParams(); + probeParam.setFragment_instance_id(instance.getInstanceId()); + probeParam.setFragment_instance_address( + coordinatorPreprocessor.getBrpcIpAddress(instance.getWorkerId())); + probeParamList.add(probeParam); + } + if (jobSpec.isEnablePipeline() && kv.getValue().isBroadcastJoin() && + kv.getValue().isHasRemoteTargets()) { + broadcastGRFProbersMap.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).addAll(probeParamList); + } else { + idToProbePrams.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).addAll(probeParamList); + } + } + + Set broadcastGRfSenders = + pickupFInstancesOnDifferentHosts(execFragment.getInstances(), 3).stream(). + map(FragmentInstance::getInstanceId).collect(Collectors.toSet()); + for (Map.Entry kv : fragment.getBuildRuntimeFilters().entrySet()) { + int rid = kv.getKey(); + RuntimeFilterDescription rf = kv.getValue(); + if (rf.isHasRemoteTargets()) { + if (rf.isBroadcastJoin()) { + // for broadcast join, we send at most 3 copy to probers, the first arrival wins. 
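+                        // broadcastGRfSenders above holds at most three instances on
+                        // distinct hosts, but the merge node only waits for a single
+                        // builder, hence the builder number of 1.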
+ topParams.getRuntimeFilterParams().putToRuntime_filter_builder_number(rid, 1); + if (jobSpec.isEnablePipeline()) { + rf.setBroadcastGRFSenders(broadcastGRfSenders); + broadcastGRFList.add(rf); + } else { + rf.setSenderFragmentInstanceId(execFragment.getInstances().get(0).getInstanceId()); + } + } else { + topParams.getRuntimeFilterParams() + .putToRuntime_filter_builder_number(rid, execFragment.getInstances().size()); + } + } + } + fragment.setRuntimeFilterMergeNodeAddresses(fragment.getPlanRoot(), mergeHost); + } + topParams.getRuntimeFilterParams().setId_to_prober_params(idToProbePrams); + + broadcastGRFList.forEach(rf -> rf.setBroadcastGRFDestinations( + mergeGRFProbers(broadcastGRFProbersMap.get(rf.getFilterId())))); + + if (connectContext != null) { + SessionVariable sessionVariable = connectContext.getSessionVariable(); + topParams.getRuntimeFilterParams().setRuntime_filter_max_size( + sessionVariable.getGlobalRuntimeFilterBuildMaxSize()); + } + } + + @Override + public Map getChannelIdToBEHTTPMap() { + return coordinatorPreprocessor.getChannelIdToBEHTTPMap(); + } + + @Override + public Map getChannelIdToBEPortMap() { + return coordinatorPreprocessor.getChannelIdToBEPortMap(); + } + + private void updateStatus(Status status, TUniqueId instanceId) { + lock.lock(); + try { + // The query is done and we are just waiting for remote fragments to clean up. + // Ignore their cancelled updates. + if (returnedAllResults && status.isCancelled()) { + return; + } + // nothing to update + if (status.ok()) { + return; + } + + // don't override an error status; also, cancellation has already started + if (!queryStatus.ok()) { + return; + } + + queryStatus.setStatus(status); + LOG.warn( + "one instance report fail throw updateStatus(), need cancel. job id: {}, query id: {}, instance id: {}", + jobSpec.getLoadJobId(), DebugUtil.printId(jobSpec.getQueryId()), + instanceId != null ? DebugUtil.printId(instanceId) : "NaN"); + cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR); + } finally { + lock.unlock(); + } + } + + @Override + public RowBatch getNext() throws Exception { + if (receiver == null) { + throw new UserException("There is no receiver."); + } + + RowBatch resultBatch; + Status status = new Status(); + + resultBatch = receiver.getNext(status); + if (!status.ok()) { + connectContext.setErrorCodeOnce(status.getErrorCodeString()); + LOG.warn("get next fail, need cancel. status {}, query id: {}", status, + DebugUtil.printId(jobSpec.getQueryId())); + } + updateStatus(status, null /* no instance id */); + + Status copyStatus; + lock(); + try { + copyStatus = new Status(queryStatus); + } finally { + unlock(); + } + + if (!copyStatus.ok()) { + if (Strings.isNullOrEmpty(copyStatus.getErrorMsg())) { + copyStatus.rewriteErrorMsg(); + } + + if (copyStatus.isRemoteFileNotFound()) { + throw new RemoteFileNotFoundException(copyStatus.getErrorMsg()); + } + + if (copyStatus.isRpcError()) { + throw new RpcException("unknown", copyStatus.getErrorMsg()); + } else { + String errMsg = copyStatus.getErrorMsg(); + LOG.warn("query failed: {}", errMsg); + + // hide host info + int hostIndex = errMsg.indexOf("host"); + if (hostIndex != -1) { + errMsg = errMsg.substring(0, hostIndex); + } + throw new UserException(errMsg); + } + } + + if (resultBatch.isEos()) { + this.returnedAllResults = true; + + // if this query is a block query do not cancel. 
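+            // For a non-block query spread over multiple instances, once the receiver
+            // has already returned `limit` rows, the remaining fragments are cancelled
+            // with LIMIT_REACH rather than being left to drain.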
+ long numLimitRows = executionDAG.getRootFragment().getPlanFragment().getPlanRoot().getLimit(); + boolean hasLimit = numLimitRows > 0; + if (!jobSpec.isBlockQuery() && executionDAG.getInstanceIds().size() > 1 && hasLimit && + numReceivedRows >= numLimitRows) { + LOG.debug("no block query, return num >= limit rows, need cancel"); + cancelInternal(PPlanFragmentCancelReason.LIMIT_REACH); + } + } else { + numReceivedRows += resultBatch.getBatch().getRowsSize(); + } + + return resultBatch; + } + + /** + * Cancel execState of query. This includes the execState of the local plan fragment, + * if any, as well as all plan fragments on remote nodes. + */ + @Override + public void cancel(PPlanFragmentCancelReason reason, String message) { + lock(); + try { + if (!queryStatus.ok()) { + // we can't cancel twice + return; + } else { + queryStatus.setStatus(Status.CANCELLED); + queryStatus.setErrorMsg(message); + } + LOG.warn("cancel execState of query, this is outside invoke"); + cancelInternal(reason); + } finally { + try { + // Disable count down profileDoneSignal for collect all backend's profile + // but if backend has crashed, we need count down profileDoneSignal since it will not report by itself + if (message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) { + queryProfile.finishAllInstances(Status.OK); + LOG.info("count down profileDoneSignal since backend has crashed, query id: {}", + DebugUtil.printId(jobSpec.getQueryId())); + } + } finally { + unlock(); + } + } + } + + private void cancelInternal(PPlanFragmentCancelReason cancelReason) { + GlobalStateMgr.getCurrentState().getSlotProvider().cancelSlotRequirement(slot); + if (StringUtils.isEmpty(connectContext.getState().getErrorMessage())) { + connectContext.getState().setError(cancelReason.toString()); + } + if (null != receiver) { + receiver.cancel(); + } + cancelRemoteFragmentsAsync(cancelReason); + if (cancelReason != PPlanFragmentCancelReason.LIMIT_REACH) { + // count down to zero to notify all objects waiting for this + if (!connectContext.isProfileEnabled()) { + queryProfile.finishAllInstances(Status.OK); + } + } + } + + private void cancelRemoteFragmentsAsync(PPlanFragmentCancelReason cancelReason) { + for (FragmentInstanceExecState execState : executionDAG.getExecutions()) { + // If the execState fails to be cancelled, and it has been finished or not been deployed, + // count down the profileDoneSignal of this execState immediately, + // because the profile report will not arrive anymore for the finished or non-deployed execState. 
+ if (!execState.cancelFragmentInstance(cancelReason) && + (!execState.hasBeenDeployed() || execState.isFinished())) { + queryProfile.finishInstance(execState.getInstanceId()); + } + } + + executionDAG.getInstances().stream() + .filter(instance -> executionDAG.getExecution(instance.getIndexInJob()) == null) + .forEach(instance -> queryProfile.finishInstance(instance.getInstanceId())); + } + + @Override + public void updateFragmentExecStatus(TReportExecStatusParams params) { + FragmentInstanceExecState execState = executionDAG.getExecution(params.getBackend_num()); + if (execState == null) { + LOG.warn("unknown backend number: {}, valid backend numbers: {}", params.getBackend_num(), + executionDAG.getExecutionIndexesInJob()); + return; + } + + queryProfile.updateProfile(execState, params); + + lock(); + try { + if (!execState.updateExecStatus(params)) { + return; + } + } finally { + unlock(); + } + + // print fragment instance profile + if (LOG.isDebugEnabled()) { + StringBuilder builder = new StringBuilder(); + execState.printProfile(builder); + LOG.debug("profile for query_id={} instance_id={}\n{}", + DebugUtil.printId(jobSpec.getQueryId()), + DebugUtil.printId(params.getFragment_instance_id()), + builder); + } + + Status status = new Status(params.status); + // for now, abort the query if we see any error except if the error is cancelled + // and returned_all_results_ is true. + // (UpdateStatus() initiates cancellation, if it hasn't already been initiated) + if (!(returnedAllResults && status.isCancelled()) && !status.ok()) { + ConnectContext ctx = connectContext; + if (ctx != null) { + ctx.setErrorCodeOnce(status.getErrorCodeString()); + } + LOG.warn("exec state report failed status={}, query_id={}, instance_id={}", + status, DebugUtil.printId(jobSpec.getQueryId()), + DebugUtil.printId(params.getFragment_instance_id())); + updateStatus(status, params.getFragment_instance_id()); + } + + if (execState.isFinished()) { + lock(); + try { + queryProfile.updateLoadInformation(execState, params); + } finally { + unlock(); + } + + queryProfile.finishInstance(params.getFragment_instance_id()); + } + + updateJobProgress(params); + } + + @Override + public synchronized void updateAuditStatistics(TReportAuditStatisticsParams params) { + PQueryStatistics newAuditStatistics = AuditStatisticsUtil.toProtobuf(params.audit_statistics); + if (auditStatistics == null) { + auditStatistics = newAuditStatistics; + } else { + AuditStatisticsUtil.mergeProtobuf(newAuditStatistics, auditStatistics); + } + } + + private void updateJobProgress(TReportExecStatusParams params) { + if (params.isSetLoad_type()) { + TLoadJobType loadJobType = params.getLoad_type(); + if (loadJobType == TLoadJobType.BROKER || + loadJobType == TLoadJobType.INSERT_QUERY || + loadJobType == TLoadJobType.INSERT_VALUES) { + if (params.isSetSink_load_bytes() && params.isSetSource_load_rows() + && params.isSetSource_load_bytes()) { + GlobalStateMgr.getCurrentState().getLoadMgr().updateJobPrgress( + jobSpec.getLoadJobId(), params); + } + } + } else { + if (params.isSetSink_load_bytes() && params.isSetSource_load_rows() + && params.isSetSource_load_bytes()) { + GlobalStateMgr.getCurrentState().getLoadMgr().updateJobPrgress( + jobSpec.getLoadJobId(), params); + } + } + } + + public void collectProfileSync() { + if (executionDAG.getExecutions().isEmpty()) { + return; + } + + // wait for all backends + if (jobSpec.isNeedReport()) { + int timeout; + // connectContext can be null for broker export task coordinator + if (connectContext != null) { + 
timeout = connectContext.getSessionVariable().getProfileTimeout();
+            } else {
+                timeout = DEFAULT_PROFILE_TIMEOUT_SECOND;
+            }
+
+            // Wait for the other fragment instances to finish their execution.
+            // Ideally, it should wait indefinitely, but we set a timeout as a defensive measure.
+            queryProfile.waitForProfileFinished(timeout, TimeUnit.SECONDS);
+        }
+
+        lock();
+        try {
+            queryProfile.finalizeProfile();
+        } finally {
+            unlock();
+        }
+    }
+
+    public boolean tryProcessProfileAsync(Consumer<Boolean> task) {
+        if (executionDAG.getExecutions().isEmpty()) {
+            return false;
+        }
+        if (!jobSpec.isNeedReport()) {
+            return false;
+        }
+        boolean enableAsyncProfile = true;
+        if (connectContext != null && connectContext.getSessionVariable() != null) {
+            enableAsyncProfile = connectContext.getSessionVariable().isEnableAsyncProfile();
+        }
+        TUniqueId queryId = null;
+        if (connectContext != null) {
+            queryId = connectContext.getExecutionId();
+        }
+
+        if (!enableAsyncProfile || !queryProfile.addListener(task)) {
+            if (enableAsyncProfile) {
+                LOG.info("Profile task is full, execute in sync mode, query id = {}", DebugUtil.printId(queryId));
+            }
+            collectProfileSync();
+            task.accept(false);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Wait for the coordinator to finish executing.
+     * Returns false if the wait timed out, and true otherwise.
+     * NOTICE: returning true does not mean that the coordinator executed successfully;
+     * the caller should check queryStatus for the result.
+     *
+     * We divide the entire waiting process into multiple rounds,
+     * with a maximum of 5 seconds per round. After each round of waiting,
+     * we check the status of the BEs. If a BE is in an abnormal state, the wait ends
+     * and the result is returned; otherwise, we continue with the next round of waiting.
+     * This mainly avoids the problem of the Coordinator waiting for a long time after
+     * some BE can no longer return a result due to an exception, such as the BE going down.
+     */
+    @Override
+    public boolean join(int timeoutS) {
+        final long fixedMaxWaitTime = 5;
+
+        long leftTimeoutS = timeoutS;
+        while (leftTimeoutS > 0) {
+            long waitTime = Math.min(leftTimeoutS, fixedMaxWaitTime);
+            boolean awaitRes = queryProfile.waitForProfileFinished(waitTime, TimeUnit.SECONDS);
+            if (awaitRes) {
+                return true;
+            }
+
+            if (!checkBackendState()) {
+                return true;
+            }
+
+            if (ThriftServer.getExecutor() != null
+                    && ThriftServer.getExecutor().getPoolSize() >= Config.thrift_server_max_worker_threads) {
+                thriftServerHighLoad = true;
+            }
+
+            leftTimeoutS -= waitTime;
+        }
+        return false;
+    }
+
+    @Override
+    public RuntimeProfile buildQueryProfile(boolean needMerge) {
+        return queryProfile.buildQueryProfile(needMerge);
+    }
+
+    /**
+     * Check the state of the backends in needCheckBackendExecStates.
+     * Returns true if all of them are OK; otherwise, returns false.
+     */
+    @Override
+    public boolean checkBackendState() {
+        for (FragmentInstanceExecState execState : executionDAG.getNeedCheckExecutions()) {
+            if (!execState.isBackendStateHealthy()) {
+                queryStatus = new Status(TStatusCode.INTERNAL_ERROR,
+                        "backend " + execState.getWorker().getId() + " is down");
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public boolean isDone() {
+        return queryProfile.isFinished();
+    }
+
+    @Override
+    public boolean isEnableLoadProfile() {
+        return connectContext != null && connectContext.getSessionVariable().isEnableLoadProfile();
+    }
+
+    /**
+     * Get the information of each fragment instance.
+     *
+     * @return the fragment instance information list, consistent with the fragment index of {@code EXPLAIN}.
+ */ + @Override + public List getFragmentInstanceInfos() { + return executionDAG.getFragmentInstanceInfos(); + } + + @Override + public PQueryStatistics getAuditStatistics() { + return auditStatistics; + } + + @Override + public boolean isThriftServerHighLoad() { + return this.thriftServerHighLoad; + } + + @Override + public boolean isProfileAlreadyReported() { + return this.queryProfile.isProfileAlreadyReported(); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 2e2990a341ed5..17161e9512c91 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -368,6 +368,23 @@ public static MaterializedViewRewriteMode parse(String str) { public static final String ENABLE_MATERIALIZED_VIEW_PLAN_CACHE = "enable_materialized_view_plan_cache"; +<<<<<<< HEAD +======= + public static final String ENABLE_BIG_QUERY_LOG = "enable_big_query_log"; + public static final String BIG_QUERY_LOG_CPU_SECOND_THRESHOLD = "big_query_log_cpu_second_threshold"; + public static final String BIG_QUERY_LOG_SCAN_BYTES_THRESHOLD = "big_query_log_scan_bytes_threshold"; + public static final String BIG_QUERY_LOG_SCAN_ROWS_THRESHOLD = "big_query_log_scan_rows_threshold"; + public static final String BIG_QUERY_PROFILE_SECOND_THRESHOLD = "big_query_profile_second_threshold"; + + public static final String SQL_DIALECT = "sql_dialect"; + + public static final String ENABLE_OUTER_JOIN_REORDER = "enable_outer_join_reorder"; + + public static final String CBO_REORDER_THRESHOLD_USE_EXHAUSTIVE = "cbo_reorder_threshold_use_exhaustive"; + public static final String ENABLE_REWRITE_SUM_BY_ASSOCIATIVE_RULE = "enable_rewrite_sum_by_associative_rule"; + public static final String ENABLE_REWRITE_SIMPLE_AGG_TO_META_SCAN = "enable_rewrite_simple_agg_to_meta_scan"; + +>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825)) public static final String ENABLE_PRUNE_COMPLEX_TYPES = "enable_prune_complex_types"; public static final String GROUP_CONCAT_MAX_LEN = "group_concat_max_len"; @@ -667,6 +684,15 @@ public static MaterializedViewRewriteMode parse(String str) { @VariableMgr.VarAttr(name = PIPELINE_PROFILE_LEVEL) private int pipelineProfileLevel = 1; +<<<<<<< HEAD +======= + @VariableMgr.VarAttr(name = ENABLE_ASYNC_PROFILE, flag = VariableMgr.INVISIBLE) + private boolean enableAsyncProfile = true; + + @VariableMgr.VarAttr(name = BIG_QUERY_PROFILE_SECOND_THRESHOLD) + private int bigQueryProfileSecondThreshold = 0; + +>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825)) @VariableMgr.VarAttr(name = RESOURCE_GROUP_ID, alias = RESOURCE_GROUP_ID_V2, show = RESOURCE_GROUP_ID_V2, flag = VariableMgr.INVISIBLE) private int resourceGroupId = 0; @@ -1165,6 +1191,25 @@ public void setEnableProfile(boolean enableProfile) { this.enableProfile = enableProfile; } +<<<<<<< HEAD +======= + public boolean isEnableLoadProfile() { + return enableLoadProfile; + } + + public void setEnableLoadProfile(boolean enableLoadProfile) { + this.enableLoadProfile = enableLoadProfile; + } + + public boolean isEnableBigQueryProfile() { + return bigQueryProfileSecondThreshold > 0; + } + + public int getBigQueryProfileSecondThreshold() { + return bigQueryProfileSecondThreshold; + } + +>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825)) public int getWaitTimeoutS() { return waitTimeout; } @@ -1918,7 +1963,12 @@ public 
TQueryOptions toThrift() { tResult.setQuery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryTimeoutS)); tResult.setQuery_delivery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryDeliveryTimeoutS)); tResult.setEnable_profile(enableProfile); +<<<<<<< HEAD tResult.setCodegen_level(0); +======= + tResult.setBig_query_profile_second_threshold(bigQueryProfileSecondThreshold); + tResult.setRuntime_profile_report_interval(runtimeProfileReportInterval); +>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825)) tResult.setBatch_size(chunkSize); tResult.setDisable_stream_preaggregations(disableStreamPreaggregations); tResult.setLoad_mem_limit(loadMemLimit); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index f668d86b0f653..91568f2a0b8b7 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -482,8 +482,25 @@ public void execute() throws Exception { throw e; } } finally { +<<<<<<< HEAD if (!needRetry && context.getSessionVariable().isEnableProfile()) { writeProfile(beginTimeInNanoSecond); +======= + boolean isAsync = false; + if (!needRetry && context.isProfileEnabled()) { + isAsync = tryProcessProfileAsync(execPlan); + if (parsedStmt.isExplain() && + StatementBase.ExplainLevel.ANALYZE.equals(parsedStmt.getExplainLevel())) { + handleExplainStmt(ExplainAnalyzer.analyze( + ProfilingExecPlan.buildFrom(execPlan), profile, null)); + } + } + if (isAsync) { + QeProcessorImpl.INSTANCE.monitorQuery(context.getExecutionId(), System.currentTimeMillis() + + context.getSessionVariable().getProfileTimeout() * 1000L); + } else { + QeProcessorImpl.INSTANCE.unregisterQuery(context.getExecutionId()); +>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825)) } QeProcessorImpl.INSTANCE.unregisterQuery(context.getExecutionId()); } @@ -638,6 +655,43 @@ private void writeProfile(long beginTimeInNanoSecond) { if (context.getQueryDetail() != null) { context.getQueryDetail().setProfile(profileContent); } +<<<<<<< HEAD +======= + // This process will get information from the context, so it must be executed synchronously. + // Otherwise, the context may be changed, for example, containing the wrong query id. + profile = buildTopLevelProfile(); + + long profileCollectStartTime = System.currentTimeMillis(); + long startTime = context.getStartTime(); + TUniqueId executionId = context.getExecutionId(); + QueryDetail queryDetail = context.getQueryDetail(); + boolean needMerge = context.needMergeProfile(); + + // DO NOT use context int the async task, because the context is shared among consecutive queries. + // profile of query1 maybe executed when query2 is under execution. + Consumer task = (Boolean isAsync) -> { + RuntimeProfile summaryProfile = profile.getChild("Summary"); + summaryProfile.addInfoString(ProfileManager.PROFILE_COLLECT_TIME, + DebugUtil.getPrettyStringMs(System.currentTimeMillis() - profileCollectStartTime)); + summaryProfile.addInfoString("IsProfileAsync", String.valueOf(isAsync)); + profile.addChild(coord.buildQueryProfile(needMerge)); + + // Update TotalTime to include the Profile Collect Time and the time to build the profile. 
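+            // END_TIME and TOTAL_TIME are recomputed here because this task may run
+            // asynchronously, well after the query itself finished.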
+ long now = System.currentTimeMillis(); + long totalTimeMs = now - startTime; + summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(now)); + summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs)); + + ProfilingExecPlan profilingPlan = plan == null ? null : plan.getProfilingPlan(); + String profileContent = ProfileManager.getInstance().pushProfile(profilingPlan, profile); + if (queryDetail != null) { + queryDetail.setProfile(profileContent); + } + QeProcessorImpl.INSTANCE.unMonitorQuery(executionId); + QeProcessorImpl.INSTANCE.unregisterQuery(executionId); + }; + return coord.tryProcessProfileAsync(task); } // Analyze one statement to structure in memory. @@ -1281,8 +1335,24 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { LOG.warn("DML statement(" + originStmt.originStmt + ") process failed.", t); throw t; } finally { - if (context.getSessionVariable().isEnableProfile()) { - writeProfile(beginTimeInNanoSecond); + boolean isAsync = false; + if (context.isProfileEnabled()) { + isAsync = tryProcessProfileAsync(execPlan); + if (parsedStmt.isExplain() && + StatementBase.ExplainLevel.ANALYZE.equals(parsedStmt.getExplainLevel())) { + handleExplainStmt(ExplainAnalyzer.analyze(ProfilingExecPlan.buildFrom(execPlan), profile, null)); + } + } + if (isAsync) { + QeProcessorImpl.INSTANCE.monitorQuery(context.getExecutionId(), System.currentTimeMillis() + + context.getSessionVariable().getProfileTimeout() * 1000L); + } else { + QeProcessorImpl.INSTANCE.unregisterQuery(context.getExecutionId()); } - QeProcessorImpl.INSTANCE.unregisterQuery(context.getExecutionId()); }
diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java new file mode 100644 index 0000000000000..2d2d3255cde0d --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java @@ -0,0 +1,213 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
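The comments in the resolved block above stress one invariant: everything the profile task needs must be snapshotted from the `ConnectContext` *before* the work goes asynchronous, because the context is shared by consecutive queries on the same connection. Below is a minimal, self-contained sketch of that capture-then-callback pattern; the `ProfileContext` class and its fields are hypothetical stand-ins, not the real `ConnectContext`/`Coordinator` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class AsyncProfileSketch {
    // Stand-in for the mutable, connection-scoped context (hypothetical).
    static class ProfileContext {
        long startTimeMs;
        String queryId;
    }

    public static void main(String[] args) {
        ProfileContext context = new ProfileContext();
        context.startTimeMs = System.currentTimeMillis();
        context.queryId = "query-1";

        // Snapshot the values synchronously: by the time the task runs, the
        // shared context may already describe the next query.
        final long startTime = context.startTimeMs;
        final String queryId = context.queryId;

        Consumer<Boolean> task = (Boolean isAsync) -> {
            long totalTimeMs = System.currentTimeMillis() - startTime;
            System.out.printf("profile for %s: total=%d ms, async=%b%n",
                    queryId, totalTimeMs, isAsync);
        };

        // Simulate the connection moving on to the next query; the snapshot
        // keeps the async task pointing at the right query.
        context.queryId = "query-2";

        CompletableFuture.runAsync(() -> task.accept(true)).join();
    }
}
```

The same reasoning explains why the task receives `isAsync` as a parameter rather than reading it from shared state: the callback must be fully determined by captured values.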
+ +package com.starrocks.qe.scheduler; + +import com.starrocks.analysis.DescriptorTable; +import com.starrocks.common.Status; +import com.starrocks.common.util.RuntimeProfile; +import com.starrocks.planner.PlanFragment; +import com.starrocks.planner.ScanNode; +import com.starrocks.planner.StreamLoadPlanner; +import com.starrocks.proto.PPlanFragmentCancelReason; +import com.starrocks.proto.PQueryStatistics; +import com.starrocks.qe.ConnectContext; +import com.starrocks.qe.QueryStatisticsItem; +import com.starrocks.qe.RowBatch; +import com.starrocks.qe.scheduler.slot.LogicalSlot; +import com.starrocks.sql.LoadPlanner; +import com.starrocks.sql.plan.ExecPlan; +import com.starrocks.thrift.TDescriptorTable; +import com.starrocks.thrift.TLoadJobType; +import com.starrocks.thrift.TNetworkAddress; +import com.starrocks.thrift.TReportAuditStatisticsParams; +import com.starrocks.thrift.TReportExecStatusParams; +import com.starrocks.thrift.TSinkCommitInfo; +import com.starrocks.thrift.TTabletCommitInfo; +import com.starrocks.thrift.TTabletFailInfo; +import com.starrocks.thrift.TUniqueId; + +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public abstract class Coordinator { + public interface Factory { + Coordinator createQueryScheduler(ConnectContext context, + List fragments, + List scanNodes, + TDescriptorTable descTable); + + Coordinator createInsertScheduler(ConnectContext context, + List fragments, + List scanNodes, + TDescriptorTable descTable); + + Coordinator createBrokerLoadScheduler(LoadPlanner loadPlanner); + + Coordinator createStreamLoadScheduler(LoadPlanner loadPlanner); + + Coordinator createSyncStreamLoadScheduler(StreamLoadPlanner planner, TNetworkAddress address); + + Coordinator createNonPipelineBrokerLoadScheduler(Long jobId, TUniqueId queryId, DescriptorTable descTable, + List fragments, + List scanNodes, String timezone, long startTime, + Map sessionVariables, + ConnectContext context, long execMemLimit); + + Coordinator createBrokerExportScheduler(Long jobId, TUniqueId queryId, DescriptorTable descTable, + List fragments, + List scanNodes, String timezone, long startTime, + Map sessionVariables, + long execMemLimit); + } + + // ------------------------------------------------------------------------------------ + // Common methods for scheduling. + // ------------------------------------------------------------------------------------ + + public void exec() throws Exception { + startScheduling(); + } + + /** + * Start scheduling fragments of this job, mainly containing the following work: + *

<ul> + *     <li> Instantiates multiple parallel instances of each fragment. + *     <li> Assigns these fragment instances to appropriate workers (including backends and compute nodes). + *     <li> Deploys them to the related workers, if the parameter {@code needDeploy} is true. + * </ul>
+ * + * @param needDeploy Whether deploying fragment instances to workers. + */ + public abstract void startScheduling(boolean needDeploy) throws Exception; + + public void startScheduling() throws Exception { + startScheduling(true); + } + + public void startSchedulingWithoutDeploy() throws Exception { + startScheduling(false); + } + + public abstract String getSchedulerExplain(); + + public abstract void updateFragmentExecStatus(TReportExecStatusParams params); + + public abstract void updateAuditStatistics(TReportAuditStatisticsParams params); + + public void cancel(String cancelledMessage) { + cancel(PPlanFragmentCancelReason.USER_CANCEL, cancelledMessage); + } + + public abstract void cancel(PPlanFragmentCancelReason reason, String message); + + public abstract void onFinished(); + + public abstract LogicalSlot getSlot(); + + // ------------------------------------------------------------------------------------ + // Methods for query. + // ------------------------------------------------------------------------------------ + + public abstract RowBatch getNext() throws Exception; + + // ------------------------------------------------------------------------------------ + // Methods for load. + // ------------------------------------------------------------------------------------ + + public abstract boolean join(int timeoutSecond); + + public abstract boolean checkBackendState(); + + public abstract boolean isThriftServerHighLoad(); + + public abstract void setLoadJobType(TLoadJobType type); + + public abstract long getLoadJobId(); + + public abstract void setLoadJobId(Long jobId); + + public abstract Map getChannelIdToBEHTTPMap(); + + public abstract Map getChannelIdToBEPortMap(); + + public abstract boolean isEnableLoadProfile(); + + public abstract void clearExportStatus(); + + // ------------------------------------------------------------------------------------ + // Methods for profile. + // ------------------------------------------------------------------------------------ + + public abstract void collectProfileSync(); + + public abstract boolean tryProcessProfileAsync(Consumer task); + + public abstract void setTopProfileSupplier(Supplier topProfileSupplier); + + public abstract void setExecPlan(ExecPlan execPlan); + + public abstract RuntimeProfile buildQueryProfile(boolean needMerge); + + public abstract RuntimeProfile getQueryProfile(); + + public abstract List getDeltaUrls(); + + public abstract Map getLoadCounters(); + + public abstract List getFailInfos(); + + public abstract List getCommitInfos(); + + public abstract List getSinkCommitInfos(); + + public abstract List getExportFiles(); + + public abstract String getTrackingUrl(); + + public abstract List getRejectedRecordPaths(); + + public abstract List getFragmentInstanceInfos(); + + // ------------------------------------------------------------------------------------ + // Methods for audit. + // ------------------------------------------------------------------------------------ + public abstract PQueryStatistics getAuditStatistics(); + + // ------------------------------------------------------------------------------------ + // Common methods. 
+ // ------------------------------------------------------------------------------------ + + public abstract Status getExecStatus(); + + public abstract boolean isUsingBackend(Long backendID); + + public abstract boolean isDone(); + + public abstract TUniqueId getQueryId(); + + public abstract void setQueryId(TUniqueId queryId); + + public abstract List getScanNodes(); + + public abstract long getStartTimeMs(); + + public abstract void setTimeoutSecond(int timeoutSecond); + + public abstract boolean isProfileAlreadyReported(); + +} diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java new file mode 100644 index 0000000000000..7c32d7d8c7693 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java @@ -0,0 +1,570 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.qe.scheduler; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.starrocks.common.Config; +import com.starrocks.common.Pair; +import com.starrocks.common.Status; +import com.starrocks.common.ThreadPoolManager; +import com.starrocks.common.util.Counter; +import com.starrocks.common.util.DebugUtil; +import com.starrocks.common.util.ProfileManager; +import com.starrocks.common.util.ProfilingExecPlan; +import com.starrocks.common.util.RuntimeProfile; +import com.starrocks.common.util.concurrent.MarkedCountDownLatch; +import com.starrocks.load.loadv2.LoadJob; +import com.starrocks.qe.ConnectContext; +import com.starrocks.qe.SessionVariable; +import com.starrocks.qe.scheduler.dag.FragmentInstanceExecState; +import com.starrocks.qe.scheduler.dag.JobSpec; +import com.starrocks.sql.plan.ExecPlan; +import com.starrocks.task.LoadEtlTask; +import com.starrocks.thrift.TReportExecStatusParams; +import com.starrocks.thrift.TSinkCommitInfo; +import com.starrocks.thrift.TTabletCommitInfo; +import com.starrocks.thrift.TTabletFailInfo; +import com.starrocks.thrift.TUniqueId; +import com.starrocks.thrift.TUnit; +import org.apache.commons.collections.CollectionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class QueryRuntimeProfile { + private static final Logger LOG = LogManager.getLogger(QueryRuntimeProfile.class); + + /** + * Set the queue size to a large value. 
The decision to execute the profile process task asynchronously + * occurs when a listener is added to {@link QueryRuntimeProfile#profileDoneSignal}. The function + * {@link QueryRuntimeProfile#addListener} will then check whether the number of queued tasks exceeds + * {@link Config#profile_process_blocking_queue_size}. + */ + private static final ThreadPoolExecutor EXECUTOR = + ThreadPoolManager.newDaemonFixedThreadPool(Config.profile_process_threads_num, + Integer.MAX_VALUE, "profile-worker", false); + + /** + * The value is meaningless; it is just used as a placeholder value for {@link MarkedCountDownLatch}. + */ + private static final Long MARKED_COUNT_DOWN_VALUE = -1L; + + private final JobSpec jobSpec; + + private final ConnectContext connectContext; + + /** + * True indicates that the profile has been reported. + *

When {@link SessionVariable#isEnableLoadProfile()} is enabled, + * if the time cost of a stream load is less than {@link Config#stream_load_profile_collect_second}, + * the profile will not be reported to the FE, to reduce the overhead of profiling under high-frequency loads. + */ + private boolean profileAlreadyReported = false; + + private final RuntimeProfile queryProfile; + private final List fragmentProfiles; + + /** + * The number of instances of this query. + *

It is equal to the number of backends executing plan fragments on behalf of this query. + */ + private MarkedCountDownLatch profileDoneSignal = null; + + private Supplier topProfileSupplier; + private ExecPlan execPlan; + private final AtomicLong lastRuntimeProfileUpdateTime = new AtomicLong(System.currentTimeMillis()); + + // ------------------------------------------------------------------------------------ + // Fields for load. + // ------------------------------------------------------------------------------------ + private final List deltaUrls = Lists.newArrayList(); + private final Map loadCounters = Maps.newHashMap(); + private String trackingUrl = ""; + private final Set rejectedRecordPaths = Sets.newHashSet(); + + // ------------------------------------------------------------------------------------ + // Fields for export. + // ------------------------------------------------------------------------------------ + private final List exportFiles = Lists.newArrayList(); + private final List commitInfos = Lists.newArrayList(); + private final List failInfos = Lists.newArrayList(); + + // ------------------------------------------------------------------------------------ + // Fields for external table sink + // ------------------------------------------------------------------------------------ + private final List sinkCommitInfos = Lists.newArrayList(); + + public QueryRuntimeProfile(ConnectContext connectContext, + JobSpec jobSpec, + int numFragments) { + this.connectContext = connectContext; + this.jobSpec = jobSpec; + + this.queryProfile = new RuntimeProfile("Execution"); + this.fragmentProfiles = new ArrayList<>(numFragments); + for (int i = 0; i < numFragments; i++) { + RuntimeProfile profile = new RuntimeProfile("Fragment " + i); + fragmentProfiles.add(profile); + queryProfile.addChild(profile); + } + } + + public List getDeltaUrls() { + return deltaUrls; + } + + public Map getLoadCounters() { + return loadCounters; + } + + public String getTrackingUrl() { + return trackingUrl; + } + + public List getRejectedRecordPaths() { + return new ArrayList<>(rejectedRecordPaths); + } + + public List getCommitInfos() { + return commitInfos; + } + + public List getFailInfos() { + return failInfos; + } + + public List getSinkCommitInfos() { + return sinkCommitInfos; + } + + public List getExportFiles() { + return exportFiles; + } + + public boolean isProfileAlreadyReported() { + return profileAlreadyReported; + } + + public void setTopProfileSupplier(Supplier topProfileSupplier) { + this.topProfileSupplier = topProfileSupplier; + } + + public void setExecPlan(ExecPlan execPlan) { + this.execPlan = execPlan; + } + + public void clearExportStatus() { + exportFiles.clear(); + } + + public void attachInstances(Collection instanceIds) { + // to keep things simple, make async Cancel() calls wait until plan fragment + // execution has been initiated, otherwise we might try to cancel fragment + // execution at backends where it hasn't even started + profileDoneSignal = new MarkedCountDownLatch<>(instanceIds.size()); + instanceIds.forEach(instanceId -> profileDoneSignal.addMark(instanceId, MARKED_COUNT_DOWN_VALUE)); + } + + public void attachExecutionProfiles(Collection executions) { + for (FragmentInstanceExecState execState : executions) { + if (!execState.computeTimeInProfile(fragmentProfiles.size())) { + return; + } + fragmentProfiles.get(execState.getFragmentIndex()).addChild(execState.getProfile()); + } + } + + public void finishInstance(TUniqueId instanceId) { + if (profileDoneSignal != 
null) { + profileDoneSignal.markedCountDown(instanceId, MARKED_COUNT_DOWN_VALUE); + } + } + + public void finishAllInstances(Status status) { + if (profileDoneSignal != null) { + profileDoneSignal.countDownToZero(status); + LOG.info("unfinished instances: {}", getUnfinishedInstanceIds()); + } + } + + public boolean isFinished() { + return profileDoneSignal.getCount() == 0; + } + + public boolean addListener(Consumer task) { + if (EXECUTOR.getQueue().size() > Config.profile_process_blocking_queue_size) { + return false; + } + // We need to make sure this submission won't be rejected, by setting the queue size to Integer.MAX_VALUE + profileDoneSignal.addListener(() -> EXECUTOR.submit(() -> { + task.accept(true); + })); + return true; + } + + public boolean waitForProfileFinished(long timeout, TimeUnit unit) { + boolean res = false; + try { + res = profileDoneSignal.await(timeout, unit); + if (!res) { + LOG.warn("failed to get profile within {} seconds", timeout); + } + } catch (InterruptedException e) { // NOSONAR + LOG.warn("profile signal await error", e); + } + + return res; + } + + public RuntimeProfile getQueryProfile() { + return queryProfile; + } + + public void updateProfile(FragmentInstanceExecState execState, TReportExecStatusParams params) { + if (params.isSetProfile()) { + profileAlreadyReported = true; + } + + // Update the runtime profile while the query is still in progress. + // + // We need to export the profile to ProfileManager before updating this profile, because: + // each fragment instance reports its state based on its own timer, and basically, these + // timers are consistent. So we can assume that all the instances will report profiles in a very short + // time range. If we chose to export the profile to the profile manager after updating this instance's + // profile, the whole profile might include information from the previous report for every instance except + // the current one, which leads to inconsistency. + // + // So the profile update strategy looks like this: during a short time interval, each instance reports + // its execState information. However, when receiving the information reported by the first instance of the + // current batch, the previously reported state is synchronized to the profile manager. + long now = System.currentTimeMillis(); + long lastTime = lastRuntimeProfileUpdateTime.get(); + if (topProfileSupplier != null && execPlan != null && connectContext != null && + connectContext.isProfileEnabled() && + // If it's the last done report, avoid a duplicate trigger + (!execState.isFinished() || profileDoneSignal.getLeftMarks().size() > 1) && + // Interval * 0.95 * 1000 to allow a certain range of deviation + now - lastTime > (connectContext.getSessionVariable().getRuntimeProfileReportInterval() * 950L) && + lastRuntimeProfileUpdateTime.compareAndSet(lastTime, now)) { + RuntimeProfile profile = topProfileSupplier.get(); + ExecPlan plan = execPlan; + profile.addChild(buildQueryProfile(connectContext.needMergeProfile())); + ProfilingExecPlan profilingPlan = plan == null ?
null : plan.getProfilingPlan(); + ProfileManager.getInstance().pushProfile(profilingPlan, profile); + } + } + + public void finalizeProfile() { + fragmentProfiles.forEach(RuntimeProfile::sortChildren); + } + + public void updateLoadInformation(FragmentInstanceExecState execState, TReportExecStatusParams params) { + if (params.isSetDelta_urls()) { + deltaUrls.addAll(params.getDelta_urls()); + } + if (params.isSetLoad_counters()) { + updateLoadCounters(params.getLoad_counters()); + } + if (params.isSetTracking_url()) { + trackingUrl = params.tracking_url; + } + if (params.isSetExport_files()) { + exportFiles.addAll(params.getExport_files()); + } + if (params.isSetCommitInfos()) { + commitInfos.addAll(params.getCommitInfos()); + } + if (params.isSetFailInfos()) { + failInfos.addAll(params.getFailInfos()); + } + if (params.isSetRejected_record_path()) { + rejectedRecordPaths.add(execState.getAddress().getHostname() + ":" + params.getRejected_record_path()); + } + if (params.isSetSink_commit_infos()) { + sinkCommitInfos.addAll(params.getSink_commit_infos()); + } + } + + public RuntimeProfile buildQueryProfile(boolean needMerge) { + if (!needMerge || !jobSpec.isEnablePipeline()) { + return queryProfile; + } + + RuntimeProfile newQueryProfile = new RuntimeProfile(queryProfile.getName()); + long start = System.nanoTime(); + newQueryProfile.copyAllInfoStringsFrom(queryProfile, null); + newQueryProfile.copyAllCountersFrom(queryProfile); + + long maxQueryCumulativeCpuTime = 0; + long maxQueryPeakMemoryUsage = 0; + long maxQueryExecutionWallTime = 0; + long maxQuerySpillBytes = 0; + + List newFragmentProfiles = Lists.newArrayList(); + for (RuntimeProfile fragmentProfile : fragmentProfiles) { + RuntimeProfile newFragmentProfile = new RuntimeProfile(fragmentProfile.getName()); + newFragmentProfiles.add(newFragmentProfile); + newFragmentProfile.copyAllInfoStringsFrom(fragmentProfile, null); + newFragmentProfile.copyAllCountersFrom(fragmentProfile); + + if (fragmentProfile.getChildList().isEmpty()) { + continue; + } + + List instanceProfiles = fragmentProfile.getChildList().stream() + .map(pair -> pair.first) + .collect(Collectors.toList()); + + Set backendAddresses = Sets.newHashSet(); + Set instanceIds = Sets.newHashSet(); + Set missingInstanceIds = Sets.newHashSet(); + for (RuntimeProfile instanceProfile : instanceProfiles) { + // Setup backend meta infos + backendAddresses.add(instanceProfile.getInfoString("Address")); + instanceIds.add(instanceProfile.getInfoString("InstanceId")); + if (CollectionUtils.isEmpty(instanceProfile.getChildList())) { + missingInstanceIds.add(instanceProfile.getInfoString("InstanceId")); + } + + // Get query level peak memory usage, cpu cost, wall time + Counter toBeRemove = instanceProfile.getCounter("QueryCumulativeCpuTime"); + if (toBeRemove != null) { + maxQueryCumulativeCpuTime = Math.max(maxQueryCumulativeCpuTime, toBeRemove.getValue()); + } + instanceProfile.removeCounter("QueryCumulativeCpuTime"); + + toBeRemove = instanceProfile.getCounter("QueryPeakMemoryUsage"); + if (toBeRemove != null) { + maxQueryPeakMemoryUsage = Math.max(maxQueryPeakMemoryUsage, toBeRemove.getValue()); + } + instanceProfile.removeCounter("QueryPeakMemoryUsage"); + + toBeRemove = instanceProfile.getCounter("QueryExecutionWallTime"); + if (toBeRemove != null) { + maxQueryExecutionWallTime = Math.max(maxQueryExecutionWallTime, toBeRemove.getValue()); + } + instanceProfile.removeCounter("QueryExecutionWallTime"); + + toBeRemove = instanceProfile.getCounter("QuerySpillBytes"); + if (toBeRemove != 
null) { + maxQuerySpillBytes = Math.max(maxQuerySpillBytes, toBeRemove.getValue()); + } + instanceProfile.removeCounter("QuerySpillBytes"); + } + newFragmentProfile.addInfoString("BackendAddresses", String.join(",", backendAddresses)); + newFragmentProfile.addInfoString("InstanceIds", String.join(",", instanceIds)); + if (!missingInstanceIds.isEmpty()) { + newFragmentProfile.addInfoString("MissingInstanceIds", String.join(",", missingInstanceIds)); + } + Counter backendNum = newFragmentProfile.addCounter("BackendNum", TUnit.UNIT, null); + backendNum.setValue(backendAddresses.size()); + + // Set up the number of instances + Counter counter = newFragmentProfile.addCounter("InstanceNum", TUnit.UNIT, null); + counter.setValue(instanceProfiles.size()); + + RuntimeProfile mergedInstanceProfile = + RuntimeProfile.mergeIsomorphicProfiles(instanceProfiles, Sets.newHashSet("Address", "InstanceId")); + Preconditions.checkState(mergedInstanceProfile != null); + + newFragmentProfile.copyAllInfoStringsFrom(mergedInstanceProfile, null); + newFragmentProfile.copyAllCountersFrom(mergedInstanceProfile); + + mergedInstanceProfile.getChildList().forEach(pair -> { + RuntimeProfile pipelineProfile = pair.first; + setOperatorStatus(pipelineProfile); + newFragmentProfile.addChild(pipelineProfile); + }); + + newQueryProfile.addChild(newFragmentProfile); + } + + // Remove redundant MIN/MAX metrics if MIN and MAX are identical + for (RuntimeProfile fragmentProfile : newFragmentProfiles) { + RuntimeProfile.removeRedundantMinMaxMetrics(fragmentProfile); + } + + long queryAllocatedMemoryUsage = 0; + long queryDeallocatedMemoryUsage = 0; + // Calculate ExecutionTotalTime, which comprises all operators' sync time and async time. + // We can get an operator's sync time from OperatorTotalTime. For async time, only ScanOperator and + // ExchangeOperator have async operations, and we can get it from ScanTime (for ScanOperator) and + // NetworkTime (for ExchangeOperator). + long queryCumulativeOperatorTime = 0; + long queryCumulativeScanTime = 0; + long queryCumulativeNetworkTime = 0; + long maxScheduleTime = 0; + for (RuntimeProfile fragmentProfile : newFragmentProfiles) { + Counter instanceAllocatedMemoryUsage = fragmentProfile.getCounter("InstanceAllocatedMemoryUsage"); + if (instanceAllocatedMemoryUsage != null) { + queryAllocatedMemoryUsage += instanceAllocatedMemoryUsage.getValue(); + } + Counter instanceDeallocatedMemoryUsage = fragmentProfile.getCounter("InstanceDeallocatedMemoryUsage"); + if (instanceDeallocatedMemoryUsage != null) { + queryDeallocatedMemoryUsage += instanceDeallocatedMemoryUsage.getValue(); + } + + for (Pair pipelineProfilePair : fragmentProfile.getChildList()) { + RuntimeProfile pipelineProfile = pipelineProfilePair.first; + Counter scheduleTime = pipelineProfile.getMaxCounter("ScheduleTime"); + if (scheduleTime != null) { + maxScheduleTime = Math.max(maxScheduleTime, scheduleTime.getValue()); + } + for (Pair operatorProfilePair : pipelineProfile.getChildList()) { + RuntimeProfile operatorProfile = operatorProfilePair.first; + RuntimeProfile commonMetrics = operatorProfile.getChild("CommonMetrics"); + RuntimeProfile uniqueMetrics = operatorProfile.getChild("UniqueMetrics"); + if (commonMetrics == null || uniqueMetrics == null) { + continue; + } + + if (commonMetrics.containsInfoString("IsFinalSink")) { + long resultDeliverTime = 0; + Counter outputFullTime = pipelineProfile.getMaxCounter("OutputFullTime"); + if (outputFullTime != null) { + resultDeliverTime += outputFullTime.getValue(); + } + Counter
pendingFinishTime = pipelineProfile.getMaxCounter("PendingFinishTime"); + if (pendingFinishTime != null) { + resultDeliverTime += pendingFinishTime.getValue(); + } + Counter resultDeliverTimer = + newQueryProfile.addCounter("ResultDeliverTime", TUnit.TIME_NS, null); + resultDeliverTimer.setValue(resultDeliverTime); + } + + Counter operatorTotalTime = commonMetrics.getMaxCounter("OperatorTotalTime"); + Preconditions.checkNotNull(operatorTotalTime); + queryCumulativeOperatorTime += operatorTotalTime.getValue(); + + Counter scanTime = uniqueMetrics.getMaxCounter("ScanTime"); + if (scanTime != null) { + queryCumulativeScanTime += scanTime.getValue(); + queryCumulativeOperatorTime += scanTime.getValue(); + } + + Counter networkTime = uniqueMetrics.getMaxCounter("NetworkTime"); + if (networkTime != null) { + queryCumulativeNetworkTime += networkTime.getValue(); + queryCumulativeOperatorTime += networkTime.getValue(); + } + } + } + } + Counter queryAllocatedMemoryUsageCounter = + newQueryProfile.addCounter("QueryAllocatedMemoryUsage", TUnit.BYTES, null); + queryAllocatedMemoryUsageCounter.setValue(queryAllocatedMemoryUsage); + Counter queryDeallocatedMemoryUsageCounter = + newQueryProfile.addCounter("QueryDeallocatedMemoryUsage", TUnit.BYTES, null); + queryDeallocatedMemoryUsageCounter.setValue(queryDeallocatedMemoryUsage); + Counter queryCumulativeOperatorTimer = + newQueryProfile.addCounter("QueryCumulativeOperatorTime", TUnit.TIME_NS, null); + queryCumulativeOperatorTimer.setValue(queryCumulativeOperatorTime); + Counter queryCumulativeScanTimer = + newQueryProfile.addCounter("QueryCumulativeScanTime", TUnit.TIME_NS, null); + queryCumulativeScanTimer.setValue(queryCumulativeScanTime); + Counter queryCumulativeNetworkTimer = + newQueryProfile.addCounter("QueryCumulativeNetworkTime", TUnit.TIME_NS, null); + queryCumulativeNetworkTimer.setValue(queryCumulativeNetworkTime); + Counter queryPeakScheduleTime = newQueryProfile.addCounter("QueryPeakScheduleTime", TUnit.TIME_NS, null); + queryPeakScheduleTime.setValue(maxScheduleTime); + newQueryProfile.getCounterTotalTime().setValue(0); + + Counter queryCumulativeCpuTime = newQueryProfile.addCounter("QueryCumulativeCpuTime", TUnit.TIME_NS, null); + queryCumulativeCpuTime.setValue(maxQueryCumulativeCpuTime); + Counter queryPeakMemoryUsage = newQueryProfile.addCounter("QueryPeakMemoryUsage", TUnit.BYTES, null); + queryPeakMemoryUsage.setValue(maxQueryPeakMemoryUsage); + Counter queryExecutionWallTime = newQueryProfile.addCounter("QueryExecutionWallTime", TUnit.TIME_NS, null); + queryExecutionWallTime.setValue(maxQueryExecutionWallTime); + Counter querySpillBytes = newQueryProfile.addCounter("QuerySpillBytes", TUnit.BYTES, null); + querySpillBytes.setValue(maxQuerySpillBytes); + + if (execPlan != null) { + newQueryProfile.addInfoString("Topology", execPlan.getProfilingPlan().toTopologyJson()); + } + Counter processTimer = + newQueryProfile.addCounter("FrontendProfileMergeTime", TUnit.TIME_NS, null); + processTimer.setValue(System.nanoTime() - start); + + return newQueryProfile; + } + + private List getUnfinishedInstanceIds() { + return profileDoneSignal.getLeftMarks().stream() + .map(Map.Entry::getKey) + .map(DebugUtil::printId) + .collect(Collectors.toList()); + } + + private void setOperatorStatus(RuntimeProfile pipelineProfile) { + for (Pair child : pipelineProfile.getChildList()) { + RuntimeProfile operatorProfile = child.first; + RuntimeProfile commonMetrics = operatorProfile.getChild("CommonMetrics"); + Preconditions.checkNotNull(commonMetrics); + + 
if (commonMetrics.containsInfoString("IsChild")) { + continue; + } + + Counter closeTime = commonMetrics.getCounter("CloseTime"); + Counter minCloseTime = commonMetrics.getCounter("__MIN_OF_CloseTime"); + if (closeTime != null && closeTime.getValue() == 0 || + minCloseTime != null && minCloseTime.getValue() == 0) { + commonMetrics.addInfoString("IsRunning", ""); + } + } + } + + private void updateLoadCounters(Map newLoadCounters) { + long numRowsNormal = getCounterLongValueOrDefault(loadCounters, LoadEtlTask.DPP_NORMAL_ALL, 0L); + long numRowsAbnormal = getCounterLongValueOrDefault(loadCounters, LoadEtlTask.DPP_ABNORMAL_ALL, 0L); + long numRowsUnselected = getCounterLongValueOrDefault(loadCounters, LoadJob.UNSELECTED_ROWS, 0L); + long numLoadBytesTotal = getCounterLongValueOrDefault(loadCounters, LoadJob.LOADED_BYTES, 0L); + + // new load counters + numRowsNormal += getCounterLongValueOrDefault(newLoadCounters, LoadEtlTask.DPP_NORMAL_ALL, 0L); + numRowsAbnormal += getCounterLongValueOrDefault(newLoadCounters, LoadEtlTask.DPP_ABNORMAL_ALL, 0L); + numRowsUnselected += getCounterLongValueOrDefault(newLoadCounters, LoadJob.UNSELECTED_ROWS, 0L); + numLoadBytesTotal += getCounterLongValueOrDefault(newLoadCounters, LoadJob.LOADED_BYTES, 0L); + + this.loadCounters.put(LoadEtlTask.DPP_NORMAL_ALL, String.valueOf(numRowsNormal)); + this.loadCounters.put(LoadEtlTask.DPP_ABNORMAL_ALL, String.valueOf(numRowsAbnormal)); + this.loadCounters.put(LoadJob.UNSELECTED_ROWS, String.valueOf(numRowsUnselected)); + this.loadCounters.put(LoadJob.LOADED_BYTES, String.valueOf(numLoadBytesTotal)); + } + + private long getCounterLongValueOrDefault(Map counters, String key, long defaultValue) { + String value = counters.get(key); + if (value != null) { + return Long.parseLong(value); + } + return defaultValue; + } + +} diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java new file mode 100644 index 0000000000000..431e9e425e723 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java @@ -0,0 +1,572 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
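`updateLoadCounters` above accumulates load statistics that arrive as string-valued counter maps from each BE report: parse the running total, add the newly reported value, and write the sum back as a string so the map stays in the thrift-reported format. A small self-contained sketch of that parse-sum-writeback pattern follows; the counter key is a placeholder, not the real `LoadEtlTask`/`LoadJob` constant.

```java
import java.util.HashMap;
import java.util.Map;

public class LoadCounterMergeSketch {
    // Running totals, kept as strings to match the reported format.
    private final Map<String, String> loadCounters = new HashMap<>();

    private long valueOrDefault(Map<String, String> counters, String key, long def) {
        String v = counters.get(key);
        return v != null ? Long.parseLong(v) : def;
    }

    // Merge one newly reported counter into the running total.
    public void merge(Map<String, String> newCounters, String key) {
        long total = valueOrDefault(loadCounters, key, 0L)
                + valueOrDefault(newCounters, key, 0L);
        loadCounters.put(key, String.valueOf(total));
    }

    public static void main(String[] args) {
        LoadCounterMergeSketch sketch = new LoadCounterMergeSketch();
        sketch.merge(Map.of("dpp.norm.ALL", "100"), "dpp.norm.ALL");
        sketch.merge(Map.of("dpp.norm.ALL", "50"), "dpp.norm.ALL");
        System.out.println(sketch.loadCounters); // {dpp.norm.ALL=150}
    }
}
```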
+ +package com.starrocks.qe.scheduler.dag; + +import com.google.common.base.Preconditions; +import com.starrocks.analysis.DescriptorTable; +import com.starrocks.catalog.ResourceGroupClassifier; +import com.starrocks.common.util.CompressionUtils; +import com.starrocks.common.util.DebugUtil; +import com.starrocks.load.loadv2.BulkLoadJob; +import com.starrocks.planner.PlanFragment; +import com.starrocks.planner.ScanNode; +import com.starrocks.planner.StreamLoadPlanner; +import com.starrocks.qe.ConnectContext; +import com.starrocks.qe.SessionVariable; +import com.starrocks.sql.LoadPlanner; +import com.starrocks.thrift.TCompressionType; +import com.starrocks.thrift.TDescriptorTable; +import com.starrocks.thrift.TExecPlanFragmentParams; +import com.starrocks.thrift.TLoadJobType; +import com.starrocks.thrift.TQueryGlobals; +import com.starrocks.thrift.TQueryOptions; +import com.starrocks.thrift.TQueryType; +import com.starrocks.thrift.TUniqueId; +import com.starrocks.thrift.TWorkGroup; +import org.apache.commons.lang3.StringUtils; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static com.starrocks.qe.CoordinatorPreprocessor.genQueryGlobals; +import static com.starrocks.qe.CoordinatorPreprocessor.prepareResourceGroup; + +public class JobSpec { + + private static final long UNINITIALIZED_LOAD_JOB_ID = -1; + private long loadJobId; + + private TUniqueId queryId; + + private final List fragments; + private final List scanNodes; + /** + * Copied from TQueryExecRequest; constant across all fragments. + */ + private final TDescriptorTable descTable; + + private final ConnectContext connectContext; + private final boolean enablePipeline; + private final boolean enableStreamPipeline; + private final boolean isBlockQuery; + + private final boolean needReport; + + /** + * Why do we use query globals? + * When the `NOW()` function appears in a SQL statement, we need only one `now()` value, + * but we execute `NOW()` in a distributed fashion. + * So we keep a query-global value here to ensure a single `now()` value within one query process.
+ */ + private final TQueryGlobals queryGlobals; + private final TQueryOptions queryOptions; + private final TWorkGroup resourceGroup; + + private final String planProtocol; + + public static class Factory { + private Factory() { + } + + public static JobSpec fromQuerySpec(ConnectContext context, + List fragments, + List scanNodes, + TDescriptorTable descTable, + TQueryType queryType) { + TQueryOptions queryOptions = context.getSessionVariable().toThrift(); + queryOptions.setQuery_type(queryType); + + TQueryGlobals queryGlobals = genQueryGlobals(context.getStartTime(), + context.getSessionVariable().getTimeZone()); + if (context.getLastQueryId() != null) { + queryGlobals.setLast_query_id(context.getLastQueryId().toString()); + } + + return new Builder() + .queryId(context.getExecutionId()) + .fragments(fragments) + .scanNodes(scanNodes) + .descTable(descTable) + .enableStreamPipeline(false) + .isBlockQuery(false) + .needReport(context.getSessionVariable().isEnableProfile() || + context.getSessionVariable().isEnableBigQueryProfile()) + .queryGlobals(queryGlobals) + .queryOptions(queryOptions) + .commonProperties(context) + .setPlanProtocol(context.getSessionVariable().getThriftPlanProtocol()) + .build(); + } + + public static JobSpec fromMVMaintenanceJobSpec(ConnectContext context, + List fragments, + List scanNodes, + TDescriptorTable descTable) { + TQueryOptions queryOptions = context.getSessionVariable().toThrift(); + + TQueryGlobals queryGlobals = genQueryGlobals(context.getStartTime(), + context.getSessionVariable().getTimeZone()); + if (context.getLastQueryId() != null) { + queryGlobals.setLast_query_id(context.getLastQueryId().toString()); + } + + return new Builder() + .queryId(context.getExecutionId()) + .fragments(fragments) + .scanNodes(scanNodes) + .descTable(descTable) + .enableStreamPipeline(true) + .isBlockQuery(false) + .needReport(true) + .queryGlobals(queryGlobals) + .queryOptions(queryOptions) + .commonProperties(context) + .build(); + } + + public static JobSpec fromBrokerLoadJobSpec(LoadPlanner loadPlanner) { + ConnectContext context = loadPlanner.getContext(); + + TQueryOptions queryOptions = createBrokerLoadQueryOptions(loadPlanner); + + TQueryGlobals queryGlobals = genQueryGlobals(context.getStartTime(), + context.getSessionVariable().getTimeZone()); + if (context.getLastQueryId() != null) { + queryGlobals.setLast_query_id(context.getLastQueryId().toString()); + } + + return new JobSpec.Builder() + .loadJobId(loadPlanner.getLoadJobId()) + .queryId(loadPlanner.getLoadId()) + .fragments(loadPlanner.getFragments()) + .scanNodes(loadPlanner.getScanNodes()) + .descTable(loadPlanner.getDescTable().toThrift()) + .enableStreamPipeline(false) + .isBlockQuery(true) + .needReport(true) + .queryGlobals(queryGlobals) + .queryOptions(queryOptions) + .commonProperties(context) + .build(); + } + + public static JobSpec fromStreamLoadJobSpec(LoadPlanner loadPlanner) { + JobSpec jobSpec = fromBrokerLoadJobSpec(loadPlanner); + jobSpec.getQueryOptions().setLoad_job_type((TLoadJobType.STREAM_LOAD)); + return jobSpec; + } + + public static JobSpec fromBrokerExportSpec(ConnectContext context, + Long loadJobId, TUniqueId queryId, + DescriptorTable descTable, + List fragments, + List scanNodes, String timezone, + long startTime, + Map sessionVariables, + long execMemLimit) { + TQueryOptions queryOptions = new TQueryOptions(); + setSessionVariablesToLoadQueryOptions(queryOptions, sessionVariables); + queryOptions.setMem_limit(execMemLimit); + + TQueryGlobals queryGlobals = 
genQueryGlobals(startTime, timezone); + + return new JobSpec.Builder() + .loadJobId(loadJobId) + .queryId(queryId) + .fragments(fragments) + .scanNodes(scanNodes) + .descTable(descTable.toThrift()) + .enableStreamPipeline(false) + .isBlockQuery(true) + .needReport(true) + .queryGlobals(queryGlobals) + .queryOptions(queryOptions) + .commonProperties(context) + .build(); + } + + public static JobSpec fromNonPipelineBrokerLoadJobSpec(ConnectContext context, + Long loadJobId, TUniqueId queryId, + DescriptorTable descTable, + List fragments, + List scanNodes, + String timezone, + long startTime, + Map sessionVariables, + long execMemLimit) { + TQueryOptions queryOptions = new TQueryOptions(); + setSessionVariablesToLoadQueryOptions(queryOptions, sessionVariables); + queryOptions.setQuery_type(TQueryType.LOAD); + /* + * For a broker load job, users only need to set the memory limit via the 'exec_mem_limit' property, + * and the variable 'load_mem_limit' has no effect. + * However, in order to ensure consistent semantics when executing on the BE side, + * and to prevent subsequent modifications from incorrectly setting load_mem_limit, + * here we use exec_mem_limit to directly override the load_mem_limit property. + */ + queryOptions.setMem_limit(execMemLimit); + queryOptions.setLoad_mem_limit(execMemLimit); + + TQueryGlobals queryGlobals = genQueryGlobals(startTime, timezone); + + return new JobSpec.Builder() + .loadJobId(loadJobId) + .queryId(queryId) + .fragments(fragments) + .scanNodes(scanNodes) + .descTable(descTable.toThrift()) + .enableStreamPipeline(false) + .isBlockQuery(true) + .needReport(true) + .queryGlobals(queryGlobals) + .queryOptions(queryOptions) + .commonProperties(context) + .build(); + } + + public static JobSpec fromSyncStreamLoadSpec(StreamLoadPlanner planner) { + TExecPlanFragmentParams params = planner.getExecPlanFragmentParams(); + TUniqueId queryId = params.getParams().getFragment_instance_id(); + + return new Builder() + .queryId(queryId) + .fragments(Collections.emptyList()) + .scanNodes(null) + .descTable(null) + .enableStreamPipeline(false) + .isBlockQuery(true) + .needReport(true) + .queryGlobals(null) + .queryOptions(null) + .enablePipeline(false) + .resourceGroup(null) + .build(); + } + + public static JobSpec mockJobSpec(ConnectContext context, + List fragments, + List scanNodes) { + TQueryOptions queryOptions = context.getSessionVariable().toThrift(); + + TQueryGlobals queryGlobals = genQueryGlobals(context.getStartTime(), + context.getSessionVariable().getTimeZone()); + if (context.getLastQueryId() != null) { + queryGlobals.setLast_query_id(context.getLastQueryId().toString()); + } + + return new Builder() + .queryId(context.getExecutionId()) + .fragments(fragments) + .scanNodes(scanNodes) + .descTable(null) + .enableStreamPipeline(false) + .isBlockQuery(false) + .needReport(false) + .queryGlobals(queryGlobals) + .queryOptions(queryOptions) + .enablePipeline(true) + .resourceGroup(null) + .build(); + } + + private static TQueryOptions createBrokerLoadQueryOptions(LoadPlanner loadPlanner) { + ConnectContext context = loadPlanner.getContext(); + TQueryOptions queryOptions = context.getSessionVariable().toThrift(); + + queryOptions + .setQuery_type(TQueryType.LOAD) + .setQuery_timeout((int) loadPlanner.getTimeout()) + .setLoad_mem_limit(loadPlanner.getLoadMemLimit()); + + // Don't set it explicitly when zero, otherwise the backend will take the limit as zero.
+ long execMemLimit = loadPlanner.getExecMemLimit(); + if (execMemLimit > 0) { + queryOptions.setMem_limit(execMemLimit); + queryOptions.setQuery_mem_limit(execMemLimit); + } + + setSessionVariablesToLoadQueryOptions(queryOptions, loadPlanner.getSessionVariables()); + + return queryOptions; + } + + private static void setSessionVariablesToLoadQueryOptions(TQueryOptions queryOptions, + Map sessionVariables) { + if (sessionVariables == null) { + return; + } + + if (sessionVariables.containsKey(SessionVariable.LOAD_TRANSMISSION_COMPRESSION_TYPE)) { + final TCompressionType loadCompressionType = CompressionUtils.findTCompressionByName( + sessionVariables.get(SessionVariable.LOAD_TRANSMISSION_COMPRESSION_TYPE)); + if (loadCompressionType != null) { + queryOptions.setLoad_transmission_compression_type(loadCompressionType); + } + } + if (sessionVariables.containsKey(BulkLoadJob.LOG_REJECTED_RECORD_NUM_SESSION_VARIABLE_KEY)) { + queryOptions.setLog_rejected_record_num( + Long.parseLong(sessionVariables.get(BulkLoadJob.LOG_REJECTED_RECORD_NUM_SESSION_VARIABLE_KEY))); + } + } + } + + private JobSpec(Builder builder) { + this.loadJobId = builder.loadJobId; + + this.queryId = builder.queryId; + + this.fragments = builder.fragments; + this.scanNodes = builder.scanNodes; + this.descTable = builder.descTable; + + this.enablePipeline = builder.enablePipeline; + this.enableStreamPipeline = builder.enableStreamPipeline; + this.isBlockQuery = builder.isBlockQuery; + this.needReport = builder.needReport; + this.connectContext = builder.connectContext; + + this.queryGlobals = builder.queryGlobals; + this.queryOptions = builder.queryOptions; + this.resourceGroup = builder.resourceGroup; + this.planProtocol = builder.planProtocol; + } + + @Override + public String toString() { + return "JobSpec{" + + "loadJobId=" + loadJobId + + ", queryId=" + DebugUtil.printId(queryId) + + ", enablePipeline=" + enablePipeline + + ", enableStreamPipeline=" + enableStreamPipeline + + ", isBlockQuery=" + isBlockQuery + + ", resourceGroup=" + resourceGroup + + '}'; + } + + public boolean isLoadType() { + return queryOptions.getQuery_type() == TQueryType.LOAD; + } + + public long getLoadJobId() { + return loadJobId; + } + + public void setLoadJobId(long loadJobId) { + this.loadJobId = loadJobId; + } + + public boolean isSetLoadJobId() { + return loadJobId != UNINITIALIZED_LOAD_JOB_ID; + } + + public TUniqueId getQueryId() { + return queryId; + } + + public void setQueryId(TUniqueId queryId) { + this.queryId = queryId; + } + + public List getFragments() { + return fragments; + } + + public List getScanNodes() { + return scanNodes; + } + + public TDescriptorTable getDescTable() { + return descTable; + } + + public boolean isEnablePipeline() { + return enablePipeline; + } + + public boolean isEnableStreamPipeline() { + return enableStreamPipeline; + } + + public TQueryGlobals getQueryGlobals() { + return queryGlobals; + } + + public TQueryOptions getQueryOptions() { + return queryOptions; + } + + public void setLoadJobType(TLoadJobType type) { + queryOptions.setLoad_job_type(type); + } + + public long getStartTimeMs() { + return this.queryGlobals.getTimestamp_ms(); + } + + public void setQueryTimeout(int timeoutSecond) { + this.queryOptions.setQuery_timeout(timeoutSecond); + } + + public TWorkGroup getResourceGroup() { + return resourceGroup; + } + + public boolean isBlockQuery() { + return isBlockQuery; + } + + public boolean isNeedReport() { + return needReport; + } + + public boolean isStatisticsJob() { + return
connectContext.isStatisticsJob(); + } + + public boolean isNeedQueued() { + return connectContext.isNeedQueued(); + } + + public boolean isStreamLoad() { + return queryOptions.getLoad_job_type() == TLoadJobType.STREAM_LOAD; + } + + public String getPlanProtocol() { + return planProtocol; + } + + public void reset() { + fragments.forEach(PlanFragment::reset); + } + + public static class Builder { + private long loadJobId = UNINITIALIZED_LOAD_JOB_ID; + + private TUniqueId queryId; + private List fragments; + private List scanNodes; + private TDescriptorTable descTable; + + private boolean enablePipeline; + private boolean enableStreamPipeline; + private boolean isBlockQuery; + private boolean needReport; + private ConnectContext connectContext; + + private TQueryGlobals queryGlobals; + private TQueryOptions queryOptions; + private TWorkGroup resourceGroup; + private String planProtocol; + + public JobSpec build() { + return new JobSpec(this); + } + + public Builder commonProperties(ConnectContext context) { + TWorkGroup newResourceGroup = prepareResourceGroup( + context, ResourceGroupClassifier.QueryType.fromTQueryType(queryOptions.getQuery_type())); + this.resourceGroup(newResourceGroup); + + this.enablePipeline(isEnablePipeline(context, fragments)); + this.connectContext = context; + + return this; + } + + public Builder loadJobId(long loadJobId) { + this.loadJobId = loadJobId; + return this; + } + + public Builder queryId(TUniqueId queryId) { + this.queryId = Preconditions.checkNotNull(queryId); + return this; + } + + public Builder fragments(List fragments) { + this.fragments = fragments; + return this; + } + + public Builder scanNodes(List scanNodes) { + this.scanNodes = scanNodes; + return this; + } + + public Builder descTable(TDescriptorTable descTable) { + if (descTable != null) { + descTable.setIs_cached(false); + } + this.descTable = descTable; + return this; + } + + public Builder enableStreamPipeline(boolean enableStreamPipeline) { + this.enableStreamPipeline = enableStreamPipeline; + return this; + } + + public Builder isBlockQuery(boolean isBlockQuery) { + this.isBlockQuery = isBlockQuery; + return this; + } + + public Builder queryGlobals(TQueryGlobals queryGlobals) { + this.queryGlobals = queryGlobals; + return this; + } + + public Builder queryOptions(TQueryOptions queryOptions) { + this.queryOptions = queryOptions; + return this; + } + + private Builder enablePipeline(boolean enablePipeline) { + this.enablePipeline = enablePipeline; + return this; + } + + private Builder resourceGroup(TWorkGroup resourceGroup) { + this.resourceGroup = resourceGroup; + return this; + } + + private Builder needReport(boolean needReport) { + this.needReport = needReport; + return this; + } + + private Builder setPlanProtocol(String planProtocol) { + this.planProtocol = StringUtils.lowerCase(planProtocol); + return this; + } + + /** + * Whether the pipeline engine can be used. + * + * @param connectContext It is null for broker export. + * @param fragments All the fragments that need to execute. + * @return true if the pipeline engine is enabled in the session variable and all the fragments can use pipeline, + * otherwise false.
+ */ + private static boolean isEnablePipeline(ConnectContext connectContext, List fragments) { + return connectContext != null && + connectContext.getSessionVariable().isEnablePipelineEngine() && + fragments.stream().allMatch(PlanFragment::canUsePipeline); + } + } + +}
diff --git a/gensrc/thrift/InternalService.thrift b/gensrc/thrift/InternalService.thrift index 97d75c63e9c2a..20d3cf2a324cf 100644 --- a/gensrc/thrift/InternalService.thrift +++ b/gensrc/thrift/InternalService.thrift @@ -241,6 +241,15 @@ struct TQueryOptions { 104: optional TOverflowMode overflow_mode = TOverflowMode.OUTPUT_NULL; 105: optional bool use_column_pool = true; + + 106: optional bool enable_agg_spill_preaggregation; + 107: optional i64 global_runtime_filter_build_max_size; + 108: optional i64 runtime_filter_rpc_http_min_size; + + 109: optional i64 big_query_profile_second_threshold; }
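Tying the pieces together: the session variable `big_query_profile_second_threshold` is serialized into `TQueryOptions.big_query_profile_second_threshold` by `toThrift()`, and on the FE side `isEnableBigQueryProfile()` treats any positive value as enabling the feature. Below is a hedged, self-contained sketch of the resulting decision as it appears from the FE: a threshold of 0 disables big-query profiling, while a positive value (in seconds) should opt a query into profiling once its runtime exceeds the threshold, even when `enable_profile` is off. The class and `shouldCollectProfile` method are illustrative only; the exact runtime check lives on the BE, not in `SessionVariable`.

```java
public class BigQueryProfileSketch {
    // Mirrors the session variable's default: 0 means the feature is off.
    private int bigQueryProfileSecondThreshold = 0;

    // Same semantics as SessionVariable.isEnableBigQueryProfile().
    public boolean isEnableBigQueryProfile() {
        return bigQueryProfileSecondThreshold > 0;
    }

    // Hypothetical combined check: explicit enable_profile always wins;
    // otherwise, profile only queries that outlive the threshold.
    public boolean shouldCollectProfile(boolean enableProfile, long elapsedMs) {
        if (enableProfile) {
            return true;
        }
        return isEnableBigQueryProfile()
                && elapsedMs > bigQueryProfileSecondThreshold * 1000L;
    }

    public static void main(String[] args) {
        BigQueryProfileSketch v = new BigQueryProfileSketch();
        v.bigQueryProfileSecondThreshold = 30; // e.g. SET big_query_profile_second_threshold = 30;
        System.out.println(v.shouldCollectProfile(false, 45_000)); // true: 45s > 30s
        System.out.println(v.shouldCollectProfile(false, 10_000)); // false: under threshold
    }
}
```

This matches the intent of the PR title: profiles are collected only for queries big enough to be worth the reporting overhead, without turning on `enable_profile` globally.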