diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java index 018b7eaf896..16b980fdf13 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java @@ -22,6 +22,7 @@ import static org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy.SKIP; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -363,14 +364,29 @@ public synchronized TServerConnection getConnection(TServerInstance server) { return tServerInfo.connection; } - public synchronized Set getCurrentServers() { - return new HashSet<>(currentInstances.keySet()); + public static class LiveTServersSnapshot { + public final Set tservers; + public final Map> tserverGroups; + + public LiveTServersSnapshot(Set currentServers, + Map> serverGroups) { + this.tservers = Set.copyOf(currentServers); + Map> copy = new HashMap<>(); + serverGroups.forEach((k, v) -> copy.put(k, Set.copyOf(v))); + this.tserverGroups = Collections.unmodifiableMap(copy); + + // TODO could check consistency of tservers vs tserverGroups to ensure that every all tservers + // in one exists in the other + } } - public synchronized Map> getCurrentServersGroups() { - Map> copy = new HashMap<>(); - currentGroups.forEach((k, v) -> copy.put(k, new HashSet<>(v))); - return copy; + public synchronized LiveTServersSnapshot getSnapshot() { + // TODO could precompute immutable snapshot on change instead of each time this is requested. + return new LiveTServersSnapshot(currentInstances.keySet(), currentGroups); + } + + public synchronized Set getCurrentServers() { + return new HashSet<>(currentInstances.keySet()); } public synchronized int size() { diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java index 09caba293ee..1b72fa2a55d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/CurrentState.java @@ -25,6 +25,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.thrift.ManagerState; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot; public interface CurrentState { @@ -32,7 +33,7 @@ public interface CurrentState { Set onlineTabletServers(); - Map> tServerResourceGroups(); + LiveTServersSnapshot tserversSnapshot(); Set shutdownServers(); diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java index 41646f41450..21ddb5dc26f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java +++ b/server/base/src/main/java/org/apache/accumulo/server/manager/state/TabletManagementIterator.java @@ -76,6 +76,7 @@ import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.server.compaction.CompactionJobGenerator; import org.apache.accumulo.server.iterators.TabletIteratorEnvironment; +import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot; import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; @@ -304,13 +305,14 @@ public static void configureScanner(final ScannerBase scanner, final CurrentStat IteratorSetting tabletChange = new IteratorSetting(1001, "ManagerTabletInfoIterator", TabletManagementIterator.class); if (state != null) { - TabletManagementIterator.setCurrentServers(tabletChange, state.onlineTabletServers()); + LiveTServersSnapshot tserversSnapshot = state.tserversSnapshot(); + TabletManagementIterator.setCurrentServers(tabletChange, tserversSnapshot.tservers); TabletManagementIterator.setOnlineTables(tabletChange, state.onlineTables()); TabletManagementIterator.setMigrations(tabletChange, state.migrationsSnapshot()); TabletManagementIterator.setManagerState(tabletChange, state.getManagerState()); TabletManagementIterator.setShuttingDown(tabletChange, state.shutdownServers()); TabletManagementIterator.setTServerResourceGroups(tabletChange, - state.tServerResourceGroups()); + tserversSnapshot.tserverGroups); setCompactionHints(tabletChange, state.getCompactionHints()); } scanner.addScanIterator(tabletChange); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 5257a610a3c..b6ce0d3b0e9 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -129,6 +129,7 @@ import org.apache.accumulo.server.compaction.CompactionConfigStorage; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.manager.LiveTServerSet; +import org.apache.accumulo.server.manager.LiveTServerSet.LiveTServersSnapshot; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.manager.balancer.BalancerEnvironmentImpl; import org.apache.accumulo.server.manager.state.CurrentState; @@ -819,12 +820,11 @@ public void run() { } private long updateStatus() { - Set currentServers = tserverSet.getCurrentServers(); + var tseversSnapshot = tserverSet.getSnapshot(); TreeMap temp = new TreeMap<>(); - tserverStatus = gatherTableInformation(currentServers, temp); + tserverStatus = gatherTableInformation(tseversSnapshot.tservers, temp); tserverStatusForBalancer = Collections.unmodifiableSortedMap(temp); - tServerGroupingForBalancer = - Collections.unmodifiableMap(tserverSet.getCurrentServersGroups()); + tServerGroupingForBalancer = tseversSnapshot.tserverGroups; checkForHeldServer(tserverStatus); if (!badServers.isEmpty()) { @@ -838,7 +838,7 @@ private long updateStatus() { log.debug("not balancing while shutting down servers {}", serversToShutdown); } else { for (TabletGroupWatcher tgw : watchers) { - if (!tgw.isSameTserversAsLastScan(currentServers)) { + if (!tgw.isSameTserversAsLastScan(tseversSnapshot.tservers)) { log.debug("not balancing just yet, as collection of live tservers is in flux"); return DEFAULT_WAIT_FOR_WATCHER; } @@ -1606,12 +1606,12 @@ public Set onlineTables() { @Override public Set onlineTabletServers() { - return tserverSet.getCurrentServers(); + return tserverSet.getSnapshot().tservers; } @Override - public Map> tServerResourceGroups() { - return tserverSet.getCurrentServersGroups(); + public LiveTServersSnapshot tserversSnapshot() { + return tserverSet.getSnapshot(); } // recovers state from the persistent transaction to shutdown a server diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index cdb6953e8aa..ba547d9e980 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -81,6 +82,7 @@ import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; +import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.manager.state.Assignment; import org.apache.accumulo.server.manager.state.ClosableIterator; @@ -173,10 +175,30 @@ private static class TabletLists { public TabletLists(Manager m, SortedMap curTServers, Map> grouping) { - var destinationsMod = new TreeMap<>(curTServers); - destinationsMod.keySet().removeAll(m.serversToShutdown); - this.destinations = Collections.unmodifiableSortedMap(destinationsMod); - this.currentTServerGrouping = grouping; + synchronized (m.serversToShutdown) { + var destinationsMod = new TreeMap<>(curTServers); + if (!m.serversToShutdown.isEmpty()) { + // Remove servers that are in the process of shutting down from the lists of tablet + // servers. + destinationsMod.keySet().removeAll(m.serversToShutdown); + HashMap> groupingCopy = new HashMap<>(); + grouping.forEach((group, groupsServers) -> { + if (Collections.disjoint(groupsServers, m.serversToShutdown)) { + groupingCopy.put(group, groupsServers); + } else { + var serversCopy = new HashSet<>(groupsServers); + serversCopy.removeAll(m.serversToShutdown); + groupingCopy.put(group, Collections.unmodifiableSet(serversCopy)); + } + }); + + this.currentTServerGrouping = Collections.unmodifiableMap(groupingCopy); + } else { + this.currentTServerGrouping = grouping; + } + + this.destinations = Collections.unmodifiableSortedMap(destinationsMod); + } } public void reset() { @@ -218,15 +240,17 @@ public void run() { continue; } - var currentTservers = getCurrentTservers(); - if (currentTservers.isEmpty()) { + LiveTServerSet.LiveTServersSnapshot tservers = manager.tserverSet.getSnapshot(); + var tserversStatus = getTserversStatus(tservers.tservers); + + if (tserversStatus.isEmpty()) { setNeedsFullScan(); continue; } try (var iter = store.iterator(ranges)) { long t1 = System.currentTimeMillis(); - manageTablets(iter, currentTservers, false); + manageTablets(iter, tserversStatus, tservers.tserverGroups, false); long t2 = System.currentTimeMillis(); Manager.log.debug(String.format("[%s]: partial scan time %.2f seconds for %,d ranges", store.name(), (t2 - t1) / 1000., ranges.size())); @@ -296,13 +320,14 @@ private static class TableMgmtStats { } private TableMgmtStats manageTablets(Iterator iter, - SortedMap currentTServers, boolean isFullScan) + SortedMap tseversStatus, + Map> tserverGroups, boolean isFullScan) throws BadLocationStateException, TException, DistributedStoreException, WalMarkerException, IOException { TableMgmtStats tableMgmtStats = new TableMgmtStats(); final boolean shuttingDownAllTabletServers = - manager.serversToShutdown.equals(currentTServers.keySet()); + manager.serversToShutdown.equals(tseversStatus.keySet()); if (shuttingDownAllTabletServers && !isFullScan) { // If we are shutting down all of the TabletServers, then don't process any events // from the EventCoordinator. @@ -312,16 +337,13 @@ private TableMgmtStats manageTablets(Iterator iter, int unloaded = 0; - final Map> currentTServerGrouping = - manager.tserverSet.getCurrentServersGroups(); - - TabletLists tLists = new TabletLists(manager, currentTServers, currentTServerGrouping); + TabletLists tLists = new TabletLists(manager, tseversStatus, tserverGroups); CompactionJobGenerator compactionGenerator = new CompactionJobGenerator( new ServiceEnvironmentImpl(manager.getContext()), manager.getCompactionHints()); final Map resourceGroups = new HashMap<>(); - manager.tServerResourceGroups().forEach((group, tservers) -> { + tserverGroups.forEach((group, tservers) -> { tservers.stream().map(TabletServerIdImpl::new) .forEach(tabletServerId -> resourceGroups.put(tabletServerId, group)); }); @@ -360,7 +382,7 @@ private TableMgmtStats manageTablets(Iterator iter, // Don't overwhelm the tablet servers with work if (tLists.unassigned.size() + unloaded - > Manager.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { + > Manager.MAX_TSERVER_WORK_CHUNK * tseversStatus.size()) { flushChanges(tLists); tLists.reset(); unloaded = 0; @@ -370,7 +392,7 @@ private TableMgmtStats manageTablets(Iterator iter, TabletGoalState goal = manager.getGoalState(tm); TabletState state = - TabletState.compute(tm, currentTServers.keySet(), manager.tabletBalancer, resourceGroups); + TabletState.compute(tm, tseversStatus.keySet(), manager.tabletBalancer, resourceGroups); final Location location = tm.getLocation(); Location current = null; @@ -402,7 +424,7 @@ private TableMgmtStats manageTablets(Iterator iter, if (Manager.log.isTraceEnabled()) { Manager.log.trace( "[{}] Shutting down all Tservers: {}, dependentCount: {} Extent: {}, state: {}, goal: {} actions:{}", - store.name(), manager.serversToShutdown.equals(currentTServers.keySet()), + store.name(), manager.serversToShutdown.equals(tseversStatus.keySet()), dependentWatcher == null ? "null" : dependentWatcher.assignedOrHosted(), tm.getExtent(), state, goal, actions); } @@ -541,10 +563,11 @@ private TableMgmtStats manageTablets(Iterator iter, return tableMgmtStats; } - private SortedMap getCurrentTservers() { + private SortedMap + getTserversStatus(Set currentServers) { // Get the current status for the current list of tservers final SortedMap currentTServers = new TreeMap<>(); - for (TServerInstance entry : manager.tserverSet.getCurrentServers()) { + for (TServerInstance entry : currentServers) { currentTServers.put(entry, manager.tserverStatus.get(entry)); } return currentTServers; @@ -563,11 +586,12 @@ public void run() { final long waitTimeBetweenScans = manager.getConfiguration() .getTimeInMillis(Property.MANAGER_TABLET_GROUP_WATCHER_INTERVAL); - var currentTServers = getCurrentTservers(); + LiveTServerSet.LiveTServersSnapshot tservers = manager.tserverSet.getSnapshot(); + var tserversStatus = getTserversStatus(tservers.tservers); ClosableIterator iter = null; try { - if (currentTServers.isEmpty()) { + if (tserversStatus.isEmpty()) { eventHandler.waitForFullScan(waitTimeBetweenScans); synchronized (this) { lastScanServers = Collections.emptySortedSet(); @@ -584,7 +608,7 @@ public void run() { eventHandler.clearNeedsFullScan(); iter = store.iterator(); - var tabletMgmtStats = manageTablets(iter, currentTServers, true); + var tabletMgmtStats = manageTablets(iter, tserversStatus, tservers.tserverGroups, true); // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(managerState); @@ -607,9 +631,9 @@ public void run() { } synchronized (this) { - lastScanServers = ImmutableSortedSet.copyOf(currentTServers.keySet()); + lastScanServers = ImmutableSortedSet.copyOf(tserversStatus.keySet()); } - if (manager.tserverSet.getCurrentServers().equals(currentTServers.keySet())) { + if (manager.tserverSet.getCurrentServers().equals(tserversStatus.keySet())) { Manager.log.debug(String.format("[%s] sleeping for %.2f seconds", store.name(), waitTimeBetweenScans / 1000.)); eventHandler.waitForFullScan(waitTimeBetweenScans); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java index 9c3c2a37f84..3b1a688ca8d 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/TabletManagementIteratorIT.java @@ -71,6 +71,7 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.server.manager.LiveTServerSet; import org.apache.accumulo.server.manager.state.CurrentState; import org.apache.accumulo.server.manager.state.TabletManagementIterator; import org.apache.hadoop.io.Text; @@ -372,6 +373,11 @@ public Set onlineTabletServers() { return tservers; } + @Override + public LiveTServerSet.LiveTServersSnapshot tserversSnapshot() { + return new LiveTServerSet.LiveTServersSnapshot(onlineTabletServers(), new HashMap<>()); + } + @Override public Set onlineTables() { Set onlineTables = context.getTableIdToNameMap().keySet(); @@ -380,11 +386,6 @@ public Set onlineTables() { return this.onlineTables; } - @Override - public Map> tServerResourceGroups() { - return new HashMap<>(); - } - @Override public Set migrationsSnapshot() { return Collections.emptySet();