diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java index 9ec1094ea73..98222a5fa6a 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedMajCIT.java @@ -35,6 +35,7 @@ import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -165,7 +166,15 @@ public void testMajCPauses() throws Exception { // Calling getRunningCompaction on the MemoryConsumingCompactor // will consume the free memory LOG.info("Calling getRunningCompaction on {}", compactorAddr); - ExternalCompactionUtil.getRunningCompaction(compactorAddr, ctx); + boolean success = false; + while (!success) { + try { + ExternalCompactionUtil.getRunningCompaction(compactorAddr, ctx); + success = true; + } catch (Exception e) { + UtilWaitThread.sleep(3000); + } + } ReadWriteIT.ingest(client, 100, 100, 100, 0, table); compactionThread.start(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java index 5d71a2fed59..8273f8e5b85 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -33,18 +34,30 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.DoubleAdder; +import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.ThriftTransportKey; +import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.MemoryUnit; @@ -54,6 +67,7 @@ import org.apache.accumulo.test.metrics.TestStatsDSink; import org.apache.accumulo.test.metrics.TestStatsDSink.Metric; import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.transport.TTransport; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -173,10 +187,25 @@ private void consumeServerMemory(BatchScanner scanner) { } static void freeServerMemory(AccumuloClient client) throws Exception { - // Instantiating this class on the TabletServer will free the memory as it - // frees the buffers created by the MemoryConsumingIterator in its constructor. - client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(), + + final ClientContext context = (ClientContext) client; + final long rpcTimeout = context.getClientTimeoutInMillis(); + final ArrayList servers = new ArrayList<>(); + final String serverPath = context.getZooKeeperRoot() + Constants.ZTSERVERS; + final ZooCache zc = context.getZooCache(); + + for (String server : zc.getChildren(serverPath)) { + ServiceLockPath zLocPath = ServiceLock.path(serverPath + "/" + server); + zc.getLockData(zLocPath).map(sld -> sld.getAddress(ThriftService.CLIENT)) + .map(address -> new ThriftTransportKey(address, rpcTimeout, context)) + .ifPresent(servers::add); + } + + Pair pair = context.getTransportPool().getAnyTransport(servers, false); + Client clientService = ThriftUtil.createClient(ThriftClientTypes.CLIENT, pair.getSecond()); + clientService.checkClass(new TInfo(), context.rpcCreds(), MemoryFreeingIterator.class.getName(), WrappingIterator.class.getName()); + } @Test