Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker] Fix issue with consumer read uncommitted messages from …
Browse files Browse the repository at this point in the history
…compacted topic (apache#21465)
  • Loading branch information
coderzc authored Nov 3, 2023
1 parent f3f5fbc commit b5925ed
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.compaction.CompactedTopicUtils;
import org.apache.pulsar.compaction.TopicCompactionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -350,8 +351,9 @@ protected void readMoreEntries(Consumer consumer) {
havePendingRead = true;
if (consumer.readCompacted()) {
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(), cursor,
messagesToRead, bytesToRead, readFromEarliest, this, true, consumer);
TopicCompactionService topicCompactionService = topic.getTopicCompactionService();
CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead,
bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer);
} else {
ReadEntriesCtx readEntriesCtx =
ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;

public interface CompactedTopic {
Expand All @@ -34,12 +35,14 @@ public interface CompactedTopic {
* Read entries from compacted topic.
*
* @deprecated Use {@link CompactedTopicUtils#asyncReadCompactedEntries(TopicCompactionService, ManagedCursor,
* int, long, boolean, ReadEntriesCallback, boolean, Consumer)} instead.
* int, long, org.apache.bookkeeper.mledger.impl.PositionImpl, boolean, ReadEntriesCallback, boolean, Consumer)}
* instead.
*/
@Deprecated
void asyncReadEntriesOrWait(ManagedCursor cursor,
int maxEntries,
long bytesToRead,
PositionImpl maxReadPosition,
boolean isFirstRead,
ReadEntriesCallback callback,
Consumer consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId) {
public void asyncReadEntriesOrWait(ManagedCursor cursor,
int maxEntries,
long bytesToRead,
PositionImpl maxReadPosition,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer consumer) {
PositionImpl cursorPosition;
Expand All @@ -112,7 +113,7 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,

if (currentCompactionHorizon == null
|| currentCompactionHorizon.compareTo(cursorPosition) < 0) {
cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition);
} else {
ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public class CompactedTopicUtils {
@Beta
public static void asyncReadCompactedEntries(TopicCompactionService topicCompactionService,
ManagedCursor cursor, int maxEntries,
long bytesToRead, boolean readFromEarliest,
AsyncCallbacks.ReadEntriesCallback callback,
long bytesToRead, PositionImpl maxReadPosition,
boolean readFromEarliest, AsyncCallbacks.ReadEntriesCallback callback,
boolean wait, @Nullable Consumer consumer) {
Objects.requireNonNull(topicCompactionService);
Objects.requireNonNull(cursor);
Expand All @@ -68,11 +68,9 @@ public static void asyncReadCompactedEntries(TopicCompactionService topicCompact
|| readPosition.compareTo(
lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) > 0) {
if (wait) {
cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx,
PositionImpl.LATEST);
cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition);
} else {
cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx,
PositionImpl.LATEST);
cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition);
}
return CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1794,4 +1794,59 @@ private void getTopic(String topicName) {
});
}

@Test
public void testReadCommittedWithReadCompacted() throws Exception{
final String namespace = "tnx/ns-prechecks";
final String topic = "persistent://" + namespace + "/test_transaction_topic";
admin.namespaces().createNamespace(namespace);
admin.topics().createNonPartitionedTopic(topic);

admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024);

@Cleanup
Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Exclusive)
.readCompacted(true)
.subscribe();

@Cleanup
Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

producer.newMessage().key("K1").value("V1").send();

Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
producer.newMessage(txn).key("K2").value("V2").send();
producer.newMessage(txn).key("K3").value("V3").send();

List<String> messages = new ArrayList<>();
while (true) {
Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
if (message == null) {
break;
}
messages.add(message.getValue());
}

Assert.assertEquals(messages, List.of("V1"));

txn.commit();

messages.clear();

while (true) {
Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
if (message == null) {
break;
}
messages.add(message.getValue());
}

Assert.assertEquals(messages, List.of("V2", "V3"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}
};

CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100, false,
readEntriesCallback, false, null);
CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100,
PositionImpl.LATEST, false, readEntriesCallback, false, null);

List<Entry> entries = completableFuture.get();
Assert.assertTrue(entries.isEmpty());
Expand Down

0 comments on commit b5925ed

Please sign in to comment.