Skip to content

Commit

Permalink
wait until all changes to names complete before serving
Browse files Browse the repository at this point in the history
  • Loading branch information
bohhyang committed Jan 18, 2025
1 parent bd4bf5b commit 3de5965
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 30 deletions.
9 changes: 9 additions & 0 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,15 @@ final ResourceType getType()
* @param resourceName the name of the resource that was removed.
*/
public abstract void onRemoval(String resourceName);

/**
* Signals that one batch of resource changes (both changed and removed resources) has been fully processed,
* i.e. the per-resource change/removal notifications for that batch have already been delivered.
* Subclasses may override this to act once the whole batch has been applied.
* The default implementation does nothing.
*/
public void onAllResourcesProcessed()
{
// do nothing
}
}

public static abstract class WildcardNodeResourceWatcher extends WildcardResourceWatcher
Expand Down
49 changes: 33 additions & 16 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ private void handleD2NodeResponse(DiscoveryResponseData data)
}
}
sendAckOrNack(data.getResourceType(), data.getNonce(), errors);
handleResourceUpdate(updates, data.getResourceType());
handleResourceRemoval(data.getRemovedResources(), data.getResourceType());
processResourceChanges(data.getResourceType(), updates, data.getRemovedResources());
}

private void handleD2ClusterOrServiceNameResponse(DiscoveryResponseData data)
Expand Down Expand Up @@ -404,8 +403,7 @@ private void handleD2ClusterOrServiceNameResponse(DiscoveryResponseData data)
}
}
sendAckOrNack(data.getResourceType(), data.getNonce(), errors);
handleResourceUpdate(updates, data.getResourceType());
handleResourceRemoval(data.getRemovedResources(), data.getResourceType());
processResourceChanges(data.getResourceType(), updates, data.getRemovedResources());
}

private void handleD2URIMapResponse(DiscoveryResponseData data)
Expand Down Expand Up @@ -436,8 +434,7 @@ private void handleD2URIMapResponse(DiscoveryResponseData data)
}
}
sendAckOrNack(data.getResourceType(), data.getNonce(), errors);
handleResourceUpdate(updates, data.getResourceType());
handleResourceRemoval(data.getRemovedResources(), data.getResourceType());
processResourceChanges(data.getResourceType(), updates, data.getRemovedResources());
}

/**
Expand Down Expand Up @@ -527,9 +524,7 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data)
}
});
sendAckOrNack(data.getResourceType(), data.getNonce(), errors);

handleResourceUpdate(updates, ResourceType.D2_URI_MAP);
handleResourceRemoval(removedClusters, ResourceType.D2_URI_MAP);
processResourceChanges(ResourceType.D2_URI_MAP, updates, removedClusters);
}

@VisibleForTesting
Expand All @@ -546,17 +541,31 @@ void sendAckOrNack(ResourceType type, String nonce, List<String> errors)
}
}

/**
 * Applies one batch of resource changes of the given type: first the updates, then the removals, and finally
 * tells the wildcard subscriber of that type (if there is one) that the whole batch has been processed.
 *
 * @param type the resource type the changes belong to.
 * @param updates the changed resources, keyed by resource name.
 * @param removedResources the names of the resources that were removed.
 */
private void processResourceChanges(ResourceType type, Map<String, ? extends ResourceUpdate> updates,
    Collection<String> removedResources)
{
  handleResourceUpdate(updates, type);
  handleResourceRemoval(removedResources, type);

  WildcardResourceSubscriber batchEndListener = getWildcardResourceSubscriber(type);
  if (batchEndListener == null)
  {
    // nobody is watching this type with a wildcard, so there is no one to signal
    return;
  }
  // the end-of-batch signal must come last, after every update and removal has been delivered
  batchEndListener.onAllResourcesProcessed();
}

private void handleResourceUpdate(Map<String, ? extends ResourceUpdate> updates, ResourceType type)
{
Map<String, ResourceSubscriber> subscribers = getResourceSubscriberMap(type);
WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type);

for (Map.Entry<String, ? extends ResourceUpdate> entry : updates.entrySet())
{
ResourceSubscriber subscriber = subscribers.get(entry.getKey());
if (subscriber != null)
{
subscriber.onData(entry.getValue(), _serverMetricsProvider);
}
WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type);
if (wildcardSubscriber != null)
{
wildcardSubscriber.onData(entry.getKey(), entry.getValue());
Expand All @@ -570,6 +579,8 @@ private void handleResourceRemoval(Collection<String> removedResources, Resource
{
return;
}

WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type);
for (String resourceName : removedResources)
{
_xdsClientJmx.incrementResourceNotFoundCount();
Expand All @@ -579,15 +590,13 @@ private void handleResourceRemoval(Collection<String> removedResources, Resource
{
subscriber.onRemoval();
}
}
WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type);
if (wildcardSubscriber != null)
{
removedResources.forEach(wildcardSubscriber::onRemoval);
if (wildcardSubscriber != null)
{
removedResources.forEach(wildcardSubscriber::onRemoval);
}
}
}


