Skip to content

Commit

Permalink
Ensure ConsumerNodesRegistry is thread safe (#1776)
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrrzysko authored Nov 6, 2023
1 parent cbf655c commit 3831522
Showing 1 changed file with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.allegro.tech.hermes.consumers.registry;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
Expand All @@ -17,6 +18,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

Expand All @@ -32,7 +34,7 @@ public class ConsumerNodesRegistry extends PathChildrenCache implements PathChil
private final ConsumerNodesRegistryPaths registryPaths;
private final String consumerNodeId;
private final LeaderLatch leaderLatch;
private final Map<String, Long> consumersLastSeen = new HashMap<>();
private final Map<String, Long> consumersLastSeen = new ConcurrentHashMap<>();
private final long deathOfConsumerAfterMillis;
private final Clock clock;

Expand Down Expand Up @@ -97,11 +99,18 @@ public List<String> listConsumerNodes() {
return new ArrayList<>(consumersLastSeen.keySet());
}

public void refresh() {
public synchronized void refresh() {
logger.info("Refreshing current consumers registry");

long currentTime = clock.millis();
readCurrentNodes().forEach(node -> consumersLastSeen.put(node, currentTime));
List<String> currentNodes = readCurrentNodes();
List<String> validNodes = currentNodes.stream()
.filter(StringUtils::isNotBlank)
.toList();
if (currentNodes.size() != validNodes.size()) {
logger.warn("Found {} invalid consumer nodes.", currentNodes.size() - validNodes.size());
}
validNodes.forEach(node -> consumersLastSeen.put(node, currentTime));

List<String> deadConsumers = findDeadConsumers(currentTime);
if (!deadConsumers.isEmpty()) {
Expand Down

0 comments on commit 3831522

Please sign in to comment.