Skip to content

Commit

Permalink
Add XdsDirectory to get d2 service and cluster names from INDIS
Browse files Browse the repository at this point in the history
  • Loading branch information
bohhyang committed Jan 15, 2025
1 parent 1eeec25 commit ef522ff
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 28 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.63.1] - 2025-01-14
- Add XdsDirectory to get d2 service and cluster names from INDIS

## [29.63.0] - 2024-11-06
- Add announcer status delegate interface

Expand Down Expand Up @@ -5761,7 +5764,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.1...master
[29.63.1]: https://github.com/linkedin/rest.li/compare/v29.63.0...v29.63.1
[29.63.0]: https://github.com/linkedin/rest.li/compare/v29.62.1...v29.63.0
[29.62.1]: https://github.com/linkedin/rest.li/compare/v29.62.0...v29.62.1
[29.62.0]: https://github.com/linkedin/rest.li/compare/v29.61.0...v29.62.0
Expand Down
75 changes: 74 additions & 1 deletion d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,27 @@ final void onChanged(String resourceName, ResourceUpdate update)
}
}

public static abstract class WildcardD2ClusterOrServiceNameResourceWatcher extends WildcardResourceWatcher
{
public WildcardD2ClusterOrServiceNameResourceWatcher()
{
super(ResourceType.D2_CLUSTER_OR_SERVICE_NAME);
}

/**
* Called when a D2ClusterOrServiceName resource is added or updated.
* @param resourceName the resource name of the D2ClusterOrServiceName that was added or updated.
* @param update the new data for the D2ClusterOrServiceName resource
*/
public abstract void onChanged(String resourceName, D2ClusterOrServiceNameUpdate update);

@Override
final void onChanged(String resourceName, ResourceUpdate update)
{
onChanged(resourceName, update);
}
}

public interface ResourceUpdate
{
boolean isValid();
Expand Down Expand Up @@ -233,6 +254,54 @@ public String toString()
}
}

public static final class D2ClusterOrServiceNameUpdate implements ResourceUpdate
{
XdsD2.D2ClusterOrServiceName _nameData;

D2ClusterOrServiceNameUpdate(XdsD2.D2ClusterOrServiceName nameData)
{
_nameData = nameData;
}

public XdsD2.D2ClusterOrServiceName getNameData()
{
return _nameData;
}

@Override
public boolean equals(Object object)
{
if (this == object)
{
return true;
}
if (object == null || getClass() != object.getClass())
{
return false;
}
D2ClusterOrServiceNameUpdate that = (D2ClusterOrServiceNameUpdate) object;
return Objects.equals(_nameData, that._nameData);
}

@Override
public int hashCode()
{
return Objects.hash(_nameData);
}

@Override
public boolean isValid()
{
return _nameData != null && (!_nameData.getClusterName().isEmpty() || !_nameData.getServiceName().isEmpty());
}

@Override
public String toString()
{
return MoreObjects.toStringHelper(this).add("_nameData", _nameData).toString();
}
}

public static final class D2URIMapUpdate implements ResourceUpdate
{
Map<String, XdsD2.D2URI> _uriMap;
Expand Down Expand Up @@ -302,12 +371,16 @@ public String toString()

public static final NodeUpdate EMPTY_NODE_UPDATE = new NodeUpdate(null);
public static final D2URIMapUpdate EMPTY_D2_URI_MAP_UPDATE = new D2URIMapUpdate(null);
public static final D2ClusterOrServiceNameUpdate EMPTY_D2_CLUSTER_OR_SERVICE_NAME_UPDATE =
new D2ClusterOrServiceNameUpdate(null);

enum ResourceType
{
NODE("type.googleapis.com/indis.Node", EMPTY_NODE_UPDATE),
D2_URI_MAP("type.googleapis.com/indis.D2URIMap", EMPTY_D2_URI_MAP_UPDATE),
D2_URI("type.googleapis.com/indis.D2URI", EMPTY_D2_URI_MAP_UPDATE);
D2_URI("type.googleapis.com/indis.D2URI", EMPTY_D2_URI_MAP_UPDATE),
D2_CLUSTER_OR_SERVICE_NAME("type.googleapis.com/indis.D2ClusterOrServiceName",
EMPTY_D2_CLUSTER_OR_SERVICE_NAME_UPDATE);

private static final Map<String, ResourceType> TYPE_URL_TO_ENUM = Arrays.stream(values())
.filter(e -> e.typeUrl() != null)
Expand Down
45 changes: 24 additions & 21 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -797,33 +797,39 @@ void addWatcher(WildcardResourceWatcher watcher)
}
}

