Skip to content

Commit

Permalink
SKYEDEN-3020 | Handle retransmission zookeeper race condition (#1936)
Browse files Browse the repository at this point in the history
MarcinBobinski authored Jan 8, 2025

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 829b93a commit 6f6a5ed
Showing 15 changed files with 83 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.common.kafka.offset;

import java.util.List;
import java.util.Set;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;
@@ -14,7 +15,7 @@ void setSubscriptionOffset(
PartitionOffset partitionOffset);

PartitionOffsets getSubscriptionOffsets(
TopicName topic, String subscriptionName, String brokersClusterName);
TopicName topic, String subscriptionName, String brokersClusterName, Set<Integer> partitions);

boolean areOffsetsMoved(
TopicName topicName,
Original file line number Diff line number Diff line change
@@ -2,7 +2,9 @@

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.TopicName;
@@ -63,7 +65,10 @@ public void setSubscriptionOffset(

@Override
public PartitionOffsets getSubscriptionOffsets(
TopicName topic, String subscriptionName, String brokersClusterName) {
TopicName topic,
String subscriptionName,
String brokersClusterName,
Set<Integer> partitions) {
subscriptionRepository.ensureSubscriptionExists(topic, subscriptionName);
String kafkaTopicsPath = paths.subscribedKafkaTopicsPath(topic, subscriptionName);

@@ -74,7 +79,7 @@ public PartitionOffsets getSubscriptionOffsets(
kafkaTopic ->
allOffsets.addAll(
getOffsetsForKafkaTopic(
topic, kafkaTopic, subscriptionName, brokersClusterName)));
topic, kafkaTopic, subscriptionName, brokersClusterName, partitions)));
return allOffsets;
}

@@ -134,19 +139,29 @@ private PartitionOffsets getOffsetsForKafkaTopic(
TopicName topic,
KafkaTopicName kafkaTopicName,
String subscriptionName,
String brokersClusterName) {
String offsetsPath =
paths.offsetsPath(topic, subscriptionName, kafkaTopicName, brokersClusterName);

String brokersClusterName,
Set<Integer> partitions) {
PartitionOffsets offsets = new PartitionOffsets();
for (String partitionAsString : getZookeeperChildrenForPath(offsetsPath)) {
Integer partition = Integer.valueOf(partitionAsString);
offsets.add(
new PartitionOffset(
for (Integer partition : partitions) {
try {
offsets.add(
new PartitionOffset(
kafkaTopicName,
getOffsetForPartition(
topic, kafkaTopicName, subscriptionName, brokersClusterName, partition),
partition));
} catch (InternalProcessingException ex) {
if (ex.getCause() instanceof NoNodeException) {
logger.warn(
"No offset for partition {} in kafka topic {} for topic {} subscription {}",
partition,
kafkaTopicName,
getOffsetForPartition(
topic, kafkaTopicName, subscriptionName, brokersClusterName, partition),
partition));
topic,
subscriptionName);
continue;
}
throw ex;
}
}
return offsets;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package pl.allegro.tech.hermes.infrastructure.zookeeper

import com.google.common.primitives.Longs

import pl.allegro.tech.hermes.api.Topic
import pl.allegro.tech.hermes.api.TopicName
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName
@@ -9,8 +9,6 @@ import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets
import pl.allegro.tech.hermes.domain.subscription.SubscriptionNotExistsException
import pl.allegro.tech.hermes.test.IntegrationTest

import java.nio.charset.StandardCharsets

import static pl.allegro.tech.hermes.test.helper.builder.GroupBuilder.group
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic
@@ -43,7 +41,7 @@ class ZookeeperSubscriptionOffsetChangeIndicatorTest extends IntegrationTest {
indicator.setSubscriptionOffset(TOPIC.name, 'override', 'primary', new PartitionOffset(primaryKafkaTopicName, 10, 1))

then:
def offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'override', 'primary')
def offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'override', 'primary', [1] as Set)
offsets.find { it.partition == 1 } == new PartitionOffset(primaryKafkaTopicName, 10, 1)
}

