Skip to content

Commit

Permalink
Merge branch 'main' into zc-recursive-watchers
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 17, 2025
2 parents 733ce37 + 428f7c7 commit 3281387
Show file tree
Hide file tree
Showing 48 changed files with 1,230 additions and 573 deletions.
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public class Constants {

public static final String ZTABLE_LOCKS = "/table_locks";
public static final String ZMINI_LOCK = "/mini";
public static final String ZADMIN_LOCK = "/admin/lock";
public static final String ZTEST_LOCK = "/test/lock";

public static final String BULK_PREFIX = "b-";
public static final String BULK_RENAME_FILE = "renames.json";
Expand Down
16 changes: 7 additions & 9 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -465,14 +465,12 @@ public enum Property {
"The number of threads used to seed fate split task, the actual split work is done by fate"
+ " threads.",
"4.0.0"),

MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE(
"manager.compaction.major.service.queue.initial.size", "10000", PropertyType.COUNT,
"The initial size of each resource groups compaction job priority queue.", "4.0.0"),
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE_FACTOR(
"manager.compaction.major.service.queue.size.factor", "3.0", PropertyType.FRACTION,
"The dynamic resizing of the compaction job priority queue is based on"
+ " the number of compactors for the group multiplied by this factor.",
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE("manager.compaction.major.service.queue.size",
"1M", PropertyType.MEMORY,
"The data size of each resource groups compaction job priority queue. The memory size of "
+ "each compaction job is estimated and the sum of these sizes per resource group will not "
+ "exceed this setting. When the size is exceeded the lowest priority jobs are dropped as "
+ "needed.",
"4.0.0"),
SPLIT_PREFIX("split.", null, PropertyType.PREFIX,
"System wide properties related to splitting tablets.", "3.1.0"),
Expand Down Expand Up @@ -1460,7 +1458,7 @@ public static boolean isValidTablePropertyKey(String key) {
RPC_MAX_MESSAGE_SIZE,

// compaction coordiantor properties
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE,
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE,

// block cache options
GENERAL_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,6 @@ public FateId newRandomId(FateInstanceType instanceType) {
// Keeps track of the number of concurrent callers to waitForStatusChange()
private final AtomicInteger concurrentStatusChangeCallers = new AtomicInteger(0);

public AbstractFateStore() {
this(createDummyLockID(), null, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
}

public AbstractFateStore(ZooUtil.LockID lockID, Predicate<ZooUtil.LockID> isLockHeld) {
this(lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
}
Expand All @@ -114,9 +110,7 @@ public AbstractFateStore(ZooUtil.LockID lockID, Predicate<ZooUtil.LockID> isLock
this.maxDeferred = maxDeferred;
this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator);
this.deferred = Collections.synchronizedMap(new HashMap<>());
this.lockID = Objects.requireNonNull(lockID);
// If the store is used for a Fate which runs a dead reservation cleaner,
// this should be non-null, otherwise null is fine
this.lockID = lockID;
this.isLockHeld = isLockHeld;
}

Expand Down Expand Up @@ -291,6 +285,10 @@ protected void verifyFateKey(FateId fateId, Optional<FateKey> fateKeySeen,
"Collision detected for fate id " + fateId);
}

protected void verifyLock(ZooUtil.LockID lockID, FateId fateId) {
Preconditions.checkState(lockID != null, "Tried to reserve " + fateId + " with null lockID");
}

protected abstract Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses);

protected abstract TStatus _getStatus(FateId fateId);
Expand Down Expand Up @@ -450,14 +448,4 @@ protected static Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
throw new IllegalStateException("Bad node data " + txInfo);
}
}

/**
* this is a temporary method used to create a dummy lock when using a FateStore outside the
* context of a Manager (one example is testing) so reservations can still be made.
*
* @return a dummy {@link ZooUtil.LockID}
*/
public static ZooUtil.LockID createDummyLockID() {
return new ZooUtil.LockID("/path", "node", 123);
}
}
Loading

0 comments on commit 3281387

Please sign in to comment.