Skip to content

Commit

Permalink
Recreate watchers on re-connect, address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 17, 2025
1 parent 3281387 commit 956c999
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -87,6 +88,7 @@
import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.AmpleImpl;
import org.apache.accumulo.core.rpc.SaslConnectionParams;
Expand Down Expand Up @@ -237,7 +239,7 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
});

this.zooCache = memoize(() -> new ZooCache(getZooSession(),
ZooCache.createPersistentWatcherPaths(ZooUtil.getRoot(getInstanceID()))));
createPersistentWatcherPaths(ZooUtil.getRoot(getInstanceID()))));
this.accumuloConf = serverConf;
timeoutSupplier = memoizeWithExpiration(
() -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT), 100, MILLISECONDS);
Expand Down Expand Up @@ -1066,7 +1068,7 @@ public synchronized ZookeeperLockChecker getTServerLockChecker() {
var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get();
String zkRoot = getZooKeeperRoot();
this.zkLockChecker =
new ZookeeperLockChecker(new ZooCache(zk, List.of(zkRoot + Constants.ZTSERVERS)), zkRoot);
new ZookeeperLockChecker(new ZooCache(zk, Set.of(zkRoot + Constants.ZTSERVERS)), zkRoot);
}
return this.zkLockChecker;
}
Expand All @@ -1079,4 +1081,15 @@ public NamespaceMapping getNamespaces() {
return namespaces;
}

private static Set<String> createPersistentWatcherPaths(String zkRoot) {
Set<String> pathsToWatch = new HashSet<>();
for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK,
Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK,
Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES,
Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET)) {
pathsToWatch.add(zkRoot + path);
}
return pathsToWatch;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,9 @@
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.util.cache.Caches;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.zookeeper.KeeperException;
Expand All @@ -65,17 +63,6 @@ public interface ZooCacheWatcher extends Consumer<WatchedEvent> {}

private static final Logger log = LoggerFactory.getLogger(ZooCache.class);

public static List<String> createPersistentWatcherPaths(String zkRoot) {
List<String> pathsToWatch = new ArrayList<>();
for (String path : Set.of(Constants.ZCOMPACTORS, Constants.ZDEADTSERVERS, Constants.ZGC_LOCK,
Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK,
Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES,
Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET)) {
pathsToWatch.add(zkRoot + path);
}
return pathsToWatch;
}

