Skip to content

Commit

Permalink
Merge branch 'main' into zc-recursive-watchers
Browse files Browse the repository at this point in the history
  • Loading branch information
ctubbsii authored Jan 14, 2025
2 parents 0d0a9cd + aa7b495 commit 07803d9
Show file tree
Hide file tree
Showing 12 changed files with 630 additions and 867 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
this.singletonReservation = Objects.requireNonNull(reservation);
this.tableops = new TableOperationsImpl(this);
this.namespaceops = new NamespaceOperationsImpl(this, tableops);
this.serverPaths = Suppliers.memoize(() -> new ServiceLockPaths(this));
this.serverPaths =
Suppliers.memoize(() -> new ServiceLockPaths(this.getZooKeeperRoot(), this.getZooCache()));
if (ueh == Threads.UEH) {
clientThreadPools = ThreadPools.getServerThreadPools();
} else {
Expand Down Expand Up @@ -1062,9 +1063,8 @@ public synchronized ZookeeperLockChecker getTServerLockChecker() {
// because that client could be closed, and its ZooSession also closed
// this needs to be fixed; TODO https://github.com/apache/accumulo/issues/2301
var zk = info.getZooKeeperSupplier(ZookeeperLockChecker.class.getSimpleName()).get();
String zkRoot = getZooKeeperRoot();
this.zkLockChecker = new ZookeeperLockChecker(new ZooCache(zk, Optional.empty(), zkRoot),
zkRoot, getServerPaths());
this.zkLockChecker = new ZookeeperLockChecker(new ZooCache(zk), Optional.empty(),
getZooKeeperRoot());
}
return this.zkLockChecker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ public class ZookeeperLockChecker implements TabletServerLockChecker {
private final String root;
private final ServiceLockPaths lockPaths;

ZookeeperLockChecker(ZooCache zooCache, String zkRoot, ServiceLockPaths serviceLockPaths) {
ZookeeperLockChecker(ZooCache zooCache, String zkRoot) {
this.zc = requireNonNull(zooCache);
this.root = requireNonNull(zkRoot);
this.lockPaths = requireNonNull(serviceLockPaths);
this.lockPaths = new ServiceLockPaths(this.root, this.zc);
}

public boolean doesTabletServerLockExist(String server) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
*/
package org.apache.accumulo.core.lock;

import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -33,7 +34,6 @@
import java.util.function.Predicate;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat;
import org.apache.accumulo.core.util.threads.ThreadPoolNames;
Expand Down Expand Up @@ -68,8 +68,8 @@ protected ServiceLockPath(String path) {
* Create a ServiceLockPath for a management process
*/
private ServiceLockPath(String root, String type) {
Objects.requireNonNull(root);
this.type = Objects.requireNonNull(type);
requireNonNull(root);
this.type = requireNonNull(type);
Preconditions.checkArgument(this.type.equals(Constants.ZGC_LOCK)
|| this.type.equals(Constants.ZMANAGER_LOCK) || this.type.equals(Constants.ZMONITOR_LOCK)
|| this.type.equals(Constants.ZTABLE_LOCKS), "Unsupported type: " + type);
Expand All @@ -84,28 +84,28 @@ private ServiceLockPath(String root, String type) {
* Create a ServiceLockPath for ZTABLE_LOCKS
*/
private ServiceLockPath(String root, String type, String content) {
Objects.requireNonNull(root);
this.type = Objects.requireNonNull(type);
requireNonNull(root);
this.type = requireNonNull(type);
Preconditions.checkArgument(
this.type.equals(Constants.ZTABLE_LOCKS) || this.type.equals(Constants.ZMINI_LOCK),
"Unsupported type: " + type);
this.resourceGroup = null;
this.server = Objects.requireNonNull(content);
this.server = requireNonNull(content);
this.path = root + this.type + "/" + this.server;
}

/**
* Create a ServiceLockPath for a worker process
*/
private ServiceLockPath(String root, String type, String resourceGroup, String server) {
Objects.requireNonNull(root);
this.type = Objects.requireNonNull(type);
requireNonNull(root);
this.type = requireNonNull(type);
Preconditions.checkArgument(
this.type.equals(Constants.ZCOMPACTORS) || this.type.equals(Constants.ZSSERVERS)
|| this.type.equals(Constants.ZTSERVERS) || this.type.equals(Constants.ZDEADTSERVERS),
"Unsupported type: " + type);
this.resourceGroup = Objects.requireNonNull(resourceGroup);
this.server = Objects.requireNonNull(server);
this.resourceGroup = requireNonNull(resourceGroup);
this.server = requireNonNull(server);
this.path = root + this.type + "/" + this.resourceGroup + "/" + this.server;
}

Expand Down Expand Up @@ -184,10 +184,12 @@ public String toString() {

private final ExecutorService fetchExectuor;

private final ClientContext ctx;
private final String zkRoot;
private final ZooCache zooCache;

public ServiceLockPaths(ClientContext context) {
this.ctx = context;
public ServiceLockPaths(String zkRoot, ZooCache zc) {
this.zkRoot = requireNonNull(zkRoot);
this.zooCache = requireNonNull(zc);
this.fetchExectuor = ThreadPools.getServerThreadPools()
.getPoolBuilder(ThreadPoolNames.SERVICE_LOCK_POOL).numCoreThreads(16).build();
}
Expand Down Expand Up @@ -219,8 +221,8 @@ private static String determineServerType(final String path) {
* Parse a ZooKeeper path string and return a ServiceLockPath
*/
public static ServiceLockPath parse(Optional<String> serverType, String path) {
Objects.requireNonNull(serverType);
Objects.requireNonNull(path);
requireNonNull(serverType);
requireNonNull(path);

final String type = serverType.orElseGet(() -> determineServerType(path));

Expand Down Expand Up @@ -253,47 +255,47 @@ public static ServiceLockPath parse(Optional<String> serverType, String path) {
}

public ServiceLockPath createGarbageCollectorPath() {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZGC_LOCK);
return new ServiceLockPath(zkRoot, Constants.ZGC_LOCK);
}

public ServiceLockPath createManagerPath() {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZMANAGER_LOCK);
return new ServiceLockPath(zkRoot, Constants.ZMANAGER_LOCK);
}

public ServiceLockPath createMiniPath(String miniUUID) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZMINI_LOCK, miniUUID);
return new ServiceLockPath(zkRoot, Constants.ZMINI_LOCK, miniUUID);
}

public ServiceLockPath createMonitorPath() {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZMONITOR_LOCK);
return new ServiceLockPath(zkRoot, Constants.ZMONITOR_LOCK);
}

public ServiceLockPath createCompactorPath(String resourceGroup, HostAndPort serverAddress) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZCOMPACTORS, resourceGroup,
return new ServiceLockPath(zkRoot, Constants.ZCOMPACTORS, resourceGroup,
serverAddress.toString());
}

public ServiceLockPath createScanServerPath(String resourceGroup, HostAndPort serverAddress) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZSSERVERS, resourceGroup,
return new ServiceLockPath(zkRoot, Constants.ZSSERVERS, resourceGroup,
serverAddress.toString());
}

public ServiceLockPath createTableLocksPath() {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZTABLE_LOCKS);
return new ServiceLockPath(zkRoot, Constants.ZTABLE_LOCKS);
}

public ServiceLockPath createTableLocksPath(String tableId) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZTABLE_LOCKS, tableId);
return new ServiceLockPath(zkRoot, Constants.ZTABLE_LOCKS, tableId);
}

