Skip to content

Commit

Permalink
Made ITs that restart MAC faster (apache#5246)
Browse files Browse the repository at this point in the history
Modified MiniAccumuloCluster (MAC) so that it cleans up
lock paths in ZooKeeper and ZooCache when stopping. In
tests that restarted MAC, the Manager process would wait
for the previous lock to be removed on the session
timeout. The lock paths would also stay cached in
ZooCache and not be updated right away: the Watcher
would not fire when MAC was stopped, a ConnectionLoss
error would be returned when MAC started, and it would
take a while for ZooCache to fix itself.
  • Loading branch information
dlmarion authored Jan 15, 2025
1 parent f3bd1b8 commit 7b11821
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;

import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -525,6 +526,33 @@ public void clear(String zPath) {
}
}

/**
 * Evicts every cached path that is accepted by the supplied predicate. The predicate is applied
 * to the key sets of {@code cache}, {@code childrenCache} and {@code statCache} while the cache
 * write lock is held, after which {@code immutableCache} is rebuilt from the remaining entries.
 *
 * <p>
 * When trace logging is enabled each removed path is logged. The predicate is evaluated against
 * each of the three key sets, so the same path may be logged more than once.
 *
 * @param pathPredicate returns true for the paths that should be dropped from the cache
 */
public void clear(Predicate<String> pathPredicate) {
  // Refuse to operate on a cache that has already been closed.
  Preconditions.checkState(!closed);

  // Only wrap the caller's predicate when trace output is wanted. The logging step is and-ed
  // after the caller's predicate, so it runs solely for paths that are about to be removed.
  final Predicate<String> removalFilter = log.isTraceEnabled() ? pathPredicate.and(cachedPath -> {
    log.trace("removing {} from cache", cachedPath);
    return true;
  }) : pathPredicate;

  cacheWriteLock.lock();
  try {
    cache.keySet().removeIf(removalFilter);
    childrenCache.keySet().removeIf(removalFilter);
    statCache.keySet().removeIf(removalFilter);

    // Bump the update count and swap in a copy that reflects the removals.
    immutableCache = new ImmutableCacheCopies(++updateCount, cache, statCache, childrenCache);
  } finally {
    cacheWriteLock.unlock();
  }
}

