Skip to content

Commit

Permalink
Fixed failures in RegexGroupBalanceIT after merging PR apache#5070
Browse files Browse the repository at this point in the history
After merging apache#5070 RegexGroupBalanceIT started failing. Both
GroupBalancer and HostRegexTableLoadBalancer have logic that
throttles the frequency that they can be called do not return
any migrations in this scenario. The change in apache#5070 modified
the frequency in which the balancer is called from once for all
DataLevel's to once per DataLevel. This caused the GroupBalancer
and HostRegexTableLoadBalancer to return migrations for the ROOT
DataLevel, but not the METADATA and USER DataLevels.

The solution in this commit is to push the DataLevel down to
the Balancer in the BalancerParams so that the throttling can
be done at the DataLevel level.
  • Loading branch information
dlmarion committed Dec 4, 2024
1 parent df053cc commit f4a5dd2
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.TabletBalancer;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
Expand All @@ -40,35 +41,39 @@ public class BalanceParamsImpl implements TabletBalancer.BalanceParameters {
private final List<TabletMigration> migrationsOut;
private final SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus;
private final Set<KeyExtent> thriftCurrentMigrations;
private final DataLevel currentDataLevel;

public static BalanceParamsImpl fromThrift(SortedMap<TabletServerId,TServerStatus> currentStatus,
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
Set<KeyExtent> thriftCurrentMigrations) {
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
Set<TabletId> currentMigrations = thriftCurrentMigrations.stream().map(TabletIdImpl::new)
.collect(Collectors.toUnmodifiableSet());

return new BalanceParamsImpl(currentStatus, currentMigrations, new ArrayList<>(),
thriftCurrentStatus, thriftCurrentMigrations);
thriftCurrentStatus, thriftCurrentMigrations, currentLevel);
}

public BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut) {
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
DataLevel currentLevel) {
this.currentStatus = currentStatus;
this.currentMigrations = currentMigrations;
this.migrationsOut = migrationsOut;
this.thriftCurrentStatus = null;
this.thriftCurrentMigrations = null;
this.currentDataLevel = currentLevel;
}

private BalanceParamsImpl(SortedMap<TabletServerId,TServerStatus> currentStatus,
Set<TabletId> currentMigrations, List<TabletMigration> migrationsOut,
SortedMap<TServerInstance,TabletServerStatus> thriftCurrentStatus,
Set<KeyExtent> thriftCurrentMigrations) {
Set<KeyExtent> thriftCurrentMigrations, DataLevel currentLevel) {
this.currentStatus = currentStatus;
this.currentMigrations = currentMigrations;
this.migrationsOut = migrationsOut;
this.thriftCurrentStatus = thriftCurrentStatus;
this.thriftCurrentMigrations = thriftCurrentMigrations;
this.currentDataLevel = currentLevel;
}

@Override
Expand Down Expand Up @@ -100,4 +105,9 @@ public void addMigration(KeyExtent extent, TServerInstance oldServer, TServerIns
TabletServerId newTsid = new TabletServerIdImpl(newServer);
migrationsOut.add(new TabletMigration(id, oldTsid, newTsid));
}

