Skip to content

Commit

Permalink
[server][da-vinci] Instantiate the gracefulShutdownLatch lazily
Browse files Browse the repository at this point in the history
Instantiate the gracefulShutdownLatch lazily to avoid waiting for the
latch when shutting down an ingestion task that was just intialized.
This is more of an issue in unit tests where we mock out certain parts and
the run() method of SIT never get to execute so waiting on the latch that was
originally initialized during instance instantiation will timeout.
  • Loading branch information
xunyin8 committed Jan 16, 2025
1 parent c77004a commit bda42fd
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {

protected final ExecutorService parallelProcessingThreadPool;

protected final CountDownLatch gracefulShutdownLatch = new CountDownLatch(1);
protected Lazy<CountDownLatch> gracefulShutdownLatch = Lazy.of(() -> new CountDownLatch(1));
protected Lazy<ZKHelixAdmin> zkHelixAdmin;
protected final String hostName;

Expand Down Expand Up @@ -1628,6 +1628,7 @@ public boolean isIngestionTaskActive() {
*/
@Override
public void run() {
CountDownLatch shutdownLatch = gracefulShutdownLatch.get();
boolean doFlush = true;
try {
// Update thread name to include topic to make it easy debugging
Expand Down Expand Up @@ -1695,7 +1696,7 @@ public void run() {
CompletableFuture.allOf(shutdownFutures.toArray(new CompletableFuture[0])).get(60, SECONDS);
}
// Release the latch after all the shutdown completes in DVC/Server.
getGracefulShutdownLatch().countDown();
shutdownLatch.countDown();
} catch (VeniceIngestionTaskKilledException e) {
LOGGER.info("{} has been killed.", ingestionTaskName);
ingestionNotificationDispatcher.reportKilled(partitionConsumptionStateMap.values(), e);
Expand Down Expand Up @@ -4103,7 +4104,7 @@ public void shutdownAndWait(int waitTime) {
long startTimeInMs = System.currentTimeMillis();
close();
try {
if (!getGracefulShutdownLatch().await(waitTime, SECONDS)) {
if (getGracefulShutdownLatch().isPresent() && !getGracefulShutdownLatch().get().await(waitTime, SECONDS)) {
LOGGER.warn(
"Unable to shutdown ingestion task of topic: {} gracefully in {}ms",
kafkaVersionTopic,
Expand Down Expand Up @@ -4622,7 +4623,7 @@ Function<String, String> getKafkaClusterUrlResolver() {
return kafkaClusterUrlResolver;
}

CountDownLatch getGracefulShutdownLatch() {
Lazy<CountDownLatch> getGracefulShutdownLatch() {
return gracefulShutdownLatch;
}

Expand Down

0 comments on commit bda42fd

Please sign in to comment.