diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index d96429693fda8..5e9183df0b1df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -55,6 +55,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.compaction.CompactedTopicUtils; +import org.apache.pulsar.compaction.TopicCompactionService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -350,8 +351,9 @@ protected void readMoreEntries(Consumer consumer) { havePendingRead = true; if (consumer.readCompacted()) { boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId()); - CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(), cursor, - messagesToRead, bytesToRead, readFromEarliest, this, true, consumer); + TopicCompactionService topicCompactionService = topic.getTopicCompactionService(); + CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead, + bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer); } else { ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java index 8c17e0f3ca34d..146ba4327d252 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.Consumer; public interface CompactedTopic { @@ -34,12 +35,14 @@ public interface CompactedTopic { * Read entries from compacted topic. * * @deprecated Use {@link CompactedTopicUtils#asyncReadCompactedEntries(TopicCompactionService, ManagedCursor, - * int, long, boolean, ReadEntriesCallback, boolean, Consumer)} instead. + * int, long, org.apache.bookkeeper.mledger.impl.PositionImpl, boolean, ReadEntriesCallback, boolean, Consumer)} + * instead. */ @Deprecated void asyncReadEntriesOrWait(ManagedCursor cursor, int maxEntries, long bytesToRead, + PositionImpl maxReadPosition, boolean isFirstRead, ReadEntriesCallback callback, Consumer consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index b028b708c49e4..8794e2736d4d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -96,6 +96,7 @@ public CompletableFuture deleteCompactedLedger(long compactedLedgerId) { public void asyncReadEntriesOrWait(ManagedCursor cursor, int maxEntries, long bytesToRead, + PositionImpl maxReadPosition, boolean isFirstRead, ReadEntriesCallback callback, Consumer consumer) { PositionImpl cursorPosition; @@ -112,7 +113,7 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, if (currentCompactionHorizon == null || currentCompactionHorizon.compareTo(cursorPosition) < 0) { - cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST); + cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition); } else { ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor; int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java index 66bcf4c3002bd..d3464d402e9c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java @@ -42,8 +42,8 @@ public class CompactedTopicUtils { @Beta public static void asyncReadCompactedEntries(TopicCompactionService topicCompactionService, ManagedCursor cursor, int maxEntries, - long bytesToRead, boolean readFromEarliest, - AsyncCallbacks.ReadEntriesCallback callback, + long bytesToRead, PositionImpl maxReadPosition, + boolean readFromEarliest, AsyncCallbacks.ReadEntriesCallback callback, boolean wait, @Nullable Consumer consumer) { Objects.requireNonNull(topicCompactionService); Objects.requireNonNull(cursor); @@ -68,11 +68,9 @@ public static void asyncReadCompactedEntries(TopicCompactionService topicCompact || readPosition.compareTo( lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) > 0) { if (wait) { - cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, - PositionImpl.LATEST); + cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition); } else { - cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx, - PositionImpl.LATEST); + cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition); } return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index b9a274f5479f3..6959d8dd04861 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1794,4 +1794,59 @@ private void getTopic(String topicName) { }); } + @Test + public void testReadCommittedWithReadCompacted() throws Exception{ + final String namespace = "tnx/ns-prechecks"; + final String topic = "persistent://" + namespace + "/test_transaction_topic"; + admin.namespaces().createNamespace(namespace); + admin.topics().createNonPartitionedTopic(topic); + + admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024); + + @Cleanup + Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Exclusive) + .readCompacted(true) + .subscribe(); + + @Cleanup + Producer producer = this.pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .create(); + + producer.newMessage().key("K1").value("V1").send(); + + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); + producer.newMessage(txn).key("K2").value("V2").send(); + producer.newMessage(txn).key("K3").value("V3").send(); + + List messages = new ArrayList<>(); + while (true) { + Message message = consumer.receive(5, TimeUnit.SECONDS); + if (message == null) { + break; + } + messages.add(message.getValue()); + } + + Assert.assertEquals(messages, List.of("V1")); + + txn.commit(); + + messages.clear(); + + while (true) { + Message message = consumer.receive(5, TimeUnit.SECONDS); + if (message == null) { + break; + } + messages.add(message.getValue()); + } + + Assert.assertEquals(messages, List.of("V2", "V3")); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java index 94f2a17a2a3f4..2545c0362e82a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java @@ -69,8 +69,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } }; - CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100, false, - readEntriesCallback, false, null); + CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100, + PositionImpl.LATEST, false, readEntriesCallback, false, null); List entries = completableFuture.get(); Assert.assertTrue(entries.isEmpty());