Skip to content

Commit

Permalink
Merge branch '2.1' into 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 15, 2025
2 parents 6636fea + 7b11821 commit 98cbd85
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
Expand Down Expand Up @@ -529,6 +530,33 @@ public void clear(String zPath) {
}
}

/**
* Removes all paths in the cache match the predicate.
*/
public void clear(Predicate<String> pathPredicate) {
Preconditions.checkState(!closed);
Predicate<String> pathPredicateToUse;
if (log.isTraceEnabled()) {
pathPredicateToUse = pathPredicate.and(path -> {
log.trace("removing {} from cache", path);
return true;
});
} else {
pathPredicateToUse = pathPredicate;
}
cacheWriteLock.lock();
try {
cache.keySet().removeIf(pathPredicateToUse);
childrenCache.keySet().removeIf(pathPredicateToUse);
statCache.keySet().removeIf(pathPredicateToUse);

immutableCache = new ImmutableCacheCopies(++updateCount, cache, statCache, childrenCache);

} finally {
cacheWriteLock.unlock();
}
}

public Optional<ServiceLockData> getLockData(ServiceLockPath path) {
List<String> children = ServiceLock.validateAndSort(path, getChildren(path.toString()));
if (children == null || children.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
import org.apache.accumulo.server.util.Admin;
import org.apache.accumulo.server.util.ZooZap;
import org.apache.accumulo.tserver.ScanServer;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
Expand Down Expand Up @@ -280,6 +281,11 @@ public synchronized void stop(ServerType server, String hostname) throws IOExcep
if (managerProcess != null) {
try {
cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS);
try {
new ZooZap().zap(cluster.getServerContext().getSiteConfiguration(), "-manager");
} catch (RuntimeException e) {
log.error("Error zapping Manager zookeeper lock", e);
}
} catch (ExecutionException | TimeoutException e) {
log.warn("Manager did not fully stop after 30 seconds", e);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Stream;

Expand Down Expand Up @@ -90,6 +91,7 @@
import org.apache.accumulo.server.init.Initialize;
import org.apache.accumulo.server.util.AccumuloStatus;
import org.apache.accumulo.server.util.PortUtils;
import org.apache.accumulo.server.util.ZooZap;
import org.apache.accumulo.start.Main;
import org.apache.accumulo.start.spi.KeywordExecutable;
import org.apache.accumulo.start.util.MiniDFSUtil;
Expand Down Expand Up @@ -783,9 +785,47 @@ public synchronized void stop() throws IOException, InterruptedException {

control.stop(ServerType.GARBAGE_COLLECTOR, null);
control.stop(ServerType.MANAGER, null);
control.stop(ServerType.COMPACTION_COORDINATOR);
control.stop(ServerType.TABLET_SERVER, null);
control.stop(ServerType.COMPACTOR, null);
control.stop(ServerType.SCAN_SERVER, null);

// The method calls above kill the server
// Clean up the locks in ZooKeeper fo that if the cluster
// is restarted, then the processes will start right away
// and not wait for the old locks to be cleaned up.
try {
new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager",
"-compaction-coordinators", "-tservers", "-compactors", "-sservers");
} catch (RuntimeException e) {
log.error("Error zapping zookeeper locks", e);
}
control.stop(ServerType.ZOOKEEPER, null);

// Clear the location of the servers in ZooCache.
// When ZooKeeper was stopped in the previous method call,
// the local ZooKeeper watcher did not fire. If MAC is
// restarted, then ZooKeeper will start on the same port with
// the same data, but no Watchers will fire.
boolean startCalled = true;
try {
getServerContext().getZooKeeperRoot();
} catch (IllegalStateException e) {
if (e.getMessage().startsWith("Accumulo not initialized")) {
startCalled = false;
}
}
if (startCalled) {
final ServerContext ctx = getServerContext();
final String zRoot = ctx.getZooKeeperRoot();
Predicate<String> pred = path -> false;
for (String lockPath : Set.of(Constants.ZMANAGER_LOCK, Constants.ZGC_LOCK,
Constants.ZCOMPACTORS, Constants.ZSSERVERS, Constants.ZTSERVERS)) {
pred = pred.or(path -> path.startsWith(zRoot + lockPath));
}
ctx.getZooCache().clear(pred);
}

// ACCUMULO-2985 stop the ExecutorService after we finished using it to stop accumulo procs
if (executor != null) {
List<Runnable> tasksRemaining = executor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ public static void main(String[] args) throws Exception {

@Override
public void execute(String[] args) throws Exception {
try {
var siteConf = SiteConfiguration.auto();
// Login as the server on secure HDFS
if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
SecurityUtil.serverLogin(siteConf);
}
zap(siteConf, args);
} finally {
SingletonManager.setMode(Mode.CLOSED);
}
}

public void zap(SiteConfiguration siteConf, String... args) {
Opts opts = new Opts();
opts.parseArgs(keyword(), args);

Expand All @@ -96,7 +109,6 @@ public void execute(String[] args) throws Exception {
return;
}

var siteConf = SiteConfiguration.auto();
try (var zk = new ZooSession(getClass().getSimpleName(), siteConf)) {
// Login as the server on secure HDFS
if (siteConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
Expand Down Expand Up @@ -187,10 +199,7 @@ public void execute(String[] args) throws Exception {
}
}

} finally {
SingletonManager.setMode(Mode.CLOSED);
}

}

private static void zapDirectory(ZooReaderWriter zoo, String path, Opts opts)
Expand Down

0 comments on commit 98cbd85

Please sign in to comment.