diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index e96cbd6827b..b9f09c698bf 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -48,6 +48,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -235,7 +236,8 @@ public ClientContext(SingletonReservation reservation, ClientInfo info, return zk; }); - this.zooCache = memoize(() -> new ZooCache(getZooSession(), ZooUtil.getRoot(getInstanceID()))); + this.zooCache = memoize(() -> new ZooCache(getZooSession(), + ZooCache.createPersistentWatcherPaths(ZooUtil.getRoot(getInstanceID())))); this.accumuloConf = serverConf; timeoutSupplier = memoizeWithExpiration( () -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS); @@ -1063,7 +1065,8 @@ public synchronized ZookeeperLockChecker getTServerLockChecker() { // this needs to be fixed; TODO https://github.com/apache/accumulo/issues/2301 var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get(); String zkRoot = getZooKeeperRoot(); - this.zkLockChecker = new ZookeeperLockChecker(new ZooCache(zk, zkRoot), zkRoot); + this.zkLockChecker = + new ZookeeperLockChecker(new ZooCache(zk, List.of(zkRoot + Constants.ZTSERVERS)), zkRoot); } return this.zkLockChecker; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index 641b73d79ed..102bb5bb020 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@ -28,6 +28,7 @@ import java.util.ConcurrentModificationException; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -55,8 +56,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; - /** * A cache for values stored in ZooKeeper. Values are kept up to date as they change. */ @@ -66,10 +65,16 @@ public interface ZooCacheWatcher extends Consumer {} private static final Logger log = LoggerFactory.getLogger(ZooCache.class); - protected static final String[] ALLOWED_PATHS = new String[] {Constants.ZCOMPACTORS, - Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, - Constants.ZMONITOR_LOCK, Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, - Constants.ZTABLES, Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET}; + public static List createPersistentWatcherPaths(String zkRoot) { + List pathsToWatch = new ArrayList<>(); + for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, + Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK, + Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES, + Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET)) { + pathsToWatch.add(zkRoot + path); + } + return pathsToWatch; + } protected final TreeSet watchedPaths = new TreeSet<>(); // visible for tests @@ -82,23 +87,15 @@ public interface ZooCacheWatcher extends Consumer {} public static final Duration CACHE_DURATION = Duration.ofMinutes(30); - // public and non-final because this is being set - // in tests to test the eviction - @SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL", - justification = "being set in tests for eviction test") - public static Ticker ticker = Ticker.systemTicker(); - // Construct this here, otherwise end up with NPE in some cases // when the Watcher tries to access nodeCache. Alternative would // be to mark nodeCache as volatile. - private final Cache cache = - Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false).ticker(ticker) - .expireAfterAccess(CACHE_DURATION).build(); + private final Cache cache; // The concurrent map returned by Caffiene will only allow one thread to run at a time for a given // key and ZooCache relies on that. Not all concurrent map implementations have this behavior for // their compute functions. - private final ConcurrentMap nodeCache = cache.asMap(); + private final ConcurrentMap nodeCache; private final ZooSession zk; @@ -140,18 +137,25 @@ class ZCacheWatcher implements Watcher { @Override public void process(WatchedEvent event) { if (log.isTraceEnabled()) { - log.trace("{} {}: {}", cacheId, event.getType(), event); + log.trace("{}: {}", cacheId, event); } switch (event.getType()) { - case NodeDataChanged: case NodeChildrenChanged: - case NodeCreated: - case NodeDeleted: + // According to documentation we should not receive this event. + // According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 we + // may receive this event (Fixed in 3.9.0) + break; case ChildWatchRemoved: case DataWatchRemoved: - // This code use to call remove(path), but that was when a Watcher was set - // on each node. With the Watcher being set at a higher level we need to remove + // We don't need to do anything with the cache on these events. + break; + case NodeDataChanged: + clear(path -> path.equals(event.getPath())); + break; + case NodeCreated: + case NodeDeleted: + // With the Watcher being set at a higher level we need to remove // the parent of the affected node and all of its children from the cache // so that the parent and children node can be re-cached. If we only remove the // affected node, then the cached children in the parent could be incorrect. @@ -193,12 +197,34 @@ public void process(WatchedEvent event) { } } - public ZooCache(ZooSession zk, String root, ZooCacheWatcher... watchers) { + /** + * Creates a ZooCache instance that uses the supplied ZooSession for communicating with the + * instance's ZooKeeper servers. The ZooCache will create persistent watchers at the given + * pathsToWatch, if any, to be updated when changes are made in ZooKeeper for nodes at or below in + * the tree. If ZooCacheWatcher's are added via {@code addZooCacheWatcher}, then they will be + * notified when this object is notified of changes via the PersistentWatcher callback. + * + * @param zk ZooSession for this instance + * @param pathsToWatch Paths in ZooKeeper to watch + */ + public ZooCache(ZooSession zk, List pathsToWatch) { this.zk = requireNonNull(zk); - for (ZooCacheWatcher zcw : watchers) { - externalWatchers.add(zcw); - } - setupWatchers(requireNonNull(root)); + this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false) + .ticker(Ticker.systemTicker()).expireAfterAccess(CACHE_DURATION).build(); + this.nodeCache = cache.asMap(); + + setupWatchers(requireNonNull(pathsToWatch)); + log.trace("{} created new cache", cacheId, new Exception()); + } + + // for tests that use a Ticker + public ZooCache(ZooSession zk, List pathsToWatch, Ticker ticker) { + this.zk = requireNonNull(zk); + this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false) + .ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build(); + this.nodeCache = cache.asMap(); + + setupWatchers(requireNonNull(pathsToWatch)); log.trace("{} created new cache", cacheId, new Exception()); } @@ -207,13 +233,12 @@ public void addZooCacheWatcher(ZooCacheWatcher watcher) { } // Visible for testing - protected void setupWatchers(String zooRoot) { + protected void setupWatchers(List pathsToWatch) { try { - for (String path : ALLOWED_PATHS) { - final String zPath = zooRoot + path; - watchedPaths.add(zPath); - zk.addPersistentRecursiveWatcher(zPath, this.watcher); - log.trace("Added persistent recursive watcher at {}", zPath); + for (String path : pathsToWatch) { + watchedPaths.add(path); + zk.addPersistentRecursiveWatcher(path, this.watcher); + log.trace("Added persistent recursive watcher at {}", path); } } catch (KeeperException | InterruptedException e) { throw new RuntimeException("Error setting up persistent recursive watcher", e); @@ -558,7 +583,6 @@ public void clear(Predicate pathPredicate) { return testResult; }; nodeCache.keySet().removeIf(pathPredicateWrapper); - updateCount.incrementAndGet(); } /** diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java index 822ca0460fe..1edbdfb580f 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java @@ -53,15 +53,14 @@ public class ZooCacheTest { */ private static class TestZooCache extends ZooCache { - public TestZooCache(ZooSession zk, String root, ZooCacheWatcher... watchers) { - super(zk, root, watchers); + public TestZooCache(ZooSession zk, List pathsToWatch) { + super(zk, pathsToWatch); } @Override - protected void setupWatchers(String zooRoot) { - for (String path : ALLOWED_PATHS) { - final String zPath = zooRoot + path; - watchedPaths.add(zPath); + protected void setupWatchers(List pathsToWatch) { + for (String path : pathsToWatch) { + watchedPaths.add(path); } } @@ -72,8 +71,8 @@ public void executeWatcher(WatchedEvent event) { } - private static final String instancePath = Constants.ZROOT + "/" + UUID.randomUUID().toString(); - private static final String root = instancePath + Constants.ZTSERVERS; + private static final String root = + Constants.ZROOT + "/" + UUID.randomUUID().toString() + Constants.ZTSERVERS; private static final String ZPATH = root + "/testPath"; private static final byte[] DATA = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; private static final List CHILDREN = java.util.Arrays.asList("huey", "dewey", "louie"); @@ -84,7 +83,7 @@ public void executeWatcher(WatchedEvent event) { @BeforeEach public void setUp() { zk = createStrictMock(ZooSession.class); - zc = new TestZooCache(zk, instancePath); + zc = new TestZooCache(zk, List.of(root)); } @Test @@ -314,7 +313,8 @@ private void testWatchDataNode(byte[] initialData, Watcher.Event.EventType event WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH); TestWatcher exw = new TestWatcher(event); - zc = new TestZooCache(zk, instancePath, exw); + zc = new TestZooCache(zk, List.of(root)); + zc.addZooCacheWatcher(exw); watchData(initialData); zc.executeWatcher(event); @@ -424,7 +424,8 @@ private void testGetBoth(boolean getDataFirst) throws Exception { private void testWatchDataNode_Clear(Watcher.Event.KeeperState state) throws Exception { WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, state, null); TestWatcher exw = new TestWatcher(event); - zc = new TestZooCache(zk, instancePath, exw); + zc = new TestZooCache(zk, List.of(root)); + zc.addZooCacheWatcher(exw); watchData(DATA); assertTrue(zc.dataCached(ZPATH)); @@ -440,7 +441,7 @@ public void testWatchChildrenNode_Deleted() throws Exception { @Test public void testWatchChildrenNode_ChildrenChanged() throws Exception { - testWatchChildrenNode(CHILDREN, Watcher.Event.EventType.NodeChildrenChanged, false); + testWatchChildrenNode(CHILDREN, Watcher.Event.EventType.NodeChildrenChanged, true); } @Test @@ -458,7 +459,8 @@ private void testWatchChildrenNode(List initialChildren, WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH); TestWatcher exw = new TestWatcher(event); - zc = new TestZooCache(zk, instancePath, exw); + zc = new TestZooCache(zk, List.of(root)); + zc.addZooCacheWatcher(exw); watchChildren(initialChildren); zc.executeWatcher(event); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java index 9984943ce7f..3c749d8df8e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java @@ -92,7 +92,7 @@ static synchronized void listInstances(String keepers, boolean printAll, boolean try (var zk = new ZooSession(ListInstances.class.getSimpleName(), keepers, ZOOKEEPER_TIMER_MILLIS, null)) { ZooReader rdr = zk.asReader(); - ZooCache cache = new ZooCache(zk, Constants.ZROOT); + ZooCache cache = new ZooCache(zk, List.of(Constants.ZROOT)); TreeMap instanceNames = getInstanceNames(rdr, printErrors); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java index ad717ff524d..fe0a49436c9 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java @@ -47,7 +47,7 @@ public static void main(String[] args) throws Exception { myfile.deleteOnExit(); try (var zk = new ZooSession(CacheTestReader.class.getSimpleName(), keepers, 30_000, null)) { - ZooCache zc = new ZooCache(zk, "/"); + ZooCache zc = new ZooCache(zk, List.of("/")); while (true) { if (myfile.exists() && !myfile.delete()) { diff --git a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java index 5d8718f0c38..e6402db3c1f 100644 --- a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java +++ b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java @@ -20,24 +20,19 @@ import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; -import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.UUID; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.zookeeper.ZooCache; -import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZooCacheWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.zookeeper.ZooSession; import org.apache.accumulo.test.util.Wait; -import org.apache.zookeeper.Watcher; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -70,10 +65,19 @@ public void reset() { } + public static class TestZooCache extends ZooCache { + + private static final ZooCacheTicker ticker = new ZooCacheTicker(); + + public TestZooCache(ZooSession zk, List pathsToWatch) { + super(zk, pathsToWatch, ticker); + } + + } + private ZooKeeperTestingServer szk; private ZooSession zk; private ZooReaderWriter zrw; - private ZooCacheTicker ticker = new ZooCacheTicker(); @TempDir private File tempDir; @@ -85,7 +89,6 @@ public void setup() throws Exception { szk = new ZooKeeperTestingServer(tempDir); zk = szk.newClient(); zrw = zk.asReaderWriter(); - ZooCache.ticker = ticker; } @AfterEach @@ -100,16 +103,9 @@ public void teardown() throws Exception { @Test public void testGetChildren() throws Exception { - Set watchesRemoved = Collections.synchronizedSet(new HashSet<>()); - ZooCacheWatcher watcher = event -> { - if (event.getType() == Watcher.Event.EventType.ChildWatchRemoved - || event.getType() == Watcher.Event.EventType.DataWatchRemoved) { - watchesRemoved.add(event.getPath()); - } - }; final String root = Constants.ZROOT + UUID.randomUUID().toString(); - ZooCache zooCache = new ZooCache(zk, root, watcher); final String base = root + Constants.ZTSERVERS; + TestZooCache zooCache = new TestZooCache(zk, List.of(base)); zrw.mkdirs(base + "/test2"); zrw.mkdirs(base + "/test3/c1"); @@ -148,7 +144,6 @@ public void testGetChildren() throws Exception { assertEquals(List.of(), zooCache.getChildren(base + "/test1")); assertEquals(List.of(), zooCache.getChildren(base + "/test2")); assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren(base + "/test3"))); - assertEquals(uc5, zooCache.getUpdateCount()); long uc5b = zooCache.getUpdateCount(); assertTrue(uc5 < uc5b); @@ -223,16 +218,14 @@ public void testGetChildren() throws Exception { assertEquals(uc10, zooCache.getUpdateCount()); // wait for the cache to evict and clear watches - ticker.advance(); + TestZooCache.ticker.advance(); Wait.waitFor(() -> { // the cache will not run its eviction handler unless accessed, so access something that is // not expected to be evicted zooCache.getChildren(base + "/test4"); - return watchesRemoved.equals(Set.of("/test1", "/test2", "/test3")); + return zooCache.childrenCached(base + "/test1") == false + && zooCache.childrenCached(base + "/test2") == false + && zooCache.childrenCached(base + "/test3") == false; }); - - assertFalse(zooCache.childrenCached(base + "/test1")); - assertFalse(zooCache.childrenCached(base + "/test2")); - assertFalse(zooCache.childrenCached(base + "/test3")); } }