SKYEDEN-3020 | Refactor

MarcinBobinski committed Dec 16, 2024
1 parent 04857b0 commit af7732b
Showing 7 changed files with 43 additions and 70 deletions.
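In summary: management-side offset moves no longer go through a dedicated KafkaConsumer (previously created per move by KafkaConsumerManager and committed with commitSync). BrokersClusterService now rewrites consumer-group offsets through the Kafka AdminClient, end offsets come from a new RetransmissionService.fetchTopicEndOffsets method, and the MultiDCAwareService entry points are renamed from retransmit and consumerRetransmission to moveOffsets and moveOffsetsForActiveConsumers.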
==== file 1 of 7 ====

@@ -37,7 +37,6 @@
 import pl.allegro.tech.hermes.management.infrastructure.kafka.service.BrokersClusterService;
 import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaBrokerTopicManagement;
 import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerGroupManager;
-import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaConsumerManager;
 import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaRawMessageReader;
 import pl.allegro.tech.hermes.management.infrastructure.kafka.service.KafkaSingleMessageReader;
 import pl.allegro.tech.hermes.management.infrastructure.kafka.service.LogEndOffsetChecker;
@@ -112,8 +111,7 @@ MultiDCAwareService multiDCAwareService(
               new OffsetsAvailableChecker(consumerPool, storage),
               new LogEndOffsetChecker(consumerPool),
               brokerAdminClient,
-              createConsumerGroupManager(kafkaProperties, kafkaNamesMapper),
-              createKafkaConsumerManager(kafkaProperties, kafkaNamesMapper));
+              createConsumerGroupManager(kafkaProperties, kafkaNamesMapper));
             })
         .collect(toList());

@@ -136,12 +134,6 @@ private ConsumerGroupManager createConsumerGroupManager(
         : new NoOpConsumerGroupManager();
   }
 
-  private KafkaConsumerManager createKafkaConsumerManager(
-      KafkaProperties kafkaProperties, KafkaNamesMapper kafkaNamesMapper) {
-    return new KafkaConsumerManager(
-        kafkaProperties, kafkaNamesMapper, kafkaProperties.getBrokerList());
-  }
-
   private SubscriptionOffsetChangeIndicator getRepository(
       List<DatacenterBoundRepositoryHolder<SubscriptionOffsetChangeIndicator>> repositories,
       KafkaProperties kafkaProperties) {
==== file 2 of 7 ====

@@ -8,6 +8,8 @@ public interface RetransmissionService {

  List<PartitionOffset> fetchTopicOffsetsAt(Topic topic, Long timestamp);
 
+  List<PartitionOffset> fetchTopicEndOffsets(Topic topic);
+
   void indicateOffsetChange(
       Topic topic,
       String subscription,
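For orientation, the interface after this change reads roughly as below; this is a sketch assembled from the visible hunks, with whatever sits in the collapsed regions omitted. fetchTopicEndOffsets is the new method that BrokersClusterService calls further down instead of asking a consumer for its end offsets.

// Sketch only: assembled from the visible hunks; methods hidden in
// collapsed regions of the file are omitted here.
public interface RetransmissionService {

  List<PartitionOffset> fetchTopicOffsetsAt(Topic topic, Long timestamp);

  // New: end-of-log offsets for every partition of the topic.
  List<PartitionOffset> fetchTopicEndOffsets(Topic topic);

  void indicateOffsetChange(
      Topic topic,
      String subscription,
      String brokersClusterName,
      List<PartitionOffset> partitionOffsets);
}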
==== file 3 of 7 ====

@@ -475,14 +475,14 @@ public MultiDCOffsetChangeSummary retransmit(

    switch (subscription.getState()) {
       case ACTIVE:
-        multiDCAwareService.consumerRetransmission(
+        multiDCAwareService.moveOffsetsForActiveConsumers(
             topic,
             subscriptionName,
             multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName(),
             requester);
         break;
       case SUSPENDED:
-        multiDCAwareService.retransmit(
+        multiDCAwareService.moveOffsets(
             topic,
             subscriptionName,
             multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName());
==== file 4 of 7 ====

@@ -63,7 +63,7 @@ void waitUntilAllSubscriptionsHasConsumersAssigned(

  private void notifySingleSubscription(
       Topic topic, Instant beforeMigrationInstant, String subscriptionName, RequestUser requester) {
-    multiDCAwareService.consumerRetransmission(
+    multiDCAwareService.moveOffsetsForActiveConsumers(
         topic,
         subscriptionName,
         multiDCAwareService
==== file 5 of 7 ====

@@ -76,11 +76,10 @@ public MultiDCOffsetChangeSummary fetchTopicOffsetsAt(Topic topic, Long timestam
    return multiDCOffsetChangeSummary;
   }
 
-  public void retransmit(
+  public void moveOffsets(
       Topic topic,
       String subscriptionName,
       Map<String, List<PartitionOffset>> brokerPartitionOffsets) {
-
     clusters.forEach(
         cluster ->
             cluster.validateIfOffsetsCanBeMoved(
@@ -89,18 +88,16 @@ public void retransmit(
     clusters.forEach(
         cluster ->
             cluster.moveOffsets(
-                topic,
                 new SubscriptionName(subscriptionName, topic.getName()),
                 brokerPartitionOffsets.getOrDefault(
                     cluster.getClusterName(), Collections.emptyList())));
   }
 
-  public void consumerRetransmission(
+  public void moveOffsetsForActiveConsumers(
       Topic topic,
       String subscriptionName,
       Map<String, List<PartitionOffset>> brokerPartitionOffsets,
       RequestUser requester) {
-
     clusters.forEach(
         cluster ->
             cluster.validateIfOffsetsCanBeMovedByConsumers(
@@ -115,7 +112,7 @@
                     cluster.getClusterName(), Collections.emptyList())));
 
     logger.info(
-        "Starting retransmission for subscription {}. Requested by {}. Retransmission offsets: {}",
+        "Starting offsets move for subscription {}. Requested by {}. Retransmission offsets: {}",
         topic.getQualifiedName() + "$" + subscriptionName,
         requester.getUsername(),
         brokerPartitionOffsets);
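The renames separate the two retransmission paths by what they actually do: moveOffsets rewrites group offsets directly, while moveOffsetsForActiveConsumers routes the move through the running consumers. The caller shown earlier picks between them by subscription state; condensed from that hunk (offsetsPerBroker stands in for multiDCOffsetChangeSummary.getPartitionOffsetListPerBrokerName()):

switch (subscription.getState()) {
  case ACTIVE:
    // Running consumers hold the group, so they perform the move themselves.
    multiDCAwareService.moveOffsetsForActiveConsumers(
        topic, subscriptionName, offsetsPerBroker, requester);
    break;
  case SUSPENDED:
    // No active members; offsets can be rewritten directly on the brokers.
    multiDCAwareService.moveOffsets(topic, subscriptionName, offsetsPerBroker);
    break;
}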
==== file 6 of 7 ====

@@ -29,6 +29,7 @@
 import pl.allegro.tech.hermes.api.Subscription;
 import pl.allegro.tech.hermes.api.SubscriptionName;
 import pl.allegro.tech.hermes.api.Topic;
+import pl.allegro.tech.hermes.common.kafka.ConsumerGroupId;
 import pl.allegro.tech.hermes.common.kafka.KafkaNamesMapper;
 import pl.allegro.tech.hermes.common.kafka.offset.PartitionOffset;
 import pl.allegro.tech.hermes.management.domain.message.RetransmissionService;
@@ -50,7 +51,6 @@ public class BrokersClusterService {
   private final ConsumerGroupsDescriber consumerGroupsDescriber;
   private final AdminClient adminClient;
   private final ConsumerGroupManager consumerGroupManager;
-  private final KafkaConsumerManager kafkaConsumerManager;
 
   public BrokersClusterService(
       String clusterName,
@@ -61,8 +61,7 @@ public BrokersClusterService(
       OffsetsAvailableChecker offsetsAvailableChecker,
       LogEndOffsetChecker logEndOffsetChecker,
       AdminClient adminClient,
-      ConsumerGroupManager consumerGroupManager,
-      KafkaConsumerManager kafkaConsumerManager) {
+      ConsumerGroupManager consumerGroupManager) {
     this.clusterName = clusterName;
     this.singleMessageReader = singleMessageReader;
     this.retransmissionService = retransmissionService;
@@ -74,7 +73,6 @@
             kafkaNamesMapper, adminClient, logEndOffsetChecker, clusterName);
     this.adminClient = adminClient;
     this.consumerGroupManager = consumerGroupManager;
-    this.kafkaConsumerManager = kafkaConsumerManager;
   }
 
   public String getClusterName() {
@@ -156,39 +154,24 @@ public Optional<ConsumerGroup> describeConsumerGroup(Topic topic, String subscri
   }
 
   public void moveOffsetsToTheEnd(Topic topic, SubscriptionName subscription) {
-    KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(topic, subscription);
-    moveOffsets(
-        subscription, consumer, buildOffsetsMetadata(consumer.endOffsets(consumer.assignment())));
-    consumer.close();
+    List<PartitionOffset> endOffsets = retransmissionService.fetchTopicEndOffsets(topic);
+    moveOffsets(subscription, buildOffsetsMetadata(endOffsets));
   }
 
-  public void moveOffsets(
-      Topic topic, SubscriptionName subscription, List<PartitionOffset> offsets) {
-
-    KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(topic, subscription);
-    moveOffsets(subscription, consumer, buildOffsetsMetadata(offsets));
-    consumer.close();
+  public void moveOffsets(SubscriptionName subscription, List<PartitionOffset> offsets) {
+    moveOffsets(subscription, buildOffsetsMetadata(offsets));
   }
 
   private void moveOffsets(
-      SubscriptionName subscription,
-      KafkaConsumer<byte[], byte[]> consumer,
-      Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata) {
-    consumer.commitSync(offsetAndMetadata);
+      SubscriptionName subscription, Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata) {
+    ConsumerGroupId consumerGroupId = kafkaNamesMapper.toConsumerGroupId(subscription);
+    adminClient.alterConsumerGroupOffsets(consumerGroupId.asString(), offsetAndMetadata).all();
 
     logger.info(
-        "Successfully moved offset to the end position for subscription {} and consumer group {}",
+        "Successfully moved offsets for subscription {} and consumer group {} to {}",
         subscription.getQualifiedName(),
-        kafkaNamesMapper.toConsumerGroupId(subscription));
-  }
-
-  private KafkaConsumer<byte[], byte[]> createKafkaConsumer(
-      Topic topic, SubscriptionName subscription) {
-    KafkaConsumer<byte[], byte[]> consumer = kafkaConsumerManager.createConsumer(subscription);
-    String kafkaTopicName = kafkaNamesMapper.toKafkaTopics(topic).getPrimary().name().asString();
-    Set<TopicPartition> topicPartitions = getTopicPartitions(consumer, kafkaTopicName);
-    consumer.assign(topicPartitions);
-    return consumer;
+        kafkaNamesMapper.toConsumerGroupId(subscription),
+        offsetAndMetadata.toString());
   }
 
   private int numberOfAssignmentsForConsumersGroups(List<String> consumerGroupsIds)
@@ -264,13 +247,6 @@ private Set<TopicPartition> getTopicPartitions(
         .collect(toSet());
   }
 
-  private Map<TopicPartition, OffsetAndMetadata> buildOffsetsMetadata(
-      Map<TopicPartition, Long> offsets) {
-    return offsets.entrySet().stream()
-        .map(entry -> ImmutablePair.of(entry.getKey(), new OffsetAndMetadata(entry.getValue())))
-        .collect(toMap(Pair::getKey, Pair::getValue));
-  }
-
   private Map<TopicPartition, OffsetAndMetadata> buildOffsetsMetadata(
       List<PartitionOffset> offsets) {
     return offsets.stream()
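This hunk is the heart of the refactor: rather than building a KafkaConsumer for the subscription's consumer group, assigning it every partition and calling commitSync, BrokersClusterService now rewrites the group's offsets through AdminClient.alterConsumerGroupOffsets, so no extra consumer ever joins the group. A minimal self-contained sketch of that API, with placeholder broker address, topic, group and offset:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class AlterOffsetsSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Placeholder address; hermes wires the admin client from its own broker list.
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      // Placeholder topic, partition, offset and group id.
      Map<TopicPartition, OffsetAndMetadata> offsets =
          Map.of(new TopicPartition("example-topic", 0), new OffsetAndMetadata(42L));
      // Brokers refuse to alter offsets of a group that still has active members,
      // which is why the ACTIVE path above goes through the consumers themselves.
      admin.alterConsumerGroupOffsets("example-group", offsets).all().get();
    }
  }
}

One design point: the committed code calls .all() on the result without blocking on the returned KafkaFuture, while the sketch adds .get(), which is what would surface broker-side failures to the caller.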
==== file 7 of 7 ====

@@ -42,16 +42,10 @@ public void indicateOffsetChange(
      String subscription,
       String brokersClusterName,
       List<PartitionOffset> partitionOffsets) {
-    kafkaNamesMapper
-        .toKafkaTopics(topic)
-        .forEach(
-            k -> {
-              for (PartitionOffset partitionOffset : partitionOffsets) {
-                if (!k.name().equals(partitionOffset.getTopic())) continue;
-                subscriptionOffsetChange.setSubscriptionOffset(
-                    topic.getName(), subscription, brokersClusterName, partitionOffset);
-              }
-            });
+    for (PartitionOffset partitionOffset : partitionOffsets) {
+      subscriptionOffsetChange.setSubscriptionOffset(
+          topic.getName(), subscription, brokersClusterName, partitionOffset);
+    }
   }
 
   @Override
@@ -71,6 +65,10 @@ private KafkaConsumer<byte[], byte[]> createKafkaConsumer(KafkaTopic kafkaTopic,
     return consumerPool.get(kafkaTopic, partition);
   }
 
+  public List<PartitionOffset> fetchTopicEndOffsets(Topic topic) {
+    return fetchTopicOffsetsAt(topic, null);
+  }
+
   public List<PartitionOffset> fetchTopicOffsetsAt(Topic topic, Long timestamp) {
     List<PartitionOffset> partitionOffsetList = new ArrayList<>();
     kafkaNamesMapper
@@ -80,8 +78,7 @@ public List<PartitionOffset> fetchTopicOffsetsAt(Topic topic, Long timestamp) {
               List<Integer> partitionsIds = brokerStorage.readPartitionsIds(k.name().asString());
               for (Integer partitionId : partitionsIds) {
                 KafkaConsumer<byte[], byte[]> consumer = createKafkaConsumer(k, partitionId);
-                long offset =
-                    findClosestOffsetJustBeforeTimestamp(consumer, k, partitionId, timestamp);
+                long offset = getOffsetForTimestampOrEnd(timestamp, k, partitionId, consumer);
                 PartitionOffset partitionOffset =
                     new PartitionOffset(k.name(), offset, partitionId);
                 partitionOffsetList.add(partitionOffset);
@@ -91,19 +88,28 @@ public List<PartitionOffset> fetchTopicOffsetsAt(Topic topic, Long timestamp) {
     return partitionOffsetList;
   }
 
-  private long findClosestOffsetJustBeforeTimestamp(
+  private long getOffsetForTimestampOrEnd(
+      Long timestamp,
+      KafkaTopic kafkaTopic,
+      Integer partitionId,
+      KafkaConsumer<byte[], byte[]> consumer) {
+    long endOffset = getEndingOffset(consumer, kafkaTopic, partitionId);
+    return Optional.ofNullable(timestamp)
+        .flatMap(ts -> findClosestOffsetJustBeforeTimestamp(consumer, kafkaTopic, partitionId, ts))
+        .orElse(endOffset);
+  }
+
+  private Optional<Long> findClosestOffsetJustBeforeTimestamp(
       KafkaConsumer<byte[], byte[]> consumer,
       KafkaTopic kafkaTopic,
       int partition,
       long timestamp) {
-    long endOffset = getEndingOffset(consumer, kafkaTopic, partition);
     TopicPartition topicPartition = new TopicPartition(kafkaTopic.name().asString(), partition);
     return Optional.ofNullable(
             consumer
                 .offsetsForTimes(Collections.singletonMap(topicPartition, timestamp))
                 .get(topicPartition))
-        .orElse(new OffsetAndTimestamp(endOffset, timestamp))
-        .offset();
+        .map(OffsetAndTimestamp::offset);
   }
 
   private long getEndingOffset(
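The reworked lookup splits timestamp resolution in two: findClosestOffsetJustBeforeTimestamp now returns an Optional that is empty when KafkaConsumer.offsetsForTimes finds no message at or after the requested timestamp (it returns null for such partitions), and getOffsetForTimestampOrEnd falls back to the end offset in that case, or immediately when the timestamp is null, which is exactly how the new fetchTopicEndOffsets delegates with a null timestamp. A standalone sketch of the same logic against the plain Kafka consumer API (the real code resolves end offsets through its consumer pool and getEndingOffset):

import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

class OffsetLookupSketch {
  // Resolve the offset of the first message at or after `timestamp`,
  // falling back to the end of the partition when `timestamp` is null
  // or no such message exists.
  static long offsetForTimestampOrEnd(
      KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp, Long timestamp) {
    long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
    return Optional.ofNullable(timestamp)
        .flatMap(
            ts ->
                Optional.ofNullable(
                        consumer.offsetsForTimes(Collections.singletonMap(tp, ts)).get(tp))
                    .map(OffsetAndTimestamp::offset))
        .orElse(endOffset);
  }
}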