@Override
public String currentLevel() {
return currentDataLevel.name();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
Expand Down Expand Up @@ -68,7 +69,8 @@ public abstract class GroupBalancer implements TabletBalancer {

protected BalancerEnvironment environment;
private final TableId tableId;
private long lastRun = 0;

protected final Map<DataLevel,Long> lastRunTimes = new HashMap<>(DataLevel.values().length);

@Override
public void init(BalancerEnvironment balancerEnvironment) {
Expand Down Expand Up @@ -211,7 +213,9 @@ public long balance(BalanceParameters params) {
return 5000;
}

if (System.currentTimeMillis() - lastRun < getWaitTime()) {
final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel());

if (System.currentTimeMillis() - lastRunTimes.getOrDefault(currentLevel, 0L) < getWaitTime()) {
return 5000;
}

Expand Down Expand Up @@ -275,7 +279,7 @@ public long balance(BalanceParameters params) {

populateMigrations(tservers.keySet(), params.migrationsOut(), moves);

lastRun = System.currentTimeMillis();
lastRunTimes.put(currentLevel, System.currentTimeMillis());

return 5000;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
import org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
import org.apache.accumulo.core.manager.balancer.TableStatisticsImpl;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TableStatistics;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
Expand Down Expand Up @@ -181,7 +182,7 @@ static class HrtlbConf {
}

private static final Set<TabletId> EMPTY_MIGRATIONS = Collections.emptySet();
private volatile long lastOOBCheck = System.currentTimeMillis();
protected final Map<DataLevel,Long> lastOOBCheckTimes = new HashMap<>(DataLevel.values().length);
private Map<String,SortedMap<TabletServerId,TServerStatus>> pools = new HashMap<>();
private final Map<TabletId,TabletMigration> migrationsFromLastPass = new HashMap<>();
private final Map<TableId,Long> tableToTimeSinceNoMigrations = new HashMap<>();
Expand Down Expand Up @@ -394,7 +395,10 @@ public long balance(BalanceParameters params) {

Map<String,SortedMap<TabletServerId,TServerStatus>> currentGrouped =
splitCurrentByRegex(params.currentStatus());
if ((now - this.lastOOBCheck) > myConf.oobCheckMillis) {
final DataLevel currentLevel = DataLevel.valueOf(params.currentLevel());

if ((now - this.lastOOBCheckTimes.getOrDefault(currentLevel, System.currentTimeMillis()))
> myConf.oobCheckMillis) {
try {
// Check to see if a tablet is assigned outside the bounds of the pool. If so, migrate it.
for (String table : tableIdMap.keySet()) {
Expand Down Expand Up @@ -454,7 +458,7 @@ public long balance(BalanceParameters params) {
}
} finally {
// this could have taken a while...get a new time
this.lastOOBCheck = System.currentTimeMillis();
this.lastOOBCheckTimes.put(currentLevel, System.currentTimeMillis());
}
}

Expand Down Expand Up @@ -507,8 +511,8 @@ public long balance(BalanceParameters params) {
continue;
}
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
getBalancerForTable(tableId)
.balance(new BalanceParamsImpl(currentView, migrations, newMigrations));
getBalancerForTable(tableId).balance(
new BalanceParamsImpl(currentView, migrations, newMigrations, DataLevel.of(tableId)));

if (newMigrations.isEmpty()) {
tableToTimeSinceNoMigrations.remove(tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl;
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.slf4j.Logger;
Expand Down Expand Up @@ -124,10 +125,12 @@ public void getAssignments(AssignmentParameters params) {
public long balance(BalanceParameters params) {
long minBalanceTime = 5_000;
// Iterate over the tables and balance each of them
final DataLevel currentDataLevel = DataLevel.valueOf(params.currentLevel());
for (TableId tableId : environment.getTableIdMap().values()) {
ArrayList<TabletMigration> newMigrations = new ArrayList<>();
long tableBalanceTime = getBalancerForTable(tableId).balance(
new BalanceParamsImpl(params.currentStatus(), params.currentMigrations(), newMigrations));
long tableBalanceTime =
getBalancerForTable(tableId).balance(new BalanceParamsImpl(params.currentStatus(),
params.currentMigrations(), newMigrations, currentDataLevel));
if (tableBalanceTime < minBalanceTime) {
minBalanceTime = tableBalanceTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
Expand Down Expand Up @@ -93,6 +94,14 @@ interface BalanceParameters {
* migrations.
*/
List<TabletMigration> migrationsOut();

/**
* Return the DataLevel name for which the Manager is currently balancing. Balancers should
* return migrations for tables within the current DataLevel.
*
* @return name of current balancing iteration data level
*/
String currentLevel();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,12 @@ protected SortedMap<TabletServerId,TServerStatus> createCurrent(int numTservers)
}
return current;
}

@Override
public long balance(BalanceParameters params) {
long wait = super.balance(params);
super.lastOOBCheckTimes.clear();
return wait;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
import org.apache.accumulo.core.manager.balancer.TServerStatusImpl;
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
Expand Down Expand Up @@ -83,7 +84,8 @@ public void balance() {
}

public void balance(final int maxMigrations) {
GroupBalancer balancer = new GroupBalancer(TableId.of("1")) {
TableId tid = TableId.of("1");
GroupBalancer balancer = new GroupBalancer(tid) {

@Override
protected Map<TabletId,TabletServerId> getLocationProvider() {
Expand All @@ -106,10 +108,10 @@ protected int getMaxMigrations() {
}
};

balance(balancer, maxMigrations);
balance(balancer, maxMigrations, tid);
}

public void balance(TabletBalancer balancer, int maxMigrations) {
public void balance(TabletBalancer balancer, int maxMigrations, TableId tid) {

while (true) {
Set<TabletId> migrations = new HashSet<>();
Expand All @@ -121,7 +123,8 @@ public void balance(TabletBalancer balancer, int maxMigrations) {
new org.apache.accumulo.core.master.thrift.TabletServerStatus()));
}

balancer.balance(new BalanceParamsImpl(current, migrations, migrationsOut));
balancer
.balance(new BalanceParamsImpl(current, migrations, migrationsOut, DataLevel.of(tid)));

assertTrue(migrationsOut.size() <= (maxMigrations + 5),
"Max Migration exceeded " + maxMigrations + " " + migrationsOut.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl;
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.apache.accumulo.core.spi.balancer.data.TabletStatistics;
Expand Down Expand Up @@ -107,16 +108,19 @@ public void testConfigurationChanges() {
// getOnlineTabletsForTable
UtilWaitThread.sleep(3000);
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
migrations, migrationsOut));
migrations, migrationsOut, DataLevel.USER));
assertEquals(0, migrationsOut.size());
// Change property, simulate call by TableConfWatcher

config.set(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + BAR.getTableName(), "r01.*");

// Wait to trigger the out of bounds check and the repool check
UtilWaitThread.sleep(10000);
// calls to balance will clear the lastOOBCheckTimes map
// in the HostRegexTableLoadBalancer. For this test we want
// to get into the out of bounds checking code, so we need to
// populate the map with an older time value
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(allTabletServers),
migrations, migrationsOut));
migrations, migrationsOut, DataLevel.USER));
assertEquals(5, migrationsOut.size());
for (TabletMigration migration : migrationsOut) {
assertTrue(migration.getNewTabletServer().getHost().startsWith("192.168.0.1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@
import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl;
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
import org.apache.accumulo.core.spi.balancer.data.TabletStatistics;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.util.ConfigurationImpl;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.junit.jupiter.api.Test;

public class HostRegexTableLoadBalancerTest extends BaseHostRegexTableLoadBalancerTest {
Expand Down Expand Up @@ -98,7 +98,7 @@ public void testBalance() {
List<TabletMigration> migrationsOut = new ArrayList<>();
long wait =
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
migrations, migrationsOut));
migrations, migrationsOut, DataLevel.USER));
assertEquals(20000, wait);
// should balance four tablets in one of the tables before reaching max
assertEquals(4, migrationsOut.size());
Expand All @@ -109,7 +109,7 @@ public void testBalance() {
}
migrationsOut.clear();
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
migrations, migrationsOut));
migrations, migrationsOut, DataLevel.USER));
assertEquals(20000, wait);
// should balance four tablets in one of the other tables before reaching max
assertEquals(4, migrationsOut.size());
Expand All @@ -120,7 +120,7 @@ public void testBalance() {
}
migrationsOut.clear();
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
migrations, migrationsOut));
migrations, migrationsOut, DataLevel.USER));
assertEquals(20000, wait);
// should balance four tablets in one of the other tables before reaching max
assertEquals(4, migrationsOut.size());
Expand All @@ -131,7 +131,7 @@ public void testBalance() {
}
migrationsOut.clear();
wait = this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
migrations, migrationsOut));
migrations, migrationsOut, DataLevel.USER));
assertEquals(20000, wait);
// no more balancing to do
assertEquals(0, migrationsOut.size());
Expand All @@ -148,7 +148,7 @@ public void testBalanceWithTooManyOutstandingMigrations() {
migrations.addAll(tableTablets.get(BAR.getTableName()));
long wait =
this.balance(new BalanceParamsImpl(Collections.unmodifiableSortedMap(createCurrent(15)),
migrations, migrationsOut));
migrations, migrationsOut, DataLevel.USER));
assertEquals(20000, wait);
// no migrations should have occurred as 10 is the maxOutstandingMigrations
assertEquals(0, migrationsOut.size());
Expand Down Expand Up @@ -487,13 +487,16 @@ public void testUnassignedWithNoDefaultPool() {

@Test
public void testOutOfBoundsTablets() {
// calls to balance will clear the lastOOBCheckTimes map
// in the HostRegexTableLoadBalancer. For this test we want
// to get into the out of bounds checking code, so we need to
// populate the map with an older time value
this.lastOOBCheckTimes.put(DataLevel.USER, System.currentTimeMillis() / 2);
init(DEFAULT_TABLE_PROPERTIES);
// Wait to trigger the out of bounds check which will call our version of
// getOnlineTabletsForTable
UtilWaitThread.sleep(11000);
Set<TabletId> migrations = new HashSet<>();
List<TabletMigration> migrationsOut = new ArrayList<>();
this.balance(new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut));
this.balance(
new BalanceParamsImpl(createCurrent(15), migrations, migrationsOut, DataLevel.USER));
assertEquals(2, migrationsOut.size());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.accumulo.core.manager.balancer.TabletServerIdImpl;
import org.apache.accumulo.core.manager.balancer.TabletStatisticsImpl;
import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.spi.balancer.data.TServerStatus;
import org.apache.accumulo.core.spi.balancer.data.TabletMigration;
import org.apache.accumulo.core.spi.balancer.data.TabletServerId;
Expand Down Expand Up @@ -202,7 +203,8 @@ public void testUnevenAssignment() {
// balance until we can't balance no more!
while (true) {
List<TabletMigration> migrationsOut = new ArrayList<>();
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut));
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut,
DataLevel.USER));
if (migrationsOut.isEmpty()) {
break;
}
Expand Down Expand Up @@ -244,7 +246,8 @@ public void testUnevenAssignment2() {
// balance until we can't balance no more!
while (true) {
List<TabletMigration> migrationsOut = new ArrayList<>();
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut));
balancer.balance(new BalanceParamsImpl(getAssignments(servers), migrations, migrationsOut,
DataLevel.USER));
if (migrationsOut.isEmpty()) {
break;
}
Expand Down
Loading

0 comments on commit f4a5dd2

Please sign in to comment.