diff --git a/be/src/storage/lake/location_provider.h b/be/src/storage/lake/location_provider.h index 108396a93eb7f..86e15c6b5abf1 100644 --- a/be/src/storage/lake/location_provider.h +++ b/be/src/storage/lake/location_provider.h @@ -77,6 +77,10 @@ class LocationProvider { tablet_metadata_lock_filename(tablet_id, version, expire_time)); } + std::string schema_file_location(int64_t tablet_id, int64_t schema_id) const { + return join_path(root_location(tablet_id), schema_filename(schema_id)); + } + private: static std::string join_path(std::string_view parent, std::string_view child) { return fmt::format("{}/{}", parent, child); diff --git a/be/src/storage/lake/tablet_manager.cpp b/be/src/storage/lake/tablet_manager.cpp index 2a6bf298b7201..fccbc3e33d173 100644 --- a/be/src/storage/lake/tablet_manager.cpp +++ b/be/src/storage/lake/tablet_manager.cpp @@ -346,7 +346,9 @@ Status TabletManager::create_tablet(const TCreateTabletReq& req) { RETURN_IF_ERROR(starrocks::convert_t_schema_to_pb_schema( req.tablet_schema, next_unique_id, col_idx_to_unique_id, tablet_metadata_pb->mutable_schema(), req.__isset.compression_type ? req.compression_type : TCompressionType::LZ4_FRAME)); - RETURN_IF_ERROR(create_schema_file(req.tablet_id, tablet_metadata_pb->schema())); + if (req.create_schema_file) { + RETURN_IF_ERROR(create_schema_file(req.tablet_id, tablet_metadata_pb->schema())); + } return put_tablet_metadata(std::move(tablet_metadata_pb)); } @@ -907,27 +909,21 @@ Status TabletManager::delete_tablet_metadata_lock(int64_t tablet_id, int64_t ver } // Store a copy of the tablet schema in a separate schema file named SCHEMA_{indexId}. -// If this is a multi-partition table, then each partition directory will contain a schema file. -// This method may be concurrently and repeatedly called multiple times. That means concurrent -// creation and writes to the same schema file could happen. We assume the FileSystem interface -// guarantees last-write-wins semantics here. Status TabletManager::create_schema_file(int64_t tablet_id, const TabletSchemaPB& schema_pb) { - auto schema_file_path = join_path(tablet_root_location(tablet_id), schema_filename(schema_pb.id())); - if (!fs::path_exist(schema_file_path)) { - VLOG(3) << "Creating schema file of id " << schema_pb.id() << " for tablet " << tablet_id; - ProtobufFile file(schema_file_path); - RETURN_IF_ERROR(file.save(schema_pb)); - - // Save the schema into the in-memory cache - auto [schema, inserted] = GlobalTabletSchemaMap::Instance()->emplace(schema_pb); - if (UNLIKELY(schema == nullptr)) { - return Status::InternalError("failed to emplace the schema hash map"); - } - auto cache_key = global_schema_cache_key(schema_pb.id()); - auto cache_value = std::make_unique(schema); - auto cache_size = inserted ? schema->mem_usage() : 0; - fill_metacache(cache_key, cache_value.release(), cache_size); + auto schema_file_path = _location_provider->schema_file_location(tablet_id, schema_pb.id()); + VLOG(3) << "Creating schema file of id " << schema_pb.id() << " for tablet " << tablet_id; + ProtobufFile file(schema_file_path); + RETURN_IF_ERROR(file.save(schema_pb)); + + // Save the schema into the in-memory cache + auto [schema, inserted] = GlobalTabletSchemaMap::Instance()->emplace(schema_pb); + if (UNLIKELY(schema == nullptr)) { + return Status::InternalError("failed to emplace the schema hash map"); } + auto cache_key = global_schema_cache_key(schema_pb.id()); + auto cache_value = std::make_unique(schema); + auto cache_size = inserted ? schema->mem_usage() : 0; + fill_metacache(cache_key, cache_value.release(), cache_size); return Status::OK(); } diff --git a/be/test/storage/lake/tablet_manager_test.cpp b/be/test/storage/lake/tablet_manager_test.cpp index baaaf01c346fb..0c138898a8dc3 100644 --- a/be/test/storage/lake/tablet_manager_test.cpp +++ b/be/test/storage/lake/tablet_manager_test.cpp @@ -108,28 +108,61 @@ TEST_F(LakeTabletManagerTest, txnlog_write_and_read) { // NOLINTNEXTLINE TEST_F(LakeTabletManagerTest, create_and_delete_tablet) { + auto fs = FileSystem::Default(); + auto tablet_id = next_id(); + auto schema_id = next_id(); + TCreateTabletReq req; - req.tablet_id = 65535; + req.tablet_id = tablet_id; req.__set_version(1); req.__set_version_hash(0); - req.tablet_schema.__set_id(next_id()); + req.tablet_schema.__set_id(schema_id); req.tablet_schema.__set_schema_hash(270068375); req.tablet_schema.__set_short_key_column_count(2); req.tablet_schema.__set_keys_type(TKeysType::DUP_KEYS); EXPECT_OK(_tablet_manager->create_tablet(req)); - auto res = _tablet_manager->get_tablet(65535); - EXPECT_TRUE(res.ok()); + EXPECT_TRUE(_tablet_manager->get_tablet(tablet_id).ok()); + EXPECT_TRUE(fs->path_exists(_location_provider->tablet_metadata_location(tablet_id, 1)).ok()); + EXPECT_TRUE(fs->path_exists(_location_provider->schema_file_location(tablet_id, schema_id)).ok()); starrocks::lake::TxnLog txnLog; - txnLog.set_tablet_id(65535); + txnLog.set_tablet_id(tablet_id); txnLog.set_txn_id(2); EXPECT_OK(_tablet_manager->put_txn_log(txnLog)); - EXPECT_OK(_tablet_manager->delete_tablet(65535)); + EXPECT_OK(_tablet_manager->delete_tablet(tablet_id)); - auto st = FileSystem::Default()->path_exists(_location_provider->tablet_metadata_location(65535, 1)); - EXPECT_TRUE(st.is_not_found()); - st = FileSystem::Default()->path_exists(_location_provider->tablet_metadata_location(65535, 2)); - EXPECT_TRUE(st.is_not_found()); + EXPECT_TRUE(fs->path_exists(_location_provider->tablet_metadata_location(tablet_id, 1)).is_not_found()); + EXPECT_TRUE(fs->path_exists(_location_provider->txn_log_location(tablet_id, 2)).is_not_found()); + EXPECT_TRUE(fs->path_exists(_location_provider->schema_file_location(tablet_id, schema_id)).ok()); +} + +// NOLINTNEXTLINE +TEST_F(LakeTabletManagerTest, create_tablet_without_schema_file) { + auto fs = FileSystem::Default(); + + for (auto create_schema_file : {false, true}) { + auto tablet_id = next_id(); + auto schema_id = next_id(); + + TCreateTabletReq req; + req.tablet_id = tablet_id; + req.__set_version(1); + req.__set_version_hash(0); + req.tablet_schema.__set_id(schema_id); + req.tablet_schema.__set_schema_hash(270068375); + req.tablet_schema.__set_short_key_column_count(2); + req.tablet_schema.__set_keys_type(TKeysType::DUP_KEYS); + req.__set_create_schema_file(create_schema_file); + EXPECT_OK(_tablet_manager->create_tablet(req)); + EXPECT_TRUE(_tablet_manager->get_tablet(tablet_id).ok()); + EXPECT_TRUE(fs->path_exists(_location_provider->tablet_metadata_location(tablet_id, 1)).ok()); + auto st = fs->path_exists(_location_provider->schema_file_location(tablet_id, schema_id)); + if (create_schema_file) { + EXPECT_TRUE(st.ok()) << st; + } else { + EXPECT_TRUE(st.is_not_found()) << st; + } + } } // NOLINTNEXTLINE diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJob.java b/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJob.java index 5e200c7760b8c..d4e6b905b5ee4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/LakeTableSchemaChangeJob.java @@ -340,6 +340,7 @@ protected void runPendingJob() throws AlterCancelException { sortKeyIdxes = copiedSortKeyIdxes; } + boolean createSchemaFile = true; for (Tablet shadowTablet : shadowIdx.getTablets()) { long shadowTabletId = shadowTablet.getId(); LakeTablet lakeTablet = ((LakeTablet) shadowTablet); @@ -358,7 +359,9 @@ protected void runPendingJob() throws AlterCancelException { countDownLatch, indexes, table.isInMemory(), table.enablePersistentIndex(), table.primaryIndexCacheExpireSec(), TTabletType.TABLET_TYPE_LAKE, table.getCompressionType(), - copiedSortKeyIdxes, null); + copiedSortKeyIdxes, null, createSchemaFile); + // For each partition, the schema file is created only when the first Tablet is created + createSchemaFile = false; batchTask.addTask(createReplicaTask); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/alter/RollupJobV2.java b/fe/fe-core/src/main/java/com/starrocks/alter/RollupJobV2.java index fd57deccc9a08..74c42919d95ea 100644 --- a/fe/fe-core/src/main/java/com/starrocks/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/com/starrocks/alter/RollupJobV2.java @@ -267,7 +267,7 @@ protected void runPendingJob() throws AlterCancelException { tbl.enablePersistentIndex(), tbl.primaryIndexCacheExpireSec(), tabletType, tbl.getCompressionType(), index.getSortKeyIdxes(), - index.getSortKeyUniqueIds()); + index.getSortKeyUniqueIds(), true); createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash); batchTask.addTask(createReplicaTask); } // end for rollupReplicas diff --git a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java index be6c3c5e1ac40..96507670fce55 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java @@ -1983,7 +1983,8 @@ private List buildCreateReplicaTasks(long dbId, OlapTable tab private List buildCreateReplicaTasks(long dbId, OlapTable table, PhysicalPartition partition, MaterializedIndex index) throws DdlException { LOG.info("build create replica tasks for index {} db {} table {} partition {}", - index, dbId, table.getId(), partition); + index, dbId, table.getId(), partition); + boolean createSchemaFile = true; List tasks = new ArrayList<>((int) index.getReplicaCount()); MaterializedIndexMeta indexMeta = table.getIndexMetaByIndexId(index.getId()); for (Tablet tablet : index.getTablets()) { @@ -2021,7 +2022,10 @@ private List buildCreateReplicaTasks(long dbId, OlapTable tab table.getPersistentIndexType(), TTabletType.TABLET_TYPE_LAKE, table.getCompressionType(), indexMeta.getSortKeyIdxes(), - indexMeta.getSortKeyUniqueIds()); + indexMeta.getSortKeyUniqueIds(), + createSchemaFile); + // For each partition, the schema file is created only when the first Tablet is created + createSchemaFile = false; task.setSchemaVersion(indexMeta.getSchemaVersion()); tasks.add(task); } else { diff --git a/fe/fe-core/src/main/java/com/starrocks/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/com/starrocks/task/CreateReplicaTask.java index be9a13f4c7d61..66f741cedde01 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/CreateReplicaTask.java @@ -109,6 +109,7 @@ public class CreateReplicaTask extends AgentTask { private boolean isRecoverTask = false; private int primaryIndexCacheExpireSec = 0; + private boolean createSchemaFile = true; public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, short shortKeyColumnCount, int schemaHash, long version, @@ -155,10 +156,12 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition boolean enablePersistentIndex, int primaryIndexCacheExpireSec, TTabletType tabletType, TCompressionType compressionType, List sortKeyIdxes, - List sortKeyUniqueIds) { + List sortKeyUniqueIds, + boolean createSchemaFile) { this(backendId, dbId, tableId, partitionId, indexId, tabletId, shortKeyColumnCount, schemaHash, 0, version, keysType, storageType, storageMedium, columns, bfColumns, bfFpp, latch, indexes, isInMemory, enablePersistentIndex, primaryIndexCacheExpireSec, tabletType, compressionType, sortKeyIdxes, sortKeyUniqueIds); + this.createSchemaFile = createSchemaFile; } public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, @@ -213,10 +216,12 @@ public CreateReplicaTask(long backendId, long dbId, long tableId, long partition int primaryIndexCacheExpireSec, TPersistentIndexType persistentIndexType, TTabletType tabletType, TCompressionType compressionType, List sortKeyIdxes, - List sortKeyUniqueIds) { + List sortKeyUniqueIds, + boolean createSchemaFile) { this(backendId, dbId, tableId, partitionId, indexId, tabletId, shortKeyColumnCount, schemaHash, version, keysType, storageType, storageMedium, columns, bfColumns, bfFpp, latch, indexes, isInMemory, - enablePersistentIndex, primaryIndexCacheExpireSec, tabletType, compressionType, sortKeyIdxes, sortKeyUniqueIds); + enablePersistentIndex, primaryIndexCacheExpireSec, tabletType, compressionType, sortKeyIdxes, sortKeyUniqueIds, + createSchemaFile); this.persistentIndexType = persistentIndexType; } @@ -337,8 +342,8 @@ public TCreateTabletReq toThrift() { } createTabletReq.setCompression_type(compressionType); - createTabletReq.setTablet_type(tabletType); + createTabletReq.setCreate_schema_file(createSchemaFile); return createTabletReq; } } diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 544f709981f95..72de857b973e1 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -112,6 +112,8 @@ struct TCreateTabletReq { 17: optional TBinlogConfig binlog_config; 18: optional TPersistentIndexType persistent_index_type; 19: optional i32 primary_index_cache_expire_sec; + // Whether or not need to create a separate file to hold schema information. + 20: optional bool create_schema_file = true; } struct TDropTabletReq {