diff --git a/be/src/common/config.h b/be/src/common/config.h
index 24a0a99bc8476..a17e9548a4e9e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -319,6 +319,10 @@ CONF_mInt32(max_compaction_concurrency, "-1");
 // Threshold to logging compaction trace, in seconds.
 CONF_mInt32(compaction_trace_threshold, "60");
 
+// If enabled, verify that the output rowset of compaction/schema-change is correctly
+// ordered (and free of adjacent duplicate keys when it is ordered by primary key).
+CONF_mBool(enable_rowset_verify, "false");
+
 // Max columns of each compaction group.
 // If the number of schema columns is greater than this,
 // the columns will be divided into groups for vertical compaction.
diff --git a/be/src/script/script.cpp b/be/src/script/script.cpp
index 7839211549e15..89d23c15ad5e8 100644
--- a/be/src/script/script.cpp
+++ b/be/src/script/script.cpp
@@ -348,6 +348,7 @@ class StorageEngineRef {
             REG_METHOD(Tablet, debug_string);
             REG_METHOD(Tablet, updates);
             REG_METHOD(Tablet, save_meta);
+            REG_METHOD(Tablet, verify);
         }
         {
             auto& cls = m.klass("EditVersionPB");
diff --git a/be/src/storage/horizontal_compaction_task.cpp b/be/src/storage/horizontal_compaction_task.cpp
index 4d1875f9418b3..d493f84fbe7b2 100644
--- a/be/src/storage/horizontal_compaction_task.cpp
+++ b/be/src/storage/horizontal_compaction_task.cpp
@@ -98,6 +98,11 @@ Status HorizontalCompactionTask::_horizontal_compact_data(Statistics* statistics
         statistics->filtered_rows = reader.stats().rows_del_filtered;
     }
 
+    // Config-gated self-check: return the error if the output rowset does not verify.
+    if (config::enable_rowset_verify) {
+        RETURN_IF_ERROR(_output_rowset->verify());
+    }
+
     return Status::OK();
 }
 
diff --git a/be/src/storage/rowset/rowset.cpp b/be/src/storage/rowset/rowset.cpp
index fdb08b9031609..18a297a762f79 100644
--- a/be/src/storage/rowset/rowset.cpp
+++ b/be/src/storage/rowset/rowset.cpp
@@ -492,4 +492,141 @@ Status Rowset::get_segment_sk_index(std::vector* sk_index_values) {
     return Status::OK();
 }
 
