Skip to content

Commit

Permalink
Require prevEndRow when conditionally modifying an extent
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Oct 18, 2023
1 parent 0f4d1bb commit bda3e75
Show file tree
Hide file tree
Showing 16 changed files with 148 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public interface ConditionalTabletsMutator extends AutoCloseable {
* @return A fluent interface to conditional mutating a tablet. Ensure you call
* {@link ConditionalTabletMutator#submit(RejectionHandler)} when finished.
*/
OperationRequirements mutateTablet(KeyExtent extent);
OperationRequirements mutateTablet(KeyExtent extent, Text prevEndRow);

/**
* After creating one or more conditional mutations using {@link #mutateTablet(KeyExtent)}, call
Expand Down Expand Up @@ -448,11 +448,6 @@ interface ConditionalTabletMutator extends TabletUpdates<ConditionalTabletMutato
*/
ConditionalTabletMutator requireLocation(Location location);

/**
* Require that a tablet has the specified previous end row.
*/
ConditionalTabletMutator requirePrevEndRow(Text per);

/**
* Requires the tablet to have the specified hosting goal before any changes are made.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ protected AbstractTabletStateStore(ClientContext context) {
public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException {
try (var tabletsMutator = ample.conditionallyMutateTablets()) {
for (Assignment assignment : assignments) {
var conditionalMutator = tabletsMutator.mutateTablet(assignment.tablet)
var conditionalMutator = tabletsMutator
.mutateTablet(assignment.tablet, assignment.tablet.prevEndRow())
.requireLocation(TabletMetadata.Location.future(assignment.server))
.requirePrevEndRow(assignment.tablet.prevEndRow())
.putLocation(TabletMetadata.Location.current(assignment.server))
.deleteLocation(TabletMetadata.Location.future(assignment.server)).deleteSuspension();

Expand Down Expand Up @@ -80,9 +80,9 @@ public void setFutureLocations(Collection<Assignment> assignments)
throws DistributedStoreException {
try (var tabletsMutator = ample.conditionallyMutateTablets()) {
for (Assignment assignment : assignments) {
tabletsMutator.mutateTablet(assignment.tablet).requireAbsentOperation()
.requireAbsentLocation().requirePrevEndRow(assignment.tablet.prevEndRow())
.deleteSuspension().putLocation(TabletMetadata.Location.future(assignment.server))
tabletsMutator.mutateTablet(assignment.tablet, assignment.tablet.prevEndRow())
.requireAbsentOperation().requireAbsentLocation().deleteSuspension()
.putLocation(TabletMetadata.Location.future(assignment.server))
.submit(tabletMetadata -> {
Preconditions.checkArgument(tabletMetadata.getExtent().equals(assignment.tablet));
return tabletMetadata.getLocation() != null && tabletMetadata.getLocation()
Expand Down Expand Up @@ -127,8 +127,8 @@ private void unassign(Collection<TabletMetadata> tablets,
continue;
}

var tabletMutator = tabletsMutator.mutateTablet(tm.getExtent())
.requireLocation(tm.getLocation()).requirePrevEndRow(tm.getExtent().prevEndRow());
var tabletMutator = tabletsMutator.mutateTablet(tm.getExtent(), tm.getExtent().prevEndRow())
.requireLocation(tm.getLocation());

if (tm.hasCurrent()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public void unsuspend(Collection<TabletMetadata> tablets) throws DistributedStor
}

// ELASTICITY_TODO pending #3314, add conditional mutation check that tls.suspend exists
tabletsMutator.mutateTablet(tm.getExtent()).requireAbsentOperation()
.requirePrevEndRow(tm.getExtent().prevEndRow()).deleteSuspension()
tabletsMutator.mutateTablet(tm.getExtent(), tm.getExtent().prevEndRow())
.requireAbsentOperation().deleteSuspension()
.submit(tabletMetadata -> tabletMetadata.getSuspend() == null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow;

import java.util.Objects;
import java.util.function.BiConsumer;
Expand All @@ -46,6 +45,7 @@
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
Expand Down Expand Up @@ -76,14 +76,20 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit
private boolean sawOperationRequirement = false;

protected ConditionalTabletMutatorImpl(Ample.ConditionalTabletsMutator parent,
ServerContext context, KeyExtent extent, Consumer<ConditionalMutation> mutationConsumer,
ServerContext context, KeyExtent extent, Text prevEndRow,
Consumer<ConditionalMutation> mutationConsumer,
BiConsumer<KeyExtent,Ample.RejectionHandler> rejectionHandlerConsumer) {
super(new ConditionalMutation(extent.toMetaRow()));
this.mutation = (ConditionalMutation) super.mutation;
this.mutationConsumer = mutationConsumer;
this.parent = parent;
this.rejectionHandlerConsumer = rejectionHandlerConsumer;
this.extent = extent;

Condition c =
new Condition(PREV_ROW_COLUMN.getColumnFamily(), PREV_ROW_COLUMN.getColumnQualifier())
.setValue(TabletColumnFamily.encodePrevEndRow(prevEndRow).get());
mutation.addCondition(c);
}

@Override
Expand All @@ -107,16 +113,6 @@ public Ample.ConditionalTabletMutator requireLocation(Location location) {
return this;
}

@Override
public Ample.ConditionalTabletMutator requirePrevEndRow(Text per) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Condition c =
new Condition(PREV_ROW_COLUMN.getColumnFamily(), PREV_ROW_COLUMN.getColumnQualifier())
.setValue(encodePrevEndRow(per).get());
mutation.addCondition(c);
return this;
}

@Override
public Ample.ConditionalTabletMutator requireHostingGoal(TabletHostingGoal goal) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Expand Down Expand Up @@ -167,9 +163,6 @@ public Ample.ConditionalTabletMutator requireOperation(TabletOperationId opid) {

private void requireSameSingle(TabletMetadata tabletMetadata, ColumnType type) {
switch (type) {
case PREV_ROW:
requirePrevEndRow(tabletMetadata.getPrevEndRow());
break;
case COMPACT_ID: {
Condition c =
new Condition(COMPACT_COLUMN.getColumnFamily(), COMPACT_COLUMN.getColumnQualifier());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public ConditionalTabletsMutatorImpl(ServerContext context) {
}

@Override
public Ample.OperationRequirements mutateTablet(KeyExtent extent) {
public Ample.OperationRequirements mutateTablet(KeyExtent extent, Text prevEndRow) {
Preconditions.checkState(active);

var dataLevel = Ample.DataLevel.of(extent.tableId());
Expand All @@ -84,7 +84,7 @@ public Ample.OperationRequirements mutateTablet(KeyExtent extent) {

Preconditions.checkState(extents.putIfAbsent(extent.toMetaRow(), extent) == null,
"Duplicate extents not handled");
return new ConditionalTabletMutatorImpl(this, context, extent, mutations::add,
return new ConditionalTabletMutatorImpl(this, context, extent, prevEndRow, mutations::add,
rejectedHandlers::put);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,19 @@ public void testRejectionHandler() {
try (var mutator =
new TestConditionalTabletsMutator(List.of(statuses1::get, statuses2::get), failedExtents)) {

mutator.mutateTablet(ke1).requireAbsentOperation().putDirName("dir1")
mutator.mutateTablet(ke1, ke1.prevEndRow()).requireAbsentOperation().putDirName("dir1")
.submit(tmeta -> tmeta.getDirName().equals("dir1"));

mutator.mutateTablet(ke2).requireAbsentOperation().putDirName("dir3")
mutator.mutateTablet(ke2, ke2.prevEndRow()).requireAbsentOperation().putDirName("dir3")
.submit(tmeta -> tmeta.getDirName().equals("dir3"));

mutator.mutateTablet(ke3).requireAbsentOperation().putDirName("dir4")
mutator.mutateTablet(ke3, ke3.prevEndRow()).requireAbsentOperation().putDirName("dir4")
.submit(tmeta -> tmeta.getDirName().equals("dir4"));

mutator.mutateTablet(ke4).requireAbsentOperation().putDirName("dir5").submit(tmeta -> {
throw new IllegalStateException();
});
mutator.mutateTablet(ke4, ke4.prevEndRow()).requireAbsentOperation().putDirName("dir5")
.submit(tmeta -> {
throw new IllegalStateException();
});

Map<KeyExtent,Ample.ConditionalResult> results = mutator.process();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,10 +628,9 @@ public void requestTabletHosting(TInfo tinfo, TCredentials credentials, String t
log.info("Tablet hosting requested for: {} ", KeyExtent.fromThrift(e));
KeyExtent ke = KeyExtent.fromThrift(e);
if (recentHostingRequest.getIfPresent(ke) == null) {
mutator.mutateTablet(ke).requireAbsentOperation()
mutator.mutateTablet(ke, ke.prevEndRow()).requireAbsentOperation()
.requireHostingGoal(TabletHostingGoal.ONDEMAND).requireAbsentLocation()
.requirePrevEndRow(ke.prevEndRow()).setHostingRequested()
.submit(TabletMetadata::getHostingRequested);
.setHostingRequested().submit(TabletMetadata::getHostingRequested);
} else {
log.trace("Ignoring hosting request because it was recently requested {}", ke);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,8 +562,8 @@ private ExternalCompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob

// any data that is read from the tablet to make a decision about if it can compact or not
// must be included in the requireSame call
var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
.requireSame(tabletMetadata, PREV_ROW, FILES, SELECTED, ECOMP);
var tabletMutator = tabletsMutator.mutateTablet(extent, tabletMetadata.getPrevEndRow())
.requireAbsentOperation().requireSame(tabletMetadata, FILES, SELECTED, ECOMP);

var ecid = ExternalCompactionId.of(externalCompactionId);
tabletMutator.putExternalCompaction(ecid, ecm);
Expand Down Expand Up @@ -925,8 +925,8 @@ private TabletMetadata commitCompaction(TCompactionStats stats, ExternalCompacti
}

try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
.requireCompaction(ecid).requireSame(tablet, PREV_ROW, FILES, LOCATION);
var tabletMutator = tabletsMutator.mutateTablet(extent, tablet.getPrevEndRow())
.requireAbsentOperation().requireCompaction(ecid).requireSame(tablet, FILES, LOCATION);

if (ecm.getKind() == CompactionKind.USER || ecm.getKind() == CompactionKind.SELECTOR) {
tabletMutator.requireSame(tablet, SELECTED, COMPACTED);
Expand Down Expand Up @@ -1049,8 +1049,8 @@ void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
compactions.forEach((ecid, extent) -> {
try {
ctx.requireNotDeleted(extent.tableId());
tabletsMutator.mutateTablet(extent).requireAbsentOperation().requireCompaction(ecid)
.requirePrevEndRow(extent.prevEndRow()).deleteExternalCompaction(ecid)
tabletsMutator.mutateTablet(extent, extent.prevEndRow()).requireAbsentOperation()
.requireCompaction(ecid).deleteExternalCompaction(ecid)
.submit(tabletMetadata -> !tabletMetadata.getExternalCompactions().containsKey(ecid));
} catch (TableDeletedException e) {
LOG.warn("Table {} was deleted, unable to update metadata for compaction failure.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ private static void removeBulkLoadEntries(Ample ample, TableId tableId, long tid

for (var tablet : tablets) {
if (tablet.getLoaded().values().stream().anyMatch(l -> l == tid)) {
var tabletMutator =
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();
var tabletMutator = tabletsMutator
.mutateTablet(tablet.getExtent(), tablet.getPrevEndRow()).requireAbsentOperation();
tablet.getLoaded().entrySet().stream().filter(entry -> entry.getValue() == tid)
.map(Map.Entry::getKey).forEach(tabletMutator::deleteBulkFile);
tabletMutator.submit(tm -> false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,13 @@ void load(List<TabletMetadata> tablets, Files files) {

if (!filesToLoad.isEmpty()) {
// ELASTICITY_TODO lets automatically call require prev end row
var tabletMutator =
conditionalMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();
var tabletMutator = conditionalMutator
.mutateTablet(tablet.getExtent(), tablet.getPrevEndRow()).requireAbsentOperation();

if (setTime) {
tabletMutator.requireSame(tablet, PREV_ROW, LOADED, TIME, LOCATION);
tabletMutator.requireSame(tablet, LOADED, TIME, LOCATION);
} else {
tabletMutator.requireSame(tablet, PREV_ROW, LOADED);
tabletMutator.requireSame(tablet, LOADED);
}

filesToLoad.forEach((f, v) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ public long isReady(long tid, Manager manager) throws Exception {
long t1 = System.nanoTime();
for (TabletMetadata tablet : tablets) {
if (tablet.getCompacted().contains(tid)) {
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
.requireSame(tablet, PREV_ROW, COMPACTED).deleteCompacted(tid)
tabletsMutator.mutateTablet(tablet.getExtent(), tablet.getPrevEndRow())
.requireAbsentOperation().requireSame(tablet, COMPACTED).deleteCompacted(tid)
.submit(tabletMetadata -> !tabletMetadata.getCompacted().contains(tid));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ public int updateAndCheckTablets(Manager manager, long tid)
log.debug("{} tablet {} has no files, attempting to mark as compacted ",
FateTxId.formatTid(tid), tablet.getExtent());
// this tablet has no files try to mark it as done
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
.requireSame(tablet, PREV_ROW, FILES, COMPACTED).putCompacted(tid)
tabletsMutator.mutateTablet(tablet.getExtent(), tablet.getPrevEndRow())
.requireAbsentOperation().requireSame(tablet, FILES, COMPACTED).putCompacted(tid)
.submit(tabletMetadata -> tabletMetadata.getCompacted().contains(tid));
} else if (tablet.getSelectedFiles() == null && tablet.getExternalCompactions().isEmpty()) {
// there are no selected files
Expand Down Expand Up @@ -200,12 +200,13 @@ public int updateAndCheckTablets(Manager manager, long tid)

if (filesToCompact.isEmpty()) {
// no files were selected so mark the tablet as compacted
tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
.requireSame(tablet, PREV_ROW, FILES, SELECTED, ECOMP, COMPACTED).putCompacted(tid)
tabletsMutator.mutateTablet(tablet.getExtent(), tablet.getPrevEndRow())
.requireAbsentOperation().requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED)
.putCompacted(tid)
.submit(tabletMetadata -> tabletMetadata.getCompacted().contains(tid));
} else {
var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
.requireSame(tablet, PREV_ROW, FILES, SELECTED, ECOMP, COMPACTED);
var mutator = tabletsMutator.mutateTablet(tablet.getExtent(), tablet.getPrevEndRow())
.requireAbsentOperation().requireSame(tablet, FILES, SELECTED, ECOMP, COMPACTED);
var selectedFiles =
new SelectedFiles(filesToCompact, tablet.getFiles().equals(filesToCompact), tid);

Expand Down Expand Up @@ -308,8 +309,8 @@ private void cleanupTabletMetadata(long tid, Manager manager) throws Exception {
for (TabletMetadata tablet : tablets) {

if (needsUpdate.test(tablet)) {
var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
.requireSame(tablet, PREV_ROW, COMPACTED, SELECTED);
var mutator = tabletsMutator.mutateTablet(tablet.getExtent(), tablet.getPrevEndRow())
.requireAbsentOperation().requireSame(tablet, COMPACTED, SELECTED);
if (tablet.getSelectedFiles() != null
&& tablet.getSelectedFiles().getFateTxId() == tid) {
mutator.deleteSelectedFiles();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public Repo<Manager> call(long tid, Manager manager) throws Exception {
tabletMetadata -> !opid.equals(tabletMetadata.getOperationId());

splitInfo.getTablets().forEach(extent -> {
tabletsMutator.mutateTablet(extent).requireOperation(opid).requireAbsentLocation()
.deleteOperation().submit(rejectionHandler);
tabletsMutator.mutateTablet(extent, extent.prevEndRow()).requireOperation(opid)
.requireAbsentLocation().deleteOperation().submit(rejectionHandler);
});

var results = tabletsMutator.process();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public long isReady(long tid, Manager manager) throws Exception {
} else {
try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {

tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireAbsentOperation()
.requireSame(tabletMetadata, LOCATION, PREV_ROW).putOperation(opid)
tabletsMutator.mutateTablet(splitInfo.getOriginal(), tabletMetadata.getPrevEndRow())
.requireAbsentOperation().requireSame(tabletMetadata, LOCATION).putOperation(opid)
.submit(tmeta -> opid.equals(tmeta.getOperationId()));

Map<KeyExtent,Ample.ConditionalResult> results = tabletsMutator.process();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ private void addNewTablets(long tid, Manager manager, TabletMetadata tabletMetad
continue;
}

var mutator = tabletsMutator.mutateTablet(newExtent).requireAbsentTablet();
var mutator =
tabletsMutator.mutateTablet(newExtent, newExtent.prevEndRow()).requireAbsentTablet();

mutator.putOperation(opid);
mutator.putDirName(dirNameIter.next());
Expand Down Expand Up @@ -216,8 +217,9 @@ private void updateExistingTablet(long tid, Manager manager, TabletMetadata tabl
try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
var newExtent = newTablets.last();

var mutator = tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireOperation(opid)
.requirePrevEndRow(splitInfo.getOriginal().prevEndRow()).requireAbsentLocation();
var mutator =
tabletsMutator.mutateTablet(splitInfo.getOriginal(), splitInfo.getOriginal().prevEndRow())
.requireOperation(opid).requireAbsentLocation();

mutator.putPrevEndRow(newExtent.prevEndRow());

Expand Down
Loading

0 comments on commit bda3e75

Please sign in to comment.