Skip to content

Commit

Permalink
Do not block the thread calling the start callback
Browse files Browse the repository at this point in the history
  • Loading branch information
bohhyang committed Jan 3, 2024
1 parent c8f026d commit ebf5321
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 88 deletions.
201 changes: 115 additions & 86 deletions d2/src/main/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -114,91 +115,117 @@ public WarmUpLoadBalancer(LoadBalancerWithFacilities balancer, WarmUpService ser
@Override
public void start(Callback<None> callback) {
LOG.info("{} enabled", _printName);
_loadBalancer.start(new Callback<None>() {
@Override
public void onError(Throwable e) {
callback.onError(e);
}

Callback<None> prepareWarmUpCallback = new Callback<None>() {
@Override
public void onSuccess(None result) {
_allStartTime = SystemClock.instance().currentTimeMillis();

Future<?> prepareTaskFuture = _executorService.submit(() -> prepareWarmUp());
try
{
prepareTaskFuture.get(_warmUpTimeoutSeconds, TimeUnit.SECONDS);
}
catch (TimeoutException e)
public void onError(Throwable e) {
if (e instanceof TimeoutException)
{
LOG.info("{} hit timeout: {}s. The WarmUp will continue in background", _printName, _warmUpTimeoutSeconds);
callback.onSuccess(None.none());
}
catch (Exception e)
else
{
LOG.error("{} failed to fetch dual read mode, continuing warmup.", _printName, e);
}
finally
{
continueWarmUp(callback);
}
continueWarmUp(callback);
}

@Override
public void onSuccess(None result) {
continueWarmUp(callback);
}
};

_loadBalancer.start(new Callback<None>() {
@Override
public void onError(Throwable e) {
callback.onError(e);
}

@Override
public void onSuccess(None result) {
_allStartTime = SystemClock.instance().currentTimeMillis();
_executorService.submit(() -> prepareWarmUp(prepareWarmUpCallback));
}
});
}

private void prepareWarmUp()
private void prepareWarmUp(Callback<None> callback)
{
_downstreamServicesFetcher.getServiceNames(serviceNames -> {
// The downstreamServicesFetcher is the core group of the services that will be used during the lifecycle
_usedServices.addAll(serviceNames);
AtomicBoolean hasTimedOut = new AtomicBoolean(false);

LOG.info("{} starting to fetch dual read mode with timeout: {}s, for {} services: [{}]",
_printName, _warmUpTimeoutSeconds, serviceNames.size(), String.join(", ", serviceNames));
try {
_downstreamServicesFetcher.getServiceNames(serviceNames -> {
// The downstreamServicesFetcher is the core group of the services that will be used during the lifecycle
_usedServices.addAll(serviceNames);

_servicesToWarmUp = serviceNames;
LOG.info("{} starting to fetch dual read mode with timeout: {}s, for {} services: [{}]",
_printName, _warmUpTimeoutSeconds, serviceNames.size(), String.join(", ", serviceNames));

if (_dualReadStateManager != null)
{
// warm up dual read mode for the service and its belonging cluster. This is needed BEFORE fetching the actual
// data of service/cluster/uri (in the WarmUpTask below), so that when the actual data is received, they can
// be reported to dual read monitoring under dual read mode.
DualReadModeProvider dualReadModeProvider = _dualReadStateManager.getDualReadModeProvider();
_servicesToWarmUp = serviceNames.stream().filter(serviceName -> {
DualReadModeProvider.DualReadMode dualReadMode = dualReadModeProvider.getDualReadMode(serviceName);
_dualReadStateManager.updateService(serviceName, dualReadMode);

boolean res = isModeToWarmUp(dualReadMode, _isIndis);
if (!res)
{
LOG.info("{} skipping service: {} based on its dual read mode: {}",
_printName, serviceName, dualReadMode);
}
return res;
}).collect(Collectors.toList());

_servicesToWarmUp.forEach(serviceName -> {
// To warm up the cluster dual read mode, we need to fetch the service data to know its belonging cluster.
LOG.info("{} fetching service data for service: {}", _printName, serviceName);

// NOTE: This call blocks!
getLoadBalancedServiceProperties(serviceName, new Callback<ServiceProperties>() {
@Override
public void onError(Throwable e) {
LOG.warn("{} failed to warm up dual read mode for service: {}", _printName, serviceName, e);
}
_servicesToWarmUp = serviceNames;

@Override
public void onSuccess(ServiceProperties result) {
_dualReadStateManager.updateCluster(result.getClusterName(),
_dualReadStateManager.getServiceDualReadMode(result.getServiceName()));
if (_dualReadStateManager != null)
{
// warm up dual read mode for the service and its belonging cluster. This is needed BEFORE fetching the actual
// data of service/cluster/uri (in the WarmUpTask below), so that when the actual data is received, they can
// be reported to dual read monitoring under dual read mode.
DualReadModeProvider dualReadModeProvider = _dualReadStateManager.getDualReadModeProvider();
_servicesToWarmUp = serviceNames.stream().filter(serviceName -> {
DualReadModeProvider.DualReadMode dualReadMode = dualReadModeProvider.getDualReadMode(serviceName);
_dualReadStateManager.updateService(serviceName, dualReadMode);

boolean res = isModeToWarmUp(dualReadMode, _isIndis);
if (!res)
{
LOG.info("{} skipping service: {} based on its dual read mode: {}",
_printName, serviceName, dualReadMode);
}
return res;
}).collect(Collectors.toList());

_servicesToWarmUp.forEach(serviceName -> {
// check timeout before continuing
if (!hasTimedOut.get()
&& SystemClock.instance().currentTimeMillis() - _allStartTime > _warmUpTimeoutSeconds * 1000L)
{
hasTimedOut.set(true);
callback.onError(new TimeoutException());
}

// To warm up the cluster dual read mode, we need to fetch the service data to know its belonging cluster.
LOG.info("{} fetching service data for service: {}", _printName, serviceName);

// NOTE: This call blocks!
getLoadBalancedServiceProperties(serviceName, new Callback<ServiceProperties>() {
@Override
public void onError(Throwable e) {
LOG.warn("{} failed to warm up dual read mode for service: {}", _printName, serviceName, e);
}

@Override
public void onSuccess(ServiceProperties result) {
_dualReadStateManager.updateCluster(result.getClusterName(),
_dualReadStateManager.getServiceDualReadMode(result.getServiceName()));
}
});
});
});
LOG.info("{} fetched dual read mode for {} services in {}ms. {} services need to warm up.",
_printName, serviceNames.size(), SystemClock.instance().currentTimeMillis() - _allStartTime,
_servicesToWarmUp.size());
}
});

LOG.info("{} fetched dual read mode for {} services in {}ms. {} services need to warm up.",
_printName, serviceNames.size(), SystemClock.instance().currentTimeMillis() - _allStartTime,
_servicesToWarmUp.size());
}

if (!hasTimedOut.get())
{
callback.onSuccess(None.none());
}
});
}
catch (Exception e)
{
callback.onError(e);
}
}

private void continueWarmUp(Callback<None> callback)
Expand All @@ -222,29 +249,31 @@ private void continueWarmUp(Callback<None> callback)
*/
private void warmUpServices(Callback<None> startUpCallback)
{
long timeoutSeconds = Math.max(0,
_warmUpTimeoutSeconds - (SystemClock.instance().currentTimeMillis() - _allStartTime) / 1000);
LOG.info("{} starting to warm up with timeout: {}s for {} services: [{}]",
_printName, timeoutSeconds, _servicesToWarmUp.size(), String.join(", ", _servicesToWarmUp));
long timeoutMilli = Math.max(0,
_warmUpTimeoutSeconds * 1000L - (SystemClock.instance().currentTimeMillis() - _allStartTime));
LOG.info("{} starting to warm up with timeout: {}ms for {} services: [{}]",
_printName, timeoutMilli, _servicesToWarmUp.size(), String.join(", ", _servicesToWarmUp));

Callback<None> timeoutCallback = new TimeoutCallback<>(_executorService, timeoutSeconds, TimeUnit.SECONDS, new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.info("{} hit timeout, {}ms since initial start time, continuing startup. "
+ "Warmup will continue in background",
_printName, SystemClock.instance().currentTimeMillis() - _allStartTime, e);
startUpCallback.onSuccess(None.none());
}
Callback<None> timeoutCallback = new TimeoutCallback<>(_executorService, timeoutMilli, TimeUnit.MILLISECONDS,
new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.info("{} hit timeout after {}ms since initial start time, continuing startup. "
+ "Warmup will continue in background",
_printName, SystemClock.instance().currentTimeMillis() - _allStartTime, e);
startUpCallback.onSuccess(None.none());
}

@Override
public void onSuccess(None result)
{
LOG.info("{} completed", _printName);
startUpCallback.onSuccess(None.none());
}
}, "This message will never be used, even in case of timeout, no exception should be passed up");
@Override
public void onSuccess(None result)
{
LOG.info("{} completed", _printName);
startUpCallback.onSuccess(None.none());
}
}, "This message will never be used, even in case of timeout, no exception should be passed up"
);

try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,11 @@ public Object[][] modesToWarmUpDataProvider()
public void testSuccessWithDualRead(DualReadModeProvider.DualReadMode mode, Boolean isIndis)
throws InterruptedException, ExecutionException, TimeoutException
{
int warmUpTimeout = 3;
int warmUpTimeout = 4;
createDefaultServicesIniFiles();
setDualReadMode(mode);

// 3 dual read fetches take 1.5s, 3 warmups take at most 3 * (500 +/- 10) ms
// 3 dual read fetches take 1.5s, 3 warmups take at most 3 * (500 +/- 10) ms. Total is at most 3030 ms.
TestLoadBalancer balancer = new TestLoadBalancer(500, 500);
AtomicInteger completedWarmUpCount = balancer.getCompletedRequestCount();
LoadBalancer warmUpLb = new WarmUpLoadBalancer(balancer, balancer, Executors.newSingleThreadScheduledExecutor(),
Expand Down

0 comments on commit ebf5321

Please sign in to comment.