From 383152226d0eaf379dfee07723a8185123ab8d9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20R=C5=BCysko?= Date: Mon, 6 Nov 2023 11:34:20 +0100 Subject: [PATCH] Ensure ConsumerNodesRegistry is thread safe (#1776) --- .../consumers/registry/ConsumerNodesRegistry.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/registry/ConsumerNodesRegistry.java b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/registry/ConsumerNodesRegistry.java index 6b61cd5932..af6346de5e 100644 --- a/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/registry/ConsumerNodesRegistry.java +++ b/hermes-consumers/src/main/java/pl/allegro/tech/hermes/consumers/registry/ConsumerNodesRegistry.java @@ -1,5 +1,6 @@ package pl.allegro.tech.hermes.consumers.registry; +import org.apache.commons.lang.StringUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; @@ -17,6 +18,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -32,7 +34,7 @@ public class ConsumerNodesRegistry extends PathChildrenCache implements PathChil private final ConsumerNodesRegistryPaths registryPaths; private final String consumerNodeId; private final LeaderLatch leaderLatch; - private final Map consumersLastSeen = new HashMap<>(); + private final Map consumersLastSeen = new ConcurrentHashMap<>(); private final long deathOfConsumerAfterMillis; private final Clock clock; @@ -97,11 +99,18 @@ public List listConsumerNodes() { return new ArrayList<>(consumersLastSeen.keySet()); } - public void refresh() { + public synchronized void refresh() { logger.info("Refreshing current consumers registry"); long currentTime = clock.millis(); - readCurrentNodes().forEach(node -> consumersLastSeen.put(node, currentTime)); + List currentNodes = readCurrentNodes(); + List validNodes = currentNodes.stream() + .filter(StringUtils::isNotBlank) + .toList(); + if (currentNodes.size() != validNodes.size()) { + logger.warn("Found {} invalid consumer nodes.", currentNodes.size() - validNodes.size()); + } + validNodes.forEach(node -> consumersLastSeen.put(node, currentTime)); List deadConsumers = findDeadConsumers(currentTime); if (!deadConsumers.isEmpty()) {