diff --git a/dev/com.ibm.ws.threading/src/com/ibm/ws/threading/PolicyExecutor.java b/dev/com.ibm.ws.threading/src/com/ibm/ws/threading/PolicyExecutor.java index ba2de88af168..1733428bfa9f 100644 --- a/dev/com.ibm.ws.threading/src/com/ibm/ws/threading/PolicyExecutor.java +++ b/dev/com.ibm.ws.threading/src/com/ibm/ws/threading/PolicyExecutor.java @@ -271,6 +271,15 @@ T invokeAny(Collection> tasks, PolicyTaskCallback[] ca */ Runnable registerQueueSizeCallback(int minAvailable, Runnable callback); + /** + * Registers a one-time callback to be invoked inline when the + * policy executor shuts down. This method is intended for optional use + * on a newly created policy executor instance. + * + * @param callback the callback, or null to unregister. + */ + void registerShutdownCallback(Runnable callback); + /** * Applies when using the execute or submit methods. Indicates whether or not to run the task on the * caller's thread when the queue is full and the maxWaitForEnqueue has been exceeded. diff --git a/dev/com.ibm.ws.threading/src/com/ibm/ws/threading/internal/PolicyExecutorImpl.java b/dev/com.ibm.ws.threading/src/com/ibm/ws/threading/internal/PolicyExecutorImpl.java index 734f4dbfe232..db15679b4967 100644 --- a/dev/com.ibm.ws.threading/src/com/ibm/ws/threading/internal/PolicyExecutorImpl.java +++ b/dev/com.ibm.ws.threading/src/com/ibm/ws/threading/internal/PolicyExecutorImpl.java @@ -70,6 +70,7 @@ public String toString() { private final AtomicReference cbConcurrency = new AtomicReference(); private final AtomicReference cbLateStart = new AtomicReference(); private final AtomicReference cbQueueSize = new AtomicReference(); + private Runnable cbShutdown; /** * Use this lock to make a consistent update to both expedite and expeditesAvailable, @@ -1108,6 +1109,11 @@ public Runnable registerQueueSizeCallback(int minAvailable, Runnable runnable) { return previous == null ? null : previous.runnable; } + @Override + public void registerShutdownCallback(Runnable callback) { + cbShutdown = callback; + } + @Override public PolicyExecutor runIfQueueFull(boolean runIfFull) { if (state.get() != State.ACTIVE) @@ -1195,6 +1201,9 @@ public void shutdown() { policyExecutors.remove(identifier); // remove tracking of this instance and allow identifier to be reused shutdownLatch.countDown(); + + if (cbShutdown != null) + cbShutdown.run(); } else while (state.get() == State.ENQUEUE_STOPPING) try { // Await completion of other thread that concurrently invokes shutdown. diff --git a/dev/com.ibm.ws.threading_policy_fat/test-applications/basicfat/src/web/PolicyExecutorServlet.java b/dev/com.ibm.ws.threading_policy_fat/test-applications/basicfat/src/web/PolicyExecutorServlet.java index 53da9ca2db0d..edfddd8a0ef0 100644 --- a/dev/com.ibm.ws.threading_policy_fat/test-applications/basicfat/src/web/PolicyExecutorServlet.java +++ b/dev/com.ibm.ws.threading_policy_fat/test-applications/basicfat/src/web/PolicyExecutorServlet.java @@ -5341,6 +5341,19 @@ public void testShutdownNowDuringInvokeAll() throws Exception { assertTrue(executor.awaitTermination(TIMEOUT_NS, TimeUnit.NANOSECONDS)); } + // Register a callback for shutdown. Verify that it gets invoked for both shutdown and shutdownNow methods. + @Test + public void testShutdownCallback() throws Exception { + PolicyExecutor executor1 = provider.create("testShutdownCallback-1"); + PolicyExecutor executor2 = provider.create("testShutdownCallback-2"); + AtomicInteger count = new AtomicInteger(); + executor1.registerShutdownCallback(() -> count.addAndGet(1)); + executor2.registerShutdownCallback(() -> count.addAndGet(2)); + executor1.shutdown(); + executor2.shutdownNow(); + assertEquals(3, count.get()); + } + // Submit a task that gets queued but times out (due to startTimeout) before it can run. @Test public void testStartTimeout() throws Exception {