Skip to content

Commit

Permalink
SKYEDEN-3020 | prevent overriding commited offset with lower offset
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcinBobinski committed Dec 23, 2024
1 parent b304b45 commit 94fd563
Showing 1 changed file with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopics;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.Message;
Expand Down Expand Up @@ -196,15 +195,24 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {

private Map<TopicPartition, OffsetAndMetadata> createOffset(
Set<SubscriptionPartitionOffset> partitionOffsets) {
Map<TopicPartition, OffsetAndMetadata> commitedOffsetsData =
fetchCommitedOffsetsMetadata(partitionOffsets);

Map<TopicPartition, OffsetAndMetadata> offsetsData = new LinkedHashMap<>();
for (SubscriptionPartitionOffset partitionOffset : partitionOffsets) {
TopicPartition topicAndPartition =
new TopicPartition(
partitionOffset.getKafkaTopicName().asString(), partitionOffset.getPartition());

Long commitedOffset =
Optional.ofNullable(commitedOffsetsData.get(topicAndPartition))
.map(OffsetAndMetadata::offset)
.orElse(Long.MIN_VALUE);

if (partitionAssignmentState.isAssignedPartitionAtCurrentTerm(
partitionOffset.getSubscriptionPartition())) {
if (consumer.position(topicAndPartition) >= partitionOffset.getOffset()) {
if (consumer.position(topicAndPartition) >= partitionOffset.getOffset()
&& partitionOffset.getOffset() > commitedOffset) {
offsetsData.put(topicAndPartition, new OffsetAndMetadata(partitionOffset.getOffset()));
} else {
skippedCounter.increment();
Expand All @@ -227,4 +235,17 @@ private Map<TopicPartition, OffsetAndMetadata> createOffset(
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return offsetMover.move(offsets);
}

private Map<TopicPartition, OffsetAndMetadata> fetchCommitedOffsetsMetadata(
Set<SubscriptionPartitionOffset> partitionOffsets) {
Set<TopicPartition> topicPartitions =
partitionOffsets.stream()
.map(
offset ->
new TopicPartition(
offset.getKafkaTopicName().asString(), offset.getPartition()))
.collect(Collectors.toSet());

return consumer.committed(topicPartitions);
}
}

0 comments on commit 94fd563

Please sign in to comment.