Skip to content

Commit

Permalink
Merge branch '2.1' into 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Dec 3, 2024
2 parents 39c753e + df053cc commit 1224686
Showing 1 changed file with 31 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ private long balanceTablets() {
}
// Create a view of the tserver status such that it only contains the tables
// for this level in the tableMap.
final SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel =
SortedMap<TServerInstance,TabletServerStatus> tserverStatusForLevel =
createTServerStatusView(dl, tserverStatus);
// Construct the Thrift variant of the map above for the BalancerParams
final SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancerLevel =
Expand All @@ -1048,17 +1048,36 @@ private long balanceTablets() {
int attemptNum = 0;
do {
log.debug("Balancing for tables at level {}, times-in-loop: {}", dl, ++attemptNum);
params = BalanceParamsImpl.fromThrift(tserverStatusForBalancerLevel,
tserverStatusForLevel, partitionedMigrations.get(dl));

SortedMap<TabletServerId,TServerStatus> statusForBalancerLevel =
tserverStatusForBalancerLevel;
if (attemptNum > 1 && (dl == DataLevel.ROOT || dl == DataLevel.METADATA)) {
// If we are still migrating then perform a re-check on the tablet
// servers to make sure non of them have failed.
Set<TServerInstance> currentServers = tserverSet.getCurrentServers();
tserverStatus = gatherTableInformation(currentServers);
// Create a view of the tserver status such that it only contains the tables
// for this level in the tableMap.
tserverStatusForLevel = createTServerStatusView(dl, tserverStatus);
final SortedMap<TabletServerId,TServerStatus> tserverStatusForBalancerLevel2 =
new TreeMap<>();
tserverStatusForLevel.forEach((tsi, status) -> tserverStatusForBalancerLevel2
.put(new TabletServerIdImpl(tsi), TServerStatusImpl.fromThrift(status)));
statusForBalancerLevel = tserverStatusForBalancerLevel2;
}

params = BalanceParamsImpl.fromThrift(statusForBalancerLevel, tserverStatusForLevel,
partitionedMigrations.get(dl));
wait = Math.max(tabletBalancer.balance(params), wait);
migrationsOutForLevel = params.migrationsOut().size();
for (TabletMigration m : checkMigrationSanity(tserverStatusForBalancerLevel.keySet(),
params.migrationsOut())) {
migrationsOutForLevel = 0;
for (TabletMigration m : checkMigrationSanity(statusForBalancerLevel.keySet(),
params.migrationsOut(), dl)) {
final KeyExtent ke = KeyExtent.fromTabletId(m.getTablet());
if (migrations.containsKey(ke)) {
log.warn("balancer requested migration more than once, skipping {}", m);
continue;
}
migrationsOutForLevel++;
migrations.put(ke, TabletServerIdImpl.toThrift(m.getNewTabletServer()));
log.debug("migration {}", m);
}
Expand All @@ -1082,11 +1101,16 @@ private long balanceTablets() {
}

private List<TabletMigration> checkMigrationSanity(Set<TabletServerId> current,
List<TabletMigration> migrations) {
List<TabletMigration> migrations, DataLevel level) {
return migrations.stream().filter(m -> {
boolean includeMigration = false;
if (m.getTablet() == null) {
log.error("Balancer gave back a null tablet {}", m);
} else if (DataLevel.of(m.getTablet().getTable()) != level) {
log.trace(
"Balancer wants to move a tablet ({}) outside of the current processing level ({}), "
+ "ignoring and should be processed at the correct level ({})",
m.getTablet(), level, DataLevel.of(m.getTablet().getTable()));
} else if (m.getNewTabletServer() == null) {
log.error("Balancer did not set the destination {}", m);
} else if (m.getOldTabletServer() == null) {
Expand Down

0 comments on commit 1224686

Please sign in to comment.