Skip to content

Commit

Permalink
Adds Static "named" constructors and updates GC (apache#3805)
Browse files Browse the repository at this point in the history
* Adds static named constructors to ensure that inuse scan candidates are not removed.
* Fix possible race condition with InUse Candidates

This change writes the gcCandidates twice when performing a major
compaction to ensure that valid candidates were not removed before the
tablet mutation had completed.

Fixes: apache#3802

* Refactored test method name

Renamed `assertRemoved` to `assertFileRemoved` to convey that the
candidate is now an hdfs file reference that has been deleted by the GC
  • Loading branch information
ddanielr authored Oct 24, 2023
1 parent d021f3c commit e6cbdb4
Show file tree
Hide file tree
Showing 16 changed files with 181 additions and 106 deletions.
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/gc/Reference.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public interface Reference {
*/
boolean isDirectory();

/**
* Returns true only if the reference is a scan.
*/
boolean isScan();

/**
* Get the {@link TableId} of the reference.
*/
Expand All @@ -42,6 +47,8 @@ public interface Reference {
* {@link org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily}
* A directory will be read from the "srv:dir" column family:
* {@link org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily}
* A scan will be read from the Tablet "scan" column family:
* {@link org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily}
*/
String getMetadataEntry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class ReferenceDirectory extends ReferenceFile {
private final String tabletDir; // t-0003

public ReferenceDirectory(TableId tableId, String dirName) {
super(tableId, dirName);
super(tableId, dirName, false);
MetadataSchema.TabletsSection.ServerColumnFamily.validateDirCol(dirName);
this.tabletDir = dirName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,35 @@
public class ReferenceFile implements Reference, Comparable<ReferenceFile> {
// parts of an absolute URI, like "hdfs://1.2.3.4/accumulo/tables/2a/t-0003"
public final TableId tableId; // 2a
public final boolean isScan;

// the exact string that is stored in the metadata
protected final String metadataEntry;

public ReferenceFile(TableId tableId, String metadataEntry) {
protected ReferenceFile(TableId tableId, String metadataEntry, boolean isScan) {
this.tableId = Objects.requireNonNull(tableId);
this.metadataEntry = Objects.requireNonNull(metadataEntry);
this.isScan = isScan;
}

/**
 * Named constructor for a reference that is not a scan.
 *
 * @param tableId the id of the table the reference belongs to
 * @param metadataEntry the exact string that is stored in the metadata
 * @return a new {@code ReferenceFile} whose scan flag is {@code false}
 */
public static ReferenceFile forFile(TableId tableId, String metadataEntry) {
  final boolean scanReference = false;
  return new ReferenceFile(tableId, metadataEntry, scanReference);
}

/**
 * Named constructor for a reference that is a scan.
 *
 * @param tableId the id of the table the reference belongs to
 * @param metadataEntry the exact string that is stored in the metadata
 * @return a new {@code ReferenceFile} whose scan flag is {@code true}
 */
public static ReferenceFile forScan(TableId tableId, String metadataEntry) {
  final boolean scanReference = true;
  return new ReferenceFile(tableId, metadataEntry, scanReference);
}

/**
 * This implementation always returns {@code false}.
 */
@Override
public boolean isDirectory() {
return false;
}

/**
 * Reports the scan flag that was supplied when this reference was constructed.
 *
 * @return the value of the {@code isScan} field
 */
@Override
public boolean isScan() {
  return this.isScan;
}

@Override
public TableId getTableId() {
return tableId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public class AllVolumesDirectory extends ReferenceFile {

public AllVolumesDirectory(TableId tableId, String dirName) {
super(tableId, getDeleteTabletOnAllVolumesUri(tableId, dirName));
super(tableId, getDeleteTabletOnAllVolumesUri(tableId, dirName), false);
}

private static String getDeleteTabletOnAllVolumesUri(TableId tableId, String dirName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ public void deleteGcCandidates(DataLevel level, Collection<GcCandidate> candidat

if (level == DataLevel.ROOT) {
if (type == GcCandidateType.INUSE) {
// Deletion of INUSE candidates is not supported in 2.1.x.
// Since there is only a single root tablet, supporting INUSE candidate deletions would add
// additional code complexity without any substantial benefit.
// Therefore, deletion of root INUSE candidates is not supported.
return;
}
mutateRootGcCandidates(rgcc -> rgcc.remove(candidates.stream()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ public static void replaceDatafiles(ServerContext context, KeyExtent extent,
TServerInstance tServerInstance, Location lastLocation, ServiceLock zooLock,
Optional<ExternalCompactionId> ecid) {

// Write candidates before the mutation to ensure that a process failure after a mutation would
// not affect candidate creation
context.getAmple().putGcCandidates(extent.tableId(), datafilesToDelete);

TabletMutator tablet = context.getAmple().mutateTablet(extent);
Expand All @@ -204,6 +206,8 @@ public static void replaceDatafiles(ServerContext context, KeyExtent extent,
tablet.putZooLock(zooLock);

tablet.mutate();
// Write candidates again to avoid a possible race condition when removing InUse candidates
context.getAmple().putGcCandidates(extent.tableId(), datafilesToDelete);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ public static void deleteTable(TableId tableId, boolean insertDeletes, ServerCon

if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
StoredTabletFile stf = new StoredTabletFile(key.getColumnQualifierData().toString());
bw.addMutation(
ample.createDeleteMutation(new ReferenceFile(tableId, stf.getMetaUpdateDelete())));
bw.addMutation(ample
.createDeleteMutation(ReferenceFile.forFile(tableId, stf.getMetaUpdateDelete())));
}

if (ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) {
Expand Down
32 changes: 18 additions & 14 deletions server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,36 +189,40 @@ public Stream<Reference> getReferences() {

// there is a lot going on in this "one line" so see below for more info
var tabletReferences = tabletStream.flatMap(tm -> {
var tableId = tm.getTableId();

// verify that the dir and prev row entries are present to check for a complete row scan
log.trace("tablet metadata table id: {}, end row:{}, dir:{}, saw: {}, prev row: {}",
tm.getTableId(), tm.getEndRow(), tm.getDirName(), tm.sawPrevEndRow(), tm.getPrevEndRow());
log.trace("tablet metadata table id: {}, end row:{}, dir:{}, saw: {}, prev row: {}", tableId,
tm.getEndRow(), tm.getDirName(), tm.sawPrevEndRow(), tm.getPrevEndRow());
if (tm.getDirName() == null || tm.getDirName().isEmpty() || !tm.sawPrevEndRow()) {
throw new IllegalStateException("possible incomplete metadata scan for table id: "
+ tm.getTableId() + ", end row: " + tm.getEndRow() + ", dir: " + tm.getDirName()
+ ", saw prev row: " + tm.sawPrevEndRow());
throw new IllegalStateException("possible incomplete metadata scan for table id: " + tableId
+ ", end row: " + tm.getEndRow() + ", dir: " + tm.getDirName() + ", saw prev row: "
+ tm.sawPrevEndRow());
}

// combine all the entries read from file and scan columns in the metadata table
Stream<StoredTabletFile> fileStream = tm.getFiles().stream();
Stream<StoredTabletFile> stfStream = tm.getFiles().stream();
// map the files to Reference objects
var fileStream = stfStream.map(f -> ReferenceFile.forFile(tableId, f.getMetaUpdateDelete()));

// scans are normally empty, so only introduce a layer of indirection when needed
final var tmScans = tm.getScans();
if (!tmScans.isEmpty()) {
fileStream = Stream.concat(fileStream, tmScans.stream());
var scanStream =
tmScans.stream().map(s -> ReferenceFile.forScan(tableId, s.getMetaUpdateDelete()));
fileStream = Stream.concat(fileStream, scanStream);
}
// map the files to Reference objects
var stream = fileStream.map(f -> new ReferenceFile(tm.getTableId(), f.getMetaUpdateDelete()));
// if dirName is populated then we have a tablet directory aka srv:dir
// if dirName is populated, then we have a tablet directory aka srv:dir
if (tm.getDirName() != null) {
// add the tablet directory to the stream
var tabletDir = new ReferenceDirectory(tm.getTableId(), tm.getDirName());
stream = Stream.concat(stream, Stream.of(tabletDir));
var tabletDir = new ReferenceDirectory(tableId, tm.getDirName());
fileStream = Stream.concat(fileStream, Stream.of(tabletDir));
}
return stream;
return fileStream;
});

var scanServerRefs = context.getAmple().getScanServerFileReferences()
.map(sfr -> new ReferenceFile(sfr.getTableId(), sfr.getPathStr()));
.map(sfr -> ReferenceFile.forScan(sfr.getTableId(), sfr.getPathStr()));

return Stream.concat(tabletReferences, scanServerRefs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private SortedMap<String,GcCandidate> makeRelative(Collection<GcCandidate> candi
private void removeCandidatesInUse(GarbageCollectionEnvironment gce,
SortedMap<String,GcCandidate> candidateMap) throws InterruptedException {

List<GcCandidate> inUseCandidates = new ArrayList<>();
List<GcCandidate> candidateEntriesToBeDeleted = new ArrayList<>();
Set<TableId> tableIdsBefore = gce.getCandidateTableIDs();
Set<TableId> tableIdsSeen = new HashSet<>();
Iterator<Reference> iter = gce.getReferences().iterator();
Expand All @@ -163,8 +163,7 @@ private void removeCandidatesInUse(GarbageCollectionEnvironment gce,
GcCandidate gcTemp = candidateMap.remove(dir);
if (gcTemp != null) {
log.debug("Directory Candidate was still in use by dir ref: {}", dir);
// Intentionally not adding dir candidates to inUseCandidates as they are only added once.
// If dir candidates are deleted, due to being in use, nothing will add them again.
// Do not add dir candidates to candidateEntriesToBeDeleted as they are only created once.
}
} else {
String reference = ref.getMetadataEntry();
Expand All @@ -183,23 +182,26 @@ private void removeCandidatesInUse(GarbageCollectionEnvironment gce,
GcCandidate gcTemp = candidateMap.remove(relativePath);
if (gcTemp != null) {
log.debug("File Candidate was still in use: {}", relativePath);
inUseCandidates.add(gcTemp);
// Prevent deletion of candidates that are still in use by scans, because they won't be
// recreated once the scan is finished.
if (!ref.isScan()) {
candidateEntriesToBeDeleted.add(gcTemp);
}
}

String dir = relativePath.substring(0, relativePath.lastIndexOf('/'));
GcCandidate gcT = candidateMap.remove(dir);
if (gcT != null) {
log.debug("Directory Candidate was still in use by file ref: {}", relativePath);
// Intentionally not adding dir candidates to inUseCandidates as they are only added once.
// If dir candidates are deleted, due to being in use, nothing will add them again.
// Do not add dir candidates to candidateEntriesToBeDeleted as they are only created once.
}
}
}
Set<TableId> tableIdsAfter = gce.getCandidateTableIDs();
ensureAllTablesChecked(Collections.unmodifiableSet(tableIdsBefore),
Collections.unmodifiableSet(tableIdsSeen), Collections.unmodifiableSet(tableIdsAfter));
if (gce.canRemoveInUseCandidates()) {
gce.deleteGcCandidates(inUseCandidates, GcCandidateType.INUSE);
gce.deleteGcCandidates(candidateEntriesToBeDeleted, GcCandidateType.INUSE);
}
}

Expand Down
Loading

0 comments on commit e6cbdb4

Please sign in to comment.