Skip to content

Commit

Permalink
[Enhancement] Add function to check compaction correctness (#25513)
Browse files Browse the repository at this point in the history
Fixes #25337

Recently, we encountered some data corruption bugs after
compaction/schema-change, especially for the sort key enabled table. It's
better to add a function to check compaction/schema-change correctness, so
as to notice bugs earlier.

This PR adds a function to check rowset correctness(currently checks each
segment file's key/order key is ordered, and unique if it's pk table and
data is ordered by pk). There are 2 ways to invoke/enable rowset verify

1. BE config `enable_rowset_verify`, if enabled, compaction and
schema_change creating a new rowset will do a verification after rowset is
generated.
2. bind `Tablet::verify` method to script engine, so user can verify
each tablet manually, or verify all the tablets of a table in a loop
using script.

(cherry picked from commit dba492c)
Signed-off-by: Binglin Chang <[email protected]>
  • Loading branch information
decster committed Nov 1, 2023
1 parent 72f0c33 commit b519868
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 0 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ CONF_mInt32(max_compaction_concurrency, "-1");
// Threshold to logging compaction trace, in seconds.
CONF_mInt32(compaction_trace_threshold, "60");

// If enabled, will verify compaction/schema-change output rowset correctness
CONF_mBool(enable_rowset_verify, "false");

// Max columns of each compaction group.
// If the number of schema columns is greater than this,
// the columns will be divided into groups for vertical compaction.
Expand Down
1 change: 1 addition & 0 deletions be/src/script/script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ class StorageEngineRef {
REG_METHOD(Tablet, debug_string);
REG_METHOD(Tablet, updates);
REG_METHOD(Tablet, save_meta);
REG_METHOD(Tablet, verify);
}
{
auto& cls = m.klass<EditVersionPB>("EditVersionPB");
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/horizontal_compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ Status HorizontalCompactionTask::_horizontal_compact_data(Statistics* statistics
statistics->filtered_rows = reader.stats().rows_del_filtered;
}

if (config::enable_rowset_verify) {
RETURN_IF_ERROR(_output_rowset->verify());
}

return Status::OK();
}

Expand Down
114 changes: 114 additions & 0 deletions be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,4 +492,118 @@ Status Rowset::get_segment_sk_index(std::vector<std::string>* sk_index_values) {
return Status::OK();
}

static int compare_row(const Chunk& l, size_t l_row_id, const Chunk& r, size_t r_row_id) {
const size_t ncolumn = l.num_columns();
for (size_t i = 0; i < ncolumn; i++) {
auto v = l.columns()[i]->compare_at(l_row_id, r_row_id, *r.columns()[i], -1);
if (v != 0) {
return v;
}
}
return 0;
}

static Status report_duplicate(const Chunk& chunk, size_t idx, int64_t row_id0, int64_t row_id1) {
return Status::Corruption(
strings::Substitute("duplicate row $0 row:$1==row:$2", chunk.debug_row(idx), row_id0, row_id1));
}

static Status report_unordered(const Chunk& chunk0, size_t idx0, int64_t row_id0, const Chunk& chunk1, size_t idx1,
int64_t row_id1) {
return Status::Corruption(strings::Substitute("unordered row row:$0 $1 > row:$2 $3", row_id0,
chunk0.debug_row(idx0), row_id1, chunk1.debug_row(idx1)));
}

static Status is_ordered(ChunkIteratorPtr& iter, bool unique) {
ChunkUniquePtr chunks[2];
chunks[0] = ChunkHelper::new_chunk(iter->schema(), iter->chunk_size());
chunks[1] = ChunkHelper::new_chunk(iter->schema(), iter->chunk_size());
size_t chunk_idx = 0;
int64_t row_idx = 0;
while (true) {
auto& cur = *chunks[chunk_idx];
cur.reset();
auto st = iter->get_next(&cur);
if (st.is_end_of_file()) {
break;
} else if (!st.ok()) {
return st;
}
auto& prev = *chunks[(chunk_idx + 1) % 2];
// check first row in this chunk is GT/GE last row in previous chunk
if (prev.has_rows()) {
auto cmp = compare_row(prev, prev.num_rows() - 1, cur, 0);
if (cmp == 0) {
if (unique) {
return report_duplicate(cur, 0, row_idx - 1, row_idx);
}
} else if (cmp > 0) {
return report_unordered(prev, prev.num_rows() - 1, row_idx - 1, cur, 0, row_idx);
}
}
// check rows in this chunk is ordered
for (size_t i = 1; i < cur.num_rows(); i++) {
auto cmp = compare_row(cur, i - 1, cur, i);
if (cmp == 0) {
if (unique) {
return report_duplicate(cur, i, row_idx + i - 1, row_idx + i);
}
} else if (cmp > 0) {
return report_unordered(cur, i - 1, row_idx + i - 1, cur, i, row_idx + i);
}
}
row_idx += cur.num_rows();
chunk_idx = (chunk_idx + 1) % 2;
}
return Status::OK();
}

Status Rowset::verify() {
vector<ColumnId> key_columns;
vector<ColumnId> order_columns;
bool is_pk_ordered = false;
for (int i = 0; i < _schema->num_key_columns(); i++) {
key_columns.push_back(i);
}
if (!_schema->sort_key_idxes().empty() && key_columns != _schema->sort_key_idxes()) {
order_columns = _schema->sort_key_idxes();
} else {
order_columns = key_columns;
is_pk_ordered = _schema->keys_type() == PRIMARY_KEYS;
}
Schema order_schema = ChunkHelper::convert_schema(*_schema, order_columns);
RowsetReadOptions rs_opts;
OlapReaderStatistics stats;
rs_opts.sorted = false;
rs_opts.stats = &stats;
rs_opts.use_page_cache = false;
rs_opts.tablet_schema = _schema;

std::vector<ChunkIteratorPtr> iters;
RETURN_IF_ERROR(get_segment_iterators(order_schema, rs_opts, &iters));

// overlapping segments will return multiple iterators, so segment idx is known
Status st;
if (rowset_meta()->is_segments_overlapping()) {
for (size_t i = 0; i < iters.size(); i++) {
st = is_ordered(iters[i], is_pk_ordered);
if (!st.ok()) {
st = st.clone_and_append(strings::Substitute("segment:$0", i));
break;
}
}
} else {
// non-overlapping segments will return one iterator, so segment idx is unknown
if (iters.size() != 1) {
st = Status::Corruption("non-overlapping segments should return one iterator");
} else {
st = is_ordered(iters[0], is_pk_ordered);
}
}
if (!st.ok()) {
st.clone_and_append(strings::Substitute("rowset:$0 path:$1", rowset_id().to_string(), rowset_path()));
}
return st;
}

} // namespace starrocks
2 changes: 2 additions & 0 deletions be/src/storage/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
std::for_each(rowsets.begin(), rowsets.end(), [](const RowsetSharedPtr& rowset) { rowset->close(); });
}