private void onData(String resourceName, ResourceUpdate data)
private void onData(String resourceName, ResourceUpdate update)
{
if (Objects.equals(_data.get(resourceName), data))
if (Objects.equals(_data.get(resourceName), update))
{
_log.debug("Received resource update data equal to the current data. Will not perform the update.");
return;
}
// null value guard to avoid overwriting the property with null
if (data != null && data.isValid())
if (update != null && update.isValid())
{
_data.put(resourceName, data);
for (WildcardResourceWatcher watcher : _watchers)
{
watcher.onChanged(resourceName, data);
}
_data.put(resourceName, update);
}
else
{
if (_type == ResourceType.D2_URI_MAP || _type == ResourceType.D2_URI)
{
RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data);
RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, update);
}
else
{
_log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data);
_log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, update);
}
}

if (_data.get(resourceName) == null)
{
_log.info("Initializing {} {} to empty data.", _type, resourceName);
_data.put(resourceName, _type.emptyData());
}
for (WildcardResourceWatcher watcher : _watchers)
{
watcher.onChanged(resourceName, update);
}
}

public ResourceType getType()
Expand Down Expand Up @@ -867,29 +873,26 @@ public void run()
for (ResourceType type : _resourceSubscribers.keySet())
{
Set<String> resources = new HashSet<>(getResourceSubscriberMap(type).keySet());
if (resources.isEmpty() && getWildcardResourceSubscriber(type) == null)
if (resources.isEmpty())
{
continue;
}
ResourceType rewrittenType;

ResourceType rewrittenType = type;
if (_subscribeToUriGlobCollection && type == ResourceType.D2_URI_MAP)
{
resources = resources.stream()
.map(GlobCollectionUtils::globCollectionUrlForClusterResource)
.collect(Collectors.toCollection(HashSet::new));
rewrittenType = ResourceType.D2_URI;
}
else
{
rewrittenType = type;
}
// If there is a wildcard subscriber, we should always send a wildcard request to the server.
if (getWildcardResourceSubscriber(type) != null)
{
resources.add("*");
}
_adsStream.sendDiscoveryRequest(rewrittenType, resources);
}

for (ResourceType type: _wildcardSubscribers.keySet())
{
_adsStream.sendDiscoveryRequest(type, Collections.singletonList("*"));
}
}
}

Expand Down
121 changes: 121 additions & 0 deletions d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package com.linkedin.d2.xds.balancer;

import com.linkedin.common.callback.Callback;
import com.linkedin.d2.balancer.Directory;
import com.linkedin.d2.xds.XdsClient;
import io.grpc.Status;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.linkedin.d2.xds.XdsClient.*;


