From d6f823c24f4ecfcdc4a0584c72ef9911de1c6b6c Mon Sep 17 00:00:00 2001 From: "Dom G." Date: Wed, 15 Jan 2025 15:45:02 -0500 Subject: [PATCH 1/3] Improve/standardize rate limiting logic in Monitor (#4894) * Improve/standardize rate limiting logic in monitor --- .../org/apache/accumulo/monitor/Monitor.java | 261 ++++++++---------- .../rest/compactions/external/ECResource.java | 2 +- 2 files changed, 117 insertions(+), 146 deletions(-) diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index 17ed1737149..548d2a985c7 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.HOURS; +import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import java.net.InetAddress; @@ -27,10 +28,10 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -75,6 +76,7 @@ import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.monitor.rest.compactions.external.ExternalCompactionInfo; @@ -178,7 +180,7 @@ public boolean add(Pair obj) { private Exception problemException; private GCStatus gcStatus; private Optional coordinatorHost = Optional.empty(); - private long coordinatorCheckNanos = 0L; + private Timer coordinatorCheck = null; private CompactionCoordinatorService.Client coordinatorClient; private final String coordinatorMissingMsg = "Error getting the compaction coordinator. Check that it is running. It is not " @@ -388,11 +390,10 @@ public void fetchData() { } // check for compaction coordinator host and only notify its discovery - Optional previousHost; - if (System.nanoTime() - coordinatorCheckNanos > fetchTimeNanos) { - previousHost = coordinatorHost; + if (coordinatorCheck == null || coordinatorCheck.hasElapsed(expirationTimeMinutes, MINUTES)) { + Optional previousHost = coordinatorHost; coordinatorHost = ExternalCompactionUtil.findCompactionCoordinator(context); - coordinatorCheckNanos = System.nanoTime(); + coordinatorCheck = Timer.startNew(); if (previousHost.isEmpty() && coordinatorHost.isPresent()) { log.info("External Compaction Coordinator found at {}", coordinatorHost.orElseThrow()); } @@ -611,112 +612,78 @@ public static class CompactionStats { } } - private final Map tserverScans = new HashMap<>(); - private final Map sserverScans = new HashMap<>(); - private final Map allCompactions = new HashMap<>(); + private final long expirationTimeMinutes = 1; + + // Use Suppliers.memoizeWithExpiration() to cache the results of expensive fetch operations. This + // avoids unnecessary repeated fetches within the expiration period and ensures that multiple + // requests around the same time use the same cached data. + private final Supplier> tserverScansSupplier = + Suppliers.memoizeWithExpiration(this::fetchTServerScans, expirationTimeMinutes, MINUTES); + + private final Supplier> sserverScansSupplier = + Suppliers.memoizeWithExpiration(this::fetchSServerScans, expirationTimeMinutes, MINUTES); + + private final Supplier> compactionsSupplier = + Suppliers.memoizeWithExpiration(this::fetchCompactions, expirationTimeMinutes, MINUTES); + + private final Supplier compactorInfoSupplier = + Suppliers.memoizeWithExpiration(this::fetchCompactorsInfo, expirationTimeMinutes, MINUTES); + + private final Supplier externalCompactionsSupplier = + Suppliers.memoizeWithExpiration(this::computeExternalCompactionsSnapshot, + expirationTimeMinutes, MINUTES); + private final RecentLogs recentLogs = new RecentLogs(); - private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); - - private long scansFetchedNanos = System.nanoTime(); - private long compactsFetchedNanos = System.nanoTime(); - private long ecInfoFetchedNanos = System.nanoTime(); - private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1); - private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15); - // When there are a large amount of external compactions running the list of external compactions - // could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid - // creating the list of running external compactions in memory per web request. If multiple - // request come in around the same time they should use the same list. It is still possible to - // have multiple list in memory if one request obtains a copy and then another request comes in - // after the timeout and the supplier recomputes the list. The longer the timeout on the supplier - // is the less likely we are to have multiple list of external compactions in memory, however - // increasing the timeout will make the monitor less responsive. - private final Supplier extCompactionSnapshot = - Suppliers.memoizeWithExpiration(() -> computeExternalCompactionsSnapshot(), fetchTimeNanos, - TimeUnit.NANOSECONDS); /** - * Fetch the active scans but only if fetchTimeNanos has elapsed. + * @return active tablet server scans. Values are cached and refresh after + * {@link #expirationTimeMinutes}. */ - public synchronized Map getScans() { - if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active TabletServer Scans"); - fetchScans(); - } - return Map.copyOf(tserverScans); + public Map getScans() { + return tserverScansSupplier.get(); } - public synchronized Map getScanServerScans() { - if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active ScanServer Scans"); - fetchScans(); - } - return Map.copyOf(sserverScans); + /** + * @return active scan server scans. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public Map getScanServerScans() { + return sserverScansSupplier.get(); } /** - * Fetch the active compactions but only if fetchTimeNanos has elapsed. + * @return active compactions. Values are cached and refresh after {@link #expirationTimeMinutes}. */ - public synchronized Map getCompactions() { - if (System.nanoTime() - compactsFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active Compactions"); - fetchCompactions(); - } - return Map.copyOf(allCompactions); + public Map getCompactions() { + return compactionsSupplier.get(); } - public synchronized ExternalCompactionInfo getCompactorsInfo() { + /** + * @return external compaction information. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public ExternalCompactionInfo getCompactorsInfo() { if (coordinatorHost.isEmpty()) { throw new IllegalStateException("Tried fetching from compaction coordinator that's missing"); } - if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of External Compaction info"); - Map> compactors = - ExternalCompactionUtil.getCompactorAddrs(getContext()); - log.debug("Found compactors: " + compactors); - ecInfo.setFetchedTimeMillis(System.currentTimeMillis()); - ecInfo.setCompactors(compactors); - ecInfo.setCoordinatorHost(coordinatorHost); - - ecInfoFetchedNanos = System.nanoTime(); - } - return ecInfo; + return compactorInfoSupplier.get(); } - private static class ExternalCompactionsSnapshot { - public final RunningCompactions runningCompactions; - public final Map ecRunningMap; - - private ExternalCompactionsSnapshot(Optional> ecRunningMapOpt) { - this.ecRunningMap = - ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap()); - this.runningCompactions = new RunningCompactions(this.ecRunningMap); - } - } - - private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() { - if (coordinatorHost.isEmpty()) { - throw new IllegalStateException(coordinatorMissingMsg); - } - var ccHost = coordinatorHost.orElseThrow(); - log.info("User initiated fetch of running External Compactions from " + ccHost); - var client = getCoordinator(ccHost); - TExternalCompactionList running; - try { - running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds()); - } catch (Exception e) { - throw new IllegalStateException("Unable to get running compactions from " + ccHost, e); - } - - return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions())); - } - - public RunningCompactions getRunnningCompactions() { - return extCompactionSnapshot.get().runningCompactions; + /** + * @return running compactions. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public RunningCompactions getRunningCompactions() { + return externalCompactionsSupplier.get().runningCompactions; } + /** + * @return running compactor details. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) { TExternalCompaction extCompaction = - extCompactionSnapshot.get().ecRunningMap.get(ecid.canonical()); + externalCompactionsSupplier.get().ecRunningMap.get(ecid.canonical()); if (extCompaction == null) { return null; } @@ -736,61 +703,36 @@ private CompactionCoordinatorService.Client getCoordinator(HostAndPort address) return coordinatorClient; } - private void fetchScans() { + private Map fetchScans(Collection servers) { ServerContext context = getContext(); - for (String server : context.instanceOperations().getTabletServers()) { + Map scans = new HashMap<>(); + for (String server : servers) { final HostAndPort parsedServer = HostAndPort.fromString(server); - TabletScanClientService.Client tserver = null; + TabletScanClientService.Client client = null; try { - tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); - List scans = tserver.getActiveScans(null, context.rpcCreds()); - tserverScans.put(parsedServer, new ScanStats(scans)); - scansFetchedNanos = System.nanoTime(); + client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); + List activeScans = client.getActiveScans(null, context.rpcCreds()); + scans.put(parsedServer, new ScanStats(activeScans)); } catch (Exception ex) { log.error("Failed to get active scans from {}", server, ex); } finally { - ThriftUtil.returnClient(tserver, context); - } - } - // Age off old scan information - Iterator> tserverIter = tserverScans.entrySet().iterator(); - // clock time used for fetched for date friendly display - long now = System.currentTimeMillis(); - while (tserverIter.hasNext()) { - Entry entry = tserverIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - tserverIter.remove(); - } - } - // Scan Servers - for (String server : context.instanceOperations().getScanServers()) { - final HostAndPort parsedServer = HostAndPort.fromString(server); - TabletScanClientService.Client sserver = null; - try { - sserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); - List scans = sserver.getActiveScans(null, context.rpcCreds()); - sserverScans.put(parsedServer, new ScanStats(scans)); - scansFetchedNanos = System.nanoTime(); - } catch (Exception ex) { - log.error("Failed to get active scans from {}", server, ex); - } finally { - ThriftUtil.returnClient(sserver, context); - } - } - // Age off old scan information - Iterator> sserverIter = sserverScans.entrySet().iterator(); - // clock time used for fetched for date friendly display - now = System.currentTimeMillis(); - while (sserverIter.hasNext()) { - Entry entry = sserverIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - sserverIter.remove(); + ThriftUtil.returnClient(client, context); } } + return Collections.unmodifiableMap(scans); } - private void fetchCompactions() { + private Map fetchTServerScans() { + return fetchScans(getContext().instanceOperations().getTabletServers()); + } + + private Map fetchSServerScans() { + return fetchScans(getContext().instanceOperations().getScanServers()); + } + + private Map fetchCompactions() { ServerContext context = getContext(); + Map allCompactions = new HashMap<>(); for (String server : context.instanceOperations().getTabletServers()) { final HostAndPort parsedServer = HostAndPort.fromString(server); Client tserver = null; @@ -798,23 +740,52 @@ private void fetchCompactions() { tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context); var compacts = tserver.getActiveCompactions(null, context.rpcCreds()); allCompactions.put(parsedServer, new CompactionStats(compacts)); - compactsFetchedNanos = System.nanoTime(); } catch (Exception ex) { log.debug("Failed to get active compactions from {}", server, ex); } finally { ThriftUtil.returnClient(tserver, context); } } - // Age off old compaction information - var entryIter = allCompactions.entrySet().iterator(); - // clock time used for fetched for date friendly display - long now = System.currentTimeMillis(); - while (entryIter.hasNext()) { - var entry = entryIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - entryIter.remove(); - } + return Collections.unmodifiableMap(allCompactions); + } + + private ExternalCompactionInfo fetchCompactorsInfo() { + ServerContext context = getContext(); + Map> compactors = ExternalCompactionUtil.getCompactorAddrs(context); + log.debug("Found compactors: {}", compactors); + ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); + ecInfo.setFetchedTimeMillis(System.currentTimeMillis()); + ecInfo.setCompactors(compactors); + ecInfo.setCoordinatorHost(coordinatorHost); + return ecInfo; + } + + private static class ExternalCompactionsSnapshot { + public final RunningCompactions runningCompactions; + public final Map ecRunningMap; + + private ExternalCompactionsSnapshot(Optional> ecRunningMapOpt) { + this.ecRunningMap = + ecRunningMapOpt.map(Collections::unmodifiableMap).orElse(Collections.emptyMap()); + this.runningCompactions = new RunningCompactions(this.ecRunningMap); + } + } + + private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() { + if (coordinatorHost.isEmpty()) { + throw new IllegalStateException(coordinatorMissingMsg); + } + var ccHost = coordinatorHost.orElseThrow(); + log.info("User initiated fetch of running External Compactions from " + ccHost); + var client = getCoordinator(ccHost); + TExternalCompactionList running; + try { + running = client.getRunningCompactions(TraceUtil.traceInfo(), getContext().rpcCreds()); + } catch (Exception e) { + throw new IllegalStateException("Unable to get running compactions from " + ccHost, e); } + + return new ExternalCompactionsSnapshot(Optional.ofNullable(running.getCompactions())); } /** diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java index 72d54d70a4e..5fcecef3493 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java @@ -60,7 +60,7 @@ public Compactors getCompactors() { @Path("running") @GET public RunningCompactions getRunning() { - return monitor.getRunnningCompactions(); + return monitor.getRunningCompactions(); } @Path("details") From d758759fcb0ce112337d809456c73ff58bf61ff6 Mon Sep 17 00:00:00 2001 From: Daniel Roberts Date: Thu, 16 Jan 2025 08:31:07 -0500 Subject: [PATCH 2/3] Sets default column size when terminal size is 0 (#5265) Sets a default column size of 80 characters if the terminal width is set to 0. This supports running the help command in a dumb terminal --- .../java/org/apache/accumulo/shell/commands/HelpCommand.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java index c7e516c6172..baf740a141c 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java @@ -39,7 +39,8 @@ public class HelpCommand extends Command { @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws ShellCommandException, IOException { - int numColumns = shellState.getTerminal().getWidth(); + int numColumns = + (shellState.getTerminal().getWidth() == 0) ? 80 : shellState.getTerminal().getWidth(); if (cl.hasOption(noWrapOpt.getOpt())) { numColumns = Integer.MAX_VALUE; } From 22831aae2015c17aa0b2b1ab9a4f08d5aed3f1da Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 16 Jan 2025 17:58:34 -0500 Subject: [PATCH 3/3] Propagates errors in Rfile.closeDeepCopies (#5248) Rfile.closeLocalityGroupReaders() was suppressing IOExceptions. This method was called by Rfile.closeDeepCopies() which was called by FileManager.releaseReaders(). Suppressing the exception meant that releaseReaders() did not see the exception and would decided to return the rfile to the pool when it should not. The only other code calling Rfile.closeLocalityGroupReaders() was Rfile.close(). Refactored the code so that Rfile.close() still suppressed the exception and Rfile.closeDeepCopies() does not suppress. Tried to preserve the behavior that Rfile.close() closes as many of its underlying resource as possible even if some exceptions occur. --- .../accumulo/core/file/rfile/RFile.java | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 7c350afb951..68e2be016d1 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -1288,24 +1288,32 @@ public Reader(CachableBlockFile.CachableBuilder b) throws IOException { this(new CachableBlockFile.Reader(b)); } - private void closeLocalityGroupReaders() { + private void closeLocalityGroupReaders(boolean ignoreIOExceptions) throws IOException { for (LocalityGroupReader lgr : currentReaders) { try { lgr.close(); } catch (IOException e) { - log.warn("Errored out attempting to close LocalityGroupReader.", e); + if (ignoreIOExceptions) { + log.warn("Errored out attempting to close LocalityGroupReader.", e); + } else { + throw e; + } } } } @Override - public void closeDeepCopies() { + public void closeDeepCopies() throws IOException { + closeDeepCopies(false); + } + + private void closeDeepCopies(boolean ignoreIOExceptions) throws IOException { if (deepCopy) { throw new RuntimeException("Calling closeDeepCopies on a deep copy is not supported"); } for (Reader deepCopy : deepCopies) { - deepCopy.closeLocalityGroupReaders(); + deepCopy.closeLocalityGroupReaders(ignoreIOExceptions); } deepCopies.clear(); @@ -1317,8 +1325,9 @@ public void close() throws IOException { throw new RuntimeException("Calling close on a deep copy is not supported"); } - closeDeepCopies(); - closeLocalityGroupReaders(); + // Closes as much as possible igoring and logging exceptions along the way + closeDeepCopies(true); + closeLocalityGroupReaders(true); if (sampleReaders != null) { for (LocalityGroupReader lgr : sampleReaders) {