diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java index fe8807c5703..d7f924f5767 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooCache.java @@ -30,6 +30,7 @@ import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Predicate; import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath; import org.apache.zookeeper.KeeperException; @@ -525,6 +526,33 @@ public void clear(String zPath) { } } + /** + * Removes from the data, children, and stat caches every path that matches the predicate. + */ + public void clear(Predicate<String> pathPredicate) { + Preconditions.checkState(!closed); + Predicate<String> pathPredicateToUse; + if (log.isTraceEnabled()) { + pathPredicateToUse = pathPredicate.and(path -> { + log.trace("removing {} from cache", path); + return true; + }); + } else { + pathPredicateToUse = pathPredicate; + } + cacheWriteLock.lock(); + try { + cache.keySet().removeIf(pathPredicateToUse); + childrenCache.keySet().removeIf(pathPredicateToUse); + statCache.keySet().removeIf(pathPredicateToUse); + + immutableCache = new ImmutableCacheCopies(++updateCount, cache, statCache, childrenCache); + + } finally { + cacheWriteLock.unlock(); + } + } + public byte[] getLockData(ServiceLockPath path) { List children = ServiceLock.validateAndSort(path, getChildren(path.toString())); if (children == null || children.isEmpty()) { diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java index e40679ebb0b..b0c46aadf97 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java +++ 
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterControl.java @@ -47,6 +47,7 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; import org.apache.accumulo.server.util.Admin; +import org.apache.accumulo.server.util.ZooZap; import org.apache.accumulo.tserver.ScanServer; import org.apache.thrift.TException; import org.apache.thrift.transport.TTransportException; @@ -278,6 +279,11 @@ public synchronized void stop(ServerType server, String hostname) throws IOExcep if (managerProcess != null) { try { cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS); + try { + new ZooZap().zap(cluster.getServerContext().getSiteConfiguration(), "-manager"); + } catch (RuntimeException e) { + log.error("Error zapping Manager zookeeper lock", e); + } } catch (ExecutionException | TimeoutException e) { log.warn("Manager did not fully stop after 30 seconds", e); } catch (InterruptedException e) { diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 8695bc41657..95f8a4ca2b1 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -56,6 +56,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Stream; @@ -90,6 +91,7 @@ import org.apache.accumulo.server.init.Initialize; import org.apache.accumulo.server.util.AccumuloStatus; import org.apache.accumulo.server.util.PortUtils; +import org.apache.accumulo.server.util.ZooZap; import org.apache.accumulo.start.Main; import 
org.apache.accumulo.start.classloader.vfs.MiniDFSUtil; import org.apache.accumulo.start.spi.KeywordExecutable; @@ -824,9 +826,47 @@ public synchronized void stop() throws IOException, InterruptedException { control.stop(ServerType.GARBAGE_COLLECTOR, null); control.stop(ServerType.MANAGER, null); + control.stop(ServerType.COMPACTION_COORDINATOR); control.stop(ServerType.TABLET_SERVER, null); + control.stop(ServerType.COMPACTOR, null); + control.stop(ServerType.SCAN_SERVER, null); + + // The method calls above kill the servers. + // Clean up the locks in ZooKeeper so that if the cluster + // is restarted, then the processes will start right away + // and not wait for the old locks to be cleaned up. + try { + new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager", + "-compaction-coordinators", "-tservers", "-compactors", "-sservers"); + } catch (RuntimeException e) { + log.error("Error zapping zookeeper locks", e); + } control.stop(ServerType.ZOOKEEPER, null); + // Clear the location of the servers in ZooCache. + // When ZooKeeper was stopped in the previous method call, + // the local ZooKeeper watcher did not fire. If MAC is + // restarted, then ZooKeeper will start on the same port with + // the same data, but no Watchers will fire. 
+ boolean startCalled = true; + try { + getServerContext(); + } catch (RuntimeException e) { + if (e.getMessage() != null && e.getMessage().startsWith("Accumulo not initialized")) { + startCalled = false; + } + } + if (startCalled) { + final ServerContext ctx = getServerContext(); + final String zRoot = ctx.getZooKeeperRoot(); + Predicate<String> pred = path -> false; + for (String lockPath : Set.of(Constants.ZMANAGER_LOCK, Constants.ZGC_LOCK, + Constants.ZCOMPACTORS, Constants.ZSSERVERS, Constants.ZTSERVERS)) { + pred = pred.or(path -> path.startsWith(zRoot + lockPath)); + } + ctx.getZooCache().clear(pred); + } + // ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs if (executor != null) { List tasksRemaining = executor.shutdownNow(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java index 8e37cba708b..d16b9fe9843 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java @@ -90,6 +90,19 @@ public static void main(String[] args) throws Exception { @Override public void execute(String[] args) throws Exception { + try { + var siteConf = SiteConfiguration.auto(); + // Login as the server on secure HDFS + if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + SecurityUtil.serverLogin(siteConf); + } + zap(siteConf, args); + } finally { + SingletonManager.setMode(Mode.CLOSED); + } + } + + public void zap(SiteConfiguration siteConf, String... 
args) { Opts opts = new Opts(); opts.parseArgs(keyword(), args); @@ -98,111 +111,100 @@ public void execute(String[] args) throws Exception { return; } - try { - var siteConf = SiteConfiguration.auto(); - // Login as the server on secure HDFS - if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { - SecurityUtil.serverLogin(siteConf); - } + String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); + Path instanceDir = new Path(volDir, "instance_id"); + InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); + ZooReaderWriter zoo = new ZooReaderWriter(siteConf); - String volDir = VolumeConfiguration.getVolumeUris(siteConf).iterator().next(); - Path instanceDir = new Path(volDir, "instance_id"); - InstanceId iid = VolumeManager.getInstanceIDFromHdfs(instanceDir, new Configuration()); - ZooReaderWriter zoo = new ZooReaderWriter(siteConf); - - if (opts.zapMaster) { - log.warn("The -master option is deprecated. Please use -manager instead."); - } - if (opts.zapManager || opts.zapMaster) { - String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK; + if (opts.zapMaster) { + log.warn("The -master option is deprecated. 
Please use -manager instead."); + } + if (opts.zapManager || opts.zapMaster) { + String managerLockPath = Constants.ZROOT + "/" + iid + Constants.ZMANAGER_LOCK; - try { - zapDirectory(zoo, managerLockPath, opts); - } catch (KeeperException | InterruptedException e) { - e.printStackTrace(); - } + try { + zapDirectory(zoo, managerLockPath, opts); + } catch (KeeperException | InterruptedException e) { + log.error("Error deleting manager lock from zookeeper, {}", e.getMessage(), e); } + } - if (opts.zapTservers) { - String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS; - try { - List children = zoo.getChildren(tserversPath); - for (String child : children) { - message("Deleting " + tserversPath + "/" + child + " from zookeeper", opts); - - if (opts.zapManager || opts.zapMaster) { - zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP); - } else { - var zLockPath = ServiceLock.path(tserversPath + "/" + child); - if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { - if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) { - message("Did not delete " + tserversPath + "/" + child, opts); - } + if (opts.zapTservers) { + String tserversPath = Constants.ZROOT + "/" + iid + Constants.ZTSERVERS; + try { + List children = zoo.getChildren(tserversPath); + for (String child : children) { + message("Deleting " + tserversPath + "/" + child + " from zookeeper", opts); + + if (opts.zapManager || opts.zapMaster) { + zoo.recursiveDelete(tserversPath + "/" + child, NodeMissingPolicy.SKIP); + } else { + var zLockPath = ServiceLock.path(tserversPath + "/" + child); + if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { + if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) { + message("Did not delete " + tserversPath + "/" + child, opts); } } } - } catch (KeeperException | InterruptedException e) { - log.error("{}", e.getMessage(), e); } + } catch (KeeperException | InterruptedException e) { + log.error("{}", e.getMessage(), e); } + } - // Remove the tracers, we don't use them anymore. 
- @SuppressWarnings("deprecation") - String path = siteConf.get(Property.TRACE_ZK_PATH); + // Remove the tracers, we don't use them anymore. + @SuppressWarnings("deprecation") + String path = siteConf.get(Property.TRACE_ZK_PATH); + try { + zapDirectory(zoo, path, opts); + } catch (Exception e) { + // do nothing if the /tracers node does not exist. + } + + if (opts.zapCoordinators) { + final String coordinatorPath = Constants.ZROOT + "/" + iid + Constants.ZCOORDINATOR_LOCK; try { - zapDirectory(zoo, path, opts); - } catch (Exception e) { - // do nothing if the /tracers node does not exist. + if (zoo.exists(coordinatorPath)) { + zapDirectory(zoo, coordinatorPath, opts); + } + } catch (KeeperException | InterruptedException e) { + log.error("Error deleting coordinator from zookeeper, {}", e.getMessage(), e); } + } - if (opts.zapCoordinators) { - final String coordinatorPath = Constants.ZROOT + "/" + iid + Constants.ZCOORDINATOR_LOCK; - try { - if (zoo.exists(coordinatorPath)) { - zapDirectory(zoo, coordinatorPath, opts); + if (opts.zapCompactors) { + String compactorsBasepath = Constants.ZROOT + "/" + iid + Constants.ZCOMPACTORS; + try { + if (zoo.exists(compactorsBasepath)) { + List queues = zoo.getChildren(compactorsBasepath); + for (String queue : queues) { + message("Deleting " + compactorsBasepath + "/" + queue + " from zookeeper", opts); + zoo.recursiveDelete(compactorsBasepath + "/" + queue, NodeMissingPolicy.SKIP); } - } catch (KeeperException | InterruptedException e) { - log.error("Error deleting coordinator from zookeeper, {}", e.getMessage(), e); } + } catch (KeeperException | InterruptedException e) { + log.error("Error deleting compactors from zookeeper, {}", e.getMessage(), e); } - if (opts.zapCompactors) { - String compactorsBasepath = Constants.ZROOT + "/" + iid + Constants.ZCOMPACTORS; - try { - if (zoo.exists(compactorsBasepath)) { - List queues = zoo.getChildren(compactorsBasepath); - for (String queue : queues) { - message("Deleting " + 
compactorsBasepath + "/" + queue + " from zookeeper", opts); - zoo.recursiveDelete(compactorsBasepath + "/" + queue, NodeMissingPolicy.SKIP); - } - } - } catch (KeeperException | InterruptedException e) { - log.error("Error deleting compactors from zookeeper, {}", e.getMessage(), e); - } + } - } + if (opts.zapScanServers) { + String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS; + try { + if (zoo.exists(sserversPath)) { + List children = zoo.getChildren(sserversPath); + for (String child : children) { + message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts); - if (opts.zapScanServers) { - String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS; - try { - if (zoo.exists(sserversPath)) { - List children = zoo.getChildren(sserversPath); - for (String child : children) { - message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts); - - var zLockPath = ServiceLock.path(sserversPath + "/" + child); - if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { - ServiceLock.deleteLock(zoo, zLockPath); - } + var zLockPath = ServiceLock.path(sserversPath + "/" + child); + if (!zoo.getChildren(zLockPath.toString()).isEmpty()) { + ServiceLock.deleteLock(zoo, zLockPath); } } - } catch (KeeperException | InterruptedException e) { - log.error("{}", e.getMessage(), e); } + } catch (KeeperException | InterruptedException e) { + log.error("{}", e.getMessage(), e); } - - } finally { - SingletonManager.setMode(Mode.CLOSED); } }