+// Compare row `l_row_id` of `l` with row `r_row_id` of `r`, column by column in column order.
+// Returns the first non-zero compare_at() result, or 0 when every column compares equal.
+static int compare_row(const vectorized::Chunk& l, size_t l_row_id, const vectorized::Chunk& r, size_t r_row_id) {
+    const size_t ncolumn = l.num_columns();
+    for (size_t i = 0; i < ncolumn; i++) {
+        auto v = l.columns()[i]->compare_at(l_row_id, r_row_id, *r.columns()[i], -1);
+        if (v != 0) {
+            return v;
+        }
+    }
+    return 0;
+}
+
+// Build the Corruption status for two adjacent rows that compare equal.
+// `idx` is the row's position inside `chunk`; row_id0/row_id1 are the ordinals printed in the message.
+static Status report_duplicate(const vectorized::Chunk& chunk, size_t idx, int64_t row_id0, int64_t row_id1) {
+    return Status::Corruption(
+            strings::Substitute("duplicate row $0 row:$1==row:$2", chunk.debug_row(idx), row_id0, row_id1));
+}
+
+// Build the Corruption status for a row (chunk0[idx0]) that sorts after its successor (chunk1[idx1]).
+static Status report_unordered(const vectorized::Chunk& chunk0, size_t idx0, int64_t row_id0,
+                               const vectorized::Chunk& chunk1, size_t idx1, int64_t row_id1) {
+    return Status::Corruption(strings::Substitute("unordered row row:$0 $1 > row:$2 $3", row_id0,
+                                                  chunk0.debug_row(idx0), row_id1, chunk1.debug_row(idx1)));
+}
+
+// Read `iter` to the end and check that its rows are in non-descending order.
+// When `unique` is true, two adjacent rows that compare equal are reported as a duplicate.
+// Two chunks are used alternately so the last row of the previous chunk can be compared with
+// the first row of the current one.
+// Returns OK if ordered, Corruption for the first violation, or the iterator's own error.
+static Status is_ordered(ChunkIteratorPtr& iter, bool unique) {
+    vectorized::ChunkPtr chunks[2];
+    chunks[0] = ChunkHelper::new_chunk(iter->schema(), iter->chunk_size());
+    chunks[1] = ChunkHelper::new_chunk(iter->schema(), iter->chunk_size());
+    size_t chunk_idx = 0;
+    int64_t row_idx = 0;
+    while (true) {
+        auto& cur = *chunks[chunk_idx];
+        cur.reset();
+        auto st = iter->get_next(&cur);
+        if (st.is_end_of_file()) {
+            break;
+        } else if (!st.ok()) {
+            return st;
+        }
+        // An empty chunk has nothing to check. Keep using the same buffer so that `prev` still
+        // holds the last non-empty chunk and the cross-chunk comparison is not lost.
+        if (cur.num_rows() == 0) {
+            continue;
+        }
+        auto& prev = *chunks[(chunk_idx + 1) % 2];
+        // check first row in this chunk is GT/GE last row in previous chunk
+        if (prev.has_rows()) {
+            auto cmp = compare_row(prev, prev.num_rows() - 1, cur, 0);
+            if (cmp == 0) {
+                if (unique) {
+                    return report_duplicate(cur, 0, row_idx - 1, row_idx);
+                }
+            } else if (cmp > 0) {
+                return report_unordered(prev, prev.num_rows() - 1, row_idx - 1, cur, 0, row_idx);
+            }
+        }
+        // check rows in this chunk is ordered
+        for (size_t i = 1; i < cur.num_rows(); i++) {
+            auto cmp = compare_row(cur, i - 1, cur, i);
+            if (cmp == 0) {
+                if (unique) {
+                    return report_duplicate(cur, i, row_idx + i - 1, row_idx + i);
+                }
+            } else if (cmp > 0) {
+                return report_unordered(cur, i - 1, row_idx + i - 1, cur, i, row_idx + i);
+            }
+        }
+        row_idx += cur.num_rows();
+        chunk_idx = (chunk_idx + 1) % 2;
+    }
+    return Status::OK();
+}
+
+// Verify that the rows of this rowset are ordered.
+// The order columns are the schema's sort key columns when a sort key is set and it differs
+// from the key columns; otherwise the key columns are used and, when keys_type() is
+// PRIMARY_KEYS, adjacent duplicate rows are reported as well.
+// Returns Corruption for the first violation, with rowset (and, for overlapping segments,
+// segment) context appended.
+Status Rowset::verify() {
+    // NOTE(review): element type assumed to be ColumnId, matching sort_key_idxes() — confirm.
+    std::vector<ColumnId> key_columns;
+    std::vector<ColumnId> order_columns;
+    bool is_pk_ordered = false;
+    for (int i = 0; i < _schema->num_key_columns(); i++) {
+        key_columns.push_back(i);
+    }
+    if (!_schema->sort_key_idxes().empty() && key_columns != _schema->sort_key_idxes()) {
+        order_columns = _schema->sort_key_idxes();
+    } else {
+        order_columns = key_columns;
+        is_pk_ordered = _schema->keys_type() == PRIMARY_KEYS;
+    }
+    vectorized::Schema order_schema = ChunkHelper::convert_schema(*_schema, order_columns);
+    RowsetReadOptions rs_opts;
+    OlapReaderStatistics stats;
+    rs_opts.sorted = false;
+    rs_opts.stats = &stats;
+    rs_opts.use_page_cache = false;
+    rs_opts.tablet_schema = _schema;
+
+    std::vector<ChunkIteratorPtr> iters;
+    RETURN_IF_ERROR(get_segment_iterators(order_schema, rs_opts, &iters));
+
+    // overlapping segments will return multiple iterators, so segment idx is known
+    Status st;
+    if (rowset_meta()->is_segments_overlapping()) {
+        for (size_t i = 0; i < iters.size(); i++) {
+            st = is_ordered(iters[i], is_pk_ordered);
+            if (!st.ok()) {
+                st = st.clone_and_append(strings::Substitute("segment:$0", i));
+                break;
+            }
+        }
+    } else {
+        // non-overlapping segments will return one iterator, so segment idx is unknown
+        if (iters.size() != 1) {
+            st = Status::Corruption("non-overlapping segments should return one iterator");
+        } else {
+            st = is_ordered(iters[0], is_pk_ordered);
+        }
+    }
+    if (!st.ok()) {
+        // clone_and_append() returns the extended status; assign it back or the context is lost
+        st = st.clone_and_append(strings::Substitute("rowset:$0 path:$1", rowset_id().to_string(), rowset_path()));
+    }
+    return st;
+}
+
 } // namespace starrocks
