From a33a1e11c1239d58e5502591636b099901cc3489 Mon Sep 17 00:00:00 2001 From: Paul Chesnais Date: Mon, 13 Nov 2023 15:25:24 -0800 Subject: [PATCH] Use Node instead of D2Node and D2URIMap instead of NodeMap for xDS flow (#944) * Use Node(Map) instead of D2node(Map) The initial intent of using D2Node was to pre-deserialize the JSON, but because there is no way for the rest.li client to actually leverage this, it simply re-serializes the JSON then deserializes it again. This causes memory pressure along with validation errors as numerical typesa get mungled. * Leverage pre-deserialize JSON struct instead of switching over to raw JSON data Additionally introduce a D2URI type which has a proper schema instead of being a Struct. This will significantly decrease the resource size sent by the observer and significantly decrease the cost of deserialization on the client. * Bump minor version * Comment and simplify D2URI * Meet proto standard of snake_case naming * Update comment on D2URI * Simplify D2URI even further * Use the raw JSON Node type instead of D2Node This is causing all sorts of validation issues due to integers being interpreted as floats and vice versa * Fix some compile errors * Update changelog * Fix test failing due to missing property * Update changelog --- CHANGELOG.md | 6 +- .../ClusterPropertiesJsonSerializer.java | 17 +++ .../ServicePropertiesJsonSerializer.java | 17 +++ .../UriPropertiesJsonSerializer.java | 36 ++++- .../properties/util/PropertyUtil.java | 67 ++++++++- .../stores/zk/SymlinkAwareZooKeeper.java | 13 ++ .../java/com/linkedin/d2/xds/XdsClient.java | 83 ++++------- .../com/linkedin/d2/xds/XdsClientImpl.java | 85 ++++------- .../d2/xds/XdsToD2PropertiesAdaptor.java | 95 ++++++------ d2/src/main/proto/XdsD2.proto | 66 +++++++++ .../d2/xds/TestXdsToD2PropertiesAdaptor.java | 138 ++++++++++++------ gradle.properties | 2 +- 12 files changed, 403 insertions(+), 222 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21a72fe58b..ba7e181a23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.47.0] - 2023-11-13 +- Use Node instead of D2Node and D2URIMap instead of NodeMap for xDS flow + ## [29.46.9] - 2023-11-02 - Update FieldDef so that it will lazily cache the hashCode. @@ -5557,7 +5560,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.9...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.47.0...master +[29.47.0]: https://github.com/linkedin/rest.li/compare/v29.46.9...v29.47.0 [29.46.9]: https://github.com/linkedin/rest.li/compare/v29.46.8...v29.46.9 [29.46.8]: https://github.com/linkedin/rest.li/compare/v29.46.7...v29.46.8 [29.46.7]: https://github.com/linkedin/rest.li/compare/v29.46.6...v29.46.7 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/properties/ClusterPropertiesJsonSerializer.java b/d2/src/main/java/com/linkedin/d2/balancer/properties/ClusterPropertiesJsonSerializer.java index 8e5c465ade..27a8da8324 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/properties/ClusterPropertiesJsonSerializer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/properties/ClusterPropertiesJsonSerializer.java @@ -16,6 +16,7 @@ package com.linkedin.d2.balancer.properties; +import com.google.protobuf.ByteString; import com.linkedin.d2.balancer.properties.util.PropertyUtil; import com.linkedin.d2.balancer.util.JacksonUtil; import com.linkedin.d2.discovery.PropertyBuilder; @@ -84,6 +85,22 @@ public ClusterProperties fromBytes(byte[] bytes, long version) throws PropertySe clusterProperties.setVersion(version); return clusterProperties; } + + public ClusterProperties fromBytes(ByteString bytes, long version) throws PropertySerializationException + { + try + { + @SuppressWarnings("unchecked") + Map untyped = JacksonUtil.getObjectMapper().readValue(bytes.newInput(), HashMap.class); + ClusterProperties clusterProperties = fromMap(untyped); + clusterProperties.setVersion(version); + return clusterProperties; + } + catch (Exception e) + { + throw new PropertySerializationException(e); + } + } @SuppressWarnings("unchecked") private static T mapGetOrDefault(Map map, String key, T defaultValue) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/properties/ServicePropertiesJsonSerializer.java b/d2/src/main/java/com/linkedin/d2/balancer/properties/ServicePropertiesJsonSerializer.java index 2ed2deb67a..28da19ee02 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/properties/ServicePropertiesJsonSerializer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/properties/ServicePropertiesJsonSerializer.java @@ -17,6 +17,7 @@ package com.linkedin.d2.balancer.properties; +import com.google.protobuf.ByteString; import com.linkedin.d2.balancer.properties.util.PropertyUtil; import com.linkedin.d2.balancer.subsetting.SubsettingStrategy; import com.linkedin.d2.balancer.util.JacksonUtil; @@ -202,6 +203,22 @@ public ServiceProperties fromBytes(byte[] bytes, long version) throws PropertySe return serviceProperties; } + public ServiceProperties fromBytes(ByteString bytes, long version) throws PropertySerializationException + { + try + { + @SuppressWarnings("unchecked") + Map untyped = JacksonUtil.getObjectMapper().readValue(bytes.newInput(), Map.class); + ServiceProperties serviceProperties = fromMap(untyped); + serviceProperties.setVersion(version); + return serviceProperties; + } + catch (Exception e) + { + throw new PropertySerializationException(e); + } + } + /** * Always return the composite class {@link ServiceStoreProperties} to include ALL properties stored on service registry (like Zookeeper), * such as canary configs, distribution strategy, etc. diff --git a/d2/src/main/java/com/linkedin/d2/balancer/properties/UriPropertiesJsonSerializer.java b/d2/src/main/java/com/linkedin/d2/balancer/properties/UriPropertiesJsonSerializer.java index fd2aaadc66..20d3f4e5c5 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/properties/UriPropertiesJsonSerializer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/properties/UriPropertiesJsonSerializer.java @@ -16,20 +16,19 @@ package com.linkedin.d2.balancer.properties; - import com.linkedin.d2.balancer.properties.util.PropertyUtil; import com.linkedin.d2.balancer.util.JacksonUtil; import com.linkedin.d2.balancer.util.partitions.DefaultPartitionAccessor; import com.linkedin.d2.discovery.PropertyBuilder; import com.linkedin.d2.discovery.PropertySerializationException; import com.linkedin.d2.discovery.PropertySerializer; -import java.util.Collections; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import indis.XdsD2; import java.net.URI; +import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class UriPropertiesJsonSerializer implements PropertySerializer, PropertyBuilder { @@ -109,6 +108,33 @@ public UriProperties fromBytes(byte[] bytes, long version) throws PropertySerial return uriProperties; } + public UriProperties fromProto(XdsD2.D2URI protoUri) throws PropertySerializationException + { + try + { + URI uri = URI.create(protoUri.getUri()); + + Map partitionDesc = new HashMap<>(protoUri.getPartitionDescCount()); + for (Map.Entry partition : protoUri.getPartitionDescMap().entrySet()) + { + partitionDesc.put(partition.getKey(), new PartitionData(partition.getValue())); + } + + Map applicationProperties = PropertyUtil.protoStructToMap(protoUri.getUriSpecificProperties()); + + return new UriProperties( + protoUri.getClusterName(), + Collections.singletonMap(uri, partitionDesc), + Collections.singletonMap(uri, applicationProperties), + protoUri.getVersion() + ); + } + catch (Exception e) + { + throw new PropertySerializationException(e); + } + } + @Override @SuppressWarnings("unchecked") public UriProperties fromMap(Map map) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/properties/util/PropertyUtil.java b/d2/src/main/java/com/linkedin/d2/balancer/properties/util/PropertyUtil.java index b99eb9afa6..f6d426f7a8 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/properties/util/PropertyUtil.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/properties/util/PropertyUtil.java @@ -16,9 +16,13 @@ package com.linkedin.d2.balancer.properties.util; -import com.linkedin.data.template.TemplateOutputCastException; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; import com.linkedin.util.ArgumentUtil; - +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; public class PropertyUtil @@ -152,4 +156,63 @@ else if (value instanceof Double && clazz.equals(Integer.class)) } return (T) value; } + + /** + * Efficiently translates a proto JSON {@link Struct} into a {@code Map} without additional + * serialization or deserialization. + */ + public static Map protoStructToMap(Struct struct) + { + if (struct.getFieldsCount() == 0) { + return Collections.emptyMap(); + } + Map map = new HashMap<>(struct.getFieldsMap().size()); + for (Map.Entry entry : struct.getFieldsMap().entrySet()) + { + map.put(entry.getKey(), valueToObject(entry.getValue())); + } + return map; + } + + private static Object valueToObject(Value value) + { + if (value.hasBoolValue()) + { + return value.getBoolValue(); + } + else if (value.hasStringValue()) + { + return value.getStringValue(); + } + else if (value.hasNumberValue()) + { + return value.getNumberValue(); + } + else if (value.hasNullValue()) + { + return null; + } + else if (value.hasStructValue()) + { + Map map = new HashMap<>(value.getStructValue().getFieldsCount()); + for (Map.Entry entry : value.getStructValue().getFieldsMap().entrySet()) + { + map.put(entry.getKey(), valueToObject(entry.getValue())); + } + return map; + } + else if (value.hasListValue()) + { + List list = new ArrayList<>(value.getListValue().getValuesCount()); + for (Value element : value.getListValue().getValuesList()) + { + list.add(valueToObject(element)); + } + return list; + } + else + { + throw new RuntimeException("Unexpected proto value of unknown type: " + value); + } + } } diff --git a/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/SymlinkAwareZooKeeper.java b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/SymlinkAwareZooKeeper.java index 9fba57c64c..8fea5e4256 100644 --- a/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/SymlinkAwareZooKeeper.java +++ b/d2/src/main/java/com/linkedin/d2/discovery/stores/zk/SymlinkAwareZooKeeper.java @@ -16,6 +16,7 @@ package com.linkedin.d2.discovery.stores.zk; +import com.google.protobuf.ByteString; import com.linkedin.d2.discovery.PropertySerializationException; import com.linkedin.d2.discovery.PropertySerializer; import org.apache.zookeeper.AsyncCallback; @@ -601,6 +602,18 @@ public String fromBytes(byte[] bytes) throws PropertySerializationException } } + public String fromBytes(ByteString bytes) throws PropertySerializationException + { + try + { + return bytes.toString("UTF-8"); + } + catch (UnsupportedEncodingException e) + { + throw new PropertySerializationException(e); + } + } + @Override public byte[] toBytes(String property) { diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index a9b48de6c5..5c1d1e2c28 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -24,9 +24,8 @@ public abstract class XdsClient { - private static final String D2_NODE_TYPE_URL = "type.googleapis.com/indis.D2Node"; - private static final String D2_SYMLINK_NODE_TYPE_URL = "type.googleapis.com/indis.D2SymlinkNode"; - private static final String D2_NODE_MAP_TYPE_URL = "type.googleapis.com/indis.D2NodeMap"; + private static final String NODE_TYPE_URL = "type.googleapis.com/indis.Node"; + private static final String D2_URI_MAP_TYPE_URL = "type.googleapis.com/indis.D2URIMap"; interface ResourceWatcher { @@ -41,19 +40,19 @@ interface ResourceWatcher void onReconnect(); } - interface D2NodeResourceWatcher extends ResourceWatcher + interface NodeResourceWatcher extends ResourceWatcher { - void onChanged(D2NodeUpdate update); + void onChanged(NodeUpdate update); } - interface D2SymlinkNodeResourceWatcher extends ResourceWatcher + interface SymlinkNodeResourceWatcher extends ResourceWatcher { - void onChanged(String resourceName, D2SymlinkNodeUpdate update); + void onChanged(String resourceName, NodeUpdate update); } - interface D2NodeMapResourceWatcher extends ResourceWatcher + interface D2URIMapResourceWatcher extends ResourceWatcher { - void onChanged(D2NodeMapUpdate update); + void onChanged(D2URIMapUpdate update); } interface ResourceUpdate @@ -61,18 +60,18 @@ interface ResourceUpdate } - static final class D2NodeUpdate implements ResourceUpdate + static final class NodeUpdate implements ResourceUpdate { String _version; - XdsD2.D2Node _nodeData; + XdsD2.Node _nodeData; - D2NodeUpdate(String version, XdsD2.D2Node nodeData) + NodeUpdate(String version, XdsD2.Node nodeData) { _version = version; _nodeData = nodeData; } - XdsD2.D2Node getNodeData() + XdsD2.Node getNodeData() { return _nodeData; } @@ -83,42 +82,20 @@ public String getVersion() } } - static final class D2SymlinkNodeUpdate implements ResourceUpdate + static final class D2URIMapUpdate implements ResourceUpdate { String _version; - XdsD2.D2SymlinkNode _nodeData; + Map _uriMap; - D2SymlinkNodeUpdate(String version, XdsD2.D2SymlinkNode nodeData) + D2URIMapUpdate(String version, Map uriMap) { _version = version; - _nodeData = nodeData; - } - - XdsD2.D2SymlinkNode getNodeData() - { - return _nodeData; - } - - public String getVersion() - { - return _version; - } - } - - static final class D2NodeMapUpdate implements ResourceUpdate - { - String _version; - Map _nodeDataMap; - - D2NodeMapUpdate(String version, Map nodeDataMap) - { - _version = version; - _nodeDataMap = nodeDataMap; + _uriMap = uriMap; } - public Map getNodeDataMap() + public Map getURIMap() { - return _nodeDataMap; + return _uriMap; } public String getVersion() @@ -129,21 +106,17 @@ public String getVersion() enum ResourceType { - UNKNOWN, D2_NODE, D2_SYMLINK_NODE, D2_NODE_MAP; + UNKNOWN, NODE, D2_URI_MAP; static ResourceType fromTypeUrl(String typeUrl) { - if (typeUrl.equals(D2_NODE_TYPE_URL)) - { - return D2_NODE; - } - if (typeUrl.equals(D2_SYMLINK_NODE_TYPE_URL)) + if (typeUrl.equals(NODE_TYPE_URL)) { - return D2_SYMLINK_NODE; + return NODE; } - if (typeUrl.equals(D2_NODE_MAP_TYPE_URL)) + if (typeUrl.equals(D2_URI_MAP_TYPE_URL)) { - return D2_NODE_MAP; + return D2_URI_MAP; } return UNKNOWN; } @@ -152,12 +125,10 @@ String typeUrl() { switch (this) { - case D2_NODE: - return D2_NODE_TYPE_URL; - case D2_SYMLINK_NODE: - return D2_SYMLINK_NODE_TYPE_URL; - case D2_NODE_MAP: - return D2_NODE_MAP_TYPE_URL; + case NODE: + return NODE_TYPE_URL; + case D2_URI_MAP: + return D2_URI_MAP_TYPE_URL; case UNKNOWN: default: throw new AssertionError("Unknown or missing case in enum switch: " + this); diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 2b1a5b7b15..59765a311e 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -56,8 +56,7 @@ public class XdsClientImpl extends XdsClient public static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L; private final Map _d2NodeSubscribers = new HashMap<>(); - private final Map _d2SymlinkNodeSubscribers = new HashMap<>(); - private final Map _d2NodeMapSubscribers = new HashMap<>(); + private final Map _d2URIMapSubscribers = new HashMap<>(); private final Node _node; private final ManagedChannel _managedChannel; @@ -214,7 +213,7 @@ private void readyHandler() private void handleD2NodeResponse(DiscoveryResponseData data) { - Map updates = new HashMap<>(); + Map updates = new HashMap<>(); List errors = new ArrayList<>(); for (Resource resource: data.getResourcesList()) @@ -222,21 +221,21 @@ private void handleD2NodeResponse(DiscoveryResponseData data) String resourceName = resource.getName(); try { - XdsD2.D2Node d2Node = resource.getResource().unpack(XdsD2.D2Node.class); - updates.put(resourceName, new D2NodeUpdate(resource.getVersion(), d2Node)); + XdsD2.Node d2Node = resource.getResource().unpack(XdsD2.Node.class); + updates.put(resourceName, new NodeUpdate(resource.getVersion(), d2Node)); } catch (InvalidProtocolBufferException e) { - _log.warn("Failed to unpack D2Node response", e); - errors.add("Failed to unpack D2Node response"); + _log.warn("Failed to unpack Node response", e); + errors.add("Failed to unpack Node response"); } } handleResourceUpdate(updates, data.getResourceType(), data.getNonce(), errors); } - private void handleD2SymlinkNodeResponse(DiscoveryResponseData data) + private void handleD2URIMapResponse(DiscoveryResponseData data) { - Map updates = new HashMap<>(); + Map updates = new HashMap<>(); List errors = new ArrayList<>(); for (Resource resource: data.getResourcesList()) @@ -244,35 +243,13 @@ private void handleD2SymlinkNodeResponse(DiscoveryResponseData data) String resourceName = resource.getName(); try { - XdsD2.D2SymlinkNode symlinkNode = resource.getResource().unpack(XdsD2.D2SymlinkNode.class); - updates.put(resourceName, new D2SymlinkNodeUpdate(resource.getVersion(), symlinkNode)); + XdsD2.D2URIMap uriMap = resource.getResource().unpack(XdsD2.D2URIMap.class); + Map nodeData = uriMap.getUrisMap(); + updates.put(resourceName, new D2URIMapUpdate(resource.getVersion(), nodeData)); } catch (InvalidProtocolBufferException e) { - _log.warn("Failed to unpack D2SymlinkNode response", e); - errors.add("Failed to unpack D2SymlinkNode response"); - } - } - - handleResourceUpdate(updates, data.getResourceType(), data.getNonce(), errors); - } - - private void handleD2NodeMapResponse(DiscoveryResponseData data) - { - Map updates = new HashMap<>(); - List errors = new ArrayList<>(); - - for (Resource resource: data.getResourcesList()) - { - String resourceName = resource.getName(); - try - { - XdsD2.D2NodeMap d2NodeMap = resource.getResource().unpack(XdsD2.D2NodeMap.class); - Map nodeData = d2NodeMap.getNodesMap(); - updates.put(resourceName, new D2NodeMapUpdate(resource.getVersion(), nodeData)); - } catch (InvalidProtocolBufferException e) - { - _log.warn("Failed to unpack D2NodeMap response", e); - errors.add("Failed to unpack D2NodeMap response"); + _log.warn("Failed to unpack D2URIMap response", e); + errors.add("Failed to unpack D2URIMap response"); } } @@ -307,7 +284,7 @@ private void notifyStreamError(Status error) { for (ResourceSubscriber subscriber : _d2NodeSubscribers.values()) { subscriber.onError(error); } - for (ResourceSubscriber subscriber : _d2NodeMapSubscribers.values()) { + for (ResourceSubscriber subscriber : _d2URIMapSubscribers.values()) { subscriber.onError(error); } } @@ -316,7 +293,7 @@ private void notifyStreamReconnect() { for (ResourceSubscriber subscriber : _d2NodeSubscribers.values()) { subscriber.onReconnect(); } - for (ResourceSubscriber subscriber : _d2NodeMapSubscribers.values()) { + for (ResourceSubscriber subscriber : _d2URIMapSubscribers.values()) { subscriber.onReconnect(); } } @@ -325,12 +302,10 @@ private Map getResourceSubscriberMap(ResourceType ty { switch (type) { - case D2_NODE: + case NODE: return _d2NodeSubscribers; - case D2_SYMLINK_NODE: - return _d2SymlinkNodeSubscribers; - case D2_NODE_MAP: - return _d2NodeMapSubscribers; + case D2_URI_MAP: + return _d2URIMapSubscribers; case UNKNOWN: default: throw new AssertionError("Unknown resource type"); @@ -369,14 +344,15 @@ private void notifyWatcher(ResourceWatcher watcher, ResourceUpdate update) { switch (_type) { - case D2_NODE: - ((D2NodeResourceWatcher) watcher).onChanged((D2NodeUpdate) update); + case NODE: + if (watcher instanceof NodeResourceWatcher) { + ((NodeResourceWatcher) watcher).onChanged((NodeUpdate) update); + } else { + ((SymlinkNodeResourceWatcher) watcher).onChanged(_resource, (NodeUpdate) update); + } break; - case D2_SYMLINK_NODE: - ((D2SymlinkNodeResourceWatcher) watcher).onChanged(_resource, (D2SymlinkNodeUpdate) update); - break; - case D2_NODE_MAP: - ((D2NodeMapResourceWatcher) watcher).onChanged((D2NodeMapUpdate) update); + case D2_URI_MAP: + ((D2URIMapResourceWatcher) watcher).onChanged((D2URIMapUpdate) update); break; case UNKNOWN: default: @@ -643,14 +619,11 @@ private void handleResponse(DiscoveryResponseData response) ResourceType resourceType = response.getResourceType(); switch (resourceType) { - case D2_NODE: + case NODE: handleD2NodeResponse(response); break; - case D2_SYMLINK_NODE: - handleD2SymlinkNodeResponse(response); - break; - case D2_NODE_MAP: - handleD2NodeMapResponse(response); + case D2_URI_MAP: + handleD2URIMapResponse(response); break; case UNKNOWN: _log.warn("Received an unknown type of DiscoveryResponse\n{}", respNonce); diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java index 8836a7c628..3a5c77848e 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java @@ -20,8 +20,6 @@ import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Struct; -import com.google.protobuf.util.JsonFormat; import com.linkedin.d2.balancer.dualread.DualReadStateManager; import com.linkedin.d2.balancer.properties.ClusterProperties; import com.linkedin.d2.balancer.properties.ClusterPropertiesJsonSerializer; @@ -67,10 +65,10 @@ public class XdsToD2PropertiesAdaptor private final UriPropertiesJsonSerializer _uriPropertiesJsonSerializer; private final UriPropertiesMerger _uriPropertiesMerger; private final DualReadStateManager _dualReadStateManager; - private final ConcurrentMap _watchedClusterResources; - private final ConcurrentMap _watchedSymlinkResources; - private final ConcurrentMap _watchedServiceResources; - private final ConcurrentMap _watchedUriResources; + private final ConcurrentMap _watchedClusterResources; + private final ConcurrentMap _watchedSymlinkResources; + private final ConcurrentMap _watchedServiceResources; + private final ConcurrentMap _watchedUriResources; // Mapping between a symlink name, like "$FooClusterMaster" and the actual node name it's pointing to, like // "FooCluster-prod-ltx1". // (Note that this name does NOT include the full path so that it works for both cluster symlink @@ -153,9 +151,8 @@ public void listenToCluster(String clusterName) { _watchedClusterResources.computeIfAbsent(clusterName, k -> { - XdsClient.D2NodeResourceWatcher watcher = getClusterResourceWatcher(clusterName); - _xdsClient.watchXdsResource(resourceName, XdsClient.ResourceType.D2_NODE, - watcher); + XdsClient.NodeResourceWatcher watcher = getClusterResourceWatcher(clusterName); + _xdsClient.watchXdsResource(resourceName, XdsClient.ResourceType.NODE, watcher); return watcher; }); } @@ -173,9 +170,8 @@ public void listenToUris(String clusterName) { _watchedUriResources.computeIfAbsent(clusterName, k -> { - XdsClient.D2NodeMapResourceWatcher watcher = getUriResourceWatcher(clusterName); - _xdsClient.watchXdsResource(resourceName, XdsClient.ResourceType.D2_NODE_MAP, - watcher); + XdsClient.D2URIMapResourceWatcher watcher = getUriResourceWatcher(clusterName); + _xdsClient.watchXdsResource(resourceName, XdsClient.ResourceType.D2_URI_MAP, watcher); return watcher; }); } @@ -185,9 +181,8 @@ public void listenToService(String serviceName) { _watchedServiceResources.computeIfAbsent(serviceName, k -> { - XdsClient.D2NodeResourceWatcher watcher = getServiceResourceWatcher(serviceName); - _xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, XdsClient.ResourceType.D2_NODE, - watcher); + XdsClient.NodeResourceWatcher watcher = getServiceResourceWatcher(serviceName); + _xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, XdsClient.ResourceType.NODE, watcher); return watcher; }); } @@ -204,33 +199,31 @@ private void listenToSymlink(String name, String fullResourceName) _watchedSymlinkResources.computeIfAbsent(fullResourceName, k -> { // use symlink name "$FooClusterMaster" to create the watcher - XdsClient.D2SymlinkNodeResourceWatcher watcher = getSymlinkResourceWatcher(name); - _xdsClient.watchXdsResource(k, XdsClient.ResourceType.D2_SYMLINK_NODE, - watcher); + XdsClient.SymlinkNodeResourceWatcher watcher = getSymlinkResourceWatcher(name); + _xdsClient.watchXdsResource(k, XdsClient.ResourceType.NODE, watcher); return watcher; }); } - XdsClient.D2NodeResourceWatcher getServiceResourceWatcher(String serviceName) + XdsClient.NodeResourceWatcher getServiceResourceWatcher(String serviceName) { - return new XdsClient.D2NodeResourceWatcher() + return new XdsClient.NodeResourceWatcher() { @Override - public void onChanged(XdsClient.D2NodeUpdate update) + public void onChanged(XdsClient.NodeUpdate update) { if (_serviceEventBus != null) { try { - ServiceProperties serviceProperties = toServiceProperties(update.getNodeData().getData(), - update.getNodeData().getStat().getMzxid()); + ServiceProperties serviceProperties = toServiceProperties(update.getNodeData()); _serviceEventBus.publishInitialize(serviceName, serviceProperties); if (_dualReadStateManager != null) { _dualReadStateManager.reportData(serviceName, serviceProperties, true); } } - catch (InvalidProtocolBufferException | PropertySerializationException e) + catch (PropertySerializationException e) { _log.error("Failed to parse D2 service properties from xDS update. Service name: " + serviceName, e); } @@ -251,19 +244,18 @@ public void onReconnect() }; } - XdsClient.D2NodeResourceWatcher getClusterResourceWatcher(String clusterName) + XdsClient.NodeResourceWatcher getClusterResourceWatcher(String clusterName) { - return new XdsClient.D2NodeResourceWatcher() + return new XdsClient.NodeResourceWatcher() { @Override - public void onChanged(XdsClient.D2NodeUpdate update) + public void onChanged(XdsClient.NodeUpdate update) { if (_clusterEventBus != null) { try { - ClusterProperties clusterProperties = toClusterProperties(update.getNodeData().getData(), - update.getNodeData().getStat().getMzxid()); + ClusterProperties clusterProperties = toClusterProperties(update.getNodeData()); // For symlink clusters, ClusterLoadBalancerSubscriber subscribed to the symlinks, instead of the actual node, in event bus, // so we need to publish under the symlink names. Also, rarely and possibly, the original cluster could have subscribers // too when calls are made directly to the original cluster, so we publish for it too. @@ -285,7 +277,7 @@ public void onChanged(XdsClient.D2NodeUpdate update) _dualReadStateManager.reportData(clusterName, clusterProperties, true); } } - catch (InvalidProtocolBufferException | PropertySerializationException e) + catch (PropertySerializationException e) { _log.error("Failed to parse D2 cluster properties from xDS update. Cluster name: " + clusterName, e); } @@ -306,20 +298,20 @@ public void onReconnect() }; } - XdsClient.D2NodeMapResourceWatcher getUriResourceWatcher(String clusterName) + XdsClient.D2URIMapResourceWatcher getUriResourceWatcher(String clusterName) { return new UriPropertiesResourceWatcher(clusterName); } - XdsClient.D2SymlinkNodeResourceWatcher getSymlinkResourceWatcher(String symlinkName) + XdsClient.SymlinkNodeResourceWatcher getSymlinkResourceWatcher(String symlinkName) { - return new XdsClient.D2SymlinkNodeResourceWatcher() + return new XdsClient.SymlinkNodeResourceWatcher() { @Override - public void onChanged(String resourceName, XdsClient.D2SymlinkNodeUpdate update) + public void onChanged(String resourceName, XdsClient.NodeUpdate update) { // Update maps between symlink name and actual node name - String actualResourceName = update.getNodeData().getMasterClusterNodePath(); + String actualResourceName = update.getNodeData().getData().toString(StandardCharsets.UTF_8); String actualNodeName = getNodeName(actualResourceName); updateSymlinkAndActualNodeMap(symlinkName, actualNodeName); // listen to the actual nodes @@ -385,37 +377,34 @@ private void notifyAvailabilityChanges(boolean isAvailable) } } - private ServiceProperties toServiceProperties(Struct serviceProperties, long version) - throws InvalidProtocolBufferException, PropertySerializationException + private ServiceProperties toServiceProperties(XdsD2.Node serviceProperties) throws PropertySerializationException { - return _servicePropertiesJsonSerializer.fromBytes( - JsonFormat.printer().print(serviceProperties).getBytes(StandardCharsets.UTF_8), version); + return _servicePropertiesJsonSerializer.fromBytes(serviceProperties.getData(), + serviceProperties.getStat().getMzxid()); } - private ClusterProperties toClusterProperties(Struct clusterProperties, long version) - throws InvalidProtocolBufferException, PropertySerializationException + private ClusterProperties toClusterProperties(XdsD2.Node clusterProperties) throws PropertySerializationException { - return _clusterPropertiesJsonSerializer.fromBytes( - JsonFormat.printer().print(clusterProperties).getBytes(StandardCharsets.UTF_8), version); + return _clusterPropertiesJsonSerializer.fromBytes(clusterProperties.getData(), + clusterProperties.getStat().getMzxid()); } - private Map toUriProperties(Map uriDataMap) - throws InvalidProtocolBufferException, PropertySerializationException + private Map toUriProperties(Map uriDataMap) + throws PropertySerializationException { Map parsedMap = new HashMap<>(); - for (Map.Entry entry : uriDataMap.entrySet()) + for (Map.Entry entry : uriDataMap.entrySet()) { - XdsD2.D2Node d2Node = entry.getValue(); - UriProperties uriProperties = _uriPropertiesJsonSerializer.fromBytes( - JsonFormat.printer().print(d2Node.getData()).getBytes(StandardCharsets.UTF_8), d2Node.getStat().getMzxid()); + XdsD2.D2URI d2URI = entry.getValue(); + UriProperties uriProperties = _uriPropertiesJsonSerializer.fromProto(d2URI); parsedMap.put(entry.getKey(), uriProperties); } return parsedMap; } - private class UriPropertiesResourceWatcher implements XdsClient.D2NodeMapResourceWatcher + private class UriPropertiesResourceWatcher implements XdsClient.D2URIMapResourceWatcher { final String _clusterName; final AtomicBoolean _isInit; @@ -441,7 +430,7 @@ public UriPropertiesResourceWatcher(String clusterName) // (like "FooCluster-prod-ltx1"), which has no subscribers anyway, so no harm to publish. Yet, we still emit the tracking // events about receiving uri updates of this cluster for measuring update propagation latencies. @Override - public void onChanged(XdsClient.D2NodeMapUpdate update) + public void onChanged(XdsClient.D2URIMapUpdate update) { boolean isInit = _isInit.compareAndSet(true, false); if (isInit) @@ -453,7 +442,7 @@ public void onChanged(XdsClient.D2NodeMapUpdate update) { try { - Map updates = toUriProperties(update.getNodeDataMap()); + Map updates = toUriProperties(update.getURIMap()); if (!isInit) { emitSDStatusUpdateReceiptEvents(updates); @@ -473,7 +462,7 @@ public void onChanged(XdsClient.D2NodeMapUpdate update) _dualReadStateManager.reportData(_clusterName, mergedUriProperties, true); } } - catch (InvalidProtocolBufferException | PropertySerializationException e) + catch (PropertySerializationException e) { _log.error("Failed to parse D2 uri properties from xDS update. Cluster name: " + _clusterName, e); } diff --git a/d2/src/main/proto/XdsD2.proto b/d2/src/main/proto/XdsD2.proto index 537cde9723..ad7ec09fd8 100644 --- a/d2/src/main/proto/XdsD2.proto +++ b/d2/src/main/proto/XdsD2.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package indis; import "google/protobuf/struct.proto"; +import "google/protobuf/timestamp.proto"; message Stat { // The zxid of the change that caused this znode to be created. @@ -30,15 +31,80 @@ message Stat { } message D2Node { + // Deprecated in favor of Node + option deprecated = true; Stat stat = 1; google.protobuf.Struct data = 2; } message D2SymlinkNode { + // Deprecated in favor of Node + option deprecated = true; Stat stat = 1; string masterClusterNodePath = 2; } message D2NodeMap { + // Deprecated in favor of D2UriMap + option deprecated = true; map nodes = 1; } + +message Node { + Stat stat = 1; + bytes data = 2 ; +} + +// D2URI is a proto representation of com.linkedin.d2.balancer.properties.UriProperties. Note that a D2 UriProperties is +// is designed to hold all the announcements of a cluster, which is why it's represented as a map of URI to data. The +// UriProperties class is reused wholesale for serialization to write the data to ZK, which is why all fields are +// actually maps, even though these maps only ever have one key in them. It is clear from the implementation of +// ZooKeeperServer and ZooKeeperAnnouncer that there cannot ever be more than one URI in one ZK announcement, therefore +// this new proto representation does not need to share the same shortcomings and can, instead, represent things more +// linearly. +// +// Here is a sample ZK announcement in JSON serialized from a UriProperties for additional clarity on the fields that +// are represented as maps when they do not need to be: +// { +// "weights": { +// "https://foo.stg.linkedin.com:18792/Toki/resources": 1.0 +// }, +// "partitionDesc": { +// "https://foo.stg.linkedin.com:18792/Toki/resources": { +// "0": { +// "weight": 1.0 +// } +// } +// }, +// "uriSpecificProperties": { +// "https://foo.stg.linkedin.com:18792/Toki/resources": { +// "com.linkedin.app.version": "0.1.76" +// } +// }, +// "clusterName": "Toki" +// } +message D2URI { + // The version of this announcement. When coming from ZK, this will be the node's mzxid. + int64 version = 1; + + // The time at which this announcement was last updated. When coming from ZK this will be the node's mtime. + google.protobuf.Timestamp modified_time = 2; + + // The name of the cluster this announcement belongs to. This is inferred from the original "clusterName" field. + string cluster_name = 3; + + // The URI for this announcement, i.e. the host, port and context path that requests should be sent to. + string uri = 4; + + // The partitions and their corresponding weight for this announcement. This is inferred from the original + // "partitionDesc" and "weights" fields. If "partitionDesc" is present in the original ZK node, it is always used + // regardless of "weights". Otherwise, "weights" is assumed to be for partition 0, as specified in UriProperties. + map partition_desc = 5; + + // Additional metadata for this announcement. This is inferred from the original "uriSpecificProperties" field. + google.protobuf.Struct uri_specific_properties = 6; +} + +message D2URIMap { + map uris = 1; +} diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java index 2d4cb726f2..d268536ba3 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java @@ -1,22 +1,29 @@ package com.linkedin.d2.xds; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.ListValue; +import com.google.protobuf.ByteString; import com.google.protobuf.Struct; import com.google.protobuf.Value; import com.linkedin.d2.balancer.properties.ClusterProperties; +import com.linkedin.d2.balancer.properties.ClusterPropertiesJsonSerializer; import com.linkedin.d2.balancer.properties.ClusterStoreProperties; +import com.linkedin.d2.balancer.properties.PartitionData; import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.ServicePropertiesJsonSerializer; import com.linkedin.d2.balancer.properties.ServiceStoreProperties; import com.linkedin.d2.balancer.properties.UriProperties; +import com.linkedin.d2.balancer.properties.UriPropertiesJsonSerializer; +import com.linkedin.d2.discovery.PropertySerializationException; import com.linkedin.d2.discovery.event.PropertyEventBus; import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter; import indis.XdsD2; +import java.net.URI; import java.util.Collections; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import org.testng.Assert; import org.testng.annotations.Test; import static org.mockito.Mockito.*; @@ -34,8 +41,9 @@ public class TestXdsToD2PropertiesAdaptor { private static final String URI_SYMLINK_RESOURCE_NAME = URI_NODE_PREFIX + SYMLINK_NAME; private static final String PRIMARY_URI_RESOURCE_NAME = URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME; - private static final XdsClient.D2NodeMapUpdate DUMMY_NODE_MAP_UPDATE = new XdsClient.D2NodeMapUpdate("", + private static final XdsClient.D2URIMapUpdate DUMMY_NODE_MAP_UPDATE = new XdsClient.D2URIMapUpdate("", Collections.emptyMap()); + private static final long DUMMY_VERSION = 123; @Test public void testListenToService() @@ -44,21 +52,23 @@ public void testListenToService() String serviceName = "FooService"; fixture.getSpiedAdaptor().listenToService(serviceName); - verify(fixture._xdsClient).watchXdsResource(eq("/d2/services/" + serviceName), eq(XdsClient.ResourceType.D2_NODE), any()); - - XdsClient.D2NodeResourceWatcher symlinkNodeWatcher = - (XdsClient.D2NodeResourceWatcher) fixture._clusterWatcherArgumentCaptor.getValue(); - symlinkNodeWatcher.onChanged(new XdsClient.D2NodeUpdate("", XdsD2.D2Node.newBuilder() - .setData(Struct.newBuilder().putAllFields( - ImmutableMap.of( - "serviceName", getProtoStringValue(serviceName), - "clusterName", getProtoStringValue(PRIMARY_CLUSTER_NAME), - "path", getProtoStringValue(""), - "loadBalancerStrategyList", Value.newBuilder().setListValue( - ListValue.newBuilder().addValues(getProtoStringValue("relative")).build() - ).build() + verify(fixture._xdsClient).watchXdsResource(eq("/d2/services/" + serviceName), eq(XdsClient.ResourceType.NODE), any()); + + XdsClient.NodeResourceWatcher symlinkNodeWatcher = + (XdsClient.NodeResourceWatcher) fixture._clusterWatcherArgumentCaptor.getValue(); + symlinkNodeWatcher.onChanged(new XdsClient.NodeUpdate("", XdsD2.Node.newBuilder() + .setData( + ByteString.copyFrom( + new ServicePropertiesJsonSerializer().toBytes( + new ServiceProperties( + serviceName, + PRIMARY_CLUSTER_NAME, + "", + Collections.singletonList("relative") + ) + ) ) - )) + ) .setStat(XdsD2.Stat.newBuilder().setMzxid(1L).build()) .build()) ); @@ -74,26 +84,27 @@ public void testListenToNormalCluster() XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); fixture.getSpiedAdaptor().listenToCluster(PRIMARY_CLUSTER_NAME); - verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE), any()); + verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), eq(XdsClient.ResourceType.NODE), any()); verifyClusterNodeUpdate(fixture, PRIMARY_CLUSTER_NAME, null, PRIMARY_CLUSTER_PROPERTIES); } @Test - public void testListenToClusterSymlink() { + public void testListenToClusterSymlink() + { XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); fixture.getSpiedAdaptor().listenToCluster(SYMLINK_NAME); - verify(fixture._xdsClient).watchXdsResource(eq(CLUSTER_SYMLINK_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_SYMLINK_NODE), any()); + verify(fixture._xdsClient).watchXdsResource(eq(CLUSTER_SYMLINK_RESOURCE_NAME), eq(XdsClient.ResourceType.NODE), any()); - XdsClient.D2SymlinkNodeResourceWatcher symlinkNodeWatcher = - (XdsClient.D2SymlinkNodeResourceWatcher) fixture._symlinkWatcherArgumentCaptor.getValue(); + XdsClient.SymlinkNodeResourceWatcher symlinkNodeWatcher = + (XdsClient.SymlinkNodeResourceWatcher) fixture._clusterWatcherArgumentCaptor.getValue(); symlinkNodeWatcher.onChanged(CLUSTER_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(PRIMARY_CLUSTER_RESOURCE_NAME)); - verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE), any()); - verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE_MAP), any()); + verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), eq(XdsClient.ResourceType.NODE), any()); + verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_URI_MAP), any()); - XdsClient.D2NodeResourceWatcher clusterNodeWatcher = - (XdsClient.D2NodeResourceWatcher) fixture._clusterWatcherArgumentCaptor.getValue(); + XdsClient.NodeResourceWatcher clusterNodeWatcher = + (XdsClient.NodeResourceWatcher) fixture._clusterWatcherArgumentCaptor.getValue(); clusterNodeWatcher.onChanged(getClusterNodeUpdate(PRIMARY_CLUSTER_NAME)); verify(fixture._clusterEventBus).publishInitialize(SYMLINK_NAME, PRIMARY_CLUSTER_PROPERTIES); @@ -105,9 +116,9 @@ public void testListenToClusterSymlink() { symlinkNodeWatcher.onChanged(CLUSTER_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(primaryClusterResourceName2)); - verify(fixture._xdsClient).watchXdsResource(eq(primaryClusterResourceName2), eq(XdsClient.ResourceType.D2_NODE), any()); + verify(fixture._xdsClient).watchXdsResource(eq(primaryClusterResourceName2), eq(XdsClient.ResourceType.NODE), any()); verify(fixture._xdsClient).watchXdsResource(eq(URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2), - eq(XdsClient.ResourceType.D2_NODE_MAP), any()); + eq(XdsClient.ResourceType.D2_URI_MAP), any()); verifyClusterNodeUpdate(fixture, PRIMARY_CLUSTER_NAME_2, SYMLINK_NAME, primaryClusterProperties2); // if the old primary cluster gets an update, it will be published under its original cluster name @@ -125,7 +136,7 @@ public void testListenToNormalUri() XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); fixture.getSpiedAdaptor().listenToUris(PRIMARY_CLUSTER_NAME); - verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE_MAP), any()); + verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_URI_MAP), any()); verifyUriUpdate(fixture, PRIMARY_CLUSTER_NAME, null); } @@ -135,16 +146,16 @@ public void testListenToUriSymlink() XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); fixture.getSpiedAdaptor().listenToUris(SYMLINK_NAME); - verify(fixture._xdsClient).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_SYMLINK_NODE), any()); + verify(fixture._xdsClient).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), eq(XdsClient.ResourceType.NODE), any()); - XdsClient.D2SymlinkNodeResourceWatcher symlinkNodeWatcher = - (XdsClient.D2SymlinkNodeResourceWatcher) fixture._symlinkWatcherArgumentCaptor.getValue(); + XdsClient.SymlinkNodeResourceWatcher symlinkNodeWatcher = + (XdsClient.SymlinkNodeResourceWatcher) fixture._clusterWatcherArgumentCaptor.getValue(); symlinkNodeWatcher.onChanged(URI_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME)); - verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE_MAP), any()); + verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_URI_MAP), any()); - XdsClient.D2NodeMapResourceWatcher watcher = - (XdsClient.D2NodeMapResourceWatcher) fixture._uriWatcherArgumentCaptor.getValue(); + XdsClient.D2URIMapResourceWatcher watcher = + (XdsClient.D2URIMapResourceWatcher) fixture._uriWatcherArgumentCaptor.getValue(); watcher.onChanged(DUMMY_NODE_MAP_UPDATE); UriProperties uriProps = getDefaultUriProperties(PRIMARY_CLUSTER_NAME); @@ -155,7 +166,7 @@ public void testListenToUriSymlink() String primaryUriResourceName2 = URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2; symlinkNodeWatcher.onChanged(URI_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(primaryUriResourceName2)); - verify(fixture._xdsClient).watchXdsResource(eq(primaryUriResourceName2), eq(XdsClient.ResourceType.D2_NODE_MAP), any()); + verify(fixture._xdsClient).watchXdsResource(eq(primaryUriResourceName2), eq(XdsClient.ResourceType.D2_URI_MAP), any()); verifyUriUpdate(fixture, PRIMARY_CLUSTER_NAME_2, SYMLINK_NAME); // if the old primary cluster gets an update, it will be published under its original cluster name @@ -165,24 +176,57 @@ public void testListenToUriSymlink() verify(fixture._uriEventBus, times(2)).publishInitialize(PRIMARY_CLUSTER_NAME, uriProps); } + @Test + public void testURIPropertiesDeserialization() throws PropertySerializationException + { + URI localhost = URI.create("https://localhost:8443"); + XdsD2.D2URI uri = XdsD2.D2URI.newBuilder() + .setVersion(DUMMY_VERSION) + .setUri(localhost.toString()) + .setClusterName(PRIMARY_CLUSTER_NAME) + .setUriSpecificProperties(Struct.newBuilder() + .putFields("foo", Value.newBuilder().setStringValue("bar").build()) + .build()) + .putPartitionDesc(0, 42) + .putPartitionDesc(1, 27) + .build(); + + UriProperties properties = new UriPropertiesJsonSerializer().fromProto(uri); + Assert.assertEquals(properties.getClusterName(), PRIMARY_CLUSTER_NAME); + Assert.assertEquals(properties.getVersion(), DUMMY_VERSION); + Assert.assertEquals(properties.getUriSpecificProperties(), + Collections.singletonMap(localhost, Collections.singletonMap("foo", "bar"))); + Assert.assertEquals(properties.getPartitionDesc(), + Collections.singletonMap(localhost, ImmutableMap.of( + 0, new PartitionData(42), + 1, new PartitionData(27) + ))); + } + private static Value getProtoStringValue(String v) { return Value.newBuilder().setStringValue(v).build(); } - private static XdsClient.D2SymlinkNodeUpdate getSymlinkNodeUpdate(String primaryClusterResourceName) + private static XdsClient.NodeUpdate getSymlinkNodeUpdate(String primaryClusterResourceName) { - return new XdsClient.D2SymlinkNodeUpdate("", - XdsD2.D2SymlinkNode.newBuilder() - .setMasterClusterNodePath(primaryClusterResourceName) + return new XdsClient.NodeUpdate("", + XdsD2.Node.newBuilder() + .setData(ByteString.copyFromUtf8(primaryClusterResourceName)) .build() ); } - private static XdsClient.D2NodeUpdate getClusterNodeUpdate(String clusterName) + private static XdsClient.NodeUpdate getClusterNodeUpdate(String clusterName) { - return new XdsClient.D2NodeUpdate("", XdsD2.D2Node.newBuilder() - .setData(Struct.newBuilder().putFields("clusterName", getProtoStringValue(clusterName))) + return new XdsClient.NodeUpdate("", XdsD2.Node.newBuilder() + .setData( + ByteString.copyFrom( + new ClusterPropertiesJsonSerializer().toBytes( + new ClusterProperties(clusterName) + ) + ) + ) .setStat(XdsD2.Stat.newBuilder().setMzxid(1L).build()) .build() ); @@ -191,7 +235,7 @@ private static XdsClient.D2NodeUpdate getClusterNodeUpdate(String clusterName) private void verifyClusterNodeUpdate(XdsToD2PropertiesAdaptorFixture fixture, String clusterName, String symlinkName, ClusterStoreProperties expectedPublishProp) { - XdsClient.D2NodeResourceWatcher watcher = (XdsClient.D2NodeResourceWatcher) + XdsClient.NodeResourceWatcher watcher = (XdsClient.NodeResourceWatcher) fixture._clusterWatcherArgumentCaptor.getValue(); watcher.onChanged(getClusterNodeUpdate(clusterName)); verify(fixture._clusterEventBus).publishInitialize(clusterName, expectedPublishProp); @@ -203,7 +247,7 @@ private void verifyClusterNodeUpdate(XdsToD2PropertiesAdaptorFixture fixture, St private void verifyUriUpdate(XdsToD2PropertiesAdaptorFixture fixture, String clusterName, String symlinkName) { - XdsClient.D2NodeMapResourceWatcher watcher = (XdsClient.D2NodeMapResourceWatcher) + XdsClient.D2URIMapResourceWatcher watcher = (XdsClient.D2URIMapResourceWatcher) fixture._uriWatcherArgumentCaptor.getValue(); watcher.onChanged(DUMMY_NODE_MAP_UPDATE); UriProperties uriProps = getDefaultUriProperties(clusterName); @@ -243,11 +287,9 @@ private static class XdsToD2PropertiesAdaptorFixture XdsToD2PropertiesAdaptorFixture() { MockitoAnnotations.initMocks(this); - doNothing().when(_xdsClient).watchXdsResource(any(), eq(XdsClient.ResourceType.D2_SYMLINK_NODE), - _symlinkWatcherArgumentCaptor.capture()); - doNothing().when(_xdsClient).watchXdsResource(any(), eq(XdsClient.ResourceType.D2_NODE), + doNothing().when(_xdsClient).watchXdsResource(any(), eq(XdsClient.ResourceType.NODE), _clusterWatcherArgumentCaptor.capture()); - doNothing().when(_xdsClient).watchXdsResource(any(), eq(XdsClient.ResourceType.D2_NODE_MAP), + doNothing().when(_xdsClient).watchXdsResource(any(), eq(XdsClient.ResourceType.D2_URI_MAP), _uriWatcherArgumentCaptor.capture()); doNothing().when(_clusterEventBus).publishInitialize(any(), any()); doNothing().when(_serviceEventBus).publishInitialize(any(), any()); diff --git a/gradle.properties b/gradle.properties index 33ae9d13ae..567d162c59 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.46.9 +version=29.47.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true