diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index b9f09c698bf..d384e02c5bc 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -32,6 +32,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -87,6 +88,7 @@ import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.AmpleImpl; import org.apache.accumulo.core.rpc.SaslConnectionParams; @@ -237,7 +239,7 @@ public ClientContext(SingletonReservation reservation, ClientInfo info, }); this.zooCache = memoize(() -> new ZooCache(getZooSession(), - ZooCache.createPersistentWatcherPaths(ZooUtil.getRoot(getInstanceID())))); + createPersistentWatcherPaths(ZooUtil.getRoot(getInstanceID())))); this.accumuloConf = serverConf; timeoutSupplier = memoizeWithExpiration( () -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS); @@ -1066,7 +1068,7 @@ public synchronized ZookeeperLockChecker getTServerLockChecker() { var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get(); String zkRoot = getZooKeeperRoot(); this.zkLockChecker = - new ZookeeperLockChecker(new ZooCache(zk, List.of(zkRoot + Constants.ZTSERVERS)), zkRoot); + new ZookeeperLockChecker(new ZooCache(zk, Set.of(zkRoot + Constants.ZTSERVERS)), zkRoot); } return this.zkLockChecker; } @@ -1079,4 +1081,15 @@ public NamespaceMapping getNamespaces() { return namespaces; } + private static Set createPersistentWatcherPaths(String zkRoot) { + Set pathsToWatch = new HashSet<>(); + for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, + Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK, + Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES, + Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET)) { + pathsToWatch.add(zkRoot + path); + } + return pathsToWatch; + } + } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index 77fe858206c..4cd321256ba 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@ -36,11 +36,9 @@ import java.util.function.Consumer; import java.util.function.Predicate; -import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; -import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.util.cache.Caches; import org.apache.accumulo.core.zookeeper.ZooSession; import org.apache.zookeeper.KeeperException; @@ -65,17 +63,6 @@ public interface ZooCacheWatcher extends Consumer {} private static final Logger log = LoggerFactory.getLogger(ZooCache.class); - public static List createPersistentWatcherPaths(String zkRoot) { - List pathsToWatch = new ArrayList<>(); - for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, - Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK, - Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES, - Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET)) { - pathsToWatch.add(zkRoot + path); - } - return pathsToWatch; - } - protected final TreeSet watchedPaths = new TreeSet<>(); // visible for tests protected final ZCacheWatcher watcher = new ZCacheWatcher(); @@ -97,6 +84,8 @@ public static List createPersistentWatcherPaths(String zkRoot) { // their compute functions. private final ConcurrentMap nodeCache; + private final Set pathsToWatch; + private final ZooSession zk; private volatile boolean closed = false; @@ -177,7 +166,9 @@ public void process(WatchedEvent event) { clear(); break; case SyncConnected: - log.trace("{} ZooKeeper connection established, ignoring; {}", cacheId, event); + log.trace("{} ZooKeeper connection established, re-establishing watchers; {}", + cacheId, event); + setupWatchers(pathsToWatch); break; case Expired: log.trace("{} ZooKeeper connection expired, clearing cache; {}", cacheId, event); @@ -207,24 +198,18 @@ public void process(WatchedEvent event) { * @param zk ZooSession for this instance * @param pathsToWatch Paths in ZooKeeper to watch */ - public ZooCache(ZooSession zk, List pathsToWatch) { - this.zk = requireNonNull(zk); - this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false) - .ticker(Ticker.systemTicker()).expireAfterAccess(CACHE_DURATION).build(); - this.nodeCache = cache.asMap(); - - setupWatchers(requireNonNull(pathsToWatch)); - log.trace("{} created new cache", cacheId, new Exception()); + public ZooCache(ZooSession zk, Set pathsToWatch) { + this(zk, pathsToWatch, Ticker.systemTicker()); } // for tests that use a Ticker - public ZooCache(ZooSession zk, List pathsToWatch, Ticker ticker) { + public ZooCache(ZooSession zk, Set pathsToWatch, Ticker ticker) { this.zk = requireNonNull(zk); this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false) .ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build(); this.nodeCache = cache.asMap(); - - setupWatchers(requireNonNull(pathsToWatch)); + this.pathsToWatch = requireNonNull(pathsToWatch); + setupWatchers(pathsToWatch); log.trace("{} created new cache", cacheId, new Exception()); } @@ -233,7 +218,7 @@ public void addZooCacheWatcher(ZooCacheWatcher watcher) { } // Visible for testing - protected void setupWatchers(List pathsToWatch) { + protected void setupWatchers(Set pathsToWatch) { for (String left : pathsToWatch) { for (String right : pathsToWatch) { @@ -584,7 +569,7 @@ public boolean childrenCached(String zPath) { public void clear(Predicate pathPredicate) { Preconditions.checkState(!closed); Predicate pathPredicateWrapper = path -> { - boolean testResult = isWatchedPath(path) && pathPredicate.test(path); + boolean testResult = pathPredicate.test(path); if (testResult) { updateCount.incrementAndGet(); log.trace("{} removing {} from cache", cacheId, path); diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java index 034b298f1c1..9c99b4ce355 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java @@ -33,6 +33,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; +import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.Constants; @@ -54,12 +55,12 @@ public class ZooCacheTest { */ private static class TestZooCache extends ZooCache { - public TestZooCache(ZooSession zk, List pathsToWatch) { + public TestZooCache(ZooSession zk, Set pathsToWatch) { super(zk, pathsToWatch); } @Override - protected void setupWatchers(List pathsToWatch) { + protected void setupWatchers(Set pathsToWatch) { for (String path : pathsToWatch) { watchedPaths.add(path); } @@ -84,15 +85,15 @@ public void executeWatcher(WatchedEvent event) { @BeforeEach public void setUp() { zk = createStrictMock(ZooSession.class); - zc = new TestZooCache(zk, List.of(root)); + zc = new TestZooCache(zk, Set.of(root)); } @Test public void testOverlappingPaths() { assertThrows(IllegalArgumentException.class, - () -> new ZooCache(zk, List.of(root, root + "/localhost:9995"))); + () -> new ZooCache(zk, Set.of(root, root + "/localhost:9995"))); - List goodPaths = List.of("/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/compactors", + Set goodPaths = Set.of("/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/compactors", "/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/dead/tservers", "/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/gc/lock", "/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/managers/lock", @@ -336,7 +337,7 @@ private void testWatchDataNode(byte[] initialData, Watcher.Event.EventType event WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH); TestWatcher exw = new TestWatcher(event); - zc = new TestZooCache(zk, List.of(root)); + zc = new TestZooCache(zk, Set.of(root)); zc.addZooCacheWatcher(exw); watchData(initialData); @@ -447,7 +448,7 @@ private void testGetBoth(boolean getDataFirst) throws Exception { private void testWatchDataNode_Clear(Watcher.Event.KeeperState state) throws Exception { WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, state, null); TestWatcher exw = new TestWatcher(event); - zc = new TestZooCache(zk, List.of(root)); + zc = new TestZooCache(zk, Set.of(root)); zc.addZooCacheWatcher(exw); watchData(DATA); @@ -482,7 +483,7 @@ private void testWatchChildrenNode(List initialChildren, WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH); TestWatcher exw = new TestWatcher(event); - zc = new TestZooCache(zk, List.of(root)); + zc = new TestZooCache(zk, Set.of(root)); zc.addZooCacheWatcher(exw); watchChildren(initialChildren); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java index fe0a49436c9..dee9729f516 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java @@ -25,6 +25,7 @@ import java.io.ObjectOutputStream; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.UUID; @@ -47,7 +48,7 @@ public static void main(String[] args) throws Exception { myfile.deleteOnExit(); try (var zk = new ZooSession(CacheTestReader.class.getSimpleName(), keepers, 30_000, null)) { - ZooCache zc = new ZooCache(zk, List.of("/")); + ZooCache zc = new ZooCache(zk, Set.of("/")); while (true) { if (myfile.exists() && !myfile.delete()) { diff --git a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java index e6402db3c1f..742aaa1506b 100644 --- a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java +++ b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java @@ -69,7 +69,7 @@ public static class TestZooCache extends ZooCache { private static final ZooCacheTicker ticker = new ZooCacheTicker(); - public TestZooCache(ZooSession zk, List pathsToWatch) { + public TestZooCache(ZooSession zk, Set pathsToWatch) { super(zk, pathsToWatch, ticker); } @@ -105,7 +105,7 @@ public void testGetChildren() throws Exception { final String root = Constants.ZROOT + UUID.randomUUID().toString(); final String base = root + Constants.ZTSERVERS; - TestZooCache zooCache = new TestZooCache(zk, List.of(base)); + TestZooCache zooCache = new TestZooCache(zk, Set.of(base)); zrw.mkdirs(base + "/test2"); zrw.mkdirs(base + "/test3/c1");