From 3b3c713192ba43a594090d487289fb4d665d2f4c Mon Sep 17 00:00:00 2001 From: Dragos Misca Date: Wed, 28 Feb 2024 13:07:06 -0800 Subject: [PATCH] [fix][broker] Fix broker not starting when both transactions and the Extensible Load Manager are enabled (#22139) --- .../persistent/PersistentSubscription.java | 4 +- .../service/persistent/PersistentTopic.java | 3 +- .../ExtensibleLoadManagerImplBaseTest.java | 166 ++++++++++++++++++ .../ExtensibleLoadManagerImplTest.java | 125 +------------ ...gerImplWithTransactionCoordinatorTest.java | 55 ++++++ .../messaging/MessagingSmokeTest.java | 107 +++++++++++ .../integration/topologies/PulsarCluster.java | 2 +- .../topologies/PulsarClusterTestBase.java | 15 ++ .../src/test/resources/pulsar-messaging.xml | 1 + 9 files changed, 353 insertions(+), 125 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index a01904d86f32d..50e84310ac183 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -58,6 +58,7 @@ import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.service.AbstractSubscription; import org.apache.pulsar.broker.service.AnalyzeBacklogResult; @@ -157,7 +158,8 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() - && !isEventSystemTopic(TopicName.get(topicName))) { + && !isEventSystemTopic(TopicName.get(topicName)) + && !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) { this.pendingAckHandle = new PendingAckHandleImpl(this); } else { this.pendingAckHandle = new PendingAckHandleDisabled(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9baafcb2e9e10..db586b7229baf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -350,7 +350,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS TopicName topicName = TopicName.get(topic); if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled() && !isEventSystemTopic(topicName) - && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { + && !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject()) + && !ExtensibleLoadManagerImpl.isInternalTopic(topic)) { this.transactionBuffer = brokerService.getPulsar() .getTransactionBufferProvider().newTransactionBuffer(this); } else { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java new file mode 100644 index 0000000000000..9e20fccff6d93 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplBaseTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import com.google.common.collect.Sets; +import java.util.concurrent.CompletableFuture; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; +import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.common.naming.NamespaceBundle; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicType; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; + +public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServiceBaseTest { + + protected PulsarService pulsar1; + protected PulsarService pulsar2; + + protected PulsarTestContext additionalPulsarTestContext; + + protected ExtensibleLoadManagerImpl primaryLoadManager; + + protected ExtensibleLoadManagerImpl secondaryLoadManager; + + protected ServiceUnitStateChannelImpl channel1; + protected ServiceUnitStateChannelImpl channel2; + + protected final String defaultTestNamespace; + + protected LookupService lookupService; + + protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) { + this.defaultTestNamespace = defaultTestNamespace; + } + + protected ServiceConfiguration initConfig(ServiceConfiguration conf) { + // Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid + // stuck when doing unload. + conf.setLoadBalancerInFlightServiceUnitStateWaitingTimeInMillis(5 * 1000); + conf.setLoadBalancerServiceUnitStateMonitorIntervalInSeconds(1); + conf.setForceDeleteNamespaceAllowed(true); + conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); + conf.setAllowAutoTopicCreation(true); + conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); + conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); + conf.setLoadBalancerSheddingEnabled(false); + conf.setLoadBalancerDebugModeEnabled(true); + conf.setTopicLevelPoliciesEnabled(true); + return conf; + } + + @Override + @BeforeClass(alwaysRun = true) + protected void setup() throws Exception { + initConfig(conf); + super.internalSetup(conf); + pulsar1 = pulsar; + var conf2 = initConfig(getDefaultConf()); + additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2); + pulsar2 = additionalPulsarTestContext.getPulsarService(); + + setPrimaryLoadManager(); + setSecondaryLoadManager(); + + admin.clusters().createCluster(this.conf.getClusterName(), + ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); + admin.tenants().createTenant("public", + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), + Sets.newHashSet(this.conf.getClusterName()))); + admin.namespaces().createNamespace("public/default"); + admin.namespaces().setNamespaceReplicationClusters("public/default", + Sets.newHashSet(this.conf.getClusterName())); + + admin.namespaces().createNamespace(defaultTestNamespace, 128); + admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, + Sets.newHashSet(this.conf.getClusterName())); + lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true); + } + + @Override + @AfterClass(alwaysRun = true) + protected void cleanup() throws Exception { + this.additionalPulsarTestContext.close(); + super.internalCleanup(); + } + + @BeforeMethod(alwaysRun = true) + protected void initializeState() throws PulsarAdminException, IllegalAccessException { + admin.namespaces().unload(defaultTestNamespace); + reset(primaryLoadManager, secondaryLoadManager); + FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true); + pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); + pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); + } + + protected void setPrimaryLoadManager() throws IllegalAccessException { + ExtensibleLoadManagerWrapper wrapper = + (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get(); + primaryLoadManager = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(wrapper, "loadManager", true)); + FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true); + channel1 = (ServiceUnitStateChannelImpl) + FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true); + } + + private void setSecondaryLoadManager() throws IllegalAccessException { + ExtensibleLoadManagerWrapper wrapper = + (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get(); + secondaryLoadManager = spy((ExtensibleLoadManagerImpl) + FieldUtils.readField(wrapper, "loadManager", true)); + FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true); + channel2 = (ServiceUnitStateChannelImpl) + FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true); + } + + protected CompletableFuture getBundleAsync(PulsarService pulsar, TopicName topic) { + return pulsar.getNamespaceService().getBundleAsync(topic); + } + + protected Pair getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix) + throws Exception { + TopicName changeEventsTopicName = + TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get(); + int i = 0; + while(true) { + TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i); + NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); + if (!bundle.equals(changeEventsBundle)) { + return Pair.of(topicName, bundle); + } + i++; + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index b3d77339cabb6..308a755235c6d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -84,7 +84,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.loadbalance.BrokerFilterException; import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.broker.loadbalance.LeaderElectionService; @@ -110,7 +109,6 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; -import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -124,25 +122,18 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; -import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.policies.data.BrokerAssignment; import org.apache.pulsar.common.policies.data.BundlesData; -import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage; import org.awaitility.Awaitility; import org.testng.AssertJUnit; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -152,81 +143,10 @@ @Slf4j @Test(groups = "broker") @SuppressWarnings("unchecked") -public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { +public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBaseTest { - private PulsarService pulsar1; - private PulsarService pulsar2; - - private PulsarTestContext additionalPulsarTestContext; - - private ExtensibleLoadManagerImpl primaryLoadManager; - - private ExtensibleLoadManagerImpl secondaryLoadManager; - - private ServiceUnitStateChannelImpl channel1; - private ServiceUnitStateChannelImpl channel2; - - private final String defaultTestNamespace = "public/test"; - - private LookupService lookupService; - - private static void initConfig(ServiceConfiguration conf){ - conf.setForceDeleteNamespaceAllowed(true); - conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED); - conf.setAllowAutoTopicCreation(true); - conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); - conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName()); - conf.setLoadBalancerSheddingEnabled(false); - conf.setLoadBalancerDebugModeEnabled(true); - conf.setTopicLevelPoliciesEnabled(true); - } - - @BeforeClass - @Override - public void setup() throws Exception { - // Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid - // stuck when doing unload. - initConfig(conf); - super.internalSetup(conf); - pulsar1 = pulsar; - ServiceConfiguration defaultConf = getDefaultConf(); - initConfig(defaultConf); - additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf); - pulsar2 = additionalPulsarTestContext.getPulsarService(); - - setPrimaryLoadManager(); - - setSecondaryLoadManager(); - - admin.clusters().createCluster(this.conf.getClusterName(), - ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); - admin.tenants().createTenant("public", - new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), - Sets.newHashSet(this.conf.getClusterName()))); - admin.namespaces().createNamespace("public/default"); - admin.namespaces().setNamespaceReplicationClusters("public/default", - Sets.newHashSet(this.conf.getClusterName())); - - admin.namespaces().createNamespace(defaultTestNamespace, 128); - admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, - Sets.newHashSet(this.conf.getClusterName())); - lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true); - } - - @Override - @AfterClass(alwaysRun = true) - protected void cleanup() throws Exception { - this.additionalPulsarTestContext.close(); - super.internalCleanup(); - } - - @BeforeMethod(alwaysRun = true) - protected void initializeState() throws PulsarAdminException, IllegalAccessException { - admin.namespaces().unload(defaultTestNamespace); - reset(primaryLoadManager, secondaryLoadManager); - FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true); - pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); - pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true); + public ExtensibleLoadManagerImplTest() { + super("public/test"); } @Test @@ -1678,43 +1598,4 @@ public String name() { } - private void setPrimaryLoadManager() throws IllegalAccessException { - ExtensibleLoadManagerWrapper wrapper = - (ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get(); - primaryLoadManager = spy((ExtensibleLoadManagerImpl) - FieldUtils.readField(wrapper, "loadManager", true)); - FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true); - channel1 = (ServiceUnitStateChannelImpl) - FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true); - } - - private void setSecondaryLoadManager() throws IllegalAccessException { - ExtensibleLoadManagerWrapper wrapper = - (ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get(); - secondaryLoadManager = spy((ExtensibleLoadManagerImpl) - FieldUtils.readField(wrapper, "loadManager", true)); - FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true); - channel2 = (ServiceUnitStateChannelImpl) - FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true); - } - - private CompletableFuture getBundleAsync(PulsarService pulsar, TopicName topic) { - return pulsar.getNamespaceService().getBundleAsync(topic); - } - - private Pair getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix) - throws Exception { - TopicName changeEventsTopicName = - TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); - NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get(); - int i = 0; - while(true) { - TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i); - NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); - if (!bundle.equals(changeEventsBundle)) { - return Pair.of(topicName, bundle); - } - i++; - } - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java new file mode 100644 index 0000000000000..0c95dd85f28e0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplWithTransactionCoordinatorTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.extensions; + +import static org.testng.Assert.assertEquals; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class ExtensibleLoadManagerImplWithTransactionCoordinatorTest extends ExtensibleLoadManagerImplBaseTest { + + public ExtensibleLoadManagerImplWithTransactionCoordinatorTest() { + super("public/test-elb-with-tx"); + } + + @Override + protected ServiceConfiguration initConfig(ServiceConfiguration conf) { + conf = super.initConfig(conf); + conf.setTransactionCoordinatorEnabled(true); + return conf; + } + + @Test(timeOut = 30 * 1000) + public void testUnloadAdminAPI() throws Exception { + var topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-unload"); + var topicName = topicAndBundle.getLeft(); + var bundle = topicAndBundle.getRight(); + + var srcBroker = admin.lookups().lookupTopic(topicName.toString()); + var dstBroker = srcBroker.equals(pulsar1.getBrokerServiceUrl()) ? pulsar2 : pulsar1; + var dstBrokerUrl = dstBroker.getBrokerId(); + var dstBrokerServiceUrl = dstBroker.getBrokerServiceUrl(); + + admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), dstBrokerUrl); + Awaitility.await().untilAsserted( + () -> assertEquals(admin.lookups().lookupTopic(topicName.toString()), dstBrokerServiceUrl)); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java new file mode 100644 index 0000000000000..618053ac000e2 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/MessagingSmokeTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.messaging; + +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder; +import org.apache.pulsar.common.naming.TopicDomain; +import org.testng.ITest; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +public class MessagingSmokeTest extends TopicMessagingBase implements ITest { + + @Factory + public static Object[] messagingTests() { + List tests = List.of( + new MessagingSmokeTest("Extensible Load Manager", + Map.of("loadManagerClassName", ExtensibleLoadManagerImpl.class.getName(), + "loadBalancerLoadSheddingStrategy", TransferShedder.class.getName())), + new MessagingSmokeTest("Extensible Load Manager with TX Coordinator", + Map.of("loadManagerClassName", ExtensibleLoadManagerImpl.class.getName(), + "loadBalancerLoadSheddingStrategy", TransferShedder.class.getName(), + "transactionCoordinatorEnabled", "true")) + ); + return tests.toArray(); + } + + private final String name; + + public MessagingSmokeTest(String name, Map brokerEnvs) { + super(); + this.brokerEnvs.putAll(brokerEnvs); + this.name = name; + } + + @Override + public String getTestName() { + return name; + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testNonPartitionedTopicMessagingWithExclusive(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + nonPartitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testPartitionedTopicMessagingWithExclusive(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + partitionedTopicSendAndReceiveWithExclusive(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testNonPartitionedTopicMessagingWithFailover(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + nonPartitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testPartitionedTopicMessagingWithFailover(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + partitionedTopicSendAndReceiveWithFailover(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testNonPartitionedTopicMessagingWithShared(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + nonPartitionedTopicSendAndReceiveWithShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testPartitionedTopicMessagingWithShared(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + partitionedTopicSendAndReceiveWithShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testNonPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + nonPartitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } + + @Test(dataProvider = "serviceUrlAndTopicDomain") + public void testPartitionedTopicMessagingWithKeyShared(Supplier serviceUrl, TopicDomain topicDomain) + throws Exception { + partitionedTopicSendAndReceiveWithKeyShared(serviceUrl.get(), TopicDomain.persistent.equals(topicDomain)); + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 4aa3e15f45dcd..90f08a9639471 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -150,7 +150,7 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s this.brokerContainers = Maps.newTreeMap(); this.workerContainers = Maps.newTreeMap(); - this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME, spec.enableTls) + this.proxyContainer = new ProxyContainer(clusterName, appendClusterName(ProxyContainer.NAME), spec.enableTls) .withNetwork(network) .withNetworkAliases(appendClusterName("pulsar-proxy")) .withEnv("metadataStoreUrl", metadataStoreUrl) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java index ae9e44fa98254..93e2221ab2493 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarClusterTestBase.java @@ -25,6 +25,7 @@ import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.naming.TopicDomain; import org.testng.annotations.DataProvider; import java.util.stream.Stream; @@ -86,6 +87,20 @@ public Object[][] serviceAndAdminUrls() { }; } + @DataProvider + public Object[][] serviceUrlAndTopicDomain() { + return new Object[][] { + { + stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()), + TopicDomain.persistent + }, + { + stringSupplier(() -> getPulsarCluster().getPlainTextServiceUrl()), + TopicDomain.non_persistent + }, + }; + } + protected PulsarAdmin pulsarAdmin; protected PulsarCluster pulsarCluster; diff --git a/tests/integration/src/test/resources/pulsar-messaging.xml b/tests/integration/src/test/resources/pulsar-messaging.xml index 603756fab68b7..a34670267dc2a 100644 --- a/tests/integration/src/test/resources/pulsar-messaging.xml +++ b/tests/integration/src/test/resources/pulsar-messaging.xml @@ -28,6 +28,7 @@ +