public ServiceLockPath createTabletServerPath(String resourceGroup, HostAndPort serverAddress) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZTSERVERS, resourceGroup,
return new ServiceLockPath(zkRoot, Constants.ZTSERVERS, resourceGroup,
serverAddress.toString());
}

public ServiceLockPath createDeadTabletServerPath(String resourceGroup,
HostAndPort serverAddress) {
return new ServiceLockPath(ctx.getZooKeeperRoot(), Constants.ZDEADTSERVERS, resourceGroup,
return new ServiceLockPath(zkRoot, Constants.ZDEADTSERVERS, resourceGroup,
serverAddress.toString());
}

Expand Down Expand Up @@ -421,13 +423,12 @@ private Set<ServiceLockPath> get(final String serverType,
ResourceGroupPredicate resourceGroupPredicate, AddressSelector addressSelector,
boolean withLock) {

Objects.requireNonNull(serverType);
Objects.requireNonNull(resourceGroupPredicate);
Objects.requireNonNull(addressSelector);
requireNonNull(serverType);
requireNonNull(resourceGroupPredicate);
requireNonNull(addressSelector);

final Set<ServiceLockPath> results = ConcurrentHashMap.newKeySet();
final String typePath = ctx.getZooKeeperRoot() + serverType;
final ZooCache cache = ctx.getZooCache();
final String typePath = zkRoot + serverType;

if (serverType.equals(Constants.ZGC_LOCK) || serverType.equals(Constants.ZMANAGER_LOCK)
|| serverType.equals(Constants.ZMONITOR_LOCK)) {
Expand All @@ -436,22 +437,22 @@ private Set<ServiceLockPath> get(final String serverType,
if (!withLock) {
results.add(slp);
} else {
Optional<ServiceLockData> sld = ServiceLock.getLockData(cache, slp, stat);
Optional<ServiceLockData> sld = ServiceLock.getLockData(zooCache, slp, stat);
if (!sld.isEmpty()) {
results.add(slp);
}
}
} else if (serverType.equals(Constants.ZCOMPACTORS) || serverType.equals(Constants.ZSSERVERS)
|| serverType.equals(Constants.ZTSERVERS) || serverType.equals(Constants.ZDEADTSERVERS)) {
final List<String> resourceGroups = cache.getChildren(typePath);
final List<String> resourceGroups = zooCache.getChildren(typePath);
for (final String group : resourceGroups) {
if (resourceGroupPredicate.test(group)) {
final Collection<String> servers;
final Predicate<String> addressPredicate;

if (addressSelector.getExactAddress() != null) {
var server = addressSelector.getExactAddress().toString();
if (withLock || cache.get(typePath + "/" + group + "/" + server) != null) {
if (withLock || zooCache.get(typePath + "/" + group + "/" + server) != null) {
// When withLock is true the server in the list may not exist in zookeeper, if it does
// not exist then no lock will be found later when looking for a lock in zookeeper.
servers = List.of(server);
Expand All @@ -460,7 +461,7 @@ private Set<ServiceLockPath> get(final String serverType,
}
addressPredicate = s -> true;
} else {
servers = cache.getChildren(typePath + "/" + group);
servers = zooCache.getChildren(typePath + "/" + group);
addressPredicate = addressSelector.getPredicate();
}

Expand All @@ -484,7 +485,7 @@ private Set<ServiceLockPath> get(final String serverType,
// connection at the same time though.
var futureTask = new FutureTask<>(() -> {
final ZcStat stat = new ZcStat();
Optional<ServiceLockData> sld = ServiceLock.getLockData(cache, slp, stat);
Optional<ServiceLockData> sld = ServiceLock.getLockData(zooCache, slp, stat);
if (sld.isPresent()) {
results.add(slp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLockPaths;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -59,8 +58,7 @@ public void tearDown() {

@Test
public void testInvalidateCache() {
var zklc =
new ZookeeperLockChecker(zc, context.getZooKeeperRoot(), new ServiceLockPaths(context));
var zklc = new ZookeeperLockChecker(zc, context.getZooKeeperRoot());

verify(zc);
reset(zc);
Expand Down
Loading

0 comments on commit 07803d9

Please sign in to comment.