Skip to content

Commit

Permalink
SKYEDEN-3020 | Retransmission with detached consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcinBobinski committed Dec 16, 2024
1 parent aff7824 commit 04857b0
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ public Set<ConsumerGroupMember> getMembers() {
return members;
}

/**
 * Returns {@code true} when the consumer group reports the {@code "Stable"} state.
 *
 * <p>Uses a constant-first comparison so a missing (null) state reads as "not stable"
 * instead of throwing a {@link NullPointerException}.
 */
public boolean isStable() {
  return "Stable".equals(state);
}

/**
 * Returns {@code true} when the consumer group reports the {@code "Empty"} state
 * (no members attached).
 *
 * <p>Constant-first comparison keeps this null-safe: a missing state is treated as
 * "not empty" rather than raising a {@link NullPointerException}.
 */
public boolean isEmpty() {
  return "Empty".equals(state);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public Response retransmit(
@Context ContainerRequestContext requestContext) {

MultiDCOffsetChangeSummary summary =
multiDCAwareService.retransmit(
subscriptionService.retransmit(
topicService.getTopicDetails(TopicName.fromQualifiedName(qualifiedTopicName)),
subscriptionName,
offsetRetransmissionDate.getRetransmissionDate().toInstant().toEpochMilli(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@

/**
 * Cluster-local operations needed to retransmit (replay) messages for a subscription:
 * looking up the offsets that correspond to a point in time and requesting that the
 * subscription's committed offsets be moved there.
 */
public interface RetransmissionService {

  /**
   * Returns, per partition of {@code topic}, the offset corresponding to {@code timestamp}
   * on this cluster.
   */
  List<PartitionOffset> fetchTopicOffsetsAt(Topic topic, Long timestamp);

  /**
   * Requests that the subscription's offsets on the given broker cluster be moved to
   * {@code partitionOffsets}. NOTE(review): the pre-refactor overload taking
   * {@code (timestamp, dryRun)} was leftover text here and has been removed.
   */
  void indicateOffsetChange(
      Topic topic,
      String subscription,
      String brokersClusterName,
      List<PartitionOffset> partitionOffsets);

  /** Returns {@code true} once a previously indicated offset change has been applied. */
  boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@
import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthChecker;
import pl.allegro.tech.hermes.management.domain.subscription.validator.SubscriptionValidator;
import pl.allegro.tech.hermes.management.domain.topic.TopicService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MovingSubscriptionOffsetsValidationException;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService;
import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCOffsetChangeSummary;
import pl.allegro.tech.hermes.tracker.management.LogRepository;

public class SubscriptionService {
Expand Down Expand Up @@ -461,4 +463,35 @@ private List<SubscriptionNameWithMetrics> getSubscriptionsMetrics(
})
.collect(toList());
}

/**
 * Computes the per-DC target offsets for {@code timestamp} and, unless {@code dryRun} is set,
 * moves the subscription's offsets to them.
 *
 * <p>The mechanism depends on the subscription state: for ACTIVE subscriptions the move is
 * delegated to the attached consumers ({@code consumerRetransmission}); for SUSPENDED
 * subscriptions, which have no attached consumers, offsets are committed directly
 * ({@code retransmit}); PENDING subscriptions are rejected.
 *
 * @param timestamp epoch milliseconds to retransmit from
 * @param dryRun when {@code true}, only report where offsets would move — commit nothing
 * @return summary of the computed partition offsets, keyed by broker cluster name
 * @throws MovingSubscriptionOffsetsValidationException when the subscription is PENDING
 */
public MultiDCOffsetChangeSummary retransmit(
    Topic topic, String subscriptionName, Long timestamp, boolean dryRun, RequestUser requester) {
  Subscription subscription = getSubscriptionDetails(topic.getName(), subscriptionName);

  MultiDCOffsetChangeSummary multiDCOffsetChangeSummary =
      multiDCAwareService.fetchTopicOffsetsAt(topic, timestamp);

  // Dry run stops here: the summary is informational and no offsets are committed.
  if (dryRun) {
    return multiDCOffsetChangeSummary;
  }

  switch (subscription.getState()) {
    case ACTIVE:
      multiDCAwareService.consumerRetransmission(
          topic,
          subscriptionName,
          multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName(),
          requester);
      break;
    case SUSPENDED:
      multiDCAwareService.retransmit(
          topic,
          subscriptionName,
          multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName());
      break;
    case PENDING:
      throw new MovingSubscriptionOffsetsValidationException(
          "Cannot retransmit messages for subscription in PENDING state");
  }

  return multiDCOffsetChangeSummary;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,13 @@ void waitUntilAllSubscriptionsHasConsumersAssigned(

/**
 * Moves one subscription's offsets back to the instant captured before the migration, by
 * asking its attached consumers to retransmit from there.
 *
 * <p>NOTE(review): the pre-refactor {@code multiDCAwareService.retransmit(...)} call was
 * left in next to its replacement; only the consumer-driven variant is kept.
 */
private void notifySingleSubscription(
    Topic topic, Instant beforeMigrationInstant, String subscriptionName, RequestUser requester) {
  multiDCAwareService.consumerRetransmission(
      topic,
      subscriptionName,
      multiDCAwareService
          .fetchTopicOffsetsAt(topic, beforeMigrationInstant.toEpochMilli())
          .getPartitionOffsetListPerBrokerName(),
      requester);
}

private void waitUntilOffsetsAvailableOnAllKafkaTopics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
Expand All @@ -19,6 +21,7 @@
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.common.exception.InternalProcessingException;
import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
import pl.allegro.tech.hermes.management.domain.auth.RequestUser;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand;
Expand Down Expand Up @@ -62,36 +65,72 @@ public String readMessageFromPrimary(
.readMessageFromPrimary(topic, partition, offset);
}

public MultiDCOffsetChangeSummary retransmit(
Topic topic, String subscriptionName, Long timestamp, boolean dryRun, RequestUser requester) {
public MultiDCOffsetChangeSummary fetchTopicOffsetsAt(Topic topic, Long timestamp) {
MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary();

clusters.forEach(
cluster ->
multiDCOffsetChangeSummary.addPartitionOffsetList(
cluster.getClusterName(),
cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun)));

if (!dryRun) {
logger.info(
"Starting retransmission for subscription {}. Requested by {}. Retransmission timestamp: {}",
topic.getQualifiedName() + "$" + subscriptionName,
requester.getUsername(),
timestamp);
multiDcExecutor.executeByUser(
new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())),
requester);
clusters.forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName));
logger.info(
"Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission timestamp: {}",
topic.getQualifiedName() + "$" + subscriptionName,
requester.getUsername(),
timestamp);
}
cluster.getClusterName(), cluster.fetchTopicOffsetsAt(topic, timestamp)));

return multiDCOffsetChangeSummary;
}

/**
 * Directly moves the subscription's committed offsets on every DC cluster. Each cluster is
 * validated first (consumer group must be empty), so the move is not applied partially when
 * validation fails; clusters missing from {@code brokerPartitionOffsets} get an empty list.
 */
public void retransmit(
    Topic topic,
    String subscriptionName,
    Map<String, List<PartitionOffset>> brokerPartitionOffsets) {

  SubscriptionName subscription = new SubscriptionName(subscriptionName, topic.getName());

  clusters.forEach(cluster -> cluster.validateIfOffsetsCanBeMoved(topic, subscription));

  clusters.forEach(
      cluster ->
          cluster.moveOffsets(
              topic,
              subscription,
              brokerPartitionOffsets.getOrDefault(
                  cluster.getClusterName(), Collections.emptyList())));
}

/**
 * Asks the subscription's attached consumers to move their offsets to the given positions:
 * validates every cluster, records the target offsets per cluster, signals the consumers via
 * a {@link RetransmitCommand}, then blocks until each cluster reports the offsets as moved.
 *
 * @param brokerPartitionOffsets target offsets keyed by broker cluster name; clusters missing
 *     from the map get an empty list
 * @throws MovingSubscriptionOffsetsValidationException when any cluster's consumer group is
 *     missing or not stable
 */
public void consumerRetransmission(
    Topic topic,
    String subscriptionName,
    Map<String, List<PartitionOffset>> brokerPartitionOffsets,
    RequestUser requester) {

  SubscriptionName subscription = new SubscriptionName(subscriptionName, topic.getName());

  // All clusters must pass validation before any offset change is signalled.
  clusters.forEach(
      cluster -> cluster.validateIfOffsetsCanBeMovedByConsumers(topic, subscription));

  clusters.forEach(
      cluster ->
          cluster.indicateOffsetChange(
              topic,
              subscriptionName,
              brokerPartitionOffsets.getOrDefault(
                  cluster.getClusterName(), Collections.emptyList())));

  logger.info(
      "Starting retransmission for subscription {}. Requested by {}. Retransmission offsets: {}",
      topic.getQualifiedName() + "$" + subscriptionName,
      requester.getUsername(),
      brokerPartitionOffsets);

  multiDcExecutor.executeByUser(new RetransmitCommand(subscription), requester);

  // BUG FIX: the lambda parameter was named "clusters", shadowing the clusters field. The
  // wait is per-subscription (the element is unused), so name it "ignored".
  clusters.forEach(ignored -> waitUntilOffsetsAreMoved(topic, subscriptionName));

  logger.info(
      "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission offsets: {}",
      topic.getQualifiedName() + "$" + subscriptionName,
      requester.getUsername(),
      brokerPartitionOffsets);
}

public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) {
return clusters.stream()
.allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic));
Expand Down Expand Up @@ -167,6 +206,7 @@ public List<ConsumerGroup> describeConsumerGroups(Topic topic, String subscripti
}

