diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 47bd5e4d1d7..18d54d1f252 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -358,6 +358,8 @@ interface TabletUpdates { T putFlushId(long flushId); + T putFlushNonce(long flushNonce); + T putLocation(Location location); T deleteLocation(Location location); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index d024fbfd59f..852eb257c8b 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -235,6 +235,14 @@ public static void validateDirCol(String dirName) { public static final String FLUSH_QUAL = "flush"; public static final ColumnFQ FLUSH_COLUMN = new ColumnFQ(NAME, new Text(FLUSH_QUAL)); + /** + * Holds a nonce that is written when a new flush file is added. The nonce is used to check if + * the write was successful in failure cases. The value is a random 64bit integer. + */ + public static final String FLUSH_NONCE_QUAL = "flonce"; + public static final ColumnFQ FLUSH_NONCE_COLUMN = + new ColumnFQ(NAME, new Text(FLUSH_NONCE_QUAL)); + /** * Holds lock IDs to enable a sanity check to ensure that the TServer writing to the metadata * tablet is not dead diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index 5b426952a48..693ae04b62f 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.metadata.schema; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_QUAL; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_NONCE_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.SELECTED_QUAL; @@ -108,6 +109,7 @@ public class TabletMetadata { private String cloned; private SortedMap keyValues; private OptionalLong flush = OptionalLong.empty(); + private OptionalLong flushNonce = OptionalLong.empty(); private List logs; private Map extCompactions; private boolean merged; @@ -136,6 +138,7 @@ public enum ColumnType { TIME, CLONED, FLUSH_ID, + FLUSH_NONCE, LOGS, SUSPEND, ECOMP, @@ -345,6 +348,11 @@ public OptionalLong getFlushId() { return flush; } + public OptionalLong getFlushNonce() { + ensureFetched(ColumnType.FLUSH_NONCE); + return flushNonce; + } + public boolean hasMerged() { ensureFetched(ColumnType.MERGED); return merged; @@ -476,6 +484,9 @@ public static > TabletMetadata convertRow(Iterator case FLUSH_QUAL: te.flush = OptionalLong.of(Long.parseLong(val)); break; + case FLUSH_NONCE_QUAL: + te.flushNonce = OptionalLong.of(Long.parseUnsignedLong(val, 16)); + break; case OPID_QUAL: te.setOperationIdOnce(val, suppressLocationError); break; diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java index 75adaabe518..15583d42ae3 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java @@ -24,6 +24,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_NONCE; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.HOSTING_REQUESTED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; @@ -122,6 +123,13 @@ public TabletMetadataBuilder putFlushId(long flushId) { return this; } + @Override + public TabletMetadataBuilder putFlushNonce(long flushNonce) { + fetched.add(FLUSH_NONCE); + internalBuilder.putFlushId(flushNonce); + return this; + } + @Override public TabletMetadataBuilder putLocation(TabletMetadata.Location location) { fetched.add(LOCATION); diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java index 49c88570da7..04dd6baa508 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java @@ -133,6 +133,13 @@ public T putFlushId(long flushId) { return getThis(); } + @Override + public T putFlushNonce(long flushNonce) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + ServerColumnFamily.FLUSH_NONCE_COLUMN.put(mutation, new Value(Long.toHexString(flushNonce))); + return getThis(); + } + @Override public T putTime(MetadataTime time) { Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index 5851e7224e6..90eb3b6f0a6 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -89,6 +89,7 @@ public class MetadataConstraints implements Constraint { ServerColumnFamily.TIME_COLUMN, ServerColumnFamily.LOCK_COLUMN, ServerColumnFamily.FLUSH_COLUMN, + ServerColumnFamily.FLUSH_NONCE_COLUMN, ServerColumnFamily.OPID_COLUMN, TabletColumnFamily.AVAILABILITY_COLUMN, TabletColumnFamily.REQUESTED_COLUMN, diff --git a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java index 7c997d3e181..ab882625c7e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java +++ b/server/base/src/main/java/org/apache/accumulo/server/tablets/TabletTime.java @@ -29,7 +29,7 @@ public abstract class TabletTime { - public abstract void useMaxTimeFromWALog(long time); + public abstract void updateTimeIfGreater(long time); public abstract MetadataTime getMetadataTime(); @@ -86,7 +86,7 @@ public MetadataTime getMetadataTime(long time) { } @Override - public void useMaxTimeFromWALog(long time) { + public void updateTimeIfGreater(long time) { if (time > lastTime) { lastTime = time; } @@ -155,7 +155,7 @@ private LogicalTime(Long time) { } @Override - public void useMaxTimeFromWALog(long time) { + public void updateTimeIfGreater(long time) { time++; if (this.nextTime.get() < time) { diff --git a/server/base/src/test/java/org/apache/accumulo/server/tablets/LogicalTimeTest.java b/server/base/src/test/java/org/apache/accumulo/server/tablets/LogicalTimeTest.java index d887b1457fb..b1629eb1441 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/tablets/LogicalTimeTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/tablets/LogicalTimeTest.java @@ -49,13 +49,13 @@ public void testGetMetadataValue() { @Test public void testUseMaxTimeFromWALog_Update() { - ltime.useMaxTimeFromWALog(5678L); + ltime.updateTimeIfGreater(5678L); assertEquals("L5678", ltime.getMetadataTime().encode()); } @Test public void testUseMaxTimeFromWALog_NoUpdate() { - ltime.useMaxTimeFromWALog(0L); + ltime.updateTimeIfGreater(0L); assertEquals("L1234", ltime.getMetadataTime().encode()); } diff --git a/server/base/src/test/java/org/apache/accumulo/server/tablets/MillisTimeTest.java b/server/base/src/test/java/org/apache/accumulo/server/tablets/MillisTimeTest.java index 1f0d087d255..6720e30156c 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/tablets/MillisTimeTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/tablets/MillisTimeTest.java @@ -49,13 +49,13 @@ public void testGetMetadataValue() { @Test public void testUseMaxTimeFromWALog_Yes() { - mtime.useMaxTimeFromWALog(5678L); + mtime.updateTimeIfGreater(5678L); assertEquals("M5678", mtime.getMetadataTime().encode()); } @Test public void testUseMaxTimeFromWALog_No() { - mtime.useMaxTimeFromWALog(0L); + mtime.updateTimeIfGreater(0L); assertEquals("M1234", mtime.getMetadataTime().encode()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java index 4cb42f01cd8..d336358700f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java @@ -175,8 +175,10 @@ void load(List tablets, Files files) { dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), tabletTime.getAndUpdateTime()); } else { + long fileTime = hostedTimestamps.get(tablet.getExtent()) + timeOffset; dfv = new DataFileValue(fileInfo.getEstFileSize(), fileInfo.getEstNumEntries(), - hostedTimestamps.get(tablet.getExtent()) + timeOffset); + fileTime); + tabletTime.updateTimeIfGreater(fileTime); timeOffset++; } } else { @@ -193,15 +195,21 @@ void load(List tablets, Files files) { }); if (!filesToLoad.isEmpty()) { - var tabletMutator = conditionalMutator.mutateTablet(tablet.getExtent()) - .requireAbsentOperation().requireSame(tablet, LOADED, TIME, LOCATION); + var tabletMutator = + conditionalMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation(); + + if (setTime) { + tabletMutator.requireSame(tablet, LOADED, TIME, LOCATION); + } else { + tabletMutator.requireSame(tablet, LOADED, LOCATION); + } filesToLoad.forEach((f, v) -> { tabletMutator.putBulkFile(f, fateId); tabletMutator.putFile(f, v); }); - if (setTime && tablet.getLocation() == null) { + if (setTime) { tabletMutator.putTime(tabletTime.getMetadataTime()); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 323176f5c88..a0f8b958573 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -21,6 +21,7 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.stream.Collectors.toList; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import java.io.FileNotFoundException; import java.io.IOException; @@ -75,6 +76,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.compaction.CompactionStats; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.problems.ProblemReport; @@ -270,7 +272,7 @@ public Tablet(final TabletServer tabletServer, final KeyExtent extent, }); if (maxTime.get() != Long.MIN_VALUE) { - tabletTime.useMaxTimeFromWALog(maxTime.get()); + tabletTime.updateTimeIfGreater(maxTime.get()); } commitSession.updateMaxCommittedTime(tabletTime.getTime()); @@ -1301,63 +1303,76 @@ public Optional updateTabletDataFile(long maxCommittedTime, // Read these once in case of buggy race conditions will get consistent logging. If all other // code is locking properly these should not change during this method. var lastTabletMetadata = getMetadata(); - var expectedTime = lastTabletMetadata.getTime(); - - // Expect time to only move forward from what was recently seen in metadata table. - Preconditions.checkArgument(maxCommittedTime >= expectedTime.getTime()); - - // The tablet time is used to determine if the write succeeded, in order to do this the tablet - // time needs to be different from what is currently stored in the metadata table. - while (maxCommittedTime == expectedTime.getTime()) { - var nextTime = tabletTime.getAndUpdateTime(); - Preconditions.checkState(nextTime >= maxCommittedTime); - if (nextTime > maxCommittedTime) { - maxCommittedTime++; - } - } - - try (var tabletsMutator = getContext().getAmple().conditionallyMutateTablets()) { - var expectedLocation = mincReason == MinorCompactionReason.RECOVERY - ? Location.future(tabletServer.getTabletSession()) - : Location.current(tabletServer.getTabletSession()); + while (true) { + try (var tabletsMutator = getContext().getAmple().conditionallyMutateTablets()) { - var tablet = tabletsMutator.mutateTablet(extent).requireLocation(expectedLocation) - .requireSame(lastTabletMetadata, ColumnType.TIME); + var expectedLocation = mincReason == MinorCompactionReason.RECOVERY + ? Location.future(tabletServer.getTabletSession()) + : Location.current(tabletServer.getTabletSession()); - Optional newFile = Optional.empty(); + var tablet = tabletsMutator.mutateTablet(extent).requireLocation(expectedLocation); - // if entries are present, write to path to metadata table - if (dfv.getNumEntries() > 0) { - tablet.putFile(newDatafile, dfv); - newFile = Optional.of(newDatafile.insert()); - } + Optional newFile = Optional.empty(); - var newTime = tabletTime.getMetadataTime(maxCommittedTime); - tablet.putTime(newTime); + // if entries are present, write to path to metadata table + if (dfv.getNumEntries() > 0) { + tablet.putFile(newDatafile, dfv); + newFile = Optional.of(newDatafile.insert()); + } - tablet.putFlushId(flushId); + boolean setTime = false; + // bulk imports can also update time in the metadata table, so only update if we are moving + // time forward + if (maxCommittedTime > lastTabletMetadata.getTime().getTime()) { + tablet.requireSame(lastTabletMetadata, ColumnType.TIME); + var newTime = tabletTime.getMetadataTime(maxCommittedTime); + tablet.putTime(newTime); + setTime = true; + } - unusedWalLogs.forEach(tablet::deleteWal); + tablet.putFlushId(flushId); - tablet.putZooLock(getContext().getZooKeeperRoot(), tabletServer.getLock()); + long flushNonce = RANDOM.get().nextLong(); + tablet.putFlushNonce(flushNonce); - // When trying to determine if write was successful, check if the time was updated. Can not - // check if the new file exists because of two reasons. First, it could be compacted away - // between the write and check. Second, some flushes do not produce a file. - tablet.submit(tabletMetadata -> tabletMetadata.getTime().equals(newTime)); + unusedWalLogs.forEach(tablet::deleteWal); - var result = tabletsMutator.process().get(extent); - if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { + tablet.putZooLock(getContext().getZooKeeperRoot(), tabletServer.getLock()); - log.error("Metadata for failed tablet file update : {}", result.readMetadata()); + // When trying to determine if write was successful, check if the flush nonce was updated. + // Can not check if the new file exists because of two reasons. First, it could be compacted + // away between the write and check. Second, some flushes do not produce a file. + tablet.submit(tabletMetadata -> { + // ELASTICITY_TODO need to test this, need a general way of testing these failure checks + var persistedNonce = tabletMetadata.getFlushNonce(); + if (persistedNonce.isPresent()) { + return persistedNonce.getAsLong() == flushNonce; + } + return false; + }); - // Include the things that could have caused the write to fail. - throw new IllegalStateException("Unable to write minor compaction. " + extent + " " - + expectedLocation + " " + expectedTime); + var result = tabletsMutator.process().get(extent); + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + return newFile; + } else { + var updatedTableMetadata = result.readMetadata(); + if (setTime && expectedLocation.equals(updatedTableMetadata.getLocation()) + && !lastTabletMetadata.getTime().equals(updatedTableMetadata.getTime())) { + // ELASTICITY_TODO need to test this + // The update failed because the time changed, so lets try again. + log.debug("Failed to add {} to {} because time changed {}!={}, will retry", newFile, + extent, lastTabletMetadata.getTime(), updatedTableMetadata.getTime()); + lastTabletMetadata = updatedTableMetadata; + UtilWaitThread.sleep(1000); + } else { + log.error("Metadata for failed tablet file update : {}", updatedTableMetadata); + // Include the things that could have caused the write to fail. + throw new IllegalStateException( + "Unable to add file to tablet. " + extent + " " + expectedLocation); + } + } } - - return newFile; } } @@ -1540,13 +1555,21 @@ public void refreshMetadata(RefreshPurpose refreshPurpose) { var prevMetadata = latestMetadata; latestMetadata = tabletMetadata; + // Its expected that what is persisted should be less than equal to the time that tablet has + // in memory. + Preconditions.checkState(tabletMetadata.getTime().getTime() <= tabletTime.getTime(), + "Time in metadata is ahead of tablet %s memory:%s metadata:%s", extent, tabletTime, + tabletMetadata.getTime()); + if (log.isDebugEnabled() && !prevMetadata.getFiles().equals(latestMetadata.getFiles())) { SetView removed = Sets.difference(prevMetadata.getFiles(), latestMetadata.getFiles()); SetView added = Sets.difference(latestMetadata.getFiles(), prevMetadata.getFiles()); - log.debug("Tablet {} was refreshed. Files removed: {} Files added: {}", this.getExtent(), - removed, added); + log.debug("Tablet {} was refreshed because {}. Files removed: [{}] Files added: [{}]", + this.getExtent(), refreshPurpose, + removed.stream().map(StoredTabletFile::getFileName).collect(Collectors.joining(",")), + added.stream().map(StoredTabletFile::getFileName).collect(Collectors.joining(","))); } if (refreshPurpose == RefreshPurpose.MINC_COMPLETION) { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index cb04fc74644..80c577526ac 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -256,6 +256,11 @@ public void testSetTime() throws Exception { } } + // Writes to a tablet should not change time unless it flushes, so time in metadata table + // should be the same + assertEquals(new MetadataTime(1, TimeType.LOGICAL), + ctx.getAmple().readTablet(extent).getTime()); + // verify data written by batch writer overwrote bulk imported data try (var scanner = client.createScanner(tableName)) { assertEquals(2, @@ -287,10 +292,20 @@ public void testSetTime() throws Exception { }); } + // the bulk import should update the time in the metadata table + assertEquals(new MetadataTime(2 + added, TimeType.LOGICAL), + ctx.getAmple().readTablet(extent).getTime()); + client.tableOperations().flush(tableName, null, null, true); + + // the flush should not change the time in the metadata table assertEquals(new MetadataTime(2 + added, TimeType.LOGICAL), ctx.getAmple().readTablet(extent).getTime()); + try (var scanner = client.createScanner("accumulo.metadata")) { + scanner.forEach((k, v) -> System.out.println(k + " " + v)); + } + } }