Skip to content

Commit

Permalink
Addressed PR suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 15, 2025
1 parent 8246fd7 commit fb6aaa7
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
Expand Down Expand Up @@ -235,7 +236,8 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
return zk;
});

this.zooCache = memoize(() -> new ZooCache(getZooSession(), ZooUtil.getRoot(getInstanceID())));
this.zooCache = memoize(() -> new ZooCache(getZooSession(),
ZooCache.createPersistentWatcherPaths(ZooUtil.getRoot(getInstanceID()))));
this.accumuloConf = serverConf;
timeoutSupplier = memoizeWithExpiration(
() -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS);
Expand Down Expand Up @@ -1063,7 +1065,8 @@ public synchronized ZookeeperLockChecker getTServerLockChecker() {
// this needs to be fixed; TODO https://github.com/apache/accumulo/issues/2301
var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get();
String zkRoot = getZooKeeperRoot();
this.zkLockChecker = new ZookeeperLockChecker(new ZooCache(zk, zkRoot), zkRoot);
this.zkLockChecker =
new ZookeeperLockChecker(new ZooCache(zk, List.of(zkRoot + Constants.ZTSERVERS)), zkRoot);
}
return this.zkLockChecker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -55,8 +56,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
* A cache for values stored in ZooKeeper. Values are kept up to date as they change.
*/
Expand All @@ -66,10 +65,16 @@ public interface ZooCacheWatcher extends Consumer<WatchedEvent> {}

private static final Logger log = LoggerFactory.getLogger(ZooCache.class);

protected static final String[] ALLOWED_PATHS = new String[] {Constants.ZCOMPACTORS,
Constants.ZDEADTSERVERS, Constants.ZGC_LOCK, Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK,
Constants.ZMONITOR_LOCK, Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS,
Constants.ZTABLES, Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET};
public static List<String> createPersistentWatcherPaths(String zkRoot) {
List<String> pathsToWatch = new ArrayList<>();
for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK,
Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK,
Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES,
Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET)) {
pathsToWatch.add(zkRoot + path);
}
return pathsToWatch;
}

protected final TreeSet<String> watchedPaths = new TreeSet<>();
// visible for tests
Expand All @@ -82,23 +87,15 @@ public interface ZooCacheWatcher extends Consumer<WatchedEvent> {}

public static final Duration CACHE_DURATION = Duration.ofMinutes(30);

// public and non-final because this is being set
// in tests to test the eviction
@SuppressFBWarnings(value = "MS_SHOULD_BE_FINAL",
justification = "being set in tests for eviction test")
public static Ticker ticker = Ticker.systemTicker();

// Construct this here, otherwise end up with NPE in some cases
// when the Watcher tries to access nodeCache. Alternative would
// be to mark nodeCache as volatile.
private final Cache<String,ZcNode> cache =
Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false).ticker(ticker)
.expireAfterAccess(CACHE_DURATION).build();
private final Cache<String,ZcNode> cache;

// The concurrent map returned by Caffiene will only allow one thread to run at a time for a given
// key and ZooCache relies on that. Not all concurrent map implementations have this behavior for
// their compute functions.
private final ConcurrentMap<String,ZcNode> nodeCache = cache.asMap();
private final ConcurrentMap<String,ZcNode> nodeCache;

private final ZooSession zk;

Expand Down Expand Up @@ -140,18 +137,25 @@ class ZCacheWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
if (log.isTraceEnabled()) {
log.trace("{} {}: {}", cacheId, event.getType(), event);
log.trace("{}: {}", cacheId, event);
}

switch (event.getType()) {
case NodeDataChanged:
case NodeChildrenChanged:
case NodeCreated:
case NodeDeleted:
// According to documentation we should not receive this event.
// According to https://issues.apache.org/jira/browse/ZOOKEEPER-4475 we
// may receive this event (Fixed in 3.9.0)
break;
case ChildWatchRemoved:
case DataWatchRemoved:
// This code use to call remove(path), but that was when a Watcher was set
// on each node. With the Watcher being set at a higher level we need to remove
// We don't need to do anything with the cache on these events.
break;
case NodeDataChanged:
clear(path -> path.equals(event.getPath()));
break;
case NodeCreated:
case NodeDeleted:
// With the Watcher being set at a higher level we need to remove
// the parent of the affected node and all of its children from the cache
// so that the parent and children node can be re-cached. If we only remove the
// affected node, then the cached children in the parent could be incorrect.
Expand Down Expand Up @@ -193,12 +197,34 @@ public void process(WatchedEvent event) {
}
}

