Skip to content

Commit

Permalink
Acquires tablet servers and their groups atomically
Browse files Browse the repository at this point in the history
Fixes two bugs that could cause the set of tservers and map of group
tservers to have an inconsistent set of tservers.

Modified LiveTserverSet so that the set of tablet servers and the tablet
servers resource groups can be acquired atomically.  The code was
acquiring this information at two different times with two different lock
acquisitions, which could have led to race condtions resulting
differences in set and the map.

Also modified TGW.TabletLists to filter out shutting down servers from
the grouped tservers in addition to filtering from the tserver set.
  • Loading branch information
keith-turner committed Oct 27, 2023
1 parent 40b41d1 commit 54ecfe7
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -363,14 +364,29 @@ public synchronized TServerConnection getConnection(TServerInstance server) {
return tServerInfo.connection;
}

public synchronized Set<TServerInstance> getCurrentServers() {
return new HashSet<>(currentInstances.keySet());
public static class LiveTServersSnapshot {
public final Set<TServerInstance> tservers;
public final Map<String,Set<TServerInstance>> tserverGroups;

public LiveTServersSnapshot(Set<TServerInstance> currentServers,
Map<String,Set<TServerInstance>> serverGroups) {
this.tservers = Set.copyOf(currentServers);
Map<String,Set<TServerInstance>> copy = new HashMap<>();
serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v)));
this.tserverGroups = Collections.unmodifiableMap(copy);

// TODO could check consistency of tservers vs tserverGroups to ensure that every all tservers
// in one exists in the other
}
}

