Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Add more checks and info when compaction finds row size inconsistency (backport #33972) #34051

Merged
merged 2 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,9 @@ CONF_mInt32(max_compaction_concurrency, "-1");
// Threshold for logging a compaction trace, in seconds.
CONF_mInt32(compaction_trace_threshold, "60");

// If enabled, will verify compaction/schema-change output rowset correctness
CONF_mBool(enable_rowset_verify, "false");

// Max columns of each compaction group.
// If the number of schema columns is greater than this,
// the columns will be divided into groups for vertical compaction.
Expand Down
1 change: 1 addition & 0 deletions be/src/script/script.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ class StorageEngineRef {
REG_METHOD(Tablet, debug_string);
REG_METHOD(Tablet, updates);
REG_METHOD(Tablet, save_meta);
REG_METHOD(Tablet, verify);
}
{
auto& cls = m.klass<EditVersionPB>("EditVersionPB");
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/horizontal_compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ Status HorizontalCompactionTask::_horizontal_compact_data(Statistics* statistics
statistics->filtered_rows = reader.stats().rows_del_filtered;
}

if (config::enable_rowset_verify) {
RETURN_IF_ERROR(_output_rowset->verify());
}

return Status::OK();
}

Expand Down
114 changes: 114 additions & 0 deletions be/src/storage/rowset/rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,4 +492,118 @@ Status Rowset::get_segment_sk_index(std::vector<std::string>* sk_index_values) {
return Status::OK();
}

// Compares row `l_row_id` of chunk `l` against row `r_row_id` of chunk `r`,
// one column at a time in column order, and returns the first non-zero
// per-column comparison result. Returns 0 when every column compares equal.
//
// Only `l.num_columns()` is consulted for the column count, so both chunks are
// expected to share the same schema.
// NOTE(review): the trailing -1 passed to compare_at looks like the NULL/NaN
// ordering hint -- confirm against Column::compare_at.
static int compare_row(const vectorized::Chunk& l, size_t l_row_id, const vectorized::Chunk& r, size_t r_row_id) {
    const auto& lhs_columns = l.columns();
    const auto& rhs_columns = r.columns();
    const size_t column_count = l.num_columns();
    for (size_t col = 0; col != column_count; ++col) {
        auto result = lhs_columns[col]->compare_at(l_row_id, r_row_id, *rhs_columns[col], -1);
        if (result != 0) {
            // The first differing column decides the ordering of the two rows.
            return result;
        }
    }
    return 0;
}

// Builds the Corruption status reported when two adjacent rows compare equal.
//   chunk / idx       : where the offending row lives; used only to print it.
//   row_id0 / row_id1 : ordinals of the two equal rows within the iterator's output.
static Status report_duplicate(const vectorized::Chunk& chunk, size_t idx, int64_t row_id0, int64_t row_id1) {
    auto row_text = chunk.debug_row(idx);
    auto message = strings::Substitute("duplicate row $0 row:$1==row:$2", row_text, row_id0, row_id1);
    return Status::Corruption(message);
}

// Builds the Corruption status reported when a row sorts strictly after the
// row that follows it.
//   chunk0 / idx0 / row_id0 : the earlier row (the one that compared greater).
//   chunk1 / idx1 / row_id1 : the later row.
// The row ids are ordinals within the iterator's output; the chunk/index pairs
// are used only to print the row contents.
static Status report_unordered(const vectorized::Chunk& chunk0, size_t idx0, int64_t row_id0,
                               const vectorized::Chunk& chunk1, size_t idx1, int64_t row_id1) {
    auto earlier_row = chunk0.debug_row(idx0);
    auto later_row = chunk1.debug_row(idx1);
    auto message =
            strings::Substitute("unordered row row:$0 $1 > row:$2 $3", row_id0, earlier_row, row_id1, later_row);
    return Status::Corruption(message);
}

// Drains `iter` and checks that the rows it yields are in non-decreasing order
// according to compare_row(). When `unique` is true, two adjacent equal rows
// are also reported as an error.
//
// Returns:
//   - OK once the iterator reports end-of-file without any violation;
//   - the iterator's own error status if get_next() fails;
//   - Corruption (via report_unordered / report_duplicate) on the first violation.
static Status is_ordered(ChunkIteratorPtr& iter, bool unique) {
    // Two buffers used alternately, so the previously read chunk stays available
    // for the cross-chunk boundary comparison.
    vectorized::ChunkPtr buffers[2] = {ChunkHelper::new_chunk(iter->schema(), iter->chunk_size()),
                                       ChunkHelper::new_chunk(iter->schema(), iter->chunk_size())};
    size_t active = 0;
    // Number of rows consumed before the current chunk, i.e. the ordinal of its first row.
    int64_t rows_seen = 0;
    for (;;) {
        auto& cur = *buffers[active];
        auto& prev = *buffers[1 - active];
        cur.reset();
        auto st = iter->get_next(&cur);
        if (st.is_end_of_file()) {
            return Status::OK();
        }
        if (!st.ok()) {
            return st;
        }

        // Boundary check: the last row of the previous chunk must be <= (or <,
        // when `unique`) the first row of this chunk.
        if (prev.has_rows()) {
            const size_t prev_last = prev.num_rows() - 1;
            auto cmp = compare_row(prev, prev_last, cur, 0);
            if (cmp > 0) {
                return report_unordered(prev, prev_last, rows_seen - 1, cur, 0, rows_seen);
            }
            if (cmp == 0 && unique) {
                return report_duplicate(cur, 0, rows_seen - 1, rows_seen);
            }
        }

        // In-chunk check: every adjacent pair of rows must be ordered.
        const size_t cur_rows = cur.num_rows();
        for (size_t next = 1; next < cur_rows; ++next) {
            const size_t before = next - 1;
            auto cmp = compare_row(cur, before, cur, next);
            if (cmp > 0) {
                return report_unordered(cur, before, rows_seen + before, cur, next, rows_seen + next);
            }
            if (cmp == 0 && unique) {
                return report_duplicate(cur, next, rows_seen + before, rows_seen + next);
            }
        }

        rows_seen += cur_rows;
        // Flip buffers: the chunk just checked becomes `prev` for the next round.
        active = 1 - active;
    }
}

// Verifies that the rows stored in this rowset are correctly ordered.
//
// The columns to check are chosen as follows:
//   - if the schema declares sort key indexes that differ from the leading key
//     columns, rows are checked for ordering on those sort key columns only;
//   - otherwise rows are checked on the key columns, and for PRIMARY_KEYS
//     schemas adjacent duplicate keys are additionally reported as an error.
//
// Returns OK when every segment passes, the underlying read error if the
// segments cannot be read, or a Corruption status describing the first
// violation. Any failure is annotated with the segment index (when known) and
// with this rowset's id and path.
Status Rowset::verify() {
    vector<ColumnId> key_columns;
    vector<ColumnId> order_columns;
    bool is_pk_ordered = false;
    for (int i = 0; i < _schema->num_key_columns(); i++) {
        key_columns.push_back(i);
    }
    if (!_schema->sort_key_idxes().empty() && key_columns != _schema->sort_key_idxes()) {
        // Ordered by a separate sort key: uniqueness cannot be checked on it.
        order_columns = _schema->sort_key_idxes();
    } else {
        order_columns = key_columns;
        is_pk_ordered = _schema->keys_type() == PRIMARY_KEYS;
    }
    vectorized::Schema order_schema = ChunkHelper::convert_schema(*_schema, order_columns);
    RowsetReadOptions rs_opts;
    OlapReaderStatistics stats;
    // Read segments as stored (no merge across segments), bypassing the page cache.
    rs_opts.sorted = false;
    rs_opts.stats = &stats;
    rs_opts.use_page_cache = false;
    rs_opts.tablet_schema = _schema;

    std::vector<ChunkIteratorPtr> iters;
    RETURN_IF_ERROR(get_segment_iterators(order_schema, rs_opts, &iters));

    Status st;
    if (rowset_meta()->is_segments_overlapping()) {
        // Overlapping segments yield one iterator per segment, so the index of
        // the failing segment is known and can be reported.
        for (size_t i = 0; i < iters.size(); i++) {
            st = is_ordered(iters[i], is_pk_ordered);
            if (!st.ok()) {
                st = st.clone_and_append(strings::Substitute("segment:$0", i));
                break;
            }
        }
    } else {
        // Non-overlapping segments yield a single iterator over all of them, so
        // the failing segment index is unknown.
        if (iters.size() != 1) {
            st = Status::Corruption("non-overlapping segments should return one iterator");
        } else {
            st = is_ordered(iters[0], is_pk_ordered);
        }
    }
    if (!st.ok()) {
        // clone_and_append() returns the annotated status rather than modifying
        // `st` in place, so the result must be assigned back; otherwise the
        // rowset id/path would be silently dropped from the error.
        st = st.clone_and_append(strings::Substitute("rowset:$0 path:$1", rowset_id().to_string(), rowset_path()));
    }
    return st;
}

} // namespace starrocks
2 changes: 2 additions & 0 deletions be/src/storage/rowset/rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ class Rowset : public std::enable_shared_from_this<Rowset> {
std::for_each(rowsets.begin(), rowsets.end(), [](const RowsetSharedPtr& rowset) { rowset->close(); });
}