protected final TreeSet<String> watchedPaths = new TreeSet<>();
// visible for tests
protected final ZCacheWatcher watcher = new ZCacheWatcher();
Expand All @@ -97,6 +84,8 @@ public static List<String> createPersistentWatcherPaths(String zkRoot) {
// their compute functions.
private final ConcurrentMap<String,ZcNode> nodeCache;

private final Set<String> pathsToWatch;

private final ZooSession zk;

private volatile boolean closed = false;
Expand Down Expand Up @@ -177,7 +166,9 @@ public void process(WatchedEvent event) {
clear();
break;
case SyncConnected:
log.trace("{} ZooKeeper connection established, ignoring; {}", cacheId, event);
log.trace("{} ZooKeeper connection established, re-establishing watchers; {}",
cacheId, event);
setupWatchers(pathsToWatch);
break;
case Expired:
log.trace("{} ZooKeeper connection expired, clearing cache; {}", cacheId, event);
Expand Down Expand Up @@ -207,24 +198,18 @@ public void process(WatchedEvent event) {
* @param zk ZooSession for this instance
* @param pathsToWatch Paths in ZooKeeper to watch
*/
public ZooCache(ZooSession zk, List<String> pathsToWatch) {
this.zk = requireNonNull(zk);
this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
.ticker(Ticker.systemTicker()).expireAfterAccess(CACHE_DURATION).build();
this.nodeCache = cache.asMap();

setupWatchers(requireNonNull(pathsToWatch));
log.trace("{} created new cache", cacheId, new Exception());
public ZooCache(ZooSession zk, Set<String> pathsToWatch) {
this(zk, pathsToWatch, Ticker.systemTicker());
}

// for tests that use a Ticker
public ZooCache(ZooSession zk, List<String> pathsToWatch, Ticker ticker) {
public ZooCache(ZooSession zk, Set<String> pathsToWatch, Ticker ticker) {
this.zk = requireNonNull(zk);
this.cache = Caches.getInstance().createNewBuilder(Caches.CacheName.ZOO_CACHE, false)
.ticker(requireNonNull(ticker)).expireAfterAccess(CACHE_DURATION).build();
this.nodeCache = cache.asMap();

setupWatchers(requireNonNull(pathsToWatch));
this.pathsToWatch = requireNonNull(pathsToWatch);
setupWatchers(pathsToWatch);
log.trace("{} created new cache", cacheId, new Exception());
}

Expand All @@ -233,7 +218,7 @@ public void addZooCacheWatcher(ZooCacheWatcher watcher) {
}

// Visible for testing
protected void setupWatchers(List<String> pathsToWatch) {
protected void setupWatchers(Set<String> pathsToWatch) {

for (String left : pathsToWatch) {
for (String right : pathsToWatch) {
Expand Down Expand Up @@ -584,7 +569,7 @@ public boolean childrenCached(String zPath) {
public void clear(Predicate<String> pathPredicate) {
Preconditions.checkState(!closed);
Predicate<String> pathPredicateWrapper = path -> {
boolean testResult = isWatchedPath(path) && pathPredicate.test(path);
boolean testResult = pathPredicate.test(path);
if (testResult) {
updateCount.incrementAndGet();
log.trace("{} removing {} from cache", cacheId, path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;
import java.util.Set;
import java.util.UUID;

import org.apache.accumulo.core.Constants;
Expand All @@ -54,12 +55,12 @@ public class ZooCacheTest {
*/
private static class TestZooCache extends ZooCache {

public TestZooCache(ZooSession zk, List<String> pathsToWatch) {
public TestZooCache(ZooSession zk, Set<String> pathsToWatch) {
super(zk, pathsToWatch);
}

@Override
protected void setupWatchers(List<String> pathsToWatch) {
protected void setupWatchers(Set<String> pathsToWatch) {
for (String path : pathsToWatch) {
watchedPaths.add(path);
}
Expand All @@ -84,15 +85,15 @@ public void executeWatcher(WatchedEvent event) {
@BeforeEach
public void setUp() {
zk = createStrictMock(ZooSession.class);
zc = new TestZooCache(zk, List.of(root));
zc = new TestZooCache(zk, Set.of(root));
}

@Test
public void testOverlappingPaths() {
assertThrows(IllegalArgumentException.class,
() -> new ZooCache(zk, List.of(root, root + "/localhost:9995")));
() -> new ZooCache(zk, Set.of(root, root + "/localhost:9995")));

List<String> goodPaths = List.of("/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/compactors",
Set<String> goodPaths = Set.of("/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/compactors",
"/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/dead/tservers",
"/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/gc/lock",
"/accumulo/8247eee6-a176-4e19-baf7-e3da965fe050/managers/lock",
Expand Down Expand Up @@ -336,7 +337,7 @@ private void testWatchDataNode(byte[] initialData, Watcher.Event.EventType event
WatchedEvent event =
new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH);
TestWatcher exw = new TestWatcher(event);
zc = new TestZooCache(zk, List.of(root));
zc = new TestZooCache(zk, Set.of(root));
zc.addZooCacheWatcher(exw);

watchData(initialData);
Expand Down Expand Up @@ -447,7 +448,7 @@ private void testGetBoth(boolean getDataFirst) throws Exception {
private void testWatchDataNode_Clear(Watcher.Event.KeeperState state) throws Exception {
WatchedEvent event = new WatchedEvent(Watcher.Event.EventType.None, state, null);
TestWatcher exw = new TestWatcher(event);
zc = new TestZooCache(zk, List.of(root));
zc = new TestZooCache(zk, Set.of(root));
zc.addZooCacheWatcher(exw);

watchData(DATA);
Expand Down Expand Up @@ -482,7 +483,7 @@ private void testWatchChildrenNode(List<String> initialChildren,
WatchedEvent event =
new WatchedEvent(eventType, Watcher.Event.KeeperState.SyncConnected, ZPATH);
TestWatcher exw = new TestWatcher(event);
zc = new TestZooCache(zk, List.of(root));
zc = new TestZooCache(zk, Set.of(root));
zc.addZooCacheWatcher(exw);

watchChildren(initialChildren);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.ObjectOutputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;

Expand All @@ -47,7 +48,7 @@ public static void main(String[] args) throws Exception {
myfile.deleteOnExit();

try (var zk = new ZooSession(CacheTestReader.class.getSimpleName(), keepers, 30_000, null)) {
ZooCache zc = new ZooCache(zk, List.of("/"));
ZooCache zc = new ZooCache(zk, Set.of("/"));

while (true) {
if (myfile.exists() && !myfile.delete()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static class TestZooCache extends ZooCache {

private static final ZooCacheTicker ticker = new ZooCacheTicker();

public TestZooCache(ZooSession zk, List<String> pathsToWatch) {
public TestZooCache(ZooSession zk, Set<String> pathsToWatch) {
super(zk, pathsToWatch, ticker);
}

Expand Down Expand Up @@ -105,7 +105,7 @@ public void testGetChildren() throws Exception {

final String root = Constants.ZROOT + UUID.randomUUID().toString();
final String base = root + Constants.ZTSERVERS;
TestZooCache zooCache = new TestZooCache(zk, List.of(base));
TestZooCache zooCache = new TestZooCache(zk, Set.of(base));

zrw.mkdirs(base + "/test2");
zrw.mkdirs(base + "/test3/c1");
Expand Down

0 comments on commit 956c999

Please sign in to comment.