diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index de6e7073ecf..d209260e0a5 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -457,7 +457,7 @@ public Fate(T environment, FateStore store, boolean runDeadResCleaner, idleCountHistory.add(workQueue.getWaitingConsumerCount()); } } - }, 3, 30, SECONDS)); + }, getInitialDelay().toSeconds(), getPoolWatcherDelay().toSeconds(), SECONDS)); this.transactionExecutor = pool; ScheduledExecutorService deadResCleanerExecutor = null; @@ -466,8 +466,9 @@ public Fate(T environment, FateStore store, boolean runDeadResCleaner, // reservations held by dead processes, if they exist. deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1, store.type() + "-dead-reservation-cleaner-pool"); - ScheduledFuture deadReservationCleaner = deadResCleanerExecutor.scheduleWithFixedDelay( - new DeadReservationCleaner(), 3, getDeadResCleanupDelay().toSeconds(), SECONDS); + ScheduledFuture deadReservationCleaner = + deadResCleanerExecutor.scheduleWithFixedDelay(new DeadReservationCleaner(), + getInitialDelay().toSeconds(), getDeadResCleanupDelay().toSeconds(), SECONDS); ThreadPools.watchCriticalScheduledTask(deadReservationCleaner); } this.deadResCleanerExecutor = deadResCleanerExecutor; @@ -476,10 +477,18 @@ public Fate(T environment, FateStore store, boolean runDeadResCleaner, this.workFinder.start(); } + public Duration getInitialDelay() { + return Duration.ofSeconds(3); + } + public Duration getDeadResCleanupDelay() { return Duration.ofMinutes(3); } + public Duration getPoolWatcherDelay() { + return Duration.ofSeconds(30); + } + // get a transaction id back to the requester before doing any work public FateId startTransaction() { return store.create(); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index e7b3e073c9b..76ff585df0c 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -238,7 +238,7 @@ protected void testTransactionStatus(FateStore store, ServerContext sct try { // Wait for the transaction runner to be scheduled. - Thread.sleep(3000); + Thread.sleep(fate.getInitialDelay().toMillis() * 2); callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); @@ -295,7 +295,7 @@ protected void testCancelWhileNew(FateStore store, ServerContext sctx) try { // Wait for the transaction runner to be scheduled. - Thread.sleep(3000); + Thread.sleep(fate.getInitialDelay().toMillis() * 2); callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); @@ -330,7 +330,7 @@ protected void testCancelWhileSubmittedAndRunning(FateStore store, Serv try { // Wait for the transaction runner to be scheduled. - Thread.sleep(3000); + Thread.sleep(fate.getInitialDelay().toMillis() * 2); callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); @@ -365,7 +365,7 @@ protected void testCancelWhileInCall(FateStore store, ServerContext sct try { // Wait for the transaction runner to be scheduled. - Thread.sleep(3000); + Thread.sleep(fate.getInitialDelay().toMillis() * 2); callStarted = new CountDownLatch(1); finishCall = new CountDownLatch(1); @@ -402,7 +402,7 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx try { // Wait for the transaction runner to be scheduled. - Thread.sleep(3000); + Thread.sleep(fate.getInitialDelay().toMillis() * 2); DeferredTestRepo.executedCalls.set(0); // Initialize the repo to have a delay of 30 seconds @@ -479,7 +479,7 @@ protected void testRepoFails(FateStore store, ServerContext sctx) throw try { // Wait for the transaction runner to be scheduled. - Thread.sleep(3000); + Thread.sleep(fate.getInitialDelay().toMillis() * 2); List expectedUndoOrder = List.of("OP3", "OP2", "OP1"); /*