diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java index 843cec7b20594..b0cc50edf1f1d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/ClusterResources.java @@ -29,6 +29,7 @@ import lombok.Getter; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl; import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStore; @@ -39,10 +40,19 @@ public class ClusterResources extends BaseResources { @Getter private FailureDomainResources failureDomainResources; - - public ClusterResources(MetadataStore store, int operationTimeoutSec) { - super(store, ClusterData.class, operationTimeoutSec); - this.failureDomainResources = new FailureDomainResources(store, FailureDomainImpl.class, operationTimeoutSec); + @Getter + private ClusterPoliciesResources clusterPoliciesResources; + + public ClusterResources(MetadataStore localStore, MetadataStore configurationStore, int operationTimeoutSec) { + super(configurationStore, ClusterData.class, operationTimeoutSec); + this.failureDomainResources = new FailureDomainResources(configurationStore, FailureDomainImpl.class, + operationTimeoutSec); + if (localStore != null) { + this.clusterPoliciesResources = new ClusterPoliciesResources(localStore, ClusterPoliciesImpl.class, + operationTimeoutSec); + } else { + this.clusterPoliciesResources = null; + } } public CompletableFuture> listAsync() { @@ -216,4 +226,26 @@ public void registerListener(Consumer listener) { }); } } + + public static class ClusterPoliciesResources extends BaseResources { + public static final String LOCAL_POLICIES_PATH = "policies"; + + public ClusterPoliciesResources(MetadataStore store, Class clazz, + int operationTimeoutSec) { + super(store, clazz, operationTimeoutSec); + } + + public Optional getClusterPolicies(String clusterName) throws MetadataStoreException { + return get(joinPath(BASE_CLUSTERS_PATH, clusterName, LOCAL_POLICIES_PATH)); + } + + public CompletableFuture> getClusterPoliciesAsync(String clusterName) { + return getAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, LOCAL_POLICIES_PATH)); + } + + public CompletableFuture setPoliciesWithCreateAsync(String clusterName, + Function, ClusterPoliciesImpl> createFunction) { + return setWithCreateAsync(joinPath(BASE_CLUSTERS_PATH, clusterName, LOCAL_POLICIES_PATH), createFunction); + } + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java index ad872a5356cf4..fe7ffe0bc7b43 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java @@ -61,7 +61,8 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura int operationTimeoutSec) { if (configurationMetadataStore != null) { tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec); - clusterResources = new ClusterResources(configurationMetadataStore, operationTimeoutSec); + clusterResources = new ClusterResources(localMetadataStore, configurationMetadataStore, + operationTimeoutSec); namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec); resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 5d4ed54c33466..b8743933098fe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -60,8 +60,10 @@ import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.apache.pulsar.common.policies.data.ClusterPolicies; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; +import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl; import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies; @@ -247,6 +249,41 @@ public void updateCluster( }); } + @GET + @Path("/{cluster}/migrate") + @ApiOperation( + value = "Get the cluster migration configuration for the specified cluster.", + response = ClusterDataImpl.class, + notes = "This operation requires Pulsar superuser privileges." + ) + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Return the cluster data.", response = ClusterDataImpl.class), + @ApiResponse(code = 403, message = "Don't have admin permission."), + @ApiResponse(code = 404, message = "Cluster doesn't exist."), + @ApiResponse(code = 500, message = "Internal server error.") + }) + public ClusterPolicies getClusterMigration( + @ApiParam( + value = "The cluster name", + required = true + ) + @PathParam("cluster") String cluster + ) { + validateSuperUserAccess(); + + try { + return clusterResources().getClusterPoliciesResources().getClusterPolicies(cluster) + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Cluster does not exist")); + } catch (Exception e) { + log.error("[{}] Failed to get cluster {}", clientAppId(), cluster, e); + if (e instanceof RestException) { + throw (RestException) e; + } else { + throw new RestException(e); + } + } + } + @POST @Path("/{cluster}/migrate") @ApiOperation( @@ -286,8 +323,9 @@ public void updateClusterMigration( } validateSuperUserAccessAsync() .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> clusterResources().updateClusterAsync(cluster, old -> { - ClusterDataImpl data = (ClusterDataImpl) old; + .thenCompose(__ -> clusterResources().getClusterPoliciesResources().setPoliciesWithCreateAsync(cluster, + old -> { + ClusterPoliciesImpl data = old.orElse(new ClusterPoliciesImpl()); data.setMigrated(isMigrated); data.setMigratedClusterUrl(clusterUrl); return data; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index a8f25f61a9451..7a23312c47755 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -65,7 +65,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; -import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.EntryFilters; import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; @@ -1358,14 +1358,29 @@ public static CompletableFuture isClusterMigrationEnabled(PulsarService } public static CompletableFuture> getMigratedClusterUrlAsync(PulsarService pulsar, - String topic) { - return pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName()) + String topic) { + CompletableFuture> result = new CompletableFuture<>(); + pulsar.getPulsarResources().getClusterResources().getClusterPoliciesResources() + .getClusterPoliciesAsync(pulsar.getConfig().getClusterName()) .thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic), - ((clusterData, isNamespaceMigrationEnabled) - -> ((clusterData.isPresent() && clusterData.get().isMigrated()) - || isNamespaceMigrationEnabled) - ? Optional.ofNullable(clusterData.get().getMigratedClusterUrl()) - : Optional.empty())); + ((clusterData, isNamespaceMigrationEnabled) -> { + Optional url = ((clusterData.isPresent() && clusterData.get().isMigrated()) + || isNamespaceMigrationEnabled) + ? Optional.ofNullable(clusterData.get().getMigratedClusterUrl()) + : Optional.empty(); + return url; + })) + .thenAccept(res -> { + // cluster policies future is completed by metadata-store thread and continuing further + // processing in the same metadata store can cause deadlock while creating topic as + // create topic path may have blocking call on metadata-store. so, complete future on a + // separate thread to avoid deadlock. + pulsar.getExecutor().execute(() -> result.complete(res)); + }).exceptionally(ex -> { + pulsar.getExecutor().execute(() -> result.completeExceptionally(ex.getCause())); + return null; + }); + return result; } private static CompletableFuture isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 023ede74b4f91..e72c805d73879 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -56,7 +56,7 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.protocol.Commands; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index f7d2bb2dd2797..acaa7c02d197d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -50,7 +50,7 @@ import org.apache.pulsar.common.api.proto.ProducerAccessMode; import org.apache.pulsar.common.api.proto.ServerError; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl; import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 95f139dc11e43..da51eaea8fb39 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -152,7 +152,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; -import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 76e9f261ca6a9..f3857b5ad2aef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -73,7 +73,7 @@ import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; -import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4d35d284d3299..a8a921f3b624c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -147,7 +147,7 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats; import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java index 2139a7bc12ed2..2fa201cf95844 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ClusterMigrationTest.java @@ -45,7 +45,7 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; @@ -86,36 +86,12 @@ public class ClusterMigrationTest { PulsarService pulsar4; PulsarAdmin admin4; - @DataProvider(name = "TopicsubscriptionTypes") - public Object[][] subscriptionTypes() { - return new Object[][] { - {true, SubscriptionType.Shared}, - {true, SubscriptionType.Key_Shared}, - {true, SubscriptionType.Shared}, - {true, SubscriptionType.Key_Shared}, - - {false, SubscriptionType.Shared}, - {false, SubscriptionType.Key_Shared}, - {false, SubscriptionType.Shared}, - {false, SubscriptionType.Key_Shared}, - }; - } - @DataProvider(name="NamespaceMigrationTopicSubscriptionTypes") public Object[][] namespaceMigrationSubscriptionTypes() { return new Object[][] { - {true, SubscriptionType.Shared, true, false}, - {true, SubscriptionType.Key_Shared, true, false}, - {true, SubscriptionType.Shared, false, true}, - {true, SubscriptionType.Key_Shared, false, true}, - {true, SubscriptionType.Shared, true, true}, - {true, SubscriptionType.Key_Shared, true, true}, - {false, SubscriptionType.Shared, true, false}, - {false, SubscriptionType.Key_Shared, true, false}, - {false, SubscriptionType.Shared, false, true}, - {false, SubscriptionType.Key_Shared,false, true}, - {false, SubscriptionType.Shared, true, true}, - {false, SubscriptionType.Key_Shared,true, true}, + {SubscriptionType.Shared, true, false}, + {SubscriptionType.Shared, false, true}, + {SubscriptionType.Shared, true, true}, }; } @@ -227,14 +203,14 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true, timeOut = 300000) protected void cleanup() throws Exception { log.info("--- Shutting down ---"); - broker1.cleanup(); admin1.close(); - broker2.cleanup(); admin2.close(); - broker3.cleanup(); admin3.close(); - broker4.cleanup(); admin4.close(); + broker1.cleanup(); + broker2.cleanup(); + broker3.cleanup(); + broker4.cleanup(); } @BeforeMethod(alwaysRun = true) @@ -259,11 +235,11 @@ public void beforeMethod(Method m) throws Exception { * (11) Restart Broker-1 and connect producer/consumer on cluster-1 * @throws Exception */ - @Test(dataProvider = "TopicsubscriptionTypes") - public void testClusterMigration(boolean persistent, SubscriptionType subType) throws Exception { + @Test + public void testClusterMigration() throws Exception { log.info("--- Starting ReplicatorTest::testClusterMigration ---"); final String topicName = BrokerTestUtil - .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); + .newUniqueName("persistent://" + namespace + "/migrationTopic"); @Cleanup PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) @@ -271,7 +247,7 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t // cluster-1 producer/consumer Producer producer1 = client1.newProducer().topic(topicName).enableBatching(false) .producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); - Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionType(subType) + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) .subscriptionName("s1").subscribe(); AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get(); retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500); @@ -298,6 +274,7 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(), pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()); admin1.clusters().updateClusterMigration("r1", true, migratedUrl); + assertEquals(admin1.clusters().getClusterMigration("r1").getMigratedClusterUrl(), migratedUrl); retryStrategically((test) -> { try { @@ -330,12 +307,10 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t // try to consume backlog messages from cluster-1 consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); - if (persistent) { - for (int i = 0; i < n; i++) { - Message msg = consumer1.receive(); - assertEquals(msg.getData(), "test1".getBytes()); - consumer1.acknowledge(msg); - } + for (int i = 0; i < n; i++) { + Message msg = consumer1.receive(); + assertEquals(msg.getData(), "test1".getBytes()); + consumer1.acknowledge(msg); } // after consuming all messages, consumer should have disconnected // from cluster-1 and reconnect with cluster-2 @@ -351,13 +326,13 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t assertTrue(topic1.getSubscriptions().isEmpty()); // not also create a new consumer which should also reconnect to cluster-2 - Consumer consumer2 = client1.newConsumer().topic(topicName).subscriptionType(subType) + Consumer consumer2 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) .subscriptionName("s2").subscribe(); retryStrategically((test) -> topic2.getSubscription("s2") != null, 10, 500); assertFalse(topic2.getSubscription("s2").getConsumers().isEmpty()); // new sub on migration topic must be redirected immediately - Consumer consumerM = client1.newConsumer().topic(topicName).subscriptionType(subType) + Consumer consumerM = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) .subscriptionName("sM").subscribe(); assertFalse(pulsar2.getBrokerService().getTopicReference(topicName).get().getSubscription("sM").getConsumers() .isEmpty()); @@ -365,7 +340,7 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t // migrate topic after creating subscription String newTopicName = topicName + "-new"; - consumerM = client1.newConsumer().topic(newTopicName).subscriptionType(subType) + consumerM = client1.newConsumer().topic(newTopicName).subscriptionType(SubscriptionType.Shared) .subscriptionName("sM").subscribe(); retryStrategically((t) -> pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent(), 5, 100); pulsar2.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get(); @@ -392,8 +367,8 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t // create non-migrated topic which should connect to cluster-1 String diffTopic = BrokerTestUtil - .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); - Consumer consumerDiff = client1.newConsumer().topic(diffTopic).subscriptionType(subType) + .newUniqueName("persistent://" + namespace + "/migrationTopic"); + Consumer consumerDiff = client1.newConsumer().topic(diffTopic).subscriptionType(SubscriptionType.Shared) .subscriptionName("s1-d").subscribe(); Producer producerDiff = client1.newProducer().topic(diffTopic).enableBatching(false) .producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); @@ -408,7 +383,7 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t broker1.restart(); Producer producer4 = client1.newProducer().topic(topicName).enableBatching(false) .producerName("cluster1-4").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); - Consumer consumer3 = client1.newConsumer().topic(topicName).subscriptionType(subType) + Consumer consumer3 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) .subscriptionName("s3").subscribe(); retryStrategically((test) -> topic2.getProducers().size() == 4, 10, 500); assertTrue(topic2.getProducers().size() == 4); @@ -421,15 +396,16 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t assertEquals(consumer3.receive(2, TimeUnit.SECONDS).getData(), "test3".getBytes()); } + client1.close(); + client2.close(); log.info("Successfully consumed messages by migrated consumers"); } - @Test(dataProvider = "TopicsubscriptionTypes") - public void testClusterMigrationWithReplicationBacklog(boolean persistent, SubscriptionType subType) throws Exception { + @Test + public void testClusterMigrationWithReplicationBacklog() throws Exception { log.info("--- Starting ReplicatorTest::testClusterMigrationWithReplicationBacklog ---"); - persistent = true; final String topicName = BrokerTestUtil - .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); + .newUniqueName("persistent://" + namespace + "/migrationTopic"); @Cleanup PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) @@ -440,11 +416,11 @@ public void testClusterMigrationWithReplicationBacklog(boolean persistent, Subsc // cluster-1 producer/consumer Producer producer1 = client1.newProducer().topic(topicName).enableBatching(false) .producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create(); - Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionType(subType) + Consumer consumer1 = client1.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) .subscriptionName("s1").subscribe(); // cluster-3 consumer - Consumer consumer3 = client3.newConsumer().topic(topicName).subscriptionType(subType) + Consumer consumer3 = client3.newConsumer().topic(topicName).subscriptionType(SubscriptionType.Shared) .subscriptionName("s1").subscribe(); AbstractTopic topic1 = (AbstractTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null).get(); retryStrategically((test) -> !topic1.getProducers().isEmpty(), 5, 500); @@ -526,6 +502,10 @@ public void testClusterMigrationWithReplicationBacklog(boolean persistent, Subsc // verify that the producer1 is now is now connected to migrated cluster "r2" since backlog is cleared. retryStrategically((test) -> topic2.getProducers().size()==2, 10, 500); assertEquals(topic2.getProducers().size(), 2); + + client1.close(); + client2.close(); + client3.close(); } /** @@ -638,17 +618,19 @@ public void testClusterMigrationWithResourceCreated() throws Exception { } }, 10, 500); assertFalse(pulsar2Topic.getSubscription("s1").getConsumers().isEmpty()); + + client1.close(); } @Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes") - public void testNamespaceMigration(boolean persistent, SubscriptionType subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception { + public void testNamespaceMigration(SubscriptionType subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception { log.info("--- Starting Test::testNamespaceMigration ---"); // topic for the namespace1 (to be migrated) final String topicName = BrokerTestUtil - .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); + .newUniqueName("persistent://" + namespace + "/migrationTopic"); // topic for namespace2 (not to be migrated) final String topicName2 = BrokerTestUtil - .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespaceNotToMigrate + "/migrationTopic"); + .newUniqueName("persistent://" + namespaceNotToMigrate + "/migrationTopic"); @Cleanup PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) @@ -764,16 +746,14 @@ public void testNamespaceMigration(boolean persistent, SubscriptionType subType, // try to consume backlog messages from cluster-1 blueConsumerNs1_1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe(); blueConsumerNs2_1 = client1.newConsumer().topic(topicName2).subscriptionName("s1").subscribe(); - if (persistent) { - for (int i = 0; i < n; i++) { - Message msg = blueConsumerNs1_1.receive(); - assertEquals(msg.getData(), "test1".getBytes()); - blueConsumerNs1_1.acknowledge(msg); - - Message msg2 = blueConsumerNs2_1.receive(); - assertEquals(msg2.getData(), "test1".getBytes()); - blueConsumerNs2_1.acknowledge(msg2); - } + for (int i = 0; i < n; i++) { + Message msg = blueConsumerNs1_1.receive(); + assertEquals(msg.getData(), "test1".getBytes()); + blueConsumerNs1_1.acknowledge(msg); + + Message msg2 = blueConsumerNs2_1.receive(); + assertEquals(msg2.getData(), "test1".getBytes()); + blueConsumerNs2_1.acknowledge(msg2); } // after consuming all messages, consumer should have disconnected // from blue and reconnect with green @@ -862,7 +842,7 @@ public void testNamespaceMigration(boolean persistent, SubscriptionType subType, // create non-migrated topic which should connect to blue String diffTopic = BrokerTestUtil - .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); + .newUniqueName("persistent://" + namespace + "/migrationTopic"); Consumer consumerDiff = client1.newConsumer().topic(diffTopic).subscriptionType(subType) .subscriptionName("s1-d").subscribe(); Producer producerDiff = client1.newProducer().topic(diffTopic).enableBatching(false) @@ -902,18 +882,19 @@ public void testNamespaceMigration(boolean persistent, SubscriptionType subType, blueProducerNs2_1.close(); greenProducerNs1_1.close(); greenProducerNs2_1.close(); + client1.close(); + client2.close(); } @Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes") - public void testNamespaceMigrationWithReplicationBacklog(boolean persistent, SubscriptionType subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception { + public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subType, boolean isClusterMigrate, boolean isNamespaceMigrate) throws Exception { log.info("--- Starting ReplicatorTest::testNamespaceMigrationWithReplicationBacklog ---"); - persistent = true; // topic for namespace1 (to be migrated) final String topicName = BrokerTestUtil - .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespace + "/migrationTopic"); + .newUniqueName("persistent://" + namespace + "/migrationTopic"); // topic for namespace2 (not to be migrated) final String topicName2 = BrokerTestUtil - .newUniqueName((persistent ? "persistent" : "non-persistent") + "://" + namespaceNotToMigrate + "/migrationTopic"); + .newUniqueName("persistent://" + namespaceNotToMigrate + "/migrationTopic"); @Cleanup PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS) @@ -1069,6 +1050,9 @@ public void testNamespaceMigrationWithReplicationBacklog(boolean persistent, Sub blueConsumerNs2_1.close(); greenProducerNs1_1.close(); greenProducerNs2_1.close(); + client1.close(); + client2.close(); + client3.close(); } static class TestBroker extends MockedPulsarServiceBaseTest { diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java index 4178bc7483df5..53e6680946566 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Clusters.java @@ -29,7 +29,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; +import org.apache.pulsar.common.policies.data.ClusterPolicies; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; @@ -209,6 +210,46 @@ public interface Clusters { */ CompletableFuture updatePeerClusterNamesAsync(String cluster, LinkedHashSet peerClusterNames); + /** + * Get the cluster migration configuration data for the specified cluster. + *

+ * Response Example: + * + *

+     * { serviceUrl : "http://my-broker.example.com:8080/" }
+     * 
+ * + * @param cluster + * Cluster name + * + * @return the cluster configuration + * + * @throws NotAuthorizedException + * You don't have admin permission to get the configuration of the cluster + * @throws NotFoundException + * Cluster doesn't exist + * @throws PulsarAdminException + * Unexpected error + */ + ClusterPolicies getClusterMigration(String cluster) throws PulsarAdminException; + + /** + * Get the cluster migration configuration data for the specified cluster asynchronously. + *

+ * Response Example: + * + *

+     * { serviceUrl : "http://my-broker.example.com:8080/" }
+     * 
+ * + * @param cluster + * Cluster name + * + * @return the cluster configuration + * + */ + CompletableFuture getClusterMigrationAsync(String cluster); + /** * Update the configuration for a cluster migration. *

diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java index 0b3e5aa49cb83..1f7126521c6d6 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterData.java @@ -19,9 +19,6 @@ package org.apache.pulsar.common.policies.data; import java.util.LinkedHashSet; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; import org.apache.pulsar.client.admin.utils.ReflectionUtils; import org.apache.pulsar.client.api.ProxyProtocol; @@ -70,10 +67,6 @@ public interface ClusterData { String getListenerName(); - boolean isMigrated(); - - ClusterUrl getMigratedClusterUrl(); - interface Builder { Builder serviceUrl(String serviceUrl); @@ -119,10 +112,6 @@ interface Builder { Builder listenerName(String listenerName); - Builder migrated(boolean migrated); - - Builder migratedClusterUrl(ClusterUrl migratedClusterUrl); - ClusterData build(); } @@ -131,19 +120,4 @@ interface Builder { static Builder builder() { return ReflectionUtils.newBuilder("org.apache.pulsar.common.policies.data.ClusterDataImpl"); } - - @Data - @NoArgsConstructor - @AllArgsConstructor - class ClusterUrl { - String serviceUrl; - String serviceUrlTls; - String brokerServiceUrl; - String brokerServiceUrlTls; - - public boolean isEmpty() { - return serviceUrl != null && serviceUrlTls != null && brokerServiceUrl == null - && brokerServiceUrlTls == null; - } - } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterPolicies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterPolicies.java new file mode 100644 index 0000000000000..b95f6bb19ce2e --- /dev/null +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/ClusterPolicies.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import org.apache.pulsar.client.admin.utils.ReflectionUtils; + +public interface ClusterPolicies { + boolean isMigrated(); + + ClusterUrl getMigratedClusterUrl(); + + interface Builder { + Builder migrated(boolean migrated); + + Builder migratedClusterUrl(ClusterUrl migratedClusterUrl); + + ClusterPolicies build(); + } + + Builder clone(); + + static Builder builder() { + return ReflectionUtils.newBuilder("org.apache.pulsar.common.policies.data.ClusterPoliciesImpl"); + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + @EqualsAndHashCode + class ClusterUrl { + String serviceUrl; + String serviceUrlTls; + String brokerServiceUrl; + String brokerServiceUrlTls; + + public boolean isEmpty() { + return serviceUrl != null && serviceUrlTls != null && brokerServiceUrl == null + && brokerServiceUrlTls == null; + } + } +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java index 02e44aca62604..231d4506d6173 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ClustersImpl.java @@ -34,8 +34,10 @@ import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData; import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.apache.pulsar.common.policies.data.ClusterPolicies; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; +import org.apache.pulsar.common.policies.data.ClusterPoliciesImpl; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.FailureDomainImpl; import org.apache.pulsar.common.policies.data.NamespaceIsolationData; @@ -107,6 +109,18 @@ public CompletableFuture updatePeerClusterNamesAsync(String cluster, Linke return asyncPostRequest(path, Entity.entity(peerClusterNames, MediaType.APPLICATION_JSON)); } + @Override + public ClusterPolicies getClusterMigration(String cluster) throws PulsarAdminException { + return sync(() -> getClusterMigrationAsync(cluster)); + } + + @Override + public CompletableFuture getClusterMigrationAsync(String cluster) { + WebTarget path = adminClusters.path(cluster).path("migrate"); + return asyncGetRequest(path, new FutureCallback() { + }).thenApply(policies -> policies); + } + @Override public void updateClusterMigration(String cluster, boolean isMigrated, ClusterUrl clusterUrl) throws PulsarAdminException { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java index 033146aa607b0..646f4ef0f50cf 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdClusters.java @@ -32,8 +32,8 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.ProxyProtocol; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl; import org.apache.pulsar.common.policies.data.ClusterDataImpl; +import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; import org.apache.pulsar.common.policies.data.FailureDomain; import org.apache.pulsar.common.policies.data.FailureDomainImpl; @@ -154,6 +154,17 @@ void run() throws PulsarAdminException { } } + @Parameters(commandDescription = "Get the cluster migration configuration data for the specified cluster") + private class GetClusterMigration extends CliCommand { + @Parameter(description = "cluster-name", required = true) + private java.util.List params; + + void run() throws PulsarAdminException { + String cluster = getOneArgument(params); + print(getAdmin().clusters().getClusterMigration(cluster)); + } + } + @Parameters(commandDescription = "Update cluster migration") private class UpdateClusterMigration extends CliCommand { @Parameter(description = "cluster-name", required = true) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java index 6a7110e65072d..fffe87a300562 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java @@ -176,17 +176,6 @@ public final class ClusterDataImpl implements ClusterData, Cloneable { example = "" ) private String listenerName; - @ApiModelProperty( - name = "migrated", - value = "flag to check if cluster is migrated to different cluster", - example = "true/false" - ) - private boolean migrated; - @ApiModelProperty( - name = "migratedClusterUrl", - value = "url of cluster where current cluster is migrated" - ) - private ClusterUrl migratedClusterUrl; public static ClusterDataImplBuilder builder() { return new ClusterDataImplBuilder(); @@ -216,9 +205,7 @@ public ClusterDataImplBuilder clone() { .brokerClientTrustCertsFilePath(brokerClientTrustCertsFilePath) .brokerClientCertificateFilePath(brokerClientCertificateFilePath) .brokerClientKeyFilePath(brokerClientKeyFilePath) - .listenerName(listenerName) - .migrated(migrated) - .migratedClusterUrl(migratedClusterUrl); + .listenerName(listenerName); } @Data @@ -245,8 +232,6 @@ public static class ClusterDataImplBuilder implements ClusterData.Builder { private String brokerClientKeyFilePath; private String brokerClientTrustCertsFilePath; private String listenerName; - private boolean migrated; - private ClusterUrl migratedClusterUrl; ClusterDataImplBuilder() { } @@ -367,16 +352,6 @@ public ClusterDataImplBuilder listenerName(String listenerName) { return this; } - public ClusterDataImplBuilder migrated(boolean migrated) { - this.migrated = migrated; - return this; - } - - public ClusterDataImplBuilder migratedClusterUrl(ClusterUrl migratedClusterUrl) { - this.migratedClusterUrl = migratedClusterUrl; - return this; - } - public ClusterDataImpl build() { return new ClusterDataImpl( serviceUrl, @@ -400,9 +375,7 @@ public ClusterDataImpl build() { brokerClientTrustCertsFilePath, brokerClientKeyFilePath, brokerClientCertificateFilePath, - listenerName, - migrated, - migratedClusterUrl); + listenerName); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterPoliciesImpl.java new file mode 100644 index 0000000000000..c8af2dec3216b --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterPoliciesImpl.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * The configuration data for a cluster. + */ +@ApiModel( + value = "ClusterPolicies", + description = "The local cluster policies for a cluster" +) +@Data +@AllArgsConstructor +@NoArgsConstructor +public final class ClusterPoliciesImpl implements ClusterPolicies, Cloneable { + @ApiModelProperty( + name = "migrated", + value = "flag to check if cluster is migrated to different cluster", + example = "true/false" + ) + private boolean migrated; + @ApiModelProperty( + name = "migratedClusterUrl", + value = "url of cluster where current cluster is migrated" + ) + private ClusterUrl migratedClusterUrl; + + public static ClusterPoliciesImplBuilder builder() { + return new ClusterPoliciesImplBuilder(); + } + + @Override + public ClusterPoliciesImplBuilder clone() { + return builder() + .migrated(migrated) + .migratedClusterUrl(migratedClusterUrl); + } + + @Data + public static class ClusterPoliciesImplBuilder implements ClusterPolicies.Builder { + private boolean migrated; + private ClusterUrl migratedClusterUrl; + + ClusterPoliciesImplBuilder() { + } + + public ClusterPoliciesImplBuilder migrated(boolean migrated) { + this.migrated = migrated; + return this; + } + + public ClusterPoliciesImplBuilder migratedClusterUrl(ClusterUrl migratedClusterUrl) { + this.migratedClusterUrl = migratedClusterUrl; + return this; + } + + public ClusterPoliciesImpl build() { + return new ClusterPoliciesImpl( + migrated, + migratedClusterUrl); + } + } +} diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java index 87e935ecf7360..0bf1616653107 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/ClusterDataImplTest.java @@ -53,9 +53,6 @@ public void verifyClone() { .brokerClientKeyFilePath("/my/key/file") .brokerClientCertificateFilePath("/my/cert/file") .listenerName("a-listener") - .migrated(true) - .migratedClusterUrl(new ClusterData.ClusterUrl("http://remote", "https://remote", "pulsar://remote", - "pulsar+ssl://remote")) .build(); ClusterDataImpl clone = originalData.clone().build();