Skip to content

Commit

Permalink
Use ZooCache detached from Context for ServerPaths (apache#5253)
Browse files Browse the repository at this point in the history
* Use ZooCache detached from Context for ServerPaths

ServiceLockPaths cannot use ClientContext because that can be closed,
and ServiceLockPaths needs to be reused inside the tablet locator thread
pool, which is a static singleton that can survive longer than the
client context.

This removes context from ServiceLockPaths and causes
ZookeeperLockChecker to provide it with a separate root path and
ZooCache.

This fixes the ITs broken after the merge of apache#5192 in
e745a5d

* Fix ManagerAssignmentIT

* This fixes ManagerAssignmentIT flakiness that occurs because of the
  reuse of a ZooSession from a closed client connection in subsequent
  tests (flakiness depends on test order)
* This should be fixed in subsequent fixes by removing the static
  singleton location cache and coupling it to the Context lifecycle
  (ServerContext or AccumuloClient/ClientContext) instead
* This fixes the flakiness by maintaining a single AccumuloClient for
  the entirety of the SharedMiniClusterBase test cases; since only one
  cluster instance is used for the test, it only needs one client
  instance
  • Loading branch information
ctubbsii authored Jan 14, 2025
1 parent 57e3dda commit aa7b495
Show file tree
Hide file tree
Showing 9 changed files with 590 additions and 830 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
this.singletonReservation = Objects.requireNonNull(reservation);
this.tableops = new TableOperationsImpl(this);
this.namespaceops = new NamespaceOperationsImpl(this, tableops);
this.serverPaths = Suppliers.memoize(() -> new ServiceLockPaths(this));
this.serverPaths =
Suppliers.memoize(() -> new ServiceLockPaths(this.getZooKeeperRoot(), this.getZooCache()));
if (ueh == Threads.UEH) {
clientThreadPools = ThreadPools.getServerThreadPools();
} else {
Expand Down Expand Up @@ -1061,8 +1062,7 @@ public synchronized ZookeeperLockChecker getTServerLockChecker() {
// because that client could be closed, and its ZooSession also closed
// this needs to be fixed; TODO https://github.com/apache/accumulo/issues/2301
var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get();
this.zkLockChecker =
new ZookeeperLockChecker(new ZooCache(zk), getZooKeeperRoot(), getServerPaths());
this.zkLockChecker = new ZookeeperLockChecker(new ZooCache(zk), getZooKeeperRoot());
}
return this.zkLockChecker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public class ZookeeperLockChecker implements TabletServerLockChecker {
private final String root;
private final ServiceLockPaths lockPaths;

ZookeeperLockChecker(ZooCache zooCache, String zkRoot, ServiceLockPaths serviceLockPaths) {
ZookeeperLockChecker(ZooCache zooCache, String zkRoot) {
this.zc = requireNonNull(zooCache);
this.root = requireNonNull(zkRoot);
this.lockPaths = requireNonNull(serviceLockPaths);
this.lockPaths = new ServiceLockPaths(this.root, this.zc);
}

public boolean doesTabletServerLockExist(String server) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
*/
package org.apache.accumulo.core.lock;

import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -33,7 +34,6 @@
import java.util.function.Predicate;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.util.threads.ThreadPoolNames;
Expand Down Expand Up @@ -68,8 +68,8 @@ protected ServiceLockPath(String path) {
* Create a ServiceLockPath for a management process
*/
private ServiceLockPath(String root, String type) {
Objects.requireNonNull(root);
this.type = Objects.requireNonNull(type);
requireNonNull(root);
this.type = requireNonNull(type);
Preconditions.checkArgument(this.type.equals(Constants.ZGC_LOCK)
|| this.type.equals(Constants.ZMANAGER_LOCK) || this.type.equals(Constants.ZMONITOR_LOCK)
|| this.type.equals(Constants.ZTABLE_LOCKS), "Unsupported type: " + type);
Expand All @@ -84,28 +84,28 @@ private ServiceLockPath(String root, String type) {
* Create a ServiceLockPath for ZTABLE_LOCKS
*/
private ServiceLockPath(String root, String type, String content) {
Objects.requireNonNull(root);
this.type = Objects.requireNonNull(type);
requireNonNull(root);
this.type = requireNonNull(type);
Preconditions.checkArgument(
this.type.equals(Constants.ZTABLE_LOCKS) || this.type.equals(Constants.ZMINI_LOCK),
"Unsupported type: " + type);
this.resourceGroup = null;
this.server = Objects.requireNonNull(content);
this.server = requireNonNull(content);
this.path = root + this.type + "/" + this.server;
}

/**
* Create a ServiceLockPath for a worker process
*/
private ServiceLockPath(String root, String type, String resourceGroup, String server) {
Objects.requireNonNull(root);
this.type = Objects.requireNonNull(type);
requireNonNull(root);
this.type = requireNonNull(type);
Preconditions.checkArgument(
this.type.equals(Constants.ZCOMPACTORS) || this.type.equals(Constants.ZSSERVERS)
|| this.type.equals(Constants.ZTSERVERS) || this.type.equals(Constants.ZDEADTSERVERS),
"Unsupported type: " + type);
this.resourceGroup = Objects.requireNonNull(resourceGroup);
this.server = Objects.requireNonNull(server);
this.resourceGroup = requireNonNull(resourceGroup);
this.server = requireNonNull(server);
this.path = root + this.type + "/" + this.resourceGroup + "/" + this.server;
}

Expand Down Expand Up @@ -184,10 +184,12 @@ public String toString() {

private final ExecutorService fetchExectuor;

private final ClientContext ctx;
private final String zkRoot;
private final ZooCache zooCache;

public ServiceLockPaths(ClientContext context) {
this.ctx = context;
public ServiceLockPaths(String zkRoot, ZooCache zc) {
this.zkRoot = requireNonNull(zkRoot);
this.zooCache = requireNonNull(zc);
this.fetchExectuor = ThreadPools.getServerThreadPools()
.getPoolBuilder(ThreadPoolNames.SERVICE_LOCK_POOL).numCoreThreads(16).build();
}
Expand Down Expand Up @@ -219,8 +221,8 @@ private static String determineServerType(final String path) {
* Parse a ZooKeeper path string and return a ServiceLockPath
*/
public static ServiceLockPath parse(Optional<String> serverType, String path) {
Objects.requireNonNull(serverType);
Objects.requireNonNull(path);
requireNonNull(serverType);
requireNonNull(path);

final String type = serverType.orElseGet(() -> determineServerType(path));

Expand Down Expand Up @@ -253,47 +255,47 @@ public static ServiceLockPath parse(Optional<String> serverType, String path) {
}

public ServiceLockPath createGarbageCollectorPath() {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZGC_LOCK);
return new ServiceLockPath(zkRoot, Constants.ZGC_LOCK);
}

public ServiceLockPath createManagerPath() {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZMANAGER_LOCK);
return new ServiceLockPath(zkRoot, Constants.ZMANAGER_LOCK);
}

public ServiceLockPath createMiniPath(String miniUUID) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZMINI_LOCK, miniUUID);
return new ServiceLockPath(zkRoot, Constants.ZMINI_LOCK, miniUUID);
}

public ServiceLockPath createMonitorPath() {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZMONITOR_LOCK);
return new ServiceLockPath(zkRoot, Constants.ZMONITOR_LOCK);
}

public ServiceLockPath createCompactorPath(String resourceGroup, HostAndPort serverAddress) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZCOMPACTORS, resourceGroup,
return new ServiceLockPath(zkRoot, Constants.ZCOMPACTORS, resourceGroup,
serverAddress.toString());
}

public ServiceLockPath createScanServerPath(String resourceGroup, HostAndPort serverAddress) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZSSERVERS, resourceGroup,
return new ServiceLockPath(zkRoot, Constants.ZSSERVERS, resourceGroup,
serverAddress.toString());
}

public ServiceLockPath createTableLocksPath() {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZTABLE_LOCKS);
return new ServiceLockPath(zkRoot, Constants.ZTABLE_LOCKS);
}

public ServiceLockPath createTableLocksPath(String tableId) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZTABLE_LOCKS, tableId);
return new ServiceLockPath(zkRoot, Constants.ZTABLE_LOCKS, tableId);
}

public ServiceLockPath createTabletServerPath(String resourceGroup, HostAndPort serverAddress) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZTSERVERS, resourceGroup,
return new ServiceLockPath(zkRoot, Constants.ZTSERVERS, resourceGroup,
serverAddress.toString());
}