public synchronized Map<String,Set<TServerInstance>> getCurrentServersGroups() {
Map<String,Set<TServerInstance>> copy = new HashMap<>();
currentGroups.forEach((k, v) -> copy.put(k, new HashSet<>(v)));
return copy;
public synchronized LiveTServersSnapshot getSnapshot() {
// TODO could precompute immutable snapshot on change instead of each time this is requested.
return new LiveTServersSnapshot(currentInstances.keySet(), currentGroups);
}

public synchronized Set<TServerInstance> getCurrentServers() {
return new HashSet<>(currentInstances.keySet());
}

public synchronized int size() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.manager.thrift.ManagerState;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot;

public interface CurrentState {

Set<TableId> onlineTables();

Set<TServerInstance> onlineTabletServers();

Map<String,Set<TServerInstance>> tServerResourceGroups();
LiveTServersSnapshot tserversSnapshot();

Set<TServerInstance> shutdownServers();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.server.compaction.CompactionJobGenerator;
import org.apache.accumulo.server.iterators.TabletIteratorEnvironment;
import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot;
import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
Expand Down Expand Up @@ -304,13 +305,14 @@ public static void configureScanner(final ScannerBase scanner, final CurrentStat
IteratorSetting tabletChange =
new IteratorSetting(1001, "ManagerTabletInfoIterator", TabletManagementIterator.class);
if (state != null) {
TabletManagementIterator.setCurrentServers(tabletChange, state.onlineTabletServers());
LiveTServersSnapshot tserversSnapshot = state.tserversSnapshot();
TabletManagementIterator.setCurrentServers(tabletChange, tserversSnapshot.tservers);
TabletManagementIterator.setOnlineTables(tabletChange, state.onlineTables());
TabletManagementIterator.setMigrations(tabletChange, state.migrationsSnapshot());
TabletManagementIterator.setManagerState(tabletChange, state.getManagerState());
TabletManagementIterator.setShuttingDown(tabletChange, state.shutdownServers());
TabletManagementIterator.setTServerResourceGroups(tabletChange,
state.tServerResourceGroups());
tserversSnapshot.tserverGroups);
setCompactionHints(tabletChange, state.getCompactionHints());
}
scanner.addScanIterator(tabletChange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import org.apache.accumulo.server.compaction.CompactionConfigStorage;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot;
import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl;
import org.apache.accumulo.server.manager.state.CurrentState;
Expand Down Expand Up @@ -819,12 +820,11 @@ public void run() {
}

private long updateStatus() {
Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
var tseversSnapshot = tserverSet.getSnapshot();
TreeMap<TabletServerId,TServerStatus> temp = new TreeMap<>();
tserverStatus = gatherTableInformation(currentServers, temp);
tserverStatus = gatherTableInformation(tseversSnapshot.tservers, temp);
tserverStatusForBalancer = Collections.unmodifiableSortedMap(temp);
tServerGroupingForBalancer =
Collections.unmodifiableMap(tserverSet.getCurrentServersGroups());
tServerGroupingForBalancer = tseversSnapshot.tserverGroups;
checkForHeldServer(tserverStatus);

if (!badServers.isEmpty()) {
Expand All @@ -838,7 +838,7 @@ private long updateStatus() {
log.debug("not balancing while shutting down servers {}", serversToShutdown);
} else {
for (TabletGroupWatcher tgw : watchers) {
if (!tgw.isSameTserversAsLastScan(currentServers)) {
if (!tgw.isSameTserversAsLastScan(tseversSnapshot.tservers)) {
log.debug("not balancing just yet, as collection of live tservers is in flux");
return DEFAULT_WAIT_FOR_WATCHER;
}
Expand Down Expand Up @@ -1606,12 +1606,12 @@ public Set<TableId> onlineTables() {

@Override
public Set<TServerInstance> onlineTabletServers() {
return tserverSet.getCurrentServers();
return tserverSet.getSnapshot().tservers;
}

@Override
public Map<String,Set<TServerInstance>> tServerResourceGroups() {
return tserverSet.getCurrentServersGroups();
public LiveTServersSnapshot tserversSnapshot() {
return tserverSet.getSnapshot();
}

// recovers state from the persistent transaction to shutdown a server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -81,6 +82,7 @@
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection;
import org.apache.accumulo.server.manager.state.Assignment;
import org.apache.accumulo.server.manager.state.ClosableIterator;
Expand Down Expand Up @@ -173,10 +175,30 @@ private static class TabletLists {

public TabletLists(Manager m, SortedMap<TServerInstance,TabletServerStatus> curTServers,
Map<String,Set<TServerInstance>> grouping) {
var destinationsMod = new TreeMap<>(curTServers);
destinationsMod.keySet().removeAll(m.serversToShutdown);
this.destinations = Collections.unmodifiableSortedMap(destinationsMod);
this.currentTServerGrouping = grouping;
synchronized (m.serversToShutdown) {
var destinationsMod = new TreeMap<>(curTServers);
if (!m.serversToShutdown.isEmpty()) {
// Remove servers that are in the process of shutting down from the lists of tablet
// servers.
destinationsMod.keySet().removeAll(m.serversToShutdown);
HashMap<String,Set<TServerInstance>> groupingCopy = new HashMap<>();
grouping.forEach((group, groupsServers) -> {
if (Collections.disjoint(groupsServers, m.serversToShutdown)) {
groupingCopy.put(group, groupsServers);
} else {
var serversCopy = new HashSet<>(groupsServers);
serversCopy.removeAll(m.serversToShutdown);
groupingCopy.put(group, Collections.unmodifiableSet(serversCopy));
}
});

this.currentTServerGrouping = Collections.unmodifiableMap(groupingCopy);
} else {
this.currentTServerGrouping = grouping;
}

this.destinations = Collections.unmodifiableSortedMap(destinationsMod);
}
}

public void reset() {
Expand Down Expand Up @@ -218,15 +240,17 @@ public void run() {
continue;
}

var currentTservers = getCurrentTservers();
if (currentTservers.isEmpty()) {
LiveTServerSet.LiveTServersSnapshot tservers = manager.tserverSet.getSnapshot();
var tserversStatus = getTserversStatus(tservers.tservers);

if (tserversStatus.isEmpty()) {
setNeedsFullScan();
continue;
}

try (var iter = store.iterator(ranges)) {
long t1 = System.currentTimeMillis();
manageTablets(iter, currentTservers, false);
manageTablets(iter, tserversStatus, tservers.tserverGroups, false);
long t2 = System.currentTimeMillis();
Manager.log.debug(String.format("[%s]: partial scan time %.2f seconds for %,d ranges",
store.name(), (t2 - t1) / 1000., ranges.size()));
Expand Down Expand Up @@ -296,13 +320,14 @@ private static class TableMgmtStats {
}

private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
SortedMap<TServerInstance,TabletServerStatus> currentTServers, boolean isFullScan)
SortedMap<TServerInstance,TabletServerStatus> tseversStatus,
Map<String,Set<TServerInstance>> tserverGroups, boolean isFullScan)
throws BadLocationStateException, TException, DistributedStoreException, WalMarkerException,
IOException {

TableMgmtStats tableMgmtStats = new TableMgmtStats();
final boolean shuttingDownAllTabletServers =
manager.serversToShutdown.equals(currentTServers.keySet());
manager.serversToShutdown.equals(tseversStatus.keySet());
if (shuttingDownAllTabletServers && !isFullScan) {
// If we are shutting down all of the TabletServers, then don't process any events
// from the EventCoordinator.
Expand All @@ -312,16 +337,13 @@ private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,

int unloaded = 0;

final Map<String,Set<TServerInstance>> currentTServerGrouping =
manager.tserverSet.getCurrentServersGroups();

TabletLists tLists = new TabletLists(manager, currentTServers, currentTServerGrouping);
TabletLists tLists = new TabletLists(manager, tseversStatus, tserverGroups);

CompactionJobGenerator compactionGenerator = new CompactionJobGenerator(
new ServiceEnvironmentImpl(manager.getContext()), manager.getCompactionHints());

final Map<TabletServerId,String> resourceGroups = new HashMap<>();
manager.tServerResourceGroups().forEach((group, tservers) -> {
tserverGroups.forEach((group, tservers) -> {
tservers.stream().map(TabletServerIdImpl::new)
.forEach(tabletServerId -> resourceGroups.put(tabletServerId, group));
});
Expand Down Expand Up @@ -360,7 +382,7 @@ private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,

// Don't overwhelm the tablet servers with work
if (tLists.unassigned.size() + unloaded
> Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) {
> Manager.MAX_TSERVER_WORK_CHUNK * tseversStatus.size()) {
flushChanges(tLists);
tLists.reset();
unloaded = 0;
Expand All @@ -370,7 +392,7 @@ private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,

TabletGoalState goal = manager.getGoalState(tm);
TabletState state =
TabletState.compute(tm, currentTServers.keySet(), manager.tabletBalancer, resourceGroups);
TabletState.compute(tm, tseversStatus.keySet(), manager.tabletBalancer, resourceGroups);

final Location location = tm.getLocation();
Location current = null;
Expand Down Expand Up @@ -402,7 +424,7 @@ private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
if (Manager.log.isTraceEnabled()) {
Manager.log.trace(
"[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {} actions:{}",
store.name(), manager.serversToShutdown.equals(currentTServers.keySet()),
store.name(), manager.serversToShutdown.equals(tseversStatus.keySet()),
dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tm.getExtent(),
state, goal, actions);
}
Expand Down Expand Up @@ -541,10 +563,11 @@ private TableMgmtStats manageTablets(Iterator<TabletManagement> iter,
return tableMgmtStats;
}

private SortedMap<TServerInstance,TabletServerStatus> getCurrentTservers() {
private SortedMap<TServerInstance,TabletServerStatus>
getTserversStatus(Set<TServerInstance> currentServers) {
// Get the current status for the current list of tservers
final SortedMap<TServerInstance,TabletServerStatus> currentTServers = new TreeMap<>();
for (TServerInstance entry : manager.tserverSet.getCurrentServers()) {
for (TServerInstance entry : currentServers) {
currentTServers.put(entry, manager.tserverStatus.get(entry));
}
return currentTServers;
Expand All @@ -563,11 +586,12 @@ public void run() {
final long waitTimeBetweenScans = manager.getConfiguration()
.getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL);

var currentTServers = getCurrentTservers();
LiveTServerSet.LiveTServersSnapshot tservers = manager.tserverSet.getSnapshot();
var tserversStatus = getTserversStatus(tservers.tservers);

ClosableIterator<TabletManagement> iter = null;
try {
if (currentTServers.isEmpty()) {
if (tserversStatus.isEmpty()) {
eventHandler.waitForFullScan(waitTimeBetweenScans);
synchronized (this) {
lastScanServers = Collections.emptySortedSet();
Expand All @@ -584,7 +608,7 @@ public void run() {
eventHandler.clearNeedsFullScan();

iter = store.iterator();
var tabletMgmtStats = manageTablets(iter, currentTServers, true);
var tabletMgmtStats = manageTablets(iter, tserversStatus, tservers.tserverGroups, true);

// provide stats after flushing changes to avoid race conditions w/ delete table
stats.end(managerState);
Expand All @@ -607,9 +631,9 @@ public void run() {
}

synchronized (this) {
lastScanServers = ImmutableSortedSet.copyOf(currentTServers.keySet());
lastScanServers = ImmutableSortedSet.copyOf(tserversStatus.keySet());
}
if (manager.tserverSet.getCurrentServers().equals(currentTServers.keySet())) {
if (manager.tserverSet.getCurrentServers().equals(tserversStatus.keySet())) {
Manager.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(),
waitTimeBetweenScans / 1000.));
eventHandler.waitForFullScan(waitTimeBetweenScans);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.server.manager.LiveTServerSet;
import org.apache.accumulo.server.manager.state.CurrentState;
import org.apache.accumulo.server.manager.state.TabletManagementIterator;
import org.apache.hadoop.io.Text;
Expand Down Expand Up @@ -372,6 +373,11 @@ public Set<TServerInstance> onlineTabletServers() {
return tservers;
}

@Override
public LiveTServerSet.LiveTServersSnapshot tserversSnapshot() {
return new LiveTServerSet.LiveTServersSnapshot(onlineTabletServers(), new HashMap<>());
}

@Override
public Set<TableId> onlineTables() {
Set<TableId> onlineTables = context.getTableIdToNameMap().keySet();
Expand All @@ -380,11 +386,6 @@ public Set<TableId> onlineTables() {
return this.onlineTables;
}

@Override
public Map<String,Set<TServerInstance>> tServerResourceGroups() {
return new HashMap<>();
}

@Override
public Set<KeyExtent> migrationsSnapshot() {
return Collections.emptySet();
Expand Down

0 comments on commit 54ecfe7

Please sign in to comment.