Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
brycezhongqing committed Nov 13, 2023
1 parent 7efca5f commit 9868e63
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -203,7 +202,7 @@ public D2Client build()
_config.dualReadStateManager,
_config.xdsExecutorService,
_config.xdsStreamReadyTimeout,
_config.loadBalancerThreadPool
_config.loadBalancerExecutor
);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
Expand Down Expand Up @@ -645,8 +644,8 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat
return this;
}

public D2ClientBuilder setLoadBalancerThreadPool(ThreadPoolExecutor loadBalancerThreadPool) {
_config.loadBalancerThreadPool = loadBalancerThreadPool;
public D2ClientBuilder setLoadBalancerExecutor(ExecutorService loadBalancerExecutor) {
_config.loadBalancerExecutor = loadBalancerExecutor;
return this;
}

Expand Down
8 changes: 4 additions & 4 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -128,7 +128,7 @@ public class D2ClientConfig

public ScheduledExecutorService xdsExecutorService = null;
public Long xdsStreamReadyTimeout = null;
public ThreadPoolExecutor loadBalancerThreadPool = null;
public ExecutorService loadBalancerExecutor = null;

public D2ClientConfig()
{
Expand Down Expand Up @@ -198,7 +198,7 @@ public D2ClientConfig()
DualReadStateManager dualReadStateManager,
ScheduledExecutorService xdsExecutorService,
Long xdsStreamReadyTimeout,
ThreadPoolExecutor loadBalancerThreadPool)
ExecutorService loadBalancerExecutor)
{
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
Expand Down Expand Up @@ -264,6 +264,6 @@ public D2ClientConfig()
this.dualReadStateManager = dualReadStateManager;
this.xdsExecutorService = xdsExecutorService;
this.xdsStreamReadyTimeout = xdsStreamReadyTimeout;
this.loadBalancerThreadPool = loadBalancerThreadPool;
this.loadBalancerExecutor = loadBalancerExecutor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.linkedin.d2.balancer.dualread;

import com.google.common.util.concurrent.MoreExecutors;
import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.util.None;
Expand All @@ -34,9 +35,7 @@
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.transport.common.TransportClientFactory;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
Expand All @@ -61,16 +60,31 @@ public class DualReadLoadBalancer implements LoadBalancerWithFacilities {
private final LoadBalancerWithFacilities _oldLb;
private final LoadBalancerWithFacilities _newLb;
private final DualReadStateManager _dualReadStateManager;
private ThreadPoolExecutor _loadBalancerThreadPool;
private ExecutorService _executor;
private boolean _isNewLbReady;

@Deprecated
public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb,
@Nonnull DualReadStateManager dualReadStateManager)
@Nonnull DualReadStateManager dualReadStateManager) {
this(oldLb, newLb, dualReadStateManager, null);
LOG.warn("Deprecated DualReadLoadBalancer constructor used without a threadpool");
}

public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb,
@Nonnull DualReadStateManager dualReadStateManager, ExecutorService executor)
{
_oldLb = oldLb;
_newLb = newLb;
_dualReadStateManager = dualReadStateManager;
_isNewLbReady = false;
if(executor == null){
// Using a direct executor here means the code is executed directly,
// blocking the caller. This means the old behavior is preserved.
_executor = MoreExecutors.newDirectExecutorService();
LOG.warn("Deprecated DualReadLoadBalancer constructor used without a threadpool executor");
}else{
_executor = executor;
}
}

@Override
Expand Down Expand Up @@ -111,11 +125,11 @@ public void getClient(Request request, RequestContext requestContext, Callback<T
_newLb.getClient(request, requestContext, clientCallback);
break;
case DUAL_READ:
getLoadBalancerThreadPool().execute(
_executor.execute(
() -> _newLb.getLoadBalancedServiceProperties(serviceName, new Callback<ServiceProperties>() {
@Override
public void onError(Throwable e) {
LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e);
LOG.error("Double read failure. Unable to read service properties from: {}", serviceName, e);
}

@Override
Expand All @@ -126,12 +140,12 @@ public void onSuccess(ServiceProperties result) {
new Callback<Pair<ClusterProperties, UriProperties>>() {
@Override
public void onError(Throwable e) {
LOG.error("Dual read failure. Unable to read cluster properties from: " + clusterName, e);
LOG.error("Dual read failure. Unable to read cluster properties from: {}", clusterName, e);
}

@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result) {
LOG.debug("Dual read is successful. Get cluster and uri properties: " + result);
LOG.debug("Dual read is successful. Get cluster and uri properties: {}", result);
}
});
}
Expand All @@ -153,7 +167,7 @@ public void getLoadBalancedServiceProperties(String serviceName, Callback<Servic
_newLb.getLoadBalancedServiceProperties(serviceName, clientCallback);
break;
case DUAL_READ:
getLoadBalancerThreadPool().execute(() -> _newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty()));
_executor.execute(() -> _newLb.getLoadBalancedServiceProperties(serviceName, Callbacks.empty()));
_oldLb.getLoadBalancedServiceProperties(serviceName, clientCallback);
break;
case OLD_LB_ONLY:
Expand All @@ -172,7 +186,7 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName,
_newLb.getLoadBalancedClusterAndUriProperties(clusterName, callback);
break;
case DUAL_READ:
getLoadBalancerThreadPool().execute(() -> _newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty()));
_executor.execute(() -> _newLb.getLoadBalancedClusterAndUriProperties(clusterName, Callbacks.empty()));
_oldLb.getLoadBalancedClusterAndUriProperties(clusterName, callback);
break;
case OLD_LB_ONLY:
Expand Down Expand Up @@ -280,29 +294,6 @@ private DualReadModeProvider.DualReadMode getDualReadMode(String d2ServiceName)
return _dualReadStateManager.getServiceDualReadMode(d2ServiceName);
}


/**
* Get the thread pool for load balancer tasks. If not set, a default thread pool will be used.
*/
public ThreadPoolExecutor getLoadBalancerThreadPool() {
return _loadBalancerThreadPool;
}

/**
* Set the thread pool for load balancer tasks. If not set, a default thread pool will be used.
*/
public void setLoadBalancerThreadPool(ThreadPoolExecutor loadBalancerThreadPool) {
if (loadBalancerThreadPool != null) {
this._loadBalancerThreadPool = loadBalancerThreadPool;
} else {
LOG.info("LoadBalancerTaskThreadPool is null, using default thread pool");
this._loadBalancerThreadPool =
new ThreadPoolExecutor(2, 3, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), (r, executor) -> {
LOG.error("LoadBalancerTaskThreadPool rejected execution, isNewReady: " + _isNewLbReady);
});
}
}

@Override
public void shutdown(PropertyEventThread.PropertyEventShutdownCallback callback)
{
Expand All @@ -312,6 +303,5 @@ public void shutdown(PropertyEventThread.PropertyEventShutdownCallback callback)
});

_oldLb.shutdown(callback);
_loadBalancerThreadPool.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ public DualReadZkAndXdsLoadBalancerFactory(@Nonnull DualReadStateManager dualRea
@Override
public LoadBalancerWithFacilities create(D2ClientConfig config)
{
DualReadLoadBalancer loadBalancer = new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager);
loadBalancer.setLoadBalancerThreadPool(config.loadBalancerThreadPool);
return loadBalancer;
return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager, config.loadBalancerExecutor);
}
}

0 comments on commit 9868e63

Please sign in to comment.