diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/SubscriptionOffsetChangeIndicator.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/SubscriptionOffsetChangeIndicator.java index 863c8a8801..fc3047bed7 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/SubscriptionOffsetChangeIndicator.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/common/kafka/offset/SubscriptionOffsetChangeIndicator.java @@ -1,6 +1,7 @@ package pl.allegro.tech.hermes.common.kafka.offset; import java.util.List; +import java.util.Set; import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.common.kafka.KafkaTopic; import pl.allegro.tech.hermes.common.kafka.KafkaTopicName; @@ -14,7 +15,7 @@ void setSubscriptionOffset( PartitionOffset partitionOffset); PartitionOffsets getSubscriptionOffsets( - TopicName topic, String subscriptionName, String brokersClusterName); + TopicName topic, String subscriptionName, String brokersClusterName, Set partitions); boolean areOffsetsMoved( TopicName topicName, diff --git a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionOffsetChangeIndicator.java b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionOffsetChangeIndicator.java index 0ea7cf0c18..63ea1a3fe1 100644 --- a/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionOffsetChangeIndicator.java +++ b/hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionOffsetChangeIndicator.java @@ -2,7 +2,9 @@ import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Set; import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.KeeperException.NoNodeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import pl.allegro.tech.hermes.api.TopicName; @@ -63,7 +65,10 @@ public void setSubscriptionOffset( @Override public PartitionOffsets getSubscriptionOffsets( - TopicName topic, String subscriptionName, String brokersClusterName) { + TopicName topic, + String subscriptionName, + String brokersClusterName, + Set partitions) { subscriptionRepository.ensureSubscriptionExists(topic, subscriptionName); String kafkaTopicsPath = paths.subscribedKafkaTopicsPath(topic, subscriptionName); @@ -74,7 +79,7 @@ public PartitionOffsets getSubscriptionOffsets( kafkaTopic -> allOffsets.addAll( getOffsetsForKafkaTopic( - topic, kafkaTopic, subscriptionName, brokersClusterName))); + topic, kafkaTopic, subscriptionName, brokersClusterName, partitions))); return allOffsets; } @@ -134,19 +139,29 @@ private PartitionOffsets getOffsetsForKafkaTopic( TopicName topic, KafkaTopicName kafkaTopicName, String subscriptionName, - String brokersClusterName) { - String offsetsPath = - paths.offsetsPath(topic, subscriptionName, kafkaTopicName, brokersClusterName); - + String brokersClusterName, + Set partitions) { PartitionOffsets offsets = new PartitionOffsets(); - for (String partitionAsString : getZookeeperChildrenForPath(offsetsPath)) { - Integer partition = Integer.valueOf(partitionAsString); - offsets.add( - new PartitionOffset( + for (Integer partition : partitions) { + try { + offsets.add( + new PartitionOffset( + kafkaTopicName, + getOffsetForPartition( + topic, kafkaTopicName, subscriptionName, brokersClusterName, partition), + partition)); + } catch (InternalProcessingException ex) { + if (ex.getCause() instanceof NoNodeException) { + logger.warn( + "No offset for partition {} in kafka topic {} for topic {} subscription {}", + partition, kafkaTopicName, - getOffsetForPartition( - topic, kafkaTopicName, subscriptionName, brokersClusterName, partition), - partition)); + topic, + subscriptionName); + continue; + } + throw ex; + } } return offsets; } diff --git a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionOffsetChangeIndicatorTest.groovy b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionOffsetChangeIndicatorTest.groovy index 6838c698c9..1855cd0efa 100644 --- a/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionOffsetChangeIndicatorTest.groovy +++ b/hermes-common/src/test/groovy/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperSubscriptionOffsetChangeIndicatorTest.groovy @@ -1,6 +1,6 @@ package pl.allegro.tech.hermes.infrastructure.zookeeper -import com.google.common.primitives.Longs + import pl.allegro.tech.hermes.api.Topic import pl.allegro.tech.hermes.api.TopicName import pl.allegro.tech.hermes.common.kafka.KafkaTopicName @@ -9,8 +9,6 @@ import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets import pl.allegro.tech.hermes.domain.subscription.SubscriptionNotExistsException import pl.allegro.tech.hermes.test.IntegrationTest -import java.nio.charset.StandardCharsets - import static pl.allegro.tech.hermes.test.helper.builder.GroupBuilder.group import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topic @@ -43,7 +41,7 @@ class ZookeeperSubscriptionOffsetChangeIndicatorTest extends IntegrationTest { indicator.setSubscriptionOffset(TOPIC.name, 'override', 'primary', new PartitionOffset(primaryKafkaTopicName, 10, 1)) then: - def offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'override', 'primary') + def offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'override', 'primary', [1] as Set) offsets.find { it.partition == 1 } == new PartitionOffset(primaryKafkaTopicName, 10, 1) } @@ -54,7 +52,7 @@ class ZookeeperSubscriptionOffsetChangeIndicatorTest extends IntegrationTest { indicator.setSubscriptionOffset(TOPIC.name, 'read', 'primary', new PartitionOffset(primaryKafkaTopicName, 10, 1)) when: - PartitionOffsets offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'read', 'primary') + PartitionOffsets offsets = indicator.getSubscriptionOffsets(TOPIC.name, 'read', 'primary', [1] as Set) then: (offsets.find { it.partition == 1 } as PartitionOffset).offset == 10 diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java index 9f73598e31..272ca6287a 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/BatchConsumer.java @@ -251,6 +251,11 @@ public Subscription getSubscription() { return subscription; } + @Override + public Set getAssignedPartitions() { + return receiver.getAssignedPartitions(); + } + private Retryer createRetryer( MessageBatch batch, BatchSubscriptionPolicy policy) { return createRetryer( diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java index 5e88be3da4..858811f891 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/Consumer.java @@ -29,4 +29,6 @@ public interface Consumer { boolean moveOffset(PartitionOffset subscriptionPartitionOffset); Subscription getSubscription(); + + Set getAssignedPartitions(); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java index 0897a48e34..a8f1bd3212 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/SerialConsumer.java @@ -270,4 +270,9 @@ public boolean moveOffset(PartitionOffset offset) { public Subscription getSubscription() { return subscription; } + + @Override + public Set getAssignedPartitions() { + return messageReceiver.getAssignedPartitions(); + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java index 1011bf9b14..03d76d0162 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/batch/MessageBatchReceiver.java @@ -183,4 +183,8 @@ public void commit(Set offsets) { public boolean moveOffset(PartitionOffset offset) { return receiver.moveOffset(offset); } + + public Set getAssignedPartitions() { + return receiver.getAssignedPartitions(); + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java index e6c318f369..ffc41462d9 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/offset/ConsumerPartitionAssignmentState.java @@ -66,6 +66,10 @@ && isAssigned( subscriptionPartition.getSubscriptionName(), subscriptionPartition.getPartition()); } + public Set getAssignedPartitions(SubscriptionName subscriptionName) { + return assigned.get(subscriptionName); + } + private boolean isAssigned(SubscriptionName name, int partition) { return assigned.containsKey(name) && assigned.get(name).contains(partition); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java index 797539028a..30997eb2bc 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/MessageReceiver.java @@ -34,4 +34,6 @@ default void update(Subscription newSubscription) {} void commit(Set offsets); boolean moveOffset(PartitionOffset offset); + + Set getAssignedPartitions(); } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java index 7abe865d26..7febb6fd21 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/ThrottlingMessageReceiver.java @@ -57,6 +57,11 @@ public boolean moveOffset(PartitionOffset offset) { return receiver.moveOffset(offset); } + @Override + public Set getAssignedPartitions() { + return receiver.getAssignedPartitions(); + } + @Override public void stop() { receiver.stop(); diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java index 591a24384d..f890174b58 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/UninitializedMessageReceiver.java @@ -21,4 +21,9 @@ public void commit(Set offsets) { public boolean moveOffset(PartitionOffset offset) { throw new ConsumerNotInitializedException(); } + + @Override + public Set getAssignedPartitions() { + throw new ConsumerNotInitializedException(); + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java index 67ac4f0b6a..b6addea90d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/FilteringMessageReceiver.java @@ -68,4 +68,9 @@ public void commit(Set offsets) { public boolean moveOffset(PartitionOffset offset) { return receiver.moveOffset(offset); } + + @Override + public Set getAssignedPartitions() { + return receiver.getAssignedPartitions(); + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java index e256371bc3..b1f6f9b86d 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/consumer/receiver/kafka/KafkaSingleThreadedMessageReceiver.java @@ -226,4 +226,8 @@ private Map createOffset( public boolean moveOffset(PartitionOffset offset) { return offsetMover.move(offset); } + + public Set getAssignedPartitions() { + return partitionAssignmentState.getAssignedPartitions(subscription.getQualifiedName()); + } } diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java index 3ccd141f89..9ecde5980e 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/supervisor/process/Retransmitter.java @@ -28,7 +28,10 @@ public void reloadOffsets(SubscriptionName subscriptionName, Consumer consumer) try { PartitionOffsets offsets = subscriptionOffsetChangeIndicator.getSubscriptionOffsets( - subscriptionName.getTopicName(), subscriptionName.getName(), brokersClusterName); + subscriptionName.getTopicName(), + subscriptionName.getName(), + brokersClusterName, + consumer.getAssignedPartitions()); for (PartitionOffset partitionOffset : offsets) { if (moveOffset(subscriptionName, consumer, partitionOffset)) { diff --git a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy index ae3a213ac0..b503282b7b 100644 --- a/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy +++ b/hermes-consumers/src/test/groovy/pl/allegro/tech/hermes/consumers/supervisor/process/ConsumerStub.groovy @@ -64,6 +64,11 @@ class ConsumerStub implements Consumer { return subscription } + @Override + Set getAssignedPartitions() { + return null + } + @Override void updateTopic(Topic topic) { }