[Enhancement] Add more deploy metrics and optimize deploy
Signed-off-by: zombee0 <[email protected]>
zombee0 committed Oct 17, 2023
1 parent 5a22f73 commit ff913f0
Showing 14 changed files with 203 additions and 37 deletions.
4 changes: 2 additions & 2 deletions be/src/connector/hive_connector.cpp
@@ -202,7 +202,7 @@ Status HiveDataSource::_init_partition_values() {
return Status::OK();
}

int32_t HiveDataSource::is_scan_range_indicate_const_column(SlotId id) const {
int32_t HiveDataSource::scan_range_indicate_const_column_index(SlotId id) const {
if (!_scan_range.__isset.identity_partition_slot_ids) {
return -1;
}
@@ -229,7 +229,7 @@ void HiveDataSource::_init_tuples_and_slots(RuntimeState* state) {
_partition_index_in_chunk.push_back(i);
_partition_index_in_hdfs_partition_columns.push_back(_hive_table->get_partition_col_index(slots[i]));
_has_partition_columns = true;
} else if (int32_t index = is_scan_range_indicate_const_column(slots[i]->id()); index >= 0) {
} else if (int32_t index = scan_range_indicate_const_column_index(slots[i]->id()); index >= 0) {
_partition_slots.push_back(slots[i]);
_partition_index_in_chunk.push_back(i);
_partition_index_in_hdfs_partition_columns.push_back(index);
2 changes: 1 addition & 1 deletion be/src/connector/hive_connector.h
@@ -60,7 +60,7 @@ class HiveDataSource final : public DataSource {
std::atomic<int32_t>* get_lazy_column_coalesce_counter() {
return _provider->_scan_node->get_lazy_column_coalesce_counter();
}
int32_t is_scan_range_indicate_const_column(SlotId id) const;
int32_t scan_range_indicate_const_column_index(SlotId id) const;

int64_t raw_rows_read() const override;
int64_t num_rows_read() const override;
8 changes: 3 additions & 5 deletions be/src/formats/parquet/file_reader.cpp
@@ -468,11 +468,9 @@ Status FileReader::_init_group_readers() {
int64_t num_rows = _file_metadata->t_metadata().row_groups[i].num_rows;
// for iceberg v2 pos delete
if (_need_skip_rowids != nullptr && !_need_skip_rowids->empty()) {
auto start_str = _need_skip_rowids->lower_bound(row_group_first_row);
auto end_str = _need_skip_rowids->upper_bound(row_group_first_row + num_rows - 1);
for (; start_str != end_str; start_str++) {
num_rows--;
}
auto start_iter = _need_skip_rowids->lower_bound(row_group_first_row);
auto end_iter = _need_skip_rowids->upper_bound(row_group_first_row + num_rows - 1);
num_rows -= std::distance(start_iter, end_iter);
}
_total_row_count += num_rows;
} else {
50 changes: 46 additions & 4 deletions be/src/runtime/descriptors.cpp
@@ -39,10 +39,14 @@
#include <sstream>

#include "common/object_pool.h"
#include "common/status.h"
#include "exprs/base64.h"
#include "exprs/expr.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/descriptors.pb.h"
#include "util/compression/block_compression.h"
#include "util/thrift_util.h"

namespace starrocks {
using boost::algorithm::join;
@@ -182,10 +186,6 @@ IcebergTableDescriptor::IcebergTableDescriptor(const TTableDescriptor& tdesc, Ob
_columns = tdesc.icebergTable.columns;
_t_iceberg_schema = tdesc.icebergTable.iceberg_schema;
_partition_column_names = tdesc.icebergTable.partition_column_names;
for (const auto& entry : tdesc.icebergTable.partitions) {
auto* partition = pool->add(new HdfsPartitionDescriptor(tdesc.icebergTable, entry.second));
_partition_id_to_desc_map[entry.first] = partition;
}
}

std::vector<int32_t> IcebergTableDescriptor::partition_index_in_schema() {
Expand Down Expand Up @@ -213,6 +213,24 @@ const std::vector<std::string> IcebergTableDescriptor::full_column_names() {
return full_column_names;
}

Status IcebergTableDescriptor::set_partition_desc_map(const starrocks::TIcebergTable& thrift_table,
starrocks::ObjectPool* pool) {
if (thrift_table.__isset.compressed_partitions) {
ASSIGN_OR_RETURN(TPartitionMap * tPartitionMap,
deserialize_partition_map(thrift_table.compressed_partitions, pool));
for (const auto& entry : tPartitionMap->partitions) {
auto* partition = pool->add(new HdfsPartitionDescriptor(thrift_table, entry.second));
_partition_id_to_desc_map[entry.first] = partition;
}
} else {
for (const auto& entry : thrift_table.partitions) {
auto* partition = pool->add(new HdfsPartitionDescriptor(thrift_table, entry.second));
_partition_id_to_desc_map[entry.first] = partition;
}
}
return Status::OK();
}

DeltaLakeTableDescriptor::DeltaLakeTableDescriptor(const TTableDescriptor& tdesc, ObjectPool* pool)
: HiveTableDescriptor(tdesc, pool) {
_table_location = tdesc.deltaLakeTable.location;
@@ -314,6 +332,29 @@ int HiveTableDescriptor::get_partition_col_index(const SlotDescriptor* slot) con
return -1;
}

StatusOr<TPartitionMap*> HiveTableDescriptor::deserialize_partition_map(
const TCompressedPartitionMap& compressed_partition_map, ObjectPool* pool) {
const std::string& base64_partition_map = compressed_partition_map.compressed_serialized_partitions;
std::string compressed_buf;
compressed_buf.resize(base64_partition_map.size() + 3);
base64_decode2(base64_partition_map.data(), base64_partition_map.size(), compressed_buf.data());
compressed_buf.resize(compressed_partition_map.compressed_len);

std::string uncompressed_buf;
uncompressed_buf.resize(compressed_partition_map.original_len);
Slice uncompress_output(uncompressed_buf);
const BlockCompressionCodec* zlib_uncompress_codec = nullptr;
RETURN_IF_ERROR(get_block_compression_codec(starrocks::CompressionTypePB::ZLIB, &zlib_uncompress_codec));
RETURN_IF_ERROR(zlib_uncompress_codec->decompress(compressed_buf, &uncompress_output));

TPartitionMap* tPartitionMap = pool->add(new TPartitionMap());
RETURN_IF_ERROR(deserialize_thrift_msg(reinterpret_cast<uint8_t*>(uncompress_output.data),
reinterpret_cast<uint32_t*>(&uncompress_output.size), TProtocolType::BINARY,
tPartitionMap));

return tPartitionMap;
}

// =============================================

OlapTableDescriptor::OlapTableDescriptor(const TTableDescriptor& tdesc) : TableDescriptor(tdesc) {}
@@ -587,6 +628,7 @@ Status DescriptorTbl::create(RuntimeState* state, ObjectPool* pool, const TDescr
}
case TTableType::ICEBERG_TABLE: {
auto* iceberg_desc = pool->add(new IcebergTableDescriptor(tdesc, pool));
RETURN_IF_ERROR(iceberg_desc->set_partition_desc_map(tdesc.icebergTable, pool));
RETURN_IF_ERROR(iceberg_desc->create_key_exprs(state, pool, chunk_size));
desc = iceberg_desc;
break;
5 changes: 5 additions & 0 deletions be/src/runtime/descriptors.h
@@ -203,6 +203,9 @@ class HiveTableDescriptor : public TableDescriptor {
return Status::OK();
}

StatusOr<TPartitionMap*> deserialize_partition_map(const TCompressedPartitionMap& compressed_partition_map,
ObjectPool* pool);

protected:
std::string _hdfs_base_path;
std::vector<TColumn> _columns;
@@ -230,6 +233,8 @@ class IcebergTableDescriptor : public HiveTableDescriptor {
std::vector<int32_t> partition_index_in_schema();
bool has_base_path() const override { return true; }

Status set_partition_desc_map(const TIcebergTable& thrift_table, ObjectPool* pool);

private:
TIcebergSchema _t_iceberg_schema;
std::vector<std::string> _partition_column_names;
41 changes: 33 additions & 8 deletions fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java
@@ -29,14 +29,17 @@
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.LiteralExpr;
import com.starrocks.common.io.Text;
import com.starrocks.common.util.Util;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.iceberg.IcebergApiConverter;
import com.starrocks.connector.iceberg.IcebergCatalogType;
import com.starrocks.server.CatalogMgr;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.thrift.TColumn;
import com.starrocks.thrift.TCompressedPartitionMap;
import com.starrocks.thrift.THdfsPartition;
import com.starrocks.thrift.TIcebergTable;
import com.starrocks.thrift.TPartitionMap;
import com.starrocks.thrift.TTableDescriptor;
import com.starrocks.thrift.TTableType;
import org.apache.iceberg.BaseTable;
@@ -47,11 +50,15 @@
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -261,14 +268,32 @@ public TTableDescriptor toThrift(List<DescriptorTable.ReferencedPartitionInfo> p
tIcebergTable.setIceberg_schema(IcebergApiConverter.getTIcebergSchema(nativeTable.schema()));
tIcebergTable.setPartition_column_names(getPartitionColumnNames());

for (int i = 0; i < partitions.size(); i++) {
DescriptorTable.ReferencedPartitionInfo info = partitions.get(i);
PartitionKey key = info.getKey();
long partitionId = info.getId();
THdfsPartition tPartition = new THdfsPartition();
List<LiteralExpr> keys = key.getKeys();
tPartition.setPartition_key_exprs(keys.stream().map(Expr::treeToThrift).collect(Collectors.toList()));
tIcebergTable.putToPartitions(partitionId, tPartition);
if (!partitions.isEmpty()) {
TPartitionMap tPartitionMap = new TPartitionMap();
for (int i = 0; i < partitions.size(); i++) {
DescriptorTable.ReferencedPartitionInfo info = partitions.get(i);
PartitionKey key = info.getKey();
long partitionId = info.getId();
THdfsPartition tPartition = new THdfsPartition();
List<LiteralExpr> keys = key.getKeys();
tPartition.setPartition_key_exprs(keys.stream().map(Expr::treeToThrift).collect(Collectors.toList()));
tPartitionMap.putToPartitions(partitionId, tPartition);
}

// The partition info may be very large, and it is identical in the plan fragment sent to every BE.
// Extracting it and serializing it once as a string gives better performance (about 3x in tests).
try {
TSerializer serializer = new TSerializer(TBinaryProtocol::new);
byte[] bytes = serializer.serialize(tPartitionMap);
byte[] compressedBytes = Util.compress(bytes);
TCompressedPartitionMap tCompressedPartitionMap = new TCompressedPartitionMap();
tCompressedPartitionMap.setOriginal_len(bytes.length);
tCompressedPartitionMap.setCompressed_len(compressedBytes.length);
tCompressedPartitionMap.setCompressed_serialized_partitions(Base64.getEncoder().encodeToString(compressedBytes));
tIcebergTable.setCompressed_partitions(tCompressedPartitionMap);
} catch (TException | IOException ignore) {
tIcebergTable.setPartitions(tPartitionMap.getPartitions());
}
}

TTableDescriptor tTableDescriptor = new TTableDescriptor(id, TTableType.ICEBERG_TABLE,
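For context, the BE consumes this field in HiveTableDescriptor::deserialize_partition_map (see descriptors.cpp above): Base64-decode, zlib-decompress, then thrift-deserialize with the binary protocol. Below is a minimal Java sketch of that same decode path; the class PartitionMapDecoder and its decode method are hypothetical illustration (not part of this commit), and the thrift getters are assumed to follow the standard generated-bean naming.

import com.starrocks.thrift.TCompressedPartitionMap;
import com.starrocks.thrift.TPartitionMap;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;

import java.util.Base64;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

public class PartitionMapDecoder {
    public static TPartitionMap decode(TCompressedPartitionMap compressed)
            throws TException, DataFormatException {
        // Undo the Base64 encoding applied in IcebergTable.toThrift().
        byte[] zlibBytes = Base64.getDecoder()
                .decode(compressed.getCompressed_serialized_partitions());

        // Inflate back to the original thrift-serialized bytes; original_len sizes the buffer.
        Inflater inflater = new Inflater();
        inflater.setInput(zlibBytes);
        byte[] thriftBytes = new byte[(int) compressed.getOriginal_len()];
        inflater.inflate(thriftBytes);
        inflater.end();

        // Deserialize with the same binary protocol used by TSerializer on the FE side.
        TPartitionMap partitionMap = new TPartitionMap();
        new TDeserializer(new TBinaryProtocol.Factory()).deserialize(partitionMap, thriftBytes);
        return partitionMap;
    }
}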
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/util/Util.java
@@ -46,6 +46,7 @@
import org.apache.logging.log4j.Logger;

import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -62,6 +63,8 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import java.util.zip.Adler32;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

public class Util {
private static final Logger LOG = LogManager.getLogger(Util.class);
@@ -456,4 +459,13 @@ public static void validateMetastoreUris(String uris) {
public static String deriveAliasFromOrdinal(int ordinal) {
return AUTO_GENERATED_EXPR_ALIAS_PREFIX + ordinal;
}

public static byte[] compress(byte[] input) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Deflater deflater = new Deflater();
try (DeflaterOutputStream dos = new DeflaterOutputStream(outputStream, deflater)) {
dos.write(input);
}
return outputStream.toByteArray();
}
}
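A minimal round-trip sketch for the new helper, assuming Deflater's default zlib framing (which is what the BE's ZLIB codec decodes): compress a buffer with Util.compress and inflate it back with java.util.zip. The class CompressRoundTrip is hypothetical test scaffolding, not part of the commit.

import com.starrocks.common.util.Util;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.InflaterInputStream;

public class CompressRoundTrip {
    public static void main(String[] args) throws IOException {
        byte[] original = "partition map payload".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = Util.compress(original);

        // Inflate the zlib stream back and compare it with the input.
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        try (InflaterInputStream in =
                new InflaterInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                restored.write(buf, 0, n);
            }
        }
        System.out.println(Arrays.equals(original, restored.toByteArray())); // expect true
    }
}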
@@ -526,7 +526,7 @@ private void prepareResultSink() throws AnalysisException {

private void deliverExecFragments(boolean needDeploy) throws RpcException, UserException {
lock();
try {
try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployLockInternalTime")) {
Deployer deployer =
new Deployer(connectContext, jobSpec, executionDAG, coordinatorPreprocessor.getCoordAddress(),
this::handleErrorExecution);
@@ -406,6 +406,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {

public static final String ENABLE_ICEBERG_IDENTITY_COLUMN_OPTIMIZE = "enable_iceberg_identity_column_optimize";

public static final String ENABLE_PLAN_SERIALIZE_CONCURRENTLY = "enable_plan_serialize_concurrently";

public enum MaterializedViewRewriteMode {
DISABLE, // disable materialized view rewrite
DEFAULT, // default, choose the materialized view or not by cost optimizer
@@ -1415,6 +1417,9 @@ public boolean isCboPredicateSubfieldPath() {
@VarAttr(name = ENABLE_ICEBERG_IDENTITY_COLUMN_OPTIMIZE)
private boolean enableIcebergIdentityColumnOptimize = true;

@VarAttr(name = ENABLE_PLAN_SERIALIZE_CONCURRENTLY)
private boolean enablePlanSerializeConcurrently = true;

public int getExprChildrenLimit() {
return exprChildrenLimit;
}
@@ -2654,6 +2659,10 @@ public boolean getEnableIcebergIdentityColumnOptimize() {
return enableIcebergIdentityColumnOptimize;
}

public boolean getEnablePlanSerializeConcurrently() {
return enablePlanSerializeConcurrently;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
20 changes: 18 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Deployer.java
@@ -19,6 +19,8 @@
import com.google.common.collect.ImmutableList;
import com.starrocks.common.Status;
import com.starrocks.common.UserException;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.scheduler.dag.ExecutionDAG;
import com.starrocks.qe.scheduler.dag.ExecutionFragment;
@@ -54,6 +56,7 @@ public class Deployer {
private final TFragmentInstanceFactory tFragmentInstanceFactory;
private final TDescriptorTable emptyDescTable;
private final long deliveryTimeoutMs;
private boolean enablePlanSerializeConcurrently;

private final FailureHandler failureHandler;

@@ -76,6 +79,7 @@ public Deployer(ConnectContext context,
this.deliveryTimeoutMs = Math.min(queryOptions.query_timeout, queryOptions.query_delivery_timeout) * 1000L;

this.failureHandler = failureHandler;
this.enablePlanSerializeConcurrently = context.getSessionVariable().getEnablePlanSerializeConcurrently();
}

public void deployFragments(List<ExecutionFragment> concurrentFragments, boolean needDeploy)
@@ -91,10 +95,22 @@ public void deployFragments(List<ExecutionFragment> concurrentFragments, boolean
return;
}

if (enablePlanSerializeConcurrently) {
try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeploySerializeConcurrencyTime")) {
twoStageExecutionsToDeploy.stream().parallel().forEach(
executions -> executions.stream().parallel()
.forEach(FragmentInstanceExecState::serializeRequest));
}
}

for (List<FragmentInstanceExecState> executions : twoStageExecutionsToDeploy) {
executions.forEach(FragmentInstanceExecState::deployAsync);
try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployStageByStageTime")) {
executions.forEach(FragmentInstanceExecState::deployAsync);
}

waitForDeploymentCompletion(executions);
try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployWaitTime")) {
waitForDeploymentCompletion(executions);
}
}
}
