diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java index 61528237df..223c7132be 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/ConsumerGroup.java @@ -41,6 +41,14 @@ public Set getMembers() { return members; } + public boolean isStable() { + return state.equals("Stable"); + } + + public boolean isEmpty() { + return state.equals("Empty"); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java index 4c92836529..b0ec590bd9 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/SubscriptionsEndpoint.java @@ -273,7 +273,7 @@ public Response retransmit( @Context ContainerRequestContext requestContext) { MultiDCOffsetChangeSummary summary = - multiDCAwareService.retransmit( + subscriptionService.retransmit( topicService.getTopicDetails(TopicName.fromQualifiedName(qualifiedTopicName)), subscriptionName, offsetRetransmissionDate.getRetransmissionDate().toInstant().toEpochMilli(), diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java index 5ad8f6fc35..b180077df1 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java @@ -6,8 +6,13 @@ public interface RetransmissionService { - List indicateOffsetChange( - Topic topic, String subscription, String brokersClusterName, long timestamp, boolean dryRun); + List fetchTopicOffsetsAt(Topic topic, Long timestamp); + + void indicateOffsetChange( + Topic topic, + String subscription, + String brokersClusterName, + List partitionOffsets); boolean areOffsetsMoved(Topic topic, String subscriptionName, String brokersClusterName); } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java index c8d088e598..a6fef68a1d 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java @@ -51,7 +51,9 @@ import pl.allegro.tech.hermes.management.domain.subscription.health.SubscriptionHealthChecker; import pl.allegro.tech.hermes.management.domain.subscription.validator.SubscriptionValidator; import pl.allegro.tech.hermes.management.domain.topic.TopicService; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MovingSubscriptionOffsetsValidationException; import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCAwareService; +import pl.allegro.tech.hermes.management.infrastructure.kafka.MultiDCOffsetChangeSummary; import pl.allegro.tech.hermes.tracker.management.LogRepository; public class SubscriptionService { @@ -461,4 +463,35 @@ private List getSubscriptionsMetrics( }) .collect(toList()); } + + public MultiDCOffsetChangeSummary retransmit( + Topic topic, String subscriptionName, Long timestamp, boolean dryRun, RequestUser requester) { + Subscription subscription = getSubscriptionDetails(topic.getName(), subscriptionName); + + MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = + multiDCAwareService.fetchTopicOffsetsAt(topic, timestamp); + + if (dryRun) return multiDCOffsetChangeSummary; + + switch (subscription.getState()) { + case ACTIVE: + multiDCAwareService.consumerRetransmission( + topic, + subscriptionName, + multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName(), + requester); + break; + case SUSPENDED: + multiDCAwareService.retransmit( + topic, + subscriptionName, + multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName()); + break; + case PENDING: + throw new MovingSubscriptionOffsetsValidationException( + "Cannot retransmit messages for subscription in PENDING state"); + } + + return multiDCOffsetChangeSummary; + } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java index 44117189d9..4ac5c59812 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java @@ -63,8 +63,13 @@ void waitUntilAllSubscriptionsHasConsumersAssigned( private void notifySingleSubscription( Topic topic, Instant beforeMigrationInstant, String subscriptionName, RequestUser requester) { - multiDCAwareService.retransmit( - topic, subscriptionName, beforeMigrationInstant.toEpochMilli(), false, requester); + multiDCAwareService.consumerRetransmission( + topic, + subscriptionName, + multiDCAwareService + .fetchTopicOffsetsAt(topic, beforeMigrationInstant.toEpochMilli()) + .getPartitionOffsetListPerBrokerName(), + requester); } private void waitUntilOffsetsAvailableOnAllKafkaTopics( diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java index 17729e6d21..4e321b22f6 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java @@ -6,8 +6,10 @@ import java.time.Duration; import java.time.Instant; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.function.Consumer; @@ -19,6 +21,7 @@ import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.common.exception.InternalProcessingException; +import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; import pl.allegro.tech.hermes.management.domain.auth.RequestUser; import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor; import pl.allegro.tech.hermes.management.domain.retransmit.RetransmitCommand; @@ -62,36 +65,72 @@ public String readMessageFromPrimary( .readMessageFromPrimary(topic, partition, offset); } - public MultiDCOffsetChangeSummary retransmit( - Topic topic, String subscriptionName, Long timestamp, boolean dryRun, RequestUser requester) { + public MultiDCOffsetChangeSummary fetchTopicOffsetsAt(Topic topic, Long timestamp) { MultiDCOffsetChangeSummary multiDCOffsetChangeSummary = new MultiDCOffsetChangeSummary(); clusters.forEach( cluster -> multiDCOffsetChangeSummary.addPartitionOffsetList( - cluster.getClusterName(), - cluster.indicateOffsetChange(topic, subscriptionName, timestamp, dryRun))); - - if (!dryRun) { - logger.info( - "Starting retransmission for subscription {}. Requested by {}. Retransmission timestamp: {}", - topic.getQualifiedName() + "$" + subscriptionName, - requester.getUsername(), - timestamp); - multiDcExecutor.executeByUser( - new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), - requester); - clusters.forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); - logger.info( - "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission timestamp: {}", - topic.getQualifiedName() + "$" + subscriptionName, - requester.getUsername(), - timestamp); - } + cluster.getClusterName(), cluster.fetchTopicOffsetsAt(topic, timestamp))); return multiDCOffsetChangeSummary; } + public void retransmit( + Topic topic, + String subscriptionName, + Map> brokerPartitionOffsets) { + + clusters.forEach( + cluster -> + cluster.validateIfOffsetsCanBeMoved( + topic, new SubscriptionName(subscriptionName, topic.getName()))); + + clusters.forEach( + cluster -> + cluster.moveOffsets( + topic, + new SubscriptionName(subscriptionName, topic.getName()), + brokerPartitionOffsets.getOrDefault( + cluster.getClusterName(), Collections.emptyList()))); + } + + public void consumerRetransmission( + Topic topic, + String subscriptionName, + Map> brokerPartitionOffsets, + RequestUser requester) { + + clusters.forEach( + cluster -> + cluster.validateIfOffsetsCanBeMovedByConsumers( + topic, new SubscriptionName(subscriptionName, topic.getName()))); + + clusters.forEach( + cluster -> + cluster.indicateOffsetChange( + topic, + subscriptionName, + brokerPartitionOffsets.getOrDefault( + cluster.getClusterName(), Collections.emptyList()))); + + logger.info( + "Starting retransmission for subscription {}. Requested by {}. Retransmission offsets: {}", + topic.getQualifiedName() + "$" + subscriptionName, + requester.getUsername(), + brokerPartitionOffsets); + + multiDcExecutor.executeByUser( + new RetransmitCommand(new SubscriptionName(subscriptionName, topic.getName())), requester); + clusters.forEach(clusters -> waitUntilOffsetsAreMoved(topic, subscriptionName)); + + logger.info( + "Successfully moved offsets for retransmission of subscription {}. Requested by user: {}. Retransmission offsets: {}", + topic.getQualifiedName() + "$" + subscriptionName, + requester.getUsername(), + brokerPartitionOffsets); + } + public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { return clusters.stream() .allMatch(cluster -> cluster.areOffsetsAvailableOnAllKafkaTopics(topic)); @@ -167,6 +206,7 @@ public List describeConsumerGroups(Topic topic, String subscripti } public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) { + clusters.forEach(c -> c.validateIfOffsetsCanBeMoved(topic, subscription)); clusters.forEach(c -> c.moveOffsetsToTheEnd(topic, subscription)); } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java index 2dcf3103f2..9d99f5bf04 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java @@ -90,10 +90,14 @@ public String readMessageFromPrimary(Topic topic, Integer partition, Long offset topic, kafkaNamesMapper.toKafkaTopics(topic).getPrimary(), partition, offset); } - public List indicateOffsetChange( - Topic topic, String subscriptionName, Long timestamp, boolean dryRun) { - return retransmissionService.indicateOffsetChange( - topic, subscriptionName, clusterName, timestamp, dryRun); + public void indicateOffsetChange( + Topic topic, String subscriptionName, List partitionOffsets) { + retransmissionService.indicateOffsetChange( + topic, subscriptionName, clusterName, partitionOffsets); + } + + public List fetchTopicOffsetsAt(Topic topic, Long timestamp) { + return retransmissionService.fetchTopicOffsetsAt(topic, timestamp); } public boolean areOffsetsAvailableOnAllKafkaTopics(Topic topic) { @@ -152,17 +156,25 @@ public Optional describeConsumerGroup(Topic topic, String subscri } public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) { - validateIfOffsetsCanBeMoved(topic, subscription); + KafkaConsumer consumer = createKafkaConsumer(topic, subscription); + moveOffsets( + subscription, consumer, buildOffsetsMetadata(consumer.endOffsets(consumer.assignment()))); + consumer.close(); + } - KafkaConsumer consumer = kafkaConsumerManager.createConsumer(subscription); - String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString(); - Set topicPartitions = getTopicPartitions(consumer, kafkaTopicName); - consumer.assign(topicPartitions); + public void moveOffsets( + Topic topic, SubscriptionName subscription, List offsets) { - Map endOffsets = consumer.endOffsets(topicPartitions); - Map endOffsetsMetadata = buildOffsetsMetadata(endOffsets); - consumer.commitSync(endOffsetsMetadata); + KafkaConsumer consumer = createKafkaConsumer(topic, subscription); + moveOffsets(subscription, consumer, buildOffsetsMetadata(offsets)); consumer.close(); + } + + private void moveOffsets( + SubscriptionName subscription, + KafkaConsumer consumer, + Map offsetAndMetadata) { + consumer.commitSync(offsetAndMetadata); logger.info( "Successfully moved offset to the end position for subscription {} and consumer group {}", @@ -170,6 +182,15 @@ public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) { kafkaNamesMapper.toConsumerGroupId(subscription)); } + private KafkaConsumer createKafkaConsumer( + Topic topic, SubscriptionName subscription) { + KafkaConsumer consumer = kafkaConsumerManager.createConsumer(subscription); + String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString(); + Set topicPartitions = getTopicPartitions(consumer, kafkaTopicName); + consumer.assign(topicPartitions); + return consumer; + } + private int numberOfAssignmentsForConsumersGroups(List consumerGroupsIds) throws ExecutionException, InterruptedException { Collection consumerGroupsDescriptions = @@ -182,11 +203,32 @@ private int numberOfAssignmentsForConsumersGroups(List consumerGroupsIds .size(); } - private void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) { + public void validateIfOffsetsCanBeMovedByConsumers(Topic topic, SubscriptionName subscription) { + describeConsumerGroup(topic, subscription.getName()) + .ifPresentOrElse( + group -> { + if (!group.isStable()) { + String s = + format( + "Consumer group %s for subscription %s is not stable.", + group.getGroupId(), subscription.getQualifiedName()); + throw new MovingSubscriptionOffsetsValidationException(s); + } + }, + () -> { + String s = + format( + "No consumer group for subscription %s exists.", + subscription.getQualifiedName()); + throw new MovingSubscriptionOffsetsValidationException(s); + }); + } + + public void validateIfOffsetsCanBeMoved(Topic topic, SubscriptionName subscription) { describeConsumerGroup(topic, subscription.getName()) .ifPresentOrElse( group -> { - if (!group.getMembers().isEmpty()) { + if (!group.isEmpty()) { String s = format( "Consumer group %s for subscription %s has still active members.", @@ -228,4 +270,15 @@ private Map buildOffsetsMetadata( .map(entry -> ImmutablePair.of(entry.getKey(), new OffsetAndMetadata(entry.getValue()))) .collect(toMap(Pair::getKey, Pair::getValue)); } + + private Map buildOffsetsMetadata( + List offsets) { + return offsets.stream() + .map( + offset -> + ImmutablePair.of( + new TopicPartition(offset.getTopic().asString(), offset.getPartition()), + new OffsetAndMetadata(offset.getOffset()))) + .collect(toMap(Pair::getKey, Pair::getValue)); + } } diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java index d805d639c7..5034069393 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java @@ -37,31 +37,21 @@ public KafkaRetransmissionService( } @Override - public List indicateOffsetChange( - Topic topic, String subscription, String brokersClusterName, long timestamp, boolean dryRun) { - - List partitionOffsetList = new ArrayList<>(); + public void indicateOffsetChange( + Topic topic, + String subscription, + String brokersClusterName, + List partitionOffsets) { kafkaNamesMapper .toKafkaTopics(topic) .forEach( k -> { - List partitionsIds = brokerStorage.readPartitionsIds(k.name().asString()); - - for (Integer partitionId : partitionsIds) { - KafkaConsumer consumer = createKafkaConsumer(k, partitionId); - long offset = - findClosestOffsetJustBeforeTimestamp(consumer, k, partitionId, timestamp); - PartitionOffset partitionOffset = - new PartitionOffset(k.name(), offset, partitionId); - partitionOffsetList.add(partitionOffset); - if (!dryRun) { - subscriptionOffsetChange.setSubscriptionOffset( - topic.getName(), subscription, brokersClusterName, partitionOffset); - } + for (PartitionOffset partitionOffset : partitionOffsets) { + if (!k.name().equals(partitionOffset.getTopic())) continue; + subscriptionOffsetChange.setSubscriptionOffset( + topic.getName(), subscription, brokersClusterName, partitionOffset); } }); - - return partitionOffsetList; } @Override @@ -81,6 +71,26 @@ private KafkaConsumer createKafkaConsumer(KafkaTopic kafkaTopic, return consumerPool.get(kafkaTopic, partition); } + public List fetchTopicOffsetsAt(Topic topic, Long timestamp) { + List partitionOffsetList = new ArrayList<>(); + kafkaNamesMapper + .toKafkaTopics(topic) + .forEach( + k -> { + List partitionsIds = brokerStorage.readPartitionsIds(k.name().asString()); + for (Integer partitionId : partitionsIds) { + KafkaConsumer consumer = createKafkaConsumer(k, partitionId); + long offset = + findClosestOffsetJustBeforeTimestamp(consumer, k, partitionId, timestamp); + PartitionOffset partitionOffset = + new PartitionOffset(k.name(), offset, partitionId); + partitionOffsetList.add(partitionOffset); + } + }); + + return partitionOffsetList; + } + private long findClosestOffsetJustBeforeTimestamp( KafkaConsumer consumer, KafkaTopic kafkaTopic, diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java index 6220c31f3b..c1ca4a702d 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/OffsetNotFoundException.java @@ -5,9 +5,9 @@ import pl.allegro.tech.hermes.api.ErrorCode; import pl.allegro.tech.hermes.management.domain.ManagementException; -class OffsetNotFoundException extends ManagementException { +public class OffsetNotFoundException extends ManagementException { - OffsetNotFoundException(String message) { + public OffsetNotFoundException(String message) { super(message); } diff --git a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java index c9a8293b0b..12e3334f6f 100644 --- a/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java +++ b/hermes-test-helper/src/main/java/pl/allegro/tech/hermes/test/helper/client/integration/HermesTestClient.java @@ -119,6 +119,13 @@ public WebTestClient.ResponseSpec suspendSubscription(Topic topic, String subscr .is2xxSuccessful(); } + public WebTestClient.ResponseSpec activateSubscription(Topic topic, String subscription) { + return managementTestClient + .updateSubscriptionState(topic, subscription, Subscription.State.ACTIVE) + .expectStatus() + .is2xxSuccessful(); + } + public void waitUntilSubscriptionActivated(String topicQualifiedName, String subscriptionName) { waitAtMost(Duration.ofSeconds(10)) .untilAsserted( diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java index 94c7eb8f43..77a2b9c93f 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/KafkaRetransmissionServiceTest.java @@ -14,6 +14,8 @@ import java.util.stream.Collectors; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.test.web.reactive.server.WebTestClient; import pl.allegro.tech.hermes.api.ContentType; import pl.allegro.tech.hermes.api.OffsetRetransmissionDate; @@ -53,8 +55,10 @@ public class KafkaRetransmissionServiceTest { @RegisterExtension public static final TestSubscribersExtension subscribers = new TestSubscribersExtension(); - @Test - public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void shouldMoveOffsetNearGivenTimestamp(boolean suspendedSubscription) + throws InterruptedException { // given final TestSubscriber subscriber = subscribers.createSubscriber(); final Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build()); @@ -71,6 +75,11 @@ public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { publishAndConsumeMessages(messages2, topic, subscriber); hermes.api().waitUntilConsumerCommitsOffset(topic.getQualifiedName(), subscription.getName()); + if (suspendedSubscription) { + hermes.api().suspendSubscription(topic, subscription.getName()); + hermes.api().waitUntilSubscriptionSuspended(topic.getQualifiedName(), subscription.getName()); + } + // when WebTestClient.ResponseSpec response = hermes @@ -78,6 +87,11 @@ public void shouldMoveOffsetNearGivenTimestamp() throws InterruptedException { .retransmit( topic.getQualifiedName(), subscription.getName(), retransmissionDate, false); + if (suspendedSubscription) { + hermes.api().activateSubscription(topic, subscription.getName()); + hermes.api().waitUntilSubscriptionActivated(topic.getQualifiedName(), subscription.getName()); + } + // then response.expectStatus().isOk(); messages2.forEach(subscriber::waitUntilReceived);