Skip to content

Commit

Permalink
Suggestions for graceful shutdown branch (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
DomGarguilo authored Jan 2, 2025
1 parent 0d5f014 commit 8b7b0fa
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -532,12 +531,7 @@ public void refreshProcesses(ServerType type) {
}
break;
case COMPACTOR:
Iterator<Process> iterC = compactorProcesses.iterator();
while (iterC.hasNext()) {
if (!iterC.next().isAlive()) {
iterC.remove();
}
}
compactorProcesses.removeIf(process -> !process.isAlive());
break;
case GARBAGE_COLLECTOR:
if (!gcProcess.isAlive()) {
Expand All @@ -556,20 +550,10 @@ public void refreshProcesses(ServerType type) {
}
break;
case SCAN_SERVER:
Iterator<Process> iterS = scanServerProcesses.iterator();
while (iterS.hasNext()) {
if (!iterS.next().isAlive()) {
iterS.remove();
}
}
scanServerProcesses.removeIf(process -> !process.isAlive());
break;
case TABLET_SERVER:
Iterator<Process> iterT = tabletServerProcesses.iterator();
while (iterT.hasNext()) {
if (!iterT.next().isAlive()) {
iterT.remove();
}
}
tabletServerProcesses.removeIf(process -> !process.isAlive());
break;
case ZOOKEEPER:
if (!zooKeeperProcess.isAlive()) {
Expand All @@ -586,8 +570,7 @@ public Set<Process> getProcesses(ServerType type) {
case COMPACTION_COORDINATOR:
return coordinatorProcess == null ? Set.of() : Set.of(coordinatorProcess);
case COMPACTOR:
return compactorProcesses == null ? Set.of()
: Set.of(compactorProcesses.toArray(new Process[] {}));
return Set.of(compactorProcesses.toArray(new Process[] {}));
case GARBAGE_COLLECTOR:
return gcProcess == null ? Set.of() : Set.of(gcProcess);
case MANAGER:
Expand All @@ -596,8 +579,7 @@ public Set<Process> getProcesses(ServerType type) {
case MONITOR:
return monitor == null ? Set.of() : Set.of(monitor);
case SCAN_SERVER:
return scanServerProcesses == null ? Set.of()
: Set.of(scanServerProcesses.toArray(new Process[] {}));
return Set.of(scanServerProcesses.toArray(new Process[] {}));
case TABLET_SERVER:
return tabletServerProcesses == null ? Set.of()
: Set.of(tabletServerProcesses.toArray(new Process[] {}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock.ServiceLockPath;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
Expand Down Expand Up @@ -147,17 +148,17 @@ public void testGracefulShutdown() throws Exception {
final TableId tid = ctx.getTableId(tableName);

// Insert 10 rows, flush after every row to create 10 files
final BatchWriter writer = client.createBatchWriter(tableName);
for (int i : IntStream.rangeClosed(1, 10).toArray()) {
String val = i + "";
Mutation m = new Mutation(val);
m.put(val, val, val);
writer.addMutation(m);
writer.flush();
client.tableOperations().flush(tableName, null, null, true);
try (BatchWriter writer = client.createBatchWriter(tableName)) {
for (int i : IntStream.rangeClosed(1, 10).toArray()) {
String val = i + "";
Mutation m = new Mutation(val);
m.put(val, val, val);
writer.addMutation(m);
writer.flush();
client.tableOperations().flush(tableName, null, null, true);
}
}
final long numFiles = ctx.getAmple().readTablets().forTable(tid).build().stream()
.mapToLong(tm -> tm.getFiles().size()).sum();
long numFiles = getNumFilesForTable(ctx, tid);
assertEquals(10, numFiles);
client.instanceOperations().waitForBalance();

Expand Down Expand Up @@ -220,9 +221,8 @@ public void testGracefulShutdown() throws Exception {
cc.setIterators(List.of(is));
cc.setWait(false);

final long numFiles2 = ctx.getAmple().readTablets().forTable(tid).build().stream()
.mapToLong(tm -> tm.getFiles().size()).sum();
assertTrue(numFiles2 == numFiles);
final long numFiles2 = getNumFilesForTable(ctx, tid);
assertEquals(numFiles2, numFiles);
assertEquals(0, ExternalCompactionTestUtils.getRunningCompactions(ctx).getCompactionsSize());
client.tableOperations().compact(tableName, cc);
Wait.waitFor(
Expand All @@ -232,8 +232,7 @@ public void testGracefulShutdown() throws Exception {
control.refreshProcesses(ServerType.COMPACTOR);
return control.getProcesses(ServerType.COMPACTOR).isEmpty();
});
final long numFiles3 = ctx.getAmple().readTablets().forTable(tid).build().stream()
.mapToLong(tm -> tm.getFiles().size()).sum();
final long numFiles3 = getNumFilesForTable(ctx, tid);
assertTrue(numFiles3 < numFiles2);
assertEquals(1, numFiles3);

Expand Down Expand Up @@ -266,4 +265,10 @@ public void testGracefulShutdown() throws Exception {
}

}

long getNumFilesForTable(ServerContext ctx, TableId tid) {
try (TabletsMetadata tablets = ctx.getAmple().readTablets().forTable(tid).build()) {
return tablets.stream().mapToLong(tm -> tm.getFiles().size()).sum();
}
}
}

0 comments on commit 8b7b0fa

Please sign in to comment.