Skip to content

Commit

Permalink
Fix a bug that when catalog is unified catalog the background refresh …
Browse files Browse the repository at this point in the history
…for hive connector do not work

Signed-off-by: duanyyyyyyy <[email protected]>
  • Loading branch information
duanyyyyyyy committed Jan 17, 2025
1 parent 688a26d commit 55b0ee0
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.starrocks.connector.ConnectorContext;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.HdfsEnvironment;
import com.starrocks.connector.hive.ConnectorProcessorName;
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.credential.CloudConfigurationFactory;
import com.starrocks.server.GlobalStateMgr;
Expand All @@ -36,12 +37,14 @@ public class DeltaLakeConnector implements Connector {
private final Map<String, String> properties;
private final CloudConfiguration cloudConfiguration;
private final String catalogName;
private final ConnectorProcessorName processorName;
private final DeltaLakeInternalMgr internalMgr;
private final DeltaLakeMetadataFactory metadataFactory;
private IDeltaLakeMetastore metastore;

public DeltaLakeConnector(ConnectorContext context) {
this.catalogName = context.getCatalogName();
this.processorName = new ConnectorProcessorName(catalogName, "delta_lake");
this.properties = context.getProperties();
this.cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties);
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration);
Expand Down Expand Up @@ -75,13 +78,13 @@ public CloudConfiguration getCloudConfiguration() {
public void shutdown() {
internalMgr.shutdown();
metadataFactory.metastoreCacheInvalidateCache();
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogName);
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName);
}

