diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java index 839997a7035fe..e13efefee0b69 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LoadBalanceResources.java @@ -24,15 +24,19 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.policies.data.loadbalancer.BundleData; +import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData; @Getter public class LoadBalanceResources { public static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data"; + public static final String BROKER_TIME_AVERAGE_BASE_PATH = "/loadbalance/broker-time-average"; private final BundleDataResources bundleDataResources; + private final BrokerTimeAverageDataResources brokerTimeAverageDataResources; public LoadBalanceResources(MetadataStore store, int operationTimeoutSec) { bundleDataResources = new BundleDataResources(store, operationTimeoutSec); + brokerTimeAverageDataResources = new BrokerTimeAverageDataResources(store, operationTimeoutSec); } public static class BundleDataResources extends BaseResources { @@ -69,4 +73,23 @@ private String getBundleDataPath(final String bundle) { return BUNDLE_DATA_BASE_PATH + "/" + bundle; } } + + public static class BrokerTimeAverageDataResources extends BaseResources { + public BrokerTimeAverageDataResources(MetadataStore store, int operationTimeoutSec) { + super(store, TimeAverageBrokerData.class, operationTimeoutSec); + } + + public CompletableFuture updateTimeAverageBrokerData(String brokerLookupAddress, + TimeAverageBrokerData data) { + return setWithCreateAsync(getTimeAverageBrokerDataPath(brokerLookupAddress), __ -> data); + } + + public CompletableFuture deleteTimeAverageBrokerData(String brokerLookupAddress) { + return deleteAsync(getTimeAverageBrokerDataPath(brokerLookupAddress)); + } + + private String getTimeAverageBrokerDataPath(final String brokerLookupAddress) { + return BROKER_TIME_AVERAGE_BASE_PATH + "/" + brokerLookupAddress; + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 586478efa50f7..491941d497ccd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -108,9 +108,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager { // Path to ZNode whose children contain ResourceQuota jsons. public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace"; - // Path to ZNode containing TimeAverageBrokerData jsons for each broker. - public static final String TIME_AVERAGE_BROKER_ZPATH = "/loadbalance/broker-time-average"; - // Set of broker candidates to reuse so that object creation is avoided. private final Set brokerCandidateCache; @@ -119,7 +116,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager { private ResourceLock brokerDataLock; private MetadataCache resourceQuotaCache; - private MetadataCache timeAverageBrokerDataCache; // Broker host usage object used to calculate system resource usage. private BrokerHostUsage brokerHostUsage; @@ -245,7 +241,6 @@ public void initialize(final PulsarService pulsar) { this.pulsarResources = pulsar.getPulsarResources(); brokersData = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class); resourceQuotaCache = pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class); - timeAverageBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class); pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification); pulsar.getLocalMetadataStore().registerSessionListener(this::handleMetadataSessionEvent); @@ -991,13 +986,13 @@ public void start() throws PulsarServerException { String lookupServiceAddress = pulsar.getLookupServiceAddress(); brokerZnodePath = LoadManager.LOADBALANCE_BROKERS_ROOT + "/" + lookupServiceAddress; - final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress; updateLocalBrokerData(); brokerDataLock = brokersData.acquireLock(brokerZnodePath, localData).join(); - - timeAverageBrokerDataCache.readModifyUpdateOrCreate(timeAverageZPath, - __ -> new TimeAverageBrokerData()).join(); + pulsarResources.getLoadBalanceResources() + .getBrokerTimeAverageDataResources() + .updateTimeAverageBrokerData(lookupServiceAddress, new TimeAverageBrokerData()) + .join(); updateAll(); } catch (Exception e) { log.error("Unable to acquire lock for broker: [{}]", brokerZnodePath, e); @@ -1154,9 +1149,8 @@ public void writeBundleDataOnZooKeeper() { for (Map.Entry entry : loadData.getBrokerData().entrySet()) { final String broker = entry.getKey(); final TimeAverageBrokerData data = entry.getValue().getTimeAverageData(); - futures.add(timeAverageBrokerDataCache.readModifyUpdateOrCreate( - TIME_AVERAGE_BROKER_ZPATH + "/" + broker, __ -> data) - .thenApply(__ -> null)); + futures.add(pulsarResources.getLoadBalanceResources() + .getBrokerTimeAverageDataResources().updateTimeAverageBrokerData(broker, data)); } try { @@ -1177,13 +1171,13 @@ private void deleteBundleDataFromMetadataStore(String bundle) { } private void deleteTimeAverageDataFromMetadataStoreAsync(String broker) { - final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + broker; - timeAverageBrokerDataCache.delete(timeAverageZPath).whenComplete((__, ex) -> { - if (ex != null && !(ex.getCause() instanceof MetadataStoreException.NotFoundException)) { - log.warn("Failed to delete dead broker {} time " - + "average data from metadata store", broker, ex); - } - }); + pulsarResources.getLoadBalanceResources() + .getBrokerTimeAverageDataResources().deleteTimeAverageBrokerData(broker).whenComplete((__, ex) -> { + if (ex != null && !(ex.getCause() instanceof MetadataStoreException.NotFoundException)) { + log.warn("Failed to delete dead broker {} time " + + "average data from metadata store", broker, ex); + } + }); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index 557393682fb03..f8b5c1258305c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -19,7 +19,7 @@ package org.apache.pulsar.broker.loadbalance.impl; import static java.lang.Thread.sleep; -import static org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH; +import static org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH; import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -778,7 +778,7 @@ public void testRemoveDeadBrokerTimeAverageData() throws Exception { List data = pulsar1.getLocalMetadataStore() .getMetadataCache(TimeAverageBrokerData.class) - .getChildren(TIME_AVERAGE_BROKER_ZPATH) + .getChildren(BROKER_TIME_AVERAGE_BASE_PATH) .join(); Awaitility.await().untilAsserted(() -> assertTrue(pulsar1.getLeaderElectionService().isLeader())); diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java index 3f8969860163d..a3e5a14a416e4 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/BrokerMonitor.java @@ -19,6 +19,7 @@ package org.apache.pulsar.testclient; import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.BROKER_LOAD_DATA_STORE_TOPIC; +import static org.apache.pulsar.broker.resources.LoadBalanceResources.BROKER_TIME_AVERAGE_BASE_PATH; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; @@ -34,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData; -import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SizeUnit; @@ -172,7 +172,7 @@ private void printGlobalData() { final LocalBrokerData localData = (LocalBrokerData) data; numBundles = localData.getNumBundles(); messageRate = localData.getMsgRateIn() + localData.getMsgRateOut(); - final String timeAveragePath = ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker; + final String timeAveragePath = BROKER_TIME_AVERAGE_BASE_PATH + "/" + broker; try { final TimeAverageBrokerData timeAverageData = gson.fromJson( new String(zkClient.getData(timeAveragePath, false, null)), @@ -314,7 +314,7 @@ private synchronized void printData(final String path) { printLoadReport(broker, gson.fromJson(jsonString, LoadReport.class)); } else { final LocalBrokerData localBrokerData = gson.fromJson(jsonString, LocalBrokerData.class); - final String timeAveragePath = ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker; + final String timeAveragePath = BROKER_TIME_AVERAGE_BASE_PATH + "/" + broker; try { final TimeAverageBrokerData timeAverageData = gson.fromJson( new String(zkClient.getData(timeAveragePath, false, null)), TimeAverageBrokerData.class);