Skip to content

Commit

Permalink
Fix SuspendedTabletsIT
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Dec 12, 2023
1 parent 73821c5 commit b817b71
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public static void afterAll() throws Exception {
SharedMiniClusterBase.stopMiniCluster();
}

public static Map<String,String> getTServerGroups(MiniAccumuloClusterImpl cluster) throws Exception {
public static Map<String,String> getTServerGroups(MiniAccumuloClusterImpl cluster)
throws Exception {

Map<String,String> tservers = new HashMap<>();
ZooCache zk = cluster.getServerContext().getZooCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -37,6 +37,7 @@
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
Expand All @@ -58,6 +59,8 @@
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException;
import org.apache.accumulo.miniclusterImpl.ProcessReference;
import org.apache.accumulo.test.functional.TabletResourceGroupBalanceIT;
import org.apache.accumulo.test.util.Wait;
import org.apache.accumulo.tserver.TabletServer;
Expand All @@ -74,7 +77,6 @@
import com.google.common.collect.SetMultimap;
import com.google.common.net.HostAndPort;

//@Disabled // ELASTICITY_TODO
public class SuspendedTabletsIT extends AccumuloClusterHarness {
private static final Logger log = LoggerFactory.getLogger(SuspendedTabletsIT.class);
private static ExecutorService THREAD_POOL;
Expand All @@ -83,10 +85,10 @@ public class SuspendedTabletsIT extends AccumuloClusterHarness {
public static final int TSERVERS = 3;
public static final long SUSPEND_DURATION = 80;
public static final int TABLETS = 30;


private String defaultGroup;
private Set<String> testGroup = new HashSet<>();
private List<ProcessReference> tabletServerProcesses;

@Override
protected Duration defaultTimeout() {
Expand All @@ -101,19 +103,26 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration fsCon
cfg.setProperty(Property.TABLE_SUSPEND_DURATION, SUSPEND_DURATION + "s");
cfg.setClientProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, "5s");
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
// Start 1 tserver in the default group
cfg.getClusterServerConfiguration().setNumDefaultTabletServers(1);
}



@BeforeEach
public void setUp() throws Exception {

((MiniAccumuloClusterImpl) getCluster()).getConfig().getClusterServerConfiguration().addTabletServerResourceGroup(TEST_GROUP_NAME, 2);

MiniAccumuloClusterImpl mac = (MiniAccumuloClusterImpl) getCluster();
ProcessReference defaultTabletServer =
mac.getProcesses().get(ServerType.TABLET_SERVER).iterator().next();
assertNotNull(defaultTabletServer);

mac.getConfig().getClusterServerConfiguration().addTabletServerResourceGroup(TEST_GROUP_NAME,
2);
getCluster().start();

Map<String, String> hostAndGroup = TabletResourceGroupBalanceIT.getTServerGroups((MiniAccumuloClusterImpl) getCluster());
hostAndGroup.forEach((k,v) -> {
tabletServerProcesses = mac.getProcesses().get(ServerType.TABLET_SERVER).stream()
.filter(p -> !p.equals(defaultTabletServer)).collect(Collectors.toList());

Map<String,String> hostAndGroup = TabletResourceGroupBalanceIT.getTServerGroups(mac);
hostAndGroup.forEach((k, v) -> {
if (v.equals(Constants.DEFAULT_RESOURCE_GROUP_NAME)) {
defaultGroup = k;
} else {
Expand All @@ -123,23 +132,25 @@ public void setUp() throws Exception {

assertNotNull(defaultGroup);
assertEquals(2, testGroup.size());

log.info("TabletServers in default group: {}", defaultGroup);
log.info("TabletServers in {} group: {}", TEST_GROUP_NAME, testGroup);
}

@Test
public void crashAndResumeTserver() throws Exception {
// Run the test body. When we get to the point where we need a tserver to go away, get rid of it
// via crashing
suspensionTestBody((ctx, locs, count) -> {
// Kill the TabletServers in the test group
testGroup.forEach(ts -> {
tabletServerProcesses.forEach(proc -> {
try {
log.info("Calling kill for TabletServer {}", ts);
getCluster().getClusterControl().kill(ServerType.TABLET_SERVER, ts);
} catch (IOException e) {
throw new RuntimeException("Error killing tablet server: " + ts, e);
log.info("Killing processes: {}", proc);
((MiniAccumuloClusterImpl) getCluster()).getClusterControl()
.killProcess(ServerType.TABLET_SERVER, proc);
} catch (ProcessNotFoundException | InterruptedException e) {
throw new RuntimeException("Error killing process: " + proc, e);
}
});

});
}

Expand All @@ -148,7 +159,7 @@ public void shutdownAndResumeTserver() throws Exception {
// Run the test body. When we get to the point where we need tservers to go away, stop them via
// a clean shutdown.
suspensionTestBody((ctx, locs, count) -> {

testGroup.forEach(ts -> {
try {
ThriftClientTypes.MANAGER.executeVoid(ctx, client -> {
Expand All @@ -159,8 +170,9 @@ public void shutdownAndResumeTserver() throws Exception {
throw new RuntimeException("Error calling shutdownTabletServer for " + ts, e);
}
});

try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) {

try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
Wait.waitFor(() -> client.instanceOperations().getTabletServers().size() == 1);
}

Expand All @@ -173,7 +185,8 @@ public void shutdownAndResumeTserver() throws Exception {
* @param serverStopper callback which shuts down some tablet servers.
*/
private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
try (AccumuloClient client = Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
try (AccumuloClient client =
Accumulo.newClient().from(getCluster().getClientProperties()).build()) {
ClientContext ctx = (ClientContext) client;

String tableName = getUniqueNames(1)[0];
Expand All @@ -186,8 +199,7 @@ private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
Map<String,String> properties = new HashMap<>();
properties.put("table.custom.assignment.group", TEST_GROUP_NAME);

NewTableConfiguration ntc = new NewTableConfiguration()
.withSplits(splitPoints)
NewTableConfiguration ntc = new NewTableConfiguration().withSplits(splitPoints)
.withInitialHostingGoal(TabletHostingGoal.ALWAYS);
ntc.setProperties(properties);
ctx.tableOperations().create(tableName, ntc);
Expand All @@ -214,7 +226,7 @@ private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
// and some are hosted on each of the tablet servers other than the one reserved for hosting
// the metadata table.
assertEquals(TSERVERS - 1, ds.hosted.keySet().size());
log.info("Tablets balance verified.");
log.info("Tablet balance verified.");

// Kill two tablet servers hosting our tablets. This should put tablets into suspended state,
// and thus halt balancing.
Expand Down Expand Up @@ -242,11 +254,11 @@ private void suspensionTestBody(TServerKiller serverStopper) throws Exception {
// Restart the first tablet server, making sure it ends up on the same port
HostAndPort restartedServer = deadTabletsByServer.keySet().iterator().next();
log.info("Restarting " + restartedServer);
((MiniAccumuloClusterImpl) getCluster())._exec(TabletServer.class, ServerType.TABLET_SERVER,
((MiniAccumuloClusterImpl) getCluster())._exec(TabletServer.class, ServerType.TABLET_SERVER,
Map.of(Property.TSERV_CLIENTPORT.getKey(), "" + restartedServer.getPort(),
Property.TSERV_PORTSEARCH.getKey(), "false"), "-o",
Property.TSERV_GROUP_NAME.getKey() + "=" + TEST_GROUP_NAME);
Property.TSERV_PORTSEARCH.getKey(), "false"),
"-o", Property.TSERV_GROUP_NAME.getKey() + "=" + TEST_GROUP_NAME);

// Eventually, the suspended tablets should be reassigned to the newly alive tserver.
log.info("Awaiting tablet unsuspension for tablets belonging to " + restartedServer);
while (ds.suspended.containsKey(restartedServer) || ds.assignedCount != 0) {
Expand Down

0 comments on commit b817b71

Please sign in to comment.