Skip to content

Commit

Permalink
write combined txn log parallel
Browse files Browse the repository at this point in the history
Signed-off-by: sevev <[email protected]>
  • Loading branch information
sevev committed Jan 10, 2025
1 parent 67cdf03 commit 851149e
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 0 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1536,4 +1536,7 @@ CONF_mBool(avro_ignore_union_type_tag, "false");
// default batch size for simdjson lib
CONF_mInt32(json_parse_many_batch_size, "1000000");
CONF_mBool(enable_dynamic_batch_size_for_json_parse_many, "true");
CONF_mInt32(put_combined_txn_log_thread_pool_num_max, "64");
CONF_mBool(enable_put_combinded_txn_log_parallel, "false");
CONF_mBool(output_combined_txn_log, "false");
} // namespace starrocks::config
12 changes: 12 additions & 0 deletions be/src/exec/tablet_sink_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,10 +370,22 @@ bool TabletSinkSender::get_immutable_partition_ids(std::set<int64_t>* partition_
}

Status TabletSinkSender::_write_combined_txn_log() {
int64_t t_start = MonotonicMillis();
if (config::enable_put_combinded_txn_log_parallel) {
Status st = write_combined_txn_log(_txn_log_map);
if (config::output_combined_txn_log) {
LOG(INFO) << "write_combined_txn_log parallel cost time: " << MonotonicMillis() - t_start;
}
return st;
}

for (const auto& [partition_id, logs] : _txn_log_map) {
(void)partition_id;
RETURN_IF_ERROR(write_combined_txn_log(logs));
}
if (config::output_combined_txn_log) {
LOG(INFO) << "write_combined_txn_log non-parallel cost time: " << MonotonicMillis() - t_start;
}
return Status::OK();
}

Expand Down
53 changes: 53 additions & 0 deletions be/src/exec/write_combined_txn_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,67 @@

#include "exec/write_combined_txn_log.h"

#include <future>
#include <vector>

#include "runtime/exec_env.h"
#include "storage/lake/tablet_manager.h"

namespace starrocks {

class TxnLogTask : public Runnable {
public:
TxnLogTask(const CombinedTxnLogPB& logs, lake::TabletManager* tablet_mgr, std::promise<Status> promise)
: _logs(logs), _tablet_mgr(tablet_mgr), _promise(std::move(promise)) {}

void run() override {
try {
Status status = _tablet_mgr->put_combined_txn_log(_logs);
if (!status.ok()) {
throw std::runtime_error("Log write failed");
}
_promise.set_value(Status::OK());
} catch (const std::exception& e) {
_promise.set_value(Status::IOError(e.what()));
}
}

private:
CombinedTxnLogPB _logs;
lake::TabletManager* _tablet_mgr;
std::promise<Status> _promise;
};

Status write_combined_txn_log(const CombinedTxnLogPB& logs) {
auto tablet_mgr = ExecEnv::GetInstance()->lake_tablet_manager();
return tablet_mgr->put_combined_txn_log(logs);
}

Status write_combined_txn_log(const std::map<int64_t, CombinedTxnLogPB>& txn_log_map) {
std::vector<std::future<Status>> futures;
std::vector<std::shared_ptr<Runnable>> tasks;

for (const auto& [partition_id, logs] : txn_log_map) {
(void)partition_id;
std::promise<Status> promise;
futures.push_back(promise.get_future());
std::shared_ptr<Runnable> r(
std::make_shared<TxnLogTask>(logs, ExecEnv::GetInstance()->lake_tablet_manager(), std::move(promise)));
tasks.emplace_back(std::move(r));
}

for (auto& task : tasks) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->put_combined_txn_log_thread_pool()->submit(task));
}

for (auto& future : futures) {
Status status = future.get();
if (!status.ok()) {
return status;
}
}

return Status::OK();
}

} // namespace starrocks
1 change: 1 addition & 0 deletions be/src/exec/write_combined_txn_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ namespace starrocks {
class CombinedTxnLogPB;

Status write_combined_txn_log(const CombinedTxnLogPB& logs);
Status write_combined_txn_log(const std::map<int64_t, CombinedTxnLogPB>& txn_log_map);

} // namespace starrocks
9 changes: 9 additions & 0 deletions be/src/runtime/exec_env.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {

std::unique_ptr<ThreadPool> load_rowset_pool;
std::unique_ptr<ThreadPool> load_segment_pool;
std::unique_ptr<ThreadPool> put_combined_txn_log_thread_pool;
RETURN_IF_ERROR(
ThreadPoolBuilder("load_rowset_pool")
.set_min_threads(0)
Expand All @@ -501,6 +502,14 @@ Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {
_load_segment_thread_pool = load_segment_pool.release();

_broker_mgr = new BrokerMgr(this);

RETURN_IF_ERROR(ThreadPoolBuilder("put_combined_txn_log_thread_pool")
.set_min_threads(0)
.set_max_threads(config::put_combined_txn_log_thread_pool_num_max)
.set_idle_timeout(MonoDelta::FromMilliseconds(500))
.build(&put_combined_txn_log_thread_pool));
_put_combined_txn_log_thread_pool = put_combined_txn_log_thread_pool.release();

#ifndef BE_TEST
_bfd_parser = BfdParser::create();
#endif
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ class ExecEnv {
ThreadPool* streaming_load_thread_pool() { return _streaming_load_thread_pool; }
ThreadPool* load_rowset_thread_pool() { return _load_rowset_thread_pool; }
ThreadPool* load_segment_thread_pool() { return _load_segment_thread_pool; }
ThreadPool* put_combined_txn_log_thread_pool() { return _put_combined_txn_log_thread_pool; }

pipeline::DriverExecutor* wg_driver_executor();
workgroup::ScanExecutor* scan_executor();
Expand Down Expand Up @@ -369,6 +370,7 @@ class ExecEnv {

ThreadPool* _load_segment_thread_pool = nullptr;
ThreadPool* _load_rowset_thread_pool = nullptr;
ThreadPool* _put_combined_txn_log_thread_pool = nullptr;

PriorityThreadPool* _udf_call_pool = nullptr;
PriorityThreadPool* _pipeline_prepare_pool = nullptr;
Expand Down

0 comments on commit 851149e

Please sign in to comment.