Skip to content

Commit

Permalink
Fix MemoryStarved ITs (apache#4062)
Browse files Browse the repository at this point in the history
In apache#3951 the ScanServer and Compactor were modified to advertise
the Thrift Client service. Since then, the freeServerMemory method
would connect to any TabletServer, Compactor, or ScanServer during
the test. However, we need it to only connect to the TabletServer. Modified
freeServerMemory to only connect to the TabletServer and introduced
a wait in MemoryStarvedMajCIT as it was trying to connect before
the Compactor was fully up.
  • Loading branch information
dlmarion authored Dec 13, 2023
1 parent 4276d14 commit 6485294
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
Expand Down Expand Up @@ -165,7 +166,15 @@ public void testMajCPauses() throws Exception {
// Calling getRunningCompaction on the MemoryConsumingCompactor
// will consume the free memory
LOG.info("Calling getRunningCompaction on {}", compactorAddr);
ExternalCompactionUtil.getRunningCompaction(compactorAddr, ctx);
boolean success = false;
while (!success) {
try {
ExternalCompactionUtil.getRunningCompaction(compactorAddr, ctx);
success = true;
} catch (Exception e) {
UtilWaitThread.sleep(3000);
}
}

ReadWriteIT.ingest(client, 100, 100, 100, 0, table);
compactionThread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand All @@ -33,18 +34,30 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.DoubleAdder;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.TableOperations;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ThriftTransportKey;
import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
import org.apache.accumulo.core.iterators.WrappingIterator;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.MemoryUnit;
Expand All @@ -54,6 +67,7 @@
import org.apache.accumulo.test.metrics.TestStatsDSink;
import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
import org.apache.hadoop.conf.Configuration;
import org.apache.thrift.transport.TTransport;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -173,10 +187,25 @@ private void consumeServerMemory(BatchScanner scanner) {
}

static void freeServerMemory(AccumuloClient client) throws Exception {
// Instantiating this class on the TabletServer will free the memory as it
// frees the buffers created by the MemoryConsumingIterator in its constructor.
client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(),

final ClientContext context = (ClientContext) client;
final long rpcTimeout = context.getClientTimeoutInMillis();
final ArrayList<ThriftTransportKey> servers = new ArrayList<>();
final String serverPath = context.getZooKeeperRoot() + Constants.ZTSERVERS;
final ZooCache zc = context.getZooCache();

for (String server : zc.getChildren(serverPath)) {
ServiceLockPath zLocPath = ServiceLock.path(serverPath + "/" + server);
zc.getLockData(zLocPath).map(sld -> sld.getAddress(ThriftService.CLIENT))
.map(address -> new ThriftTransportKey(address, rpcTimeout, context))
.ifPresent(servers::add);
}

Pair<String,TTransport> pair = context.getTransportPool().getAnyTransport(servers, false);
Client clientService = ThriftUtil.createClient(ThriftClientTypes.CLIENT, pair.getSecond());
clientService.checkClass(new TInfo(), context.rpcCreds(), MemoryFreeingIterator.class.getName(),
WrappingIterator.class.getName());

}

@Test
Expand Down

0 comments on commit 6485294

Please sign in to comment.