Skip to content

Commit

Permalink
[Enhancement] Avoid duplicate schema file creation (#33014)
Browse files Browse the repository at this point in the history
For each partition, the schema file is created only when the first tablet is created.

Signed-off-by: Alex Zhu <[email protected]>
  • Loading branch information
sduzh authored Oct 18, 2023
1 parent 0db07bd commit 32b4ad4
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 38 deletions.
4 changes: 4 additions & 0 deletions be/src/storage/lake/location_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ class LocationProvider {
tablet_metadata_lock_filename(tablet_id, version, expire_time));
}

std::string schema_file_location(int64_t tablet_id, int64_t schema_id) const {
return join_path(root_location(tablet_id), schema_filename(schema_id));
}

private:
static std::string join_path(std::string_view parent, std::string_view child) {
return fmt::format("{}/{}", parent, child);
Expand Down
36 changes: 16 additions & 20 deletions be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,9 @@ Status TabletManager::create_tablet(const TCreateTabletReq& req) {
RETURN_IF_ERROR(starrocks::convert_t_schema_to_pb_schema(
req.tablet_schema, next_unique_id, col_idx_to_unique_id, tablet_metadata_pb->mutable_schema(),
req.__isset.compression_type ? req.compression_type : TCompressionType::LZ4_FRAME));
RETURN_IF_ERROR(create_schema_file(req.tablet_id, tablet_metadata_pb->schema()));
if (req.create_schema_file) {
RETURN_IF_ERROR(create_schema_file(req.tablet_id, tablet_metadata_pb->schema()));
}

return put_tablet_metadata(std::move(tablet_metadata_pb));
}
Expand Down Expand Up @@ -907,27 +909,21 @@ Status TabletManager::delete_tablet_metadata_lock(int64_t tablet_id, int64_t ver
}

// Store a copy of the tablet schema in a separate schema file named SCHEMA_{indexId}.
// If this is a multi-partition table, then each partition directory will contain a schema file.
// This method may be concurrently and repeatedly called multiple times. That means concurrent
// creation and writes to the same schema file could happen. We assume the FileSystem interface
// guarantees last-write-wins semantics here.
Status TabletManager::create_schema_file(int64_t tablet_id, const TabletSchemaPB& schema_pb) {
auto schema_file_path = join_path(tablet_root_location(tablet_id), schema_filename(schema_pb.id()));
if (!fs::path_exist(schema_file_path)) {
VLOG(3) << "Creating schema file of id " << schema_pb.id() << " for tablet " << tablet_id;
ProtobufFile file(schema_file_path);
RETURN_IF_ERROR(file.save(schema_pb));

// Save the schema into the in-memory cache
auto [schema, inserted] = GlobalTabletSchemaMap::Instance()->emplace(schema_pb);
if (UNLIKELY(schema == nullptr)) {
return Status::InternalError("failed to emplace the schema hash map");
}
auto cache_key = global_schema_cache_key(schema_pb.id());
auto cache_value = std::make_unique<CacheValue>(schema);
auto cache_size = inserted ? schema->mem_usage() : 0;
fill_metacache(cache_key, cache_value.release(), cache_size);
auto schema_file_path = _location_provider->schema_file_location(tablet_id, schema_pb.id());
VLOG(3) << "Creating schema file of id " << schema_pb.id() << " for tablet " << tablet_id;
ProtobufFile file(schema_file_path);
RETURN_IF_ERROR(file.save(schema_pb));

// Save the schema into the in-memory cache
auto [schema, inserted] = GlobalTabletSchemaMap::Instance()->emplace(schema_pb);
if (UNLIKELY(schema == nullptr)) {
return Status::InternalError("failed to emplace the schema hash map");
}
auto cache_key = global_schema_cache_key(schema_pb.id());
auto cache_value = std::make_unique<CacheValue>(schema);
auto cache_size = inserted ? schema->mem_usage() : 0;
fill_metacache(cache_key, cache_value.release(), cache_size);
return Status::OK();
}

Expand Down
53 changes: 43 additions & 10 deletions be/test/storage/lake/tablet_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,28 +108,61 @@ TEST_F(LakeTabletManagerTest, txnlog_write_and_read) {

// NOLINTNEXTLINE
TEST_F(LakeTabletManagerTest, create_and_delete_tablet) {
auto fs = FileSystem::Default();
auto tablet_id = next_id();
auto schema_id = next_id();

TCreateTabletReq req;
req.tablet_id = 65535;
req.tablet_id = tablet_id;
req.__set_version(1);
req.__set_version_hash(0);
req.tablet_schema.__set_id(next_id());
req.tablet_schema.__set_id(schema_id);
req.tablet_schema.__set_schema_hash(270068375);
req.tablet_schema.__set_short_key_column_count(2);
req.tablet_schema.__set_keys_type(TKeysType::DUP_KEYS);
EXPECT_OK(_tablet_manager->create_tablet(req));
auto res = _tablet_manager->get_tablet(65535);
EXPECT_TRUE(res.ok());
EXPECT_TRUE(_tablet_manager->get_tablet(tablet_id).ok());
EXPECT_TRUE(fs->path_exists(_location_provider->tablet_metadata_location(tablet_id, 1)).ok());
EXPECT_TRUE(fs->path_exists(_location_provider->schema_file_location(tablet_id, schema_id)).ok());

starrocks::lake::TxnLog txnLog;
txnLog.set_tablet_id(65535);
txnLog.set_tablet_id(tablet_id);
txnLog.set_txn_id(2);
EXPECT_OK(_tablet_manager->put_txn_log(txnLog));
EXPECT_OK(_tablet_manager->delete_tablet(65535));
EXPECT_OK(_tablet_manager->delete_tablet(tablet_id));

auto st = FileSystem::Default()->path_exists(_location_provider->tablet_metadata_location(65535, 1));
EXPECT_TRUE(st.is_not_found());
st = FileSystem::Default()->path_exists(_location_provider->tablet_metadata_location(65535, 2));
EXPECT_TRUE(st.is_not_found());
EXPECT_TRUE(fs->path_exists(_location_provider->tablet_metadata_location(tablet_id, 1)).is_not_found());
EXPECT_TRUE(fs->path_exists(_location_provider->txn_log_location(tablet_id, 2)).is_not_found());
EXPECT_TRUE(fs->path_exists(_location_provider->schema_file_location(tablet_id, schema_id)).ok());
}

// NOLINTNEXTLINE
TEST_F(LakeTabletManagerTest, create_tablet_without_schema_file) {
    auto fs = FileSystem::Default();

    // Exercise both values of the request flag: the tablet metadata must always be written,
    // while the schema file must exist only when the flag is set.
    for (const bool create_schema_file : {false, true}) {
        // Fresh ids per iteration so the two runs never share a tablet or a schema file.
        const auto tablet_id = next_id();
        const auto schema_id = next_id();

        TCreateTabletReq req;
        req.tablet_id = tablet_id;
        req.__set_version(1);
        req.__set_version_hash(0);
        req.tablet_schema.__set_id(schema_id);
        req.tablet_schema.__set_schema_hash(270068375);
        req.tablet_schema.__set_short_key_column_count(2);
        req.tablet_schema.__set_keys_type(TKeysType::DUP_KEYS);
        req.__set_create_schema_file(create_schema_file);
        EXPECT_OK(_tablet_manager->create_tablet(req));

        // The tablet and its version-1 metadata exist regardless of the flag.
        EXPECT_TRUE(_tablet_manager->get_tablet(tablet_id).ok());
        EXPECT_TRUE(fs->path_exists(_location_provider->tablet_metadata_location(tablet_id, 1)).ok());

        // The schema file is present only if its creation was requested.
        auto st = fs->path_exists(_location_provider->schema_file_location(tablet_id, schema_id));
        if (!create_schema_file) {
            EXPECT_TRUE(st.is_not_found()) << st;
        } else {
            EXPECT_TRUE(st.ok()) << st;
        }
    }
}

// NOLINTNEXTLINE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ protected void runPendingJob() throws AlterCancelException {
sortKeyIdxes = copiedSortKeyIdxes;
}

boolean createSchemaFile = true;
for (Tablet shadowTablet : shadowIdx.getTablets()) {
long shadowTabletId = shadowTablet.getId();
LakeTablet lakeTablet = ((LakeTablet) shadowTablet);
Expand All @@ -358,7 +359,9 @@ protected void runPendingJob() throws AlterCancelException {
countDownLatch, indexes, table.isInMemory(), table.enablePersistentIndex(),
table.primaryIndexCacheExpireSec(), TTabletType.TABLET_TYPE_LAKE,
table.getCompressionType(),
copiedSortKeyIdxes, null);
copiedSortKeyIdxes, null, createSchemaFile);
// For each partition, the schema file is created only when the first Tablet is created
createSchemaFile = false;
batchTask.addTask(createReplicaTask);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ protected void runPendingJob() throws AlterCancelException {
tbl.enablePersistentIndex(),
tbl.primaryIndexCacheExpireSec(),
tabletType, tbl.getCompressionType(), index.getSortKeyIdxes(),
index.getSortKeyUniqueIds());
index.getSortKeyUniqueIds(), true);
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
batchTask.addTask(createReplicaTask);
} // end for rollupReplicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1983,7 +1983,8 @@ private List<CreateReplicaTask> buildCreateReplicaTasks(long dbId, OlapTable tab
private List<CreateReplicaTask> buildCreateReplicaTasks(long dbId, OlapTable table, PhysicalPartition partition,
MaterializedIndex index) throws DdlException {
LOG.info("build create replica tasks for index {} db {} table {} partition {}",
index, dbId, table.getId(), partition);
index, dbId, table.getId(), partition);
boolean createSchemaFile = true;
List<CreateReplicaTask> tasks = new ArrayList<>((int) index.getReplicaCount());
MaterializedIndexMeta indexMeta = table.getIndexMetaByIndexId(index.getId());
for (Tablet tablet : index.getTablets()) {
Expand Down Expand Up @@ -2021,7 +2022,10 @@ private List<CreateReplicaTask> buildCreateReplicaTasks(long dbId, OlapTable tab
table.getPersistentIndexType(),
TTabletType.TABLET_TYPE_LAKE,
table.getCompressionType(), indexMeta.getSortKeyIdxes(),
indexMeta.getSortKeyUniqueIds());
indexMeta.getSortKeyUniqueIds(),
createSchemaFile);
// For each partition, the schema file is created only when the first Tablet is created
createSchemaFile = false;
task.setSchemaVersion(indexMeta.getSchemaVersion());
tasks.add(task);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class CreateReplicaTask extends AgentTask {
private boolean isRecoverTask = false;

private int primaryIndexCacheExpireSec = 0;
private boolean createSchemaFile = true;

public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
short shortKeyColumnCount, int schemaHash, long version,
Expand Down Expand Up @@ -155,10 +156,12 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition
boolean enablePersistentIndex,
int primaryIndexCacheExpireSec,
TTabletType tabletType, TCompressionType compressionType, List<Integer> sortKeyIdxes,
List<Integer> sortKeyUniqueIds) {
List<Integer> sortKeyUniqueIds,
boolean createSchemaFile) {
this(backendId, dbId, tableId, partitionId, indexId, tabletId, shortKeyColumnCount, schemaHash, 0, version,
keysType, storageType, storageMedium, columns, bfColumns, bfFpp, latch, indexes, isInMemory,
enablePersistentIndex, primaryIndexCacheExpireSec, tabletType, compressionType, sortKeyIdxes, sortKeyUniqueIds);
this.createSchemaFile = createSchemaFile;
}

