diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 18e7f554c9994..02ea3bd713569 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -174,6 +174,7 @@ import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider; import org.apache.pulsar.websocket.WebSocketConsumerServlet; +import org.apache.pulsar.websocket.WebSocketMultiTopicConsumerServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; @@ -1081,6 +1082,11 @@ private void addWebSocketServiceHandler(WebService webService, new ServletHolder(readerWebSocketServlet), true, attributeMap); webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new ServletHolder(readerWebSocketServlet), true, attributeMap); + + final WebSocketMultiTopicConsumerServlet multiTopicConsumerWebSocketServlet = + new WebSocketMultiTopicConsumerServlet(webSocketService); + webService.addServlet(WebSocketMultiTopicConsumerServlet.SERVLET_PATH, + new ServletHolder(multiTopicConsumerWebSocketServlet), true, attributeMap); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index ad51158034315..9ec6a7daf7234 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -886,6 +886,7 @@ public void ackBatchMessageTest() throws Exception { WebSocketClient consumerClient = new WebSocketClient(); SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) .batchingMaxPublishDelay(1, TimeUnit.SECONDS) @@ -933,6 +934,7 @@ public void consumeEncryptedMessages() throws Exception { final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg=="; final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg=="; + @Cleanup Producer producer = pulsarClient.newProducer() .topic(topic) .enableBatching(false) @@ -1051,5 +1053,71 @@ private void stopWebSocketClient(WebSocketClient... clients) { log.info("proxy clients are stopped successfully"); } + @Test + public void testMultiTopics() throws Exception { + final String subscription1 = "my-sub1"; + final String subscription2 = "my-sub2"; + final String topic1 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID(); + final String topic2 = "my-property/my-ns/testMultiTopics" + UUID.randomUUID(); + final String consumerUri1 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + + "/ws/v3/consumer/" + subscription1 + "?topics=" + topic1 + "," + topic2; + + final String consumerUri2 = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + + "/ws/v3/consumer/" + subscription2 + "?topicsPattern=my-property/my-ns/testMultiTopics.*"; + + int messages = 10; + WebSocketClient consumerClient1 = new WebSocketClient(); + WebSocketClient consumerClient2 = new WebSocketClient(); + SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket(); + SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket(); + @Cleanup + Producer producer1 = pulsarClient.newProducer() + .topic(topic1) + .batchingMaxMessages(1) + .create(); + @Cleanup + Producer producer2 = pulsarClient.newProducer() + .topic(topic2) + .batchingMaxMessages(1) + .create(); + + try { + consumerClient1.start(); + consumerClient2.start(); + ClientUpgradeRequest consumerRequest1 = new ClientUpgradeRequest(); + ClientUpgradeRequest consumerRequest2 = new ClientUpgradeRequest(); + Future consumerFuture1 = consumerClient1.connect(consumeSocket1, URI.create(consumerUri1), consumerRequest1); + Future consumerFuture2 = consumerClient2.connect(consumeSocket2, URI.create(consumerUri2), consumerRequest2); + + assertTrue(consumerFuture1.get().isOpen()); + assertTrue(consumerFuture2.get().isOpen()); + assertEquals(consumeSocket1.getReceivedMessagesCount(), 0); + assertEquals(consumeSocket2.getReceivedMessagesCount(), 0); + + for (int i = 1; i <= messages; i ++) { + producer1.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8)); + producer2.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8)); + } + producer1.flush(); + producer2.flush(); + + consumeSocket1.sendPermits(2 * messages); + Awaitility.await().untilAsserted(() -> + assertEquals(consumeSocket1.getReceivedMessagesCount(), 2 * messages)); + Awaitility.await().untilAsserted(() -> + assertEquals(admin.topics().getStats(topic1).getSubscriptions() + .get(subscription1).getMsgBacklog(), 0)); + Awaitility.await().untilAsserted(() -> + assertEquals(admin.topics().getStats(topic2).getSubscriptions() + .get(subscription1).getMsgBacklog(), 0)); + + consumeSocket2.sendPermits(2 * messages); + Awaitility.await().untilAsserted(() -> + assertEquals(consumeSocket2.getReceivedMessagesCount(), 2 * messages)); + } finally { + stopWebSocketClient(consumerClient1, consumerClient2); + } + } + private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class); } diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java index 485befa00ac87..aa80b03613bee 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyServiceStarter.java @@ -52,6 +52,7 @@ import org.apache.pulsar.docs.tools.CmdGenerateDocs; import org.apache.pulsar.proxy.stats.ProxyStats; import org.apache.pulsar.websocket.WebSocketConsumerServlet; +import org.apache.pulsar.websocket.WebSocketMultiTopicConsumerServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; @@ -336,6 +337,11 @@ public static void addWebServerHandlers(WebServer server, new ServletHolder(readerWebSocketServlet)); server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new ServletHolder(readerWebSocketServlet)); + + final WebSocketMultiTopicConsumerServlet multiTopicConsumerWebSocketServlet = + new WebSocketMultiTopicConsumerServlet(webSocketService); + server.addServlet(WebSocketMultiTopicConsumerServlet.SERVLET_PATH, + new ServletHolder(multiTopicConsumerWebSocketServlet)); } } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java index 3eb0a0dfcf8ca..b6ed27c87b6ba 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java @@ -67,7 +67,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen protected final WebSocketService service; protected final HttpServletRequest request; - protected final TopicName topic; + protected TopicName topic; protected final Map queryParams; private static final String PULSAR_AUTH_METHOD_NAME = "X-Pulsar-Auth-Method-Name"; protected final ObjectReader consumerCommandReader = @@ -80,12 +80,12 @@ public AbstractWebSocketHandler(WebSocketService service, ServletUpgradeResponse response) { this.service = service; this.request = new WebSocketHttpServletRequestWrapper(request); - this.topic = extractTopicName(request); this.queryParams = new TreeMap<>(); request.getParameterMap().forEach((key, values) -> { queryParams.put(key, values[0]); }); + extractTopicName(request); } protected boolean checkAuth(ServletUpgradeResponse response) { @@ -244,7 +244,7 @@ protected String checkAuthentication() { return null; } - private TopicName extractTopicName(HttpServletRequest request) { + protected void extractTopicName(HttpServletRequest request) { String uri = request.getRequestURI(); List parts = Splitter.on("/").splitToList(uri); @@ -287,7 +287,7 @@ private TopicName extractTopicName(HttpServletRequest request) { } final String name = Codec.decode(topicName.toString()); - return TopicName.get(domain, namespace, name); + topic = TopicName.get(domain, namespace, name); } @VisibleForTesting diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index 08a23eebdaeca..f07c2aa57066c 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -41,15 +41,12 @@ import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.api.TopicMessageId; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; -import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.policies.data.TopicOperation; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.DateFormatter; @@ -75,7 +72,7 @@ */ public class ConsumerHandler extends AbstractWebSocketHandler { - private String subscription = null; + protected String subscription = null; private SubscriptionType subscriptionType; private SubscriptionMode subscriptionMode; private Consumer consumer; @@ -88,6 +85,10 @@ public class ConsumerHandler extends AbstractWebSocketHandler { private final LongAdder numBytesDelivered; private final LongAdder numMsgsAcked; private volatile long msgDeliveredCounter = 0; + + protected String topicsPattern; + + protected String topics; private static final AtomicLongFieldUpdater MSG_DELIVERED_COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ConsumerHandler.class, "msgDeliveredCounter"); @@ -123,7 +124,14 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser return; } - this.consumer = builder.topic(topic.toString()).subscriptionName(subscription).subscribe(); + if (topicsPattern != null) { + this.consumer = builder.topicsPattern(topicsPattern).subscriptionName(subscription).subscribe(); + } else if (topics != null) { + this.consumer = builder.topics(Splitter.on(",").splitToList(topics)) + .subscriptionName(subscription).subscribe(); + } else { + this.consumer = builder.topic(topic.toString()).subscriptionName(subscription).subscribe(); + } if (!this.service.addConsumer(this)) { log.warn("[{}:{}] Failed to add consumer handler for topic {}", request.getRemoteAddr(), request.getRemotePort(), topic); @@ -299,8 +307,7 @@ private void checkResumeReceive() { private void handleAck(ConsumerCommand command) throws IOException { // We should have received an ack - TopicMessageId msgId = new TopicMessageIdImpl(topic.toString(), - (MessageIdAdv) MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId))); + MessageId msgId = MessageId.fromByteArray(Base64.getDecoder().decode(command.messageId)); if (log.isDebugEnabled()) { log.debug("[{}/{}] Received ack request of message {} from {} ", consumer.getTopic(), subscription, msgId, getRemote().getInetSocketAddress().toString()); @@ -490,7 +497,7 @@ protected Boolean isAuthorized(String authRole, AuthenticationDataSource authent } } - public static String extractSubscription(HttpServletRequest request) { + public String extractSubscription(HttpServletRequest request) { String uri = request.getRequestURI(); List parts = Splitter.on("/").splitToList(uri); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/MultiTopicConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/MultiTopicConsumerHandler.java new file mode 100644 index 0000000000000..7fbe257d2e249 --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/MultiTopicConsumerHandler.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.concurrent.TimeUnit.SECONDS; +import com.google.common.base.Splitter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import javax.servlet.http.HttpServletRequest; +import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.TopicOperation; +import org.apache.pulsar.common.util.Codec; +import org.apache.pulsar.common.util.FutureUtil; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Subscribing for multi-topic. + */ +public class MultiTopicConsumerHandler extends ConsumerHandler { + + public MultiTopicConsumerHandler(WebSocketService service, HttpServletRequest request, + ServletUpgradeResponse response) { + super(service, request, response); + } + + @Override + protected Boolean isAuthorized(String authRole, AuthenticationDataSource authenticationData) throws Exception { + try { + AuthenticationDataSubscription subscription = new AuthenticationDataSubscription(authenticationData, + this.subscription); + if (topics != null) { + List topicNames = Splitter.on(",").splitToList(topics); + List> futures = new ArrayList<>(); + for (String topicName : topicNames) { + futures.add(service.getAuthorizationService() + .allowTopicOperationAsync(TopicName.get(topicName), + TopicOperation.CONSUME, authRole, subscription)); + } + FutureUtil.waitForAll(futures) + .get(service.getConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS); + return futures.stream().allMatch(f -> f.join()); + } else { + return service.getAuthorizationService() + .allowTopicOperationAsync(topic, TopicOperation.CONSUME, authRole, subscription) + .get(service.getConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS); + } + } catch (TimeoutException e) { + log.warn("Time-out {} sec while checking authorization on {} ", + service.getConfig().getMetadataStoreOperationTimeoutSeconds(), topic); + throw e; + } catch (Exception e) { + log.warn("Consumer-client with Role - {} failed to get permissions for topic - {}. {}", authRole, topic, + e.getMessage()); + throw e; + } + } + + @Override + protected void extractTopicName(HttpServletRequest request) { + String uri = request.getRequestURI(); + List parts = Splitter.on("/").splitToList(uri); + + // V3 Format must be like : + // /ws/v3/consumer/my-subscription?topicsPattern="a.*" //ws/v3/consumer/my-subscription?topics="a,b,c" + checkArgument(parts.size() >= 4, "Invalid topic name format"); + checkArgument(parts.get(2).equals("v3")); + checkArgument(queryParams.containsKey("topicsPattern") || queryParams.containsKey("topics"), + "Should set topics or topicsPattern"); + checkArgument(!(queryParams.containsKey("topicsPattern") && queryParams.containsKey("topics")), + "Topics must be null when use topicsPattern"); + topicsPattern = queryParams.get("topicsPattern"); + topics = queryParams.get("topics"); + if (topicsPattern != null) { + topic = TopicName.get(topicsPattern); + } else { + // Multi topics only use the first topic nameļ¼Œ + topic = TopicName.get(Splitter.on(",").splitToList(topics).get(0)); + } + } + + @Override + public String extractSubscription(HttpServletRequest request) { + String uri = request.getRequestURI(); + List parts = Splitter.on("/").splitToList(uri); + // v3 Format must be like : + // /ws/v3/consumer/my-subscription?topicsPattern="a.*" //ws/v3/consumer/my-subscription?topics="a,b,c" + checkArgument(parts.size() >= 5 , "Invalid topic name format"); + checkArgument(parts.get(1).equals("ws")); + checkArgument(parts.get(2).equals("v3")); + checkArgument(parts.get(4).length() > 0, "Empty subscription name"); + + return Codec.decode(parts.get(4)); + } + + private static final Logger log = LoggerFactory.getLogger(MultiTopicConsumerHandler.class); +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketMultiTopicConsumerServlet.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketMultiTopicConsumerServlet.java new file mode 100644 index 0000000000000..4653cea98c15d --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketMultiTopicConsumerServlet.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket; + +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; + +public class WebSocketMultiTopicConsumerServlet extends WebSocketServlet { + private static final long serialVersionUID = 1L; + + public static final String SERVLET_PATH = "/ws/v3/consumer"; + + private final transient WebSocketService service; + + public WebSocketMultiTopicConsumerServlet(WebSocketService service) { + super(); + this.service = service; + } + + @Override + public void configure(WebSocketServletFactory factory) { + factory.getPolicy().setMaxTextMessageSize(service.getConfig().getWebSocketMaxTextFrameSize()); + if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) { + factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis()); + } + factory.setCreator((request, response) -> + new MultiTopicConsumerHandler(service, request.getHttpServletRequest(), response)); + } +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java index 8d5a896ba4aa9..c80b2da8252e8 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java @@ -29,6 +29,7 @@ import org.apache.pulsar.common.util.ShutdownUtil; import org.apache.pulsar.docs.tools.CmdGenerateDocs; import org.apache.pulsar.websocket.WebSocketConsumerServlet; +import org.apache.pulsar.websocket.WebSocketMultiTopicConsumerServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; @@ -95,6 +96,8 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro new WebSocketProducerServlet(service)); proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH_V2, new WebSocketConsumerServlet(service)); + proxyServer.addWebSocketServlet(WebSocketMultiTopicConsumerServlet.SERVLET_PATH, + new WebSocketMultiTopicConsumerServlet(service)); proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new WebSocketReaderServlet(service)); diff --git a/pulsar-websocket/src/main/resources/findbugsExclude.xml b/pulsar-websocket/src/main/resources/findbugsExclude.xml index c96e63cdfccee..c2b0d7dac0d3b 100644 --- a/pulsar-websocket/src/main/resources/findbugsExclude.xml +++ b/pulsar-websocket/src/main/resources/findbugsExclude.xml @@ -159,6 +159,11 @@ + + + + + diff --git a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java index 7ec6faf634263..d21e1176f571d 100644 --- a/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java +++ b/pulsar-websocket/src/test/java/org/apache/pulsar/websocket/AbstractWebSocketHandlerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.websocket; +import static com.google.common.base.Preconditions.checkArgument; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; @@ -38,6 +39,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import lombok.Cleanup; +import com.google.common.base.Splitter; import lombok.Getter; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.client.api.CompressionType; @@ -52,6 +54,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.websocket.api.RemoteEndpoint; @@ -130,7 +133,7 @@ public void topicNameUrlEncodingTest() throws Exception { webSocketHandler = new WebSocketHandlerImpl(null, httpServletRequest, null); topicName = webSocketHandler.getTopic(); assertEquals(topicName.toString(), "persistent://my-property/my-ns/" + consumerV2Topic); - String sub = ConsumerHandler.extractSubscription(httpServletRequest); + String sub = extractSubscription(httpServletRequest); assertEquals(sub, consumerV2Sub); when(httpServletRequest.getRequestURI()).thenReturn(readerV2 @@ -140,6 +143,27 @@ public void topicNameUrlEncodingTest() throws Exception { assertEquals(topicName.toString(), "persistent://my-property/my-ns/" + readerV2Topic); } + public String extractSubscription(HttpServletRequest request) { + String uri = request.getRequestURI(); + List parts = Splitter.on("/").splitToList(uri); + + // v1 Format must be like : + // /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription + + // v2 Format must be like : + // /ws/v2/consumer/persistent/my-property/my-ns/my-topic/my-subscription + checkArgument(parts.size() == 9, "Invalid topic name format"); + checkArgument(parts.get(1).equals("ws")); + + final boolean isV2Format = parts.get(2).equals("v2"); + final int domainIndex = isV2Format ? 4 : 3; + checkArgument(parts.get(domainIndex).equals("persistent") + || parts.get(domainIndex).equals("non-persistent")); + checkArgument(parts.get(8).length() > 0, "Empty subscription name"); + + return Codec.decode(parts.get(8)); + } + @Test public void parseTopicNameTest() { String producerV1 = "/ws/producer/persistent/my-property/my-cluster/my-ns/my-topic";