diff --git a/be/src/common/config.h b/be/src/common/config.h index 6ae9b972b1c44..9a2570f806629 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1536,4 +1536,6 @@ CONF_mBool(avro_ignore_union_type_tag, "false"); // default batch size for simdjson lib CONF_mInt32(json_parse_many_batch_size, "1000000"); CONF_mBool(enable_dynamic_batch_size_for_json_parse_many, "true"); +CONF_mInt32(put_combined_txn_log_thread_pool_num_max, "64"); +CONF_mBool(enable_put_combinded_txn_log_parallel, "false"); } // namespace starrocks::config diff --git a/be/src/exec/tablet_sink_sender.cpp b/be/src/exec/tablet_sink_sender.cpp index ff7316d1cff1d..f7c66b073d545 100644 --- a/be/src/exec/tablet_sink_sender.cpp +++ b/be/src/exec/tablet_sink_sender.cpp @@ -370,6 +370,10 @@ bool TabletSinkSender::get_immutable_partition_ids(std::set* partition_ } Status TabletSinkSender::_write_combined_txn_log() { + if (config::enable_put_combinded_txn_log_parallel) { + return write_combined_txn_log(_txn_log_map); + } + for (const auto& [partition_id, logs] : _txn_log_map) { (void)partition_id; RETURN_IF_ERROR(write_combined_txn_log(logs)); diff --git a/be/src/exec/write_combined_txn_log.cpp b/be/src/exec/write_combined_txn_log.cpp index 3d3c94014476e..3d3421ab8203d 100644 --- a/be/src/exec/write_combined_txn_log.cpp +++ b/be/src/exec/write_combined_txn_log.cpp @@ -14,14 +14,68 @@ #include "exec/write_combined_txn_log.h" +#include +#include + #include "runtime/exec_env.h" #include "storage/lake/tablet_manager.h" namespace starrocks { +class TxnLogTask : public Runnable { +public: + TxnLogTask(const CombinedTxnLogPB* logs, lake::TabletManager* tablet_mgr, std::promise promise) + : _logs(logs), _tablet_mgr(tablet_mgr), _promise(std::move(promise)) {} + + void run() override { + try { + Status status = _tablet_mgr->put_combined_txn_log(*_logs); + if (!status.ok()) { + throw std::runtime_error("Log write failed"); + } + _promise.set_value(Status::OK()); + } catch (const std::exception& e) { + _promise.set_value(Status::IOError(e.what())); + } + } + +private: + const CombinedTxnLogPB* _logs; + lake::TabletManager* _tablet_mgr; + std::promise _promise; +}; + Status write_combined_txn_log(const CombinedTxnLogPB& logs) { auto tablet_mgr = ExecEnv::GetInstance()->lake_tablet_manager(); return tablet_mgr->put_combined_txn_log(logs); } +Status write_combined_txn_log(const std::map& txn_log_map) { + std::vector> futures; + std::vector> tasks; + + for (const auto& [partition_id, logs] : txn_log_map) { + (void)partition_id; + std::promise promise; + futures.push_back(promise.get_future()); + std::shared_ptr r( + std::make_shared(&logs, ExecEnv::GetInstance()->lake_tablet_manager(), std::move(promise))); + tasks.emplace_back(std::move(r)); + } + + for (auto& task : tasks) { + RETURN_IF_ERROR(ExecEnv::GetInstance()->put_combined_txn_log_thread_pool()->submit(task)); + } + + Status st; + for (auto& future : futures) { + Status status = future.get(); + if (st.ok() && !status.ok()) { + st = status; + } + } + + return st; +} + } // namespace starrocks \ No newline at end of file diff --git a/be/src/exec/write_combined_txn_log.h b/be/src/exec/write_combined_txn_log.h index 99fec2b812ece..d356c381fc500 100644 --- a/be/src/exec/write_combined_txn_log.h +++ b/be/src/exec/write_combined_txn_log.h @@ -20,5 +20,6 @@ namespace starrocks { class CombinedTxnLogPB; Status write_combined_txn_log(const CombinedTxnLogPB& logs); +Status write_combined_txn_log(const std::map& txn_log_map); } // namespace starrocks diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index c0a4dc1269023..5828677b2aa81 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -482,6 +482,7 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { std::unique_ptr load_rowset_pool; std::unique_ptr load_segment_pool; + std::unique_ptr put_combined_txn_log_thread_pool; RETURN_IF_ERROR( ThreadPoolBuilder("load_rowset_pool") .set_min_threads(0) @@ -501,6 +502,14 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { _load_segment_thread_pool = load_segment_pool.release(); _broker_mgr = new BrokerMgr(this); + + RETURN_IF_ERROR(ThreadPoolBuilder("put_combined_txn_log_thread_pool") + .set_min_threads(0) + .set_max_threads(config::put_combined_txn_log_thread_pool_num_max) + .set_idle_timeout(MonoDelta::FromMilliseconds(500)) + .build(&put_combined_txn_log_thread_pool)); + _put_combined_txn_log_thread_pool = put_combined_txn_log_thread_pool.release(); + #ifndef BE_TEST _bfd_parser = BfdParser::create(); #endif @@ -750,6 +759,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_lake_update_manager); SAFE_DELETE(_lake_replication_txn_manager); SAFE_DELETE(_cache_mgr); + SAFE_DELETE(_put_combined_txn_log_thread_pool); _dictionary_cache_pool.reset(); _automatic_partition_pool.reset(); _metrics = nullptr; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 5cc0360f3f6c1..209419d439fda 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -278,6 +278,7 @@ class ExecEnv { ThreadPool* streaming_load_thread_pool() { return _streaming_load_thread_pool; } ThreadPool* load_rowset_thread_pool() { return _load_rowset_thread_pool; } ThreadPool* load_segment_thread_pool() { return _load_segment_thread_pool; } + ThreadPool* put_combined_txn_log_thread_pool() { return _put_combined_txn_log_thread_pool; } pipeline::DriverExecutor* wg_driver_executor(); workgroup::ScanExecutor* scan_executor(); @@ -369,6 +370,7 @@ class ExecEnv { ThreadPool* _load_segment_thread_pool = nullptr; ThreadPool* _load_rowset_thread_pool = nullptr; + ThreadPool* _put_combined_txn_log_thread_pool = nullptr; PriorityThreadPool* _udf_call_pool = nullptr; PriorityThreadPool* _pipeline_prepare_pool = nullptr; diff --git a/be/src/storage/lake/tablet_manager.cpp b/be/src/storage/lake/tablet_manager.cpp index e1f4c4cfcd218..584158e81a91a 100644 --- a/be/src/storage/lake/tablet_manager.cpp +++ b/be/src/storage/lake/tablet_manager.cpp @@ -48,6 +48,7 @@ #include "storage/rowset/segment.h" #include "storage/tablet_schema_map.h" #include "testutil/sync_point.h" +#include "util/failpoint/fail_point.h" #include "util/raw_container.h" #include "util/trace.h" @@ -458,7 +459,11 @@ Status TabletManager::put_txn_vlog(const TxnLogPtr& log, int64_t version) { return put_txn_log(log, txn_vlog_location(log->tablet_id(), version)); } +DEFINE_FAIL_POINT(put_combined_txn_log_success); +DEFINE_FAIL_POINT(put_combined_txn_log_fail); Status TabletManager::put_combined_txn_log(const starrocks::CombinedTxnLogPB& logs) { + FAIL_POINT_TRIGGER_RETURN(put_combined_txn_log_success, Status::OK()); + FAIL_POINT_TRIGGER_RETURN(put_combined_txn_log_fail, Status::InternalError("write combined_txn_log_fail")); if (UNLIKELY(logs.txn_logs_size() == 0)) { return Status::InvalidArgument("empty CombinedTxnLogPB"); } diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index b462d5bbe8322..7c50fba458b9e 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -497,6 +497,7 @@ set(EXEC_FILES ./storage/lake/lake_persistent_index_test.cpp ./storage/lake/replication_txn_manager_test.cpp ./storage/lake/persistent_index_sstable_test.cpp + ./storage/lake/write_combined_txn_log_test.cpp ./block_cache/datacache_utils_test.cpp ./block_cache/block_cache_hit_rate_counter_test.cpp ./util/thrift_rpc_helper_test.cpp diff --git a/be/test/storage/lake/write_combined_txn_log_test.cpp b/be/test/storage/lake/write_combined_txn_log_test.cpp new file mode 100644 index 0000000000000..57854c9feb31b --- /dev/null +++ b/be/test/storage/lake/write_combined_txn_log_test.cpp @@ -0,0 +1,54 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "exec/write_combined_txn_log.h" + +#include + +#include + +#include "testutil/assert.h" +#include "util/failpoint/fail_point.h" + +namespace starrocks::lake { + +class WriteCombinedTxnLogTest : public testing::Test { +public: + WriteCombinedTxnLogTest() {} +}; + +TEST_F(WriteCombinedTxnLogTest, test_write_combined_txn_log_parallel) { + std::map txn_log_map; + size_t N = 2; + for (int64_t i = 0; i < N; i++) { + CombinedTxnLogPB combinde_txn_log_pb; + txn_log_map.insert(std::make_pair(i, std::move(combinde_txn_log_pb))); + } + PFailPointTriggerMode trigger_mode; + trigger_mode.set_mode(FailPointTriggerModeType::ENABLE); + auto fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("put_combined_txn_log_success"); + fp->setMode(trigger_mode); + ASSERT_TRUE(write_combined_txn_log(txn_log_map).ok()); + trigger_mode.set_mode(FailPointTriggerModeType::DISABLE); + fp->setMode(trigger_mode); + + trigger_mode.set_mode(FailPointTriggerModeType::ENABLE); + fp = starrocks::failpoint::FailPointRegistry::GetInstance()->get("put_combined_txn_log_fail"); + fp->setMode(trigger_mode); + ASSERT_FALSE(write_combined_txn_log(txn_log_map).ok()); + trigger_mode.set_mode(FailPointTriggerModeType::DISABLE); + fp->setMode(trigger_mode); +} + +} // namespace starrocks::lake \ No newline at end of file