Skip to content

Commit

Permalink
persist time to tablet in bulk update (#4072)
Browse files Browse the repository at this point in the history
When bulk import operations set time and a tablet was hosted, the time
was not persisted.  The bulk import fate operation now persists time in
tablet metadata.  The tablet code assumed it was the only thing
updating a tablet's time field.  The tablet code was modified to
accommodate the bulk import code running in the manager updating the
tablet's time column in the metadata table.
  • Loading branch information
keith-turner authored Feb 21, 2024
1 parent 08cdb0e commit 4378e02
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ interface TabletUpdates<T> {

T putFlushId(long flushId);

T putFlushNonce(long flushNonce);

T putLocation(Location location);

T deleteLocation(Location location);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ public static void validateDirCol(String dirName) {
public static final String FLUSH_QUAL = "flush";
public static final ColumnFQ FLUSH_COLUMN = new ColumnFQ(NAME, new Text(FLUSH_QUAL));

/**
* Holds a nonce that is written when a new flush file is added. The nonce is used to check if
* the write was successful in failure cases. The value is a random 64bit integer. It is
* persisted as an unsigned hexadecimal string (written with Long.toHexString and read back with
* Long.parseUnsignedLong using radix 16).
*/
public static final String FLUSH_NONCE_QUAL = "flonce";
public static final ColumnFQ FLUSH_NONCE_COLUMN =
new ColumnFQ(NAME, new Text(FLUSH_NONCE_QUAL));

/**
* Holds lock IDs to enable a sanity check to ensure that the TServer writing to the metadata
* tablet is not dead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.accumulo.core.metadata.schema;

import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_NONCE_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_QUAL;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_QUAL;
Expand Down Expand Up @@ -108,6 +109,7 @@ public class TabletMetadata {
private String cloned;
private SortedMap<Key,Value> keyValues;
private OptionalLong flush = OptionalLong.empty();
private OptionalLong flushNonce = OptionalLong.empty();
private List<LogEntry> logs;
private Map<ExternalCompactionId,CompactionMetadata> extCompactions;
private boolean merged;
Expand Down Expand Up @@ -136,6 +138,7 @@ public enum ColumnType {
TIME,
CLONED,
FLUSH_ID,
FLUSH_NONCE,
LOGS,
SUSPEND,
ECOMP,
Expand Down Expand Up @@ -345,6 +348,11 @@ public OptionalLong getFlushId() {
return flush;
}

/**
* Returns the flush nonce parsed from this tablet's metadata row, or an empty
* {@link OptionalLong} when no flush nonce column was seen for the tablet.
*
* NOTE(review): ensureFetched presumably rejects the call when FLUSH_NONCE was not among the
* fetched columns - confirm against its implementation.
*
* @return the persisted flush nonce, if any
*/
public OptionalLong getFlushNonce() {
ensureFetched(ColumnType.FLUSH_NONCE);
return flushNonce;
}

public boolean hasMerged() {
ensureFetched(ColumnType.MERGED);
return merged;
Expand Down Expand Up @@ -476,6 +484,9 @@ public static <E extends Entry<Key,Value>> TabletMetadata convertRow(Iterator<E>
case FLUSH_QUAL:
te.flush = OptionalLong.of(Long.parseLong(val));
break;
case FLUSH_NONCE_QUAL:
te.flushNonce = OptionalLong.of(Long.parseUnsignedLong(val, 16));
break;
case OPID_QUAL:
te.setOperationIdOnce(val, suppressLocationError);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_NONCE;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_REQUESTED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED;
import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION;
Expand Down Expand Up @@ -122,6 +123,13 @@ public TabletMetadataBuilder putFlushId(long flushId) {
return this;
}

/**
 * Records the given flush nonce on the tablet metadata being built and marks the
 * {@code FLUSH_NONCE} column as fetched.
 *
 * @param flushNonce the nonce value to store in the flush nonce column
 * @return this builder, to allow call chaining
 */
@Override
public TabletMetadataBuilder putFlushNonce(long flushNonce) {
  fetched.add(FLUSH_NONCE);
  // Delegate to putFlushNonce, not putFlushId: the latter would write the nonce into the flush
  // id column and never write the flush nonce column that was just marked as fetched.
  internalBuilder.putFlushNonce(flushNonce);
  return this;
}

@Override
public TabletMetadataBuilder putLocation(TabletMetadata.Location location) {
fetched.add(LOCATION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ public T putFlushId(long flushId) {
return getThis();
}

/**
 * Adds an update of the flush nonce column to the pending mutation.
 *
 * The nonce is encoded with {@link Long#toHexString(long)}, i.e. as the unsigned hexadecimal
 * representation of the 64 bit value.
 *
 * @param flushNonce the nonce to persist
 * @return this mutator, to allow call chaining
 * @throws IllegalStateException if updates are no longer enabled for this mutator
 */
@Override
public T putFlushNonce(long flushNonce) {
  Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
  final String hexNonce = Long.toHexString(flushNonce);
  final Value encodedNonce = new Value(hexNonce);
  ServerColumnFamily.FLUSH_NONCE_COLUMN.put(mutation, encodedNonce);
  return getThis();
}

@Override
public T putTime(MetadataTime time) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class MetadataConstraints implements Constraint {
ServerColumnFamily.TIME_COLUMN,
ServerColumnFamily.LOCK_COLUMN,
ServerColumnFamily.FLUSH_COLUMN,
ServerColumnFamily.FLUSH_NONCE_COLUMN,
ServerColumnFamily.OPID_COLUMN,
TabletColumnFamily.AVAILABILITY_COLUMN,
TabletColumnFamily.REQUESTED_COLUMN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

public abstract class TabletTime {

public abstract void useMaxTimeFromWALog(long time);
public abstract void updateTimeIfGreater(long time);

public abstract MetadataTime getMetadataTime();

Expand Down Expand Up @@ -86,7 +86,7 @@ public MetadataTime getMetadataTime(long time) {
}

@Override
public void useMaxTimeFromWALog(long time) {
public void updateTimeIfGreater(long time) {
if (time > lastTime) {
lastTime = time;
}
Expand Down Expand Up @@ -155,7 +155,7 @@ private LogicalTime(Long time) {
}

@Override
public void useMaxTimeFromWALog(long time) {
public void updateTimeIfGreater(long time) {
time++;

if (this.nextTime.get() < time) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ public void testGetMetadataValue() {

@Test
public void testUseMaxTimeFromWALog_Update() {
ltime.useMaxTimeFromWALog(5678L);
ltime.updateTimeIfGreater(5678L);
assertEquals("L5678", ltime.getMetadataTime().encode());
}

@Test
public void testUseMaxTimeFromWALog_NoUpdate() {
ltime.useMaxTimeFromWALog(0L);
ltime.updateTimeIfGreater(0L);
assertEquals("L1234", ltime.getMetadataTime().encode());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ public void testGetMetadataValue() {

@Test
public void testUseMaxTimeFromWALog_Yes() {
mtime.useMaxTimeFromWALog(5678L);
mtime.updateTimeIfGreater(5678L);
assertEquals("M5678", mtime.getMetadataTime().encode());
}

@Test
public void testUseMaxTimeFromWALog_No() {
mtime.useMaxTimeFromWALog(0L);
mtime.updateTimeIfGreater(0L);
assertEquals("M1234", mtime.getMetadataTime().encode());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,10 @@ void load(List<TabletMetadata> tablets, Files files) {
dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(),
tabletTime.getAndUpdateTime());
} else {
long fileTime = hostedTimestamps.get(tablet.getExtent()) + timeOffset;
dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(),
hostedTimestamps.get(tablet.getExtent()) + timeOffset);
fileTime);
tabletTime.updateTimeIfGreater(fileTime);
timeOffset++;
}
} else {
Expand All @@ -193,15 +195,21 @@ void load(List<TabletMetadata> tablets, Files files) {
});

if (!filesToLoad.isEmpty()) {
var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent())
.requireAbsentOperation().requireSame(tablet, LOADED, TIME, LOCATION);
var tabletMutator =
conditionalMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation();

if (setTime) {
tabletMutator.requireSame(tablet, LOADED, TIME, LOCATION);
} else {
tabletMutator.requireSame(tablet, LOADED, LOCATION);
}

filesToLoad.forEach((f, v) -> {
tabletMutator.putBulkFile(f, fateId);
tabletMutator.putFile(f, v);
});

if (setTime && tablet.getLocation() == null) {
if (setTime) {
tabletMutator.putTime(tabletTime.getMetadataTime());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;

import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -75,6 +76,7 @@
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.Pair;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.compaction.CompactionStats;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.problems.ProblemReport;
Expand Down Expand Up @@ -270,7 +272,7 @@ public Tablet(final TabletServer tabletServer, final KeyExtent extent,
});

if (maxTime.get() != Long.MIN_VALUE) {
tabletTime.useMaxTimeFromWALog(maxTime.get());
tabletTime.updateTimeIfGreater(maxTime.get());
}
commitSession.updateMaxCommittedTime(tabletTime.getTime());

Expand Down Expand Up @@ -1301,63 +1303,76 @@ public Optional<StoredTabletFile> updateTabletDataFile(long maxCommittedTime,
// Read these once in case of buggy race conditions will get consistent logging. If all other
// code is locking properly these should not change during this method.
var lastTabletMetadata = getMetadata();
var expectedTime = lastTabletMetadata.getTime();

// Expect time to only move forward from what was recently seen in metadata table.
Preconditions.checkArgument(maxCommittedTime >= expectedTime.getTime());

// The tablet time is used to determine if the write succeeded, in order to do this the tablet
// time needs to be different from what is currently stored in the metadata table.
while (maxCommittedTime == expectedTime.getTime()) {
var nextTime = tabletTime.getAndUpdateTime();
Preconditions.checkState(nextTime >= maxCommittedTime);
if (nextTime > maxCommittedTime) {
maxCommittedTime++;
}
}

try (var tabletsMutator = getContext().getAmple().conditionallyMutateTablets()) {

var expectedLocation = mincReason == MinorCompactionReason.RECOVERY
? Location.future(tabletServer.getTabletSession())
: Location.current(tabletServer.getTabletSession());
while (true) {
try (var tabletsMutator = getContext().getAmple().conditionallyMutateTablets()) {

var tablet = tabletsMutator.mutateTablet(extent).requireLocation(expectedLocation)
.requireSame(lastTabletMetadata, ColumnType.TIME);
var expectedLocation = mincReason == MinorCompactionReason.RECOVERY
? Location.future(tabletServer.getTabletSession())
: Location.current(tabletServer.getTabletSession());

Optional<StoredTabletFile> newFile = Optional.empty();
var tablet = tabletsMutator.mutateTablet(extent).requireLocation(expectedLocation);

// if entries are present, write to path to metadata table
if (dfv.getNumEntries() > 0) {
tablet.putFile(newDatafile, dfv);
newFile = Optional.of(newDatafile.insert());
}
Optional<StoredTabletFile> newFile = Optional.empty();

var newTime = tabletTime.getMetadataTime(maxCommittedTime);
tablet.putTime(newTime);
// if entries are present, write to path to metadata table
if (dfv.getNumEntries() > 0) {
tablet.putFile(newDatafile, dfv);
newFile = Optional.of(newDatafile.insert());
}

tablet.putFlushId(flushId);
boolean setTime = false;
// bulk imports can also update time in the metadata table, so only update if we are moving
// time forward
if (maxCommittedTime > lastTabletMetadata.getTime().getTime()) {
tablet.requireSame(lastTabletMetadata, ColumnType.TIME);
var newTime = tabletTime.getMetadataTime(maxCommittedTime);
tablet.putTime(newTime);
setTime = true;
}

unusedWalLogs.forEach(tablet::deleteWal);
tablet.putFlushId(flushId);

tablet.putZooLock(getContext().getZooKeeperRoot(), tabletServer.getLock());
long flushNonce = RANDOM.get().nextLong();
tablet.putFlushNonce(flushNonce);

// When trying to determine if write was successful, check if the time was updated. Can not
// check if the new file exists because of two reasons. First, it could be compacted away
// between the write and check. Second, some flushes do not produce a file.
tablet.submit(tabletMetadata -> tabletMetadata.getTime().equals(newTime));
unusedWalLogs.forEach(tablet::deleteWal);

var result = tabletsMutator.process().get(extent);
if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
tablet.putZooLock(getContext().getZooKeeperRoot(), tabletServer.getLock());

log.error("Metadata for failed tablet file update : {}", result.readMetadata());
// When trying to determine if write was successful, check if the flush nonce was updated.
// Can not check if the new file exists because of two reasons. First, it could be compacted
// away between the write and check. Second, some flushes do not produce a file.
tablet.submit(tabletMetadata -> {
// ELASTICITY_TODO need to test this, need a general way of testing these failure checks
var persistedNonce = tabletMetadata.getFlushNonce();
if (persistedNonce.isPresent()) {
return persistedNonce.getAsLong() == flushNonce;
}
return false;
});

// Include the things that could have caused the write to fail.
throw new IllegalStateException("Unable to write minor compaction. " + extent + " "
+ expectedLocation + " " + expectedTime);
var result = tabletsMutator.process().get(extent);
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
return newFile;
} else {
var updatedTableMetadata = result.readMetadata();
if (setTime && expectedLocation.equals(updatedTableMetadata.getLocation())
&& !lastTabletMetadata.getTime().equals(updatedTableMetadata.getTime())) {
// ELASTICITY_TODO need to test this
// The update failed because the time changed, so let's try again.
log.debug("Failed to add {} to {} because time changed {}!={}, will retry", newFile,
extent, lastTabletMetadata.getTime(), updatedTableMetadata.getTime());
lastTabletMetadata = updatedTableMetadata;
UtilWaitThread.sleep(1000);
} else {
log.error("Metadata for failed tablet file update : {}", updatedTableMetadata);
// Include the things that could have caused the write to fail.
throw new IllegalStateException(
"Unable to add file to tablet. " + extent + " " + expectedLocation);
}
}
}

return newFile;
}
}

Expand Down Expand Up @@ -1540,13 +1555,21 @@ public void refreshMetadata(RefreshPurpose refreshPurpose) {
var prevMetadata = latestMetadata;
latestMetadata = tabletMetadata;

// It's expected that what is persisted should be less than or equal to the time that the
// tablet has in memory.
Preconditions.checkState(tabletMetadata.getTime().getTime() <= tabletTime.getTime(),
"Time in metadata is ahead of tablet %s memory:%s metadata:%s", extent, tabletTime,
tabletMetadata.getTime());

if (log.isDebugEnabled() && !prevMetadata.getFiles().equals(latestMetadata.getFiles())) {
SetView<StoredTabletFile> removed =
Sets.difference(prevMetadata.getFiles(), latestMetadata.getFiles());
SetView<StoredTabletFile> added =
Sets.difference(latestMetadata.getFiles(), prevMetadata.getFiles());
log.debug("Tablet {} was refreshed. Files removed: {} Files added: {}", this.getExtent(),
removed, added);
log.debug("Tablet {} was refreshed because {}. Files removed: [{}] Files added: [{}]",
this.getExtent(), refreshPurpose,
removed.stream().map(StoredTabletFile::getFileName).collect(Collectors.joining(",")),
added.stream().map(StoredTabletFile::getFileName).collect(Collectors.joining(",")));
}

if (refreshPurpose == RefreshPurpose.MINC_COMPLETION) {
Expand Down
Loading

0 comments on commit 4378e02

Please sign in to comment.