// Checks that the rows stored in this rowset's segments are in sort order
// (and, for PRIMARY_KEYS schemas ordered by their key columns, that no two
// adjacent rows share the same key). Returns OK on success, or a non-OK
// status describing the first problem found.
Status verify();

protected:
friend class RowsetFactory;

Expand Down
19 changes: 19 additions & 0 deletions be/src/storage/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,25 @@ void Tablet::get_basic_info(TabletBasicInfo& info) {
}
}

// Verifies every rowset that makes up versions [0, max_continuous_version()]
// of this tablet by calling Rowset::verify() on each of them.
//
// Returns OK when all rowsets pass. Returns the capture error if the rowsets
// for that version range cannot be collected, or the first rowset failure
// annotated with the tablet id, the version and the failing rowset id.
Status Tablet::verify() {
    int64_t version = max_continuous_version();
    std::vector<RowsetSharedPtr> rowsets;
    {
        std::shared_lock l(get_header_lock());
        // Propagate a failed capture instead of ignoring it: otherwise `rowsets`
        // may be empty/incomplete and the tablet would be reported as verified
        // without having been checked. Returning here is safe because no readers
        // have been acquired yet.
        RETURN_IF_ERROR(capture_consistent_rowsets(Version(0, version), &rowsets));
        Rowset::acquire_readers(rowsets);
    }
    // Readers were acquired above; release them on every exit path below.
    DeferOp defer([&rowsets]() { Rowset::release_readers(rowsets); });
    for (auto& rowset : rowsets) {
        auto st = rowset->verify();
        if (!st.ok()) {
            return st.clone_and_append(strings::Substitute("tablet:$0 version:$1 rowset:$2", tablet_id(), version,
                                                           rowset->rowset_id().to_string()));
        }
    }
    return Status::OK();
}

