Skip to content

Commit

Permalink
Merge branch '3.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 15, 2025
2 parents 7fada71 + 98cbd85 commit 0add23e
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -517,13 +517,10 @@ public void clear(Predicate<String> pathPredicate) {

Predicate<String> pathPredicateToUse;
if (log.isTraceEnabled()) {
pathPredicateToUse = path -> {
boolean testResult = pathPredicate.test(path);
if (testResult) {
log.trace("{} removing {} from cache", cacheId, path);
}
return testResult;
};
pathPredicateToUse = pathPredicate.and(path -> {
log.trace("removing {} from cache", path);
return true;
});
} else {
pathPredicateToUse = pathPredicate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
import org.apache.accumulo.server.util.Admin;
import org.apache.accumulo.server.util.ZooZap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -270,6 +271,11 @@ public synchronized void stop(ServerType server, String hostname) throws IOExcep
if (managerProcess != null) {
try {
cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS);
try {
new ZooZap().zap(cluster.getServerContext().getSiteConfiguration(), "-manager");
} catch (RuntimeException e) {
log.error("Error zapping Manager zookeeper lock", e);
}
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -109,6 +110,7 @@
import org.apache.accumulo.server.init.Initialize;
import org.apache.accumulo.server.util.AccumuloStatus;
import org.apache.accumulo.server.util.PortUtils;
import org.apache.accumulo.server.util.ZooZap;
import org.apache.accumulo.start.Main;
import org.apache.accumulo.start.spi.KeywordExecutable;
import org.apache.accumulo.start.util.MiniDFSUtil;
Expand Down Expand Up @@ -994,10 +996,45 @@ public synchronized void stop() throws IOException, InterruptedException {
control.stop(ServerType.GARBAGE_COLLECTOR, null);
control.stop(ServerType.MANAGER, null);
control.stop(ServerType.TABLET_SERVER, null);
control.stop(ServerType.ZOOKEEPER, null);
control.stop(ServerType.COMPACTOR, null);
control.stop(ServerType.SCAN_SERVER, null);

// The method calls above kill the server
// Clean up the locks in ZooKeeper fo that if the cluster
// is restarted, then the processes will start right away
// and not wait for the old locks to be cleaned up.
try {
new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager", "-tservers",
"-compactors", "-sservers");
} catch (RuntimeException e) {
log.error("Error zapping zookeeper locks", e);
}
control.stop(ServerType.ZOOKEEPER, null);

// Clear the location of the servers in ZooCache.
// When ZooKeeper was stopped in the previous method call,
// the local ZooKeeper watcher did not fire. If MAC is
// restarted, then ZooKeeper will start on the same port with
// the same data, but no Watchers will fire.
boolean startCalled = true;
try {
getServerContext().getZooKeeperRoot();
} catch (IllegalStateException e) {
if (e.getMessage().startsWith("Accumulo not initialized")) {
startCalled = false;
}
}
if (startCalled) {
final ServerContext ctx = getServerContext();
final String zRoot = ctx.getZooKeeperRoot();
Predicate<String> pred = path -> false;
for (String lockPath : Set.of(Constants.ZMANAGER_LOCK, Constants.ZGC_LOCK,
Constants.ZCOMPACTORS, Constants.ZSSERVERS, Constants.ZTSERVERS)) {
pred = pred.or(path -> path.startsWith(zRoot + lockPath));
}
ctx.getZooCache().clear(pred);
}

// ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs
if (executor != null) {
List<Runnable> tasksRemaining = executor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath;
import org.apache.accumulo.core.singletons.SingletonManager;
import org.apache.accumulo.core.singletons.SingletonManager.Mode;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.accumulo.start.spi.KeywordExecutable;
Expand Down Expand Up @@ -86,6 +87,19 @@ public static void main(String[] args) throws Exception {

@Override
public void execute(String[] args) throws Exception {
try {
var siteConf = SiteConfiguration.auto();
// Login as the server on secure HDFS
if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
SecurityUtil.serverLogin(siteConf);
}
zap(siteConf, args);
} finally {
SingletonManager.setMode(Mode.CLOSED);
}
}

public void zap(SiteConfiguration siteConf, String... args) {
Opts opts = new Opts();
opts.parseArgs(keyword(), args);

Expand All @@ -94,8 +108,7 @@ public void execute(String[] args) throws Exception {
return;
}

try {
var siteConf = SiteConfiguration.auto();
try (var zk = new ZooSession(getClass().getSimpleName(), siteConf)) {
// Login as the server on secure HDFS
if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
SecurityUtil.serverLogin(siteConf);
Expand Down Expand Up @@ -175,10 +188,7 @@ public void execute(String[] args) throws Exception {
}
}

} finally {
SingletonManager.setMode(Mode.CLOSED);
}

}

private static void zapDirectory(ZooReaderWriter zoo, ServiceLockPath path, Opts opts)
Expand Down

0 comments on commit 0add23e

Please sign in to comment.