Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker] Fix PulsarService/BrokerService shutdown when brokerShu…
Browse files Browse the repository at this point in the history
…tdownTimeoutMs=0 (apache#21496)
  • Loading branch information
lhotari authored Nov 1, 2023
1 parent 7c6a4b8 commit 6ab322e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -639,14 +639,18 @@ private synchronized void resetMetricsServlet() {
}

private CompletableFuture<Void> addTimeoutHandling(CompletableFuture<Void> future) {
long brokerShutdownTimeoutMs = getConfiguration().getBrokerShutdownTimeoutMs();
if (brokerShutdownTimeoutMs <= 0) {
return future;
}
ScheduledExecutorService shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
new ExecutorProvider.ExtendedThreadFactory(getClass().getSimpleName() + "-shutdown"));
FutureUtil.addTimeoutHandling(future,
Duration.ofMillis(Math.max(1L, getConfiguration().getBrokerShutdownTimeoutMs())),
Duration.ofMillis(brokerShutdownTimeoutMs),
shutdownExecutor, () -> FutureUtil.createTimeoutException("Timeout in close", getClass(), "close"));
future.handle((v, t) -> {
if (t != null && getConfiguration().getBrokerShutdownTimeoutMs() > 0) {
LOG.info("Shutdown timed out after {} ms", getConfiguration().getBrokerShutdownTimeoutMs());
if (t instanceof TimeoutException) {
LOG.info("Shutdown timed out after {} ms", brokerShutdownTimeoutMs);
LOG.info(ThreadDumpUtil.buildThreadDiagnosticString());
}
// shutdown the shutdown executor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ public CompletableFuture<Void> closeAsync() {
for (EventLoopGroup group : protocolHandlersWorkerGroups) {
shutdownEventLoops.add(shutdownEventLoopGracefully(group));
}

CompletableFuture<Void> shutdownFuture =
CompletableFuture.allOf(shutdownEventLoops.toArray(new CompletableFuture[0]))
.handle((v, t) -> {
Expand All @@ -836,7 +837,7 @@ public CompletableFuture<Void> closeAsync() {
}
return null;
})
.thenCompose(__ -> {
.thenComposeAsync(__ -> {
log.info("Continuing to second phase in shutdown.");

List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>();
Expand Down Expand Up @@ -900,6 +901,12 @@ public CompletableFuture<Void> closeAsync() {
return null;
});
return combined;
}, runnable -> {
// run the 2nd phase of the shutdown in a separate thread
Thread thread = new Thread(runnable);
thread.setName("BrokerService-shutdown-phase2");
thread.setDaemon(false);
thread.start();
});
FutureUtil.whenCancelledOrTimedOut(shutdownFuture, () -> cancellableDownstreamFutureReference
.thenAccept(future -> future.cancel(false)));
Expand Down

0 comments on commit 6ab322e

Please sign in to comment.