diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index 5098890242b6c..310354dcd3b47 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -166,7 +166,22 @@ public synchronized CompletableFuture addConsumer(Consumer consumer) { } if (subscriptionType == SubType.Exclusive && !consumers.isEmpty()) { - return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already connected")); + Consumer actConsumer = ACTIVE_CONSUMER_UPDATER.get(this); + if (actConsumer != null) { + return actConsumer.cnx().checkConnectionLiveness().thenCompose(actConsumerStillAlive -> { + if (actConsumerStillAlive == null || actConsumerStillAlive) { + return FutureUtil.failedFuture(new ConsumerBusyException("Exclusive consumer is already" + + " connected")); + } else { + return addConsumer(consumer); + } + }); + } else { + // It should never happen. + + return FutureUtil.failedFuture(new ConsumerBusyException("Active consumer is in a strange state." + + " Active consumer is null, but there are " + consumers.size() + " registered.")); + } } if (subscriptionType == SubType.Failover && isConsumersExceededOnSubscription()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 51d3ef4577f55..a2b42b5cca034 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -209,6 +209,7 @@ public void setup() throws Exception { doReturn(eventLoopGroup.next()).when(channel).eventLoop(); doReturn(channel).when(ctx).channel(); doReturn(ctx).when(serverCnx).ctx(); + doReturn(CompletableFuture.completedFuture(true)).when(serverCnx).checkConnectionLiveness(); NamespaceService nsSvc = mock(NamespaceService.class); NamespaceBundle bundle = mock(NamespaceBundle.class); @@ -693,7 +694,15 @@ public void testSubscribeUnsubscribe() throws Exception { f1.get(); // 2. duplicate subscribe - Future f2 = topic.subscribe(getSubscriptionOption(cmd)); + CommandSubscribe cmd2 = new CommandSubscribe() + .setConsumerId(2) + .setTopic(successTopicName) + .setSubscription(successSubName) + .setConsumerName("consumer-name") + .setReadCompacted(false) + .setRequestId(2) + .setSubType(SubType.Exclusive); + Future f2 = topic.subscribe(getSubscriptionOption(cmd2)); try { f2.get(); fail("should fail with exception"); @@ -758,19 +767,11 @@ public void testAddRemoveConsumer() throws Exception { sub.addConsumer(consumer); assertTrue(sub.getDispatcher().isConsumerConnected()); - // 2. duplicate add consumer - try { - sub.addConsumer(consumer).get(); - fail("Should fail with ConsumerBusyException"); - } catch (Exception e) { - assertTrue(e.getCause() instanceof BrokerServiceException.ConsumerBusyException); - } - - // 3. simple remove consumer + // 2. simple remove consumer sub.removeConsumer(consumer); assertFalse(sub.getDispatcher().isConsumerConnected()); - // 4. duplicate remove consumer + // 3. duplicate remove consumer try { sub.removeConsumer(consumer); fail("Should fail with ServerMetadataException"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 5fd4881981365..79178ec491ff1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -45,6 +45,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; +import io.netty.channel.DefaultChannelId; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.vertx.core.impl.ConcurrentHashSet; @@ -52,6 +53,7 @@ import java.io.IOException; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -62,10 +64,15 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; +import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -99,6 +106,7 @@ import org.apache.pulsar.broker.service.utils.ClientChannelHelper; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.util.ConsumerName; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.api.proto.AuthMethod; import org.apache.pulsar.common.api.proto.BaseCommand; @@ -997,7 +1005,8 @@ public void testHandleProducerAfterClientChannelInactive() throws Exception { channelsStoppedAnswerHealthCheck.add(channel); ClientChannel channel2 = new ClientChannel(); setChannelConnected(channel2.serverCnx); - Awaitility.await().untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> { + channel.runPendingTasks(); ByteBuf cmdProducer2 = Commands.newProducer(tName, producerId, requestId.incrementAndGet(), pName, false, metadata, null, epoch.incrementAndGet(), false, ProducerAccessMode.Shared, Optional.empty(), false); @@ -1011,10 +1020,132 @@ public void testHandleProducerAfterClientChannelInactive() throws Exception { channel2.close(); } + @Test + public void testHandleConsumerAfterClientChannelInactive() throws Exception { + final String tName = successTopicName; + final long consumerId = 1; + final MutableInt requestId = new MutableInt(1); + final String sName = successSubName; + final String cName1 = ConsumerName.generateRandomName(); + final String cName2 = ConsumerName.generateRandomName(); + resetChannel(); + setChannelConnected(); + + // The producer register using the first connection. + ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(), + SubType.Exclusive, 0, cName1, 0); + channel.writeInbound(cmdSubscribe1); + assertTrue(getResponse() instanceof CommandSuccess); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).get(); + assertNotNull(topicRef); + assertNotNull(topicRef.getSubscription(sName).getConsumers()); + assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1); + assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(), cName1); + + // Verify the second producer using a new connection will override the consumer who using a stopped channel. + channelsStoppedAnswerHealthCheck.add(channel); + ClientChannel channel2 = new ClientChannel(); + setChannelConnected(channel2.serverCnx); + ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(), + CommandSubscribe.SubType.Exclusive, 0, cName2, 0); + channel2.channel.writeInbound(cmdSubscribe2); + BackGroundExecutor backGroundExecutor = startBackgroundExecutorForEmbeddedChannel(channel); + + assertTrue(getResponse(channel2.channel, channel2.clientChannelHelper) instanceof CommandSuccess); + assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1); + assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(), cName2); + backGroundExecutor.close(); + + // cleanup. + channel.finish(); + channel2.close(); + } + + @Test + public void test2ndSubFailedIfDisabledConCheck() + throws Exception { + final String tName = successTopicName; + final long consumerId = 1; + final MutableInt requestId = new MutableInt(1); + final String sName = successSubName; + final String cName1 = ConsumerName.generateRandomName(); + final String cName2 = ConsumerName.generateRandomName(); + // Disabled connection check. + pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(-1); + resetChannel(); + setChannelConnected(); + + // The consumer register using the first connection. + ByteBuf cmdSubscribe1 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(), + SubType.Exclusive, 0, cName1, 0); + channel.writeInbound(cmdSubscribe1); + assertTrue(getResponse() instanceof CommandSuccess); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(tName).orElse(null); + assertNotNull(topicRef); + assertNotNull(topicRef.getSubscription(sName).getConsumers()); + assertEquals(topicRef.getSubscription(sName).getConsumers().stream().map(Consumer::consumerName) + .collect(Collectors.toList()), Collections.singletonList(cName1)); + + // Verify the consumer using a new connection will override the consumer who using a stopped channel. + channelsStoppedAnswerHealthCheck.add(channel); + ClientChannel channel2 = new ClientChannel(); + setChannelConnected(channel2.serverCnx); + ByteBuf cmdSubscribe2 = Commands.newSubscribe(tName, sName, consumerId, requestId.incrementAndGet(), + CommandSubscribe.SubType.Exclusive, 0, cName2, 0); + channel2.channel.writeInbound(cmdSubscribe2); + BackGroundExecutor backGroundExecutor = startBackgroundExecutorForEmbeddedChannel(channel); + + // Since the feature "ConnectionLiveness" has been disabled, the fix + // by https://github.com/apache/pulsar/pull/21183 will not be affected, so the client will still get an error. + Object responseOfConnection2 = getResponse(channel2.channel, channel2.clientChannelHelper); + assertTrue(responseOfConnection2 instanceof CommandError); + assertTrue(((CommandError) responseOfConnection2).getMessage() + .contains("Exclusive consumer is already connected")); + assertEquals(topicRef.getSubscription(sName).getConsumers().size(), 1); + assertEquals(topicRef.getSubscription(sName).getConsumers().iterator().next().consumerName(), cName1); + backGroundExecutor.close(); + + // cleanup. + channel.finish(); + channel2.close(); + // Reset configuration. + pulsar.getConfig().setConnectionLivenessCheckTimeoutMillis(5000); + } + + /** + * When a channel typed "EmbeddedChannel", once we call channel.execute(runnable), there is no background thread + * to run it. + * So starting a background thread to trigger the tasks in the queue. + */ + private BackGroundExecutor startBackgroundExecutorForEmbeddedChannel(final EmbeddedChannel channel) { + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + ScheduledFuture scheduledFuture = executor.scheduleWithFixedDelay(() -> { + channel.runPendingTasks(); + }, 100, 100, TimeUnit.MILLISECONDS); + return new BackGroundExecutor(executor, scheduledFuture); + } + + @AllArgsConstructor + private static class BackGroundExecutor implements Closeable { + + private ScheduledExecutorService executor; + + private ScheduledFuture scheduledFuture; + + @Override + public void close() throws IOException { + if (scheduledFuture != null) { + scheduledFuture.cancel(true); + } + executor.shutdown(); + } + } + private class ClientChannel implements Closeable { private ClientChannelHelper clientChannelHelper = new ClientChannelHelper(); private ServerCnx serverCnx = new ServerCnx(pulsar); - private EmbeddedChannel channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder( + private EmbeddedChannel channel = new EmbeddedChannel(DefaultChannelId.newInstance(), + new LengthFieldBasedFrameDecoder( 5 * 1024 * 1024, 0, 4, @@ -1810,9 +1941,11 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception { "test" /* consumer name */, 0 /* avoid reseting cursor */); channel.writeInbound(clientCommand); + BackGroundExecutor backGroundExecutor = startBackgroundExecutorForEmbeddedChannel(channel); + // Create producer second time clientCommand = Commands.newSubscribe(successTopicName, // - successSubName, 2 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, + successSubName, 2 /* consumer id */, 2 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */, 0 /* avoid reseting cursor */); channel.writeInbound(clientCommand); @@ -1822,6 +1955,9 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception { CommandError error = (CommandError) response; assertEquals(error.getError(), ServerError.ConsumerBusy); }); + + // cleanup. + backGroundExecutor.close(); channel.finish(); } @@ -2676,13 +2812,7 @@ protected Object getResponse(EmbeddedChannel channel, ClientChannelHelper client if (channelsStoppedAnswerHealthCheck.contains(channel)) { continue; } - channel.writeAndFlush(Commands.newPong()).addListener(future -> { - if (!future.isSuccess()) { - log.warn("[{}] Forcing connection to close since cannot send a pong message.", - channel, future.cause()); - channel.close(); - } - }); + channel.writeInbound(Commands.newPong()); continue; } return cmd; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java index bf0dd3aa9c1c5..c8fce32efc5f0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/utils/ClientChannelHelper.java @@ -27,6 +27,8 @@ import org.apache.pulsar.common.api.proto.CommandEndTxnResponse; import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; +import org.apache.pulsar.common.api.proto.CommandPing; +import org.apache.pulsar.common.api.proto.CommandPong; import org.apache.pulsar.common.api.proto.CommandWatchTopicListSuccess; import org.apache.pulsar.common.protocol.PulsarDecoder; import org.apache.pulsar.common.api.proto.CommandAck; @@ -207,6 +209,16 @@ protected void handleEndTxnOnSubscriptionResponse( CommandEndTxnOnSubscriptionResponse commandEndTxnOnSubscriptionResponse) { queue.offer(new CommandEndTxnOnSubscriptionResponse().copyFrom(commandEndTxnOnSubscriptionResponse)); } + + @Override + protected void handlePing(CommandPing ping) { + queue.offer(new CommandPing().copyFrom(ping)); + } + + @Override + protected void handlePong(CommandPong pong) { + return; + } }; }