diff --git a/be/src/common/config.h b/be/src/common/config.h index 44532ee947994..13c783e768e48 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -208,11 +208,20 @@ CONF_String(default_query_options, ""); // or 3x the number of cores. This keeps the cores busy without causing excessive // thrashing. CONF_Int32(num_threads_per_core, "3"); + +// Compression related parameters // If true, compresses tuple data in Serialize. CONF_Bool(compress_rowbatches, "true"); // Compress ratio when shuffle row_batches in network, not in storage engine. // If ratio is less than this value, use uncompressed data instead. CONF_mDouble(rpc_compress_ratio_threshold, "1.1"); +// Acceleration of LZ4 Compression, the larger the acceleration value, the faster the algorithm, but also the lower the compression ratio. +// Default 1, MIN=1, MAX=65537 +CONF_mInt32(lz4_acceleration, "1"); +// If compression ratio is larger than this threshold, consider it as a good compression +CONF_mDouble(lz4_expected_compression_ratio, "2.1"); +CONF_mDouble(lz4_expected_compression_speed_mbps, "600"); + // Serialize and deserialize each returned row batch. CONF_Bool(serialize_batch, "false"); // Interval between profile reports; in seconds. 
diff --git a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp index b4fd49eb39b70..a2696a2a65f54 100644 --- a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp +++ b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp @@ -31,6 +31,7 @@ #include "runtime/exec_env.h" #include "runtime/local_pass_through_buffer.h" #include "runtime/runtime_state.h" +#include "serde/compress_strategy.h" #include "serde/protobuf_serde.h" #include "service/brpc.h" #include "util/compression/block_compression.h" @@ -397,7 +398,13 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) { } // Set compression type according to query options if (state->query_options().__isset.transmission_compression_type) { - _compress_type = CompressionUtils::to_compression_pb(state->query_options().transmission_compression_type); + TCompressionType::type type = state->query_options().transmission_compression_type; + if (type == TCompressionType::AUTO) { + _compress_type = CompressionTypePB::LZ4; + _compress_strategy = std::make_shared(); + } else { + _compress_type = CompressionUtils::to_compression_pb(state->query_options().transmission_compression_type); + } } else if (config::compress_rowbatches) { // If transmission_compression_type is not set, use compress_rowbatches to check if // compress transmitted data. 
@@ -436,7 +443,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) { std::shuffle(_channel_indices.begin(), _channel_indices.end(), std::mt19937(std::random_device()())); _bytes_pass_through_counter = ADD_COUNTER(_unique_metrics, "BytesPassThrough", TUnit::BYTES); - _sender_input_bytes_counter = ADD_COUNTER(_unique_metrics, "SenderInputBytes", TUnit::BYTES); + _raw_input_bytes_counter = ADD_COUNTER(_unique_metrics, "RawInputBytes", TUnit::BYTES); _serialized_bytes_counter = ADD_COUNTER(_unique_metrics, "SerializedBytes", TUnit::BYTES); _compressed_bytes_counter = ADD_COUNTER(_unique_metrics, "CompressedBytes", TUnit::BYTES); @@ -666,10 +673,11 @@ void ExchangeSinkOperator::close(RuntimeState* state) { Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, bool* is_first_chunk, int num_receivers) { VLOG_ROW << "[ExchangeSinkOperator] serializing " << src->num_rows() << " rows"; - auto send_input_bytes = serde::ProtobufChunkSerde::max_serialized_size(*src, nullptr); - COUNTER_UPDATE(_sender_input_bytes_counter, send_input_bytes * num_receivers); + auto unserialized_bytes = src->bytes_usage(); + COUNTER_UPDATE(_raw_input_bytes_counter, unserialized_bytes * num_receivers); + int64_t serialization_time_ns = 0; { - SCOPED_TIMER(_serialize_chunk_timer); + ScopedTimer _timer(_serialize_chunk_timer); // We only serialize chunk meta for first chunk if (*is_first_chunk) { _encode_context = serde::EncodeContext::get_encode_context_shared_ptr(src->columns().size(), _encode_level); @@ -684,6 +692,7 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo RETURN_IF_ERROR(res); res->Swap(dst); } + serialization_time_ns = _timer.elapsed_time(); } if (_encode_context) { _encode_context->set_encode_levels_in_pb(dst); @@ -699,8 +708,12 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo } // try compress the ChunkPB data - if (_compress_codec != nullptr && serialized_size > 0) { - 
SCOPED_TIMER(_compress_timer); + bool use_compression = true; + if (_compress_strategy) { + use_compression = _compress_strategy->decide(); + } + if (_compress_codec != nullptr && serialized_size > 0 && use_compression) { + ScopedTimer _timer(_compress_timer); if (use_compression_pool(_compress_codec->type())) { Slice compressed_slice; @@ -720,14 +733,20 @@ Status ExchangeSinkOperator::serialize_chunk(const Chunk* src, ChunkPB* dst, boo RETURN_IF_ERROR(_compress_codec->compress(input, &compressed_slice)); _compression_scratch.resize(compressed_slice.size); } + if (_compress_strategy) { + int64_t compression_time_ns = _timer.elapsed_time(); + _compress_strategy->feedback(serialized_size, _compression_scratch.size(), serialization_time_ns, + compression_time_ns); + } COUNTER_UPDATE(_compressed_bytes_counter, _compression_scratch.size() * num_receivers); double compress_ratio = (static_cast(serialized_size)) / _compression_scratch.size(); + VLOG_ROW << "chunk compression: uncompressed size: " << serialized_size + << ", compressed size: " << _compression_scratch.size(); if (LIKELY(compress_ratio > config::rpc_compress_ratio_threshold)) { dst->mutable_data()->swap(reinterpret_cast(_compression_scratch)); dst->set_compress_type(_compress_type); } - VLOG_ROW << "uncompressed size: " << serialized_size << ", compressed size: " << _compression_scratch.size(); } return Status::OK(); } diff --git a/be/src/exec/pipeline/exchange/exchange_sink_operator.h b/be/src/exec/pipeline/exchange/exchange_sink_operator.h index f873f34f0af21..344be0013184e 100644 --- a/be/src/exec/pipeline/exchange/exchange_sink_operator.h +++ b/be/src/exec/pipeline/exchange/exchange_sink_operator.h @@ -28,6 +28,7 @@ #include "exec/pipeline/operator.h" #include "gen_cpp/data.pb.h" #include "gen_cpp/internal_service.pb.h" +#include "serde/compress_strategy.h" #include "serde/protobuf_serde.h" #include "util/raw_container.h" #include "util/runtime_profile.h" @@ -171,6 +172,8 @@ class ExchangeSinkOperator 
final : public Operator { CompressionTypePB _compress_type = CompressionTypePB::NO_COMPRESSION; const BlockCompressionCodec* _compress_codec = nullptr; + std::shared_ptr _encode_context = nullptr; + std::shared_ptr _compress_strategy; RuntimeProfile::Counter* _serialize_chunk_timer = nullptr; RuntimeProfile::Counter* _shuffle_hash_timer = nullptr; @@ -178,7 +181,7 @@ class ExchangeSinkOperator final : public Operator { RuntimeProfile::Counter* _shuffle_chunk_append_timer = nullptr; RuntimeProfile::Counter* _compress_timer = nullptr; RuntimeProfile::Counter* _bytes_pass_through_counter = nullptr; - RuntimeProfile::Counter* _sender_input_bytes_counter = nullptr; + RuntimeProfile::Counter* _raw_input_bytes_counter = nullptr; RuntimeProfile::Counter* _serialized_bytes_counter = nullptr; RuntimeProfile::Counter* _compressed_bytes_counter = nullptr; RuntimeProfile::HighWaterMarkCounter* _pass_through_buffer_peak_mem_usage = nullptr; @@ -208,8 +211,6 @@ class ExchangeSinkOperator final : public Operator { const std::vector& _output_columns; std::unique_ptr _shuffler; - - std::shared_ptr _encode_context = nullptr; }; class ExchangeSinkOperatorFactory final : public OperatorFactory { diff --git a/be/src/serde/CMakeLists.txt b/be/src/serde/CMakeLists.txt index 3b0ad2c80a061..30d1137d479b6 100644 --- a/be/src/serde/CMakeLists.txt +++ b/be/src/serde/CMakeLists.txt @@ -20,5 +20,6 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/serde") add_library(Serde STATIC encode_context.cpp column_array_serde.cpp + compress_strategy.cpp protobuf_serde.cpp ) diff --git a/be/src/serde/compress_strategy.cpp b/be/src/serde/compress_strategy.cpp new file mode 100644 index 0000000000000..cd447974e1586 --- /dev/null +++ b/be/src/serde/compress_strategy.cpp @@ -0,0 +1,53 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "serde/compress_strategy.h" + +#include +#include + +#include "common/config.h" + +namespace starrocks::serde { + +CompressStrategy::CompressStrategy() : _gen(std::random_device()()) {} + +void CompressStrategy::feedback(uint64_t uncompressed_bytes, uint64_t compressed_bytes, uint64_t serialization_time_ns, + uint64_t compression_time_ns) { // serialization_time_ns is accepted but does not take part in the reward below + if (uncompressed_bytes == 0 || compressed_bytes == 0 || compression_time_ns == 0) { // ignore samples with no data or that would divide by zero + return; + } + double compress_speed = uncompressed_bytes * 1.0 / compression_time_ns * (1e9 / 1024 / 1024); // MB/s; promote to double first, integer division truncated anything below 1 byte/ns to 0 + double compress_ratio = uncompressed_bytes * 1.0 / compressed_bytes; // floating-point division keeps the fractional part of the ratio + double reward_ratio = (compress_ratio / config::lz4_expected_compression_ratio) * + (compress_speed / config::lz4_expected_compression_speed_mbps); + if (reward_ratio > 1.0) { // success: ratio and speed together beat the configured expectations + _alpha += 1; + } else { + _beta += 1; + } +} + +bool CompressStrategy::decide() { + std::gamma_distribution gamma_alpha(_alpha, 1.0); + std::gamma_distribution gamma_beta(_beta, 1.0); + + double sample_alpha = gamma_alpha(_gen); + double sample_beta = gamma_beta(_gen); + + double theta = sample_alpha / (sample_alpha + sample_beta); + return theta > 0.5; +} + +} // namespace starrocks::serde diff --git a/be/src/serde/compress_strategy.h b/be/src/serde/compress_strategy.h new file mode 100644 index 0000000000000..44b051dff4984 --- /dev/null +++ b/be/src/serde/compress_strategy.h @@ -0,0 +1,43 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace starrocks::serde { + +// Compression strategy based on Thompson Sampling: feedback() records whether a finished compression was worthwhile, decide() tells the caller whether to compress the next payload +class CompressStrategy { +public: + CompressStrategy(); + ~CompressStrategy() = default; + + // Give the feedback based on previous compression. Sizes are in bytes and times in nanoseconds. The sample counts as a success when (ratio / config::lz4_expected_compression_ratio) * (speed / config::lz4_expected_compression_speed_mbps) is above 1, otherwise as a failure; samples with a zero size or zero compression time are ignored. + void feedback(uint64_t uncompressed_bytes, uint64_t compressed_bytes, uint64_t serialization_time_ns, + uint64_t compression_time_ns); + + // Make the decision for the next compression. Randomized: draws a value from the current success/failure counts and returns true (compress) when it is above 0.5. + bool decide(); + +private: + std::mt19937 _gen; // Random engine used by decide(); seeded from std::random_device in the constructor + + // Thompson sampling parameters, biased for TRUE value + double _alpha = 3.0; // Success count, incremented by feedback() on a good sample + double _beta = 1.0; // Failure count, incremented by feedback() on a poor sample +}; + +} // namespace starrocks::serde \ No newline at end of file diff --git a/be/src/serde/encode_context.cpp b/be/src/serde/encode_context.cpp index 6f9a58c8cd242..53635be08a618 100644 --- a/be/src/serde/encode_context.cpp +++ b/be/src/serde/encode_context.cpp @@ -50,10 +50,10 @@ void EncodeContext::_adjust(const int col_id) { _column_encode_level[col_id] = 0; } if (old_level != _column_encode_level[col_id] || _session_encode_level < -1) { - VLOG_ROW << "Old encode level " << old_level << " is changed to " << _column_encode_level[col_id] - << " because the first " << EncodeSamplingNum << " of " << _frequency << " in total " << _times - << " chunks' compression ratio is " << _encoded_bytes[col_id] * 1.0 / _raw_bytes[col_id] - << " higher than limit " << EncodeRatioLimit; + 
VLOG_ROW << "column " << col_id << " encode_level changed from " << old_level << " to " + << _column_encode_level[col_id] << " because the first " << EncodeSamplingNum << " of " << _frequency + << " in total " << _times << " chunks' compression ratio is " + << _encoded_bytes[col_id] * 1.0 / _raw_bytes[col_id] << " higher than limit " << EncodeRatioLimit; } _encoded_bytes[col_id] = 0; _raw_bytes[col_id] = 0; diff --git a/be/src/util/compression/block_compression.cpp b/be/src/util/compression/block_compression.cpp index 26c6861ba887f..443ea8324e770 100644 --- a/be/src/util/compression/block_compression.cpp +++ b/be/src/util/compression/block_compression.cpp @@ -34,6 +34,8 @@ #include "util/compression/block_compression.h" +#include "common/config.h" + #ifdef __x86_64__ #include #endif @@ -148,7 +150,7 @@ class Lz4BlockCompression : public BlockCompressionCodec { } } - int32_t acceleration = 1; + int32_t acceleration = config::lz4_acceleration; size_t compressed_size = LZ4_compress_fast_continue(ctx, input.data, output->data, input.size, output->size, acceleration); diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h index ceb6f34302c34..aa8c15d1159ee 100644 --- a/be/src/util/runtime_profile.h +++ b/be/src/util/runtime_profile.h @@ -735,6 +735,8 @@ class ScopedTimer { void start() { _sw.start(); } + int64_t elapsed_time() { return _sw.elapsed_time(); } + bool is_cancelled() { return _is_cancelled != nullptr && *_is_cancelled; } void UpdateCounter() { diff --git a/fe/fe-core/src/main/java/com/starrocks/common/util/CompressionUtils.java b/fe/fe-core/src/main/java/com/starrocks/common/util/CompressionUtils.java index ea37c2afd2eda..470c09193060a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/util/CompressionUtils.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/util/CompressionUtils.java @@ -36,6 +36,7 @@ public class CompressionUtils { private static final ImmutableMap T_COMPRESSION_BY_NAME = (new 
ImmutableSortedMap.Builder(String.CASE_INSENSITIVE_ORDER)) .put("NO_COMPRESSION", TCompressionType.NO_COMPRESSION) + .put("AUTO", TCompressionType.AUTO) .put("LZ4", TCompressionType.LZ4) .put("LZ4_FRAME", TCompressionType.LZ4_FRAME) .put("SNAPPY", TCompressionType.SNAPPY) diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 36058a0bf132b..f6e91070d2b65 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -1409,8 +1409,8 @@ public static MaterializedViewRewriteMode parse(String str) { @VariableMgr.VarAttr(name = NEW_PLANER_AGG_STAGE) private int newPlannerAggStage = SessionVariableConstants.AggregationStage.AUTO.ordinal(); - @VariableMgr.VarAttr(name = TRANSMISSION_COMPRESSION_TYPE) - private String transmissionCompressionType = "NO_COMPRESSION"; + @VariableMgr.VarAttr(name = TRANSMISSION_COMPRESSION_TYPE) + private String transmissionCompressionType = "AUTO"; // if a packet's size is larger than RPC_HTTP_MIN_SIZE, it will use RPC via http, as the std rpc has 2GB size limit. // the setting size is a bit smaller than 2GB, as the pre-computed serialization size of packets may not accurate. diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index ddabbdf0309ac..bfeec183e8916 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -530,6 +530,7 @@ enum TCompressionType { BZIP2 = 10; LZO = 11; // Deprecated BROTLI = 12; + AUTO = 13; } enum TWriteQuorumType {