Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/3.1' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
DomGarguilo committed Jan 17, 2025
2 parents 139d850 + deefe4d commit c0fa497
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 147 deletions.
266 changes: 121 additions & 145 deletions server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,18 @@

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.SCAN_SERVER;
import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.TABLET_SERVER;

import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -47,7 +50,6 @@
import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.client.admin.servers.ServerId.Type;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
import org.apache.accumulo.core.compaction.thrift.CompactorService;
import org.apache.accumulo.core.compaction.thrift.TExternalCompaction;
import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList;
import org.apache.accumulo.core.conf.Property;
Expand Down Expand Up @@ -342,7 +344,7 @@ private GCStatus fetchGcStatus() {
}
}
} catch (Exception ex) {
log.warn("Unable to contact the garbage collector at " + address, ex);
log.warn("Unable to contact the garbage collector at {}", address, ex);
}
return result;
}
Expand Down Expand Up @@ -507,73 +509,136 @@ public static class CompactionStats {
}
}

private final Map<HostAndPort,ScanStats> tserverScans = new HashMap<>();
private final Map<HostAndPort,ScanStats> sserverScans = new HashMap<>();
private final Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>();
private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();

private long scansFetchedNanos = System.nanoTime();
private long compactsFetchedNanos = System.nanoTime();
private long ecInfoFetchedNanos = System.nanoTime();
private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1);
private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15);
// When there are a large amount of external compactions running the list of external compactions
// could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid
// creating the list of running external compactions in memory per web request. If multiple
// request come in around the same time they should use the same list. It is still possible to
// have multiple list in memory if one request obtains a copy and then another request comes in
// after the timeout and the supplier recomputes the list. The longer the timeout on the supplier
// is the less likely we are to have multiple list of external compactions in memory, however
// increasing the timeout will make the monitor less responsive.
private final Supplier<ExternalCompactionsSnapshot> extCompactionSnapshot =
Suppliers.memoizeWithExpiration(() -> computeExternalCompactionsSnapshot(), fetchTimeNanos,
TimeUnit.NANOSECONDS);
private final long expirationTimeMinutes = 1;

// Use Suppliers.memoizeWithExpiration() to cache the results of expensive fetch operations. This
// avoids unnecessary repeated fetches within the expiration period and ensures that multiple
// requests around the same time use the same cached data.
private final Supplier<Map<HostAndPort,ScanStats>> tserverScansSupplier =
Suppliers.memoizeWithExpiration(this::fetchTServerScans, expirationTimeMinutes, MINUTES);

private final Supplier<Map<HostAndPort,ScanStats>> sserverScansSupplier =
Suppliers.memoizeWithExpiration(this::fetchSServerScans, expirationTimeMinutes, MINUTES);

private final Supplier<Map<HostAndPort,CompactionStats>> compactionsSupplier =
Suppliers.memoizeWithExpiration(this::fetchCompactions, expirationTimeMinutes, MINUTES);

private final Supplier<ExternalCompactionInfo> compactorInfoSupplier =
Suppliers.memoizeWithExpiration(this::fetchCompactorsInfo, expirationTimeMinutes, MINUTES);

private final Supplier<ExternalCompactionsSnapshot> externalCompactionsSupplier =
Suppliers.memoizeWithExpiration(this::computeExternalCompactionsSnapshot,
expirationTimeMinutes, MINUTES);

/**
* Fetch the active scans but only if fetchTimeNanos has elapsed.
* @return active tablet server scans. Values are cached and refresh after
* {@link #expirationTimeMinutes}.
*/
public synchronized Map<HostAndPort,ScanStats> getScans() {
if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) {
log.info("User initiated fetch of Active TabletServer Scans");
fetchScans();
}
return Map.copyOf(tserverScans);
public Map<HostAndPort,ScanStats> getScans() {
return tserverScansSupplier.get();
}

public synchronized Map<HostAndPort,ScanStats> getScanServerScans() {
if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) {
log.info("User initiated fetch of Active ScanServer Scans");
fetchScans();
}
return Map.copyOf(sserverScans);
/**
* @return active scan server scans. Values are cached and refresh after
* {@link #expirationTimeMinutes}.
*/
public Map<HostAndPort,ScanStats> getScanServerScans() {
return sserverScansSupplier.get();
}