std::string Tablet::schema_debug_string() const {
return _tablet_meta->tablet_schema().debug_string();
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/storage/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ class Tablet : public BaseTablet {

// Sets the _will_be_force_replaced flag to true; nothing here ever clears it.
void set_will_be_force_replaced() { _will_be_force_replaced = true; }

// Verify all rowsets of the current (max continuous) version of this tablet.
// Each rowset is checked with Rowset::verify(); the first failure is returned.
Status verify();

protected:
void on_shutdown() override;

Expand Down
73 changes: 71 additions & 2 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,15 @@ Status TabletUpdates::_do_compaction(std::unique_ptr<CompactionInfo>* pinfo) {
rowset_writer.get(), cfg));
auto output_rowset = rowset_writer->build();
if (!output_rowset.ok()) return output_rowset.status();
if (config::enable_rowset_verify) {
auto st = (*output_rowset)->verify();
if (!st.ok()) {
st = st.clone_and_append(
strings::Substitute("compaction tablet:$0 #input:$1", _tablet.tablet_id(), input_rowsets.size()));
LOG(WARNING) << st.get_error_msg();
return st;
}
}
// 4. commit compaction
EditVersion version;
RETURN_IF_ERROR(_commit_compaction(pinfo, *output_rowset, &version));
Expand Down Expand Up @@ -1762,10 +1771,45 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
<< Substitute("($0/$1/$2)", t_load - t_start, t_index_delvec - t_load, t_write - t_index_delvec);
VLOG(1) << "update compaction apply " << _debug_string(true, true);
if (row_before != row_after) {
string msg = Substitute("actual row size changed after compaction $0 -> $1", row_before, row_after);
string msg = strings::Substitute(
"actual row size changed after compaction $0 -> $1 inputs:$2 output:$3 max_rowset_id:$4 "
"max_src_rssid:$5 $6",
row_before, row_after, PrettyPrinter::print_unique_int_list_range(info->inputs), rowset_id,
max_rowset_id, max_src_rssid, _debug_compaction_stats(info->inputs, rowset_id));
LOG(ERROR) << msg << debug_string();
_set_error(msg + _debug_version_info(true));
CHECK(output_rowset->verify().ok()) << msg;
}
}

