diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 64fab3eca4..ab370d18e3 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -356,7 +356,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final ExecutorService parallelProcessingThreadPool; - protected final CountDownLatch gracefulShutdownLatch = new CountDownLatch(1); + protected Lazy gracefulShutdownLatch = Lazy.of(() -> new CountDownLatch(1)); protected Lazy zkHelixAdmin; protected final String hostName; @@ -1628,6 +1628,7 @@ public boolean isIngestionTaskActive() { */ @Override public void run() { + CountDownLatch shutdownLatch = gracefulShutdownLatch.get(); boolean doFlush = true; try { // Update thread name to include topic to make it easy debugging @@ -1695,7 +1696,7 @@ public void run() { CompletableFuture.allOf(shutdownFutures.toArray(new CompletableFuture[0])).get(60, SECONDS); } // Release the latch after all the shutdown completes in DVC/Server. - getGracefulShutdownLatch().countDown(); + shutdownLatch.countDown(); } catch (VeniceIngestionTaskKilledException e) { LOGGER.info("{} has been killed.", ingestionTaskName); ingestionNotificationDispatcher.reportKilled(partitionConsumptionStateMap.values(), e); @@ -4103,7 +4104,7 @@ public void shutdownAndWait(int waitTime) { long startTimeInMs = System.currentTimeMillis(); close(); try { - if (!getGracefulShutdownLatch().await(waitTime, SECONDS)) { + if (getGracefulShutdownLatch().isPresent() && !getGracefulShutdownLatch().get().await(waitTime, SECONDS)) { LOGGER.warn( "Unable to shutdown ingestion task of topic: {} gracefully in {}ms", kafkaVersionTopic, @@ -4622,7 +4623,7 @@ Function getKafkaClusterUrlResolver() { return kafkaClusterUrlResolver; } - CountDownLatch getGracefulShutdownLatch() { + Lazy getGracefulShutdownLatch() { return gracefulShutdownLatch; }