public class XdsDirectory implements Directory
{
private final XdsClient _xdsClient;
private final ConcurrentMap<String, String> _serviceNames = new ConcurrentHashMap<>();
private final ConcurrentMap<String, String> _clusterNames = new ConcurrentHashMap<>();
private XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher _watcher;
/**
* If service/cluster names are empty, wait for a while before returning the names, callers could set a shorter
* timeout on the callback they passed in to getServiceNames or getClusterNames, as needed
*/
private static final Long DEFAULT_WAIT_TIME = 10000L;

public XdsDirectory(XdsClient xdsClient)
{
_xdsClient = xdsClient;
}

public void start() {
addNameWatcher();
}

@Override
public void getServiceNames(Callback<List<String>> callback)
{
addNameWatcher();
waitAndRespond(true, callback);
}

@Override
public void getClusterNames(Callback<List<String>> callback)
{
addNameWatcher();
waitAndRespond(false, callback);
}

private void addNameWatcher()
{
if (_watcher != null)
{
return;
}
_watcher = createNameWatcher();
_xdsClient.watchAllXdsResources(_watcher);
}

private XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher createNameWatcher()
{
return new XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher()
{

@Override
public void onChanged(String resourceName, XdsClient.D2ClusterOrServiceNameUpdate update)
{
if (update == EMPTY_D2_CLUSTER_OR_SERVICE_NAME_UPDATE)
{ // invalid data, ignore
return;
}
D2ClusterOrServiceName nameData = update.getNameData();
if (nameData.getClusterName() != null)
{
_clusterNames.put(resourceName, nameData.getClusterName());
} else
{
_serviceNames.put(resourceName, nameData.getServiceName());
}
}

@Override
public void onRemoval(String resourceName)
{
// Don't need to differentiate between cluster and service names, will have no op on the map that doesn't
// have the key. And the resource won't be both a cluster and a service name, since the two have different d2
// path (/d2/clusters vs /d2/services).
_clusterNames.remove(resourceName);
_serviceNames.remove(resourceName);
}

@Override
public void onError(Status error)
{
// do nothing
}

@Override
public void onReconnect()
{
// do nothing
}
};
}

private void waitAndRespond(boolean isForService, Callback<List<String>> callback) {
// Changes in the corresponding map will be reflected in the returned collection
Collection<String> names = isForService ? _serviceNames.values() : _clusterNames.values();
if (names.isEmpty())
{
try {
wait(DEFAULT_WAIT_TIME);
} catch (InterruptedException e) {
// do nothing
}
}

callback.onSuccess(new ArrayList<>(names));
}
}
16 changes: 13 additions & 3 deletions d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.d2.balancer.util.ClusterInfoProvider;
import com.linkedin.d2.balancer.util.DirectoryProvider;
import com.linkedin.d2.balancer.util.TogglingLoadBalancer;
import com.linkedin.d2.balancer.util.hashing.ConsistentHashKeyMapper;
import com.linkedin.d2.balancer.util.hashing.HashRingProvider;
Expand All @@ -51,21 +52,30 @@
* When xDS connection is temporarily unavailable, it switches back to discover from backup file store.
* It reconnects and rebuilds state when the connection is back alive.
*/
public class XdsLoadBalancer implements LoadBalancerWithFacilities, WarmUpService
public class XdsLoadBalancer implements LoadBalancerWithFacilities, WarmUpService, DirectoryProvider
{
private static final Logger _log = LoggerFactory.getLogger(XdsLoadBalancer.class);

private final TogglingLoadBalancer _loadBalancer;
private final XdsToD2PropertiesAdaptor _xdsAdaptor;
private final ScheduledExecutorService _executorService;
private final XdsDirectory _directory;

@Deprecated
public XdsLoadBalancer(XdsToD2PropertiesAdaptor xdsAdaptor, ScheduledExecutorService executorService,
XdsFsTogglingLoadBalancerFactory factory)
{
this(xdsAdaptor, executorService, factory, null);
}

public XdsLoadBalancer(XdsToD2PropertiesAdaptor xdsAdaptor, ScheduledExecutorService executorService,
XdsFsTogglingLoadBalancerFactory factory, XdsDirectory directory)
{
_xdsAdaptor = xdsAdaptor;
_loadBalancer = factory.create(executorService, xdsAdaptor);
_executorService = executorService;
registerXdsFSToggle();
_directory = directory;
}

private void registerXdsFSToggle()
Expand Down Expand Up @@ -121,8 +131,7 @@ public void getClient(Request request, RequestContext requestContext, Callback<T
@Override
public Directory getDirectory()
{
// TODO: get a list of all ZK services and clusters names
throw new UnsupportedOperationException();
return _directory;
}

@Override
Expand Down Expand Up @@ -172,6 +181,7 @@ public ClusterInfoProvider getClusterInfoProvider()
public void start(Callback<None> callback)
{
_xdsAdaptor.start();
_directory.start();
callback.onSuccess(None.none());
}

Expand Down
Loading

0 comments on commit ef522ff

Please sign in to comment.