Skip to content

Commit

Permalink
Track lastAccessTime in Tablet, not in TabletServer collection (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion authored Oct 18, 2023
1 parent 2782d1d commit 0f4d1bb
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
Expand All @@ -38,12 +37,12 @@ public class UnloaderParamsImpl implements UnloaderParams {
private final SortedMap<TabletId,Long> online;
private final Set<KeyExtent> unloads;

public UnloaderParamsImpl(TableId tid, ServiceEnvironment env, Map<KeyExtent,AtomicLong> online,
public UnloaderParamsImpl(TableId tid, ServiceEnvironment env, Map<KeyExtent,Long> online,
Set<KeyExtent> unload) {
this.tid = tid;
this.env = env;
this.online = new TreeMap<>();
online.forEach((k, v) -> this.online.put(new TabletIdImpl(k), v.get()));
online.forEach((k, v) -> this.online.put(new TabletIdImpl(k), v));
this.unloads = unload;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.admin.TabletHostingGoal;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.manager.thrift.TabletLoadState;
Expand Down Expand Up @@ -195,6 +194,7 @@ public void run() {
server.recentlyUnloadedCache.remove(tablet.getExtent());
}
}

tablet = null; // release this reference
successful = true;
} catch (Exception e) {
Expand All @@ -213,9 +213,6 @@ public void run() {

if (successful) {
server.enqueueManagerMessage(new TabletStatusMessage(TabletLoadState.LOADED, extent));
if (tabletMetadata.getHostingGoal() == TabletHostingGoal.ONDEMAND) {
server.insertOnDemandAccessTime(extent);
}
} else {
synchronized (server.unopenedTablets) {
synchronized (server.openingTablets) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HostAndPort;

import io.opentelemetry.context.Scope;

public class TabletServer extends AbstractServer implements TabletHostingServer {

private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
Expand Down Expand Up @@ -195,8 +193,6 @@ public PausedCompactionMetrics getPausedCompactionMetrics() {
private final AtomicLong syncCounter = new AtomicLong(0);

final OnlineTablets onlineTablets = new OnlineTablets();
private final Map<KeyExtent,AtomicLong> onDemandTabletAccessTimes =
Collections.synchronizedMap(new HashMap<>());
final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<>());
final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<>());
final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap<>(1000));
Expand Down Expand Up @@ -1000,8 +996,8 @@ public SortedMap<KeyExtent,Tablet> getOnlineTablets() {
@Override
public Tablet getOnlineTablet(KeyExtent extent) {
Tablet t = onlineTablets.snapshot().get(extent);
if (t != null && t.isOnDemand()) {
updateOnDemandAccessTime(extent);
if (t != null) {
t.setLastAccessTime();
}
return t;
}
Expand Down Expand Up @@ -1142,28 +1138,6 @@ public int getOnDemandOnlineUnloadedForLowMemory() {
return onDemandUnloadedLowMemory.get();
}

// called from AssignmentHandler
public void insertOnDemandAccessTime(KeyExtent extent) {
if (extent.isMeta()) {
return;
}
onDemandTabletAccessTimes.putIfAbsent(extent, new AtomicLong(System.nanoTime()));
}

// called from getOnlineExtent
private void updateOnDemandAccessTime(KeyExtent extent) {
final long currentTime = System.nanoTime();
AtomicLong l = onDemandTabletAccessTimes.get(extent);
if (l != null) {
l.set(currentTime);
}
}

// called from UnloadTabletHandler
public void removeOnDemandAccessTime(KeyExtent extent) {
onDemandTabletAccessTimes.remove(extent);
}

private boolean isTabletInUse(KeyExtent extent) {
// Don't call getOnlineTablet as that will update the last access time
final Tablet t = onlineTablets.snapshot().get(extent);
Expand All @@ -1182,42 +1156,29 @@ public void evaluateOnDemandTabletsForUnload() {

final SortedMap<KeyExtent,Tablet> online = getOnlineTablets();

// Find and remove access time entries for KeyExtents
// that are no longer in the onlineTablets collection
Set<KeyExtent> missing = onDemandTabletAccessTimes.keySet().stream()
.filter(k -> !online.containsKey(k)).collect(Collectors.toSet());
if (!missing.isEmpty()) {
log.debug("Removing onDemandAccessTimes for tablets as tablets no longer online: {}",
missing);
missing.forEach(onDemandTabletAccessTimes::remove);
if (onDemandTabletAccessTimes.isEmpty()) {
return;
}
}

// It's possible, from a tablet split or merge for example,
// that there is an on-demand tablet that is hosted for which
// we have no access time. Add any missing online on-demand
// tablets
online.forEach((k, v) -> {
if (v.isOnDemand() && !onDemandTabletAccessTimes.containsKey(k)) {
insertOnDemandAccessTime(k);
// Sort the extents so that we can process them by table.
final SortedMap<KeyExtent,Long> sortedOnDemandExtents = new TreeMap<>();
// We only want to operate on OnDemand Tablets
online.entrySet().forEach((e) -> {
if (e.getValue().isOnDemand()) {
sortedOnDemandExtents.put(e.getKey(), e.getValue().getLastAccessTime());
}
});

log.debug("Evaluating online on-demand tablets: {}", onDemandTabletAccessTimes);

if (onDemandTabletAccessTimes.isEmpty()) {
if (sortedOnDemandExtents.isEmpty()) {
return;
}

log.debug("Evaluating online on-demand tablets: {}", sortedOnDemandExtents);

// If the TabletServer is running low on memory, don't call the SPI
// plugin to evaluate which on-demand tablets to unload, just get the
// on-demand tablet with the oldest access time and unload it.
if (getContext().getLowMemoryDetector().isRunningLowOnMemory()) {
final SortedMap<Long,KeyExtent> timeSortedOnDemandExtents = new TreeMap<>();
onDemandTabletAccessTimes.forEach((k, v) -> timeSortedOnDemandExtents.put(v.get(), k));
Long oldestAccessTime = timeSortedOnDemandExtents.firstKey();
long currTime = System.nanoTime();
sortedOnDemandExtents.forEach((k, v) -> timeSortedOnDemandExtents.put(v - currTime, k));
Long oldestAccessTime = timeSortedOnDemandExtents.lastKey();
KeyExtent oldestKeyExtent = timeSortedOnDemandExtents.get(oldestAccessTime);
log.warn("Unloading on-demand tablet: {} for table: {} due to low memory", oldestKeyExtent,
oldestKeyExtent.tableId());
Expand All @@ -1226,12 +1187,6 @@ public void evaluateOnDemandTabletsForUnload() {
return;
}

// onDemandTabletAccessTimes is a HashMap. Sort the extents
// so that we can process them by table.
final SortedMap<KeyExtent,AtomicLong> sortedOnDemandExtents =
new TreeMap<KeyExtent,AtomicLong>();
sortedOnDemandExtents.putAll(onDemandTabletAccessTimes);

// The access times are updated when getOnlineTablet is called by other methods,
// but may not necessarily capture whether or not the Tablet is currently being used.
// For example, getOnlineTablet is called from startScan but not from continueScan.
Expand Down Expand Up @@ -1276,7 +1231,7 @@ public void evaluateOnDemandTabletsForUnload() {
});
tableIds.forEach(tid -> {
Map<KeyExtent,
AtomicLong> subset = sortedOnDemandExtents.entrySet().stream()
Long> subset = sortedOnDemandExtents.entrySet().stream()
.filter((e) -> e.getKey().tableId().equals(tid))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
Set<KeyExtent> onDemandTabletsToUnload = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,6 @@ public void run() {
// exceptions
server.recentlyUnloadedCache.put(extent, System.currentTimeMillis());
server.onlineTablets.remove(extent);
if (t.isOnDemand()) {
server.removeOnDemandAccessTime(extent);
}

try {
TServerInstance instance = server.getTabletSession();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ enum CompactionState {

private final int logId;

private volatile long lastAccessTime = System.nanoTime();

public int getLogId() {
return logId;
}
Expand Down Expand Up @@ -1502,4 +1504,13 @@ public void refreshMetadata(RefreshPurpose refreshPurpose) {
scanfileManager.removeFilesAfterScan(getMetadata().getScans());
}
}

public long getLastAccessTime() {
return lastAccessTime;
}

public void setLastAccessTime() {
this.lastAccessTime = System.nanoTime();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
Expand Down Expand Up @@ -63,13 +62,13 @@ public void evaluationTest() {
expect(tconf.get(DefaultOnDemandTabletUnloader.INACTIVITY_THRESHOLD))
.andReturn(inactivityTimeSeconds);
expect(tconf.newDeriver(anyObject())).andReturn(Map::of).anyTimes();
Map<KeyExtent,AtomicLong> online = new HashMap<>();
Map<KeyExtent,Long> online = new HashMap<>();
// add an extent whose last access time is less than the currentTime - inactivityTime
final KeyExtent activeExtent = new KeyExtent(tid, new Text("m"), new Text("a"));
online.put(activeExtent, new AtomicLong(currentTime - inactivityTime - 10));
online.put(activeExtent, currentTime - inactivityTime - 10);
// add an extent whose last access time is greater than the currentTime - inactivityTime
final KeyExtent inactiveExtent = new KeyExtent(tid, new Text("z"), new Text("m"));
online.put(inactiveExtent, new AtomicLong(currentTime - inactivityTime + 10));
online.put(inactiveExtent, currentTime - inactivityTime + 10);
Set<KeyExtent> onDemandTabletsToUnload = new HashSet<>();

replay(context, tconf);
Expand Down

0 comments on commit 0f4d1bb

Please sign in to comment.