diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index c4547fc70..39845bef2 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -135,6 +135,15 @@ final ResourceType getType() * @param resourceName the name of the resource that was removed. */ public abstract void onRemoval(String resourceName); + + /** + * Just a signal to notify that all resources (including both changed and removed ones) have been processed. + * Default implementation does nothing. + */ + public void onAllResourcesProcessed() + { + // do nothing + } } public static abstract class WildcardNodeResourceWatcher extends WildcardResourceWatcher diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index cd1189420..c70e62df8 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -374,8 +374,7 @@ private void handleD2NodeResponse(DiscoveryResponseData data) } } sendAckOrNack(data.getResourceType(), data.getNonce(), errors); - handleResourceUpdate(updates, data.getResourceType()); - handleResourceRemoval(data.getRemovedResources(), data.getResourceType()); + processResourceChanges(data.getResourceType(), updates, data.getRemovedResources()); } private void handleD2ClusterOrServiceNameResponse(DiscoveryResponseData data) @@ -404,8 +403,7 @@ private void handleD2ClusterOrServiceNameResponse(DiscoveryResponseData data) } } sendAckOrNack(data.getResourceType(), data.getNonce(), errors); - handleResourceUpdate(updates, data.getResourceType()); - handleResourceRemoval(data.getRemovedResources(), data.getResourceType()); + processResourceChanges(data.getResourceType(), updates, data.getRemovedResources()); } private void handleD2URIMapResponse(DiscoveryResponseData data) @@ -436,8 +434,7 @@ private void handleD2URIMapResponse(DiscoveryResponseData data) } } sendAckOrNack(data.getResourceType(), data.getNonce(), errors); - handleResourceUpdate(updates, data.getResourceType()); - handleResourceRemoval(data.getRemovedResources(), data.getResourceType()); + processResourceChanges(data.getResourceType(), updates, data.getRemovedResources()); } /** @@ -527,9 +524,7 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) } }); sendAckOrNack(data.getResourceType(), data.getNonce(), errors); - - handleResourceUpdate(updates, ResourceType.D2_URI_MAP); - handleResourceRemoval(removedClusters, ResourceType.D2_URI_MAP); + processResourceChanges(ResourceType.D2_URI_MAP, updates, removedClusters); } @VisibleForTesting @@ -546,9 +541,24 @@ void sendAckOrNack(ResourceType type, String nonce, List errors) } } + private void processResourceChanges(ResourceType type, Map updates, + Collection removedResources) + { + handleResourceUpdate(updates, type); + handleResourceRemoval(removedResources, type); + WildcardResourceSubscriber wildcardResourceSubscriber = getWildcardResourceSubscriber(type); + if (wildcardResourceSubscriber != null) + { + // lastly notify wildcard subscriber of the end of the changes + wildcardResourceSubscriber.onAllResourcesProcessed(); + } + } + private void handleResourceUpdate(Map updates, ResourceType type) { Map subscribers = getResourceSubscriberMap(type); + WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); + for (Map.Entry entry : updates.entrySet()) { ResourceSubscriber subscriber = subscribers.get(entry.getKey()); @@ -556,7 +566,6 @@ private void handleResourceUpdate(Map updates, { subscriber.onData(entry.getValue(), _serverMetricsProvider); } - WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); if (wildcardSubscriber != null) { wildcardSubscriber.onData(entry.getKey(), entry.getValue()); @@ -570,6 +579,8 @@ private void handleResourceRemoval(Collection removedResources, Resource { return; } + + WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); for (String resourceName : removedResources) { _xdsClientJmx.incrementResourceNotFoundCount(); @@ -579,15 +590,13 @@ private void handleResourceRemoval(Collection removedResources, Resource { subscriber.onRemoval(); } - } - WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); - if (wildcardSubscriber != null) - { - removedResources.forEach(wildcardSubscriber::onRemoval); + if (wildcardSubscriber != null) + { + removedResources.forEach(wildcardSubscriber::onRemoval); + } } } - private void notifyStreamError(Status error) { for (Map subscriberMap : getResourceSubscribers().values()) @@ -923,6 +932,14 @@ void onRemoval(String resourceName) watcher.onRemoval(resourceName); } } + + private void onAllResourcesProcessed() + { + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onAllResourcesProcessed(); + } + } } final class RpcRetryTask implements Runnable diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java index b76f7de19..2c1323eb9 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java @@ -10,6 +10,8 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static com.linkedin.d2.xds.XdsClient.*; @@ -19,12 +21,18 @@ public class XdsDirectory implements Directory private final XdsClient _xdsClient; private final ConcurrentMap _serviceNames = new ConcurrentHashMap<>(); private final ConcurrentMap _clusterNames = new ConcurrentHashMap<>(); - private XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher _watcher; + private final AtomicReference _watcher = new AtomicReference<>(); /** - * If service/cluster names are empty, wait for a while before returning the names, callers could set a shorter - * timeout on the callback they passed in to getServiceNames or getClusterNames, as needed + * A flag that shows whether the service/cluster names data is being updated. Requests to the data should wait until + * the update is done. */ - private static final Long DEFAULT_WAIT_TIME = 10000L; + private final AtomicBoolean _isUpdating = new AtomicBoolean(true); + /** + * This lock will be released when the service and cluster names data have been updated and is ready to serve. + * If the data is being updated, requests to access the data will wait indefinitely. Callers could set a shorter + * timeout on the callback passed in to getServiceNames or getClusterNames, as needed. + */ + private final Object _dataReadyLock = new Object(); public XdsDirectory(XdsClient xdsClient) { @@ -51,12 +59,15 @@ public void getClusterNames(Callback> callback) private void addNameWatcher() { - if (_watcher != null) + if (_watcher.get() != null) { return; } - _watcher = createNameWatcher(); - _xdsClient.watchAllXdsResources(_watcher); + boolean created = _watcher.compareAndSet(null, createNameWatcher()); + if (created) + { + _xdsClient.watchAllXdsResources(_watcher.get()); + } } private XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher createNameWatcher() @@ -67,6 +78,7 @@ private XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher createNameWatche @Override public void onChanged(String resourceName, XdsClient.D2ClusterOrServiceNameUpdate update) { + _isUpdating.compareAndSet(false, true); if (update == EMPTY_D2_CLUSTER_OR_SERVICE_NAME_UPDATE) { // invalid data, ignore return; @@ -84,6 +96,7 @@ public void onChanged(String resourceName, XdsClient.D2ClusterOrServiceNameUpdat @Override public void onRemoval(String resourceName) { + _isUpdating.compareAndSet(false, true); // Don't need to differentiate between cluster and service names, will have no op on the map that doesn't // have the key. And the resource won't be both a cluster and a service name, since the two have different d2 // path (/d2/clusters vs /d2/services). @@ -91,6 +104,16 @@ public void onRemoval(String resourceName) _serviceNames.remove(resourceName); } + @Override + public void onAllResourcesProcessed() + { + synchronized (_dataReadyLock) + { + _isUpdating.compareAndSet(true, false); + _dataReadyLock.notifyAll(); + } + } + @Override public void onError(Status error) { @@ -108,15 +131,20 @@ public void onReconnect() private void waitAndRespond(boolean isForService, Callback> callback) { // Changes in the corresponding map will be reflected in the returned collection Collection names = isForService ? _serviceNames.values() : _clusterNames.values(); - if (names.isEmpty()) + + synchronized (_dataReadyLock) { - try { - wait(DEFAULT_WAIT_TIME); - } catch (InterruptedException e) { - // do nothing + while (_isUpdating.get()) + { + try + { + _dataReadyLock.wait(); + } catch (InterruptedException e) + { + // do nothing + } } + callback.onSuccess(new ArrayList<>(names)); } - - callback.onSuccess(new ArrayList<>(names)); } }