Skip to content

Commit

Permalink
Reused ZooCache where possible, used persistent recursive watchers
Browse files Browse the repository at this point in the history
Reused ZooCache from client / server context where possible. Removed
overloaded ZooCache constructor to make it easier to find where new
instances are constructed. Removed watcher that was being placed in
each call to the underlying ZooKeeper in favor of long-lived persistent
recursive watchers set on specific paths which will fire when any
child under that path is modified.

Related to apache#5134
Closes apache#5154, apache#5157
  • Loading branch information
dlmarion committed Jan 14, 2025
1 parent 862b6e5 commit 8657cc3
Show file tree
Hide file tree
Showing 19 changed files with 378 additions and 353 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
return zk;
});

this.zooCache = memoize(() -> new ZooCache(getZooSession()));
this.zooCache = memoize(
() -> new ZooCache(getZooSession(), Optional.empty(), ZooUtil.getRoot(getInstanceID())));
this.accumuloConf = serverConf;
timeoutSupplier = memoizeWithExpiration(
() -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS);
Expand Down Expand Up @@ -1061,8 +1062,9 @@ public synchronized ZookeeperLockChecker getTServerLockChecker() {
// because that client could be closed, and its ZooSession also closed
// this needs to be fixed; TODO https://github.com/apache/accumulo/issues/2301
var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get();
this.zkLockChecker =
new ZookeeperLockChecker(new ZooCache(zk), getZooKeeperRoot(), getServerPaths());
this.zkLockChecker = new ZookeeperLockChecker(
new ZooCache(zk, Optional.empty(), ZooUtil.getRoot(getInstanceID())), getZooKeeperRoot(),
getServerPaths());
}
return this.zkLockChecker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,18 @@
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.util.cache.Caches;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.zookeeper.KeeperException;
Expand All @@ -46,10 +49,12 @@
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Ticker;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
* A cache for values stored in ZooKeeper. Values are kept up to date as they change.
*/
Expand All @@ -59,16 +64,38 @@ public interface ZooCacheWatcher extends Consumer<WatchedEvent> {}

private static final Logger log = LoggerFactory.getLogger(ZooCache.class);

private final ZCacheWatcher watcher = new ZCacheWatcher();
protected static final String[] ALLOWED_PATHS = new String[] {Constants.ZCOMPACTORS,
Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK,
Constants.ZMONITOR_LOCK, Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS,
Constants.ZTABLES, Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET};

protected final TreeSet<String> watchedPaths = new TreeSet<>();
// visible for tests
protected final ZCacheWatcher watcher = new ZCacheWatcher();
private final Optional<ZooCacheWatcher> externalWatcher;

private static final AtomicLong nextCacheId = new AtomicLong(0);
private final String cacheId = "ZC" + nextCacheId.incrementAndGet();

public static final Duration CACHE_DURATION = Duration.ofMinutes(30);

// public and non-final because this is being set
// in tests to test the eviction
@SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL",
justification = "being set in tests for eviction test")
public static Ticker ticker = Ticker.systemTicker();

// Construct this here, otherwise end up with NPE in some cases
// when the Watcher tries to access nodeCache. Alternative would
// be to mark nodeCache as volatile.
private final Cache<String,ZcNode> cache =
Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false).ticker(ticker)
.expireAfterAccess(CACHE_DURATION).build();

// The concurrent map returned by Caffeine will only allow one thread to run at a time for a given
// key and ZooCache relies on that. Not all concurrent map implementations have this behavior for
// their compute functions.
private final ConcurrentMap<String,ZcNode> nodeCache;
private final ConcurrentMap<String,ZcNode> nodeCache = cache.asMap();

private final ZooSession zk;

