diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java index a09e205432..bf4434786c 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/kafka/KafkaConfiguration.java @@ -37,7 +37,6 @@ import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerGroupManager; -import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerManager; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader; import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker; @@ -112,8 +111,7 @@ MultiDCAwareService multiDCAwareService( new OffsetsAvailableChecker(consumerPool, storage), new LogEndOffsetChecker(consumerPool), brokerAdminClient, - createConsumerGroupManager(kafkaProperties, kafkaNamesMapper), - createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper)); + createConsumerGroupManager(kafkaProperties, kafkaNamesMapper)); }) .collect(toList()); @@ -136,12 +134,6 @@ private ConsumerGroupManager createConsumerGroupManager( : new NoOpConsumerGroupManager(); } - private KafkaConsumerManager createKafkaConsumerManager( - KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) { - return new KafkaConsumerManager( - kafkaProperties, kafkaNamesMapper, kafkaProperties.getBrokerList()); - } - private SubscriptionOffsetChangeIndicator getRepository( List> repositories, KafkaProperties kafkaProperties) { diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java index b180077df1..76e8e64546 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/message/RetransmissionService.java @@ -8,6 +8,8 @@ public interface RetransmissionService { List fetchTopicOffsetsAt(Topic topic, Long timestamp); + List fetchTopicEndOffsets(Topic topic); + void indicateOffsetChange( Topic topic, String subscription, diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java index a6fef68a1d..020dc3eb37 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/subscription/SubscriptionService.java @@ -475,14 +475,14 @@ public MultiDCOffsetChangeSummary retransmit( switch (subscription.getState()) { case ACTIVE: - multiDCAwareService.consumerRetransmission( + multiDCAwareService.moveOffsetsForActiveConsumers( topic, subscriptionName, multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName(), requester); break; case SUSPENDED: - multiDCAwareService.retransmit( + multiDCAwareService.moveOffsets( topic, subscriptionName, multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName()); diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java index 4ac5c59812..1ab6675422 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/topic/TopicContentTypeMigrationService.java @@ -63,7 +63,7 @@ void waitUntilAllSubscriptionsHasConsumersAssigned( private void notifySingleSubscription( Topic topic, Instant beforeMigrationInstant, String subscriptionName, RequestUser requester) { - multiDCAwareService.consumerRetransmission( + multiDCAwareService.moveOffsetsForActiveConsumers( topic, subscriptionName, multiDCAwareService diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java index 4e321b22f6..0813cb7239 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/MultiDCAwareService.java @@ -76,11 +76,10 @@ public MultiDCOffsetChangeSummary fetchTopicOffsetsAt(Topic topic, Long timestam return multiDCOffsetChangeSummary; } - public void retransmit( + public void moveOffsets( Topic topic, String subscriptionName, Map> brokerPartitionOffsets) { - clusters.forEach( cluster -> cluster.validateIfOffsetsCanBeMoved( @@ -89,18 +88,16 @@ public void retransmit( clusters.forEach( cluster -> cluster.moveOffsets( - topic, new SubscriptionName(subscriptionName, topic.getName()), brokerPartitionOffsets.getOrDefault( cluster.getClusterName(), Collections.emptyList()))); } - public void consumerRetransmission( + public void moveOffsetsForActiveConsumers( Topic topic, String subscriptionName, Map> brokerPartitionOffsets, RequestUser requester) { - clusters.forEach( cluster -> cluster.validateIfOffsetsCanBeMovedByConsumers( @@ -115,7 +112,7 @@ public void consumerRetransmission( cluster.getClusterName(), Collections.emptyList()))); logger.info( - "Starting retransmission for subscription {}. Requested by {}. Retransmission offsets: {}", + "Starting offsets move for subscription {}. Requested by {}. Retransmission offsets: {}", topic.getQualifiedName() + "$" + subscriptionName, requester.getUsername(), brokerPartitionOffsets); diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java index 9d99f5bf04..c8f45ddf4e 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/BrokersClusterService.java @@ -29,6 +29,7 @@ import pl.allegro.tech.hermes.api.Subscription; import pl.allegro.tech.hermes.api.SubscriptionName; import pl.allegro.tech.hermes.api.Topic; +import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId; import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper; import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset; import pl.allegro.tech.hermes.management.domain.message.RetransmissionService; @@ -50,7 +51,6 @@ public class BrokersClusterService { private final ConsumerGroupsDescriber consumerGroupsDescriber; private final AdminClient adminClient; private final ConsumerGroupManager consumerGroupManager; - private final KafkaConsumerManager kafkaConsumerManager; public BrokersClusterService( String clusterName, @@ -61,8 +61,7 @@ public BrokersClusterService( OffsetsAvailableChecker offsetsAvailableChecker, LogEndOffsetChecker logEndOffsetChecker, AdminClient adminClient, - ConsumerGroupManager consumerGroupManager, - KafkaConsumerManager kafkaConsumerManager) { + ConsumerGroupManager consumerGroupManager) { this.clusterName = clusterName; this.singleMessageReader = singleMessageReader; this.retransmissionService = retransmissionService; @@ -74,7 +73,6 @@ public BrokersClusterService( kafkaNamesMapper, adminClient, logEndOffsetChecker, clusterName); this.adminClient = adminClient; this.consumerGroupManager = consumerGroupManager; - this.kafkaConsumerManager = kafkaConsumerManager; } public String getClusterName() { @@ -156,39 +154,24 @@ public Optional describeConsumerGroup(Topic topic, String subscri } public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) { - KafkaConsumer consumer = createKafkaConsumer(topic, subscription); - moveOffsets( - subscription, consumer, buildOffsetsMetadata(consumer.endOffsets(consumer.assignment()))); - consumer.close(); + List endOffsets = retransmissionService.fetchTopicEndOffsets(topic); + moveOffsets(subscription, buildOffsetsMetadata(endOffsets)); } - public void moveOffsets( - Topic topic, SubscriptionName subscription, List offsets) { - - KafkaConsumer consumer = createKafkaConsumer(topic, subscription); - moveOffsets(subscription, consumer, buildOffsetsMetadata(offsets)); - consumer.close(); + public void moveOffsets(SubscriptionName subscription, List offsets) { + moveOffsets(subscription, buildOffsetsMetadata(offsets)); } private void moveOffsets( - SubscriptionName subscription, - KafkaConsumer consumer, - Map offsetAndMetadata) { - consumer.commitSync(offsetAndMetadata); + SubscriptionName subscription, Map offsetAndMetadata) { + ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscription); + adminClient.alterConsumerGroupOffsets(consumerGroupId.asString(), offsetAndMetadata).all(); logger.info( - "Successfully moved offset to the end position for subscription {} and consumer group {}", + "Successfully moved offsets for subscription {} and consumer group {} to {}", subscription.getQualifiedName(), - kafkaNamesMapper.toConsumerGroupId(subscription)); - } - - private KafkaConsumer createKafkaConsumer( - Topic topic, SubscriptionName subscription) { - KafkaConsumer consumer = kafkaConsumerManager.createConsumer(subscription); - String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString(); - Set topicPartitions = getTopicPartitions(consumer, kafkaTopicName); - consumer.assign(topicPartitions); - return consumer; + kafkaNamesMapper.toConsumerGroupId(subscription), + offsetAndMetadata.toString()); } private int numberOfAssignmentsForConsumersGroups(List consumerGroupsIds) @@ -264,13 +247,6 @@ private Set getTopicPartitions( .collect(toSet()); } - private Map buildOffsetsMetadata( - Map offsets) { - return offsets.entrySet().stream() - .map(entry -> ImmutablePair.of(entry.getKey(), new OffsetAndMetadata(entry.getValue()))) - .collect(toMap(Pair::getKey, Pair::getValue)); - } - private Map buildOffsetsMetadata( List offsets) { return offsets.stream() diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java index 5034069393..8c8cc04182 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/kafka/service/retransmit/KafkaRetransmissionService.java @@ -42,16 +42,10 @@ public void indicateOffsetChange( String subscription, String brokersClusterName, List partitionOffsets) { - kafkaNamesMapper - .toKafkaTopics(topic) - .forEach( - k -> { - for (PartitionOffset partitionOffset : partitionOffsets) { - if (!k.name().equals(partitionOffset.getTopic())) continue; - subscriptionOffsetChange.setSubscriptionOffset( - topic.getName(), subscription, brokersClusterName, partitionOffset); - } - }); + for (PartitionOffset partitionOffset : partitionOffsets) { + subscriptionOffsetChange.setSubscriptionOffset( + topic.getName(), subscription, brokersClusterName, partitionOffset); + } } @Override @@ -71,6 +65,10 @@ private KafkaConsumer createKafkaConsumer(KafkaTopic kafkaTopic, return consumerPool.get(kafkaTopic, partition); } + public List fetchTopicEndOffsets(Topic topic) { + return fetchTopicOffsetsAt(topic, null); + } + public List fetchTopicOffsetsAt(Topic topic, Long timestamp) { List partitionOffsetList = new ArrayList<>(); kafkaNamesMapper @@ -80,8 +78,7 @@ public List fetchTopicOffsetsAt(Topic topic, Long timestamp) { List partitionsIds = brokerStorage.readPartitionsIds(k.name().asString()); for (Integer partitionId : partitionsIds) { KafkaConsumer consumer = createKafkaConsumer(k, partitionId); - long offset = - findClosestOffsetJustBeforeTimestamp(consumer, k, partitionId, timestamp); + long offset = getOffsetForTimestampOrEnd(timestamp, k, partitionId, consumer); PartitionOffset partitionOffset = new PartitionOffset(k.name(), offset, partitionId); partitionOffsetList.add(partitionOffset); @@ -91,19 +88,28 @@ public List fetchTopicOffsetsAt(Topic topic, Long timestamp) { return partitionOffsetList; } - private long findClosestOffsetJustBeforeTimestamp( + private long getOffsetForTimestampOrEnd( + Long timestamp, + KafkaTopic kafkaTopic, + Integer partitionId, + KafkaConsumer consumer) { + long endOffset = getEndingOffset(consumer, kafkaTopic, partitionId); + return Optional.ofNullable(timestamp) + .flatMap(ts -> findClosestOffsetJustBeforeTimestamp(consumer, kafkaTopic, partitionId, ts)) + .orElse(endOffset); + } + + private Optional findClosestOffsetJustBeforeTimestamp( KafkaConsumer consumer, KafkaTopic kafkaTopic, int partition, long timestamp) { - long endOffset = getEndingOffset(consumer, kafkaTopic, partition); TopicPartition topicPartition = new TopicPartition(kafkaTopic.name().asString(), partition); return Optional.ofNullable( consumer .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp)) .get(topicPartition)) - .orElse(new OffsetAndTimestamp(endOffset, timestamp)) - .offset(); + .map(OffsetAndTimestamp::offset); } private long getEndingOffset(