Merge branch 'StarRocks:main' into SR-21612

Astralidea authored Oct 31, 2023
2 parents b40f779 + ad24cdb commit 0189d50
Showing 98 changed files with 1,899 additions and 806 deletions.
1 change: 1 addition & 0 deletions be/src/exec/hdfs_scanner_text.cpp
@@ -308,6 +308,7 @@ Status HdfsTextScanner::parse_csv(int chunk_size, ChunkPtr* chunk) {
 
     csv::Converter::Options options;
     // Used to customize the Hive array format
+    options.is_hive = true;
     options.array_format_type = csv::ArrayFormatType::kHive;
     options.array_hive_collection_delimiter = _collection_delimiter;
     options.array_hive_mapkey_delimiter = _mapkey_delimiter;
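This change routes Hive's nested-type delimiters into the CSV converter. As a rough illustration of what a collection delimiter does (not StarRocks code; the function name and the use of Hive's default '\x02' collection delimiter are assumptions for the sketch), a Hive array field is a single text field split on that delimiter:

#include <iostream>
#include <string>
#include <vector>

// Split one text field into array elements using a Hive-style collection
// delimiter (Hive's default at the first nesting level is '\x02').
static std::vector<std::string> split_hive_array(const std::string& field, char collection_delimiter) {
    std::vector<std::string> elements;
    size_t start = 0;
    while (true) {
        size_t pos = field.find(collection_delimiter, start);
        if (pos == std::string::npos) {
            elements.push_back(field.substr(start));
            break;
        }
        elements.push_back(field.substr(start, pos - start));
        start = pos + 1;
    }
    return elements;
}

int main() {
    // Concatenated literals keep the \x02 escape from swallowing the next letter.
    // The field "a<0x02>b<0x02>c" parses as the array ["a", "b", "c"].
    for (const auto& e : split_hive_array("a\x02" "b\x02" "c", '\x02')) {
        std::cout << e << '\n';
    }
}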
3 changes: 3 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
@@ -135,6 +135,9 @@ Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExec
     if (query_options.__isset.enable_profile && query_options.enable_profile) {
         _query_ctx->set_enable_profile();
     }
+    if (query_options.__isset.big_query_profile_second_threshold) {
+        _query_ctx->set_big_query_profile_threshold(query_options.big_query_profile_second_threshold);
+    }
     if (query_options.__isset.pipeline_profile_level) {
         _query_ctx->set_profile_level(query_options.pipeline_profile_level);
     }
14 changes: 13 additions & 1 deletion be/src/exec/pipeline/query_context.h
@@ -89,7 +89,18 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
                duration_cast<milliseconds>(steady_clock::now().time_since_epoch() + _query_expire_seconds).count();
     }
     void set_enable_profile() { _enable_profile = true; }
-    bool enable_profile() { return _enable_profile; }
+    bool enable_profile() {
+        if (_enable_profile) {
+            return true;
+        }
+        if (_big_query_profile_threshold_ns <= 0) {
+            return false;
+        }
+        return MonotonicNanos() - _query_begin_time > _big_query_profile_threshold_ns;
+    }
+    void set_big_query_profile_threshold(int64_t big_query_profile_threshold_s) {
+        _big_query_profile_threshold_ns = 1'000'000'000L * big_query_profile_threshold_s;
+    }
     void set_runtime_profile_report_interval(int64_t runtime_profile_report_interval_s) {
         _runtime_profile_report_interval_ns = 1'000'000'000L * runtime_profile_report_interval_s;
     }
@@ -202,6 +213,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
     std::once_flag _init_mem_tracker_once;
     std::shared_ptr<RuntimeProfile> _profile;
     bool _enable_profile = false;
+    int64_t _big_query_profile_threshold_ns = 0;
     int64_t _runtime_profile_report_interval_ns = std::numeric_limits<int64_t>::max();
     TPipelineProfileLevel::type _profile_level;
     std::shared_ptr<MemTracker> _mem_tracker;
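Taken together with the fragment_executor.cpp change above, this lets profiling kick in automatically for long-running ("big") queries even when enable_profile is off: once the query has been running longer than big_query_profile_second_threshold, enable_profile() starts returning true. A minimal self-contained sketch of the same gate, using std::chrono in place of StarRocks' MonotonicNanos() and an illustrative class name:

#include <chrono>
#include <cstdint>
#include <iostream>

// Sketch of the threshold-based profile gate added to QueryContext:
// profiling is on if explicitly enabled, or once the query has been
// running longer than the configured "big query" threshold.
class QueryProfileGate {
public:
    QueryProfileGate(bool enable_profile, int64_t big_query_threshold_s)
            : _enable_profile(enable_profile),
              _big_query_profile_threshold_ns(1'000'000'000L * big_query_threshold_s),
              _query_begin_time(now_ns()) {}

    bool enable_profile() const {
        if (_enable_profile) return true;
        if (_big_query_profile_threshold_ns <= 0) return false; // threshold unset: never auto-enable
        return now_ns() - _query_begin_time > _big_query_profile_threshold_ns;
    }

private:
    static int64_t now_ns() {
        using namespace std::chrono;
        return duration_cast<nanoseconds>(steady_clock::now().time_since_epoch()).count();
    }

    bool _enable_profile;
    int64_t _big_query_profile_threshold_ns;
    int64_t _query_begin_time;
};

int main() {
    QueryProfileGate gate(/*enable_profile=*/false, /*big_query_threshold_s=*/60);
    std::cout << std::boolalpha << gate.enable_profile() << '\n'; // false until 60s have elapsed
}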
1 change: 1 addition & 0 deletions be/src/formats/csv/converter.h
@@ -59,6 +59,7 @@ class Converter {
         // TODO: user configurable.
         bool bool_alpha = true;
 
+        bool is_hive = false;
         // Here used to control array parse format.
         // Considering Hive array format is different from traditional array format,
         // so here we provide some variables to customize array format, and you can
33 changes: 29 additions & 4 deletions be/src/formats/csv/string_converter.cpp
@@ -60,12 +60,24 @@ bool StringConverter::read_string(Column* column, const Slice& s, const Options&
         max_size = options.type_desc->len;
     }
 
-    if (config::enable_check_string_lengths &&
-        ((s.size > TypeDescriptor::MAX_VARCHAR_LENGTH) || (max_size > 0 && s.size > max_size))) {
+    bool length_check_status = true;
+    // For Hive tables, no longer limit string length to <= 1MB
+    if (options.is_hive) {
+        if (UNLIKELY(max_size > 0 && s.size > max_size)) {
+            length_check_status = false;
+        }
+    } else {
+        if ((config::enable_check_string_lengths &&
+             ((s.size > TypeDescriptor::MAX_VARCHAR_LENGTH) || (max_size > 0 && s.size > max_size)))) {
+            length_check_status = false;
+        }
+    }
+    if (!length_check_status) {
         VLOG(3) << strings::Substitute("Column [$0]'s length exceed max varchar length. str_size($1), max_size($2)",
                                        column->get_name(), s.size, max_size);
         return false;
     }
 
     down_cast<BinaryColumn*>(column)->append(s);
     return true;
 }
@@ -107,15 +119,28 @@ bool StringConverter::read_quoted_string(Column* column, const Slice& tmp_s, con
         max_size = options.type_desc->len;
     }
     size_t ext_size = new_size - old_size;
-    if (config::enable_check_string_lengths &&
-        ((ext_size > TypeDescriptor::MAX_VARCHAR_LENGTH) || (max_size > 0 && ext_size > max_size))) {
+
+    bool length_check_status = true;
+    // For Hive tables, no longer limit string length to <= 1MB
+    if (options.is_hive) {
+        if (max_size > 0 && ext_size > max_size) {
+            length_check_status = false;
+        }
+    } else {
+        if (config::enable_check_string_lengths &&
+            ((ext_size > TypeDescriptor::MAX_VARCHAR_LENGTH) || (max_size > 0 && ext_size > max_size))) {
+            length_check_status = false;
+        }
+    }
+    if (!length_check_status) {
         bytes.resize(old_size);
         VLOG(3) << strings::Substitute(
                 "Column [$0]'s length exceed max varchar length. old_size($1), new_size($2), ext_size($3), "
                 "max_size($4)",
                 column->get_name(), old_size, new_size, ext_size, max_size);
         return false;
     }
 
     offsets.push_back(bytes.size());
     return true;
 }
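The same two-branch check now appears in both read_string and read_quoted_string: for Hive tables only the column's declared max length is enforced, while other tables additionally cap values at TypeDescriptor::MAX_VARCHAR_LENGTH (1MB) when config::enable_check_string_lengths is on. A condensed sketch of the decision, with stand-in constants for the config flag and the 1MB limit:

#include <cstddef>
#include <iostream>

// Illustrative stand-ins for TypeDescriptor::MAX_VARCHAR_LENGTH and
// config::enable_check_string_lengths.
constexpr size_t kMaxVarcharLength = 1048576; // 1MB
bool g_enable_check_string_lengths = true;

// Mirror of the branch added to the CSV string converters: Hive tables only
// enforce the column's declared max length; other tables additionally cap
// values at 1MB when the global length check is enabled.
bool string_length_ok(size_t size, size_t max_size, bool is_hive) {
    if (is_hive) {
        return !(max_size > 0 && size > max_size);
    }
    if (g_enable_check_string_lengths &&
        (size > kMaxVarcharLength || (max_size > 0 && size > max_size))) {
        return false;
    }
    return true;
}

int main() {
    std::cout << std::boolalpha
              << string_length_ok(2 * 1048576, /*max_size=*/0, /*is_hive=*/true) << ' '   // true: no 1MB cap for Hive
              << string_length_ok(2 * 1048576, /*max_size=*/0, /*is_hive=*/false) << '\n'; // false: exceeds 1MB
}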
13 changes: 12 additions & 1 deletion be/src/formats/csv/varbinary_converter.cpp
@@ -77,8 +77,19 @@ bool VarBinaryConverter::read_string(Column* column, const Slice& s, const Optio
         len = s.size;
     }
 
+    bool length_check_status = true;
     // check if the length exceeds the max varbinary length
-    if (UNLIKELY((len > TypeDescriptor::MAX_VARCHAR_LENGTH) || (max_size != -1 && len > max_size))) {
+    if (options.is_hive) {
+        if (UNLIKELY(max_size != -1 && len > max_size)) {
+            length_check_status = false;
+        }
+    } else {
+        if (config::enable_check_string_lengths &&
+            ((len > TypeDescriptor::MAX_VARCHAR_LENGTH) || (max_size != -1 && len > max_size))) {
+            length_check_status = false;
+        }
+    }
+    if (!length_check_status) {
         LOG(WARNING) << "Column [" << column->get_name() << "]'s length exceed max varbinary length.";
         return false;
     }
28 changes: 10 additions & 18 deletions be/src/service/service_be/lake_service.cpp
@@ -147,30 +147,21 @@ void LakeServiceImpl::publish_version(::google::protobuf::RpcController* control
         return;
     }
 
-    auto enable_trace = config::lake_enable_publish_version_trace_log;
     auto start_ts = butil::gettimeofday_us();
     auto thread_pool = publish_version_thread_pool(_env);
    auto latch = BThreadCountDownLatch(request->tablet_ids_size());
     bthread::Mutex response_mtx;
-    Trace* trace = nullptr;
-    scoped_refptr<Trace> trace_gurad;
-
-    if (enable_trace) {
-        trace_gurad = scoped_refptr<Trace>(new Trace());
-        trace = trace_gurad.get();
-        TRACE_TO(trace, "got request. txn_id=$0 new_version=$1 #tablets=$2", request->txn_ids(0),
-                 request->new_version(), request->tablet_ids_size());
-    }
+    scoped_refptr<Trace> trace_gurad = scoped_refptr<Trace>(new Trace());
+    Trace* trace = trace_gurad.get();
+    TRACE_TO(trace, "got request. txn_id=$0 new_version=$1 #tablets=$2", request->txn_ids(0), request->new_version(),
+             request->tablet_ids_size());
 
     for (auto tablet_id : request->tablet_ids()) {
         auto task = [&, tablet_id]() {
             DeferOp defer([&] { latch.count_down(); });
-            Trace* sub_trace = nullptr;
-            if (enable_trace) {
-                scoped_refptr<Trace> child_trace(new Trace);
-                sub_trace = child_trace.get();
-                trace->AddChildTrace("PublishTablet", sub_trace);
-            }
+            scoped_refptr<Trace> child_trace(new Trace);
+            Trace* sub_trace = child_trace.get();
+            trace->AddChildTrace("PublishTablet", sub_trace);
 
             ADOPT_TRACE(sub_trace);
             TRACE("start publish tablet $0 at thread $1", tablet_id, Thread::current_thread()->tid());
@@ -182,6 +173,7 @@ void LakeServiceImpl::publish_version(::google::protobuf::RpcController* control
             auto commit_time = request->commit_time();
             g_publish_tablet_version_queuing_latency << (run_ts - start_ts);
 
+            TRACE_COUNTER_INCREMENT("tablet_id", tablet_id);
             auto res = lake::publish_version(_tablet_mgr, tablet_id, base_version, new_version, txns, commit_time);
             if (res.ok()) {
                 auto metadata = std::move(res).value();
@@ -213,11 +205,11 @@ void LakeServiceImpl::publish_version(::google::protobuf::RpcController* control
     latch.wait();
     auto cost = butil::gettimeofday_us() - start_ts;
     auto is_slow = cost >= config::lake_publish_version_slow_log_ms * 1000;
-    if (enable_trace && is_slow) {
+    if (config::lake_enable_publish_version_trace_log && is_slow) {
         LOG(INFO) << "Published txn " << request->txn_ids(0) << ". cost=" << cost << "us\n" << trace->DumpToString();
     } else if (is_slow) {
         LOG(INFO) << "Published txn " << request->txn_ids(0) << ". #tablets=" << request->tablet_ids_size()
-                  << " cost=" << cost << "us";
+                  << " cost=" << cost << "us, trace: " << trace->MetricsAsJSON();
     }
     TEST_SYNC_POINT("LakeServiceImpl::publish_version:return");
 }
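The trace is now created unconditionally, so per-tablet counters (such as the tablet_id counter incremented above) are always collected; config::lake_enable_publish_version_trace_log only decides whether a slow publish logs the full trace dump or just the compact metrics JSON. A sketch of that logging policy with simplified stand-ins for Trace and the config values:

#include <cstdint>
#include <iostream>
#include <string>

// Sketch of the logging policy after this change: a trace is always collected,
// the verbose dump is gated on a config flag, and slow requests without the
// flag still log a compact metrics summary. All names are illustrative.
struct Trace {
    std::string DumpToString() const { return "full trace ..."; }
    std::string MetricsAsJSON() const { return R"({"tablets":12})"; }
};

bool g_enable_trace_log = false;     // stands in for config::lake_enable_publish_version_trace_log
int64_t g_slow_threshold_us = 1000;  // stands in for config::lake_publish_version_slow_log_ms * 1000

void log_publish_result(const Trace& trace, int64_t cost_us) {
    bool is_slow = cost_us >= g_slow_threshold_us;
    if (g_enable_trace_log && is_slow) {
        std::cout << "cost=" << cost_us << "us\n" << trace.DumpToString() << '\n';
    } else if (is_slow) {
        std::cout << "cost=" << cost_us << "us, trace: " << trace.MetricsAsJSON() << '\n';
    }
}

int main() {
    Trace trace;
    log_publish_result(trace, 5000); // slow publish: logs the compact metrics JSON
}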
1 change: 1 addition & 0 deletions be/src/storage/CMakeLists.txt
@@ -195,6 +195,7 @@ add_library(Storage STATIC
     lake/meta_file.cpp
     lake/lake_primary_index.cpp
     lake/vertical_compaction_task.cpp
+    lake/metacache.cpp
     binlog_util.cpp
     binlog_file_writer.cpp
     binlog_file_reader.cpp
2 changes: 1 addition & 1 deletion be/src/storage/compaction_manager.cpp
@@ -27,7 +27,7 @@ namespace starrocks {
 
 CompactionManager::CompactionManager() : _next_task_id(0) {}
 
-CompactionManager::~CompactionManager() {
+void CompactionManager::stop() {
     _stop.store(true, std::memory_order_release);
     if (_scheduler_thread.joinable()) {
         _scheduler_thread.join();
4 changes: 3 additions & 1 deletion be/src/storage/compaction_manager.h
@@ -38,7 +38,9 @@ class CompactionManager {
 public:
     CompactionManager();
 
-    ~CompactionManager();
+    ~CompactionManager() = default;
+
+    void stop();
 
     void init_max_task_num(int32_t num);
 
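Replacing the joining destructor with an explicit stop() lets the owner control shutdown order: the scheduler thread is stopped at a well-defined point during teardown rather than whenever the destructor happens to run. A minimal sketch of the pattern with std::thread and an illustrative Scheduler class:

#include <atomic>
#include <chrono>
#include <thread>

// Minimal sketch of the shutdown pattern this change moves to: an explicit
// stop() that the owner calls before destruction, instead of joining the
// scheduler thread inside the destructor.
class Scheduler {
public:
    void start() {
        _thread = std::thread([this] {
            while (!_stop.load(std::memory_order_acquire)) {
                // ... schedule compaction tasks ...
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
        });
    }

    // Idempotent shutdown; safe to call at a well-defined teardown point.
    void stop() {
        _stop.store(true, std::memory_order_release);
        if (_thread.joinable()) {
            _thread.join();
        }
    }

    ~Scheduler() = default; // the owner is expected to have called stop()

private:
    std::atomic<bool> _stop{false};
    std::thread _thread;
};

int main() {
    Scheduler scheduler;
    scheduler.start();
    scheduler.stop(); // shutdown ordering is controlled by the caller
}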
2 changes: 1 addition & 1 deletion be/src/storage/delta_writer.cpp
@@ -371,7 +371,7 @@ Status DeltaWriter::write(const Chunk& chunk, const uint32_t* indexes, uint32_t
                                          _opt.tablet_id, _replica_state_name(_replica_state)));
     }
 
-    if (!_mem_table->check_supported_column_partial_update(chunk)) {
+    if (_tablet->keys_type() == KeysType::PRIMARY_KEYS && !_mem_table->check_supported_column_partial_update(chunk)) {
         return Status::InternalError(
                 fmt::format("can't partial update for column with row. tablet_id: {}", _opt.tablet_id));
     }
1 change: 1 addition & 0 deletions be/src/storage/lake/lake_primary_index.cpp
@@ -29,6 +29,7 @@ static bvar::LatencyRecorder g_load_pk_index_latency("lake_load_pk_index");
 
 Status LakePrimaryIndex::lake_load(Tablet* tablet, const TabletMetadata& metadata, int64_t base_version,
                                    const MetaFileBuilder* builder) {
+    TRACE_COUNTER_SCOPE_LATENCY_US("primary_index_load_latency_us");
     std::lock_guard<std::mutex> lg(_lock);
     if (_loaded) {
         return _status;
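TRACE_COUNTER_SCOPE_LATENCY_US records how long the enclosing scope took and adds it to a named trace counter, here covering the whole primary-index load. A self-contained sketch of the RAII idiom presumably behind such a macro (the guard class and counter store are illustrative, not the StarRocks implementation):

#include <chrono>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

// Illustrative counter store.
std::map<std::string, int64_t> g_counters;

// RAII guard: measures the enclosing scope and adds the elapsed microseconds
// to a named counter when the scope exits, even on early return.
class ScopedLatencyUs {
public:
    explicit ScopedLatencyUs(std::string name)
            : _name(std::move(name)), _start(std::chrono::steady_clock::now()) {}
    ~ScopedLatencyUs() {
        auto elapsed = std::chrono::steady_clock::now() - _start;
        g_counters[_name] +=
                std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
    }

private:
    std::string _name;
    std::chrono::steady_clock::time_point _start;
};

void load_primary_index() {
    ScopedLatencyUs guard("primary_index_load_latency_us"); // covers the whole function
    // ... load the index ...
}

int main() {
    load_primary_index();
    std::cout << g_counters["primary_index_load_latency_us"] << "us\n";
}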
92 changes: 20 additions & 72 deletions be/src/storage/lake/meta_file.cpp
@@ -17,10 +17,11 @@
 #include <memory>
 
 #include "fs/fs_util.h"
+#include "gutil/strings/escaping.h"
 #include "storage/del_vector.h"
 #include "storage/lake/location_provider.h"
+#include "storage/lake/metacache.h"
 #include "storage/lake/update_manager.h"
-#include "storage/protobuf_file.h"
 #include "util/coding.h"
 #include "util/defer_op.h"
 #include "util/raw_container.h"
@@ -223,7 +224,7 @@ void MetaFileBuilder::_fill_delvec_cache() {
         // find delvec ptr by segment id
         auto delvec_iter = _segmentid_to_delvec.find(cache_item.second);
         if (delvec_iter != _segmentid_to_delvec.end() && delvec_iter->second != nullptr) {
-            _tablet.tablet_mgr()->cache_delvec(cache_item.first, delvec_iter->second);
+            _tablet.tablet_mgr()->metacache()->cache_delvec(cache_item.first, delvec_iter->second);
         }
     }
 }
@@ -236,100 +237,47 @@ void MetaFileBuilder::handle_failure() {
     }
 }
 
-MetaFileReader::MetaFileReader(const std::string& filepath, bool fill_cache) {
-    RandomAccessFileOptions opts{.skip_fill_local_cache = !fill_cache};
-    auto rf = fs::new_random_access_file(opts, filepath);
-    if (rf.ok()) {
-        _access_file = std::move(*rf);
-    } else {
-        _err_status = rf.status();
-    }
-    _tablet_meta = std::make_unique<TabletMetadata>();
-    _load = false;
-}
-
-Status MetaFileReader::load() {
-    if (_access_file == nullptr) return _err_status;
-
-    ASSIGN_OR_RETURN(auto file_size, _access_file->get_size());
-    if (file_size <= 4) {
-        return Status::Corruption(
-                fmt::format("meta file {} is corrupt, invalid file size {}", _access_file->filename(), file_size));
-    }
-    std::string metadata_str;
-    raw::stl_string_resize_uninitialized(&metadata_str, file_size);
-    RETURN_IF_ERROR(_access_file->read_at_fully(0, metadata_str.data(), file_size));
-    bool parsed = _tablet_meta->ParseFromArray(metadata_str.data(), static_cast<int>(file_size));
-    if (!parsed) {
-        return Status::Corruption(fmt::format("failed to parse tablet meta {}", _access_file->filename()));
-    }
-    _load = true;
-    TRACE("end load tablet metadata");
-    return Status::OK();
-}
-
-Status MetaFileReader::load_by_cache(const std::string& filepath, TabletManager* tablet_mgr) {
-    // 1. lookup meta cache first
-    if (auto ptr = tablet_mgr->lookup_tablet_metadata(filepath); ptr != nullptr) {
-        _tablet_meta = ptr;
-        _load = true;
-        return Status::OK();
-    } else {
-        // 2. load directly
-        return load();
-    }
-}
-
-Status MetaFileReader::get_del_vec(TabletManager* tablet_mgr, uint32_t segment_id, DelVector* delvec) {
-    if (_access_file == nullptr) return _err_status;
-    if (!_load) return Status::InternalError("meta file reader not loaded");
+Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, uint32_t segment_id, DelVector* delvec) {
     // find delvec by segment id
-    auto iter = _tablet_meta->delvec_meta().delvecs().find(segment_id);
-    if (iter != _tablet_meta->delvec_meta().delvecs().end()) {
-        VLOG(2) << fmt::format("MetaFileReader get_del_vec {} segid {}", _tablet_meta->delvec_meta().ShortDebugString(),
-                               segment_id);
+    auto iter = metadata.delvec_meta().delvecs().find(segment_id);
+    if (iter != metadata.delvec_meta().delvecs().end()) {
+        VLOG(2) << fmt::format("get_del_vec {} segid {}", metadata.delvec_meta().ShortDebugString(), segment_id);
         std::string buf;
         raw::stl_string_resize_uninitialized(&buf, iter->second.size());
         // find in cache
-        std::string cache_key = delvec_cache_key(_tablet_meta->id(), iter->second);
-        DelVectorPtr delvec_cache_ptr = tablet_mgr->lookup_delvec(cache_key);
-        if (delvec_cache_ptr != nullptr) {
-            delvec->copy_from(*delvec_cache_ptr);
+        std::string cache_key = delvec_cache_key(metadata.id(), iter->second);
+        auto cached_delvec = tablet_mgr->metacache()->lookup_delvec(cache_key);
+        if (cached_delvec != nullptr) {
+            delvec->copy_from(*cached_delvec);
             return Status::OK();
         }
 
         // lookup delvec file name and then read it
-        auto iter2 = _tablet_meta->delvec_meta().version_to_file().find(iter->second.version());
-        if (iter2 == _tablet_meta->delvec_meta().version_to_file().end()) {
-            LOG(ERROR) << "Can't find delvec file name for tablet: " << _tablet_meta->id()
+        auto iter2 = metadata.delvec_meta().version_to_file().find(iter->second.version());
+        if (iter2 == metadata.delvec_meta().version_to_file().end()) {
+            LOG(ERROR) << "Can't find delvec file name for tablet: " << metadata.id()
                        << ", version: " << iter->second.version();
             return Status::InternalError("Can't find delvec file name");
         }
         const auto& delvec_name = iter2->second.name();
         RandomAccessFileOptions opts{.skip_fill_local_cache = true};
-        ASSIGN_OR_RETURN(auto rf, fs::new_random_access_file(
-                                          opts, tablet_mgr->delvec_location(_tablet_meta->id(), delvec_name)));
+        ASSIGN_OR_RETURN(auto rf,
+                         fs::new_random_access_file(opts, tablet_mgr->delvec_location(metadata.id(), delvec_name)));
         RETURN_IF_ERROR(rf->read_at_fully(iter->second.offset(), buf.data(), iter->second.size()));
         // parse delvec
         RETURN_IF_ERROR(delvec->load(iter->second.version(), buf.data(), iter->second.size()));
         // put in cache
-        delvec_cache_ptr = std::make_shared<DelVector>();
+        auto delvec_cache_ptr = std::make_shared<DelVector>();
        delvec_cache_ptr->copy_from(*delvec);
-        tablet_mgr->cache_delvec(cache_key, delvec_cache_ptr);
+        tablet_mgr->metacache()->cache_delvec(cache_key, delvec_cache_ptr);
         TRACE("end load delvec");
         return Status::OK();
     }
-    VLOG(2) << fmt::format("MetaFileReader get_del_vec not found, segmentid {} tablet_meta {}", segment_id,
-                           _tablet_meta->delvec_meta().ShortDebugString());
+    VLOG(2) << fmt::format("get_del_vec not found, segmentid {} tablet_meta {}", segment_id,
+                           metadata.delvec_meta().ShortDebugString());
     return Status::OK();
 }
 
-StatusOr<TabletMetadataPtr> MetaFileReader::get_meta() {
-    if (_access_file == nullptr) return _err_status;
-    if (!_load) return Status::InternalError("meta file reader not loaded");
-    return std::move(_tablet_meta);
-}
-
 bool is_primary_key(TabletMetadata* metadata) {
     return metadata->schema().keys_type() == KeysType::PRIMARY_KEYS;
 }
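The stateful MetaFileReader (construct, load(), then query) is replaced by a free get_del_vec() that takes an already-loaded TabletMetadata and goes through the new Metacache. The access pattern is cache-aside: check the cache first, and on a miss load the delvec and populate the cache for later readers. A simplified sketch of that lookup flow (DelVector, Metacache, and the key format here are stand-ins, not the StarRocks types):

#include <iostream>
#include <map>
#include <memory>
#include <string>

// Simplified stand-in for a deserialized delete vector.
struct DelVector {
    std::string data;
};
using DelVectorPtr = std::shared_ptr<DelVector>;

// Simplified stand-in for the metacache: a keyed store of shared pointers.
class Metacache {
public:
    DelVectorPtr lookup_delvec(const std::string& key) {
        auto it = _cache.find(key);
        return it == _cache.end() ? nullptr : it->second;
    }
    void cache_delvec(const std::string& key, DelVectorPtr value) { _cache[key] = std::move(value); }

private:
    std::map<std::string, DelVectorPtr> _cache;
};

DelVectorPtr get_del_vec(Metacache& cache, const std::string& cache_key) {
    if (auto cached = cache.lookup_delvec(cache_key)) {
        return cached; // cache hit: skip the file read
    }
    // Cache miss: pretend to read and parse the delvec file.
    auto loaded = std::make_shared<DelVector>(DelVector{"bytes read from the delvec file"});
    cache.cache_delvec(cache_key, loaded); // populate for later readers
    return loaded;
}

int main() {
    Metacache cache;
    get_del_vec(cache, "tablet_1/v5");                          // miss: loads and caches
    std::cout << get_del_vec(cache, "tablet_1/v5")->data << '\n'; // hit: served from cache
}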