/**
 * Skips the subscription to the current tail of the topic on every DC cluster: each cluster is
 * validated first (its consumer group must have no active members), then its offsets are
 * committed at the end positions.
 */
public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) {
  clusters.forEach(cluster -> cluster.validateIfOffsetsCanBeMoved(topic, subscription));
  clusters.forEach(cluster -> cluster.moveOffsetsToTheEnd(topic, subscription));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,14 @@ public String readMessageFromPrimary(Topic topic, Integer partition, Long offset
topic, kafkaNamesMapper.toKafkaTopics(topic).getPrimary(), partition, offset);
}

public List<PartitionOffset> indicateOffsetChange(
Topic topic, String subscriptionName, Long timestamp, boolean dryRun) {
return retransmissionService.indicateOffsetChange(
topic, subscriptionName, clusterName, timestamp, dryRun);
public void indicateOffsetChange(
Topic topic, String subscriptionName, List<PartitionOffset> partitionOffsets) {
retransmissionService.indicateOffsetChange(
topic, subscriptionName, clusterName, partitionOffsets);
}

public List<PartitionOffset> fetchTopicOffsetsAt(Topic topic, Long timestamp) {
return retransmissionService.fetchTopicOffsetsAt(topic, timestamp);
}

public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) {
Expand Down Expand Up @@ -152,24 +156,41 @@ public Optional<ConsumerGroup> describeConsumerGroup(Topic topic, String subscri
}

public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) {
validateIfOffsetsCanBeMoved(topic, subscription);
KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(topic, subscription);
moveOffsets(
subscription, consumer, buildOffsetsMetadata(consumer.endOffsets(consumer.assignment())));
consumer.close();
}

