Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

moves finding split points into FATE #4178

Merged
merged 11 commits into from
Feb 21, 2024
Prev Previous commit
Next Next commit
improves interaction between Fate and FateStore
  • Loading branch information
keith-turner committed Feb 8, 2024

Verified

This commit was signed with the committer’s verified signature.
codemonium Igor Artemenko
commit 8d5046f71624d14625101707ac8779f07b1cde01
61 changes: 39 additions & 22 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
@@ -33,7 +33,6 @@

import java.util.EnumSet;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionException;
@@ -47,7 +46,6 @@
import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.logging.FateLogger;
@@ -59,6 +57,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
* Fault tolerant executor
*/
@@ -336,9 +336,42 @@ public FateId startTransaction() {
return store.create();
}

// TODO combine with seed
public OptionalLong startTransaction(String keyType, ByteSequence key) {
return OptionalLong.empty();
public Optional<FateId> seedTransaction(String txName, FateStore.FateKey fateKey, Repo<T> repo,
boolean autoCleanUp, String goalMessage) {

Optional<FateTxStore<T>> optTxStore = store.createAndReserve(fateKey);

return optTxStore.map(txStore -> {
var fateId = txStore.getID();
try {
Preconditions.checkState(txStore.getStatus() == NEW);
seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore);
} finally {
txStore.unreserve(0, MILLISECONDS);
}
return fateId;
});
}

private void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp,
String goalMessage, FateTxStore<T> txStore) {
if (txStore.top() == null) {
try {
log.info("Seeding {} {}", fateId, goalMessage);
txStore.push(repo);
} catch (StackOverflowException e) {
// this should not happen
throw new IllegalStateException(e);
}
}

if (autoCleanUp) {
txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
}

txStore.setTransactionInfo(TxInfo.TX_NAME, txName);

txStore.setStatus(SUBMITTED);
}

// start work in the transaction.. it is safe to call this
@@ -348,23 +381,7 @@ public void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean
FateTxStore<T> txStore = store.reserve(fateId);
try {
if (txStore.getStatus() == NEW) {
if (txStore.top() == null) {
try {
log.info("Seeding {} {}", fateId, goalMessage);
txStore.push(repo);
} catch (StackOverflowException e) {
// this should not happen
throw new IllegalStateException(e);
}
}

if (autoCleanUp) {
txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
}

txStore.setTransactionInfo(TxInfo.TX_NAME, txName);

txStore.setStatus(SUBMITTED);
seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore);
}
} finally {
txStore.unreserve(0, TimeUnit.MILLISECONDS);
26 changes: 26 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
Original file line number Diff line number Diff line change
@@ -22,6 +22,8 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.dataImpl.KeyExtent;

/**
* Transaction Store: a place to save transactions
*
@@ -39,6 +41,30 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
*/
FateId create();

// temp stub class, eventually use type from #4204 instead
static class FateKey {
public static FateKey forSplit(KeyExtent extent) {
throw new UnsupportedOperationException();
}
}

/**
* Creates and reserves a transaction using the given key. If something is already running for the
* given key, then Optional.empty() will be returned. When this returns a non-empty id, it will be
* in the new state.
*
* <p>
* In the case where a process dies in the middle of a call to this. If later, another call is
* made with the same key and its in the new state then the FateId for that key will be returned.
* </p>
*
* @throws IllegalStateException when there is an unexpected collision. This can occur if two key
* hash to the same FateId or if a random FateId already exists.
*/
default Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
throw new UnsupportedOperationException();
}

/**
* An interface that allows read/write access to the data related to a single fate operation.
*/
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.split.FindSplits;
import org.slf4j.Logger;
@@ -44,11 +45,15 @@ public void run() {
try {
var fateInstanceType = FateInstanceType.fromTableId((extent.tableId()));

Optional<FateId> fateTxId = Optional.empty();
// manager.fate(fateInstanceType).startTransaction("SYSTEM_SPLIT", createSplitKey(extent));
Optional<FateId> optFateId = manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT",
FateStore.FateKey.forSplit(extent), new FindSplits(extent), true,
"System initiated split of tablet " + extent);

fateTxId.ifPresent(txid -> manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT",
txid, new FindSplits(extent), true, "System initiated split of tablet " + extent));
optFateId.ifPresentOrElse(fateId -> {
log.trace("System initiated a split for : {} {}", extent, fateId);
}, () -> {
log.trace("System attempted to initiate a split but one was in progress : {}", extent);
});

} catch (Exception e) {
log.error("Failed to split {}", extent, e);