Skip to content

Commit

Permalink
Fixed FlushNoFileIT, re-enabled the test (apache#4064)
Browse files Browse the repository at this point in the history
The test was failing because it was expecting that all tablets
had the same flushID. However, it is only inserting data into
one tablet in a tablet with the ONDEMAND tablet availability.
Since only one tablet was hosted, only that tablet has a flushID
and the others had none.
  • Loading branch information
dlmarion authored Dec 13, 2023
1 parent 6596c5c commit c9d6b95
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeSet;

Expand All @@ -35,27 +36,28 @@
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletHostingGoal;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.IteratorUtil;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import com.google.common.collect.Iterables;

/**
* Tests that Accumulo will flush but not create a file that has 0 entries.
*/
@Disabled // ELASTICITY_TODO
public class FlushNoFileIT extends AccumuloClusterHarness {

@Override
Expand Down Expand Up @@ -89,7 +91,21 @@ public void test() throws Exception {

FunctionalTestUtils.checkRFiles(c, tableName, 3, 3, 0, 0);

long flushId = FunctionalTestUtils.checkFlushId((ClientContext) c, tableId, 0);
Map<KeyExtent,OptionalLong> flushIds =
FunctionalTestUtils.getFlushIds((ClientContext) c, tableId);
assertEquals(3, flushIds.size());
// There are three tablets in this table, but the batchWriter above only wrote to
// one of the tablets. The table is using the default tablet availability (ONDEMAND),
// so only one of the tablets was hosted and flushed. The other two tablets won't
// have a flushId.
KeyExtent extentWithData = new KeyExtent(tableId, new Text("s"), new Text("a"));
flushIds.forEach((k, v) -> {
if (k.equals(extentWithData)) {
assertEquals(1, v.getAsLong());
} else {
assertTrue(v.isEmpty());
}
});

try (BatchWriter bw = c.createBatchWriter(tableName)) {
Mutation m = new Mutation(new Text("r2"));
Expand All @@ -101,8 +117,31 @@ public void test() throws Exception {

FunctionalTestUtils.checkRFiles(c, tableName, 3, 3, 0, 0);

long secondFlushId = FunctionalTestUtils.checkFlushId((ClientContext) c, tableId, flushId);
assertTrue(secondFlushId > flushId, "Flush ID did not change");
flushIds = FunctionalTestUtils.getFlushIds((ClientContext) c, tableId);
assertEquals(3, flushIds.size());
flushIds.forEach((k, v) -> {
if (k.equals(extentWithData)) {
assertEquals(2, v.getAsLong());
} else {
assertTrue(v.isEmpty());
}
});

// Host all tablets
c.tableOperations().setTabletHostingGoal(tableName, new Range(), TabletHostingGoal.ALWAYS);
// Wait for all tablets to be hosted
Wait.waitFor(() -> ManagerAssignmentIT.countTabletsWithLocation(c, tableId) == 3);

// Flush and validate that all flushIds are the same
c.tableOperations().flush(tableName, null, null, true);

FunctionalTestUtils.checkRFiles(c, tableName, 3, 3, 0, 0);

flushIds = FunctionalTestUtils.getFlushIds((ClientContext) c, tableId);
assertEquals(3, flushIds.size());
flushIds.forEach((k, v) -> {
assertEquals(3, v.getAsLong());
});

try (Scanner scanner = c.createScanner(tableName)) {
assertEquals(0, Iterables.size(scanner), "Expected 0 Entries in table");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.OptionalLong;
import java.util.Set;
Expand All @@ -56,6 +57,7 @@
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.AdminUtil;
import org.apache.accumulo.core.fate.AdminUtil.FateStatus;
import org.apache.accumulo.core.fate.ZooStore;
Expand Down Expand Up @@ -237,31 +239,19 @@ private static FateStatus getFateStatus(AccumuloCluster cluster) {
/**
* Verify that flush ID gets updated properly and is the same for all tablets.
*/
static long checkFlushId(ClientContext c, TableId tableId, long prevFlushID) throws Exception {
static Map<KeyExtent,OptionalLong> getFlushIds(ClientContext c, TableId tableId)
throws Exception {

Map<KeyExtent,OptionalLong> flushValues = new HashMap<>();

try (TabletsMetadata metaScan =
c.getAmple().readTablets().forTable(tableId).fetch(FLUSH_ID).checkConsistency().build()) {

long flushId = 0, prevTabletFlushId = 0;
for (TabletMetadata tabletMetadata : metaScan) {
OptionalLong optFlushId = tabletMetadata.getFlushId();
if (optFlushId.isPresent()) {
flushId = optFlushId.getAsLong();
if (prevTabletFlushId > 0 && prevTabletFlushId != flushId) {
throw new Exception("Flush ID different between tablets");
} else {
prevTabletFlushId = flushId;
}
} else {
throw new Exception("Missing flush ID");
}
}

if (prevFlushID >= flushId) {
throw new Exception(
"Flush ID did not increase. prevFlushID: " + prevFlushID + " current: " + flushId);
flushValues.put(tabletMetadata.getExtent(), tabletMetadata.getFlushId());
}

return flushId;
}
return flushValues;
}
}

0 comments on commit c9d6b95

Please sign in to comment.