Skip to content

Commit

Permalink
SKYEDEN-3020 | commiting offsets change on retransmission
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcinBobinski committed Dec 23, 2024
1 parent aff7824 commit b304b45
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch;
Expand Down Expand Up @@ -239,11 +239,11 @@ public void commit(Set<SubscriptionPartitionOffset> offsetsToCommit) {
}

@Override
public boolean moveOffset(PartitionOffset partitionOffset) {
public PartitionOffsets moveOffset(PartitionOffsets partitionOffsets) {
if (receiver != null) {
return receiver.moveOffset(partitionOffset);
return receiver.moveOffset(partitionOffsets);
}
return false;
return new PartitionOffsets();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;

public interface Consumer {
Expand All @@ -26,7 +26,7 @@ public interface Consumer {

void commit(Set<SubscriptionPartitionOffset> offsets);

boolean moveOffset(PartitionOffset subscriptionPartitionOffset);
PartitionOffsets moveOffset(PartitionOffsets subscriptionPartitionOffsets);

Subscription getSubscription();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.CommonConsumerParameters;
import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver;
Expand Down Expand Up @@ -262,8 +262,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return messageReceiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return messageReceiver.moveOffset(offsets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException;
import pl.allegro.tech.hermes.consumers.consumer.Message;
Expand Down Expand Up @@ -180,7 +180,7 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
receiver.commit(offsets);
}

public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;

public class KafkaConsumerOffsetMover {

Expand All @@ -19,7 +23,21 @@ public KafkaConsumerOffsetMover(SubscriptionName subscriptionName, KafkaConsumer
this.consumer = consumer;
}

public boolean move(PartitionOffset offset) {
public PartitionOffsets move(PartitionOffsets offsets) {
PartitionOffsets movedOffsets = new PartitionOffsets();

for (PartitionOffset offset : offsets) {
if (move(offset)) {
movedOffsets.add(offset);
}
}

commit(movedOffsets);

return movedOffsets;
}

private boolean move(PartitionOffset offset) {
try {
TopicPartition tp = new TopicPartition(offset.getTopic().asString(), offset.getPartition());
if (consumer.assignment().contains(tp)) {
Expand All @@ -46,4 +64,24 @@ public boolean move(PartitionOffset offset) {
return false;
}
}

private void commit(PartitionOffsets partitionOffsets) {
try {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new LinkedHashMap<>();
for (PartitionOffset partitionOffset : partitionOffsets) {
offsetsToCommit.put(
new TopicPartition(
partitionOffset.getTopic().asString(), partitionOffset.getPartition()),
new OffsetAndMetadata(partitionOffset.getOffset()));
}
if (!offsetsToCommit.isEmpty()) {
consumer.commitSync(offsetsToCommit);
}
} catch (Exception e) {
logger.error(
"Failed to commit offsets while trying to move them for subscription {}",
subscriptionName,
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.Optional;
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;

Expand Down Expand Up @@ -33,5 +33,5 @@ default void update(Subscription newSubscription) {}

void commit(Set<SubscriptionPartitionOffset> offsets);

boolean moveOffset(PartitionOffset offset);
PartitionOffsets moveOffset(PartitionOffsets offsets);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.idletime.IdleTimeCalculator;
Expand Down Expand Up @@ -53,8 +53,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.util.Optional;
import java.util.Set;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;

Expand All @@ -18,7 +18,7 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
throw new ConsumerNotInitializedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.Optional;
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.filtering.FilteredMessageHandler;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
Expand Down Expand Up @@ -65,7 +65,7 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopics;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder;
Expand Down Expand Up @@ -223,7 +224,7 @@ private Map<TopicPartition, OffsetAndMetadata> createOffset(
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return offsetMover.move(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return offsetMover.move(offsets);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,36 +30,22 @@ public void reloadOffsets(SubscriptionName subscriptionName, Consumer consumer)
subscriptionOffsetChangeIndicator.getSubscriptionOffsets(
subscriptionName.getTopicName(), subscriptionName.getName(), brokersClusterName);

for (PartitionOffset partitionOffset : offsets) {
if (moveOffset(subscriptionName, consumer, partitionOffset)) {
subscriptionOffsetChangeIndicator.removeOffset(
subscriptionName.getTopicName(),
subscriptionName.getName(),
brokersClusterName,
partitionOffset.getTopic(),
partitionOffset.getPartition());
logger.info(
"Removed offset indicator for subscription={} and partition={}",
subscriptionName,
partitionOffset.getPartition());
}
PartitionOffsets movedOffsets = consumer.moveOffset(offsets);

for (PartitionOffset partitionOffset : movedOffsets) {
subscriptionOffsetChangeIndicator.removeOffset(
subscriptionName.getTopicName(),
subscriptionName.getName(),
brokersClusterName,
partitionOffset.getTopic(),
partitionOffset.getPartition());
logger.info(
"Removed offset indicator for subscription={} and partition={}",
subscriptionName,
partitionOffset.getPartition());
}
} catch (Exception ex) {
throw new RetransmissionException(ex);
}
}

private boolean moveOffset(
SubscriptionName subscriptionName, Consumer consumer, PartitionOffset partitionOffset) {
try {
return consumer.moveOffset(partitionOffset);
} catch (IllegalStateException ex) {
logger.warn(
"Cannot move offset for subscription={} and partition={} , possibly owned by different node",
subscriptionName,
partitionOffset.getPartition(),
ex);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pl.allegro.tech.hermes.consumers.supervisor.process

import pl.allegro.tech.hermes.api.Subscription
import pl.allegro.tech.hermes.api.Topic
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets
import pl.allegro.tech.hermes.consumers.consumer.Consumer
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset

Expand Down Expand Up @@ -73,8 +73,8 @@ class ConsumerStub implements Consumer {
}

@Override
boolean moveOffset(PartitionOffset partitionOffset) {
return true
PartitionOffsets moveOffset(PartitionOffsets partitionOffset) {
return partitionOffset
}

boolean getInitialized() {
Expand Down

0 comments on commit b304b45

Please sign in to comment.