From bda42fdcd84ea39fdffe8803465c6008b15ea143 Mon Sep 17 00:00:00 2001 From: Xun Yin Date: Thu, 16 Jan 2025 14:52:54 -0800 Subject: [PATCH] [server][da-vinci] Instantiate the gracefulShutdownLatch lazily Instantiate the gracefulShutdownLatch lazily to avoid waiting for the latch when shutting down an ingestion task that was just intialized. This is more of an issue in unit tests where we mock out certain parts and the run() method of SIT never get to execute so waiting on the latch that was originally initialized during instance instantiation will timeout. --- .../davinci/kafka/consumer/StoreIngestionTask.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 64fab3eca4..ab370d18e3 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -356,7 +356,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final ExecutorService parallelProcessingThreadPool; - protected final CountDownLatch gracefulShutdownLatch = new CountDownLatch(1); + protected Lazy gracefulShutdownLatch = Lazy.of(() -> new CountDownLatch(1)); protected Lazy zkHelixAdmin; protected final String hostName; @@ -1628,6 +1628,7 @@ public boolean isIngestionTaskActive() { */ @Override public void run() { + CountDownLatch shutdownLatch = gracefulShutdownLatch.get(); boolean doFlush = true; try { // Update thread name to include topic to make it easy debugging @@ -1695,7 +1696,7 @@ public void run() { CompletableFuture.allOf(shutdownFutures.toArray(new CompletableFuture[0])).get(60, SECONDS); } // Release the latch after all the shutdown completes in DVC/Server. - getGracefulShutdownLatch().countDown(); + shutdownLatch.countDown(); } catch (VeniceIngestionTaskKilledException e) { LOGGER.info("{} has been killed.", ingestionTaskName); ingestionNotificationDispatcher.reportKilled(partitionConsumptionStateMap.values(), e); @@ -4103,7 +4104,7 @@ public void shutdownAndWait(int waitTime) { long startTimeInMs = System.currentTimeMillis(); close(); try { - if (!getGracefulShutdownLatch().await(waitTime, SECONDS)) { + if (getGracefulShutdownLatch().isPresent() && !getGracefulShutdownLatch().get().await(waitTime, SECONDS)) { LOGGER.warn( "Unable to shutdown ingestion task of topic: {} gracefully in {}ms", kafkaVersionTopic, @@ -4622,7 +4623,7 @@ Function getKafkaClusterUrlResolver() { return kafkaClusterUrlResolver; } - CountDownLatch getGracefulShutdownLatch() { + Lazy getGracefulShutdownLatch() { return gracefulShutdownLatch; }