Expand Down Expand Up @@ -106,11 +133,11 @@ public long getMzxid() {

private final AtomicLong updateCount = new AtomicLong(0);

private class ZCacheWatcher implements Watcher {
class ZCacheWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
if (log.isTraceEnabled()) {
log.trace("{}: {}", cacheId, event);
log.trace("{} {}: {}", cacheId, event.getType(), event);
}

switch (event.getType()) {
Expand All @@ -120,7 +147,14 @@ public void process(WatchedEvent event) {
case NodeDeleted:
case ChildWatchRemoved:
case DataWatchRemoved:
remove(event.getPath());
// This code used to call remove(path), but that was when a Watcher was set
// on each node. With the Watcher being set at a higher level we need to remove
// the parent of the affected node and all of its children from the cache
// so that the parent and children node can be re-cached. If we only remove the
// affected node, then the cached children in the parent could be incorrect.
int lastSlash = event.getPath().lastIndexOf('/');
String parent = lastSlash == 0 ? "/" : event.getPath().substring(0, lastSlash);
clear((path) -> path.startsWith(parent));
break;
case None:
switch (event.getState()) {
Expand Down Expand Up @@ -156,49 +190,43 @@ public void process(WatchedEvent event) {
}
}

/**
* Creates a new cache without an external watcher.
*
* @param zk the ZooKeeper instance
* @throws NullPointerException if zk is {@code null}
*/
public ZooCache(ZooSession zk) {
this(zk, Optional.empty(), Duration.ofMinutes(3));
public ZooCache(ZooSession zk, Optional<ZooCacheWatcher> watcher, String root) {
this.zk = requireNonNull(zk);
this.externalWatcher = watcher;
setupWatchers(requireNonNull(root));
log.trace("{} created new cache", cacheId, new Exception());
}

/**
* Creates a new cache. The given watcher is called whenever a watched node changes.
*
* @param zk the ZooKeeper instance
* @param watcher watcher object
* @throws NullPointerException if zk or watcher is {@code null}
*/
public ZooCache(ZooSession zk, ZooCacheWatcher watcher) {
this(zk, Optional.of(watcher), Duration.ofMinutes(3));
// Visible for testing
protected void setupWatchers(String zooRoot) {
try {
for (String path : ALLOWED_PATHS) {
final String zPath = zooRoot + path;
watchedPaths.add(zPath);
zk.addPersistentRecursiveWatcher(zPath, this.watcher);
log.trace("Added persistent recursive watcher at {}", zPath);
}
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException("Error setting up persistent recursive watcher", e);
}
}

public ZooCache(ZooSession zk, Optional<ZooCacheWatcher> watcher, Duration timeout) {
this.zk = requireNonNull(zk);
this.externalWatcher = watcher;
RemovalListener<String,ZcNode> removalListerner = (path, zcNode, reason) -> {
try {
log.trace("{} removing watches for {} because {} accesses {}", cacheId, path, reason,
zcNode == null ? -1 : zcNode.getAccessCount());
zk.removeWatches(path, ZooCache.this.watcher, Watcher.WatcherType.Any, false);
} catch (InterruptedException | KeeperException | RuntimeException e) {
log.warn("{} failed to remove watches on path {} in zookeeper", cacheId, path, e);
private boolean isWatchedPath(String path) {
// Check that the path is equal to, or a descendant of, a watched path
for (String watchedPath : watchedPaths) {
if (path.startsWith(watchedPath)) {
return true;
}
};
// Must register the removal listener using evictionListener inorder for removal to be mutually
// exclusive with any other operations on the same path. This is important for watcher
// consistency, concurrently adding and removing watches for the same path would leave zoocache
// in a really bad state. The cache builder has another way to register a removal listener that
// is not mutually exclusive.
Cache<String,ZcNode> cache =
Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
.expireAfterAccess(timeout).evictionListener(removalListerner).build();
nodeCache = cache.asMap();
log.trace("{} created new cache", cacheId, new Exception());
}
return false;
}

// Use this instead of Preconditions.checkState(isWatchedPath, String)
// so that we are not creating String unnecessarily.
private void ensureWatched(String path) {
if (!isWatchedPath(path)) {
throw new IllegalStateException("Supplied path " + path + " is not watched by this ZooCache");
}
}

private abstract static class ZooRunnable<T> {
Expand Down Expand Up @@ -298,7 +326,7 @@ private ZcInterruptedException(InterruptedException e) {
*/
public List<String> getChildren(final String zPath) {
Preconditions.checkState(!closed);

ensureWatched(zPath);
ZooRunnable<List<String>> zr = new ZooRunnable<>() {

@Override
Expand All @@ -322,12 +350,12 @@ public List<String> run() throws KeeperException, InterruptedException {
// That is ok because the compute() call on the map has a lock and processing the event
// will block until compute() returns. After compute() returns the event processing
// would clear the map entry.
Stat stat = zk.exists(zPath, watcher);
Stat stat = zk.exists(zPath, null);
if (stat == null) {
log.trace("{} getChildren saw that {} does not exists", cacheId, zPath);
return ZcNode.NON_EXISTENT;
}
List<String> children = zk.getChildren(zPath, watcher);
List<String> children = zk.getChildren(zPath, null);
log.trace("{} adding {} children of {} to cache", cacheId, children.size(), zPath);
return new ZcNode(children, zcn);
} catch (KeeperException.NoNodeException nne) {
Expand Down Expand Up @@ -370,6 +398,7 @@ public byte[] get(final String zPath) {
*/
public byte[] get(final String zPath, final ZcStat status) {
Preconditions.checkState(!closed);
ensureWatched(zPath);
ZooRunnable<byte[]> zr = new ZooRunnable<>() {

@Override
Expand Down Expand Up @@ -400,7 +429,7 @@ public byte[] run() throws KeeperException, InterruptedException {
* non-existence can not be cached.
*/
try {
Stat stat = zk.exists(zPath, watcher);
Stat stat = zk.exists(zPath, null);
if (stat == null) {
if (log.isTraceEnabled()) {
log.trace("{} zookeeper did not contain {}", cacheId, zPath);
Expand All @@ -410,7 +439,7 @@ public byte[] run() throws KeeperException, InterruptedException {
byte[] data = null;
ZcStat zstat = null;
try {
data = zk.getData(zPath, watcher, stat);
data = zk.getData(zPath, null, stat);
zstat = new ZcStat(stat);
} catch (KeeperException.BadVersionException | KeeperException.NoNodeException e1) {
throw new ConcurrentModificationException(e1);
Expand Down Expand Up @@ -456,12 +485,6 @@ protected void copyStats(ZcStat userStat, ZcStat cachedStat) {
}
}

/**
 * Drops the cache entry for a single path and records that the cache changed.
 *
 * @param zPath the path whose cached node should be discarded
 */
private void remove(String zPath) {
// Only the entry for this exact path is removed.
nodeCache.remove(zPath);
log.trace("{} removed {} from cache", cacheId, zPath);
// Bump the update counter on every call, whether or not an entry was present.
updateCount.incrementAndGet();
}

/**
* Clears this cache.
*/
Expand Down Expand Up @@ -493,6 +516,7 @@ public long getUpdateCount() {
*/
@VisibleForTesting
public boolean dataCached(String zPath) {
  // Paths outside the watched set are rejected before the cache is consulted.
  ensureWatched(zPath);
  var cachedNode = nodeCache.get(zPath);
  if (cachedNode == null) {
    // Nothing is cached for this path at all.
    return false;
  }
  return cachedNode.cachedData();
}
Expand All @@ -505,6 +529,7 @@ public boolean dataCached(String zPath) {
*/
@VisibleForTesting
public boolean childrenCached(String zPath) {
  // Paths outside the watched set are rejected before the cache is consulted.
  ensureWatched(zPath);
  var cachedNode = nodeCache.get(zPath);
  if (cachedNode == null) {
    // Nothing is cached for this path at all.
    return false;
  }
  return cachedNode.cachedChildren();
}
Expand All @@ -515,19 +540,15 @@ public boolean childrenCached(String zPath) {
public void clear(Predicate<String> pathPredicate) {
Preconditions.checkState(!closed);

Predicate<String> pathPredicateToUse;
if (log.isTraceEnabled()) {
pathPredicateToUse = path -> {
boolean testResult = pathPredicate.test(path);
if (testResult) {
log.trace("{} removing {} from cache", cacheId, path);
}
return testResult;
};
} else {
pathPredicateToUse = pathPredicate;
}
nodeCache.keySet().removeIf(pathPredicateToUse);
Predicate<String> pathPredicateWrapper = path -> {
boolean testResult = isWatchedPath(path) && pathPredicate.test(path);
if (testResult) {
updateCount.incrementAndGet();
log.trace("{} removing {} from cache", cacheId, path);
}
return testResult;
};
nodeCache.keySet().removeIf(pathPredicateWrapper);
updateCount.incrementAndGet();
}

Expand All @@ -537,10 +558,12 @@ public void clear(Predicate<String> pathPredicate) {
* @param zPath path of top node
*/
public void clear(String zPath) {
  // The top node itself must be a watched path; otherwise an IllegalStateException is thrown.
  ensureWatched(zPath);
  // Select every cached path that begins with the supplied path.
  final Predicate<String> beginsWithZPath = cachedPath -> cachedPath.startsWith(zPath);
  clear(beginsWithZPath);
}

public Optional<ServiceLockData> getLockData(ServiceLockPath path) {
ensureWatched(path.toString());
List<String> children = ServiceLock.validateAndSort(path, getChildren(path.toString()));
if (children == null || children.isEmpty()) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -290,6 +291,11 @@ public void sync(final String path, VoidCallback cb, Object ctx) {
verifyConnected().sync(path, cb, ctx);
}

/**
 * Adds the given watcher at the given path using {@link AddWatchMode#PERSISTENT_RECURSIVE}.
 *
 * @param path the path at which the watcher is set
 * @param watcher the watcher to register
 * @throws KeeperException if thrown by the underlying {@code addWatch} call
 * @throws InterruptedException if thrown by the underlying {@code addWatch} call
 */
public void addPersistentRecursiveWatcher(String path, Watcher watcher)
    throws KeeperException, InterruptedException {
  // Obtain the verified connection first, as every other delegating call in this class does.
  var connected = verifyConnected();
  connected.addWatch(path, watcher, AddWatchMode.PERSISTENT_RECURSIVE);
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
Expand Down
Loading

0 comments on commit 8657cc3

Please sign in to comment.