diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index 966ca3b62e3..b6562bc82c2 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -235,7 +235,8 @@ public ClientContext(SingletonReservation reservation, ClientInfo info, return zk; }); - this.zooCache = memoize(() -> new ZooCache(getZooSession())); + this.zooCache = memoize( + () -> new ZooCache(getZooSession(), Optional.empty(), ZooUtil.getRoot(getInstanceID()))); this.accumuloConf = serverConf; timeoutSupplier = memoizeWithExpiration( () -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS); @@ -1061,8 +1062,9 @@ public synchronized ZookeeperLockChecker getTServerLockChecker() { // because that client could be closed, and its ZooSession also closed // this needs to be fixed; TODO https://github.com/apache/accumulo/issues/2301 var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get(); - this.zkLockChecker = - new ZookeeperLockChecker(new ZooCache(zk), getZooKeeperRoot(), getServerPaths()); + this.zkLockChecker = new ZookeeperLockChecker( + new ZooCache(zk, Optional.empty(), ZooUtil.getRoot(getInstanceID())), getZooKeeperRoot(), + getServerPaths()); } return this.zkLockChecker; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index af232424305..80607363fb7 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@ -26,15 +26,18 @@ import java.util.ConcurrentModificationException; import java.util.List; import java.util.Optional; +import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; import java.util.function.Predicate; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.util.cache.Caches; import org.apache.accumulo.core.zookeeper.ZooSession; import org.apache.zookeeper.KeeperException; @@ -46,10 +49,12 @@ import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Ticker; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + /** * A cache for values stored in ZooKeeper. Values are kept up to date as they change. */ @@ -59,16 +64,38 @@ public interface ZooCacheWatcher extends Consumer {} private static final Logger log = LoggerFactory.getLogger(ZooCache.class); - private final ZCacheWatcher watcher = new ZCacheWatcher(); + protected static final String[] ALLOWED_PATHS = new String[] {Constants.ZCOMPACTORS, + Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, + Constants.ZMONITOR_LOCK, Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, + Constants.ZTABLES, Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET}; + + protected final TreeSet watchedPaths = new TreeSet<>(); + // visible for tests + protected final ZCacheWatcher watcher = new ZCacheWatcher(); private final Optional externalWatcher; private static final AtomicLong nextCacheId = new AtomicLong(0); private final String cacheId = "ZC" + nextCacheId.incrementAndGet(); + public static final Duration CACHE_DURATION = Duration.ofMinutes(30); + + // public and non-final because this is being set + // in tests to test the eviction + @SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL", + justification = "being set in tests for eviction test") + public static Ticker ticker = Ticker.systemTicker(); + + // Construct this here, otherwise end up with NPE in some cases + // when the Watcher tries to access nodeCache. Alternative would + // be to mark nodeCache as volatile. + private final Cache cache = + Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false).ticker(ticker) + .expireAfterAccess(CACHE_DURATION).build(); + // The concurrent map returned by Caffiene will only allow one thread to run at a time for a given // key and ZooCache relies on that. Not all concurrent map implementations have this behavior for // their compute functions. - private final ConcurrentMap nodeCache; + private final ConcurrentMap nodeCache = cache.asMap(); private final ZooSession zk; @@ -106,11 +133,11 @@ public long getMzxid() { private final AtomicLong updateCount = new AtomicLong(0); - private class ZCacheWatcher implements Watcher { + class ZCacheWatcher implements Watcher { @Override public void process(WatchedEvent event) { if (log.isTraceEnabled()) { - log.trace("{}: {}", cacheId, event); + log.trace("{} {}: {}", cacheId, event.getType(), event); } switch (event.getType()) { @@ -120,7 +147,14 @@ public void process(WatchedEvent event) { case NodeDeleted: case ChildWatchRemoved: case DataWatchRemoved: - remove(event.getPath()); + // This code use to call remove(path), but that was when a Watcher was set + // on each node. With the Watcher being set at a higher level we need to remove + // the parent of the affected node and all of its children from the cache + // so that the parent and children node can be re-cached. If we only remove the + // affected node, then the cached children in the parent could be incorrect. + int lastSlash = event.getPath().lastIndexOf('/'); + String parent = lastSlash == 0 ? "/" : event.getPath().substring(0, lastSlash); + clear((path) -> path.startsWith(parent)); break; case None: switch (event.getState()) { @@ -156,49 +190,43 @@ public void process(WatchedEvent event) { } } - /** - * Creates a new cache without an external watcher. - * - * @param zk the ZooKeeper instance - * @throws NullPointerException if zk is {@code null} - */ - public ZooCache(ZooSession zk) { - this(zk, Optional.empty(), Duration.ofMinutes(3)); + public ZooCache(ZooSession zk, Optional watcher, String root) { + this.zk = requireNonNull(zk); + this.externalWatcher = watcher; + setupWatchers(requireNonNull(root)); + log.trace("{} created new cache", cacheId, new Exception()); } - /** - * Creates a new cache. The given watcher is called whenever a watched node changes. - * - * @param zk the ZooKeeper instance - * @param watcher watcher object - * @throws NullPointerException if zk or watcher is {@code null} - */ - public ZooCache(ZooSession zk, ZooCacheWatcher watcher) { - this(zk, Optional.of(watcher), Duration.ofMinutes(3)); + // Visible for testing + protected void setupWatchers(String zooRoot) { + try { + for (String path : ALLOWED_PATHS) { + final String zPath = zooRoot + path; + watchedPaths.add(zPath); + zk.addPersistentRecursiveWatcher(zPath, this.watcher); + log.trace("Added persistent recursive watcher at {}", zPath); + } + } catch (KeeperException | InterruptedException e) { + throw new RuntimeException("Error setting up persistent recursive watcher", e); + } } - public ZooCache(ZooSession zk, Optional watcher, Duration timeout) { - this.zk = requireNonNull(zk); - this.externalWatcher = watcher; - RemovalListener removalListerner = (path, zcNode, reason) -> { - try { - log.trace("{} removing watches for {} because {} accesses {}", cacheId, path, reason, - zcNode == null ? -1 : zcNode.getAccessCount()); - zk.removeWatches(path, ZooCache.this.watcher, Watcher.WatcherType.Any, false); - } catch (InterruptedException | KeeperException | RuntimeException e) { - log.warn("{} failed to remove watches on path {} in zookeeper", cacheId, path, e); + private boolean isWatchedPath(String path) { + // Check that the path is equal to, or a descendant of, a watched path + for (String watchedPath : watchedPaths) { + if (path.startsWith(watchedPath)) { + return true; } - }; - // Must register the removal listener using evictionListener inorder for removal to be mutually - // exclusive with any other operations on the same path. This is important for watcher - // consistency, concurrently adding and removing watches for the same path would leave zoocache - // in a really bad state. The cache builder has another way to register a removal listener that - // is not mutually exclusive. - Cache cache = - Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false) - .expireAfterAccess(timeout).evictionListener(removalListerner).build(); - nodeCache = cache.asMap(); - log.trace("{} created new cache", cacheId, new Exception()); + } + return false; + } + + // Use this instead of Preconditions.checkState(isWatchedPath, String) + // so that we are not creating String unnecessarily. + private void ensureWatched(String path) { + if (!isWatchedPath(path)) { + throw new IllegalStateException("Supplied path " + path + " is not watched by this ZooCache"); + } } private abstract static class ZooRunnable { @@ -298,7 +326,7 @@ private ZcInterruptedException(InterruptedException e) { */ public List getChildren(final String zPath) { Preconditions.checkState(!closed); - + ensureWatched(zPath); ZooRunnable> zr = new ZooRunnable<>() { @Override @@ -322,12 +350,12 @@ public List run() throws KeeperException, InterruptedException { // That is ok because the compute() call on the map has a lock and processing the event // will block until compute() returns. After compute() returns the event processing // would clear the map entry. - Stat stat = zk.exists(zPath, watcher); + Stat stat = zk.exists(zPath, null); if (stat == null) { log.trace("{} getChildren saw that {} does not exists", cacheId, zPath); return ZcNode.NON_EXISTENT; } - List children = zk.getChildren(zPath, watcher); + List children = zk.getChildren(zPath, null); log.trace("{} adding {} children of {} to cache", cacheId, children.size(), zPath); return new ZcNode(children, zcn); } catch (KeeperException.NoNodeException nne) { @@ -370,6 +398,7 @@ public byte[] get(final String zPath) { */ public byte[] get(final String zPath, final ZcStat status) { Preconditions.checkState(!closed); + ensureWatched(zPath); ZooRunnable zr = new ZooRunnable<>() { @Override @@ -400,7 +429,7 @@ public byte[] run() throws KeeperException, InterruptedException { * non-existence can not be cached. */ try { - Stat stat = zk.exists(zPath, watcher); + Stat stat = zk.exists(zPath, null); if (stat == null) { if (log.isTraceEnabled()) { log.trace("{} zookeeper did not contain {}", cacheId, zPath); @@ -410,7 +439,7 @@ public byte[] run() throws KeeperException, InterruptedException { byte[] data = null; ZcStat zstat = null; try { - data = zk.getData(zPath, watcher, stat); + data = zk.getData(zPath, null, stat); zstat = new ZcStat(stat); } catch (KeeperException.BadVersionException | KeeperException.NoNodeException e1) { throw new ConcurrentModificationException(e1); @@ -456,12 +485,6 @@ protected void copyStats(ZcStat userStat, ZcStat cachedStat) { } } - private void remove(String zPath) { - nodeCache.remove(zPath); - log.trace("{} removed {} from cache", cacheId, zPath); - updateCount.incrementAndGet(); - } - /** * Clears this cache. */ @@ -493,6 +516,7 @@ public long getUpdateCount() { */ @VisibleForTesting public boolean dataCached(String zPath) { + ensureWatched(zPath); var zcn = nodeCache.get(zPath); return zcn != null && zcn.cachedData(); } @@ -505,6 +529,7 @@ public boolean dataCached(String zPath) { */ @VisibleForTesting public boolean childrenCached(String zPath) { + ensureWatched(zPath); var zcn = nodeCache.get(zPath); return zcn != null && zcn.cachedChildren(); } @@ -515,19 +540,15 @@ public boolean childrenCached(String zPath) { public void clear(Predicate pathPredicate) { Preconditions.checkState(!closed); - Predicate pathPredicateToUse; - if (log.isTraceEnabled()) { - pathPredicateToUse = path -> { - boolean testResult = pathPredicate.test(path); - if (testResult) { - log.trace("{} removing {} from cache", cacheId, path); - } - return testResult; - }; - } else { - pathPredicateToUse = pathPredicate; - } - nodeCache.keySet().removeIf(pathPredicateToUse); + Predicate pathPredicateWrapper = path -> { + boolean testResult = isWatchedPath(path) && pathPredicate.test(path); + if (testResult) { + updateCount.incrementAndGet(); + log.trace("{} removing {} from cache", cacheId, path); + } + return testResult; + }; + nodeCache.keySet().removeIf(pathPredicateWrapper); updateCount.incrementAndGet(); } @@ -537,10 +558,12 @@ public void clear(Predicate pathPredicate) { * @param zPath path of top node */ public void clear(String zPath) { + ensureWatched(zPath); clear(path -> path.startsWith(zPath)); } public Optional getLockData(ServiceLockPath path) { + ensureWatched(path.toString()); List children = ServiceLock.validateAndSort(path, getChildren(path.toString())); if (children == null || children.isEmpty()) { return Optional.empty(); diff --git a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java index 032eecc4534..0f729523116 100644 --- a/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java +++ b/core/src/main/java/org/apache/accumulo/core/zookeeper/ZooSession.java @@ -37,6 +37,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.zookeeper.AddWatchMode; import org.apache.zookeeper.AsyncCallback.StringCallback; import org.apache.zookeeper.AsyncCallback.VoidCallback; import org.apache.zookeeper.CreateMode; @@ -290,6 +291,11 @@ public void sync(final String path, VoidCallback cb, Object ctx) { verifyConnected().sync(path, cb, ctx); } + public void addPersistentRecursiveWatcher(String path, Watcher watcher) + throws KeeperException, InterruptedException { + verifyConnected().addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE); + } + @Override public void close() { if (closed.compareAndSet(false, true)) { diff --git a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java index 6d091d3aae1..4568f84f9b8 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/zookeeper/ZooCacheTest.java @@ -19,7 +19,6 @@ package org.apache.accumulo.core.fate.zookeeper; import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.capture; import static org.easymock.EasyMock.createStrictMock; import static org.easymock.EasyMock.eq; import static org.easymock.EasyMock.expect; @@ -33,7 +32,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.List; +import java.util.Optional; +import java.util.UUID; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZooCacheWatcher; import org.apache.accumulo.core.zookeeper.ZooSession; @@ -41,23 +43,53 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; -import org.easymock.Capture; -import org.easymock.EasyMock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class ZooCacheTest { - private static final String ZPATH = "/some/path/in/zk"; + + /** + * Test class that extends ZooCache to suppress the creation of the persistent recursive watchers + * that are created in the constructor and to provide access to the watcher. + */ + private static class TestZooCache extends ZooCache { + + public TestZooCache(ZooSession zk, String root) { + super(zk, Optional.empty(), root); + } + + public TestZooCache(ZooSession zk, Optional watcher, String root) { + super(zk, watcher, root); + } + + @Override + protected void setupWatchers(String zooRoot) { + for (String path : ALLOWED_PATHS) { + final String zPath = zooRoot + path; + watchedPaths.add(zPath); + } + } + + public void executeWatcher(WatchedEvent event) { + // simulate ZooKeeper calling our Watcher + watcher.process(event); + } + + } + + private static final String instancePath = Constants.ZROOT + "/" + UUID.randomUUID().toString(); + private static final String root = instancePath + Constants.ZTSERVERS; + private static final String ZPATH = root + "/testPath"; private static final byte[] DATA = {(byte) 1, (byte) 2, (byte) 3, (byte) 4}; private static final List CHILDREN = java.util.Arrays.asList("huey", "dewey", "louie"); private ZooSession zk; - private ZooCache zc; + private TestZooCache zc; @BeforeEach public void setUp() { zk = createStrictMock(ZooSession.class); - zc = new ZooCache(zk); + zc = new TestZooCache(zk, instancePath); } @Test @@ -78,8 +110,8 @@ private void testGet(boolean fillStat) throws Exception { final long ephemeralOwner = 123456789L; Stat existsStat = new Stat(); existsStat.setEphemeralOwner(ephemeralOwner); - expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat); - expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))).andReturn(DATA); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(existsStat); + expect(zk.getData(eq(ZPATH), eq(null), eq(existsStat))).andReturn(DATA); replay(zk); assertFalse(zc.dataCached(ZPATH)); @@ -287,29 +319,25 @@ private void testWatchDataNode(byte[] initialData, Watcher.Event.EventType event WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH); TestWatcher exw = new TestWatcher(event); - zc = new ZooCache(zk, exw); + zc = new TestZooCache(zk, Optional.of(exw), instancePath); - Watcher w = watchData(initialData); - w.process(event); + watchData(initialData); + zc.executeWatcher(event); assertTrue(exw.wasCalled()); assertEquals(stillCached, zc.dataCached(ZPATH)); } - private Watcher watchData(byte[] initialData) throws Exception { - Capture cw = EasyMock.newCapture(); + private void watchData(byte[] initialData) throws Exception { Stat existsStat = new Stat(); if (initialData != null) { - expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(existsStat); - expect(zk.getData(eq(ZPATH), anyObject(Watcher.class), eq(existsStat))) - .andReturn(initialData); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(existsStat); + expect(zk.getData(eq(ZPATH), eq(null), eq(existsStat))).andReturn(initialData); } else { - expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(null); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(null); } replay(zk); zc.get(ZPATH); assertTrue(zc.dataCached(ZPATH)); - - return cw.getValue(); } @Test @@ -401,11 +429,11 @@ private void testGetBoth(boolean getDataFirst) throws Exception { private void testWatchDataNode_Clear(Watcher.Event.KeeperState state) throws Exception { WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, state, null); TestWatcher exw = new TestWatcher(event); - zc = new ZooCache(zk, exw); + zc = new TestZooCache(zk, Optional.of(exw), instancePath); - Watcher w = watchData(DATA); + watchData(DATA); assertTrue(zc.dataCached(ZPATH)); - w.process(event); + zc.executeWatcher(event); assertTrue(exw.wasCalled()); assertFalse(zc.dataCached(ZPATH)); } @@ -435,27 +463,24 @@ private void testWatchChildrenNode(List initialChildren, WatchedEvent event = new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH); TestWatcher exw = new TestWatcher(event); - zc = new ZooCache(zk, exw); + zc = new TestZooCache(zk, Optional.of(exw), instancePath); - Watcher w = watchChildren(initialChildren); - w.process(event); + watchChildren(initialChildren); + zc.executeWatcher(event); assertTrue(exw.wasCalled()); assertEquals(stillCached, zc.childrenCached(ZPATH)); } - private Watcher watchChildren(List initialChildren) throws Exception { - Capture cw = EasyMock.newCapture(); + private void watchChildren(List initialChildren) throws Exception { if (initialChildren == null) { - expect(zk.exists(eq(ZPATH), capture(cw))).andReturn(null); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(null); } else { Stat existsStat = new Stat(); - expect(zk.exists(eq(ZPATH), anyObject(Watcher.class))).andReturn(existsStat); - expect(zk.getChildren(eq(ZPATH), capture(cw))).andReturn(initialChildren); + expect(zk.exists(eq(ZPATH), eq(null))).andReturn(existsStat); + expect(zk.getChildren(eq(ZPATH), eq(null))).andReturn(initialChildren); } replay(zk); zc.getChildren(ZPATH); assertTrue(zc.childrenCached(ZPATH)); - - return cw.getValue(); } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index f20aa6dfb55..32bb02a99c9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -30,13 +30,11 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZooCacheWatcher; import org.apache.accumulo.core.lock.ServiceLock; @@ -66,7 +64,6 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Suppliers; import com.google.common.net.HostAndPort; public class LiveTServerSet implements ZooCacheWatcher { @@ -209,16 +206,9 @@ static class TServerInfo { // The set of entries in zookeeper without locks, and the first time each was noticed private final Map locklessServers = new HashMap<>(); - private final Supplier zcSupplier; - public LiveTServerSet(ServerContext context, Listener cback) { this.cback = cback; this.context = context; - this.zcSupplier = Suppliers.memoize(() -> new ZooCache(context.getZooSession(), this)); - } - - public ZooCache getZooCache() { - return zcSupplier.get(); } public synchronized void startListeningForTabletServerChanges() { @@ -268,7 +258,8 @@ private synchronized void checkServer(final Set updates, final TServerInfo info = current.get(tserverPath.getServer()); ZcStat stat = new ZcStat(); - Optional sld = ServiceLock.getLockData(getZooCache(), tserverPath, stat); + Optional sld = + ServiceLock.getLockData(context.getZooCache(), tserverPath, stat); if (sld.isEmpty()) { if (info != null) { @@ -483,7 +474,7 @@ public synchronized void remove(TServerInstance server) { log.error("FATAL: {}", msg, e); Halt.halt(msg, -1); } - getZooCache().clear(slp.toString()); + context.getZooCache().clear(slp.toString()); } } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java index f38cb30d611..71ac8e94c40 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/KerberosAuthenticator.java @@ -29,7 +29,6 @@ import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.clientImpl.DelegationTokenImpl; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -51,7 +50,6 @@ public class KerberosAuthenticator implements Authenticator { Set.of(KerberosToken.class.getName(), SystemToken.class.getName()); private final ZKAuthenticator zkAuthenticator = new ZKAuthenticator(); - private ZooCache zooCache; private ServerContext context; private String zkUserPath; private UserImpersonation impersonation; @@ -59,7 +57,6 @@ public class KerberosAuthenticator implements Authenticator { @Override public void initialize(ServerContext context) { this.context = context; - zooCache = new ZooCache(context.getZooSession()); impersonation = new UserImpersonation(context.getConfiguration()); zkAuthenticator.initialize(context); zkUserPath = context.zkUserPath(); @@ -71,12 +68,9 @@ public boolean validSecurityHandlers() { } private void createUserNodeInZk(String principal) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - ZooReaderWriter zoo = context.getZooSession().asReaderWriter(); - zoo.putPrivatePersistentData(zkUserPath + "/" + principal, new byte[0], - NodeExistsPolicy.FAIL); - } + context.getZooCache().clear(zkUserPath + "/" + principal); + ZooReaderWriter zoo = context.getZooSession().asReaderWriter(); + zoo.putPrivatePersistentData(zkUserPath + "/" + principal, new byte[0], NodeExistsPolicy.FAIL); } @Override @@ -84,22 +78,20 @@ public void initializeSecurity(String principal, byte[] token) { try { // remove old settings from zookeeper first, if any ZooReaderWriter zoo = context.getZooSession().asReaderWriter(); - synchronized (zooCache) { - zooCache.clear(); - if (zoo.exists(zkUserPath)) { - zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); - log.info("Removed {}/ from zookeeper", zkUserPath); - } - - // prep parent node of users with root username - // ACCUMULO-4140 The root user needs to be stored un-base64 encoded in the znode's value - byte[] principalData = principal.getBytes(UTF_8); - zoo.putPersistentData(zkUserPath, principalData, NodeExistsPolicy.FAIL); - - // Create the root user in ZK using base64 encoded name (since the name is included in the - // znode) - createUserNodeInZk(Base64.getEncoder().encodeToString(principalData)); + context.getZooCache().clear((path) -> path.startsWith(zkUserPath)); + if (zoo.exists(zkUserPath)) { + zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); + log.info("Removed {}/ from zookeeper", zkUserPath); } + + // prep parent node of users with root username + // ACCUMULO-4140 The root user needs to be stored un-base64 encoded in the znode's value + byte[] principalData = principal.getBytes(UTF_8); + zoo.putPersistentData(zkUserPath, principalData, NodeExistsPolicy.FAIL); + + // Create the root user in ZK using base64 encoded name (since the name is included in the + // znode) + createUserNodeInZk(Base64.getEncoder().encodeToString(principalData)); } catch (KeeperException | InterruptedException e) { log.error("Failed to initialize security", e); throw new IllegalStateException(e); diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java index 1bcb54f0e26..7cf087ac478 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthenticator.java @@ -29,7 +29,6 @@ import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -44,12 +43,10 @@ public final class ZKAuthenticator implements Authenticator { private ServerContext context; private String zkUserPath; - private ZooCache zooCache; @Override public void initialize(ServerContext context) { this.context = context; - zooCache = new ZooCache(context.getZooSession()); zkUserPath = context.zkUserPath(); } @@ -58,18 +55,16 @@ public void initializeSecurity(String principal, byte[] token) { try { // remove old settings from zookeeper first, if any ZooReaderWriter zoo = context.getZooSession().asReaderWriter(); - synchronized (zooCache) { - zooCache.clear(); - if (zoo.exists(zkUserPath)) { - zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); - log.info("Removed {}/ from zookeeper", zkUserPath); - } + context.getZooCache().clear((path) -> path.startsWith(zkUserPath)); + if (zoo.exists(zkUserPath)) { + zoo.recursiveDelete(zkUserPath, NodeMissingPolicy.SKIP); + log.info("Removed {}/ from zookeeper", zkUserPath); + } - // prep parent node of users with root username - zoo.putPersistentData(zkUserPath, principal.getBytes(UTF_8), NodeExistsPolicy.FAIL); + // prep parent node of users with root username + zoo.putPersistentData(zkUserPath, principal.getBytes(UTF_8), NodeExistsPolicy.FAIL); - constructUser(principal, ZKSecurityTool.createPass(token)); - } + constructUser(principal, ZKSecurityTool.createPass(token)); } catch (KeeperException | AccumuloException | InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -82,16 +77,15 @@ public void initializeSecurity(String principal, byte[] token) { */ private void constructUser(String user, byte[] pass) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - ZooReaderWriter zoo = context.getZooSession().asReaderWriter(); - zoo.putPrivatePersistentData(zkUserPath + "/" + user, pass, NodeExistsPolicy.FAIL); - } + String userPath = zkUserPath + "/" + user; + context.getZooCache().clear((path) -> path.startsWith(userPath)); + context.getZooSession().asReaderWriter().putPrivatePersistentData(userPath, pass, + NodeExistsPolicy.FAIL); } @Override public Set listUsers() { - return new TreeSet<>(zooCache.getChildren(zkUserPath)); + return new TreeSet<>(context.getZooCache().getChildren(zkUserPath)); } @Override @@ -120,11 +114,9 @@ public void createUser(String principal, AuthenticationToken token) @Override public void dropUser(String user) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - context.getZooSession().asReaderWriter().recursiveDelete(zkUserPath + "/" + user, - NodeMissingPolicy.FAIL); - } + String userPath = zkUserPath + "/" + user; + context.getZooCache().clear((path) -> path.startsWith(userPath)); + context.getZooSession().asReaderWriter().recursiveDelete(userPath, NodeMissingPolicy.FAIL); } catch (InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -146,12 +138,10 @@ public void changePassword(String principal, AuthenticationToken token) PasswordToken pt = (PasswordToken) token; if (userExists(principal)) { try { - synchronized (zooCache) { - zooCache.clear(zkUserPath + "/" + principal); - context.getZooSession().asReaderWriter().putPrivatePersistentData( - zkUserPath + "/" + principal, ZKSecurityTool.createPass(pt.getPassword()), - NodeExistsPolicy.OVERWRITE); - } + String userPath = zkUserPath + "/" + principal; + context.getZooCache().clear(userPath); + context.getZooSession().asReaderWriter().putPrivatePersistentData(userPath, + ZKSecurityTool.createPass(pt.getPassword()), NodeExistsPolicy.OVERWRITE); } catch (KeeperException e) { log.error("{}", e.getMessage(), e); throw new AccumuloSecurityException(principal, SecurityErrorCode.CONNECTION_ERROR, e); @@ -170,7 +160,7 @@ public void changePassword(String principal, AuthenticationToken token) @Override public boolean userExists(String user) { - return zooCache.get(zkUserPath + "/" + user) != null; + return context.getZooCache().get(zkUserPath + "/" + user) != null; } @Override @@ -187,11 +177,11 @@ public boolean authenticateUser(String principal, AuthenticationToken token) PasswordToken pt = (PasswordToken) token; byte[] zkData; String zpath = zkUserPath + "/" + principal; - zkData = zooCache.get(zpath); + zkData = context.getZooCache().get(zpath); boolean result = authenticateUser(principal, pt, zkData); if (!result) { - zooCache.clear(zpath); - zkData = zooCache.get(zpath); + context.getZooCache().clear(zpath); + zkData = context.getZooCache().get(zpath); result = authenticateUser(principal, pt, zkData); } return result; diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java index 5d18ac46253..a63eee6e827 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKAuthorizor.java @@ -25,7 +25,6 @@ import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -44,18 +43,16 @@ public class ZKAuthorizor implements Authorizor { private ServerContext context; private String zkUserPath; - private ZooCache zooCache; @Override public void initialize(ServerContext context) { this.context = context; - zooCache = new ZooCache(context.getZooSession()); zkUserPath = context.zkUserPath(); } @Override public Authorizations getCachedUserAuthorizations(String user) { - byte[] authsBytes = zooCache.get(zkUserPath + "/" + user + ZKUserAuths); + byte[] authsBytes = context.getZooCache().get(zkUserPath + "/" + user + ZKUserAuths); if (authsBytes != null) { return ZKSecurityTool.convertAuthorizations(authsBytes); } @@ -105,11 +102,9 @@ public void initUser(String user) throws AccumuloSecurityException { @Override public void dropUser(String user) throws AccumuloSecurityException { try { - synchronized (zooCache) { - ZooReaderWriter zoo = context.getZooSession().asReaderWriter(); - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserAuths, NodeMissingPolicy.SKIP); - zooCache.clear(zkUserPath + "/" + user); - } + context.getZooSession().asReaderWriter() + .recursiveDelete(zkUserPath + "/" + user + ZKUserAuths, NodeMissingPolicy.SKIP); + context.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + user)); } catch (InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -127,12 +122,10 @@ public void dropUser(String user) throws AccumuloSecurityException { public void changeAuthorizations(String user, Authorizations authorizations) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - context.getZooSession().asReaderWriter().putPersistentData( - zkUserPath + "/" + user + ZKUserAuths, - ZKSecurityTool.convertAuthorizations(authorizations), NodeExistsPolicy.OVERWRITE); - } + String userAuths = zkUserPath + "/" + user + ZKUserAuths; + context.getZooCache().clear(userAuths); + context.getZooSession().asReaderWriter().putPersistentData(userAuths, + ZKSecurityTool.convertAuthorizations(authorizations), NodeExistsPolicy.OVERWRITE); } catch (KeeperException e) { log.error("{}", e.getMessage(), e); throw new AccumuloSecurityException(user, SecurityErrorCode.CONNECTION_ERROR, e); diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java index 1cc933c347f..4318b9448b9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/handler/ZKPermHandler.java @@ -35,7 +35,6 @@ import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -53,18 +52,18 @@ public class ZKPermHandler implements PermissionHandler { private static final Logger log = LoggerFactory.getLogger(ZKPermHandler.class); + private ServerContext ctx; private ZooReaderWriter zoo; private String zkUserPath; private String ZKTablePath; private String ZKNamespacePath; - private ZooCache zooCache; private final String ZKUserSysPerms = "/System"; private final String ZKUserTablePerms = "/Tables"; private final String ZKUserNamespacePerms = "/Namespaces"; @Override public void initialize(ServerContext context) { - zooCache = new ZooCache(context.getZooSession()); + this.ctx = context; zoo = context.getZooSession().asReaderWriter(); zkUserPath = context.zkUserPath(); ZKTablePath = context.getZooKeeperRoot() + Constants.ZTABLES; @@ -113,7 +112,8 @@ public boolean hasTablePermission(String user, String table, TablePermission per @Override public boolean hasCachedTablePermission(String user, String table, TablePermission permission) { - byte[] serializedPerms = zooCache.get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); + byte[] serializedPerms = + ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); if (serializedPerms != null) { return ZKSecurityTool.convertTablePermissions(serializedPerms).contains(permission); } @@ -165,7 +165,7 @@ public boolean hasNamespacePermission(String user, String namespace, public boolean hasCachedNamespacePermission(String user, String namespace, NamespacePermission permission) { byte[] serializedPerms = - zooCache.get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); + ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); if (serializedPerms != null) { return ZKSecurityTool.convertNamespacePermissions(serializedPerms).contains(permission); } @@ -175,8 +175,9 @@ public boolean hasCachedNamespacePermission(String user, String namespace, @Override public void grantSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException { + final String sysPermPath = zkUserPath + "/" + user + ZKUserSysPerms; try { - byte[] permBytes = zooCache.get(zkUserPath + "/" + user + ZKUserSysPerms); + byte[] permBytes = ctx.getZooCache().get(sysPermPath); Set perms; if (permBytes == null) { perms = new TreeSet<>(); @@ -185,11 +186,9 @@ public void grantSystemPermission(String user, SystemPermission permission) } if (perms.add(permission)) { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserSysPerms, - ZKSecurityTool.convertSystemPermissions(perms), NodeExistsPolicy.OVERWRITE); - } + ctx.getZooCache().clear(sysPermPath); + zoo.putPersistentData(sysPermPath, ZKSecurityTool.convertSystemPermissions(perms), + NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -204,7 +203,8 @@ public void grantSystemPermission(String user, SystemPermission permission) public void grantTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException { Set tablePerms; - byte[] serializedPerms = zooCache.get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); + final String tablePermPath = zkUserPath + "/" + user + ZKUserTablePerms + "/" + table; + byte[] serializedPerms = ctx.getZooCache().get(tablePermPath); if (serializedPerms != null) { tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms); } else { @@ -213,11 +213,9 @@ public void grantTablePermission(String user, String table, TablePermission perm try { if (tablePerms.add(permission)) { - synchronized (zooCache) { - zooCache.clear(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, - ZKSecurityTool.convertTablePermissions(tablePerms), NodeExistsPolicy.OVERWRITE); - } + ctx.getZooCache().clear(tablePermPath); + zoo.putPersistentData(tablePermPath, ZKSecurityTool.convertTablePermissions(tablePerms), + NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -231,9 +229,9 @@ public void grantTablePermission(String user, String table, TablePermission perm @Override public void grantNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException { + final String nsPermPath = zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace; Set namespacePerms; - byte[] serializedPerms = - zooCache.get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); + byte[] serializedPerms = ctx.getZooCache().get(nsPermPath); if (serializedPerms != null) { namespacePerms = ZKSecurityTool.convertNamespacePermissions(serializedPerms); } else { @@ -242,12 +240,9 @@ public void grantNamespacePermission(String user, String namespace, try { if (namespacePerms.add(permission)) { - synchronized (zooCache) { - zooCache.clear(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, - ZKSecurityTool.convertNamespacePermissions(namespacePerms), - NodeExistsPolicy.OVERWRITE); - } + ctx.getZooCache().clear(nsPermPath); + zoo.putPersistentData(nsPermPath, + ZKSecurityTool.convertNamespacePermissions(namespacePerms), NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -261,7 +256,8 @@ public void grantNamespacePermission(String user, String namespace, @Override public void revokeSystemPermission(String user, SystemPermission permission) throws AccumuloSecurityException { - byte[] sysPermBytes = zooCache.get(zkUserPath + "/" + user + ZKUserSysPerms); + final String sysPermPath = zkUserPath + "/" + user + ZKUserSysPerms; + byte[] sysPermBytes = ctx.getZooCache().get(sysPermPath); // User had no system permission, nothing to revoke. if (sysPermBytes == null) { @@ -272,11 +268,9 @@ public void revokeSystemPermission(String user, SystemPermission permission) try { if (sysPerms.remove(permission)) { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserSysPerms, - ZKSecurityTool.convertSystemPermissions(sysPerms), NodeExistsPolicy.OVERWRITE); - } + ctx.getZooCache().clear((path) -> path.startsWith(sysPermPath)); + zoo.putPersistentData(sysPermPath, ZKSecurityTool.convertSystemPermissions(sysPerms), + NodeExistsPolicy.OVERWRITE); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -290,7 +284,8 @@ public void revokeSystemPermission(String user, SystemPermission permission) @Override public void revokeTablePermission(String user, String table, TablePermission permission) throws AccumuloSecurityException { - byte[] serializedPerms = zooCache.get(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table); + final String tablePermPath = zkUserPath + "/" + user + ZKUserTablePerms + "/" + table; + byte[] serializedPerms = ctx.getZooCache().get(tablePermPath); // User had no table permission, nothing to revoke. if (serializedPerms == null) { @@ -300,13 +295,12 @@ public void revokeTablePermission(String user, String table, TablePermission per Set tablePerms = ZKSecurityTool.convertTablePermissions(serializedPerms); try { if (tablePerms.remove(permission)) { - zooCache.clear(); + ctx.getZooCache().clear((path) -> path.startsWith(tablePermPath)); if (tablePerms.isEmpty()) { - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, - NodeMissingPolicy.SKIP); + zoo.recursiveDelete(tablePermPath, NodeMissingPolicy.SKIP); } else { - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, - ZKSecurityTool.convertTablePermissions(tablePerms), NodeExistsPolicy.OVERWRITE); + zoo.putPersistentData(tablePermPath, ZKSecurityTool.convertTablePermissions(tablePerms), + NodeExistsPolicy.OVERWRITE); } } } catch (KeeperException e) { @@ -321,8 +315,8 @@ public void revokeTablePermission(String user, String table, TablePermission per @Override public void revokeNamespacePermission(String user, String namespace, NamespacePermission permission) throws AccumuloSecurityException { - byte[] serializedPerms = - zooCache.get(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace); + final String nsPermPath = zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace; + byte[] serializedPerms = ctx.getZooCache().get(nsPermPath); // User had no namespace permission, nothing to revoke. if (serializedPerms == null) { @@ -333,12 +327,11 @@ public void revokeNamespacePermission(String user, String namespace, ZKSecurityTool.convertNamespacePermissions(serializedPerms); try { if (namespacePerms.remove(permission)) { - zooCache.clear(); + ctx.getZooCache().clear((path) -> path.startsWith(nsPermPath)); if (namespacePerms.isEmpty()) { - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, - NodeMissingPolicy.SKIP); + zoo.recursiveDelete(nsPermPath, NodeMissingPolicy.SKIP); } else { - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, + zoo.putPersistentData(nsPermPath, ZKSecurityTool.convertNamespacePermissions(namespacePerms), NodeExistsPolicy.OVERWRITE); } @@ -355,12 +348,10 @@ public void revokeNamespacePermission(String user, String namespace, @Override public void cleanTablePermissions(String table) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - for (String user : zooCache.getChildren(zkUserPath)) { - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, - NodeMissingPolicy.SKIP); - } + for (String user : ctx.getZooCache().getChildren(zkUserPath)) { + final String tablePermPath = zkUserPath + "/" + user + ZKUserTablePerms + "/" + table; + ctx.getZooCache().clear((path) -> path.startsWith(tablePermPath)); + zoo.recursiveDelete(tablePermPath, NodeMissingPolicy.SKIP); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -374,12 +365,10 @@ public void cleanTablePermissions(String table) throws AccumuloSecurityException @Override public void cleanNamespacePermissions(String namespace) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zooCache.clear(); - for (String user : zooCache.getChildren(zkUserPath)) { - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, - NodeMissingPolicy.SKIP); - } + for (String user : ctx.getZooCache().getChildren(zkUserPath)) { + final String nsPermPath = zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace; + ctx.getZooCache().clear((path) -> path.startsWith(nsPermPath)); + zoo.recursiveDelete(nsPermPath, NodeMissingPolicy.SKIP); } } catch (KeeperException e) { log.error("{}", e.getMessage(), e); @@ -455,11 +444,10 @@ public void initUser(String user) throws AccumuloSecurityException { */ private void createTablePerm(String user, TableId table, Set perms) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserTablePerms + "/" + table, - ZKSecurityTool.convertTablePermissions(perms), NodeExistsPolicy.FAIL); - } + final String tablePermPath = zkUserPath + "/" + user + ZKUserTablePerms + "/" + table; + ctx.getZooCache().clear((path) -> path.startsWith(tablePermPath)); + zoo.putPersistentData(tablePermPath, ZKSecurityTool.convertTablePermissions(perms), + NodeExistsPolicy.FAIL); } /** @@ -468,22 +456,19 @@ private void createTablePerm(String user, TableId table, Set pe */ private void createNamespacePerm(String user, NamespaceId namespace, Set perms) throws KeeperException, InterruptedException { - synchronized (zooCache) { - zooCache.clear(); - zoo.putPersistentData(zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace, - ZKSecurityTool.convertNamespacePermissions(perms), NodeExistsPolicy.FAIL); - } + final String nsPermPath = zkUserPath + "/" + user + ZKUserNamespacePerms + "/" + namespace; + ctx.getZooCache().clear((path) -> path.startsWith(nsPermPath)); + zoo.putPersistentData(nsPermPath, ZKSecurityTool.convertNamespacePermissions(perms), + NodeExistsPolicy.FAIL); } @Override public void cleanUser(String user) throws AccumuloSecurityException { try { - synchronized (zooCache) { - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserSysPerms, NodeMissingPolicy.SKIP); - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms, NodeMissingPolicy.SKIP); - zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms, NodeMissingPolicy.SKIP); - zooCache.clear(zkUserPath + "/" + user); - } + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserSysPerms, NodeMissingPolicy.SKIP); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserTablePerms, NodeMissingPolicy.SKIP); + zoo.recursiveDelete(zkUserPath + "/" + user + ZKUserNamespacePerms, NodeMissingPolicy.SKIP); + ctx.getZooCache().clear((path) -> path.startsWith(zkUserPath + "/" + user)); } catch (InterruptedException e) { log.error("{}", e.getMessage(), e); throw new IllegalStateException(e); @@ -523,7 +508,7 @@ public boolean hasSystemPermission(String user, SystemPermission permission) { @Override public boolean hasCachedSystemPermission(String user, SystemPermission permission) { - byte[] perms = zooCache.get(zkUserPath + "/" + user + ZKUserSysPerms); + byte[] perms = ctx.getZooCache().get(zkUserPath + "/" + user + ZKUserSysPerms); if (perms == null) { return false; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java index 3bd0f091deb..7b14422e8c0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tables/TableManager.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.accumulo.core.Constants; @@ -122,7 +123,8 @@ public TableManager(ServerContext context) { zkRoot = context.getZooKeeperRoot(); instanceID = context.getInstanceID(); zoo = context.getZooSession().asReaderWriter(); - zooStateCache = new ZooCache(context.getZooSession(), new TableStateWatcher()); + zooStateCache = + new ZooCache(context.getZooSession(), Optional.of(new TableStateWatcher()), zkRoot); updateTableStateCache(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java index e65a5e02a61..26861198f01 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java @@ -92,7 +92,7 @@ static synchronized void listInstances(String keepers, boolean printAll, boolean try (var zk = new ZooSession(ListInstances.class.getSimpleName(), keepers, ZOOKEEPER_TIMER_MILLIS, null)) { ZooReader rdr = zk.asReader(); - ZooCache cache = new ZooCache(zk); + ZooCache cache = new ZooCache(zk, Optional.empty(), Constants.ZROOT); TreeMap instanceNames = getInstanceNames(rdr, printErrors); diff --git a/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java b/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java index a3f91a7cf82..4e16408cffe 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/security/handler/ZKAuthenticatorTest.java @@ -39,6 +39,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.InstanceId; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.SystemPermission; @@ -144,14 +145,16 @@ public void testUserAuthentication() throws Exception { var instanceId = InstanceId.of("example"); ZooSession zk = createMock(ZooSession.class); ServerContext context = MockServerContext.getWithMockZK(zk); + ZooCache zc = createMock(ZooCache.class); expect(context.zkUserPath()).andReturn(ZooUtil.getRoot(instanceId) + Constants.ZUSERS) .anyTimes(); expect(zk.getChildren(anyObject(), anyObject())).andReturn(Arrays.asList(principal)).anyTimes(); expect(zk.exists(matches(ZooUtil.getRoot(instanceId) + Constants.ZUSERS + "/" + principal), anyObject(Watcher.class))).andReturn(new Stat()).anyTimes(); - expect(zk.getData(matches(ZooUtil.getRoot(instanceId) + Constants.ZUSERS + "/" + principal), - anyObject(), anyObject())).andReturn(newHash).once(); - replay(context, zk); + expect(context.getZooCache()).andReturn(zc).anyTimes(); + expect(zc.get(matches(ZooUtil.getRoot(instanceId) + Constants.ZUSERS + "/" + principal))) + .andReturn(newHash); + replay(context, zk, zc); // creating authenticator ZKAuthenticator auth = new ZKAuthenticator(); @@ -160,6 +163,6 @@ public void testUserAuthentication() throws Exception { PasswordToken token = new PasswordToken(rawPass.clone()); // verifying that if the new type of hash is stored in zk authentication works as expected assertTrue(auth.authenticateUser(principal, token)); - verify(context, zk); + verify(context, zk, zc); } } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java index 7a4d5db9c37..d2132762313 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java @@ -37,7 +37,6 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.cache.Caches.CacheName; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -65,7 +64,6 @@ public class RecoveryManager { private final Cache existenceCache; private final ScheduledExecutorService executor; private final Manager manager; - private final ZooCache zooCache; public RecoveryManager(Manager manager, long timeToCacheExistsInMillis) { this.manager = manager; @@ -76,7 +74,6 @@ public RecoveryManager(Manager manager, long timeToCacheExistsInMillis) { executor = ThreadPools.getServerThreadPools().createScheduledExecutorService(4, "Walog sort starter"); - zooCache = new ZooCache(manager.getContext().getZooSession()); try { List workIDs = new DistributedWorkQueue(manager.getContext().getZooKeeperRoot() + Constants.ZRECOVERY, @@ -182,7 +179,7 @@ public boolean recoverLogs(KeyExtent extent, Collection walogs) throws sortQueued = sortsQueued.contains(sortId); } - if (sortQueued && zooCache.get( + if (sortQueued && this.manager.getContext().getZooCache().get( manager.getContext().getZooKeeperRoot() + Constants.ZRECOVERY + "/" + sortId) == null) { synchronized (this) { sortsQueued.remove(sortId); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 8a409d566b6..eb96286670d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -65,7 +65,6 @@ import org.apache.accumulo.core.dataImpl.thrift.TColumn; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; @@ -204,8 +203,6 @@ private TabletMetadataLoader(Ample ample) { private ScanServerMetrics scanServerMetrics; private BlockCacheMetrics blockCacheMetrics; - private final ZooCache managerLockCache; - public ScanServer(ConfigOpts opts, String[] args) { super("sserver", opts, ServerContext::new, args); @@ -216,8 +213,6 @@ public ScanServer(ConfigOpts opts, String[] args) { this.resourceManager = new TabletServerResourceManager(context, this); - this.managerLockCache = new ZooCache(context.getZooSession()); - var readWriteLock = new ReentrantReadWriteLock(); reservationsReadLock = readWriteLock.readLock(); reservationsWriteLock = readWriteLock.writeLock(); @@ -1107,11 +1102,6 @@ public ServiceLock getLock() { return scanServerLock; } - @Override - public ZooCache getManagerLockCache() { - return managerLockCache; - } - @Override public BlockCacheConfiguration getBlockCacheConfiguration(AccumuloConfiguration acuConf) { return BlockCacheConfiguration.forScanServer(acuConf); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 388bf514091..d45978a9caf 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -70,6 +70,7 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockPaths; import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.manager.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.AccumuloTable; @@ -970,11 +971,12 @@ static void checkPermission(SecurityOperation security, ServerContext context, new ZooUtil.LockID(context.getServerPaths().createManagerPath().toString(), lock); try { - if (!ServiceLock.isLockHeld(server.getManagerLockCache(), lid)) { + if (!ServiceLock.isLockHeld(server.getContext().getZooCache(), lid)) { // maybe the cache is out of date and a new manager holds the // lock? - server.getManagerLockCache().clear(); - if (!ServiceLock.isLockHeld(server.getManagerLockCache(), lid)) { + server.getContext().getZooCache().clear( + (path) -> path.equals(ServiceLockPaths.parse(Optional.empty(), lid.path).toString())); + if (!ServiceLock.isLockHeld(server.getContext().getZooCache(), lid)) { log.warn("Got {} message from a manager that does not hold the current lock {}", request, lock); throw new RuntimeException("bad manager lock"); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java index 8e5047bb40d..9760341a995 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletHostingServer.java @@ -20,7 +20,6 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.scan.ScanServerInfo; @@ -58,7 +57,5 @@ public interface TabletHostingServer { ServiceLock getLock(); - ZooCache getManagerLockCache(); - BlockCacheManager.Configuration getBlockCacheConfiguration(AccumuloConfiguration acuConf); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index aea44fcdafc..9a63bc44e22 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -75,7 +75,6 @@ import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration; @@ -161,8 +160,6 @@ public class TabletServer extends AbstractServer implements TabletHostingServer private static final Logger log = LoggerFactory.getLogger(TabletServer.class); private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = TimeUnit.HOURS.toMillis(1); - final ZooCache managerLockCache; - final TabletServerLogger logger; private TabletServerMetrics metrics; @@ -233,7 +230,6 @@ protected TabletServer(ConfigOpts opts, Function serverContextFactory, String[] args) { super("tserver", opts, serverContextFactory, args); context = super.getContext(); - this.managerLockCache = new ZooCache(context.getZooSession()); final AccumuloConfiguration aconf = getConfiguration(); log.info("Version " + Constants.VERSION); log.info("Instance " + getInstanceID()); @@ -483,11 +479,6 @@ public ServiceLock getLock() { return tabletServerLock; } - @Override - public ZooCache getManagerLockCache() { - return managerLockCache; - } - private void announceExistence() { final ZooReaderWriter zoo = getContext().getZooSession().asReaderWriter(); try { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java index 4d554c5c3e7..cd0639d371c 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CacheTestReader.java @@ -25,6 +25,7 @@ import java.io.ObjectOutputStream; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; import java.util.UUID; @@ -47,7 +48,7 @@ public static void main(String[] args) throws Exception { myfile.deleteOnExit(); try (var zk = new ZooSession(CacheTestReader.class.getSimpleName(), keepers, 30_000, null)) { - ZooCache zc = new ZooCache(zk); + ZooCache zc = new ZooCache(zk, Optional.empty(), "/"); while (true) { if (myfile.exists() && !myfile.delete()) { diff --git a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java index ef15a309598..d969b9ae64e 100644 --- a/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java +++ b/test/src/main/java/org/apache/accumulo/test/zookeeper/ZooCacheIT.java @@ -25,13 +25,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; -import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.UUID; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZooCacheWatcher; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; @@ -44,21 +45,48 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import com.github.benmanes.caffeine.cache.Ticker; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + @Tag(ZOOKEEPER_TESTING_SERVER) public class ZooCacheIT { + public static class ZooCacheTicker implements Ticker { + + private int advanceCounter = 0; + + @Override + public long read() { + return System.nanoTime() + (advanceCounter * ZooCache.CACHE_DURATION.toNanos()); + } + + public void advance() { + advanceCounter++; + } + + public void reset() { + advanceCounter = 0; + } + + } + private ZooKeeperTestingServer szk; private ZooSession zk; private ZooReaderWriter zrw; + private ZooCacheTicker ticker = new ZooCacheTicker(); @TempDir private File tempDir; @BeforeEach + @SuppressFBWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD", + justification = "setting ticker in test for eviction test") public void setup() throws Exception { szk = new ZooKeeperTestingServer(tempDir); zk = szk.newClient(); zrw = zk.asReaderWriter(); + ZooCache.ticker = ticker; } @AfterEach @@ -80,125 +108,132 @@ public void testGetChildren() throws Exception { watchesRemoved.add(event.getPath()); } }; - ZooCache zooCache = new ZooCache(zk, Optional.of(watcher), Duration.ofSeconds(3)); + final String root = Constants.ZROOT + UUID.randomUUID().toString(); + ZooCache zooCache = new ZooCache(zk, Optional.of(watcher), root); + final String base = root + Constants.ZTSERVERS; - zrw.mkdirs("/test2"); - zrw.mkdirs("/test3/c1"); - zrw.mkdirs("/test3/c2"); + zrw.mkdirs(base + "/test2"); + zrw.mkdirs(base + "/test3/c1"); + zrw.mkdirs(base + "/test3/c2"); // cache non-existence of /test1 and existence of /test2 and /test3 long uc1 = zooCache.getUpdateCount(); - assertNull(zooCache.getChildren("/test1")); + assertNull(zooCache.getChildren(base + "/test1")); long uc2 = zooCache.getUpdateCount(); assertTrue(uc1 < uc2); - assertEquals(List.of(), zooCache.getChildren("/test2")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); long uc3 = zooCache.getUpdateCount(); assertTrue(uc2 < uc3); - assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren("/test3"))); + assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren(base + "/test3"))); long uc4 = zooCache.getUpdateCount(); assertTrue(uc3 < uc4); // The cache should be stable now and new accesses should not change the update count - assertNull(zooCache.getChildren("/test1")); + assertNull(zooCache.getChildren(base + "/test1")); // once getChildren discovers that a node does not exists, then get data will also know this - assertNull(zooCache.get("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren("/test3"))); + assertNull(zooCache.get(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc4, zooCache.getUpdateCount()); // Had cached non-existence of "/test1", should get a notification that it was created - zrw.mkdirs("/test1"); + zrw.mkdirs(base + "/test1"); Wait.waitFor(() -> { - var children = zooCache.getChildren("/test1"); + var children = zooCache.getChildren(base + "/test1"); return children != null && children.isEmpty(); }); long uc5 = zooCache.getUpdateCount(); assertTrue(uc4 < uc5); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren("/test3"))); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc5, zooCache.getUpdateCount()); + long uc5b = zooCache.getUpdateCount(); + assertTrue(uc5 < uc5b); // add a child to /test3, should get a notification of the change - zrw.mkdirs("/test3/c3"); + zrw.mkdirs(base + "/test3/c3"); Wait.waitFor(() -> { - var children = zooCache.getChildren("/test3"); + var children = zooCache.getChildren(base + "/test3"); return children != null && children.size() == 3; }); long uc6 = zooCache.getUpdateCount(); - assertTrue(uc5 < uc6); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c2", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); + assertTrue(uc5b < uc6); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c2", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc6, zooCache.getUpdateCount()); // remove a child from /test3 - zrw.delete("/test3/c2"); + zrw.delete(base + "/test3/c2"); Wait.waitFor(() -> { - var children = zooCache.getChildren("/test3"); + var children = zooCache.getChildren(base + "/test3"); return children != null && children.size() == 2; }); long uc7 = zooCache.getUpdateCount(); assertTrue(uc6 < uc7); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc7, zooCache.getUpdateCount()); // remove /test2, should start caching that it does not exist - zrw.delete("/test2"); - Wait.waitFor(() -> zooCache.getChildren("/test2") == null); + zrw.delete(base + "/test2"); + Wait.waitFor(() -> zooCache.getChildren(base + "/test2") == null); long uc8 = zooCache.getUpdateCount(); assertTrue(uc7 < uc8); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertNull(zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); - assertEquals(uc8, zooCache.getUpdateCount()); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertNull(zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); + long uc8b = zooCache.getUpdateCount(); + assertTrue(uc8 < uc8b); // add /test2 back, should update - zrw.mkdirs("/test2"); - Wait.waitFor(() -> zooCache.getChildren("/test2") != null); + zrw.mkdirs(base + "/test2"); + Wait.waitFor(() -> zooCache.getChildren(base + "/test2") != null); long uc9 = zooCache.getUpdateCount(); assertTrue(uc8 < uc9); - assertEquals(List.of(), zooCache.getChildren("/test1")); - assertEquals(List.of(), zooCache.getChildren("/test2")); - assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren("/test3"))); - assertEquals(uc9, zooCache.getUpdateCount()); + assertEquals(List.of(), zooCache.getChildren(base + "/test1")); + assertEquals(List.of(), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c1", "c3"), Set.copyOf(zooCache.getChildren(base + "/test3"))); + long uc9b = zooCache.getUpdateCount(); + assertTrue(uc9 < uc9b); // make multiple changes. the cache should see all of these - zrw.delete("/test1"); - zrw.mkdirs("/test2/ca"); - zrw.delete("/test3/c1"); - zrw.mkdirs("/test3/c4"); - zrw.delete("/test3/c4"); - zrw.mkdirs("/test3/c5"); + zrw.delete(base + "/test1"); + zrw.mkdirs(base + "/test2/ca"); + zrw.delete(base + "/test3/c1"); + zrw.mkdirs(base + "/test3/c4"); + zrw.delete(base + "/test3/c4"); + zrw.mkdirs(base + "/test3/c5"); Wait.waitFor(() -> { - var children1 = zooCache.getChildren("/test1"); - var children2 = zooCache.getChildren("/test2"); - var children3 = zooCache.getChildren("/test3"); + var children1 = zooCache.getChildren(base + "/test1"); + var children2 = zooCache.getChildren(base + "/test2"); + var children3 = zooCache.getChildren(base + "/test3"); return children1 == null && children2 != null && children2.size() == 1 && children3 != null && Set.copyOf(children3).equals(Set.of("c3", "c5")); }); long uc10 = zooCache.getUpdateCount(); - assertTrue(uc9 < uc10); - assertNull(zooCache.getChildren("/test1")); - assertEquals(List.of("ca"), zooCache.getChildren("/test2")); - assertEquals(Set.of("c3", "c5"), Set.copyOf(zooCache.getChildren("/test3"))); + assertTrue(uc9b < uc10); + assertNull(zooCache.getChildren(base + "/test1")); + assertEquals(List.of("ca"), zooCache.getChildren(base + "/test2")); + assertEquals(Set.of("c3", "c5"), Set.copyOf(zooCache.getChildren(base + "/test3"))); assertEquals(uc10, zooCache.getUpdateCount()); // wait for the cache to evict and clear watches + ticker.advance(); Wait.waitFor(() -> { // the cache will not run its eviction handler unless accessed, so access something that is // not expected to be evicted - zooCache.getChildren("/test4"); + zooCache.getChildren(base + "/test4"); return watchesRemoved.equals(Set.of("/test1", "/test2", "/test3")); }); - assertFalse(zooCache.childrenCached("/test1")); - assertFalse(zooCache.childrenCached("/test2")); - assertFalse(zooCache.childrenCached("/test3")); + assertFalse(zooCache.childrenCached(base + "/test1")); + assertFalse(zooCache.childrenCached(base + "/test2")); + assertFalse(zooCache.childrenCached(base + "/test3")); } }