public void onCreate() {
Optional<DeltaLakeCacheUpdateProcessor> updateProcessor = metadataFactory.getCacheUpdateProcessor();
updateProcessor.ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
.registerCacheUpdateProcessor(catalogName, updateProcessor.get()));
.registerCacheUpdateProcessor(processorName, updateProcessor.get()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector.hive;

public class ConnectorProcessorName {

private final String catalogName;
private final String connectorName;

public ConnectorProcessorName(String catalogName, String connectorName) {
this.catalogName = catalogName;
this.connectorName = connectorName;
}

public String getCatalogName() {
return this.catalogName;
}

public String getConnectorName() {
return this.connectorName;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ public class ConnectorTableMetadataProcessor extends FrontendDaemon {

private final Set<BaseTableInfo> registeredTableInfos = Sets.newConcurrentHashSet();

private final Map<String, CacheUpdateProcessor> cacheUpdateProcessors = new ConcurrentHashMap<>();
private final Map<ConnectorProcessorName, CacheUpdateProcessor> cacheUpdateProcessors =
new ConcurrentHashMap<>();

private final ExecutorService refreshRemoteFileExecutor;
private final Map<String, IcebergCatalog> cachingIcebergCatalogs = new ConcurrentHashMap<>();
Expand All @@ -57,14 +58,16 @@ public void registerTableInfo(BaseTableInfo tableInfo) {
registeredTableInfos.add(tableInfo);
}

public void registerCacheUpdateProcessor(String catalogName, CacheUpdateProcessor cache) {
LOG.info("register to update {} metadata cache in the ConnectorTableMetadataProcessor", catalogName);
cacheUpdateProcessors.put(catalogName, cache);
/**
 * Stores {@code cache} in {@code cacheUpdateProcessors} under {@code processorName},
 * replacing any processor previously stored under the same key.
 *
 * @param processorName catalog/connector pair used as the map key
 * @param cache         processor to store for that key
 */
public void registerCacheUpdateProcessor(ConnectorProcessorName processorName, CacheUpdateProcessor cache) {
    // First placeholder is the catalog, second the connector; the connector name was
    // previously passed for both, so the catalog never appeared in the log line.
    LOG.info("register to update {} metadata cache from {} in the ConnectorTableMetadataProcessor",
            processorName.getCatalogName(), processorName.getConnectorName());
    cacheUpdateProcessors.put(processorName, cache);
}

public void unRegisterCacheUpdateProcessor(String catalogName) {
LOG.info("unregister to update {} metadata cache in the ConnectorTableMetadataProcessor", catalogName);
cacheUpdateProcessors.remove(catalogName);
/**
 * Removes the entry stored under {@code processorName} from {@code cacheUpdateProcessors}.
 * Does nothing beyond logging when no entry exists for that key.
 *
 * @param processorName catalog/connector pair whose processor should be removed
 */
public void unRegisterCacheUpdateProcessor(ConnectorProcessorName processorName) {
    // First placeholder is the catalog, second the connector; the connector name was
    // previously passed for both, so the catalog never appeared in the log line.
    LOG.info("unregister to update {} metadata cache from {} in the ConnectorTableMetadataProcessor",
            processorName.getCatalogName(), processorName.getConnectorName());
    cacheUpdateProcessors.remove(processorName);
}

public void registerCachingIcebergCatalog(String catalogName, IcebergCatalog icebergCatalog) {
Expand Down Expand Up @@ -99,9 +102,11 @@ protected void runAfterCatalogReady() {

private void refreshCatalogTable() {
MetadataMgr metadataMgr = GlobalStateMgr.getCurrentState().getMetadataMgr();
List<String> catalogNames = Lists.newArrayList(cacheUpdateProcessors.keySet());
for (String catalogName : catalogNames) {
CacheUpdateProcessor updateProcessor = cacheUpdateProcessors.get(catalogName);
List<ConnectorProcessorName> processorNames = Lists.newArrayList(cacheUpdateProcessors.keySet());
for (ConnectorProcessorName processorName : processorNames) {
String catalogName = processorName.getCatalogName();
LOG.info("Starting to refresh tables from {} in catalog {}", processorName.getConnectorName(), catalogName);
CacheUpdateProcessor updateProcessor = cacheUpdateProcessors.get(processorName);
if (updateProcessor == null) {
LOG.error("Failed to get cacheUpdateProcessor by catalog {}.", catalogName);
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ public class HiveConnector implements Connector {
public static final String HIVE_METASTORE_CONNECTION_POOL_SIZE = "hive.metastore.connection.pool.size";
private final Map<String, String> properties;
private final String catalogName;
private final ConnectorProcessorName processorName;
private final HiveConnectorInternalMgr internalMgr;
private final HiveMetadataFactory metadataFactory;

public HiveConnector(ConnectorContext context) {
this.properties = context.getProperties();
this.catalogName = context.getCatalogName();
this.processorName = new ConnectorProcessorName(catalogName, "hive_connector");
CloudConfiguration cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties);
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration);
this.internalMgr = new HiveConnectorInternalMgr(catalogName, properties, hdfsEnvironment);
Expand Down Expand Up @@ -83,7 +85,7 @@ public void onCreate() {
internalMgr.isEnableBackgroundRefreshHiveMetadata()) {
updateProcessor
.ifPresent(processor -> GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor()
.registerCacheUpdateProcessor(catalogName, updateProcessor.get()));
.registerCacheUpdateProcessor(processorName, updateProcessor.get()));
}
}
}
Expand All @@ -93,6 +95,6 @@ public void shutdown() {
internalMgr.shutdown();
metadataFactory.getCacheUpdateProcessor().ifPresent(HiveCacheUpdateProcessor::invalidateAll);
GlobalStateMgr.getCurrentState().getMetastoreEventsProcessor().unRegisterCacheUpdateProcessor(catalogName);
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogName);
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.HdfsEnvironment;
import com.starrocks.connector.RemoteFileIO;
import com.starrocks.connector.hive.ConnectorProcessorName;
import com.starrocks.connector.hive.IHiveMetastore;
import com.starrocks.credential.CloudConfiguration;
import com.starrocks.credential.CloudConfigurationFactory;
Expand All @@ -33,6 +34,7 @@ public class HudiConnector implements Connector {
public static final List<String> SUPPORTED_METASTORE_TYPE = Lists.newArrayList("hive", "glue", "dlf");
private final Map<String, String> properties;
private final String catalogName;
private final ConnectorProcessorName processorName;
private final HudiConnectorInternalMgr internalMgr;
private final HudiMetadataFactory metadataFactory;

Expand All @@ -41,6 +43,7 @@ public HudiConnector(ConnectorContext context) {
CloudConfiguration cloudConfiguration = CloudConfigurationFactory.buildCloudConfigurationForStorage(properties);
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(cloudConfiguration);
this.catalogName = context.getCatalogName();
this.processorName = new ConnectorProcessorName(catalogName, "hudi_connector");
this.internalMgr = new HudiConnectorInternalMgr(catalogName, properties, hdfsEnvironment);
this.metadataFactory = createMetadataFactory(hdfsEnvironment);
onCreate();
Expand Down Expand Up @@ -74,6 +77,6 @@ public void onCreate() {
@Override
public void shutdown() {
internalMgr.shutdown();
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(catalogName);
GlobalStateMgr.getCurrentState().getConnectorTableMetadataProcessor().unRegisterCacheUpdateProcessor(processorName);
}
}

0 comments on commit 55b0ee0

Please sign in to comment.