From c7a2d5c386104b9c95f36abe62f44235448dd3c9 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Tue, 17 Dec 2024 21:01:27 +0000 Subject: [PATCH] Handle SIGTERM for graceful shutdown of server processes Added a ShutdownHook via Hadoop's ShutdownHookManager that interrupts the main server thread and sets the shutdownRequested variable to true. Removed variables from subclasses that were used to track shutdown requests. Modified the server threads run methods to attempt an orderly shutdown. --- .../core/util/threads/ThreadPoolNames.java | 1 + .../accumulo/server/AbstractServer.java | 38 ++- .../coordinator/CompactionCoordinator.java | 49 +-- .../CompactionCoordinatorTest.java | 14 +- .../apache/accumulo/compactor/Compactor.java | 309 +++++++++--------- .../accumulo/compactor/CompactorTest.java | 14 +- .../accumulo/gc/SimpleGarbageCollector.java | 229 ++++++------- .../org/apache/accumulo/manager/Manager.java | 17 +- .../accumulo/monitor/EmbeddedWebServer.java | 2 +- .../org/apache/accumulo/monitor/Monitor.java | 12 + .../apache/accumulo/tserver/ScanServer.java | 39 ++- .../apache/accumulo/tserver/TabletServer.java | 92 ++++-- .../accumulo/tserver/ScanServerTest.java | 5 + .../accumulo/test/SelfStoppingScanServer.java | 2 +- .../ExternalDoNothingCompactor.java | 2 +- 15 files changed, 493 insertions(+), 332 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index bdebd03b2dc..43a4a24b70b 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@ -59,6 +59,7 @@ public enum ThreadPoolNames { TSERVER_COMPACTION_MINOR_POOL("accumulo.pool.tserver.compaction.minor"), TSERVER_MIGRATIONS_POOL("accumulo.pool.tserver.migrations"), TSERVER_MINOR_COMPACTOR_POOL("accumulo.pool.tserver.minor.compactor"), + TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL("accumulo.pool.tserver.shutdown.tablet.unload"), TSERVER_SUMMARY_FILE_RETRIEVER_POOL("accumulo.pool.tserver.summary.file.retriever.pool"), TSERVER_SUMMARY_PARTITION_POOL("accumulo.pool.tserver.summary.partition"), TSERVER_SUMMARY_REMOTE_POOL("accumulo.pool.tserver.summary.remote"), diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index c65314a0f12..e1577af8be9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@ -21,6 +21,7 @@ import java.util.Objects; import java.util.OptionalInt; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.Constants; @@ -34,6 +35,7 @@ import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.metrics.ProcessMetrics; import org.apache.accumulo.server.security.SecurityUtil; +import org.apache.hadoop.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +54,7 @@ public abstract class AbstractServer implements AutoCloseable, MetricsProducer, private volatile long idlePeriodStartNanos = 0L; private volatile Thread serverThread; private volatile Thread verificationThread; + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); protected AbstractServer(String appName, ServerOpts opts, String[] args) { this.log = LoggerFactory.getLogger(getClass().getName()); @@ -102,14 +105,47 @@ protected void updateIdleStatus(boolean isIdle) { } } + private void attemptGracefulStop() { + if (serverThread != null) { + serverThread.interrupt(); + } + requestShutdown(); + } + + public void requestShutdown() { + shutdownRequested.compareAndSet(false, true); + } + + public boolean isShutdownRequested() { + return shutdownRequested.get(); + } + /** - * Run this server in a main thread + * Run this server in a main thread. The server's run method should set up the server, then wait + * on isShutdownRequested() to return false, like so: + * + *
+   * public void run() {
+   *   // setup server and start threads
+   *   while (!isShutdownRequested()) {
+   *     try {
+   *       // sleep or other things
+   *     } catch (InterruptedException e) {
+   *       requestShutdown();
+   *     }
+   *   }
+   *   // shut down server
+   * }
+   * 
*/ public void runServer() throws Exception { final AtomicReference err = new AtomicReference<>(); serverThread = new Thread(TraceUtil.wrap(this), applicationName); serverThread.setUncaughtExceptionHandler((thread, exception) -> err.set(exception)); serverThread.start(); + + ShutdownHookManager.get().addShutdownHook(() -> attemptGracefulStop(), 100); + serverThread.join(); if (verificationThread != null) { verificationThread.interrupt(); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index b735d8544dc..66c446735c3 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -74,7 +74,6 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.compaction.RunningCompaction; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -136,9 +135,6 @@ public class CompactionCoordinator extends AbstractServer private ServiceLock coordinatorLock; - // Exposed for tests - protected volatile Boolean shutdown = false; - private final ScheduledThreadPoolExecutor schedExecutor; private final LoadingCache compactorCounts; @@ -309,31 +305,40 @@ public void run() { startDeadCompactionDetector(); LOG.info("Starting loop to check tservers for compaction summaries"); - while (!shutdown) { - long start = System.currentTimeMillis(); + while (!isShutdownRequested()) { + try { + long start = System.currentTimeMillis(); - updateSummaries(); + updateSummaries(); - long now = System.currentTimeMillis(); + long now = System.currentTimeMillis(); - Map> idleCompactors = getIdleCompactors(); - TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> { - if ((now - lastCheckTime) > getMissingCompactorWarningTime() - && QUEUE_SUMMARIES.isCompactionsQueued(queue) && idleCompactors.containsKey(queue)) { - LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", queue, - getMissingCompactorWarningTime()); - } - }); + Map> idleCompactors = getIdleCompactors(); + TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> { + if ((now - lastCheckTime) > getMissingCompactorWarningTime() + && QUEUE_SUMMARIES.isCompactionsQueued(queue) && idleCompactors.containsKey(queue)) { + LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", queue, + getMissingCompactorWarningTime()); + } + }); - long checkInterval = getTServerCheckInterval(); - long duration = (System.currentTimeMillis() - start); - if (checkInterval - duration > 0) { - LOG.debug("Waiting {}ms for next tserver check", (checkInterval - duration)); - UtilWaitThread.sleep(checkInterval - duration); + long checkInterval = getTServerCheckInterval(); + long duration = (System.currentTimeMillis() - start); + if (checkInterval - duration > 0) { + LOG.debug("Waiting {}ms for next tserver check", (checkInterval - duration)); + Thread.sleep(checkInterval - duration); + } + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + requestShutdown(); } } - LOG.info("Shutting down"); + LOG.debug("Stopping Thrift Servers"); + if (coordinatorAddress.server != null) { + coordinatorAddress.server.stop(); + } + LOG.info("stop requested. exiting ... "); } private Map> getIdleCompactors() { diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 9c6d0064796..5bef12df8f7 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -38,6 +38,7 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; @@ -96,6 +97,7 @@ public class TestCoordinator extends CompactionCoordinator { private final ServerContext context; private final ServerAddress client; private final TabletClientService.Client tabletServerClient; + private final AtomicBoolean shutdown = new AtomicBoolean(false); private Set metadataCompactionIds = null; @@ -116,10 +118,20 @@ protected void startDeadCompactionDetector() {} @Override protected long getTServerCheckInterval() { - this.shutdown = true; + requestShutdown(); return 0L; } + @Override + public void requestShutdown() { + shutdown.set(true); + } + + @Override + public boolean isShutdownRequested() { + return shutdown.get(); + } + @Override protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {} diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index b7426ce31bc..f1e22b4a37b 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -167,9 +167,6 @@ public String getQueueName() { private ServiceLock compactorLock; private ServerAddress compactorAddress = null; - // Exposed for tests - protected volatile boolean shutdown = false; - private final AtomicBoolean compactionRunning = new AtomicBoolean(false); protected Compactor(CompactorServerOpts opts, String[] args) { @@ -707,182 +704,184 @@ public void run() { final AtomicReference err = new AtomicReference<>(); - while (!shutdown) { - - // mark compactor as idle while not in the compaction loop - updateIdleStatus(true); + while (!isShutdownRequested()) { + try { + // mark compactor as idle while not in the compaction loop + updateIdleStatus(true); - currentCompactionId.set(null); - err.set(null); - JOB_HOLDER.reset(); + currentCompactionId.set(null); + err.set(null); + JOB_HOLDER.reset(); - TExternalCompactionJob job; - try { - TNextCompactionJob next = getNextJob(getNextId()); - job = next.getJob(); - if (!job.isSetExternalCompactionId()) { - LOG.trace("No external compactions in queue {}", this.queueName); - UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); + TExternalCompactionJob job; + try { + TNextCompactionJob next = getNextJob(getNextId()); + job = next.getJob(); + if (!job.isSetExternalCompactionId()) { + LOG.trace("No external compactions in queue {}", this.queueName); + UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); + continue; + } + if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { + throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() + + " does not match supplied eci " + currentCompactionId.get()); + } + } catch (RetriesExceededException e2) { + LOG.warn("Retries exceeded getting next job. Retrying..."); continue; } - if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { - throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() - + " does not match supplied eci " + currentCompactionId.get()); - } - } catch (RetriesExceededException e2) { - LOG.warn("Retries exceeded getting next job. Retrying..."); - continue; - } - LOG.debug("Received next compaction job: {}", job); + LOG.debug("Received next compaction job: {}", job); - final LongAdder totalInputEntries = new LongAdder(); - final LongAdder totalInputBytes = new LongAdder(); - final CountDownLatch started = new CountDownLatch(1); - final CountDownLatch stopped = new CountDownLatch(1); + final LongAdder totalInputEntries = new LongAdder(); + final LongAdder totalInputBytes = new LongAdder(); + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch stopped = new CountDownLatch(1); - final FileCompactorRunnable fcr = - createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); + final FileCompactorRunnable fcr = + createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); - final Thread compactionThread = - Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), fcr); + final Thread compactionThread = + Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), fcr); - JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); + JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); - try { - // mark compactor as busy while compacting - updateIdleStatus(false); - - // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set - fcr.initialize(); - - compactionThread.start(); // start the compactionThread - started.await(); // wait until the compactor is started - final long inputEntries = totalInputEntries.sum(); - final long waitTime = calculateProgressCheckTime(totalInputBytes.sum()); - LOG.debug("Progress checks will occur every {} seconds", waitTime); - String percentComplete = "unknown"; - - while (!stopped.await(waitTime, TimeUnit.SECONDS)) { - List running = - org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions(); - if (!running.isEmpty()) { - // Compaction has started. There should only be one in the list - CompactionInfo info = running.get(0); - if (info != null) { - final long entriesRead = info.getEntriesRead(); - final long entriesWritten = info.getEntriesWritten(); - if (inputEntries > 0) { - percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); - } - String message = String.format( - "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", - entriesRead, inputEntries, percentComplete, "%", entriesWritten); - watcher.run(); - try { - LOG.debug("Updating coordinator with compaction progress: {}.", message); - TCompactionStatusUpdate update = new TCompactionStatusUpdate( - TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, - entriesWritten, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - } catch (RetriesExceededException e) { - LOG.warn("Error updating coordinator with compaction progress, error: {}", - e.getMessage()); + try { + // mark compactor as busy while compacting + updateIdleStatus(false); + + // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set + fcr.initialize(); + + compactionThread.start(); // start the compactionThread + started.await(); // wait until the compactor is started + final long inputEntries = totalInputEntries.sum(); + final long waitTime = calculateProgressCheckTime(totalInputBytes.sum()); + LOG.debug("Progress checks will occur every {} seconds", waitTime); + String percentComplete = "unknown"; + + while (!stopped.await(waitTime, TimeUnit.SECONDS)) { + List running = + org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions(); + if (!running.isEmpty()) { + // Compaction has started. There should only be one in the list + CompactionInfo info = running.get(0); + if (info != null) { + final long entriesRead = info.getEntriesRead(); + final long entriesWritten = info.getEntriesWritten(); + if (inputEntries > 0) { + percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); + } + String message = String.format( + "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", + entriesRead, inputEntries, percentComplete, "%", entriesWritten); + watcher.run(); + try { + LOG.debug("Updating coordinator with compaction progress: {}.", message); + TCompactionStatusUpdate update = new TCompactionStatusUpdate( + TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, + entriesWritten, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + } catch (RetriesExceededException e) { + LOG.warn("Error updating coordinator with compaction progress, error: {}", + e.getMessage()); + } } + } else { + LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); } - } else { - LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); } - } - compactionThread.join(); - LOG.trace("Compaction thread finished."); - // Run the watcher again to clear out the finished compaction and set the - // stuck count to zero. - watcher.run(); - - if (err.get() != null) { - // maybe the error occured because the table was deleted or something like that, so - // force a cancel check to possibly reduce noise in the logs - checkIfCanceled(); - } - - if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() - || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { - LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); - try { - TCompactionStatusUpdate update = - new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", - -1, -1, -1, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - updateCompactionFailed(job); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction cancellation.", e); - } finally { - currentCompactionId.set(null); + compactionThread.join(); + LOG.trace("Compaction thread finished."); + // Run the watcher again to clear out the finished compaction and set the + // stuck count to zero. + watcher.run(); + + if (err.get() != null) { + // maybe the error occured because the table was deleted or something like that, so + // force a cancel check to possibly reduce noise in the logs + checkIfCanceled(); } - } else if (err.get() != null) { - KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); - try { - LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", - job.getExternalCompactionId(), fromThriftExtent); - TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.FAILED, - "Compaction failed due to: " + err.get().getMessage(), -1, -1, -1, - fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - updateCompactionFailed(job); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}", - job.getExternalCompactionId(), fromThriftExtent, e); - } finally { - currentCompactionId.set(null); - } - } else { - try { - LOG.trace("Updating coordinator with compaction completion."); - updateCompactionCompleted(job, JOB_HOLDER.getStats()); - } catch (RetriesExceededException e) { - LOG.error( - "Error updating coordinator with compaction completion, cancelling compaction.", - e); + + if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() + || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { + LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); + try { + TCompactionStatusUpdate update = + new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", + -1, -1, -1, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + updateCompactionFailed(job); + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction cancellation.", e); + } finally { + currentCompactionId.set(null); + } + } else if (err.get() != null) { + KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + try { + LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", + job.getExternalCompactionId(), fromThriftExtent); + TCompactionStatusUpdate update = new TCompactionStatusUpdate( + TCompactionState.FAILED, "Compaction failed due to: " + err.get().getMessage(), + -1, -1, -1, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + updateCompactionFailed(job); + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}", + job.getExternalCompactionId(), fromThriftExtent, e); + } finally { + currentCompactionId.set(null); + } + } else { try { - cancel(job.getExternalCompactionId()); - } catch (TException e1) { - LOG.error("Error cancelling compaction.", e1); + LOG.trace("Updating coordinator with compaction completion."); + updateCompactionCompleted(job, JOB_HOLDER.getStats()); + } catch (RetriesExceededException e) { + LOG.error( + "Error updating coordinator with compaction completion, cancelling compaction.", + e); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e1) { + LOG.error("Error cancelling compaction.", e1); + } + } finally { + currentCompactionId.set(null); } - } finally { - currentCompactionId.set(null); } - } - } catch (RuntimeException e1) { - LOG.error( - "Compactor thread was interrupted waiting for compaction to start, cancelling job", - e1); - try { - cancel(job.getExternalCompactionId()); - } catch (TException e2) { - LOG.error("Error cancelling compaction.", e2); - } - } finally { - currentCompactionId.set(null); + } catch (RuntimeException e1) { + LOG.error( + "Compactor thread was interrupted waiting for compaction to start, cancelling job", + e1); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e2) { + LOG.error("Error cancelling compaction.", e2); + } + } finally { + currentCompactionId.set(null); - // mark compactor as idle after compaction completes - updateIdleStatus(true); + // mark compactor as idle after compaction completes + updateIdleStatus(true); - // In the case where there is an error in the foreground code the background compaction - // may still be running. Must cancel it before starting another iteration of the loop to - // avoid multiple threads updating shared state. - while (compactionThread.isAlive()) { - compactionThread.interrupt(); - compactionThread.join(1000); + // In the case where there is an error in the foreground code the background compaction + // may still be running. Must cancel it before starting another iteration of the loop to + // avoid multiple threads updating shared state. + while (compactionThread.isAlive()) { + compactionThread.interrupt(); + compactionThread.join(1000); + } } + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + requestShutdown(); } - - } - + } // end while } catch (Exception e) { LOG.error("Unhandled error occurred in Compactor", e); } finally { // Shutdown local thrift server - LOG.info("Stopping Thrift Servers"); + LOG.debug("Stopping Thrift Servers"); if (compactorAddress.server != null) { compactorAddress.server.stop(); } diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index 1eb76e1016f..1819ca9e08e 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Supplier; @@ -195,6 +196,7 @@ public class SuccessfulCompactor extends Compactor { private volatile boolean completedCalled = false; private volatile boolean failedCalled = false; private TCompactionStatusUpdate latestState = null; + private final AtomicBoolean shutdown = new AtomicBoolean(false); SuccessfulCompactor(Supplier uuid, ServerAddress address, TExternalCompactionJob job, ServerContext context, ExternalCompactionId eci, CompactorServerOpts compactorServerOpts) { @@ -227,10 +229,20 @@ protected ServerAddress startCompactorClientService() throws UnknownHostExceptio protected TNextCompactionJob getNextJob(Supplier uuid) throws RetriesExceededException { LOG.info("Attempting to get next job, eci = {}", eci); currentCompactionId.set(eci); - this.shutdown = true; + requestShutdown(); return new TNextCompactionJob(job, 1); } + @Override + public void requestShutdown() { + shutdown.set(true); + } + + @Override + public boolean isShutdownRequested() { + return shutdown.get(); + } + @Override protected synchronized void checkIfCanceled() {} diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index bd78388836e..6d89bc0d7bd 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -193,138 +193,145 @@ public void run() { } }); - while (true) { - Span outerSpan = TraceUtil.startSpan(this.getClass(), "gc"); - try (Scope outerScope = outerSpan.makeCurrent()) { - Span innerSpan = TraceUtil.startSpan(this.getClass(), "loop"); - try (Scope innerScope = innerSpan.makeCurrent()) { - final long tStart = System.nanoTime(); + while (!isShutdownRequested()) { + try { + Span outerSpan = TraceUtil.startSpan(this.getClass(), "gc"); + try (Scope outerScope = outerSpan.makeCurrent()) { + Span innerSpan = TraceUtil.startSpan(this.getClass(), "loop"); + try (Scope innerScope = innerSpan.makeCurrent()) { + final long tStart = System.nanoTime(); + try { + System.gc(); // make room + + status.current.started = System.currentTimeMillis(); + var rootGC = new GCRun(DataLevel.ROOT, getContext()); + var mdGC = new GCRun(DataLevel.METADATA, getContext()); + var userGC = new GCRun(DataLevel.USER, getContext()); + + log.info("Starting Root table Garbage Collection."); + status.current.bulks += new GarbageCollectionAlgorithm().collect(rootGC); + incrementStatsForRun(rootGC); + logStats(); + + log.info("Starting Metadata table Garbage Collection."); + status.current.bulks += new GarbageCollectionAlgorithm().collect(mdGC); + incrementStatsForRun(mdGC); + logStats(); + + log.info("Starting User table Garbage Collection."); + status.current.bulks += new GarbageCollectionAlgorithm().collect(userGC); + incrementStatsForRun(userGC); + logStats(); + + } catch (Exception e) { + TraceUtil.setException(innerSpan, e, false); + log.error("{}", e.getMessage(), e); + } finally { + status.current.finished = System.currentTimeMillis(); + status.last = status.current; + gcCycleMetrics.setLastCollect(status.current); + status.current = new GcCycleStats(); + } + + final long tStop = System.nanoTime(); + log.info(String.format("Collect cycle took %.2f seconds", + (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0))); + + /* + * We want to prune references to fully-replicated WALs from the replication table which + * are no longer referenced in the metadata table before running + * GarbageCollectWriteAheadLogs to ensure we delete as many files as possible. + */ + Span replSpan = TraceUtil.startSpan(this.getClass(), "replicationClose"); + try (Scope replScope = replSpan.makeCurrent()) { + @SuppressWarnings("deprecation") + Runnable closeWals = + new org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences(getContext()); + closeWals.run(); + } catch (Exception e) { + TraceUtil.setException(replSpan, e, false); + log.error("Error trying to close write-ahead logs for replication table", e); + } finally { + replSpan.end(); + } + + // Clean up any unused write-ahead logs + Span walSpan = TraceUtil.startSpan(this.getClass(), "walogs"); + try (Scope walScope = walSpan.makeCurrent()) { + GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs( + getContext(), fs, liveTServerSet, isUsingTrash()); + log.info("Beginning garbage collection of write-ahead logs"); + walogCollector.collect(status); + gcCycleMetrics.setLastWalCollect(status.lastLog); + } catch (Exception e) { + TraceUtil.setException(walSpan, e, false); + log.error("{}", e.getMessage(), e); + } finally { + walSpan.end(); + } + } catch (Exception e) { + TraceUtil.setException(innerSpan, e, true); + throw e; + } finally { + innerSpan.end(); + } + + // we just made a lot of metadata changes: flush them out try { - System.gc(); // make room + AccumuloClient accumuloClient = getContext(); - status.current.started = System.currentTimeMillis(); - var rootGC = new GCRun(DataLevel.ROOT, getContext()); - var mdGC = new GCRun(DataLevel.METADATA, getContext()); - var userGC = new GCRun(DataLevel.USER, getContext()); + final long actionStart = System.nanoTime(); - log.info("Starting Root table Garbage Collection."); - status.current.bulks += new GarbageCollectionAlgorithm().collect(rootGC); - incrementStatsForRun(rootGC); - logStats(); + String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION); + log.debug("gc post action {} started", action); - log.info("Starting Metadata table Garbage Collection."); - status.current.bulks += new GarbageCollectionAlgorithm().collect(mdGC); - incrementStatsForRun(mdGC); - logStats(); + switch (action) { + case "compact": + accumuloClient.tableOperations().compact(MetadataTable.NAME, null, null, true, + true); + accumuloClient.tableOperations().compact(RootTable.NAME, null, null, true, true); + break; + case "flush": + accumuloClient.tableOperations().flush(MetadataTable.NAME, null, null, true); + accumuloClient.tableOperations().flush(RootTable.NAME, null, null, true); + break; + default: + log.trace("'none - no action' or invalid value provided: {}", action); + } - log.info("Starting User table Garbage Collection."); - status.current.bulks += new GarbageCollectionAlgorithm().collect(userGC); - incrementStatsForRun(userGC); - logStats(); + final long actionComplete = System.nanoTime(); - } catch (Exception e) { - TraceUtil.setException(innerSpan, e, false); - log.error("{}", e.getMessage(), e); - } finally { - status.current.finished = System.currentTimeMillis(); - status.last = status.current; - gcCycleMetrics.setLastCollect(status.current); - status.current = new GcCycleStats(); - } + gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart); - final long tStop = System.nanoTime(); - log.info(String.format("Collect cycle took %.2f seconds", - (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0))); - - /* - * We want to prune references to fully-replicated WALs from the replication table which - * are no longer referenced in the metadata table before running - * GarbageCollectWriteAheadLogs to ensure we delete as many files as possible. - */ - Span replSpan = TraceUtil.startSpan(this.getClass(), "replicationClose"); - try (Scope replScope = replSpan.makeCurrent()) { - @SuppressWarnings("deprecation") - Runnable closeWals = - new org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences(getContext()); - closeWals.run(); - } catch (Exception e) { - TraceUtil.setException(replSpan, e, false); - log.error("Error trying to close write-ahead logs for replication table", e); - } finally { - replSpan.end(); - } + log.info("gc post action {} completed in {} seconds", action, String.format("%.2f", + (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 1000.0))); - // Clean up any unused write-ahead logs - Span walSpan = TraceUtil.startSpan(this.getClass(), "walogs"); - try (Scope walScope = walSpan.makeCurrent()) { - GarbageCollectWriteAheadLogs walogCollector = - new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet, isUsingTrash()); - log.info("Beginning garbage collection of write-ahead logs"); - walogCollector.collect(status); - gcCycleMetrics.setLastWalCollect(status.lastLog); } catch (Exception e) { - TraceUtil.setException(walSpan, e, false); - log.error("{}", e.getMessage(), e); - } finally { - walSpan.end(); + TraceUtil.setException(outerSpan, e, false); + log.warn("{}", e.getMessage(), e); } } catch (Exception e) { - TraceUtil.setException(innerSpan, e, true); + TraceUtil.setException(outerSpan, e, true); throw e; } finally { - innerSpan.end(); + outerSpan.end(); } - - // we just made a lot of metadata changes: flush them out try { - AccumuloClient accumuloClient = getContext(); - - final long actionStart = System.nanoTime(); - - String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION); - log.debug("gc post action {} started", action); - - switch (action) { - case "compact": - accumuloClient.tableOperations().compact(MetadataTable.NAME, null, null, true, true); - accumuloClient.tableOperations().compact(RootTable.NAME, null, null, true, true); - break; - case "flush": - accumuloClient.tableOperations().flush(MetadataTable.NAME, null, null, true); - accumuloClient.tableOperations().flush(RootTable.NAME, null, null, true); - break; - default: - log.trace("'none - no action' or invalid value provided: {}", action); - } - - final long actionComplete = System.nanoTime(); - - gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart); - - log.info("gc post action {} completed in {} seconds", action, String.format("%.2f", - (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 1000.0))); - } catch (Exception e) { - TraceUtil.setException(outerSpan, e, false); + gcCycleMetrics.incrementRunCycleCount(); + long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); + log.debug("Sleeping for {} milliseconds", gcDelay); + Thread.sleep(gcDelay); + } catch (InterruptedException e) { log.warn("{}", e.getMessage(), e); + throw e; } - } catch (Exception e) { - TraceUtil.setException(outerSpan, e, true); - throw e; - } finally { - outerSpan.end(); - } - try { - - gcCycleMetrics.incrementRunCycleCount(); - long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); - log.debug("Sleeping for {} milliseconds", gcDelay); - Thread.sleep(gcDelay); } catch (InterruptedException e) { - log.warn("{}", e.getMessage(), e); - return; + log.info("Interrupt Exception received, shutting down"); + requestShutdown(); } } + log.info("stop requested. exiting ... "); } private void incrementStatsForRun(GCRun gcRun) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index a3540fa062a..10326962344 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1443,10 +1443,19 @@ boolean canSuspendTablets() { // The manager is fully initialized. Clients are allowed to connect now. managerInitialized.set(true); - while (clientService.isServing()) { - sleepUninterruptibly(500, MILLISECONDS); + while (!isShutdownRequested() && clientService.isServing()) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + log.info("Interrupt Exception received, shutting down"); + requestShutdown(); + } } - log.info("Shutting down fate."); + + LOG.debug("Stopping Thrift Servers"); + sa.server.stop(); + + log.debug("Shutting down fate."); fate().shutdown(); final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; @@ -1487,7 +1496,7 @@ boolean canSuspendTablets() { throw new IllegalStateException("Exception waiting on watcher", e); } } - log.info("exiting"); + log.info("stop requested. exiting ... "); } @Deprecated diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java index 6784206b686..300534858aa 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java @@ -127,7 +127,7 @@ public void start() { } } - private void stop() { + public void stop() { try { server.stop(); server.join(); diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 4108282bcb3..8089acae696 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -526,6 +526,18 @@ public void run() { }).start(); monitorInitialized.set(true); + + while (!isShutdownRequested()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + requestShutdown(); + } + } + + server.stop(); + log.info("stop requested. exiting ... "); } private ServletHolder getDefaultServlet() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 90a4fc0854e..a457b07ff3d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -201,7 +201,6 @@ private TabletMetadataLoader(Ample ample) { HostAndPort clientAddress; private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); - protected volatile boolean serverStopRequested = false; private ServiceLock scanServerLock; protected TabletServerScanMetrics scanMetrics; private ScanServerMetrics scanServerMetrics; @@ -352,8 +351,8 @@ private ServiceLock announceExistence() { @Override public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { + Halt.halt(isShutdownRequested() ? 0 : 1, () -> { + if (!isShutdownRequested()) { LOG.error("Lost tablet server lock (reason = {}), exiting.", reason); } gcLogger.logGCInfo(getConfiguration()); @@ -417,17 +416,29 @@ public void run() { ServiceLock lock = announceExistence(); try { - while (!serverStopRequested) { - UtilWaitThread.sleep(1000); - updateIdleStatus( - sessionManager.getActiveScans().isEmpty() && tabletMetadataCache.estimatedSize() == 0); + while (!isShutdownRequested()) { + try { + Thread.sleep(1000); + updateIdleStatus(sessionManager.getActiveScans().isEmpty() + && tabletMetadataCache.estimatedSize() == 0); + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + requestShutdown(); + } } } finally { - LOG.info("Stopping Thrift Servers"); + // Wait for scans to got to zero + while (!sessionManager.getActiveScans().isEmpty()) { + LOG.debug("Waiting on {} active scans to complete.", + sessionManager.getActiveScans().size()); + UtilWaitThread.sleep(1000); + } + + LOG.debug("Stopping Thrift Servers"); address.server.stop(); try { - LOG.info("Removing server scan references"); + LOG.debug("Removing server scan references"); this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(), serverLockUUID); } catch (Exception e) { @@ -944,6 +955,11 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent t long busyTimeout) throws ThriftSecurityException, NotServingTabletException, TooManyFilesException, TSampleNotPresentException, TException { + if (isShutdownRequested()) { + // Prevent scans from starting if shutting down + throw new ScanServerBusyException(); + } + KeyExtent extent = getKeyExtent(textent); if (extent.isMeta() && !isSystemUser(credentials)) { @@ -1004,6 +1020,11 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map executionHints, long busyTimeout) throws ThriftSecurityException, TSampleNotPresentException, TException { + if (isShutdownRequested()) { + // Prevent scans from starting if shutting down + throw new ScanServerBusyException(); + } + if (tbatch.size() == 0) { throw new TException("Scan Server batch must include at least one extent"); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 54764b86725..4f9e0ee32c6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -54,6 +54,7 @@ import java.util.UUID; import java.util.concurrent.BlockingDeque; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; @@ -87,12 +88,14 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.ComparablePair; import org.apache.accumulo.core.util.Halt; @@ -104,6 +107,7 @@ import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.threads.ThreadPoolNames; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.AbstractServer; @@ -219,9 +223,6 @@ public TabletServerMinCMetrics getMinCMetrics() { volatile HostAndPort clientAddress; - private volatile boolean serverStopRequested = false; - private volatile boolean shutdownComplete = false; - private ServiceLock tabletServerLock; private TServer server; @@ -401,7 +402,7 @@ String getLockID() { void requestStop() { log.info("Stop requested."); - serverStopRequested = true; + requestShutdown(); } private class SplitRunner implements Runnable { @@ -683,8 +684,8 @@ private void announceExistence() { @Override public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { + Halt.halt(isShutdownRequested() ? 0 : 1, () -> { + if (!isShutdownRequested()) { log.error("Lost tablet server lock (reason = {}), exiting.", reason); } gcLogger.logGCInfo(getConfiguration()); @@ -873,7 +874,7 @@ public void run() { CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS)); HostAndPort managerHost; - while (!serverStopRequested) { + while (!isShutdownRequested()) { updateIdleStatus(getOnlineTablets().isEmpty()); @@ -885,7 +886,7 @@ public void run() { try { // wait until a message is ready to send, or a server stop // was requested - while (mm == null && !serverStopRequested) { + while (mm == null && !isShutdownRequested()) { mm = managerMessages.poll(1, TimeUnit.SECONDS); updateIdleStatus(getOnlineTablets().isEmpty()); } @@ -898,7 +899,7 @@ public void run() { // if while loop does not execute at all and mm != null, // then finally block should place mm back on queue - while (!serverStopRequested && mm != null && client != null + while (!isShutdownRequested() && mm != null && client != null && client.getOutputProtocol() != null && client.getOutputProtocol().getTransport() != null && client.getOutputProtocol().getTransport().isOpen()) { @@ -929,7 +930,7 @@ public void run() { } } catch (InterruptedException e) { log.info("Interrupt Exception received, shutting down"); - serverStopRequested = true; + requestShutdown(); } catch (Exception e) { // may have lost connection with manager // loop back to the beginning and wait for a new one @@ -938,21 +939,6 @@ public void run() { } } - // wait for shutdown - // if the main thread exits oldServer the manager listener, the JVM will - // kill the other threads and finalize objects. We want the shutdown that is - // running in the manager listener thread to complete oldServer this happens. - // consider making other threads daemon threads so that objects don't - // get prematurely finalized - synchronized (this) { - while (!shutdownComplete) { - try { - this.wait(1000); - } catch (InterruptedException e) { - log.error(e.toString()); - } - } - } log.debug("Stopping Replication Server"); if (this.replServer != null) { this.replServer.stop(); @@ -963,6 +949,62 @@ public void run() { server.stop(); } + // Best-effort attempt at unloading tablets. + log.debug("Unloading tablets"); + final List> futures = new ArrayList<>(); + final ThreadPoolExecutor tpe = getContext().threadPools() + .getPoolBuilder(ThreadPoolNames.TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL).numCoreThreads(8) + .numMaxThreads(16).build(); + + ManagerClientService.Client iface = managerConnection(getManagerAddress()); + boolean managerDown = false; + + try { + for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) { + getOnlineTablets().keySet().forEach(ke -> { + if (DataLevel.of(ke.tableId()) == level) { + futures.add( + tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED, 5000))); + } + }); + while (!futures.isEmpty()) { + Iterator> unloads = futures.iterator(); + while (unloads.hasNext()) { + Future f = unloads.next(); + if (f.isDone()) { + if (!managerDown) { + ManagerMessage mm = managerMessages.poll(); + try { + mm.send(getContext().rpcCreds(), getClientAddressString(), iface); + } catch (TException e) { + managerDown = true; + LOG.debug("Error sending message to Manager during tablet unloading, msg: {}", + e.getMessage()); + } + } + unloads.remove(); + } + } + log.debug("Waiting on {} {} tablets to close.", futures.size(), level); + } + } + } finally { + if (!managerDown) { + try { + ManagerMessage mm = managerMessages.poll(); + do { + mm.send(getContext().rpcCreds(), getClientAddressString(), iface); + mm = managerMessages.poll(); + } while (mm != null); + } catch (TException e) { + LOG.debug("Error sending message to Manager during tablet unloading, msg: {}", + e.getMessage()); + } + } + returnManagerConnection(iface); + tpe.shutdown(); + } + try { log.debug("Closing filesystems"); getVolumeManager().close(); diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java index 73a3e0d03bc..6719bf6b926 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java @@ -117,6 +117,11 @@ protected boolean isSystemUser(TCredentials creds) { return systemUser; } + @Override + public boolean isShutdownRequested() { + return false; + } + } private ThriftScanClientHandler handler; diff --git a/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java b/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java index 8391190984c..80a41ff0610 100644 --- a/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java +++ b/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java @@ -45,7 +45,7 @@ public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDExceptio scanCount.incrementAndGet(); super.closeMultiScan(tinfo, scanID); if (scanCount.get() == 3) { - serverStopRequested = true; + requestShutdown(); } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java index 08fc5fbb8d9..208fc736b56 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java @@ -60,7 +60,7 @@ protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob job, CountDownLatch stopped, AtomicReference err) { // Set this to true so that only 1 external compaction is run - this.shutdown = true; + requestShutdown(); return new FileCompactorRunnable() {