std::string TabletUpdates::_debug_compaction_stats(const std::vector<uint32_t>& input_rowsets,
const uint32_t output_rowset) {
std::stringstream ss;
std::lock_guard lg(_rowset_stats_lock);
ss << "inputs:";
for (auto rowset_id : input_rowsets) {
auto iter = _rowset_stats.find(rowset_id);
if (iter == _rowset_stats.end()) {
ss << rowset_id << ":"
<< "NA";
} else {
ss << rowset_id << ":" << iter->second->num_dels << "/" << iter->second->num_rows;
}
ss << " ";
}
ss << "output:";
auto iter = _rowset_stats.find(output_rowset);
if (iter == _rowset_stats.end()) {
ss << output_rowset << ":"
<< "NA";
} else {
ss << output_rowset << ":" << iter->second->num_dels << "/" << iter->second->num_rows;
}
auto rs = _get_rowset(output_rowset);
if (rs) {
ss << " " << rs->unique_id();
}
return ss.str();
}

void TabletUpdates::to_updates_pb(TabletUpdatesPB* updates_pb) const {
Expand Down Expand Up @@ -2891,6 +2935,15 @@ Status TabletUpdates::convert_from(const std::shared_ptr<Tablet>& base_tablet, i
auto new_rowset = rowset_writer->build();
if (!new_rowset.ok()) return new_rowset.status();

if (config::enable_rowset_verify) {
status = (*new_rowset)->verify();
if (!status.ok()) {
status = status.clone_and_append(
strings::Substitute("convert_from base_tablet: $0", base_tablet->tablet_id()));
LOG(WARNING) << status.get_error_msg();
return status;
}
}
auto& new_rowset_load_info = new_rowset_load_infos[i];
new_rowset_load_info.num_segments = (*new_rowset)->num_segments();
new_rowset_load_info.rowset_id = next_rowset_id;
Expand Down Expand Up @@ -3164,6 +3217,16 @@ Status TabletUpdates::reorder_from(const std::shared_ptr<Tablet>& base_tablet, i
auto new_rowset = rowset_writer->build();
if (!new_rowset.ok()) return new_rowset.status();

if (config::enable_rowset_verify) {
status = (*new_rowset)->verify();
if (!status.ok()) {
status = status.clone_and_append(
strings::Substitute("reorder_from base_tablet: $0", base_tablet->tablet_id()));
LOG(WARNING) << status.get_error_msg();
return status;
}
}

auto& new_rowset_load_info = new_rowset_load_infos[i];
new_rowset_load_info.num_segments = (*new_rowset)->num_segments();
new_rowset_load_info.rowset_id = next_rowset_id;
Expand Down Expand Up @@ -3557,7 +3620,9 @@ Status TabletUpdates::load_snapshot(const SnapshotMeta& snapshot_meta, bool rest
return Status::Cancelled("snapshot version too small");
}

std::stringstream ss;
uint32_t new_next_rowset_id = _next_rowset_id;
ss << "next_rowset_id before:" << _next_rowset_id << " rowsets:";
for (const auto& rowset_meta_pb : snapshot_meta.rowset_metas()) {
auto rowset_meta = std::make_shared<RowsetMeta>(rowset_meta_pb);
const auto new_id = rowset_meta_pb.rowset_seg_id() + _next_rowset_id;
Expand All @@ -3567,6 +3632,7 @@ Status TabletUpdates::load_snapshot(const SnapshotMeta& snapshot_meta, bool rest
RowsetSharedPtr* rowset = &new_rowsets[new_id];
RETURN_IF_ERROR(RowsetFactory::create_rowset(&_tablet.tablet_schema(), _tablet.schema_hash_path(),
rowset_meta, rowset));
ss << new_id << ",";
VLOG(2) << "add a new rowset " << tablet_id << "@" << new_id << "@" << rowset_meta->rowset_id();
}

Expand All @@ -3590,18 +3656,21 @@ Status TabletUpdates::load_snapshot(const SnapshotMeta& snapshot_meta, bool rest
}
DCHECK_EQ(1, _edit_version_infos.size());

ss << " delvec:";
WriteBatch wb;
CHECK_FAIL(TabletMetaManager::clear_log(data_store, &wb, tablet_id));
for (const auto& [rssid, delvec] : snapshot_meta.delete_vectors()) {
auto id = rssid + _next_rowset_id;
CHECK_FAIL(TabletMetaManager::put_del_vector(data_store, &wb, tablet_id, id, delvec));
ss << id << ",";
}
for (const auto& [rid, rowset] : _rowsets) {
RowsetMetaPB meta_pb = rowset->rowset_meta()->to_rowset_pb();
CHECK_FAIL(TabletMetaManager::put_rowset_meta(data_store, &wb, tablet_id, meta_pb));
}

_next_rowset_id = new_next_rowset_id;
ss << " next_rowset_id after:" << _next_rowset_id;

_to_updates_pb_unlocked(new_tablet_meta_pb.mutable_updates());
VLOG(2) << new_tablet_meta_pb.updates().DebugString();
Expand Down Expand Up @@ -3646,7 +3715,7 @@ Status TabletUpdates::load_snapshot(const SnapshotMeta& snapshot_meta, bool rest
index_entry->value().unload();
index_cache.release(index_entry);

LOG(INFO) << "load full snapshot done " << _debug_string(false);
LOG(INFO) << "load full snapshot done " << _debug_string(false) << ss.str();

return Status::OK();
} else {
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/tablet_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,8 @@ class TabletUpdates {

std::string _debug_version_info(bool lock) const;

// Returns a debug string listing "<rowset id>:<num_dels>/<num_rows>" for every
// id in `input_rowsets` and for `output_rowset` ("NA" replaces the counts when
// no stats entry exists). Acquires _rowset_stats_lock internally.
std::string _debug_compaction_stats(const std::vector<uint32_t>& input_rowsets, const uint32_t output_rowset);

void _print_rowsets(std::vector<uint32_t>& rowsets, std::string* dst, bool abbr) const;

void _set_error(const string& msg);
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/vertical_compaction_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ Status VerticalCompactionTask::_vertical_compaction_data(Statistics* statistics)
TRACE_COUNTER_INCREMENT("output_segments_num", _output_rowset->num_segments());
TRACE("[Compaction] output rowset built");

if (config::enable_rowset_verify) {
RETURN_IF_ERROR(_output_rowset->verify());
}

return Status::OK();
}

Expand Down
26 changes: 26 additions & 0 deletions be/src/util/pretty_printer.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,32 @@ class PrettyPrinter {
/// Convenience method: formats `value` by forwarding to
/// PrettyPrinter::print() with the unit fixed to TUnit::BYTES.
static std::string print_bytes(int64_t value) { return PrettyPrinter::print(value, TUnit::BYTES); }

// Formats a list of integers compactly: each maximal run of consecutive values
// is written as "first-last", a lone value is written as itself, and the pieces
// are joined with ','. For example {1,2,3,5,7,8} becomes "1-3,5,7-8".
// An empty input yields an empty string.
// The input must be sorted in ascending order and contain no duplicates.
template <typename T>
static std::string print_unique_int_list_range(const std::vector<T>& vs) {
    std::stringstream out;
    if (vs.empty()) {
        return out.str();
    }
    const size_t count = vs.size();
    size_t run_begin = 0;
    // `run_end` is one past the last element of the current run; a run is
    // closed either at the end of the input or when the next value is not the
    // direct successor of the previous one.
    for (size_t run_end = 1; run_end <= count; ++run_end) {
        const bool at_end = (run_end == count);
        if (!at_end && vs[run_end] == vs[run_end - 1] + 1) {
            continue;
        }
        out << vs[run_begin];
        if (run_end - 1 != run_begin) {
            out << "-" << vs[run_end - 1];
        }
        if (!at_end) {
            out << ",";
        }
        run_begin = run_end;
    }
    return out.str();
}

private:
static const int PRECISION = 2;
static const int TIME_NS_PRECISION = 3;
Expand Down
1 change: 1 addition & 0 deletions be/test/storage/tablet_updates_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1785,6 +1785,7 @@ void TabletUpdatesTest::test_horizontal_compaction(bool enable_persistent_index)
ASSERT_EQ(best_tablet->updates()->version_history_count(), 5);
// the time interval is not enough after last compaction
EXPECT_EQ(best_tablet->updates()->get_compaction_score(), -1);
EXPECT_TRUE(best_tablet->verify().ok());
}

TEST_F(TabletUpdatesTest, horizontal_compaction) {
Expand Down
Loading