/**
* Fetch the active compactions but only if fetchTimeNanos has elapsed.
* @return active compactions. Values are cached and refresh after {@link #expirationTimeMinutes}.
*/
public synchronized Map<HostAndPort,CompactionStats> getCompactions() {
if (System.nanoTime() - compactsFetchedNanos > fetchTimeNanos) {
log.info("User initiated fetch of Active Compactions");
fetchCompactions();
}
return Map.copyOf(allCompactions);
public Map<HostAndPort,CompactionStats> getCompactions() {
return compactionsSupplier.get();
}

public synchronized ExternalCompactionInfo getCompactorsInfo() {
/**
* @return external compaction information. Values are cached and refresh after
* {@link #expirationTimeMinutes}.
*/
public ExternalCompactionInfo getCompactorsInfo() {
if (coordinatorHost.isEmpty()) {
throw new IllegalStateException("Tried fetching from compaction coordinator that's missing");
}
if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) {
log.info("User initiated fetch of External Compaction info");
Set<ServerId> compactors =
getContext().instanceOperations().getServers(ServerId.Type.COMPACTOR);
log.debug("Found compactors: " + compactors);
ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
ecInfo.setCompactors(compactors);
ecInfo.setCoordinatorHost(coordinatorHost);

ecInfoFetchedNanos = System.nanoTime();
return compactorInfoSupplier.get();
}

/**
* @return running compactions. Values are cached and refresh after
* {@link #expirationTimeMinutes}.
*/
public RunningCompactions getRunningCompactions() {
return externalCompactionsSupplier.get().runningCompactions;
}

/**
* @return running compactor details. Values are cached and refresh after
* {@link #expirationTimeMinutes}.
*/
public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) {
TExternalCompaction extCompaction =
externalCompactionsSupplier.get().ecRunningMap.get(ecid.canonical());
if (extCompaction == null) {
return null;
}
return new RunningCompactorDetails(extCompaction);
}

private Map<HostAndPort,ScanStats> fetchScans(Collection<ServerId> servers) {
ServerContext context = getContext();
Map<HostAndPort,ScanStats> scans = new HashMap<>();
for (ServerId server : servers) {
final HostAndPort parsedServer = HostAndPort.fromString(server.toHostPortString());
TabletScanClientService.Client client = null;
try {
client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
List<ActiveScan> activeScans = client.getActiveScans(null, context.rpcCreds());
scans.put(parsedServer, new ScanStats(activeScans));
} catch (Exception ex) {
log.error("Failed to get active scans from {}", server, ex);
} finally {
ThriftUtil.returnClient(client, context);
}
}
return Collections.unmodifiableMap(scans);
}

private Map<HostAndPort,ScanStats> fetchTServerScans() {
return fetchScans(getContext().instanceOperations().getServers(TABLET_SERVER));
}

private Map<HostAndPort,ScanStats> fetchSServerScans() {
return fetchScans(getContext().instanceOperations().getServers(SCAN_SERVER));
}

private Map<HostAndPort,CompactionStats> fetchCompactions() {
ServerContext context = getContext();
Map<HostAndPort,CompactionStats> allCompactions = new HashMap<>();
for (ServerId server : context.instanceOperations().getServers(TABLET_SERVER)) {
final HostAndPort parsedServer = HostAndPort.fromString(server.toHostPortString());
Client tserver = null;
try {
tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context);
var compacts = tserver.getActiveCompactions(null, context.rpcCreds());
allCompactions.put(parsedServer, new CompactionStats(compacts));
} catch (Exception ex) {
log.debug("Failed to get active compactions from {}", server, ex);
} finally {
ThriftUtil.returnClient(tserver, context);
}
}
return Collections.unmodifiableMap(allCompactions);
}

private ExternalCompactionInfo fetchCompactorsInfo() {
Set<ServerId> compactors =
getContext().instanceOperations().getServers(ServerId.Type.COMPACTOR);
log.debug("Found compactors: {}", compactors);
ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
ecInfo.setFetchedTimeMillis(System.currentTimeMillis());
ecInfo.setCompactors(compactors);
ecInfo.setCoordinatorHost(coordinatorHost);
return ecInfo;
}

