From cb1158ece9f20e8acdab9fa796a230de629364fc Mon Sep 17 00:00:00 2001
From: Kevin Rathbun
Date: Thu, 16 Jan 2025 11:27:58 -0500
Subject: [PATCH] `admin fate` improvements, `LockID`s use for fate stores improved/fixed (#5028)

* `admin fate` improvements, `LockID`s use for fate stores improved/fixed

This commit makes several improvements/fixes: improvements to the
`admin fate` util made possible by #4524; replacement of the incorrect use
of `createDummyLockID()` in real code (it is now only used in tests);
`FateStore`s now support a null lock id if they will be used as read-only
stores (write ops will fail on a store with a null lock); and some other
misc. changes.

Full list of changes:

- Removed the check for a dead Manager in the Admin fate util (AdminUtil),
  which was performed before `admin fate delete ` or `admin fate fail ` was
  able to run. This check is no longer needed with the changes from #4524.
  #4524 moved reservations out of Manager memory into the FATE table (for
  UserFateStore) and into ZK (for MetaFateStore). Prior to this, the Admin
  process had no way of knowing whether the Manager process had a
  transaction reserved, so the Manager had to be shut down to ensure it did
  not. Now that reservations are visible to any process, we can try to
  reserve the transaction in Admin, and if it cannot be reserved and
  deleted/failed in a reasonable time, we let the user know that the Manager
  would need to be shut down if deleting/failing the transaction is still
  desired.
  - This has several benefits:
    - It is one less thing to worry about when implementing multiple
      managers in the future, since Admin previously assumed only one
      Manager for these commands. However, there is still the case where
      the Manager may keep a transaction reserved for a long period of time
      and the Admin can never reserve it. In this case, we inform the user
      that the transaction could not be deleted/failed and that if
      deleting/failing is still desired, the Manager may need to be shut
      down.
    - It covers a potential issue in the previously existing code, where
      nothing prevented a Manager from being started after the check was
      performed in Admin but before the delete/fail was executed.
    - It should also make the commands easier to use, since the Manager is
      no longer required to be shut down before use.
- Changes and adds some tests for `admin fate fail` and `admin fate delete`:
  ensures the Manager is not required to be down to fail/delete a
  transaction, and ensures that if the Manager does have a transaction
  reserved, admin will fail to reserve and fail/delete the transaction.
- Another change needed as a prerequisite for the above was creating a ZK
  lock for Admin so transactions can be properly reserved by the command.
  Added new constant `Constants.ZADMIN_LOCK = "/admin/lock"`, changed
  `ServiceLockPaths`, and added `Admin.createAdminLock()` to support this.
- New class `TestLock` (in test package) which is used by tests to create a
  real ZK lock, or a dummy one. Removed `createDummyLockID()` from
  `AbstractFateStore` (moved to TestLock), and `createDummyLock()` is now
  only used in test code. Added new constant `ZTEST_LOCK = "/test/lock"`,
  changed `ServiceLockPaths`, and added `createTestLock()`, which is used to
  create a real lock id (one held in ZK) which is needed for some tests.
  - This fixes an unexpected failure that could have occurred for
    `ExternalCompaction_1_IT`. The test previously used a dummy lock for the
    store, and its fate data was stored in the same locations that the
    Manager uses for its fate data. The DeadReservationCleaner running in
    the Manager would have cleaned up reservations created using this store
    if it ran when reservations were present. Now the test creates a real ZK
    lock so the DeadReservationCleaner won't clean these up unexpectedly.
- Stores now support a null lock id for the situation where they will be
  used as read-only stores. A store with a null lock id will fail on write
  ops.
  Changed all existing uses of stores to only have a lock id if writes will
  occur (previously, all instances of the stores had a lock id).
- Removed unused or unnecessary constructors for AbstractFateStore,
  MetaFateStore, UserFateStore
- Ensured all tests changed, all FATE tests, and sunny day tests still pass

closes #4904
---
 .../org/apache/accumulo/core/Constants.java | 2 +
 .../accumulo/core/fate/AbstractFateStore.java | 22 +-
 .../apache/accumulo/core/fate/AdminUtil.java | 233 +++++++++---------
 .../core/fate/user/UserFateStore.java | 18 +-
 .../core/fate/zookeeper/MetaFateStore.java | 23 +-
 .../accumulo/core/fate/zookeeper/ZooUtil.java | 2 +-
 .../accumulo/core/lock/ServiceLockPaths.java | 17 +-
 .../MiniAccumuloClusterImpl.java | 2 +-
 .../apache/accumulo/server/util/Admin.java | 165 ++++++++++---
 .../checkCommand/TableLocksCheckRunner.java | 8 +-
 .../manager/metrics/fate/FateMetrics.java | 8 +-
 .../metrics/fate/meta/MetaFateMetrics.java | 9 +-
 .../metrics/fate/user/UserFateMetrics.java | 9 +-
 .../compaction/ExternalCompaction_1_IT.java | 28 ++-
 .../apache/accumulo/test/fate/FastFate.java | 5 +-
 .../accumulo/test/fate/FateOpsCommandsIT.java | 206 +++++++++++-----
 .../accumulo/test/fate/FateTestRunner.java | 5 +
 .../accumulo/test/fate/MultipleStoresIT.java | 6 +-
 .../apache/accumulo/test/fate/TestLock.java | 115 +++++++++
 .../accumulo/test/fate/meta/MetaFateIT.java | 2 +-
 .../fate/meta/MetaFateInterleavingIT.java | 2 +-
 .../test/fate/meta/MetaFateOpsCommandsIT.java | 49 +++-
 .../meta/MetaFateStatusEnforcementIT.java | 5 +-
 .../test/fate/meta/MetaFateStoreFateIT.java | 2 +-
 .../accumulo/test/fate/user/UserFateIT.java | 2 +-
 .../fate/user/UserFateInterleavingIT.java | 2 +-
 .../test/fate/user/UserFateOpsCommandsIT.java | 48 +++-
 .../user/UserFateStatusEnforcementIT.java | 4 +-
 .../test/fate/user/UserFateStoreFateIT.java | 2 +-
 .../test/functional/FateConcurrencyIT.java | 29 ++-
 .../test/functional/FunctionalTestUtils.java | 14 +-
 31 files changed, 725 insertions(+), 319
deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/fate/TestLock.java diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 6270fcb6c1b..32e70834031 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -88,6 +88,8 @@ public class Constants { public static final String ZTABLE_LOCKS = "/table_locks"; public static final String ZMINI_LOCK = "/mini"; + public static final String ZADMIN_LOCK = "/admin/lock"; + public static final String ZTEST_LOCK = "/test/lock"; public static final String BULK_PREFIX = "b-"; public static final String BULK_RENAME_FILE = "renames.json"; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 749a3260649..9b7d7965d61 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -101,10 +101,6 @@ public FateId newRandomId(FateInstanceType instanceType) { // Keeps track of the number of concurrent callers to waitForStatusChange() private final AtomicInteger concurrentStatusChangeCallers = new AtomicInteger(0); - public AbstractFateStore() { - this(createDummyLockID(), null, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); - } - public AbstractFateStore(ZooUtil.LockID lockID, Predicate isLockHeld) { this(lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } @@ -114,9 +110,7 @@ public AbstractFateStore(ZooUtil.LockID lockID, Predicate isLock this.maxDeferred = maxDeferred; this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator); this.deferred = Collections.synchronizedMap(new HashMap<>()); - this.lockID = Objects.requireNonNull(lockID); - // If the store is used for a Fate which runs a dead reservation cleaner, - 
// this should be non-null, otherwise null is fine + this.lockID = lockID; this.isLockHeld = isLockHeld; } @@ -291,6 +285,10 @@ protected void verifyFateKey(FateId fateId, Optional fateKeySeen, "Collision detected for fate id " + fateId); } + protected void verifyLock(ZooUtil.LockID lockID, FateId fateId) { + Preconditions.checkState(lockID != null, "Tried to reserve " + fateId + " with null lockID"); + } + protected abstract Stream getTransactions(EnumSet statuses); protected abstract TStatus _getStatus(FateId fateId); @@ -450,14 +448,4 @@ protected static Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { throw new IllegalStateException("Bad node data " + txInfo); } } - - /** - * this is a temporary method used to create a dummy lock when using a FateStore outside the - * context of a Manager (one example is testing) so reservations can still be made. - * - * @return a dummy {@link ZooUtil.LockID} - */ - public static ZooUtil.LockID createDummyLockID() { - return new ZooUtil.LockID("/path", "node", 123); - } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 54fb62e6a5a..c54ee85d1a8 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Stream; @@ -41,15 +42,13 @@ import org.apache.accumulo.core.fate.zookeeper.FateLock; import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; -import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.zookeeper.ZooSession; import 
org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; - /** * A utility to administer FATE operations */ @@ -203,18 +202,19 @@ public Map> getDanglingWaitingLocks() { * instance type. This method does not process lock information, if lock information is desired, * use {@link #getStatus(ReadOnlyFateStore, ZooSession, ServiceLockPath, Set, EnumSet, EnumSet)} * - * @param fateStores read-only fate stores + * @param readOnlyFateStores read-only fate stores * @param fateIdFilter filter results to include only provided fate transaction ids * @param statusFilter filter results to include only provided status types * @param typesFilter filter results to include only provided fate instance types * @return list of FATE transactions that match filter criteria */ public List getTransactionStatus( - Map> fateStores, Set fateIdFilter, + Map> readOnlyFateStores, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) { - FateStatus status = getTransactionStatus(fateStores, fateIdFilter, statusFilter, typesFilter, - Collections.>emptyMap(), Collections.>emptyMap()); + FateStatus status = getTransactionStatus(readOnlyFateStores, fateIdFilter, statusFilter, + typesFilter, Collections.>emptyMap(), + Collections.>emptyMap()); return status.getTransactions(); } @@ -223,7 +223,7 @@ public List getTransactionStatus( * Get the FATE transaction status and lock information stored in zookeeper, optionally filtered * by fate id, status, and fate instance type * - * @param mfs read-only MetaFateStore + * @param readOnlyMFS read-only MetaFateStore * @param zk zookeeper reader. * @param lockPath the zookeeper path for locks * @param fateIdFilter filter results to include only provided fate transaction ids @@ -233,36 +233,37 @@ public List getTransactionStatus( * @throws KeeperException if zookeeper exception occurs * @throws InterruptedException if process is interrupted. 
*/ - public FateStatus getStatus(ReadOnlyFateStore mfs, ZooSession zk, ServiceLockPath lockPath, - Set fateIdFilter, EnumSet statusFilter, + public FateStatus getStatus(ReadOnlyFateStore readOnlyMFS, ZooSession zk, + ServiceLockPath lockPath, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) throws KeeperException, InterruptedException { Map> heldLocks = new HashMap<>(); Map> waitingLocks = new HashMap<>(); findLocks(zk, lockPath, heldLocks, waitingLocks); - return getTransactionStatus(Map.of(FateInstanceType.META, mfs), fateIdFilter, statusFilter, - typesFilter, heldLocks, waitingLocks); + return getTransactionStatus(Map.of(FateInstanceType.META, readOnlyMFS), fateIdFilter, + statusFilter, typesFilter, heldLocks, waitingLocks); } - public FateStatus getStatus(ReadOnlyFateStore ufs, Set fateIdFilter, + public FateStatus getStatus(ReadOnlyFateStore readOnlyUFS, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) throws KeeperException, InterruptedException { - return getTransactionStatus(Map.of(FateInstanceType.USER, ufs), fateIdFilter, statusFilter, - typesFilter, new HashMap<>(), new HashMap<>()); + return getTransactionStatus(Map.of(FateInstanceType.USER, readOnlyUFS), fateIdFilter, + statusFilter, typesFilter, new HashMap<>(), new HashMap<>()); } - public FateStatus getStatus(Map> fateStores, ZooSession zk, - ServiceLockPath lockPath, Set fateIdFilter, EnumSet statusFilter, - EnumSet typesFilter) throws KeeperException, InterruptedException { + public FateStatus getStatus(Map> readOnlyFateStores, + ZooSession zk, ServiceLockPath lockPath, Set fateIdFilter, + EnumSet statusFilter, EnumSet typesFilter) + throws KeeperException, InterruptedException { Map> heldLocks = new HashMap<>(); Map> waitingLocks = new HashMap<>(); findLocks(zk, lockPath, heldLocks, waitingLocks); - return getTransactionStatus(fateStores, fateIdFilter, statusFilter, typesFilter, heldLocks, - waitingLocks); + return getTransactionStatus(readOnlyFateStores, fateIdFilter, 
statusFilter, typesFilter, + heldLocks, waitingLocks); } /** @@ -341,7 +342,7 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath, /** * Returns fate status, possibly filtered * - * @param fateStores read-only access to populated transaction stores. + * @param readOnlyFateStores read-only access to populated transaction stores. * @param fateIdFilter Optional. List of transactions to filter results - if null, all * transactions are returned * @param statusFilter Optional. List of status types to filter results - if null, all @@ -353,12 +354,12 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath, * @return current fate and lock status */ public static FateStatus getTransactionStatus( - Map> fateStores, Set fateIdFilter, + Map> readOnlyFateStores, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter, Map> heldLocks, Map> waitingLocks) { final List statuses = new ArrayList<>(); - fateStores.forEach((type, store) -> { + readOnlyFateStores.forEach((type, store) -> { try (Stream fateIds = store.list().map(FateIdStatus::getFateId)) { fateIds.forEach(fateId -> { @@ -413,17 +414,17 @@ private static boolean includeByInstanceType(FateInstanceType type, return typesFilter == null || typesFilter.isEmpty() || typesFilter.contains(type); } - public void printAll(Map> fateStores, ZooSession zk, + public void printAll(Map> readOnlyFateStores, ZooSession zk, ServiceLockPath tableLocksPath) throws KeeperException, InterruptedException { - print(fateStores, zk, tableLocksPath, new Formatter(System.out), null, null, null); + print(readOnlyFateStores, zk, tableLocksPath, new Formatter(System.out), null, null, null); } - public void print(Map> fateStores, ZooSession zk, + public void print(Map> readOnlyFateStores, ZooSession zk, ServiceLockPath tableLocksPath, Formatter fmt, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) throws KeeperException, InterruptedException { FateStatus fateStatus = - getStatus(fateStores, zk, 
tableLocksPath, fateIdFilter, statusFilter, typesFilter); + getStatus(readOnlyFateStores, zk, tableLocksPath, fateIdFilter, statusFilter, typesFilter); for (TransactionStatus txStatus : fateStatus.getTransactions()) { fmt.format( @@ -434,11 +435,7 @@ public void print(Map> fateStores, ZooSess fmt.format(" %s transactions", fateStatus.getTransactions().size()); } - public boolean prepDelete(Map> stores, ZooSession zk, - ServiceLockPath path, String fateIdStr) { - if (!checkGlobalLock(zk, path)) { - return false; - } + public boolean prepDelete(Map> stores, String fateIdStr) { FateId fateId; try { @@ -452,36 +449,37 @@ public boolean prepDelete(Map> stores, ZooSession // determine which store to use FateStore store = stores.get(fateId.getType()); - FateTxStore txStore = store.reserve(fateId); - try { - TStatus ts = txStore.getStatus(); - switch (ts) { - case UNKNOWN: - System.out.println("Invalid transaction ID: " + fateId); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - case FAILED: - case FAILED_IN_PROGRESS: - case SUCCESSFUL: - System.out.printf("Deleting transaction: %s (%s)%n", fateIdStr, ts); - txStore.forceDelete(); - state = true; - break; + Optional> opTxStore = tryReserve(store, fateId, "delete"); + if (opTxStore.isPresent()) { + var txStore = opTxStore.orElseThrow(); + + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.println("Invalid transaction ID: " + fateId); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + case FAILED: + case FAILED_IN_PROGRESS: + case SUCCESSFUL: + System.out.printf("Deleting transaction: %s (%s)%n", fateIdStr, ts); + txStore.forceDelete(); + state = true; + break; + } + } finally { + txStore.unreserve(Duration.ZERO); } - } finally { - txStore.unreserve(Duration.ZERO); } + return state; } - public boolean prepFail(Map> stores, ZooSession zk, - ServiceLockPath zLockManagerPath, String fateIdStr) { - if (!checkGlobalLock(zk, zLockManagerPath)) { - return false; - } + 
public boolean prepFail(Map> stores, String fateIdStr) { FateId fateId; try { @@ -495,39 +493,75 @@ public boolean prepFail(Map> stores, ZooSession zk // determine which store to use FateStore store = stores.get(fateId.getType()); - FateTxStore txStore = store.reserve(fateId); - try { - TStatus ts = txStore.getStatus(); - switch (ts) { - case UNKNOWN: - System.out.println("Invalid fate ID: " + fateId); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - System.out.printf("Failing transaction: %s (%s)%n", fateId, ts); - txStore.setStatus(TStatus.FAILED_IN_PROGRESS); - state = true; - break; - - case SUCCESSFUL: - System.out.printf("Transaction already completed: %s (%s)%n", fateId, ts); - break; - - case FAILED: - case FAILED_IN_PROGRESS: - System.out.printf("Transaction already failed: %s (%s)%n", fateId, ts); - state = true; - break; + Optional> opTxStore = tryReserve(store, fateId, "fail"); + if (opTxStore.isPresent()) { + var txStore = opTxStore.orElseThrow(); + + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.println("Invalid fate ID: " + fateId); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + System.out.printf("Failing transaction: %s (%s)%n", fateId, ts); + txStore.setStatus(TStatus.FAILED_IN_PROGRESS); + state = true; + break; + + case SUCCESSFUL: + System.out.printf("Transaction already completed: %s (%s)%n", fateId, ts); + break; + + case FAILED: + case FAILED_IN_PROGRESS: + System.out.printf("Transaction already failed: %s (%s)%n", fateId, ts); + state = true; + break; + } + } finally { + txStore.unreserve(Duration.ZERO); } - } finally { - txStore.unreserve(Duration.ZERO); } return state; } + /** + * Try to reserve the transaction for a minute. 
If it could not be reserved, return an empty + * optional + */ + private Optional> tryReserve(FateStore store, FateId fateId, String op) { + var retry = Retry.builder().maxRetriesWithinDuration(Duration.ofMinutes(1)) + .retryAfter(Duration.ofMillis(25)).incrementBy(Duration.ofMillis(25)) + .maxWait(Duration.ofSeconds(15)).backOffFactor(1.5).logInterval(Duration.ofSeconds(15)) + .createRetry(); + + Optional> reserveAttempt = store.tryReserve(fateId); + while (reserveAttempt.isEmpty() && retry.canRetry()) { + retry.useRetry(); + try { + retry.waitForNextAttempt(log, "Attempting to reserve " + fateId); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalArgumentException(e); + } + reserveAttempt = store.tryReserve(fateId); + } + if (reserveAttempt.isPresent()) { + retry.logCompletion(log, "Attempting to reserve " + fateId); + } else { + log.error("Could not {} {} in a reasonable time. This indicates the Manager is currently " + + "working on it. The Manager may need to be stopped and the command rerun to complete " + + "this.", op, fateId); + } + + return reserveAttempt; + } + public void deleteLocks(ZooSession zk, ServiceLockPath path, String fateIdStr) throws KeeperException, InterruptedException { var zrw = zk.asReaderWriter(); @@ -548,35 +582,4 @@ public void deleteLocks(ZooSession zk, ServiceLockPath path, String fateIdStr) } } } - - @SuppressFBWarnings(value = "DM_EXIT", - justification = "TODO - should probably avoid System.exit here; " - + "this code is used by the fate admin shell command") - public boolean checkGlobalLock(ZooSession zk, ServiceLockPath zLockManagerPath) { - try { - if (ServiceLock.getLockData(zk, zLockManagerPath).isPresent()) { - System.err.println("ERROR: Manager lock is held, not running"); - if (this.exitOnError) { - System.exit(1); - } else { - return false; - } - } - } catch (KeeperException e) { - System.err.println("ERROR: Could not read manager lock, not running " + e.getMessage()); - if 
(this.exitOnError) { - System.exit(1); - } else { - return false; - } - } catch (InterruptedException e) { - System.err.println("ERROR: Could not read manager lock, not running" + e.getMessage()); - if (this.exitOnError) { - System.exit(1); - } else { - return false; - } - } - return true; - } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 3d7c039e0f2..977adb2440f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -53,7 +53,6 @@ import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.UtilWaitThread; @@ -75,6 +74,17 @@ public class UserFateStore extends AbstractFateStore { private static final com.google.common.collect.Range REPO_RANGE = com.google.common.collect.Range.closed(1, MAX_REPOS); + /** + * Constructs a UserFateStore + * + * @param context the {@link ClientContext} + * @param tableName the name of the table which will store the Fate data + * @param lockID the {@link ZooUtil.LockID} held by the process creating this store. Should be + * null if this store will be used as read-only (will not be used to reserve transactions) + * @param isLockHeld the {@link Predicate} used to determine if the lockID is held or not at the + * time of invocation. 
If the store is used for a {@link Fate} which runs a dead + * reservation cleaner, this should be non-null, otherwise null is fine + */ public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID, Predicate isLockHeld) { this(context, tableName, lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); @@ -88,11 +98,6 @@ public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID loc this.tableName = Objects.requireNonNull(tableName); } - public UserFateStore(ClientContext context, ZooUtil.LockID lockID, - Predicate isLockHeld) { - this(context, AccumuloTable.FATE.tableName(), lockID, isLockHeld); - } - @Override public FateId create() { @@ -184,6 +189,7 @@ private boolean seedTransaction(Supplier> mutatorFactory, String @Override public Optional> tryReserve(FateId fateId) { + verifyLock(lockID, fateId); // Create a unique FateReservation for this reservation attempt FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java index 148292ed745..242e0b5e74f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java @@ -83,6 +83,17 @@ private String getTXPath(FateId fateId) { return path + "/tx_" + fateId.getTxUUIDStr(); } + /** + * Constructs a MetaFateStore + * + * @param path the path in ZK where the fate data will reside + * @param zk the {@link ZooSession} + * @param lockID the {@link ZooUtil.LockID} held by the process creating this store. Should be + * null if this store will be used as read-only (will not be used to reserve transactions) + * @param isLockHeld the {@link Predicate} used to determine if the lockID is held or not at the + * time of invocation. 
If the store is used for a {@link Fate} which runs a dead + * reservation cleaner, this should be non-null, otherwise null is fine + */ public MetaFateStore(String path, ZooSession zk, ZooUtil.LockID lockID, Predicate isLockHeld) throws KeeperException, InterruptedException { this(path, zk, lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); @@ -100,11 +111,6 @@ public MetaFateStore(String path, ZooSession zk, ZooUtil.LockID lockID, this.zrw.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP); } - /** - * For testing only - */ - MetaFateStore() {} - @Override public FateId create() { while (true) { @@ -124,8 +130,9 @@ public FateId create() { } private Optional> createAndReserve(FateKey fateKey) { - final var reservation = FateReservation.from(lockID, UUID.randomUUID()); final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + verifyLock(lockID, fateId); + final var reservation = FateReservation.from(lockID, UUID.randomUUID()); try { byte[] newSerFateData = @@ -136,8 +143,7 @@ private Optional> createAndReserve(FateKey fateKey) { // node if it doesn't yet exist: // TStatus = TStatus.NEW, FateReservation = reservation, FateKey = fateKey // This might occur if there was a ZK server fault and the same write is running a - // 2nd - // time + // 2nd time // 2) The existing node for fateId has: // TStatus = TStatus.NEW, no FateReservation present, FateKey = fateKey // The fateId is NEW/unseeded and not reserved, so we can allow it to be reserved @@ -223,6 +229,7 @@ private void seedTransaction(Fate.FateOperation txName, Repo repo, boolean au @Override public Optional> tryReserve(FateId fateId) { + verifyLock(lockID, fateId); // uniquely identify this attempt to reserve the fate operation data FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java index 
a4934ae6eed..156b4f14a6b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java @@ -97,7 +97,7 @@ public String serialize(String root) { @Override public String toString() { - return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid); + return "path = " + path + " node = " + node + " eid = " + Long.toHexString(eid); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java index 4c82d085e59..8fd3f361dcc 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java @@ -72,7 +72,8 @@ private ServiceLockPath(String root, String type) { this.type = requireNonNull(type); Preconditions.checkArgument(this.type.equals(Constants.ZGC_LOCK) || this.type.equals(Constants.ZMANAGER_LOCK) || this.type.equals(Constants.ZMONITOR_LOCK) - || this.type.equals(Constants.ZTABLE_LOCKS), "Unsupported type: " + type); + || this.type.equals(Constants.ZTABLE_LOCKS) || this.type.equals(Constants.ZADMIN_LOCK) + || this.type.equals(Constants.ZTEST_LOCK), "Unsupported type: " + type); // These server types support only one active instance, so they use a lock at // a known path, not the server's address. 
this.resourceGroup = null; @@ -203,6 +204,10 @@ private static String determineServerType(final String path) { return Constants.ZMONITOR_LOCK; } else if (path.contains(Constants.ZMINI_LOCK)) { return Constants.ZMINI_LOCK; + } else if (path.contains(Constants.ZADMIN_LOCK)) { + return Constants.ZADMIN_LOCK; + } else if (path.contains(Constants.ZTEST_LOCK)) { + return Constants.ZTEST_LOCK; } else if (path.contains(Constants.ZCOMPACTORS)) { return Constants.ZCOMPACTORS; } else if (path.contains(Constants.ZSSERVERS)) { @@ -230,6 +235,8 @@ public static ServiceLockPath parse(Optional serverType, String path) { case Constants.ZGC_LOCK: case Constants.ZMANAGER_LOCK: case Constants.ZMONITOR_LOCK: + case Constants.ZADMIN_LOCK: + case Constants.ZTEST_LOCK: return new ServiceLockPath(path.substring(0, path.indexOf(type)), type); default: { final String[] pathParts = path.replaceFirst("/", "").split("/"); @@ -299,6 +306,14 @@ public ServiceLockPath createDeadTabletServerPath(String resourceGroup, serverAddress.toString()); } + public ServiceLockPath createAdminLockPath() { + return new ServiceLockPath(zkRoot, Constants.ZADMIN_LOCK); + } + + public ServiceLockPath createTestLockPath() { + return new ServiceLockPath(zkRoot, Constants.ZTEST_LOCK); + } + public Set getCompactor(ResourceGroupPredicate resourceGroupPredicate, AddressSelector address, boolean withLock) { return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, withLock); diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 38c4c2add96..86cec11696d 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -683,7 +683,7 @@ public void unableToMonitorLockNode(Exception e) { @Override public void acquiredLock() { - 
log.warn("Acquired ZK lock for MiniAccumuloClusterImpl"); + log.debug("Acquired ZK lock for MiniAccumuloClusterImpl"); lockAcquired.set(true); lockWatcherInvoked.countDown(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index c1d2bf71081..bb350ec5326 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -20,7 +20,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import java.io.BufferedWriter; import java.io.File; @@ -42,6 +41,8 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -73,7 +74,9 @@ import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.thrift.FateService; @@ -90,7 +93,9 @@ import org.apache.accumulo.core.singletons.SingletonManager.Mode; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.tables.TableMap; +import org.apache.accumulo.core.zookeeper.ZooSession; import 
org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.cli.ServerUtilOpts; import org.apache.accumulo.server.security.SecurityUtil; @@ -123,6 +128,7 @@ @AutoService(KeywordExecutable.class) public class Admin implements KeywordExecutable { private static final Logger log = LoggerFactory.getLogger(Admin.class); + private final CountDownLatch lockAcquiredLatch = new CountDownLatch(1); static class AdminOpts extends ServerUtilOpts { @Parameter(names = {"-f", "--force"}, @@ -330,11 +336,11 @@ static class FateOpsCommand { boolean cancel; @Parameter(names = {"-f", "--fail"}, - description = "... Transition FaTE transaction status to FAILED_IN_PROGRESS (requires Manager to be down)") + description = "... Transition FaTE transaction status to FAILED_IN_PROGRESS") boolean fail; @Parameter(names = {"-d", "--delete"}, - description = "... Delete FaTE transaction and its associated table locks (requires Manager to be down)") + description = "... Delete FaTE transaction and its associated table locks") boolean delete; @Parameter(names = {"-p", "--print", "-print", "-l", "--list", "-list"}, @@ -358,6 +364,36 @@ static class FateOpsCommand { List instanceTypes = new ArrayList<>(); } + class AdminLockWatcher implements ServiceLock.AccumuloLockWatcher { + @Override + public void lostLock(ServiceLock.LockLossReason reason) { + String msg = "Admin lost lock: " + reason.toString(); + if (reason == ServiceLock.LockLossReason.LOCK_DELETED) { + Halt.halt(msg, 0); + } else { + Halt.halt(msg, 1); + } + } + + @Override + public void unableToMonitorLockNode(Exception e) { + String msg = "Admin unable to monitor lock: " + e.getMessage(); + log.warn(msg); + Halt.halt(msg, 1); + } + + @Override + public void acquiredLock() { + lockAcquiredLatch.countDown(); + log.debug("Acquired ZooKeeper lock for Admin"); + } + + @Override + public void failedToAcquireLock(Exception e) { + log.warn("Failed to acquire ZooKeeper lock for Admin, msg: " + e.getMessage()); + } + } + public 
static void main(String[] args) { new Admin().execute(args); } @@ -886,50 +922,107 @@ private void executeFateOpsCommand(ServerContext context, FateOpsCommand fateOps AdminUtil admin = new AdminUtil<>(true); final String zkRoot = context.getZooKeeperRoot(); - var zLockManagerPath = context.getServerPaths().createManagerPath(); var zTableLocksPath = context.getServerPaths().createTableLocksPath(); String fateZkPath = zkRoot + Constants.ZFATE; var zk = context.getZooSession(); - MetaFateStore mfs = new MetaFateStore<>(fateZkPath, zk, createDummyLockID(), null); - UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); - Map> fateStores = - Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); - Map> readOnlyFateStores = - Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); - - if (fateOpsCommand.cancel) { - cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList); - } else if (fateOpsCommand.fail) { - for (String fateIdStr : fateOpsCommand.fateIdList) { - if (!admin.prepFail(fateStores, zk, zLockManagerPath, fateIdStr)) { - throw new AccumuloException("Could not fail transaction: " + fateIdStr); + ServiceLock adminLock = null; + Map> fateStores; + Map> readOnlyFateStores = null; + + try { + if (fateOpsCommand.cancel) { + cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList); + } else if (fateOpsCommand.fail) { + adminLock = createAdminLock(context); + fateStores = createFateStores(context, zk, fateZkPath, adminLock); + for (String fateIdStr : fateOpsCommand.fateIdList) { + if (!admin.prepFail(fateStores, fateIdStr)) { + throw new AccumuloException("Could not fail transaction: " + fateIdStr); + } + } + } else if (fateOpsCommand.delete) { + adminLock = createAdminLock(context); + fateStores = createFateStores(context, zk, fateZkPath, adminLock); + for (String fateIdStr : fateOpsCommand.fateIdList) { + if (!admin.prepDelete(fateStores, fateIdStr)) { + throw new AccumuloException("Could not delete transaction: " + 
fateIdStr); + } + admin.deleteLocks(zk, zTableLocksPath, fateIdStr); } } - } else if (fateOpsCommand.delete) { - for (String fateIdStr : fateOpsCommand.fateIdList) { - if (!admin.prepDelete(fateStores, zk, zLockManagerPath, fateIdStr)) { - throw new AccumuloException("Could not delete transaction: " + fateIdStr); + + if (fateOpsCommand.print) { + final Set fateIdFilter = new TreeSet<>(); + fateOpsCommand.fateIdList.forEach(fateIdStr -> fateIdFilter.add(FateId.from(fateIdStr))); + EnumSet statusFilter = + getCmdLineStatusFilters(fateOpsCommand.states); + EnumSet typesFilter = + getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes); + readOnlyFateStores = createReadOnlyFateStores(context, zk, fateZkPath); + admin.print(readOnlyFateStores, zk, zTableLocksPath, new Formatter(System.out), + fateIdFilter, statusFilter, typesFilter); + // print line break at the end + System.out.println(); + } + + if (fateOpsCommand.summarize) { + if (readOnlyFateStores == null) { + readOnlyFateStores = createReadOnlyFateStores(context, zk, fateZkPath); } - admin.deleteLocks(zk, zTableLocksPath, fateIdStr); + summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores, zTableLocksPath); + } + } finally { + if (adminLock != null) { + adminLock.unlock(); } } + } - if (fateOpsCommand.print) { - final Set fateIdFilter = new TreeSet<>(); - fateOpsCommand.fateIdList.forEach(fateIdStr -> fateIdFilter.add(FateId.from(fateIdStr))); - EnumSet statusFilter = - getCmdLineStatusFilters(fateOpsCommand.states); - EnumSet typesFilter = - getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes); - admin.print(readOnlyFateStores, zk, zTableLocksPath, new Formatter(System.out), fateIdFilter, - statusFilter, typesFilter); - // print line break at the end - System.out.println(); - } + private Map> createFateStores(ServerContext context, + ZooSession zk, String fateZkPath, ServiceLock adminLock) + throws InterruptedException, KeeperException { + var lockId = adminLock.getLockID(); + 
MetaFateStore mfs = new MetaFateStore<>(fateZkPath, zk, lockId, null); + UserFateStore ufs = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), lockId, null); + return Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); + } - if (fateOpsCommand.summarize) { - summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores, zTableLocksPath); + private Map> + createReadOnlyFateStores(ServerContext context, ZooSession zk, String fateZkPath) + throws InterruptedException, KeeperException { + MetaFateStore readOnlyMFS = new MetaFateStore<>(fateZkPath, zk, null, null); + UserFateStore readOnlyUFS = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); + return Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS); + } + + private ServiceLock createAdminLock(ServerContext context) throws InterruptedException { + var zk = context.getZooSession(); + UUID uuid = UUID.randomUUID(); + ServiceLockPath slp = context.getServerPaths().createAdminLockPath(); + ServiceLock adminLock = new ServiceLock(zk, slp, uuid); + AdminLockWatcher lw = new AdminLockWatcher(); + ServiceLockData.ServiceDescriptors descriptors = new ServiceLockData.ServiceDescriptors(); + descriptors + .addService(new ServiceLockData.ServiceDescriptor(uuid, ServiceLockData.ThriftService.NONE, + "fake_admin_util_host", Constants.DEFAULT_RESOURCE_GROUP_NAME)); + ServiceLockData sld = new ServiceLockData(descriptors); + String lockPath = slp.toString(); + String parentLockPath = lockPath.substring(0, lockPath.lastIndexOf("/")); + + try { + var zrw = zk.asReaderWriter(); + zrw.putPersistentData(parentLockPath, new byte[0], ZooUtil.NodeExistsPolicy.SKIP); + zrw.putPersistentData(lockPath, new byte[0], ZooUtil.NodeExistsPolicy.SKIP); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Error creating path in ZooKeeper", e); } + + adminLock.lock(lw, sld); + lockAcquiredLatch.await(); + + return adminLock; } 
private void validateFateUserInput(FateOpsCommand cmd) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java index 4a1438dc379..f8efa64c6ae 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java @@ -24,7 +24,6 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; @@ -65,10 +64,9 @@ private static Admin.CheckCommand.CheckStatus checkTableLocks(ServerContext cont final var zTableLocksPath = context.getServerPaths().createTableLocksPath(); final String fateZkPath = zkRoot + Constants.ZFATE; final var zk = context.getZooSession(); - final MetaFateStore mfs = - new MetaFateStore<>(fateZkPath, zk, AbstractFateStore.createDummyLockID(), null); - final UserFateStore ufs = new UserFateStore<>(context, AccumuloTable.FATE.tableName(), - AbstractFateStore.createDummyLockID(), null); + final MetaFateStore mfs = new MetaFateStore<>(fateZkPath, zk, null, null); + final UserFateStore ufs = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); log.trace("Ensuring table and namespace locks are valid..."); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index 200b3ff2b48..23cc841e9ce 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -53,7 +53,7 @@ public abstract class FateMetrics<T extends FateMetricValues> implements Metrics private static final String OP_TYPE_TAG = "op.type"; protected final ServerContext context; - protected final ReadOnlyFateStore<FateMetrics<T>> fateStore; + protected final ReadOnlyFateStore<FateMetrics<T>> readOnlyFateStore; protected final long refreshDelay; protected final AtomicLong totalCurrentOpsCount = new AtomicLong(0); @@ -62,14 +62,14 @@ public abstract class FateMetrics<T extends FateMetricValues> implements Metrics public FateMetrics(final ServerContext context, final long minimumRefreshDelay) { this.context = context; this.refreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, minimumRefreshDelay); - this.fateStore = Objects.requireNonNull(buildStore(context)); + this.readOnlyFateStore = Objects.requireNonNull(buildReadOnlyStore(context)); for (TStatus status : TStatus.values()) { txStatusCounters.put(status, new AtomicLong(0)); } } - protected abstract ReadOnlyFateStore<FateMetrics<T>> buildStore(ServerContext context); + protected abstract ReadOnlyFateStore<FateMetrics<T>> buildReadOnlyStore(ServerContext context); protected abstract T getMetricValues(); @@ -95,7 +95,7 @@ protected void update(T metricValues) { @Override public void registerMetrics(final MeterRegistry registry) { - String type = fateStore.type().name().toLowerCase(); + String type = readOnlyFateStore.type().name().toLowerCase(); Gauge.builder(FATE_OPS.getName(), totalCurrentOpsCount, AtomicLong::get) .description(FATE_OPS.getDescription()).register(registry); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java index 8412a17aad3..b87f3d48223 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java @@ -24,7 +24,6 @@
import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.manager.metrics.fate.FateMetrics; @@ -62,10 +61,10 @@ public void registerMetrics(MeterRegistry registry) { } @Override - protected ReadOnlyFateStore<FateMetrics<MetaFateMetricValues>> buildStore(ServerContext context) { + protected ReadOnlyFateStore<FateMetrics<MetaFateMetricValues>> + buildReadOnlyStore(ServerContext context) { try { - return new MetaFateStore<>(getFateRootPath(context), context.getZooSession(), - AbstractFateStore.createDummyLockID(), null); + return new MetaFateStore<>(getFateRootPath(context), context.getZooSession(), null, null); } catch (KeeperException ex) { throw new IllegalStateException( "FATE Metrics - Failed to create zoo store - metrics unavailable", ex); @@ -78,7 +77,7 @@ protected ReadOnlyFateStore<FateMetrics<MetaFateMetricValues>> buildStore(Server @Override protected MetaFateMetricValues getMetricValues() { - return MetaFateMetricValues.getMetaStoreMetrics(context, fateRootPath, fateStore); + return MetaFateMetricValues.getMetaStoreMetrics(context, fateRootPath, readOnlyFateStore); } private static String getFateRootPath(ServerContext context) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java index 92ac8568810..e4fad01899c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java @@ -18,9 +18,9 @@ */ package org.apache.accumulo.manager.metrics.fate.user; -import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import
org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.manager.metrics.fate.FateMetrics; import org.apache.accumulo.server.ServerContext; @@ -31,12 +31,13 @@ public UserFateMetrics(ServerContext context, long minimumRefreshDelay) { } @Override - protected ReadOnlyFateStore<FateMetrics<UserFateMetricValues>> buildStore(ServerContext context) { - return new UserFateStore<>(context, AbstractFateStore.createDummyLockID(), null); + protected ReadOnlyFateStore<FateMetrics<UserFateMetricValues>> + buildReadOnlyStore(ServerContext context) { + return new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); } @Override protected UserFateMetricValues getMetricValues() { - return UserFateMetricValues.getUserStoreMetrics(fateStore); + return UserFateMetricValues.getUserStoreMetrics(readOnlyFateStore); } } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 82c0f9b8bce..99002079d4a 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.test.compaction; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3; @@ -87,6 +86,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import
org.apache.accumulo.core.metadata.schema.CompactionMetadata; @@ -102,17 +102,20 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.util.FindCompactionTmpFiles; +import org.apache.accumulo.test.fate.TestLock; import org.apache.accumulo.test.functional.CompactionIT.ErrorThrowingSelector; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import com.google.common.base.Preconditions; public class ExternalCompaction_1_IT extends SharedMiniClusterBase { + private static ServiceLock testLock; public static class ExternalCompaction1Config implements MiniClusterConfigurationCallback { @Override @@ -124,6 +127,14 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS @BeforeAll public static void beforeTests() throws Exception { startMiniClusterWithConfig(new ExternalCompaction1Config()); + testLock = new TestLock().createTestLock(getCluster().getServerContext()); + } + + @AfterAll + public static void afterTests() throws Exception { + if (testLock != null) { + testLock.unlock(); + } } public static class TestFilter extends Filter { @@ -237,7 +248,7 @@ public void testExternalCompaction() throws Exception { public void testCompactionCommitAndDeadDetectionRoot() throws Exception { var ctx = getCluster().getServerContext(); FateStore metaFateStore = new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, - ctx.getZooSession(), createDummyLockID(), null); + ctx.getZooSession(), testLock.getLockID(), null); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { var tableId = ctx.getTableId(AccumuloTable.ROOT.tableName()); @@ -256,7 +267,7 @@ public void testCompactionCommitAndDeadDetectionRoot() throws 
Exception { public void testCompactionCommitAndDeadDetectionMeta() throws Exception { var ctx = getCluster().getServerContext(); FateStore metaFateStore = new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, - ctx.getZooSession(), createDummyLockID(), null); + ctx.getZooSession(), testLock.getLockID(), null); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { // Metadata table by default already has 2 tablets @@ -278,7 +289,8 @@ public void testCompactionCommitAndDeadDetectionUser() throws Exception { final String tableName = getUniqueNames(1)[0]; try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore userFateStore = new UserFateStore<>(ctx, createDummyLockID(), null); + UserFateStore userFateStore = + new UserFateStore<>(ctx, AccumuloTable.FATE.tableName(), testLock.getLockID(), null); SortedSet splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); @@ -301,9 +313,11 @@ public void testCompactionCommitAndDeadDetectionAll() throws Exception { final String userTable = getUniqueNames(1)[0]; try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore userFateStore = new UserFateStore<>(ctx, createDummyLockID(), null); - FateStore metaFateStore = new MetaFateStore<>( - ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooSession(), createDummyLockID(), null); + UserFateStore userFateStore = + new UserFateStore<>(ctx, AccumuloTable.FATE.tableName(), testLock.getLockID(), null); + FateStore metaFateStore = + new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooSession(), + testLock.getLockID(), null); SortedSet splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java index 71b198c0ac9..f15fb6caaa1 
100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java @@ -27,7 +27,8 @@ import org.apache.accumulo.core.fate.Repo; /** - * A FATE which performs the dead reservation cleanup with a much shorter delay between + * A FATE which performs the dead reservation cleanup with a much shorter delay between. Useful for + * shortening test times for tests that are waiting for a cleanup to occur. */ public class FastFate<T> extends Fate<T> { @@ -38,6 +39,6 @@ public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner, @Override public Duration getDeadResCleanupDelay() { - return Duration.ofSeconds(15); + return Duration.ofSeconds(5); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index b1fce72f834..b0f3d7ffeae 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.test.fate; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import static org.apache.accumulo.test.fate.FateStoreUtil.TEST_FATE_OP; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; @@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.io.IOException; import java.lang.reflect.Method; import java.time.Duration; import java.util.ArrayList; @@ -50,8 +51,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.conf.ConfigurationCopy; -import org.apache.accumulo.core.conf.Property; +import
org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.Fate; @@ -64,39 +64,34 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.zookeeper.ZooSession; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.util.Admin; import org.apache.accumulo.server.util.fateCommand.FateSummaryReport; import org.apache.accumulo.server.util.fateCommand.FateTxnDetails; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestEnv; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestRepo; import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.accumulo.test.functional.ReadWriteIT; import org.apache.accumulo.test.functional.SlowIterator; import org.apache.accumulo.test.util.Wait; -import org.apache.hadoop.conf.Configuration; import org.easymock.EasyMock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public abstract class FateOpsCommandsIT extends ConfigurableMacBase - implements FateTestRunner { + implements FateTestRunner { @Override protected Duration defaultTimeout() { return Duration.ofMinutes(3); } - @Override - public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - // Used for tests that shutdown the manager so the sleep time after shutdown isn't too long - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s"); - } - @BeforeEach - public void shutdownCompactor() throws Exception { + public 
void beforeEachSetup() throws Exception { // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes // this issue. @@ -110,10 +105,10 @@ public void testFateSummaryCommand() throws Exception { executeTest(this::testFateSummaryCommand); } - protected void testFateSummaryCommand(FateStore store, ServerContext sctx) + protected void testFateSummaryCommand(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // validate blank report, no transactions have started ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); @@ -298,7 +293,7 @@ protected void testFateSummaryCommand(FateStore store, ServerContext sc validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); } - fate.shutdown(10, TimeUnit.MINUTES); + fate.shutdown(1, TimeUnit.MINUTES); } @Test @@ -306,10 +301,10 @@ public void testFateSummaryCommandPlainText() throws Exception { executeTest(this::testFateSummaryCommandPlainText); } - protected void testFateSummaryCommandPlainText(FateStore store, ServerContext sctx) + protected void testFateSummaryCommandPlainText(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -326,7 +321,7 @@ protected void testFateSummaryCommandPlainText(FateStore store, ServerC "Fate ID Filters: [" + fateId2.canonical() + ", " + fateId1.canonical() + "]")); assertTrue(result.contains("Instance Types Filters: [" + store.type().name() + "]")); - fate.shutdown(10, TimeUnit.MINUTES); + fate.shutdown(1, TimeUnit.MINUTES); } @Test @@ -334,10 +329,10 @@ public void testFatePrintCommand() throws Exception { executeTest(this::testFatePrintCommand); } - protected void 
testFatePrintCommand(FateStore store, ServerContext sctx) + protected void testFatePrintCommand(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // validate no transactions ProcessInfo p = getCluster().exec(Admin.class, "fate", "--print"); @@ -428,7 +423,7 @@ protected void testFatePrintCommand(FateStore store, ServerContext sctx assertTrue(fateIdsFromResult.isEmpty()); } - fate.shutdown(10, TimeUnit.MINUTES); + fate.shutdown(1, TimeUnit.MINUTES); } @Test @@ -436,7 +431,7 @@ public void testTransactionNameAndStep() throws Exception { executeTest(this::testTransactionNameAndStep); } - protected void testTransactionNameAndStep(FateStore store, ServerContext sctx) + protected void testTransactionNameAndStep(FateStore store, ServerContext sctx) throws Exception { // Since the other tests just use NEW transactions for simplicity, there are some fields of the // summary and print outputs which are null and not tested for (transaction name and transaction @@ -504,10 +499,10 @@ public void testFateCancelCommand() throws Exception { executeTest(this::testFateCancelCommand); } - protected void testFateCancelCommand(FateStore store, ServerContext sctx) + protected void testFateCancelCommand(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -529,18 +524,19 @@ protected void testFateCancelCommand(FateStore store, ServerContext sct assertEquals(Map.of(fateId1.canonical(), "FAILED", fateId2.canonical(), "NEW"), fateIdsFromSummary); - fate.shutdown(10, TimeUnit.MINUTES); + fate.shutdown(1, TimeUnit.MINUTES); } @Test - public void testFateFailCommand() throws Exception { - executeTest(this::testFateFailCommand); + public void testFateFailCommandTimeout() throws Exception { + 
stopManagerAndExecuteTest(this::testFateFailCommandTimeout); } - protected void testFateFailCommand(FateStore store, ServerContext sctx) + protected void testFateFailCommandTimeout(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + LatchTestEnv env = new LatchTestEnv(); + FastFate fate = initFateWithDeadResCleaner(store, env); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -551,19 +547,50 @@ protected void testFateFailCommand(FateStore store, ServerContext sctx) assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromSummary); - // Attempt to --fail the transaction. Should not work as the Manager is still up + // Seed the transaction with the latch repo, so we can have an IN_PROGRESS transaction + fate.seedTransaction(TEST_FATE_OP, fateId1, new LatchTestRepo(), true, "test"); + // Wait for 'fate' to reserve fateId1 (will be IN_PROGRESS on fateId1) + Wait.waitFor(() -> env.numWorkers.get() == 1); + + // Try to fail fateId1 + // This should not work as it is already reserved and being worked on by our running FATE + // ('fate'). 
Admin should try to reserve it for a bit, but should fail and exit
     ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail");
-    assertEquals(1, p.getProcess().waitFor());
+    assertEquals(0, p.getProcess().waitFor());
+    String result = p.readStdOut();
+
+    assertTrue(result.contains("Could not fail " + fateId1 + " in a reasonable time"));
     fateIdsFromSummary = getFateIdsFromSummary();
-    assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"),
+    assertEquals(Map.of(fateId1.canonical(), "IN_PROGRESS", fateId2.canonical(), "NEW"),
         fateIdsFromSummary);
-    // Stop MANAGER so --fail can be called
-    getCluster().getClusterControl().stopAllServers(ServerType.MANAGER);
-    Thread.sleep(20_000);
+    // Finish work and shutdown
+    env.workersLatch.countDown();
+    fate.shutdown(1, TimeUnit.MINUTES);
+  }
+
+  @Test
+  public void testFateFailCommandSuccess() throws Exception {
+    executeTest(this::testFateFailCommandSuccess);
+  }
+
+  protected void testFateFailCommandSuccess(FateStore store, ServerContext sctx)
+      throws Exception {
+    // Configure Fate
+    Fate fate = initFateNoDeadResCleaner(store);
+
+    // Start some transactions
+    FateId fateId1 = fate.startTransaction();
+    FateId fateId2 = fate.startTransaction();
+
+    // Check that summary output lists both the transactions with a NEW status
+    Map fateIdsFromSummary = getFateIdsFromSummary();
+    assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"),
+        fateIdsFromSummary);
-    // Fail the first transaction and ensure that it was failed
-    p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail");
+    // Try to fail fateId1
+    // This should work since nothing has fateId1 reserved (it is NEW)
+    ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail");
     assertEquals(0, p.getProcess().waitFor());
     String result = p.readStdOut();
@@ -574,18 +601,19 @@ protected void testFateFailCommand(FateStore store, ServerContext sctx)
         || fateIdsFromSummary
             .equals(Map.of(fateId1.canonical(), "FAILED", fateId2.canonical(), "NEW")));
-    fate.shutdown(10, TimeUnit.MINUTES);
+    fate.shutdown(1, TimeUnit.MINUTES);
   }
   @Test
-  public void testFateDeleteCommand() throws Exception {
-    executeTest(this::testFateDeleteCommand);
+  public void testFateDeleteCommandTimeout() throws Exception {
+    stopManagerAndExecuteTest(this::testFateDeleteCommandTimeout);
   }
-  protected void testFateDeleteCommand(FateStore store, ServerContext sctx)
+  protected void testFateDeleteCommandTimeout(FateStore store, ServerContext sctx)
       throws Exception {
     // Configure Fate
-    Fate fate = initializeFate(store);
+    LatchTestEnv env = new LatchTestEnv();
+    FastFate fate = initFateWithDeadResCleaner(store, env);
     // Start some transactions
     FateId fateId1 = fate.startTransaction();
@@ -596,19 +624,50 @@ protected void testFateDeleteCommand(FateStore store, ServerContext sct
     assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"),
         fateIdsFromSummary);
-    // Attempt to --delete the transaction. Should not work as the Manager is still up
+    // Seed the transaction with the latch repo, so we can have an IN_PROGRESS transaction
+    fate.seedTransaction(TEST_FATE_OP, fateId1, new LatchTestRepo(), true, "test");
+    // Wait for 'fate' to reserve fateId1 (will be IN_PROGRESS on fateId1)
+    Wait.waitFor(() -> env.numWorkers.get() == 1);
+
+    // Try to delete fateId1
+    // This should not work as it is already reserved and being worked on by our running FATE
+    // ('fate'). Admin should try to reserve it for a bit, but should fail and exit
     ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete");
-    assertEquals(1, p.getProcess().waitFor());
+    assertEquals(0, p.getProcess().waitFor());
+    String result = p.readStdOut();
+
+    assertTrue(result.contains("Could not delete " + fateId1 + " in a reasonable time"));
     fateIdsFromSummary = getFateIdsFromSummary();
-    assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"),
+    assertEquals(Map.of(fateId1.canonical(), "IN_PROGRESS", fateId2.canonical(), "NEW"),
         fateIdsFromSummary);
-    // Stop MANAGER so --delete can be called
-    getCluster().getClusterControl().stopAllServers(ServerType.MANAGER);
-    Thread.sleep(20_000);
+    // Finish work and shutdown
+    env.workersLatch.countDown();
+    fate.shutdown(1, TimeUnit.MINUTES);
+  }
+
+  @Test
+  public void testFateDeleteCommandSuccess() throws Exception {
+    executeTest(this::testFateDeleteCommandSuccess);
+  }
+
+  protected void testFateDeleteCommandSuccess(FateStore store, ServerContext sctx)
+      throws Exception {
+    // Configure Fate
+    Fate fate = initFateNoDeadResCleaner(store);
+
+    // Start some transactions
+    FateId fateId1 = fate.startTransaction();
+    FateId fateId2 = fate.startTransaction();
-    // Delete the first transaction and ensure that it was deleted
-    p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete");
+    // Check that summary output lists both the transactions with a NEW status
+    Map fateIdsFromSummary = getFateIdsFromSummary();
+    assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"),
+        fateIdsFromSummary);
+
+    // Try to delete fateId1
+    // This should work since nothing has fateId1 reserved (it is NEW)
+    ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete");
     assertEquals(0, p.getProcess().waitFor());
     String result = p.readStdOut();
@@ -616,7 +675,7 @@ protected void testFateDeleteCommand(FateStore store, ServerContext sct
     fateIdsFromSummary = getFateIdsFromSummary();
     assertEquals(Map.of(fateId2.canonical(), "NEW"), fateIdsFromSummary);
-    fate.shutdown(10, TimeUnit.MINUTES);
+    fate.shutdown(1, TimeUnit.MINUTES);
   }
   @Test
@@ -624,7 +683,7 @@ public void testFatePrintAndSummaryCommandsWithInProgressTxns() throws Exception
     executeTest(this::testFatePrintAndSummaryCommandsWithInProgressTxns);
   }
-  protected void testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore store,
+  protected void testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore store,
       ServerContext sctx) throws Exception {
     // This test was written for an issue with the 'admin fate --print' and 'admin fate --summary'
     // commands where transactions could complete mid-print causing the command to fail. These
@@ -632,20 +691,20 @@ protected void testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore mockedStore;
+    FateStore mockedStore;
     // This error was occurring in AdminUtil.getTransactionStatus(), so we will test this method.
     if (store.type().equals(FateInstanceType.USER)) {
       Method listMethod = UserFateStore.class.getMethod("list");
       mockedStore = EasyMock.createMockBuilder(UserFateStore.class)
-          .withConstructor(ClientContext.class, ZooUtil.LockID.class, Predicate.class)
-          .withArgs(sctx, createDummyLockID(), null).addMockedMethod(listMethod).createMock();
+          .withConstructor(ClientContext.class, String.class, ZooUtil.LockID.class, Predicate.class)
+          .withArgs(sctx, AccumuloTable.FATE.tableName(), null, null).addMockedMethod(listMethod)
+          .createMock();
     } else {
       Method listMethod = MetaFateStore.class.getMethod("list");
       mockedStore = EasyMock.createMockBuilder(MetaFateStore.class)
           .withConstructor(String.class, ZooSession.class, ZooUtil.LockID.class, Predicate.class)
-          .withArgs(sctx.getZooKeeperRoot() + Constants.ZFATE, sctx.getZooSession(),
-              createDummyLockID(), null)
+          .withArgs(sctx.getZooKeeperRoot() + Constants.ZFATE, sctx.getZooSession(), null, null)
           .addMockedMethod(listMethod).createMock();
     }
@@ -780,11 +839,17 @@ private void validateFateDetails(Set details, int expDetailsSize
     }
   }
-  private Fate initializeFate(FateStore store) {
-    ConfigurationCopy config = new ConfigurationCopy();
-    config.set(Property.GENERAL_THREADPOOL_SIZE, "2");
-    config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1");
-    return new Fate<>(new TestEnv(), store, false, Object::toString, config);
+  protected FastFate initFateWithDeadResCleaner(FateStore store,
+      LatchTestEnv env) {
+    // Using FastFate so the cleanup will run often. This ensures that the cleanup will run when
+    // there are reservations present and that the cleanup will not unexpectedly delete these live
+    // reservations
+    return new FastFate<>(env, store, true, Object::toString, DefaultConfiguration.getInstance());
+  }
+
+  protected Fate initFateNoDeadResCleaner(FateStore store) {
+    return new Fate<>(new LatchTestEnv(), store, false, Object::toString,
+        DefaultConfiguration.getInstance());
   }
   private boolean wordIsTStatus(String word) {
@@ -795,4 +860,19 @@ private boolean wordIsTStatus(String word) {
     }
     return true;
   }
+
+  /**
+   * Stop the MANAGER. For some of our tests, we want to be able to seed transactions with our own
+   * test repos. We want our fate to reserve these transactions (and not the real fates running in
+   * the Manager as that will lead to exceptions since the real fates wouldn't be able to handle our
+   * test repos). So, we essentially have the fates created here acting as the real fates: they have
+   * the same threads running that the real fates would, use a fate store with a ZK lock, use the
+   * same locations to store fate data that the Manager does, and are running in a separate process
+   * from the Admin process. Note that we cannot simply use different locations for our fate data
+   * from Manager to keep our test env separate from Manager. Admin uses the real fate data
+   * locations, so our test must also use the real locations.
+   */
+  protected void stopManager() throws IOException {
+    getCluster().getClusterControl().stopAllServers(ServerType.MANAGER);
+  }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java b/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java
index 244f7991116..dc9b7d22fc6 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java
@@ -34,6 +34,11 @@ default void executeTest(FateTestExecutor testMethod) throws Exception {
         AbstractFateStore.DEFAULT_FATE_ID_GENERATOR);
   }
+  default void stopManagerAndExecuteTest(FateTestExecutor testMethod)
+      throws Exception {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
   interface FateTestExecutor {
     void execute(FateStore store, ServerContext sctx) throws Exception;
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
index 9dd65770183..f4eb390701e 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
@@ -342,8 +342,8 @@ private void testDeadReservationsCleanup(TestStoreFactory testStor
     // Create the new Fate/start the Fate threads (the work finder and the workers).
     // Don't run another dead reservation cleaner since we already have one running from fate1.
-    FastFate fate2 = new FastFate<>(testEnv2, store2, false, Object::toString,
-        DefaultConfiguration.getInstance());
+    Fate fate2 =
+        new Fate<>(testEnv2, store2, false, Object::toString, DefaultConfiguration.getInstance());
     // Wait for the "dead" reservations to be deleted and picked up again (reserved using
     // fate2/store2/lock2 now).
@@ -462,5 +462,5 @@ protected interface MultipleStoresTestExecutor
     void execute(TestStoreFactory fateStoreFactory) throws Exception;
   }
-  protected static class MultipleStoresTestEnv {}
+  protected static class MultipleStoresTestEnv extends FateTestRunner.TestEnv {}
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/TestLock.java b/test/src/main/java/org/apache/accumulo/test/fate/TestLock.java
new file mode 100644
index 00000000000..733b21379dd
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/fate/TestLock.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.lock.ServiceLockData;
+import org.apache.accumulo.core.lock.ServiceLockPaths;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestLock {
+  private static final Logger log = LoggerFactory.getLogger(TestLock.class);
+  final CountDownLatch lockAcquiredLatch;
+
+  public TestLock() {
+    this.lockAcquiredLatch = new CountDownLatch(1);
+  }
+
+  /**
+   * Used to create a dummy lock id to be passed in the creation of a {@link UserFateStore} or a
+   * {@link MetaFateStore}. Useful as a quicker and simpler alternative to
+   * {@link TestLock#createTestLock(ServerContext)} for tests where reserving transactions is needed
+   * AND the reservations for the test will be stored in a different location from the Managers fate
+   * stores. Can always use {@link TestLock#createTestLock(ServerContext)} to be safe if unsure
+   * which to use.
+   */
+  public static ZooUtil.LockID createDummyLockID() {
+    return new ZooUtil.LockID("/path", "node", 123);
+  }
+
+  /**
+   * Used to create a real lock (one held in ZK) to be passed in the creation of a
+   * {@link UserFateStore} or a {@link MetaFateStore}. Useful for tests where reserving transactions
+   * is needed AND the reservations for the test will be stored in the same location as the Managers
+   * fate stores. This is needed so the Manager will recognize and not delete these reservations.
+   * See similar {@link TestLock#createDummyLockID()}
+   */
+  public ServiceLock createTestLock(ServerContext context) throws InterruptedException {
+    var zk = context.getZooSession();
+    UUID uuid = UUID.randomUUID();
+    ServiceLockPaths.ServiceLockPath slp = context.getServerPaths().createTestLockPath();
+    ServiceLock lock = new ServiceLock(zk, slp, uuid);
+    TestLockWatcher lw = new TestLockWatcher();
+    ServiceLockData.ServiceDescriptors descriptors = new ServiceLockData.ServiceDescriptors();
+    descriptors
+        .addService(new ServiceLockData.ServiceDescriptor(uuid, ServiceLockData.ThriftService.NONE,
+            "fake_test_host", Constants.DEFAULT_RESOURCE_GROUP_NAME));
+    ServiceLockData sld = new ServiceLockData(descriptors);
+    String lockPath = slp.toString();
+    String parentLockPath = lockPath.substring(0, lockPath.lastIndexOf("/"));
+
+    try {
+      var zrw = zk.asReaderWriter();
+      zrw.putPersistentData(parentLockPath, new byte[0], NodeExistsPolicy.SKIP);
+      zrw.putPersistentData(lockPath, new byte[0], NodeExistsPolicy.SKIP);
+    } catch (KeeperException | InterruptedException e) {
+      throw new IllegalStateException("Error creating path in ZooKeeper", e);
+    }
+
+    lock.lock(lw, sld);
+    lockAcquiredLatch.await();
+
+    return lock;
+  }
+
+  class TestLockWatcher implements ServiceLock.AccumuloLockWatcher {
+
+    @Override
+    public void lostLock(ServiceLock.LockLossReason reason) {
+      log.warn("Lost lock: " + reason.toString());
+    }
+
+    @Override
+    public void unableToMonitorLockNode(Exception e) {
+      log.warn("Unable to monitor lock: " + e.getMessage());
+    }
+
+    @Override
+    public void acquiredLock() {
+      lockAcquiredLatch.countDown();
+      log.debug("Acquired ZooKeeper lock for test");
+    }
+
+    @Override
+    public void failedToAcquireLock(Exception e) {
+      log.warn("Failed to acquire ZooKeeper lock for test, msg: " + e.getMessage());
+    }
+  }
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
index 06e1b43031d..5c7a0c26fb1 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
@@ -18,7 +18,7 @@
  */
 package org.apache.accumulo.test.fate.meta;
-import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
index 3e2cd3a2391..e24fc0af5ba 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java
@@ -18,7 +18,7 @@
  */
 package org.apache.accumulo.test.fate.meta;
-import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
 import java.util.UUID;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
index 937a675f9c4..7009e55dd28 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
@@ -18,21 +18,58 @@
  */
 package org.apache.accumulo.test.fate.meta;
-import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
+import java.util.function.Predicate;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.core.lock.ServiceLock;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.test.fate.FateOpsCommandsIT;
+import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestEnv;
+import org.apache.accumulo.test.fate.TestLock;
 public class MetaFateOpsCommandsIT extends FateOpsCommandsIT {
+  /**
+   * This should be used for tests that will not seed a txn with work/reserve a txn. Note that this
+   * should be used in conjunction with
+   * {@link FateOpsCommandsIT#initFateNoDeadResCleaner(FateStore)}
+   */
   @Override
-  public void executeTest(FateTestExecutor testMethod, int maxDeferred,
+  public void executeTest(FateTestExecutor testMethod, int maxDeferred,
       AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
-    ServerContext sctx = getCluster().getServerContext();
-    String path = sctx.getZooKeeperRoot() + Constants.ZFATE;
-    var zk = sctx.getZooSession();
-    testMethod.execute(new MetaFateStore<>(path, zk, createDummyLockID(), null), sctx);
+    ServerContext context = getCluster().getServerContext();
+    String path = context.getZooKeeperRoot() + Constants.ZFATE;
+    var zk = context.getZooSession();
+    // test should not be reserving txns or checking reservations, so null lockID and isLockHeld
+    testMethod.execute(new MetaFateStore<>(path, zk, null, null), context);
+  }
+
+  /**
+   * This should be used for tests that will seed a txn with work/reserve a txn. Note that this
+   * should be used in conjunction with
+   * {@link FateOpsCommandsIT#initFateWithDeadResCleaner(FateStore, LatchTestEnv)}
+   */
+  @Override
+  public void stopManagerAndExecuteTest(FateTestExecutor testMethod)
+      throws Exception {
+    stopManager();
+    ServerContext context = getCluster().getServerContext();
+    String path = context.getZooKeeperRoot() + Constants.ZFATE;
+    var zk = context.getZooSession();
+    ServiceLock testLock = null;
+    try {
+      testLock = new TestLock().createTestLock(context);
+      ZooUtil.LockID lockID = testLock.getLockID();
+      Predicate isLockHeld =
+          lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
+      testMethod.execute(new MetaFateStore<>(path, zk, lockID, isLockHeld), context);
+    } finally {
+      if (testLock != null) {
+        testLock.unlock();
+      }
+    }
   }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java
index d6e1410b7d5..e5165d982ab 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java
@@ -18,9 +18,10 @@
  */
 package org.apache.accumulo.test.fate.meta;
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
+
 import java.io.File;
-import org.apache.accumulo.core.fate.AbstractFateStore;
 import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.test.fate.FateStatusEnforcementIT;
 import org.apache.accumulo.test.fate.FateStoreUtil;
@@ -46,7 +47,7 @@ public static void afterAllTeardown() throws Exception {
   @BeforeEach
   public void beforeEachSetup() throws Exception {
     store = new MetaFateStore<>(FateStoreUtil.MetaFateZKSetup.getZkFatePath(),
-        FateStoreUtil.MetaFateZKSetup.getZk(), AbstractFateStore.createDummyLockID(), null);
+        FateStoreUtil.MetaFateZKSetup.getZk(), createDummyLockID(), null);
     fateId = store.create();
     txStore = store.reserve(fateId);
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
index f8e89b101c3..5bb43d4b8ca 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
@@ -18,8 +18,8 @@
  */
 package org.apache.accumulo.test.fate.meta;
-import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
 import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER;
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
index 014b6c97bcb..26b8103e4d6 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java
@@ -18,8 +18,8 @@
  */
 package org.apache.accumulo.test.fate.user;
-import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
 import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable;
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java
index de30125e5df..3a0aaeecd5f 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java
@@ -18,8 +18,8 @@
  */
 package org.apache.accumulo.test.fate.user;
-import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
 import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable;
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.clientImpl.ClientContext;
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java
index 3fe3192e6ba..92f5fae847f 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java
@@ -18,18 +18,56 @@
  */
 package org.apache.accumulo.test.fate.user;
-import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
+import java.util.function.Predicate;
 import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.test.fate.FateOpsCommandsIT;
+import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestEnv;
+import org.apache.accumulo.test.fate.TestLock;
 public class UserFateOpsCommandsIT extends FateOpsCommandsIT {
+  /**
+   * This should be used for tests that will not seed a txn with work/reserve a txn. Note that this
+   * should be used in conjunction with
+   * {@link FateOpsCommandsIT#initFateNoDeadResCleaner(FateStore)}
+   */
   @Override
-  public void executeTest(FateTestExecutor testMethod, int maxDeferred,
+  public void executeTest(FateTestExecutor testMethod, int maxDeferred,
       AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
-    testMethod.execute(
-        new UserFateStore<>(getCluster().getServerContext(), createDummyLockID(), null),
-        getCluster().getServerContext());
+    var context = getCluster().getServerContext();
+    // the test should not be reserving or checking reservations, so null lockID and isLockHeld
+    testMethod.execute(new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null),
+        context);
+  }
+
+  /**
+   * This should be used for tests that will seed a txn with work/reserve a txn. Note that this
+   * should be used in conjunction with
+   * {@link FateOpsCommandsIT#initFateWithDeadResCleaner(FateStore, LatchTestEnv)}
+   */
+  @Override
+  public void stopManagerAndExecuteTest(FateTestExecutor testMethod)
+      throws Exception {
+    stopManager();
+    var context = getCluster().getServerContext();
+    ServiceLock testLock = null;
+    try {
+      testLock = new TestLock().createTestLock(context);
+      ZooUtil.LockID lockID = testLock.getLockID();
+      Predicate isLockHeld =
+          lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
+      testMethod.execute(
+          new UserFateStore<>(context, AccumuloTable.FATE.tableName(), lockID, isLockHeld),
+          context);
+    } finally {
+      if (testLock != null) {
+        testLock.unlock();
+      }
+    }
   }
 }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java
index 22ecb9fe6e2..904882fcf29 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java
@@ -19,10 +19,10 @@
 package org.apache.accumulo.test.fate.user;
 import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable;
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.clientImpl.ClientContext;
-import org.apache.accumulo.core.fate.AbstractFateStore;
 import org.apache.accumulo.core.fate.user.UserFateStore;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
 import org.apache.accumulo.test.fate.FateStatusEnforcementIT;
@@ -50,7 +50,7 @@ public void beforeEachSetup() throws Exception {
     client = (ClientContext) Accumulo.newClient().from(getClientProps()).build();
     table = getUniqueNames(1)[0];
     createFateTable(client, table);
-    store = new UserFateStore<>(client, table, AbstractFateStore.createDummyLockID(), null);
+    store = new UserFateStore<>(client, table, createDummyLockID(), null);
     fateId = store.create();
     txStore = store.reserve(fateId);
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java
index 17cc06a5ac2..71af0824961 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java
@@ -18,9 +18,9 @@
  */
 package org.apache.accumulo.test.fate.user;
-import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
 import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId;
 import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable;
+import static org.apache.accumulo.test.fate.TestLock.createDummyLockID;
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.BatchWriter;
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 33d1bd33187..515f4a62133 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -21,7 +21,6 @@
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -56,6 +55,7 @@
 import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
@@ -259,17 +259,18 @@ public void getFateStatus() {
         InstanceId instanceId = context.getInstanceID();
         var zk = context.getZooSession();
-        MetaFateStore mfs = new MetaFateStore<>(
-            ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, createDummyLockID(), null);
-        UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null);
+        MetaFateStore readOnlyMFS =
+            new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, null, null);
+        UserFateStore readOnlyUFS =
+            new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null);
         var lockPath = context.getServerPaths().createTableLocksPath(tableId.toString());
-        Map> fateStores =
-            Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
+        Map> readOnlyFateStores =
+            Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS);
-        withLocks = admin.getStatus(fateStores, zk, lockPath, null, null, null);
+        withLocks = admin.getStatus(readOnlyFateStores, zk, lockPath, null, null, null);
         // call method that does not use locks.
-        noLocks = admin.getTransactionStatus(fateStores, null, null, null);
+        noLocks = admin.getTransactionStatus(readOnlyFateStores, null, null, null);
         // no zk exception, no need to retry
         break;
@@ -352,10 +353,11 @@ private boolean lookupFateInZookeeper(final String tableName) throws KeeperExcep
     InstanceId instanceId = context.getInstanceID();
     var zk = context.getZooSession();
-    MetaFateStore mfs = new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE,
-        zk, createDummyLockID(), null);
+    MetaFateStore readOnlyMFS =
+        new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, null, null);
     var lockPath = context.getServerPaths().createTableLocksPath(tableId.toString());
-    AdminUtil.FateStatus fateStatus = admin.getStatus(mfs, zk, lockPath, null, null, null);
+    AdminUtil.FateStatus fateStatus =
+        admin.getStatus(readOnlyMFS, zk, lockPath, null, null, null);
     log.trace("current fates: {}", fateStatus.getTransactions().size());
@@ -382,8 +384,9 @@ private boolean lookupFateInAccumulo(final String tableName) throws KeeperExcept
     log.trace("tid: {}", tableId);
-    UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null);
-    AdminUtil.FateStatus fateStatus = admin.getStatus(ufs, null, null, null);
+    UserFateStore readOnlyUFS =
+        new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null);
+    AdminUtil.FateStatus fateStatus = admin.getStatus(readOnlyUFS, null, null, null);
     log.trace("current fates: {}", fateStatus.getTransactions().size());
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index ee4b187a61f..a910a040a78 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -19,7 +19,6 @@
 package org.apache.accumulo.test.functional;
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID;
 import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -233,13 +232,14 @@ private static FateStatus getFateStatus(AccumuloCluster cluster) {
       AdminUtil admin = new AdminUtil<>(false);
       ServerContext context = cluster.getServerContext();
       var zk = context.getZooSession();
-      MetaFateStore mfs = new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE,
-          zk, createDummyLockID(), null);
-      UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null);
-      Map> fateStores =
-          Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
+      MetaFateStore readOnlyMFS =
+          new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, zk, null, null);
+      UserFateStore readOnlyUFS =
+          new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null);
+      Map> readOnlyFateStores =
+          Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS);
       var lockPath = context.getServerPaths().createTableLocksPath();
-      return admin.getStatus(fateStores, zk, lockPath, null, null, null);
+      return admin.getStatus(readOnlyFateStores, zk, lockPath, null, null, null);
     } catch (KeeperException | InterruptedException e) {
       throw new RuntimeException(e);
     }