Skip to content

Commit

Permalink
Handle SIGTERM for graceful shutdown of server processes
Browse files Browse the repository at this point in the history
Added a ShutdownHook via Hadoop's ShutdownHookManager that
interrupts the main server thread and sets the shutdownRequested
variable to true. Removed variables from subclasses that were used
to track shutdown requests. Modified the server threads' run methods
to attempt an orderly shutdown.
  • Loading branch information
dlmarion committed Dec 17, 2024
1 parent 79d972f commit c7a2d5c
Show file tree
Hide file tree
Showing 15 changed files with 493 additions and 332 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public enum ThreadPoolNames {
TSERVER_COMPACTION_MINOR_POOL("accumulo.pool.tserver.compaction.minor"),
TSERVER_MIGRATIONS_POOL("accumulo.pool.tserver.migrations"),
TSERVER_MINOR_COMPACTOR_POOL("accumulo.pool.tserver.minor.compactor"),
TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL("accumulo.pool.tserver.shutdown.tablet.unload"),
TSERVER_SUMMARY_FILE_RETRIEVER_POOL("accumulo.pool.tserver.summary.file.retriever.pool"),
TSERVER_SUMMARY_PARTITION_POOL("accumulo.pool.tserver.summary.partition"),
TSERVER_SUMMARY_REMOTE_POOL("accumulo.pool.tserver.summary.remote"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Objects;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.accumulo.core.Constants;
Expand All @@ -34,6 +35,7 @@
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.metrics.ProcessMetrics;
import org.apache.accumulo.server.security.SecurityUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,6 +54,7 @@ public abstract class AbstractServer implements AutoCloseable, MetricsProducer,
private volatile long idlePeriodStartNanos = 0L;
private volatile Thread serverThread;
private volatile Thread verificationThread;
private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);

protected AbstractServer(String appName, ServerOpts opts, String[] args) {
this.log = LoggerFactory.getLogger(getClass().getName());
Expand Down Expand Up @@ -102,14 +105,47 @@ protected void updateIdleStatus(boolean isIdle) {
}
}

/**
 * Asks the server to stop: marks shutdown as requested and then interrupts the main server
 * thread, if one has been started, so that it wakes from any blocking call.
 *
 * <p>
 * The flag is set before the interrupt is delivered so that a server thread woken by the
 * interrupt already sees {@link #isShutdownRequested()} return true.
 */
private void attemptGracefulStop() {
  requestShutdown();
  // serverThread is volatile; read it once so the null check and the call use the same value
  final Thread mainThread = serverThread;
  if (mainThread != null) {
    mainThread.interrupt();
  }
}

/**
 * Marks this server as having been asked to shut down. Calling it again once the flag is set
 * changes nothing.
 */
public void requestShutdown() {
  // storing true unconditionally leaves the flag in the same state as a false-to-true swap
  shutdownRequested.set(true);
}

/**
 * Reports whether a shutdown has been requested for this server.
 *
 * @return the current value of the shutdown flag
 */
public boolean isShutdownRequested() {
  final boolean requested = shutdownRequested.get();
  return requested;
}

/**
* Run this server in a main thread
* Run this server in a main thread. The server's run method should set up the server, then loop
* until isShutdownRequested() returns true, like so:
*
* <pre>
* public void run() {
* // setup server and start threads
* while (!isShutdownRequested()) {
* try {
* // sleep or other things
* } catch (InterruptedException e) {
* requestShutdown();
* }
* }
* // shut down server
* }
* </pre>
*/
public void runServer() throws Exception {
final AtomicReference<Throwable> err = new AtomicReference<>();
serverThread = new Thread(TraceUtil.wrap(this), applicationName);
serverThread.setUncaughtExceptionHandler((thread, exception) -> err.set(exception));
serverThread.start();

ShutdownHookManager.get().addShutdownHook(() -> attemptGracefulStop(), 100);

serverThread.join();
if (verificationThread != null) {
verificationThread.interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.core.util.compaction.RunningCompaction;
import org.apache.accumulo.core.util.threads.ThreadPools;
Expand Down Expand Up @@ -136,9 +135,6 @@ public class CompactionCoordinator extends AbstractServer

private ServiceLock coordinatorLock;

// Exposed for tests
protected volatile Boolean shutdown = false;

private final ScheduledThreadPoolExecutor schedExecutor;

private final LoadingCache<String,Integer> compactorCounts;
Expand Down Expand Up @@ -309,31 +305,40 @@ public void run() {
startDeadCompactionDetector();

LOG.info("Starting loop to check tservers for compaction summaries");
while (!shutdown) {
long start = System.currentTimeMillis();
while (!isShutdownRequested()) {
try {
long start = System.currentTimeMillis();

updateSummaries();
updateSummaries();

long now = System.currentTimeMillis();
long now = System.currentTimeMillis();

Map<String,List<HostAndPort>> idleCompactors = getIdleCompactors();
TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> {
if ((now - lastCheckTime) > getMissingCompactorWarningTime()
&& QUEUE_SUMMARIES.isCompactionsQueued(queue) && idleCompactors.containsKey(queue)) {
LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", queue,
getMissingCompactorWarningTime());
}
});
Map<String,List<HostAndPort>> idleCompactors = getIdleCompactors();
TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> {
if ((now - lastCheckTime) > getMissingCompactorWarningTime()
&& QUEUE_SUMMARIES.isCompactionsQueued(queue) && idleCompactors.containsKey(queue)) {
LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", queue,
getMissingCompactorWarningTime());
}
});

long checkInterval = getTServerCheckInterval();
long duration = (System.currentTimeMillis() - start);
if (checkInterval - duration > 0) {
LOG.debug("Waiting {}ms for next tserver check", (checkInterval - duration));
UtilWaitThread.sleep(checkInterval - duration);
long checkInterval = getTServerCheckInterval();
long duration = (System.currentTimeMillis() - start);
if (checkInterval - duration > 0) {
LOG.debug("Waiting {}ms for next tserver check", (checkInterval - duration));
Thread.sleep(checkInterval - duration);
}
} catch (InterruptedException e) {
LOG.info("Interrupt Exception received, shutting down");
requestShutdown();
}
}

LOG.info("Shutting down");
LOG.debug("Stopping Thrift Servers");
if (coordinatorAddress.server != null) {
coordinatorAddress.server.stop();
}
LOG.info("stop requested. exiting ... ");
}

private Map<String,List<HostAndPort>> getIdleCompactors() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
Expand Down Expand Up @@ -96,6 +97,7 @@ public class TestCoordinator extends CompactionCoordinator {
private final ServerContext context;
private final ServerAddress client;
private final TabletClientService.Client tabletServerClient;
private final AtomicBoolean shutdown = new AtomicBoolean(false);

private Set<ExternalCompactionId> metadataCompactionIds = null;

Expand All @@ -116,10 +118,20 @@ protected void startDeadCompactionDetector() {}

@Override
protected long getTServerCheckInterval() {
this.shutdown = true;
requestShutdown();
return 0L;
}

/**
 * Records the shutdown request in this test class's own flag.
 */
@Override
public void requestShutdown() {
  this.shutdown.set(true);
}

/**
 * Reports whether this test class's own shutdown flag has been set.
 *
 * @return the current value of the flag
 */
@Override
public boolean isShutdownRequested() {
  final boolean stopRequested = this.shutdown.get();
  return stopRequested;
}

@Override
protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {}

Expand Down
Loading

0 comments on commit c7a2d5c

Please sign in to comment.