From ebf532154c00b80b30e122d552732046edcad65d Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Wed, 3 Jan 2024 15:02:34 -0800 Subject: [PATCH] not to block the thread calling start callback --- .../d2/balancer/util/WarmUpLoadBalancer.java | 201 ++++++++++-------- .../balancer/util/WarmUpLoadBalancerTest.java | 4 +- 2 files changed, 117 insertions(+), 88 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java index 6ef1451310..eafad048e1 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java @@ -42,6 +42,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -114,91 +115,117 @@ public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService ser @Override public void start(Callback callback) { LOG.info("{} enabled", _printName); - _loadBalancer.start(new Callback() { - @Override - public void onError(Throwable e) { - callback.onError(e); - } + Callback prepareWarmUpCallback = new Callback() { @Override - public void onSuccess(None result) { - _allStartTime = SystemClock.instance().currentTimeMillis(); - - Future prepareTaskFuture = _executorService.submit(() -> prepareWarmUp()); - try - { - prepareTaskFuture.get(_warmUpTimeoutSeconds, TimeUnit.SECONDS); - } - catch (TimeoutException e) + public void onError(Throwable e) { + if (e instanceof TimeoutException) { LOG.info("{} hit timeout: {}s. The WarmUp will continue in background", _printName, _warmUpTimeoutSeconds); callback.onSuccess(None.none()); } - catch (Exception e) + else { LOG.error("{} failed to fetch dual read mode, continuing warmup.", _printName, e); } - finally - { - continueWarmUp(callback); - } + continueWarmUp(callback); + } + + @Override + public void onSuccess(None result) { + continueWarmUp(callback); + } + }; + + _loadBalancer.start(new Callback() { + @Override + public void onError(Throwable e) { + callback.onError(e); + } + + @Override + public void onSuccess(None result) { + _allStartTime = SystemClock.instance().currentTimeMillis(); + _executorService.submit(() -> prepareWarmUp(prepareWarmUpCallback)); } }); } - private void prepareWarmUp() + private void prepareWarmUp(Callback callback) { - _downstreamServicesFetcher.getServiceNames(serviceNames -> { - // The downstreamServicesFetcher is the core group of the services that will be used during the lifecycle - _usedServices.addAll(serviceNames); + AtomicBoolean hasTimedOut = new AtomicBoolean(false); - LOG.info("{} starting to fetch dual read mode with timeout: {}s, for {} services: [{}]", - _printName, _warmUpTimeoutSeconds, serviceNames.size(), String.join(", ", serviceNames)); + try { + _downstreamServicesFetcher.getServiceNames(serviceNames -> { + // The downstreamServicesFetcher is the core group of the services that will be used during the lifecycle + _usedServices.addAll(serviceNames); - _servicesToWarmUp = serviceNames; + LOG.info("{} starting to fetch dual read mode with timeout: {}s, for {} services: [{}]", + _printName, _warmUpTimeoutSeconds, serviceNames.size(), String.join(", ", serviceNames)); - if (_dualReadStateManager != null) - { - // warm up dual read mode for the service and its belonging cluster. This is needed BEFORE fetching the actual - // data of service/cluster/uri (in the WarmUpTask below), so that when the actual data is received, they can - // be reported to dual read monitoring under dual read mode. - DualReadModeProvider dualReadModeProvider = _dualReadStateManager.getDualReadModeProvider(); - _servicesToWarmUp = serviceNames.stream().filter(serviceName -> { - DualReadModeProvider.DualReadMode dualReadMode = dualReadModeProvider.getDualReadMode(serviceName); - _dualReadStateManager.updateService(serviceName, dualReadMode); - - boolean res = isModeToWarmUp(dualReadMode, _isIndis); - if (!res) - { - LOG.info("{} skipping service: {} based on its dual read mode: {}", - _printName, serviceName, dualReadMode); - } - return res; - }).collect(Collectors.toList()); - - _servicesToWarmUp.forEach(serviceName -> { - // To warm up the cluster dual read mode, we need to fetch the service data to know its belonging cluster. - LOG.info("{} fetching service data for service: {}", _printName, serviceName); - - // NOTE: This call blocks! - getLoadBalancedServiceProperties(serviceName, new Callback() { - @Override - public void onError(Throwable e) { - LOG.warn("{} failed to warm up dual read mode for service: {}", _printName, serviceName, e); - } + _servicesToWarmUp = serviceNames; - @Override - public void onSuccess(ServiceProperties result) { - _dualReadStateManager.updateCluster(result.getClusterName(), - _dualReadStateManager.getServiceDualReadMode(result.getServiceName())); + if (_dualReadStateManager != null) + { + // warm up dual read mode for the service and its belonging cluster. This is needed BEFORE fetching the actual + // data of service/cluster/uri (in the WarmUpTask below), so that when the actual data is received, they can + // be reported to dual read monitoring under dual read mode. + DualReadModeProvider dualReadModeProvider = _dualReadStateManager.getDualReadModeProvider(); + _servicesToWarmUp = serviceNames.stream().filter(serviceName -> { + DualReadModeProvider.DualReadMode dualReadMode = dualReadModeProvider.getDualReadMode(serviceName); + _dualReadStateManager.updateService(serviceName, dualReadMode); + + boolean res = isModeToWarmUp(dualReadMode, _isIndis); + if (!res) + { + LOG.info("{} skipping service: {} based on its dual read mode: {}", + _printName, serviceName, dualReadMode); + } + return res; + }).collect(Collectors.toList()); + + _servicesToWarmUp.forEach(serviceName -> { + // check timeout before continue + if (!hasTimedOut.get() + && SystemClock.instance().currentTimeMillis() - _allStartTime > _warmUpTimeoutSeconds * 1000L) + { + hasTimedOut.set(true); + callback.onError(new TimeoutException()); } + + // To warm up the cluster dual read mode, we need to fetch the service data to know its belonging cluster. + LOG.info("{} fetching service data for service: {}", _printName, serviceName); + + // NOTE: This call blocks! + getLoadBalancedServiceProperties(serviceName, new Callback() { + @Override + public void onError(Throwable e) { + LOG.warn("{} failed to warm up dual read mode for service: {}", _printName, serviceName, e); + } + + @Override + public void onSuccess(ServiceProperties result) { + _dualReadStateManager.updateCluster(result.getClusterName(), + _dualReadStateManager.getServiceDualReadMode(result.getServiceName())); + } + }); }); - }); - LOG.info("{} fetched dual read mode for {} services in {}ms. {} services need to warm up.", - _printName, serviceNames.size(), SystemClock.instance().currentTimeMillis() - _allStartTime, - _servicesToWarmUp.size()); - } - }); + + LOG.info("{} fetched dual read mode for {} services in {}ms. {} services need to warm up.", + _printName, serviceNames.size(), SystemClock.instance().currentTimeMillis() - _allStartTime, + _servicesToWarmUp.size()); + } + + if (!hasTimedOut.get()) + { + callback.onSuccess(None.none()); + } + }); + } + catch (Exception e) + { + callback.onError(e); + } } private void continueWarmUp(Callback callback) @@ -222,29 +249,31 @@ private void continueWarmUp(Callback callback) */ private void warmUpServices(Callback startUpCallback) { - long timeoutSeconds = Math.max(0, - _warmUpTimeoutSeconds - (SystemClock.instance().currentTimeMillis() - _allStartTime) / 1000); - LOG.info("{} starting to warm up with timeout: {}s for {} services: [{}]", - _printName, timeoutSeconds, _servicesToWarmUp.size(), String.join(", ", _servicesToWarmUp)); + long timeoutMilli = Math.max(0, + _warmUpTimeoutSeconds * 1000L - (SystemClock.instance().currentTimeMillis() - _allStartTime)); + LOG.info("{} starting to warm up with timeout: {}ms for {} services: [{}]", + _printName, timeoutMilli, _servicesToWarmUp.size(), String.join(", ", _servicesToWarmUp)); - Callback timeoutCallback = new TimeoutCallback<>(_executorService, timeoutSeconds, TimeUnit.SECONDS, new Callback() - { - @Override - public void onError(Throwable e) - { - LOG.info("{} hit timeout, {}ms since initial start time, continuing startup. " - + "Warmup will continue in background", - _printName, SystemClock.instance().currentTimeMillis() - _allStartTime, e); - startUpCallback.onSuccess(None.none()); - } + Callback timeoutCallback = new TimeoutCallback<>(_executorService, timeoutMilli, TimeUnit.MILLISECONDS, + new Callback() + { + @Override + public void onError(Throwable e) + { + LOG.info("{} hit timeout after {}ms since initial start time, continuing startup. " + + "Warmup will continue in background", + _printName, SystemClock.instance().currentTimeMillis() - _allStartTime, e); + startUpCallback.onSuccess(None.none()); + } - @Override - public void onSuccess(None result) - { - LOG.info("{} completed", _printName); - startUpCallback.onSuccess(None.none()); - } - }, "This message will never be used, even in case of timeout, no exception should be passed up"); + @Override + public void onSuccess(None result) + { + LOG.info("{} completed", _printName); + startUpCallback.onSuccess(None.none()); + } + }, "This message will never be used, even in case of timeout, no exception should be passed up" + ); try { diff --git a/d2/src/test/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancerTest.java b/d2/src/test/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancerTest.java index 4ee8ffcd39..bf2ea44545 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancerTest.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancerTest.java @@ -375,11 +375,11 @@ public Object[][] modesToWarmUpDataProvider() public void testSuccessWithDualRead(DualReadModeProvider.DualReadMode mode, Boolean isIndis) throws InterruptedException, ExecutionException, TimeoutException { - int warmUpTimeout = 3; + int warmUpTimeout = 4; createDefaultServicesIniFiles(); setDualReadMode(mode); - // 3 dual read fetches take 1.5s, 3 warmups take at most 3 * (500 +/- 10) ms + // 3 dual read fetches take 1.5s, 3 warmups take at most 3 * (500 +/- 10) ms. Total at most is 3030 ms. TestLoadBalancer balancer = new TestLoadBalancer(500, 500); AtomicInteger completedWarmUpCount = balancer.getCompletedRequestCount(); LoadBalancer warmUpLb = new WarmUpLoadBalancer(balancer, balancer, Executors.newSingleThreadScheduledExecutor(),