From 851149e6cbb365d98582020da81935e1ffecbb0b Mon Sep 17 00:00:00 2001 From: sevev Date: Fri, 10 Jan 2025 15:55:27 +0800 Subject: [PATCH] write combined txn log parallel Signed-off-by: sevev --- be/src/common/config.h | 3 ++ be/src/exec/tablet_sink_sender.cpp | 12 ++++++ be/src/exec/write_combined_txn_log.cpp | 53 ++++++++++++++++++++++++++ be/src/exec/write_combined_txn_log.h | 1 + be/src/runtime/exec_env.cpp | 9 +++++ be/src/runtime/exec_env.h | 2 + 6 files changed, 80 insertions(+) diff --git a/be/src/common/config.h b/be/src/common/config.h index 6ae9b972b1c44f..727fc471ddbd71 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1536,4 +1536,7 @@ CONF_mBool(avro_ignore_union_type_tag, "false"); // default batch size for simdjson lib CONF_mInt32(json_parse_many_batch_size, "1000000"); CONF_mBool(enable_dynamic_batch_size_for_json_parse_many, "true"); +CONF_mInt32(put_combined_txn_log_thread_pool_num_max, "64"); +CONF_mBool(enable_put_combinded_txn_log_parallel, "false"); +CONF_mBool(output_combined_txn_log, "false"); } // namespace starrocks::config diff --git a/be/src/exec/tablet_sink_sender.cpp b/be/src/exec/tablet_sink_sender.cpp index ff7316d1cff1da..1981209d8dc776 100644 --- a/be/src/exec/tablet_sink_sender.cpp +++ b/be/src/exec/tablet_sink_sender.cpp @@ -370,10 +370,22 @@ bool TabletSinkSender::get_immutable_partition_ids(std::set* partition_ } Status TabletSinkSender::_write_combined_txn_log() { + int64_t t_start = MonotonicMillis(); + if (config::enable_put_combinded_txn_log_parallel) { + Status st = write_combined_txn_log(_txn_log_map); + if (config::output_combined_txn_log) { + LOG(INFO) << "write_combined_txn_log parallel cost time: " << MonotonicMillis() - t_start; + } + return st; + } + for (const auto& [partition_id, logs] : _txn_log_map) { (void)partition_id; RETURN_IF_ERROR(write_combined_txn_log(logs)); } + if (config::output_combined_txn_log) { + LOG(INFO) << "write_combined_txn_log non-parallel cost time: " << MonotonicMillis() - t_start; + } return Status::OK(); } diff --git a/be/src/exec/write_combined_txn_log.cpp b/be/src/exec/write_combined_txn_log.cpp index 3d3c94014476e2..e678e9ee967876 100644 --- a/be/src/exec/write_combined_txn_log.cpp +++ b/be/src/exec/write_combined_txn_log.cpp @@ -14,14 +14,67 @@ #include "exec/write_combined_txn_log.h" +#include +#include + #include "runtime/exec_env.h" #include "storage/lake/tablet_manager.h" namespace starrocks { +class TxnLogTask : public Runnable { +public: + TxnLogTask(const CombinedTxnLogPB& logs, lake::TabletManager* tablet_mgr, std::promise promise) + : _logs(logs), _tablet_mgr(tablet_mgr), _promise(std::move(promise)) {} + + void run() override { + try { + Status status = _tablet_mgr->put_combined_txn_log(_logs); + if (!status.ok()) { + throw std::runtime_error("Log write failed"); + } + _promise.set_value(Status::OK()); + } catch (const std::exception& e) { + _promise.set_value(Status::IOError(e.what())); + } + } + +private: + CombinedTxnLogPB _logs; + lake::TabletManager* _tablet_mgr; + std::promise _promise; +}; + Status write_combined_txn_log(const CombinedTxnLogPB& logs) { auto tablet_mgr = ExecEnv::GetInstance()->lake_tablet_manager(); return tablet_mgr->put_combined_txn_log(logs); } +Status write_combined_txn_log(const std::map& txn_log_map) { + std::vector> futures; + std::vector> tasks; + + for (const auto& [partition_id, logs] : txn_log_map) { + (void)partition_id; + std::promise promise; + futures.push_back(promise.get_future()); + std::shared_ptr r( + std::make_shared(logs, ExecEnv::GetInstance()->lake_tablet_manager(), std::move(promise))); + tasks.emplace_back(std::move(r)); + } + + for (auto& task : tasks) { + RETURN_IF_ERROR(ExecEnv::GetInstance()->put_combined_txn_log_thread_pool()->submit(task)); + } + + for (auto& future : futures) { + Status status = future.get(); + if (!status.ok()) { + return status; + } + } + + return Status::OK(); +} + } // namespace starrocks \ No newline at end of file diff --git a/be/src/exec/write_combined_txn_log.h b/be/src/exec/write_combined_txn_log.h index 99fec2b812ece9..d356c381fc500c 100644 --- a/be/src/exec/write_combined_txn_log.h +++ b/be/src/exec/write_combined_txn_log.h @@ -20,5 +20,6 @@ namespace starrocks { class CombinedTxnLogPB; Status write_combined_txn_log(const CombinedTxnLogPB& logs); +Status write_combined_txn_log(const std::map& txn_log_map); } // namespace starrocks diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index c0a4dc1269023e..07d66a9bd2c03b 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -482,6 +482,7 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { std::unique_ptr load_rowset_pool; std::unique_ptr load_segment_pool; + std::unique_ptr put_combined_txn_log_thread_pool; RETURN_IF_ERROR( ThreadPoolBuilder("load_rowset_pool") .set_min_threads(0) @@ -501,6 +502,14 @@ Status ExecEnv::init(const std::vector& store_paths, bool as_cn) { _load_segment_thread_pool = load_segment_pool.release(); _broker_mgr = new BrokerMgr(this); + + RETURN_IF_ERROR(ThreadPoolBuilder("put_combined_txn_log_thread_pool") + .set_min_threads(0) + .set_max_threads(config::put_combined_txn_log_thread_pool_num_max) + .set_idle_timeout(MonoDelta::FromMilliseconds(500)) + .build(&put_combined_txn_log_thread_pool)); + _put_combined_txn_log_thread_pool = put_combined_txn_log_thread_pool.release(); + #ifndef BE_TEST _bfd_parser = BfdParser::create(); #endif diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 5cc0360f3f6c12..209419d439fdad 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -278,6 +278,7 @@ class ExecEnv { ThreadPool* streaming_load_thread_pool() { return _streaming_load_thread_pool; } ThreadPool* load_rowset_thread_pool() { return _load_rowset_thread_pool; } ThreadPool* load_segment_thread_pool() { return _load_segment_thread_pool; } + ThreadPool* put_combined_txn_log_thread_pool() { return _put_combined_txn_log_thread_pool; } pipeline::DriverExecutor* wg_driver_executor(); workgroup::ScanExecutor* scan_executor(); @@ -369,6 +370,7 @@ class ExecEnv { ThreadPool* _load_segment_thread_pool = nullptr; ThreadPool* _load_rowset_thread_pool = nullptr; + ThreadPool* _put_combined_txn_log_thread_pool = nullptr; PriorityThreadPool* _udf_call_pool = nullptr; PriorityThreadPool* _pipeline_prepare_pool = nullptr;