From e0b2683ab3063d0334d1c3968a48936838dabf12 Mon Sep 17 00:00:00 2001 From: Paul Chesnais Date: Mon, 23 Sep 2024 20:30:40 -0400 Subject: [PATCH] Revert "Add watchAllResource subscriber (#1020)" (#1023) --- CHANGELOG.md | 6 +- .../java/com/linkedin/d2/xds/XdsClient.java | 116 +---------- .../com/linkedin/d2/xds/XdsClientImpl.java | 181 ++---------------- .../linkedin/d2/xds/TestXdsClientImpl.java | 46 +---- gradle.properties | 2 +- 5 files changed, 24 insertions(+), 327 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bdfc3b4c8..34fd020d28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.58.8] - 2024-09-23 +- Revert Add WildcardResourceSubscriber which could subscribe to all resources, like NODE and URIMap resources. + ## [29.58.7] - 2024-09-13 - Add WildcardResourceSubscriber which could subscribe to all resources, like NODE and URIMap resources. @@ -5731,7 +5734,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.7...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.8...master +[29.58.8]: https://github.com/linkedin/rest.li/compare/v29.58.7...v29.58.8 [29.58.7]: https://github.com/linkedin/rest.li/compare/v29.58.6...v29.58.7 [29.58.6]: https://github.com/linkedin/rest.li/compare/v29.58.5...v29.58.6 [29.58.5]: https://github.com/linkedin/rest.li/compare/v29.58.4...v29.58.5 diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index f5040dcb45..a0c08b04f0 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -95,91 +95,6 @@ final void onChanged(ResourceUpdate update) } } - public static abstract class WildcardResourceWatcher - { - private final ResourceType _type; - - /** - * Defining a private constructor means only classes that are defined in this file can extend this class (see - * {@link ResourceWatcher}). - */ - WildcardResourceWatcher(ResourceType type) - { - _type = type; - } - - final ResourceType getType() - { - return _type; - } - - /** - * Called when the resource discovery RPC encounters some transient error. - */ - public abstract void onError(Status error); - - /** - * Called when the resource discovery RPC reestablishes connection. - */ - public abstract void onReconnect(); - - /** - * Called when a resource is added or updated. - * @param resourceName the name of the resource that was added or updated. - * @param update the new data {@link ResourceUpdate} for the resource. - */ - abstract void onChanged(String resourceName, ResourceUpdate update); - - /** - * Called when a resource is removed. - * @param resourceName the name of the resource that was removed. - */ - public abstract void onRemoval(String resourceName); - } - - public static abstract class WildcardNodeResourceWatcher extends WildcardResourceWatcher - { - public WildcardNodeResourceWatcher() - { - super(ResourceType.NODE); - } - - /** - * Called when a node resource is added or updated. - * @param resourceName the resource name of the {@link NodeUpdate} that was added or updated. - * @param update the new data for the {@link NodeUpdate}, including D2 cluster and service information. - */ - public abstract void onChanged(String resourceName, NodeUpdate update); - - @Override - final void onChanged(String resourceName, ResourceUpdate update) - { - onChanged(resourceName, (NodeUpdate) update); - } - } - - public static abstract class WildcardD2URIMapResourceWatcher extends WildcardResourceWatcher - { - public WildcardD2URIMapResourceWatcher() - { - super(ResourceType.D2_URI_MAP); - } - - /** - * Called when a {@link D2URIMapUpdate} resource is added or updated. - * @param resourceName the resource name of the {@link D2URIMapUpdate} map resource that was added or updated. - * like the /d2/uris/clusterName - * @param update the new data for the {@link D2URIMapUpdate} resource - */ - public abstract void onChanged(String resourceName, D2URIMapUpdate update); - - @Override - final void onChanged(String resourceName, ResourceUpdate update) - { - onChanged(resourceName, (D2URIMapUpdate) update); - } - } - public interface ResourceUpdate { boolean isValid(); @@ -194,7 +109,7 @@ public static final class NodeUpdate implements ResourceUpdate _nodeData = nodeData; } - public XdsD2.Node getNodeData() + XdsD2.Node getNodeData() { return _nodeData; } @@ -346,32 +261,13 @@ static ResourceType fromTypeUrl(String typeUrl) * will always notify the given watcher of the current data if it is already present, even if the given watcher was * already subscribed to said resource. However, the subscription will only be added once. */ - public abstract void watchXdsResource(String resourceName, ResourceWatcher watcher); + abstract void watchXdsResource(String resourceName, ResourceWatcher watcher); - /** - * Subscribes the given {@link WildcardResourceWatcher} to all the resources of the corresponding type. The watcher - * will be notified whenever a resource is added or removed. Repeated calls to this function with the same watcher - * will always notify the given watcher of the current data. - */ - public abstract void watchAllXdsResources(WildcardResourceWatcher watcher); + abstract void startRpcStream(); - /** - * Initiates the RPC stream to the xDS server. - */ - public abstract void startRpcStream(); - - /** - * Shuts down the xDS client. - */ - public abstract void shutdown(); + abstract void shutdown(); - /** - * Returns the authority of the xDS server. - */ - public abstract String getXdsServerAuthority(); + abstract String getXdsServerAuthority(); - /** - * Returns the JMX bean for the xDS client. - */ - public abstract XdsClientJmx getXdsClientJmx(); + abstract public XdsClientJmx getXdsClientJmx(); } diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index e53ed30235..b5ee6b0126 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -77,7 +77,6 @@ public class XdsClientImpl extends XdsClient Arrays.stream(ResourceType.values()) .filter(e -> e.typeUrl() != null) .collect(Collectors.toMap(Function.identity(), e -> new HashMap<>()))); - private final Map _wildcardSubscribers = Maps.newEnumMap(ResourceType.class); private final Node _node; private final ManagedChannel _managedChannel; private final ScheduledExecutorService _executorService; @@ -132,7 +131,7 @@ public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutor } @Override - public void watchXdsResource(String resourceName, ResourceWatcher watcher) + void watchXdsResource(String resourceName, ResourceWatcher watcher) { _executorService.execute(() -> { @@ -169,32 +168,6 @@ public void watchXdsResource(String resourceName, ResourceWatcher watcher) }); } - @Override - public void watchAllXdsResources(WildcardResourceWatcher watcher) - { - _executorService.execute(() -> - { - _log.info("Subscribing to wildcard for resource type: {}", watcher.getType()); - WildcardResourceSubscriber subscriber = getWildcardResourceSubscriber(watcher.getType()); - if (subscriber == null) - { - subscriber = new WildcardResourceSubscriber(watcher.getType()); - _wildcardSubscribers.put(watcher.getType(), subscriber); - - if (_adsStream == null && !isInBackoff()) - { - startRpcStreamLocal(); - } - if (_adsStream != null) - { - _adsStream.sendDiscoveryRequest(watcher.getType(), Collections.singletonList("*")); - } - } - - subscriber.addWatcher(watcher); - }); - } - @Override public void startRpcStream() { @@ -252,7 +225,7 @@ private void startRpcStreamLocal() { } @Override - public void shutdown() + void shutdown() { _executorService.execute(() -> { @@ -265,7 +238,7 @@ public void shutdown() } @Override - public String getXdsServerAuthority() + String getXdsServerAuthority() { return _managedChannel.authority(); } @@ -498,11 +471,6 @@ private void handleResourceUpdate(Map updates, { subscriber.onData(entry.getValue(), _serverMetricsProvider); } - WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type); - if (wildcardSubscriber != null) - { - wildcardSubscriber.onData(entry.getKey(), entry.getValue()); - } } } @@ -522,11 +490,6 @@ private void handleResourceRemoval(Collection removedResources, Resource subscriber.onRemoval(); } } - WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type); - if (wildcardSubscriber != null) - { - removedResources.forEach(wildcardSubscriber::onRemoval); - } } @@ -539,10 +502,6 @@ private void notifyStreamError(Status error) subscriber.onError(error); } } - for (WildcardResourceSubscriber wildcardResourceSubscriber : _wildcardSubscribers.values()) - { - wildcardResourceSubscriber.onError(error); - } _xdsClientJmx.setIsConnected(false); } @@ -555,10 +514,6 @@ private void notifyStreamReconnect() subscriber.onReconnect(); } } - for (WildcardResourceSubscriber wildcardResourceSubscriber : _wildcardSubscribers.values()) - { - wildcardResourceSubscriber.onReconnect(); - } _xdsClientJmx.setIsConnected(true); } @@ -568,12 +523,6 @@ Map getResourceSubscriberMap(ResourceType type) return _resourceSubscribers.get(type); } - @VisibleForTesting - WildcardResourceSubscriber getWildcardResourceSubscriber(ResourceType type) - { - return _wildcardSubscribers.get(type); - } - static class ResourceSubscriber { private final ResourceType _type; @@ -741,132 +690,24 @@ void onRemoval() } } - static class WildcardResourceSubscriber - { - private final ResourceType _type; - private final Set _watchers = new HashSet<>(); - private Map _data = new HashMap<>(); - - @VisibleForTesting - public Map getData() - { - return _data; - } - - @VisibleForTesting - public void setData(@Nullable Map data) - { - _data = data; - } - - WildcardResourceSubscriber(ResourceType type) - { - _type = type; - } - - void addWatcher(WildcardResourceWatcher watcher) - { - _watchers.add(watcher); - for (Map.Entry entry : _data.entrySet()) - { - watcher.onChanged(entry.getKey(), entry.getValue()); - _log.debug("Notifying watcher of current data for resource {} of type {}: {}", - entry.getKey(), _type, entry.getValue()); - } - } - - private void onData(String resourceName, ResourceUpdate data) - { - if (Objects.equals(_data.get(resourceName), data)) - { - _log.debug("Received resource update data equal to the current data. Will not perform the update."); - return; - } - // null value guard to avoid overwriting the property with null - if (data != null && data.isValid()) - { - _data.put(resourceName, data); - for (WildcardResourceWatcher watcher : _watchers) - { - watcher.onChanged(resourceName, data); - } - } - else - { - if (_type == ResourceType.D2_URI_MAP || _type == ResourceType.D2_URI) - { - RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); - } - else - { - _log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); - } - } - } - - public ResourceType getType() - { - return _type; - } - - private void onError(Status error) - { - for (WildcardResourceWatcher watcher : _watchers) - { - watcher.onError(error); - } - } - - private void onReconnect() - { - for (WildcardResourceWatcher watcher : _watchers) - { - watcher.onReconnect(); - } - } - - @VisibleForTesting - void onRemoval(String resourceName) - { - _data.remove(resourceName); - for (WildcardResourceWatcher watcher : _watchers) - { - watcher.onRemoval(resourceName); - } - } - } - - final class RpcRetryTask implements Runnable - { + final class RpcRetryTask implements Runnable { @Override - public void run() - { + public void run() { startRpcStreamLocal(); - for (ResourceType type : ResourceType.values()) - { - Set resources = new HashSet<>(getResourceSubscriberMap(type).keySet()); - if (resources.isEmpty() && getWildcardResourceSubscriber(type) == null) + for (ResourceType type : ResourceType.values()) { + Collection resources = getResourceSubscriberMap(type).keySet(); + if (resources.isEmpty()) { continue; } - ResourceType rewrittenType; if (_subscribeToUriGlobCollection && type == ResourceType.D2_URI_MAP) { resources = resources.stream() .map(GlobCollectionUtils::globCollectionUrlForClusterResource) - .collect(Collectors.toCollection(HashSet::new)); - rewrittenType = ResourceType.D2_URI; - } - else - { - rewrittenType = type; - } - // If there is a wildcard subscriber, we should always send a wildcard request to the server. - if (getWildcardResourceSubscriber(type) != null) - { - resources.add("*"); + .collect(Collectors.toSet()); + type = ResourceType.D2_URI; } - _adsStream.sendDiscoveryRequest(rewrittenType, resources); + _adsStream.sendDiscoveryRequest(type, resources); } } } diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index 6c89dbabc1..565a95c11e 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -443,37 +443,6 @@ public void testHandleD2URICollectionResponseWithRemoval() Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); } - @Test - public void testWildCardResourceSubscription() - { - XdsClientImplFixture fixture = new XdsClientImplFixture(); - - XdsClient.WildcardNodeResourceWatcher nodeWildCardWatcher = Mockito.mock(XdsClient.WildcardNodeResourceWatcher.class); - XdsClient.WildcardD2URIMapResourceWatcher uriMapWildCardWatcher = Mockito.mock(XdsClient.WildcardD2URIMapResourceWatcher.class); - fixture._xdsClientImpl.getWildcardResourceSubscriber(NODE).addWatcher(nodeWildCardWatcher); - fixture._xdsClientImpl.getWildcardResourceSubscriber(D2_URI_MAP).addWatcher(uriMapWildCardWatcher); - - // NODE resource added - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); - fixture.verifyAckSent(1); - nodeWildCardWatcher.onChanged(eq(SERVICE_RESOURCE_NAME) , eq(NODE_UPDATE1)); - - // NODE resource removed - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA_WITH_REMOVAL); - fixture.verifyAckSent(2); - nodeWildCardWatcher.onRemoval(eq(SERVICE_RESOURCE_NAME)); - - // URI_MAP resource added - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); - fixture.verifyAckSent(3); - uriMapWildCardWatcher.onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); - - // URI_MAP resource removed - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA_WITH_REMOVAL); - fixture.verifyAckSent(4); - uriMapWildCardWatcher.onRemoval(eq(CLUSTER_RESOURCE_NAME)); - } - @Test public void testResourceSubscriberAddWatcher() { @@ -498,11 +467,7 @@ private static class XdsClientImplFixture XdsClientJmx _xdsClientJmx; ResourceSubscriber _nodeSubscriber; ResourceSubscriber _clusterSubscriber; - XdsClientImpl.WildcardResourceSubscriber _nodeWildcardSubscriber; - XdsClientImpl.WildcardResourceSubscriber _uriMapWildcardSubscriber; Map> _subscribers = new HashMap<>(); - Map _wildcardSubscribers = new HashMap<>(); - @Mock XdsClient.ResourceWatcher _resourceWatcher; @Mock @@ -518,10 +483,6 @@ private static class XdsClientImplFixture MockitoAnnotations.initMocks(this); _nodeSubscriber = spy(new ResourceSubscriber(NODE, SERVICE_RESOURCE_NAME, _xdsClientJmx)); _clusterSubscriber = spy(new ResourceSubscriber(D2_URI_MAP, CLUSTER_RESOURCE_NAME, _xdsClientJmx)); - _nodeWildcardSubscriber = new XdsClientImpl.WildcardResourceSubscriber(NODE); - _uriMapWildcardSubscriber = new XdsClientImpl.WildcardResourceSubscriber(D2_URI_MAP); - - doNothing().when(_resourceWatcher).onChanged(any()); for (ResourceSubscriber subscriber : Lists.newArrayList(_nodeSubscriber, _clusterSubscriber)) @@ -529,8 +490,7 @@ private static class XdsClientImplFixture subscriber.addWatcher(_resourceWatcher); _subscribers.put(subscriber.getType(), Collections.singletonMap(subscriber.getResource(), subscriber)); } - _wildcardSubscribers.put(NODE, _nodeWildcardSubscriber); - _wildcardSubscribers.put(D2_URI_MAP, _uriMapWildcardSubscriber); + doNothing().when(_serverMetricsProvider).trackLatency(anyLong()); _xdsClientImpl = spy(new XdsClientImpl(null, null, null, 0, useGlobCollections, _serverMetricsProvider)); @@ -538,10 +498,6 @@ private static class XdsClientImplFixture when(_xdsClientImpl.getXdsClientJmx()).thenReturn(_xdsClientJmx); when(_xdsClientImpl.getResourceSubscriberMap(any())) .thenAnswer(a -> _subscribers.get((ResourceType) a.getArguments()[0])); - doNothing().when(_xdsClientImpl).watchAllXdsResources(any()); - when(_xdsClientImpl.getWildcardResourceSubscriber(any())) - .thenAnswer(a -> _wildcardSubscribers.get((ResourceType) a.getArguments()[0])); - } void verifyAckSent(int count) diff --git a/gradle.properties b/gradle.properties index 61da0787d6..dcf79f962f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.58.7 +version=29.58.8 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true