Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker] Fix broker not starting when both transactions and the …
Browse files Browse the repository at this point in the history
…Extensible Load Manager are enabled (apache#22139)
  • Loading branch information
dragosvictor authored Feb 28, 2024
1 parent f33a3f4 commit 3b3c713
Show file tree
Hide file tree
Showing 9 changed files with 353 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
Expand Down Expand Up @@ -157,7 +158,8 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
&& !isEventSystemTopic(TopicName.get(topicName))) {
&& !isEventSystemTopic(TopicName.get(topicName))
&& !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) {
this.pendingAckHandle = new PendingAckHandleImpl(this);
} else {
this.pendingAckHandle = new PendingAckHandleDisabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
TopicName topicName = TopicName.get(topic);
if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
&& !isEventSystemTopic(topicName)
&& !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
&& !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())
&& !ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
this.transactionBuffer = brokerService.getPulsar()
.getTransactionBufferProvider().newTransactionBuffer(this);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance.extensions;

import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import com.google.common.collect.Sets;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;

public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServiceBaseTest {

protected PulsarService pulsar1;
protected PulsarService pulsar2;

protected PulsarTestContext additionalPulsarTestContext;

protected ExtensibleLoadManagerImpl primaryLoadManager;

protected ExtensibleLoadManagerImpl secondaryLoadManager;

protected ServiceUnitStateChannelImpl channel1;
protected ServiceUnitStateChannelImpl channel2;

protected final String defaultTestNamespace;

protected LookupService lookupService;

protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {
this.defaultTestNamespace = defaultTestNamespace;
}

protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
// Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid
// stuck when doing unload.
conf.setLoadBalancerInFlightServiceUnitStateWaitingTimeInMillis(5 * 1000);
conf.setLoadBalancerServiceUnitStateMonitorIntervalInSeconds(1);
conf.setForceDeleteNamespaceAllowed(true);
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
conf.setLoadBalancerSheddingEnabled(false);
conf.setLoadBalancerDebugModeEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
return conf;
}

@Override
@BeforeClass(alwaysRun = true)
protected void setup() throws Exception {
initConfig(conf);
super.internalSetup(conf);
pulsar1 = pulsar;
var conf2 = initConfig(getDefaultConf());
additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2);
pulsar2 = additionalPulsarTestContext.getPulsarService();

setPrimaryLoadManager();
setSecondaryLoadManager();

admin.clusters().createCluster(this.conf.getClusterName(),
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
admin.tenants().createTenant("public",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet(this.conf.getClusterName())));
admin.namespaces().createNamespace("public/default");
admin.namespaces().setNamespaceReplicationClusters("public/default",
Sets.newHashSet(this.conf.getClusterName()));

admin.namespaces().createNamespace(defaultTestNamespace, 128);
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
Sets.newHashSet(this.conf.getClusterName()));
lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
}

@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
this.additionalPulsarTestContext.close();
super.internalCleanup();
}

@BeforeMethod(alwaysRun = true)
protected void initializeState() throws PulsarAdminException, IllegalAccessException {
admin.namespaces().unload(defaultTestNamespace);
reset(primaryLoadManager, secondaryLoadManager);
FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
}

protected void setPrimaryLoadManager() throws IllegalAccessException {
ExtensibleLoadManagerWrapper wrapper =
(ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
primaryLoadManager = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(wrapper, "loadManager", true));
FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true);
channel1 = (ServiceUnitStateChannelImpl)
FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true);
}

private void setSecondaryLoadManager() throws IllegalAccessException {
ExtensibleLoadManagerWrapper wrapper =
(ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(wrapper, "loadManager", true));
FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true);
channel2 = (ServiceUnitStateChannelImpl)
FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
}

protected CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
return pulsar.getNamespaceService().getBundleAsync(topic);
}

protected Pair<TopicName, NamespaceBundle> getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
throws Exception {
TopicName changeEventsTopicName =
TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get();
int i = 0;
while(true) {
TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i);
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
if (!bundle.equals(changeEventsBundle)) {
return Pair.of(topicName, bundle);
}
i++;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
Expand All @@ -110,7 +109,6 @@
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand All @@ -124,25 +122,18 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.awaitility.Awaitility;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -152,81 +143,10 @@
@Slf4j
@Test(groups = "broker")
@SuppressWarnings("unchecked")
public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBaseTest {

private PulsarService pulsar1;
private PulsarService pulsar2;

private PulsarTestContext additionalPulsarTestContext;

private ExtensibleLoadManagerImpl primaryLoadManager;

private ExtensibleLoadManagerImpl secondaryLoadManager;

private ServiceUnitStateChannelImpl channel1;
private ServiceUnitStateChannelImpl channel2;

private final String defaultTestNamespace = "public/test";

private LookupService lookupService;

private static void initConfig(ServiceConfiguration conf){
conf.setForceDeleteNamespaceAllowed(true);
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
conf.setAllowAutoTopicCreation(true);
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
conf.setLoadBalancerSheddingEnabled(false);
conf.setLoadBalancerDebugModeEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
}

@BeforeClass
@Override
public void setup() throws Exception {
// Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid
// stuck when doing unload.
initConfig(conf);
super.internalSetup(conf);
pulsar1 = pulsar;
ServiceConfiguration defaultConf = getDefaultConf();
initConfig(defaultConf);
additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
pulsar2 = additionalPulsarTestContext.getPulsarService();

setPrimaryLoadManager();

setSecondaryLoadManager();

admin.clusters().createCluster(this.conf.getClusterName(),
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
admin.tenants().createTenant("public",
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
Sets.newHashSet(this.conf.getClusterName())));
admin.namespaces().createNamespace("public/default");
admin.namespaces().setNamespaceReplicationClusters("public/default",
Sets.newHashSet(this.conf.getClusterName()));

admin.namespaces().createNamespace(defaultTestNamespace, 128);
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
Sets.newHashSet(this.conf.getClusterName()));
lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
}

@Override
@AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
this.additionalPulsarTestContext.close();
super.internalCleanup();
}

@BeforeMethod(alwaysRun = true)
protected void initializeState() throws PulsarAdminException, IllegalAccessException {
admin.namespaces().unload(defaultTestNamespace);
reset(primaryLoadManager, secondaryLoadManager);
FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
public ExtensibleLoadManagerImplTest() {
super("public/test");
}

@Test
Expand Down Expand Up @@ -1678,43 +1598,4 @@ public String name() {

}

private void setPrimaryLoadManager() throws IllegalAccessException {
ExtensibleLoadManagerWrapper wrapper =
(ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
primaryLoadManager = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(wrapper, "loadManager", true));
FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true);
channel1 = (ServiceUnitStateChannelImpl)
FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true);
}

private void setSecondaryLoadManager() throws IllegalAccessException {
ExtensibleLoadManagerWrapper wrapper =
(ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
FieldUtils.readField(wrapper, "loadManager", true));
FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true);
channel2 = (ServiceUnitStateChannelImpl)
FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
}

private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
return pulsar.getNamespaceService().getBundleAsync(topic);
}

private Pair<TopicName, NamespaceBundle> getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
throws Exception {
TopicName changeEventsTopicName =
TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get();
int i = 0;
while(true) {
TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i);
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
if (!bundle.equals(changeEventsBundle)) {
return Pair.of(topicName, bundle);
}
i++;
}
}
}
Loading

0 comments on commit 3b3c713

Please sign in to comment.