diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index bdebd03b2dc..43a4a24b70b 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@ -59,6 +59,7 @@ public enum ThreadPoolNames { TSERVER_COMPACTION_MINOR_POOL("accumulo.pool.tserver.compaction.minor"), TSERVER_MIGRATIONS_POOL("accumulo.pool.tserver.migrations"), TSERVER_MINOR_COMPACTOR_POOL("accumulo.pool.tserver.minor.compactor"), + TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL("accumulo.pool.tserver.shutdown.tablet.unload"), TSERVER_SUMMARY_FILE_RETRIEVER_POOL("accumulo.pool.tserver.summary.file.retriever.pool"), TSERVER_SUMMARY_PARTITION_POOL("accumulo.pool.tserver.summary.partition"), TSERVER_SUMMARY_REMOTE_POOL("accumulo.pool.tserver.summary.remote"), diff --git a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java index c65314a0f12..e1577af8be9 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java @@ -21,6 +21,7 @@ import java.util.Objects; import java.util.OptionalInt; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.accumulo.core.Constants; @@ -34,6 +35,7 @@ import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.metrics.ProcessMetrics; import org.apache.accumulo.server.security.SecurityUtil; +import org.apache.hadoop.util.ShutdownHookManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +54,7 @@ public abstract class AbstractServer implements AutoCloseable, MetricsProducer, private volatile long idlePeriodStartNanos = 0L; private volatile Thread serverThread; private volatile Thread verificationThread; + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); protected AbstractServer(String appName, ServerOpts opts, String[] args) { this.log = LoggerFactory.getLogger(getClass().getName()); @@ -102,14 +105,47 @@ protected void updateIdleStatus(boolean isIdle) { } } + private void attemptGracefulStop() { + if (serverThread != null) { + serverThread.interrupt(); + } + requestShutdown(); + } + + public void requestShutdown() { + shutdownRequested.compareAndSet(false, true); + } + + public boolean isShutdownRequested() { + return shutdownRequested.get(); + } + /** - * Run this server in a main thread + * Run this server in a main thread. The server's run method should set up the server, then wait + * on isShutdownRequested() to return false, like so: + * + *
+   * public void run() {
+   *   // setup server and start threads
+   *   while (!isShutdownRequested()) {
+   *     try {
+   *       // sleep or other things
+   *     } catch (InterruptedException e) {
+   *       requestShutdown();
+   *     }
+   *   }
+   *   // shut down server
+   * }
+   * </pre>
*/ public void runServer() throws Exception { final AtomicReference err = new AtomicReference<>(); serverThread = new Thread(TraceUtil.wrap(this), applicationName); serverThread.setUncaughtExceptionHandler((thread, exception) -> err.set(exception)); serverThread.start(); + + ShutdownHookManager.get().addShutdownHook(() -> attemptGracefulStop(), 100); + serverThread.join(); if (verificationThread != null) { verificationThread.interrupt(); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index b735d8544dc..66c446735c3 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -74,7 +74,6 @@ import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.HostAndPort; -import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.compaction.RunningCompaction; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -136,9 +135,6 @@ public class CompactionCoordinator extends AbstractServer private ServiceLock coordinatorLock; - // Exposed for tests - protected volatile Boolean shutdown = false; - private final ScheduledThreadPoolExecutor schedExecutor; private final LoadingCache compactorCounts; @@ -309,31 +305,40 @@ public void run() { startDeadCompactionDetector(); LOG.info("Starting loop to check tservers for compaction summaries"); - while (!shutdown) { - long start = System.currentTimeMillis(); + while (!isShutdownRequested()) { + try { + long start = System.currentTimeMillis(); - updateSummaries(); + updateSummaries(); - long now = System.currentTimeMillis(); + long now = System.currentTimeMillis(); - Map> idleCompactors = getIdleCompactors(); - TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> { - if ((now - lastCheckTime) > getMissingCompactorWarningTime() - && QUEUE_SUMMARIES.isCompactionsQueued(queue) && idleCompactors.containsKey(queue)) { - LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", queue, - getMissingCompactorWarningTime()); - } - }); + Map> idleCompactors = getIdleCompactors(); + TIME_COMPACTOR_LAST_CHECKED.forEach((queue, lastCheckTime) -> { + if ((now - lastCheckTime) > getMissingCompactorWarningTime() + && QUEUE_SUMMARIES.isCompactionsQueued(queue) && idleCompactors.containsKey(queue)) { + LOG.warn("No compactors have checked in with coordinator for queue {} in {}ms", queue, + getMissingCompactorWarningTime()); + } + }); - long checkInterval = getTServerCheckInterval(); - long duration = (System.currentTimeMillis() - start); - if (checkInterval - duration > 0) { - LOG.debug("Waiting {}ms for next tserver check", (checkInterval - duration)); - UtilWaitThread.sleep(checkInterval - duration); + long checkInterval = getTServerCheckInterval(); + long duration = (System.currentTimeMillis() - start); + if (checkInterval - duration > 0) { + LOG.debug("Waiting {}ms for next tserver check", (checkInterval - duration)); + Thread.sleep(checkInterval - duration); + } + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + requestShutdown(); } } - LOG.info("Shutting down"); + 
LOG.debug("Stopping Thrift Servers"); + if (coordinatorAddress.server != null) { + coordinatorAddress.server.stop(); + } + LOG.info("stop requested. exiting ... "); } private Map> getIdleCompactors() { diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 9c6d0064796..5bef12df8f7 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -38,6 +38,7 @@ import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; @@ -96,6 +97,7 @@ public class TestCoordinator extends CompactionCoordinator { private final ServerContext context; private final ServerAddress client; private final TabletClientService.Client tabletServerClient; + private final AtomicBoolean shutdown = new AtomicBoolean(false); private Set metadataCompactionIds = null; @@ -116,10 +118,20 @@ protected void startDeadCompactionDetector() {} @Override protected long getTServerCheckInterval() { - this.shutdown = true; + requestShutdown(); return 0L; } + @Override + public void requestShutdown() { + shutdown.set(true); + } + + @Override + public boolean isShutdownRequested() { + return shutdown.get(); + } + @Override protected void startCompactionCleaner(ScheduledThreadPoolExecutor schedExecutor) {} diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index b7426ce31bc..f1e22b4a37b 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -167,9 +167,6 @@ public String getQueueName() { private ServiceLock compactorLock; private ServerAddress compactorAddress = null; - // Exposed for tests - protected volatile boolean shutdown = false; - private final AtomicBoolean compactionRunning = new AtomicBoolean(false); protected Compactor(CompactorServerOpts opts, String[] args) { @@ -707,182 +704,184 @@ public void run() { final AtomicReference err = new AtomicReference<>(); - while (!shutdown) { - - // mark compactor as idle while not in the compaction loop - updateIdleStatus(true); + while (!isShutdownRequested()) { + try { + // mark compactor as idle while not in the compaction loop + updateIdleStatus(true); - currentCompactionId.set(null); - err.set(null); - JOB_HOLDER.reset(); + currentCompactionId.set(null); + err.set(null); + JOB_HOLDER.reset(); - TExternalCompactionJob job; - try { - TNextCompactionJob next = getNextJob(getNextId()); - job = next.getJob(); - if (!job.isSetExternalCompactionId()) { - LOG.trace("No external compactions in queue {}", this.queueName); - UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); + TExternalCompactionJob job; + try { + TNextCompactionJob next = getNextJob(getNextId()); + job = next.getJob(); + if (!job.isSetExternalCompactionId()) { + LOG.trace("No external compactions in queue {}", this.queueName); + 
UtilWaitThread.sleep(getWaitTimeBetweenCompactionChecks(next.getCompactorCount())); + continue; + } + if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { + throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() + + " does not match supplied eci " + currentCompactionId.get()); + } + } catch (RetriesExceededException e2) { + LOG.warn("Retries exceeded getting next job. Retrying..."); continue; } - if (!job.getExternalCompactionId().equals(currentCompactionId.get().toString())) { - throw new IllegalStateException("Returned eci " + job.getExternalCompactionId() - + " does not match supplied eci " + currentCompactionId.get()); - } - } catch (RetriesExceededException e2) { - LOG.warn("Retries exceeded getting next job. Retrying..."); - continue; - } - LOG.debug("Received next compaction job: {}", job); + LOG.debug("Received next compaction job: {}", job); - final LongAdder totalInputEntries = new LongAdder(); - final LongAdder totalInputBytes = new LongAdder(); - final CountDownLatch started = new CountDownLatch(1); - final CountDownLatch stopped = new CountDownLatch(1); + final LongAdder totalInputEntries = new LongAdder(); + final LongAdder totalInputBytes = new LongAdder(); + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch stopped = new CountDownLatch(1); - final FileCompactorRunnable fcr = - createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); + final FileCompactorRunnable fcr = + createCompactionJob(job, totalInputEntries, totalInputBytes, started, stopped, err); - final Thread compactionThread = - Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), fcr); + final Thread compactionThread = + Threads.createThread("Compaction job for tablet " + job.getExtent().toString(), fcr); - JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); + JOB_HOLDER.set(job, compactionThread, fcr.getFileCompactor()); - try { - // mark compactor as busy while compacting - updateIdleStatus(false); - - // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set - fcr.initialize(); - - compactionThread.start(); // start the compactionThread - started.await(); // wait until the compactor is started - final long inputEntries = totalInputEntries.sum(); - final long waitTime = calculateProgressCheckTime(totalInputBytes.sum()); - LOG.debug("Progress checks will occur every {} seconds", waitTime); - String percentComplete = "unknown"; - - while (!stopped.await(waitTime, TimeUnit.SECONDS)) { - List running = - org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions(); - if (!running.isEmpty()) { - // Compaction has started. 
There should only be one in the list - CompactionInfo info = running.get(0); - if (info != null) { - final long entriesRead = info.getEntriesRead(); - final long entriesWritten = info.getEntriesWritten(); - if (inputEntries > 0) { - percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); - } - String message = String.format( - "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", - entriesRead, inputEntries, percentComplete, "%", entriesWritten); - watcher.run(); - try { - LOG.debug("Updating coordinator with compaction progress: {}.", message); - TCompactionStatusUpdate update = new TCompactionStatusUpdate( - TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, - entriesWritten, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - } catch (RetriesExceededException e) { - LOG.warn("Error updating coordinator with compaction progress, error: {}", - e.getMessage()); + try { + // mark compactor as busy while compacting + updateIdleStatus(false); + + // Need to call FileCompactorRunnable.initialize after calling JOB_HOLDER.set + fcr.initialize(); + + compactionThread.start(); // start the compactionThread + started.await(); // wait until the compactor is started + final long inputEntries = totalInputEntries.sum(); + final long waitTime = calculateProgressCheckTime(totalInputBytes.sum()); + LOG.debug("Progress checks will occur every {} seconds", waitTime); + String percentComplete = "unknown"; + + while (!stopped.await(waitTime, TimeUnit.SECONDS)) { + List running = + org.apache.accumulo.server.compaction.FileCompactor.getRunningCompactions(); + if (!running.isEmpty()) { + // Compaction has started. There should only be one in the list + CompactionInfo info = running.get(0); + if (info != null) { + final long entriesRead = info.getEntriesRead(); + final long entriesWritten = info.getEntriesWritten(); + if (inputEntries > 0) { + percentComplete = Float.toString((entriesRead / (float) inputEntries) * 100); + } + String message = String.format( + "Compaction in progress, read %d of %d input entries ( %s %s ), written %d entries", + entriesRead, inputEntries, percentComplete, "%", entriesWritten); + watcher.run(); + try { + LOG.debug("Updating coordinator with compaction progress: {}.", message); + TCompactionStatusUpdate update = new TCompactionStatusUpdate( + TCompactionState.IN_PROGRESS, message, inputEntries, entriesRead, + entriesWritten, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + } catch (RetriesExceededException e) { + LOG.warn("Error updating coordinator with compaction progress, error: {}", + e.getMessage()); + } } + } else { + LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); } - } else { - LOG.debug("Waiting on compaction thread to finish, but no RUNNING compaction"); } - } - compactionThread.join(); - LOG.trace("Compaction thread finished."); - // Run the watcher again to clear out the finished compaction and set the - // stuck count to zero. 
- watcher.run(); - - if (err.get() != null) { - // maybe the error occured because the table was deleted or something like that, so - // force a cancel check to possibly reduce noise in the logs - checkIfCanceled(); - } - - if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() - || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { - LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); - try { - TCompactionStatusUpdate update = - new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", - -1, -1, -1, fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - updateCompactionFailed(job); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction cancellation.", e); - } finally { - currentCompactionId.set(null); + compactionThread.join(); + LOG.trace("Compaction thread finished."); + // Run the watcher again to clear out the finished compaction and set the + // stuck count to zero. + watcher.run(); + + if (err.get() != null) { + // maybe the error occured because the table was deleted or something like that, so + // force a cancel check to possibly reduce noise in the logs + checkIfCanceled(); } - } else if (err.get() != null) { - KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); - try { - LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", - job.getExternalCompactionId(), fromThriftExtent); - TCompactionStatusUpdate update = new TCompactionStatusUpdate(TCompactionState.FAILED, - "Compaction failed due to: " + err.get().getMessage(), -1, -1, -1, - fcr.getCompactionAge().toNanos()); - updateCompactionState(job, update); - updateCompactionFailed(job); - } catch (RetriesExceededException e) { - LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}", - job.getExternalCompactionId(), fromThriftExtent, e); - } finally { - currentCompactionId.set(null); - } - } else { - try { - LOG.trace("Updating coordinator with compaction completion."); - updateCompactionCompleted(job, JOB_HOLDER.getStats()); - } catch (RetriesExceededException e) { - LOG.error( - "Error updating coordinator with compaction completion, cancelling compaction.", - e); + + if (compactionThread.isInterrupted() || JOB_HOLDER.isCancelled() + || (err.get() != null && err.get().getClass().equals(InterruptedException.class))) { + LOG.warn("Compaction thread was interrupted, sending CANCELLED state"); + try { + TCompactionStatusUpdate update = + new TCompactionStatusUpdate(TCompactionState.CANCELLED, "Compaction cancelled", + -1, -1, -1, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + updateCompactionFailed(job); + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction cancellation.", e); + } finally { + currentCompactionId.set(null); + } + } else if (err.get() != null) { + KeyExtent fromThriftExtent = KeyExtent.fromThrift(job.getExtent()); + try { + LOG.info("Updating coordinator with compaction failure: id: {}, extent: {}", + job.getExternalCompactionId(), fromThriftExtent); + TCompactionStatusUpdate update = new TCompactionStatusUpdate( + TCompactionState.FAILED, "Compaction failed due to: " + err.get().getMessage(), + -1, -1, -1, fcr.getCompactionAge().toNanos()); + updateCompactionState(job, update); + updateCompactionFailed(job); + } catch (RetriesExceededException e) { + LOG.error("Error updating coordinator with compaction failure: id: {}, extent: {}", + 
job.getExternalCompactionId(), fromThriftExtent, e); + } finally { + currentCompactionId.set(null); + } + } else { try { - cancel(job.getExternalCompactionId()); - } catch (TException e1) { - LOG.error("Error cancelling compaction.", e1); + LOG.trace("Updating coordinator with compaction completion."); + updateCompactionCompleted(job, JOB_HOLDER.getStats()); + } catch (RetriesExceededException e) { + LOG.error( + "Error updating coordinator with compaction completion, cancelling compaction.", + e); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e1) { + LOG.error("Error cancelling compaction.", e1); + } + } finally { + currentCompactionId.set(null); } - } finally { - currentCompactionId.set(null); } - } - } catch (RuntimeException e1) { - LOG.error( - "Compactor thread was interrupted waiting for compaction to start, cancelling job", - e1); - try { - cancel(job.getExternalCompactionId()); - } catch (TException e2) { - LOG.error("Error cancelling compaction.", e2); - } - } finally { - currentCompactionId.set(null); + } catch (RuntimeException e1) { + LOG.error( + "Compactor thread was interrupted waiting for compaction to start, cancelling job", + e1); + try { + cancel(job.getExternalCompactionId()); + } catch (TException e2) { + LOG.error("Error cancelling compaction.", e2); + } + } finally { + currentCompactionId.set(null); - // mark compactor as idle after compaction completes - updateIdleStatus(true); + // mark compactor as idle after compaction completes + updateIdleStatus(true); - // In the case where there is an error in the foreground code the background compaction - // may still be running. Must cancel it before starting another iteration of the loop to - // avoid multiple threads updating shared state. - while (compactionThread.isAlive()) { - compactionThread.interrupt(); - compactionThread.join(1000); + // In the case where there is an error in the foreground code the background compaction + // may still be running. Must cancel it before starting another iteration of the loop to + // avoid multiple threads updating shared state. 
+ while (compactionThread.isAlive()) { + compactionThread.interrupt(); + compactionThread.join(1000); + } } + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + requestShutdown(); } - - } - + } // end while } catch (Exception e) { LOG.error("Unhandled error occurred in Compactor", e); } finally { // Shutdown local thrift server - LOG.info("Stopping Thrift Servers"); + LOG.debug("Stopping Thrift Servers"); if (compactorAddress.server != null) { compactorAddress.server.stop(); } diff --git a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java index 1eb76e1016f..1819ca9e08e 100644 --- a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java +++ b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java @@ -33,6 +33,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.Supplier; @@ -195,6 +196,7 @@ public class SuccessfulCompactor extends Compactor { private volatile boolean completedCalled = false; private volatile boolean failedCalled = false; private TCompactionStatusUpdate latestState = null; + private final AtomicBoolean shutdown = new AtomicBoolean(false); SuccessfulCompactor(Supplier uuid, ServerAddress address, TExternalCompactionJob job, ServerContext context, ExternalCompactionId eci, CompactorServerOpts compactorServerOpts) { @@ -227,10 +229,20 @@ protected ServerAddress startCompactorClientService() throws UnknownHostExceptio protected TNextCompactionJob getNextJob(Supplier uuid) throws RetriesExceededException { LOG.info("Attempting to get next job, eci = {}", eci); currentCompactionId.set(eci); - this.shutdown = true; + requestShutdown(); return new TNextCompactionJob(job, 1); } + @Override + public void requestShutdown() { + shutdown.set(true); + } + + @Override + public boolean isShutdownRequested() { + return shutdown.get(); + } + @Override protected synchronized void checkIfCanceled() {} diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index bd78388836e..6d89bc0d7bd 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -193,138 +193,145 @@ public void run() { } }); - while (true) { - Span outerSpan = TraceUtil.startSpan(this.getClass(), "gc"); - try (Scope outerScope = outerSpan.makeCurrent()) { - Span innerSpan = TraceUtil.startSpan(this.getClass(), "loop"); - try (Scope innerScope = innerSpan.makeCurrent()) { - final long tStart = System.nanoTime(); + while (!isShutdownRequested()) { + try { + Span outerSpan = TraceUtil.startSpan(this.getClass(), "gc"); + try (Scope outerScope = outerSpan.makeCurrent()) { + Span innerSpan = TraceUtil.startSpan(this.getClass(), "loop"); + try (Scope innerScope = innerSpan.makeCurrent()) { + final long tStart = System.nanoTime(); + try { + System.gc(); // make room + + status.current.started = System.currentTimeMillis(); + var rootGC = new GCRun(DataLevel.ROOT, getContext()); + var mdGC = new GCRun(DataLevel.METADATA, getContext()); + var userGC = new 
GCRun(DataLevel.USER, getContext()); + + log.info("Starting Root table Garbage Collection."); + status.current.bulks += new GarbageCollectionAlgorithm().collect(rootGC); + incrementStatsForRun(rootGC); + logStats(); + + log.info("Starting Metadata table Garbage Collection."); + status.current.bulks += new GarbageCollectionAlgorithm().collect(mdGC); + incrementStatsForRun(mdGC); + logStats(); + + log.info("Starting User table Garbage Collection."); + status.current.bulks += new GarbageCollectionAlgorithm().collect(userGC); + incrementStatsForRun(userGC); + logStats(); + + } catch (Exception e) { + TraceUtil.setException(innerSpan, e, false); + log.error("{}", e.getMessage(), e); + } finally { + status.current.finished = System.currentTimeMillis(); + status.last = status.current; + gcCycleMetrics.setLastCollect(status.current); + status.current = new GcCycleStats(); + } + + final long tStop = System.nanoTime(); + log.info(String.format("Collect cycle took %.2f seconds", + (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0))); + + /* + * We want to prune references to fully-replicated WALs from the replication table which + * are no longer referenced in the metadata table before running + * GarbageCollectWriteAheadLogs to ensure we delete as many files as possible. + */ + Span replSpan = TraceUtil.startSpan(this.getClass(), "replicationClose"); + try (Scope replScope = replSpan.makeCurrent()) { + @SuppressWarnings("deprecation") + Runnable closeWals = + new org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences(getContext()); + closeWals.run(); + } catch (Exception e) { + TraceUtil.setException(replSpan, e, false); + log.error("Error trying to close write-ahead logs for replication table", e); + } finally { + replSpan.end(); + } + + // Clean up any unused write-ahead logs + Span walSpan = TraceUtil.startSpan(this.getClass(), "walogs"); + try (Scope walScope = walSpan.makeCurrent()) { + GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs( + getContext(), fs, liveTServerSet, isUsingTrash()); + log.info("Beginning garbage collection of write-ahead logs"); + walogCollector.collect(status); + gcCycleMetrics.setLastWalCollect(status.lastLog); + } catch (Exception e) { + TraceUtil.setException(walSpan, e, false); + log.error("{}", e.getMessage(), e); + } finally { + walSpan.end(); + } + } catch (Exception e) { + TraceUtil.setException(innerSpan, e, true); + throw e; + } finally { + innerSpan.end(); + } + + // we just made a lot of metadata changes: flush them out try { - System.gc(); // make room + AccumuloClient accumuloClient = getContext(); - status.current.started = System.currentTimeMillis(); - var rootGC = new GCRun(DataLevel.ROOT, getContext()); - var mdGC = new GCRun(DataLevel.METADATA, getContext()); - var userGC = new GCRun(DataLevel.USER, getContext()); + final long actionStart = System.nanoTime(); - log.info("Starting Root table Garbage Collection."); - status.current.bulks += new GarbageCollectionAlgorithm().collect(rootGC); - incrementStatsForRun(rootGC); - logStats(); + String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION); + log.debug("gc post action {} started", action); - log.info("Starting Metadata table Garbage Collection."); - status.current.bulks += new GarbageCollectionAlgorithm().collect(mdGC); - incrementStatsForRun(mdGC); - logStats(); + switch (action) { + case "compact": + accumuloClient.tableOperations().compact(MetadataTable.NAME, null, null, true, + true); + 
accumuloClient.tableOperations().compact(RootTable.NAME, null, null, true, true); + break; + case "flush": + accumuloClient.tableOperations().flush(MetadataTable.NAME, null, null, true); + accumuloClient.tableOperations().flush(RootTable.NAME, null, null, true); + break; + default: + log.trace("'none - no action' or invalid value provided: {}", action); + } - log.info("Starting User table Garbage Collection."); - status.current.bulks += new GarbageCollectionAlgorithm().collect(userGC); - incrementStatsForRun(userGC); - logStats(); + final long actionComplete = System.nanoTime(); - } catch (Exception e) { - TraceUtil.setException(innerSpan, e, false); - log.error("{}", e.getMessage(), e); - } finally { - status.current.finished = System.currentTimeMillis(); - status.last = status.current; - gcCycleMetrics.setLastCollect(status.current); - status.current = new GcCycleStats(); - } + gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart); - final long tStop = System.nanoTime(); - log.info(String.format("Collect cycle took %.2f seconds", - (TimeUnit.NANOSECONDS.toMillis(tStop - tStart) / 1000.0))); - - /* - * We want to prune references to fully-replicated WALs from the replication table which - * are no longer referenced in the metadata table before running - * GarbageCollectWriteAheadLogs to ensure we delete as many files as possible. - */ - Span replSpan = TraceUtil.startSpan(this.getClass(), "replicationClose"); - try (Scope replScope = replSpan.makeCurrent()) { - @SuppressWarnings("deprecation") - Runnable closeWals = - new org.apache.accumulo.gc.replication.CloseWriteAheadLogReferences(getContext()); - closeWals.run(); - } catch (Exception e) { - TraceUtil.setException(replSpan, e, false); - log.error("Error trying to close write-ahead logs for replication table", e); - } finally { - replSpan.end(); - } + log.info("gc post action {} completed in {} seconds", action, String.format("%.2f", + (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 1000.0))); - // Clean up any unused write-ahead logs - Span walSpan = TraceUtil.startSpan(this.getClass(), "walogs"); - try (Scope walScope = walSpan.makeCurrent()) { - GarbageCollectWriteAheadLogs walogCollector = - new GarbageCollectWriteAheadLogs(getContext(), fs, liveTServerSet, isUsingTrash()); - log.info("Beginning garbage collection of write-ahead logs"); - walogCollector.collect(status); - gcCycleMetrics.setLastWalCollect(status.lastLog); } catch (Exception e) { - TraceUtil.setException(walSpan, e, false); - log.error("{}", e.getMessage(), e); - } finally { - walSpan.end(); + TraceUtil.setException(outerSpan, e, false); + log.warn("{}", e.getMessage(), e); } } catch (Exception e) { - TraceUtil.setException(innerSpan, e, true); + TraceUtil.setException(outerSpan, e, true); throw e; } finally { - innerSpan.end(); + outerSpan.end(); } - - // we just made a lot of metadata changes: flush them out try { - AccumuloClient accumuloClient = getContext(); - - final long actionStart = System.nanoTime(); - - String action = getConfiguration().get(Property.GC_USE_FULL_COMPACTION); - log.debug("gc post action {} started", action); - - switch (action) { - case "compact": - accumuloClient.tableOperations().compact(MetadataTable.NAME, null, null, true, true); - accumuloClient.tableOperations().compact(RootTable.NAME, null, null, true, true); - break; - case "flush": - accumuloClient.tableOperations().flush(MetadataTable.NAME, null, null, true); - accumuloClient.tableOperations().flush(RootTable.NAME, null, null, true); - break; - 
default: - log.trace("'none - no action' or invalid value provided: {}", action); - } - - final long actionComplete = System.nanoTime(); - - gcCycleMetrics.setPostOpDurationNanos(actionComplete - actionStart); - - log.info("gc post action {} completed in {} seconds", action, String.format("%.2f", - (TimeUnit.NANOSECONDS.toMillis(actionComplete - actionStart) / 1000.0))); - } catch (Exception e) { - TraceUtil.setException(outerSpan, e, false); + gcCycleMetrics.incrementRunCycleCount(); + long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); + log.debug("Sleeping for {} milliseconds", gcDelay); + Thread.sleep(gcDelay); + } catch (InterruptedException e) { log.warn("{}", e.getMessage(), e); + throw e; } - } catch (Exception e) { - TraceUtil.setException(outerSpan, e, true); - throw e; - } finally { - outerSpan.end(); - } - try { - - gcCycleMetrics.incrementRunCycleCount(); - long gcDelay = getConfiguration().getTimeInMillis(Property.GC_CYCLE_DELAY); - log.debug("Sleeping for {} milliseconds", gcDelay); - Thread.sleep(gcDelay); } catch (InterruptedException e) { - log.warn("{}", e.getMessage(), e); - return; + log.info("Interrupt Exception received, shutting down"); + requestShutdown(); } } + log.info("stop requested. exiting ... "); } private void incrementStatsForRun(GCRun gcRun) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index a3540fa062a..10326962344 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -1443,10 +1443,19 @@ boolean canSuspendTablets() { // The manager is fully initialized. Clients are allowed to connect now. managerInitialized.set(true); - while (clientService.isServing()) { - sleepUninterruptibly(500, MILLISECONDS); + while (!isShutdownRequested() && clientService.isServing()) { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + log.info("Interrupt Exception received, shutting down"); + requestShutdown(); + } } - log.info("Shutting down fate."); + + LOG.debug("Stopping Thrift Servers"); + sa.server.stop(); + + log.debug("Shutting down fate."); fate().shutdown(); final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME; @@ -1487,7 +1496,7 @@ boolean canSuspendTablets() { throw new IllegalStateException("Exception waiting on watcher", e); } } - log.info("exiting"); + log.info("stop requested. exiting ... 
"); } @Deprecated diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java index 6784206b686..300534858aa 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/EmbeddedWebServer.java @@ -127,7 +127,7 @@ public void start() { } } - private void stop() { + public void stop() { try { server.stop(); server.join(); diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 4108282bcb3..8089acae696 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -526,6 +526,18 @@ public void run() { }).start(); monitorInitialized.set(true); + + while (!isShutdownRequested()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + requestShutdown(); + } + } + + server.stop(); + log.info("stop requested. exiting ... "); } private ServletHolder getDefaultServlet() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 90a4fc0854e..a457b07ff3d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -201,7 +201,6 @@ private TabletMetadataLoader(Ample ample) { HostAndPort clientAddress; private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); - protected volatile boolean serverStopRequested = false; private ServiceLock scanServerLock; protected TabletServerScanMetrics scanMetrics; private ScanServerMetrics scanServerMetrics; @@ -352,8 +351,8 @@ private ServiceLock announceExistence() { @Override public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { + Halt.halt(isShutdownRequested() ? 
0 : 1, () -> { + if (!isShutdownRequested()) { LOG.error("Lost tablet server lock (reason = {}), exiting.", reason); } gcLogger.logGCInfo(getConfiguration()); @@ -417,17 +416,29 @@ public void run() { ServiceLock lock = announceExistence(); try { - while (!serverStopRequested) { - UtilWaitThread.sleep(1000); - updateIdleStatus( - sessionManager.getActiveScans().isEmpty() && tabletMetadataCache.estimatedSize() == 0); + while (!isShutdownRequested()) { + try { + Thread.sleep(1000); + updateIdleStatus(sessionManager.getActiveScans().isEmpty() + && tabletMetadataCache.estimatedSize() == 0); + } catch (InterruptedException e) { + LOG.info("Interrupt Exception received, shutting down"); + requestShutdown(); + } } } finally { - LOG.info("Stopping Thrift Servers"); + // Wait for scans to got to zero + while (!sessionManager.getActiveScans().isEmpty()) { + LOG.debug("Waiting on {} active scans to complete.", + sessionManager.getActiveScans().size()); + UtilWaitThread.sleep(1000); + } + + LOG.debug("Stopping Thrift Servers"); address.server.stop(); try { - LOG.info("Removing server scan references"); + LOG.debug("Removing server scan references"); this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(), serverLockUUID); } catch (Exception e) { @@ -944,6 +955,11 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent t long busyTimeout) throws ThriftSecurityException, NotServingTabletException, TooManyFilesException, TSampleNotPresentException, TException { + if (isShutdownRequested()) { + // Prevent scans from starting if shutting down + throw new ScanServerBusyException(); + } + KeyExtent extent = getKeyExtent(textent); if (extent.isMeta() && !isSystemUser(credentials)) { @@ -1004,6 +1020,11 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map executionHints, long busyTimeout) throws ThriftSecurityException, TSampleNotPresentException, TException { + if (isShutdownRequested()) { + // Prevent scans from starting if shutting down + throw new ScanServerBusyException(); + } + if (tbatch.size() == 0) { throw new TException("Scan Server batch must include at least one extent"); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 54764b86725..4f9e0ee32c6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -54,6 +54,7 @@ import java.util.UUID; import java.util.concurrent.BlockingDeque; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadPoolExecutor; @@ -87,12 +88,14 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsInfo; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.spi.fs.VolumeChooserEnvironment; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal; import 
org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.ComparablePair; import org.apache.accumulo.core.util.Halt; @@ -104,6 +107,7 @@ import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.core.util.threads.ThreadPoolNames; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.server.AbstractServer; @@ -219,9 +223,6 @@ public TabletServerMinCMetrics getMinCMetrics() { volatile HostAndPort clientAddress; - private volatile boolean serverStopRequested = false; - private volatile boolean shutdownComplete = false; - private ServiceLock tabletServerLock; private TServer server; @@ -401,7 +402,7 @@ String getLockID() { void requestStop() { log.info("Stop requested."); - serverStopRequested = true; + requestShutdown(); } private class SplitRunner implements Runnable { @@ -683,8 +684,8 @@ private void announceExistence() { @Override public void lostLock(final LockLossReason reason) { - Halt.halt(serverStopRequested ? 0 : 1, () -> { - if (!serverStopRequested) { + Halt.halt(isShutdownRequested() ? 0 : 1, () -> { + if (!isShutdownRequested()) { log.error("Lost tablet server lock (reason = {}), exiting.", reason); } gcLogger.logGCInfo(getConfiguration()); @@ -873,7 +874,7 @@ public void run() { CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS)); HostAndPort managerHost; - while (!serverStopRequested) { + while (!isShutdownRequested()) { updateIdleStatus(getOnlineTablets().isEmpty()); @@ -885,7 +886,7 @@ public void run() { try { // wait until a message is ready to send, or a server stop // was requested - while (mm == null && !serverStopRequested) { + while (mm == null && !isShutdownRequested()) { mm = managerMessages.poll(1, TimeUnit.SECONDS); updateIdleStatus(getOnlineTablets().isEmpty()); } @@ -898,7 +899,7 @@ public void run() { // if while loop does not execute at all and mm != null, // then finally block should place mm back on queue - while (!serverStopRequested && mm != null && client != null + while (!isShutdownRequested() && mm != null && client != null && client.getOutputProtocol() != null && client.getOutputProtocol().getTransport() != null && client.getOutputProtocol().getTransport().isOpen()) { @@ -929,7 +930,7 @@ public void run() { } } catch (InterruptedException e) { log.info("Interrupt Exception received, shutting down"); - serverStopRequested = true; + requestShutdown(); } catch (Exception e) { // may have lost connection with manager // loop back to the beginning and wait for a new one @@ -938,21 +939,6 @@ public void run() { } } - // wait for shutdown - // if the main thread exits oldServer the manager listener, the JVM will - // kill the other threads and finalize objects. We want the shutdown that is - // running in the manager listener thread to complete oldServer this happens. - // consider making other threads daemon threads so that objects don't - // get prematurely finalized - synchronized (this) { - while (!shutdownComplete) { - try { - this.wait(1000); - } catch (InterruptedException e) { - log.error(e.toString()); - } - } - } log.debug("Stopping Replication Server"); if (this.replServer != null) { this.replServer.stop(); @@ -963,6 +949,62 @@ public void run() { server.stop(); } + // Best-effort attempt at unloading tablets. 
+ log.debug("Unloading tablets"); + final List> futures = new ArrayList<>(); + final ThreadPoolExecutor tpe = getContext().threadPools() + .getPoolBuilder(ThreadPoolNames.TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL).numCoreThreads(8) + .numMaxThreads(16).build(); + + ManagerClientService.Client iface = managerConnection(getManagerAddress()); + boolean managerDown = false; + + try { + for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) { + getOnlineTablets().keySet().forEach(ke -> { + if (DataLevel.of(ke.tableId()) == level) { + futures.add( + tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED, 5000))); + } + }); + while (!futures.isEmpty()) { + Iterator> unloads = futures.iterator(); + while (unloads.hasNext()) { + Future f = unloads.next(); + if (f.isDone()) { + if (!managerDown) { + ManagerMessage mm = managerMessages.poll(); + try { + mm.send(getContext().rpcCreds(), getClientAddressString(), iface); + } catch (TException e) { + managerDown = true; + LOG.debug("Error sending message to Manager during tablet unloading, msg: {}", + e.getMessage()); + } + } + unloads.remove(); + } + } + log.debug("Waiting on {} {} tablets to close.", futures.size(), level); + } + } + } finally { + if (!managerDown) { + try { + ManagerMessage mm = managerMessages.poll(); + do { + mm.send(getContext().rpcCreds(), getClientAddressString(), iface); + mm = managerMessages.poll(); + } while (mm != null); + } catch (TException e) { + LOG.debug("Error sending message to Manager during tablet unloading, msg: {}", + e.getMessage()); + } + } + returnManagerConnection(iface); + tpe.shutdown(); + } + try { log.debug("Closing filesystems"); getVolumeManager().close(); diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java index 73a3e0d03bc..6719bf6b926 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java @@ -117,6 +117,11 @@ protected boolean isSystemUser(TCredentials creds) { return systemUser; } + @Override + public boolean isShutdownRequested() { + return false; + } + } private ThriftScanClientHandler handler; diff --git a/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java b/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java index 8391190984c..80a41ff0610 100644 --- a/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java +++ b/test/src/main/java/org/apache/accumulo/test/SelfStoppingScanServer.java @@ -45,7 +45,7 @@ public void closeMultiScan(TInfo tinfo, long scanID) throws NoSuchScanIDExceptio scanCount.incrementAndGet(); super.closeMultiScan(tinfo, scanID); if (scanCount.get() == 3) { - serverStopRequested = true; + requestShutdown(); } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java index 08fc5fbb8d9..208fc736b56 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalDoNothingCompactor.java @@ -60,7 +60,7 @@ protected FileCompactorRunnable createCompactionJob(TExternalCompactionJob job, CountDownLatch stopped, AtomicReference err) { // Set this to true so that only 1 external compaction is run - this.shutdown = true; + 
requestShutdown(); return new FileCompactorRunnable() {