diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ba40a75651d63..13bcf7696188e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3010,7 +3010,7 @@ public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { } updateTopicPolicyByNamespacePolicy(data); - + checkReplicatedSubscriptionControllerState(); isEncryptionRequired = data.encryption_required; isAllowAutoUpdateSchema = data.is_allow_auto_update_schema; @@ -3497,12 +3497,14 @@ private synchronized void checkReplicatedSubscriptionControllerState(boolean sho boolean isCurrentlyEnabled = replicatedSubscriptionsController.isPresent(); boolean isEnableReplicatedSubscriptions = brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions(); + boolean replicationEnabled = this.topicPolicies.getReplicationClusters().get().size() > 1; - if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions) { + if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions && replicationEnabled) { log.info("[{}] Enabling replicated subscriptions controller", topic); replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this, brokerService.pulsar().getConfiguration().getClusterName())); - } else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions) { + } else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions + || !replicationEnabled) { log.info("[{}] Disabled replicated subscriptions controller", topic); replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close); replicatedSubscriptionsController = Optional.empty(); @@ -3685,6 +3687,7 @@ public void 
onUpdate(TopicPolicies policies) {
         updateTopicPolicy(policies);
         shadowTopics = policies.getShadowTopics();
         updateDispatchRateLimiter();
+        checkReplicatedSubscriptionControllerState();
         updateSubscriptionsDispatcherRateLimiter().thenRun(() -> {
             updatePublishDispatcher();
             updateSubscribeRateLimiter();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
index f816aa2dd244a..529fb923f5918 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java
@@ -32,6 +32,7 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -50,7 +51,9 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -60,6 +63,7 @@
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -728,6 +732,213 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex
         Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName)));
     }
 
