Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into SKYEDEN-3020-Consum…
Browse files Browse the repository at this point in the history
…erRetransmissionImprovements

# Conflicts:
#	hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java
  • Loading branch information
MarcinBobinski committed Jan 8, 2025
2 parents f877edb + 6f6a5ed commit 9f879a0
Show file tree
Hide file tree
Showing 15 changed files with 83 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pl.allegro.tech.hermes.common.kafka.offset;

import java.util.List;
import java.util.Set;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName;
Expand All @@ -14,7 +15,7 @@ void setSubscriptionOffset(
PartitionOffset partitionOffset);

PartitionOffsets getSubscriptionOffsets(
TopicName topic, String subscriptionName, String brokersClusterName);
TopicName topic, String subscriptionName, String brokersClusterName, Set<Integer> partitions);

boolean areOffsetsMoved(
TopicName topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.TopicName;
Expand Down Expand Up @@ -63,7 +65,10 @@ public void setSubscriptionOffset(

@Override
public PartitionOffsets getSubscriptionOffsets(
TopicName topic, String subscriptionName, String brokersClusterName) {
TopicName topic,
String subscriptionName,
String brokersClusterName,
Set<Integer> partitions) {
subscriptionRepository.ensureSubscriptionExists(topic, subscriptionName);
String kafkaTopicsPath = paths.subscribedKafkaTopicsPath(topic, subscriptionName);

Expand All @@ -74,7 +79,7 @@ public PartitionOffsets getSubscriptionOffsets(
kafkaTopic ->
allOffsets.addAll(
getOffsetsForKafkaTopic(
topic, kafkaTopic, subscriptionName, brokersClusterName)));
topic, kafkaTopic, subscriptionName, brokersClusterName, partitions)));
return allOffsets;
}

Expand Down Expand Up @@ -134,19 +139,29 @@ private PartitionOffsets getOffsetsForKafkaTopic(
TopicName topic,
KafkaTopicName kafkaTopicName,
String subscriptionName,
String brokersClusterName) {
String offsetsPath =
paths.offsetsPath(topic, subscriptionName, kafkaTopicName, brokersClusterName);

String brokersClusterName,
Set<Integer> partitions) {
PartitionOffsets offsets = new PartitionOffsets();
for (String partitionAsString : getZookeeperChildrenForPath(offsetsPath)) {
Integer partition = Integer.valueOf(partitionAsString);
offsets.add(
new PartitionOffset(
for (Integer partition : partitions) {
try {
offsets.add(
new PartitionOffset(
kafkaTopicName,
getOffsetForPartition(
topic, kafkaTopicName, subscriptionName, brokersClusterName, partition),
partition));
} catch (InternalProcessingException ex) {
if (ex.getCause() instanceof NoNodeException) {
logger.warn(
"No offset for partition {} in kafka topic {} for topic {} subscription {}",
partition,
kafkaTopicName,
getOffsetForPartition(
topic, kafkaTopicName, subscriptionName, brokersClusterName, partition),
partition));
topic,
subscriptionName);
continue;
}
throw ex;
}
}
return offsets;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package pl.allegro.tech.hermes.infrastructure.zookeeper

import com.google.common.primitives.Longs

import pl.allegro.tech.hermes.api.Topic
import pl.allegro.tech.hermes.api.TopicName
import pl.allegro.tech.hermes.common.kafka.KafkaTopicName
Expand All @@ -9,8 +9,6 @@ import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets
import pl.allegro.tech.hermes.domain.subscription.SubscriptionNotExistsException
import pl.allegro.tech.hermes.test.IntegrationTest

import java.nio.charset.StandardCharsets

import static pl.allegro.tech.hermes.test.helper.builder.GroupBuilder.group
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic
Expand Down Expand Up @@ -43,7 +41,7 @@ class ZookeeperSubscriptionOffsetChangeIndicatorTest extends IntegrationTest {
indicator.setSubscriptionOffset(TOPIC.name, 'override', 'primary', new PartitionOffset(primaryKafkaTopicName, 10, 1))

then:
def offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'override', 'primary')
def offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'override', 'primary', [1] as Set)
offsets.find { it.partition == 1 } == new PartitionOffset(primaryKafkaTopicName, 10, 1)
}

Expand All @@ -54,7 +52,7 @@ class ZookeeperSubscriptionOffsetChangeIndicatorTest extends IntegrationTest {
indicator.setSubscriptionOffset(TOPIC.name, 'read', 'primary', new PartitionOffset(primaryKafkaTopicName, 10, 1))

when:
PartitionOffsets offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'read', 'primary')
PartitionOffsets offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'read', 'primary', [1] as Set)

then:
(offsets.find { it.partition == 1 } as PartitionOffset).offset == 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ public Subscription getSubscription() {
return subscription;
}

@Override
public Set<Integer> getAssignedPartitions() {
return receiver.getAssignedPartitions();
}

private Retryer<MessageSendingResult> createRetryer(
MessageBatch batch, BatchSubscriptionPolicy policy) {
return createRetryer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ public interface Consumer {
PartitionOffsets moveOffset(PartitionOffsets subscriptionPartitionOffsets);

Subscription getSubscription();

Set<Integer> getAssignedPartitions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,9 @@ public PartitionOffsets moveOffset(PartitionOffsets offsets) {
public Subscription getSubscription() {
return subscription;
}

@Override
public Set<Integer> getAssignedPartitions() {
return messageReceiver.getAssignedPartitions();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}

public Set<Integer> getAssignedPartitions() {
return receiver.getAssignedPartitions();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ && isAssigned(
subscriptionPartition.getSubscriptionName(), subscriptionPartition.getPartition());
}

public Set<Integer> getAssignedPartitions(SubscriptionName subscriptionName) {
return assigned.get(subscriptionName);
}

private boolean isAssigned(SubscriptionName name, int partition) {
return assigned.containsKey(name) && assigned.get(name).contains(partition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,6 @@ default void update(Subscription newSubscription) {}
void commit(Set<SubscriptionPartitionOffset> offsets);

PartitionOffsets moveOffset(PartitionOffsets offsets);

Set<Integer> getAssignedPartitions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}

@Override
public Set<Integer> getAssignedPartitions() {
return receiver.getAssignedPartitions();
}

@Override
public void stop() {
receiver.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,9 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
throw new ConsumerNotInitializedException();
}

@Override
public Set<Integer> getAssignedPartitions() {
throw new ConsumerNotInitializedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}

@Override
public Set<Integer> getAssignedPartitions() {
return receiver.getAssignedPartitions();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,4 +229,8 @@ private Map<TopicPartition, OffsetAndMetadata> createOffset(
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return offsetMover.move(offsets);
}

public Set<Integer> getAssignedPartitions() {
return partitionAssignmentState.getAssignedPartitions(subscription.getQualifiedName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ public void reloadOffsets(SubscriptionName subscriptionName, Consumer consumer)
try {
PartitionOffsets offsets =
subscriptionOffsetChangeIndicator.getSubscriptionOffsets(
subscriptionName.getTopicName(), subscriptionName.getName(), brokersClusterName);
subscriptionName.getTopicName(),
subscriptionName.getName(),
brokersClusterName,
consumer.getAssignedPartitions());

PartitionOffsets movedOffsets = consumer.moveOffset(offsets);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ class ConsumerStub implements Consumer {
return subscription
}

@Override
Set<Integer> getAssignedPartitions() {
return null
}

@Override
void updateTopic(Topic topic) {
}
Expand Down

0 comments on commit 9f879a0

Please sign in to comment.