From 72f0c33b994f1d949afd391216389b85f4e3904e Mon Sep 17 00:00:00 2001 From: Binglin Chang Date: Tue, 31 Oct 2023 17:34:35 +0800 Subject: [PATCH 1/2] [BugFix] Add more check and info when compaction find row size inconsistency (#33972) Add more debug&verify log to address #33971 Signed-off-by: Binglin Chang (cherry picked from commit 5322894e53aa214d9419b065cac4f8b107c414d5) --- be/src/storage/tablet_updates.cpp | 44 +++++++++++++++++++++++++++++-- be/src/storage/tablet_updates.h | 2 ++ be/src/util/pretty_printer.h | 26 ++++++++++++++++++ 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/be/src/storage/tablet_updates.cpp b/be/src/storage/tablet_updates.cpp index ce1831308d666..dab2e710a38be 100644 --- a/be/src/storage/tablet_updates.cpp +++ b/be/src/storage/tablet_updates.cpp @@ -1762,12 +1762,46 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info << Substitute("($0/$1/$2)", t_load - t_start, t_index_delvec - t_load, t_write - t_index_delvec); VLOG(1) << "update compaction apply " << _debug_string(true, true); if (row_before != row_after) { - string msg = Substitute("actual row size changed after compaction $0 -> $1", row_before, row_after); + string msg = strings::Substitute( + "actual row size changed after compaction $0 -> $1 inputs:$2 output:$3 max_rowset_id:$4 " + "max_src_rssid:$5 $6", + row_before, row_after, PrettyPrinter::print_unique_int_list_range(info->inputs), rowset_id, + max_rowset_id, max_src_rssid, _debug_compaction_stats(info->inputs, rowset_id)); LOG(ERROR) << msg << debug_string(); _set_error(msg + _debug_version_info(true)); } } +std::string TabletUpdates::_debug_compaction_stats(const std::vector& input_rowsets, + const uint32_t output_rowset) { + std::stringstream ss; + std::lock_guard lg(_rowset_stats_lock); + ss << "inputs:"; + for (auto rowset_id : input_rowsets) { + auto iter = _rowset_stats.find(rowset_id); + if (iter == _rowset_stats.end()) { + ss << rowset_id << ":" + << "NA"; + } else { + 
ss << rowset_id << ":" << iter->second->num_dels << "/" << iter->second->num_rows; + } + ss << " "; + } + ss << "output:"; + auto iter = _rowset_stats.find(output_rowset); + if (iter == _rowset_stats.end()) { + ss << output_rowset << ":" + << "NA"; + } else { + ss << output_rowset << ":" << iter->second->num_dels << "/" << iter->second->num_rows; + } + auto rs = _get_rowset(output_rowset); + if (rs) { + ss << " " << rs->unique_id(); + } + return ss.str(); +} + void TabletUpdates::to_updates_pb(TabletUpdatesPB* updates_pb) const { std::lock_guard rl(_lock); _to_updates_pb_unlocked(updates_pb); @@ -3557,7 +3591,9 @@ Status TabletUpdates::load_snapshot(const SnapshotMeta& snapshot_meta, bool rest return Status::Cancelled("snapshot version too small"); } + std::stringstream ss; uint32_t new_next_rowset_id = _next_rowset_id; + ss << "next_rowset_id before:" << _next_rowset_id << " rowsets:"; for (const auto& rowset_meta_pb : snapshot_meta.rowset_metas()) { auto rowset_meta = std::make_shared(rowset_meta_pb); const auto new_id = rowset_meta_pb.rowset_seg_id() + _next_rowset_id; @@ -3567,6 +3603,7 @@ Status TabletUpdates::load_snapshot(const SnapshotMeta& snapshot_meta, bool rest RowsetSharedPtr* rowset = &new_rowsets[new_id]; RETURN_IF_ERROR(RowsetFactory::create_rowset(&_tablet.tablet_schema(), _tablet.schema_hash_path(), rowset_meta, rowset)); + ss << new_id << ","; VLOG(2) << "add a new rowset " << tablet_id << "@" << new_id << "@" << rowset_meta->rowset_id(); } @@ -3590,11 +3627,13 @@ Status TabletUpdates::load_snapshot(const SnapshotMeta& snapshot_meta, bool rest } DCHECK_EQ(1, _edit_version_infos.size()); + ss << " delvec:"; WriteBatch wb; CHECK_FAIL(TabletMetaManager::clear_log(data_store, &wb, tablet_id)); for (const auto& [rssid, delvec] : snapshot_meta.delete_vectors()) { auto id = rssid + _next_rowset_id; CHECK_FAIL(TabletMetaManager::put_del_vector(data_store, &wb, tablet_id, id, delvec)); + ss << id << ","; } for (const auto& [rid, rowset] : _rowsets) { 
RowsetMetaPB meta_pb = rowset->rowset_meta()->to_rowset_pb(); @@ -3602,6 +3641,7 @@ Status TabletUpdates::load_snapshot(const SnapshotMeta& snapshot_meta, bool rest } _next_rowset_id = new_next_rowset_id; + ss << " next_rowset_id after:" << _next_rowset_id; _to_updates_pb_unlocked(new_tablet_meta_pb.mutable_updates()); VLOG(2) << new_tablet_meta_pb.updates().DebugString(); @@ -3646,7 +3686,7 @@ Status TabletUpdates::load_snapshot(const SnapshotMeta& snapshot_meta, bool rest index_entry->value().unload(); index_cache.release(index_entry); - LOG(INFO) << "load full snapshot done " << _debug_string(false); + LOG(INFO) << "load full snapshot done " << _debug_string(false) << ss.str(); return Status::OK(); } else { diff --git a/be/src/storage/tablet_updates.h b/be/src/storage/tablet_updates.h index 55ef4f723aa40..993b347c32e61 100644 --- a/be/src/storage/tablet_updates.h +++ b/be/src/storage/tablet_updates.h @@ -364,6 +364,8 @@ class TabletUpdates { std::string _debug_version_info(bool lock) const; + std::string _debug_compaction_stats(const std::vector& input_rowsets, const uint32_t output_rowset); + void _print_rowsets(std::vector& rowsets, std::string* dst, bool abbr) const; void _set_error(const string& msg); diff --git a/be/src/util/pretty_printer.h b/be/src/util/pretty_printer.h index 959639994c2c4..e4216555b7580 100644 --- a/be/src/util/pretty_printer.h +++ b/be/src/util/pretty_printer.h @@ -176,6 +176,32 @@ class PrettyPrinter { /// Convenience method static std::string print_bytes(int64_t value) { return PrettyPrinter::print(value, TUnit::BYTES); } + // convert a vector of int to a string, consecutive numbers are printed as a range + // integers in input must be sorted & unique + template + static std::string print_unique_int_list_range(const std::vector& vs) { + std::stringstream ss; + if (vs.size() > 0) { + size_t start = 0; + for (size_t i = 1; i < vs.size(); ++i) { + if (vs[i] != vs[i - 1] + 1) { + if (start == i - 1) { + ss << vs[start] << ","; + } else { 
+ ss << vs[start] << "-" << vs[i - 1] << ","; + } + start = i; + } + } + if (start == vs.size() - 1) { + ss << vs[start]; + } else { + ss << vs[start] << "-" << vs[vs.size() - 1]; + } + } + return ss.str(); + } + private: static const int PRECISION = 2; static const int TIME_NS_PRECISION = 3; From 908cfc3202c129d06ae190c2576227e9c3921f98 Mon Sep 17 00:00:00 2001 From: Binglin Chang Date: Wed, 28 Jun 2023 11:53:04 +0800 Subject: [PATCH 2/2] [Enhancement] Add function to check compaction correctness (#25513) Fixes #25337 Recently, we encountered some data corruption bugs after compaction/schema-change, especially for tables with a sort key enabled. It's better to add a function to check compaction/schema-change correctness, so as to notice bugs earlier. This PR adds a function to check rowset correctness (currently it checks that each segment file's key/order key is ordered, and unique if it's a PK table and the data is ordered by PK). There are 2 ways to invoke/enable rowset verification: 1. BE config `enable_rowset_verify`: if enabled, when compaction or schema change creates a new rowset, the rowset is verified after it is generated. 2. Bind the `Tablet::verify` method to the script engine, so users can verify each tablet manually, or verify all the tablets of a table in a loop using a script. 
(cherry picked from commit dba492c615c7d6cf04d07b7591092e38f7d47ed9) Signed-off-by: Binglin Chang --- be/src/common/config.h | 3 + be/src/script/script.cpp | 1 + be/src/storage/horizontal_compaction_task.cpp | 4 + be/src/storage/rowset/rowset.cpp | 114 ++++++++++++++++++ be/src/storage/rowset/rowset.h | 2 + be/src/storage/tablet.cpp | 19 +++ be/src/storage/tablet.h | 3 + be/src/storage/tablet_updates.cpp | 29 +++++ be/src/storage/vertical_compaction_task.cpp | 4 + be/test/storage/tablet_updates_test.cpp | 1 + 10 files changed, 180 insertions(+) diff --git a/be/src/common/config.h b/be/src/common/config.h index 24a0a99bc8476..a17e9548a4e9e 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -319,6 +319,9 @@ CONF_mInt32(max_compaction_concurrency, "-1"); // Threshold to logging compaction trace, in seconds. CONF_mInt32(compaction_trace_threshold, "60"); +// If enabled, will verify compaction/schema-change output rowset correctness +CONF_mBool(enable_rowset_verify, "false"); + // Max columns of each compaction group. // If the number of schema columns is greater than this, // the columns will be divided into groups for vertical compaction. 
diff --git a/be/src/script/script.cpp b/be/src/script/script.cpp index 7839211549e15..89d23c15ad5e8 100644 --- a/be/src/script/script.cpp +++ b/be/src/script/script.cpp @@ -348,6 +348,7 @@ class StorageEngineRef { REG_METHOD(Tablet, debug_string); REG_METHOD(Tablet, updates); REG_METHOD(Tablet, save_meta); + REG_METHOD(Tablet, verify); } { auto& cls = m.klass("EditVersionPB"); diff --git a/be/src/storage/horizontal_compaction_task.cpp b/be/src/storage/horizontal_compaction_task.cpp index 4d1875f9418b3..d493f84fbe7b2 100644 --- a/be/src/storage/horizontal_compaction_task.cpp +++ b/be/src/storage/horizontal_compaction_task.cpp @@ -98,6 +98,10 @@ Status HorizontalCompactionTask::_horizontal_compact_data(Statistics* statistics statistics->filtered_rows = reader.stats().rows_del_filtered; } + if (config::enable_rowset_verify) { + RETURN_IF_ERROR(_output_rowset->verify()); + } + return Status::OK(); } diff --git a/be/src/storage/rowset/rowset.cpp b/be/src/storage/rowset/rowset.cpp index fdb08b9031609..18a297a762f79 100644 --- a/be/src/storage/rowset/rowset.cpp +++ b/be/src/storage/rowset/rowset.cpp @@ -492,4 +492,118 @@ Status Rowset::get_segment_sk_index(std::vector* sk_index_values) { return Status::OK(); } +static int compare_row(const vectorized::Chunk& l, size_t l_row_id, const vectorized::Chunk& r, size_t r_row_id) { + const size_t ncolumn = l.num_columns(); + for (size_t i = 0; i < ncolumn; i++) { + auto v = l.columns()[i]->compare_at(l_row_id, r_row_id, *r.columns()[i], -1); + if (v != 0) { + return v; + } + } + return 0; +} + +static Status report_duplicate(const vectorized::Chunk& chunk, size_t idx, int64_t row_id0, int64_t row_id1) { + return Status::Corruption( + strings::Substitute("duplicate row $0 row:$1==row:$2", chunk.debug_row(idx), row_id0, row_id1)); +} + +static Status report_unordered(const vectorized::Chunk& chunk0, size_t idx0, int64_t row_id0, + const vectorized::Chunk& chunk1, size_t idx1, int64_t row_id1) { + return 
Status::Corruption(strings::Substitute("unordered row row:$0 $1 > row:$2 $3", row_id0, + chunk0.debug_row(idx0), row_id1, chunk1.debug_row(idx1))); +} + +static Status is_ordered(ChunkIteratorPtr& iter, bool unique) { + vectorized::ChunkPtr chunks[2]; + chunks[0] = ChunkHelper::new_chunk(iter->schema(), iter->chunk_size()); + chunks[1] = ChunkHelper::new_chunk(iter->schema(), iter->chunk_size()); + size_t chunk_idx = 0; + int64_t row_idx = 0; + while (true) { + auto& cur = *chunks[chunk_idx]; + cur.reset(); + auto st = iter->get_next(&cur); + if (st.is_end_of_file()) { + break; + } else if (!st.ok()) { + return st; + } + auto& prev = *chunks[(chunk_idx + 1) % 2]; + // check first row in this chunk is GT/GE last row in previous chunk + if (prev.has_rows()) { + auto cmp = compare_row(prev, prev.num_rows() - 1, cur, 0); + if (cmp == 0) { + if (unique) { + return report_duplicate(cur, 0, row_idx - 1, row_idx); + } + } else if (cmp > 0) { + return report_unordered(prev, prev.num_rows() - 1, row_idx - 1, cur, 0, row_idx); + } + } + // check rows in this chunk is ordered + for (size_t i = 1; i < cur.num_rows(); i++) { + auto cmp = compare_row(cur, i - 1, cur, i); + if (cmp == 0) { + if (unique) { + return report_duplicate(cur, i, row_idx + i - 1, row_idx + i); + } + } else if (cmp > 0) { + return report_unordered(cur, i - 1, row_idx + i - 1, cur, i, row_idx + i); + } + } + row_idx += cur.num_rows(); + chunk_idx = (chunk_idx + 1) % 2; + } + return Status::OK(); +} + +Status Rowset::verify() { + vector key_columns; + vector order_columns; + bool is_pk_ordered = false; + for (int i = 0; i < _schema->num_key_columns(); i++) { + key_columns.push_back(i); + } + if (!_schema->sort_key_idxes().empty() && key_columns != _schema->sort_key_idxes()) { + order_columns = _schema->sort_key_idxes(); + } else { + order_columns = key_columns; + is_pk_ordered = _schema->keys_type() == PRIMARY_KEYS; + } + vectorized::Schema order_schema = ChunkHelper::convert_schema(*_schema, order_columns); 
+ RowsetReadOptions rs_opts; + OlapReaderStatistics stats; + rs_opts.sorted = false; + rs_opts.stats = &stats; + rs_opts.use_page_cache = false; + rs_opts.tablet_schema = _schema; + + std::vector<ChunkIteratorPtr> iters; + RETURN_IF_ERROR(get_segment_iterators(order_schema, rs_opts, &iters)); + + // overlapping segments will return multiple iterators, so segment idx is known + Status st; + if (rowset_meta()->is_segments_overlapping()) { + for (size_t i = 0; i < iters.size(); i++) { + st = is_ordered(iters[i], is_pk_ordered); + if (!st.ok()) { + st = st.clone_and_append(strings::Substitute("segment:$0", i)); + break; + } + } + } else { + // non-overlapping segments will return one iterator, so segment idx is unknown + if (iters.size() != 1) { + st = Status::Corruption("non-overlapping segments should return one iterator"); + } else { + st = is_ordered(iters[0], is_pk_ordered); + } + } + if (!st.ok()) { + st = st.clone_and_append(strings::Substitute("rowset:$0 path:$1", rowset_id().to_string(), rowset_path())); + } + return st; +} + } // namespace starrocks diff --git a/be/src/storage/rowset/rowset.h b/be/src/storage/rowset/rowset.h index a99ec3ba2d9c6..beea681207fcc 100644 --- a/be/src/storage/rowset/rowset.h +++ b/be/src/storage/rowset/rowset.h @@ -314,6 +314,8 @@ class Rowset : public std::enable_shared_from_this<Rowset> { std::for_each(rowsets.begin(), rowsets.end(), [](const RowsetSharedPtr& rowset) { rowset->close(); }); } + Status verify(); + protected: friend class RowsetFactory; diff --git a/be/src/storage/tablet.cpp b/be/src/storage/tablet.cpp index d0845ca51b1fb..312de2e8e614d 100644 --- a/be/src/storage/tablet.cpp +++ b/be/src/storage/tablet.cpp @@ -1295,6 +1295,25 @@ void Tablet::get_basic_info(TabletBasicInfo& info) { } } +Status Tablet::verify() { + int64_t version = max_continuous_version(); + std::vector<RowsetSharedPtr> rowsets; + { + std::shared_lock l(get_header_lock()); + capture_consistent_rowsets(Version(0, version), &rowsets); + Rowset::acquire_readers(rowsets); + } + DeferOp 
defer([&rowsets]() { Rowset::release_readers(rowsets); }); + for (auto& rowset : rowsets) { + auto st = rowset->verify(); + if (!st.ok()) { + return st.clone_and_append(strings::Substitute("tablet:$0 version:$1 rowset:$2", tablet_id(), version, + rowset->rowset_id().to_string())); + } + } + return Status::OK(); +} + std::string Tablet::schema_debug_string() const { return _tablet_meta->tablet_schema().debug_string(); } diff --git a/be/src/storage/tablet.h b/be/src/storage/tablet.h index 0563ca729279f..da406f355d0f9 100644 --- a/be/src/storage/tablet.h +++ b/be/src/storage/tablet.h @@ -268,6 +268,9 @@ class Tablet : public BaseTablet { void set_will_be_force_replaced() { _will_be_force_replaced = true; } + // verify all rowsets of current(max) version in this tablet + Status verify(); + protected: void on_shutdown() override; diff --git a/be/src/storage/tablet_updates.cpp b/be/src/storage/tablet_updates.cpp index dab2e710a38be..a7521ba925645 100644 --- a/be/src/storage/tablet_updates.cpp +++ b/be/src/storage/tablet_updates.cpp @@ -1439,6 +1439,15 @@ Status TabletUpdates::_do_compaction(std::unique_ptr* pinfo) { rowset_writer.get(), cfg)); auto output_rowset = rowset_writer->build(); if (!output_rowset.ok()) return output_rowset.status(); + if (config::enable_rowset_verify) { + auto st = (*output_rowset)->verify(); + if (!st.ok()) { + st = st.clone_and_append( + strings::Substitute("compaction tablet:$0 #input:$1", _tablet.tablet_id(), input_rowsets.size())); + LOG(WARNING) << st.get_error_msg(); + return st; + } + } // 4. 
commit compaction EditVersion version; RETURN_IF_ERROR(_commit_compaction(pinfo, *output_rowset, &version)); @@ -1769,6 +1778,7 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info max_rowset_id, max_src_rssid, _debug_compaction_stats(info->inputs, rowset_id)); LOG(ERROR) << msg << debug_string(); _set_error(msg + _debug_version_info(true)); + CHECK(output_rowset->verify().ok()) << msg; } } @@ -2925,6 +2935,15 @@ Status TabletUpdates::convert_from(const std::shared_ptr& base_tablet, i auto new_rowset = rowset_writer->build(); if (!new_rowset.ok()) return new_rowset.status(); + if (config::enable_rowset_verify) { + status = (*new_rowset)->verify(); + if (!status.ok()) { + status = status.clone_and_append( + strings::Substitute("convert_from base_tablet: $0", base_tablet->tablet_id())); + LOG(WARNING) << status.get_error_msg(); + return status; + } + } auto& new_rowset_load_info = new_rowset_load_infos[i]; new_rowset_load_info.num_segments = (*new_rowset)->num_segments(); new_rowset_load_info.rowset_id = next_rowset_id; @@ -3198,6 +3217,16 @@ Status TabletUpdates::reorder_from(const std::shared_ptr& base_tablet, i auto new_rowset = rowset_writer->build(); if (!new_rowset.ok()) return new_rowset.status(); + if (config::enable_rowset_verify) { + status = (*new_rowset)->verify(); + if (!status.ok()) { + status = status.clone_and_append( + strings::Substitute("reorder_from base_tablet: $0", base_tablet->tablet_id())); + LOG(WARNING) << status.get_error_msg(); + return status; + } + } + auto& new_rowset_load_info = new_rowset_load_infos[i]; new_rowset_load_info.num_segments = (*new_rowset)->num_segments(); new_rowset_load_info.rowset_id = next_rowset_id; diff --git a/be/src/storage/vertical_compaction_task.cpp b/be/src/storage/vertical_compaction_task.cpp index 2c2daf86be9b5..b2ce14ac75533 100644 --- a/be/src/storage/vertical_compaction_task.cpp +++ b/be/src/storage/vertical_compaction_task.cpp @@ -95,6 +95,10 @@ Status 
VerticalCompactionTask::_vertical_compaction_data(Statistics* statistics) TRACE_COUNTER_INCREMENT("output_segments_num", _output_rowset->num_segments()); TRACE("[Compaction] output rowset built"); + if (config::enable_rowset_verify) { + RETURN_IF_ERROR(_output_rowset->verify()); + } + return Status::OK(); } diff --git a/be/test/storage/tablet_updates_test.cpp b/be/test/storage/tablet_updates_test.cpp index 26c3a56a714bb..b48ceec315eee 100644 --- a/be/test/storage/tablet_updates_test.cpp +++ b/be/test/storage/tablet_updates_test.cpp @@ -1785,6 +1785,7 @@ void TabletUpdatesTest::test_horizontal_compaction(bool enable_persistent_index) ASSERT_EQ(best_tablet->updates()->version_history_count(), 5); // the time interval is not enough after last compaction EXPECT_EQ(best_tablet->updates()->get_compaction_score(), -1); + EXPECT_TRUE(best_tablet->verify().ok()); } TEST_F(TabletUpdatesTest, horizontal_compaction) {