Status verify();

protected:
friend class RowsetFactory;

Expand Down
19 changes: 19 additions & 0 deletions be/src/storage/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,25 @@ void Tablet::get_basic_info(TabletBasicInfo& info) {
}
}

Status Tablet::verify() {
int64_t version = max_continuous_version();
std::vector<RowsetSharedPtr> rowsets;
{
std::shared_lock l(get_header_lock());
capture_consistent_rowsets(Version(0, version), &rowsets);
Rowset::acquire_readers(rowsets);
}
DeferOp defer([&rowsets]() { Rowset::release_readers(rowsets); });
for (auto& rowset : rowsets) {
auto st = rowset->verify();
if (!st.ok()) {
return st.clone_and_append(strings::Substitute("tablet:$0 version:$1 rowset:$2", tablet_id(), version,
rowset->rowset_id().to_string()));
}
}
return Status::OK();
}

std::string Tablet::schema_debug_string() const {
return _tablet_meta->tablet_schema().debug_string();
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ class Tablet : public BaseTablet {

void set_will_be_force_replaced() { _will_be_force_replaced = true; }

// verify all rowsets of current(max) version in this tablet
Status verify();

protected:
void on_shutdown() override;

Expand Down
29 changes: 29 additions & 0 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,15 @@ Status TabletUpdates::_do_compaction(std::unique_ptr<CompactionInfo>* pinfo) {
rowset_writer.get(), cfg));
auto output_rowset = rowset_writer->build();
if (!output_rowset.ok()) return output_rowset.status();
if (config::enable_rowset_verify) {
auto st = (*output_rowset)->verify();
if (!st.ok()) {
st = st.clone_and_append(
strings::Substitute("compaction tablet:$0 #input:$1", _tablet.tablet_id(), input_rowsets.size()));
LOG(WARNING) << st.get_error_msg();
return st;
}
}
// 4. commit compaction
EditVersion version;
RETURN_IF_ERROR(_commit_compaction(pinfo, *output_rowset, &version));
Expand Down Expand Up @@ -1769,6 +1778,7 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
max_rowset_id, max_src_rssid, _debug_compaction_stats(info->inputs, rowset_id));
LOG(ERROR) << msg << debug_string();
_set_error(msg + _debug_version_info(true));
CHECK(output_rowset->verify().ok()) << msg;
}
}

