From 55b0ee04f445ab2eebf199ca2a5cc76060471091 Mon Sep 17 00:00:00 2001 From: duanyyyyyyy Date: Fri, 17 Jan 2025 23:01:42 +0800 Subject: [PATCH] Fix a bug that when catalog is unified catalog the backgroud refresh for hive connector do not work Signed-off-by: duanyyyyyyy --- .../connector/delta/DeltaLakeConnector.java | 7 ++-- .../hive/ConnectorProcessorName.java | 35 +++++++++++++++++++ .../hive/ConnectorTableMetadataProcessor.java | 25 +++++++------ .../connector/hive/HiveConnector.java | 6 ++-- .../connector/hudi/HudiConnector.java | 5 ++- 5 files changed, 63 insertions(+), 15 deletions(-) create mode 100644 fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorProcessorName.java diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java index 07f9dac068043..b34747e0f6972 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/delta/DeltaLakeConnector.java @@ -19,6 +19,7 @@ import com.starrocks.connector.ConnectorContext; import com.starrocks.connector.ConnectorMetadata; import com.starrocks.connector.HdfsEnvironment; +import com.starrocks.connector.hive.ConnectorProcessorName; import com.starrocks.credential.CloudConfiguration; import com.starrocks.credential.CloudConfigurationFactory; import com.starrocks.server.GlobalStateMgr; @@ -36,12 +37,14 @@ public class DeltaLakeConnector implements Connector { private final Map properties; private final CloudConfiguration cloudConfiguration; private final String catalogName; + private final ConnectorProcessorName processorName; private final DeltaLakeInternalMgr internalMgr; private final DeltaLakeMetadataFactory metadataFactory; private IDeltaLakeMetastore metastore; public DeltaLakeConnector(ConnectorContext context) { this.catalogName = context.getCatalogName(); + this.processorName = new ConnectorProcessorName(catalogName, "delta_lake"); this.properties = context.getProperties(); this.cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties); HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration); @@ -75,13 +78,13 @@ public CloudConfiguration getCloudConfiguration() { public void shutdown() { internalMgr.shutdown(); metadataFactory.metastoreCacheInvalidateCache(); - GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogName); + GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName); } public void onCreate() { Optional updateProcessor = metadataFactory.getCacheUpdateProcessor(); updateProcessor.ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor() - .registerCacheUpdateProcessor(catalogName, updateProcessor.get())); + .registerCacheUpdateProcessor(processorName, updateProcessor.get())); } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorProcessorName.java b/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorProcessorName.java new file mode 100644 index 0000000000000..0a4444483c9e9 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorProcessorName.java @@ -0,0 +1,35 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.connector.hive; + +public class ConnectorProcessorName { + + private final String catalogName; + private final String connectorName; + + public ConnectorProcessorName(String catalogName, String connectorName) { + this.catalogName = catalogName; + this.connectorName = connectorName; + } + + public String getCatalogName() { + return this.catalogName; + } + + public String getConnectorName() { + return this.connectorName; + } + +} diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java b/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java index 868c26674aae5..6b134980b54e9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hive/ConnectorTableMetadataProcessor.java @@ -48,7 +48,8 @@ public class ConnectorTableMetadataProcessor extends FrontendDaemon { private final Set registeredTableInfos = Sets.newConcurrentHashSet(); - private final Map cacheUpdateProcessors = new ConcurrentHashMap<>(); + private final Map cacheUpdateProcessors = + new ConcurrentHashMap<>(); private final ExecutorService refreshRemoteFileExecutor; private final Map cachingIcebergCatalogs = new ConcurrentHashMap<>(); @@ -57,14 +58,16 @@ public void registerTableInfo(BaseTableInfo tableInfo) { registeredTableInfos.add(tableInfo); } - public void registerCacheUpdateProcessor(String catalogName, CacheUpdateProcessor cache) { - LOG.info("register to update {} metadata cache in the ConnectorTableMetadataProcessor", catalogName); - cacheUpdateProcessors.put(catalogName, cache); + public void registerCacheUpdateProcessor(ConnectorProcessorName processorName, CacheUpdateProcessor cache) { + LOG.info("register to update {} metadata cache from {} in the ConnectorTableMetadataProcessor", + processorName.getConnectorName(), processorName.getConnectorName()); + cacheUpdateProcessors.put(processorName, cache); } - public void unRegisterCacheUpdateProcessor(String catalogName) { - LOG.info("unregister to update {} metadata cache in the ConnectorTableMetadataProcessor", catalogName); - cacheUpdateProcessors.remove(catalogName); + public void unRegisterCacheUpdateProcessor(ConnectorProcessorName processorName) { + LOG.info("unregister to update {} metadata cache from {} in the ConnectorTableMetadataProcessor", + processorName.getConnectorName(), processorName.getConnectorName()); + cacheUpdateProcessors.remove(processorName); } public void registerCachingIcebergCatalog(String catalogName, IcebergCatalog icebergCatalog) { @@ -99,9 +102,11 @@ protected void runAfterCatalogReady() { private void refreshCatalogTable() { MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr(); - List catalogNames = Lists.newArrayList(cacheUpdateProcessors.keySet()); - for (String catalogName : catalogNames) { - CacheUpdateProcessor updateProcessor = cacheUpdateProcessors.get(catalogName); + List processorNames = Lists.newArrayList(cacheUpdateProcessors.keySet()); + for (ConnectorProcessorName processorName : processorNames) { + String catalogName = processorName.getCatalogName(); + LOG.info("Starting to refresh tables from {} in catalog {}", processorName.getConnectorName(), catalogName); + CacheUpdateProcessor updateProcessor = cacheUpdateProcessors.get(processorName); if (updateProcessor == null) { LOG.error("Failed to get cacheUpdateProcessor by catalog {}.", catalogName); continue; diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java index 01b6fdf14e6d8..27821543a62df 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hive/HiveConnector.java @@ -34,12 +34,14 @@ public class HiveConnector implements Connector { public static final String HIVE_METASTORE_CONNECTION_POOL_SIZE = "hive.metastore.connection.pool.size"; private final Map properties; private final String catalogName; + private final ConnectorProcessorName processorName; private final HiveConnectorInternalMgr internalMgr; private final HiveMetadataFactory metadataFactory; public HiveConnector(ConnectorContext context) { this.properties = context.getProperties(); this.catalogName = context.getCatalogName(); + this.processorName = new ConnectorProcessorName(catalogName, "hive_connector"); CloudConfiguration cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties); HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration); this.internalMgr = new HiveConnectorInternalMgr(catalogName, properties, hdfsEnvironment); @@ -83,7 +85,7 @@ public void onCreate() { internalMgr.isEnableBackgroundRefreshHiveMetadata()) { updateProcessor .ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor() - .registerCacheUpdateProcessor(catalogName, updateProcessor.get())); + .registerCacheUpdateProcessor(processorName, updateProcessor.get())); } } } @@ -93,6 +95,6 @@ public void shutdown() { internalMgr.shutdown(); metadataFactory.getCacheUpdateProcessor().ifPresent(HiveCacheUpdateProcessor::invalidateAll); GlobalStateMgr.getCurrentState().getMetastoreEventsProcessor().unRegisterCacheUpdateProcessor(catalogName); - GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogName); + GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName); } } diff --git a/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java b/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java index da45e9baac646..670358094202b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java +++ b/fe/fe-core/src/main/java/com/starrocks/connector/hudi/HudiConnector.java @@ -20,6 +20,7 @@ import com.starrocks.connector.ConnectorMetadata; import com.starrocks.connector.HdfsEnvironment; import com.starrocks.connector.RemoteFileIO; +import com.starrocks.connector.hive.ConnectorProcessorName; import com.starrocks.connector.hive.IHiveMetastore; import com.starrocks.credential.CloudConfiguration; import com.starrocks.credential.CloudConfigurationFactory; @@ -33,6 +34,7 @@ public class HudiConnector implements Connector { public static final List SUPPORTED_METASTORE_TYPE = Lists.newArrayList("hive", "glue", "dlf"); private final Map properties; private final String catalogName; + private final ConnectorProcessorName processorName; private final HudiConnectorInternalMgr internalMgr; private final HudiMetadataFactory metadataFactory; @@ -41,6 +43,7 @@ public HudiConnector(ConnectorContext context) { CloudConfiguration cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties); HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration); this.catalogName = context.getCatalogName(); + this.processorName = new ConnectorProcessorName(catalogName, "hudi_connector"); this.internalMgr = new HudiConnectorInternalMgr(catalogName, properties, hdfsEnvironment); this.metadataFactory = createMetadataFactory(hdfsEnvironment); onCreate(); @@ -74,6 +77,6 @@ public void onCreate() { @Override public void shutdown() { internalMgr.shutdown(); - GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogName); + GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName); } } \ No newline at end of file