From 851149e6cbb365d98582020da81935e1ffecbb0b Mon Sep 17 00:00:00 2001 From: sevev Date: Fri, 10 Jan 2025 15:55:27 +0800 Subject: [PATCH 1/4] write combined txn log in parallel Signed-off-by: sevev --- be/src/common/config.h | 3 ++ be/src/exec/tablet_sink_sender.cpp | 12 ++++++ be/src/exec/write_combined_txn_log.cpp | 53 ++++++++++++++++++++++++++ be/src/exec/write_combined_txn_log.h | 1 + be/src/runtime/exec_env.cpp | 9 +++++ be/src/runtime/exec_env.h | 2 + 6 files changed, 80 insertions(+) diff --git a/be/src/common/config.h b/be/src/common/config.h index 6ae9b972b1c44f..727fc471ddbd71 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1536,4 +1536,7 @@ CONF_mBool(avro_ignore_union_type_tag, "false"); // default batch size for simdjson lib CONF_mInt32(json_parse_many_batch_size, "1000000"); CONF_mBool(enable_dynamic_batch_size_for_json_parse_many, "true"); +CONF_mInt32(put_combined_txn_log_thread_pool_num_max, "64"); +CONF_mBool(enable_put_combinded_txn_log_parallel, "false"); +CONF_mBool(output_combined_txn_log, "false"); } // namespace starrocks::config diff --git a/be/src/exec/tablet_sink_sender.cpp b/be/src/exec/tablet_sink_sender.cpp index ff7316d1cff1da..1981209d8dc776 100644 --- a/be/src/exec/tablet_sink_sender.cpp +++ b/be/src/exec/tablet_sink_sender.cpp @@ -370,10 +370,22 @@ bool TabletSinkSender::get_immutable_partition_ids(std::set* partition_ } Status TabletSinkSender::_write_combined_txn_log() { + int64_t t_start = MonotonicMillis(); + if (config::enable_put_combinded_txn_log_parallel) { + Status st = write_combined_txn_log(_txn_log_map); + if (config::output_combined_txn_log) { + LOG(INFO) << "write_combined_txn_log parallel cost time: " << MonotonicMillis() - t_start; + } + return st; + } + for (const auto& [partition_id, logs] : _txn_log_map) { (void)partition_id; RETURN_IF_ERROR(write_combined_txn_log(logs)); } + if (config::output_combined_txn_log) { + LOG(INFO) << "write_combined_txn_log non-parallel cost time: " <<
MonotonicMillis() - t_start; + } return Status::OK(); } diff --git a/be/src/exec/write_combined_txn_log.cpp b/be/src/exec/write_combined_txn_log.cpp index 3d3c94014476e2..e678e9ee967876 100644 --- a/be/src/exec/write_combined_txn_log.cpp +++ b/be/src/exec/write_combined_txn_log.cpp @@ -14,14 +14,67 @@ #include "exec/write_combined_txn_log.h" +#include +#include + #include "runtime/exec_env.h" #include "storage/lake/tablet_manager.h" namespace starrocks { +class TxnLogTask : public Runnable { +public: + TxnLogTask(const CombinedTxnLogPB& logs, lake::TabletManager* tablet_mgr, std::promise promise) + : _logs(logs), _tablet_mgr(tablet_mgr), _promise(std::move(promise)) {} + + void run() override { + try { + Status status = _tablet_mgr->put_combined_txn_log(_logs); + if (!status.ok()) { + throw std::runtime_error("Log write failed"); + } + _promise.set_value(Status::OK()); + } catch (const std::exception& e) { + _promise.set_value(Status::IOError(e.what())); + } + } + +private: + CombinedTxnLogPB _logs; + lake::TabletManager* _tablet_mgr; + std::promise _promise; +}; + Status write_combined_txn_log(const CombinedTxnLogPB& logs) { auto tablet_mgr = ExecEnv::GetInstance()->lake_tablet_manager(); return tablet_mgr->put_combined_txn_log(logs); } +Status write_combined_txn_log(const std::map& txn_log_map) { + std::vector> futures; + std::vector> tasks; + + for (const auto& [partition_id, logs] : txn_log_map) { + (void)partition_id; + std::promise promise; + futures.push_back(promise.get_future()); + std::shared_ptr r( + std::make_shared(logs, ExecEnv::GetInstance()->lake_tablet_manager(), std::move(promise))); + tasks.emplace_back(std::move(r)); + } + + for (auto& task : tasks) { + RETURN_IF_ERROR(ExecEnv::GetInstance()->put_combined_txn_log_thread_pool()->submit(task)); + } + + for (auto& future : futures) { + Status status = future.get(); + if (!status.ok()) { + return status; + } + } + + return Status::OK(); +} + } // namespace starrocks \ No newline at end of file diff 
--git a/be/src/exec/write_combined_txn_log.h b/be/src/exec/write_combined_txn_log.h index 99fec2b812ece9..d356c381fc500c 100644 --- a/be/src/exec/write_combined_txn_log.h +++ b/be/src/exec/write_combined_txn_log.h @@ -20,5 +20,6 @@ namespace starrocks { class CombinedTxnLogPB; Status write_combined_txn_log(const CombinedTxnLogPB& logs); +Status write_combined_txn_log(const std::map& txn_log_map); } // namespace starrocks diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index c0a4dc1269023e..07d66a9bd2c03b 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -482,6 +482,7 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { std::unique_ptr load_rowset_pool; std::unique_ptr load_segment_pool; + std::unique_ptr put_combined_txn_log_thread_pool; RETURN_IF_ERROR( ThreadPoolBuilder("load_rowset_pool") .set_min_threads(0) @@ -501,6 +502,14 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { _load_segment_thread_pool = load_segment_pool.release(); _broker_mgr = new BrokerMgr(this); + + RETURN_IF_ERROR(ThreadPoolBuilder("put_combined_txn_log_thread_pool") + .set_min_threads(0) + .set_max_threads(config::put_combined_txn_log_thread_pool_num_max) + .set_idle_timeout(MonoDelta::FromMilliseconds(500)) + .build(&put_combined_txn_log_thread_pool)); + _put_combined_txn_log_thread_pool = put_combined_txn_log_thread_pool.release(); + #ifndef BE_TEST _bfd_parser = BfdParser::create(); #endif diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 5cc0360f3f6c12..209419d439fdad 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -278,6 +278,7 @@ class ExecEnv { ThreadPool* streaming_load_thread_pool() { return _streaming_load_thread_pool; } ThreadPool* load_rowset_thread_pool() { return _load_rowset_thread_pool; } ThreadPool* load_segment_thread_pool() { return _load_segment_thread_pool; } + ThreadPool* put_combined_txn_log_thread_pool() { return 
_put_combined_txn_log_thread_pool; } pipeline::DriverExecutor* wg_driver_executor(); workgroup::ScanExecutor* scan_executor(); @@ -369,6 +370,7 @@ class ExecEnv { ThreadPool* _load_segment_thread_pool = nullptr; ThreadPool* _load_rowset_thread_pool = nullptr; + ThreadPool* _put_combined_txn_log_thread_pool = nullptr; PriorityThreadPool* _udf_call_pool = nullptr; PriorityThreadPool* _pipeline_prepare_pool = nullptr; From 6e05e085123b253be09c23e266e9290b831d5b34 Mon Sep 17 00:00:00 2001 From: sevev Date: Thu, 16 Jan 2025 16:01:14 +0800 Subject: [PATCH 2/4] remove timing logs and delete txn log thread pool in ExecEnv::destroy Signed-off-by: sevev --- be/src/common/config.h | 1 - be/src/exec/tablet_sink_sender.cpp | 10 +--------- be/src/exec/write_combined_txn_log.cpp | 2 +- be/src/runtime/exec_env.cpp | 1 + 4 files changed, 3 insertions(+), 11 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 727fc471ddbd71..9a2570f8066297 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1538,5 +1538,4 @@ CONF_mInt32(json_parse_many_batch_size, "1000000"); CONF_mBool(enable_dynamic_batch_size_for_json_parse_many, "true"); CONF_mInt32(put_combined_txn_log_thread_pool_num_max, "64"); CONF_mBool(enable_put_combinded_txn_log_parallel, "false"); -CONF_mBool(output_combined_txn_log, "false"); } // namespace starrocks::config diff --git a/be/src/exec/tablet_sink_sender.cpp b/be/src/exec/tablet_sink_sender.cpp index 1981209d8dc776..f7c66b073d5456 100644 --- a/be/src/exec/tablet_sink_sender.cpp +++ b/be/src/exec/tablet_sink_sender.cpp @@ -370,22 +370,14 @@ bool TabletSinkSender::get_immutable_partition_ids(std::set* partition_ } Status TabletSinkSender::_write_combined_txn_log() { - int64_t t_start = MonotonicMillis(); if (config::enable_put_combinded_txn_log_parallel) { - Status st = write_combined_txn_log(_txn_log_map); - if (config::output_combined_txn_log) { - LOG(INFO) << "write_combined_txn_log parallel cost time: " << MonotonicMillis() - t_start; - } - return st; + return
write_combined_txn_log(_txn_log_map); } for (const auto& [partition_id, logs] : _txn_log_map) { (void)partition_id; RETURN_IF_ERROR(write_combined_txn_log(logs)); } - if (config::output_combined_txn_log) { - LOG(INFO) << "write_combined_txn_log non-parallel cost time: " << MonotonicMillis() - t_start; - } return Status::OK(); } diff --git a/be/src/exec/write_combined_txn_log.cpp b/be/src/exec/write_combined_txn_log.cpp index e678e9ee967876..1f10699e4de51d 100644 --- a/be/src/exec/write_combined_txn_log.cpp +++ b/be/src/exec/write_combined_txn_log.cpp @@ -24,7 +24,7 @@ namespace starrocks { class TxnLogTask : public Runnable { public: - TxnLogTask(const CombinedTxnLogPB& logs, lake::TabletManager* tablet_mgr, std::promise promise) + TxnLogTask(const CombinedTxnLogPB logs, lake::TabletManager* tablet_mgr, std::promise promise) : _logs(logs), _tablet_mgr(tablet_mgr), _promise(std::move(promise)) {} void run() override { diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 07d66a9bd2c03b..5828677b2aa811 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -759,6 +759,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_lake_update_manager); SAFE_DELETE(_lake_replication_txn_manager); SAFE_DELETE(_cache_mgr); + SAFE_DELETE(_put_combined_txn_log_thread_pool); _dictionary_cache_pool.reset(); _automatic_partition_pool.reset(); _metrics = nullptr; From 9ae5b57bbdba21b975c08e6d60f9bd4b51bf2aff Mon Sep 17 00:00:00 2001 From: sevev Date: Fri, 17 Jan 2025 11:34:49 +0800 Subject: [PATCH 3/4] pass txn logs by pointer and wait for all tasks before returning first error Signed-off-by: sevev --- be/src/exec/write_combined_txn_log.cpp | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/be/src/exec/write_combined_txn_log.cpp b/be/src/exec/write_combined_txn_log.cpp index 1f10699e4de51d..3d3421ab8203df 100644 --- a/be/src/exec/write_combined_txn_log.cpp +++ b/be/src/exec/write_combined_txn_log.cpp @@ -24,12 +24,12 @@ namespace starrocks { class TxnLogTask : public Runnable { public: -
TxnLogTask(const CombinedTxnLogPB logs, lake::TabletManager* tablet_mgr, std::promise promise) + TxnLogTask(const CombinedTxnLogPB* logs, lake::TabletManager* tablet_mgr, std::promise promise) : _logs(logs), _tablet_mgr(tablet_mgr), _promise(std::move(promise)) {} void run() override { try { - Status status = _tablet_mgr->put_combined_txn_log(_logs); + Status status = _tablet_mgr->put_combined_txn_log(*_logs); if (!status.ok()) { throw std::runtime_error("Log write failed"); } @@ -40,7 +40,7 @@ class TxnLogTask : public Runnable { } private: - CombinedTxnLogPB _logs; + const CombinedTxnLogPB* _logs; lake::TabletManager* _tablet_mgr; std::promise _promise; }; @@ -59,7 +59,7 @@ Status write_combined_txn_log(const std::map& txn_log std::promise promise; futures.push_back(promise.get_future()); std::shared_ptr r( - std::make_shared(logs, ExecEnv::GetInstance()->lake_tablet_manager(), std::move(promise))); + std::make_shared(&logs, ExecEnv::GetInstance()->lake_tablet_manager(), std::move(promise))); tasks.emplace_back(std::move(r)); } @@ -67,14 +67,15 @@ Status write_combined_txn_log(const std::map& txn_log RETURN_IF_ERROR(ExecEnv::GetInstance()->put_combined_txn_log_thread_pool()->submit(task)); } + Status st; for (auto& future : futures) { Status status = future.get(); - if (!status.ok()) { - return status; + if (st.ok() && !status.ok()) { + st = status; } } - return Status::OK(); + return st; } } // namespace starrocks \ No newline at end of file From e9a47d6679a2ea51988dfe823f38cefca7216444 Mon Sep 17 00:00:00 2001 From: sevev Date: Fri, 17 Jan 2025 17:55:29 +0800 Subject: [PATCH 4/4] add unit test for parallel write_combined_txn_log Signed-off-by: sevev --- be/src/storage/lake/tablet_manager.cpp | 5 ++ be/test/CMakeLists.txt | 1 + .../lake/write_combined_txn_log_test.cpp | 54 +++++++++++++++++++ 3 files changed, 60 insertions(+) create mode 100644 be/test/storage/lake/write_combined_txn_log_test.cpp diff --git a/be/src/storage/lake/tablet_manager.cpp b/be/src/storage/lake/tablet_manager.cpp index
e1f4c4cfcd218c..584158e81a91ad 100644 --- a/be/src/storage/lake/tablet_manager.cpp +++ b/be/src/storage/lake/tablet_manager.cpp @@ -48,6 +48,7 @@ #include "storage/rowset/segment.h" #include "storage/tablet_schema_map.h" #include "testutil/sync_point.h" +#include "util/failpoint/fail_point.h" #include "util/raw_container.h" #include "util/trace.h" @@ -458,7 +459,11 @@ Status TabletManager::put_txn_vlog(const TxnLogPtr& log, int64_t version) { return put_txn_log(log, txn_vlog_location(log->tablet_id(), version)); } +DEFINE_FAIL_POINT(put_combined_txn_log_success); +DEFINE_FAIL_POINT(put_combined_txn_log_fail); Status TabletManager::put_combined_txn_log(const starrocks::CombinedTxnLogPB& logs) { + FAIL_POINT_TRIGGER_RETURN(put_combined_txn_log_success, Status::OK()); + FAIL_POINT_TRIGGER_RETURN(put_combined_txn_log_fail, Status::InternalError("write combined_txn_log_fail")); if (UNLIKELY(logs.txn_logs_size() == 0)) { return Status::InvalidArgument("empty CombinedTxnLogPB"); } diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index b462d5bbe83227..7c50fba458b9ec 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -497,6 +497,7 @@ set(EXEC_FILES ./storage/lake/lake_persistent_index_test.cpp ./storage/lake/replication_txn_manager_test.cpp ./storage/lake/persistent_index_sstable_test.cpp + ./storage/lake/write_combined_txn_log_test.cpp ./block_cache/datacache_utils_test.cpp ./block_cache/block_cache_hit_rate_counter_test.cpp ./util/thrift_rpc_helper_test.cpp diff --git a/be/test/storage/lake/write_combined_txn_log_test.cpp b/be/test/storage/lake/write_combined_txn_log_test.cpp new file mode 100644 index 00000000000000..57854c9feb31b1 --- /dev/null +++ b/be/test/storage/lake/write_combined_txn_log_test.cpp @@ -0,0 +1,54 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/write_combined_txn_log.h" + +#include + +#include + +#include "testutil/assert.h" +#include "util/failpoint/fail_point.h" + +namespace starrocks::lake { + +class WriteCombinedTxnLogTest : public testing::Test { +public: + WriteCombinedTxnLogTest() {} +}; + +TEST_F(WriteCombinedTxnLogTest, test_write_combined_txn_log_parallel) { + std::map txn_log_map; + size_t N = 2; + for (int64_t i = 0; i < N; i++) { + CombinedTxnLogPB combinde_txn_log_pb; + txn_log_map.insert(std::make_pair(i, std::move(combinde_txn_log_pb))); + } + PFailPointTriggerMode trigger_mode; + trigger_mode.set_mode(FailPointTriggerModeType::ENABLE); + auto fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("put_combined_txn_log_success"); + fp->setMode(trigger_mode); + ASSERT_TRUE(write_combined_txn_log(txn_log_map).ok()); + trigger_mode.set_mode(FailPointTriggerModeType::DISABLE); + fp->setMode(trigger_mode); + + trigger_mode.set_mode(FailPointTriggerModeType::ENABLE); + fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("put_combined_txn_log_fail"); + fp->setMode(trigger_mode); + ASSERT_FALSE(write_combined_txn_log(txn_log_map).ok()); + trigger_mode.set_mode(FailPointTriggerModeType::DISABLE); + fp->setMode(trigger_mode); +} + +} // namespace starrocks::lake \ No newline at end of file