Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

SKYEDEN-3020 | consumers retransmission improvements #1941

Merged
merged 11 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ public PartitionOffsets addAll(PartitionOffsets offsets) {
/** Returns an iterator over the {@link PartitionOffset} entries held by this instance. */
public Iterator<PartitionOffset> iterator() {
return offsets.iterator();
}

/** Returns {@code true} when the backing offsets container holds no entries. */
public boolean isEmpty() {
return offsets.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.batch.MessageBatch;
Expand Down Expand Up @@ -239,11 +239,11 @@ public void commit(Set<SubscriptionPartitionOffset> offsetsToCommit) {
}

@Override
public boolean moveOffset(PartitionOffset partitionOffset) {
public PartitionOffsets moveOffset(PartitionOffsets partitionOffsets) {
if (receiver != null) {
return receiver.moveOffset(partitionOffset);
return receiver.moveOffset(partitionOffsets);
}
return false;
return new PartitionOffsets();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;

public interface Consumer {
Expand All @@ -26,7 +26,7 @@ public interface Consumer {

void commit(Set<SubscriptionPartitionOffset> offsets);

/**
 * Attempts to move the given partition offsets for this consumer.
 *
 * @param subscriptionPartitionOffsets offsets requested to be moved
 * @return the subset of offsets that were actually moved
 */
PartitionOffsets moveOffset(PartitionOffsets subscriptionPartitionOffsets);

Subscription getSubscription();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.CommonConsumerParameters;
import pl.allegro.tech.hermes.consumers.consumer.converter.MessageConverterResolver;
Expand Down Expand Up @@ -262,8 +262,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return messageReceiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return messageReceiver.moveOffset(offsets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.message.wrapper.CompositeMessageContentWrapper;
import pl.allegro.tech.hermes.common.message.wrapper.UnsupportedContentTypeException;
import pl.allegro.tech.hermes.consumers.consumer.Message;
Expand Down Expand Up @@ -180,8 +180,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
receiver.commit(offsets);
}

public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}

public Set<Integer> getAssignedPartitions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void assign(SubscriptionName name, Collection<Integer> partitions) {
}));
}

private void incrementTerm(SubscriptionName name) {
/**
 * Advances the assignment term for the given subscription: starts at 0 when no term has been
 * recorded yet, otherwise increments by one.
 *
 * <p>NOTE(review): widened from private to public in this change so that
 * KafkaConsumerOffsetMover can bump the term after moving offsets — confirm no other callers
 * depended on this being internal.
 */
public void incrementTerm(SubscriptionName name) {
terms.compute(name, ((subscriptionName, term) -> term == null ? 0L : term + 1L));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,55 @@
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.offset.ConsumerPartitionAssignmentState;

public class KafkaConsumerOffsetMover {

private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerOffsetMover.class);

private final SubscriptionName subscriptionName;
private KafkaConsumer consumer;
private ConsumerPartitionAssignmentState partitionAssignmentState;

public KafkaConsumerOffsetMover(SubscriptionName subscriptionName, KafkaConsumer consumer) {
/**
 * Creates a mover bound to one subscription.
 *
 * @param subscriptionName subscription whose offsets this mover operates on
 * @param consumer Kafka consumer used to perform the moves and commits; NOTE(review): the
 *     field uses the raw {@code KafkaConsumer} type — consider parameterizing it
 * @param partitionAssignmentState assignment-term tracker, incremented after successful moves
 */
public KafkaConsumerOffsetMover(
SubscriptionName subscriptionName,
KafkaConsumer consumer,
ConsumerPartitionAssignmentState partitionAssignmentState) {
this.subscriptionName = subscriptionName;
this.consumer = consumer;
this.partitionAssignmentState = partitionAssignmentState;
}

public boolean move(PartitionOffset offset) {
public PartitionOffsets move(PartitionOffsets offsets) {
PartitionOffsets movedOffsets = new PartitionOffsets();

for (PartitionOffset offset : offsets) {
if (move(offset)) {
movedOffsets.add(offset);
}
}

commit(movedOffsets);

if (!movedOffsets.isEmpty()) {
// Incrementing assignment term ensures that currently committed offsets won't be overwritten
// by the events from the past which are concurrently processed by the consumer
partitionAssignmentState.incrementTerm(subscriptionName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that this is the correct order: to commit first and then increment the assignment term?

Copy link
Collaborator Author

@MarcinBobinski MarcinBobinski Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Order doesn't matter in this case. Even though the events are processed concurrently, the commitment of offsets is synchronous. The method commitIfReady can only be executed when the entire process of retransmission is complete.

}

return movedOffsets;
}

private boolean move(PartitionOffset offset) {
try {
TopicPartition tp = new TopicPartition(offset.getTopic().asString(), offset.getPartition());
if (consumer.assignment().contains(tp)) {
Expand All @@ -46,4 +76,24 @@ public boolean move(PartitionOffset offset) {
return false;
}
}

/**
 * Synchronously commits the given offsets to Kafka.
 *
 * <p>A failure is logged rather than propagated — the move operation itself has already
 * happened, so callers are not interrupted by a commit error.
 *
 * @param partitionOffsets offsets to commit; nothing is sent when empty
 */
private void commit(PartitionOffsets partitionOffsets) {
  try {
    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new LinkedHashMap<>();
    for (PartitionOffset offset : partitionOffsets) {
      TopicPartition partition =
          new TopicPartition(offset.getTopic().asString(), offset.getPartition());
      offsetsToCommit.put(partition, new OffsetAndMetadata(offset.getOffset()));
    }
    if (offsetsToCommit.isEmpty()) {
      return;
    }
    consumer.commitSync(offsetsToCommit);
  } catch (Exception e) {
    logger.error(
        "Failed to commit offsets while trying to move them for subscription {}",
        subscriptionName,
        e);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.Optional;
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;

Expand Down Expand Up @@ -33,7 +33,7 @@ default void update(Subscription newSubscription) {}

void commit(Set<SubscriptionPartitionOffset> offsets);

/**
 * Attempts to move the given partition offsets.
 *
 * @param offsets offsets requested to be moved
 * @return the subset of offsets that were actually moved
 */
PartitionOffsets moveOffset(PartitionOffsets offsets);

Set<Integer> getAssignedPartitions();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.idletime.IdleTimeCalculator;
Expand Down Expand Up @@ -53,8 +53,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import java.util.Optional;
import java.util.Set;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;

Expand All @@ -18,7 +18,7 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
throw new ConsumerNotInitializedException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import java.util.Optional;
import java.util.Set;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.filtering.FilteredMessageHandler;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
Expand Down Expand Up @@ -65,8 +65,8 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return receiver.moveOffset(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return receiver.moveOffset(offsets);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
import pl.allegro.tech.hermes.common.kafka.KafkaTopic;
import pl.allegro.tech.hermes.common.kafka.KafkaTopics;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets;
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.load.SubscriptionLoadRecorder;
Expand Down Expand Up @@ -74,7 +74,9 @@ public KafkaSingleThreadedMessageReceiver(
this.partitionAssignmentState = partitionAssignmentState;
this.consumer = consumer;
this.readQueue = new ArrayBlockingQueue<>(readQueueCapacity);
this.offsetMover = new KafkaConsumerOffsetMover(subscription.getQualifiedName(), consumer);
this.offsetMover =
new KafkaConsumerOffsetMover(
subscription.getQualifiedName(), consumer, partitionAssignmentState);
Map<String, KafkaTopic> topics =
getKafkaTopics(topic, kafkaNamesMapper).stream()
.collect(Collectors.toMap(t -> t.name().asString(), Function.identity()));
Expand Down Expand Up @@ -195,6 +197,7 @@ public void commit(Set<SubscriptionPartitionOffset> offsets) {

private Map<TopicPartition, OffsetAndMetadata> createOffset(
Set<SubscriptionPartitionOffset> partitionOffsets) {

Map<TopicPartition, OffsetAndMetadata> offsetsData = new LinkedHashMap<>();
for (SubscriptionPartitionOffset partitionOffset : partitionOffsets) {
TopicPartition topicAndPartition =
Expand Down Expand Up @@ -223,8 +226,8 @@ private Map<TopicPartition, OffsetAndMetadata> createOffset(
}

@Override
public boolean moveOffset(PartitionOffset offset) {
return offsetMover.move(offset);
public PartitionOffsets moveOffset(PartitionOffsets offsets) {
return offsetMover.move(offsets);
}

public Set<Integer> getAssignedPartitions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,22 @@ public void reloadOffsets(SubscriptionName subscriptionName, Consumer consumer)
brokersClusterName,
consumer.getAssignedPartitions());

for (PartitionOffset partitionOffset : offsets) {
if (moveOffset(subscriptionName, consumer, partitionOffset)) {
subscriptionOffsetChangeIndicator.removeOffset(
subscriptionName.getTopicName(),
subscriptionName.getName(),
brokersClusterName,
partitionOffset.getTopic(),
partitionOffset.getPartition());
logger.info(
"Removed offset indicator for subscription={} and partition={}",
subscriptionName,
partitionOffset.getPartition());
}
PartitionOffsets movedOffsets = consumer.moveOffset(offsets);

for (PartitionOffset partitionOffset : movedOffsets) {
subscriptionOffsetChangeIndicator.removeOffset(
subscriptionName.getTopicName(),
subscriptionName.getName(),
brokersClusterName,
partitionOffset.getTopic(),
partitionOffset.getPartition());
logger.info(
"Removed offset indicator for subscription={} and partition={}",
subscriptionName,
partitionOffset.getPartition());
}
} catch (Exception ex) {
throw new RetransmissionException(ex);
}
}

// NOTE(review): the diff shows this helper on the removed side of the change — its
// per-partition error handling appears to have been replaced by the bulk
// Consumer.moveOffset(PartitionOffsets) path. Kept byte-identical here; it is legacy code.
private boolean moveOffset(
SubscriptionName subscriptionName, Consumer consumer, PartitionOffset partitionOffset) {
try {
return consumer.moveOffset(partitionOffset);
} catch (IllegalStateException ex) {
// A different consumer node may own this partition — treat as "not moved", don't fail.
logger.warn(
"Cannot move offset for subscription={} and partition={} , possibly owned by different node",
subscriptionName,
partitionOffset.getPartition(),
ex);
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package pl.allegro.tech.hermes.consumers.supervisor.process

import pl.allegro.tech.hermes.api.Subscription
import pl.allegro.tech.hermes.api.Topic
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffsets
import pl.allegro.tech.hermes.consumers.consumer.Consumer
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset

Expand Down Expand Up @@ -78,8 +78,8 @@ class ConsumerStub implements Consumer {
}

@Override
PartitionOffsets moveOffset(PartitionOffsets partitionOffset) {
    // Test stub: report every requested offset as successfully moved.
    return partitionOffset
}

boolean getInitialized() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void waitUntilConsumerCommitsOffset(String topicQualifiedName, String sub
});
}

private long calculateCommittedMessages(String topicQualifiedName, String subscription) {
public long calculateCommittedMessages(String topicQualifiedName, String subscription) {
AtomicLong messagesCommittedCount = new AtomicLong(0);
List<ConsumerGroup> consumerGroups =
getConsumerGroupsDescription(topicQualifiedName, subscription)
Expand Down
Loading
Loading