public ZooCache(ZooSession zk, String root, ZooCacheWatcher... watchers) {
/**
* Creates a ZooCache instance that uses the supplied ZooSession for communicating with the
* instance's ZooKeeper servers. The ZooCache will create persistent watchers at the given
* pathsToWatch, if any, to be updated when changes are made in ZooKeeper for nodes at or below in
* the tree. If ZooCacheWatcher's are added via {@code addZooCacheWatcher}, then they will be
* notified when this object is notified of changes via the PersistentWatcher callback.
*
* @param zk ZooSession for this instance
* @param pathsToWatch Paths in ZooKeeper to watch
*/
public ZooCache(ZooSession zk, List<String> pathsToWatch) {
this.zk = requireNonNull(zk);
for (ZooCacheWatcher zcw : watchers) {
externalWatchers.add(zcw);
}
setupWatchers(requireNonNull(root));
this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
.ticker(Ticker.systemTicker()).expireAfterAccess(CACHE_DURATION).build();
this.nodeCache = cache.asMap();

setupWatchers(requireNonNull(pathsToWatch));
log.trace("{} created new cache", cacheId, new Exception());
}

// for tests that use a Ticker
public ZooCache(ZooSession zk, List<String> pathsToWatch, Ticker ticker) {
this.zk = requireNonNull(zk);
this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
.ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build();
this.nodeCache = cache.asMap();

setupWatchers(requireNonNull(pathsToWatch));
log.trace("{} created new cache", cacheId, new Exception());
}

Expand All @@ -207,13 +233,12 @@ public void addZooCacheWatcher(ZooCacheWatcher watcher) {
}

// Visible for testing
protected void setupWatchers(String zooRoot) {
protected void setupWatchers(List<String> pathsToWatch) {
try {
for (String path : ALLOWED_PATHS) {
final String zPath = zooRoot + path;
watchedPaths.add(zPath);
zk.addPersistentRecursiveWatcher(zPath, this.watcher);
log.trace("Added persistent recursive watcher at {}", zPath);
for (String path : pathsToWatch) {
watchedPaths.add(path);
zk.addPersistentRecursiveWatcher(path, this.watcher);
log.trace("Added persistent recursive watcher at {}", path);
}
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException("Error setting up persistent recursive watcher", e);
Expand Down Expand Up @@ -558,7 +583,6 @@ public void clear(Predicate<String> pathPredicate) {
return testResult;
};
nodeCache.keySet().removeIf(pathPredicateWrapper);
updateCount.incrementAndGet();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,14 @@ public class ZooCacheTest {
*/
private static class TestZooCache extends ZooCache {

public TestZooCache(ZooSession zk, String root, ZooCacheWatcher... watchers) {
super(zk, root, watchers);
public TestZooCache(ZooSession zk, List<String> pathsToWatch) {
super(zk, pathsToWatch);
}

@Override
protected void setupWatchers(String zooRoot) {
for (String path : ALLOWED_PATHS) {
final String zPath = zooRoot + path;
watchedPaths.add(zPath);
protected void setupWatchers(List<String> pathsToWatch) {
for (String path : pathsToWatch) {
watchedPaths.add(path);
}
}

Expand All @@ -72,8 +71,8 @@ public void executeWatcher(WatchedEvent event) {

}

private static final String instancePath = Constants.ZROOT + "/" + UUID.randomUUID().toString();
private static final String root = instancePath + Constants.ZTSERVERS;
private static final String root =
Constants.ZROOT + "/" + UUID.randomUUID().toString() + Constants.ZTSERVERS;
private static final String ZPATH = root + "/testPath";
private static final byte[] DATA = {(byte) 1, (byte) 2, (byte) 3, (byte) 4};
private static final List<String> CHILDREN = java.util.Arrays.asList("huey", "dewey", "louie");
Expand All @@ -84,7 +83,7 @@ public void executeWatcher(WatchedEvent event) {
@BeforeEach
public void setUp() {
zk = createStrictMock(ZooSession.class);
zc = new TestZooCache(zk, instancePath);
zc = new TestZooCache(zk, List.of(root));
}

@Test
Expand Down Expand Up @@ -314,7 +313,8 @@ private void testWatchDataNode(byte[] initialData, Watcher.Event.EventType event
WatchedEvent event =
new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH);
TestWatcher exw = new TestWatcher(event);
zc = new TestZooCache(zk, instancePath, exw);
zc = new TestZooCache(zk, List.of(root));
zc.addZooCacheWatcher(exw);

watchData(initialData);
zc.executeWatcher(event);
Expand Down Expand Up @@ -424,7 +424,8 @@ private void testGetBoth(boolean getDataFirst) throws Exception {
private void testWatchDataNode_Clear(Watcher.Event.KeeperState state) throws Exception {
WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, state, null);
TestWatcher exw = new TestWatcher(event);
zc = new TestZooCache(zk, instancePath, exw);
zc = new TestZooCache(zk, List.of(root));
zc.addZooCacheWatcher(exw);

watchData(DATA);
assertTrue(zc.dataCached(ZPATH));
Expand All @@ -440,7 +441,7 @@ public void testWatchChildrenNode_Deleted() throws Exception {

@Test
public void testWatchChildrenNode_ChildrenChanged() throws Exception {
testWatchChildrenNode(CHILDREN, Watcher.Event.EventType.NodeChildrenChanged, false);
testWatchChildrenNode(CHILDREN, Watcher.Event.EventType.NodeChildrenChanged, true);
}

@Test
Expand All @@ -458,7 +459,8 @@ private void testWatchChildrenNode(List<String> initialChildren,
WatchedEvent event =
new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH);
TestWatcher exw = new TestWatcher(event);
zc = new TestZooCache(zk, instancePath, exw);
zc = new TestZooCache(zk, List.of(root));
zc.addZooCacheWatcher(exw);

watchChildren(initialChildren);
zc.executeWatcher(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ static synchronized void listInstances(String keepers, boolean printAll, boolean
try (var zk = new ZooSession(ListInstances.class.getSimpleName(), keepers,
ZOOKEEPER_TIMER_MILLIS, null)) {
ZooReader rdr = zk.asReader();
ZooCache cache = new ZooCache(zk, Constants.ZROOT);
ZooCache cache = new ZooCache(zk, List.of(Constants.ZROOT));

TreeMap<String,InstanceId> instanceNames = getInstanceNames(rdr, printErrors);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static void main(String[] args) throws Exception {
myfile.deleteOnExit();

try (var zk = new ZooSession(CacheTestReader.class.getSimpleName(), keepers, 30_000, null)) {
ZooCache zc = new ZooCache(zk, "/");
ZooCache zc = new ZooCache(zk, List.of("/"));

while (true) {
if (myfile.exists() && !myfile.delete()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,19 @@

import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZooCacheWatcher;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.accumulo.test.util.Wait;
import org.apache.zookeeper.Watcher;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
Expand Down Expand Up @@ -70,10 +65,19 @@ public void reset() {

}

public static class TestZooCache extends ZooCache {

private static final ZooCacheTicker ticker = new ZooCacheTicker();

public TestZooCache(ZooSession zk, List<String> pathsToWatch) {
super(zk, pathsToWatch, ticker);
}

}

private ZooKeeperTestingServer szk;
private ZooSession zk;
private ZooReaderWriter zrw;
private ZooCacheTicker ticker = new ZooCacheTicker();

@TempDir
private File tempDir;
Expand All @@ -85,7 +89,6 @@ public void setup() throws Exception {
szk = new ZooKeeperTestingServer(tempDir);
zk = szk.newClient();
zrw = zk.asReaderWriter();
ZooCache.ticker = ticker;
}

@AfterEach
Expand All @@ -100,16 +103,9 @@ public void teardown() throws Exception {
@Test
public void testGetChildren() throws Exception {

Set<String> watchesRemoved = Collections.synchronizedSet(new HashSet<>());
ZooCacheWatcher watcher = event -> {
if (event.getType() == Watcher.Event.EventType.ChildWatchRemoved
|| event.getType() == Watcher.Event.EventType.DataWatchRemoved) {
watchesRemoved.add(event.getPath());
}
};
final String root = Constants.ZROOT + UUID.randomUUID().toString();
ZooCache zooCache = new ZooCache(zk, root, watcher);
final String base = root + Constants.ZTSERVERS;
TestZooCache zooCache = new TestZooCache(zk, List.of(base));

zrw.mkdirs(base + "/test2");
zrw.mkdirs(base + "/test3/c1");
Expand Down Expand Up @@ -148,7 +144,6 @@ public void testGetChildren() throws Exception {
assertEquals(List.of(), zooCache.getChildren(base + "/test1"));
assertEquals(List.of(), zooCache.getChildren(base + "/test2"));
assertEquals(Set.of("c1", "c2"), Set.copyOf(zooCache.getChildren(base + "/test3")));
assertEquals(uc5, zooCache.getUpdateCount());
long uc5b = zooCache.getUpdateCount();
assertTrue(uc5 < uc5b);

Expand Down Expand Up @@ -223,16 +218,14 @@ public void testGetChildren() throws Exception {
assertEquals(uc10, zooCache.getUpdateCount());

// wait for the cache to evict and clear watches
ticker.advance();
TestZooCache.ticker.advance();
Wait.waitFor(() -> {
// the cache will not run its eviction handler unless accessed, so access something that is
// not expected to be evicted
zooCache.getChildren(base + "/test4");
return watchesRemoved.equals(Set.of("/test1", "/test2", "/test3"));
return zooCache.childrenCached(base + "/test1") == false
&& zooCache.childrenCached(base + "/test2") == false
&& zooCache.childrenCached(base + "/test3") == false;
});

assertFalse(zooCache.childrenCached(base + "/test1"));
assertFalse(zooCache.childrenCached(base + "/test2"));
assertFalse(zooCache.childrenCached(base + "/test3"));
}
}

0 comments on commit fb6aaa7

Please sign in to comment.