KafkaConsumer<byte[], byte[]> consumer = kafkaConsumerManager.createConsumer(subscription);
String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString();
Set<TopicPartition> topicPartitions = getTopicPartitions(consumer, kafkaTopicName);
consumer.assign(topicPartitions);
public void moveOffsets(
Topic topic, SubscriptionName subscription, List<PartitionOffset> offsets) {

Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
Map<TopicPartition, OffsetAndMetadata> endOffsetsMetadata = buildOffsetsMetadata(endOffsets);
consumer.commitSync(endOffsetsMetadata);
KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(topic, subscription);
moveOffsets(subscription, consumer, buildOffsetsMetadata(offsets));
consumer.close();
}

private void moveOffsets(
SubscriptionName subscription,
KafkaConsumer<byte[], byte[]> consumer,
Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata) {
consumer.commitSync(offsetAndMetadata);

logger.info(
"Successfully moved offset to the end position for subscription {} and consumer group {}",
subscription.getQualifiedName(),
kafkaNamesMapper.toConsumerGroupId(subscription));
}

private KafkaConsumer<byte[], byte[]> createKafkaConsumer(
Topic topic, SubscriptionName subscription) {
KafkaConsumer<byte[], byte[]> consumer = kafkaConsumerManager.createConsumer(subscription);
String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString();
Set<TopicPartition> topicPartitions = getTopicPartitions(consumer, kafkaTopicName);
consumer.assign(topicPartitions);
return consumer;
}

private int numberOfAssignmentsForConsumersGroups(List<String> consumerGroupsIds)
throws ExecutionException, InterruptedException {
Collection<ConsumerGroupDescription> consumerGroupsDescriptions =
Expand All @@ -182,11 +203,32 @@ private int numberOfAssignmentsForConsumersGroups(List<String> consumerGroupsIds
.size();
}

private void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) {
public void validateIfOffsetsCanBeMovedByConsumers(Topic topic, SubscriptionName subscription) {
describeConsumerGroup(topic, subscription.getName())
.ifPresentOrElse(
group -> {
if (!group.isStable()) {
String s =
format(
"Consumer group %s for subscription %s is not stable.",
group.getGroupId(), subscription.getQualifiedName());
throw new MovingSubscriptionOffsetsValidationException(s);
}
},
() -> {
String s =
format(
"No consumer group for subscription %s exists.",
subscription.getQualifiedName());
throw new MovingSubscriptionOffsetsValidationException(s);
});
}

public void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) {
describeConsumerGroup(topic, subscription.getName())
.ifPresentOrElse(
group -> {
if (!group.getMembers().isEmpty()) {
if (!group.isEmpty()) {
String s =
format(
"Consumer group %s for subscription %s has still active members.",
Expand Down Expand Up @@ -228,4 +270,15 @@ private Map<TopicPartition, OffsetAndMetadata> buildOffsetsMetadata(
.map(entry -> ImmutablePair.of(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
.collect(toMap(Pair::getKey, Pair::getValue));
}

// Translates a list of PartitionOffset into the commit map shape expected by
// KafkaConsumer#commitSync. Duplicate (topic, partition) entries still fail fast via
// Collectors.toMap, as before.
private Map<TopicPartition, OffsetAndMetadata> buildOffsetsMetadata(
    List<PartitionOffset> offsets) {
  return offsets.stream()
      .collect(
          toMap(
              offset -> new TopicPartition(offset.getTopic().asString(), offset.getPartition()),
              offset -> new OffsetAndMetadata(offset.getOffset())));
}
}
Loading

0 comments on commit 04857b0

Please sign in to comment.