From ff913f0e3944eae08ba6c77d838d51e87e2bd1ba Mon Sep 17 00:00:00 2001 From: zombee0 Date: Mon, 9 Oct 2023 14:46:23 +0800 Subject: [PATCH] [Enhancement]more deploy metrics and optimize deploy Signed-off-by: zombee0 --- be/src/connector/hive_connector.cpp | 4 +- be/src/connector/hive_connector.h | 2 +- be/src/formats/parquet/file_reader.cpp | 8 ++- be/src/runtime/descriptors.cpp | 50 +++++++++++++++++-- be/src/runtime/descriptors.h | 5 ++ .../com/starrocks/catalog/IcebergTable.java | 41 ++++++++++++--- .../java/com/starrocks/common/util/Util.java | 12 +++++ .../com/starrocks/qe/DefaultCoordinator.java | 2 +- .../com/starrocks/qe/SessionVariable.java | 9 ++++ .../com/starrocks/qe/scheduler/Deployer.java | 20 +++++++- .../dag/FragmentInstanceExecState.java | 21 +++++++- .../com/starrocks/rpc/AttachmentRequest.java | 18 +++++-- .../starrocks/rpc/BackendServiceClient.java | 31 +++++++++--- gensrc/thrift/Descriptors.thrift | 17 ++++++- 14 files changed, 203 insertions(+), 37 deletions(-) diff --git a/be/src/connector/hive_connector.cpp b/be/src/connector/hive_connector.cpp index e17de4abafe60..1361bba8747c1 100644 --- a/be/src/connector/hive_connector.cpp +++ b/be/src/connector/hive_connector.cpp @@ -202,7 +202,7 @@ Status HiveDataSource::_init_partition_values() { return Status::OK(); } -int32_t HiveDataSource::is_scan_range_indicate_const_column(SlotId id) const { +int32_t HiveDataSource::scan_range_indicate_const_column_index(SlotId id) const { if (!_scan_range.__isset.identity_partition_slot_ids) { return -1; } @@ -229,7 +229,7 @@ void HiveDataSource::_init_tuples_and_slots(RuntimeState* state) { _partition_index_in_chunk.push_back(i); _partition_index_in_hdfs_partition_columns.push_back(_hive_table->get_partition_col_index(slots[i])); _has_partition_columns = true; - } else if (int32_t index = is_scan_range_indicate_const_column(slots[i]->id()); index >= 0) { + } else if (int32_t index = scan_range_indicate_const_column_index(slots[i]->id()); index >= 0) { _partition_slots.push_back(slots[i]); _partition_index_in_chunk.push_back(i); _partition_index_in_hdfs_partition_columns.push_back(index); diff --git a/be/src/connector/hive_connector.h b/be/src/connector/hive_connector.h index 6075f5a9e2131..94158cf01d640 100644 --- a/be/src/connector/hive_connector.h +++ b/be/src/connector/hive_connector.h @@ -60,7 +60,7 @@ class HiveDataSource final : public DataSource { std::atomic* get_lazy_column_coalesce_counter() { return _provider->_scan_node->get_lazy_column_coalesce_counter(); } - int32_t is_scan_range_indicate_const_column(SlotId id) const; + int32_t scan_range_indicate_const_column_index(SlotId id) const; int64_t raw_rows_read() const override; int64_t num_rows_read() const override; diff --git a/be/src/formats/parquet/file_reader.cpp b/be/src/formats/parquet/file_reader.cpp index 69f53e53c1ec6..9c98b1b31858f 100644 --- a/be/src/formats/parquet/file_reader.cpp +++ b/be/src/formats/parquet/file_reader.cpp @@ -468,11 +468,9 @@ Status FileReader::_init_group_readers() { int64_t num_rows = _file_metadata->t_metadata().row_groups[i].num_rows; // for iceberg v2 pos delete if (_need_skip_rowids != nullptr && !_need_skip_rowids->empty()) { - auto start_str = _need_skip_rowids->lower_bound(row_group_first_row); - auto end_str = _need_skip_rowids->upper_bound(row_group_first_row + num_rows - 1); - for (; start_str != end_str; start_str++) { - num_rows--; - } + auto start_iter = _need_skip_rowids->lower_bound(row_group_first_row); + auto end_iter = _need_skip_rowids->upper_bound(row_group_first_row + 
num_rows - 1); + num_rows -= std::distance(start_iter, end_iter); } _total_row_count += num_rows; } else { diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index fe43904d5a132..fe50e264ef164 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -39,10 +39,14 @@ #include #include "common/object_pool.h" +#include "common/status.h" +#include "exprs/base64.h" #include "exprs/expr.h" #include "gen_cpp/Descriptors_types.h" #include "gen_cpp/PlanNodes_types.h" #include "gen_cpp/descriptors.pb.h" +#include "util/compression/block_compression.h" +#include "util/thrift_util.h" namespace starrocks { using boost::algorithm::join; @@ -182,10 +186,6 @@ IcebergTableDescriptor::IcebergTableDescriptor(const TTableDescriptor& tdesc, Ob _columns = tdesc.icebergTable.columns; _t_iceberg_schema = tdesc.icebergTable.iceberg_schema; _partition_column_names = tdesc.icebergTable.partition_column_names; - for (const auto& entry : tdesc.icebergTable.partitions) { - auto* partition = pool->add(new HdfsPartitionDescriptor(tdesc.icebergTable, entry.second)); - _partition_id_to_desc_map[entry.first] = partition; - } } std::vector IcebergTableDescriptor::partition_index_in_schema() { @@ -213,6 +213,24 @@ const std::vector IcebergTableDescriptor::full_column_names() { return full_column_names; } +Status IcebergTableDescriptor::set_partition_desc_map(const starrocks::TIcebergTable& thrift_table, + starrocks::ObjectPool* pool) { + if (thrift_table.__isset.compressed_partitions) { + ASSIGN_OR_RETURN(TPartitionMap * tPartitionMap, + deserialize_partition_map(thrift_table.compressed_partitions, pool)); + for (const auto& entry : tPartitionMap->partitions) { + auto* partition = pool->add(new HdfsPartitionDescriptor(thrift_table, entry.second)); + _partition_id_to_desc_map[entry.first] = partition; + } + } else { + for (const auto& entry : thrift_table.partitions) { + auto* partition = pool->add(new HdfsPartitionDescriptor(thrift_table, entry.second)); + _partition_id_to_desc_map[entry.first] = partition; + } + } + return Status::OK(); +} + DeltaLakeTableDescriptor::DeltaLakeTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool) : HiveTableDescriptor(tdesc, pool) { _table_location = tdesc.deltaLakeTable.location; @@ -314,6 +332,29 @@ int HiveTableDescriptor::get_partition_col_index(const SlotDescriptor* slot) con return -1; } +StatusOr HiveTableDescriptor::deserialize_partition_map( + const TCompressedPartitionMap& compressed_partition_map, ObjectPool* pool) { + const std::string& base64_partition_map = compressed_partition_map.compressed_serialized_partitions; + std::string compressed_buf; + compressed_buf.resize(base64_partition_map.size() + 3); + base64_decode2(base64_partition_map.data(), base64_partition_map.size(), compressed_buf.data()); + compressed_buf.resize(compressed_partition_map.compressed_len); + + std::string uncompressed_buf; + uncompressed_buf.resize(compressed_partition_map.original_len); + Slice uncompress_output(uncompressed_buf); + const BlockCompressionCodec* zlib_uncompress_codec = nullptr; + RETURN_IF_ERROR(get_block_compression_codec(starrocks::CompressionTypePB::ZLIB, &zlib_uncompress_codec)); + RETURN_IF_ERROR(zlib_uncompress_codec->decompress(compressed_buf, &uncompress_output)); + + TPartitionMap* tPartitionMap = pool->add(new TPartitionMap()); + RETURN_IF_ERROR(deserialize_thrift_msg(reinterpret_cast(uncompress_output.data), + reinterpret_cast(&uncompress_output.size), TProtocolType::BINARY, + tPartitionMap)); + + return 
tPartitionMap; +} + // ============================================= OlapTableDescriptor::OlapTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {} @@ -587,6 +628,7 @@ Status DescriptorTbl::create(RuntimeState* state, ObjectPool* pool, const TDescr } case TTableType::ICEBERG_TABLE: { auto* iceberg_desc = pool->add(new IcebergTableDescriptor(tdesc, pool)); + RETURN_IF_ERROR(iceberg_desc->set_partition_desc_map(tdesc.icebergTable, pool)); RETURN_IF_ERROR(iceberg_desc->create_key_exprs(state, pool, chunk_size)); desc = iceberg_desc; break; diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index d9df1a8e765ab..6fe75ed9ea3bb 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -203,6 +203,9 @@ class HiveTableDescriptor : public TableDescriptor { return Status::OK(); } + StatusOr deserialize_partition_map(const TCompressedPartitionMap& compressed_partition_map, + ObjectPool* pool); + protected: std::string _hdfs_base_path; std::vector _columns; @@ -230,6 +233,8 @@ class IcebergTableDescriptor : public HiveTableDescriptor { std::vector partition_index_in_schema(); bool has_base_path() const override { return true; } + Status set_partition_desc_map(const TIcebergTable& thrift_table, ObjectPool* pool); + private: TIcebergSchema _t_iceberg_schema; std::vector _partition_column_names; diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java index 6a57a23935240..77528e22dca04 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java @@ -29,14 +29,17 @@ import com.starrocks.analysis.Expr; import com.starrocks.analysis.LiteralExpr; import com.starrocks.common.io.Text; +import com.starrocks.common.util.Util; import com.starrocks.connector.exception.StarRocksConnectorException; import com.starrocks.connector.iceberg.IcebergApiConverter; import com.starrocks.connector.iceberg.IcebergCatalogType; import com.starrocks.server.CatalogMgr; import com.starrocks.server.GlobalStateMgr; import com.starrocks.thrift.TColumn; +import com.starrocks.thrift.TCompressedPartitionMap; import com.starrocks.thrift.THdfsPartition; import com.starrocks.thrift.TIcebergTable; +import com.starrocks.thrift.TPartitionMap; import com.starrocks.thrift.TTableDescriptor; import com.starrocks.thrift.TTableType; import org.apache.iceberg.BaseTable; @@ -47,11 +50,15 @@ import org.apache.iceberg.types.Types; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; +import org.apache.thrift.protocol.TBinaryProtocol; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.Base64; import java.util.List; import java.util.Map; import java.util.Optional; @@ -261,14 +268,32 @@ public TTableDescriptor toThrift(List p tIcebergTable.setIceberg_schema(IcebergApiConverter.getTIcebergSchema(nativeTable.schema())); tIcebergTable.setPartition_column_names(getPartitionColumnNames()); - for (int i = 0; i < partitions.size(); i++) { - DescriptorTable.ReferencedPartitionInfo info = partitions.get(i); - PartitionKey key = info.getKey(); - long partitionId = info.getId(); - THdfsPartition tPartition = new THdfsPartition(); - List keys = key.getKeys(); - 
tPartition.setPartition_key_exprs(keys.stream().map(Expr::treeToThrift).collect(Collectors.toList()));
-            tIcebergTable.putToPartitions(partitionId, tPartition);
+        if (!partitions.isEmpty()) {
+            TPartitionMap tPartitionMap = new TPartitionMap();
+            for (int i = 0; i < partitions.size(); i++) {
+                DescriptorTable.ReferencedPartitionInfo info = partitions.get(i);
+                PartitionKey key = info.getKey();
+                long partitionId = info.getId();
+                THdfsPartition tPartition = new THdfsPartition();
+                List<LiteralExpr> keys = key.getKeys();
+                tPartition.setPartition_key_exprs(keys.stream().map(Expr::treeToThrift).collect(Collectors.toList()));
+                tPartitionMap.putToPartitions(partitionId, tPartition);
+            }
+
+            // The partition info can be very large, and it is identical in the plan fragment sent to every BE.
+            // Extracting it and sending it once as a compressed, serialized string performs better (about 3x in tests).
+            try {
+                TSerializer serializer = new TSerializer(TBinaryProtocol::new);
+                byte[] bytes = serializer.serialize(tPartitionMap);
+                byte[] compressedBytes = Util.compress(bytes);
+                TCompressedPartitionMap tCompressedPartitionMap = new TCompressedPartitionMap();
+                tCompressedPartitionMap.setOriginal_len(bytes.length);
+                tCompressedPartitionMap.setCompressed_len(compressedBytes.length);
+                tCompressedPartitionMap.setCompressed_serialized_partitions(Base64.getEncoder().encodeToString(compressedBytes));
+                tIcebergTable.setCompressed_partitions(tCompressedPartitionMap);
+            } catch (TException | IOException ignore) {
+                tIcebergTable.setPartitions(tPartitionMap.getPartitions());
+            }
         }
 
         TTableDescriptor tTableDescriptor = new TTableDescriptor(id, TTableType.ICEBERG_TABLE,
diff --git a/fe/fe-core/src/main/java/com/starrocks/common/util/Util.java b/fe/fe-core/src/main/java/com/starrocks/common/util/Util.java
index 6bedd2bb62a16..da786369e9487 100644
--- a/fe/fe-core/src/main/java/com/starrocks/common/util/Util.java
+++ b/fe/fe-core/src/main/java/com/starrocks/common/util/Util.java
@@ -46,6 +46,7 @@
 import org.apache.logging.log4j.Logger;
 
 import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -62,6 +63,8 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Predicate;
 import java.util.zip.Adler32;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
 
 public class Util {
     private static final Logger LOG = LogManager.getLogger(Util.class);
@@ -456,4 +459,13 @@ public static void validateMetastoreUris(String uris) {
     public static String deriveAliasFromOrdinal(int ordinal) {
         return AUTO_GENERATED_EXPR_ALIAS_PREFIX + ordinal;
     }
+
+    public static byte[] compress(byte[] input) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        Deflater deflater = new Deflater();
+        try (DeflaterOutputStream dos = new DeflaterOutputStream(outputStream, deflater)) {
+            dos.write(input);
+        }
+        return outputStream.toByteArray();
+    }
 }
diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
index 9318ba4123311..87e9292a17c08 100644
--- a/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
+++ b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
@@ -526,7 +526,7 @@ private void prepareResultSink() throws AnalysisException {
 
     private void deliverExecFragments(boolean needDeploy) throws RpcException, UserException {
         lock();
-        try {
+        try (Timer ignored =
Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployLockInternalTime")) { Deployer deployer = new Deployer(connectContext, jobSpec, executionDAG, coordinatorPreprocessor.getCoordAddress(), this::handleErrorExecution); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index a48d57e0a6d88..e11d9478a5e16 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -406,6 +406,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable { public static final String ENABLE_ICEBERG_IDENTITY_COLUMN_OPTIMIZE = "enable_iceberg_identity_column_optimize"; + public static final String ENABLE_PLAN_SERIALIZE_CONCURRENTLY = "enable_plan_serialize_concurrently"; + public enum MaterializedViewRewriteMode { DISABLE, // disable materialized view rewrite DEFAULT, // default, choose the materialized view or not by cost optimizer @@ -1415,6 +1417,9 @@ public boolean isCboPredicateSubfieldPath() { @VarAttr(name = ENABLE_ICEBERG_IDENTITY_COLUMN_OPTIMIZE) private boolean enableIcebergIdentityColumnOptimize = true; + @VarAttr(name = ENABLE_PLAN_SERIALIZE_CONCURRENTLY) + private boolean enablePlanSerializeConcurrently = true; + public int getExprChildrenLimit() { return exprChildrenLimit; } @@ -2654,6 +2659,10 @@ public boolean getEnableIcebergIdentityColumnOptimize() { return enableIcebergIdentityColumnOptimize; } + public boolean getEnablePlanSerializeConcurrently() { + return enablePlanSerializeConcurrently; + } + // Serialize to thrift object // used for rest api public TQueryOptions toThrift() { diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Deployer.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Deployer.java index e94858e5df86b..35060efc1aabb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Deployer.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Deployer.java @@ -19,6 +19,8 @@ import com.google.common.collect.ImmutableList; import com.starrocks.common.Status; import com.starrocks.common.UserException; +import com.starrocks.common.profile.Timer; +import com.starrocks.common.profile.Tracers; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.scheduler.dag.ExecutionDAG; import com.starrocks.qe.scheduler.dag.ExecutionFragment; @@ -54,6 +56,7 @@ public class Deployer { private final TFragmentInstanceFactory tFragmentInstanceFactory; private final TDescriptorTable emptyDescTable; private final long deliveryTimeoutMs; + private boolean enablePlanSerializeConcurrently; private final FailureHandler failureHandler; @@ -76,6 +79,7 @@ public Deployer(ConnectContext context, this.deliveryTimeoutMs = Math.min(queryOptions.query_timeout, queryOptions.query_delivery_timeout) * 1000L; this.failureHandler = failureHandler; + this.enablePlanSerializeConcurrently = context.getSessionVariable().getEnablePlanSerializeConcurrently(); } public void deployFragments(List concurrentFragments, boolean needDeploy) @@ -91,10 +95,22 @@ public void deployFragments(List concurrentFragments, boolean return; } + if (enablePlanSerializeConcurrently) { + try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeploySerializeConcurrencyTime")) { + twoStageExecutionsToDeploy.stream().parallel().forEach( + executions -> executions.stream().parallel() + .forEach(FragmentInstanceExecState::serializeRequest)); + } + } + for (List executions : twoStageExecutionsToDeploy) { - 
executions.forEach(FragmentInstanceExecState::deployAsync);
+            try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployStageByStageTime")) {
+                executions.forEach(FragmentInstanceExecState::deployAsync);
+            }
 
-            waitForDeploymentCompletion(executions);
+            try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployWaitTime")) {
+                waitForDeploymentCompletion(executions);
+            }
         }
     }
 
diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentInstanceExecState.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentInstanceExecState.java
index c69dab880f38b..de29c44bbb898 100644
--- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentInstanceExecState.java
+++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentInstanceExecState.java
@@ -25,6 +25,7 @@
 import com.starrocks.proto.StatusPB;
 import com.starrocks.qe.QueryStatisticsItem;
 import com.starrocks.qe.SimpleScheduler;
+import com.starrocks.rpc.AttachmentRequest;
 import com.starrocks.rpc.BackendServiceClient;
 import com.starrocks.rpc.RpcException;
 import com.starrocks.system.ComputeNode;
@@ -38,6 +39,7 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
 import org.jetbrains.annotations.NotNull;
 
 import java.util.Collection;
@@ -83,6 +85,7 @@ public class FragmentInstanceExecState {
      * request and future will be cleaned after deployment completion.
      */
     private TExecPlanFragmentParams requestToDeploy;
+    private byte[] serializedRequest;
     private Future<PExecPlanFragmentResult> deployFuture = null;
 
     private final int fragmentIndex;
@@ -152,6 +155,15 @@ private FragmentInstanceExecState(JobSpec jobSpec,
         this.lastMissingHeartbeatTime = lastMissingHeartbeatTime;
     }
 
+    public void serializeRequest() {
+        TSerializer serializer = AttachmentRequest.getSerializer(jobSpec.getPlanProtocol());
+        try {
+            serializedRequest = serializer.serialize(requestToDeploy);
+        } catch (TException ignore) {
+            // If serialization fails here, serializedRequest stays unset and deployAsync() falls back to the thrift object path.
+        }
+    }
+
     /**
      * Deploy the fragment instance to the worker asynchronously.
      * The state transitions to DEPLOYING.
@@ -161,8 +173,13 @@ public void deployAsync() {
         TNetworkAddress brpcAddress = worker.getBrpcAddress();
 
         try {
-            deployFuture = BackendServiceClient.getInstance().execPlanFragmentAsync(brpcAddress, requestToDeploy,
-                    jobSpec.getPlanProtocol());
+            if (serializedRequest != null && serializedRequest.length != 0) {
+                deployFuture = BackendServiceClient.getInstance().execPlanFragmentAsync(brpcAddress, serializedRequest,
+                        jobSpec.getPlanProtocol());
+            } else {
+                deployFuture = BackendServiceClient.getInstance().execPlanFragmentAsync(brpcAddress, requestToDeploy,
+                        jobSpec.getPlanProtocol());
+            }
         } catch (RpcException | TException e) {
             // DO NOT throw exception here, return a complete future with error code,
             // so that the following logic will cancel the fragment.
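
Note on the scheduler change above: with enable_plan_serialize_concurrently on, the Deployer pre-serializes every fragment instance's thrift request in parallel (timed as DeploySerializeConcurrencyTime), and deployAsync() reuses those bytes instead of serializing inside the RPC call, falling back to the old path when pre-serialization did not produce bytes. The sketch below illustrates that pattern in isolation; the class and method names are hypothetical and not part of this patch, and it assumes only the standard Apache Thrift Java API used elsewhere in the patch.

    import org.apache.thrift.TBase;
    import org.apache.thrift.TException;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TBinaryProtocol;

    import java.util.List;
    import java.util.stream.IntStream;

    // Hypothetical illustration of "pre-serialize concurrently, then deploy"; not part of the patch.
    public class ConcurrentSerializeSketch {

        // Stage 1: serialize all requests in parallel. A failed slot stays null so the
        // send stage can fall back to serializing that request on the fly.
        public static byte[][] preSerialize(List<? extends TBase<?, ?>> requests) {
            byte[][] serialized = new byte[requests.size()][];
            IntStream.range(0, requests.size()).parallel().forEach(i -> {
                try {
                    // One TSerializer per task; a single instance is not safe to share across threads.
                    serialized[i] = new TSerializer(TBinaryProtocol::new).serialize(requests.get(i));
                } catch (Exception ignore) {
                    // leave null; handled by bytesToSend()
                }
            });
            return serialized;
        }

        // Stage 2: reuse the pre-serialized bytes when available, otherwise serialize now.
        public static byte[] bytesToSend(byte[] preSerialized, TBase<?, ?> request) throws TException {
            if (preSerialized != null && preSerialized.length != 0) {
                return preSerialized;
            }
            return new TSerializer(TBinaryProtocol::new).serialize(request);
        }
    }

Creating a fresh TSerializer per task mirrors the patch, which also builds a new serializer for each request rather than sharing one.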
diff --git a/fe/fe-core/src/main/java/com/starrocks/rpc/AttachmentRequest.java b/fe/fe-core/src/main/java/com/starrocks/rpc/AttachmentRequest.java index 43a40effa1dba..c3b120aa5a04c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/rpc/AttachmentRequest.java +++ b/fe/fe-core/src/main/java/com/starrocks/rpc/AttachmentRequest.java @@ -17,6 +17,8 @@ package com.starrocks.rpc; +import com.starrocks.common.profile.Timer; +import com.starrocks.common.profile.Tracers; import org.apache.commons.lang3.StringUtils; import org.apache.thrift.TBase; import org.apache.thrift.TDeserializer; @@ -32,8 +34,7 @@ public class AttachmentRequest { protected byte[] serializedRequest; protected byte[] serializedResult; - public , F extends TFieldIdEnum> void setRequest(TBase request, String protocol) - throws TException { + public static TSerializer getSerializer(String protocol) { TSerializer serializer; if (StringUtils.equalsIgnoreCase(protocol, "compact")) { serializer = new TSerializer(TCompactProtocol::new); @@ -43,8 +44,15 @@ public , F extends TFieldIdEnum> void setRequest(TBase, F extends TFieldIdEnum> void setRequest(TBase request, String protocol) + throws TException { + TSerializer serializer = getSerializer(protocol); + try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeploySerializeTime")) { + serializedRequest = serializer.serialize(request); + } } public , F extends TFieldIdEnum> void setRequest(TBase request) @@ -54,6 +62,10 @@ public , F extends TFieldIdEnum> void setRequest(TBase execPlanFragmentAsync( - TNetworkAddress address, TExecPlanFragmentParams tRequest, String protocol) - throws TException, RpcException { - final PExecPlanFragmentRequest pRequest = new PExecPlanFragmentRequest(); - pRequest.setAttachmentProtocol(protocol); - pRequest.setRequest(tRequest, protocol); - try { + private Future sendPlanFragmentAsync(TNetworkAddress address, PExecPlanFragmentRequest pRequest) + throws RpcException { + Tracers.count(Tracers.Module.SCHEDULER, "DeployDataSize", pRequest.serializedRequest.length); + try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployAsyncSendTime")) { final PBackendService service = BrpcProxy.getBackendService(address); return service.execPlanFragmentAsync(pRequest); } catch (NoSuchElementException e) { @@ -107,6 +106,24 @@ public Future execPlanFragmentAsync( } } + public Future execPlanFragmentAsync( + TNetworkAddress address, byte[] request, String protocol) + throws TException, RpcException { + final PExecPlanFragmentRequest pRequest = new PExecPlanFragmentRequest(); + pRequest.setAttachmentProtocol(protocol); + pRequest.setRequest(request); + return sendPlanFragmentAsync(address, pRequest); + } + + public Future execPlanFragmentAsync( + TNetworkAddress address, TExecPlanFragmentParams tRequest, String protocol) + throws TException, RpcException { + final PExecPlanFragmentRequest pRequest = new PExecPlanFragmentRequest(); + pRequest.setAttachmentProtocol(protocol); + pRequest.setRequest(tRequest, protocol); + return sendPlanFragmentAsync(address, pRequest); + } + public Future cancelPlanFragmentAsync( TNetworkAddress address, TUniqueId queryId, TUniqueId finstId, PPlanFragmentCancelReason cancelReason, boolean isPipeline) throws RpcException { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 995ebadfd23ca..5b0f9b44f97fb 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -417,6 +417,16 @@ struct TIcebergSchemaField { 100: optional list children } +struct 
TPartitionMap {
+    1: optional map<i64, THdfsPartition> partitions
+}
+
+struct TCompressedPartitionMap {
+    1: optional i32 original_len
+    2: optional i32 compressed_len
+    3: optional string compressed_serialized_partitions
+}
+
 struct TIcebergTable {
     // table location
     1: optional string location
@@ -430,8 +440,11 @@ struct TIcebergTable {
     // partition column names
     4: optional list<string> partition_column_names
 
-    // Map from partition id to partition metadata.
-    5: optional map<i64, THdfsPartition> partitions
+    // The partition map can be very large and costly to serialize repeatedly, so it is sent as a compressed, pre-serialized blob.
+    5: optional TCompressedPartitionMap compressed_partitions
+
+    // Fallback: if serializing/compressing the partition info fails, the plain partition map is sent instead.
+    6: optional map<i64, THdfsPartition> partitions
 }
 
 struct THudiTable {
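
For reference, the encoding chain this patch uses for the Iceberg partition map is: thrift binary serialization of TPartitionMap, then zlib (Deflater) compression, then Base64, carried in TCompressedPartitionMap together with original_len and compressed_len so the receiver can size its buffers before inflating; descriptors.cpp reverses the chain with base64_decode2 and the ZLIB block compression codec. The sketch below is a Java round trip of the same format for illustration only; the class name is hypothetical, and it assumes the thrift-generated TPartitionMap and TCompressedPartitionMap classes are on the classpath.

    import com.starrocks.thrift.TCompressedPartitionMap;
    import com.starrocks.thrift.TPartitionMap;
    import org.apache.thrift.TDeserializer;
    import org.apache.thrift.TSerializer;
    import org.apache.thrift.protocol.TBinaryProtocol;

    import java.io.ByteArrayOutputStream;
    import java.util.Base64;
    import java.util.zip.DeflaterOutputStream;
    import java.util.zip.Inflater;

    // Illustrative codec sketch; not part of the patch.
    public class PartitionMapCodecSketch {

        // FE direction: thrift-binary serialize, deflate (zlib format), Base64-encode.
        public static TCompressedPartitionMap encode(TPartitionMap partitionMap) throws Exception {
            byte[] serialized = new TSerializer(TBinaryProtocol::new).serialize(partitionMap);

            ByteArrayOutputStream compressed = new ByteArrayOutputStream();
            try (DeflaterOutputStream dos = new DeflaterOutputStream(compressed)) {
                dos.write(serialized);
            }
            byte[] compressedBytes = compressed.toByteArray();

            TCompressedPartitionMap result = new TCompressedPartitionMap();
            result.setOriginal_len(serialized.length);          // lets the receiver size its inflate buffer
            result.setCompressed_len(compressedBytes.length);
            result.setCompressed_serialized_partitions(Base64.getEncoder().encodeToString(compressedBytes));
            return result;
        }

        // BE direction (done in C++ in the patch; shown in Java here only to document the format):
        // Base64-decode, inflate into an original_len-sized buffer, thrift-binary deserialize.
        public static TPartitionMap decode(TCompressedPartitionMap compressedMap) throws Exception {
            byte[] compressedBytes = Base64.getDecoder().decode(compressedMap.getCompressed_serialized_partitions());

            Inflater inflater = new Inflater();
            inflater.setInput(compressedBytes);
            byte[] serialized = new byte[compressedMap.getOriginal_len()];
            inflater.inflate(serialized);
            inflater.end();

            TPartitionMap result = new TPartitionMap();
            new TDeserializer(TBinaryProtocol::new).deserialize(result, serialized);
            return result;
        }
    }

java.util.zip.Deflater produces zlib-framed output by default, which is the framing the BE's ZLIB codec expects, so no extra header handling is needed on either side.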