Expand All @@ -593,7 +658,7 @@ private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() {
throw new IllegalStateException(coordinatorMissingMsg);
}
var ccHost = coordinatorHost.orElseThrow();
log.info("User initiated fetch of running External Compactions from " + ccHost);
log.info("User initiated fetch of running External Compactions from {}", ccHost);
try {
CompactionCoordinatorService.Client client =
ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, ccHost, getContext());
Expand All @@ -614,95 +679,6 @@ private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() {
}
}

public RunningCompactions getRunnningCompactions() {
return extCompactionSnapshot.get().runningCompactions;
}

public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) {
TExternalCompaction extCompaction =
extCompactionSnapshot.get().ecRunningMap.get(ecid.canonical());
if (extCompaction == null) {
return null;
}
return new RunningCompactorDetails(extCompaction);
}

private void fetchScans() {
final ServerContext context = getContext();
final Set<ServerId> servers = new HashSet<>();
servers.addAll(context.instanceOperations().getServers(ServerId.Type.SCAN_SERVER));
servers.addAll(context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER));

for (ServerId server : servers) {
TabletScanClientService.Client tserver = null;
try {
HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort());
tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context);
List<ActiveScan> scans = tserver.getActiveScans(null, context.rpcCreds());
tserverScans.put(parsedServer, new ScanStats(scans));
scansFetchedNanos = System.nanoTime();
} catch (Exception ex) {
log.error("Failed to get active scans from {}", server, ex);
} finally {
ThriftUtil.returnClient(tserver, context);
}
}
// Age off old scan information
Iterator<Entry<HostAndPort,ScanStats>> tserverIter = tserverScans.entrySet().iterator();
// clock time used for fetched for date friendly display
long now = System.currentTimeMillis();
while (tserverIter.hasNext()) {
Entry<HostAndPort,ScanStats> entry = tserverIter.next();
if (now - entry.getValue().fetched > ageOffEntriesMillis) {
tserverIter.remove();
}
}
}

private void fetchCompactions() {
final ServerContext context = getContext();

for (ServerId server : context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) {
final HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort());
Client tserver = null;
try {
tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context);
var compacts = tserver.getActiveCompactions(null, context.rpcCreds());
allCompactions.put(parsedServer, new CompactionStats(compacts));
compactsFetchedNanos = System.nanoTime();
} catch (Exception ex) {
log.debug("Failed to get active compactions from {}", server, ex);
} finally {
ThriftUtil.returnClient(tserver, context);
}
}
for (ServerId server : context.instanceOperations().getServers(ServerId.Type.COMPACTOR)) {
final HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort());
CompactorService.Client compactor = null;
try {
compactor = ThriftUtil.getClient(ThriftClientTypes.COMPACTOR, parsedServer, context);
var compacts = compactor.getActiveCompactions(null, context.rpcCreds());
allCompactions.put(parsedServer, new CompactionStats(compacts));
compactsFetchedNanos = System.nanoTime();
} catch (Exception ex) {
log.debug("Failed to get active compactions from {}", server, ex);
} finally {
ThriftUtil.returnClient(compactor, context);
}
}

// Age off old compaction information
var entryIter = allCompactions.entrySet().iterator();
// clock time used for fetched for date friendly display
long now = System.currentTimeMillis();
while (entryIter.hasNext()) {
var entry = entryIter.next();
if (now - entry.getValue().fetched > ageOffEntriesMillis) {
entryIter.remove();
}
}
}

/**
* Get the monitor lock in ZooKeeper
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Compactors getCompactors() {
@Path("running")
@GET
public RunningCompactions getRunning() {
return monitor.getRunnningCompactions();
return monitor.getRunningCompactions();
}

@Path("details")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public class HelpCommand extends Command {
@Override
public int execute(final String fullCommand, final CommandLine cl, final Shell shellState)
throws ShellCommandException, IOException {
int numColumns = shellState.getTerminal().getWidth();
int numColumns =
(shellState.getTerminal().getWidth() == 0) ? 80 : shellState.getTerminal().getWidth();
if (cl.hasOption(noWrapOpt.getOpt())) {
numColumns = Integer.MAX_VALUE;
}
Expand Down

0 comments on commit c0fa497

Please sign in to comment.