From b685237ada69b925effdcc6b40d7d694bde42977 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 10 Jan 2025 22:52:27 +0000 Subject: [PATCH] Made ITs that restart MAC faster Modified MAC so that it cleaned up lock paths in ZooKeeper and ZooCache when stopping. Noticed that in tests that restarted MAC the Manager process would wait for the previous lock to be removed on the session timeout. The lock paths would also be cached in ZooCache and not updated right away because the Watcher would not fire when MAC was stopped, a ConnectionLoss error would be returned when MAC started, and it would take a while for ZooCache to fix itself. --- .../core/fate/zookeeper/ZooCache.java | 31 ++++ .../MiniAccumuloClusterControl.java | 10 ++ .../MiniAccumuloClusterImpl.java | 39 ++++ .../apache/accumulo/server/util/ZooZap.java | 166 +++++++++--------- 4 files changed, 164 insertions(+), 82 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index fe8807c5703..e4ef61bfac8 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@ -30,6 +30,7 @@ import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath; import org.apache.zookeeper.KeeperException; @@ -525,6 +526,36 @@ public void clear(String zPath) { } } + /** + * Removes all paths in the cache match the predicate. + */ + public void clear(Predicate pathPredicate) { + Preconditions.checkState(!closed); + Predicate pathPredicateToUse; + if (log.isTraceEnabled()) { + pathPredicateToUse = path -> { + boolean testResult = pathPredicate.test(path); + if (testResult) { + log.trace("removing {} from cache", path); + } + return testResult; + }; + } else { + pathPredicateToUse = pathPredicate; + } + cacheWriteLock.lock(); + try { + cache.keySet().removeIf(pathPredicateToUse); + childrenCache.keySet().removeIf(pathPredicateToUse); + statCache.keySet().removeIf(pathPredicateToUse); + + immutableCache = new ImmutableCacheCopies(++updateCount, cache, statCache, childrenCache); + + } finally { + cacheWriteLock.unlock(); + } + } + public byte[] getLockData(ServiceLockPath path) { List children = ServiceLock.validateAndSort(path, getChildren(path.toString())); if (children == null || children.isEmpty()) { diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index e40679ebb0b..f126c414ad1 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@ -47,6 +47,7 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; import org.apache.accumulo.server.util.Admin; +import org.apache.accumulo.server.util.ZooZap; import org.apache.accumulo.tserver.ScanServer; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; @@ -278,6 +279,15 @@ public synchronized void stop(ServerType server, String hostname) throws IOExcep if (managerProcess != null) { try { cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS); + try { + System.setProperty("accumulo.properties", + "file://" + cluster.getAccumuloPropertiesPath()); + new ZooZap().zap(cluster.getServerContext().getSiteConfiguration(), + new String[] {"-manager"}); + } catch (Exception e) { + log.error("Error zapping Manager zookeeper lock", e); + } + } catch (ExecutionException | TimeoutException e) { log.warn("Manager did not fully stop after 30 seconds", e); } catch (InterruptedException e) { diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 8695bc41657..7e67088f309 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -90,6 +90,7 @@ import org.apache.accumulo.server.init.Initialize; import org.apache.accumulo.server.util.AccumuloStatus; import org.apache.accumulo.server.util.PortUtils; +import org.apache.accumulo.server.util.ZooZap; import org.apache.accumulo.start.Main; import org.apache.accumulo.start.classloader.vfs.MiniDFSUtil; import org.apache.accumulo.start.spi.KeywordExecutable; @@ -824,9 +825,47 @@ public synchronized void stop() throws IOException, InterruptedException { control.stop(ServerType.GARBAGE_COLLECTOR, null); control.stop(ServerType.MANAGER, null); + control.stop(ServerType.COMPACTION_COORDINATOR); control.stop(ServerType.TABLET_SERVER, null); + control.stop(ServerType.COMPACTOR, null); + control.stop(ServerType.SCAN_SERVER, null); + + // The method calls above kill the server + // Clean up the locks in ZooKeeper fo that if the cluster + // is restarted, then the processes will start right away + // and not wait for the old locks to be cleaned up. + try { + System.setProperty("accumulo.properties", "file://" + getAccumuloPropertiesPath()); + new ZooZap().zap(getServerContext().getSiteConfiguration(), new String[] {"-manager", + "-compaction-coordinators", "-tservers", "-compactors", "-sservers"}); + } catch (Exception e) { + log.error("Error zapping zookeeper locks", e); + } control.stop(ServerType.ZOOKEEPER, null); + // Clear the location of the servers in ZooCache. + // When ZooKeeper was stopped in the previous method call, + // the local ZooKeeper watcher did not fire. If MAC is + // restarted, then ZooKeeper will start on the same port with + // the same data, but no Watchers will fire. + boolean startCalled = true; + try { + getServerContext(); + } catch (RuntimeException e) { + if (e.getMessage().startsWith("Accumulo not initialized")) { + startCalled = false; + } + } + if (startCalled) { + final ServerContext ctx = getServerContext(); + final String zRoot = getServerContext().getZooKeeperRoot(); + ctx.getZooCache().clear((path) -> path.startsWith(zRoot + Constants.ZMANAGER_LOCK)); + ctx.getZooCache().clear((path) -> path.startsWith(zRoot + Constants.ZGC_LOCK)); + ctx.getZooCache().clear((path) -> path.startsWith(zRoot + Constants.ZCOMPACTORS)); + ctx.getZooCache().clear((path) -> path.startsWith(zRoot + Constants.ZSSERVERS)); + ctx.getZooCache().clear((path) -> path.startsWith(zRoot + Constants.ZTSERVERS)); + } + // ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs if (executor != null) { List tasksRemaining = executor.shutdownNow(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java index 8e37cba708b..3dfcd459a03 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java @@ -90,6 +90,19 @@ public static void main(String[] args) throws Exception { @Override public void execute(String[] args) throws Exception { + try { + var siteConf = SiteConfiguration.auto(); + // Login as the server on secure HDFS + if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + SecurityUtil.serverLogin(siteConf); + } + zap(siteConf, args); + } finally { + SingletonManager.setMode(Mode.CLOSED); + } + } + + public void zap(SiteConfiguration siteConf, String[] args) { Opts opts = new Opts(); opts.parseArgs(keyword(), args); @@ -98,111 +111,100 @@ public void execute(String[] args) throws Exception { return; } - try { - var siteConf = SiteConfiguration.auto(); - // Login as the server on secure HDFS - if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { - SecurityUtil.serverLogin(siteConf); - } + String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); + Path instanceDir = new Path(volDir, "instance_id"); + InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); + ZooReaderWriter zoo = new ZooReaderWriter(siteConf); - String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); - Path instanceDir = new Path(volDir, "instance_id"); - InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); - ZooReaderWriter zoo = new ZooReaderWriter(siteConf); - - if (opts.zapMaster) { - log.warn("The -master option is deprecated. Please use -manager instead."); - } - if (opts.zapManager || opts.zapMaster) { - String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK; + if (opts.zapMaster) { + log.warn("The -master option is deprecated. Please use -manager instead."); + } + if (opts.zapManager || opts.zapMaster) { + String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK; - try { - zapDirectory(zoo, managerLockPath, opts); - } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); - } + try { + zapDirectory(zoo, managerLockPath, opts); + } catch (KeeperException | InterruptedException e) { + e.printStackTrace(); } + } - if (opts.zapTservers) { - String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS; - try { - List children = zoo.getChildren(tserversPath); - for (String child : children) { - message("Deleting " + tserversPath + "/" + child + " from zookeeper", opts); - - if (opts.zapManager || opts.zapMaster) { - zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP); - } else { - var zLockPath = ServiceLock.path(tserversPath + "/" + child); - if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { - if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) { - message("Did not delete " + tserversPath + "/" + child, opts); - } + if (opts.zapTservers) { + String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS; + try { + List children = zoo.getChildren(tserversPath); + for (String child : children) { + message("Deleting " + tserversPath + "/" + child + " from zookeeper", opts); + + if (opts.zapManager || opts.zapMaster) { + zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP); + } else { + var zLockPath = ServiceLock.path(tserversPath + "/" + child); + if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { + if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) { + message("Did not delete " + tserversPath + "/" + child, opts); } } } - } catch (KeeperException | InterruptedException e) { - log.error("{}", e.getMessage(), e); } + } catch (KeeperException | InterruptedException e) { + log.error("{}", e.getMessage(), e); } + } - // Remove the tracers, we don't use them anymore. - @SuppressWarnings("deprecation") - String path = siteConf.get(Property.TRACE_ZK_PATH); + // Remove the tracers, we don't use them anymore. + @SuppressWarnings("deprecation") + String path = siteConf.get(Property.TRACE_ZK_PATH); + try { + zapDirectory(zoo, path, opts); + } catch (Exception e) { + // do nothing if the /tracers node does not exist. + } + + if (opts.zapCoordinators) { + final String coordinatorPath = Constants.ZROOT + "/" + iid + Constants.ZCOORDINATOR_LOCK; try { - zapDirectory(zoo, path, opts); - } catch (Exception e) { - // do nothing if the /tracers node does not exist. + if (zoo.exists(coordinatorPath)) { + zapDirectory(zoo, coordinatorPath, opts); + } + } catch (KeeperException | InterruptedException e) { + log.error("Error deleting coordinator from zookeeper, {}", e.getMessage(), e); } + } - if (opts.zapCoordinators) { - final String coordinatorPath = Constants.ZROOT + "/" + iid + Constants.ZCOORDINATOR_LOCK; - try { - if (zoo.exists(coordinatorPath)) { - zapDirectory(zoo, coordinatorPath, opts); + if (opts.zapCompactors) { + String compactorsBasepath = Constants.ZROOT + "/" + iid + Constants.ZCOMPACTORS; + try { + if (zoo.exists(compactorsBasepath)) { + List queues = zoo.getChildren(compactorsBasepath); + for (String queue : queues) { + message("Deleting " + compactorsBasepath + "/" + queue + " from zookeeper", opts); + zoo.recursiveDelete(compactorsBasepath + "/" + queue, NodeMissingPolicy.SKIP); } - } catch (KeeperException | InterruptedException e) { - log.error("Error deleting coordinator from zookeeper, {}", e.getMessage(), e); } + } catch (KeeperException | InterruptedException e) { + log.error("Error deleting compactors from zookeeper, {}", e.getMessage(), e); } - if (opts.zapCompactors) { - String compactorsBasepath = Constants.ZROOT + "/" + iid + Constants.ZCOMPACTORS; - try { - if (zoo.exists(compactorsBasepath)) { - List queues = zoo.getChildren(compactorsBasepath); - for (String queue : queues) { - message("Deleting " + compactorsBasepath + "/" + queue + " from zookeeper", opts); - zoo.recursiveDelete(compactorsBasepath + "/" + queue, NodeMissingPolicy.SKIP); - } - } - } catch (KeeperException | InterruptedException e) { - log.error("Error deleting compactors from zookeeper, {}", e.getMessage(), e); - } + } - } + if (opts.zapScanServers) { + String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS; + try { + if (zoo.exists(sserversPath)) { + List children = zoo.getChildren(sserversPath); + for (String child : children) { + message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts); - if (opts.zapScanServers) { - String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS; - try { - if (zoo.exists(sserversPath)) { - List children = zoo.getChildren(sserversPath); - for (String child : children) { - message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts); - - var zLockPath = ServiceLock.path(sserversPath + "/" + child); - if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { - ServiceLock.deleteLock(zoo, zLockPath); - } + var zLockPath = ServiceLock.path(sserversPath + "/" + child); + if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { + ServiceLock.deleteLock(zoo, zLockPath); } } - } catch (KeeperException | InterruptedException e) { - log.error("{}", e.getMessage(), e); } + } catch (KeeperException | InterruptedException e) { + log.error("{}", e.getMessage(), e); } - - } finally { - SingletonManager.setMode(Mode.CLOSED); } }