+    /**
+     * Supplies the single boolean argument of tests that declare
+     * {@code dataProvider = "isTopicPolicyEnabled"}.
+     * Only {@code Boolean.FALSE} is supplied at the moment, so such tests run once, with the flag disabled.
+     *
+     * @return one row containing {@code Boolean.FALSE}
+     */
+    @DataProvider(name = "isTopicPolicyEnabled")
+    private Object[][] isTopicPolicyEnabled() {
+        // TODO: fix the problem that replication cannot be enabled at topic level
+        //       (presumably the reason Boolean.TRUE is not supplied here yet - confirm).
+        return new Object[][] { { Boolean.FALSE } };
+    }
+
+    /**
+     * Test that the replicated subscription works normally in the following cases:
+     *

+     * 1. No data is written into the original topic while the topic has no remote cluster configured. {topic1}
+     *    1. Publish messages to the topic and then wait a moment;
+     *       the backlog does not increase once publishing has completed.
+     *    2. Acknowledge the messages; the last confirmed entry does not change.
+     * 2. Snapshots and markers are written once the topic has a remote cluster configured. {topic2}
+     *    1. Publish messages to the topic. After publishing has completed, the backlog of the topic keeps growing.
+     *    2. Wait for the snapshot to complete; the backlog stops changing.
+     *    3. Publish messages and wait for another snapshot to complete.
+     *    4. Ack messages to move the mark-delete position past the position recorded in the first snapshot.
+     *    5. Check that a new entry (a marker) is appended to the original topic.
+     * 3. Snapshots and markers stop being written after the remote cluster of the topic is removed. {topic2}
+     *    Similar to step 1.

+ */ + @Test(dataProvider = "isTopicPolicyEnabled") + public void testWriteMarkerTaskOfReplicateSubscriptions(boolean isTopicPolicyEnabled) throws Exception { + // 1. Prepare resource and use proper configuration. + String namespace = BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog"); + String topic1 = "persistent://" + namespace + "/replication-enable"; + String topic2 = "persistent://" + namespace + "/replication-disable"; + String subName = "sub"; + + admin1.namespaces().createNamespace(namespace); + pulsar1.getConfiguration().setTopicLevelPoliciesEnabled(isTopicPolicyEnabled); + pulsar1.getConfiguration().setReplicationPolicyCheckDurationSeconds(1); + pulsar1.getConfiguration().setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); + // 2. Build Producer and Consumer. + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + @Cleanup + Consumer consumer1 = client1.newConsumer() + .topic(topic1) + .subscriptionName(subName) + .ackTimeout(5, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .replicateSubscriptionState(true) + .subscribe(); + @Cleanup + Producer producer1 = client1.newProducer() + .topic(topic1) + .create(); + // 3. Test replication subscription work as expected. + // Test case 1: disable replication, backlog will not increase. + testReplicatedSubscriptionWhenDisableReplication(producer1, consumer1, topic1); + + // Test case 2: enable replication, mark and snapshot work as expected. 
+ if (isTopicPolicyEnabled) { + admin1.topics().createNonPartitionedTopic(topic2); + admin1.topics().setReplicationClusters(topic2, List.of("r1", "r2")); + } else { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + } + @Cleanup + Consumer consumer2 = client1.newConsumer() + .topic(topic2) + .subscriptionName(subName) + .ackTimeout(5, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .replicateSubscriptionState(true) + .subscribe(); + @Cleanup + Producer producer2 = client1.newProducer() + .topic(topic2) + .create(); + testReplicatedSubscriptionWhenEnableReplication(producer2, consumer2, topic2); + + // Test case 3: enable replication, mark and snapshot work as expected. + if (isTopicPolicyEnabled) { + admin1.topics().setReplicationClusters(topic2, List.of("r1")); + } else { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1")); + } + testReplicatedSubscriptionWhenDisableReplication(producer2, consumer2, topic2); + // 4. Clear resource. + pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(true); + admin1.namespaces().deleteNamespace(namespace, true); + pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(false); + } + + /** + * Disable replication subscription. + * Test scheduled task case. + * 1. Send three messages |1:0|1:1|1:2|. + * 2. Get topic backlog, as backlog1. + * 3. Wait a moment. + * 4. Get the topic backlog again, the backlog will not increase. + * Test acknowledge messages case. + * 1. Get the last confirm entry, as LAC1. + * 2. Acknowledge these messages |1:0|1:1|. + * 3. wait a moment. + * 4. Get the last confirm entry, as LAC2. LAC1 is equal to LAC2. + * Clear environment. + * 1. Ack all the retained messages. |1:2| + * 2. Wait for the backlog to return to zero. 
+ */ + private void testReplicatedSubscriptionWhenDisableReplication(Producer producer, Consumer consumer, + String topic) throws Exception { + final int messageSum = 3; + // Test scheduled task case. + for (int i = 0; i < messageSum; i++) { + producer.newMessage().send(); + } + long backlog1 = admin1.topics().getStats(topic, false).getBacklogSize(); + Thread.sleep(3000); + long backlog2 = admin1.topics().getStats(topic, false).getBacklogSize(); + assertEquals(backlog1, backlog2); + // Test acknowledge messages case. + String lastConfirmEntry1 = admin1.topics().getInternalStats(topic).lastConfirmedEntry; + for (int i = 0; i < messageSum - 1; i++) { + consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS)); + } + Awaitility.await().untilAsserted(() -> { + String lastConfirmEntry2 = admin1.topics().getInternalStats(topic).lastConfirmedEntry; + assertEquals(lastConfirmEntry1, lastConfirmEntry2); + }); + // Clear environment. + consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS)); + Awaitility.await().untilAsserted(() -> { + long backlog4 = admin1.topics().getStats(topic, false).getBacklogSize(); + assertEquals(backlog4, 0); + }); + } + + /** + * Enable replication subscription. + * Test scheduled task case. + * 1. Wait replicator connected. + * 2. Send three messages |1:0|1:1|1:2|. + * 3. Get topic backlog, as backlog1. + * 4. Wait a moment. + * 5. Get the topic backlog again, as backlog2. The backlog2 is bigger than backlog1. |1:0|1:1|1:2|mark|. + * 6. Wait the snapshot complete. + * Test acknowledge messages case. + * 1. Write messages and wait another snapshot complete. |1:0|1:1|1:2|mark|1:3|1:4|1:5|mark| + * 2. Ack message |1:0|1:1|1:2|1:3|1:4|. + * 3. Get last confirm entry, as LAC1. + * 2. Wait a moment. + * 3. Get Last confirm entry, as LAC2. LAC2 different to LAC1. |1:5|mark|mark| + * Clear environment. + * 1. Ack all the retained message |1:5|. + * 2. Wait for the backlog to return to zero. 
+     */
+    private void testReplicatedSubscriptionWhenEnableReplication(Producer producer, Consumer consumer,
+                                                                 String topic) throws Exception {
+        final int messageSum = 3;
+        // Wait until the topic has exactly one replicator and that replicator reports itself connected.
+        Awaitility.await().untilAsserted(() -> {
+            List keys = pulsar1.getBrokerService()
+                    .getTopic(topic, false).get().get()
+                    .getReplicators().keys();
+            assertEquals(keys.size(), 1);
+            assertTrue(pulsar1.getBrokerService()
+                    .getTopic(topic, false).get().get()
+                    .getReplicators().get(keys.get(0)).isConnected());
+        });
+        // Test scheduled task case.
+        sendMessageAndWaitSnapshotComplete(producer, topic, messageSum);
+        // Test acknowledge messages case.
+        // After the snapshot has been written completely, acknowledging messages so that the mark delete
+        // position moves past the position recorded in the snapshot triggers the writing of a new marker.
+        sendMessageAndWaitSnapshotComplete(producer, topic, messageSum);
+        String lastConfirmedEntry3 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
+        // Ack every message published so far except the last one.
+        for (int i = 0; i < messageSum * 2 - 1; i++) {
+            consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS));
+        }
+        Awaitility.await().untilAsserted(() -> {
+            String lastConfirmedEntry4 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry;
+            assertNotEquals(lastConfirmedEntry3, lastConfirmedEntry4);
+        });
+        // Clear environment.
+        consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS));
+        Awaitility.await().untilAsserted(() -> {
+            long backlog4 = admin1.topics().getStats(topic, false).getBacklogSize();
+            assertEquals(backlog4, 0);
+        });
+    }
+
+    /**
+     * Publishes {@code messageSum} messages to {@code topic}, waits until the backlog size has grown beyond
+     * the value read right after publishing, and then waits until the last confirmed entry of the topic is
+     * unchanged across a one-second pause.
+     * The callers treat that stable state as "snapshot complete".
+     *
+     * @param producer   producer used to publish the messages
+     * @param topic      topic whose stats are polled through {@code admin1}
+     * @param messageSum number of messages to publish
+     * @throws Exception if publishing or reading the topic stats fails
+     */
+    private void sendMessageAndWaitSnapshotComplete(Producer producer, String topic,
+                                                    int messageSum) throws Exception {
+        for (int i = 0; i < messageSum; i++) {
+            producer.newMessage().send();
+        }
+        long backlog1 = admin1.topics().getStats(topic, false).getBacklogSize();
+        Awaitility.await().untilAsserted(() -> {
+            long backlog2 = admin1.topics().getStats(topic, false).getBacklogSize();
+            assertTrue(backlog2 > backlog1);
+        });
+        // Wait snapshot write completely, stop writing marker into topic.
+        Awaitility.await().untilAsserted(() -> {
+            // One stats fetch before and one after the pause; the fetched objects are the ones compared.
+            PersistentTopicInternalStats statsBeforePause = admin1.topics().getInternalStats(topic, false);
+            Thread.sleep(1000);
+            PersistentTopicInternalStats statsAfterPause = admin1.topics().getInternalStats(topic, false);
+            assertEquals(statsBeforePause.lastConfirmedEntry, statsAfterPause.lastConfirmedEntry);
+        });
+    }
+
     void publishMessages(Producer producer, int startIndex, int numMessages, Set sentMessages)
             throws PulsarClientException {
         for (int i = startIndex; i < startIndex + numMessages; i++) {