diff --git a/CHANGELOG.md b/CHANGELOG.md index 81c5000922..8d70099a6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.63.1] - 2025-01-14 +- Add XdsDirectory to get d2 service and cluster names from INDIS + ## [29.63.0] - 2024-11-06 - Add announcer status delegate interface @@ -5761,7 +5764,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.0...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.1...master +[29.63.1]: https://github.com/linkedin/rest.li/compare/v29.63.0...v29.63.1 [29.63.0]: https://github.com/linkedin/rest.li/compare/v29.62.1...v29.63.0 [29.62.1]: https://github.com/linkedin/rest.li/compare/v29.62.0...v29.62.1 [29.62.0]: https://github.com/linkedin/rest.li/compare/v29.61.0...v29.62.0 diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index f5040dcb45..c4547fc70c 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -180,6 +180,27 @@ final void onChanged(String resourceName, ResourceUpdate update) } } + public static abstract class WildcardD2ClusterOrServiceNameResourceWatcher extends WildcardResourceWatcher + { + public WildcardD2ClusterOrServiceNameResourceWatcher() + { + super(ResourceType.D2_CLUSTER_OR_SERVICE_NAME); + } + + /** + * Called when a D2ClusterOrServiceName resource is added or updated. + * @param resourceName the resource name of the D2ClusterOrServiceName that was added or updated. + * @param update the new data for the D2ClusterOrServiceName resource + */ + public abstract void onChanged(String resourceName, D2ClusterOrServiceNameUpdate update); + + @Override + final void onChanged(String resourceName, ResourceUpdate update) + { + onChanged(resourceName, update); + } + } + public interface ResourceUpdate { boolean isValid(); @@ -233,6 +254,54 @@ public String toString() } } + public static final class D2ClusterOrServiceNameUpdate implements ResourceUpdate + { + XdsD2.D2ClusterOrServiceName _nameData; + + D2ClusterOrServiceNameUpdate(XdsD2.D2ClusterOrServiceName nameData) + { + _nameData = nameData; + } + + public XdsD2.D2ClusterOrServiceName getNameData() + { + return _nameData; + } + + @Override + public boolean equals(Object object) + { + if (this == object) + { + return true; + } + if (object == null || getClass() != object.getClass()) + { + return false; + } + D2ClusterOrServiceNameUpdate that = (D2ClusterOrServiceNameUpdate) object; + return Objects.equals(_nameData, that._nameData); + } + + @Override + public int hashCode() + { + return Objects.hash(_nameData); + } + + @Override + public boolean isValid() + { + return _nameData != null && (!_nameData.getClusterName().isEmpty() || !_nameData.getServiceName().isEmpty()); + } + + @Override + public String toString() + { + return MoreObjects.toStringHelper(this).add("_nameData", _nameData).toString(); + } + } + public static final class D2URIMapUpdate implements ResourceUpdate { Map _uriMap; @@ -302,12 +371,16 @@ public String toString() public static final NodeUpdate EMPTY_NODE_UPDATE = new NodeUpdate(null); public static final D2URIMapUpdate EMPTY_D2_URI_MAP_UPDATE = new D2URIMapUpdate(null); + public static final D2ClusterOrServiceNameUpdate EMPTY_D2_CLUSTER_OR_SERVICE_NAME_UPDATE = + new D2ClusterOrServiceNameUpdate(null); enum ResourceType { NODE("type.googleapis.com/indis.Node", EMPTY_NODE_UPDATE), D2_URI_MAP("type.googleapis.com/indis.D2URIMap", EMPTY_D2_URI_MAP_UPDATE), - D2_URI("type.googleapis.com/indis.D2URI", EMPTY_D2_URI_MAP_UPDATE); + D2_URI("type.googleapis.com/indis.D2URI", EMPTY_D2_URI_MAP_UPDATE), + D2_CLUSTER_OR_SERVICE_NAME("type.googleapis.com/indis.D2ClusterOrServiceName", + EMPTY_D2_CLUSTER_OR_SERVICE_NAME_UPDATE); private static final Map TYPE_URL_TO_ENUM = Arrays.stream(values()) .filter(e -> e.typeUrl() != null) diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 05e4a922a3..c2e133e0f8 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -797,33 +797,39 @@ void addWatcher(WildcardResourceWatcher watcher) } } - private void onData(String resourceName, ResourceUpdate data) + private void onData(String resourceName, ResourceUpdate update) { - if (Objects.equals(_data.get(resourceName), data)) + if (Objects.equals(_data.get(resourceName), update)) { _log.debug("Received resource update data equal to the current data. Will not perform the update."); return; } // null value guard to avoid overwriting the property with null - if (data != null && data.isValid()) + if (update != null && update.isValid()) { - _data.put(resourceName, data); - for (WildcardResourceWatcher watcher : _watchers) - { - watcher.onChanged(resourceName, data); - } + _data.put(resourceName, update); } else { if (_type == ResourceType.D2_URI_MAP || _type == ResourceType.D2_URI) { - RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); + RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, update); } else { - _log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); + _log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, update); } } + + if (_data.get(resourceName) == null) + { + _log.info("Initializing {} {} to empty data.", _type, resourceName); + _data.put(resourceName, _type.emptyData()); + } + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onChanged(resourceName, update); + } } public ResourceType getType() @@ -867,11 +873,12 @@ public void run() for (ResourceType type : _resourceSubscribers.keySet()) { Set resources = new HashSet<>(getResourceSubscriberMap(type).keySet()); - if (resources.isEmpty() && getWildcardResourceSubscriber(type) == null) + if (resources.isEmpty()) { continue; } - ResourceType rewrittenType; + + ResourceType rewrittenType = type; if (_subscribeToUriGlobCollection && type == ResourceType.D2_URI_MAP) { resources = resources.stream() @@ -879,17 +886,13 @@ public void run() .collect(Collectors.toCollection(HashSet::new)); rewrittenType = ResourceType.D2_URI; } - else - { - rewrittenType = type; - } - // If there is a wildcard subscriber, we should always send a wildcard request to the server. - if (getWildcardResourceSubscriber(type) != null) - { - resources.add("*"); - } _adsStream.sendDiscoveryRequest(rewrittenType, resources); } + + for (ResourceType type: _wildcardSubscribers.keySet()) + { + _adsStream.sendDiscoveryRequest(type, Collections.singletonList("*")); + } } } diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java new file mode 100644 index 0000000000..5290368824 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsDirectory.java @@ -0,0 +1,121 @@ +package com.linkedin.d2.xds.balancer; + +import com.linkedin.common.callback.Callback; +import com.linkedin.d2.balancer.Directory; +import com.linkedin.d2.xds.XdsClient; +import io.grpc.Status; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static com.linkedin.d2.xds.XdsClient.*; + + +public class XdsDirectory implements Directory +{ + private final XdsClient _xdsClient; + private final ConcurrentMap _serviceNames = new ConcurrentHashMap<>(); + private final ConcurrentMap _clusterNames = new ConcurrentHashMap<>(); + private XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher _watcher; + /** + * If service/cluster names are empty, wait for a while before returning the names, callers could set a shorter + * timeout on the callback they passed in to getServiceNames or getClusterNames, as needed + */ + private static final Long DEFAULT_WAIT_TIME = 10000L; + + public XdsDirectory(XdsClient xdsClient) + { + _xdsClient = xdsClient; + } + + public void start() { + addNameWatcher(); + } + + @Override + public void getServiceNames(Callback> callback) + { + addNameWatcher(); + waitAndRespond(true, callback); + } + + @Override + public void getClusterNames(Callback> callback) + { + addNameWatcher(); + waitAndRespond(false, callback); + } + + private void addNameWatcher() + { + if (_watcher != null) + { + return; + } + _watcher = createNameWatcher(); + _xdsClient.watchAllXdsResources(_watcher); + } + + private XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher createNameWatcher() + { + return new XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher() + { + + @Override + public void onChanged(String resourceName, XdsClient.D2ClusterOrServiceNameUpdate update) + { + if (update == EMPTY_D2_CLUSTER_OR_SERVICE_NAME_UPDATE) + { // invalid data, ignore + return; + } + D2ClusterOrServiceName nameData = update.getNameData(); + if (nameData.getClusterName() != null) + { + _clusterNames.put(resourceName, nameData.getClusterName()); + } else + { + _serviceNames.put(resourceName, nameData.getServiceName()); + } + } + + @Override + public void onRemoval(String resourceName) + { + // Don't need to differentiate between cluster and service names, will have no op on the map that doesn't + // have the key. And the resource won't be both a cluster and a service name, since the two have different d2 + // path (/d2/clusters vs /d2/services). + _clusterNames.remove(resourceName); + _serviceNames.remove(resourceName); + } + + @Override + public void onError(Status error) + { + // do nothing + } + + @Override + public void onReconnect() + { + // do nothing + } + }; + } + + private void waitAndRespond(boolean isForService, Callback> callback) { + // Changes in the corresponding map will be reflected in the returned collection + Collection names = isForService ? _serviceNames.values() : _clusterNames.values(); + if (names.isEmpty()) + { + try { + wait(DEFAULT_WAIT_TIME); + } catch (InterruptedException e) { + // do nothing + } + } + + callback.onSuccess(new ArrayList<>(names)); + } +} diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancer.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancer.java index 8d68a8844b..0362d973d9 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancer.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancer.java @@ -26,6 +26,7 @@ import com.linkedin.d2.balancer.properties.ServiceProperties; import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.balancer.util.ClusterInfoProvider; +import com.linkedin.d2.balancer.util.DirectoryProvider; import com.linkedin.d2.balancer.util.TogglingLoadBalancer; import com.linkedin.d2.balancer.util.hashing.ConsistentHashKeyMapper; import com.linkedin.d2.balancer.util.hashing.HashRingProvider; @@ -51,21 +52,30 @@ * When xDS connection is temporarily unavailable, it switches back to discover from backup file store. * It reconnects and rebuilds state when the connection is back alive. */ -public class XdsLoadBalancer implements LoadBalancerWithFacilities, WarmUpService +public class XdsLoadBalancer implements LoadBalancerWithFacilities, WarmUpService, DirectoryProvider { private static final Logger _log = LoggerFactory.getLogger(XdsLoadBalancer.class); private final TogglingLoadBalancer _loadBalancer; private final XdsToD2PropertiesAdaptor _xdsAdaptor; private final ScheduledExecutorService _executorService; + private final XdsDirectory _directory; + @Deprecated public XdsLoadBalancer(XdsToD2PropertiesAdaptor xdsAdaptor, ScheduledExecutorService executorService, XdsFsTogglingLoadBalancerFactory factory) + { + this(xdsAdaptor, executorService, factory, null); + } + + public XdsLoadBalancer(XdsToD2PropertiesAdaptor xdsAdaptor, ScheduledExecutorService executorService, + XdsFsTogglingLoadBalancerFactory factory, XdsDirectory directory) { _xdsAdaptor = xdsAdaptor; _loadBalancer = factory.create(executorService, xdsAdaptor); _executorService = executorService; registerXdsFSToggle(); + _directory = directory; } private void registerXdsFSToggle() @@ -121,8 +131,7 @@ public void getClient(Request request, RequestContext requestContext, Callback callback) { _xdsAdaptor.start(); + _directory.start(); callback.onSuccess(None.none()); } diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index 294b5b7430..5b6847e032 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -65,6 +65,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) XdsToD2PropertiesAdaptor adaptor = new XdsToD2PropertiesAdaptor(xdsClient, config.dualReadStateManager, config.serviceDiscoveryEventEmitter, config.clientServicesConfig); + XdsDirectory directory = new XdsDirectory(xdsClient); + XdsLoadBalancer xdsLoadBalancer = new XdsLoadBalancer( adaptor, executorService, @@ -72,7 +74,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) config.clientFactories, config.loadBalancerStrategyFactories, config.d2ServicePath, config.sslContext, config.sslParameters, config.isSSLEnabled, config.clientServicesConfig, config.partitionAccessorRegistry, config.sslSessionValidatorFactory, d2ClientJmxManager, config.deterministicSubsettingMetadataProvider, - config.failoutConfigProviderFactory, config.canaryDistributionProvider, config.loadBalanceStreamException) + config.failoutConfigProviderFactory, config.canaryDistributionProvider, config.loadBalanceStreamException), + directory ); LoadBalancerWithFacilities balancer = xdsLoadBalancer; diff --git a/gradle.properties b/gradle.properties index 99272913b7..23824f1c75 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.63.0 +version=29.63.1 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true