public byte[] getLockData(ServiceLockPath path) {
List<String> children = ServiceLock.validateAndSort(path, getChildren(path.toString()));
if (children == null || children.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
import org.apache.accumulo.server.util.Admin;
import org.apache.accumulo.server.util.ZooZap;
import org.apache.accumulo.tserver.ScanServer;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
Expand Down Expand Up @@ -278,6 +279,11 @@ public synchronized void stop(ServerType server, String hostname) throws IOExcep
if (managerProcess != null) {
try {
cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS);
try {
new ZooZap().zap(cluster.getServerContext().getSiteConfiguration(), "-manager");
} catch (RuntimeException e) {
log.error("Error zapping Manager zookeeper lock", e);
}
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

Expand Down Expand Up @@ -90,6 +91,7 @@
import org.apache.accumulo.server.init.Initialize;
import org.apache.accumulo.server.util.AccumuloStatus;
import org.apache.accumulo.server.util.PortUtils;
import org.apache.accumulo.server.util.ZooZap;
import org.apache.accumulo.start.Main;
import org.apache.accumulo.start.classloader.vfs.MiniDFSUtil;
import org.apache.accumulo.start.spi.KeywordExecutable;
Expand Down Expand Up @@ -824,9 +826,47 @@ public synchronized void stop() throws IOException, InterruptedException {

control.stop(ServerType.GARBAGE_COLLECTOR, null);
control.stop(ServerType.MANAGER, null);
control.stop(ServerType.COMPACTION_COORDINATOR);
control.stop(ServerType.TABLET_SERVER, null);
control.stop(ServerType.COMPACTOR, null);
control.stop(ServerType.SCAN_SERVER, null);

// The method calls above kill the server
// Clean up the locks in ZooKeeper so that if the cluster
// is restarted, then the processes will start right away
// and not wait for the old locks to be cleaned up.
try {
new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager",
"-compaction-coordinators", "-tservers", "-compactors", "-sservers");
} catch (RuntimeException e) {
log.error("Error zapping zookeeper locks", e);
}
control.stop(ServerType.ZOOKEEPER, null);

// Clear the location of the servers in ZooCache.
// When ZooKeeper was stopped in the previous method call,
// the local ZooKeeper watcher did not fire. If MAC is
// restarted, then ZooKeeper will start on the same port with
// the same data, but no Watchers will fire.
boolean startCalled = true;
try {
getServerContext();
} catch (RuntimeException e) {
if (e.getMessage().startsWith("Accumulo not initialized")) {
startCalled = false;
}
}
if (startCalled) {
final ServerContext ctx = getServerContext();
final String zRoot = getServerContext().getZooKeeperRoot();
Predicate<String> pred = path -> false;
for (String lockPath : Set.of(Constants.ZMANAGER_LOCK, Constants.ZGC_LOCK,
Constants.ZCOMPACTORS, Constants.ZSSERVERS, Constants.ZTSERVERS)) {
pred = pred.or(path -> path.startsWith(zRoot + lockPath));
}
ctx.getZooCache().clear(pred);
}

// ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs
if (executor != null) {
List<Runnable> tasksRemaining = executor.shutdownNow();
Expand Down
166 changes: 84 additions & 82 deletions server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ public static void main(String[] args) throws Exception {

/**
 * Command entry point. Loads the site configuration with {@code SiteConfiguration.auto()},
 * performs a server login when {@code Property.INSTANCE_RPC_SASL_ENABLED} is set, and then
 * delegates to {@code zap} with the unmodified arguments. The singleton manager is switched to
 * {@code Mode.CLOSED} when this method exits, whether or not an exception was thrown.
 *
 * @param args command line arguments, passed through to {@code zap}
 */
@Override
public void execute(String[] args) throws Exception {
  try {
    var siteConfig = SiteConfiguration.auto();
    final boolean saslEnabled = siteConfig.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED);
    if (saslEnabled) {
      // Login as the server on secure HDFS
      SecurityUtil.serverLogin(siteConfig);
    }
    zap(siteConfig, args);
  } finally {
    SingletonManager.setMode(Mode.CLOSED);
  }
}

public void zap(SiteConfiguration siteConf, String... args) {
Opts opts = new Opts();
opts.parseArgs(keyword(), args);

Expand All @@ -98,111 +111,100 @@ public void execute(String[] args) throws Exception {
return;
}

try {
var siteConf = SiteConfiguration.auto();
// Login as the server on secure HDFS
if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
SecurityUtil.serverLogin(siteConf);
}
String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next();
Path instanceDir = new Path(volDir, "instance_id");
InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration());
ZooReaderWriter zoo = new ZooReaderWriter(siteConf);

String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next();
Path instanceDir = new Path(volDir, "instance_id");
InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration());
ZooReaderWriter zoo = new ZooReaderWriter(siteConf);

if (opts.zapMaster) {
log.warn("The -master option is deprecated. Please use -manager instead.");
}
if (opts.zapManager || opts.zapMaster) {
String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK;
if (opts.zapMaster) {
log.warn("The -master option is deprecated. Please use -manager instead.");
}
if (opts.zapManager || opts.zapMaster) {
String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK;

try {
zapDirectory(zoo, managerLockPath, opts);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
try {
zapDirectory(zoo, managerLockPath, opts);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}

if (opts.zapTservers) {
String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS;
try {
List<String> children = zoo.getChildren(tserversPath);
for (String child : children) {
message("Deleting " + tserversPath + "/" + child + " from zookeeper", opts);

if (opts.zapManager || opts.zapMaster) {
zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP);
} else {
var zLockPath = ServiceLock.path(tserversPath + "/" + child);
if (!zoo.getChildren(zLockPath.toString()).isEmpty()) {
if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) {
message("Did not delete " + tserversPath + "/" + child, opts);
}
if (opts.zapTservers) {
String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS;
try {
List<String> children = zoo.getChildren(tserversPath);
for (String child : children) {
message("Deleting " + tserversPath + "/" + child + " from zookeeper", opts);

if (opts.zapManager || opts.zapMaster) {
zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP);
} else {
var zLockPath = ServiceLock.path(tserversPath + "/" + child);
if (!zoo.getChildren(zLockPath.toString()).isEmpty()) {
if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) {
message("Did not delete " + tserversPath + "/" + child, opts);
}
}
}
} catch (KeeperException | InterruptedException e) {
log.error("{}", e.getMessage(), e);
}
} catch (KeeperException | InterruptedException e) {
log.error("{}", e.getMessage(), e);
}
}

// Remove the tracers, we don't use them anymore.
@SuppressWarnings("deprecation")
String path = siteConf.get(Property.TRACE_ZK_PATH);
// Remove the tracers, we don't use them anymore.
@SuppressWarnings("deprecation")
String path = siteConf.get(Property.TRACE_ZK_PATH);
try {
zapDirectory(zoo, path, opts);
} catch (Exception e) {
// do nothing if the /tracers node does not exist.
}

if (opts.zapCoordinators) {
final String coordinatorPath = Constants.ZROOT + "/" + iid + Constants.ZCOORDINATOR_LOCK;
try {
zapDirectory(zoo, path, opts);
} catch (Exception e) {
// do nothing if the /tracers node does not exist.
if (zoo.exists(coordinatorPath)) {
zapDirectory(zoo, coordinatorPath, opts);
}
} catch (KeeperException | InterruptedException e) {
log.error("Error deleting coordinator from zookeeper, {}", e.getMessage(), e);
}
}

if (opts.zapCoordinators) {
final String coordinatorPath = Constants.ZROOT + "/" + iid + Constants.ZCOORDINATOR_LOCK;
try {
if (zoo.exists(coordinatorPath)) {
zapDirectory(zoo, coordinatorPath, opts);
if (opts.zapCompactors) {
String compactorsBasepath = Constants.ZROOT + "/" + iid + Constants.ZCOMPACTORS;
try {
if (zoo.exists(compactorsBasepath)) {
List<String> queues = zoo.getChildren(compactorsBasepath);
for (String queue : queues) {
message("Deleting " + compactorsBasepath + "/" + queue + " from zookeeper", opts);
zoo.recursiveDelete(compactorsBasepath + "/" + queue, NodeMissingPolicy.SKIP);
}
} catch (KeeperException | InterruptedException e) {
log.error("Error deleting coordinator from zookeeper, {}", e.getMessage(), e);
}
} catch (KeeperException | InterruptedException e) {
log.error("Error deleting compactors from zookeeper, {}", e.getMessage(), e);
}

if (opts.zapCompactors) {
String compactorsBasepath = Constants.ZROOT + "/" + iid + Constants.ZCOMPACTORS;
try {
if (zoo.exists(compactorsBasepath)) {
List<String> queues = zoo.getChildren(compactorsBasepath);
for (String queue : queues) {
message("Deleting " + compactorsBasepath + "/" + queue + " from zookeeper", opts);
zoo.recursiveDelete(compactorsBasepath + "/" + queue, NodeMissingPolicy.SKIP);
}
}
} catch (KeeperException | InterruptedException e) {
log.error("Error deleting compactors from zookeeper, {}", e.getMessage(), e);
}
}

}
if (opts.zapScanServers) {
String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS;
try {
if (zoo.exists(sserversPath)) {
List<String> children = zoo.getChildren(sserversPath);
for (String child : children) {
message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts);

if (opts.zapScanServers) {
String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS;
try {
if (zoo.exists(sserversPath)) {
List<String> children = zoo.getChildren(sserversPath);
for (String child : children) {
message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts);

var zLockPath = ServiceLock.path(sserversPath + "/" + child);
if (!zoo.getChildren(zLockPath.toString()).isEmpty()) {
ServiceLock.deleteLock(zoo, zLockPath);
}
var zLockPath = ServiceLock.path(sserversPath + "/" + child);
if (!zoo.getChildren(zLockPath.toString()).isEmpty()) {
ServiceLock.deleteLock(zoo, zLockPath);
}
}
} catch (KeeperException | InterruptedException e) {
log.error("{}", e.getMessage(), e);
}
} catch (KeeperException | InterruptedException e) {
log.error("{}", e.getMessage(), e);
}

} finally {
SingletonManager.setMode(Mode.CLOSED);
}

}
Expand Down

0 comments on commit 7b11821

Please sign in to comment.