private void notifyStreamError(Status error)
{
for (Map<String, ResourceSubscriber> subscriberMap : getResourceSubscribers().values())
Expand Down Expand Up @@ -923,6 +932,14 @@ void onRemoval(String resourceName)
watcher.onRemoval(resourceName);
}
}

/**
 * Forwards the "all resources processed" signal to every watcher registered on this subscriber.
 */
private void onAllResourcesProcessed()
{
  for (final WildcardResourceWatcher registeredWatcher : _watchers)
  {
    registeredWatcher.onAllResourcesProcessed();
  }
}
}

final class RpcRetryTask implements Runnable
Expand Down
56 changes: 42 additions & 14 deletions d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static com.linkedin.d2.xds.XdsClient.*;

Expand All @@ -19,12 +21,18 @@ public class XdsDirectory implements Directory
private final XdsClient _xdsClient;
private final ConcurrentMap<String, String> _serviceNames = new ConcurrentHashMap<>();
private final ConcurrentMap<String, String> _clusterNames = new ConcurrentHashMap<>();
private XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher _watcher;
private final AtomicReference<WildcardD2ClusterOrServiceNameResourceWatcher> _watcher = new AtomicReference<>();
/**
* If service/cluster names are empty, wait for a while before returning the names, callers could set a shorter
* timeout on the callback they passed in to getServiceNames or getClusterNames, as needed
* A flag that shows whether the service/cluster names data is being updated. Requests to the data should wait until
* the update is done.
*/
private static final Long DEFAULT_WAIT_TIME = 10000L;
private final AtomicBoolean _isUpdating = new AtomicBoolean(true);
/**
* Monitor used to block readers until the service and cluster names data has been updated and is ready to serve;
* waiting readers are notified once the update completes. While the data is being updated, requests to access the
* data wait indefinitely. Callers can set a shorter timeout on the callback passed in to getServiceNames or
* getClusterNames, as needed.
*/
private final Object _dataReadyLock = new Object();

public XdsDirectory(XdsClient xdsClient)
{
Expand All @@ -51,12 +59,15 @@ public void getClusterNames(Callback<List<String>> callback)

/**
 * Creates the wildcard name watcher and registers it with the xDS client, at most once.
 * If a watcher has already been installed this is a no-op. When several threads race, only the thread whose
 * compare-and-set installs the watcher registers it, and it registers exactly the instance it installed.
 */
private void addNameWatcher()
{
  if (_watcher.get() != null)
  {
    // fast path: a watcher has already been installed
    return;
  }
  XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher candidate = createNameWatcher();
  if (_watcher.compareAndSet(null, candidate))
  {
    // we won the race; register the very instance stored in the reference
    _xdsClient.watchAllXdsResources(candidate);
  }
}

private XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher createNameWatcher()
Expand All @@ -67,6 +78,7 @@ private XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher createNameWatche
@Override
public void onChanged(String resourceName, XdsClient.D2ClusterOrServiceNameUpdate update)
{
_isUpdating.compareAndSet(false, true);
if (update == EMPTY_D2_CLUSTER_OR_SERVICE_NAME_UPDATE)
{ // invalid data, ignore
return;
Expand All @@ -84,13 +96,24 @@ public void onChanged(String resourceName, XdsClient.D2ClusterOrServiceNameUpdat
@Override
public void onRemoval(String resourceName)
{
  // Mark the names data as being updated; it is flagged ready again in onAllResourcesProcessed.
  _isUpdating.set(true);
  // The removed name is dropped from both maps without checking which kind it is: removing a key that is not
  // present is a no-op, and a resource is never both a cluster and a service name because the two live under
  // different d2 paths (/d2/clusters vs /d2/services).
  _serviceNames.remove(resourceName);
  _clusterNames.remove(resourceName);
}

@Override
public void onAllResourcesProcessed()
{
  // Clear the updating flag and wake the waiters under the same monitor they wait on, so a waiter cannot
  // check the flag and then miss the notification.
  synchronized (_dataReadyLock)
  {
    _isUpdating.set(false);
    _dataReadyLock.notifyAll();
  }
}

@Override
public void onError(Status error)
{
Expand All @@ -108,15 +131,20 @@ public void onReconnect()
private void waitAndRespond(boolean isForService, Callback<List<String>> callback) {
// Changes in the corresponding map will be reflected in the returned collection
Collection<String> names = isForService ? _serviceNames.values() : _clusterNames.values();
if (names.isEmpty())

synchronized (_dataReadyLock)
{
try {
wait(DEFAULT_WAIT_TIME);
} catch (InterruptedException e) {
// do nothing
while (_isUpdating.get())
{
try
{
_dataReadyLock.wait();
} catch (InterruptedException e)
{
// do nothing
}
}
callback.onSuccess(new ArrayList<>(names));
}

callback.onSuccess(new ArrayList<>(names));
}
}

0 comments on commit 3de5965

Please sign in to comment.