diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java index 6270fcb6c1b..32e70834031 100644 --- a/core/src/main/java/org/apache/accumulo/core/Constants.java +++ b/core/src/main/java/org/apache/accumulo/core/Constants.java @@ -88,6 +88,8 @@ public class Constants { public static final String ZTABLE_LOCKS = "/table_locks"; public static final String ZMINI_LOCK = "/mini"; + public static final String ZADMIN_LOCK = "/admin/lock"; + public static final String ZTEST_LOCK = "/test/lock"; public static final String BULK_PREFIX = "b-"; public static final String BULK_RENAME_FILE = "renames.json"; diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index c5bcdf4d452..555785f5806 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -465,14 +465,12 @@ public enum Property { "The number of threads used to seed fate split task, the actual split work is done by fate" + " threads.", "4.0.0"), - - MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE( - "manager.compaction.major.service.queue.initial.size", "10000", PropertyType.COUNT, - "The initial size of each resource groups compaction job priority queue.", "4.0.0"), - MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR( - "manager.compaction.major.service.queue.size.factor", "3.0", PropertyType.FRACTION, - "The dynamic resizing of the compaction job priority queue is based on" - + " the number of compactors for the group multiplied by this factor.", + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size", + "1M", PropertyType.MEMORY, + "The data size of each resource groups compaction job priority queue. The memory size of " + + "each compaction job is estimated and the sum of these sizes per resource group will not " + + "exceed this setting. When the size is exceeded the lowest priority jobs are dropped as " + + "needed.", "4.0.0"), SPLIT_PREFIX("split.", null, PropertyType.PREFIX, "System wide properties related to splitting tablets.", "3.1.0"), @@ -1460,7 +1458,7 @@ public static boolean isValidTablePropertyKey(String key) { RPC_MAX_MESSAGE_SIZE, // compaction coordiantor properties - MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE, + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, // block cache options GENERAL_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 749a3260649..9b7d7965d61 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -101,10 +101,6 @@ public FateId newRandomId(FateInstanceType instanceType) { // Keeps track of the number of concurrent callers to waitForStatusChange() private final AtomicInteger concurrentStatusChangeCallers = new AtomicInteger(0); - public AbstractFateStore() { - this(createDummyLockID(), null, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); - } - public AbstractFateStore(ZooUtil.LockID lockID, Predicate isLockHeld) { this(lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } @@ -114,9 +110,7 @@ public AbstractFateStore(ZooUtil.LockID lockID, Predicate isLock this.maxDeferred = maxDeferred; this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator); this.deferred = Collections.synchronizedMap(new HashMap<>()); - this.lockID = Objects.requireNonNull(lockID); - // If the store is used for a Fate which runs a dead reservation cleaner, - // this should be non-null, otherwise null is fine + this.lockID = lockID; this.isLockHeld = isLockHeld; } @@ -291,6 +285,10 @@ protected void verifyFateKey(FateId fateId, Optional fateKeySeen, "Collision detected for fate id " + fateId); } + protected void verifyLock(ZooUtil.LockID lockID, FateId fateId) { + Preconditions.checkState(lockID != null, "Tried to reserve " + fateId + " with null lockID"); + } + protected abstract Stream getTransactions(EnumSet statuses); protected abstract TStatus _getStatus(FateId fateId); @@ -450,14 +448,4 @@ protected static Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { throw new IllegalStateException("Bad node data " + txInfo); } } - - /** - * this is a temporary method used to create a dummy lock when using a FateStore outside the - * context of a Manager (one example is testing) so reservations can still be made. - * - * @return a dummy {@link ZooUtil.LockID} - */ - public static ZooUtil.LockID createDummyLockID() { - return new ZooUtil.LockID("/path", "node", 123); - } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 54fb62e6a5a..c54ee85d1a8 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Stream; @@ -41,15 +42,13 @@ import org.apache.accumulo.core.fate.zookeeper.FateLock; import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; -import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; +import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.zookeeper.ZooSession; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; - /** * A utility to administer FATE operations */ @@ -203,18 +202,19 @@ public Map> getDanglingWaitingLocks() { * instance type. This method does not process lock information, if lock information is desired, * use {@link #getStatus(ReadOnlyFateStore, ZooSession, ServiceLockPath, Set, EnumSet, EnumSet)} * - * @param fateStores read-only fate stores + * @param readOnlyFateStores read-only fate stores * @param fateIdFilter filter results to include only provided fate transaction ids * @param statusFilter filter results to include only provided status types * @param typesFilter filter results to include only provided fate instance types * @return list of FATE transactions that match filter criteria */ public List getTransactionStatus( - Map> fateStores, Set fateIdFilter, + Map> readOnlyFateStores, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) { - FateStatus status = getTransactionStatus(fateStores, fateIdFilter, statusFilter, typesFilter, - Collections.>emptyMap(), Collections.>emptyMap()); + FateStatus status = getTransactionStatus(readOnlyFateStores, fateIdFilter, statusFilter, + typesFilter, Collections.>emptyMap(), + Collections.>emptyMap()); return status.getTransactions(); } @@ -223,7 +223,7 @@ public List getTransactionStatus( * Get the FATE transaction status and lock information stored in zookeeper, optionally filtered * by fate id, status, and fate instance type * - * @param mfs read-only MetaFateStore + * @param readOnlyMFS read-only MetaFateStore * @param zk zookeeper reader. * @param lockPath the zookeeper path for locks * @param fateIdFilter filter results to include only provided fate transaction ids @@ -233,36 +233,37 @@ public List getTransactionStatus( * @throws KeeperException if zookeeper exception occurs * @throws InterruptedException if process is interrupted. */ - public FateStatus getStatus(ReadOnlyFateStore mfs, ZooSession zk, ServiceLockPath lockPath, - Set fateIdFilter, EnumSet statusFilter, + public FateStatus getStatus(ReadOnlyFateStore readOnlyMFS, ZooSession zk, + ServiceLockPath lockPath, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) throws KeeperException, InterruptedException { Map> heldLocks = new HashMap<>(); Map> waitingLocks = new HashMap<>(); findLocks(zk, lockPath, heldLocks, waitingLocks); - return getTransactionStatus(Map.of(FateInstanceType.META, mfs), fateIdFilter, statusFilter, - typesFilter, heldLocks, waitingLocks); + return getTransactionStatus(Map.of(FateInstanceType.META, readOnlyMFS), fateIdFilter, + statusFilter, typesFilter, heldLocks, waitingLocks); } - public FateStatus getStatus(ReadOnlyFateStore ufs, Set fateIdFilter, + public FateStatus getStatus(ReadOnlyFateStore readOnlyUFS, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) throws KeeperException, InterruptedException { - return getTransactionStatus(Map.of(FateInstanceType.USER, ufs), fateIdFilter, statusFilter, - typesFilter, new HashMap<>(), new HashMap<>()); + return getTransactionStatus(Map.of(FateInstanceType.USER, readOnlyUFS), fateIdFilter, + statusFilter, typesFilter, new HashMap<>(), new HashMap<>()); } - public FateStatus getStatus(Map> fateStores, ZooSession zk, - ServiceLockPath lockPath, Set fateIdFilter, EnumSet statusFilter, - EnumSet typesFilter) throws KeeperException, InterruptedException { + public FateStatus getStatus(Map> readOnlyFateStores, + ZooSession zk, ServiceLockPath lockPath, Set fateIdFilter, + EnumSet statusFilter, EnumSet typesFilter) + throws KeeperException, InterruptedException { Map> heldLocks = new HashMap<>(); Map> waitingLocks = new HashMap<>(); findLocks(zk, lockPath, heldLocks, waitingLocks); - return getTransactionStatus(fateStores, fateIdFilter, statusFilter, typesFilter, heldLocks, - waitingLocks); + return getTransactionStatus(readOnlyFateStores, fateIdFilter, statusFilter, typesFilter, + heldLocks, waitingLocks); } /** @@ -341,7 +342,7 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath, /** * Returns fate status, possibly filtered * - * @param fateStores read-only access to populated transaction stores. + * @param readOnlyFateStores read-only access to populated transaction stores. * @param fateIdFilter Optional. List of transactions to filter results - if null, all * transactions are returned * @param statusFilter Optional. List of status types to filter results - if null, all @@ -353,12 +354,12 @@ private void findLocks(ZooSession zk, final ServiceLockPath lockPath, * @return current fate and lock status */ public static FateStatus getTransactionStatus( - Map> fateStores, Set fateIdFilter, + Map> readOnlyFateStores, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter, Map> heldLocks, Map> waitingLocks) { final List statuses = new ArrayList<>(); - fateStores.forEach((type, store) -> { + readOnlyFateStores.forEach((type, store) -> { try (Stream fateIds = store.list().map(FateIdStatus::getFateId)) { fateIds.forEach(fateId -> { @@ -413,17 +414,17 @@ private static boolean includeByInstanceType(FateInstanceType type, return typesFilter == null || typesFilter.isEmpty() || typesFilter.contains(type); } - public void printAll(Map> fateStores, ZooSession zk, + public void printAll(Map> readOnlyFateStores, ZooSession zk, ServiceLockPath tableLocksPath) throws KeeperException, InterruptedException { - print(fateStores, zk, tableLocksPath, new Formatter(System.out), null, null, null); + print(readOnlyFateStores, zk, tableLocksPath, new Formatter(System.out), null, null, null); } - public void print(Map> fateStores, ZooSession zk, + public void print(Map> readOnlyFateStores, ZooSession zk, ServiceLockPath tableLocksPath, Formatter fmt, Set fateIdFilter, EnumSet statusFilter, EnumSet typesFilter) throws KeeperException, InterruptedException { FateStatus fateStatus = - getStatus(fateStores, zk, tableLocksPath, fateIdFilter, statusFilter, typesFilter); + getStatus(readOnlyFateStores, zk, tableLocksPath, fateIdFilter, statusFilter, typesFilter); for (TransactionStatus txStatus : fateStatus.getTransactions()) { fmt.format( @@ -434,11 +435,7 @@ public void print(Map> fateStores, ZooSess fmt.format(" %s transactions", fateStatus.getTransactions().size()); } - public boolean prepDelete(Map> stores, ZooSession zk, - ServiceLockPath path, String fateIdStr) { - if (!checkGlobalLock(zk, path)) { - return false; - } + public boolean prepDelete(Map> stores, String fateIdStr) { FateId fateId; try { @@ -452,36 +449,37 @@ public boolean prepDelete(Map> stores, ZooSession // determine which store to use FateStore store = stores.get(fateId.getType()); - FateTxStore txStore = store.reserve(fateId); - try { - TStatus ts = txStore.getStatus(); - switch (ts) { - case UNKNOWN: - System.out.println("Invalid transaction ID: " + fateId); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - case FAILED: - case FAILED_IN_PROGRESS: - case SUCCESSFUL: - System.out.printf("Deleting transaction: %s (%s)%n", fateIdStr, ts); - txStore.forceDelete(); - state = true; - break; + Optional> opTxStore = tryReserve(store, fateId, "delete"); + if (opTxStore.isPresent()) { + var txStore = opTxStore.orElseThrow(); + + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.println("Invalid transaction ID: " + fateId); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + case FAILED: + case FAILED_IN_PROGRESS: + case SUCCESSFUL: + System.out.printf("Deleting transaction: %s (%s)%n", fateIdStr, ts); + txStore.forceDelete(); + state = true; + break; + } + } finally { + txStore.unreserve(Duration.ZERO); } - } finally { - txStore.unreserve(Duration.ZERO); } + return state; } - public boolean prepFail(Map> stores, ZooSession zk, - ServiceLockPath zLockManagerPath, String fateIdStr) { - if (!checkGlobalLock(zk, zLockManagerPath)) { - return false; - } + public boolean prepFail(Map> stores, String fateIdStr) { FateId fateId; try { @@ -495,39 +493,75 @@ public boolean prepFail(Map> stores, ZooSession zk // determine which store to use FateStore store = stores.get(fateId.getType()); - FateTxStore txStore = store.reserve(fateId); - try { - TStatus ts = txStore.getStatus(); - switch (ts) { - case UNKNOWN: - System.out.println("Invalid fate ID: " + fateId); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - System.out.printf("Failing transaction: %s (%s)%n", fateId, ts); - txStore.setStatus(TStatus.FAILED_IN_PROGRESS); - state = true; - break; - - case SUCCESSFUL: - System.out.printf("Transaction already completed: %s (%s)%n", fateId, ts); - break; - - case FAILED: - case FAILED_IN_PROGRESS: - System.out.printf("Transaction already failed: %s (%s)%n", fateId, ts); - state = true; - break; + Optional> opTxStore = tryReserve(store, fateId, "fail"); + if (opTxStore.isPresent()) { + var txStore = opTxStore.orElseThrow(); + + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.println("Invalid fate ID: " + fateId); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + System.out.printf("Failing transaction: %s (%s)%n", fateId, ts); + txStore.setStatus(TStatus.FAILED_IN_PROGRESS); + state = true; + break; + + case SUCCESSFUL: + System.out.printf("Transaction already completed: %s (%s)%n", fateId, ts); + break; + + case FAILED: + case FAILED_IN_PROGRESS: + System.out.printf("Transaction already failed: %s (%s)%n", fateId, ts); + state = true; + break; + } + } finally { + txStore.unreserve(Duration.ZERO); } - } finally { - txStore.unreserve(Duration.ZERO); } return state; } + /** + * Try to reserve the transaction for a minute. If it could not be reserved, return an empty + * optional + */ + private Optional> tryReserve(FateStore store, FateId fateId, String op) { + var retry = Retry.builder().maxRetriesWithinDuration(Duration.ofMinutes(1)) + .retryAfter(Duration.ofMillis(25)).incrementBy(Duration.ofMillis(25)) + .maxWait(Duration.ofSeconds(15)).backOffFactor(1.5).logInterval(Duration.ofSeconds(15)) + .createRetry(); + + Optional> reserveAttempt = store.tryReserve(fateId); + while (reserveAttempt.isEmpty() && retry.canRetry()) { + retry.useRetry(); + try { + retry.waitForNextAttempt(log, "Attempting to reserve " + fateId); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalArgumentException(e); + } + reserveAttempt = store.tryReserve(fateId); + } + if (reserveAttempt.isPresent()) { + retry.logCompletion(log, "Attempting to reserve " + fateId); + } else { + log.error("Could not {} {} in a reasonable time. This indicates the Manager is currently " + + "working on it. The Manager may need to be stopped and the command rerun to complete " + + "this.", op, fateId); + } + + return reserveAttempt; + } + public void deleteLocks(ZooSession zk, ServiceLockPath path, String fateIdStr) throws KeeperException, InterruptedException { var zrw = zk.asReaderWriter(); @@ -548,35 +582,4 @@ public void deleteLocks(ZooSession zk, ServiceLockPath path, String fateIdStr) } } } - - @SuppressFBWarnings(value = "DM_EXIT", - justification = "TODO - should probably avoid System.exit here; " - + "this code is used by the fate admin shell command") - public boolean checkGlobalLock(ZooSession zk, ServiceLockPath zLockManagerPath) { - try { - if (ServiceLock.getLockData(zk, zLockManagerPath).isPresent()) { - System.err.println("ERROR: Manager lock is held, not running"); - if (this.exitOnError) { - System.exit(1); - } else { - return false; - } - } - } catch (KeeperException e) { - System.err.println("ERROR: Could not read manager lock, not running " + e.getMessage()); - if (this.exitOnError) { - System.exit(1); - } else { - return false; - } - } catch (InterruptedException e) { - System.err.println("ERROR: Could not read manager lock, not running" + e.getMessage()); - if (this.exitOnError) { - System.exit(1); - } else { - return false; - } - } - return true; - } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 3d7c039e0f2..977adb2440f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -53,7 +53,6 @@ import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.UtilWaitThread; @@ -75,6 +74,17 @@ public class UserFateStore extends AbstractFateStore { private static final com.google.common.collect.Range REPO_RANGE = com.google.common.collect.Range.closed(1, MAX_REPOS); + /** + * Constructs a UserFateStore + * + * @param context the {@link ClientContext} + * @param tableName the name of the table which will store the Fate data + * @param lockID the {@link ZooUtil.LockID} held by the process creating this store. Should be + * null if this store will be used as read-only (will not be used to reserve transactions) + * @param isLockHeld the {@link Predicate} used to determine if the lockID is held or not at the + * time of invocation. If the store is used for a {@link Fate} which runs a dead + * reservation cleaner, this should be non-null, otherwise null is fine + */ public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID, Predicate isLockHeld) { this(context, tableName, lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); @@ -88,11 +98,6 @@ public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID loc this.tableName = Objects.requireNonNull(tableName); } - public UserFateStore(ClientContext context, ZooUtil.LockID lockID, - Predicate isLockHeld) { - this(context, AccumuloTable.FATE.tableName(), lockID, isLockHeld); - } - @Override public FateId create() { @@ -184,6 +189,7 @@ private boolean seedTransaction(Supplier> mutatorFactory, String @Override public Optional> tryReserve(FateId fateId) { + verifyLock(lockID, fateId); // Create a unique FateReservation for this reservation attempt FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java index 148292ed745..242e0b5e74f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java @@ -83,6 +83,17 @@ private String getTXPath(FateId fateId) { return path + "/tx_" + fateId.getTxUUIDStr(); } + /** + * Constructs a MetaFateStore + * + * @param path the path in ZK where the fate data will reside + * @param zk the {@link ZooSession} + * @param lockID the {@link ZooUtil.LockID} held by the process creating this store. Should be + * null if this store will be used as read-only (will not be used to reserve transactions) + * @param isLockHeld the {@link Predicate} used to determine if the lockID is held or not at the + * time of invocation. If the store is used for a {@link Fate} which runs a dead + * reservation cleaner, this should be non-null, otherwise null is fine + */ public MetaFateStore(String path, ZooSession zk, ZooUtil.LockID lockID, Predicate isLockHeld) throws KeeperException, InterruptedException { this(path, zk, lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); @@ -100,11 +111,6 @@ public MetaFateStore(String path, ZooSession zk, ZooUtil.LockID lockID, this.zrw.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP); } - /** - * For testing only - */ - MetaFateStore() {} - @Override public FateId create() { while (true) { @@ -124,8 +130,9 @@ public FateId create() { } private Optional> createAndReserve(FateKey fateKey) { - final var reservation = FateReservation.from(lockID, UUID.randomUUID()); final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + verifyLock(lockID, fateId); + final var reservation = FateReservation.from(lockID, UUID.randomUUID()); try { byte[] newSerFateData = @@ -136,8 +143,7 @@ private Optional> createAndReserve(FateKey fateKey) { // node if it doesn't yet exist: // TStatus = TStatus.NEW, FateReservation = reservation, FateKey = fateKey // This might occur if there was a ZK server fault and the same write is running a - // 2nd - // time + // 2nd time // 2) The existing node for fateId has: // TStatus = TStatus.NEW, no FateReservation present, FateKey = fateKey // The fateId is NEW/unseeded and not reserved, so we can allow it to be reserved @@ -223,6 +229,7 @@ private void seedTransaction(Fate.FateOperation txName, Repo repo, boolean au @Override public Optional> tryReserve(FateId fateId) { + verifyLock(lockID, fateId); // uniquely identify this attempt to reserve the fate operation data FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java index a4934ae6eed..156b4f14a6b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java @@ -97,7 +97,7 @@ public String serialize(String root) { @Override public String toString() { - return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid); + return "path = " + path + " node = " + node + " eid = " + Long.toHexString(eid); } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java index 04c185e321d..3956510e5d5 100644 --- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java +++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java @@ -1294,24 +1294,32 @@ public Reader(CachableBlockFile.CachableBuilder b) throws IOException { this(new CachableBlockFile.Reader(b)); } - private void closeLocalityGroupReaders() { + private void closeLocalityGroupReaders(boolean ignoreIOExceptions) throws IOException { for (LocalityGroupReader lgr : currentReaders) { try { lgr.close(); } catch (IOException e) { - log.warn("Errored out attempting to close LocalityGroupReader.", e); + if (ignoreIOExceptions) { + log.warn("Errored out attempting to close LocalityGroupReader.", e); + } else { + throw e; + } } } } @Override - public void closeDeepCopies() { + public void closeDeepCopies() throws IOException { + closeDeepCopies(false); + } + + private void closeDeepCopies(boolean ignoreIOExceptions) throws IOException { if (deepCopy) { throw new IllegalStateException("Calling closeDeepCopies on a deep copy is not supported"); } for (Reader deepCopy : deepCopies) { - deepCopy.closeLocalityGroupReaders(); + deepCopy.closeLocalityGroupReaders(ignoreIOExceptions); } deepCopies.clear(); @@ -1323,8 +1331,9 @@ public void close() throws IOException { throw new IllegalStateException("Calling close on a deep copy is not supported"); } - closeDeepCopies(); - closeLocalityGroupReaders(); + // Closes as much as possible igoring and logging exceptions along the way + closeDeepCopies(true); + closeLocalityGroupReaders(true); if (sampleReaders != null) { for (LocalityGroupReader lgr : sampleReaders) { diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java index 4c82d085e59..8fd3f361dcc 100644 --- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java +++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLockPaths.java @@ -72,7 +72,8 @@ private ServiceLockPath(String root, String type) { this.type = requireNonNull(type); Preconditions.checkArgument(this.type.equals(Constants.ZGC_LOCK) || this.type.equals(Constants.ZMANAGER_LOCK) || this.type.equals(Constants.ZMONITOR_LOCK) - || this.type.equals(Constants.ZTABLE_LOCKS), "Unsupported type: " + type); + || this.type.equals(Constants.ZTABLE_LOCKS) || this.type.equals(Constants.ZADMIN_LOCK) + || this.type.equals(Constants.ZTEST_LOCK), "Unsupported type: " + type); // These server types support only one active instance, so they use a lock at // a known path, not the server's address. this.resourceGroup = null; @@ -203,6 +204,10 @@ private static String determineServerType(final String path) { return Constants.ZMONITOR_LOCK; } else if (path.contains(Constants.ZMINI_LOCK)) { return Constants.ZMINI_LOCK; + } else if (path.contains(Constants.ZADMIN_LOCK)) { + return Constants.ZADMIN_LOCK; + } else if (path.contains(Constants.ZTEST_LOCK)) { + return Constants.ZTEST_LOCK; } else if (path.contains(Constants.ZCOMPACTORS)) { return Constants.ZCOMPACTORS; } else if (path.contains(Constants.ZSSERVERS)) { @@ -230,6 +235,8 @@ public static ServiceLockPath parse(Optional serverType, String path) { case Constants.ZGC_LOCK: case Constants.ZMANAGER_LOCK: case Constants.ZMONITOR_LOCK: + case Constants.ZADMIN_LOCK: + case Constants.ZTEST_LOCK: return new ServiceLockPath(path.substring(0, path.indexOf(type)), type); default: { final String[] pathParts = path.replaceFirst("/", "").split("/"); @@ -299,6 +306,14 @@ public ServiceLockPath createDeadTabletServerPath(String resourceGroup, serverAddress.toString()); } + public ServiceLockPath createAdminLockPath() { + return new ServiceLockPath(zkRoot, Constants.ZADMIN_LOCK); + } + + public ServiceLockPath createTestLockPath() { + return new ServiceLockPath(zkRoot, Constants.ZTEST_LOCK); + } + public Set getCompactor(ResourceGroupPredicate resourceGroupPredicate, AddressSelector address, boolean withLock) { return get(Constants.ZCOMPACTORS, resourceGroupPredicate, address, withLock); diff --git a/minicluster/pom.xml b/minicluster/pom.xml index 50ca43cdc71..9904356bfff 100644 --- a/minicluster/pom.xml +++ b/minicluster/pom.xml @@ -96,10 +96,6 @@ org.apache.zookeeper zookeeper - - org.apache.zookeeper - zookeeper-jute - org.slf4j slf4j-api diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java index 4982f9683ae..6a86981def1 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java @@ -81,7 +81,7 @@ import org.apache.accumulo.core.data.InstanceId; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher; import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; @@ -121,7 +121,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -683,7 +682,7 @@ public void unableToMonitorLockNode(Exception e) { @Override public void acquiredLock() { - log.warn("Acquired ZK lock for MiniAccumuloClusterImpl"); + log.debug("Acquired ZK lock for MiniAccumuloClusterImpl"); lockAcquired.set(true); lockWatcherInvoked.countDown(); } @@ -723,14 +722,9 @@ public void failedToAcquireLock(Exception e) { String miniZDirPath = miniZInstancePath.substring(0, miniZInstancePath.indexOf("/" + miniUUID.toString())); try { - if (miniLockZk.exists(miniZDirPath, null) == null) { - miniLockZk.create(miniZDirPath, new byte[0], ZooUtil.PUBLIC, CreateMode.PERSISTENT); - log.info("Created: {}", miniZDirPath); - } - if (miniLockZk.exists(miniZInstancePath, null) == null) { - miniLockZk.create(miniZInstancePath, new byte[0], ZooUtil.PUBLIC, CreateMode.PERSISTENT); - log.info("Created: {}", miniZInstancePath); - } + var zrw = miniLockZk.asReaderWriter(); + zrw.putPersistentData(miniZDirPath, new byte[0], NodeExistsPolicy.SKIP); + zrw.putPersistentData(miniZInstancePath, new byte[0], NodeExistsPolicy.SKIP); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Error creating path in ZooKeeper", e); } diff --git a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java index 5b81fac4684..b5c6667519f 100644 --- a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java @@ -188,8 +188,8 @@ MiniAccumuloConfigImpl initialize() { mergeProp(Property.COMPACTOR_PORTSEARCH.getKey(), "true"); - mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getKey(), - Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE.getDefaultValue()); + mergeProp(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getKey(), + Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE.getDefaultValue()); mergeProp(Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getKey(), Property.COMPACTION_SERVICE_DEFAULT_PLANNER.getDefaultValue()); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index c1d2bf71081..bb350ec5326 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -20,7 +20,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import java.io.BufferedWriter; import java.io.File; @@ -42,6 +41,8 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -73,7 +74,9 @@ import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockData; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; import org.apache.accumulo.core.lock.ServiceLockPaths.ServiceLockPath; import org.apache.accumulo.core.manager.thrift.FateService; @@ -90,7 +93,9 @@ import org.apache.accumulo.core.singletons.SingletonManager.Mode; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.util.AddressUtil; +import org.apache.accumulo.core.util.Halt; import org.apache.accumulo.core.util.tables.TableMap; +import org.apache.accumulo.core.zookeeper.ZooSession; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.cli.ServerUtilOpts; import org.apache.accumulo.server.security.SecurityUtil; @@ -123,6 +128,7 @@ @AutoService(KeywordExecutable.class) public class Admin implements KeywordExecutable { private static final Logger log = LoggerFactory.getLogger(Admin.class); + private final CountDownLatch lockAcquiredLatch = new CountDownLatch(1); static class AdminOpts extends ServerUtilOpts { @Parameter(names = {"-f", "--force"}, @@ -330,11 +336,11 @@ static class FateOpsCommand { boolean cancel; @Parameter(names = {"-f", "--fail"}, - description = "... Transition FaTE transaction status to FAILED_IN_PROGRESS (requires Manager to be down)") + description = "... Transition FaTE transaction status to FAILED_IN_PROGRESS") boolean fail; @Parameter(names = {"-d", "--delete"}, - description = "... Delete FaTE transaction and its associated table locks (requires Manager to be down)") + description = "... Delete FaTE transaction and its associated table locks") boolean delete; @Parameter(names = {"-p", "--print", "-print", "-l", "--list", "-list"}, @@ -358,6 +364,36 @@ static class FateOpsCommand { List instanceTypes = new ArrayList<>(); } + class AdminLockWatcher implements ServiceLock.AccumuloLockWatcher { + @Override + public void lostLock(ServiceLock.LockLossReason reason) { + String msg = "Admin lost lock: " + reason.toString(); + if (reason == ServiceLock.LockLossReason.LOCK_DELETED) { + Halt.halt(msg, 0); + } else { + Halt.halt(msg, 1); + } + } + + @Override + public void unableToMonitorLockNode(Exception e) { + String msg = "Admin unable to monitor lock: " + e.getMessage(); + log.warn(msg); + Halt.halt(msg, 1); + } + + @Override + public void acquiredLock() { + lockAcquiredLatch.countDown(); + log.debug("Acquired ZooKeeper lock for Admin"); + } + + @Override + public void failedToAcquireLock(Exception e) { + log.warn("Failed to acquire ZooKeeper lock for Admin, msg: " + e.getMessage()); + } + } + public static void main(String[] args) { new Admin().execute(args); } @@ -886,50 +922,107 @@ private void executeFateOpsCommand(ServerContext context, FateOpsCommand fateOps AdminUtil admin = new AdminUtil<>(true); final String zkRoot = context.getZooKeeperRoot(); - var zLockManagerPath = context.getServerPaths().createManagerPath(); var zTableLocksPath = context.getServerPaths().createTableLocksPath(); String fateZkPath = zkRoot + Constants.ZFATE; var zk = context.getZooSession(); - MetaFateStore mfs = new MetaFateStore<>(fateZkPath, zk, createDummyLockID(), null); - UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); - Map> fateStores = - Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); - Map> readOnlyFateStores = - Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); - - if (fateOpsCommand.cancel) { - cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList); - } else if (fateOpsCommand.fail) { - for (String fateIdStr : fateOpsCommand.fateIdList) { - if (!admin.prepFail(fateStores, zk, zLockManagerPath, fateIdStr)) { - throw new AccumuloException("Could not fail transaction: " + fateIdStr); + ServiceLock adminLock = null; + Map> fateStores; + Map> readOnlyFateStores = null; + + try { + if (fateOpsCommand.cancel) { + cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList); + } else if (fateOpsCommand.fail) { + adminLock = createAdminLock(context); + fateStores = createFateStores(context, zk, fateZkPath, adminLock); + for (String fateIdStr : fateOpsCommand.fateIdList) { + if (!admin.prepFail(fateStores, fateIdStr)) { + throw new AccumuloException("Could not fail transaction: " + fateIdStr); + } + } + } else if (fateOpsCommand.delete) { + adminLock = createAdminLock(context); + fateStores = createFateStores(context, zk, fateZkPath, adminLock); + for (String fateIdStr : fateOpsCommand.fateIdList) { + if (!admin.prepDelete(fateStores, fateIdStr)) { + throw new AccumuloException("Could not delete transaction: " + fateIdStr); + } + admin.deleteLocks(zk, zTableLocksPath, fateIdStr); } } - } else if (fateOpsCommand.delete) { - for (String fateIdStr : fateOpsCommand.fateIdList) { - if (!admin.prepDelete(fateStores, zk, zLockManagerPath, fateIdStr)) { - throw new AccumuloException("Could not delete transaction: " + fateIdStr); + + if (fateOpsCommand.print) { + final Set fateIdFilter = new TreeSet<>(); + fateOpsCommand.fateIdList.forEach(fateIdStr -> fateIdFilter.add(FateId.from(fateIdStr))); + EnumSet statusFilter = + getCmdLineStatusFilters(fateOpsCommand.states); + EnumSet typesFilter = + getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes); + readOnlyFateStores = createReadOnlyFateStores(context, zk, fateZkPath); + admin.print(readOnlyFateStores, zk, zTableLocksPath, new Formatter(System.out), + fateIdFilter, statusFilter, typesFilter); + // print line break at the end + System.out.println(); + } + + if (fateOpsCommand.summarize) { + if (readOnlyFateStores == null) { + readOnlyFateStores = createReadOnlyFateStores(context, zk, fateZkPath); } - admin.deleteLocks(zk, zTableLocksPath, fateIdStr); + summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores, zTableLocksPath); + } + } finally { + if (adminLock != null) { + adminLock.unlock(); } } + } - if (fateOpsCommand.print) { - final Set fateIdFilter = new TreeSet<>(); - fateOpsCommand.fateIdList.forEach(fateIdStr -> fateIdFilter.add(FateId.from(fateIdStr))); - EnumSet statusFilter = - getCmdLineStatusFilters(fateOpsCommand.states); - EnumSet typesFilter = - getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes); - admin.print(readOnlyFateStores, zk, zTableLocksPath, new Formatter(System.out), fateIdFilter, - statusFilter, typesFilter); - // print line break at the end - System.out.println(); - } + private Map> createFateStores(ServerContext context, + ZooSession zk, String fateZkPath, ServiceLock adminLock) + throws InterruptedException, KeeperException { + var lockId = adminLock.getLockID(); + MetaFateStore mfs = new MetaFateStore<>(fateZkPath, zk, lockId, null); + UserFateStore ufs = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), lockId, null); + return Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); + } - if (fateOpsCommand.summarize) { - summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores, zTableLocksPath); + private Map> + createReadOnlyFateStores(ServerContext context, ZooSession zk, String fateZkPath) + throws InterruptedException, KeeperException { + MetaFateStore readOnlyMFS = new MetaFateStore<>(fateZkPath, zk, null, null); + UserFateStore readOnlyUFS = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); + return Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS); + } + + private ServiceLock createAdminLock(ServerContext context) throws InterruptedException { + var zk = context.getZooSession(); + UUID uuid = UUID.randomUUID(); + ServiceLockPath slp = context.getServerPaths().createAdminLockPath(); + ServiceLock adminLock = new ServiceLock(zk, slp, uuid); + AdminLockWatcher lw = new AdminLockWatcher(); + ServiceLockData.ServiceDescriptors descriptors = new ServiceLockData.ServiceDescriptors(); + descriptors + .addService(new ServiceLockData.ServiceDescriptor(uuid, ServiceLockData.ThriftService.NONE, + "fake_admin_util_host", Constants.DEFAULT_RESOURCE_GROUP_NAME)); + ServiceLockData sld = new ServiceLockData(descriptors); + String lockPath = slp.toString(); + String parentLockPath = lockPath.substring(0, lockPath.lastIndexOf("/")); + + try { + var zrw = zk.asReaderWriter(); + zrw.putPersistentData(parentLockPath, new byte[0], ZooUtil.NodeExistsPolicy.SKIP); + zrw.putPersistentData(lockPath, new byte[0], ZooUtil.NodeExistsPolicy.SKIP); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Error creating path in ZooKeeper", e); } + + adminLock.lock(lw, sld); + lockAcquiredLatch.await(); + + return adminLock; } private void validateFateUserInput(FateOpsCommand cmd) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java index 3c749d8df8e..76a37505fab 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListInstances.java @@ -33,7 +33,6 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.InstanceId; -import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReader; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLock; @@ -92,7 +91,6 @@ static synchronized void listInstances(String keepers, boolean printAll, boolean try (var zk = new ZooSession(ListInstances.class.getSimpleName(), keepers, ZOOKEEPER_TIMER_MILLIS, null)) { ZooReader rdr = zk.asReader(); - ZooCache cache = new ZooCache(zk, List.of(Constants.ZROOT)); TreeMap instanceNames = getInstanceNames(rdr, printErrors); @@ -100,7 +98,7 @@ static synchronized void listInstances(String keepers, boolean printAll, boolean printHeader(); for (Entry entry : instanceNames.entrySet()) { - printInstanceInfo(cache, entry.getKey(), entry.getValue(), printErrors); + printInstanceInfo(zk, entry.getKey(), entry.getValue(), printErrors); } TreeSet instancedIds = getInstanceIDs(rdr, printErrors); @@ -108,7 +106,7 @@ static synchronized void listInstances(String keepers, boolean printAll, boolean if (printAll) { for (InstanceId uuid : instancedIds) { - printInstanceInfo(cache, null, uuid, printErrors); + printInstanceInfo(zk, null, uuid, printErrors); } } else if (!instancedIds.isEmpty()) { System.out.println(); @@ -149,9 +147,9 @@ private static void printHeader() { } - private static void printInstanceInfo(ZooCache cache, String instanceName, InstanceId iid, + private static void printInstanceInfo(ZooSession zs, String instanceName, InstanceId iid, boolean printErrors) { - String manager = getManager(cache, iid, printErrors); + String manager = getManager(zs, iid, printErrors); if (instanceName == null) { instanceName = ""; } @@ -164,7 +162,7 @@ private static void printInstanceInfo(ZooCache cache, String instanceName, Insta "\"" + instanceName + "\"", iid, manager); } - private static String getManager(ZooCache cache, InstanceId iid, boolean printErrors) { + private static String getManager(ZooSession zs, InstanceId iid, boolean printErrors) { if (iid == null) { return null; @@ -173,7 +171,7 @@ private static String getManager(ZooCache cache, InstanceId iid, boolean printEr try { var zLockManagerPath = ServiceLockPaths.parse(Optional.of(Constants.ZMANAGER_LOCK), ZooUtil.getRoot(iid) + Constants.ZMANAGER_LOCK); - Optional sld = ServiceLock.getLockData(cache, zLockManagerPath, null); + Optional sld = ServiceLock.getLockData(zs, zLockManagerPath); if (sld.isEmpty()) { return null; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java index 4a1438dc379..f8efa64c6ae 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java @@ -24,7 +24,6 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; @@ -65,10 +64,9 @@ private static Admin.CheckCommand.CheckStatus checkTableLocks(ServerContext cont final var zTableLocksPath = context.getServerPaths().createTableLocksPath(); final String fateZkPath = zkRoot + Constants.ZFATE; final var zk = context.getZooSession(); - final MetaFateStore mfs = - new MetaFateStore<>(fateZkPath, zk, AbstractFateStore.createDummyLockID(), null); - final UserFateStore ufs = new UserFateStore<>(context, AccumuloTable.FATE.tableName(), - AbstractFateStore.createDummyLockID(), null); + final MetaFateStore mfs = new MetaFateStore<>(fateZkPath, zk, null, null); + final UserFateStore ufs = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); log.trace("Ensuring table and namespace locks are valid..."); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 4dc42c11379..f1e6f54a5cd 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -194,7 +194,7 @@ public class CompactionCoordinator private final Manager manager; private final LoadingCache compactorCounts; - private final int jobQueueInitialSize; + private final long jobQueueInitialSize; private volatile long coordinatorStartTime; @@ -208,8 +208,8 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security, this.security = security; this.manager = Objects.requireNonNull(manager); - this.jobQueueInitialSize = ctx.getConfiguration() - .getCount(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE); + this.jobQueueInitialSize = + ctx.getConfiguration().getAsBytes(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE); this.jobQueues = new CompactionJobQueues(jobQueueInitialSize); @@ -1121,8 +1121,6 @@ private void cleanUpEmptyCompactorPathInZK() { final String compactorQueuesPath = this.ctx.getZooKeeperRoot() + Constants.ZCOMPACTORS; final var zoorw = this.ctx.getZooSession().asReaderWriter(); - final double queueSizeFactor = ctx.getConfiguration() - .getFraction(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR); try { var groups = zoorw.getChildren(compactorQueuesPath); @@ -1139,7 +1137,6 @@ private void cleanUpEmptyCompactorPathInZK() { CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid); if (queue != null) { queue.clearIfInactive(Duration.ofMinutes(10)); - queue.setMaxSize(this.jobQueueInitialSize); } } else { int aliveCompactorsForGroup = 0; @@ -1152,16 +1149,8 @@ private void cleanUpEmptyCompactorPathInZK() { aliveCompactorsForGroup++; } } - CompactionJobPriorityQueue queue = getJobQueues().getQueue(cgid); - if (queue != null) { - queue.setMaxSize(Math.min( - Math.max(1, (int) (aliveCompactorsForGroup * queueSizeFactor)), Integer.MAX_VALUE)); - } - } - } - } catch (KeeperException | RuntimeException e) { LOG.warn("Failed to clean up compactors", e); } catch (InterruptedException e) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java index 1f9738dac78..f183b50b86f 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueue.java @@ -31,11 +31,9 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -116,8 +114,8 @@ public boolean equals(Object o) { // behavior is not supported with a PriorityQueue. Second a PriorityQueue does not support // efficiently removing entries from anywhere in the queue. Efficient removal is needed for the // case where tablets decided to issues different compaction jobs than what is currently queued. - private final TreeMap jobQueue; - private final AtomicInteger maxSize; + private final SizeTrackingTreeMap jobQueue; + private final AtomicLong maxSize; private final AtomicLong rejectedJobs; private final AtomicLong dequeuedJobs; private final ArrayDeque> futures; @@ -142,9 +140,10 @@ private TabletJobs(long generation, HashSet jobs) { private final AtomicLong nextSeq = new AtomicLong(0); - public CompactionJobPriorityQueue(CompactorGroupId groupId, int maxSize) { - this.jobQueue = new TreeMap<>(); - this.maxSize = new AtomicInteger(maxSize); + public CompactionJobPriorityQueue(CompactorGroupId groupId, long maxSize, + SizeTrackingTreeMap.Weigher weigher) { + this.jobQueue = new SizeTrackingTreeMap<>(weigher); + this.maxSize = new AtomicLong(maxSize); this.tabletJobs = new HashMap<>(); this.groupId = groupId; this.rejectedJobs = new AtomicLong(0); @@ -230,11 +229,11 @@ public synchronized int add(TabletMetadata tabletMetadata, Collection 0, "Maximum size of the Compaction job priority queue must be greater than 0"); this.maxSize.set(maxSize); @@ -249,7 +248,7 @@ public long getDequeuedJobs() { } public synchronized long getQueuedJobs() { - return jobQueue.size(); + return jobQueue.entrySize(); } public synchronized long getLowestPriority() { @@ -332,7 +331,7 @@ private void removePreviousSubmissions(KeyExtent extent, boolean removeJobAges) } private CjpqKey addJobToQueue(TabletMetadata tabletMetadata, CompactionJob job) { - if (jobQueue.size() >= maxSize.get()) { + if (jobQueue.dataSize() >= maxSize.get()) { var lastEntry = jobQueue.lastKey(); if (job.getPriority() <= lastEntry.job.getPriority()) { // the queue is full and this job has a lower or same priority than the lowest job in the diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java index b9fe1ed424c..2e2dc3cef95 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueues.java @@ -48,18 +48,20 @@ public class CompactionJobQueues { private final ConcurrentHashMap priorityQueues = new ConcurrentHashMap<>(); - private final int queueSize; + private final long queueSize; private final Map currentGenerations; - public CompactionJobQueues(int queueSize) { + private SizeTrackingTreeMap.Weigher weigher = + val -> val.getTabletMetadata().toString().length() + val.getJob().toString().length(); + + public CompactionJobQueues(long queueSize) { this.queueSize = queueSize; Map cg = new EnumMap<>(DataLevel.class); for (var level : DataLevel.values()) { cg.put(level, new AtomicLong()); } currentGenerations = Collections.unmodifiableMap(cg); - } public void beginFullScan(DataLevel level) { @@ -164,7 +166,7 @@ public TabletMetadata getTabletMetadata() { */ public CompletableFuture getAsync(CompactorGroupId groupId) { var pq = priorityQueues.computeIfAbsent(groupId, - gid -> new CompactionJobPriorityQueue(gid, queueSize)); + gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher)); return pq.getAsync(); } @@ -187,7 +189,7 @@ private void add(TabletMetadata tabletMetadata, CompactorGroupId groupId, } var pq = priorityQueues.computeIfAbsent(groupId, - gid -> new CompactionJobPriorityQueue(gid, queueSize)); + gid -> new CompactionJobPriorityQueue(gid, queueSize, weigher)); pq.add(tabletMetadata, jobs, currentGenerations.get(DataLevel.of(tabletMetadata.getTableId())).get()); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java new file mode 100644 index 00000000000..306a56fb647 --- /dev/null +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMap.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.queue; + +import java.util.AbstractMap; +import java.util.Map; +import java.util.TreeMap; + +import com.google.common.base.Preconditions; + +/** + * This class wraps a treemap and tracks the data size of everything added and removed from the + * treemap. + */ +class SizeTrackingTreeMap { + + private static class ValueWrapper { + final V2 val; + final long computedSize; + + private ValueWrapper(V2 val, long computedSize) { + this.val = val; + this.computedSize = computedSize; + } + } + + private final TreeMap> map = new TreeMap<>(); + private long dataSize = 0; + private Weigher weigher; + + private Map.Entry unwrap(Map.Entry> wrapperEntry) { + if (wrapperEntry == null) { + return null; + } + return new AbstractMap.SimpleImmutableEntry<>(wrapperEntry.getKey(), + wrapperEntry.getValue().val); + } + + private void incrementDataSize(ValueWrapper val) { + Preconditions.checkState(dataSize >= 0); + dataSize += val.computedSize; + } + + private void decrementDataSize(Map.Entry> entry) { + if (entry != null) { + decrementDataSize(entry.getValue()); + } + } + + private void decrementDataSize(ValueWrapper val) { + if (val != null) { + Preconditions.checkState(dataSize >= val.computedSize); + dataSize -= val.computedSize; + } + } + + interface Weigher { + long weigh(V2 val); + } + + public SizeTrackingTreeMap(Weigher weigher) { + this.weigher = weigher; + } + + public boolean isEmpty() { + return map.isEmpty(); + } + + public long dataSize() { + return dataSize; + } + + public int entrySize() { + return map.size(); + } + + public K lastKey() { + return map.lastKey(); + } + + public Map.Entry firstEntry() { + return unwrap(map.firstEntry()); + } + + public void remove(K key) { + var prev = map.remove(key); + decrementDataSize(prev); + } + + public Map.Entry pollFirstEntry() { + var first = map.pollFirstEntry(); + decrementDataSize(first); + return unwrap(first); + } + + public Map.Entry pollLastEntry() { + var last = map.pollLastEntry(); + decrementDataSize(last); + return unwrap(last); + } + + public void put(K key, V val) { + var wrapped = new ValueWrapper<>(val, weigher.weigh(val)); + var prev = map.put(key, wrapped); + decrementDataSize(prev); + incrementDataSize(wrapped); + } + + public void clear() { + map.clear(); + dataSize = 0; + } +} diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index 200b3ff2b48..23cc841e9ce 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -53,7 +53,7 @@ public abstract class FateMetrics implements Metrics private static final String OP_TYPE_TAG = "op.type"; protected final ServerContext context; - protected final ReadOnlyFateStore> fateStore; + protected final ReadOnlyFateStore> readOnlyFateStore; protected final long refreshDelay; protected final AtomicLong totalCurrentOpsCount = new AtomicLong(0); @@ -62,14 +62,14 @@ public abstract class FateMetrics implements Metrics public FateMetrics(final ServerContext context, final long minimumRefreshDelay) { this.context = context; this.refreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, minimumRefreshDelay); - this.fateStore = Objects.requireNonNull(buildStore(context)); + this.readOnlyFateStore = Objects.requireNonNull(buildReadOnlyStore(context)); for (TStatus status : TStatus.values()) { txStatusCounters.put(status, new AtomicLong(0)); } } - protected abstract ReadOnlyFateStore> buildStore(ServerContext context); + protected abstract ReadOnlyFateStore> buildReadOnlyStore(ServerContext context); protected abstract T getMetricValues(); @@ -95,7 +95,7 @@ protected void update(T metricValues) { @Override public void registerMetrics(final MeterRegistry registry) { - String type = fateStore.type().name().toLowerCase(); + String type = readOnlyFateStore.type().name().toLowerCase(); Gauge.builder(FATE_OPS.getName(), totalCurrentOpsCount, AtomicLong::get) .description(FATE_OPS.getDescription()).register(registry); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java index 8412a17aad3..b87f3d48223 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.manager.metrics.fate.FateMetrics; @@ -62,10 +61,10 @@ public void registerMetrics(MeterRegistry registry) { } @Override - protected ReadOnlyFateStore> buildStore(ServerContext context) { + protected ReadOnlyFateStore> + buildReadOnlyStore(ServerContext context) { try { - return new MetaFateStore<>(getFateRootPath(context), context.getZooSession(), - AbstractFateStore.createDummyLockID(), null); + return new MetaFateStore<>(getFateRootPath(context), context.getZooSession(), null, null); } catch (KeeperException ex) { throw new IllegalStateException( "FATE Metrics - Failed to create zoo store - metrics unavailable", ex); @@ -78,7 +77,7 @@ protected ReadOnlyFateStore> buildStore(Server @Override protected MetaFateMetricValues getMetricValues() { - return MetaFateMetricValues.getMetaStoreMetrics(context, fateRootPath, fateStore); + return MetaFateMetricValues.getMetaStoreMetrics(context, fateRootPath, readOnlyFateStore); } private static String getFateRootPath(ServerContext context) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java index 92ac8568810..e4fad01899c 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java @@ -18,9 +18,9 @@ */ package org.apache.accumulo.manager.metrics.fate.user; -import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.manager.metrics.fate.FateMetrics; import org.apache.accumulo.server.ServerContext; @@ -31,12 +31,13 @@ public UserFateMetrics(ServerContext context, long minimumRefreshDelay) { } @Override - protected ReadOnlyFateStore> buildStore(ServerContext context) { - return new UserFateStore<>(context, AbstractFateStore.createDummyLockID(), null); + protected ReadOnlyFateStore> + buildReadOnlyStore(ServerContext context) { + return new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); } @Override protected UserFateMetricValues getMetricValues() { - return UserFateMetricValues.getUserStoreMetrics(fateStore); + return UserFateMetricValues.getUserStoreMetrics(readOnlyFateStore); } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java index 37213cdc488..01c18798754 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobPriorityQueueTest.java @@ -74,7 +74,7 @@ public void testTabletFileReplacement() { EasyMock.replay(tm, cj1, cj2); - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2, mj -> 1); assertEquals(1, queue.add(tm, List.of(cj1), 1L)); MetaJob job = queue.peek(); @@ -129,7 +129,7 @@ public void testAddEqualToMaxSize() { EasyMock.replay(tm, cj1, cj2); - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2, mj -> 1); assertEquals(2, queue.add(tm, List.of(cj1, cj2), 1L)); EasyMock.verify(tm, cj1, cj2); @@ -186,7 +186,7 @@ public void testAddMoreThanMax() { EasyMock.replay(tm, cj1, cj2, cj3); - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 2, mj -> 1); assertEquals(2, queue.add(tm, List.of(cj1, cj2, cj3), 1L)); EasyMock.verify(tm, cj1, cj2, cj3); @@ -247,7 +247,7 @@ public void test() { TreeSet expected = new TreeSet<>(CompactionJobPrioritizer.JOB_COMPARATOR); - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 1000, mj -> 10); // create and add 1000 jobs for (int x = 0; x < 1000; x++) { @@ -256,7 +256,7 @@ public void test() { expected.add(pair.getSecond()); } - assertEquals(100, queue.getMaxSize()); + assertEquals(1000, queue.getMaxSize()); assertEquals(100, queue.getQueuedJobs()); assertEquals(900, queue.getRejectedJobs()); // There should be 1000 total job ages even though 900 were rejected @@ -268,7 +268,7 @@ public void test() { assertTrue(stats.getMaxAge().toMillis() > 0); assertTrue(stats.getAvgAge().toMillis() > 0); - // iterate over the expected set and make sure that they next job in the queue + // iterate over the expected set and make sure that the next job in the queue // matches int matchesSeen = 0; for (CompactionJob expectedJob : expected) { @@ -312,7 +312,7 @@ public void test() { */ @Test public void testAsyncCancelCleanup() { - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100, mj -> 1); List> futures = new ArrayList<>(); @@ -342,7 +342,7 @@ public void testAsyncCancelCleanup() { @Test public void testChangeMaxSize() { - CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100); + CompactionJobPriorityQueue queue = new CompactionJobPriorityQueue(GROUP, 100, mj -> 1); assertEquals(100, queue.getMaxSize()); queue.setMaxSize(50); assertEquals(50, queue.getMaxSize()); @@ -351,5 +351,4 @@ public void testChangeMaxSize() { // Make sure previous value was not changed after invalid setting assertEquals(50, queue.getMaxSize()); } - } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java index 3d8933fa386..f63e56cc490 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/CompactionJobQueuesTest.java @@ -81,7 +81,7 @@ public void testFullScanHandling() throws Exception { var cg2 = CompactorGroupId.of("CG2"); var cg3 = CompactorGroupId.of("CG3"); - CompactionJobQueues jobQueues = new CompactionJobQueues(100); + CompactionJobQueues jobQueues = new CompactionJobQueues(1000000); jobQueues.beginFullScan(DataLevel.USER); @@ -247,7 +247,7 @@ public void testFullScanLevels() throws Exception { var cg1 = CompactorGroupId.of("CG1"); - CompactionJobQueues jobQueues = new CompactionJobQueues(100); + CompactionJobQueues jobQueues = new CompactionJobQueues(1000000); jobQueues.add(tm1, List.of(newJob((short) 1, 5, cg1))); jobQueues.add(tm2, List.of(newJob((short) 2, 6, cg1))); @@ -283,7 +283,7 @@ public void testAddPollRaceCondition() throws Exception { final int numToAdd = 100_000; - CompactionJobQueues jobQueues = new CompactionJobQueues(numToAdd + 1); + CompactionJobQueues jobQueues = new CompactionJobQueues(10000000); CompactorGroupId[] groups = Stream.of("G1", "G2", "G3").map(CompactorGroupId::of).toArray(CompactorGroupId[]::new); @@ -342,7 +342,7 @@ public void testAddPollRaceCondition() throws Exception { @Test public void testGetAsync() throws Exception { - CompactionJobQueues jobQueues = new CompactionJobQueues(100); + CompactionJobQueues jobQueues = new CompactionJobQueues(1000000); var tid = TableId.of("1"); var extent1 = new KeyExtent(tid, new Text("z"), new Text("q")); diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java new file mode 100644 index 00000000000..384363228f0 --- /dev/null +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/queue/SizeTrackingTreeMapTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.manager.compaction.queue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.ArrayList; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.TreeMap; + +import org.junit.jupiter.api.Test; + +public class SizeTrackingTreeMapTest { + @Test + public void testSizeTracking() { + List computeSizeCalls = new ArrayList<>(); + var stmap = new SizeTrackingTreeMap(val -> { + computeSizeCalls.add(val); + return val.length(); + }); + + TreeMap expected = new TreeMap<>(); + + check(expected, stmap); + assertEquals(List.of(), computeSizeCalls); + + stmap.put(3, "1234567890"); + expected.put(3, "1234567890"); + check(expected, stmap); + assertEquals(List.of("1234567890"), computeSizeCalls); + + stmap.put(4, "12345"); + expected.put(4, "12345"); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345"), computeSizeCalls); + + // remove a key that does not exist + stmap.remove(2); + expected.remove(2); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345"), computeSizeCalls); + + // remove a key that does exist + stmap.remove(3); + expected.remove(3); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345"), computeSizeCalls); + + // update an existing key, should decrement the old size and increment the new size + stmap.put(4, "123"); + expected.put(4, "123"); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123"), computeSizeCalls); + + stmap.put(7, "123456789012345"); + expected.put(7, "123456789012345"); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123", "123456789012345"), computeSizeCalls); + + stmap.put(11, "1234567"); + expected.put(11, "1234567"); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123", "123456789012345", "1234567"), + computeSizeCalls); + + assertEquals(expected.pollFirstEntry(), stmap.pollFirstEntry()); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123", "123456789012345", "1234567"), + computeSizeCalls); + + assertEquals(expected.pollLastEntry(), stmap.pollLastEntry()); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123", "123456789012345", "1234567"), + computeSizeCalls); + + expected.clear(); + stmap.clear(); + check(expected, stmap); + assertEquals(List.of("1234567890", "12345", "123", "123456789012345", "1234567"), + computeSizeCalls); + } + + private void check(TreeMap expected, SizeTrackingTreeMap stmap) { + long expectedDataSize = expected.values().stream().mapToLong(String::length).sum(); + assertEquals(expectedDataSize, stmap.dataSize()); + assertEquals(expected.size(), stmap.entrySize()); + assertEquals(expected.isEmpty(), stmap.isEmpty()); + assertEquals(expected.firstEntry(), stmap.firstEntry()); + if (expected.isEmpty()) { + assertThrows(NoSuchElementException.class, stmap::lastKey); + } else { + assertEquals(expected.lastKey(), stmap.lastKey()); + } + } +} diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java index b728b8a2b77..ac142dc6717 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java @@ -20,15 +20,18 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.SCAN_SERVER; +import static org.apache.accumulo.core.client.admin.servers.ServerId.Type.TABLET_SERVER; import java.net.InetAddress; import java.net.URL; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -47,7 +50,6 @@ import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.client.admin.servers.ServerId.Type; import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService; -import org.apache.accumulo.core.compaction.thrift.CompactorService; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.compaction.thrift.TExternalCompactionList; import org.apache.accumulo.core.conf.Property; @@ -342,7 +344,7 @@ private GCStatus fetchGcStatus() { } } } catch (Exception ex) { - log.warn("Unable to contact the garbage collector at " + address, ex); + log.warn("Unable to contact the garbage collector at {}", address, ex); } return result; } @@ -507,73 +509,136 @@ public static class CompactionStats { } } - private final Map tserverScans = new HashMap<>(); - private final Map sserverScans = new HashMap<>(); - private final Map allCompactions = new HashMap<>(); - private final ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); - - private long scansFetchedNanos = System.nanoTime(); - private long compactsFetchedNanos = System.nanoTime(); - private long ecInfoFetchedNanos = System.nanoTime(); - private final long fetchTimeNanos = TimeUnit.MINUTES.toNanos(1); - private final long ageOffEntriesMillis = TimeUnit.MINUTES.toMillis(15); - // When there are a large amount of external compactions running the list of external compactions - // could consume a lot of memory. The purpose of this memoizing supplier is to try to avoid - // creating the list of running external compactions in memory per web request. If multiple - // request come in around the same time they should use the same list. It is still possible to - // have multiple list in memory if one request obtains a copy and then another request comes in - // after the timeout and the supplier recomputes the list. The longer the timeout on the supplier - // is the less likely we are to have multiple list of external compactions in memory, however - // increasing the timeout will make the monitor less responsive. - private final Supplier extCompactionSnapshot = - Suppliers.memoizeWithExpiration(() -> computeExternalCompactionsSnapshot(), fetchTimeNanos, - TimeUnit.NANOSECONDS); + private final long expirationTimeMinutes = 1; + + // Use Suppliers.memoizeWithExpiration() to cache the results of expensive fetch operations. This + // avoids unnecessary repeated fetches within the expiration period and ensures that multiple + // requests around the same time use the same cached data. + private final Supplier> tserverScansSupplier = + Suppliers.memoizeWithExpiration(this::fetchTServerScans, expirationTimeMinutes, MINUTES); + + private final Supplier> sserverScansSupplier = + Suppliers.memoizeWithExpiration(this::fetchSServerScans, expirationTimeMinutes, MINUTES); + + private final Supplier> compactionsSupplier = + Suppliers.memoizeWithExpiration(this::fetchCompactions, expirationTimeMinutes, MINUTES); + + private final Supplier compactorInfoSupplier = + Suppliers.memoizeWithExpiration(this::fetchCompactorsInfo, expirationTimeMinutes, MINUTES); + + private final Supplier externalCompactionsSupplier = + Suppliers.memoizeWithExpiration(this::computeExternalCompactionsSnapshot, + expirationTimeMinutes, MINUTES); /** - * Fetch the active scans but only if fetchTimeNanos has elapsed. + * @return active tablet server scans. Values are cached and refresh after + * {@link #expirationTimeMinutes}. */ - public synchronized Map getScans() { - if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active TabletServer Scans"); - fetchScans(); - } - return Map.copyOf(tserverScans); + public Map getScans() { + return tserverScansSupplier.get(); } - public synchronized Map getScanServerScans() { - if (System.nanoTime() - scansFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active ScanServer Scans"); - fetchScans(); - } - return Map.copyOf(sserverScans); + /** + * @return active scan server scans. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public Map getScanServerScans() { + return sserverScansSupplier.get(); } /** - * Fetch the active compactions but only if fetchTimeNanos has elapsed. + * @return active compactions. Values are cached and refresh after {@link #expirationTimeMinutes}. */ - public synchronized Map getCompactions() { - if (System.nanoTime() - compactsFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of Active Compactions"); - fetchCompactions(); - } - return Map.copyOf(allCompactions); + public Map getCompactions() { + return compactionsSupplier.get(); } - public synchronized ExternalCompactionInfo getCompactorsInfo() { + /** + * @return external compaction information. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public ExternalCompactionInfo getCompactorsInfo() { if (coordinatorHost.isEmpty()) { throw new IllegalStateException("Tried fetching from compaction coordinator that's missing"); } - if (System.nanoTime() - ecInfoFetchedNanos > fetchTimeNanos) { - log.info("User initiated fetch of External Compaction info"); - Set compactors = - getContext().instanceOperations().getServers(ServerId.Type.COMPACTOR); - log.debug("Found compactors: " + compactors); - ecInfo.setFetchedTimeMillis(System.currentTimeMillis()); - ecInfo.setCompactors(compactors); - ecInfo.setCoordinatorHost(coordinatorHost); - - ecInfoFetchedNanos = System.nanoTime(); + return compactorInfoSupplier.get(); + } + + /** + * @return running compactions. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public RunningCompactions getRunningCompactions() { + return externalCompactionsSupplier.get().runningCompactions; + } + + /** + * @return running compactor details. Values are cached and refresh after + * {@link #expirationTimeMinutes}. + */ + public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) { + TExternalCompaction extCompaction = + externalCompactionsSupplier.get().ecRunningMap.get(ecid.canonical()); + if (extCompaction == null) { + return null; + } + return new RunningCompactorDetails(extCompaction); + } + + private Map fetchScans(Collection servers) { + ServerContext context = getContext(); + Map scans = new HashMap<>(); + for (ServerId server : servers) { + final HostAndPort parsedServer = HostAndPort.fromString(server.toHostPortString()); + TabletScanClientService.Client client = null; + try { + client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); + List activeScans = client.getActiveScans(null, context.rpcCreds()); + scans.put(parsedServer, new ScanStats(activeScans)); + } catch (Exception ex) { + log.error("Failed to get active scans from {}", server, ex); + } finally { + ThriftUtil.returnClient(client, context); + } + } + return Collections.unmodifiableMap(scans); + } + + private Map fetchTServerScans() { + return fetchScans(getContext().instanceOperations().getServers(TABLET_SERVER)); + } + + private Map fetchSServerScans() { + return fetchScans(getContext().instanceOperations().getServers(SCAN_SERVER)); + } + + private Map fetchCompactions() { + ServerContext context = getContext(); + Map allCompactions = new HashMap<>(); + for (ServerId server : context.instanceOperations().getServers(TABLET_SERVER)) { + final HostAndPort parsedServer = HostAndPort.fromString(server.toHostPortString()); + Client tserver = null; + try { + tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context); + var compacts = tserver.getActiveCompactions(null, context.rpcCreds()); + allCompactions.put(parsedServer, new CompactionStats(compacts)); + } catch (Exception ex) { + log.debug("Failed to get active compactions from {}", server, ex); + } finally { + ThriftUtil.returnClient(tserver, context); + } } + return Collections.unmodifiableMap(allCompactions); + } + + private ExternalCompactionInfo fetchCompactorsInfo() { + Set compactors = + getContext().instanceOperations().getServers(ServerId.Type.COMPACTOR); + log.debug("Found compactors: {}", compactors); + ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); + ecInfo.setFetchedTimeMillis(System.currentTimeMillis()); + ecInfo.setCompactors(compactors); + ecInfo.setCoordinatorHost(coordinatorHost); return ecInfo; } @@ -593,7 +658,7 @@ private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() { throw new IllegalStateException(coordinatorMissingMsg); } var ccHost = coordinatorHost.orElseThrow(); - log.info("User initiated fetch of running External Compactions from " + ccHost); + log.info("User initiated fetch of running External Compactions from {}", ccHost); try { CompactionCoordinatorService.Client client = ThriftUtil.getClient(ThriftClientTypes.COORDINATOR, ccHost, getContext()); @@ -614,95 +679,6 @@ private ExternalCompactionsSnapshot computeExternalCompactionsSnapshot() { } } - public RunningCompactions getRunnningCompactions() { - return extCompactionSnapshot.get().runningCompactions; - } - - public RunningCompactorDetails getRunningCompactorDetails(ExternalCompactionId ecid) { - TExternalCompaction extCompaction = - extCompactionSnapshot.get().ecRunningMap.get(ecid.canonical()); - if (extCompaction == null) { - return null; - } - return new RunningCompactorDetails(extCompaction); - } - - private void fetchScans() { - final ServerContext context = getContext(); - final Set servers = new HashSet<>(); - servers.addAll(context.instanceOperations().getServers(ServerId.Type.SCAN_SERVER)); - servers.addAll(context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)); - - for (ServerId server : servers) { - TabletScanClientService.Client tserver = null; - try { - HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort()); - tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); - List scans = tserver.getActiveScans(null, context.rpcCreds()); - tserverScans.put(parsedServer, new ScanStats(scans)); - scansFetchedNanos = System.nanoTime(); - } catch (Exception ex) { - log.error("Failed to get active scans from {}", server, ex); - } finally { - ThriftUtil.returnClient(tserver, context); - } - } - // Age off old scan information - Iterator> tserverIter = tserverScans.entrySet().iterator(); - // clock time used for fetched for date friendly display - long now = System.currentTimeMillis(); - while (tserverIter.hasNext()) { - Entry entry = tserverIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - tserverIter.remove(); - } - } - } - - private void fetchCompactions() { - final ServerContext context = getContext(); - - for (ServerId server : context.instanceOperations().getServers(ServerId.Type.TABLET_SERVER)) { - final HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort()); - Client tserver = null; - try { - tserver = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER, parsedServer, context); - var compacts = tserver.getActiveCompactions(null, context.rpcCreds()); - allCompactions.put(parsedServer, new CompactionStats(compacts)); - compactsFetchedNanos = System.nanoTime(); - } catch (Exception ex) { - log.debug("Failed to get active compactions from {}", server, ex); - } finally { - ThriftUtil.returnClient(tserver, context); - } - } - for (ServerId server : context.instanceOperations().getServers(ServerId.Type.COMPACTOR)) { - final HostAndPort parsedServer = HostAndPort.fromParts(server.getHost(), server.getPort()); - CompactorService.Client compactor = null; - try { - compactor = ThriftUtil.getClient(ThriftClientTypes.COMPACTOR, parsedServer, context); - var compacts = compactor.getActiveCompactions(null, context.rpcCreds()); - allCompactions.put(parsedServer, new CompactionStats(compacts)); - compactsFetchedNanos = System.nanoTime(); - } catch (Exception ex) { - log.debug("Failed to get active compactions from {}", server, ex); - } finally { - ThriftUtil.returnClient(compactor, context); - } - } - - // Age off old compaction information - var entryIter = allCompactions.entrySet().iterator(); - // clock time used for fetched for date friendly display - long now = System.currentTimeMillis(); - while (entryIter.hasNext()) { - var entry = entryIter.next(); - if (now - entry.getValue().fetched > ageOffEntriesMillis) { - entryIter.remove(); - } - } - } - /** * Get the monitor lock in ZooKeeper */ diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java index 72d54d70a4e..5fcecef3493 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java @@ -60,7 +60,7 @@ public Compactors getCompactors() { @Path("running") @GET public RunningCompactions getRunning() { - return monitor.getRunnningCompactions(); + return monitor.getRunningCompactions(); } @Path("details") diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java index c7e516c6172..baf740a141c 100644 --- a/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java +++ b/shell/src/main/java/org/apache/accumulo/shell/commands/HelpCommand.java @@ -39,7 +39,8 @@ public class HelpCommand extends Command { @Override public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws ShellCommandException, IOException { - int numColumns = shellState.getTerminal().getWidth(); + int numColumns = + (shellState.getTerminal().getWidth() == 0) ? 80 : shellState.getTerminal().getWidth(); if (cl.hasOption(noWrapOpt.getOpt())) { numColumns = Integer.MAX_VALUE; } diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java index bdee45f9353..2a032565953 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/CompactionPriorityQueueMetricsIT.java @@ -109,7 +109,7 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase { public static final String QUEUE1 = "METRICSQ1"; public static final String QUEUE1_METRIC_LABEL = MetricsUtil.formatString(QUEUE1); public static final String QUEUE1_SERVICE = "Q1"; - public static final int QUEUE1_SIZE = 6; + public static final int QUEUE1_SIZE = 10 * 1024; // Metrics collector Thread final LinkedBlockingQueue queueMetrics = new LinkedBlockingQueue<>(); @@ -202,7 +202,7 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf) Property.COMPACTION_SERVICE_PREFIX.getKey() + QUEUE1_SERVICE + ".planner.opts.groups", "[{'group':'" + QUEUE1 + "'}]"); - cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE, "6"); + cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, "10K"); cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 0); // This test waits for dead compactors to be absent in zookeeper. The following setting will diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 82c0f9b8bce..99002079d4a 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.test.compaction; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3; @@ -87,6 +86,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.schema.CompactionMetadata; @@ -102,17 +102,20 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.util.FindCompactionTmpFiles; +import org.apache.accumulo.test.fate.TestLock; import org.apache.accumulo.test.functional.CompactionIT.ErrorThrowingSelector; import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import com.google.common.base.Preconditions; public class ExternalCompaction_1_IT extends SharedMiniClusterBase { + private static ServiceLock testLock; public static class ExternalCompaction1Config implements MiniClusterConfigurationCallback { @Override @@ -124,6 +127,14 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreS @BeforeAll public static void beforeTests() throws Exception { startMiniClusterWithConfig(new ExternalCompaction1Config()); + testLock = new TestLock().createTestLock(getCluster().getServerContext()); + } + + @AfterAll + public static void afterTests() throws Exception { + if (testLock != null) { + testLock.unlock(); + } } public static class TestFilter extends Filter { @@ -237,7 +248,7 @@ public void testExternalCompaction() throws Exception { public void testCompactionCommitAndDeadDetectionRoot() throws Exception { var ctx = getCluster().getServerContext(); FateStore metaFateStore = new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, - ctx.getZooSession(), createDummyLockID(), null); + ctx.getZooSession(), testLock.getLockID(), null); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { var tableId = ctx.getTableId(AccumuloTable.ROOT.tableName()); @@ -256,7 +267,7 @@ public void testCompactionCommitAndDeadDetectionRoot() throws Exception { public void testCompactionCommitAndDeadDetectionMeta() throws Exception { var ctx = getCluster().getServerContext(); FateStore metaFateStore = new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, - ctx.getZooSession(), createDummyLockID(), null); + ctx.getZooSession(), testLock.getLockID(), null); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { // Metadata table by default already has 2 tablets @@ -278,7 +289,8 @@ public void testCompactionCommitAndDeadDetectionUser() throws Exception { final String tableName = getUniqueNames(1)[0]; try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore userFateStore = new UserFateStore<>(ctx, createDummyLockID(), null); + UserFateStore userFateStore = + new UserFateStore<>(ctx, AccumuloTable.FATE.tableName(), testLock.getLockID(), null); SortedSet splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); @@ -301,9 +313,11 @@ public void testCompactionCommitAndDeadDetectionAll() throws Exception { final String userTable = getUniqueNames(1)[0]; try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore userFateStore = new UserFateStore<>(ctx, createDummyLockID(), null); - FateStore metaFateStore = new MetaFateStore<>( - ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooSession(), createDummyLockID(), null); + UserFateStore userFateStore = + new UserFateStore<>(ctx, AccumuloTable.FATE.tableName(), testLock.getLockID(), null); + FateStore metaFateStore = + new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooSession(), + testLock.getLockID(), null); SortedSet splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java index 71b198c0ac9..f15fb6caaa1 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java @@ -27,7 +27,8 @@ import org.apache.accumulo.core.fate.Repo; /** - * A FATE which performs the dead reservation cleanup with a much shorter delay between + * A FATE which performs the dead reservation cleanup with a much shorter delay between. Useful for + * shortening test times for tests that are waiting for a cleanup to occur. */ public class FastFate extends Fate { @@ -38,6 +39,6 @@ public FastFate(T environment, FateStore store, boolean runDeadResCleaner, @Override public Duration getDeadResCleanupDelay() { - return Duration.ofSeconds(15); + return Duration.ofSeconds(5); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index b1fce72f834..b0f3d7ffeae 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.test.fate; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import static org.apache.accumulo.test.fate.FateStoreUtil.TEST_FATE_OP; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; @@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import java.io.IOException; import java.lang.reflect.Method; import java.time.Duration; import java.util.ArrayList; @@ -50,8 +51,7 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.conf.ConfigurationCopy; -import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.Fate; @@ -64,39 +64,34 @@ import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.core.lock.ServiceLockPaths.AddressSelector; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.zookeeper.ZooSession; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; -import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.util.Admin; import org.apache.accumulo.server.util.fateCommand.FateSummaryReport; import org.apache.accumulo.server.util.fateCommand.FateTxnDetails; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestEnv; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestRepo; import org.apache.accumulo.test.functional.ConfigurableMacBase; import org.apache.accumulo.test.functional.ReadWriteIT; import org.apache.accumulo.test.functional.SlowIterator; import org.apache.accumulo.test.util.Wait; -import org.apache.hadoop.conf.Configuration; import org.easymock.EasyMock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public abstract class FateOpsCommandsIT extends ConfigurableMacBase - implements FateTestRunner { + implements FateTestRunner { @Override protected Duration defaultTimeout() { return Duration.ofMinutes(3); } - @Override - public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - // Used for tests that shutdown the manager so the sleep time after shutdown isn't too long - cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10s"); - } - @BeforeEach - public void shutdownCompactor() throws Exception { + public void beforeEachSetup() throws Exception { // Occasionally, the summary/print cmds will see a COMMIT_COMPACTION transaction which was // initiated on starting the manager, causing the test to fail. Stopping the compactor fixes // this issue. @@ -110,10 +105,10 @@ public void testFateSummaryCommand() throws Exception { executeTest(this::testFateSummaryCommand); } - protected void testFateSummaryCommand(FateStore store, ServerContext sctx) + protected void testFateSummaryCommand(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // validate blank report, no transactions have started ProcessInfo p = getCluster().exec(Admin.class, "fate", "--summary", "-j"); @@ -298,7 +293,7 @@ protected void testFateSummaryCommand(FateStore store, ServerContext sc validateFateDetails(report.getFateDetails(), 2, fateIdsStarted); } - fate.shutdown(10, TimeUnit.MINUTES); + fate.shutdown(1, TimeUnit.MINUTES); } @Test @@ -306,10 +301,10 @@ public void testFateSummaryCommandPlainText() throws Exception { executeTest(this::testFateSummaryCommandPlainText); } - protected void testFateSummaryCommandPlainText(FateStore store, ServerContext sctx) + protected void testFateSummaryCommandPlainText(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -326,7 +321,7 @@ protected void testFateSummaryCommandPlainText(FateStore store, ServerC "Fate ID Filters: [" + fateId2.canonical() + ", " + fateId1.canonical() + "]")); assertTrue(result.contains("Instance Types Filters: [" + store.type().name() + "]")); - fate.shutdown(10, TimeUnit.MINUTES); + fate.shutdown(1, TimeUnit.MINUTES); } @Test @@ -334,10 +329,10 @@ public void testFatePrintCommand() throws Exception { executeTest(this::testFatePrintCommand); } - protected void testFatePrintCommand(FateStore store, ServerContext sctx) + protected void testFatePrintCommand(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // validate no transactions ProcessInfo p = getCluster().exec(Admin.class, "fate", "--print"); @@ -428,7 +423,7 @@ protected void testFatePrintCommand(FateStore store, ServerContext sctx assertTrue(fateIdsFromResult.isEmpty()); } - fate.shutdown(10, TimeUnit.MINUTES); + fate.shutdown(1, TimeUnit.MINUTES); } @Test @@ -436,7 +431,7 @@ public void testTransactionNameAndStep() throws Exception { executeTest(this::testTransactionNameAndStep); } - protected void testTransactionNameAndStep(FateStore store, ServerContext sctx) + protected void testTransactionNameAndStep(FateStore store, ServerContext sctx) throws Exception { // Since the other tests just use NEW transactions for simplicity, there are some fields of the // summary and print outputs which are null and not tested for (transaction name and transaction @@ -504,10 +499,10 @@ public void testFateCancelCommand() throws Exception { executeTest(this::testFateCancelCommand); } - protected void testFateCancelCommand(FateStore store, ServerContext sctx) + protected void testFateCancelCommand(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + Fate fate = initFateNoDeadResCleaner(store); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -529,18 +524,19 @@ protected void testFateCancelCommand(FateStore store, ServerContext sct assertEquals(Map.of(fateId1.canonical(), "FAILED", fateId2.canonical(), "NEW"), fateIdsFromSummary); - fate.shutdown(10, TimeUnit.MINUTES); + fate.shutdown(1, TimeUnit.MINUTES); } @Test - public void testFateFailCommand() throws Exception { - executeTest(this::testFateFailCommand); + public void testFateFailCommandTimeout() throws Exception { + stopManagerAndExecuteTest(this::testFateFailCommandTimeout); } - protected void testFateFailCommand(FateStore store, ServerContext sctx) + protected void testFateFailCommandTimeout(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + LatchTestEnv env = new LatchTestEnv(); + FastFate fate = initFateWithDeadResCleaner(store, env); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -551,19 +547,50 @@ protected void testFateFailCommand(FateStore store, ServerContext sctx) assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromSummary); - // Attempt to --fail the transaction. Should not work as the Manager is still up + // Seed the transaction with the latch repo, so we can have an IN_PROGRESS transaction + fate.seedTransaction(TEST_FATE_OP, fateId1, new LatchTestRepo(), true, "test"); + // Wait for 'fate' to reserve fateId1 (will be IN_PROGRESS on fateId1) + Wait.waitFor(() -> env.numWorkers.get() == 1); + + // Try to fail fateId1 + // This should not work as it is already reserved and being worked on by our running FATE + // ('fate'). Admin should try to reserve it for a bit, but should fail and exit ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail"); - assertEquals(1, p.getProcess().waitFor()); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result.contains("Could not fail " + fateId1 + " in a reasonable time")); fateIdsFromSummary = getFateIdsFromSummary(); - assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + assertEquals(Map.of(fateId1.canonical(), "IN_PROGRESS", fateId2.canonical(), "NEW"), fateIdsFromSummary); - // Stop MANAGER so --fail can be called - getCluster().getClusterControl().stopAllServers(ServerType.MANAGER); - Thread.sleep(20_000); + // Finish work and shutdown + env.workersLatch.countDown(); + fate.shutdown(1, TimeUnit.MINUTES); + } + + @Test + public void testFateFailCommandSuccess() throws Exception { + executeTest(this::testFateFailCommandSuccess); + } + + protected void testFateFailCommandSuccess(FateStore store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate fate = initFateNoDeadResCleaner(store); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); + + // Check that summary output lists both the transactions with a NEW status + Map fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); - // Fail the first transaction and ensure that it was failed - p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail"); + // Try to fail fateId1 + // This should work since nothing has fateId1 reserved (it is NEW) + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--fail"); assertEquals(0, p.getProcess().waitFor()); String result = p.readStdOut(); @@ -574,18 +601,19 @@ protected void testFateFailCommand(FateStore store, ServerContext sctx) || fateIdsFromSummary .equals(Map.of(fateId1.canonical(), "FAILED", fateId2.canonical(), "NEW"))); - fate.shutdown(10, TimeUnit.MINUTES); + fate.shutdown(1, TimeUnit.MINUTES); } @Test - public void testFateDeleteCommand() throws Exception { - executeTest(this::testFateDeleteCommand); + public void testFateDeleteCommandTimeout() throws Exception { + stopManagerAndExecuteTest(this::testFateDeleteCommandTimeout); } - protected void testFateDeleteCommand(FateStore store, ServerContext sctx) + protected void testFateDeleteCommandTimeout(FateStore store, ServerContext sctx) throws Exception { // Configure Fate - Fate fate = initializeFate(store); + LatchTestEnv env = new LatchTestEnv(); + FastFate fate = initFateWithDeadResCleaner(store, env); // Start some transactions FateId fateId1 = fate.startTransaction(); @@ -596,19 +624,50 @@ protected void testFateDeleteCommand(FateStore store, ServerContext sct assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), fateIdsFromSummary); - // Attempt to --delete the transaction. Should not work as the Manager is still up + // Seed the transaction with the latch repo, so we can have an IN_PROGRESS transaction + fate.seedTransaction(TEST_FATE_OP, fateId1, new LatchTestRepo(), true, "test"); + // Wait for 'fate' to reserve fateId1 (will be IN_PROGRESS on fateId1) + Wait.waitFor(() -> env.numWorkers.get() == 1); + + // Try to delete fateId1 + // This should not work as it is already reserved and being worked on by our running FATE + // ('fate'). Admin should try to reserve it for a bit, but should fail and exit ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete"); - assertEquals(1, p.getProcess().waitFor()); + assertEquals(0, p.getProcess().waitFor()); + String result = p.readStdOut(); + + assertTrue(result.contains("Could not delete " + fateId1 + " in a reasonable time")); fateIdsFromSummary = getFateIdsFromSummary(); - assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + assertEquals(Map.of(fateId1.canonical(), "IN_PROGRESS", fateId2.canonical(), "NEW"), fateIdsFromSummary); - // Stop MANAGER so --delete can be called - getCluster().getClusterControl().stopAllServers(ServerType.MANAGER); - Thread.sleep(20_000); + // Finish work and shutdown + env.workersLatch.countDown(); + fate.shutdown(1, TimeUnit.MINUTES); + } + + @Test + public void testFateDeleteCommandSuccess() throws Exception { + executeTest(this::testFateDeleteCommandSuccess); + } + + protected void testFateDeleteCommandSuccess(FateStore store, ServerContext sctx) + throws Exception { + // Configure Fate + Fate fate = initFateNoDeadResCleaner(store); + + // Start some transactions + FateId fateId1 = fate.startTransaction(); + FateId fateId2 = fate.startTransaction(); - // Delete the first transaction and ensure that it was deleted - p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete"); + // Check that summary output lists both the transactions with a NEW status + Map fateIdsFromSummary = getFateIdsFromSummary(); + assertEquals(Map.of(fateId1.canonical(), "NEW", fateId2.canonical(), "NEW"), + fateIdsFromSummary); + + // Try to delete fateId1 + // This should work since nothing has fateId1 reserved (it is NEW) + ProcessInfo p = getCluster().exec(Admin.class, "fate", fateId1.canonical(), "--delete"); assertEquals(0, p.getProcess().waitFor()); String result = p.readStdOut(); @@ -616,7 +675,7 @@ protected void testFateDeleteCommand(FateStore store, ServerContext sct fateIdsFromSummary = getFateIdsFromSummary(); assertEquals(Map.of(fateId2.canonical(), "NEW"), fateIdsFromSummary); - fate.shutdown(10, TimeUnit.MINUTES); + fate.shutdown(1, TimeUnit.MINUTES); } @Test @@ -624,7 +683,7 @@ public void testFatePrintAndSummaryCommandsWithInProgressTxns() throws Exception executeTest(this::testFatePrintAndSummaryCommandsWithInProgressTxns); } - protected void testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore store, + protected void testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore store, ServerContext sctx) throws Exception { // This test was written for an issue with the 'admin fate --print' and 'admin fate --summary' // commands where transactions could complete mid-print causing the command to fail. These @@ -632,20 +691,20 @@ protected void testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore mockedStore; + FateStore mockedStore; // This error was occurring in AdminUtil.getTransactionStatus(), so we will test this method. if (store.type().equals(FateInstanceType.USER)) { Method listMethod = UserFateStore.class.getMethod("list"); mockedStore = EasyMock.createMockBuilder(UserFateStore.class) - .withConstructor(ClientContext.class, ZooUtil.LockID.class, Predicate.class) - .withArgs(sctx, createDummyLockID(), null).addMockedMethod(listMethod).createMock(); + .withConstructor(ClientContext.class, String.class, ZooUtil.LockID.class, Predicate.class) + .withArgs(sctx, AccumuloTable.FATE.tableName(), null, null).addMockedMethod(listMethod) + .createMock(); } else { Method listMethod = MetaFateStore.class.getMethod("list"); mockedStore = EasyMock.createMockBuilder(MetaFateStore.class) .withConstructor(String.class, ZooSession.class, ZooUtil.LockID.class, Predicate.class) - .withArgs(sctx.getZooKeeperRoot() + Constants.ZFATE, sctx.getZooSession(), - createDummyLockID(), null) + .withArgs(sctx.getZooKeeperRoot() + Constants.ZFATE, sctx.getZooSession(), null, null) .addMockedMethod(listMethod).createMock(); } @@ -780,11 +839,17 @@ private void validateFateDetails(Set details, int expDetailsSize } } - private Fate initializeFate(FateStore store) { - ConfigurationCopy config = new ConfigurationCopy(); - config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); - config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - return new Fate<>(new TestEnv(), store, false, Object::toString, config); + protected FastFate initFateWithDeadResCleaner(FateStore store, + LatchTestEnv env) { + // Using FastFate so the cleanup will run often. This ensures that the cleanup will run when + // there are reservations present and that the cleanup will not unexpectedly delete these live + // reservations + return new FastFate<>(env, store, true, Object::toString, DefaultConfiguration.getInstance()); + } + + protected Fate initFateNoDeadResCleaner(FateStore store) { + return new Fate<>(new LatchTestEnv(), store, false, Object::toString, + DefaultConfiguration.getInstance()); } private boolean wordIsTStatus(String word) { @@ -795,4 +860,19 @@ private boolean wordIsTStatus(String word) { } return true; } + + /** + * Stop the MANAGER. For some of our tests, we want to be able to seed transactions with our own + * test repos. We want our fate to reserve these transactions (and not the real fates running in + * the Manager as that will lead to exceptions since the real fates wouldn't be able to handle our + * test repos). So, we essentially have the fates created here acting as the real fates: they have + * the same threads running that the real fates would, use a fate store with a ZK lock, use the + * same locations to store fate data that the Manager does, and are running in a separate process + * from the Admin process. Note that we cannot simply use different locations for our fate data + * from Manager to keep our test env separate from Manager. Admin uses the real fate data + * locations, so our test must also use the real locations. + */ + protected void stopManager() throws IOException { + getCluster().getClusterControl().stopAllServers(ServerType.MANAGER); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java b/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java index 244f7991116..dc9b7d22fc6 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java @@ -34,6 +34,11 @@ default void executeTest(FateTestExecutor testMethod) throws Exception { AbstractFateStore.DEFAULT_FATE_ID_GENERATOR); } + default void stopManagerAndExecuteTest(FateTestExecutor testMethod) + throws Exception { + throw new UnsupportedOperationException("Not implemented"); + } + interface FateTestExecutor { void execute(FateStore store, ServerContext sctx) throws Exception; } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java index 9dd65770183..f4eb390701e 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@ -342,8 +342,8 @@ private void testDeadReservationsCleanup(TestStoreFactory testStor // Create the new Fate/start the Fate threads (the work finder and the workers). // Don't run another dead reservation cleaner since we already have one running from fate1. - FastFate fate2 = new FastFate<>(testEnv2, store2, false, Object::toString, - DefaultConfiguration.getInstance()); + Fate fate2 = + new Fate<>(testEnv2, store2, false, Object::toString, DefaultConfiguration.getInstance()); // Wait for the "dead" reservations to be deleted and picked up again (reserved using // fate2/store2/lock2 now). @@ -462,5 +462,5 @@ protected interface MultipleStoresTestExecutor void execute(TestStoreFactory fateStoreFactory) throws Exception; } - protected static class MultipleStoresTestEnv {} + protected static class MultipleStoresTestEnv extends FateTestRunner.TestEnv {} } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/TestLock.java b/test/src/main/java/org/apache/accumulo/test/fate/TestLock.java new file mode 100644 index 00000000000..733b21379dd --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/TestLock.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockPaths; +import org.apache.accumulo.server.ServerContext; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestLock { + private static final Logger log = LoggerFactory.getLogger(TestLock.class); + final CountDownLatch lockAcquiredLatch; + + public TestLock() { + this.lockAcquiredLatch = new CountDownLatch(1); + } + + /** + * Used to create a dummy lock id to be passed in the creation of a {@link UserFateStore} or a + * {@link MetaFateStore}. Useful as a quicker and simpler alternative to + * {@link TestLock#createTestLock(ServerContext)} for tests where reserving transactions is needed + * AND the reservations for the test will be stored in a different location from the Managers fate + * stores. Can always use {@link TestLock#createTestLock(ServerContext)} to be safe if unsure + * which to use. + */ + public static ZooUtil.LockID createDummyLockID() { + return new ZooUtil.LockID("/path", "node", 123); + } + + /** + * Used to create a real lock (one held in ZK) to be passed in the creation of a + * {@link UserFateStore} or a {@link MetaFateStore}. Useful for tests where reserving transactions + * is needed AND the reservations for the test will be stored in the same location as the Managers + * fate stores. This is needed so the Manager will recognize and not delete these reservations. + * See similar {@link TestLock#createDummyLockID()} + */ + public ServiceLock createTestLock(ServerContext context) throws InterruptedException { + var zk = context.getZooSession(); + UUID uuid = UUID.randomUUID(); + ServiceLockPaths.ServiceLockPath slp = context.getServerPaths().createTestLockPath(); + ServiceLock lock = new ServiceLock(zk, slp, uuid); + TestLockWatcher lw = new TestLockWatcher(); + ServiceLockData.ServiceDescriptors descriptors = new ServiceLockData.ServiceDescriptors(); + descriptors + .addService(new ServiceLockData.ServiceDescriptor(uuid, ServiceLockData.ThriftService.NONE, + "fake_test_host", Constants.DEFAULT_RESOURCE_GROUP_NAME)); + ServiceLockData sld = new ServiceLockData(descriptors); + String lockPath = slp.toString(); + String parentLockPath = lockPath.substring(0, lockPath.lastIndexOf("/")); + + try { + var zrw = zk.asReaderWriter(); + zrw.putPersistentData(parentLockPath, new byte[0], NodeExistsPolicy.SKIP); + zrw.putPersistentData(lockPath, new byte[0], NodeExistsPolicy.SKIP); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException("Error creating path in ZooKeeper", e); + } + + lock.lock(lw, sld); + lockAcquiredLatch.await(); + + return lock; + } + + class TestLockWatcher implements ServiceLock.AccumuloLockWatcher { + + @Override + public void lostLock(ServiceLock.LockLossReason reason) { + log.warn("Lost lock: " + reason.toString()); + } + + @Override + public void unableToMonitorLockNode(Exception e) { + log.warn("Unable to monitor lock: " + e.getMessage()); + } + + @Override + public void acquiredLock() { + lockAcquiredLatch.countDown(); + log.debug("Acquired ZooKeeper lock for test"); + } + + @Override + public void failedToAcquireLock(Exception e) { + log.warn("Failed to acquire ZooKeeper lock for test, msg: " + e.getMessage()); + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java index 06e1b43031d..5c7a0c26fb1 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.test.fate.meta; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java index 3e2cd3a2391..e24fc0af5ba 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java @@ -18,7 +18,7 @@ */ package org.apache.accumulo.test.fate.meta; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import java.util.UUID; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java index 937a675f9c4..7009e55dd28 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java @@ -18,21 +18,58 @@ */ package org.apache.accumulo.test.fate.meta; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import java.util.function.Predicate; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateOpsCommandsIT; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestEnv; +import org.apache.accumulo.test.fate.TestLock; public class MetaFateOpsCommandsIT extends FateOpsCommandsIT { + /** + * This should be used for tests that will not seed a txn with work/reserve a txn. Note that this + * should be used in conjunction with + * {@link FateOpsCommandsIT#initFateNoDeadResCleaner(FateStore)} + */ @Override - public void executeTest(FateTestExecutor testMethod, int maxDeferred, + public void executeTest(FateTestExecutor testMethod, int maxDeferred, AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { - ServerContext sctx = getCluster().getServerContext(); - String path = sctx.getZooKeeperRoot() + Constants.ZFATE; - var zk = sctx.getZooSession(); - testMethod.execute(new MetaFateStore<>(path, zk, createDummyLockID(), null), sctx); + ServerContext context = getCluster().getServerContext(); + String path = context.getZooKeeperRoot() + Constants.ZFATE; + var zk = context.getZooSession(); + // test should not be reserving txns or checking reservations, so null lockID and isLockHeld + testMethod.execute(new MetaFateStore<>(path, zk, null, null), context); + } + + /** + * This should be used for tests that will seed a txn with work/reserve a txn. Note that this + * should be used in conjunction with + * {@link FateOpsCommandsIT#initFateWithDeadResCleaner(FateStore, LatchTestEnv)} + */ + @Override + public void stopManagerAndExecuteTest(FateTestExecutor testMethod) + throws Exception { + stopManager(); + ServerContext context = getCluster().getServerContext(); + String path = context.getZooKeeperRoot() + Constants.ZFATE; + var zk = context.getZooSession(); + ServiceLock testLock = null; + try { + testLock = new TestLock().createTestLock(context); + ZooUtil.LockID lockID = testLock.getLockID(); + Predicate isLockHeld = + lock -> ServiceLock.isLockHeld(context.getZooCache(), lock); + testMethod.execute(new MetaFateStore<>(path, zk, lockID, isLockHeld), context); + } finally { + if (testLock != null) { + testLock.unlock(); + } + } } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java index d6e1410b7d5..e5165d982ab 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java @@ -18,9 +18,10 @@ */ package org.apache.accumulo.test.fate.meta; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; + import java.io.File; -import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.test.fate.FateStatusEnforcementIT; import org.apache.accumulo.test.fate.FateStoreUtil; @@ -46,7 +47,7 @@ public static void afterAllTeardown() throws Exception { @BeforeEach public void beforeEachSetup() throws Exception { store = new MetaFateStore<>(FateStoreUtil.MetaFateZKSetup.getZkFatePath(), - FateStoreUtil.MetaFateZKSetup.getZk(), AbstractFateStore.createDummyLockID(), null); + FateStoreUtil.MetaFateZKSetup.getZk(), createDummyLockID(), null); fateId = store.create(); txStore = store.reserve(fateId); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java index f8e89b101c3..5bb43d4b8ca 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java @@ -18,8 +18,8 @@ */ package org.apache.accumulo.test.fate.meta; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java index 014b6c97bcb..26b8103e4d6 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java @@ -18,8 +18,8 @@ */ package org.apache.accumulo.test.fate.user; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java index de30125e5df..3a0aaeecd5f 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java @@ -18,8 +18,8 @@ */ package org.apache.accumulo.test.fate.user; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.clientImpl.ClientContext; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java index 3fe3192e6ba..92f5fae847f 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java @@ -18,18 +18,56 @@ */ package org.apache.accumulo.test.fate.user; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; +import java.util.function.Predicate; import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.lock.ServiceLock; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.test.fate.FateOpsCommandsIT; +import org.apache.accumulo.test.fate.MultipleStoresIT.LatchTestEnv; +import org.apache.accumulo.test.fate.TestLock; public class UserFateOpsCommandsIT extends FateOpsCommandsIT { + /** + * This should be used for tests that will not seed a txn with work/reserve a txn. Note that this + * should be used in conjunction with + * {@link FateOpsCommandsIT#initFateNoDeadResCleaner(FateStore)} + */ @Override - public void executeTest(FateTestExecutor testMethod, int maxDeferred, + public void executeTest(FateTestExecutor testMethod, int maxDeferred, AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { - testMethod.execute( - new UserFateStore<>(getCluster().getServerContext(), createDummyLockID(), null), - getCluster().getServerContext()); + var context = getCluster().getServerContext(); + // the test should not be reserving or checking reservations, so null lockID and isLockHeld + testMethod.execute(new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null), + context); + } + + /** + * This should be used for tests that will seed a txn with work/reserve a txn. Note that this + * should be used in conjunction with + * {@link FateOpsCommandsIT#initFateWithDeadResCleaner(FateStore, LatchTestEnv)} + */ + @Override + public void stopManagerAndExecuteTest(FateTestExecutor testMethod) + throws Exception { + stopManager(); + var context = getCluster().getServerContext(); + ServiceLock testLock = null; + try { + testLock = new TestLock().createTestLock(context); + ZooUtil.LockID lockID = testLock.getLockID(); + Predicate isLockHeld = + lock -> ServiceLock.isLockHeld(context.getZooCache(), lock); + testMethod.execute( + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), lockID, isLockHeld), + context); + } finally { + if (testLock != null) { + testLock.unlock(); + } + } } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java index 22ecb9fe6e2..904882fcf29 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT.java @@ -19,10 +19,10 @@ package org.apache.accumulo.test.fate.user; import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.test.fate.FateStatusEnforcementIT; @@ -50,7 +50,7 @@ public void beforeEachSetup() throws Exception { client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); table = getUniqueNames(1)[0]; createFateTable(client, table); - store = new UserFateStore<>(client, table, AbstractFateStore.createDummyLockID(), null); + store = new UserFateStore<>(client, table, createDummyLockID(), null); fateId = store.create(); txStore = store.reserve(fateId); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java index 17cc06a5ac2..71af0824961 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java @@ -18,9 +18,9 @@ */ package org.apache.accumulo.test.fate.user; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId; import static org.apache.accumulo.test.fate.FateStoreUtil.createFateTable; +import static org.apache.accumulo.test.fate.TestLock.createDummyLockID; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.BatchWriter; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java index de2ed8475f8..bd29f6e10e5 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java @@ -18,49 +18,81 @@ */ package org.apache.accumulo.test.functional; -import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TabletAvailability; +import org.apache.accumulo.core.client.admin.servers.ServerId; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.Timer; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.test.VerifyIngest; import org.apache.accumulo.test.VerifyIngest.VerifyParams; +import org.apache.accumulo.test.util.Wait; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This test verifies that when a lot of files are bulk imported into a table with one tablet and * then splits that not all data files go to the children tablets. */ public class BulkSplitOptimizationIT extends AccumuloClusterHarness { + private static final Logger log = LoggerFactory.getLogger(BulkSplitOptimizationIT.class); + + Path testDir; @Override protected Duration defaultTimeout() { - return Duration.ofMinutes(2); + return Duration.ofMinutes(5); } @BeforeEach public void alterConfig() throws Exception { try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + final int initialTserverCount = + client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size(); + log.info("Tserver count: {}", initialTserverCount); + Timer timer = Timer.startNew(); getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + Wait.waitFor( + () -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).isEmpty(), + 120_000); + log.info("Took {} ms to stop all tservers", timer.elapsed(MILLISECONDS)); + timer.restart(); getClusterControl().startAllServers(ServerType.TABLET_SERVER); + Wait.waitFor(() -> client.instanceOperations().getServers(ServerId.Type.TABLET_SERVER).size() + < initialTserverCount, 120_000); + log.info("Took {} ms to start all tservers", timer.elapsed(MILLISECONDS)); + + FileSystem fs = cluster.getFileSystem(); + testDir = new Path(cluster.getTemporaryPath(), "testmf"); + fs.deleteOnExit(testDir); + + timer.restart(); + FunctionalTestUtils.createRFiles(client, fs, testDir.toString(), ROWS, SPLITS, 8); + long elapsed = timer.elapsed(MILLISECONDS); + FileStatus[] stats = fs.listStatus(testDir); + log.info("Generated {} files in {} ms", stats.length, elapsed); } } @AfterEach public void resetConfig() throws Exception { - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - getClusterControl().stopAllServers(ServerType.TABLET_SERVER); - getClusterControl().startAllServers(ServerType.TABLET_SERVER); - } + getClusterControl().stopAllServers(ServerType.TABLET_SERVER); + getClusterControl().startAllServers(ServerType.TABLET_SERVER); } private static final int ROWS = 100000; @@ -68,35 +100,51 @@ public void resetConfig() throws Exception { @Test public void testBulkSplitOptimization() throws Exception { + log.info("Starting BulkSplitOptimizationIT test"); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - final String tableName = getUniqueNames(1)[0]; - c.tableOperations().create(tableName); - c.tableOperations().setProperty(tableName, Property.TABLE_MAJC_RATIO.getKey(), "1000"); - c.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "1000"); - c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G"); - FileSystem fs = cluster.getFileSystem(); - Path testDir = new Path(cluster.getTemporaryPath(), "testmf"); - fs.deleteOnExit(testDir); - FunctionalTestUtils.createRFiles(c, fs, testDir.toString(), ROWS, SPLITS, 8); - FileStatus[] stats = fs.listStatus(testDir); - System.out.println("Number of generated files: " + stats.length); + final String tableName = getUniqueNames(1)[0]; + Map tableProps = new HashMap<>(); + tableProps.put(Property.TABLE_MAJC_RATIO.getKey(), "1000"); + tableProps.put(Property.TABLE_FILE_MAX.getKey(), "1000"); + tableProps.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "1G"); + + log.info("Creating table {}", tableName); + Timer timer = Timer.startNew(); + c.tableOperations().create(tableName, new NewTableConfiguration().setProperties(tableProps) + .withInitialTabletAvailability(TabletAvailability.HOSTED)); + log.info("Created table in {} ms. Starting bulk import", timer.elapsed(MILLISECONDS)); + + timer.restart(); c.tableOperations().importDirectory(testDir.toString()).to(tableName).load(); + log.info("Imported into table {} in {} ms", tableName, timer.elapsed(MILLISECONDS)); + timer.restart(); FunctionalTestUtils.checkSplits(c, tableName, 0, 0); FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100); + log.info("Checked splits and rfiles in {} ms", timer.elapsed(MILLISECONDS)); - // initiate splits + log.info("Lowering split threshold to 100K to initiate splits"); c.tableOperations().setProperty(tableName, Property.TABLE_SPLIT_THRESHOLD.getKey(), "100K"); - Thread.sleep(SECONDS.toMillis(2)); + timer.restart(); // wait until over split threshold -- should be 78 splits - while (c.tableOperations().listSplits(tableName).size() < 50) { - Thread.sleep(500); - } + Wait.waitFor(() -> { + try { + FunctionalTestUtils.checkSplits(c, tableName, 50, 100); + } catch (Exception e) { + if (e.getMessage().contains("splits points out of range")) { + return false; + } else { + throw e; + } + } + return true; + }); + + log.info("Took {} ms for split count to reach expected range", timer.elapsed(MILLISECONDS)); - FunctionalTestUtils.checkSplits(c, tableName, 50, 100); VerifyParams params = new VerifyParams(getClientProps(), tableName, ROWS); params.timestamp = 1; params.dataSize = 50; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index 33d1bd33187..515f4a62133 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -21,7 +21,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -56,6 +55,7 @@ import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; @@ -259,17 +259,18 @@ public void getFateStatus() { InstanceId instanceId = context.getInstanceID(); var zk = context.getZooSession(); - MetaFateStore mfs = new MetaFateStore<>( - ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, createDummyLockID(), null); - UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); + MetaFateStore readOnlyMFS = + new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, null, null); + UserFateStore readOnlyUFS = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); var lockPath = context.getServerPaths().createTableLocksPath(tableId.toString()); - Map> fateStores = - Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); + Map> readOnlyFateStores = + Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS); - withLocks = admin.getStatus(fateStores, zk, lockPath, null, null, null); + withLocks = admin.getStatus(readOnlyFateStores, zk, lockPath, null, null, null); // call method that does not use locks. - noLocks = admin.getTransactionStatus(fateStores, null, null, null); + noLocks = admin.getTransactionStatus(readOnlyFateStores, null, null, null); // no zk exception, no need to retry break; @@ -352,10 +353,11 @@ private boolean lookupFateInZookeeper(final String tableName) throws KeeperExcep InstanceId instanceId = context.getInstanceID(); var zk = context.getZooSession(); - MetaFateStore mfs = new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, - zk, createDummyLockID(), null); + MetaFateStore readOnlyMFS = + new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, null, null); var lockPath = context.getServerPaths().createTableLocksPath(tableId.toString()); - AdminUtil.FateStatus fateStatus = admin.getStatus(mfs, zk, lockPath, null, null, null); + AdminUtil.FateStatus fateStatus = + admin.getStatus(readOnlyMFS, zk, lockPath, null, null, null); log.trace("current fates: {}", fateStatus.getTransactions().size()); @@ -382,8 +384,9 @@ private boolean lookupFateInAccumulo(final String tableName) throws KeeperExcept log.trace("tid: {}", tableId); - UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); - AdminUtil.FateStatus fateStatus = admin.getStatus(ufs, null, null, null); + UserFateStore readOnlyUFS = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); + AdminUtil.FateStatus fateStatus = admin.getStatus(readOnlyUFS, null, null, null); log.trace("current fates: {}", fateStatus.getTransactions().size()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index ee4b187a61f..a910a040a78 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -19,7 +19,6 @@ package org.apache.accumulo.test.functional; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -233,13 +232,14 @@ private static FateStatus getFateStatus(AccumuloCluster cluster) { AdminUtil admin = new AdminUtil<>(false); ServerContext context = cluster.getServerContext(); var zk = context.getZooSession(); - MetaFateStore mfs = new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, - zk, createDummyLockID(), null); - UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); - Map> fateStores = - Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); + MetaFateStore readOnlyMFS = + new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, zk, null, null); + UserFateStore readOnlyUFS = + new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null, null); + Map> readOnlyFateStores = + Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER, readOnlyUFS); var lockPath = context.getServerPaths().createTableLocksPath(); - return admin.getStatus(fateStores, zk, lockPath, null, null, null); + return admin.getStatus(readOnlyFateStores, zk, lockPath, null, null, null); } catch (KeeperException | InterruptedException e) { throw new RuntimeException(e); }