Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] adaptive enable the exchange compression #54956

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,20 @@ CONF_String(default_query_options, "");
// or 3x the number of cores. This keeps the cores busy without causing excessive
// thrashing.
CONF_Int32(num_threads_per_core, "3");

// Compression related parameters
// If true, compresses tuple data in Serialize.
CONF_Bool(compress_rowbatches, "true");
// Compress ratio when shuffle row_batches in network, not in storage engine.
// If ratio is less than this value, use uncompressed data instead.
CONF_mDouble(rpc_compress_ratio_threshold, "1.1");
// Acceleration of LZ4 Compression, the larger the acceleration value, the faster the algorithm, but also the lesser the compression.
// Default 1, MIN=1, MAX=65537
CONF_mInt32(lz4_acceleration, "1");
// If compression ratio is larger than this threshold, consider it as a good compresiosn
CONF_mDouble(lz4_expected_compression_ratio, "2.1");
CONF_mDouble(lz4_expected_compression_speed_mbps, "600");

// Serialize and deserialize each returned row batch.
CONF_Bool(serialize_batch, "false");
// Interval between profile reports; in seconds.
Expand Down
35 changes: 27 additions & 8 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "runtime/exec_env.h"
#include "runtime/local_pass_through_buffer.h"
#include "runtime/runtime_state.h"
#include "serde/compress_strategy.h"
#include "serde/protobuf_serde.h"
#include "service/brpc.h"
#include "util/compression/block_compression.h"
Expand Down Expand Up @@ -397,7 +398,13 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
}
// Set compression type according to query options
if (state->query_options().__isset.transmission_compression_type) {
_compress_type = CompressionUtils::to_compression_pb(state->query_options().transmission_compression_type);
TCompressionType::type type = state->query_options().transmission_compression_type;
if (type == TCompressionType::AUTO) {
_compress_type = CompressionTypePB::LZ4;
_compress_strategy = std::make_shared<serde::CompressStrategy>();
} else {
_compress_type = CompressionUtils::to_compression_pb(state->query_options().transmission_compression_type);
}
} else if (config::compress_rowbatches) {
// If transmission_compression_type is not set, use compress_rowbatches to check if
// compress transmitted data.
Expand Down Expand Up @@ -436,7 +443,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
std::shuffle(_channel_indices.begin(), _channel_indices.end(), std::mt19937(std::random_device()()));

_bytes_pass_through_counter = ADD_COUNTER(_unique_metrics, "BytesPassThrough", TUnit::BYTES);
_sender_input_bytes_counter = ADD_COUNTER(_unique_metrics, "SenderInputBytes", TUnit::BYTES);
_raw_input_bytes_counter = ADD_COUNTER(_unique_metrics, "RawInputBytes", TUnit::BYTES);
_serialized_bytes_counter = ADD_COUNTER(_unique_metrics, "SerializedBytes", TUnit::BYTES);
_compressed_bytes_counter = ADD_COUNTER(_unique_metrics, "CompressedBytes", TUnit::BYTES);

Expand Down Expand Up @@ -666,10 +673,11 @@ void ExchangeSinkOperator::close(RuntimeState* state) {

Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, bool* is_first_chunk, int num_receivers) {
VLOG_ROW << "[ExchangeSinkOperator] serializing " << src->num_rows() << " rows";
auto send_input_bytes = serde::ProtobufChunkSerde::max_serialized_size(*src, nullptr);
COUNTER_UPDATE(_sender_input_bytes_counter, send_input_bytes * num_receivers);
auto unserialized_bytes = src->bytes_usage();
COUNTER_UPDATE(_raw_input_bytes_counter, unserialized_bytes * num_receivers);
int64_t serialization_time_ns = 0;
{
SCOPED_TIMER(_serialize_chunk_timer);
ScopedTimer<MonotonicStopWatch> _timer(_serialize_chunk_timer);
// We only serialize chunk meta for first chunk
if (*is_first_chunk) {
_encode_context = serde::EncodeContext::get_encode_context_shared_ptr(src->columns().size(), _encode_level);
Expand All @@ -684,6 +692,7 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo
RETURN_IF_ERROR(res);
res->Swap(dst);
}
serialization_time_ns = _timer.elapsed_time();
}
if (_encode_context) {
_encode_context->set_encode_levels_in_pb(dst);
Expand All @@ -699,8 +708,12 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo
}

// try compress the ChunkPB data
if (_compress_codec != nullptr && serialized_size > 0) {
SCOPED_TIMER(_compress_timer);
bool use_compression = true;
if (_compress_strategy) {
use_compression = _compress_strategy->decide();
}
if (_compress_codec != nullptr && serialized_size > 0 && use_compression) {
ScopedTimer<MonotonicStopWatch> _timer(_compress_timer);

if (use_compression_pool(_compress_codec->type())) {
Slice compressed_slice;
Expand All @@ -720,14 +733,20 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo
RETURN_IF_ERROR(_compress_codec->compress(input, &compressed_slice));
_compression_scratch.resize(compressed_slice.size);
}
if (_compress_strategy) {
int64_t compression_time_ns = _timer.elapsed_time();
_compress_strategy->feedback(serialized_size, _compression_scratch.size(), serialization_time_ns,
compression_time_ns);
}

COUNTER_UPDATE(_compressed_bytes_counter, _compression_scratch.size() * num_receivers);
double compress_ratio = (static_cast<double>(serialized_size)) / _compression_scratch.size();
VLOG_ROW << "chunk compression: uncompressed size: " << serialized_size
<< ", compressed size: " << _compression_scratch.size();
if (LIKELY(compress_ratio > config::rpc_compress_ratio_threshold)) {
dst->mutable_data()->swap(reinterpret_cast<std::string&>(_compression_scratch));
dst->set_compress_type(_compress_type);
}
VLOG_ROW << "uncompressed size: " << serialized_size << ", compressed size: " << _compression_scratch.size();
}
return Status::OK();
}
murphyatwork marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
7 changes: 4 additions & 3 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "exec/pipeline/operator.h"
#include "gen_cpp/data.pb.h"
#include "gen_cpp/internal_service.pb.h"
#include "serde/compress_strategy.h"
#include "serde/protobuf_serde.h"
#include "util/raw_container.h"
#include "util/runtime_profile.h"
Expand Down Expand Up @@ -171,14 +172,16 @@ class ExchangeSinkOperator final : public Operator {

CompressionTypePB _compress_type = CompressionTypePB::NO_COMPRESSION;
const BlockCompressionCodec* _compress_codec = nullptr;
std::shared_ptr<serde::EncodeContext> _encode_context = nullptr;
std::shared_ptr<serde::CompressStrategy> _compress_strategy;

RuntimeProfile::Counter* _serialize_chunk_timer = nullptr;
RuntimeProfile::Counter* _shuffle_hash_timer = nullptr;
RuntimeProfile::Counter* _shuffle_chunk_append_counter = nullptr;
RuntimeProfile::Counter* _shuffle_chunk_append_timer = nullptr;
RuntimeProfile::Counter* _compress_timer = nullptr;
RuntimeProfile::Counter* _bytes_pass_through_counter = nullptr;
RuntimeProfile::Counter* _sender_input_bytes_counter = nullptr;
RuntimeProfile::Counter* _raw_input_bytes_counter = nullptr;
RuntimeProfile::Counter* _serialized_bytes_counter = nullptr;
RuntimeProfile::Counter* _compressed_bytes_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _pass_through_buffer_peak_mem_usage = nullptr;
Expand Down Expand Up @@ -208,8 +211,6 @@ class ExchangeSinkOperator final : public Operator {
const std::vector<int32_t>& _output_columns;

std::unique_ptr<Shuffler> _shuffler;

std::shared_ptr<serde::EncodeContext> _encode_context = nullptr;
};

class ExchangeSinkOperatorFactory final : public OperatorFactory {
Expand Down
1 change: 1 addition & 0 deletions be/src/serde/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/serde")
add_library(Serde STATIC
encode_context.cpp
column_array_serde.cpp
compress_strategy.cpp
protobuf_serde.cpp
)
53 changes: 53 additions & 0 deletions be/src/serde/compress_strategy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "serde/compress_strategy.h"

#include <cmath>
#include <random>

#include "common/config.h"

namespace starrocks::serde {

CompressStrategy::CompressStrategy() : _gen(std::random_device()()) {}

void CompressStrategy::feedback(uint64_t uncompressed_bytes, uint64_t compressed_bytes, uint64_t serialization_time_ns,
uint64_t compression_time_ns) {
if (uncompressed_bytes == 0 || compressed_bytes == 0 || compression_time_ns == 0) {
return;
}
double compress_speed = uncompressed_bytes / compression_time_ns * (1e9 / 1024 / 1024); // MB/s
double compress_ratio = uncompressed_bytes / compressed_bytes;
double reward_ratio = (compress_ratio / config::lz4_expected_compression_ratio) *
(compress_speed / config::lz4_expected_compression_speed_mbps);
if (reward_ratio > 1.0) {
_alpha += 1;
} else {
_beta += 1;
}
}

bool CompressStrategy::decide() {
std::gamma_distribution<double> gamma_alpha(_alpha, 1.0);
std::gamma_distribution<double> gamma_beta(_beta, 1.0);

double sample_alpha = gamma_alpha(_gen);
double sample_beta = gamma_beta(_gen);

double theta = sample_alpha / (sample_alpha + sample_beta);
return theta > 0.5;
}

} // namespace starrocks::serde
murphyatwork marked this conversation as resolved.
Show resolved Hide resolved
43 changes: 43 additions & 0 deletions be/src/serde/compress_strategy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <cstdint>
#include <random>

namespace starrocks::serde {

// Compression strategy based on Thompson Sampling
class CompressStrategy {
public:
CompressStrategy();
~CompressStrategy() = default;

// Give the feedback based on previous compression
void feedback(uint64_t uncompressed_bytes, uint64_t compressed_bytes, uint64_t serialization_time_ns,
uint64_t compression_time_ns);

// Make the decision for the next compression
bool decide();

private:
std::mt19937 _gen;

// Thompson sampling parameters, biased for TRUE value
double _alpha = 3.0; // Success count
double _beta = 1.0; // Failure count
};

} // namespace starrocks::serde
murphyatwork marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 4 additions & 4 deletions be/src/serde/encode_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ void EncodeContext::_adjust(const int col_id) {
_column_encode_level[col_id] = 0;
}
if (old_level != _column_encode_level[col_id] || _session_encode_level < -1) {
VLOG_ROW << "Old encode level " << old_level << " is changed to " << _column_encode_level[col_id]
<< " because the first " << EncodeSamplingNum << " of " << _frequency << " in total " << _times
<< " chunks' compression ratio is " << _encoded_bytes[col_id] * 1.0 / _raw_bytes[col_id]
<< " higher than limit " << EncodeRatioLimit;
VLOG_ROW << "column " << col_id << " encode_level changed from " << old_level << " to "
<< _column_encode_level[col_id] << " because the first " << EncodeSamplingNum << " of " << _frequency
<< " in total " << _times << " chunks' compression ratio is "
<< _encoded_bytes[col_id] * 1.0 / _raw_bytes[col_id] << " higher than limit " << EncodeRatioLimit;
}
_encoded_bytes[col_id] = 0;
_raw_bytes[col_id] = 0;
Expand Down
4 changes: 3 additions & 1 deletion be/src/util/compression/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

#include "util/compression/block_compression.h"

#include "common/config.h"

#ifdef __x86_64__
#include <libdeflate.h>
#endif
Expand Down Expand Up @@ -148,7 +150,7 @@ class Lz4BlockCompression : public BlockCompressionCodec {
}
}

int32_t acceleration = 1;
int32_t acceleration = config::lz4_acceleration;
size_t compressed_size =
LZ4_compress_fast_continue(ctx, input.data, output->data, input.size, output->size, acceleration);

Expand Down
2 changes: 2 additions & 0 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,8 @@ class ScopedTimer {

void start() { _sw.start(); }

int64_t elapsed_time() { return _sw.elapsed_time(); }

bool is_cancelled() { return _is_cancelled != nullptr && *_is_cancelled; }

void UpdateCounter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class CompressionUtils {
private static final ImmutableMap<String, TCompressionType> T_COMPRESSION_BY_NAME =
(new ImmutableSortedMap.Builder<String, TCompressionType>(String.CASE_INSENSITIVE_ORDER))
.put("NO_COMPRESSION", TCompressionType.NO_COMPRESSION)
.put("AUTO", TCompressionType.AUTO)
.put("LZ4", TCompressionType.LZ4)
.put("LZ4_FRAME", TCompressionType.LZ4_FRAME)
.put("SNAPPY", TCompressionType.SNAPPY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1409,8 +1409,8 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = NEW_PLANER_AGG_STAGE)
private int newPlannerAggStage = SessionVariableConstants.AggregationStage.AUTO.ordinal();

@VariableMgr.VarAttr(name = TRANSMISSION_COMPRESSION_TYPE)
private String transmissionCompressionType = "NO_COMPRESSION";
@VariableMgr.VarAttr(name = TRANSMISSION_COMPRESSION_TYPE)
private String transmissionCompressionType = "AUTO";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will it be compatibility issue if FE upgraded to the newer version while the BE still stays in old version, especially during the upgrade.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. it's a session variable, so the behavior on BE is totally driven by FE

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that means during the upgrade, FE sends to BE with "AUTO" but BE doesn't recognize and fail the request? or BE just fallback to the no_compression behavior?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the best practice is upgrading the BE first

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't wise to count on the "best practice" which doesn't exist on any doc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, actually it's a common practice of StarRocks, otherwise it would be over-complicated to add some new stuff in the codebase. the common practice is to use a session variable to control the behavior, so before upgrading the FE this new feature will not be enabled.


// if a packet's size is larger than RPC_HTTP_MIN_SIZE, it will use RPC via http, as the std rpc has 2GB size limit.
// the setting size is a bit smaller than 2GB, as the pre-computed serialization size of packets may not accurate.
Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/Types.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ enum TCompressionType {
BZIP2 = 10;
LZO = 11; // Deprecated
BROTLI = 12;
AUTO = 13;
}

enum TWriteQuorumType {
Expand Down
Loading