diff --git a/be/src/storage/rowset/rowset.h b/be/src/storage/rowset/rowset.h
index a99ec3ba2d9c6..beea681207fcc 100644
--- a/be/src/storage/rowset/rowset.h
+++ b/be/src/storage/rowset/rowset.h
@@ -314,6 +314,10 @@ class Rowset : public std::enable_shared_from_this {
         std::for_each(rowsets.begin(), rowsets.end(), [](const RowsetSharedPtr& rowset) { rowset->close(); });
     }
 
+    // Check that the rows of this rowset are ordered (and, when ordered by primary key, that
+    // no two adjacent rows are equal). Returns Corruption describing the first violation.
+    Status verify();
+
 protected:
     friend class RowsetFactory;
 
diff --git a/be/src/storage/tablet.cpp b/be/src/storage/tablet.cpp
index d0845ca51b1fb..312de2e8e614d 100644
--- a/be/src/storage/tablet.cpp
+++ b/be/src/storage/tablet.cpp
@@ -1295,6 +1295,29 @@ void Tablet::get_basic_info(TabletBasicInfo& info) {
     }
 }
 
+// Verify every rowset captured for Version(0, max_continuous_version()).
+// Returns the first Rowset::verify() failure with tablet/version/rowset context appended, else OK.
+Status Tablet::verify() {
+    int64_t version = max_continuous_version();
+    std::vector<RowsetSharedPtr> rowsets;
+    {
+        std::shared_lock l(get_header_lock());
+        // NOTE(review): the result of capture_consistent_rowsets() is not checked here — confirm
+        // whether a failed capture should be reported instead of verifying whatever was collected.
+        capture_consistent_rowsets(Version(0, version), &rowsets);
+        Rowset::acquire_readers(rowsets);
+    }
+    DeferOp defer([&rowsets]() { Rowset::release_readers(rowsets); });
+    for (auto& rowset : rowsets) {
+        auto st = rowset->verify();
+        if (!st.ok()) {
+            return st.clone_and_append(strings::Substitute("tablet:$0 version:$1 rowset:$2", tablet_id(), version,
+                                                           rowset->rowset_id().to_string()));
+        }
+    }
+    return Status::OK();
+}
+
 std::string Tablet::schema_debug_string() const {
     return _tablet_meta->tablet_schema().debug_string();
 }
diff --git a/be/src/storage/tablet.h b/be/src/storage/tablet.h
index 0563ca729279f..da406f355d0f9 100644
--- a/be/src/storage/tablet.h
+++ b/be/src/storage/tablet.h
@@ -268,6 +268,9 @@ class Tablet : public BaseTablet {
 
     void set_will_be_force_replaced() { _will_be_force_replaced = true; }
 
+    // verify all rowsets of current(max) version in this tablet
+    Status verify();
+
 protected:
     void on_shutdown() override;
 
diff --git a/be/src/storage/tablet_updates.cpp b/be/src/storage/tablet_updates.cpp
index dab2e710a38be..a7521ba925645 100644
--- a/be/src/storage/tablet_updates.cpp
+++ b/be/src/storage/tablet_updates.cpp
@@ -1439,6 +1439,15 @@ Status TabletUpdates::_do_compaction(std::unique_ptr* pinfo) {
                                       rowset_writer.get(), cfg));
     auto output_rowset = rowset_writer->build();
     if (!output_rowset.ok()) return output_rowset.status();
