Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into SKYEDEN-3271-KafkaC…
Browse files Browse the repository at this point in the history
…onumerGroupDeletion-V2

# Conflicts:
#	hermes-common/src/main/java/pl/allegro/tech/hermes/infrastructure/zookeeper/ZookeeperPaths.java
#	hermes-management/src/main/java/pl/allegro/tech/hermes/management/infrastructure/zookeeper/ZookeeperRepositoryManager.java
#	hermes-management/src/main/resources/application.yaml
  • Loading branch information
MarcinBobinski committed Dec 20, 2024
2 parents 6391fbb + ffc4e39 commit 9f97071
Show file tree
Hide file tree
Showing 28 changed files with 1,301 additions and 10 deletions.
24 changes: 24 additions & 0 deletions docs/docs/configuration/inactive-topics-detection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Inactive Topics Detection

Hermes Management provides an optional feature to detect inactive topics and
notify about them. This feature is **disabled by default**. You can enable it
and configure other options in the Hermes Management configuration.

Option | Description | Default value
-------------------------------------------------------------|----------------------------------------------------------------------------|---------------
detection.inactive-topics.enabled | enable inactive topics detection | false
detection.inactive-topics.inactivity-threshold | duration after which a topic is considered inactive and first notified | 60d
detection.inactive-topics.next-notification-threshold | duration after previous notification after which a topic is notified again | 14d
detection.inactive-topics.whitelisted-qualified-topic-names | list of qualified topic names that will not be notified event if inactive | []
detection.inactive-topics.cron | cron expression for the detection job | 0 0 8 * * *
detection.inactive-topics.notifications-history-limit | how many notification timestamps will be kept in history | 5

The detection job runs on a single instance of Hermes Management that is a
leader based on the leader election Zookeeper instance.

Option | Description | Default Value
------------------------------------|-----------------------------------------------------------------------------|---------------
management.leadership.zookeeper-dc | Specifies the datacenter of the Zookeeper instance used for leader election | dc

To make notifying work, you need to provide an implementation of
`pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsNotifier`
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ public class ZookeeperPaths {
public static final String DATACENTER_READINESS_PATH = "datacenter-readiness";
public static final String OFFLINE_RETRANSMISSION_PATH = "offline-retransmission";
public static final String OFFLINE_RETRANSMISSION_TASKS_PATH = "tasks";
public static final String CONSUMER_GROUP_TO_DELETE = "consumer-group-to-delete";
public static final String CONSUMER_GROUP_TO_DELETE_TASKS = "tasks";
public static final String INACTIVE_TOPICS_PATH = "inactive-topics";
public static final String MANAGEMENT_PATH = "management";
public static final String MANAGEMENT_PATH_LEADER = "leader";
public static final String CONSUMER_GROUP_TO_DELETE = "consumer-group-to-delete";
public static final String CONSUMER_GROUP_TO_DELETE_TASKS = "tasks";

private final String basePath;

Expand Down Expand Up @@ -186,6 +187,14 @@ public String offlineRetransmissionPath(String taskId) {
.join(basePath, OFFLINE_RETRANSMISSION_PATH, OFFLINE_RETRANSMISSION_TASKS_PATH, taskId);
}

public String inactiveTopicsPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, INACTIVE_TOPICS_PATH);
}

public String managementLeaderPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, MANAGEMENT_PATH, MANAGEMENT_PATH_LEADER);
}

