Skip to content

Commit

Permalink
Reverted changes in CommitCompaction and CleanUpBulkImport
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Apr 16, 2024
1 parent 899511f commit cbf53b7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,7 +79,7 @@ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
// process died and now its running again. In this case commit should do nothing, but its
// important to still carry on with the rest of the steps after commit. This code ignores a that
// fact that a commit may not have happened in the current call and continues for this reason.
TabletMetadata tabletMetadata = commitCompaction(manager, ecid, newFile);
TabletMetadata tabletMetadata = commitCompaction(manager.getContext(), ecid, newFile);

String loc = null;
if (tabletMetadata != null && tabletMetadata.getLocation() != null) {
Expand All @@ -96,11 +97,11 @@ KeyExtent getExtent() {
return KeyExtent.fromThrift(commitData.textent);
}

private TabletMetadata commitCompaction(Manager manager, ExternalCompactionId ecid,
private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid,
Optional<ReferencedTabletFile> newDatafile) {

var tablet = manager.getContext().getAmple().readTablet(getExtent(), ECOMP, SELECTED, LOCATION,
FILES, COMPACTED, OPID);
var tablet =
ctx.getAmple().readTablet(getExtent(), ECOMP, SELECTED, LOCATION, FILES, COMPACTED, OPID);

Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
Expand All @@ -115,7 +116,7 @@ private TabletMetadata commitCompaction(Manager manager, ExternalCompactionId ec
newFile -> Preconditions.checkState(!tablet2.getFiles().contains(newFile.insert()),
"File already exists in tablet %s %s", newFile, tablet2.getFiles()));

try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation()
.requireCompaction(ecid).requireSame(tablet, FILES, LOCATION);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {

log.debug("{} removing the metadata table markers for loaded files in range {} {}", fateId,
firstSplit, lastSplit);
removeBulkLoadEntries(manager, ample, info.tableId, fateId, firstSplit, lastSplit);
removeBulkLoadEntries(ample, info.tableId, fateId, firstSplit, lastSplit);

Utils.unreserveHdfsDirectory(manager, info.sourceDir, fateId);
Utils.getReadLock(manager, info.tableId, fateId).unlock();
Expand All @@ -91,8 +91,8 @@ public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
return null;
}

private static void removeBulkLoadEntries(Manager manager, Ample ample, TableId tableId,
FateId fateId, Text firstSplit, Text lastSplit) {
private static void removeBulkLoadEntries(Ample ample, TableId tableId, FateId fateId,
Text firstSplit, Text lastSplit) {

Retry retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(1)).backOffFactor(1.5)
Expand Down

0 comments on commit cbf53b7

Please sign in to comment.