+    if (config::enable_rowset_verify) {
+        auto st = (*output_rowset)->verify();
+        if (!st.ok()) {
+            st = st.clone_and_append(
+                    strings::Substitute("compaction tablet:$0 #input:$1", _tablet.tablet_id(), input_rowsets.size()));
+            LOG(WARNING) << st.get_error_msg();
+            return st;
+        }
+    }
     // 4. commit compaction
     EditVersion version;
     RETURN_IF_ERROR(_commit_compaction(pinfo, *output_rowset, &version));
@@ -1769,6 +1778,8 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
                 max_rowset_id, max_src_rssid, _debug_compaction_stats(info->inputs, rowset_id));
         LOG(ERROR) << msg << debug_string();
         _set_error(msg + _debug_version_info(true));
+        // NOTE(review): CHECK aborts the process if verify() fails and is not gated by enable_rowset_verify.
+        CHECK(output_rowset->verify().ok()) << msg;
     }
 }
 
@@ -2925,6 +2936,15 @@ Status TabletUpdates::convert_from(const std::shared_ptr& base_tablet, i
 
         auto new_rowset = rowset_writer->build();
         if (!new_rowset.ok()) return new_rowset.status();
+        if (config::enable_rowset_verify) {
+            status = (*new_rowset)->verify();
+            if (!status.ok()) {
+                status = status.clone_and_append(
+                        strings::Substitute("convert_from base_tablet: $0", base_tablet->tablet_id()));
+                LOG(WARNING) << status.get_error_msg();
+                return status;
+            }
+        }
         auto& new_rowset_load_info = new_rowset_load_infos[i];
         new_rowset_load_info.num_segments = (*new_rowset)->num_segments();
         new_rowset_load_info.rowset_id = next_rowset_id;
@@ -3198,6 +3218,16 @@ Status TabletUpdates::reorder_from(const std::shared_ptr& base_tablet, i
 
         auto new_rowset = rowset_writer->build();
         if (!new_rowset.ok()) return new_rowset.status();
+        if (config::enable_rowset_verify) {
+            status = (*new_rowset)->verify();
+            if (!status.ok()) {
+                status = status.clone_and_append(
+                        strings::Substitute("reorder_from base_tablet: $0", base_tablet->tablet_id()));
+                LOG(WARNING) << status.get_error_msg();
+                return status;
+            }
+        }
+
         auto& new_rowset_load_info = new_rowset_load_infos[i];
         new_rowset_load_info.num_segments = (*new_rowset)->num_segments();
         new_rowset_load_info.rowset_id = next_rowset_id;
diff --git a/be/src/storage/vertical_compaction_task.cpp b/be/src/storage/vertical_compaction_task.cpp
index 2c2daf86be9b5..b2ce14ac75533 100644
--- a/be/src/storage/vertical_compaction_task.cpp
+++ b/be/src/storage/vertical_compaction_task.cpp
@@ -95,6 +95,11 @@ Status VerticalCompactionTask::_vertical_compaction_data(Statistics* statistics)
     TRACE_COUNTER_INCREMENT("output_segments_num", _output_rowset->num_segments());
     TRACE("[Compaction] output rowset built");
 
+    // Config-gated self-check: return the error if the output rowset does not verify.
+    if (config::enable_rowset_verify) {
+        RETURN_IF_ERROR(_output_rowset->verify());
+    }
+
     return Status::OK();
 }
 
diff --git a/be/test/storage/tablet_updates_test.cpp b/be/test/storage/tablet_updates_test.cpp
index 26c3a56a714bb..b48ceec315eee 100644
--- a/be/test/storage/tablet_updates_test.cpp
+++ b/be/test/storage/tablet_updates_test.cpp
@@ -1785,6 +1785,7 @@ void TabletUpdatesTest::test_horizontal_compaction(bool enable_persistent_index)
     ASSERT_EQ(best_tablet->updates()->version_history_count(), 5);
     // the time interval is not enough after last compaction
     EXPECT_EQ(best_tablet->updates()->get_compaction_score(), -1);
+    EXPECT_TRUE(best_tablet->verify().ok());
 }
 
 TEST_F(TabletUpdatesTest, horizontal_compaction) {