public String consumerGroupToDeletePath() {
return Joiner.on(URL_SEPARATOR)
.join(basePath, CONSUMER_GROUP_TO_DELETE, CONSUMER_GROUP_TO_DELETE_TASKS);
Expand All @@ -196,10 +205,6 @@ public String consumerGroupToDeletePath(String taskId) {
.join(basePath, CONSUMER_GROUP_TO_DELETE, CONSUMER_GROUP_TO_DELETE_TASKS, taskId);
}

public String managementLeaderPath() {
return Joiner.on(URL_SEPARATOR).join(basePath, MANAGEMENT_PATH, MANAGEMENT_PATH_LEADER);
}

public String join(String... parts) {
return Joiner.on(URL_SEPARATOR).join(parts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
HttpClientProperties.class,
ConsistencyCheckerProperties.class,
PrometheusProperties.class,
MicrometerRegistryProperties.class
MicrometerRegistryProperties.class,
})
public class ManagementConfiguration {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package pl.allegro.tech.hermes.management.config.detection;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsDetectionJob;
import pl.allegro.tech.hermes.management.infrastructure.detection.InactiveTopicsDetectionScheduler;
import pl.allegro.tech.hermes.management.infrastructure.leader.ManagementLeadership;

@Configuration
@EnableConfigurationProperties(InactiveTopicsDetectionProperties.class)
@EnableScheduling
public class InactiveTopicsDetectionConfig {
@ConditionalOnProperty(
prefix = "detection.inactive-topics",
value = "enabled",
havingValue = "true")
@Bean
InactiveTopicsDetectionScheduler inactiveTopicsDetectionScheduler(
InactiveTopicsDetectionJob job, ManagementLeadership leader) {
return new InactiveTopicsDetectionScheduler(job, leader);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package pl.allegro.tech.hermes.management.config.detection;

import java.time.Duration;
import java.util.Set;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "detection.inactive-topics")
public record InactiveTopicsDetectionProperties(
Duration inactivityThreshold,
Duration nextNotificationThreshold,
Set<String> whitelistedQualifiedTopicNames,
int notificationsHistoryLimit) {}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperWorkloadConstraintsRepository;
import pl.allegro.tech.hermes.management.domain.blacklist.TopicBlacklistRepository;
import pl.allegro.tech.hermes.management.domain.dc.MultiDatacenterRepositoryCommandExecutor;
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsRepository;
import pl.allegro.tech.hermes.management.domain.mode.ModeService;
import pl.allegro.tech.hermes.management.domain.readiness.DatacenterReadinessRepository;
import pl.allegro.tech.hermes.management.domain.retransmit.OfflineRetransmissionRepository;
import pl.allegro.tech.hermes.management.infrastructure.blacklist.ZookeeperTopicBlacklistRepository;
import pl.allegro.tech.hermes.management.infrastructure.detection.ZookeeperInactiveTopicsRepository;
import pl.allegro.tech.hermes.management.infrastructure.metrics.SummedSharedCounter;
import pl.allegro.tech.hermes.management.infrastructure.readiness.ZookeeperDatacenterReadinessRepository;
import pl.allegro.tech.hermes.management.infrastructure.retransmit.ZookeeperOfflineRetransmissionRepository;
Expand Down Expand Up @@ -177,4 +179,11 @@ DatacenterReadinessRepository readinessRepository() {
return new ZookeeperDatacenterReadinessRepository(
localClient.getCuratorFramework(), objectMapper, zookeeperPaths());
}

@Bean
InactiveTopicsRepository inactiveTopicsRepository() {
ZookeeperClient localClient = clientManager().getLocalClient();
return new ZookeeperInactiveTopicsRepository(
localClient.getCuratorFramework(), objectMapper, zookeeperPaths());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package pl.allegro.tech.hermes.management.domain.detection;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public record InactiveTopic(
@JsonProperty("topic") String qualifiedTopicName,
@JsonProperty("lastPublishedTsMs") long lastPublishedMessageTimestampMs,
@JsonProperty("notificationTsMs") List<Long> notificationTimestampsMs,
@JsonProperty("whitelisted") boolean whitelisted) {

InactiveTopic notificationSent(Instant timestamp) {
List<Long> newNotificationTimestampsMs = new ArrayList<>(notificationTimestampsMs);
newNotificationTimestampsMs.add(timestamp.toEpochMilli());
return new InactiveTopic(
this.qualifiedTopicName,
this.lastPublishedMessageTimestampMs,
newNotificationTimestampsMs,
this.whitelisted);
}

InactiveTopic limitNotificationsHistory(int limit) {
List<Long> newNotificationTimestampsMs =
notificationTimestampsMs.stream()
.sorted((a, b) -> Long.compare(b, a))
.limit(limit)
.toList();
return new InactiveTopic(
this.qualifiedTopicName,
this.lastPublishedMessageTimestampMs,
newNotificationTimestampsMs,
this.whitelisted);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package pl.allegro.tech.hermes.management.domain.detection;

import pl.allegro.tech.hermes.api.OwnerId;

public record InactiveTopicWithOwner(InactiveTopic topic, OwnerId ownerId) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package pl.allegro.tech.hermes.management.domain.detection;

import static java.util.stream.Collectors.groupingBy;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;
import java.time.Clock;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import pl.allegro.tech.hermes.api.OwnerId;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.api.TopicName;
import pl.allegro.tech.hermes.management.config.detection.InactiveTopicsDetectionProperties;
import pl.allegro.tech.hermes.management.domain.topic.TopicService;

@Component
public class InactiveTopicsDetectionJob {
private final TopicService topicService;
private final InactiveTopicsStorageService inactiveTopicsStorageService;
private final InactiveTopicsDetectionService inactiveTopicsDetectionService;
private final Optional<InactiveTopicsNotifier> notifier;
private final InactiveTopicsDetectionProperties properties;
private final Clock clock;
private final MeterRegistry meterRegistry;

private static final Logger logger = LoggerFactory.getLogger(InactiveTopicsDetectionJob.class);

public InactiveTopicsDetectionJob(
TopicService topicService,
InactiveTopicsStorageService inactiveTopicsStorageService,
InactiveTopicsDetectionService inactiveTopicsDetectionService,
Optional<InactiveTopicsNotifier> notifier,
InactiveTopicsDetectionProperties properties,
Clock clock,
MeterRegistry meterRegistry) {
this.topicService = topicService;
this.inactiveTopicsStorageService = inactiveTopicsStorageService;
this.inactiveTopicsDetectionService = inactiveTopicsDetectionService;
this.properties = properties;
this.clock = clock;
this.meterRegistry = meterRegistry;
if (notifier.isEmpty()) {
logger.info("Inactive topics notifier bean is absent");
}
this.notifier = notifier;
}

public void detectAndNotify() {
List<Topic> topics = topicService.getAllTopics();
List<String> qualifiedTopicNames = topics.stream().map(Topic::getQualifiedName).toList();
List<InactiveTopic> historicalInactiveTopics = inactiveTopicsStorageService.getInactiveTopics();
List<InactiveTopic> foundInactiveTopics =
detectInactiveTopics(qualifiedTopicNames, historicalInactiveTopics);

Map<Boolean, List<InactiveTopic>> groupedByNeedOfNotification =
foundInactiveTopics.stream()
.collect(groupingBy(inactiveTopicsDetectionService::shouldBeNotified));

List<InactiveTopic> topicsToNotify = groupedByNeedOfNotification.getOrDefault(true, List.of());
List<InactiveTopic> topicsToSkipNotification =
groupedByNeedOfNotification.getOrDefault(false, List.of());
List<InactiveTopic> notifiedTopics = notify(enrichWithOwner(topicsToNotify, topics));

List<InactiveTopic> processedTopics =
limitHistory(
Stream.concat(notifiedTopics.stream(), topicsToSkipNotification.stream()).toList());
measureInactiveTopics(processedTopics);
inactiveTopicsStorageService.markAsInactive(processedTopics);
}

private List<InactiveTopic> detectInactiveTopics(
List<String> qualifiedTopicNames, List<InactiveTopic> historicalInactiveTopics) {
Map<String, InactiveTopic> historicalInactiveTopicsByName =
groupByName(historicalInactiveTopics);
return qualifiedTopicNames.stream()
.map(
qualifiedTopicName ->
inactiveTopicsDetectionService.detectInactiveTopic(
TopicName.fromQualifiedName(qualifiedTopicName),
Optional.ofNullable(historicalInactiveTopicsByName.get(qualifiedTopicName))))
.map(opt -> opt.orElse(null))
.filter(Objects::nonNull)
.toList();
}

private Map<String, InactiveTopic> groupByName(List<InactiveTopic> inactiveTopics) {
return inactiveTopics.stream()
.collect(Collectors.toMap(InactiveTopic::qualifiedTopicName, v -> v, (v1, v2) -> v1));
}

private List<InactiveTopicWithOwner> enrichWithOwner(
List<InactiveTopic> inactiveTopics, List<Topic> topics) {
Map<String, OwnerId> ownerByTopicName = new HashMap<>();
topics.forEach(topic -> ownerByTopicName.put(topic.getQualifiedName(), topic.getOwner()));

return inactiveTopics.stream()
.map(
inactiveTopic ->
new InactiveTopicWithOwner(
inactiveTopic, ownerByTopicName.get(inactiveTopic.qualifiedTopicName())))
.toList();
}

private List<InactiveTopic> notify(List<InactiveTopicWithOwner> inactiveTopics) {
if (inactiveTopics.isEmpty()) {
logger.info("No inactive topics to notify");
return List.of();
} else if (notifier.isPresent()) {
logger.info("Notifying {} inactive topics", inactiveTopics.size());
NotificationResult result = notifier.get().notify(inactiveTopics);
Instant now = clock.instant();

return inactiveTopics.stream()
.map(InactiveTopicWithOwner::topic)
.map(
topic ->
result.isSuccess(topic.qualifiedTopicName())
? topic.notificationSent(now)
: topic)
.toList();
} else {
logger.info("Skipping notification of {} inactive topics", inactiveTopics.size());
return inactiveTopics.stream().map(InactiveTopicWithOwner::topic).toList();
}
}

private List<InactiveTopic> limitHistory(List<InactiveTopic> inactiveTopics) {
return inactiveTopics.stream()
.map(topic -> topic.limitNotificationsHistory(properties.notificationsHistoryLimit()))
.toList();
}

private void measureInactiveTopics(List<InactiveTopic> processedTopics) {
processedTopics.stream()
.collect(
Collectors.groupingBy(
topic -> topic.notificationTimestampsMs().size(), Collectors.counting()))
.forEach(
(notificationsCount, topicsCount) -> {
Tags tags = Tags.of("notifications", notificationsCount.toString());
meterRegistry.gauge("inactive-topics", tags, topicsCount);
});
}
}
Loading

0 comments on commit 9f97071

Please sign in to comment.