Expand Down Expand Up @@ -2925,6 +2935,15 @@ Status TabletUpdates::convert_from(const std::shared_ptr<Tablet>& base_tablet, i
auto new_rowset = rowset_writer->build();
if (!new_rowset.ok()) return new_rowset.status();

if (config::enable_rowset_verify) {
status = (*new_rowset)->verify();
if (!status.ok()) {
status = status.clone_and_append(
strings::Substitute("convert_from base_tablet: $0", base_tablet->tablet_id()));
LOG(WARNING) << status.get_error_msg();
return status;
}
}
auto& new_rowset_load_info = new_rowset_load_infos[i];
new_rowset_load_info.num_segments = (*new_rowset)->num_segments();
new_rowset_load_info.rowset_id = next_rowset_id;
Expand Down Expand Up @@ -3198,6 +3217,16 @@ Status TabletUpdates::reorder_from(const std::shared_ptr<Tablet>& base_tablet, i
auto new_rowset = rowset_writer->build();
if (!new_rowset.ok()) return new_rowset.status();

if (config::enable_rowset_verify) {
status = (*new_rowset)->verify();
if (!status.ok()) {
status = status.clone_and_append(
strings::Substitute("reorder_from base_tablet: $0", base_tablet->tablet_id()));
LOG(WARNING) << status.get_error_msg();
return status;
}
}

auto& new_rowset_load_info = new_rowset_load_infos[i];
new_rowset_load_info.num_segments = (*new_rowset)->num_segments();
new_rowset_load_info.rowset_id = next_rowset_id;
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/vertical_compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ Status VerticalCompactionTask::_vertical_compaction_data(Statistics* statistics)
TRACE_COUNTER_INCREMENT("output_segments_num", _output_rowset->num_segments());
TRACE("[Compaction] output rowset built");

if (config::enable_rowset_verify) {
RETURN_IF_ERROR(_output_rowset->verify());
}

return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/test/storage/tablet_updates_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,7 @@ void TabletUpdatesTest::test_horizontal_compaction(bool enable_persistent_index)
ASSERT_EQ(best_tablet->updates()->version_history_count(), 5);
// the time interval is not enough after last compaction
EXPECT_EQ(best_tablet->updates()->get_compaction_score(), -1);
EXPECT_TRUE(best_tablet->verify().ok());
}

TEST_F(TabletUpdatesTest, horizontal_compaction) {
Expand Down

0 comments on commit b519868

Please sign in to comment.