diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index b728b8a2b77..ac142dc6717 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -20,15 +20,18 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.SCAN_SERVER; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.TABLET_SERVER; import java.net.InetAddress; import java.net.URL; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -47,7 +50,6 @@ import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; -import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.conf.Property; @@ -342,7 +344,7 @@ private GCStatus fetchGcStatus() { } } } catch (Exception ex) { - log.warn("Unable to contact the garbage collector at " + address, ex); + log.warn("Unable to contact the garbage collector at {}", address, ex); } return result; } @@ -507,73 +509,136 @@ public static class CompactionStats { } } - private final Map tserverScans = new HashMap<>(); - private final Map sserverScans = new HashMap<>(); - private final Map allCompactions = new HashMap<>(); - private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); - - private long scansFetchedNanos = System.nanoTime(); - private long compactsFetchedNanos = System.nanoTime(); - private long ecInfoFetchedNanos = System.nanoTime(); - private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1); - private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15); - // When there are a large amount of external compactions running the list of external compactions - // could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid - // creating the list of running external compactions in memory per web request. If multiple - // request come in around the same time they should use the same list. It is still possible to - // have multiple list in memory if one request obtains a copy and then another request comes in - // after the timeout and the supplier recomputes the list. The longer the timeout on the supplier - // is the less likely we are to have multiple list of external compactions in memory, however - // increasing the timeout will make the monitor less responsive. - private final Supplier extCompactionSnapshot = - Suppliers.memoizeWithExpiration(() -> computeExternalCompactionsSnapshot(), fetchTimeNanos, - TimeUnit.NANOSECONDS); + private final long expirationTimeMinutes = 1; + + // Use Suppliers.memoizeWithExpiration() to cache the results of expensive fetch operations. This + // avoids unnecessary repeated fetches within the expiration period and ensures that multiple + // requests around the same time use the same cached data. + private final Supplier> tserverScansSupplier = + Suppliers.memoizeWithExpiration(this::fetchTServerScans, expirationTimeMinutes, MINUTES); + + private final Supplier> sserverScansSupplier = + Suppliers.memoizeWithExpiration(this::fetchSServerScans, expirationTimeMinutes, MINUTES); + + private final Supplier> compactionsSupplier = + Suppliers.memoizeWithExpiration(this::fetchCompactions, expirationTimeMinutes, MINUTES); + + private final Supplier compactorInfoSupplier = + Suppliers.memoizeWithExpiration(this::fetchCompactorsInfo, expirationTimeMinutes, MINUTES); + + private final Supplier externalCompactionsSupplier = + Suppliers.memoizeWithExpiration(this::computeExternalCompactionsSnapshot, + expirationTimeMinutes, MINUTES); /** - * Fetch the active scans but only if fetchTimeNanos has elapsed. + * @return active tablet server scans. Values are cached and refresh after + * {@link #expirationTimeMinutes}. */ - public synchronized Map getScans() { - if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active TabletServer Scans"); - fetchScans(); - } - return Map.copyOf(tserverScans); + public Map getScans() { + return tserverScansSupplier.get(); } - public synchronized Map getScanServerScans() { - if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active ScanServer Scans"); - fetchScans(); - } - return Map.copyOf(sserverScans); + /** + * @return active scan server scans. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public Map getScanServerScans() { + return sserverScansSupplier.get(); } /** - * Fetch the active compactions but only if fetchTimeNanos has elapsed. + * @return active compactions. Values are cached and refresh after {@link #expirationTimeMinutes}. */ - public synchronized Map getCompactions() { - if (System.nanoTime() - compactsFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active Compactions"); - fetchCompactions(); - } - return Map.copyOf(allCompactions); + public Map getCompactions() { + return compactionsSupplier.get(); } - public synchronized ExternalCompactionInfo getCompactorsInfo() { + /** + * @return external compaction information. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public ExternalCompactionInfo getCompactorsInfo() { if (coordinatorHost.isEmpty()) { throw new IllegalStateException("Tried fetching from compaction coordinator that's missing"); } - if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of External Compaction info"); - Set compactors = - getContext().instanceOperations().getServers(ServerId.Type.COMPACTOR); - log.debug("Found compactors: " + compactors); - ecInfo.setFetchedTimeMillis(System.currentTimeMillis()); - ecInfo.setCompactors(compactors); - ecInfo.setCoordinatorHost(coordinatorHost); - - ecInfoFetchedNanos = System.nanoTime(); + return compactorInfoSupplier.get(); + } + + /** + * @return running compactions. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public RunningCompactions getRunningCompactions() { + return externalCompactionsSupplier.get().runningCompactions; + } + + /** + * @return running compactor details. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) { + TExternalCompaction extCompaction = + externalCompactionsSupplier.get().ecRunningMap.get(ecid.canonical()); + if (extCompaction == null) { + return null; + } + return new RunningCompactorDetails(extCompaction); + } + + private Map fetchScans(Collection servers) { + ServerContext context = getContext(); + Map scans = new HashMap<>(); + for (ServerId server : servers) { + final HostAndPort parsedServer = HostAndPort.fromString(server.toHostPortString()); + TabletScanClientService.Client client = null; + try { + client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); + List activeScans = client.getActiveScans(null, context.rpcCreds()); + scans.put(parsedServer, new ScanStats(activeScans)); + } catch (Exception ex) { + log.error("Failed to get active scans from {}", server, ex); + } finally { + ThriftUtil.returnClient(client, context); + } + } + return Collections.unmodifiableMap(scans); + } + + private Map fetchTServerScans() { + return fetchScans(getContext().instanceOperations().getServers(TABLET_SERVER)); + } + + private Map fetchSServerScans() { + return fetchScans(getContext().instanceOperations().getServers(SCAN_SERVER)); + } + + private Map fetchCompactions() { + ServerContext context = getContext(); + Map allCompactions = new HashMap<>(); + for (ServerId server : context.instanceOperations().getServers(TABLET_SERVER)) { + final HostAndPort parsedServer = HostAndPort.fromString(server.toHostPortString()); + Client tserver = null; + try { + tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context); + var compacts = tserver.getActiveCompactions(null, context.rpcCreds()); + allCompactions.put(parsedServer, new CompactionStats(compacts)); + } catch (Exception ex) { + log.debug("Failed to get active compactions from {}", server, ex); + } finally { + ThriftUtil.returnClient(tserver, context); + } } + return Collections.unmodifiableMap(allCompactions); + } + + private ExternalCompactionInfo fetchCompactorsInfo() { + Set compactors = + getContext().instanceOperations().getServers(ServerId.Type.COMPACTOR); + log.debug("Found compactors: {}", compactors); + ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); + ecInfo.setFetchedTimeMillis(System.currentTimeMillis()); + ecInfo.setCompactors(compactors); + ecInfo.setCoordinatorHost(coordinatorHost); return ecInfo; } @@ -593,7 +658,7 @@ private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() { throw new IllegalStateException(coordinatorMissingMsg); } var ccHost = coordinatorHost.orElseThrow(); - log.info("User initiated fetch of running External Compactions from " + ccHost); + log.info("User initiated fetch of running External Compactions from {}", ccHost); try { CompactionCoordinatorService.Client client = ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, ccHost, getContext()); @@ -614,95 +679,6 @@ private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() { } } - public RunningCompactions getRunnningCompactions() { - return extCompactionSnapshot.get().runningCompactions; - } - - public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) { - TExternalCompaction extCompaction = - extCompactionSnapshot.get().ecRunningMap.get(ecid.canonical()); - if (extCompaction == null) { - return null; - } - return new RunningCompactorDetails(extCompaction); - } - - private void fetchScans() { - final ServerContext context = getContext(); - final Set servers = new HashSet<>(); - servers.addAll(context.instanceOperations().getServers(ServerId.Type.SCAN_SERVER)); - servers.addAll(context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)); - - for (ServerId server : servers) { - TabletScanClientService.Client tserver = null; - try { - HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort()); - tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); - List scans = tserver.getActiveScans(null, context.rpcCreds()); - tserverScans.put(parsedServer, new ScanStats(scans)); - scansFetchedNanos = System.nanoTime(); - } catch (Exception ex) { - log.error("Failed to get active scans from {}", server, ex); - } finally { - ThriftUtil.returnClient(tserver, context); - } - } - // Age off old scan information - Iterator> tserverIter = tserverScans.entrySet().iterator(); - // clock time used for fetched for date friendly display - long now = System.currentTimeMillis(); - while (tserverIter.hasNext()) { - Entry entry = tserverIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - tserverIter.remove(); - } - } - } - - private void fetchCompactions() { - final ServerContext context = getContext(); - - for (ServerId server : context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) { - final HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort()); - Client tserver = null; - try { - tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context); - var compacts = tserver.getActiveCompactions(null, context.rpcCreds()); - allCompactions.put(parsedServer, new CompactionStats(compacts)); - compactsFetchedNanos = System.nanoTime(); - } catch (Exception ex) { - log.debug("Failed to get active compactions from {}", server, ex); - } finally { - ThriftUtil.returnClient(tserver, context); - } - } - for (ServerId server : context.instanceOperations().getServers(ServerId.Type.COMPACTOR)) { - final HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort()); - CompactorService.Client compactor = null; - try { - compactor = ThriftUtil.getClient(ThriftClientTypes.COMPACTOR, parsedServer, context); - var compacts = compactor.getActiveCompactions(null, context.rpcCreds()); - allCompactions.put(parsedServer, new CompactionStats(compacts)); - compactsFetchedNanos = System.nanoTime(); - } catch (Exception ex) { - log.debug("Failed to get active compactions from {}", server, ex); - } finally { - ThriftUtil.returnClient(compactor, context); - } - } - - // Age off old compaction information - var entryIter = allCompactions.entrySet().iterator(); - // clock time used for fetched for date friendly display - long now = System.currentTimeMillis(); - while (entryIter.hasNext()) { - var entry = entryIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - entryIter.remove(); - } - } - } - /** * Get the monitor lock in ZooKeeper */ diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java index 72d54d70a4e..5fcecef3493 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java @@ -60,7 +60,7 @@ public Compactors getCompactors() { @Path("running") @GET public RunningCompactions getRunning() { - return monitor.getRunnningCompactions(); + return monitor.getRunningCompactions(); } @Path("details") diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java index c7e516c6172..baf740a141c 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java @@ -39,7 +39,8 @@ public class HelpCommand extends Command { @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws ShellCommandException, IOException { - int numColumns = shellState.getTerminal().getWidth(); + int numColumns = + (shellState.getTerminal().getWidth() == 0) ? 80 : shellState.getTerminal().getWidth(); if (cl.hasOption(noWrapOpt.getOpt())) { numColumns = Integer.MAX_VALUE; }