Skip to content

Commit

Permalink
SKYEDEN-3020 | consumers retransmission improvements (#1941)
Browse files Browse the repository at this point in the history
* SKYEDEN-3020 | commiting offsets change on retransmission

* SKYEDEN-3020 | prevent overriding commited offset with lower offset

* SKYEDEN-3020 | tests + new approach

* SKYEDEN-3020 | add comment

* SKYEDEN-3020 | resolve conflicts

* SKYEDEN-3020 | refactor
  • Loading branch information
MarcinBobinski authored Jan 10, 2025
1 parent f551c5b commit 80ab55c
Show file tree
Hide file tree
Showing 16 changed files with 121 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ public PartitionOffsets addAll(PartitionOffsets offsets) {
public Iterator<PartitionOffset> iterator() {
return offsets.iterator();
}

public boolean isEmpty() {
return offsets.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch;
Expand Down Expand Up @@ -239,11 +239,11 @@ public void commit(Set<SubscriptionPartitionOffset> offsetsToCommit) {
}

@Override
public boolean moveOffset(PartitionOffset partitionOffset) {
public PartitionOffsets moveOffset(PartitionOffsets partitionOffsets) {
if (receiver != null) {
return receiver.moveOffset(partitionOffset);
return receiver.moveOffset(partitionOffsets);
}
return false;
return new PartitionOffsets();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;

public interface Consumer {
Expand All @@ -26,7 +26,7 @@ public interface Consumer {

void commit(Set<SubscriptionPartitionOffset> offsets);

boolean moveOffset(PartitionOffset subscriptionPartitionOffset);
PartitionOffsets moveOffset(PartitionOffsets subscriptionPartitionOffsets);

Subscription getSubscription();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.CommonConsumerParameters;
import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver;
Expand Down Expand Up @@ -262,8 +262,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return messageReceiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return messageReceiver.moveOffset(offsets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException;
import pl.allegro.tech.hermes.consumers.consumer.Message;
Expand Down Expand Up @@ -180,8 +180,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
receiver.commit(offsets);
}

public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}

public Set<Integer> getAssignedPartitions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void assign(SubscriptionName name, Collection<Integer> partitions) {
}));
}

private void incrementTerm(SubscriptionName name) {
public void incrementTerm(SubscriptionName name) {
terms.compute(name, ((subscriptionName, term) -> term == null ? 0L : term + 1L));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,55 @@
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState;

public class KafkaConsumerOffsetMover {

private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerOffsetMover.class);

private final SubscriptionName subscriptionName;
private KafkaConsumer consumer;
private ConsumerPartitionAssignmentState partitionAssignmentState;

public KafkaConsumerOffsetMover(SubscriptionName subscriptionName, KafkaConsumer consumer) {
public KafkaConsumerOffsetMover(
SubscriptionName subscriptionName,
KafkaConsumer consumer,
ConsumerPartitionAssignmentState partitionAssignmentState) {
this.subscriptionName = subscriptionName;
this.consumer = consumer;
this.partitionAssignmentState = partitionAssignmentState;
}

public boolean move(PartitionOffset offset) {
public PartitionOffsets move(PartitionOffsets offsets) {
PartitionOffsets movedOffsets = new PartitionOffsets();

for (PartitionOffset offset : offsets) {
if (move(offset)) {
movedOffsets.add(offset);
}
}

commit(movedOffsets);

if (!movedOffsets.isEmpty()) {
// Incrementing assignment term ensures that currently committed offsets won't be overwritten
// by the events from the past which are concurrently processed by the consumer
partitionAssignmentState.incrementTerm(subscriptionName);
}

return movedOffsets;
}

private boolean move(PartitionOffset offset) {
try {
TopicPartition tp = new TopicPartition(offset.getTopic().asString(), offset.getPartition());
if (consumer.assignment().contains(tp)) {
Expand All @@ -46,4 +76,24 @@ public boolean move(PartitionOffset offset) {
return false;
}
}

private void commit(PartitionOffsets partitionOffsets) {
try {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new LinkedHashMap<>();
for (PartitionOffset partitionOffset : partitionOffsets) {
offsetsToCommit.put(
new TopicPartition(
partitionOffset.getTopic().asString(), partitionOffset.getPartition()),
new OffsetAndMetadata(partitionOffset.getOffset()));
}
if (!offsetsToCommit.isEmpty()) {
consumer.commitSync(offsetsToCommit);
}
} catch (Exception e) {
logger.error(
"Failed to commit offsets while trying to move them for subscription {}",
subscriptionName,
e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.Optional;
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;

Expand Down Expand Up @@ -33,7 +33,7 @@ default void update(Subscription newSubscription) {}

void commit(Set<SubscriptionPartitionOffset> offsets);

boolean moveOffset(PartitionOffset offset);
PartitionOffsets moveOffset(PartitionOffsets offsets);

Set<Integer> getAssignedPartitions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.idletime.IdleTimeCalculator;
Expand Down Expand Up @@ -53,8 +53,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.util.Optional;
import java.util.Set;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;

Expand All @@ -18,7 +18,7 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
throw new ConsumerNotInitializedException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.Optional;
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.filtering.FilteredMessageHandler;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
Expand Down Expand Up @@ -65,8 +65,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopics;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder;
Expand Down Expand Up @@ -74,7 +74,9 @@ public KafkaSingleThreadedMessageReceiver(
this.partitionAssignmentState = partitionAssignmentState;
this.consumer = consumer;
this.readQueue = new ArrayBlockingQueue<>(readQueueCapacity);
this.offsetMover = new KafkaConsumerOffsetMover(subscription.getQualifiedName(), consumer);
this.offsetMover =
new KafkaConsumerOffsetMover(
subscription.getQualifiedName(), consumer, partitionAssignmentState);
Map<String, KafkaTopic> topics =
getKafkaTopics(topic, kafkaNamesMapper).stream()
.collect(Collectors.toMap(t -> t.name().asString(), Function.identity()));
Expand Down Expand Up @@ -195,6 +197,7 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {

private Map<TopicPartition, OffsetAndMetadata> createOffset(
Set<SubscriptionPartitionOffset> partitionOffsets) {

Map<TopicPartition, OffsetAndMetadata> offsetsData = new LinkedHashMap<>();
for (SubscriptionPartitionOffset partitionOffset : partitionOffsets) {
TopicPartition topicAndPartition =
Expand Down Expand Up @@ -223,8 +226,8 @@ private Map<TopicPartition, OffsetAndMetadata> createOffset(
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return offsetMover.move(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return offsetMover.move(offsets);
}

public Set<Integer> getAssignedPartitions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,22 @@ public void reloadOffsets(SubscriptionName subscriptionName, Consumer consumer)
brokersClusterName,
consumer.getAssignedPartitions());

for (PartitionOffset partitionOffset : offsets) {
if (moveOffset(subscriptionName, consumer, partitionOffset)) {
subscriptionOffsetChangeIndicator.removeOffset(
subscriptionName.getTopicName(),
subscriptionName.getName(),
brokersClusterName,
partitionOffset.getTopic(),
partitionOffset.getPartition());
logger.info(
"Removed offset indicator for subscription={} and partition={}",
subscriptionName,
partitionOffset.getPartition());
}
PartitionOffsets movedOffsets = consumer.moveOffset(offsets);

for (PartitionOffset partitionOffset : movedOffsets) {
subscriptionOffsetChangeIndicator.removeOffset(
subscriptionName.getTopicName(),
subscriptionName.getName(),
brokersClusterName,
partitionOffset.getTopic(),
partitionOffset.getPartition());
logger.info(
"Removed offset indicator for subscription={} and partition={}",
subscriptionName,
partitionOffset.getPartition());
}
} catch (Exception ex) {
throw new RetransmissionException(ex);
}
}

private boolean moveOffset(
SubscriptionName subscriptionName, Consumer consumer, PartitionOffset partitionOffset) {
try {
return consumer.moveOffset(partitionOffset);
} catch (IllegalStateException ex) {
logger.warn(
"Cannot move offset for subscription={} and partition={} , possibly owned by different node",
subscriptionName,
partitionOffset.getPartition(),
ex);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pl.allegro.tech.hermes.consumers.supervisor.process

import pl.allegro.tech.hermes.api.Subscription
import pl.allegro.tech.hermes.api.Topic
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets
import pl.allegro.tech.hermes.consumers.consumer.Consumer
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset

Expand Down Expand Up @@ -78,8 +78,8 @@ class ConsumerStub implements Consumer {
}

@Override
boolean moveOffset(PartitionOffset partitionOffset) {
return true
PartitionOffsets moveOffset(PartitionOffsets partitionOffset) {
return partitionOffset
}

boolean getInitialized() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void waitUntilConsumerCommitsOffset(String topicQualifiedName, String sub
});
}

private long calculateCommittedMessages(String topicQualifiedName, String subscription) {
public long calculateCommittedMessages(String topicQualifiedName, String subscription) {
AtomicLong messagesCommittedCount = new AtomicLong(0);
List<ConsumerGroup> consumerGroups =
getConsumerGroupsDescription(topicQualifiedName, subscription)
Expand Down
Loading

0 comments on commit 80ab55c

Please sign in to comment.