public ServiceLockPath createDeadTabletServerPath(String resourceGroup,
HostAndPort serverAddress) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZDEADTSERVERS, resourceGroup,
return new ServiceLockPath(zkRoot, Constants.ZDEADTSERVERS, resourceGroup,
serverAddress.toString());
}

Expand Down Expand Up @@ -421,13 +423,12 @@ private Set<ServiceLockPath> get(final String serverType,
ResourceGroupPredicate resourceGroupPredicate, AddressSelector addressSelector,
boolean withLock) {

Objects.requireNonNull(serverType);
Objects.requireNonNull(resourceGroupPredicate);
Objects.requireNonNull(addressSelector);
requireNonNull(serverType);
requireNonNull(resourceGroupPredicate);
requireNonNull(addressSelector);

final Set<ServiceLockPath> results = ConcurrentHashMap.newKeySet();
final String typePath = ctx.getZooKeeperRoot() + serverType;
final ZooCache cache = ctx.getZooCache();
final String typePath = zkRoot + serverType;

if (serverType.equals(Constants.ZGC_LOCK) || serverType.equals(Constants.ZMANAGER_LOCK)
|| serverType.equals(Constants.ZMONITOR_LOCK)) {
Expand All @@ -436,22 +437,22 @@ private Set<ServiceLockPath> get(final String serverType,
if (!withLock) {
results.add(slp);
} else {
Optional<ServiceLockData> sld = ServiceLock.getLockData(cache, slp, stat);
Optional<ServiceLockData> sld = ServiceLock.getLockData(zooCache, slp, stat);
if (!sld.isEmpty()) {
results.add(slp);
}
}
} else if (serverType.equals(Constants.ZCOMPACTORS) || serverType.equals(Constants.ZSSERVERS)
|| serverType.equals(Constants.ZTSERVERS) || serverType.equals(Constants.ZDEADTSERVERS)) {
final List<String> resourceGroups = cache.getChildren(typePath);
final List<String> resourceGroups = zooCache.getChildren(typePath);
for (final String group : resourceGroups) {
if (resourceGroupPredicate.test(group)) {
final Collection<String> servers;
final Predicate<String> addressPredicate;

if (addressSelector.getExactAddress() != null) {
var server = addressSelector.getExactAddress().toString();
if (withLock || cache.get(typePath + "/" + group + "/" + server) != null) {
if (withLock || zooCache.get(typePath + "/" + group + "/" + server) != null) {
// When withLock is true the server in the list may not exist in zookeeper, if it does
// not exist then no lock will be found later when looking for a lock in zookeeper.
servers = List.of(server);
Expand All @@ -460,7 +461,7 @@ private Set<ServiceLockPath> get(final String serverType,
}
addressPredicate = s -> true;
} else {
servers = cache.getChildren(typePath + "/" + group);
servers = zooCache.getChildren(typePath + "/" + group);
addressPredicate = addressSelector.getPredicate();
}

Expand All @@ -484,7 +485,7 @@ private Set<ServiceLockPath> get(final String serverType,
// connection at the same time though.
var futureTask = new FutureTask<>(() -> {
final ZcStat stat = new ZcStat();
Optional<ServiceLockData> sld = ServiceLock.getLockData(cache, slp, stat);
Optional<ServiceLockData> sld = ServiceLock.getLockData(zooCache, slp, stat);
if (sld.isPresent()) {
results.add(slp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLockPaths;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -59,8 +58,7 @@ public void tearDown() {

@Test
public void testInvalidateCache() {
var zklc =
new ZookeeperLockChecker(zc, context.getZooKeeperRoot(), new ServiceLockPaths(context));
var zklc = new ZookeeperLockChecker(zc, context.getZooKeeperRoot());

verify(zc);
reset(zc);
Expand Down
Loading

0 comments on commit aa7b495

Please sign in to comment.