diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 491941d497ccd..afe4d13215c45 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -106,7 +106,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager { public static final int NUM_SHORT_SAMPLES = 10; // Path to ZNode whose children contain ResourceQuota jsons. - public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota/namespace"; + public static final String RESOURCE_QUOTA_ZPATH = "/loadbalance/resource-quota"; // Set of broker candidates to reuse so that object creation is avoided. private final Set brokerCandidateCache; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index f8b5c1258305c..5937af68ec7f2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -77,8 +77,10 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl; +import org.apache.pulsar.common.policies.data.ResourceQuota; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.metadata.api.MetadataCache; @@ -99,6 +101,7 @@ import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @Slf4j @@ -785,6 +788,55 @@ public void testRemoveDeadBrokerTimeAverageData() throws Exception { assertEquals(data.size(), 1); } + @DataProvider(name = "isV1") + public Object[][] isV1() { + return new Object[][] {{true}, {false}}; + } + + @Test(dataProvider = "isV1") + public void testBundleDataDefaultValue(boolean isV1) throws Exception { + final String cluster = "use"; + final String tenant = "my-tenant"; + final String namespace = "my-ns"; + NamespaceName ns = isV1 ? NamespaceName.get(tenant, cluster, namespace) : NamespaceName.get(tenant, namespace); + admin1.clusters().createCluster(cluster, ClusterData.builder().serviceUrl("http://" + pulsar1.getAdvertisedAddress()).build()); + admin1.tenants().createTenant(tenant, + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster))); + admin1.namespaces().createNamespace(ns.toString(), 16); + + // set resourceQuota to the first bundle range. + BundlesData bundlesData = admin1.namespaces().getBundles(ns.toString()); + NamespaceBundle namespaceBundle = nsFactory.getBundle(ns, + Range.range(Long.decode(bundlesData.getBoundaries().get(0)), BoundType.CLOSED, Long.decode(bundlesData.getBoundaries().get(1)), + BoundType.OPEN)); + ResourceQuota quota = new ResourceQuota(); + quota.setMsgRateIn(1024.1); + quota.setMsgRateOut(1024.2); + quota.setBandwidthIn(1024.3); + quota.setBandwidthOut(1024.4); + quota.setMemory(1024.0); + admin1.resourceQuotas().setNamespaceBundleResourceQuota(ns.toString(), namespaceBundle.getBundleRange(), quota); + + ModularLoadManagerWrapper loadManagerWrapper = (ModularLoadManagerWrapper) pulsar1.getLoadManager().get(); + ModularLoadManagerImpl lm = (ModularLoadManagerImpl) loadManagerWrapper.getLoadManager(); + + // get the bundleData of the first bundle range. + // The default value of the bundleData be the same as resourceQuota because the resourceQuota is present. + BundleData defaultBundleData = lm.getBundleDataOrDefault(namespaceBundle.toString()); + + TimeAverageMessageData shortTermData = defaultBundleData.getShortTermData(); + TimeAverageMessageData longTermData = defaultBundleData.getLongTermData(); + assertEquals(shortTermData.getMsgRateIn(), 1024.1); + assertEquals(shortTermData.getMsgRateOut(), 1024.2); + assertEquals(shortTermData.getMsgThroughputIn(), 1024.3); + assertEquals(shortTermData.getMsgThroughputOut(), 1024.4); + + assertEquals(longTermData.getMsgRateIn(), 1024.1); + assertEquals(longTermData.getMsgRateOut(), 1024.2); + assertEquals(longTermData.getMsgThroughputIn(), 1024.3); + assertEquals(longTermData.getMsgThroughputOut(), 1024.4); + } + @Test public void testRemoveNonExistBundleData() diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java index f2ccd82b3901a..79efbeba3bc26 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/LoadSimulationController.java @@ -61,7 +61,7 @@ */ public class LoadSimulationController { private static final Logger log = LoggerFactory.getLogger(LoadSimulationController.class); - private static final String QUOTA_ROOT = "/loadbalance/resource-quota/namespace"; + private static final String QUOTA_ROOT = "/loadbalance/resource-quota"; // Input streams for each client to send commands through. private final DataInputStream[] inputStreams;