Skip to content

Commit

Permalink
SKYEDEN-3271 |hermes management leader
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcinBobinski committed Dec 3, 2024
1 parent 8194321 commit a2cf30f
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 29 deletions.
9 changes: 7 additions & 2 deletions docs/docs/configuration/inactive-topics-detection.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@ and configure other options in the Hermes Management configuration.
detection.inactive-topics.enabled | enable inactive topics detection | false
detection.inactive-topics.inactivity-threshold | duration after which a topic is considered inactive and first notified | 60d
detection.inactive-topics.next-notification-threshold | duration after previous notification after which a topic is notified again | 14d
detection.inactive-topics.whitelisted-qualified-topic-names | list of qualified topic names that will not be notified event if inactive | []
detection.inactive-topics.leader-election-zookeeper-dc | datacenter of Zookeeper used for leader election for the detection job | dc
detection.inactive-topics.whitelisted-qualified-topic-names | list of qualified topic names that will not be notified event if inactive | []
detection.inactive-topics.cron | cron expression for the detection job | 0 0 8 * * *
detection.inactive-topics.notifications-history-limit | how many notification timestamps will be kept in history | 5

The detection job runs on a single instance of Hermes Management that is a
leader based on the leader election Zookeeper instance.

| Option | Description | Default Value |
|-------------------------------------|----------------------------------------------------------------------------|---------------|
| management.leadership.zookeeper-dc | Specifies the datacenter of the Zookeeper instance used for leader election in the detection job | dc |



To make notifying work, you need to provide an implementation of
`pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsNotifier`
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import java.time.Clock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
Expand All @@ -19,8 +20,11 @@
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.common.util.InetAddressInstanceIdResolver;
import pl.allegro.tech.hermes.common.util.InstanceIdResolver;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.management.domain.subscription.SubscriptionLagSource;
import pl.allegro.tech.hermes.management.infrastructure.leader.ManagementLeadership;
import pl.allegro.tech.hermes.management.infrastructure.metrics.NoOpSubscriptionLagSource;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClientManager;
import pl.allegro.tech.hermes.metrics.PathsCompiler;

@Configuration
Expand All @@ -29,7 +33,7 @@
HttpClientProperties.class,
ConsistencyCheckerProperties.class,
PrometheusProperties.class,
MicrometerRegistryProperties.class
MicrometerRegistryProperties.class,
})
public class ManagementConfiguration {

Expand Down Expand Up @@ -85,4 +89,12 @@ public SubscriptionLagSource consumerLagSource() {
public Clock clock() {
return new ClockFactory().provide();
}

@Bean
ManagementLeadership inactiveTopicsDetectionLeader(
ZookeeperClientManager zookeeperClientManager,
@Value("${management.leadership.zookeeper-dc}") String leaderElectionDc,
ZookeeperPaths zookeeperPaths) {
return new ManagementLeadership(zookeeperClientManager, leaderElectionDc, zookeeperPaths);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,21 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsDetectionJob;
import pl.allegro.tech.hermes.management.infrastructure.detection.InactiveTopicsDetectionLeader;
import pl.allegro.tech.hermes.management.infrastructure.detection.InactiveTopicsDetectionScheduler;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClientManager;
import pl.allegro.tech.hermes.management.infrastructure.leader.ManagementLeadership;

@Configuration
@EnableConfigurationProperties(InactiveTopicsDetectionProperties.class)
@EnableScheduling
public class InactiveTopicsDetectionConfig {
@ConditionalOnProperty(
prefix = "detection.inactive-topics",
value = "enabled",
havingValue = "true")
@Bean
InactiveTopicsDetectionLeader inactiveTopicsDetectionLeader(
ZookeeperClientManager zookeeperClientManager,
InactiveTopicsDetectionProperties properties,
ZookeeperPaths zookeeperPaths) {
return new InactiveTopicsDetectionLeader(zookeeperClientManager, properties, zookeeperPaths);
}

@ConditionalOnProperty(
prefix = "detection.inactive-topics",
value = "enabled",
havingValue = "true")
@Bean
InactiveTopicsDetectionScheduler inactiveTopicsDetectionScheduler(
InactiveTopicsDetectionJob job, InactiveTopicsDetectionLeader leader) {
InactiveTopicsDetectionJob job, ManagementLeadership leader) {
return new InactiveTopicsDetectionScheduler(job, leader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import pl.allegro.tech.hermes.management.domain.detection.InactiveTopicsDetectionJob;
import pl.allegro.tech.hermes.management.infrastructure.leader.ManagementLeadership;

public class InactiveTopicsDetectionScheduler {
private final InactiveTopicsDetectionJob job;
private final InactiveTopicsDetectionLeader leader;
private final ManagementLeadership leader;

private static final Logger logger =
LoggerFactory.getLogger(InactiveTopicsDetectionScheduler.class);

public InactiveTopicsDetectionScheduler(
InactiveTopicsDetectionJob job, InactiveTopicsDetectionLeader leader) {
InactiveTopicsDetectionJob job, ManagementLeadership leader) {
this.leader = leader;
this.job = job;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package pl.allegro.tech.hermes.management.infrastructure.detection;
package pl.allegro.tech.hermes.management.infrastructure.leader;

import jakarta.annotation.PostConstruct;
import java.util.Optional;
Expand All @@ -7,22 +7,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.infrastructure.zookeeper.ZookeeperPaths;
import pl.allegro.tech.hermes.management.config.detection.InactiveTopicsDetectionProperties;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClient;
import pl.allegro.tech.hermes.management.infrastructure.zookeeper.ZookeeperClientManager;

public class InactiveTopicsDetectionLeader {
public class ManagementLeadership {

private final String leaderElectionDc;
private final Optional<LeaderLatch> leaderLatch;

private static final Logger logger = LoggerFactory.getLogger(InactiveTopicsDetectionLeader.class);
private static final Logger logger = LoggerFactory.getLogger(ManagementLeadership.class);

public InactiveTopicsDetectionLeader(
public ManagementLeadership(
ZookeeperClientManager zookeeperClientManager,
InactiveTopicsDetectionProperties inactiveTopicsDetectionProperties,
String leaderElectionDc,
ZookeeperPaths zookeeperPaths) {
this.leaderElectionDc = inactiveTopicsDetectionProperties.leaderElectionZookeeperDc();
this.leaderElectionDc = leaderElectionDc;
Optional<CuratorFramework> leaderCuratorFramework =
zookeeperClientManager.getClients().stream()
.filter(it -> it.getDatacenterName().equals(leaderElectionDc))
Expand Down
3 changes: 2 additions & 1 deletion hermes-management/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ management:
health:
periodSeconds: 30
enabled: true
leadership:
zookeeper-dc: dc

audit:
isLoggingAuditEnabled: false
Expand Down Expand Up @@ -81,6 +83,5 @@ detection:
inactivity-threshold: 60d
next-notification-threshold: 14d
whitelisted-qualified-topic-names: []
leader-election-zookeeper-dc: dc
cron: "0 0 8 * * *"
notifications-history-limit: 5

0 comments on commit a2cf30f

Please sign in to comment.