From 7b1182192dd3b41eaac0e31a5a4808913d7369d3 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Wed, 15 Jan 2025 15:41:31 -0500 Subject: [PATCH] Made ITs that restart MAC faster (#5246) Modified MAC so that it cleaned up lock paths in ZooKeeper and ZooCache when stopping. Noticed that in tests that restarted MAC the Manager process would wait for the previous lock to be removed on the session timeout. The lock paths would also be cached in ZooCache and not updated right away because the Watcher would not fire when MAC was stopped, a ConnectionLoss error would be returned when MAC started, and it would take a while for ZooCache to fix itself. --- .../core/fate/zookeeper/ZooCache.java | 28 +++ .../MiniAccumuloClusterControl.java | 6 + .../MiniAccumuloClusterImpl.java | 40 +++++ .../apache/accumulo/server/util/ZooZap.java | 166 +++++++++--------- 4 files changed, 158 insertions(+), 82 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index fe8807c5703..d7f924f5767 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@ -30,6 +30,7 @@ import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath; import org.apache.zookeeper.KeeperException; @@ -525,6 +526,33 @@ public void clear(String zPath) { } } + /** + * Removes all paths in the cache that match the predicate. 
+ */ + public void clear(Predicate pathPredicate) { + Preconditions.checkState(!closed); + Predicate pathPredicateToUse; + if (log.isTraceEnabled()) { + pathPredicateToUse = pathPredicate.and(path -> { + log.trace("removing {} from cache", path); + return true; + }); + } else { + pathPredicateToUse = pathPredicate; + } + cacheWriteLock.lock(); + try { + cache.keySet().removeIf(pathPredicateToUse); + childrenCache.keySet().removeIf(pathPredicateToUse); + statCache.keySet().removeIf(pathPredicateToUse); + + immutableCache = new ImmutableCacheCopies(++updateCount, cache, statCache, childrenCache); + + } finally { + cacheWriteLock.unlock(); + } + } + public byte[] getLockData(ServiceLockPath path) { List children = ServiceLock.validateAndSort(path, getChildren(path.toString())); if (children == null || children.isEmpty()) { diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index e40679ebb0b..b0c46aadf97 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@ -47,6 +47,7 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; import org.apache.accumulo.server.util.Admin; +import org.apache.accumulo.server.util.ZooZap; import org.apache.accumulo.tserver.ScanServer; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; @@ -278,6 +279,11 @@ public synchronized void stop(ServerType server, String hostname) throws IOExcep if (managerProcess != null) { try { cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS); + try { + new ZooZap().zap(cluster.getServerContext().getSiteConfiguration(), "-manager"); + } catch (RuntimeException e) { + 
log.error("Error zapping Manager zookeeper lock", e); + } } catch (ExecutionException | TimeoutException e) { log.warn("Manager did not fully stop after 30 seconds", e); } catch (InterruptedException e) { diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 8695bc41657..95f8a4ca2b1 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Stream; @@ -90,6 +91,7 @@ import org.apache.accumulo.server.init.Initialize; import org.apache.accumulo.server.util.AccumuloStatus; import org.apache.accumulo.server.util.PortUtils; +import org.apache.accumulo.server.util.ZooZap; import org.apache.accumulo.start.Main; import org.apache.accumulo.start.classloader.vfs.MiniDFSUtil; import org.apache.accumulo.start.spi.KeywordExecutable; @@ -824,9 +826,47 @@ public synchronized void stop() throws IOException, InterruptedException { control.stop(ServerType.GARBAGE_COLLECTOR, null); control.stop(ServerType.MANAGER, null); + control.stop(ServerType.COMPACTION_COORDINATOR); control.stop(ServerType.TABLET_SERVER, null); + control.stop(ServerType.COMPACTOR, null); + control.stop(ServerType.SCAN_SERVER, null); + + // The method calls above kill the server + // Clean up the locks in ZooKeeper so that if the cluster + // is restarted, then the processes will start right away + // and not wait for the old locks to be cleaned up. 
+ try { + new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager", + "-compaction-coordinators", "-tservers", "-compactors", "-sservers"); + } catch (RuntimeException e) { + log.error("Error zapping zookeeper locks", e); + } control.stop(ServerType.ZOOKEEPER, null); + // Clear the location of the servers in ZooCache. + // When ZooKeeper was stopped in the previous method call, + // the local ZooKeeper watcher did not fire. If MAC is + // restarted, then ZooKeeper will start on the same port with + // the same data, but no Watchers will fire. + boolean startCalled = true; + try { + getServerContext(); + } catch (RuntimeException e) { + if (e.getMessage().startsWith("Accumulo not initialized")) { + startCalled = false; + } + } + if (startCalled) { + final ServerContext ctx = getServerContext(); + final String zRoot = getServerContext().getZooKeeperRoot(); + Predicate pred = path -> false; + for (String lockPath : Set.of(Constants.ZMANAGER_LOCK, Constants.ZGC_LOCK, + Constants.ZCOMPACTORS, Constants.ZSSERVERS, Constants.ZTSERVERS)) { + pred = pred.or(path -> path.startsWith(zRoot + lockPath)); + } + ctx.getZooCache().clear(pred); + } + // ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs if (executor != null) { List tasksRemaining = executor.shutdownNow(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java index 8e37cba708b..d16b9fe9843 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java @@ -90,6 +90,19 @@ public static void main(String[] args) throws Exception { @Override public void execute(String[] args) throws Exception { + try { + var siteConf = SiteConfiguration.auto(); + // Login as the server on secure HDFS + if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + 
SecurityUtil.serverLogin(siteConf); + } + zap(siteConf, args); + } finally { + SingletonManager.setMode(Mode.CLOSED); + } + } + + public void zap(SiteConfiguration siteConf, String... args) { Opts opts = new Opts(); opts.parseArgs(keyword(), args); @@ -98,111 +111,100 @@ public void execute(String[] args) throws Exception { return; } - try { - var siteConf = SiteConfiguration.auto(); - // Login as the server on secure HDFS - if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { - SecurityUtil.serverLogin(siteConf); - } + String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); + Path instanceDir = new Path(volDir, "instance_id"); + InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); + ZooReaderWriter zoo = new ZooReaderWriter(siteConf); - String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); - Path instanceDir = new Path(volDir, "instance_id"); - InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); - ZooReaderWriter zoo = new ZooReaderWriter(siteConf); - - if (opts.zapMaster) { - log.warn("The -master option is deprecated. Please use -manager instead."); - } - if (opts.zapManager || opts.zapMaster) { - String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK; + if (opts.zapMaster) { + log.warn("The -master option is deprecated. 
Please use -manager instead."); + } + if (opts.zapManager || opts.zapMaster) { + String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK; - try { - zapDirectory(zoo, managerLockPath, opts); - } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); - } + try { + zapDirectory(zoo, managerLockPath, opts); + } catch (KeeperException | InterruptedException e) { + e.printStackTrace(); } + } - if (opts.zapTservers) { - String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS; - try { - List children = zoo.getChildren(tserversPath); - for (String child : children) { - message("Deleting " + tserversPath + "/" + child + " from zookeeper", opts); - - if (opts.zapManager || opts.zapMaster) { - zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP); - } else { - var zLockPath = ServiceLock.path(tserversPath + "/" + child); - if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { - if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) { - message("Did not delete " + tserversPath + "/" + child, opts); - } + if (opts.zapTservers) { + String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS; + try { + List children = zoo.getChildren(tserversPath); + for (String child : children) { + message("Deleting " + tserversPath + "/" + child + " from zookeeper", opts); + + if (opts.zapManager || opts.zapMaster) { + zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP); + } else { + var zLockPath = ServiceLock.path(tserversPath + "/" + child); + if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { + if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) { + message("Did not delete " + tserversPath + "/" + child, opts); } } } - } catch (KeeperException | InterruptedException e) { - log.error("{}", e.getMessage(), e); } + } catch (KeeperException | InterruptedException e) { + log.error("{}", e.getMessage(), e); } + } - // Remove the tracers, we don't use them anymore. 
- @SuppressWarnings("deprecation") - String path = siteConf.get(Property.TRACE_ZK_PATH); + // Remove the tracers, we don't use them anymore. + @SuppressWarnings("deprecation") + String path = siteConf.get(Property.TRACE_ZK_PATH); + try { + zapDirectory(zoo, path, opts); + } catch (Exception e) { + // do nothing if the /tracers node does not exist. + } + + if (opts.zapCoordinators) { + final String coordinatorPath = Constants.ZROOT + "/" + iid + Constants.ZCOORDINATOR_LOCK; try { - zapDirectory(zoo, path, opts); - } catch (Exception e) { - // do nothing if the /tracers node does not exist. + if (zoo.exists(coordinatorPath)) { + zapDirectory(zoo, coordinatorPath, opts); + } + } catch (KeeperException | InterruptedException e) { + log.error("Error deleting coordinator from zookeeper, {}", e.getMessage(), e); } + } - if (opts.zapCoordinators) { - final String coordinatorPath = Constants.ZROOT + "/" + iid + Constants.ZCOORDINATOR_LOCK; - try { - if (zoo.exists(coordinatorPath)) { - zapDirectory(zoo, coordinatorPath, opts); + if (opts.zapCompactors) { + String compactorsBasepath = Constants.ZROOT + "/" + iid + Constants.ZCOMPACTORS; + try { + if (zoo.exists(compactorsBasepath)) { + List queues = zoo.getChildren(compactorsBasepath); + for (String queue : queues) { + message("Deleting " + compactorsBasepath + "/" + queue + " from zookeeper", opts); + zoo.recursiveDelete(compactorsBasepath + "/" + queue, NodeMissingPolicy.SKIP); } - } catch (KeeperException | InterruptedException e) { - log.error("Error deleting coordinator from zookeeper, {}", e.getMessage(), e); } + } catch (KeeperException | InterruptedException e) { + log.error("Error deleting compactors from zookeeper, {}", e.getMessage(), e); } - if (opts.zapCompactors) { - String compactorsBasepath = Constants.ZROOT + "/" + iid + Constants.ZCOMPACTORS; - try { - if (zoo.exists(compactorsBasepath)) { - List queues = zoo.getChildren(compactorsBasepath); - for (String queue : queues) { - message("Deleting " + 
compactorsBasepath + "/" + queue + " from zookeeper", opts); - zoo.recursiveDelete(compactorsBasepath + "/" + queue, NodeMissingPolicy.SKIP); - } - } - } catch (KeeperException | InterruptedException e) { - log.error("Error deleting compactors from zookeeper, {}", e.getMessage(), e); - } + } - } + if (opts.zapScanServers) { + String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS; + try { + if (zoo.exists(sserversPath)) { + List children = zoo.getChildren(sserversPath); + for (String child : children) { + message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts); - if (opts.zapScanServers) { - String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS; - try { - if (zoo.exists(sserversPath)) { - List children = zoo.getChildren(sserversPath); - for (String child : children) { - message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts); - - var zLockPath = ServiceLock.path(sserversPath + "/" + child); - if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { - ServiceLock.deleteLock(zoo, zLockPath); - } + var zLockPath = ServiceLock.path(sserversPath + "/" + child); + if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { + ServiceLock.deleteLock(zoo, zLockPath); } } - } catch (KeeperException | InterruptedException e) { - log.error("{}", e.getMessage(), e); } + } catch (KeeperException | InterruptedException e) { + log.error("{}", e.getMessage(), e); } - - } finally { - SingletonManager.setMode(Mode.CLOSED); } }