Skip to content

Commit

Permalink
Merge pull request #15798 from njr-11/22321-shutdown-callback-for-pol…
Browse files Browse the repository at this point in the history
…icy-executor

CL22321 add callback for policy executor shutdown
  • Loading branch information
njr-11 authored Feb 4, 2021
2 parents ecfcd5f + 385d481 commit f0be9dc
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,15 @@ <T> T invokeAny(Collection<? extends Callable<T>> tasks, PolicyTaskCallback[] ca
*/
Runnable registerQueueSizeCallback(int minAvailable, Runnable callback);

/**
* Registers a one-time callback to be invoked inline when the
* policy executor shuts down. This method is intended for optional use
* on a newly created policy executor instance.
*
* @param callback the callback, or null to unregister.
*/
void registerShutdownCallback(Runnable callback);

/**
* Applies when using the <code>execute</code> or <code>submit</code> methods. Indicates whether or not to run the task on the
* caller's thread when the queue is full and the <code>maxWaitForEnqueue</code> has been exceeded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public String toString() {
private final AtomicReference<Callback> cbConcurrency = new AtomicReference<Callback>();
private final AtomicReference<Callback> cbLateStart = new AtomicReference<Callback>();
private final AtomicReference<Callback> cbQueueSize = new AtomicReference<Callback>();
private Runnable cbShutdown;

/**
* Use this lock to make a consistent update to both expedite and expeditesAvailable,
Expand Down Expand Up @@ -1108,6 +1109,11 @@ public Runnable registerQueueSizeCallback(int minAvailable, Runnable runnable) {
return previous == null ? null : previous.runnable;
}

@Override
public void registerShutdownCallback(Runnable callback) {
cbShutdown = callback;
}

@Override
public PolicyExecutor runIfQueueFull(boolean runIfFull) {
if (state.get() != State.ACTIVE)
Expand Down Expand Up @@ -1195,6 +1201,9 @@ public void shutdown() {
policyExecutors.remove(identifier); // remove tracking of this instance and allow identifier to be reused

shutdownLatch.countDown();

if (cbShutdown != null)
cbShutdown.run();
} else
while (state.get() == State.ENQUEUE_STOPPING)
try { // Await completion of other thread that concurrently invokes shutdown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5341,6 +5341,19 @@ public void testShutdownNowDuringInvokeAll() throws Exception {
assertTrue(executor.awaitTermination(TIMEOUT_NS, TimeUnit.NANOSECONDS));
}

// Register a callback for shutdown. Verify that it gets invoked for both shutdown and shutdownNow methods.
@Test
public void testShutdownCallback() throws Exception {
PolicyExecutor executor1 = provider.create("testShutdownCallback-1");
PolicyExecutor executor2 = provider.create("testShutdownCallback-2");
AtomicInteger count = new AtomicInteger();
executor1.registerShutdownCallback(() -> count.addAndGet(1));
executor2.registerShutdownCallback(() -> count.addAndGet(2));
executor1.shutdown();
executor2.shutdownNow();
assertEquals(3, count.get());
}

// Submit a task that gets queued but times out (due to startTimeout) before it can run.
@Test
public void testStartTimeout() throws Exception {
Expand Down

0 comments on commit f0be9dc

Please sign in to comment.