diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index eac4335a26d2d..0ef487c320dde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -722,7 +722,9 @@ protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, .thenCompose(unused -> internalRemovePartitionsTopicAsync(numPartitions, force)); }) // Only tries to delete the znode for partitioned topic when all its partitions are successfully deleted - ).thenCompose(__ -> getPulsarResources().getNamespaceResources().getPartitionedTopicResources() + ).thenCompose(ignore -> + pulsar().getBrokerService().deleteSchema(topicName).exceptionally(ex -> null)) + .thenCompose(__ -> getPulsarResources().getNamespaceResources().getPartitionedTopicResources() .runWithMarkDeleteAsync(topicName, () -> namespaceResources() .getPartitionedTopicResources().deletePartitionedTopicAsync(topicName))) .thenAccept(__ -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7532bee8c12e8..43bf60f282e09 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -3569,7 +3569,7 @@ public CompletableFuture deleteTopicPolicies(TopicName topicName) { .deleteTopicPoliciesAsync(TopicName.get(topicName.getPartitionedTopicName())); } - CompletableFuture deleteSchema(TopicName topicName) { + public CompletableFuture deleteSchema(TopicName topicName) { String base = topicName.getPartitionedTopicName(); String id = TopicName.get(base).getSchemaName(); SchemaRegistryService schemaRegistryService = getPulsar().getSchemaRegistryService(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 13bcf7696188e..eaa57140c9f19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -169,6 +169,7 @@ import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.topics.TopicCompactionStrategy; import org.apache.pulsar.common.util.Codec; @@ -2380,6 +2381,15 @@ private Optional getCompactorMXBean() { return Optional.ofNullable(compactor).map(c -> c.getStats()); } + @Override + public CompletableFuture deleteSchema() { + if (TopicName.get(getName()).isPartitioned()) { + // Only delete schema when partitioned metadata is deleting. + return CompletableFuture.completedFuture(null); + } + return brokerService.deleteSchema(TopicName.get(getName())); + } + @Override public CompletableFuture getInternalStats(boolean includeLedgerMetadata) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java new file mode 100644 index 0000000000000..7790940c1327f --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicGCTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertTrue; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class TopicGCTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @EqualsAndHashCode.Include + protected void doInitConf() throws Exception { + super.doInitConf(); + this.conf.setBrokerDeleteInactiveTopicsEnabled(true); + this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up); + this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(10); + } + + @Test + public void testCreateConsumerAfterOnePartDeleted() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String partition0 = topic + "-partition-0"; + final String partition1 = topic + "-partition-1"; + final String subscription = "s1"; + admin.topics().createPartitionedTopic(topic, 2); + admin.topics().createSubscription(topic, subscription, MessageId.earliest); + + // create consumers and producers. + Producer producer0 = pulsarClient.newProducer(Schema.STRING).topic(partition0) + .enableBatching(false).create(); + Producer producer1 = pulsarClient.newProducer(Schema.STRING).topic(partition1) + .enableBatching(false).create(); + org.apache.pulsar.client.api.Consumer consumer1 = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionName(subscription).isAckReceiptEnabled(true).subscribe(); + + // Make consume all messages for one topic, do not consume any messages for another one. + producer0.send("1"); + producer1.send("2"); + admin.topics().skipAllMessages(partition0, subscription); + + // Wait for topic GC. + // Partition 0 will be deleted about 20s later, left 2min to avoid flaky. + producer0.close(); + consumer1.close(); + Awaitility.await().atMost(2, TimeUnit.MINUTES).untilAsserted(() -> { + CompletableFuture> tp1 = pulsar.getBrokerService().getTopic(partition0, false); + CompletableFuture> tp2 = pulsar.getBrokerService().getTopic(partition1, false); + assertTrue(tp1 == null || !tp1.get().isPresent()); + assertTrue(tp2 != null && tp2.get().isPresent()); + }); + + // Verify that the consumer subscribed with partitioned topic can be created successful. + Consumer consumerAllPartition = pulsarClient.newConsumer(Schema.STRING).topic(topic) + .subscriptionName(subscription).isAckReceiptEnabled(true).subscribe(); + Message msg = consumerAllPartition.receive(2, TimeUnit.SECONDS); + String receivedMsgValue = msg.getValue(); + log.info("received msg: {}", receivedMsgValue); + consumerAllPartition.acknowledge(msg); + + // cleanup. + consumerAllPartition.close(); + producer0.close(); + producer1.close(); + admin.topics().deletePartitionedTopic(topic); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/TopicSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/TopicSchemaTest.java new file mode 100644 index 0000000000000..66bfd1c3ec2b0 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/TopicSchemaTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.schema; + +import static org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata; +import static org.testng.Assert.assertTrue; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class TopicSchemaTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @DataProvider(name = "topicDomains") + public Object[][] topicDomains() { + return new Object[][]{ + {TopicDomain.non_persistent}, + {TopicDomain.persistent} + }; + } + + @Test(dataProvider = "topicDomains") + public void testDeleteNonPartitionedTopicWithSchema(TopicDomain topicDomain) throws Exception { + final String topic = BrokerTestUtil.newUniqueName(topicDomain.value() + "://public/default/tp"); + final String schemaId = TopicName.get(TopicName.get(topic).getPartitionedTopicName()).getSchemaName(); + admin.topics().createNonPartitionedTopic(topic); + + // Add schema. + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic) + .enableBatching(false).create(); + producer.close(); + List schemaList1 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join() + .stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList()); + assertTrue(schemaList1 != null && schemaList1.size() > 0); + + // Verify the schema has been deleted with topic. + admin.topics().delete(topic, false); + List schemaList2 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join() + .stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList()); + assertTrue(schemaList2 == null || schemaList2.isEmpty()); + } + + @Test + public void testDeletePartitionedTopicWithoutSchema() throws Exception { + // Non-persistent topic does not support partitioned topic now, so only write a test case for persistent topic. + TopicDomain topicDomain = TopicDomain.persistent; + final String topic = BrokerTestUtil.newUniqueName(topicDomain.value() + "://public/default/tp"); + final String partition0 = topic + "-partition-0"; + final String partition1 = topic + "-partition-1"; + final String schemaId = TopicName.get(TopicName.get(topic).getPartitionedTopicName()).getSchemaName(); + admin.topics().createPartitionedTopic(topic, 2); + + // Add schema. + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic) + .enableBatching(false).create(); + producer.close(); + List schemaList1 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join() + .stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList()); + assertTrue(schemaList1 != null && schemaList1.size() > 0); + + // Verify the schema will not been deleted with partition-0. + admin.topics().delete(partition0, false); + List schemaList2 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join() + .stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList()); + assertTrue(schemaList2 != null && schemaList2.size() > 0); + + // Verify the schema will not been deleted with partition-0 & partition-1. + admin.topics().delete(partition1, false); + List schemaList3 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join() + .stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList()); + assertTrue(schemaList3 != null && schemaList3.size() > 0); + + // Verify the schema will be deleted with partitioned metadata. + admin.topics().deletePartitionedTopic(topic, false); + List schemaList4 = pulsar.getSchemaRegistryService().getAllSchemas(schemaId).join() + .stream().map(s -> s.join()).filter(Objects::nonNull).collect(Collectors.toList()); + assertTrue(schemaList4 == null || schemaList4.isEmpty()); + } +}