public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
Expand Down Expand Up @@ -213,10 +216,12 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition
int primaryIndexCacheExpireSec,
TPersistentIndexType persistentIndexType,
TTabletType tabletType, TCompressionType compressionType, List<Integer> sortKeyIdxes,
List<Integer> sortKeyUniqueIds) {
List<Integer> sortKeyUniqueIds,
boolean createSchemaFile) {
this(backendId, dbId, tableId, partitionId, indexId, tabletId, shortKeyColumnCount, schemaHash, version,
keysType, storageType, storageMedium, columns, bfColumns, bfFpp, latch, indexes, isInMemory,
enablePersistentIndex, primaryIndexCacheExpireSec, tabletType, compressionType, sortKeyIdxes, sortKeyUniqueIds);
enablePersistentIndex, primaryIndexCacheExpireSec, tabletType, compressionType, sortKeyIdxes, sortKeyUniqueIds,
createSchemaFile);
this.persistentIndexType = persistentIndexType;
}

Expand Down Expand Up @@ -337,8 +342,8 @@ public TCreateTabletReq toThrift() {
}

createTabletReq.setCompression_type(compressionType);

createTabletReq.setTablet_type(tabletType);
createTabletReq.setCreate_schema_file(createSchemaFile);
return createTabletReq;
}
}
2 changes: 2 additions & 0 deletions gensrc/thrift/AgentService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ struct TCreateTabletReq {
17: optional TBinlogConfig binlog_config;
18: optional TPersistentIndexType persistent_index_type;
19: optional i32 primary_index_cache_expire_sec;
// Whether a separate file needs to be created to hold the schema information.
20: optional bool create_schema_file = true;
}

struct TDropTabletReq {
Expand Down

0 comments on commit 32b4ad4

Please sign in to comment.