@@ -54,7 +52,7 @@ class ZookeeperSubscriptionOffsetChangeIndicatorTest extends IntegrationTest {
indicator.setSubscriptionOffset(TOPIC.name, 'read', 'primary', new PartitionOffset(primaryKafkaTopicName, 10, 1))

when:
PartitionOffsets offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'read', 'primary')
PartitionOffsets offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'read', 'primary', [1] as Set)

then:
(offsets.find { it.partition == 1 } as PartitionOffset).offset == 10
Original file line number Diff line number Diff line change
@@ -251,6 +251,11 @@ public Subscription getSubscription() {
return subscription;
}

@Override
public Set<Integer> getAssignedPartitions() {
return receiver.getAssignedPartitions();
}

private Retryer<MessageSendingResult> createRetryer(
MessageBatch batch, BatchSubscriptionPolicy policy) {
return createRetryer(
Original file line number Diff line number Diff line change
@@ -29,4 +29,6 @@ public interface Consumer {
boolean moveOffset(PartitionOffset subscriptionPartitionOffset);

Subscription getSubscription();

Set<Integer> getAssignedPartitions();
}
Original file line number Diff line number Diff line change
@@ -270,4 +270,9 @@ public boolean moveOffset(PartitionOffset offset) {
public Subscription getSubscription() {
return subscription;
}

@Override
public Set<Integer> getAssignedPartitions() {
return messageReceiver.getAssignedPartitions();
}
}
Original file line number Diff line number Diff line change
@@ -183,4 +183,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
}

public Set<Integer> getAssignedPartitions() {
return receiver.getAssignedPartitions();
}
}
Original file line number Diff line number Diff line change
@@ -66,6 +66,10 @@ && isAssigned(
subscriptionPartition.getSubscriptionName(), subscriptionPartition.getPartition());
}

public Set<Integer> getAssignedPartitions(SubscriptionName subscriptionName) {
return assigned.get(subscriptionName);
}

private boolean isAssigned(SubscriptionName name, int partition) {
return assigned.containsKey(name) && assigned.get(name).contains(partition);
}
Original file line number Diff line number Diff line change
@@ -34,4 +34,6 @@ default void update(Subscription newSubscription) {}
void commit(Set<SubscriptionPartitionOffset> offsets);

boolean moveOffset(PartitionOffset offset);

Set<Integer> getAssignedPartitions();
}
Original file line number Diff line number Diff line change
@@ -57,6 +57,11 @@ public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
}

@Override
public Set<Integer> getAssignedPartitions() {
return receiver.getAssignedPartitions();
}

@Override
public void stop() {
receiver.stop();
Original file line number Diff line number Diff line change
@@ -21,4 +21,9 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
public boolean moveOffset(PartitionOffset offset) {
throw new ConsumerNotInitializedException();
}

@Override
public Set<Integer> getAssignedPartitions() {
throw new ConsumerNotInitializedException();
}
}
Original file line number Diff line number Diff line change
@@ -68,4 +68,9 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
}

@Override
public Set<Integer> getAssignedPartitions() {
return receiver.getAssignedPartitions();
}
}
Original file line number Diff line number Diff line change
@@ -226,4 +226,8 @@ private Map<TopicPartition, OffsetAndMetadata> createOffset(
public boolean moveOffset(PartitionOffset offset) {
return offsetMover.move(offset);
}

public Set<Integer> getAssignedPartitions() {
return partitionAssignmentState.getAssignedPartitions(subscription.getQualifiedName());
}
}
Original file line number Diff line number Diff line change
@@ -28,7 +28,10 @@ public void reloadOffsets(SubscriptionName subscriptionName, Consumer consumer)
try {
PartitionOffsets offsets =
subscriptionOffsetChangeIndicator.getSubscriptionOffsets(
subscriptionName.getTopicName(), subscriptionName.getName(), brokersClusterName);
subscriptionName.getTopicName(),
subscriptionName.getName(),
brokersClusterName,
consumer.getAssignedPartitions());

for (PartitionOffset partitionOffset : offsets) {
if (moveOffset(subscriptionName, consumer, partitionOffset)) {
Original file line number Diff line number Diff line change
@@ -64,6 +64,11 @@ class ConsumerStub implements Consumer {
return subscription
}

@Override
Set<Integer> getAssignedPartitions() {
return null
}

@Override
void updateTopic(Topic topic) {
}

0 comments on commit 6f6a5ed

Please sign in to comment.